-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmaster_runner.py
292 lines (234 loc) · 9.38 KB
/
master_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
#!/usr/bin/env python
import sys
# Compare against a tuple so that any later major version (e.g. Python 4.0)
# is accepted; testing major and minor separately would reject 4.0-4.5.
# 3.8 is the real minimum: init_dir() calls
# shutil.copytree(..., dirs_exist_ok=True), which was added in Python 3.8.
if sys.version_info < (3, 8):
    sys.exit("Requires Python 3.8+ to run")
import os
import datetime
import argparse
import pprint
# for submitting slurm jobs
import subprocess
# for initializing dir
import uuid
import shutil
# for mail opts
import getpass
import grp
SBATCH_DEFAULT_EXECUTABLE = '/usr/bin/sbatch'
def main():
    """Queue the full processing chain for every SHTC input file.

    The SHTC files come either from ``--shtc-files`` or, when both
    ``-s/--start-time`` and ``-e/--end-time`` are given, from
    ``download_shtc_data``.  For each file a fresh working directory is
    created with ``init_dir`` and the following sbatch jobs are submitted:

    1. ``maggrid.sbatch`` (skipped when ``--maggrid-file`` is given),
    2. ``mapb2s.sbatch`` (waits for maggrid when it was submitted),
    3. ``sim3d.sbatch`` once per ``tt1`` value (each waits for mapb2s).

    Exits with status 1 when no usable input selection was given.
    """
    args = get_parser().parse_args()

    # Work out which SHTC files to process.
    if args.shtc_files:
        filenames = args.shtc_files
    elif args.start_time and args.end_time:
        # get shtc from web
        filenames = download_shtc_data(args.start_time, args.end_time)
    elif args.start_time or args.end_time:
        # one of start and end time was specified, but not both
        print("-s/--start-time and -e/--end-time must be used together",
              file=sys.stderr)
        sys.exit(1)
    else:
        # neither shtc_files nor (start_time, end_time) was specified
        print("Neither --shtc-files or "
              "(-s/--start-time, -e/--end-time) were specified",
              file=sys.stderr)
        sys.exit(1)

    # NOTE: the SHTC_URL / SHTC_FILENAME environment variables that used to
    # be consulted here are not necessary anymore.

    # Loop invariants: the tt1 values passed to sim3d and the optional CME
    # data file taken from the caller's environment.
    tt1_values = (0, 60, 360, 1440)
    cme_data_file = os.getenv('CME_DATA_FILE')

    for shtc_fn in filenames:
        # create and initialize directory
        dir_name = init_dir(shtc_fn, skeleton_root=args.skeleton_dir)

        # Generate mag data
        if args.maggrid_file:
            # A pre-generated maggrid file skips the maggrid job, so mapb2s
            # has nothing to wait for.
            maggrid_file = args.maggrid_file
            maggrid_jobid = None
            print(f"Using pre-generated maggrid file {maggrid_file}")
        else:
            # run maggrid_omp to generate maggrid.dat
            maggrid_file = f'{dir_name}/maggrid.dat'
            maggrid_jobid = submit_sbatch_job(
                'maggrid.sbatch',
                dir_name=dir_name,
                sbatch_executable=args.sbatch_exec,
                env_vars={
                    'SHTC_FILE': dir_name + '/' + os.path.basename(shtc_fn),
                    'MAGGRID_FILE': maggrid_file,
                },
            )
            print(f"Running maggrid_omp as job {maggrid_jobid}")

        # run mapb2s to get b1rs.dat
        b1rs_file = f'{dir_name}/b1rs.dat'
        mapb2s_jobid = submit_sbatch_job(
            'mapb2s.sbatch',
            dir_name=dir_name,
            sbatch_executable=args.sbatch_exec,
            dependency=maggrid_jobid,
            env_vars={
                'MAGGRID_FILE': maggrid_file,
                'B1RS_FILE': b1rs_file,
            },
        )
        print(f"Queued mapb2s as job {mapb2s_jobid}")

        # NOTE: the combiner step is obsolete; sim3d depends on mapb2s
        # directly instead of on a combiner job.

        # Run main code, one sim3d job per tt1 value
        for tt1 in tt1_values:
            sim3d_env = {
                'B1RS_FILE': b1rs_file,
                'MAGGRID_FILE': maggrid_file,
                'ROOT_DIR': dir_name,
                'tt1': str(tt1),
            }
            if cme_data_file:
                sim3d_env['CME_DATA_FILE'] = cme_data_file
            sim3d_jobid = submit_sbatch_job(
                'sim3d.sbatch',
                dependency=mapb2s_jobid,
                dir_name=dir_name,
                sbatch_executable=args.sbatch_exec,
                env_vars=sim3d_env,
            )
            print(f"Submitted sim3d as job {sim3d_jobid}")
def get_parser():
    """Build the command-line parser for the runner.

    Returns:
        An ``argparse.ArgumentParser`` offering the download options
        ``-s/--start-time`` and ``-e/--end-time`` plus ``--shtc-files``,
        ``--sbatch-exec``, ``--skeleton-dir`` and ``--maggrid-file``.
    """
    parser = argparse.ArgumentParser()

    # Both time bounds end with the same explanation of the accepted format.
    time_format_note = ('Passed to sunpy.net.attrs.Time, so the string must '
                        'conform to its initializer.')
    download_group = parser.add_argument_group('download options')
    download_group.add_argument(
        '-s', '--start-time',
        help=('Starting period of search duration. Requires --end-time. '
              + time_format_note))
    download_group.add_argument(
        '-e', '--end-time',
        help=('Ending period of search duration. Requires --start-time. '
              + time_format_note))

    # Remaining options as (flag, add_argument keyword arguments), registered
    # in this order so the generated help keeps its layout.
    general_options = (
        ('--shtc-files', {
            'nargs': '+',
            'help': ('Specify the location of the shtc files. '
                     'To be used if download through sunpy is not possible. '
                     'If set, --start-time and --end-time are ignored.'),
        }),
        ('--sbatch-exec', {
            'default': SBATCH_DEFAULT_EXECUTABLE,
            'help': 'Alternative sbatch binary executable',
        }),
        ('--skeleton-dir', {
            'default': './skeleton-dir/',
            'help': 'Alternative path to skeleton directory',
        }),
        ('--maggrid-file', {
            'help': 'Pre-generated maggrid file (skips running maggrid)',
        }),
    )
    for flag, settings in general_options:
        parser.add_argument(flag, **settings)

    return parser
def submit_sbatch_job(script_fn: str,
                      dir_name: str,
                      dependency: int = None,
                      env_vars: dict = None,
                      sbatch_executable: str = SBATCH_DEFAULT_EXECUTABLE) -> int:
    """Submit a batch script with sbatch and return the new job id.

    Args:
        script_fn: Path of the sbatch script to submit.
        dir_name: Directory passed to ``get_logging_opts`` to build the
            ``--error`` / ``--output`` options.
        dependency: Job id this job must wait for (``afterok``); ``None``
            submits the job without a dependency.
        env_vars: Environment variables for the sbatch process.  The mapping
            is copied, so the caller's dict is left unmodified.  The current
            ``PATH`` is always added when it is set.
        sbatch_executable: sbatch binary to invoke.

    Returns:
        The job id printed by ``sbatch --parsable``.

    Raises:
        subprocess.CalledProcessError: sbatch exited with a non-zero status.
        ValueError: sbatch succeeded but did not print a numeric job id.
    """
    command = [sbatch_executable, '--parsable',
               *get_mail_opts(), *get_logging_opts(dir_name)]
    if dependency is not None:
        command.append(f"--dependency=afterok:{dependency}")
    command.append(script_fn)

    # Work on a copy so that PATH does not leak into the caller's dict.
    job_env = dict(env_vars) if env_vars else {}
    search_path = os.getenv("PATH")
    if search_path is not None:
        # A None value in the environment mapping would make subprocess fail.
        job_env["PATH"] = search_path

    # check=True reports a failed submission directly instead of letting it
    # show up later as an int('') conversion error.
    completed = subprocess.run(command, stdout=subprocess.PIPE,
                               env=job_env, check=True)

    # --parsable prints "jobid" or, with a cluster name, "jobid;cluster".
    jobid_field = completed.stdout.decode().strip().split(';', 1)[0]
    return int(jobid_field)
def download_shtc_data(start_time, end_time) -> [str]:
# for downloading data
try:
from sunpy.net import Fido, attrs as a
import gong_shtc
except ImportError as ex:
print("Could not import sunpy:", file=sys.stderr)
print(ex, file=sys.stderr)
print("Please provide the SHTC data files manually with the "
"--shtc-files flag", file=sys.stderr)
raise
# search for the data
results = Fido.search(a.Time(start_time, end_time),
a.Instrument('GONG_SHTC'))
print(f'Found {results.file_num} in time range {start_time} to {end_time}')
# download the data
downloaded_files = Fido.fetch(results)
return downloaded_files
def init_dir(shtc_path: str, skeleton_root: str) -> str:
    """Create and populate a fresh run directory for one SHTC file.

    The directory is created relative to the current working directory and
    is named ``<time now>-<random hex string>`` so every run gets its own
    id.  The SHTC file is copied in first, then the contents of the skeleton
    directory are copied on top, giving a layout like::

        run directory
        |- shtc file
        |- ...
        |- run-1/            # (for sim3d)
        |  |- flux.dat
        |  |- files.nml
        |  |- loadptcl.dat
        |  `- other stuff
        |- run-2/            # (for sim3d)
        |  |- flux.dat
        |  |- files.nml
        |  |- loadptcl.dat
        |  `- other stuff
        `- run-3/            # (for sim3d)
           |- flux.dat
           |- files.nml
           |- loadptcl.dat
           `- other stuff

    Args:
        shtc_path: Path of the SHTC file to copy into the new directory.
        skeleton_root: Directory whose contents are copied into the new
            directory.

    Returns:
        The name of the newly created directory.
    """
    timestamp = datetime.datetime.now().strftime('%Y%m%d-T%H%M%S')
    unique_suffix = uuid.uuid4().hex
    dir_name = f'{timestamp}-{unique_suffix}'

    print(f'Initializing directory {dir_name} for {shtc_path}')

    # make directory for this shtc file to live in
    os.mkdir(dir_name)
    print(f'Created directory {dir_name} for {shtc_path}')

    shutil.copy(shtc_path, dir_name)

    # The target already exists at this point, hence dirs_exist_ok.
    shutil.copytree(skeleton_root, dir_name, dirs_exist_ok=True)

    print(f'Finished initializing directory "{dir_name}" for {shtc_path}')
    return dir_name
def get_mail_opts() -> (str, str):
    """Build the sbatch mail options for the current user.

    The mail domain is chosen from the process's primary group:
    ``student@fit.edu`` maps to ``my.fit.edu``; ``faculty@fit.edu`` and
    ``staff@fit.edu`` map to ``fit.edu``.  The address is the login name
    (with any ``@fit.edu`` removed) at that domain.

    Returns:
        ``("--mail-type=...", "--mail-user=...")``, or an empty tuple when
        the primary group matches none of the known groups (including when
        those groups do not exist on this host).  Callers unpack the result
        into the sbatch command line, so an empty tuple simply submits the
        job without mail notification.
    """
    # (group name, mail domain), checked in order.  Group lookups are slow
    # here, so the search stops at the first match.
    group_domains = (
        ('student@fit.edu', 'my.fit.edu'),
        ('faculty@fit.edu', 'fit.edu'),
        ('staff@fit.edu', 'fit.edu'),
    )

    pgid = os.getgid()
    domain = None
    for group_name, group_domain in group_domains:
        try:
            group_gid = grp.getgrnam(group_name).gr_gid
        except KeyError:
            # Group is not defined on this host; it cannot be a match.
            continue
        if pgid == group_gid:
            domain = group_domain
            break

    if domain is None:
        # Previously this case crashed on the unbound ``domain`` variable.
        print("Could not determine a mail domain from the primary group; "
              "submitting without mail notifications", file=sys.stderr)
        return ()

    username = getpass.getuser().replace('@fit.edu', '')
    email_address = f"{username}@{domain}"
    mail_frequency = "all,time_limit,array_tasks"
    return (f"--mail-type={mail_frequency}",
            f"--mail-user={email_address}")
def get_logging_opts(dir_name) -> (str, str):
    """Return the sbatch ``--error`` and ``--output`` options for a run.

    Both log files are placed in ``dir_name`` and share the name pattern
    ``%x.%J.<err|out>.txt``; the ``%x`` and ``%J`` placeholders are passed
    through unchanged.
    """
    # Common part of both log paths: "<dir_name>/%x.%J."
    log_stem = dir_name + '/%x.%J.'
    return tuple(
        '--' + option + '=' + log_stem + suffix
        for option, suffix in (('error', 'err.txt'), ('output', 'out.txt'))
    )
if __name__ == '__main__':
main()