diff --git a/.flake8 b/.flake8
deleted file mode 100644
index 791c6ee..0000000
--- a/.flake8
+++ /dev/null
@@ -1,5 +0,0 @@
-[flake8]
-exclude =
-    build
-    docs
-    tests
\ No newline at end of file
diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
index 9dc9b79..adbc119 100644
--- a/.github/workflows/docs.yml
+++ b/.github/workflows/docs.yml
@@ -14,7 +14,7 @@ jobs:
   build:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
      - uses: mamba-org/setup-micromamba@main
        with:
          environment-file: env.yml
diff --git a/.github/workflows/flake.yml b/.github/workflows/flake.yml
deleted file mode 100644
index f16489f..0000000
--- a/.github/workflows/flake.yml
+++ /dev/null
@@ -1,23 +0,0 @@
-# Flake8 test the repo.
-
-name: Flake8
-
-on:
-  push:
-    branches: [ prod, main, WD, manke]
-  pull_request:
-    branches: [ prod, main, WD, manke]
-
-jobs:
-  build:
-    runs-on: ubuntu-latest
-    steps:
-    - uses: actions/checkout@v2
-    - name: flake8
-      uses: actions/checkout@v2
-      with:
-        python-version: 3.9.2
-    - run: |
-        python -m pip install --upgrade pip
-        pip install flake8==4.0.1
-        flake8
diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
new file mode 100644
index 0000000..b6690e0
--- /dev/null
+++ b/.github/workflows/lint.yml
@@ -0,0 +1,8 @@
+name: Ruff
+on: [push, pull_request]
+jobs:
+  ruff:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: chartboost/ruff-action@v1
\ No newline at end of file
diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml
index 93510b0..6b56fc9 100644
--- a/.github/workflows/pytest.yml
+++ b/.github/workflows/pytest.yml
@@ -16,7 +16,7 @@ jobs:
   build:
     runs-on: ubuntu-latest
    steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
      - uses: mamba-org/setup-micromamba@main
        with:
          environment-file: env.yml
@@ -25,7 +25,7 @@ jobs:
      - name: build
        run: |
          micromamba activate dissectBCL
-          pip install ./
+          pip install .[dev]
      - name: pytest
        run: |
          micromamba activate dissectBCL
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 9a9fe5a..cacf087 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,6 +1,5 @@
 repos:
-- repo: "https://github.com/pycqa/flake8"
-  rev: "6.0.0"
-  hooks:
-  - id: flake8
-    exclude: ^(build|docs|tests)
\ No newline at end of file
+- repo: https://github.com/astral-sh/ruff-pre-commit
+  rev: v0.5.2
+  hooks:
+  - id: ruff
\ No newline at end of file
diff --git a/AUTHORS b/AUTHORS
deleted file mode 100644
index 79b6645..0000000
--- a/AUTHORS
+++ /dev/null
@@ -1,14 +0,0 @@
-A.s <62971995+adRn-s@users.noreply.github.com>
-David Koppstein
-FerallOut <49016186+FerallOut@users.noreply.github.com>
-Juan Caballero
-Katarzyna Sikora
-Ward D
-Ward Deboutte
-WardDeb
-adRn-s
-adRn-s
-adRn-s
-katarzyna.otylia.sikora@gmail.com
-katarzyna.otylia.sikora@gmail.com
-thomasmanke
diff --git a/ChangeLog b/ChangeLog
deleted file mode 100644
index 9377de6..0000000
--- a/ChangeLog
+++ /dev/null
@@ -1,413 +0,0 @@
-CHANGES
-=======
-
-* split up zebrafish contamination into mito - rrna - zebrafish
-* or it wasn't found on fexList // let's just omit info, code is the doc (?)
-* autoreformatted for flake8 rule
-* fixes
-* works in my server :P
-* FutureWarning: Calling int on a single element Series is deprecated and will raise a TypeError in the future
-* flake8 fix
-* URL should be complete
-* use 1 config param (URL) instead of 3
-* flake8 E302
-* get\_contact\_details API endpoint // deprecated userList text file
-* Lanesplit samples (#173)
-* make lanesplit check on sampleIDs rather then samplenames, and lane aware
-* Mycoplasma implement, email update (#172)
-* Contam emails (#171)
-* ChangeLog
-* actual seq\_data dir in email
-* mycoplasma include in prep
-* include mycoplasma hyorhinis
-* docs updates (#170)
-* auto version for docs, fix readthedocs yaml (#169)
-* auto version for docs, fix readthedocs yaml
-* Docs (#168)
-* update changelog
-* include authors
-* make sure doc pytest includes reqs from the doc folder
-* init readthedocs yaml, split up doc requirements
-* seq\_data dirs & docs (#166)
-* Allow new seq dirs when older are not full yet. (#165)
-* Allow new seq dirs when older are not full yet
-* adding documentation for certificate change problems (#162)
-* Main (#164)
-* force listing of PIdirs to be sorted (#163)
-* force listing of PIdirs to be sorted
-* Update usage.rst
-* updating certificate issue
-* adding documentation for certificate change problems
-
-v0.2.6
-------
-
-* Main (#159)
-* Dev wd (#158)
-* include 20 in comment as well
-* miseq reqdepth override to 20
-* flake
-* move reqdepth change in classes due to parkour returning everything
-* santize sample names (space in names)
-* software version to debug log in main func
-* Main (#157)
-* Fix148 ks (#156)
-* flake fix
-* cleanup
-* gitignore
-* added ulautDestroyer to sampleSheetClass parser
-* fix #153 (#154)
-* bumped pandas version in env.yml
-* attempted to fix the pandas error
-* Main (#152)
-* old GH action was deprecated
-* Fix #148 (#149)
-* accent fixes in umlautDestroyer
-
-v0.2.5
------
-
-* MiSeq (#146)
-* bugfix rerun w/ existing samplesheet (P5RC flag missing if no .bak)
-* escape validation w/ novaseq
-* follow new seqfac structure
-* CompletionStatus check and initiation of failedrun
-* include kraken explanation from ini file in mqc as feature
-* fly rename contam
-
-v0.2.4
------
-
-* purge apostrofs
-* flake minor formatting
-* update email with some red
-* automated P5 RC in Miseqs run if all samples are empty
-* purge apostrofs
-* flake minor formatting
-* update email with some red
-* automated P5 RC in Miseqs run if all samples are empty
-* purge apostrofs
-* flake minor formatting
-* update email with some red
-* automated P5 RC in Miseqs run if all samples are empty
-
-v0.2.3
------
-
-* sometimes projects are omitted from a flow cell (budgetary regions). Escape Nonetypes in the mqc building
-* escape nones in libtypes
-* include non-ommitted empty samples in kraken for email
-* flake fix/precommit version boost
-* Finalize omit samples in email. Only display project exitstats of that specific lane
-* flake fix/precommit version boost
-* Finalize omit samples in email. Only display project exitstats of that specific lane
-* colliding samples that are removed from a demuxsheet cause issues in postmux stage. This commit escapes those
-* flake8 demux
-* simplify the substr check
-* changing scATAC protocols in parkour
-* bugfix fex bool parse
-* bugfix fex bool parse
-
-v0.2.2
------
-
-* bugfixes (#127)
-* Wd (#125)
-* Wd (#124)
-* Wd (#122)
-
-v0.2.1
------
-
-* tests & empty report fix (#120) (#121)
-* WIP - fixes (#119)
-
-v0.2.0
------
-
-* Wd (#113) (#114)
-* Wd (#113)
-* include abs gotten reads, bugfix with parkourOrg in empty samples (#111)
-* logging a mode + force config
-* log restructuring
-* fq file ending check + regex soup
-* fq file ending check
-* exit email when project folder doesnt exist
-* split up ercc & vectors
-* make sure sampleID is set before entering try/except block (#100)
-* make sure sampleID is set before entering try/except block
-* Precommit (#96)
-* include prod in tests (#95)
-* detmask (#94)
-* Fix no index (#92)
-* Wd (#88)
-* flake fix
-* add a check group check for all files in wd40 rel
-* Docs (#87)
-* left join only include samples in origin ss / demuxsheet
-* Docs (#83)
-* executables docs
-* ixdepth1 config.ini explanation
-* Wd (#80)
-* flake
-* purge print kraken reps, include diskspace per transferred project
-* broken link aeae28S
-* flake fixes
-* contaminome mosquito&drosC
-* Wd (#79)
-* include 10xscATAC case
-* Wd (#76)
-* reload setlog, escape empty kraken reports, flake
-* move over sleeper
-* fly to drosophila in contaminome
-* Wd (#74)
-* purge fq screen in conf, mentions etc
-* kraken (#73)
-* shift to kraken
-* Wd (#72)
-* update env name in tests
-* strip name from env
-* quickload config, badges
-* fix badges
-* badges
-* rtfd (#71)
-* merge
-* add rtfd link, slight update docs
-
-v0.1.1
------
-
-* executables&docs (#70)
-* env fix
-* release syntax
-* merge include wd40 rel
-* Initiate kraken build contaminome db. purge most executables and ship into conda env initiate docs
-* fixed bug with Cc and transfered bioinfo\_cc to Bcc
-* transfered hardcoded filenames and signatures to ini-file
-* fixed flake8 issues
-* added cc to bioinfo-core
-* added src/tools to include emailProjectFinished.py
-* minor wd40 changes
-* flake
-* init diag
-* include wd40 in repo
-* sys exits accompanied by error emails
-* flake
-* add version to email now as well
-* parse fexsend l newline
-* switch to mamba
-* fex cmds to log
-* get rid of dev exit
-* update mask/dualIx/overwritten barcodes
-* force writing a demuxSheet that can be manipulated with mixed single-dual index runs
-* Update misc.py
-* strip wd env
-* include version upon invocation
-* flake fix
-* strip print clutter
-* rebase Merge branch 'main' of github.com:maxplanck-ie/dissectBCL into WD
-* move reqs to conda, purge dissect\_test, add args
-
-v0.1.0
------
-
-* versioning in mqc is done
-* fix the version used in project reports
-* strip some CICD
-* strip conf read test from tests for now
-* try abspath for env act
-* rename env.yml
-* mambo nr 5
-* split up step
-* drop double uses
-* add checkout action
-* conda fix actions
-* try conda tests
-* flake fix misc
-* cap flake8
-* revert flake to pip install alone
-* change flake to conda build
-* switch build conda
-* add certs for API requests. Choose key depending on test vs prod run
-* fix flake
-* specify cert for requests
-* remove bin folder switch from setup.py to setup.cfg specify entry points in setup cfg specify regular and test entry points
-* retain log.warning rather than log.info for easier readability
-* replaced np.isnan with pandas.isna
-* added branch manke to pytest.yml
-* added checks to postmux:matchIDtoName
-* potential bugfix pp?
-* add interOp parsing to report back reads PF
-* flake8 fix
-* additional check lanesplit for clashing samples
-* change from pd.Series to dict in sampleID accession
-* purge flake8 file / update gitignore
-* free py minor fixes
-* FIX sampleID not found @ 220620\_M\*\_1
-* flake fix
-* bugfix duplicated sampleNames
-* remove fPath main fun
-* babysteps on API walk
-* Less checks, more patches 🙈
-* parkour parsing
-* minor bugfix for projects with a space, and descrptions with \\`
-* flake fix
-* fix parkour pushes for MiSeq runs
-* print out hammings for now
-* purge space in projects
-* bugfix minP5 in general mask, strip except email
-* email on crash
-
-v0.0.2
------
-
-* get rid of PE vs SE in requested (assuming requested is allways fragments)
-* mqc enhancements
-* flake pass
-* add in sampleID to email, bugfix R1\_perc Q30 in non-lane split flowcells
-* bugfix nonlanesplit
-
-v0.0.1
------
-
-* pycov req
-* crash on parkour failure
-* flake
-* small changes
-* pytest update retBC/retIX
-* flake fix
-* minor bug fixes, add md5sum
-* flake testing
-* pytest updates
-* getting started with datafiles
-* pytest change
-* workflows update
-* start testing
-* Update tests Merge branch 'WD' of github.com:maxplanck-ie/dissectBCL into WD
-* work on tests
-* postmux fin
-* force ints mask
-* greeter & scATAC demux
-* flake, omit some debugs
-* tiny bugfix parsing email/drHouse
-* fex & finalisation
-* flake
-* parkour push implement
-* make sure we dont reprocess bfq.py
-* implement sleeper
-* classes flake
-* flake fakenews
-* postmux flake
-* misc flake
-* houseFlake
-* demux flake
-* functioning seqreport in multiqc
-* start config to add requested column in general stats
-* remove seqrep funcs, add multiqc yaml parser
-* remove seqrep funcs, introduce umlaut destroyer
-* remove seqrep from dissect
-* start reworking multiqc to omit sequencingreport.pdf
-* move from pyyaml to ruamel.yaml for multiqc yaml dumping
-* start purging seqRep.pdf
-* keep track todo
-* logfile reorg
-* round of G/R in seqRep
-* flake8
-* email updates
-* dissect ini example
-* ignore
-* flake finished
-* flake drhouse
-* remove class returns because of redundancy, add some return strings to make log/debug downstream easier
-* drHouse flake
-* dissect flake
-* flake test misc
-* fakenews flaker
-* class flake
-* finalise email
-* email updates&start copy
-* bugfix indexType parsing
-* multipage seqreps + long sample names
-* seqrep splitter
-* seqrep updates
-* minor bug fixes
-* flake fin
-* fakenews flaked
-* demux flake
-* logger flake
-* seq report fin
-* seq report
-* demux flake
-* sequencing report updates
-* pdf changes and logger
-* parse demux stats and update ss class
-* update general tex template
-* tex templates for PE and SE seq
-* rebuilding seq report
-* flake fakenews
-* init sequencing report
-* multiqc
-* flake
-* qc updates
-* init fqc/clump
-* fastqc pool running
-* fastqc implement
-* renaming
-* badgetest
-* flake test
-* add pip install to pytest
-* Create pytest.yml
-* purge placeholders
-* testing hello world
-* ignore update for tests
-* include test setup
-* Update misc.py
-* readline
-* small bugfix
-* test protect
-* testrm
-* test
-* readme update
-* loosen python version
-* Update flake.yml
-* flake python 3.9.2
-* Update flake.yml
-* Update flake.yml
-* Update flake.yml
-* Update flake.yml
-* Update flake.yml
-* Rename flake to flake.yml
-* flaketest
-* placeholders test
-* flaketest
-* index2 update
-* flake
-* demux flake
-* fkaenews flake
-* flakeDissect
-* flakes classes
-* cleanup
-* flake
-* demux
-* Up until demuxing
-* demuxSheet
-* masking updates
-* preFQ init
-* logger start
-* updates
-* pops
-* updates
-* classDefs
-* flake8 preFQ
-* flake8 dissect.py
-* purge egg build
-* fq
-* purge build
-* executable
-* gitignore
-* flowcell finder
-* preFQ start
-* gitignore
-* remove build
-* init
-* Initial commit
diff --git a/docs/executables.rst b/docs/executables.rst
index 79cd109..6f710e2 100644
--- a/docs/executables.rst
+++ b/docs/executables.rst
@@ -20,12 +20,13 @@ Help can be called for every executable using:
 dissect
 ^^^^^^^
 
-dissect is the main pipeline function, and only has 1 optional argument, which specifies the path to the :ref:`config.ini ` file.
+dissect is the main pipeline function, and has 2 optional arguments: the path to the :ref:`config.ini ` file and, optionally, a path to a flowcell to process.
 
 .. code-block:: console
 
+   dissect
    dissect -c config.ini
-   dissect --configfile config.ini
+   dissect -f /path/to/flowcell
 
 If no argument is specified, dissect looks for the config file in this path:
@@ -41,8 +42,6 @@ wd40
 
 wd40 is a set of 'helper' tools, intended to make life slightly easier. At this point there are 3 helper functions. These are mainly work in progress, so don't expect to much from these (yet).
 
 #. rel
-#. cat
-#. diag
 
 rel
 ---
@@ -52,35 +51,10 @@ It can either be ran without arguments, which assumes the current working direct
 
 .. code-block:: console
 
-   wd40 rel
    wd40 rel path/to/folder
 
 Upon execution the fraction of files that have their rights changed will be printed per folder within a project.
 
-cat
----
-
-*cat* can be used to concatenate fastq files from 2 different flow cells. It requires three arguments:
-
-#. -f / --flowcells: paths to flowcells. At least 2 are required.
-#. -p / --project: one project (and no more).
-#. -o / --output: the output folder to write into.
-
-As a result, the fastq files in the specified project will be combined (cat'ed) and written into the output folder.
-
-Only use this if you know what you are doing, usually this is not really nice on the filesystem.
-
-diag
-----
-
-*diag* attempts to diagnose barcode issues. Currently work in process. It's usage is similar to *rel*:
-
-.. code-block:: console
-
-   wd40 diag
-   wd40 diag path/to/folder
-
-
 .. _email:
 
 email
 *****
 
 *email* notifies the :ref:`internal ` end user of a released project. It takes a project folder as a positional argument, and has a couple of other options:
 
+#. --configfile: path to a configfile (default = ~/configs/dissectBCL_prod.ini)
 #. --notGood: flag that omits 'quality was good' string in the email.
 #. --analysis: flag that specifies that `BRB ` did an analysis for this project
 #. --cc: argument to include another email address in cc.
 #. --comment: include a string in the email
-#. --noGalaxy: Obsolete.
 #. --fromPerson: Name of the person taking care of the data. (e.g. Max)
 #. --fromEmail: Email address of the person taking care of the data.
 #. --fromSignature: path to a txt file with an email signature
 #. --toEmail: email of the receiver.
 #. --toName: name of the receiver.
-This executable will use the :ref:`userList ` from the parkour section in the config.ini file to infer --toEmail and --toName from the project folder argument.
-In some cases there can be clashes though (e.g. duplicate last names), which'd require you to explicitely state --toEmail and --toName.
-
+The end user will be inferred by either setting it explicitly (--toEmail) or, if not specified, by querying parkour.
 Since this command is used quite often, it can be beneficial to alias this command to something relevant for you:
 
 .. code-block:: console
diff --git a/docs/overview.rst b/docs/overview.rst
index dc48c22..e714533 100644
--- a/docs/overview.rst
+++ b/docs/overview.rst
@@ -7,7 +7,7 @@ Find unprocessed flowcells.
 
 Upon execution, the pipeline will look for unprocessed flowcells. An unprocessed flowcell has two characteristics:
 
- - a directory in *config[Dirs][baseDir]* that contains an *RTAComplete.txt* file.
+ - a directory in *config[Dirs][baseDir]* that contains an *RTAComplete.txt* file and a *CopyComplete.txt* file.
 - no matching directories under *config[Dirs][outputDir]* that contain either a *fastq.made* file, or a *communication.done* file
 
 Note that we split up a flowcell in lanes whenever we can (you can usually set a higher MisMatchIndex that way, retrieving more reads/sample).
@@ -24,26 +24,21 @@ and in *config[Dirs][outputDir]*
 
  220101_A00000_0000_AXXXXXXXXX_lanes_1
  220101_A00000_0000_AXXXXXXXXX_lanes_2
 
-if *fastq.made* or *communication.done* exists in **either** the first or the second folder, 220101_A00000_0000_AXXXXXXXXX will be set as **processed**.
+if *fastq.made* or *communication.done* exists in **either** the first or the second folder, 220101_A00000_0000_AXXXXXXXXX will be considered as **processed**.
 This is important in case you need to re-run flowcells: All flags need to be removed for **both** folders.
 
 demultiplex unprocessed flowcells.
 ----------------------------------
 
-Once an unprocessed flowcell is found, a couple of steps are done before we start demultiplexing:
+Once an unprocessed flowcell is found, a couple of steps are performed.
 
  1. Initiate a logfile *config[Dirs][logDir]*
- 2. create a *flowcell class*
- 3. create a *sampleSheet class*
- 4. run bcl-convert
- 5. run post processing steps
-   1. fastqc
-   2. kraken
-   3. clumpify
-   4. multiqc
- 6. copy over the data to the *periphery* or upload to fexsend
- 7. create a *drhouse class* and communicate results.
-
+ 2. create the *flowcell class*
+ 3. prepConvert() - determine mismatches and masking.
+ 4. demux() - run demultiplexing with bclconvert.
+ 5. postmux() - run renaming of projects, clumping, fastqc, kraken, multiqc and md5sum calculation.
+ 6. fakenews() - upload project via fexsend (if applicable), collate quality metrics, create and send email.
+ 7. organiseLogs() - dump out configs and settings to the outLanes.
 
 kraken
 ------
@@ -65,38 +60,4 @@
 The idea behind this is to ensure that hits against *vulgar name 1* won't give hits against *vulgar name 2*.
 This is used to mask rRNA sequences, but can actually be used for anything.
 Finally the taxmap is a dictionary with taxonomy names as keys, and a list with `[taxid, parent taxid, taxonomic rank]` as values.
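[Editor's aside, not part of the patch: to make the taxmap structure above concrete, a minimal hand-written sketch follows. The taxids shown are real NCBI values; the dictionary itself is illustrative, not the one the pipeline builds.]

.. code-block:: python

   # taxonomy name -> [taxid, parent taxid, taxonomic rank]
   taxmap = {
       "root": [1, 1, "no rank"],
       "Homo sapiens": [9606, 9605, "species"],
       "Mus musculus": [10090, 10088, "species"],
   }

   taxid, parent, rank = taxmap["Homo sapiens"]
   print(f"taxid={taxid}, parent={parent}, rank={rank}")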
-A custom one is created as to not rely on a full NCBI taxonomy database dump.
-
-classes
--------
-
-flowcell class
-^^^^^^^^^^^^^^
-
-This will be initiated almost immediately after a new flowcell is found.
-It will contain the paths taken from the config file and set the paths to samplesheet.csv & runInfo.xml.
-A check is ran that all these paths exist, and runinfo is parsed as well.
-
-samplesheet class
-^^^^^^^^^^^^^^^^^
-
-After the flowcell class is made, the samplesheet class is created.
-Here:
-
-- the samplesheet is read
-- parkour is queried
-- the samplesheet df & parkour df are merged
-- the decision is made to split up a flowcell in lanes or not
-
-
-drhouse class
-^^^^^^^^^^^^^
-
-For every lane outputted, a drHouse class is made that contains information about the run:
-
-- undetermined number of reads
-- undetermined barcodes
-- diskspace free
-- etc...
-
-Here the final email to notify about the run is also created.
+A custom one is created as to not rely on a full NCBI taxonomy database dump.
\ No newline at end of file
diff --git a/docs/usage.rst b/docs/usage.rst
index 6edf9ac..b0f5cb5 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -10,7 +10,6 @@ There are a couple of pre-requisites
 
 2. `BCL-convert `_
 3. A running `parkour `_ instance with API access.
 
-
 To install dissectBCL, first clone the repository:
 
 .. code-block:: console
@@ -37,13 +36,13 @@ activate the environment and pip install dissectBCL
 
    conda activate dissectBCL
    pip install .
 
-The next task is to prepare the contaminome database. These files will get downloaded so make sure to run the command on a node that has internet access.
-Note that the contaminome.yml file is included in the repository. Note that it takes a while to create this database, and that you need quite a lot of memory.
-The final footprint of this database is around 20GB.
+The next task is to prepare the contamination database. These files will get downloaded so make sure you have internet access.
+Note that the contaminome.yml file is included in the repository. Note that it takes a while to create this database, and that you need quite a lot of memory (~20GB).
+The final footprint of this database is around 30GB.
 
 .. code-block:: console
 
-   contam -c contaminome.yml -o /path/to/folder
+   contam --threads 10 -c contaminome.yml -o /path/to/folder
 
 Next task is to set up the ini file. A template is provided in the repository (*dissectBCL.ini*).
 It is *necessary* that all variables are filled in appropriately. By default the pipeline expects this ini file to be available under:
@@ -72,13 +71,39 @@ or with a custom configfile location:
 
    dissect -c /path/to/dissectBCL.ini
 
-Note that to run persistently on a server, dissect should be run from tmux or using `nohup dissect &`.
-All other customisations have to happen in the ini file. Once running, the pipeline will check every hour.
-Forcing a check can be done by killing the HUP:
+Forcing to run a specific flowcell can also be done via the command line:
 
 .. code-block:: console
 
-   killall -HUP dissect
+   dissect -c /path/to/dissectBCL.ini -f /full/path/to/flowcell/directory
+
+
+API
+---
+
+As of 0.3.0 a flow cell can be processed purely over a python shell:
+
+.. code-block:: python
+
+   from dissectBCL.dissect import createFlowcell
+   f = createFlowcell("/path/to/config.ini", "/path/to/flowcell/")
+   f.prepConvert()
+   f.demux()
+   f.postmux()
+   f.fakenews()
+   f.organiseLogs()
+
+By default the logs are printed to stdout, but you can move them to a file as well.
+
+.. code-block:: python
+
+   from dissectBCL.dissect import createFlowcell
+   f = createFlowcell("/path/to/config.ini", "/path/to/flowcell/", logFile = "/path/to/logfile")
+   f.prepConvert()
+   f.demux()
+   f.postmux()
+   f.fakenews()
+   f.organiseLogs()
 
 
 Hands-on
@@ -108,14 +133,14 @@ The folders in the *periphery* can be released by running:
 
 .. code-block:: console
 
-   wd40 rel
+   wd40 rel /path/to/outLane/folder
 
-in the outlane folders.
-once this is done, the end user can be notified using
+The release changes permissions to 750, and pushes back to parkour that the flow cell has been released.
+Finally, you can notify the end user with the email functionality.
 
 .. code-block:: console
 
-   email
+   email -h
 
 Barcode issues
 ^^^^^^^^^^^^^^
@@ -125,11 +150,6 @@ Often, the biggest issues encountered will be wrong barcodes. An indication of t
 
 - high undetermined indices
 
 Entry points here would be the email received, cross-referenced with outlanefolder/Reports/Top_Unknown_Barcodes.csv and outlanefolder/demuxSheet.csv
-You could get additional information by running
-
-.. code-block:: console
-
-   wd40 diag
 
 Identify what (and if) changes can be made, backup the generated demuxSheet, and make changes accordingly.
 After the changes have been made in the demuxSheet:
@@ -146,7 +166,7 @@ remove all the flags:
 
 - postmux.done
 - renamed.done
 
-and rerun dissectBCL. Note that an existing demuxSheet in the folder won't be overwritten, allowing you to jump in.
+and rerun dissectBCL. Note that an existing demuxSheet in the folder won't be overwritten but used as provided.
 
 Issues with Parkour verification
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/env.yml b/env.yml
index 1033acf..4a109af 100644
--- a/env.yml
+++ b/env.yml
@@ -2,29 +2,13 @@ channels:
   - conda-forge
   - bioconda
 dependencies:
-  - python =3.10
-  - flake8 >=4.0
+  - python >=3.10
   - matplotlib >=3.4
   - pandas >=2.0
-  - rich >=12.5
-  - requests >=2.28
-  - coverage >=6.4
-  - pytest >=7.1
-  - pytest-cov >=3.0
-  - tabulate >=0.8
-  - dominate >=2.7.0
-  - ruamel.yaml >=0.17
-  - pbr >=5.10
-  - rich-click >=1.5
   - biopython >=1.7
-  - bbmap >= 38.98
-  - multiqc >= 1.21
-  - fastqc >= 0.11.9
-  - kraken2 >= 2.1.2
-  - blast >= 2.13.0
-  - bedtools >= 2.30.0
-  - pip
-  - pip:
-    - interop >=1.1
-    - pytest-datafiles >=2.0
-    - pre-commit
+  - bbmap >=38.98
+  - multiqc >=1.21
+  - fastqc >=0.11.9
+  - kraken2 >=2.1.2
+  - blast >=2.13.0
+  - bedtools >=2.30.0
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..de80b6f
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,44 @@
+[build-system]
+requires = ["setuptools >= 61.0", "setuptools_scm>=8"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "dissectBCL"
+description = "demultiplexing illumina data"
+readme = "README.md"
+dynamic = ["version"]
+keywords = ["demultiplexing", "illumina"]
+authors = [
+    {name = "WardDeb", email = 'w@rddeboutte.com'},
+    {name = "Bioinfo-Core MPI-IE"}
+]
+requires-python = ">=3.10"
+dependencies = [
+    "rich-click",
+    "ruamel.yaml",
+    "tabulate",
+    "dominate",
+    "interop"
+]
+[project.optional-dependencies]
+dev = [
+    "ruff",
+    "coverage",
+    "pytest",
+    "pytest-cov",
+    "pytest-datafiles"
+]
+
+[project.scripts]
+dissect = "dissectBCL.dissect:dissect"
+wd40 = "wd40.wd40:cli"
+email = "tools.emailProjectFinished:main"
+contam = "tools.prep_contaminome:main"
+
+[tool.setuptools_scm]
+
+[tool.ruff]
+exclude = [
+    "docs",
+    "tests"
+]
\ No newline at end of file
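[Editor's aside, not part of the patch: `dynamic = ["version"]` above means setuptools_scm derives the version string from the git tag at build time, and the installed package reads it back at runtime through importlib.metadata, which is exactly what dissect.py does further down. A minimal sketch:]

.. code-block:: python

   from importlib.metadata import version

   # Works once dissectBCL is pip-installed; the string originates
   # from the git tag via setuptools_scm, not a hardcoded constant.
   print(version("dissectBCL"))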
diff --git a/setup.cfg b/setup.cfg
deleted file mode 100644
index 8308220..0000000
--- a/setup.cfg
+++ /dev/null
@@ -1,23 +0,0 @@
-[metadata]
-name = dissectBCL
-description = demuliplexing illumina data
-long_description = file: README.md
-keywords = demultiplexing, illumina
-license = MIT
-url = https://www.github.com/maxplanck-ie/dissectBCL
-author = WardDeb
-author_email = deboutte@ie-freiburg.mpg.de
-
-
-[options]
-package_dir =
-    = src
-python_requires = >=3.7
-
-
-[options.entry_points]
-console_scripts =
-    dissect = dissectBCL.dissect:dissect
-    wd40 = wd40.wd40:cli
-    email = tools.emailProjectFinished:main
-    contam = tools.prep_contaminome:main
diff --git a/setup.py b/setup.py
deleted file mode 100644
index aa10fa0..0000000
--- a/setup.py
+++ /dev/null
@@ -1,6 +0,0 @@
-from setuptools import setup
-
-setup(
-    setup_requires=['pbr'],
-    pbr=True,
-)
diff --git a/src/dissectBCL/demux.py b/src/dissectBCL/demux.py
index 75a9695..1f99f3b 100644
--- a/src/dissectBCL/demux.py
+++ b/src/dissectBCL/demux.py
@@ -1,27 +1,17 @@
-from dissectBCL.misc import joinLis, hamming, lenMask
-from dissectBCL.misc import P5Seriesret, matchingSheets
-from dissectBCL.fakeNews import mailHome
-from dissectBCL.drHouse import differentialDiagnosis
+from dissectBCL.misc import joinLis
+from dissectBCL.misc import hamming
+from dissectBCL.misc import lenMask
+from dissectBCL.misc import P5Seriesret
+from Bio.Seq import Seq
 from itertools import combinations
+import logging
+import numpy as np
 import os
-from subprocess import Popen, PIPE
-import sys
-from pathlib import Path
 import pandas as pd
-import numpy as np
-import logging
+from pathlib import Path
 import shutil
 
 
-def hamming2Mismatch(minVal):
-    if minVal > 2 and minVal <= 4:
-        return 1
-    elif minVal > 4:
-        return 2
-    else:
-        return 0
-
-
 def misMatcher(P7s, P5s):
     """
     return the number of mismatches allowed in demux.
@@ -40,6 +30,15 @@ def misMatcher(P7s, P5s):
     return mmDic
 
 
+def hamming2Mismatch(minVal):
+    if minVal > 2 and minVal <= 4:
+        return 1
+    elif minVal > 4:
+        return 2
+    else:
+        return 0
+
+
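# Editor's aside (not part of the patch): a worked example of the rule
# hamming2Mismatch encodes. The real module imports hamming() from
# dissectBCL.misc; a self-contained stand-in is used here so the snippet
# runs on its own.
from itertools import combinations

def _hamming(a, b):
    # number of positions at which two equal-length barcodes differ
    return sum(x != y for x, y in zip(a, b))

barcodes = ["ATCACGTT", "ATCACGAA", "GGCCTTAA"]
minVal = min(_hamming(a, b) for a, b in combinations(barcodes, 2))
print(minVal)  # 2 -> hamming2Mismatch(2) == 0: no mismatch can be tolerated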
 def detMask(seqRecipe, sampleSheetDF, outputFolder):
     """
     Determine the actual masking.
@@ -194,11 +193,6 @@ def prepConvert(flowcell, sampleSheet):
         ss_dict = sampleSheet.ssDic[outputFolder]
         ss = ss_dict['sampleSheet']
 
-        # add check for bclconvert also here in addition to demux
-        # TODO maybe not possible?
-        # if (Path(outputDir / outputFolder / 'bclconvert.done')).exists():
-        #     continue
-
         # determine mask, dualIx, PE, convertOpts, minP5, minP7 from seqRecipe
         (ss_dict['mask'], ss_dict['dualIx'], ss_dict['PE'],
          ss_dict['convertOpts'], minP5, minP7) = detMask(
@@ -439,182 +433,152 @@ def parseStats(outputFolder, ssdf):
     return (newDF)
 
 
-def demux(sampleSheet, flowcell, config):
-    logging.warning("Demux module")
-    # Double check for run failure
-    if flowcell.succesfullrun != 'SuccessfullyCompleted':
-        for outLane in sampleSheet.ssDic:
-            outputFolder = os.path.join(
-                flowcell.outBaseDir, outLane
-            )
-            if not os.path.exists(outputFolder):
-                os.mkdir(outputFolder)
-            Path(
-                os.path.join(outputFolder, 'run.failed')
-            ).touch()
-        return ('sequencingfailed')
-    else:
-        for outLane in sampleSheet.ssDic:
-            logging.info("Demuxing {}".format(outLane))
-            # Check outDir
-            outputFolder = os.path.join(flowcell.outBaseDir, outLane)
-            if not os.path.exists(outputFolder):
-                os.mkdir(outputFolder)
-                logging.info("{} created.".format(outputFolder))
-            else:
-                logging.info(
-                    "{} already exists. Moving on.".format(outputFolder)
-                )
-            if flowcell.succesfullrun != 'SuccessfullyCompleted':
-                print("In failure if.")
-                Path(
-                    os.path.join(outputFolder, 'run.failed')
-                ).touch()
-                mailHome(
-                    "{} ignored".format(flowcell.name),
-                    "RunCompletionStatus is not successfullycompleted.\n" +
-                    "Marked for failure and ignored for the future.",
-                    config,
-                    toCore=True
-                )
-                break
-            # Write the demuxSheet in the outputfolder
-            demuxOut = os.path.join(outputFolder, "demuxSheet.csv")
-            # Don't remake if demuxSheet exist
-            if not os.path.exists(demuxOut):
-                logging.info("Writing demuxSheet for {}".format(outLane))
-                writeDemuxSheet(
-                    demuxOut,
-                    sampleSheet.ssDic[outLane],
-                    sampleSheet.laneSplitStatus
-                )
-            else:
-                logging.warning(
-                    "demuxSheet for {} already exists!".format(outLane)
-                )
-                man_mask, man_df, man_dualIx, man_mmdic = readDemuxSheet(
-                    demuxOut
-                )
-                if (
-                    sampleSheet.ssDic[outLane]['mismatch'] != man_mmdic
-                ):
-                    logging.info(
-                        "mismatch dic is changed from {} into {}".format(
-                            sampleSheet.ssDic[outLane]['mismatch'],
-                            man_mmdic
-                        )
-                    )
-                    sampleSheet.ssDic[outLane]['mismatch'] = man_mmdic
-                # if mask is changed, update:
-                # Mask
-                if (
-                    'mask' in sampleSheet.ssDic[outLane]
-                    and man_mask != sampleSheet.ssDic[outLane]['mask']
-                ):
-                    logging.info(
-                        "Mask is changed from {} into {}.".format(
-                            sampleSheet.ssDic[outLane]['mask'],
-                            man_mask
-                        )
-                    )
-                    sampleSheet.ssDic[outLane]['mask'] = man_mask
-                # dualIx status
-                if (
-                    'dualIx' in sampleSheet.ssDic[outLane]
-                    and man_dualIx != sampleSheet.ssDic[outLane]['dualIx']
-                ):
-                    logging.info(
-                        "dualIx is changed from {} into {}.".format(
-                            sampleSheet.ssDic[outLane]['dualIx'],
-                            man_dualIx
-                        )
-                    )
-                    sampleSheet.ssDic[outLane]['dualIx'] = man_dualIx
-
-                # sampleSheet
-                sampleSheet.ssDic[outLane]['sampleSheet'] = matchingSheets(
-                    sampleSheet.ssDic[outLane]['sampleSheet'],
-                    man_df
-                )
-                # Check for 'bak file' existence.
-                if os.path.exists(demuxOut + '.bak'):
-                    sampleSheet.ssDic[outLane]['P5RC'] = True
-                else:
-                    sampleSheet.ssDic[outLane]['P5RC'] = False
-            # Don't run bcl-convert if we have the touched flag.
-            if not os.path.exists(
-                os.path.join(outputFolder, 'bclconvert.done')
-            ):
-                # Run bcl-convert
-                bclOpts = [
-                    config['software']['bclconvert'],
-                    '--output-directory', outputFolder,
-                    '--force',
-                    '--bcl-input-directory', flowcell.bclPath,
-                    '--sample-sheet', demuxOut,
-                    '--bcl-num-conversion-threads', "20",
-                    '--bcl-num-compression-threads', "20",
-                    "--bcl-sampleproject-subdirectories", "true",
-                ]
-                if not sampleSheet.laneSplitStatus:
-                    bclOpts.append('--no-lane-splitting')
-                    bclOpts.append('true')
-                logging.info("Starting BCLConvert")
-                logging.info(" ".join(bclOpts))
-                bclRunner = Popen(
-                    bclOpts,
-                    stdout=PIPE
-                )
-                exitcode = bclRunner.wait()
-                if exitcode == 0:
-                    logging.info("bclConvert exit {}".format(exitcode))
-                    Path(
-                        os.path.join(outputFolder, 'bclconvert.done')
-                    ).touch()
-                    if flowcell.sequencer == 'MiSeq':
-                        if differentialDiagnosis(
-                            outputFolder,
-                            sampleSheet.ssDic[outLane]['dualIx'],
-                        ):
-                            logging.info("P5 RC triggered.")
-                            # Purge existing reports.
-                            logging.info("Purge existing Reports folder")
-                            shutil.rmtree(
-                                os.path.join(outputFolder, 'Reports')
-                            )
-                            bclRunner = Popen(
-                                bclOpts,
-                                stdout=PIPE
-                            )
-                            exitcode = bclRunner.wait()
-                            logging.info(
-                                "bclConvert P5fix exit {}".format(exitcode)
-                            )
-                            # Update the sampleSheet with proper RC'ed indices.
-                            sampleSheet.ssDic[outLane][
-                                'sampleSheet'
-                            ] = matchingSheets(
-                                sampleSheet.ssDic[outLane]['sampleSheet'],
-                                readDemuxSheet(demuxOut, what='df')
-                            )
-                            sampleSheet.ssDic[outLane]['P5RC'] = True
-                        else:
-                            sampleSheet.ssDic[outLane]['P5RC'] = False
-                else:
-                    logging.critical("bclConvert exit {}".format(exitcode))
-                    mailHome(
-                        outLane,
-                        'BCL-convert exit {}. Investigate.'.format(
-                            exitcode
-                        ),
-                        config,
-                        toCore=True
-                    )
-                    sys.exit(1)
-
-            logging.info("Parsing stats for {}".format(outLane))
-            sampleSheet.ssDic[outLane]['sampleSheet'] = parseStats(
-                outputFolder,
-                sampleSheet.ssDic[outLane]['sampleSheet']
-            )
-    return (0)
+def compareDemuxSheet(ssDic, demuxSheet):
+    '''
+    a demuxSheet already exists, but we inferred demux parameters from the original data.
+    This happens in case we want to override some settings.
+    This function updates these parameters if necessary.
+    Parameters:
+     - ssDic: Dictionary with parameters from a specific outLane, comes from sampleSheet class.
+     - demuxSheet: Path to the existing demuxSheet.csv
+    '''
+    man_mask, man_df, man_dualIx, man_mmdic = readDemuxSheet(
+        demuxSheet
+    )
+    # mmdic
+    if ssDic['mismatch'] != man_mmdic:
+        logging.warning(f"Demux - mmdic set as {man_mmdic}")
+        ssDic['mismatch'] = man_mmdic
+    # mask
+    if 'mask' in ssDic and man_mask != ssDic['mask']:
+        logging.warning(f"Demux - mask set as {man_mask}")
+        ssDic['mask'] = man_mask
+    # dualIx
+    if 'dualIx' in ssDic and man_dualIx != ssDic['dualIx']:
+        logging.warning(f"Demux - dualIx set as {man_dualIx}")
+        ssDic['dualIx'] = man_dualIx
+    # sampleSheet
+    ssDic['sampleSheet'] = matchingSheets(
+        ssDic['sampleSheet'],
+        man_df
+    )
+    # P5RC
+    ssDic['P5RC'] = False
+    if demuxSheet.with_suffix('.bak').exists():
+        logging.warning("Demux - P5RC detected")
+        ssDic['P5RC'] = True
+
+
+def matchingSheets(autodf, mandf):
+    '''
+    if demuxSheet is overwritten:
+    update indices from the manually-edited dataframe to the
+    given automatically generated DataFrame.
+
+    :param autodf: Automatically generated sampleSheet DataFrame
+        from SampleSheet.ssDic[lane]['samplesheet'].
+    :param mandf: Manually edited samplesheet (from demuxSheet.csv in
+        output lane directory)
+    :type autodf: pandas.DataFrame
+    :type mandf: pandas.DataFrame
+    :returns: updated autodf with indices
+    '''
+    # if there are no indices, just return the same autodf
+    if 'index' not in autodf.columns:
+        return autodf
+
+    if len(autodf.index) != len(mandf.index):
+        logging.warning(
+            "Demux - number of samples changed in overwritten demuxSheet !"
+        )
+
+    dualIx = 'index2' in list(mandf.columns)
+
+    for index, row in mandf.iterrows():
+        sample_ID = row['Sample_ID']
+        index = row['index']
+        if dualIx:
+            index2 = row['index2']
+        # grab the index in the autodf.
+        pdIx = autodf[autodf['Sample_ID'] == sample_ID].index
+        if dualIx:
+            if autodf.loc[pdIx, 'index'].values != index:
+                oriP7 = autodf.loc[pdIx, 'index'].values[0]
+                logging.debug(f"Demux - Changing P7 {oriP7} to {index} for {sample_ID}")
+                autodf.loc[pdIx, 'index'] = index
+                autodf.loc[pdIx, 'I7_Index_ID'] = np.nan
+            if autodf.loc[pdIx, 'index2'].values != index2:
+                oriP5 = autodf.loc[pdIx, 'index2'].values[0]
+                logging.debug(f"Demux - Changing P5 {oriP5} to {index2} for {sample_ID}")
+                autodf.loc[pdIx, 'index2'] = index2
+                autodf.loc[pdIx, 'I5_Index_ID'] = np.nan
+        else:
+            # check index1, set index2 to na
+            if autodf.loc[pdIx, 'index'].values != index:
+                oriP7 = autodf.loc[pdIx, 'index'].values[0]
+                logging.debug(f"Demux - Changing P7 {oriP7} to {index} for {sample_ID}")
+                autodf.loc[pdIx, 'index'] = index
+                # change type as well!
+                autodf.loc[pdIx, 'I7_Index_ID'] = np.nan
+            # it's not dualIx, so set index2/I5_Index_ID to nan.
+            if 'index2' in list(autodf.columns):
+                autodf.loc[pdIx, 'index2'] = np.nan
+            if 'I5_Index_ID' in list(autodf.columns):
+                autodf.loc[pdIx, 'I5_Index_ID'] = np.nan
+    return (autodf)
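# Editor's aside (not part of the patch): a tiny, hypothetical
# illustration of what matchingSheets does when a demuxSheet was
# hand-edited: the manually entered index wins and the stale
# I7_Index_ID annotation is blanked out.
import numpy as np
import pandas as pd

autodf = pd.DataFrame({
    "Sample_ID": ["S1"],
    "index": ["ATCACGTT"],
    "I7_Index_ID": ["D701"],
})
mandf = pd.DataFrame({
    "Sample_ID": ["S1"],
    "index": ["GGCCTTAA"],  # operator swapped the barcode by hand
})
pdIx = autodf[autodf["Sample_ID"] == "S1"].index
autodf.loc[pdIx, "index"] = mandf.loc[0, "index"]
autodf.loc[pdIx, "I7_Index_ID"] = np.nan
print(autodf)  # index is now GGCCTTAA, I7_Index_ID is NaN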
+
+
+def evalMiSeqP5(outPath, dualIx):
+    '''
+    Evaluates MiSeq runs if P5's need to be RC'ed.
+    Takes the path for an outlane,
+    find out if all samples are empty
+    if that is the case, and the run is dualindexed,
+    rerun bclConvert with all P5s RC'ed.
+    '''
+    # Known barcodes
+    KBCPath = os.path.join(
+        outPath,
+        'Reports',
+        'Demultiplex_Stats.csv'
+    )
+    kbcDF = pd.read_csv(KBCPath)
+    # Test if > 90% of samples are virtually empty.
+    numLowreadSamples = len(kbcDF[kbcDF['# Reads'] < 1000])
+    totalSamples = len(kbcDF[kbcDF['SampleID'] != 'Undetermined'])
+    if not numLowreadSamples/totalSamples == 1:
+        return False
+    logging.warning(
+        'Demux - EvalP5: More than 90% of samples are empty. Attempting to salvage by RC the P5.'
+    )
+    if not dualIx:  # Only RC P5 operations for now.
+        return False
+
+    # Read demuxSheet
+    demuxSheetPath = Path(outPath, 'demuxSheet.csv')
+    demuxSheet = []
+    with open(demuxSheetPath) as f:
+        headStatus = True
+        for line in f:
+            if 'Sample_ID' in line.strip():
+                headStatus = False
+                colnames = line.strip().split(',')
+                demuxSheet.append(colnames)
+            if headStatus:
+                demuxSheet.append(line.strip().split(','))
+            else:
+                if 'Sample_ID' not in line.strip():
+                    demuxSheetLine = line.strip().split(',')
+                    ixPos = colnames.index('index2')
+                    oldIx = demuxSheetLine[ixPos]
+                    newIx = str(Seq(oldIx).reverse_complement())
+                    demuxSheetLine[ixPos] = newIx
+                    demuxSheet.append(demuxSheetLine)
+    shutil.move(
+        demuxSheetPath,
+        demuxSheetPath.with_suffix('.bak')
+    )
+    with open(demuxSheetPath, 'w') as f:
+        for _l in demuxSheet:
+            f.write(','.join(_l) + '\n')
+    return True
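# Editor's aside (not part of the patch): the P5 salvage above hinges on
# one operation, reverse-complementing the index2 column with Biopython.
from Bio.Seq import Seq

oldIx = "ATCACGTT"
newIx = str(Seq(oldIx).reverse_complement())
print(newIx)  # AACGTGAT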
diff --git a/src/dissectBCL/dissect.py b/src/dissectBCL/dissect.py
index 4f61a45..db9e7c4 100644
--- a/src/dissectBCL/dissect.py
+++ b/src/dissectBCL/dissect.py
@@ -1,17 +1,14 @@
-from dissectBCL import fakeNews, misc
-from dissectBCL.classes import sampleSheetClass, flowCellClass
-from dissectBCL.demux import prepConvert, demux
-from dissectBCL.postmux import postmux
-from dissectBCL.drHouse import initClass
-from dissectBCL.fakeNews import mailHome
-from rich import print, inspect
-import os
+from dissectBCL.misc import getNewFlowCell
+from dissectBCL.misc import getConf
+from dissectBCL.flowcell import flowCellClass
+from importlib.metadata import version
 import logging
+import os
 from pathlib import Path
+from rich import print, inspect
 import rich_click as click
-from importlib.metadata import version
 from time import sleep
-
+import sys
 
 @click.command(
     context_settings=dict(
@@ -27,17 +24,24 @@
     help='specify a custom ini file.',
     show_default=True
 )
-def dissect(configfile):
+@click.option(
+    "-f",
+    "--flowcellpath",
+    required=False,
+    default=None,
+    help='specify a full path to a flow cell to process. Should be pointing to a directory written by an Illumina sequencer'
+)
+def dissect(configfile, flowcellpath):
     '''
     define config file and start main dissect function.
     '''
     print("This is dissectBCL version {}".format(version("dissectBCL")))
     print("Loading conf from {}".format(configfile))
-    config = misc.getConf(configfile)
-    main(config)
+    config = getConf(configfile)
+    main(config, flowcellpath)
 
 
-def main(config):
+def main(config, flowcellpath):
     '''
     every hour checks for a new flow cell.
     if new flowcell:
@@ -51,16 +55,11 @@ def main(config):
     # Set pipeline.
     while True:
         # Reload setlog
-        flowcellName, flowcellDir = misc.getNewFlowCell(config)
+        flowcellName, flowcellDir = getNewFlowCell(config, flowcellpath)
         if flowcellName:
-            # set exit stats
-            exitStats = {}
 
             # Define a logfile.
-            logFile = os.path.join(
-                config['Dirs']['flowLogDir'],
-                flowcellName + '.log'
-            )
+            logFile = Path(config['Dirs']['flowLogDir'], flowcellName + '.log')
 
             # initiate log
             logging.basicConfig(
@@ -71,111 +70,45 @@ def main(config):
                 force=True
             )
             # Set flowcellname in log.
-            logging.info("Log Initiated - flowcell:{}, filename:{}".format(
-                flowcellName,
-                logFile
-            ))
+            logging.info(f"Log Initiated - flowcell:{flowcellName}, filename:{logFile}")
+            print(f"Logfile set as {logFile}")
 
             # Include dissectBCL version in log
-            logging.info("dissectBCL - version {}".format(
-                version("dissectBCL")
-            ))
+            logging.info(f"dissectBCL - version {version('dissectBCL')}")
 
             # Include software versions in log
             for lib in config['softwareVers']:
-                logging.debug(
-                    "{} = {}".format(
-                        lib,
-                        config['softwareVers'][lib]
-                    )
-                )
-            # Create classes.
-            flowcell = flowCellClass(
-                name=flowcellName,
-                bclPath=flowcellDir,
-                inBaseDir=config['Dirs']['baseDir'],
-                outBaseDir=config['Dirs']['outputDir'],
-                origSS=os.path.join(flowcellDir, 'SampleSheet.csv'),
-                runInfo=os.path.join(flowcellDir, 'RunInfo.xml'),
-                runCompStat=os.path.join(
-                    flowcellDir, 'RunCompletionStatus.xml'
-                ),
-                logFile=logFile,
-                config=config
-            )
-            inspect(flowcell)
-
-            # Parse sampleSheet information.
-            sampleSheet = sampleSheetClass(
-                flowcell.origSS,
-                flowcell.lanes,
-                config
-            )
-            inspect(sampleSheet)
-            exitStats['premux'] = prepConvert(
-                flowcell,
-                sampleSheet,
-            )
+                logging.debug(f"{lib} = {config['softwareVers'][lib]}")
 
-            # Start demultiplexing.
-            exitStats['demux'] = demux(
-                sampleSheet,
-                flowcell,
-                config
-            )
-            inspect(sampleSheet)
-            # Break if the sequencing failed.
-            if not exitStats['demux'] == 'sequencingfailed':
-                # postmux
-                exitStats['postmux'] = postmux(
-                    flowcell,
-                    sampleSheet,
-                    config
-                )
-
-                # transfer data
-                for outLane in sampleSheet.ssDic:
-                    # Copy over files.
-                    transferTime, shipDic = fakeNews.shipFiles(
-                        os.path.join(
-                            flowcell.outBaseDir,
-                            outLane
-                        ),
-                        config
-                    )
-                    exitStats[outLane] = shipDic
-                    # Push stats to parkour.
-                    exitStats[outLane]['pushParkour'] = fakeNews.pushParkour(
-                        flowcell.flowcellID,
-                        sampleSheet,
-                        config,
-                        flowcell.bclPath
-                    )
-                    # Create diagnosis + parse QC stats
-                    drHouse = initClass(
-                        os.path.join(
-                            flowcell.outBaseDir,
-                            outLane
-                        ),
-                        flowcell.startTime,
-                        sampleSheet.flowcell,
-                        sampleSheet.ssDic[outLane],
-                        transferTime,
-                        exitStats,
-                        config['Dirs']['baseDir']
-                    )
-                    inspect(drHouse)
-                    # Create email.
-                    subject, _html = drHouse.prepMail()
-                    # Send it.
-                    mailHome(subject, _html, config)
-                    Path(
-                        os.path.join(
-                            flowcell.outBaseDir,
-                            outLane,
-                            'communication.done'
-                        )
-                    ).touch()
-                # Fix logs.
-                fakeNews.organiseLogs(flowcell, sampleSheet)
+            # Create class.
+            flowcell = flowCellClass(name=flowcellName, bclPath=flowcellDir, logFile=logFile, config=config)
+            # Run workflow
+            flowcell.prepConvert()
+            flowcell.demux()
+            flowcell.postmux()
+            flowcell.fakenews()
+            flowcell.organiseLogs()
+            inspect(flowcell)
         else:
             print("No flowcells found. Go back to sleep.")
             sleep(60*60)
+
+
+def createFlowcell(config, fpath, logFile = None):
+    config = getConf(config)
+    flowcellName, flowcellDir = getNewFlowCell(config, fpath)
+    if not logFile:
+        logging.basicConfig(
+            stream=sys.stdout,
+            level="DEBUG",
+            format="%(levelname)s %(asctime)s %(message)s",
+            filemode='a',
+            force=True
+        )
+        logFile = 'STDOUT'
+    else:
+        logging.basicConfig(
+            filename=logFile,
+            level="DEBUG",
+            format="%(levelname)s %(asctime)s %(message)s",
+            filemode='a',
+            force=True
+        )
+    return flowCellClass(name=flowcellName, bclPath=flowcellDir, logFile=logFile, config=config)
\ No newline at end of file
diff --git a/src/dissectBCL/drHouse.py b/src/dissectBCL/drHouse.py
deleted file mode 100644
index 4e2df8f..0000000
--- a/src/dissectBCL/drHouse.py
+++ /dev/null
@@ -1,287 +0,0 @@
-from dissectBCL.classes import drHouseClass
-from dissectBCL.misc import joinLis
-import pandas as pd
-import os
-import shutil
-import glob
-import datetime
-import logging
-from Bio.Seq import Seq
-
-
-def getDiskSpace(outputDir):
-    '''
-    Return space free in GB
-    '''
-    total, used, free = shutil.disk_usage(outputDir)
-    return (total // (2**30), free // (2**30))
-
-
-def matchOptdupsReqs(optDups, ssdf):
-    '''
-    Takes a nested list (optDups) with:
-    [
-        project,
-        sampleID,
-        sampleName,
-        optical_dups,
-    ]
-    Matches sampleID with ssdf, and gets gotten / req reads.
-    returns a nested list including req/got & got
-    this list is sorted by sampleID.
-    '''
-    _optDups = []
-    for lis in optDups:
-        sampleID = lis[1]
-        sampleName = lis[2]
-        req = ssdf[
-            ssdf['Sample_ID'] == sampleID
-        ]['reqDepth'].values
-        got = ssdf[
-            ssdf['Sample_ID'] == sampleID
-        ]['gotDepth'].values
-        reqvgot = float(got/req)
-        # isnull if sample is omitted from demuxsheet but in parkour.
-        if pd.isnull(got):
-            _optDups.append(
-                [
-                    lis[0],
-                    sampleID,
-                    sampleName,
-                    lis[3],
-                    0,
-                    0
-                ]  # fill in zeroes
-            )
-        else:
-            _optDups.append(
-                [
-                    lis[0],
-                    sampleID,
-                    sampleName,
-                    lis[3],
-                    round(reqvgot, 2),
-                    int(got)
-                ]
-            )
-    return (sorted(_optDups, key=lambda x: x[1]))
-
-
-def differentialDiagnosis(outPath, dualIx):
-    '''
-    Takes the path for an outlane,
-    find out if all samples are empty
-    if that is the case, and the run is dualindexed,
-    rerun bclConvert with all P5s RC'ed.
-    '''
-    # Known barcodes
-    KBCPath = os.path.join(
-        outPath,
-        'Reports',
-        'Demultiplex_Stats.csv'
-    )
-    kbcDF = pd.read_csv(KBCPath)
-    # Test if > 90% of samples are virtually empty.
-    numLowreadSamples = len(kbcDF[kbcDF['# Reads'] < 1000])
-    totalSamples = len(kbcDF[kbcDF['SampleID'] != 'Undetermined'])
-    if not numLowreadSamples/totalSamples == 1:
-        return (False)
-    logging.warning(
-        'More then 90% samples empty. Attempting to salvage by RC the P5.'
-    )
-    if not dualIx:  # Only RC P5 operations for now.
-        return (False)
-
-    # Read demuxSheet
-    demuxSheetPath = os.path.join(
-        outPath, 'demuxSheet.csv'
-    )
-    demuxSheet = []
-    with open(demuxSheetPath) as f:
-        headStatus = True
-        for line in f:
-            if 'Sample_ID' in line.strip():
-                headStatus = False
-                colnames = line.strip().split(',')
-                demuxSheet.append(colnames)
-            if headStatus:
-                demuxSheet.append(line.strip().split(','))
-            else:
-                if 'Sample_ID' not in line.strip():
-                    demuxSheetLine = line.strip().split(',')
-                    ixPos = colnames.index('index2')
-                    oldIx = demuxSheetLine[ixPos]
-                    newIx = str(Seq(oldIx).reverse_complement())
-                    demuxSheetLine[ixPos] = newIx
-                    demuxSheet.append(demuxSheetLine)
-    shutil.move(
-        demuxSheetPath,
-        demuxSheetPath+'.bak'
-    )
-    with open(demuxSheetPath, 'w') as f:
-        for _l in demuxSheet:
-            f.write(','.join(_l) + '\n')
-    return (True)
-
-
-def initClass(
-    outPath, initTime, flowcellID, ssDic, transferTime, exitStats, solPath
-):
-    logging.info("init drHouse for {}".format(outPath))
-    ssdf = ssDic['sampleSheet']
-    barcodeMask = ssDic['mask']
-    mismatch = " ".join(
-        [i + ': ' + str(j) for i, j in ssDic['mismatch'].items()]
-    )
-    # Get undetermined
-    muxPath = os.path.join(
-        outPath,
-        'Reports',
-        'Demultiplex_Stats.csv'
-    )
-    muxDF = pd.read_csv(muxPath)
-    totalReads = int(muxDF['# Reads'].sum())
-    if len(muxDF[muxDF['SampleID'] == 'Undetermined']) == 1:
-        undReads = int(
-            muxDF[
-                muxDF['SampleID'] == 'Undetermined'
-            ]['# Reads'].iloc[0]
-        )
-    else:
-        undDic = dict(
-            muxDF[
-                muxDF['SampleID'] == 'Undetermined'
-            ][['Lane', '# Reads']].values
-        )
-        undStr = ""
-        for lane in undDic:
-            undStr += "Lane {}: {}% {}M, ".format(
-                lane,
-                round(100*undDic[lane]/totalReads, 2),
-                round(undDic[lane]/1000000, 2)
-            )
-        undReads = undStr[:-2]
-    # topBarcodes
-    BCPath = os.path.join(
-        outPath,
-        'Reports',
-        'Top_Unknown_Barcodes.csv'
-    )
-    bcDF = pd.read_csv(BCPath)
-    bcDF = bcDF.head(5)
-    BCs = [
-        joinLis(
-            list(x), joinStr='+'
-        ) for x in bcDF.filter(like='index', axis=1).values
-    ]
-    BCReads = list(bcDF['# Reads'])
-    BCReadsPerc = list(bcDF['% of Unknown Barcodes'])
-    BCDic = {}
-    for entry in list(
-        zip(BCs, BCReads, BCReadsPerc)
-    ):
-        BCDic[entry[0]] = [round(float(entry[1])/1000000, 2), entry[2]]
-    # runTime
-    runTime = datetime.datetime.now() - initTime
-    # optDups
-    optDups = []
-    for opt in glob.glob(
-        os.path.join(
-            outPath,
-            '*/*/*duplicate.txt'
-        )
-    ):
-        project = opt.split('/')[-3].replace("FASTQC_", "")
-        sample = opt.split('/')[-1].replace(".duplicate.txt", "")
-        sampleID = opt.split('/')[-2].replace("Sample_", "")
-        with open(opt) as f:
-            dups = f.read()
-            dups = dups.strip().split()
-            if float(dups[1]) != 0:
-                optDups.append(
-                    [
-                        project,
-                        sampleID,
-                        sample,
-                        round(100*float(dups[0])/float(dups[1]), 2)
-                    ]
-                )
-            else:
-                optDups.append(
-                    [
-                        project,
-                        sampleID,
-                        sample,
-                        "NA"
-                    ]
-                )
-    IDprojectDic = pd.Series(
-        ssdf['Sample_Project'].values,
-        index=ssdf['Sample_ID']
-    ).to_dict()
-    nameIDDic = pd.Series(
-        ssdf['Sample_Name'].values,
-        index=ssdf['Sample_ID']
-    ).to_dict()
-    for sampleID in nameIDDic:
-        if not any(sampleID in sl for sl in optDups):
-            optDups.append(
-                [
-                    IDprojectDic[sampleID],
-                    sampleID,
-                    nameIDDic[sampleID],
-                    'NA'
-                ]
-            )
-    optDups = matchOptdupsReqs(optDups, ssdf)
-    # optDups = matchIDtoName(optDups, ssdf)
-    # Fetch organism and fastqScreen
-    sampleDiv = {}
-    for screen in glob.glob(
-        os.path.join(
-            outPath,
-            '*/*/*.rep'
-        )
-    ):
-        sampleID = screen.split('/')[-2].replace("Sample_", "")
-        sample = screen.split('/')[-1].replace('.rep', '')
-
-        # samples with 0 reads still make an empty report.
-        # hence the try / except.
-        parkourOrg = str(  # To string since NA is a float
-            ssdf[ssdf["Sample_ID"] == sampleID]['Organism'].values[0]
-        )
-        try:
-            screenDF = pd.read_csv(
-                screen, sep='\t', header=None
-            )
-            # tophit == max in column 2.
-            # ParkourOrganism
-            krakenOrg = screenDF.iloc[
-                screenDF[2].idxmax()
-            ][5].replace(' ', '')
-            fraction = round(
-                screenDF[2].max()/screenDF[2].sum(),
-                2
-            )
-            sampleDiv[sampleID] = [fraction, krakenOrg, parkourOrg]
-        except pd.errors.EmptyDataError:
-            sampleDiv[sampleID] = ['NA', 'None', parkourOrg]
-
-    return (drHouseClass(
-        undetermined=undReads,
-        totalReads=totalReads,
-        topBarcodes=BCDic,
-        spaceFree_rap=getDiskSpace(outPath),
-        spaceFree_sol=getDiskSpace(solPath),
-        runTime=runTime,
-        optDup=optDups,
-        flowcellID=flowcellID,
-        outLane=outPath.split('/')[-1],
-        contamination=sampleDiv,
-        mismatch=mismatch,
-        barcodeMask=barcodeMask,
-        transferTime=transferTime,
-        exitStats=exitStats,
-        P5RC=ssDic['P5RC']
-    ))
diff --git a/src/dissectBCL/fakeNews.py b/src/dissectBCL/fakeNews.py
index 2f22d96..3b2f7da 100644
--- a/src/dissectBCL/fakeNews.py
+++ b/src/dissectBCL/fakeNews.py
@@ -1,33 +1,28 @@
+from dissectBCL.misc import fetchLatestSeqDir
+from dissectBCL.misc import fexUpload
+from dissectBCL.misc import getDiskSpace
+from dissectBCL.misc import joinLis
+from dissectBCL.misc import matchOptdupsReqs
+from dissectBCL.misc import sendMqcReports
+from dissectBCL.misc import stripRights
+from dissectBCL.misc import umlautDestroyer
 import datetime
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
-import requests
-import pandas as pd
-from dissectBCL.misc import retBCstr, retIxtype, retMean_perc_Q
-from dissectBCL.misc import fetchLatestSeqDir, formatSeqRecipe
-from dissectBCL.misc import umlautDestroyer, formatMisMatches
 from importlib.metadata import version
+import interop
+import json
+import logging
 import os
+import pandas as pd
+from pathlib import Path
+import requests
+import ruamel.yaml
 import shutil
 import smtplib
-import glob
-import ruamel.yaml
-import json
-from subprocess import check_output, Popen
 import sys
-import numpy as np
-import interop
-import logging
-import pathlib
 
 
-def getDiskSpace(outputDir):
-    '''
-    Return space free in GB
-    '''
-    total, used, free = shutil.disk_usage(outputDir)
-    return (total // (2**30), free // (2**30))
-
-
 def pullParkour(flowcellID, config):
     """
@@ -119,18 +114,19 @@ def pushParkour(flowcellID, sampleSheet, config, flowcellBase):
     # pushing out the 'Run statistics in parkour'.
     '''
     we need:
-        - R1 > Q30 % - done
-        - R2 > Q30 % (if available) - done
-        - clusterPF (%) - done
-        - name (== laneStr) - done
-        - undetermined_indices (%) - done
-        - reads_pf (M) - can be obtained by parsing interop
+        - R1 > Q30 %
+        - R2 > Q30 % (if available)
+        - clusterPF (%)
+        - name (== laneStr)
+        - undetermined_indices (%)
+        - reads_pf (M)
     '''
+    # Parse interop.
     iop_df = pd.DataFrame(
         interop.summary(
             interop.read(
-                flowcellBase
+                str(flowcellBase)
            ),
            'Lane'
        )
    )
@@ -144,12 +140,7 @@ def pushParkour(flowcellID, sampleSheet, config, flowcellBase):
     laneDict = {}
     for outLane in sampleSheet.ssDic:
         # Quality_Metrics.csv contains all the info we need.
-        qMetPath = os.path.join(
-            config['Dirs']['outputDir'],
-            outLane,
-            'Reports',
-            'Quality_Metrics.csv'
-        )
+        qMetPath = Path(config['Dirs']['outputDir'], outLane, 'Reports', 'Quality_Metrics.csv')
         qdf = pd.read_csv(qMetPath)
         # If a flowcell is split, qMetPath contains only Lane 1 e.g.
         # If not, all lanes sit in qdf
@@ -172,18 +163,18 @@ def pushParkour(flowcellID, sampleSheet, config, flowcellBase):
                 ]["YieldQ30"].sum() / subdf['YieldQ30'].sum() * 100, 2
             )
-            Q30Dic = subdf.groupby("ReadNumber")['% Q30'].mean().to_dict()
+            Q30Dic = subdf[subdf['SampleID'] != 'Undetermined'].groupby("ReadNumber")['% Q30'].mean().to_dict()
             for read in Q30Dic:
                 if 'I' not in str(read):
                     readStr = 'read_{}'.format(read)
                     laneDict[laneStr][readStr] = round(Q30Dic[read]*100, 2)
             laneDict[laneStr]["cluster_pf"] = round(
-                subdf["YieldQ30"].sum()/subdf["Yield"].sum() * 100,
+                subdf[subdf['SampleID'] != 'Undetermined']["YieldQ30"].sum()/subdf[subdf['SampleID'] != 'Undetermined']["Yield"].sum() * 100,
                 2
             )
             laneDict[laneStr]["name"] = laneStr
         d['matrix'] = json.dumps(list(laneDict.values()))
-        logging.info("Pushing FID with dic {} {}".format(FID, d))
+        logging.info(f"fakenews - pushParkour - Pushing FID with dic {FID} {d}")
         pushParkStat = requests.post(
             config.get("parkour", "URL") + '/api/run_statistics/upload/',
             auth=(
@@ -193,177 +184,13 @@ def pushParkour(flowcellID, sampleSheet, config, flowcellBase):
             data=d,
             verify=config['parkour']['cert']
         )
-        logging.info("ParkourPush return {}".format(pushParkStat))
+        logging.info("fakenews - ParkourPush - return {}".format(pushParkStat))
         return pushParkStat
 
 
-def multiQC_yaml(config, flowcell, ssDic, project, laneFolder):
-    '''
-    This function creates:
-    - config yaml, containing appropriate header information
-    - data string adding gen stats
-    - data string containing our old seqreport statistics.
-    Keep in mind we delete these after running mqc
-    '''
-    ssdf = ssDic['sampleSheet'][
-        ssDic['sampleSheet']['Sample_Project'] == project
-    ]
-    # data string genstats
-    mqcData = "# format: 'tsv'\n"
-    mqcData += "# plot_type: 'generalstats'\n"
-    mqcData += "# pconfig: \n"
-    mqcData += "Sample_Name\tSample_ID\tRequested\n"
-    reqDict = {}
-    reqsMax = 0
-    for sample in list(ssdf['Sample_Name'].unique()):
-        sampleID = ssdf[ssdf['Sample_Name'] == sample]['Sample_ID'].values[0]
-        if ssdf[ssdf['Sample_Name'] == sample]['reqDepth'].values[0] == 'NA':
-            reqDepth = 'NA'
-        else:
-            reqDepth = float(
-                ssdf[ssdf['Sample_Name'] == sample]['reqDepth'].values[0]
-            )
-            reqDepth = round(reqDepth/1000000, 2)
-        if reqDepth != 'NA':
-            if reqDepth > reqsMax:
-                reqsMax = reqDepth
-        sampleLis = glob.glob(
-            os.path.join(
-                laneFolder,
-                '*/*/' + sample + '*fastq.gz'
-            )
-        )
-        purgeSampleLis = []
-        for i in sampleLis:
-            if 'optical' not in i:
-                purgeSampleLis.append(i)
-        for fullfqFile in purgeSampleLis:
-            fqFile = fullfqFile.split('/')[-1]
-            sampleBase = fqFile.replace(".fastq.gz", "")
-            reqDict[sampleBase] = [sampleID, reqDepth]
-    for sample in reqDict:
-        mqcData += "{}\t{}\t{}\n".format(
-            sample, reqDict[sample][0], reqDict[sample][1]
-        )
-    # seqreport Data
-    seqreportData = ""
-    for index, row in ssdf.iterrows():
-        if seqreportData == "":
-            meanQ_headers, Meanq = retMean_perc_Q(row, returnHeader=True)
-            percq30_headers, perc30 = retMean_perc_Q(
-                row, returnHeader=True, qtype='percQ30'
-            )
-            seqreportData += \
-                "\tSample ID\tLane\t{}\t{}\n".format(
-                    meanQ_headers,
-                    percq30_headers
-                )
-            seqreportData += "{}\t{}\t{}\t{}\t{}\n".format(
-                row['Sample_Name'],
-                row['Sample_ID'],
-                "L" + str(row['Lane']),
-                Meanq,
-                perc30
-            )
-        else:
-            Meanq = retMean_perc_Q(row)
-            perc30 = retMean_perc_Q(row, qtype='percQ30')
-            seqreportData += "{}\t{}\t{}\t{}\t{}\n".format(
-                row['Sample_Name'],
-                row['Sample_ID'],
-                "L" + str(row['Lane']),
-                Meanq,
-                perc30
-            )
-
-    # Index stats.
- indexreportData = "" - # indexreportData = "\tSample ID\tBarcodes\tBarcode types\n" - for index, row in ssdf.iterrows(): - if indexreportData == "": - indexreportData += "\tSample ID\t{}\t{}\n".format( - retBCstr(row, returnHeader=True), - retIxtype(row, returnHeader=True) - ) - indexreportData += "{}\t{}\t{}\t{}\n".format( - row['Sample_Name'], - row['Sample_ID'], - retBCstr(row), - retIxtype(row) - ) - - # config yaml - # libraryTypes - libTypes = ', '.join(list( - ssdf['Library_Type'].fillna('None').unique() - )) - # indexTypes - ixTypes = ', '.join(list( - ssdf["indexType"].fillna('None').unique() - )) - # Protocols - protTypes = ', '.join(list( - ssdf["Description"].fillna('None').unique() - )) - # Organisms - orgs = ', '.join(list( - ssdf["Organism"].fillna('None').unique() - )) - # Resequencing runs are screwed up (e.g. don't contain the samples) - # switch total requested to NA - try: - sumReqRound = str( - round((ssdf['reqDepth'].sum())/1000000, 0) - ) - except TypeError: - sumReqRound = 'NA' - - mqcyml = { - "title": project, - "custom_logo": config["misc"]["mpiImg"], - "custom_logo_url": "https://www.ie-freiburg.mpg.de/", - "custom_logo_title": "MPI-IE", - "show_analysis_paths": False, - "show_analysis_time": False, - "fastqscreen_simpleplot": False, - "log_filesize_limit": 2000000000, - "report_header_info": [ - {"Contact E-mail": config["communication"]["bioinfoCore"]}, - {"Flowcell": flowcell.name}, - {"Sequencer Type": flowcell.sequencer}, - {"Read Lengths": formatSeqRecipe(flowcell.seqRecipe)}, - {"Demux. Mask": ssDic["mask"]}, - {"Mismatches": formatMisMatches(ssDic["mismatch"])}, - {"dissectBCL version": "{}".format(version("dissectBCL"))}, - {"bcl-convert version": config["softwareVers"]["bclconvert"]}, - {"Library Type": libTypes}, - {"Library Protocol": protTypes}, - {"Index Type": ixTypes}, - {"Organism": orgs}, - {"Requested reads": sumReqRound}, - {"Received reads": str( - round( - (ssdf['gotDepth'].replace( - 'NA', np.nan - ).dropna().sum())/1000000, - 0 - ) - )} - ], - "section_comments": { - "kraken": config["misc"]['krakenExpl'] - } - - } - return (mqcyml, mqcData, seqreportData, indexreportData) - - def mailHome(subject, _html, config, toCore=False): mailer = MIMEMultipart('alternative') - mailer['Subject'] = '[{}] [{}] '.format( - config['communication']['subject'], - version('dissectBCL') - ) + subject + mailer['Subject'] = f"[{config['communication']['subject']}] [{version('dissectBCL')}] " + subject mailer['From'] = config['communication']['fromAddress'] if toCore: mailer['To'] = config['communication']['bioinfoCore'] @@ -390,170 +217,65 @@ def mailHome(subject, _html, config, toCore=False): def shipFiles(outPath, config): transferStart = datetime.datetime.now() shipDic = {} - outLane = outPath.split('/')[-1] + outLane = outPath.name # Get directories from outPath. - for projectPath in glob.glob( - os.path.join( - outPath, - 'Project*' - ) - ): - project = projectPath.split('/')[-1] + for projectPath in outPath.glob('Project*'): + project = projectPath.name shipDic[project] = 'No' - logging.info("Shipping {}".format(project)) + logging.info("fakenews - Shipping {}".format(project)) PI = project.split('_')[-1].lower().replace( "cabezas-wallscheid", "cabezas" ) - fqcPath = projectPath.replace("Project_", "FASTQC_Project_") + fqcPath = Path(str(projectPath).replace("Project_", "FASTQC_Project_")) if PI in config['Internals']['PIs']: - logging.info("Found {}. 
Shipping internally.".format(PI)) - fqc = fqcPath.split('/')[-1] - enduserBase = os.path.join( - fetchLatestSeqDir( - os.path.join( - config['Dirs']['piDir'], - PI - ), - config['Internals']['seqDir'] - ), - outLane - ) - if not os.path.exists(enduserBase): - os.mkdir(enduserBase, mode=0o750) - copyStat_FQC = "" - copyStat_Project = "" - if os.path.exists( - os.path.join(enduserBase, fqc) - ): - shutil.rmtree( - os.path.join( - enduserBase, fqc - ) - ) - copyStat_FQC = "Replaced" - shutil.copytree( - fqcPath, - os.path.join( - enduserBase, - fqc + # Shipping + fqc = fqcPath.name + enduserBase = fetchLatestSeqDir(config, PI) / outLane + logging.info(f"fakenews - Found {PI}. Shipping internally to {enduserBase}.") + enduserBase.mkdir(mode=0o750, exist_ok=True) + replaceStatus = 'Copied' + if (enduserBase / fqc).exists(): + shutil.rmtree(enduserBase / fqc) + replaceStatus = 'Replaced' + try: + shutil.copytree(fqcPath, enduserBase / fqc) + except OSError: + logging.critical(f"Copying {fqcPath} into {enduserBase} failed.") + mailHome( + outPath.name, + f"{fqcPath} copying into {enduserBase} failed.", + config ) - ) - if copyStat_FQC != "Replaced": - copyStat_FQC = "Copied" - if os.path.exists( - os.path.join(enduserBase, project) - ): - shutil.rmtree( - os.path.join( - enduserBase, - project - ) + sys.exit() + if (enduserBase / project).exists(): + shutil.rmtree(enduserBase / project) + replaceStatus = 'Replaced' + try: + shutil.copytree(projectPath, enduserBase / project) + except OSError: + logging.critical(f"Copying {projectPath} into {enduserBase} failed.") + mailHome( + outPath.name, + f"{projectPath} copying into {enduserBase} failed.", + config ) - copyStat_Project = "Replaced" - shutil.copytree( - projectPath, - os.path.join( - enduserBase, - project - ) - ) - logging.info("Stripping group rights for {}".format(enduserBase)) - for r, dirs, files in os.walk(enduserBase): - for d in dirs: - os.chmod(os.path.join(r, d), 0o700) - for f in files: - os.chmod(os.path.join(r, f), 0o700) - if copyStat_Project != "Replaced": - copyStat_Project = "Copied" - if 'Replaced' in [copyStat_Project, copyStat_FQC]: - shipDic[project] = ['Transferred', "{}GB free".format( - getDiskSpace(enduserBase)[1] - )] - else: - shipDic[project] = ['Copied', "{}GB free".format( - getDiskSpace(enduserBase)[1] - )] + sys.exit() + # Strip rights + stripRights(enduserBase) + shipDic[project] = [replaceStatus, f"{getDiskSpace(enduserBase)[1]}GB free"] else: if not config['Internals'].getboolean('fex'): shipDic[project] = "Ignored( by config)" - logging.info( - "No fex upload for {} because of config".format(project) - ) + logging.info(f"fakenews - {project} not fex uploaded by config.") else: - shipDicStat = "Uploaded" - laneStr = fqcPath.split('/')[-2] - # If the same tarball is already present, replace it. 
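# Hedged sketch, not part of the refactor: the tar-and-upload one-liner kept
# in fexUpload ("tar cf - A B | fexsend -s X.tar from") goes through
# os.system, so whitespace in paths and the pipeline's exit status are
# invisible. The same pipe can be built explicitly; the fexsend flags mirror
# the call in this diff, everything else here is an assumption.
import subprocess

def fex_tar_upload(paths, tarball, from_address):
    tar = subprocess.Popen(
        ['tar', 'cf', '-'] + [str(p) for p in paths],
        stdout=subprocess.PIPE
    )
    fex = subprocess.Popen(
        ['fexsend', '-s', tarball, from_address],
        stdin=tar.stdout
    )
    tar.stdout.close()  # so tar sees SIGPIPE if fexsend dies early
    return fex.wait(), tar.wait()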
- fexList = check_output( - [ - 'fexsend', - '-l', - config['communication']['fromAddress'] - ] - ).decode("utf-8").replace("\n", " ").split(' ') - logging.info("fexList: {}".format(fexList)) - tarBall = laneStr + '_' + project + '.tar' - if tarBall in fexList: - fexRm = [ - 'fexsend', - '-d', - tarBall, - config['communication']['fromAddress'] - ] - logging.info( - "Purging {} existing fex with:".format(project) - ) - logging.info("fexRm") - fexdel = Popen(fexRm) - fexdel.wait() - shipDicStat = "Replaced" - fexer = "tar cf - {} {} | fexsend -s {}.tar {}".format( - projectPath, - fqcPath, - laneStr + '_' + project, - config['communication']['fromAddress'] + shipDic[project] = fexUpload( + outLane, project, config['communication']['fromAddress'], + (projectPath, fqcPath) ) - logging.info("Pushing {} to fex with:".format(project)) - logging.info(fexer) - os.system(fexer) - shipDic[project] = shipDicStat - # Ship multiQC reports. - ''' - seqFacdir/Sequence_Quality_yyyy/Illumina_yyyy/outlane - ''' - yrstr = '20' + outLane[:2] - seqFacDir = os.path.join( - config['Dirs']['seqFacDir'], - 'Sequence_Quality_{}'.format(yrstr), - 'Illumina_{}'.format(yrstr), - outLane - ) - logging.info(f"Using {seqFacDir} to populate samba dir.") - if not os.path.exists(seqFacDir): - pathlib.Path( - seqFacDir - ).mkdir(parents=True, exist_ok=True) - _mqcreports = glob.glob( - os.path.join(outPath, '*', '*multiqc_report.html') - ) - if not _mqcreports: - logging.warning(f"No multiqcreports found under {outLane}") - for qcRepo in _mqcreports: - # to seqfacdir - outqcRepo = os.path.join( - seqFacDir, qcRepo.split('/')[-2] + '_multiqcreport.html' - ) - logging.info(f"Copying {qcRepo} over to {outqcRepo}") - shutil.copyfile(qcRepo, outqcRepo) - # to bioinfoCoredir - outqcBioinfo = os.path.join( - config['Dirs']['bioinfoCoreDir'], - qcRepo.split('/')[-2] + '_multiqcreport.html' - ) - logging.info(f"Copying {qcRepo} over to {outqcBioinfo}") - shutil.copyfile(qcRepo, outqcBioinfo) + sendMqcReports(outPath, config['Dirs']) transferStop = datetime.datetime.now() transferTime = transferStop - transferStart - return (transferTime, shipDic) + return {'transfertime': transferTime, 'shipDic': shipDic} def organiseLogs(flowcell, sampleSheet): @@ -614,3 +336,146 @@ def organiseLogs(flowcell, sampleSheet): flowcellInfo = os.path.join(_logDir, 'flowcellInfo.yaml') with open(flowcellInfo, 'w') as f: yaml1.dump(dic1, f) + +# outPath, initTime, flowcellID, ssDic, transferTime, exitStats, solPath +def gatherFinalMetrics(outLane, flowcell): + logging.info(f"fakenews - gatherFinalMetrics - {outLane}") + outPath = flowcell.outBaseDir / outLane + ssDic = flowcell.sampleSheet.ssDic[outLane] + ssdf = ssDic['sampleSheet'] + barcodeMask = ssDic['mask'] + mismatch = " ".join( + [i + ': ' + str(j) for i, j in ssDic['mismatch'].items()] + ) + # Get undetermined + muxDF = pd.read_csv(outPath / 'Reports' / 'Demultiplex_Stats.csv') + totalReads = int(muxDF['# Reads'].sum()) + if len(muxDF[muxDF['SampleID'] == 'Undetermined']) == 1: + undReads = int( + muxDF[ + muxDF['SampleID'] == 'Undetermined' + ]['# Reads'].iloc[0] + ) + else: + undDic = dict( + muxDF[ + muxDF['SampleID'] == 'Undetermined' + ][['Lane', '# Reads']].values + ) + undStr = "" + for lane in undDic: + undStr += "Lane {}: {}% {}M, ".format( + lane, + round(100*undDic[lane]/totalReads, 2), + round(undDic[lane]/1000000, 2) + ) + undReads = undStr[:-2] + # topBarcodes + bcDF = pd.read_csv(outPath / 'Reports' / 'Top_Unknown_Barcodes.csv') + bcDF = bcDF.head(5) + BCs = [ + joinLis( + list(x), 
joinStr='+' + ) for x in bcDF.filter(like='index', axis=1).values + ] + BCReads = list(bcDF['# Reads']) + BCReadsPerc = list(bcDF['% of Unknown Barcodes']) + BCDic = {} + for entry in list( + zip(BCs, BCReads, BCReadsPerc) + ): + BCDic[entry[0]] = [round(float(entry[1])/1000000, 2), entry[2]] + # runTime + runTime = datetime.datetime.now() - flowcell.startTime + # optDups + optDups = [] + for opt in outPath.glob("*/*/*duplicate.txt"): + project = opt.parts[-3].replace("FASTQC_", "") + sample = opt.name.replace(".duplicate.txt", "") + sampleID = opt.parts[-2].replace("Sample_", "") + with open(opt) as f: + dups = f.read() + dups = dups.strip().split() + if float(dups[1]) != 0: + optDups.append( + [ + project, + sampleID, + sample, + round(100*float(dups[0])/float(dups[1]), 2) + ] + ) + else: + optDups.append( + [ + project, + sampleID, + sample, + "NA" + ] + ) + IDprojectDic = pd.Series( + ssdf['Sample_Project'].values, + index=ssdf['Sample_ID'] + ).to_dict() + nameIDDic = pd.Series( + ssdf['Sample_Name'].values, + index=ssdf['Sample_ID'] + ).to_dict() + for sampleID in nameIDDic: + if not any(sampleID in sl for sl in optDups): + optDups.append( + [ + IDprojectDic[sampleID], + sampleID, + nameIDDic[sampleID], + 'NA' + ] + ) + optDups = matchOptdupsReqs(optDups, ssdf) + # Fetch organism and kraken reports + sampleDiv = {} + for screen in outPath.glob("*/*/*.rep"): + sampleID = screen.parts[-2].replace("Sample_", "") + sample = screen.name.replace('.rep', '') + + # samples with 0 reads still make an empty report. + # hence the try / except. + # 'mouse (GRCm39)' -> 'mouse' + parkourOrg = str( # To string since NA is a float + ssdf[ssdf["Sample_ID"] == sampleID]['Organism'].values[0] + ).split(' ')[0] + try: + screenDF = pd.read_csv( + screen, sep='\t', header=None + ) + # tophit == max in column 2. 
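# Hedged note on the '.rep' parsing below: a kraken2 --report file is a
# headerless TSV whose columns (0-indexed, as loaded with header=None) are
# 0: % of reads, 1: reads in clade, 2: reads assigned directly to the taxon,
# 3: rank code, 4: NCBI taxid, 5: indented taxon name - which is why the
# name gets a .replace(' ', ''). Tiny toy example of the same top-hit math:
import io
import pandas as pd

rep = io.StringIO(
    "90.0\t900\t900\tS\t10090\t  Mus musculus\n"
    "10.0\t100\t100\tS\t9606\t  Homo sapiens\n"
)
df = pd.read_csv(rep, sep='\t', header=None)
assert df.iloc[df[2].idxmax()][5].replace(' ', '') == 'Musmusculus'
assert round(df[2].max() / df[2].sum(), 2) == 0.9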
+ # ParkourOrganism + krakenOrg = screenDF.iloc[ + screenDF[2].idxmax() + ][5].replace(' ', '') + fraction = round( + screenDF[2].max()/screenDF[2].sum(), + 2 + ) + sampleDiv[sampleID] = [fraction, krakenOrg, parkourOrg] + except pd.errors.EmptyDataError: + sampleDiv[sampleID] = ['NA', 'None', parkourOrg] + + return { + 'undetermined':undReads, + 'totalReads':totalReads, + 'topBarcodes':BCDic, + 'spaceFree_rap':getDiskSpace(outPath), + 'spaceFree_sol':getDiskSpace(flowcell.bclPath), + 'runTime':runTime, + 'optDup':optDups, + 'flowcellID':flowcell.flowcellID, + 'outLane':outLane, + 'contamination':sampleDiv, + 'mismatch':mismatch, + 'barcodeMask':barcodeMask, + 'transferTime': flowcell.transferTime, + 'exitStats': flowcell.exitStats, + 'P5RC':ssDic['P5RC'] + } \ No newline at end of file diff --git a/src/dissectBCL/classes.py b/src/dissectBCL/flowcell.py similarity index 58% rename from src/dissectBCL/classes.py rename to src/dissectBCL/flowcell.py index 76f6f54..5053169 100644 --- a/src/dissectBCL/classes.py +++ b/src/dissectBCL/flowcell.py @@ -1,23 +1,33 @@ -import os -import xml.etree.ElementTree as ET -import sys +from dissectBCL.demux import detMask, misMatcher +from dissectBCL.demux import writeDemuxSheet, readDemuxSheet, compareDemuxSheet +from dissectBCL.demux import evalMiSeqP5, parseStats +from dissectBCL.demux import matchingSheets +from dissectBCL.postmux import renameProject +from dissectBCL.postmux import validateFqEnds +from dissectBCL.postmux import qcs, clumper, kraken, md5_multiqc, moveOptDup from dissectBCL.fakeNews import pullParkour, mailHome -from dissectBCL.misc import umlautDestroyer -import pandas as pd +from dissectBCL.fakeNews import shipFiles, pushParkour +from dissectBCL.fakeNews import gatherFinalMetrics +from dissectBCL.misc import umlautDestroyer, P5Seriesret + import datetime -from tabulate import tabulate -from random import randint from dominate.tags import html, div, br import logging +import numpy as np +import pandas as pd +from pathlib import Path +from random import randint +import ruamel.yaml +import shutil +from subprocess import Popen, PIPE +import sys +from tabulate import tabulate +import xml.etree.ElementTree as ET class flowCellClass: - """This is a flowCell class, which contains: - - prior information set in the config file. - - inferred variables from the RunInfo.xml. - """ - # fileChecks + # Init - fileChecks def filesExist(self): """ Check if paths exist: @@ -34,19 +44,17 @@ def filesExist(self): self.inBaseDir, self.outBaseDir ]: - logging.info("Checking {}".format(f)) - if not os.path.exists(f): - logging.critical("{} doesn't exist. Exiting".format(f)) + logging.info(f"Init - Checking {f}") + if not f.exists(): + logging.critical(f"Init - {f} doesn't exist. Exiting") mailHome( self.name, - "{} does not exist. dissectBCL crashed.".format( - f - ), + f"{f} does not exist. dissectBCL crashed.", self.config ) sys.exit(1) - # Parse runInfo + # Init - Parse runInfo def parseRunInfo(self): """ Takes the path to runInfo.xml and parses it. @@ -56,7 +64,7 @@ def parseRunInfo(self): - the instrument (str) - the flowcellID (str) """ - logging.info("Parsing RunInfo.xml") + logging.info("Init - Parsing RunInfo.xml") tree = ET.parse(self.runInfo) root = tree.getroot() seqRecipe = {} @@ -80,12 +88,12 @@ def parseRunInfo(self): flowcellID = i.text return seqRecipe, lanes, instrument, flowcellID - # Validate successful run. + # Init - Validate successful run. def validateRunCompletion(self): """ validates succesfull completion status in xml. 
""" - logging.info("validateRunCompletion") + logging.info("Init - validateRunCompletion") if self.sequencer == 'Miseq': tree = ET.parse(self.runCompletionStatus) root = tree.getroot() @@ -97,18 +105,228 @@ def validateRunCompletion(self): _status = 'SuccessfullyCompleted' return (_status) - def __init__( - self, - name, - bclPath, - origSS, - runInfo, - runCompStat, - inBaseDir, - outBaseDir, - logFile, - config - ): + # demux - prepConvert + def prepConvert(self): + ''' + Determines mask, dualIx status, PE status, convertOptions and mismatches + ''' + logging.info("Demux - prepConvert - determine masking, indices, paired ends, and other options") + for outputFolder in self.sampleSheet.ssDic: + ss_dict = self.sampleSheet.ssDic[outputFolder] + ss = ss_dict['sampleSheet'] + + # determine mask, dualIx, PE, convertOpts, minP5, minP7 from seqRecipe + (ss_dict['mask'], ss_dict['dualIx'], ss_dict['PE'], + ss_dict['convertOpts'], minP5, minP7) = detMask( + self.seqRecipe, + ss, + outputFolder + ) + + # extra check to make sure all our indices are of equal size! + for min_ix, ix_str in ((minP5, 'index'), (minP7, 'index2')): + if min_ix and not np.isnan(min_ix): + ss[ix_str] = ss[ix_str].str[:min_ix] + + # determine mismatch + ss_dict['mismatch'] = misMatcher(ss['index'], P5Seriesret(ss)) + logging.info("Demux - prepConvert - mask in sampleSheet updated.") + self.exitStats['premux'] = 0 + + # demux - demux + def demux(self): + # Double check for run failure + if self.succesfullrun != 'SuccessfullyCompleted': + logging.warning("Demux - Run not succesfull, marking as failed.") + for outLane in self.sampleSheet.ssDic: + Path(self.outBaseDir, outLane).mkdir(exists_ok=True) + Path(self.outBaseDir, outLane, 'run.failed').touch() + mailHome( + f"{self.name} ignored", + "RunCompletionStatus is not successfullycompleted.\n" + + "Marked for failure and ignored for the future.", + self.config, + toCore=True + ) + else: + logging.info("Demux - Run succesfull, starting demux") + for outLane in self.sampleSheet.ssDic: + logging.info(f"Demux - {outLane}") + _ssDic = self.sampleSheet.ssDic[outLane] + # Set outputDirectory + outputFolder = Path(self.outBaseDir, outLane) + outputFolder.mkdir(exist_ok=True) + demuxOut = outputFolder / 'demuxSheet.csv' + # Don't remake if demuxSheet exist + if not demuxOut.exists(): + logging.info(f"Demux - Writing demuxSheet for {outLane}") + writeDemuxSheet( + demuxOut, + _ssDic, + self.sampleSheet.laneSplitStatus + ) + else: + logging.warning( + f"Demux - demuxSheet for {outLane} already exists, not changing it." + ) + compareDemuxSheet(_ssDic, demuxOut) + + # Don't run bcl-convert if we have the touched flag. 
+ if not Path(outputFolder, 'bclconvert.done').exists(): + # Purge pre-existing reports / log folders + if Path(outputFolder, 'Reports').exists(): + shutil.rmtree(Path(outputFolder, 'Reports')) + if Path(outputFolder, 'Logs').exists(): + shutil.rmtree(Path(outputFolder, 'Logs')) + # Run bcl-convert + bclOpts = [ + self.config['software']['bclconvert'], + '--output-directory', outputFolder, + '--force', + '--bcl-input-directory', self.bclPath, + '--sample-sheet', demuxOut, + '--bcl-num-conversion-threads', "20", + '--bcl-num-compression-threads', "20", + "--bcl-sampleproject-subdirectories", "true", + ] + if not self.sampleSheet.laneSplitStatus: + bclOpts.append('--no-lane-splitting') + bclOpts.append('true') + logging.info("Demux - Starting BCLConvert") + logging.info(f"Demux - {bclOpts}") + bclRunner = Popen(bclOpts,stdout=PIPE) + exitcode = bclRunner.wait() + if exitcode == 0: + logging.info("Demux - bclConvert exit 0") + Path(outputFolder, 'bclconvert.done').touch() + if self.sequencer == 'MiSeq': + if evalMiSeqP5( + outputFolder, + _ssDic['dualIx'], + ): + logging.info("Demux - P5 RC triggered.") + # Purge existing reports. + logging.info("Demux - Purge existing Reports folder") + shutil.rmtree(Path(outputFolder, 'Reports')) + shutil.rmtree(Path(outputFolder, 'Logs')) + # Rerun BCLConvert + logging.info("Demux - Rerun BCLConvert") + bclRunner = Popen(bclOpts, stdout=PIPE) + exitcode = bclRunner.wait() + logging.info(f"Demux - bclConvert P5fix exit {exitcode}") + # Update the sampleSheet with proper RC'ed indices. + _ssDic['sampleSheet'] = matchingSheets( + _ssDic['sampleSheet'], + readDemuxSheet(demuxOut, what='df') + ) + _ssDic['P5RC'] = True + else: + _ssDic['P5RC'] = False + else: + _ssDic['P5RC'] = False + else: + logging.critical(f"Demux - BCLConvert exit {exitcode}") + mailHome( + outLane, + 'BCL-convert exit {}. 
Pipeline crashed.'.format( + exitcode + ), + self.config, + toCore=True + ) + sys.exit(1) + + logging.info(f"Demux - Parsing stats for {outLane}") + _ssDic['sampleSheet'] = parseStats(outputFolder, _ssDic['sampleSheet']) + self.exitStats['demux'] = 0 + + # postmux - postmux + def postmux(self): + logging.info("Postmux - Demux complete, starting postmux") + for outLane in self.sampleSheet.ssDic: + _ssDic = self.sampleSheet.ssDic[outLane] + laneFolder = Path(self.outBaseDir, outLane) + renameFlag = laneFolder / 'renamed.done' + postmuxFlag = laneFolder / 'postmux.done' + df = _ssDic['sampleSheet'] + projects = list(df['Sample_Project'].unique()) + + for project in projects: + if not renameFlag.exists(): + logging.info("Postmux - renaming {}".format(outLane)) + renameProject(laneFolder / project, df, self.sampleSheet.laneSplitStatus) + validateFqEnds(laneFolder / project, self) + if not postmuxFlag.exists(): + _sIDs = set(df[df['Sample_Project'] == project]['Sample_ID']) + # FQC + logging.info(f"Postmux - FastQC {outLane} - {project}") + qcs(project, laneFolder, _sIDs, self.config) + # Clump + logging.info(f"Postmux - Clumping {outLane} - {project}") + clumper(project, laneFolder, _sIDs, self.config, _ssDic['PE'], self.sequencer) + # kraken + logging.info(f"Postmux - kraken {outLane} - {project}") + kraken(project, laneFolder, _sIDs, self.config) + # multiQC + logging.info(f"Postmux - md5/multiqc {outLane} - {project}") + md5_multiqc(project, laneFolder, self) + # Move optical duplicates + moveOptDup(laneFolder) + renameFlag.touch() + postmuxFlag.touch() + self.exitStats['postmux'] = 0 + + # fakenews + def fakenews(self): + logging.info("fakenews - Postmux complete, starting fakenews.") + for outLane in self.sampleSheet.ssDic: + # Ship Files + logging.info(f"fakenews - shipFiles - {outLane}") + self.exitStats[outLane] = shipFiles(self.outBaseDir / outLane, self.config) + + # Push parkour + logging.info(f"fakenews - pushParkour - {outLane}") + self.exitStats[outLane]['pushParkour'] = pushParkour(self.flowcellID, self.sampleSheet, self.config, self.bclPath) + + # diagnoses / QCstats + logging.info(f"fakenews - gatherMetrics - {outLane}") + _h = drHouseClass(gatherFinalMetrics(outLane, self)) + logging.info(f"fakenews - prepMail - {outLane}") + subject, _html = _h.prepMail() + mailHome(subject, _html, self.config) + (self.outBaseDir / outLane / 'communication.done').touch() + + # organiseLogs + def organiseLogs(self): + for outLane in self.sampleSheet.ssDic: + logging.info(f"organiseLogs - Populating log dir for {outLane}") + _logDir = self.outBaseDir / outLane / 'Logs' + + # Write out ssdf. 
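# Hedged mini-example of the ruamel.yaml dumps performed in this method,
# using the same indentation settings; the payload dict is illustrative.
import ruamel.yaml

yml = ruamel.yaml.YAML()
yml.indent(mapping=2, sequence=4, offset=2)
with open('outLaneInfo.yaml', 'w') as f:
    yml.dump({'mask': 'Y100;I8;I8;Y100', 'P5RC': False}, f)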
+ outssdf = _logDir / 'sampleSheetdf.tsv' + self.sampleSheet.ssDic[outLane]['sampleSheet'].to_csv(outssdf, sep='\t') + + # write out outLaneInfo.yaml + dic0 = self.sampleSheet.ssDic[outLane] + del dic0['sampleSheet'] + yaml0 = ruamel.yaml.YAML() + yaml0.indent(mapping=2, sequence=4, offset=2) + with open(_logDir / 'outLaneInfo.yaml', 'w') as f: + yaml0.dump(dic0, f) + + # write out config.ini + dic1 = self.asdict() + with open(_logDir / 'config.ini', 'w') as f: + dic1['config'].write(f) + + # write out flowcellInfo.yaml + del dic1['config'] + yaml1 = ruamel.yaml.YAML() + yaml1.indent(mapping=2, sequence=4, offset=2) + with open(_logDir / 'flowcellInfo.yaml', 'w') as f: + yaml1.dump(dic1, f) + + def __init__(self, name, bclPath, logFile, config): sequencers = { 'A': 'NovaSeq', 'N': 'NextSeq', @@ -117,12 +335,12 @@ def __init__( logging.warning("Initiating flowcellClass {}".format(name)) self.name = name self.sequencer = sequencers[name.split('_')[1][0]] - self.bclPath = bclPath - self.origSS = origSS - self.runInfo = runInfo - self.runCompletionStatus = runCompStat - self.inBaseDir = inBaseDir - self.outBaseDir = outBaseDir + self.bclPath = Path(bclPath) + self.origSS = Path(bclPath, 'SampleSheet.csv') + self.runInfo = Path(bclPath, 'RunInfo.xml') + self.runCompletionStatus = Path(bclPath, 'RunCompletionStatus.xml') + self.inBaseDir = Path(config['Dirs']['baseDir']) + self.outBaseDir = Path(config['Dirs']['outputDir']) self.logFile = logFile self.config = config # Run filesChecks @@ -134,19 +352,27 @@ def __init__( self.instrument, \ self.flowcellID = self.parseRunInfo() self.startTime = datetime.datetime.now() + # Create sampleSheet information + self.sampleSheet = sampleSheetClass( + self.origSS, + self.lanes, + self.config + ) + self.exitStats = {} + self.transferTime = None def asdict(self): return { 'name': self.name, 'sequencer': self.sequencer, - 'bclPath': self.bclPath, - 'original sampleSheet': self.origSS, - 'runInfo': self.runInfo, - 'runCompletionStatus': self.runCompletionStatus, + 'bclPath': str(self.bclPath), + 'original sampleSheet': str(self.origSS), + 'runInfo': str(self.runInfo), + 'runCompletionStatus': str(self.runCompletionStatus), 'sucessfulRun': self.succesfullrun, - 'inBaseDir': self.inBaseDir, - 'outBaseDir': self.outBaseDir, - 'dissect logFile': self.logFile, + 'inBaseDir': str(self.inBaseDir), + 'outBaseDir': str(self.outBaseDir), + 'dissect logFile': str(self.logFile), 'seqRecipe': self.seqRecipe, 'lanes': self.lanes, 'instrument': self.instrument, @@ -367,7 +593,7 @@ def __init__(self, sampleSheet, lanes, config): logging.warning("initiating sampleSheetClass") self.runInfoLanes = lanes self.origSs = sampleSheet - self.flowcell = sampleSheet.split('/')[-2] + self.flowcell = sampleSheet.parts[-2] self.ssDic = self.parseSS(self.queryParkour(config)) @@ -556,36 +782,19 @@ def optDupRet(optDup): ) return (self.outLane, msg) - def __init__( - self, - undetermined, - totalReads, - topBarcodes, - spaceFree_rap, - spaceFree_sol, - runTime, - optDup, - flowcellID, - outLane, - contamination, - barcodeMask, - mismatch, - transferTime, - exitStats, - P5RC - ): - self.undetermined = undetermined - self.totalReads = totalReads - self.topBarcodes = topBarcodes - self.spaceFree_rap = spaceFree_rap - self.spaceFree_sol = spaceFree_sol - self.runTime = runTime - self.optDup = optDup - self.flowcellID = flowcellID - self.outLane = outLane - self.contamination = contamination - self.barcodeMask = barcodeMask - self.mismatch = mismatch - self.transferTime = transferTime - 
self.exitStats = exitStats - self.P5RC = P5RC + def __init__(self, qcdic): + self.undetermined = qcdic['undetermined'] + self.totalReads = qcdic['totalReads'] + self.topBarcodes = qcdic['topBarcodes'] + self.spaceFree_rap = qcdic['spaceFree_rap'] + self.spaceFree_sol = qcdic['spaceFree_sol'] + self.runTime = qcdic['runTime'] + self.optDup = qcdic['optDup'] + self.flowcellID = qcdic['flowcellID'] + self.outLane = qcdic['outLane'] + self.contamination = qcdic['contamination'] + self.barcodeMask = qcdic['barcodeMask'] + self.mismatch = qcdic['mismatch'] + self.transferTime = qcdic['transferTime'] + self.exitStats = qcdic['exitStats'] + self.P5RC = qcdic['P5RC'] diff --git a/src/dissectBCL/misc.py b/src/dissectBCL/misc.py index 5b5787a..f244e12 100644 --- a/src/dissectBCL/misc.py +++ b/src/dissectBCL/misc.py @@ -9,6 +9,8 @@ from importlib.metadata import version import sys import logging +import shutil +from rich import print def getConf(configfile, quickload=False): @@ -78,9 +80,21 @@ def getConf(configfile, quickload=False): def getNewFlowCell(config, fPath=None): # If there is a fPath set, just return that. if fPath: - flowcellName = fPath.split('/')[-1] + fPath = Path(fPath) + assert fPath.exists() + flowcellName = fPath.name flowcellDir = fPath - return (flowcellName, flowcellDir) + if not glob.glob( + os.path.join( + config['Dirs']['outputDir'], + flowcellName + '*', + 'communication.done' + ) + ): + return (flowcellName, flowcellDir) + else: + print(f"[red]{flowcellName} exists with a communication.done flag already.[/red]") + sys.exit() # set some config vars. baseDir = config['Dirs']['baseDir'] outBaseDir = config['Dirs']['outputDir'] @@ -232,24 +246,6 @@ def krakenfqs(IDdir): ) -def moveOptDup(laneFolder): - for txt in glob.glob( - os.path.join( - laneFolder, - '*', - '*', - '*duplicate.txt' - ) - ): - # Field -3 == project folder - # escape those already in a fastqc folder (reruns) - if 'FASTQC' not in txt: - pathLis = txt.split('/') - pathLis[-3] = 'FASTQC_' + pathLis[-3] - ofile = "/".join(pathLis) - os.rename(txt, ofile) - - def retBCstr(ser, returnHeader=False): if returnHeader: if 'index2' in list(ser.index): @@ -362,10 +358,12 @@ def fetchLatestSeqDir(PIpath, seqDir): ''' -def fetchLatestSeqDir(PIpath, seqDir): +def fetchLatestSeqDir(config, PI): ''' Fetch the latest sequencing_data dir in the PI directory ''' + PIpath = os.path.join(config['Dirs']['piDir'], PI) + seqDir = config['Internals']['seqDir'] seqDirNum = 0 for dirs in os.listdir(os.path.join(PIpath)): if seqDir in dirs: @@ -374,9 +372,9 @@ def fetchLatestSeqDir(PIpath, seqDir): if int(seqDirStrip) > seqDirNum: seqDirNum = int(seqDirStrip) if seqDirNum == 0: - return os.path.join(PIpath + '/sequencing_data') + return Path(os.path.join(PIpath + '/sequencing_data')) else: - return os.path.join(PIpath + '/sequencing_data' + str(seqDirNum)) + return Path(os.path.join(PIpath + '/sequencing_data' + str(seqDirNum))) def umlautDestroyer(germanWord): @@ -417,87 +415,252 @@ def umlautDestroyer(germanWord): return (_string.decode('utf-8').replace(' ', '')) -def validateFqEnds(dir): - """ - recursively looks for fastq.gz files, - validates the ending (e.g. 
R1, R2, I1, I2) - ignores 'Undetermined' - returns list of malformatted fastqs - """ - malformat = [] - for f in Path(dir).rglob('*fastq.gz'): - if 'Undetermined' not in os.path.basename(f): - e = os.path.basename(f).split('.')[0] - if e[-2:] not in [ - 'R1', 'R2', 'I1', 'I2' - ]: - malformat.append(e) - return (malformat) - - -def matchingSheets(autodf, mandf): +def multiQC_yaml(flowcell, project, laneFolder): ''' - if demuxSheet is overwritten: - update indices from the manually-edited dataframe to the - given automatically generated DataFrame. - - :param autodf: Automatically generated sampleSheet DataFrame - from SampleSheet.ssDic[lane]['samplesheet']. - :param mandf: Manually edited samplesheet (from demuxSheet.csv in - output lane directory) - :type autodf: pandas.DataFrame - :type mandf: pandas.DataFrame - :returns: updated autodf with indices + This function creates: + - config yaml, containing appropriate header information + - data string adding gen stats + - data string containing our old seqreport statistics. + Keep in mind we delete these after running mqc ''' - # if there are no indices, just return the same autodf - if 'index' not in autodf.columns: - return autodf + logging.info("Postmux - multiqc yaml creation") + ssDic = flowcell.sampleSheet.ssDic[laneFolder.name] + ssdf = ssDic['sampleSheet'][ssDic['sampleSheet']['Sample_Project'] == project] + # data string genstats + mqcData = "# format: 'tsv'\n" + mqcData += "# plot_type: 'generalstats'\n" + mqcData += "# pconfig: \n" + mqcData += "Sample_Name\tSample_ID\tRequested\n" + reqDict = {} + for sample in list(ssdf['Sample_Name'].unique()): + sampleID = ssdf[ssdf['Sample_Name'] == sample]['Sample_ID'].values[0] + if ssdf[ssdf['Sample_Name'] == sample]['reqDepth'].values[0] == 'NA': + reqDepth = 'NA' + else: + reqDepth = float( + ssdf[ssdf['Sample_Name'] == sample]['reqDepth'].values[0] + ) + reqDepth = round(reqDepth/1000000, 2) + # + sampleLis = sorted(laneFolder.glob(f"*/Sample_{sampleID}/*[IR][12].fastq.gz")) + for fqFile in sampleLis: + # basename, with 'fastq.gz' + reqDict[fqFile.with_suffix('').with_suffix('').name] = [sampleID, reqDepth] + for sample in reqDict: + mqcData += f"{sample}\t{reqDict[sample][0]}\t{reqDict[sample][1]}\n" + # seqreport Data + seqreportData = "" + for index, row in ssdf.iterrows(): + if seqreportData == "": + meanQ_headers, Meanq = retMean_perc_Q(row, returnHeader=True) + percq30_headers, perc30 = retMean_perc_Q( + row, returnHeader=True, qtype='percQ30' + ) + seqreportData += \ + "\tSample ID\tLane\t{}\t{}\n".format( + meanQ_headers, + percq30_headers + ) + seqreportData += "{}\t{}\t{}\t{}\t{}\n".format( + row['Sample_Name'], + row['Sample_ID'], + "L" + str(row['Lane']), + Meanq, + perc30 + ) + else: + Meanq = retMean_perc_Q(row) + perc30 = retMean_perc_Q(row, qtype='percQ30') + seqreportData += "{}\t{}\t{}\t{}\t{}\n".format( + row['Sample_Name'], + row['Sample_ID'], + "L" + str(row['Lane']), + Meanq, + perc30 + ) - if len(autodf.index) != len(mandf.index): - logging.warning( - "number of samples changed in overwritten demuxSheet !" + # Index stats. 
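# Quick aside on the reqDict keys built above: a fastq name carries a double
# suffix, so pathlib needs with_suffix('') twice to reach the bare basename.
# Self-contained check (the path is hypothetical):
from pathlib import Path

p = Path('/tmp/Sample_24L000001/sampleA_R1.fastq.gz')
assert p.with_suffix('').with_suffix('').name == 'sampleA_R1'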
+ indexreportData = "" + # indexreportData = "\tSample ID\tBarcodes\tBarcode types\n" + for index, row in ssdf.iterrows(): + if indexreportData == "": + indexreportData += "\tSample ID\t{}\t{}\n".format( + retBCstr(row, returnHeader=True), + retIxtype(row, returnHeader=True) + ) + indexreportData += "{}\t{}\t{}\t{}\n".format( + row['Sample_Name'], + row['Sample_ID'], + retBCstr(row), + retIxtype(row) ) - dualIx = 'index2' in list(mandf.columns) - - for index, row in mandf.iterrows(): - sample_ID = row['Sample_ID'] - index = row['index'] - if dualIx: - index2 = row['index2'] - # grab the index in the autodf. - pdIx = autodf[autodf['Sample_ID'] == sample_ID].index - if dualIx: - if autodf.loc[pdIx, 'index'].values != index: - logging.info("Changing P7 {} to {} for {}".format( - autodf.loc[pdIx, 'index'].values, - index, - sample_ID - )) - autodf.loc[pdIx, 'index'] = index - autodf.loc[pdIx, 'I7_Index_ID'] = np.nan - if autodf.loc[pdIx, 'index2'].values != index2: - logging.info("Changing P5 {} to {} for {}".format( - autodf.loc[pdIx, 'index2'].values, - index2, - sample_ID - )) - autodf.loc[pdIx, 'index2'] = index2 - autodf.loc[pdIx, 'I5_Index_ID'] = np.nan + # config yaml + # libraryTypes + libTypes = ', '.join(list( + ssdf['Library_Type'].fillna('None').unique() + )) + # indexTypes + ixTypes = ', '.join(list( + ssdf["indexType"].fillna('None').unique() + )) + # Protocols + protTypes = ', '.join(list( + ssdf["Description"].fillna('None').unique() + )) + # Organisms + orgs = ', '.join(list( + ssdf["Organism"].fillna('None').unique() + )) + # Resequencing runs are screwed up (e.g. don't contain the samples) + # switch total requested to NA + try: + sumReqRound = str( + round((ssdf['reqDepth'].sum())/1000000, 0) + ) + except TypeError: + sumReqRound = 'NA' + + mqcyml = { + "title": project, + "custom_logo": flowcell.config["misc"]["mpiImg"], + "custom_logo_url": "https://www.ie-freiburg.mpg.de/", + "custom_logo_title": "MPI-IE", + "show_analysis_paths": False, + "show_analysis_time": False, + "fastqscreen_simpleplot": False, + "log_filesize_limit": 2000000000, + "report_header_info": [ + {"Contact E-mail": flowcell.config["communication"]["bioinfoCore"]}, + {"Flowcell": flowcell.name}, + {"Sequencer Type": flowcell.sequencer}, + {"Read Lengths": formatSeqRecipe(flowcell.seqRecipe)}, + {"Demux. 
Mask": ssDic["mask"]}, + {"Mismatches": formatMisMatches(ssDic["mismatch"])}, + {"dissectBCL version": "{}".format(version("dissectBCL"))}, + {"bcl-convert version": flowcell.config["softwareVers"]["bclconvert"]}, + {"Library Type": libTypes}, + {"Library Protocol": protTypes}, + {"Index Type": ixTypes}, + {"Organism": orgs}, + {"Requested reads": sumReqRound}, + {"Received reads": str( + round( + (ssdf['gotDepth'].replace( + 'NA', np.nan + ).dropna().sum())/1000000, + 0 + ) + )} + ], + "section_comments": { + "kraken": flowcell.config["misc"]['krakenExpl'] + } + + } + return (mqcyml, mqcData, seqreportData, indexreportData) + + +def stripRights(enduserBase): + for r, dirs, files in os.walk(enduserBase): + for d in dirs: + os.chmod(os.path.join(r, d), 0o700) + for f in files: + os.chmod(os.path.join(r, f), 0o700) + +def getDiskSpace(outputDir): + ''' + Return space free in GB + ''' + total, used, free = shutil.disk_usage(outputDir) + return (total // (2**30), free // (2**30)) + +def fexUpload(outLane, project, fromA, opas): + ''' + outLane = 240619_M01358_0047_000000000-LKGP2_lanes_1 + project = Project_xxxx_user_PI + fromA = from sender (comes from config) + opas = (path/to/project_xxx_user_PI, path/to/FASTQC_project_xx_user_PI) + ''' + replaceStatus ='Uploaded' + tarBall = outLane + '_' + project + '.tar' + fexList = sp.check_output( + ['fexsend', '-l', fromA] + ).decode("utf-8").replace("\n", " ").split(' ') + if tarBall in fexList: + logging.info("fakenews - {project} found in fex. Replacing.") + fexRm = ['fexsend', '-d', tarBall, fromA] + fexdel = sp.Popen(fexRm) + fexdel.wait() + replaceStatus = 'Replaced' + fexsend = f"tar cf - {opas[0]} {opas[1]} | fexsend -s {tarBall} {fromA}" + os.system(fexsend) + return replaceStatus + +def sendMqcReports(outPath, tdirs): + ''' + Ship mqc reports to seqfacdir and bioinfocoredir. + outPath = /path/to/240619_M01358_0047_000000000-LKGP2_lanes_1 + tdirs = Dirs part from config, contains bioinfoCoreDir and seqFacDir + ''' + outLane = outPath.name + yrstr = '20' + outLane[:2] + BioInfoCoreDir = Path(tdirs['bioinfoCoreDir']) + seqFacDir = Path(tdirs['seqFacDir']) / f"Sequence_Quality_{yrstr}" / f"Illumina_{yrstr}" / outLane + seqFacDir.mkdir(parents=True, exist_ok=True) + for _mq in outPath.glob("*/*multiqc_report.html"): + sout = seqFacDir / _mq.name + bout = BioInfoCoreDir / f"{outLane}_{_mq.name}" + logging.info(f"fakenews - sedMqcReports - seqfac: {sout}") + logging.info(f"fakenews - sedMqcReports - bioinfo: {bout}") + shutil.copyfile(_mq, sout) + shutil.copyfile(_mq, bout) + +def matchOptdupsReqs(optDups, ssdf): + ''' + Takes a nested list (optDups) with: + [ + project, + sampleID, + sampleName, + optical_dups, + ] + Matches sampleID with ssdf, and gets gotten / req reads. + returns a nested list including req/got & got + this list is sorted by sampleID. + ''' + _optDups = [] + for lis in optDups: + sampleID = lis[1] + sampleName = lis[2] + req = ssdf[ + ssdf['Sample_ID'] == sampleID + ]['reqDepth'].values + got = ssdf[ + ssdf['Sample_ID'] == sampleID + ]['gotDepth'].values + reqvgot = float(got/req) + # isnull if sample is omitted from demuxsheet but in parkour. + if pd.isnull(got): + _optDups.append( + [ + lis[0], + sampleID, + sampleName, + lis[3], + 0, + 0 + ] # fill in zeroes + ) else: - # check index1, set index2 to na - if autodf.loc[pdIx, 'index'].values != index: - logging.info("Changing P7 {} to {} for {}".format( - autodf.loc[pdIx, 'index'].values, - index, - sample_ID - )) - autodf.loc[pdIx, 'index'] = index - # change type as well! 
- autodf.loc[pdIx, 'I7_Index_ID'] = np.nan - # it's not dualIx, so set index2/I5_Index_ID to nan. - if 'index2' in list(autodf.columns): - autodf.loc[pdIx, 'index2'] = np.nan - if 'I5_Index_ID' in list(autodf.columns): - autodf.loc[pdIx, 'I5_Index_ID'] = np.nan - return (autodf) + _optDups.append( + [ + lis[0], + sampleID, + sampleName, + lis[3], + round(reqvgot, 2), + int(got) + ] + ) + return (sorted(_optDups, key=lambda x: x[1])) \ No newline at end of file diff --git a/src/dissectBCL/postmux.py b/src/dissectBCL/postmux.py index f9ca04f..6c6c457 100644 --- a/src/dissectBCL/postmux.py +++ b/src/dissectBCL/postmux.py @@ -1,16 +1,16 @@ +from dissectBCL.fakeNews import mailHome +from dissectBCL.misc import krakenfqs +from dissectBCL.misc import multiQC_yaml +import hashlib +import logging import os -import glob -from dissectBCL.fakeNews import multiQC_yaml, mailHome -from dissectBCL.misc import krakenfqs, moveOptDup, validateFqEnds -from pathlib import Path -import re -import shutil -import sys from multiprocessing import Pool from pandas import isna -from subprocess import Popen, DEVNULL import ruamel.yaml -import logging +import re +import shutil +from subprocess import Popen, DEVNULL +import sys def matchIDtoName(ID, ssdf): @@ -48,36 +48,28 @@ def matchIDtoName(ID, ssdf): def renamefq(fqFile, projectFolder, ssdf, laneSplitStatus): - oldName = fqFile.split('/')[-1] + oldName = fqFile.name + # 24L002006_S63_L001_R2_001.fastq.gz -> 24L002006 sampleID = oldName.split('_')[0] + # 24L002006 -> sample_name.txt sampleName = matchIDtoName(sampleID, ssdf) - sampleIDPath = os.path.join( - projectFolder, - "Sample_" + sampleID - ) - if not os.path.exists(sampleIDPath): - os.mkdir(sampleIDPath) + sampleIDPath = projectFolder / f"Sample_{sampleID}" + sampleIDPath.mkdir(exist_ok = True) + # Create new name if laneSplitStatus: newName = oldName.replace(sampleID, sampleName) regstr = r"_S[0-9]?[0-9]?[0-9]?[0-9]_" regstr += r"+L[0-9][0-9][0-9]_+([IR][123])+_[0-9][0-9][0-9]" - newName = re.sub( - regstr, - r'_\1', - newName - ) + newName = re.sub(regstr, r'_\1', newName) newName.replace(sampleID, sampleName) else: newName = oldName.replace(sampleID, sampleName) regstr = r"_S[0-9]?[0-9]?[0-9]?[0-9]_" regstr += r"+([IR][123])+_[0-9][0-9][0-9]" - newName = re.sub( - regstr, - r'_\1', - newName - ) - return os.path.join(sampleIDPath, newName) + newName = re.sub(regstr, r'_\1', newName) + logging.debug(f"Postmux - rename - suggesting {oldName} into {newName}") + return sampleIDPath / newName def renameProject(projectFolder, ssdf, laneSplitStatus): @@ -86,23 +78,47 @@ def renameProject(projectFolder, ssdf, laneSplitStatus): rename project folder from e.g. 1906_Hein_B03_Hein -> Project_1906_Hein_B03_Hein """ - logging.info("Renaming {}".format(projectFolder)) - for fq in glob.glob( - os.path.join( - projectFolder, - '*fastq.gz' - ) - ): + + logging.info(f"Postmux - Renaming {projectFolder}") + for fq in projectFolder.glob('*fastq.gz'): newName = renamefq(fq, projectFolder, ssdf, laneSplitStatus) shutil.move(fq, newName) + # Finally rename the project folder. - projID = projectFolder.split('/')[-1] shutil.move( projectFolder, - projectFolder.replace(projID, "Project_" + projID) + projectFolder.with_stem("Project_" + projectFolder.stem) ) +def validateFqEnds(pdir, flowcell): + """ + recursively looks for fastq.gz files, + validates the ending (e.g. 
R1, R2, I1, I2)
+    ignores 'Undetermined'
+
+    """
+    malformat = []
+    for f in pdir.rglob('*fastq.gz'):
+        if 'Undetermined' not in f.name:
+            e = f.name.split('.')[0]
+            if e[-2:] not in [
+                'R1', 'R2', 'I1', 'I2'
+            ]:
+                malformat.append(e)
+    if not malformat:
+        logging.info(f"Postmux - all fastq files in {pdir} have a proper ending.")
+    else:
+        _msg = f"Improper fastq file names: {malformat}"
+        logging.critical(_msg)
+        mailHome(
+            flowcell.name,
+            _msg,
+            flowcell.config
+        )
+        sys.exit(1)
+
+
 def fqcRunner(cmd):
     cmds = cmd.split(" ")
     qcRun = Popen(cmds, stdout=DEVNULL, stderr=DEVNULL)
@@ -112,78 +128,48 @@ def fqcRunner(cmd):
 
 def qcs(project, laneFolder, sampleIDs, config):
     # make fastqc folder.
-    fqcFolder = os.path.join(
-        laneFolder,
-        "FASTQC_Project_" + project
-    )
-    if not os.path.exists(fqcFolder):
-        os.mkdir(fqcFolder)
+    fqcFolder = laneFolder / f"FASTQC_Project_{project}"
+    fqcFolder.mkdir(exist_ok=True)
     fastqcCmds = []
     for ID in sampleIDs:
         # Colliding samples are omitted, and don't have a folder.
-        if not os.path.exists(
-            os.path.join(
-                laneFolder,
-                'Project_' + project,
-                'Sample_' + ID
-            )
-        ):
+        fqFolder = laneFolder / f"Project_{project}" / f"Sample_{ID}"
+        if not fqFolder.exists():
             continue
-        else:
-            IDfolder = os.path.join(
-                laneFolder,
-                "FASTQC_Project_" + project,
-                "Sample_" + ID
+        IDFolder = fqcFolder / f"Sample_{ID}"
+        IDFolder.mkdir(exist_ok=True)
+        fqFiles = [str(i) for i in fqFolder.glob("*fastq.gz")]
+        # Don't do double work.
+        if len(list(IDFolder.glob("*zip"))) == 0:
+            fastqcCmds.append(
+                " ".join([
+                    'fastqc',
+                    '-a',
+                    config['software']['fastqc_adapters'],
+                    '-q',
+                    '-t',
+                    str(len(fqFiles)),
+                    '-o',
+                    str(IDFolder)
+                ] + fqFiles)
             )
-            if not os.path.exists(IDfolder):
-                os.mkdir(IDfolder)
-            fqFiles = glob.glob(
-                os.path.join(
-                    laneFolder,
-                    "Project_" + project,
-                    "Sample_" + ID,
-                    '*fastq.gz'
-                )
-            )
-            # Don't do double work.
-            if len(glob.glob(
-                os.path.join(IDfolder, "*zip")
-            )) == 0:
-                fastqcCmds.append(
-                    " ".join([
-                        'fastqc',
-                        '-a',
-                        config['software']['fastqc_adapters'],
-                        '-q',
-                        '-t',
-                        str(len(fqFiles)),
-                        '-o',
-                        IDfolder
-                    ] + fqFiles)
-                )
     if fastqcCmds:
-        logging.info(
-            "fastqc example for {} - {}".format(
-                project, fastqcCmds[0]
-            )
-        )
+        logging.info(f"Postmux - FastQC - command example: {project} - {fastqcCmds[0]}")
         with Pool(20) as p:
             fqcReturns = p.map(fqcRunner, fastqcCmds)
         if fqcReturns.count(0) == len(fqcReturns):
-            logging.info("FastQC done for {}.".format(project))
+            logging.info(f"Postmux - FastQC done for {project}.")
         else:
-            logging.critical(
-                "FastQC runs failed for {}. exiting.".format(project)
-            )
+            logging.critical(f"Postmux - FastQC crashed for {project}. Exiting.")
             mailHome(
                 laneFolder,
-                "FastQC runs failed. 
Investigate.",
+                f"FastQC runs failed for project {project}.",
                 config,
                 toCore=True
             )
             sys.exit(1)
     else:
-        logging.info("FastQCs already done for {}".format(project))
+        logging.info(f"Postmux - Seems all FastQCs are already done for {project}")
 
 
 def clmpRunner(cmd):
@@ -204,7 +190,6 @@ def clmpRunner(cmd):
 
 def clumper(project, laneFolder, sampleIDs, config, PE, sequencer):
-    logging.info("Clump for {}".format(project))
     clmpOpts = {
         'general': [
             'out=tmp.fq.gz',
@@ -226,86 +211,62 @@ def clumper(project, laneFolder, sampleIDs, config, PE, sequencer):
     clmpCmds = []
     if sequencer != 'MiSeq':
         for ID in sampleIDs:
-            sampleDir = os.path.join(
-                laneFolder,
-                "Project_" + project,
-                "Sample_" + ID
-            )
-            if os.path.exists(sampleDir):
-                if len(glob.glob(
-                    os.path.join(sampleDir, '*optical_duplicates.fastq.gz')
-                )) == 0:
-                    fqFiles = glob.glob(
-                        os.path.join(
-                            sampleDir,
-                            "*fastq.gz"
-                        )
-                    )
-                    if len(fqFiles) < 3:
-                        if PE and len(fqFiles) == 2:
-                            for i in fqFiles:
-                                if '_R1.fastq.gz' in i:
-                                    in1 = "in=" + i
-                                    baseName = i.split('/')[-1].replace(
-                                        "_R1.fastq.gz",
-                                        ""
-                                    )
-                                elif '_R2.fastq.gz' in i:
-                                    in2 = "in2=" + i
-                            clmpCmds.append(
-                                'clumpify.sh' + " " +
-                                in1 + " " +
-                                in2 + " " +
-                                " ".join(clmpOpts['general']) + " " +
-                                " ".join(clmpOpts[sequencer]) + " " +
-                                sampleDir + " " +
-                                "1" + " " +
-                                baseName
-                            )
-                        elif not PE and len(fqFiles) == 1:
-                            if '_R1.fastq.gz' in fqFiles[0]:
-                                in1 = "in=" + fqFiles[0]
-                                baseName = fqFiles[0].split('/')[-1].replace(
-                                    "_R1.fastq.gz",
-                                    ""
-                                )
-                                clmpCmds.append(
-                                    'clumpify.sh' + " " +
-                                    in1 + " " +
-                                    " ".join(clmpOpts['general']) + " " +
-                                    " ".join(clmpOpts[sequencer]) + " " +
-                                    sampleDir + " " +
-                                    "0" + " " +
-                                    baseName
-                                )
-                        else:
-                            logging.info("Not clumping {}".format(ID))
+            sampleDir = laneFolder / f"Project_{project}" / f"Sample_{ID}"
+            if sampleDir.exists() and len(list(sampleDir.glob("*optical_duplicates*"))) == 0:
+                fqFiles = list(sampleDir.glob("*fastq.gz"))
+                if len(fqFiles) < 3:
+                    if PE and len(fqFiles) == 2:
+                        for i in fqFiles:
+                            if '_R1.fastq.gz' in str(i):
+                                in1 = "in=" + str(i)
+                                baseName = i.name.replace('_R1.fastq.gz', '')
+                            elif '_R2.fastq.gz' in str(i):
+                                in2 = "in2=" + str(i)
+                        clmpCmds.append(
+                            'clumpify.sh' + " " +
+                            in1 + " " +
+                            in2 + " " +
+                            " ".join(clmpOpts['general']) + " " +
+                            " ".join(clmpOpts[sequencer]) + " " +
+                            str(sampleDir) + " " +
+                            "1" + " " +
+                            baseName
+                        )
+                    elif not PE and len(fqFiles) == 1:
+                        if '_R1.fastq.gz' in str(fqFiles[0]):
+                            in1 = "in=" + str(fqFiles[0])
+                            baseName = fqFiles[0].name.replace('_R1.fastq.gz', '')
+                            clmpCmds.append(
+                                'clumpify.sh' + " " +
+                                in1 + " " +
+                                " ".join(clmpOpts['general']) + " " +
+                                " ".join(clmpOpts[sequencer]) + " " +
+                                str(sampleDir) + " " +
+                                "0" + " " +
+                                baseName
+                            )
+                    else:
+                        logging.info(f"Not clumping {ID}")
     if clmpCmds:
-        logging.info(
-            "clump example for {} - {}".format(
-                project,
-                clmpCmds[0]
-            )
-        )
+        logging.info(f"Postmux - Clump - command example: {project} - {clmpCmds[0]}")
         with Pool(5) as p:
             clmpReturns = p.map(clmpRunner, clmpCmds)
         if clmpReturns.count((0, 0)) == len(clmpReturns):
-            logging.info("Clumping done for {}.".format(project))
+            logging.info(f"Postmux - Clumping done for {project}.")
         else:
-            logging.critical(
-                "Clumping failed for {}. exiting.".format(project)
-            )
+            logging.critical(f"Postmux - Clumping failed for {project}. Exiting.")
             mailHome(
                 laneFolder,
-                "Clump runs failed. 
Investigate.", + f"Clump runs failed for {project}.", config, toCore=True ) sys.exit(1) else: - logging.info("No clump run for {}".format(project)) + logging.info(f"Postmux - Clump - No clump run for {project}") else: - logging.info("No clump for MiSeq.") + logging.info("Postmux - Clump - no clumping for MiSeq.") def krakRunner(cmd): @@ -316,118 +277,87 @@ def krakRunner(cmd): def kraken(project, laneFolder, sampleIDs, config): - logging.info("Kraken for {}".format(project)) krakenCmds = [] for ID in sampleIDs: - IDfolder = os.path.join( - laneFolder, - "FASTQC_Project_" + project, - "Sample_" + ID - ) - if os.path.exists(IDfolder): - if len(glob.glob( - os.path.join( - IDfolder, - '*.rep' - ) - )) == 0: - sampleFolder = os.path.join( - laneFolder, - "Project_" + project, - "Sample_" + ID - ) - reportname, fqs = krakenfqs(sampleFolder) - krakenCmds.append( - ' '.join([ - 'kraken2', - '--db', - config['software']['kraken2db'], - '--out', - '-', - '--threads', - '4', - '--report', - reportname - ] + fqs) - ) - if krakenCmds: - logging.info( - "kraken example for {} - {}".format( - project, - krakenCmds[0] + IDfolder = laneFolder / f"FASTQC_Project_{project}" / f"Sample_{ID}" + if IDfolder.exists() and len(list(IDfolder.glob("*.rep"))) == 0: + sampleFolder = laneFolder / f"Project_{project}" / f"Sample_{ID}" + reportname, fqs = krakenfqs(sampleFolder) + krakenCmds.append( + ' '.join([ + 'kraken2', + '--db', + config['software']['kraken2db'], + '--out', + '-', + '--threads', + '4', + '--report', + reportname + ] + fqs) ) - ) + if krakenCmds: + logging.info(f"Postmux - Kraken - command example: {project} - {krakenCmds[0]}") with Pool(10) as p: screenReturns = p.map(krakRunner, krakenCmds) if screenReturns.count(0) == len(screenReturns): - logging.info("kraken ran {}.".format(project)) + logging.info(f"Postmux - Kraken done for {project}.") else: - logging.critical( - "kraken failed for {}. exiting.".format(project) - ) + logging.critical(f"Postmux - Kraken failed for {project}. Exiting") mailHome( laneFolder, - "kraken runs failed. Investigate.", + f"Kraken runs failed for {project}.", config, toCore=True ) sys.exit(1) else: - logging.info( - "kraken files already present. 
Skipping {}".format(project) - ) + logging.info(f"Postmux - Kraken - No kraken run for {project}") -def multiqc(project, laneFolder, config, flowcell, sampleSheet): - logging.info("multiqc for {}".format(project)) - outLane = laneFolder.split('/')[-1] - QCFolder = os.path.join( - laneFolder, - 'FASTQC_Project_' + project - ) - projectFolder = os.path.join( - laneFolder, - "Project_" + project - ) +def md5Runner(fqfile): + return (fqfile.name, hashlib.md5(open(fqfile, 'rb').read()).hexdigest()) + + +def moveOptDup(laneFolder): + for txt in laneFolder.glob('*/*/*duplicate.txt'): + # Field -3 == project folder + # escape those already in a fastqc folder (reruns) + if 'FASTQC' not in str(txt): + pathLis = str(txt).split('/') + pathLis[-3] = 'FASTQC_' + pathLis[-3] + ofile = "/".join(pathLis) + os.rename(txt, ofile) + + +def md5_multiqc(project, laneFolder, flowcell): + QCFolder = laneFolder / f"FASTQC_Project_{project}" + projectFolder = laneFolder / f"Project_{project}" + # md5sums - md5CmdStr = \ - r"md5sum {} | {} | {} | {} > {}".format( - projectFolder + '/*/*fastq.gz', - r"sed 's/ //g'", - r"cut -d '/' -f1,8", - r"sed 's/\//\t/g'", - os.path.join(projectFolder, 'md5sums.txt') - ) - if not os.path.exists( - os.path.join(projectFolder, 'md5sums.txt') - ): - os.system(md5CmdStr) + logging.info(f"Postmux - md5sums - {project}") + md5out = projectFolder / 'md5sums.txt' + + if not md5out.exists(): + with Pool(20) as p: + _m5sums = p.map(md5Runner, list(projectFolder.glob("*/*fastq.gz"))) + with open(md5out, 'w') as f: + for _m5sum in sorted(_m5sums, key=lambda x:x[0]): + f.write(f"{_m5sum[0]}\t{_m5sum[1]}\n") + # Always overwrite the multiQC reports. RunTimes are marginal anyway. mqcConf, mqcData, seqrepData, indexreportData = multiQC_yaml( - config, flowcell, - sampleSheet.ssDic[outLane], project, laneFolder ) + yaml = ruamel.yaml.YAML() yaml.indent(mapping=2, sequence=4, offset=2) - confOut = os.path.join( - projectFolder, - 'mqc.yaml' - ) - dataOut = os.path.join( - QCFolder, - 'parkour_mqc.tsv' - ) - seqrepOut = os.path.join( - QCFolder, - 'Sequencing_Report_mqc.tsv' - ) - indexrepOut = os.path.join( - QCFolder, - 'Index_Info_mqc.tsv' - ) + confOut = projectFolder / 'mqc.yaml' + dataOut = QCFolder / 'parkour_mqc.tsv' + seqrepOut = QCFolder / 'Sequencing_Report_mqc.tsv' + indexrepOut = QCFolder / 'Index_Info_mqc.tsv' with open(confOut, 'w') as f: yaml.dump(mqcConf, f) with open(seqrepOut, 'w') as f: @@ -450,113 +380,17 @@ def multiqc(project, laneFolder, config, flowcell, sampleSheet): multiqcRun = Popen(multiqcCmd, stdout=DEVNULL, stderr=DEVNULL) exitcode = multiqcRun.wait() if exitcode == 0: - logging.info('multiqc ran for {}'.format(project)) + logging.info(f"Postmux - multiqc done for {project}") os.remove(confOut) os.remove(dataOut) os.remove(seqrepOut) os.remove(indexrepOut) else: - logging.critical("multiqc failed for {}".format(project)) + logging.critical(f"Postmux - multiqc failed for {project}") mailHome( laneFolder, - "multiQC runs failed. Investigate.", - config, + f"multiQC runs failed for {project}.", + flowcell.config, toCore=True ) - sys.exit(1) - return exitcode - - -def postmux(flowcell, sampleSheet, config): - logging.warning("Postmux module") - for outLane in sampleSheet.ssDic: - laneFolder = os.path.join(flowcell.outBaseDir, outLane) - # Don't rename if renamed.done flag is there. 
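# Looking back at md5Runner above: open(fqfile, 'rb').read() holds an entire
# fastq.gz in memory per pool worker, and with Pool(20) that can add up on
# large lanes. A hedged drop-in alternative with the same (name, hexdigest)
# return contract, digesting in fixed-size blocks:
import hashlib
from pathlib import Path

def md5_chunked(fqfile: Path, blocksize: int = 1024 * 1024):
    h = hashlib.md5()
    with open(fqfile, 'rb') as fh:
        for block in iter(lambda: fh.read(blocksize), b''):
            h.update(block)
    return (fqfile.name, h.hexdigest())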
- renameFlag = os.path.join( - laneFolder, - 'renamed.done' - ) - postmuxFlag = os.path.join( - laneFolder, - 'postmux.done' - ) - df = sampleSheet.ssDic[outLane]['sampleSheet'] - projects = list(df['Sample_Project'].unique()) - # Renaming module - if not os.path.exists(renameFlag): - logging.info("Renaming {}".format(outLane)) - for project in projects: - renameProject( - os.path.join( - flowcell.outBaseDir, - outLane, - project - ), - df, - sampleSheet.laneSplitStatus - ) - # Sanity check for file endings. - fqEnds = validateFqEnds(laneFolder) - if not fqEnds: - logging.info("All fastq files have proper extension.") - else: - logging.critical( - "some fastq files aren't properly renamed: {}".format( - fqEnds - ) - ) - sys.exit( - "some fastq files aren't properly renamed: {}".format( - fqEnds - ) - ) - Path( - os.path.join(laneFolder, 'renamed.done') - ).touch() - # postMux module - if not os.path.exists(postmuxFlag): - logging.info("FastQC pool {}".format(outLane)) - for project in projects: - # FastQC - qcs( - project, - laneFolder, - set( - df[df['Sample_Project'] == project]['Sample_ID'] - ), - config - ) - # clump - clumper( - project, - laneFolder, - set( - df[df['Sample_Project'] == project]['Sample_ID'] - ), - config, - sampleSheet.ssDic[outLane]['PE'], - flowcell.sequencer - ) - # kraken - kraken( - project, - laneFolder, - set( - df[df['Sample_Project'] == project]['Sample_ID'] - ), - config - ) - # multiQC - multiqc( - project, - laneFolder, - config, - flowcell, - sampleSheet - ) - logging.info("Moving optical dup txt into FASTQC folder") - moveOptDup(laneFolder) - Path( - os.path.join(laneFolder, 'postmux.done') - ).touch() - return (0) + sys.exit(1) \ No newline at end of file diff --git a/src/wd40/release.py b/src/wd40/release.py index 9873a38..8d226d3 100644 --- a/src/wd40/release.py +++ b/src/wd40/release.py @@ -24,7 +24,8 @@ def fetchLatestSeqDir(pref, PI, postfix): return os.path.join(pref, PI, postfix + str(maxFolder)) -def fetchFolders(flowcellPath, piList, prefix, postfix): +def fetchFolders(flowcellPath, piList, prefix, postfix, fexBool, parkourVars): + parkourURL, parkourAuth, parkourCert, fromAddress = parkourVars institute_PIs = piList flowcellPath = os.path.abspath(flowcellPath) FID = flowcellPath.split("/")[-1] @@ -64,7 +65,32 @@ def fetchFolders(flowcellPath, piList, prefix, postfix): ) ) else: - print("[bold cyan]Ignoring {}.[/bold cyan]".format(proj)) + print(f"Assuming {proj} is fex'ed.") + fexList = ( + check_output(["fexsend", "-l", fromAddress]) + .decode("utf-8") + .replace("\n", " ") + .split(" ") + ) + + tarBall = FID + "_" + proj + ".tar" + if tarBall in fexList: + if fexBool: + d = {"data": tarBall, "metadata": None} + print( + f"{tarBall} found in fexlist. Added filepaths to Parkour2: ", + requests.post( + parkourURL + + "/api/requests/" + + proj.split("_")[1] + + "/put_filepaths/", + auth=parkourAuth, + data=d, + verify=parkourCert, + ), + ) + else: + print(f"{tarBall} not found in fex. 
Parkour not updated.") return projDic @@ -136,7 +162,10 @@ def rel( fexBool, fromAddress, ): - projDic = fetchFolders(flowcellPath, piList, prefix, postfix) + projDic = fetchFolders( + flowcellPath, piList, prefix, postfix, fexBool, + (parkourURL, parkourAuth, parkourCert, fromAddress) + ) print("Print number of changed/(changed+unchanged)!") for proj in projDic: """ @@ -168,20 +197,6 @@ def rel( "data": projDic[proj][1][1], "metadata": projDic[proj][1][1] + "/multiqc_report.html", } - elif fexBool: - fexList = ( - check_output(["fexsend", "-l", fromAddress]) - .decode("utf-8") - .replace("\n", " ") - .split(" ") - ) - tar_lane, tar_proj = projDic[proj][1][1].split("/")[-2:] - tarBall = tar_lane + "_" + tar_proj + ".tar" - if tarBall in fexList: - d = {"data": tarBall, "metadata": None} - else: - print("fexLink: ", tarBall, " not found!") - if d: print( "Adding filepaths to Parkour2:", requests.post( @@ -194,3 +209,5 @@ def rel( verify=parkourCert, ), ) # print the returned answer from the API + else: + print(f"{PI} not in piList for {proj}.")
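# A closing, hedged note on the fexsend listings parsed here and in
# misc.fexUpload: decode().replace("\n", " ").split(' ') leaves empty strings
# in the list wherever separators touch, which is harmless for the exact
# membership test but brittle for anything else. Whitespace splitting is the
# sturdier parse; this assumes only that 'fexsend -l <from>' prints one token
# per uploaded tarball somewhere in its output, as the code above relies on.
from subprocess import check_output

def fex_listing(from_address):
    raw = check_output(['fexsend', '-l', from_address]).decode('utf-8')
    return set(raw.split())  # split() drops empty tokens and newlines

# usage: if f"{FID}_{proj}.tar" in fex_listing(fromAddress): ...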