diff --git a/docs/CHANGELOG.rst b/docs/CHANGELOG.rst index 5023f51a0..30388d42c 100644 --- a/docs/CHANGELOG.rst +++ b/docs/CHANGELOG.rst @@ -15,6 +15,7 @@ Added ----- - The (pre)release is built and pushed to the ``PyPI`` automatically when tag is pushed to the repository +- Add ``pca-beta`` process Changed ------- @@ -23,6 +24,8 @@ Changed compatible with the new annotation model - Use GRCh38.109 genome annotation version in snpEff processes - Remove sample annotation functionality from ``geo-import`` process +- Deprecate ``pca.py`` in tools and transfer the code to the new + ``pca-beta`` process Fixed ----- diff --git a/resolwe_bio/processes/clustering/pca.py b/resolwe_bio/processes/clustering/pca.py new file mode 100644 index 000000000..69aa51cc9 --- /dev/null +++ b/resolwe_bio/processes/clustering/pca.py @@ -0,0 +1,313 @@ +"""Principal component analysis process.""" + +import json + +import numpy as np +import pandas as pd +from sklearn.decomposition import PCA as plotPCA +from sklearn.preprocessing import StandardScaler + +from resolwe.process import ( + BooleanField, + DataField, + JsonField, + ListField, + Persistence, + Process, + SchedulingClass, + StringField, +) + + +def component_top_factors(component, allgenes_array, max_size=20): + """Return top 20 absolute factors.""" + abs_component = np.abs(component) + size = min(component.size, max_size) + unordered_ixs = np.argpartition(abs_component, -size)[-size:] + ixs = unordered_ixs[np.argsort(abs_component[unordered_ixs])[::-1]] + if ixs.size == 0: + return [] + return list(zip(np.array(allgenes_array)[ixs].tolist(), component[ixs].tolist())) + + +def get_pca(expressions=pd.DataFrame(), gene_labels=[], error=None): + """Compute PCA.""" + if not gene_labels: + gene_labels = expressions.index + skipped_gene_labels = list(set(gene_labels).difference(expressions.index)) + + pca = plotPCA(n_components=2, whiten=True) + pca_expressions = pca.fit_transform(expressions.transpose()) + + if expressions.shape[0] < 2: + error("At least two genes are required for PCA analysis.") + if expressions.shape[1] < 2: + error("At least two samples are required for PCA analysis.") + + coordinates = [ + t[:2].tolist() if len(t) > 1 else [t[0], 0.0] for t in pca_expressions + ] + all_components = [ + component_top_factors(component, gene_labels) for component in pca.components_ + ] + if np.isnan(pca.explained_variance_ratio_).any(): + all_explained_variance_ratios = [0.0 for _ in pca.explained_variance_ratio_] + else: + all_explained_variance_ratios = pca.explained_variance_ratio_.tolist() + + result = { + "coordinates": coordinates, + "all_components": all_components, + "all_explained_variance_ratios": all_explained_variance_ratios, + "skipped_gene_labels": skipped_gene_labels, + "warning": None, + } + + return result + + +def save_pca(result={}, sample_ids=[], max_size=10): + """Save PCA.""" + output_fn = "pca.json" + data = { + "flot": { + "data": result["coordinates"], + "xlabel": "PC 1", + "ylabel": "PC 2", + "sample_ids": sample_ids, + }, + "zero_gene_symbols": result["skipped_gene_labels"], + "components": result["all_components"][:max_size], + "all_components": result["all_components"], + "explained_variance_ratios": result["all_explained_variance_ratios"][:max_size], + "all_explained_variance_ratios": result["all_explained_variance_ratios"], + } + + if output_fn: + with open(output_fn, "w") as outfile: + json.dump(data, outfile, separators=(",", ":"), allow_nan=False) + else: + print(json.dumps(data, separators=(",", ":"), allow_nan=False)) + + +def read_csv(fname): + """Read CSV file and return Pandas DataFrame.""" + csv = pd.read_csv( + filepath_or_buffer=fname, + sep="\t", + header=0, + index_col=0, + dtype={ + 0: str, + 1: float, + }, + keep_default_na=False, + ) + csv.index = csv.index.map(str) + return csv + + +def transform(expressions, log2=True, const=1.0, z_score=True, error=None): + """Compute log2 and normalize expression values. + + Parameters: + - log2: use log2(x+const) transformation + - const: an additive constant used in computation of log2 + - z_score: use Z-score normalization using StandardScaler + """ + if log2: + expressions = expressions.applymap(lambda x: np.log2(x + const)) + if expressions.isnull().values.any(): + error("Cannot apply log2 to expression values.") + if z_score: + scaler = StandardScaler() + expressions_arr = scaler.fit_transform(expressions) + expressions = pd.DataFrame(expressions_arr, index=expressions.index) + + return expressions + + +def get_csv(fnames): + """Read CSV files, perform transformations and return Pandas DataFrame.""" + expressions = [read_csv(fname) for fname in fnames] + return pd.concat(expressions, axis=1, join="inner") + + +def run_pca( + exp_paths, + rc_paths, + sample_ids, + gene_labels, + log2_transform, + standard_scaler, + error, +): + """Read data, run PCA, and output results.""" + expressions = get_csv(exp_paths) + expressions = transform( + error=error, + expressions=expressions, + log2=log2_transform, + z_score=standard_scaler, + ) + + # Filter out genes with low expression values + if rc_paths: + read_counts = get_csv(rc_paths) + percentile_80 = np.percentile(read_counts, 80, axis=1) + retained_genes = percentile_80 > 100 + expressions = expressions.loc[retained_genes] + + result = get_pca(expressions, gene_labels, error) + save_pca(result, sample_ids) + + +class PCA(Process): + """Principal component analysis process.""" + + slug = "pca-beta" + name = "PCA" + requirements = { + "expression-engine": "jinja", + "executor": { + "docker": { + "image": "public.ecr.aws/genialis/resolwebio/rnaseq:6.0.0", + }, + }, + "resources": { + "cores": 1, + "memory": 4096, + "storage": 10, + }, + } + data_name = "{{ alignment|name|default('?') }}" + version = "1.0.0" + process_type = "data:pca" + category = "Enrichment and Clustering" + entity = { + "type": "sample", + "input": "alignment", + } + scheduling_class = SchedulingClass.INTERACTIVE + persistence = Persistence.TEMP + + description = "Principal component analysis (PCA)" + + class Input: + """Input fields.""" + + exps = ListField(DataField(data_type="expression"), label="Expressions") + genes = ListField(StringField(), label="Gene subset", required=False) + source = StringField( + label="Gene ID database of selected genes", + description="This field is required if gene subset is set.", + required=False, + ) + species = StringField( + label="Species", + description="Species latin name. This field is required if gene subset is set.", + choices=[ + ("Homo sapiens", "Homo sapiens"), + ("Mus musculus", "Mus musculus"), + ("Rattus norvegicus", "Rattus norvegicus"), + ("Dictyostelium discoideum", "Dictyostelium discoideum"), + ("Odocoileus virginianus texanus", "Odocoileus virginianus texanus"), + ("Solanum tuberosum", "Solanum tuberosum"), + ], + required=False, + ) + low_expression_filter = BooleanField( + label="Low expression filter", + default=True, + description="Filter the input expression matrix to remove genes " + "with low expression values. Only genes with 80th percentile above " + "read count of 100 are retained.", + ) + log2 = BooleanField( + label="Log-transform expressions", + default=True, + description="Transform expressions with log2(x + 1) before PCA.", + ) + standard_scaler = BooleanField( + label="Transform input data using StandardScaler", + default=True, + description="Apply the StandardScaler transformation before PCA.", + ) + + class Output: + """Output fields.""" + + pca = JsonField(label="PCA") + + def run(self, inputs, outputs): + """Run the process.""" + + expressions = inputs.exps + + if inputs.genes and not (inputs.source and inputs.species): + self.error( + "Gene ID database and Species must be provided if gene subset is selected." + ) + + for exp in expressions: + if exp.output.source != expressions[0].output.source: + self.error( + "Input samples are of different Gene ID databases: " + f"{exp.output.source} and {expressions[0].output.source}." + ) + if exp.output.species != expressions[0].output.species: + self.error( + "Input samples are of different Species: " + f"{exp.output.species} and {expressions[0].output.species}." + ) + if exp.output.build != expressions[0].output.build: + self.error( + "Input samples are of different Build: " + f"{exp.output.build} and {expressions[0].output.build}." + ) + if exp.output.feature_type != expressions[0].output.feature_type: + self.error( + "Input samples are of different Feature type: " + f"{exp.output.feature_type} and {expressions[0].output.feature_type}." + ) + + if inputs.genes: + if exp.output.source != inputs.source: + self.error( + "Selected genes must be annotated by the same genome database as all expression files." + f"Gene IDs are from {inputs.source} database while sample {exp.name} has gene IDs from {exp.output.source} database." + ) + if exp.output.species != inputs.species: + self.error( + "Selected genes must be from the same species as all expression files." + f"Selected genes are from {inputs.species}, while expression {exp.name} is from {exp.output.species}." + ) + + expression_paths = [exp.output.exp.path for exp in expressions] + sample_ids = [exp.entity.id for exp in expressions] + + if inputs.low_expression_filter: + rc_paths = [exp.output.rc.path for exp in expressions] + else: + rc_paths = None + + if len(rc_paths) != len(expression_paths): + self.warning( + "Cannot apply low expression filter to input samples. Check if all samples have raw counts available." + ) + rc_paths = None + + try: + run_pca( + exp_paths=expression_paths, + rc_paths=rc_paths, + sample_ids=sample_ids, + gene_labels=inputs.genes, + log2_transform=inputs.log2, + standard_scaler=inputs.standard_scaler, + error=self.error, + ) + except Exception as e: + self.error(f"PCA computation failed: {e}") + + outputs.pca = "pca.json" diff --git a/resolwe_bio/tests/files/pca_plot_beta.json.gz b/resolwe_bio/tests/files/pca_plot_beta.json.gz new file mode 100644 index 000000000..85a9e30d2 Binary files /dev/null and b/resolwe_bio/tests/files/pca_plot_beta.json.gz differ diff --git a/resolwe_bio/tests/files/pca_plot_beta_2.json.gz b/resolwe_bio/tests/files/pca_plot_beta_2.json.gz new file mode 100644 index 000000000..ff03a5cca Binary files /dev/null and b/resolwe_bio/tests/files/pca_plot_beta_2.json.gz differ diff --git a/resolwe_bio/tests/processes/test_pca.py b/resolwe_bio/tests/processes/test_pca.py index bd22baa02..188e86ea9 100644 --- a/resolwe_bio/tests/processes/test_pca.py +++ b/resolwe_bio/tests/processes/test_pca.py @@ -4,6 +4,63 @@ class PcaProcessorTestCase(KBBioProcessTestCase): + @with_resolwe_host + @tag_process("pca-beta") + def test_beta_pca(self): + with self.preparation_stage(): + expression_1 = self.prepare_expression( + f_rc="exp_1_rc.tab.gz", + f_exp="exp_1_tpm.tab.gz", + f_type="TPM", + source="DICTYBASE", + species="Dictyostelium discoideum", + ) + expression_2 = self.prepare_expression( + f_rc="exp_2_rc.tab.gz", + f_exp="exp_2_tpm.tab.gz", + f_type="TPM", + source="DICTYBASE", + species="Dictyostelium discoideum", + ) + + inputs = { + "exps": [expression_1.pk, expression_2.pk], + "source": "DICTYBASE", + "species": "Dictyostelium discoideum", + } + pca = self.run_process("pca-beta", inputs) + saved_json, test_json = self.get_json( + "pca_plot_beta.json.gz", pca.output["pca"] + ) + self.assertAlmostEqualGeneric( + test_json["flot"]["data"], saved_json["flot"]["data"] + ) + self.assertAlmostEqualGeneric( + test_json["explained_variance_ratios"], + saved_json["explained_variance_ratios"], + ) + self.assertAlmostEqualGeneric(test_json["components"], saved_json["components"]) + self.assertEqual(len(pca.process_warning), 0) + + inputs["genes"] = [ + "DPU_G0067106", + "DPU_G0067102", + "DPU_G0067112", + ] # all non-zero + pca = self.run_process("pca-beta", inputs) + saved_json, test_json = self.get_json( + "pca_plot_beta_2.json.gz", pca.output["pca"] + ) + self.assertAlmostEqualGeneric( + test_json["flot"]["data"], saved_json["flot"]["data"] + ) + self.assertAlmostEqualGeneric( + test_json["explained_variance_ratios"], + saved_json["explained_variance_ratios"], + ) + self.assertAlmostEqualGeneric(test_json["components"], saved_json["components"]) + self.assertEqual(len(pca.process_warning), 0) + @with_resolwe_host @tag_process("pca") def test_pca(self): diff --git a/resolwe_bio/tools/pca.py b/resolwe_bio/tools/pca.py deleted file mode 100755 index 71cc2a433..000000000 --- a/resolwe_bio/tools/pca.py +++ /dev/null @@ -1,145 +0,0 @@ -#!/usr/bin/env python3 -"""Principal components analysis.""" - -import argparse -import json - -import numpy as np -import pandas as pd -from resolwe_runtime_utils import send_message, warning -from sklearn.decomposition import PCA - - -def get_args(): - """Parse command-line arguments.""" - parser = argparse.ArgumentParser(description="PCA") - parser.add_argument( - "--sample-files", "-f", nargs="+", help="Sample file names", required=True - ) - parser.add_argument( - "--sample-ids", "-i", nargs="+", help="Sample IDs", required=True - ) - parser.add_argument("--gene-labels", "-g", nargs="+", help="Filter genes by label") - parser.add_argument( - "--components", "-c", help="Number of PCA components", type=int, default=2 - ) - parser.add_argument("--output-fn", "-o", help="Output file name") - return parser.parse_args() - - -def component_top_factors(component, allgenes_array, max_size=20): - """Return top 20 absolute factors.""" - abs_component = np.abs(component) - size = min(component.size, max_size) - unordered_ixs = np.argpartition(abs_component, -size)[-size:] - ixs = unordered_ixs[np.argsort(abs_component[unordered_ixs])[::-1]] - if ixs.size == 0: - return [] - return list(zip(np.array(allgenes_array)[ixs].tolist(), component[ixs].tolist())) - - -def get_pca(expressions=pd.DataFrame(), n_components=2, gene_labels=[]): - """Compute PCA.""" - if not gene_labels: - gene_labels = expressions.index - skipped_gene_labels = list(set(gene_labels).difference(expressions.index)) - - if expressions.shape[0] < 2 or expressions.shape[1] < 2: - coordinates = [[0.0, 0.0] for i in range(expressions.shape[1])] - all_components = [[], []] - all_explained_variance_ratios = [0.0, 0.0] - else: - pca = PCA(n_components=n_components, whiten=True) - pca_expressions = pca.fit_transform(expressions.transpose()) - - coordinates = [ - t[:2].tolist() if len(t) > 1 else [t[0], 0.0] for t in pca_expressions - ] - all_components = [ - component_top_factors(component, gene_labels) - for component in pca.components_ - ] - if np.isnan(pca.explained_variance_ratio_).any(): - all_explained_variance_ratios = [0.0 for _ in pca.explained_variance_ratio_] - else: - all_explained_variance_ratios = pca.explained_variance_ratio_.tolist() - - result = { - "coordinates": coordinates, - "all_components": all_components, - "all_explained_variance_ratios": all_explained_variance_ratios, - "skipped_gene_labels": skipped_gene_labels, - "warning": None, - } - - if expressions.empty: - send_message( - warning( - "Gene selection and filtering resulted in no genes. Please select different samples or genes." - ) - ) - - return result - - -def save_pca(result={}, sample_ids=[], output_fn=None, max_size=10): - """Save PCA.""" - data = { - "flot": { - "data": result["coordinates"], - "xlabel": "PC 1", - "ylabel": "PC 2", - "sample_ids": sample_ids, - }, - "zero_gene_symbols": result["skipped_gene_labels"], - "components": result["all_components"][:max_size], - "all_components": result["all_components"], - "explained_variance_ratios": result["all_explained_variance_ratios"][:max_size], - "all_explained_variance_ratios": result["all_explained_variance_ratios"], - } - - if output_fn: - with open(output_fn, "w") as outfile: - json.dump(data, outfile, separators=(",", ":"), allow_nan=False) - else: - print(json.dumps(data, separators=(",", ":"), allow_nan=False)) - - -def read_csv(fname): - """Read CSV file and return Pandas DataFrame.""" - csv = pd.read_csv( - filepath_or_buffer=fname, - sep="\t", - header=0, - index_col=0, - dtype={ - 0: str, - 1: float, - }, - keep_default_na=False, - ) - csv.index = csv.index.map(str) - return csv - - -def get_csv(fnames): - """Read CSV files and return Pandas DataFrame.""" - expressions = [read_csv(fname) for fname in fnames] - return pd.concat(expressions, axis=1, join="inner") - - -def main(): - """Read data, run PCA, and output results.""" - args = get_args() - expressions = get_csv(args.sample_files) - - if args.gene_labels: - gene_labels = set(args.gene_labels).intersection(expressions.index) - expressions = expressions.loc[gene_labels] - - result = get_pca(expressions, args.components, args.gene_labels) - save_pca(result, args.sample_ids, args.output_fn) - - -if __name__ == "__main__": - main()