Skip to content

Commit

Permalink
add window in GCoverages
Browse files Browse the repository at this point in the history
  • Loading branch information
Joseph Kuo committed Mar 4, 2024
1 parent 2218bc3 commit e2ac991
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 35 deletions.
109 changes: 78 additions & 31 deletions genomkit/coverages/gcoverages.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pandas as pd
import pysam
from tqdm import tqdm
from genomkit import GRegions, GRegion


class GCoverages:
Expand All @@ -13,7 +14,7 @@ class GCoverages:
genomic coverages. It provides utilities for handling and analyzing the
interactions of many genomic coverages.
"""
def __init__(self, bin_size: int = 1):
def __init__(self, bin_size: int = 1, load: str = ""):
"""Initialize GCoverages object.
:param bin_size: Size of the bin for coverage calculation.
Expand All @@ -22,45 +23,87 @@ def __init__(self, bin_size: int = 1):
"""
self.coverage = {}
self.bin_size = bin_size

def load_coverage_from_bigwig(self, filename: str):
if load.lower().endswith(".bw") or load.lower().endswith(".bigwig"):
self.load_coverage_from_bigwig(load)
elif load.lower().endswith(".bam"):
self.calculate_coverage_from_bam(load)
elif load.lower().endswith(".bed"):
from genomkit import GRegions
regions = GRegions(name=load, load=load)
self.calculate_coverage_GRegions(regions=regions,
scores=regions)

def load_coverage_from_bigwig(self, filename: str, windows=False):
"""Load coverage data from a bigwig file.
:param filename: Path to the bigwig file.
:type filename: str
:param windows: GRegions for extracting the coverage profile
:type windows: GRegions
"""
bw = pyBigWig.open(filename)
chromosomes = bw.chroms()
for chrom, chrom_length in chromosomes.items():
coverage = bw.values(chrom, 0, chrom_length, numpy=True)
if self.bin_size > 1:
coverage = [np.mean(coverage[i:i+self.bin_size])
for i in range(0, len(coverage), self.bin_size)]
self.coverage[chrom] = coverage
if not windows:
# Get the coverage on the whole genome
chrom_regions = GRegions(name="chromosomes")
chromosomes = bw.chroms()
for chrom, chrom_length in chromosomes.items():
chrom_regions.add(GRegion(sequence=chrom,
start=0,
end=int(chrom_length)))
for r in chrom_regions:
coverage = bw.values(r.sequence, r.start, r.end, numpy=True)
if self.bin_size > 1:
coverage = [np.mean(coverage[i:i+self.bin_size])
for i in range(0, len(coverage),
self.bin_size)]
self.coverage[r] = coverage
else:
# Get only the coverage on the defined regions
assert isinstance(windows, GRegions)
for window in windows:
coverage = bw.values(window.sequence,
window.start,
window.end,
numpy=True)
if self.bin_size > 1:
coverage = [np.mean(coverage[i:i+self.bin_size])
for i in range(0, len(coverage),
self.bin_size)]
self.coverage[window] = coverage
bw.close()

def calculate_coverage_from_bam(self, filename: str):
def calculate_coverage_from_bam(self, filename: str, windows=False):
"""Calculate coverage from a BAM file.
:param filename: Path to the BAM file.
:type filename: str
:param windows: GRegions for extracting the coverage profile
:type windows: GRegions
"""
bam = pysam.AlignmentFile(filename, "rb")
for pileupcolumn in bam.pileup():
chrom = bam.get_reference_name(pileupcolumn.reference_id)
if chrom not in self.coverage:
self.coverage[chrom] = [0] * bam.get_reference_length(chrom)
self.coverage[chrom][pileupcolumn.reference_pos] += 1
if not windows:
# Get the coverage of the whole genome
for pileupcolumn in bam.pileup():
chrom = bam.get_reference_name(pileupcolumn.reference_id)
chrom_len = bam.get_reference_length(chrom)
r = GRegion(sequence=chrom, start=0, end=chrom_len)
if chrom not in [r.sequence for r in self.coverage.keys()]:
self.coverage[r] = [0] * chrom_len
self.coverage[r][pileupcolumn.reference_pos] += 1
if windows:
for r in windows:
self.coverage[r] = [0] * len(r)
for pileupcolumn in bam.pileup(r.sequence, r.start, r.end):
self.coverage[r][pileupcolumn.reference_pos-r.start] += 1
bam.close()
# Adjust for bin size
for chrom in self.coverage:
for r in self.coverage.keys():
if self.bin_size > 1:
self.coverage[chrom] = [sum(self.coverage[chrom]
[i:i+self.bin_size])
/ self.bin_size
for i in range(0,
len(self.coverage[chrom]),
self.bin_size)]
self.coverage[r] = [sum(self.coverage[r][i:i+self.bin_size]) /
self.bin_size
for i in range(0,
len(self.coverage[r]),
self.bin_size)]

def calculate_coverage_GRegions(self, regions, scores,
strandness: bool = False):
Expand Down Expand Up @@ -93,7 +136,7 @@ def calculate_coverage_GRegions(self, regions, scores,
for i in range(start_ind, end_ind):
self.coverage[region][i] += target.score

def get_coverage(self, seq_name: str):
def get_coverage(self, gregion: str):
"""Get coverage data for a specific sequence by name. This sequence
can be a chromosome or a genomic region.
Expand All @@ -102,7 +145,7 @@ def get_coverage(self, seq_name: str):
:return: Coverage data for the specified sequence.
:rtype: numpy array
"""
return self.coverage.get(seq_name, [])
return self.coverage.get(gregion, [])

def filter_regions_coverage(self, regions):
"""Filter regions for their coverages.
Expand All @@ -114,12 +157,16 @@ def filter_regions_coverage(self, regions):
:rtype: dict
"""
filtered_coverages = {}
for region in regions:
if region.sequence in self.coverage:
start = region.start - 1 # 0-based indexing
end = region.end
coverage = self.coverage[region.sequence][start:end]
filtered_coverages[region] = coverage
for r in regions:
for w in self.coverage.keys():
if r.overlap(w):
offset1 = max(r.start - w.start, 0)
offset2 = min(r.end - w.start, w.end - w.start)
if self.bin_size > 1:
offset1 = offset1 // self.bin_size
offset2 = offset2 // self.bin_size
coverage = self.coverage[w][offset1:offset2]
filtered_coverages[r] = coverage
return filtered_coverages

def total_sequencing_depth(self):
Expand Down
30 changes: 30 additions & 0 deletions genomkit/coverages/gcoverages_set.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from genomkit import GCoverages
from collections import OrderedDict


class GCoveragesSet:
"""
GCoveragesSet module
This module contains functions and classes for working with multiple
collections of genomic coverages. It provides utilities for handling and
analyzing the interactions of many genomic coverages.
"""

def __init__(self, name: str = "", load_dict=None):
"""Initiate a GCoveragesSet object which can contain multiple
GCoverages.
:param name: Define the name, defaults to ""
:type name: str, optional
:param load_dict: Given the file paths of multiple GCoverages as a
dictionary with names as keys and values as file
paths, defaults to None
:type load_dict: dict, optional
"""
self.collection = OrderedDict()
if load_dict:
for name, filename in load_dict.items():
self.add(name=name,
regions=GCoverages(name=name,
load=filename))
12 changes: 8 additions & 4 deletions tests/test_gcoverages.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def test_load_coverage_from_bigwig(self):
# cov.load_coverage_from_bigwig(filename="tests/test_files/bigwig/test.bw")
# {'1': 195471971, '10': 130694993}
self.assertEqual(len(cov.coverage), 2)
self.assertEqual(list(cov.coverage.keys())[0], "1")
self.assertEqual(list(cov.coverage.keys())[1], "10")
self.assertEqual(list(cov.coverage.keys())[0].sequence, "1")
self.assertEqual(list(cov.coverage.keys())[1].sequence, "10")
cov = GCoverages()
cov.load_coverage_from_bigwig(
filename=os.path.join(script_path,
Expand Down Expand Up @@ -54,14 +54,18 @@ def test_get_coverage(self):
filename=os.path.join(script_path,
"test_files/bigwig/test.bw")
)
res = cov.get_coverage("10")
res = cov.get_coverage(GRegion(sequence="10",
start=0,
end=130694993))
self.assertEqual(len(res), 130694993)
cov = GCoverages()
cov.calculate_coverage_from_bam(
filename=os.path.join(script_path,
"test_files/bam/Col0_C1.100k.bam")
)
res = cov.get_coverage("1")
res = cov.get_coverage(GRegion(sequence="1",
start=0,
end=30427671))
self.assertEqual(len(res), 30427671)

def test_filter_regions_coverage(self):
Expand Down

0 comments on commit e2ac991

Please sign in to comment.