Skip to content

Commit

Permalink
refactored prep nmr
Browse files Browse the repository at this point in the history
  • Loading branch information
MAlberts committed Mar 19, 2024
1 parent d8e3928 commit 00a06dc
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 291 deletions.
276 changes: 77 additions & 199 deletions src/nmr_to_structure/prepare_input/nmr_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, KeysView, List, Optional, Tuple
from typing import Any, Dict, KeysView, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
Expand All @@ -10,6 +10,7 @@
from rdkit.Chem import AllChem, rdMolDescriptors
from rxn.chemutils.tokenization import tokenize_smiles
from sklearn.model_selection import train_test_split
from functools import partial

DEFAULT_SEED = 3246
DEFAULT_NON_MATCHING_TOKEN = "<no_match> <no_match> <no_match> <no_match> <no_match> <no_match> <no_match> <no_match> <no_match> <no_match>"
Expand All @@ -18,7 +19,7 @@

# General Utilities #
def tokenize_formula(formula: str) -> list:
return re.findall("[A-Z][a-z]?|\d+|.", formula)
return ' '.join(re.findall("[A-Z][a-z]?|\d+|.", formula)) + ' '


def jitter(value: float, jitter_range: float = 2) -> float:
Expand Down Expand Up @@ -55,12 +56,12 @@ def evaluate_molecule(smiles: str) -> bool:
def save_set(data_set: pd.DataFrame, out_path: Path, set_type: str) -> None:
out_path.mkdir(parents=True, exist_ok=True)

smiles = list(data_set.smiles)
smiles = list(data_set.target)
with (out_path / f"tgt-{set_type}.txt").open("w") as f:
for item in smiles:
f.write(f"{item}\n")

nmr_input = data_set.nmr_input
nmr_input = data_set.source
with (out_path / f"src-{set_type}.txt").open("w") as f:
for item in nmr_input:
f.write(f"{item}\n")
Expand All @@ -84,205 +85,82 @@ def log_file_name_from_time(prefix: Optional[str] = None) -> str:
return prefix + "-" + now_formatted + ".log"


# Functions for making NMR strings #
def build_1H_peak(
    HNMR_sim_peaks: dict,
    peak: str,
    jitter_peaks: bool = False,
    mode: str = "adaptive",
    token_space: str = "separate",
) -> Tuple[float, str]:
    """Format one simulated 1H peak as a ``"| ..."`` string fragment.

    The peak is written either as a ``min max`` range or as a single
    centroid value, followed by its category and hydrogen count.

    Args:
        HNMR_sim_peaks: Mapping from peak id to a peak record. The record is
            read through the keys ``rangeMax``, ``rangeMin``, ``centroid``,
            ``category`` and ``nH``.
        peak: Id of the peak to format.
        jitter_peaks: If true, every rounded shift is passed through
            ``jitter(..., jitter_range=0.2)`` before formatting.
        mode: ``"range"`` always emits the range; ``"adaptive"`` emits the
            range only when it is wider than 0.15; any other value emits the
            centroid.
        token_space: ``"separate"`` renders shifts as ``1H<x.xx>``,
            ``"shared"`` renders them as ``<10*x:.1f>``; any other value
            leaves the number unformatted.

    Returns:
        A tuple ``(position, peak_string)``. ``position`` is the unjittered
        ``rangeMax`` (range form) or ``centroid`` (centroid form) taken
        straight from the record.
    """
    entry = HNMR_sim_peaks[peak]

    def shift_of(key):
        # Round first, then optionally perturb the rounded value.
        rounded = round(entry[key], 2)
        if jitter_peaks:
            return jitter(rounded, jitter_range=0.2)
        return rounded

    def as_token(value):
        if token_space == "separate":
            return f"1H{value:.2f}"
        if token_space == "shared":
            return f"{value * 10:.1f}"
        # Unrecognised token space: the raw number is interpolated as-is.
        return value

    is_wide = entry["rangeMax"] - entry["rangeMin"] > 0.15
    use_range = mode == "range" or (is_wide and mode == "adaptive")

    if not use_range:
        centre = as_token(shift_of("centroid"))
        text = f"| {centre} {entry['category']} {entry['nH']}H "
        return entry["centroid"], text

    # Upper bound is processed before the lower bound.
    upper = shift_of("rangeMax")
    lower = shift_of("rangeMin")
    upper = as_token(upper)
    lower = as_token(lower)

    text = f"| {lower} {upper} {entry['category']} {entry['nH']}H "
    return entry["rangeMax"], text


def build_hnmr_string(
    smiles: str,
    peak_dict: dict,
    mode: str = "adaptive",
    header: bool = True,
    token_space: str = "same",
    n_aug: int = 0,
) -> List[str]:
    """Build 1H NMR strings for a molecule, optionally with augmented copies.

    Each string is a header followed by the fragments produced by
    ``build_1H_peak`` for every peak, ordered by ascending peak position.

    Args:
        smiles: SMILES of the molecule. Only parsed when ``header`` is true.
        peak_dict: Mapping from peak id to peak record, forwarded to
            ``build_1H_peak``.
        mode: Peak encoding mode, forwarded to ``build_1H_peak``.
        header: If true, the string starts with the space-joined non-empty
            items returned by ``tokenize_formula`` for the molecular formula,
            followed by ``1HNMR``; otherwise it starts with ``1HNMR`` only.
        token_space: Token space, forwarded to ``build_1H_peak``.
        n_aug: Number of additional jittered copies to generate.

    Returns:
        A list of ``n_aug + 1`` strings. The first is built without jitter,
        every following one with ``jitter_peaks=True``.
    """
    if header:
        # The molecule is only needed for the formula header, so parse lazily.
        mol = Chem.MolFromSmiles(smiles)
        formula = rdMolDescriptors.CalcMolFormula(mol)
        formula_tokenized = " ".join(filter(None, tokenize_formula(formula)))
        nmr_header = f"{formula_tokenized} 1HNMR "
    else:
        nmr_header = "1HNMR "

    peak_strings = []
    for i in range(n_aug + 1):
        # No augmentation for the first set.
        positioned = [
            build_1H_peak(
                peak_dict,
                peak,
                jitter_peaks=i > 0,
                mode=mode,
                token_space=token_space,
            )
            for peak in peak_dict
        ]

        # Sort a list rather than keying a dict by position: two peaks that
        # share a position must both survive. The sort is stable, so ties keep
        # their order from ``peak_dict``.
        positioned.sort(key=lambda item: item[0])

        peak_strings.append(nmr_header + "".join(text for _, text in positioned))

    return peak_strings

def process_multiplet(
    multiplets: List[Dict[str, Union[str, float, int]]],
    encoding: str,
    j_values: bool,
    augment: bool,
) -> str:
    """Render a list of 1H multiplets as a single ``1HNMR`` string.

    Every peak becomes ``<shift(s)> <category> <nH>H`` with an optional
    ``J <j1> <j2> ...`` suffix; peaks are separated by ``" | "`` and the
    whole string is prefixed with ``"1HNMR "``.

    Args:
        multiplets: Peak records. Each is read through the keys ``rangeMax``,
            ``rangeMin``, ``centroid``, ``category`` and ``nH``; ``j_values``
            is read only when coupling constants are requested.
        encoding: ``"center"`` writes the centroid, ``"range"`` writes
            ``max min``, ``"adaptive"`` writes the range when it is wider
            than 0.15 and the centroid otherwise.
        j_values: If true, append the coupling constants of a peak. They are
            expected as an underscore-separated string; a missing value or
            one whose string form is ``"None"`` is skipped.
        augment: If true, pass the three shifts through
            ``jitter(..., jitter_range=0.2)`` before formatting.

    Returns:
        The formatted string. For an empty ``multiplets`` list this is just
        the header ``"1HNMR "``.

    Raises:
        ValueError: If ``encoding`` is not one of the supported values (only
            detected once a peak is processed).
    """
    formatted_peaks = []
    for peak in multiplets:
        range_max = float(peak["rangeMax"])
        range_min = float(peak["rangeMin"])
        center = float(peak["centroid"])

        if augment:
            range_max = jitter(range_max, jitter_range=0.2)
            range_min = jitter(range_min, jitter_range=0.2)
            center = jitter(center, jitter_range=0.2)

        if encoding == "center":
            use_range = False
        elif encoding == "range":
            use_range = True
        elif encoding == "adaptive":
            use_range = (range_max - range_min) > 0.15
        else:
            raise ValueError(f"Unknown encoding {encoding}.")

        if use_range:
            shift = f"{range_max:.2f} {range_min:.2f}"
        else:
            shift = f"{center:.2f}"

        parts = [shift, f"{peak['category']} {peak['nH']}H"]

        if j_values:
            # .get() so records without coupling information are tolerated.
            js = str(peak.get("j_values"))
            if js != "None":
                couplings = [f"{float(j):.2f}" for j in js.split("_") if j]
                parts.append("J " + " ".join(couplings))

        formatted_peaks.append(" ".join(parts).strip())

    # Joining (instead of appending " | " and slicing off the last three
    # characters) keeps the header intact when there are no peaks.
    return "1HNMR " + " | ".join(formatted_peaks)

def build_cnmr_string(
C_NMR_entry: dict,
header: bool = False,
smiles: Optional[str] = None,
token_space="shared",
n_aug: int = 0,
c_peaks: dict,
intensities: bool = False,
augment: bool = False,
) -> List[str]:
if header:
mol = Chem.MolFromSmiles(smiles)
formula = rdMolDescriptors.CalcMolFormula(mol)
formula_split = tokenize_formula(formula)
formula_tokenized = " ".join(list(filter(None, formula_split)))

nmr_header = f"{formula_tokenized} 13C_NMR"


nmr_string = "13C_NMR "

c_peaks_df = pd.DataFrame(list(c_peaks))
c_peaks_df = c_peaks_df.sort_values(by='delta (ppm)')

if augment:
jitter_fn = partial(jitter, jitter_range=0.5)
c_peaks_df['delta (ppm)'] = c_peaks_df['delta (ppm)'].apply(jitter_fn)

if intensities:
c_peaks_df.intensity = c_peaks_df.intensity / max(c_peaks_df.intensity)
c_peaks_df['peak_string'] = c_peaks_df.apply(lambda peak: "{:.1f} {:.1f} ".format(peak['delta (ppm)'], peak['intensity']), axis=1)
else:
nmr_header = "13C_NMR"

nmr_strings = list()
for i in range(n_aug + 1):
peaks = list()

for peak in C_NMR_entry["peaks"].values():
if peak["delta (ppm)"] > 230 or peak["delta (ppm)"] < -20:
continue

value = float(round(peak["delta (ppm)"], 1))
value_str = str(jitter(value, jitter_range=0.5) if i > 0 else value)

if token_space == "separate":
value_str = "13C" + str(value)

peaks.append(value_str)

peaks = sorted(peaks)

nmr_string = nmr_header
for peak in peaks:
nmr_string += f" {peak}"
nmr_strings.append(nmr_string)

return nmr_strings


def make_nmr(
mode: str,
component: str,
hnmr: Optional[dict] = None,
cnmr: Optional[dict] = None,
hnmr_mode: str = "range",
token_space: str = "shared",
) -> str:
if mode == "combined":
if hnmr is None or cnmr is None:
raise ValueError("For mode combined both hnmr and cnmr have to be defined.")

hnmr_string = build_hnmr_string(
smiles=component,
peak_dict=hnmr,
mode=hnmr_mode,
header=False,
token_space=token_space,
)[0]

cnmr_string = build_cnmr_string(
cnmr,
header=False,
token_space=token_space,
)[0]

nmr_string = f" {hnmr_string.strip()} {cnmr_string}"

elif mode == "hnmr":
if hnmr is None:
raise ValueError("For mode hnmr hnmr can't be None.")

hnmr_string = build_hnmr_string(
smiles=component,
peak_dict=hnmr,
mode=hnmr_mode,
header=False,
token_space=token_space,
)[0]

nmr_string = " " + hnmr_string

elif mode == "cnmr":
if cnmr is None:
raise ValueError("For mode cnmr cnmr can't be None.")

cnmr_string = build_cnmr_string(
cnmr,
header=False,
token_space=token_space,
)[0]
nmr_string = " " + cnmr_string
c_peaks_df['peak_string'] = c_peaks_df.apply(lambda peak: "{:.1f} ".format(peak['delta (ppm)']), axis=1)

nmr_string += ''.join(list(c_peaks_df['peak_string']))

return nmr_string

Expand Down
Loading

0 comments on commit 00a06dc

Please sign in to comment.