Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add parameter to the /vcf endpoint to control whether VRS IDs are generated for REF alleles #74

Merged
merged 6 commits on Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ install_requires =
fastapi >= 0.95.0
python-multipart
uvicorn
ga4gh.vrs[extras] ~= 2.0.0a1
ga4gh.vrs[extras] ~= 2.0.0a2
psycopg[binary]
snowflake-connector-python ~= 3.4.1

Expand Down
41 changes: 23 additions & 18 deletions src/anyvar/extras/vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from ga4gh.vrs.extras.vcf_annotation import VCFAnnotator

from anyvar.anyvar import AnyVar
from anyvar.translate.translate import TranslatorConnectionException
from anyvar.translate.translate import TranslationException

_logger = logging.getLogger(__name__)

Expand All @@ -29,6 +29,7 @@ def annotate(
vrs_pickle_out: Optional[str] = None,
vrs_attributes: bool = False,
assembly: str = "GRCh38",
compute_for_ref: bool = True,
) -> None:
"""Annotates an input VCF file with VRS Allele IDs & creates a pickle file
containing the vrs object information.
Expand All @@ -40,13 +41,18 @@ def annotate(
fields in the INFO field. If `False` will not include these fields.
Only used if `vcf_out` is provided.
:param assembly: The assembly used in `vcf_in` data
:param compute_for_ref: If `True`, compute VRS IDs for REF alleles
"""
if self.av.object_store.batch_manager:
storage = self.av.object_store
with storage.batch_manager(storage): # type: ignore
return super().annotate(vcf_in, vcf_out, vrs_pickle_out, vrs_attributes, assembly)
return super().annotate(
vcf_in, vcf_out, vrs_pickle_out, vrs_attributes, assembly, compute_for_ref
)
else:
super().annotate(vcf_in, vcf_out, vrs_pickle_out, vrs_attributes, assembly)
super().annotate(
vcf_in, vcf_out, vrs_pickle_out, vrs_attributes, assembly, compute_for_ref
)

def _get_vrs_object(
self,
Expand Down Expand Up @@ -80,19 +86,18 @@ def _get_vrs_object(
Only used if `vcf_out` is provided. Not used by this implementation.
:return: nothing, but registers VRS objects with AnyVar storage and stashes IDs
"""
try:
vrs_object = self.av.translator.translate_vcf_row(vcf_coords)
except (TranslatorConnectionException, NotImplementedError):
pass
else:
if vrs_object:
self.av.put_object(vrs_object)
if output_pickle:
key = vrs_data_key if vrs_data_key else vcf_coords
vrs_data[key] = str(vrs_object.model_dump(exclude_none=True))
vrs_object = self.av.translator.translate_vcf_row(vcf_coords)
if vrs_object:
self.av.put_object(vrs_object)
if output_pickle:
key = vrs_data_key if vrs_data_key else vcf_coords
vrs_data[key] = str(vrs_object.model_dump(exclude_none=True))

if output_vcf:
allele_id = vrs_object.id if vrs_object else ""
vrs_field_data[self.VRS_ALLELE_IDS_FIELD].append(allele_id)

if output_vcf:
allele_id = vrs_object.id if vrs_object else ""
vrs_field_data[self.VRS_ALLELE_IDS_FIELD].append(allele_id)
else:
_logger.error(f"Translation failed: {vcf_coords}")
else:
raise TranslationException(
f"Translator returned empty VRS object for VCF coords {vcf_coords}"
)
9 changes: 7 additions & 2 deletions src/anyvar/restapi/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,15 @@ def register_vrs_object(
tags=[EndpointTag.VARIATIONS],
)
async def annotate_vcf(
request: Request, vcf: UploadFile = File(..., description="VCF to register and annotate")
request: Request,
vcf: UploadFile = File(..., description="VCF to register and annotate"),
for_ref: bool = Query(default=True, description="Whether to compute VRS IDs for REF alleles"),
):
"""Register alleles from a VCF and return a file annotated with VRS IDs.

:param request: FastAPI request object
:param vcf: incoming VCF file object
:param for_ref: whether to compute VRS IDs for REF alleles
:return: streamed annotated file
"""
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
Expand All @@ -210,7 +213,9 @@ async def annotate_vcf(
registrar = VcfRegistrar(av)
with tempfile.NamedTemporaryFile(delete=False) as temp_out_file:
try:
registrar.annotate(temp_file.name, vcf_out=temp_out_file.name)
registrar.annotate(
temp_file.name, vcf_out=temp_out_file.name, compute_for_ref=for_ref
)
except (TranslatorConnectionException, OSError) as e:
_logger.error(f"Encountered error during VCF registration: {e}")
return {"error": "VCF registration failed."}
Expand Down