-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathac_assign.py
executable file
·222 lines (177 loc) · 7 KB
/
ac_assign.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
#!/usr/bin/env python3
"""
Script for assigning new ACs.
Asks assigner for location of flatfile, the name of the curator that needs the
AC and the purpose. Reads the IDs from the flatfile and for each ID returns a new AC
from the available_acs list, then increments AC list to the next unassigned AC. Writes info
to assigned_acs file.
"""
import argparse
import os
import re
import shutil
from datetime import date
from pathlib import Path
# Number of most recent backup pairs to keep; get_counters_to_remove selects
# everything older than that for deletion.
NUMBER_BACKUPS = 5
# Working directory that main() hands to ac_assign(); it holds the two AC
# bookkeeping files and the backup sub-directory.
WORKING_PATH = Path(r"F:\ACNumbers")
# Name of the sub-directory of the working directory that stores the backups.
BACKUP_DIR = "backup"
# File listing the ACs that have not been handed out yet, one per line.
AVAILABLE_ACS_FILE = "available_acs.txt"
# File to which ac_assign() appends one line per newly assigned AC.
ASSIGNED_ACS_FILE = "assigned_acs.txt"
class UnknownBackUpFileError(Exception):
    """A file in the backup directory does not follow the backup naming scheme.

    Raised by get_backup_file_counters with the offending path as argument.
    """
class AssignedAvailableFilesMismatchError(Exception):
    """The backup counters of the assigned and available files differ.

    Raised by get_backup_file_counters with a text showing both counter lists.
    """
class NotTxtExtensionError(Exception):
    """A file name that was expected to end in ``.txt`` does not.

    Raised by get_filename_without_txt with the file name as argument.
    """
class NotEnoughAcsError(Exception):
    """Fewer ACs are available than the flat file has entry IDs.

    Raised by partition_available_acs.
    """
class AcFlatFileMismatchError(Exception):
    """The number of new ACs differs from the number of flat file entry IDs.

    Raised by generate_assigned_acs_lines.
    """
def get_ids_from_flat_file(flatfile):
    """Collect the entry identifiers from the ``ID`` lines of a flat file.

    Args:
        flatfile: Path of the flat file to read.

    Returns:
        A list holding the second whitespace-separated field of every line
        that starts with ``"ID "``, in file order.

    Raises:
        FileNotFoundError: If ``flatfile`` is not an existing file. The
            argument is the literal ``"flatfile"`` because ``main`` inserts
            it into its "Could not find ..." message.
        ValueError: If a line starts with the ``ID`` tag but carries no
            identifier.
    """
    # isfile (rather than exists) also rejects a directory, which would
    # otherwise only fail later inside open() with a less helpful error.
    if not os.path.isfile(flatfile):
        raise FileNotFoundError("flatfile")
    ids = []
    with open(flatfile, "r", encoding="utf-8") as f:
        for line_number, line in enumerate(f, start=1):
            if not line.startswith("ID "):
                continue
            fields = line.split()
            # A bare "ID" tag used to surface as an unexplained IndexError.
            if len(fields) < 2:
                raise ValueError(
                    f"Line {line_number} of {flatfile} has an ID tag but no identifier"
                )
            ids.append(fields[1])
    return ids
def get_backup_files(backup_path):
    """List the backup copies stored in ``backup_path``.

    Globs for the counter-suffixed variants of the available-ACs file first
    and of the assigned-ACs file second, and returns both sets of matches in
    one list.
    """
    found = []
    for base_name in (AVAILABLE_ACS_FILE, ASSIGNED_ACS_FILE):
        wildcard_name = get_filename_with_counter(base_name, "*")
        found.extend(backup_path.glob(wildcard_name))
    return found
def get_backup_file_counters(files):
    """Extract and cross-check the counters of the backup files.

    Every file name must look like ``<stem>(<counter>).txt`` where ``<stem>``
    is the stem of the available-ACs or the assigned-ACs file.

    Args:
        files: Iterable of path objects (only their ``name`` is inspected).

    Returns:
        The sorted list of counters; it is identical for both file kinds.

    Raises:
        UnknownBackUpFileError: If a file name does not follow the scheme;
            the offending path is the exception argument.
        AssignedAvailableFilesMismatchError: If the counters found for the
            available-ACs backups differ from those of the assigned-ACs
            backups.
    """
    available_stem = get_filename_without_txt(AVAILABLE_ACS_FILE)
    assigned_stem = get_filename_without_txt(ASSIGNED_ACS_FILE)
    # Escape the stems so that a regex metacharacter in a configured file
    # name is matched literally instead of being interpreted as a pattern.
    p = re.compile(
        rf"(?P<assigned_or_available>{re.escape(available_stem)}|{re.escape(assigned_stem)})"
        r"\((?P<counter>\d+)\)\.txt"
    )
    available_counters = []
    assigned_counters = []
    for file in files:
        # fullmatch anchors both ends strictly ("$" would tolerate a
        # trailing newline in the name).
        m = p.fullmatch(file.name)
        if m is None:
            raise UnknownBackUpFileError(file)
        counter = int(m.group("counter"))
        if m.group("assigned_or_available") == available_stem:
            available_counters.append(counter)
        else:
            assigned_counters.append(counter)
    available_counters.sort()
    assigned_counters.sort()
    # Backups are only usable as pairs, so both kinds must have the very
    # same set of counters.
    if available_counters != assigned_counters:
        raise AssignedAvailableFilesMismatchError(
            f"{available_counters} ≠ {assigned_counters}"
        )
    return available_counters
def get_counters_to_remove(counters):
    """Return the counters of the backups that exceed the retention limit.

    Args:
        counters: Backup counters in ascending order, so the most recent
            backups come last.

    Returns:
        Every counter except the last ``NUMBER_BACKUPS`` ones; an empty list
        when there are no more than ``NUMBER_BACKUPS`` counters.
    """
    # Compute the cut-off explicitly: the shorthand ``counters[:-N]`` yields
    # an empty list for N == 0, although keeping zero backups means that
    # every counter has to be removed.
    n_to_remove = max(len(counters) - NUMBER_BACKUPS, 0)
    return counters[:n_to_remove]
def get_filename_without_txt(filename):
    """Strip the trailing ``.txt`` from ``filename``.

    Raises NotTxtExtensionError, carrying the offending name, when the name
    does not end in ``.txt``.
    """
    suffix = ".txt"
    if filename.endswith(suffix):
        stem_length = len(filename) - len(suffix)
        return filename[:stem_length]
    raise NotTxtExtensionError(filename)
def get_filename_with_counter(filename, counter):
    """Build the backup name ``<stem>(<counter>).txt`` for a ``.txt`` file name."""
    stem = get_filename_without_txt(filename)
    return f"{stem}({counter}).txt"
def remove_old_backup_files(counters, backup_path):
    """Delete the backup pairs that fall outside the retention limit.

    For every counter selected by get_counters_to_remove, the assigned-ACs
    backup is unlinked first and the available-ACs backup second.
    """
    paired_names = (ASSIGNED_ACS_FILE, AVAILABLE_ACS_FILE)
    for stale_counter in get_counters_to_remove(counters):
        for base_name in paired_names:
            stale_name = get_filename_with_counter(base_name, stale_counter)
            (backup_path / stale_name).unlink()
def backup_files(working_path):
    """Store a new backup pair of both AC files and prune old pairs.

    The copies are written to the backup sub-directory of ``working_path``
    under the next free counter (1 when no backup exists yet). Afterwards the
    pairs beyond the retention limit are deleted.

    Args:
        working_path: ``pathlib.Path`` of the directory holding the AC files.

    Raises:
        FileNotFoundError: If the backup directory or one of the two AC files
            does not exist; the missing path is the exception argument.
        UnknownBackUpFileError: Propagated from get_backup_file_counters.
        AssignedAvailableFilesMismatchError: Propagated from
            get_backup_file_counters.
    """
    backup_path = working_path / BACKUP_DIR
    if not backup_path.exists():
        raise FileNotFoundError(backup_path)
    files = get_backup_files(backup_path)
    counters = get_backup_file_counters(files)
    next_counter = counters[-1] + 1 if counters else 1

    # Check both sources before copying anything: failing on the second copy
    # would leave a half backup pair behind, and every later run would then
    # stop with AssignedAvailableFilesMismatchError.
    source_names = (AVAILABLE_ACS_FILE, ASSIGNED_ACS_FILE)
    for source_name in source_names:
        source = working_path / source_name
        if not source.exists():
            raise FileNotFoundError(source)

    for source_name in source_names:
        shutil.copy2(
            working_path / source_name,
            backup_path / get_filename_with_counter(source_name, next_counter),
        )
    # Include the new pair so that the retention limit counts it as well.
    counters.append(next_counter)
    remove_old_backup_files(counters, backup_path)
def read_available_acs_file(available_acs_file):
    """Read the unassigned ACs from ``available_acs_file``.

    Args:
        available_acs_file: Path of a UTF-8 text file with one AC per line.

    Returns:
        The ACs in file order, with surrounding whitespace removed. Blank and
        whitespace-only lines are skipped so that an empty string can never
        be handed out as an accession.
    """
    with open(available_acs_file, "r", encoding="utf-8") as f:
        stripped_lines = (line.strip() for line in f.read().splitlines())
        return [ac for ac in stripped_lines if ac]
def partition_available_acs(available_acs, flatfile_entry_ids):
    """Split the available ACs into those to assign now and the remainder.

    Returns a ``(new_acs, rest_acs)`` tuple: ``new_acs`` holds as many leading
    ACs as there are flat file entry IDs, ``rest_acs`` everything after them.
    Raises NotEnoughAcsError when there are fewer ACs than entry IDs.
    """
    needed = len(flatfile_entry_ids)
    if needed > len(available_acs):
        raise NotEnoughAcsError
    return available_acs[:needed], available_acs[needed:]
def generate_assigned_acs_lines(new_acs, flatfile_entry_ids, today, user, comment):
    """Lazily produce one record line per newly assigned AC.

    Each AC is paired with the entry ID at the same position. The pair is
    echoed to stdout (tab-separated) and the yielded line consists of date,
    AC, entry ID, user and comment joined by single spaces.

    Raises AcFlatFileMismatchError, on first iteration, when the two
    sequences differ in length.
    """
    if len(flatfile_entry_ids) != len(new_acs):
        raise AcFlatFileMismatchError
    pairs = zip(new_acs, flatfile_entry_ids)
    for ac, entry_id in pairs:
        print(f"{ac}\t{entry_id}")
        record_fields = [today, ac, entry_id, user, comment]
        yield " ".join(record_fields)
def ac_assign(flatfile, comment, working_dir, today, user):
    """Assign one new AC to every entry ID found in ``flatfile``.

    The ACs are taken from the front of the available-ACs file. After a
    backup of both bookkeeping files has been stored, one line per
    assignment is appended to the assigned-ACs file and the available-ACs
    file is rewritten with the remaining ACs. Progress is printed to stdout.

    Args:
        flatfile: Path of the flat file whose ``ID`` lines are read.
        comment: Free text recorded at the end of each assignment line.
        working_dir: Directory holding the bookkeeping files and backups.
        today: Date string recorded at the start of each assignment line.
        user: User name recorded in each assignment line.

    Raises:
        FileNotFoundError: If the flat file, the working directory, the
            backup directory or a bookkeeping file is missing.
        NotEnoughAcsError: If fewer ACs are available than entry IDs found.
        UnknownBackUpFileError, AssignedAvailableFilesMismatchError:
            Propagated from backup_files.
    """
    flatfile_entry_ids = get_ids_from_flat_file(flatfile)
    working_path = Path(working_dir)
    if not working_path.exists():
        raise FileNotFoundError("working path")

    available_acs_file = working_path / AVAILABLE_ACS_FILE
    assigned_acs_file = working_path / ASSIGNED_ACS_FILE

    # Validate the request before creating a backup: a run that is going to
    # fail must not rotate the backups and evict an older, real snapshot.
    available_acs = read_available_acs_file(available_acs_file)
    new_acs, rest_acs = partition_available_acs(available_acs, flatfile_entry_ids)

    backup_files(working_path)

    # Record the assignments first, then shrink the pool of available ACs.
    with open(assigned_acs_file, "a+", encoding="utf-8") as f:
        print("Assigned accessions:")
        for line in generate_assigned_acs_lines(
            new_acs, flatfile_entry_ids, today, user, comment
        ):
            print(line, file=f)
    with open(available_acs_file, "w", encoding="utf-8") as f:
        for ac in rest_acs:
            print(ac, file=f)
    print(f"There are {len(rest_acs)} ACs left in {AVAILABLE_ACS_FILE}")
def get_arguments():
    """Parse the command line and return ``(flatfile, comment)``.

    Both ``--flatfile``/``-f`` and ``--comment``/``-c`` are mandatory string
    options; surrounding whitespace is stripped from each value.
    """
    option_specs = (
        (("--flatfile", "-f"), "Flat file path"),
        (
            ("--comment", "-c"),
            "Curator name and purpose e.g. For Bobs curation work",
        ),
    )
    parser = argparse.ArgumentParser()
    for flags, help_text in option_specs:
        parser.add_argument(*flags, required=True, type=str, help=help_text)
    parsed = parser.parse_args()
    flatfile = parsed.flatfile.strip()
    comment = parsed.comment.strip()
    return flatfile, comment
def main():
    """Run the AC assignment from the command line.

    Reads the flat file path and the comment from the command line, stamps
    the assignment with today's date (``dd/mm/yy``) and the login name of the
    current user, and runs ac_assign on WORKING_PATH. The known failure modes
    are reported as a readable message on stdout instead of a traceback.
    """
    import getpass

    flatfile, comment = get_arguments()
    today = date.today().strftime("%d/%m/%y")
    try:
        user = os.getlogin()
    except OSError:
        # os.getlogin() needs a controlling terminal and fails without one
        # (e.g. when started by a scheduler); fall back to the environment.
        user = getpass.getuser()
    try:
        ac_assign(flatfile, comment, WORKING_PATH, today, user)
    except UnknownBackUpFileError as err:
        print(
            f"Unknown file in backups detected: {err.args[0]}\n\nPlease remove before proceeding"
        )
    except AssignedAvailableFilesMismatchError as err:
        print(
            f"Mismatch between assigned and available backup files. Found these versions: {err.args[0]}"
        )
    except FileNotFoundError as err:
        # Errors raised by this script carry a label or path in args[0];
        # errors raised by the OS carry the errno there and the path in
        # ``filename``, so prefer ``filename`` whenever it is set.
        missing = err.filename if err.filename is not None else err.args[0]
        print(f"Could not find {missing}. Check location/path is correct.")
    except NotTxtExtensionError as err:
        print(f"Expected .txt extension for {err.args[0]}")
    except NotEnoughAcsError:
        print("There aren't enough available accessions for provided flat file")
    except AcFlatFileMismatchError:
        print(
            "Something went wrong as the number of accessions to assign does not match the number of IDs in the flat file"
        )
# Run the command-line workflow only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()