-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathamber_async_re.py
214 lines (189 loc) · 8.35 KB
/
amber_async_re.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import os
import amberio.ambertools as at
from amberio.amberrun import read_amber_groupfile, amberrun_from_files
from pj_async_re import async_re_job, _exit
__all__ = ['pj_amber_job', 'amber_states_from_configobj',
'extract_amber_coordinates', 'SUPPORTED_AMBER_ENGINES',
'DISANG_NAME', 'DUMPAVE_EXT'
]
# TODO?: Cuda
SUPPORTED_AMBER_ENGINES = {'AMBER': 'sander', 'SANDER': 'sander',
'AMBER-SANDER': 'sander', 'PMEMD': 'pmemd',
'AMBER-PMEMD': 'pmemd'
}
DISANG_NAME = 'restraint.RST' # hardcoded AMBER restraint file name
DUMPAVE_EXT = 'TRACE' # hardcoded file extension for restraint coordinates
def which(program):
def is_exe(filename):
return os.path.exists(filename) and os.access(filename,os.X_OK)
path,name = os.path.split(program)
if path:
if is_exe(program):
return program
for path in os.environ['PATH'].split(os.pathsep):
exe = os.path.join(path,program)
if is_exe(exe):
return exe
return None
class pj_amber_job(async_re_job):
def _checkInput(self):
async_re_job._checkInput(self)
try:
engine = str(self.keywords.get('ENGINE')).upper()
engine = SUPPORTED_AMBER_ENGINES[engine]
except KeyError:
_exit('Requested ENGINE (%s) is either invalid or not '
'currently supported.'%self.keywords.get('ENGINE'))
if int(self.keywords.get('SUBJOB_CORES')) > 1:
self.spmd = 'mpi'
engine = '%s.MPI'%engine
if ((engine == 'sander.MPI' and not at.SANDER_MPI_EXE)
or engine == 'pmemd.MPI' and not at.PMEMD_MPI_EXE):
no_exe = True
else:
no_exe = False
else:
self.spmd = 'single'
if ((engine == 'sander' and not at.SANDER_SERIAL_EXE)
or engine == 'pmemd' and not at.PMEMD_SERIAL_EXE):
no_exe = True
else:
no_exe = False
if no_exe:
_exit('Cannot find %s executable. Is it compiled and in '
'AMBERHOME/bin?'%engine)
self.exe = os.path.join(at.AMBERHOME,'bin',engine)
self.states = amber_states_from_configobj(self.keywords,self.verbose)
self.nreplicas = len(self.states)
def _buildInpFile(self, repl, state = None):
"""
For a given replica:
1) determine the current state
2) write a new mdin file (change to a restart input if cycle > 1)
3) link to a new prmtop
4) link to a new ref file (as needed)
5) link to the inpcrd from cycle = 0 if cycle = 1
"""
wdir = 'r%d'%int(repl)
if state is None:
sid = self.status[int(repl)]['stateid_current']
else:
sid = int(state)
cyc = self.status[int(repl)]['cycle_current']
title = ' replica %d : state %d : cycle %d'%(int(repl),sid,cyc)
new_state = self.states[sid]
new_state.mdin.title = title
# Modify the template as appropriate.
if cyc > 1:
new_state.restart()
if new_state.has_restraints:
rstr_file = '%s/%s'%(wdir,DISANG_NAME)
new_state.rstr.title = title
new_state.rstr.write(rstr_file)
trace_file = '%s_%d.%s'%(self.basename,cyc,DUMPAVE_EXT)
new_state.mdin.nmr_vars['DUMPAVE'] = trace_file
new_state.mdin.write_mdin('%s/mdin'%wdir)
# Links
prmtop = new_state.filenames['prmtop']
self._linkReplicaFile('prmtop',prmtop,repl)
if new_state.has_refc:
refc = new_state.filenames['ref']
self._linkReplicaFile('refc',refc,repl)
if cyc == 1:
inpcrd = new_state.filenames['inpcrd']
self._linkReplicaFile('%s_0.rst7'%self.basename,inpcrd,repl)
def _launchReplica(self, repl, cyc):
"""Launch an AMBER sub-job using pilot-job.
The input files for AMBER that define a state are assumed to be
the default names mdin, prmtop, and refc. These files are always
re-written or symlinked to in _buildInpFile().
"""
# Working directory for this replica
wdir = '%s/r%d'%(os.getcwd(),repl)
# Cycle dependent input and output file names
inpcrd = '%s_%d.rst7'%(self.basename,cyc-1)
mdout = '%s_%d.out'%(self.basename,cyc)
mdcrd = '%s_%d.nc'%(self.basename,cyc)
restrt = '%s_%d.rst7'%(self.basename,cyc)
stdout = '%s_%d.log'%(self.basename,cyc)
stderr = '%s_%d.err'%(self.basename,cyc)
args = ['-O','-c',inpcrd,'-o',mdout,'-x',mdcrd,'-r',restrt]
amber_env = ['AMBERHOME=%s'%at.AMBERHOME, 'MKL_HOME=%s'%at.MKL_HOME]
amber_env.extend(self.engine_environment)
# Compute Unit (i.e. Job) description
cpt_unit_desc = {
'executable': self.exe,
'environment': amber_env,
'arguments': args,
'output': stdout,
'error': stderr,
'working_directory': wdir,
'number_of_processes': int(self.keywords.get('SUBJOB_CORES')),
'spmd_variation': self.spmd,
}
compute_unit = self.pilotcompute.submit_compute_unit(cpt_unit_desc)
return compute_unit
def _hasCompleted(self, repl, cyc):
"""
Return true if an AMBER replica has completed a cycle.
Basically checks if the restart file exists.
"""
# TODO: Parse the output file and look for more sure signs of
# completion?
rst = 'r%d/%s_%d.rst7'%(repl,self.basename,cyc)
if os.path.exists(rst):
return async_re_job._hasCompleted(self,repl,cyc)
else:
return False
def _extractLastCoordinates(self, repl):
"""
Return a 3N list of coordinates from the last restart (rst7) file
of a given replica.
"""
cycle = self.status[repl]['cycle_current']
return extract_amber_coordinates(repl,self.basename,cycle)
def extract_amber_coordinates(replica, cycle, basename):
restrt_name = 'r%d/%s_%d.rst7'%(replica,basename,cycle)
return at.Rst7.open(restrt_name).coordinates
def amber_states_from_configobj(keywords, verbose=False):
"""Return an AmberRunCollection from an ASyncRE command file."""
# keywords = ConfigObj(command_file)
try:
engine = keywords.get('ENGINE').upper()
engine = SUPPORTED_AMBER_ENGINES[engine]
except KeyError:
_exit('Requested ENGINE (%s) is either invalid or not currently '
'supported.'%keywords.get('ENGINE'))
# Set up the general state/replica information - 2 methods
#
# (1) If present, read the AMBER groupfile and define the states,
if keywords.get('AMBER_GROUPFILE') is not None:
groupfile = keywords.get('AMBER_GROUPFILE')
states = read_amber_groupfile(groupfile,engine)
if verbose:
print ('Created %d replicas from AMBER groupfile: %s'
%(len(states),groupfile))
# (2) otherwise assume that the states can be inferred from the extfiles
# and input from a specific application (e.g. umbrella sampling).
else:
basename = keywords.get('ENGINE_INPUT_BASENAME')
extfiles = keywords.get('ENGINE_INPUT_EXTFILES')
if extfiles is not None and extfiles != '':
extfiles = extfiles.split(',')
else:
extfiles = None
nreplicas = int(keywords.get('NREPLICAS'))
if nreplicas is None:
_exit('Could not determine the replica count from the input '
'provided (set NREPLICAS directly or provide an AMBER '
'groupfile)')
try:
states = amberrun_from_files(basename,extfiles,nreplicas,'-O',
engine)
except IOError:
_exit('Problem creating replicas, not enough information?')
if verbose:
print ('Created %d replicas using the provided '
'ENGINE_INPUT_EXTFILES and ENGINE_INPUT_BASENAME'
%nreplicas)
return states