diff --git a/Makefile b/Makefile index 66da412..656b70c 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,12 @@ test: ## Run everything but just with 3 batches to test full pipeline snakemake $(SMK_PARAMS) -j 99999 --config batches=data/batches_small.txt -- download # download is not benchmarked scripts/benchmark.py --log logs/benchmarks/test_match_$(DATETIME).txt "snakemake $(SMK_PARAMS) --config batches=data/batches_small.txt nb_best_hits=1 -- match" scripts/benchmark.py --log logs/benchmarks/test_map_$(DATETIME).txt "snakemake $(SMK_PARAMS) --config batches=data/batches_small.txt nb_best_hits=1 -- map" - diff -s <(gunzip --stdout output/reads_1___reads_2___reads_3___reads_4.sam_summary.gz | cut -f -3) <(xzcat data/reads_1___reads_2___reads_3___reads_4.sam_summary.xz | cut -f -3) + @if diff -q <(gunzip --stdout output/reads_1___reads_2___reads_3___reads_4.sam_summary.gz | cut -f -3) <(xzcat data/reads_1___reads_2___reads_3___reads_4.sam_summary.xz | cut -f -3); then\ + echo "Success! Test run produced the expected output.";\ + else\ + echo "WARNING: FAIL. 
Test run DID NOT produce the expected output.";\ + exit 1;\ + fi download: ## Download the 661k assemblies and COBS indexes, not benchmarked snakemake $(SMK_PARAMS) -j 99999 -- download diff --git a/Snakefile b/Snakefile index cd8bb4f..0e4afc9 100644 --- a/Snakefile +++ b/Snakefile @@ -28,8 +28,8 @@ def get_all_query_filenames(): def get_batches(): - return sorted([x.strip() for x in open(config["batches"])]) - + with open(config["batches"]) as fin: + return sorted([x.strip() for x in fin]) def get_filename_for_all_queries(): return "___".join(get_all_query_filenames()) diff --git a/readme.md b/readme.md index 688933e..60372d1 100644 --- a/readme.md +++ b/readme.md @@ -14,11 +14,6 @@ Others are non-standard (which you might need to install) and standard (which yo * `snakemake >= 6.2.0` * `mamba >= 0.20.0` -If you want to benchmark the pipeline and is on `Mac OS X`, you need to install `gnu-time`: -``` -brew install gnu-time -``` - ### Standard * `bash` * `make` @@ -28,6 +23,18 @@ brew install gnu-time * `grep` * `awk` * `diff` +* `cat` +* `gzip` +* `cut` + +### Benchmarking + +If you want to benchmark the pipeline and are on `Mac OS X`, you need to install `gnu-time`: +``` +brew install gnu-time +``` + +You will also get more benchmarking stats if `psutil` is installed. 
## Walkthrough diff --git a/scripts/benchmark.py b/scripts/benchmark.py index 4dacf2e..c8cf90f 100755 --- a/scripts/benchmark.py +++ b/scripts/benchmark.py @@ -29,6 +29,8 @@ def main(): log_file = Path(args.log) log_file.parent.mkdir(parents=True, exist_ok=True) tmp_log_file = Path(f"{log_file}.tmp") + is_benchmarking_pipeline = args.command.split()[0] == "snakemake" + with open(log_file, "w") as log_fh: formatted_command = " ".join(args.command.replace("\\\n", " ").strip().split()) print(f"# Benchmarking command: {formatted_command}", file=log_fh) @@ -36,19 +38,39 @@ def main(): "real(s)", "sys(s)", "user(s)", "percent_CPU", "max_RAM(kb)", "FS_inputs", "FS_outputs", "elapsed_time_alt(s)" ] + if is_benchmarking_pipeline: + header.append("max_delta_system_RAM(kb)") print("\t".join(header), file=log_fh) time_command = get_time_command() benchmark_command = f'{time_command} -o {tmp_log_file} -f "%e\t%S\t%U\t%P\t%M\t%I\t%O"' start_time = datetime.datetime.now() - subprocess.check_call(f'{benchmark_command} {args.command}', shell=True) + main_process = subprocess.Popen(f'{benchmark_command} {args.command}', shell=True) + if is_benchmarking_pipeline: + RAM_tmp_log_file = Path(f"{log_file}.RAM.tmp") + RAM_benchmarking_process = subprocess.Popen([sys.executable, "scripts/get_RAM_usage.py", str(RAM_tmp_log_file), + str(main_process.pid)]) + return_code = main_process.wait() + if return_code: + raise subprocess.CalledProcessError(return_code, main_process.args, + output=main_process.stdout, stderr=main_process.stderr) + end_time = datetime.datetime.now() elapsed_seconds = (end_time - start_time).total_seconds() with open(tmp_log_file) as log_fh_tmp, open(log_file, "a") as log_fh: log_line = log_fh_tmp.readline().strip() log_line += f"\t{elapsed_seconds}" + + if is_benchmarking_pipeline: + RAM_benchmarking_process.kill() + with open(RAM_tmp_log_file) as RAM_tmp_log_fh: + RAM_usage = RAM_tmp_log_fh.readline().strip() + log_line += f"\t{RAM_usage}" + RAM_tmp_log_file.unlink() + 
print(log_line, file=log_fh) + tmp_log_file.unlink() diff --git a/scripts/filter_queries.py b/scripts/filter_queries.py index 9707671..624c762 100755 --- a/scripts/filter_queries.py +++ b/scripts/filter_queries.py @@ -176,13 +176,11 @@ def _create_query_dict(fx_fn, keep_matches): def process_cobs_file(self, cobs_fn): for i, (qname, batch, matches) in enumerate(cobs_iterator(cobs_fn)): print(f"Processing batch {batch} query #{i} ({qname})", file=sys.stderr) - #try: - # _ = self._query_dict[qname] - #except KeyError: - # self._query_dict[qname] = SingleQuery(qname, self._keep_matches) - #print(f"qname {qname} batch {batch} matches {matches}") + try: + _ = self._query_dict[qname] + except KeyError: + self._query_dict[qname] = SingleQuery(qname, self._keep_matches) self._query_dict[qname].add_matches(batch, matches) - #print(qname) def print_tsv_summary(self): d = self._query_dict diff --git a/scripts/get_RAM_usage.py b/scripts/get_RAM_usage.py new file mode 100644 index 0000000..07928e3 --- /dev/null +++ b/scripts/get_RAM_usage.py @@ -0,0 +1,25 @@ +import time +import sys +from pathlib import Path + +log_file = Path(sys.argv[1]) +log_file.parent.mkdir(parents=True, exist_ok=True) + +initial_RAM = None +max_RAM_usage = 0 + +try: + import psutil + while True: + RAM_usage = psutil.virtual_memory().used + RAM_usage = RAM_usage / 1024 + max_RAM_usage = max(max_RAM_usage, RAM_usage) + if initial_RAM is None: + initial_RAM = RAM_usage + mof_search_RAM_usage = max_RAM_usage - initial_RAM + with open(log_file, "w", buffering=1024) as fout: + fout.write(f"{mof_search_RAM_usage:.2f}") + time.sleep(0.1) +except ImportError: + with open(log_file, "w", buffering=1024) as fout: + fout.write("N/A (psutil not installed)") diff --git a/scripts/test_xz.py b/scripts/test_xz.py index 761e8bf..676ffe1 100755 --- a/scripts/test_xz.py +++ b/scripts/test_xz.py @@ -1,10 +1,8 @@ #! 
/usr/bin/env python3 import argparse -import collections import lzma import os -import re import sys diff --git a/truth_pipeline/Snakefile b/truth_pipeline/Snakefile new file mode 100644 index 0000000..70644d2 --- /dev/null +++ b/truth_pipeline/Snakefile @@ -0,0 +1,25 @@ +configfile: "config.yaml" + +from pathlib import Path + +batches = [f"batch_{i:03}" for i in range(662)] +assemblies_dir = Path(config["assemblies_dir"]) +kmer_size = int(config["k"]) + + +rule all: + input: [f"output/{batch}.counts.txt" for batch in batches] + + +rule count_kmers: + input: + assembly_batch=f"{assemblies_dir}/{{batch}}", + reads=config["reads"] + output: + kmer_counts = "output/{batch}.counts.txt" + conda: + "envs/jellyfish.yaml" + shadow: "shallow" + params: + k=kmer_size + script: "scripts/count_kmers.py" diff --git a/truth_pipeline/config.yaml b/truth_pipeline/config.yaml new file mode 100644 index 0000000..e0aeb87 --- /dev/null +++ b/truth_pipeline/config.yaml @@ -0,0 +1,3 @@ +assemblies_dir: "/hps/nobackup/iqbal/leandro/cobs/compress_all_data/Assemblies" +reads: /hps/nobackup/iqbal/leandro/cobs/mof-search/truth_pipeline/reads.fa +k: 31 \ No newline at end of file diff --git a/truth_pipeline/envs/jellyfish.yaml b/truth_pipeline/envs/jellyfish.yaml new file mode 100644 index 0000000..8746275 --- /dev/null +++ b/truth_pipeline/envs/jellyfish.yaml @@ -0,0 +1,4 @@ +channels: + - bioconda +dependencies: + - jellyfish=2.2.10 diff --git a/truth_pipeline/run_local.sh b/truth_pipeline/run_local.sh new file mode 100755 index 0000000..6e79abc --- /dev/null +++ b/truth_pipeline/run_local.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env bash +snakemake -j4 --use-singularity --use-conda -p diff --git a/truth_pipeline/scripts/count_kmers.py b/truth_pipeline/scripts/count_kmers.py new file mode 100644 index 0000000..20cbb70 --- /dev/null +++ b/truth_pipeline/scripts/count_kmers.py @@ -0,0 +1,33 @@ +from pathlib import Path +import subprocess + + +def count_kmers(batch_dir: Path, reads_file: Path, 
kmer_size: int, kmer_counts_filename: Path): + with open(kmer_counts_filename, "w") as kmer_counts_fh: + for file in batch_dir.iterdir(): + if file.name.endswith(".fa.gz"): + file = file.resolve() + subprocess.check_call(f"zcat {file} | jellyfish count --canonical -m {kmer_size} -s 100M -t 1 --if {reads_file} /dev/fd/0", shell=True) + kmer_counts = subprocess.check_output(f"jellyfish dump mer_counts.jf", shell=True) + kmer_counts = kmer_counts.decode("utf-8") + kmer_counts = kmer_counts.strip().split("\n") + + kmer_present = 0 + kmer_absent = 0 + for line in kmer_counts: + if line[0] == ">": + line = line.strip() + if line != ">0": + kmer_present += 1 + else: + kmer_absent += 1 + sample = file.name.split(".")[0] + print( + f"{sample} {kmer_present} {kmer_absent} {kmer_present / (kmer_present + kmer_absent)}", + file=kmer_counts_fh) + + +count_kmers(Path(snakemake.input.assembly_batch), + Path(snakemake.input.reads), + int(snakemake.params.k), + Path(snakemake.output.kmer_counts)) diff --git a/truth_pipeline/submit_lsf.sh b/truth_pipeline/submit_lsf.sh new file mode 100755 index 0000000..6faa411 --- /dev/null +++ b/truth_pipeline/submit_lsf.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +set -eux + +MEMORY=2000 +PROFILE="lsf" +LOG_DIR=logs/ +JOB_NAME="snakemake_mof_search_truth_pipeline."$(date --iso-8601='minutes') + +mkdir -p $LOG_DIR + +bsub -R "select[mem>$MEMORY] rusage[mem=$MEMORY] span[hosts=1]" \ + -M "$MEMORY" \ + -o "$LOG_DIR"/"$JOB_NAME".o \ + -e "$LOG_DIR"/"$JOB_NAME".e \ + -J "$JOB_NAME" \ + snakemake --profile "$PROFILE" "$@" + +exit 0