Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uses psutil to measure RAM, add a pipeline to check the truth kmer intersection with a query and other misc changes #222

Merged
merged 22 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ test: ## Run everything but just with 3 batches to test full pipeline
snakemake $(SMK_PARAMS) -j 99999 --config batches=data/batches_small.txt -- download # download is not benchmarked
scripts/benchmark.py --log logs/benchmarks/test_match_$(DATETIME).txt "snakemake $(SMK_PARAMS) --config batches=data/batches_small.txt nb_best_hits=1 -- match"
scripts/benchmark.py --log logs/benchmarks/test_map_$(DATETIME).txt "snakemake $(SMK_PARAMS) --config batches=data/batches_small.txt nb_best_hits=1 -- map"
diff -s <(gunzip --stdout output/reads_1___reads_2___reads_3___reads_4.sam_summary.gz | cut -f -3) <(xzcat data/reads_1___reads_2___reads_3___reads_4.sam_summary.xz | cut -f -3)
@if diff -q <(gunzip --stdout output/reads_1___reads_2___reads_3___reads_4.sam_summary.gz | cut -f -3) <(xzcat data/reads_1___reads_2___reads_3___reads_4.sam_summary.xz | cut -f -3); then\
echo "Success! Test run produced the expected output.";\
else\
echo "WARNING: FAIL. Test run DID NOT produce the expected output.";\
exit 1;\
fi

download: ## Download the 661k assemblies and COBS indexes, not benchmarked
snakemake $(SMK_PARAMS) -j 99999 -- download
Expand Down
4 changes: 2 additions & 2 deletions Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ def get_all_query_filenames():


def get_batches():
return sorted([x.strip() for x in open(config["batches"])])

with open(config["batches"]) as fin:
return sorted([x.strip() for x in fin])

def get_filename_for_all_queries():
return "___".join(get_all_query_filenames())
Expand Down
17 changes: 12 additions & 5 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ Others are non-standard (which you might need to install) and standard (which yo
* `snakemake >= 6.2.0`
* `mamba >= 0.20.0`

If you want to benchmark the pipeline and is on `Mac OS X`, you need to install `gnu-time`:
```
brew install gnu-time
```

### Standard
* `bash`
* `make`
Expand All @@ -28,6 +23,18 @@ brew install gnu-time
* `grep`
* `awk`
* `diff`
* `cat`
* `gzip`
* `cut`

### Benchmarking

If you want to benchmark the pipeline and are on `Mac OS X`, you need to install `gnu-time`:
```
brew install gnu-time
```

You will also get more benchmarking stats if `psutil` is installed.

## Walkthrough

Expand Down
24 changes: 23 additions & 1 deletion scripts/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,48 @@ def main():
log_file = Path(args.log)
log_file.parent.mkdir(parents=True, exist_ok=True)
tmp_log_file = Path(f"{log_file}.tmp")
is_benchmarking_pipeline = args.command.split()[0] == "snakemake"

with open(log_file, "w") as log_fh:
formatted_command = " ".join(args.command.replace("\\\n", " ").strip().split())
print(f"# Benchmarking command: {formatted_command}", file=log_fh)
header = [
"real(s)", "sys(s)", "user(s)", "percent_CPU", "max_RAM(kb)", "FS_inputs", "FS_outputs",
"elapsed_time_alt(s)"
]
if is_benchmarking_pipeline:
header.append("max_delta_system_RAM(kb)")
print("\t".join(header), file=log_fh)

time_command = get_time_command()
benchmark_command = f'{time_command} -o {tmp_log_file} -f "%e\t%S\t%U\t%P\t%M\t%I\t%O"'

start_time = datetime.datetime.now()
subprocess.check_call(f'{benchmark_command} {args.command}', shell=True)
main_process = subprocess.Popen(f'{benchmark_command} {args.command}', shell=True)
if is_benchmarking_pipeline:
RAM_tmp_log_file = Path(f"{log_file}.RAM.tmp")
RAM_benchmarking_process = subprocess.Popen([sys.executable, "scripts/get_RAM_usage.py", str(RAM_tmp_log_file),
str(main_process.pid)])
return_code = main_process.wait()
if return_code:
raise subprocess.CalledProcessError(return_code, main_process.args,
output=main_process.stdout, stderr=main_process.stderr)

end_time = datetime.datetime.now()
elapsed_seconds = (end_time - start_time).total_seconds()
with open(tmp_log_file) as log_fh_tmp, open(log_file, "a") as log_fh:
log_line = log_fh_tmp.readline().strip()
log_line += f"\t{elapsed_seconds}"

if is_benchmarking_pipeline:
RAM_benchmarking_process.kill()
with open(RAM_tmp_log_file) as RAM_tmp_log_fh:
RAM_usage = RAM_tmp_log_fh.readline().strip()
log_line += f"\t{RAM_usage}"
RAM_tmp_log_file.unlink()

print(log_line, file=log_fh)

tmp_log_file.unlink()


Expand Down
10 changes: 4 additions & 6 deletions scripts/filter_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,11 @@ def _create_query_dict(fx_fn, keep_matches):
def process_cobs_file(self, cobs_fn):
for i, (qname, batch, matches) in enumerate(cobs_iterator(cobs_fn)):
print(f"Processing batch {batch} query #{i} ({qname})", file=sys.stderr)
#try:
# _ = self._query_dict[qname]
#except KeyError:
# self._query_dict[qname] = SingleQuery(qname, self._keep_matches)
#print(f"qname {qname} batch {batch} matches {matches}")
try:
_ = self._query_dict[qname]
except KeyError:
self._query_dict[qname] = SingleQuery(qname, self._keep_matches)
self._query_dict[qname].add_matches(batch, matches)
#print(qname)

def print_tsv_summary(self):
d = self._query_dict
Expand Down
25 changes: 25 additions & 0 deletions scripts/get_RAM_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Poll system-wide RAM usage and log the peak delta from the startup baseline.

Usage: get_RAM_usage.py <log_file> <pid>

Loops forever, refreshing <log_file> every 0.1 s with the maximum increase in
system RAM usage (in KB) observed since the script started.  The parent
benchmark process is expected to kill this script once the monitored command
finishes, so the file must hold a complete value at all times.
"""
import time
import sys
from pathlib import Path

# argv[1]: file the current peak-RAM delta is written to.
# NOTE(review): argv[2] (the monitored PID passed by benchmark.py) is never
# read here — the measurement is system-wide, not per-process; confirm that
# this is intended.
log_file = Path(sys.argv[1])
log_file.parent.mkdir(parents=True, exist_ok=True)

initial_RAM = None  # system RAM in use (KB) at startup; baseline for the delta
max_RAM_usage = 0   # highest system RAM usage (KB) observed so far

try:
    import psutil
    while True:
        RAM_usage = psutil.virtual_memory().used
        RAM_usage = RAM_usage / 1024  # bytes -> KB
        max_RAM_usage = max(max_RAM_usage, RAM_usage)
        if initial_RAM is None:
            initial_RAM = RAM_usage
        mof_search_RAM_usage = max_RAM_usage - initial_RAM
        # Rewrite the whole file on every tick so that it always contains a
        # complete, parseable value when this process is killed externally.
        with open(log_file, "w", buffering=1024) as fout:
            fout.write(f"{mof_search_RAM_usage:.2f}")
        time.sleep(0.1)
except ImportError:
    # psutil is optional: leave a placeholder so the caller still gets a field.
    with open(log_file, "w", buffering=1024) as fout:
        fout.write("N/A (psutil not installed)")
2 changes: 0 additions & 2 deletions scripts/test_xz.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#! /usr/bin/env python3

import argparse
import collections
import lzma
import os
import re
import sys


Expand Down
25 changes: 25 additions & 0 deletions truth_pipeline/Snakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Truth pipeline: for every assembly batch, count how many k-mers from a fixed
# query read set are present/absent in each assembly (via Jellyfish).
configfile: "config.yaml"

from pathlib import Path

# One job per batch of the assembly collection: batch_000 .. batch_661.
batches = [f"batch_{i:03}" for i in range(662)]
assemblies_dir = Path(config["assemblies_dir"])
kmer_size = int(config["k"])


rule all:
    input: [f"output/{batch}.counts.txt" for batch in batches]


rule count_kmers:
    input:
        assembly_batch=f"{assemblies_dir}/{{batch}}",
        reads=config["reads"]
    output:
        kmer_counts = "output/{batch}.counts.txt"
    conda:
        "envs/jellyfish.yaml"
    # Shallow shadow directory: Jellyfish writes an intermediate mer_counts.jf
    # into the working dir, so isolate each job to avoid clashes between jobs.
    shadow: "shallow"
    params:
        k=kmer_size
    script: "scripts/count_kmers.py"
3 changes: 3 additions & 0 deletions truth_pipeline/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
assemblies_dir: "/hps/nobackup/iqbal/leandro/cobs/compress_all_data/Assemblies"
reads: "/hps/nobackup/iqbal/leandro/cobs/mof-search/truth_pipeline/reads.fa"
k: 31
4 changes: 4 additions & 0 deletions truth_pipeline/envs/jellyfish.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
channels:
- bioconda
dependencies:
- jellyfish=2.2.10
2 changes: 2 additions & 0 deletions truth_pipeline/run_local.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/usr/bin/env bash
# Run the truth pipeline locally: 4 parallel jobs, conda/singularity envs,
# printing each shell command before execution (-p).
snakemake --use-singularity --use-conda -p -j4
33 changes: 33 additions & 0 deletions truth_pipeline/scripts/count_kmers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from pathlib import Path
import subprocess


def count_kmers(batch_dir: Path, reads_file: Path, kmer_size: int, kmer_counts_filename: Path):
    """Count, per assembly in a batch, how many query k-mers it contains.

    For every ``*.fa.gz`` assembly in *batch_dir*, Jellyfish counts the
    canonical k-mers listed in *reads_file* (``--if``) against the assembly.
    The dump output is FASTA-like (``>count`` header, then the k-mer), so a
    header of ``>0`` means the query k-mer is absent and anything else means
    present.  One line per sample is appended to *kmer_counts_filename*::

        <sample> <present> <absent> <present_fraction>

    Requires ``zcat`` and ``jellyfish`` on PATH; runs in the current working
    directory (Jellyfish writes ``mer_counts.jf`` there — the Snakemake rule
    uses a shadow directory to keep concurrent jobs isolated).
    """
    with open(kmer_counts_filename, "w") as kmer_counts_fh:
        for file in batch_dir.iterdir():
            if not file.name.endswith(".fa.gz"):
                continue
            file = file.resolve()
            # Stream the gzipped assembly into jellyfish via /dev/fd/0; --if
            # restricts counting to the k-mers occurring in the query reads.
            subprocess.check_call(
                f"zcat {file} | jellyfish count --canonical -m {kmer_size} "
                f"-s 100M -t 1 --if {reads_file} /dev/fd/0",
                shell=True)
            # Plain string here — the original carried a pointless f-prefix.
            dump = subprocess.check_output("jellyfish dump mer_counts.jf", shell=True)
            lines = dump.decode("utf-8").strip().split("\n")

            kmer_present = 0
            kmer_absent = 0
            for line in lines:
                line = line.strip()
                # Only the ">count" headers carry presence information; this
                # also safely skips empty lines (the original indexed line[0]
                # and would raise IndexError on an empty dump).
                if not line.startswith(">"):
                    continue
                if line != ">0":
                    kmer_present += 1
                else:
                    kmer_absent += 1

            total = kmer_present + kmer_absent
            # Guard against an empty dump (no query k-mers at all), which
            # previously crashed with ZeroDivisionError.
            fraction = kmer_present / total if total else 0.0
            sample = file.name.split(".")[0]
            print(f"{sample} {kmer_present} {kmer_absent} {fraction}",
                  file=kmer_counts_fh)


# `snakemake` is injected by Snakemake's `script:` directive and exposes the
# rule's input/output/params for this particular job.
count_kmers(Path(snakemake.input.assembly_batch),
            Path(snakemake.input.reads),
            int(snakemake.params.k),
            Path(snakemake.output.kmer_counts))
18 changes: 18 additions & 0 deletions truth_pipeline/submit_lsf.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env bash
# Submit the truth-pipeline Snakemake driver as a single LSF job.
# Any extra CLI arguments are forwarded to snakemake (e.g. --dry-run).
set -eux

MEMORY=2000   # MB requested for the snakemake driver job itself
PROFILE="lsf" # snakemake profile that submits the individual rules to LSF
LOG_DIR=logs/
JOB_NAME="snakemake_mof_search_truth_pipeline."$(date --iso-8601='minutes')

# Quote the path: the original unquoted $LOG_DIR would word-split if the
# value ever contained spaces or glob characters.
mkdir -p "$LOG_DIR"

bsub -R "select[mem>$MEMORY] rusage[mem=$MEMORY] span[hosts=1]" \
    -M "$MEMORY" \
    -o "$LOG_DIR"/"$JOB_NAME".o \
    -e "$LOG_DIR"/"$JOB_NAME".e \
    -J "$JOB_NAME" \
    snakemake --profile "$PROFILE" "$@"

exit 0