Skip to content

Commit

Permalink
Merge pull request #222 from leoisl/fix_188
Browse files Browse the repository at this point in the history
Uses psutil to measure RAM, add a pipeline to check the truth kmer intersection with a query and other misc changes
  • Loading branch information
karel-brinda authored Apr 3, 2023
2 parents 56725cf + 768d768 commit b8f69f6
Show file tree
Hide file tree
Showing 13 changed files with 157 additions and 17 deletions.
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ test: ## Run everything but just with 3 batches to test full pipeline
snakemake $(SMK_PARAMS) -j 99999 --config batches=data/batches_small.txt -- download # download is not benchmarked
scripts/benchmark.py --log logs/benchmarks/test_match_$(DATETIME).txt "snakemake $(SMK_PARAMS) --config batches=data/batches_small.txt nb_best_hits=1 -- match"
scripts/benchmark.py --log logs/benchmarks/test_map_$(DATETIME).txt "snakemake $(SMK_PARAMS) --config batches=data/batches_small.txt nb_best_hits=1 -- map"
diff -s <(gunzip --stdout output/reads_1___reads_2___reads_3___reads_4.sam_summary.gz | cut -f -3) <(xzcat data/reads_1___reads_2___reads_3___reads_4.sam_summary.xz | cut -f -3)
@if diff -q <(gunzip --stdout output/reads_1___reads_2___reads_3___reads_4.sam_summary.gz | cut -f -3) <(xzcat data/reads_1___reads_2___reads_3___reads_4.sam_summary.xz | cut -f -3); then\
echo "Success! Test run produced the expected output.";\
else\
echo "WARNING: FAIL. Test run DID NOT produce the expected output.";\
exit 1;\
fi

download: ## Download the 661k assemblies and COBS indexes, not benchmarked
snakemake $(SMK_PARAMS) -j 99999 -- download
Expand Down
4 changes: 2 additions & 2 deletions Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ def get_all_query_filenames():


def get_batches():
    """Return the sorted list of batch names read from the batches file.

    The path of the batches file comes from the Snakemake ``config`` object
    (``config["batches"]``); one batch name per line, surrounding whitespace
    stripped.
    """
    # Context manager closes the handle promptly; the previous one-liner
    # leaked the file object returned by open().
    with open(config["batches"]) as fin:
        return sorted(x.strip() for x in fin)

def get_filename_for_all_queries():
    """Build one combined identifier by joining every query file name with '___'."""
    query_names = get_all_query_filenames()
    return "___".join(query_names)
Expand Down
17 changes: 12 additions & 5 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ Others are non-standard (which you might need to install) and standard (which yo
* `snakemake >= 6.2.0`
* `mamba >= 0.20.0`

If you want to benchmark the pipeline and is on `Mac OS X`, you need to install `gnu-time`:
```
brew install gnu-time
```

### Standard
* `bash`
* `make`
Expand All @@ -28,6 +23,18 @@ brew install gnu-time
* `grep`
* `awk`
* `diff`
* `cat`
* `gzip`
* `cut`

### Benchmarking

If you want to benchmark the pipeline and you are on `Mac OS X`, you need to install `gnu-time`:
```
brew install gnu-time
```

You will also get more benchmarking stats if `psutil` is installed.

## Walkthrough

Expand Down
24 changes: 23 additions & 1 deletion scripts/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,48 @@ def main():
log_file = Path(args.log)
log_file.parent.mkdir(parents=True, exist_ok=True)
tmp_log_file = Path(f"{log_file}.tmp")
is_benchmarking_pipeline = args.command.split()[0] == "snakemake"

with open(log_file, "w") as log_fh:
formatted_command = " ".join(args.command.replace("\\\n", " ").strip().split())
print(f"# Benchmarking command: {formatted_command}", file=log_fh)
header = [
"real(s)", "sys(s)", "user(s)", "percent_CPU", "max_RAM(kb)", "FS_inputs", "FS_outputs",
"elapsed_time_alt(s)"
]
if is_benchmarking_pipeline:
header.append("max_delta_system_RAM(kb)")
print("\t".join(header), file=log_fh)

time_command = get_time_command()
benchmark_command = f'{time_command} -o {tmp_log_file} -f "%e\t%S\t%U\t%P\t%M\t%I\t%O"'

start_time = datetime.datetime.now()
subprocess.check_call(f'{benchmark_command} {args.command}', shell=True)
main_process = subprocess.Popen(f'{benchmark_command} {args.command}', shell=True)
if is_benchmarking_pipeline:
RAM_tmp_log_file = Path(f"{log_file}.RAM.tmp")
RAM_benchmarking_process = subprocess.Popen([sys.executable, "scripts/get_RAM_usage.py", str(RAM_tmp_log_file),
str(main_process.pid)])
return_code = main_process.wait()
if return_code:
raise subprocess.CalledProcessError(return_code, main_process.args,
output=main_process.stdout, stderr=main_process.stderr)

end_time = datetime.datetime.now()
elapsed_seconds = (end_time - start_time).total_seconds()
with open(tmp_log_file) as log_fh_tmp, open(log_file, "a") as log_fh:
log_line = log_fh_tmp.readline().strip()
log_line += f"\t{elapsed_seconds}"

if is_benchmarking_pipeline:
RAM_benchmarking_process.kill()
with open(RAM_tmp_log_file) as RAM_tmp_log_fh:
RAM_usage = RAM_tmp_log_fh.readline().strip()
log_line += f"\t{RAM_usage}"
RAM_tmp_log_file.unlink()

print(log_line, file=log_fh)

tmp_log_file.unlink()


Expand Down
10 changes: 4 additions & 6 deletions scripts/filter_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,11 @@ def _create_query_dict(fx_fn, keep_matches):
def process_cobs_file(self, cobs_fn):
    """Merge the matches from one COBS output file into the per-query dict.

    Iterates over the (query name, batch, matches) triples produced by
    ``cobs_iterator`` for *cobs_fn*, creating a ``SingleQuery`` accumulator
    the first time a query name is seen and adding the batch's matches to it.
    Progress is reported on stderr.
    """
    for i, (qname, batch, matches) in enumerate(cobs_iterator(cobs_fn)):
        print(f"Processing batch {batch} query #{i} ({qname})", file=sys.stderr)
        # Create the accumulator lazily: the same query shows up in many
        # batch files. (Replaces the old discard-lookup try/except KeyError
        # and removes the stale commented-out code.)
        if qname not in self._query_dict:
            self._query_dict[qname] = SingleQuery(qname, self._keep_matches)
        self._query_dict[qname].add_matches(batch, matches)

def print_tsv_summary(self):
d = self._query_dict
Expand Down
25 changes: 25 additions & 0 deletions scripts/get_RAM_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Monitor system-wide RAM usage and continuously write the peak increase.

Usage: get_RAM_usage.py <log_file> [<pid>]
Writes, every 0.1 s, the maximum growth of used system RAM (in KB) since the
monitor started into <log_file>. If psutil is unavailable, a single
"N/A" marker is written instead. The process runs until killed by its parent.
"""
import sys
import time
from pathlib import Path

output_file = Path(sys.argv[1])
output_file.parent.mkdir(parents=True, exist_ok=True)

try:
    import psutil
except ImportError:
    # psutil is an optional dependency: record that no measurement is possible.
    with open(output_file, "w", buffering=1024) as fout:
        fout.write("N/A (psutil not installed)")
else:
    baseline_kb = None
    peak_kb = 0
    while True:
        used_kb = psutil.virtual_memory().used / 1024
        peak_kb = max(peak_kb, used_kb)
        if baseline_kb is None:
            # First sample taken right after startup serves as the baseline.
            baseline_kb = used_kb
        # Rewrite the whole file each tick so the parent benchmark process
        # always reads an up-to-date value after killing this monitor.
        with open(output_file, "w", buffering=1024) as fout:
            fout.write(f"{peak_kb - baseline_kb:.2f}")
        time.sleep(0.1)
2 changes: 0 additions & 2 deletions scripts/test_xz.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#! /usr/bin/env python3

import argparse
import collections
import lzma
import os
import re
import sys


Expand Down
25 changes: 25 additions & 0 deletions truth_pipeline/Snakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Truth pipeline: for each assembly batch, count how many of the query k-mers
# (config["reads"]) are present/absent per assembly using jellyfish.
configfile: "config.yaml"

from pathlib import Path

# Batch names batch_000 .. batch_661 covering the 661k assembly collection.
batches = [f"batch_{i:03}" for i in range(662)]
assemblies_dir = Path(config["assemblies_dir"])
kmer_size = int(config["k"])


# Default target: one counts summary file per batch.
rule all:
input: [f"output/{batch}.counts.txt" for batch in batches]


# Run scripts/count_kmers.py on one batch directory of gzipped assemblies.
rule count_kmers:
input:
assembly_batch=f"{assemblies_dir}/{{batch}}",
reads=config["reads"]
output:
kmer_counts = "output/{batch}.counts.txt"
conda:
"envs/jellyfish.yaml"
# shallow shadow: jellyfish writes mer_counts.jf into the working directory;
# shadowing keeps concurrent jobs and the repo root from clashing.
shadow: "shallow"
params:
k=kmer_size
script: "scripts/count_kmers.py"
3 changes: 3 additions & 0 deletions truth_pipeline/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
assemblies_dir: "/hps/nobackup/iqbal/leandro/cobs/compress_all_data/Assemblies"
reads: /hps/nobackup/iqbal/leandro/cobs/mof-search/truth_pipeline/reads.fa
k: 31
4 changes: 4 additions & 0 deletions truth_pipeline/envs/jellyfish.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
channels:
- bioconda
dependencies:
- jellyfish=2.2.10
2 changes: 2 additions & 0 deletions truth_pipeline/run_local.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/usr/bin/env bash
# Run the truth pipeline locally: 4 parallel jobs, singularity + conda
# environments, printing each executed shell command (-p).
set -euo pipefail

snakemake -j4 --use-singularity --use-conda -p
33 changes: 33 additions & 0 deletions truth_pipeline/scripts/count_kmers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from pathlib import Path
import subprocess


def count_kmers(batch_dir: Path, reads_file: Path, kmer_size: int, kmer_counts_filename: Path):
    """Count query k-mer presence in every gzipped assembly of one batch.

    For each ``*.fa.gz`` assembly in *batch_dir*, runs ``jellyfish count``
    restricted to the k-mers of *reads_file* (``--if``) and parses the dump,
    writing one line per sample to *kmer_counts_filename*:
    ``<sample> <present> <absent> <present_fraction>``.

    NOTE(review): jellyfish writes ``mer_counts.jf`` into the current working
    directory — presumably a snakemake shadow dir; confirm when calling
    outside the pipeline.
    """
    with open(kmer_counts_filename, "w") as kmer_counts_fh:
        # sorted() makes the per-batch output deterministic (iterdir() order
        # is filesystem-dependent).
        for file in sorted(batch_dir.iterdir()):
            if not file.name.endswith(".fa.gz"):
                continue
            file = file.resolve()
            subprocess.check_call(
                f"zcat {file} | jellyfish count --canonical -m {kmer_size} "
                f"-s 100M -t 1 --if {reads_file} /dev/fd/0",
                shell=True)
            dump = subprocess.check_output("jellyfish dump mer_counts.jf", shell=True)
            dump_lines = dump.decode("utf-8").strip().split("\n")

            kmer_present = 0
            kmer_absent = 0
            for line in dump_lines:
                line = line.strip()
                # jellyfish dump is FASTA-like: header ">count" followed by
                # the k-mer sequence; startswith avoids IndexError on a
                # blank line (the old line[0] check crashed there).
                if line.startswith(">"):
                    if line != ">0":
                        kmer_present += 1
                    else:
                        kmer_absent += 1
            sample = file.name.split(".")[0]
            total = kmer_present + kmer_absent
            # Guard against division by zero when the dump contains no k-mers.
            fraction = kmer_present / total if total else 0.0
            print(f"{sample} {kmer_present} {kmer_absent} {fraction}",
                  file=kmer_counts_fh)


# Entry point when executed via the `script:` directive of rule count_kmers
# in truth_pipeline/Snakefile — Snakemake injects the global `snakemake`
# object carrying the rule's input/params/output.
count_kmers(Path(snakemake.input.assembly_batch),
Path(snakemake.input.reads),
int(snakemake.params.k),
Path(snakemake.output.kmer_counts))
18 changes: 18 additions & 0 deletions truth_pipeline/submit_lsf.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env bash
# Submit the truth-pipeline snakemake driver job to an LSF cluster.
# Any extra command-line arguments are forwarded to snakemake via "$@".
set -eux

MEMORY=2000   # MB requested for the driver job
PROFILE="lsf" # snakemake profile used to submit the per-rule worker jobs
LOG_DIR=logs/
JOB_NAME="snakemake_mof_search_truth_pipeline."$(date --iso-8601='minutes')

# Quote the path so a LOG_DIR containing spaces cannot break mkdir.
mkdir -p "$LOG_DIR"

bsub -R "select[mem>$MEMORY] rusage[mem=$MEMORY] span[hosts=1]" \
    -M "$MEMORY" \
    -o "$LOG_DIR"/"$JOB_NAME".o \
    -e "$LOG_DIR"/"$JOB_NAME".e \
    -J "$JOB_NAME" \
    snakemake --profile "$PROFILE" "$@"

exit 0

0 comments on commit b8f69f6

Please sign in to comment.