Source code for polypy.read

"""
Read functions of `polypy`. Herein contains classes to read DL_POLY HISTORY
/ CONFIG files and DL_MONTE ARCHIVE files. All of the data that is
extracted from these files is stored in a trajectory class that is compatible
with all three file types.
"""

# Copyright (c) Adam R. Symington
# Distributed under the terms of the MIT License
# author: Adam R. Symington

import os as os
import numpy as np
from polypy import utils as ut


[docs]class Trajectory(): """ The :py:class:`polypy.read.Trajectory` class evaluates the positions of all atoms in the simulation. Args: atom_list (:py:class:`list`): List of unique atom names in trajectory. datatype (:py:attr:`str`): Datatype of the original dataset e.g. DL_POLY HISTORY or CONFIG. """ def __init__(self, atom_list, datatype): self.atom_list = atom_list self.data_type = datatype self.cartesian_trajectory = [] self.fractional_trajectory = [] self.reciprocal_lv = [] self.atom_name = [] self.lv = [] self.cell_lengths = [] self.atoms_in_history = 0 self.timesteps = 0 self.total_atoms = 0 self.atoms_at_timestep = [] self.record_number = [] self.time = [] self.simulation_timestep = None def _clean_data(self): """ Converts the data from lists / floats to numpy arrays / integers. """ self.cartesian_trajectory = np.asarray(self.cartesian_trajectory, dtype=float) self.fractional_trajectory = np.asarray(self.fractional_trajectory, dtype=float) self.atom_name = np.asarray(self.atom_name, dtype=str) self.lv = np.asarray(self.lv, dtype=float) self.reciprocal_lv = np.asarray(self.reciprocal_lv, dtype=float) self.cell_lengths = np.asarray(self.cell_lengths, dtype=float) self.timesteps = int(self.timesteps) self.total_atoms = int(self.total_atoms) if self.data_type == "DL_POLY HISTORY": self.simulation_timestep = (self.record_number[1] - self.record_number[0]) * self.time[0]
[docs] def get_config(self, timestep): """ Isolates a specific DL_POLY CONFIG from a HISTORY file. Args: timestep (:py:class:`int`): Timestep of desired CONFIG. Returns: config_trajectory (:py:class:`polypy.read.Trajectory`): Trajectory object for desired CONFIG. """ if self.data_type == "DL_POLY CONFIG": raise ValueError("Only one timestep was found") config_trajectory = Trajectory(self.atom_list, "DL_POLY CONFIG") config_trajectory.cartesian_trajectory = np.split( self.cartesian_trajectory, self.timesteps)[timestep] config_trajectory.fractional_trajectory = np.split( self.fractional_trajectory, self.timesteps)[timestep] config_trajectory.atom_name = np.split(self.atom_name, self.timesteps)[timestep] config_trajectory.reciprocal_lv = self.reciprocal_lv[timestep] config_trajectory.lv = self.lv[timestep] config_trajectory.cell_lengths = np.split(self.cell_lengths, self.timesteps)[timestep] config_trajectory.atoms_in_history = self.total_atoms config_trajectory.record_number = self.record_number[timestep] config_trajectory.time = self.time[timestep] config_trajectory.simulation_timestep = self.simulation_timestep config_trajectory.timesteps = 1 config_trajectory.total_atoms = self.total_atoms return config_trajectory
[docs] def get_atom(self, atom): """ Isolates the trajectory for a specific atom type. Args: atom (:py:class:`str`): Atom label. Returns: atom_trajectory (:py:class:`polypy.read.Trajectory`): Trajectory object for desired atom. """ if np.unique(self.atom_list).size == 1: raise ValueError("Only one atom type was found") atom_trajectory = Trajectory([atom], self.data_type) for i in range(0, self.atom_name.size): if self.atom_name[i] == atom: atom_trajectory.cartesian_trajectory.append( self.cartesian_trajectory[i]) atom_trajectory.fractional_trajectory.append( self.fractional_trajectory[i]) atom_trajectory.atom_name.append(self.atom_name[i]) atom_trajectory.atoms_in_history = atom_trajectory.atoms_in_history + 1 atom_trajectory.reciprocal_lv = self.reciprocal_lv atom_trajectory.lv = self.lv atom_trajectory.cell_lengths = self.cell_lengths atom_trajectory.timesteps = self.timesteps atom_trajectory.total_atoms = atom_trajectory.atoms_in_history / self.timesteps atom_trajectory.time = self.time atom_trajectory.record_number = self.record_number atom_trajectory.timesteps = 1 atom_trajectory._clean_data() return atom_trajectory
[docs] def remove_initial_timesteps(self, timesteps_to_exclude): """ Removes timesteps from the beggining of a simulation Args: timesteps_to_exclude (:py:class:`int`): Number of timesteps to exclude Returns: new_trajectory (:py:class:`polypy.read.Trajectory`): Trajectory object. """ rows_to_exclude = timesteps_to_exclude * self.total_atoms if self.data_type == "DL_POLY CONFIG": raise ValueError("Only one timestep was found") new_trajectory = Trajectory(self.atom_list, self.data_type) new_trajectory.cartesian_trajectory = self.cartesian_trajectory[rows_to_exclude:] new_trajectory.fractional_trajectory = self.fractional_trajectory[rows_to_exclude:] new_trajectory.atom_name = self.atom_name new_trajectory.reciprocal_lv = self.reciprocal_lv[timesteps_to_exclude:] new_trajectory.lv = self.lv[timesteps_to_exclude:] new_trajectory.cell_lengths = self.cell_lengths[timesteps_to_exclude:] new_trajectory.atoms_in_history = self.atoms_in_history - (self.total_atoms * timesteps_to_exclude) new_trajectory.record_number = self.record_number[timesteps_to_exclude:] new_trajectory.time = self.time[timesteps_to_exclude:] new_trajectory.simulation_timestep = self.simulation_timestep new_trajectory.timesteps = self.timesteps - timesteps_to_exclude new_trajectory.total_atoms = self.total_atoms return new_trajectory
[docs] def remove_final_timesteps(self, timesteps_to_exclude): """ Removes timesteps from the end of a simulation Args: timesteps_to_exclude (:py:class:`int`): Number of timesteps to exclude Returns: new_trajectory (:py:class:`polypy.read.Trajectory`): Trajectory object. """ rows_to_exclude = timesteps_to_exclude * self.total_atoms if self.data_type == "DL_POLY CONFIG": raise ValueError("Only one timestep was found") new_trajectory = Trajectory(self.atom_list, self.data_type) new_trajectory.cartesian_trajectory = self.cartesian_trajectory[:rows_to_exclude] new_trajectory.fractional_trajectory = self.fractional_trajectory[:rows_to_exclude] new_trajectory.atom_name = self.atom_name new_trajectory.reciprocal_lv = self.reciprocal_lv[:timesteps_to_exclude] new_trajectory.lv = self.lv[:timesteps_to_exclude] new_trajectory.cell_lengths = self.cell_lengths[:timesteps_to_exclude] new_trajectory.atoms_in_history = self.atoms_in_history - (self.total_atoms * timesteps_to_exclude) new_trajectory.record_number = self.record_number[:timesteps_to_exclude] new_trajectory.time = self.time[:timesteps_to_exclude] new_trajectory.simulation_timestep = self.simulation_timestep new_trajectory.timesteps = self.timesteps - timesteps_to_exclude new_trajectory.total_atoms = self.total_atoms return new_trajectory
[docs]class History(): """ The :py:class:`polypy.read.Trajectory` class evaluates the positions of all atoms in the simulation. Args: atom_list (:py:class:`list`): List of unique atom names in trajectory. datatype (:py:attr:`str`): Datatype of the original dataset e.g. DL_POLY HISTORY. """ def __init__(self, file, atom_list): self.file = file if os.path.isfile(self.file) is False: raise ValueError("File does not exist") self.atom_list = atom_list self.data_type = "DL_POLY HISTORY" self.trajectory = Trajectory(self.atom_list, self.data_type) self.read_history() self.trajectory.total_atoms = self.trajectory.atoms_in_history / self.trajectory.timesteps self._check_data() self.trajectory._clean_data()
[docs] def read_history(self): """ Reads a DL_POLY HISTORY file line by line and updates a :py:class:`polypy.read.Trajectory` object. """ c = 0 name = False tstep = False current_lv = [] history = open(self.file, 'r') for line in history: split_line = line.split() if c == 3: c = 0 tstep = False current_lv = np.asarray(current_lv, dtype=float) rcplvs, length = ut.calculate_rcplvs(current_lv) self.trajectory.cell_lengths.append(length) self.trajectory.lv.append(current_lv) self.trajectory.reciprocal_lv.append(rcplvs) current_lv = [] if c < 3 and tstep is True: if c == 4: current_lv.insert(0, split_line) else: current_lv.append(split_line) c = c + 1 if name: name = False self.trajectory.cartesian_trajectory.append(split_line) frac = np.matmul(rcplvs, np.asarray(split_line, dtype=float)) frac = np.mod(frac, 1) self.trajectory.fractional_trajectory.append(frac) if split_line[0] in self.atom_list: self.trajectory.atom_name.append(split_line[0]) name = True self.trajectory.atoms_in_history = self.trajectory.atoms_in_history + 1 if split_line[0] == "timestep": self.trajectory.timesteps = self.trajectory.timesteps + 1 tstep = True self.trajectory.record_number.append(float(split_line[1])) self.trajectory.time.append(float(split_line[5])) history.close()
def _check_data(self): """ Error handling function. """ if self.trajectory.total_atoms == int(self.trajectory.total_atoms) is False: raise ValueError("The total number of atoms is not constant across each timestep") if self.trajectory.total_atoms == 0: raise ValueError("No Atoms of specified type exist within the HISTORY file")
[docs]class Config: """ The :py:class:`polypy.read.Trajectory` class evaluates the positions of all atoms in a CONFIG. Args: atom_list (:py:class:`list`): List of unique atom names in trajectory. datatype (:py:attr:`str`): Datatype of the original dataset e.g. DL_POLY CONFIG. """ def __init__(self, file, atom_list): self.file = file if os.path.isfile(self.file) is False: raise ValueError("File does not exist") self.atom_list = atom_list self.data_type = "DL_POLY CONFIG" self.trajectory = Trajectory(self.atom_list, self.data_type) self.read_config() self.trajectory.timesteps = 1 self._check_data() self.trajectory._clean_data()
[docs] def read_config(self): """ Read a DL_POLY HISTORY file line by line and updates a :py:class:`polypy.read.Trajectory` object. """ config = open(self.file, 'r') name = False title = config.readline() stuff = config.readline() lv = [] for i in range(0, 3): lline = config.readline() lv.append(lline.split()) lv = np.asarray(lv, dtype=float) rcplvs, length = ut.calculate_rcplvs(lv) self.trajectory.cell_lengths.append(length) self.trajectory.reciprocal_lv.append(rcplvs) for line in config: split_line = line.split() if name: name = False self.trajectory.cartesian_trajectory.append(split_line) frac = np.matmul(rcplvs, np.asarray(split_line, dtype=float)) frac = np.mod(frac, 1) self.trajectory.fractional_trajectory.append(frac.flatten()) if split_line[0] in self.atom_list: self.trajectory.atom_name.append(split_line[0]) name = True self.trajectory.total_atoms = self.trajectory.total_atoms + 1 self.trajectory.atoms_in_history = self.trajectory.atoms_in_history + 1 config.close()
def _check_data(self): """ Error handling function. """ if self.trajectory.total_atoms == 0: raise ValueError("No Atoms of specified type exist within the CONFIG file")
[docs]class Archive(): """ The :py:class:`polypy.read.Trajectory` class evaluates the positions of all atoms in a ARCHIVE. Args: atom_list (:py:class:`list`): List of unique atom names in trajectory. datatype (:py:attr:`str`): Datatype of the original dataset e.g. DL_MONTE ARCHIVE. """ def __init__(self, file, atom_list): self.file = file if os.path.isfile(self.file) is False: raise ValueError("File does not exist") self.atom_list = atom_list self.data_type = "DL_MONTE ARCHIVE" self.trajectory = Trajectory(self.atom_list, self.data_type) self.read_archive() self.trajectory._clean_data()
[docs] def read_archive(self): """ Read a DL_MONTE ARCHIVE file line by line and updates a :py:class:`polypy.read.Trajectory` object. """ count = 0 lv_count = 0 timestep_atom_count = 0 skipline = 1 current_lv = [] atom_name_encountered = False timestep = True archive = open(self.file, 'r') config_label = archive.readline().split() self.trajectory.timesteps = self.trajectory.timesteps + 1 for line in archive: split_line = line.split() if lv_count == 3: current_lv = np.asarray(current_lv, dtype=float) rcplvs, length = ut.calculate_rcplvs(current_lv) self.trajectory.cell_lengths.append(length) self.trajectory.lv.append(current_lv) self.trajectory.reciprocal_lv.append(rcplvs) lv_count = 0 skipline = 0 timestep = False current_lv = [] if lv_count < 3 and timestep is True and skipline == 2: current_lv.append(split_line) lv_count = lv_count + 1 if atom_name_encountered: atom_name_encountered = False self.trajectory.cartesian_trajectory.append(split_line[:3]) frac = np.matmul(rcplvs, np.asarray(split_line[:3], dtype=float)) frac = np.mod(frac, 1) self.trajectory.fractional_trajectory.append(frac) timestep_atom_count = timestep_atom_count + 1 if split_line[0] in self.atom_list: self.trajectory.atom_name.append(split_line[0]) atom_name_encountered = True count = count + 1 if split_line[0] == config_label[0]: self.trajectory.timesteps = self.trajectory.timesteps + 1 timestep = True skipline = 1 self.trajectory.atoms_at_timestep.append(timestep_atom_count) self.trajectory.record_number.append(self.trajectory.timesteps) self.trajectory.time.append(self.trajectory.timesteps) timestep_atom_count = 0 elif timestep is True: skipline = 2 archive.close()