-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConfComparer.py
398 lines (341 loc) · 17.3 KB
/
ConfComparer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
#!/usr/bin/python
import argparse
import csv
import os
import sys
from enum import Enum, auto
import mmap
import logging
from pathlib import Path
from shutil import which
import subprocess
import tempfile
from typing import Callable, Dict, List, Optional
##############################
# CONSTANTS AND CUSTOM TYPES #
##############################
# supported extensions in this list must be UPPERCASE
_SUPPORTED_EXTENSIONS: List[str] = [".PDB"]
##############
# EXCEPTIONS #
##############
class ConfComparerError(Exception):
"""General exception for this project"""
################
# HELPER STUFF #
################
class MoleculeNameCase(Enum):
NO_CHANGE = auto()
LOWER = auto()
UPPER = auto()
_CASE_TRANSFORMING_FUNCTIONS: Dict[MoleculeNameCase, Callable[[str], str]] = {
MoleculeNameCase.NO_CHANGE: (lambda s: s),
MoleculeNameCase.LOWER: str.lower,
MoleculeNameCase.UPPER: str.upper
}
########################
# RUNTIME DATA STORAGE #
########################
class _Storage:
_script_folder = Path(__file__).resolve().parent
templates_dir: Path = _script_folder.joinpath("templates")
input_dir: Path = _script_folder.joinpath("input")
output_dir: Path = _script_folder.joinpath("output")
SB_executable_path: Path = _script_folder.joinpath("SB_batch", "SiteBinderCMD.exe")
use_mono: bool = False
tolerance: float = 1.0
print_list = False
print_RMSD_chart = False
print_summary = True
#################################
# ESSENTIAL CLASSES AND OBJECTS #
#################################
class MoleculeBase:
"""
Represents any molecule, carries info about name and path to source file
"""
def __init__(self,
molecule_file_path: Optional[Path],
name_case: MoleculeNameCase = MoleculeNameCase.NO_CHANGE):
self._case_transforming_func = _CASE_TRANSFORMING_FUNCTIONS[name_case]
self.path = self.extension = None
self.name = "none"
self.ligand = ""
if molecule_file_path is None:
return
path = molecule_file_path.resolve()
ext = path.suffix
if path and ext.upper() not in _SUPPORTED_EXTENSIONS:
raise ConfComparerError(f"file '{path}': '{ext}' is not a supported file extension")
# SB uses file name without extension(s) as molecule name
self.name = path.name[:-len(ext)]
self.path = str(path)
self.extension = ext
def get_name(self) -> str:
return self._case_transforming_func(self.name)
@classmethod
def get_ligand_from_pdb(cls, molecule_path: Path) -> str:
with molecule_path.open("rb") as f:
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as s:
atom_record_start = s.find(b"HETATM")
if atom_record_start == -1:
atom_record_start = s.find(b"ATOM")
if atom_record_start == -1:
return ""
s.seek(atom_record_start + 17)
return s.read(3).decode()
class Conformation(MoleculeBase):
"""Represents particular conformation, carries info about name and path to template."""
_NoneConformation: Optional["Conformation"] = None
def __init__(self, template_file_path: Optional[Path]):
super().__init__(template_file_path, MoleculeNameCase.UPPER)
@classmethod
def get_none_conformation(cls) -> "Conformation":
if cls._NoneConformation is None:
cls._NoneConformation = cls(None)
return cls._NoneConformation
class Molecule(MoleculeBase):
"""
Represents particular molecule, carries info about name, path to source file and computed
RMSD values for various templates.
"""
def __init__(self, molecule_file_path: Optional[Path]):
super().__init__(molecule_file_path)
if self.extension.upper() == ".PDB" and molecule_file_path:
self.ligand = self.get_ligand_from_pdb(molecule_file_path)
self.conformation_map: Dict[str, float] = {}
@property
def conformation(self) -> str:
try:
conf, *_ = {conf: rmsd for conf, rmsd in sorted(self.conformation_map.items(),
key=lambda item: item[1])
if rmsd <= _Storage.tolerance}
return conf
except ValueError:
return Conformation.get_none_conformation().get_name()
def update_conformation_map(self, conformation_name: str, rmsd: float):
self.conformation_map[conformation_name] = rmsd
##############
# CORE LOGIC #
##############
class Analysis:
def __init__(self):
self._prepare_environment()
self.conformations: Dict[str, Conformation] = dict()
self.molecules: Dict[str, Molecule] = dict()
self._expected_result_header = '"File","Rmsd","MatchedCount","ASize","BSize"'
self.result_list_path = _Storage.output_dir.joinpath("results_individual_molecules.txt")
self.result_rmsd_chart_path = _Storage.output_dir.joinpath("result_rmsd_chart.csv")
self.result_summary_path = _Storage.output_dir.joinpath("result_summary.txt")
def execute(self):
# initialize conformations
logging.info("Loading templates...")
for f in _Storage.templates_dir.rglob("*"):
if f.suffix.upper() not in _SUPPORTED_EXTENSIONS:
continue
conf = Conformation(f.resolve())
self.conformations[conf.get_name()] = conf
logging.info("Templates loaded.")
# initialize molecules
logging.info("Loading molecules...")
for f in _Storage.input_dir.rglob("*"):
if f.suffix.upper() not in _SUPPORTED_EXTENSIONS:
continue
mol = Molecule(f.resolve())
self.molecules[mol.get_name()] = mol
logging.info("Molecules loaded.")
# work in a temporary folder where all the intermediate files can be stored during
# the execution
with tempfile.TemporaryDirectory() as tmp_dir:
for conf_name, conf in self.conformations.items():
logging.info(f"Starting SB analysis of conformation {conf_name}...")
conf_folder = Path(tmp_dir).joinpath(conf_name)
conf_folder.mkdir(mode=0o777)
# create list of paths of molecules to be tested, add path of template on the first
# line (it will be used as reference and discarded by SiteBinder)
conf_input_file = conf_folder.joinpath(f"sb_input.list")
logging.info(f"Generating input file {conf_input_file}...")
with conf_input_file.open("w") as f:
for file_path in [conf.path] + [m.path for m in self.molecules.values()]:
f.write(f"{file_path}\n")
# Usage: SiteBinderCMD input.txt rmsd.csv pairing.csv
rmsd_output_file = conf_folder.joinpath(f"sb_output_rmsd.csv")
pairing_output_file = conf_folder.joinpath(f"sb_output_pairing.csv")
command_args = [arg for arg
in (_Storage.crossplatform_runner, _Storage.SB_executable_path,
conf_input_file, rmsd_output_file, pairing_output_file)
if arg]
logging.info(f"Starting SiteBinder...")
sb_process = subprocess.Popen(command_args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=False)
sb_process.wait()
stdout, stderr = [stream.decode().strip() for stream in sb_process.communicate()]
sb_process.stdout.close()
if stdout or stderr or sb_process.returncode != 0:
logging.error(f"Error within processing conformation '{conf_name}', SiteBinder message:"
f"\nstdout: {stdout}\nstderr: {stderr}\n---")
sys.exit(1)
# process results for this conformation
with rmsd_output_file.open("r") as f:
# first line is a header
if f.readline().strip() != self._expected_result_header:
raise ConfComparerError(f"Result file '{rmsd_output_file}' is not in "
f"expected format")
# rest of the file are actual data, one entry per line
for entry in list(csv.reader(f)):
self._process_entry(conf_name, entry)
logging.info(f"Analysis of conformation {conf_name} finished.")
self._generate_result_files()
print(file=sys.stderr)
self._print_results()
def _generate_result_files(self):
# simple list of molecules and theirs conformations
with self.result_list_path.open("w") as f:
f.writelines(
[f"[{molecule.ligand or 'UNKNOWN'}] {name}: {molecule.conformation}\n"
for name, molecule in self.molecules.items()]
)
# complex CSV table of RMSD values of every molecule with every conformation template
with self.result_rmsd_chart_path.open("w") as f:
# header with conformation names
f.write("Ligand_name;Ring_ID;")
for conf_name, _ in self.conformations.items():
f.write(f"{conf_name};")
f.write("\n")
# individual molecules
for name, molecule in self.molecules.items():
f.write(f"{molecule.ligand};{name};")
# use conf name from self.conformations instead of molecule.conformation map
# because of header order
for conf_name, _ in self.conformations.items():
f.write(f"{molecule.conformation_map.get(conf_name, 'NaN')};")
f.write("\n")
# summary
# get number of occurrences for every conformation
conf_absolute_numbers = {}
for molecule in self.molecules.values():
conf = molecule.conformation
conf_absolute_numbers[conf] = conf_absolute_numbers.get(conf, 0) + 1
# get total number of molecules
total_num = len(self.molecules)
# generate summary
with self.result_summary_path.open("w") as f:
for conf_name in list(self.conformations) + [
Conformation.get_none_conformation().get_name()]:
f.write(f"{conf_name:<13} {conf_absolute_numbers.get(conf_name, 0): 4} "
f"{conf_absolute_numbers.get(conf_name, 0) * 100 / total_num:.2f}%\n")
f.write(f"{'TOTAL':<13} {total_num: 4} 100%")
def _print_results(self):
if _Storage.print_list:
print(self.result_list_path.read_text())
if _Storage.print_RMSD_chart:
print(self.result_rmsd_chart_path.read_text())
if _Storage.print_summary:
print(self.result_summary_path.read_text())
def _process_entry(self, conf_name: str, entry: List[str]) -> None:
if len(entry) == 5:
name, rmsd, matched_count, a_size, b_size = entry
elif len(entry) == 6:
# SB uses simple comma as both CSV separator and decimal point, what is
# incredibly confusing...
name, rmsd_1, rmsd_2, matched_count, a_size, b_size = entry
rmsd = f"{rmsd_1}.{rmsd_2}"
else:
logging.warning(f"Skipping entry '{entry}' as the list contains unexpected number "
f"of members that cannot be matched to expected header "
f"'{self._expected_result_header}'")
return
if int(matched_count) != int(a_size) != int(b_size):
logging.warning(f"Molecule '{name}' is not of the same type as template "
f"of conformation '{conf_name}' (they probably differ in "
f"size or contain incompatible atoms.")
try:
self.molecules[name].update_conformation_map(conf_name, float(rmsd))
except KeyError:
logging.warning(f"Attempt to update unknown molecule '{name}' with RMSD value '{rmsd}' "
f"for conformation '{conf_name}'")
@staticmethod
def _prepare_environment():
error_messages: List[str] = []
# input and template directories must exist as well as SB executable
if not _Storage.input_dir.is_dir():
msg = f"Input directory '{_Storage.input_dir}' was not found."
error_messages.append(msg)
logging.error(msg)
if not _Storage.templates_dir.is_dir():
msg = f"Templates directory '{_Storage.templates_dir}' was not found."
error_messages.append(msg)
logging.error(msg)
if not _Storage.SB_executable_path.is_file():
msg = f"SB executable '{_Storage.SB_executable_path}' not found."
error_messages.append(msg)
logging.error(msg)
if _Storage.crossplatform_runner and not which(_Storage.crossplatform_runner):
msg = f"'{_Storage.crossplatform_runner}' command unavailable."
error_messages.append(msg)
logging.error(msg)
# output directory can already exist or be created
try:
_Storage.output_dir.mkdir(mode=0o777, parents=True, exist_ok=True)
except (OSError, FileNotFoundError):
msg = "Output directory was not found and cannot be created."
error_messages.append(msg)
logging.error(msg)
if error_messages:
raise ConfComparerError(" ".join(error_messages))
if __name__ == "__main__":
# parse command line arguments
parser = argparse.ArgumentParser(
description="Determine conformation of carbon rings by comparing them to conformation "
"templates. Execution results will be stored in output dir, only brief "
"statistics are printed to standard output by default (this behaviour can "
"be altered by script arguments, see documentation of supported options).")
parser.add_argument("-t", "--templates_dir", type=Path, default=_Storage.templates_dir,
help=f"path to directory containing templates (default: "
f"'{_Storage.templates_dir}')")
parser.add_argument("-i", "--input_dir", type=Path, default=_Storage.input_dir,
help=f"path to directory containing input files (default: "
f"'{_Storage.input_dir}')")
parser.add_argument("-o", "--output_dir", type=Path, default=_Storage.output_dir,
help=f"path to directory to be filled with results (will be created if "
f"does not exist, (default: '{_Storage.output_dir}')")
parser.add_argument("-e", "--SB_executable_path", type=Path,
default=_Storage.SB_executable_path,
help=f"path to SiteBinder executable (default: "
f"'{_Storage.SB_executable_path}')")
parser.add_argument("-d", "--delta_RMSD_tolerance", type=float, default=_Storage.tolerance,
help="highest acceptable RMSD deviation between conformation template and "
"tested molecule to be considered being in given conformation")
parser.add_argument("-c", "--crossplatform_runner", type=str, default='',
help="command used to run SiteBinder executable (for Windows), typically "
"'mono' or 'wine' on Unix-like systems; if specified, analysis will "
"be executed as 'CROSS-PLATFORM_RUNNER PATH/TO/SB_EXECUTABLE "
"input.txt rmsd.csv pairings.csv'")
parser.add_argument("-l", "--print_list", action="store_true",
help="print the execution results as list of molecules to standard output "
"(can be also found later in output directory)")
parser.add_argument("-r", "--print_RMSD_chart", action="store_true",
help="print the execution results as RMSD chart in CSV format to standard "
"output (can be also found later in output directory)")
parser.add_argument("-S", "--print_summary_off", action="store_true",
help="do not print the execution results as brief statistics to standard "
"output (still can be found later in output directory)")
args = parser.parse_args()
# load script parameters
_Storage.templates_dir = args.templates_dir
_Storage.input_dir = args.input_dir
_Storage.output_dir = args.output_dir
_Storage.SB_executable_path = args.SB_executable_path
_Storage.crossplatform_runner = args.crossplatform_runner
_Storage.tolerance = args.delta_RMSD_tolerance
_Storage.print_list = args.print_list
_Storage.print_RMSD_chart = args.print_RMSD_chart
_Storage.print_summary = not args.print_summary_off
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logging.info(f"Starting ConfComparer on input data: {_Storage.input_dir}...\n"
f"Templates used: {_Storage.templates_dir}")
analysis = Analysis()
analysis.execute()
logging.info(f'ConfComparer has completed successfully')