diff --git a/genomkit/coverages/gcoverages.py b/genomkit/coverages/gcoverages.py index d43b054..6e7c410 100644 --- a/genomkit/coverages/gcoverages.py +++ b/genomkit/coverages/gcoverages.py @@ -3,6 +3,7 @@ import pandas as pd import pysam from tqdm import tqdm +from genomkit import GRegions, GRegion class GCoverages: @@ -13,7 +14,7 @@ class GCoverages: genomic coverages. It provides utilities for handling and analyzing the interactions of many genomic coverages. """ - def __init__(self, bin_size: int = 1): + def __init__(self, bin_size: int = 1, load: str = ""): """Initialize GCoverages object. :param bin_size: Size of the bin for coverage calculation. @@ -22,45 +23,87 @@ def __init__(self, bin_size: int = 1): """ self.coverage = {} self.bin_size = bin_size - - def load_coverage_from_bigwig(self, filename: str): + if load.lower().endswith(".bw") or load.lower().endswith(".bigwig"): + self.load_coverage_from_bigwig(load) + elif load.lower().endswith(".bam"): + self.calculate_coverage_from_bam(load) + elif load.lower().endswith(".bed"): + from genomkit import GRegions + regions = GRegions(name=load, load=load) + self.calculate_coverage_GRegions(regions=regions, + scores=regions) + + def load_coverage_from_bigwig(self, filename: str, windows=False): """Load coverage data from a bigwig file. :param filename: Path to the bigwig file. :type filename: str + :param windows: GRegions for extracting the coverage profile + :type windows: GRegions """ bw = pyBigWig.open(filename) - chromosomes = bw.chroms() - for chrom, chrom_length in chromosomes.items(): - coverage = bw.values(chrom, 0, chrom_length, numpy=True) - if self.bin_size > 1: - coverage = [np.mean(coverage[i:i+self.bin_size]) - for i in range(0, len(coverage), self.bin_size)] - self.coverage[chrom] = coverage + if not windows: + # Get the coverage on the whole genome + chrom_regions = GRegions(name="chromosomes") + chromosomes = bw.chroms() + for chrom, chrom_length in chromosomes.items(): + chrom_regions.add(GRegion(sequence=chrom, + start=0, + end=int(chrom_length))) + for r in chrom_regions: + coverage = bw.values(r.sequence, r.start, r.end, numpy=True) + if self.bin_size > 1: + coverage = [np.mean(coverage[i:i+self.bin_size]) + for i in range(0, len(coverage), + self.bin_size)] + self.coverage[r] = coverage + else: + # Get only the coverage on the defined regions + assert isinstance(windows, GRegions) + for window in windows: + coverage = bw.values(window.sequence, + window.start, + window.end, + numpy=True) + if self.bin_size > 1: + coverage = [np.mean(coverage[i:i+self.bin_size]) + for i in range(0, len(coverage), + self.bin_size)] + self.coverage[window] = coverage bw.close() - def calculate_coverage_from_bam(self, filename: str): + def calculate_coverage_from_bam(self, filename: str, windows=False): """Calculate coverage from a BAM file. :param filename: Path to the BAM file. :type filename: str + :param windows: GRegions for extracting the coverage profile + :type windows: GRegions """ bam = pysam.AlignmentFile(filename, "rb") - for pileupcolumn in bam.pileup(): - chrom = bam.get_reference_name(pileupcolumn.reference_id) - if chrom not in self.coverage: - self.coverage[chrom] = [0] * bam.get_reference_length(chrom) - self.coverage[chrom][pileupcolumn.reference_pos] += 1 + if not windows: + # Get the coverage of the whole genome + for pileupcolumn in bam.pileup(): + chrom = bam.get_reference_name(pileupcolumn.reference_id) + chrom_len = bam.get_reference_length(chrom) + r = GRegion(sequence=chrom, start=0, end=chrom_len) + if chrom not in [r.sequence for r in self.coverage.keys()]: + self.coverage[r] = [0] * chrom_len + self.coverage[r][pileupcolumn.reference_pos] += 1 + if windows: + for r in windows: + self.coverage[r] = [0] * len(r) + for pileupcolumn in bam.pileup(r.sequence, r.start, r.end): + self.coverage[r][pileupcolumn.reference_pos-r.start] += 1 bam.close() # Adjust for bin size - for chrom in self.coverage: + for r in self.coverage.keys(): if self.bin_size > 1: - self.coverage[chrom] = [sum(self.coverage[chrom] - [i:i+self.bin_size]) - / self.bin_size - for i in range(0, - len(self.coverage[chrom]), - self.bin_size)] + self.coverage[r] = [sum(self.coverage[r][i:i+self.bin_size]) / + self.bin_size + for i in range(0, + len(self.coverage[r]), + self.bin_size)] def calculate_coverage_GRegions(self, regions, scores, strandness: bool = False): @@ -93,7 +136,7 @@ def calculate_coverage_GRegions(self, regions, scores, for i in range(start_ind, end_ind): self.coverage[region][i] += target.score - def get_coverage(self, seq_name: str): + def get_coverage(self, gregion: str): """Get coverage data for a specific sequence by name. This sequence can be a chromosome or a genomic region. @@ -102,7 +145,7 @@ def get_coverage(self, seq_name: str): :return: Coverage data for the specified sequence. :rtype: numpy array """ - return self.coverage.get(seq_name, []) + return self.coverage.get(gregion, []) def filter_regions_coverage(self, regions): """Filter regions for their coverages. @@ -114,12 +157,16 @@ def filter_regions_coverage(self, regions): :rtype: dict """ filtered_coverages = {} - for region in regions: - if region.sequence in self.coverage: - start = region.start - 1 # 0-based indexing - end = region.end - coverage = self.coverage[region.sequence][start:end] - filtered_coverages[region] = coverage + for r in regions: + for w in self.coverage.keys(): + if r.overlap(w): + offset1 = max(r.start - w.start, 0) + offset2 = min(r.end - w.start, w.end - w.start) + if self.bin_size > 1: + offset1 = offset1 // self.bin_size + offset2 = offset2 // self.bin_size + coverage = self.coverage[w][offset1:offset2] + filtered_coverages[r] = coverage return filtered_coverages def total_sequencing_depth(self): diff --git a/genomkit/coverages/gcoverages_set.py b/genomkit/coverages/gcoverages_set.py new file mode 100644 index 0000000..6d8d0ec --- /dev/null +++ b/genomkit/coverages/gcoverages_set.py @@ -0,0 +1,30 @@ +from genomkit import GCoverages +from collections import OrderedDict + + +class GCoveragesSet: + """ + GCoveragesSet module + + This module contains functions and classes for working with multiple + collections of genomic coverages. It provides utilities for handling and + analyzing the interactions of many genomic coverages. + """ + + def __init__(self, name: str = "", load_dict=None): + """Initiate a GCoveragesSet object which can contain multiple + GCoverages. + + :param name: Define the name, defaults to "" + :type name: str, optional + :param load_dict: Given the file paths of multiple GCoverages as a + dictionary with names as keys and values as file + paths, defaults to None + :type load_dict: dict, optional + """ + self.collection = OrderedDict() + if load_dict: + for name, filename in load_dict.items(): + self.add(name=name, + regions=GCoverages(name=name, + load=filename)) \ No newline at end of file diff --git a/tests/test_gcoverages.py b/tests/test_gcoverages.py index ca7af26..ff9da60 100644 --- a/tests/test_gcoverages.py +++ b/tests/test_gcoverages.py @@ -17,8 +17,8 @@ def test_load_coverage_from_bigwig(self): # cov.load_coverage_from_bigwig(filename="tests/test_files/bigwig/test.bw") # {'1': 195471971, '10': 130694993} self.assertEqual(len(cov.coverage), 2) - self.assertEqual(list(cov.coverage.keys())[0], "1") - self.assertEqual(list(cov.coverage.keys())[1], "10") + self.assertEqual(list(cov.coverage.keys())[0].sequence, "1") + self.assertEqual(list(cov.coverage.keys())[1].sequence, "10") cov = GCoverages() cov.load_coverage_from_bigwig( filename=os.path.join(script_path, @@ -54,14 +54,18 @@ def test_get_coverage(self): filename=os.path.join(script_path, "test_files/bigwig/test.bw") ) - res = cov.get_coverage("10") + res = cov.get_coverage(GRegion(sequence="10", + start=0, + end=130694993)) self.assertEqual(len(res), 130694993) cov = GCoverages() cov.calculate_coverage_from_bam( filename=os.path.join(script_path, "test_files/bam/Col0_C1.100k.bam") ) - res = cov.get_coverage("1") + res = cov.get_coverage(GRegion(sequence="1", + start=0, + end=30427671)) self.assertEqual(len(res), 30427671) def test_filter_regions_coverage(self):