Skip to content

Commit

Permalink
Merge pull request #31 from Ensembl/vep-results-endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
jyothishnt authored Aug 14, 2024
2 parents 2feeacf + 3f99a37 commit 054ab9e
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 34 deletions.
72 changes: 66 additions & 6 deletions app/tests/test_vep.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,33 @@
"""

# 21-variant VCF fixture for the paging tests: one unique record (id_01)
# followed by 20 records that share position/REF/ALT/CSQ and differ only in
# their ID column, so page boundaries can be asserted by variant name.
# With a page size of 5 this yields four full pages plus one trailing
# single-variant page.
TEST_PAGING_VCF = f"""##fileformat=VCFv4.2
##fileDate=20160824
##INFO=<ID=CSQ,Number=.,Type=String,Description="{CSQ_DESCRIPTION}">
#CHROM POS ID REF ALT QUAL FILTER INFO
chr19 82664 id_01 C T 50 PASS CSQ={CSQ_1}
chr19 82829 id_02 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_03 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_04 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_05 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_06 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_07 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_08 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_09 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_10 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_11 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_12 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_13 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_14 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_15 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_16 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_17 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_18 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_19 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_20 T A 50 PASS CSQ={CSQ_2}
chr19 82829 id_21 T A 50 PASS CSQ={CSQ_2}
"""


def test_get_prediction_index_map():

Expand Down Expand Up @@ -141,16 +168,15 @@ def test_get_alt_allele_details_intergenic():
results.predicted_molecular_consequences[0].consequences[0]
== "intergenic_variant"
)



def test_get_results_from_stream():
results = get_results_from_stream(100, 0, StringIO(TEST_VCF))

expected_index = {TARGET_COLUMNS[x]: x for x in range(0, len(TARGET_COLUMNS))}

assert len(results.variants) == 2

assert results.metadata.pagination.page == 0
assert results.metadata.pagination.page == 1
assert results.metadata.pagination.per_page == 100
assert results.metadata.pagination.total == 2

Expand All @@ -176,18 +202,52 @@ def test_get_results_from_stream():
== None
)

def test_paging():
    """Page through the 21-variant fixture in pages of 5.

    Checks that pagination metadata is 1-based and reports the full record
    count, that each full page holds exactly the expected 5-variant slice,
    and that the final page holds the single leftover variant.
    """
    results = get_results_from_stream(5, 1, StringIO(TEST_PAGING_VCF))

    # Pagination is 1-based; total counts every record, not just this page.
    assert results.metadata.pagination.page == 1
    assert results.metadata.pagination.per_page == 5
    assert results.metadata.pagination.total == 21

    # Pages 1-4 are full: verify the first and last variant of each slice.
    full_pages = [
        ("id_01", "id_05"),
        ("id_06", "id_10"),
        ("id_11", "id_15"),
        ("id_16", "id_20"),
    ]
    for page, (first, last) in enumerate(full_pages, start=1):
        results = get_results_from_stream(5, page, StringIO(TEST_PAGING_VCF))
        assert results.variants[0].name == first
        assert results.variants[-1].name == last

    # The final page contains only the one remaining variant.
    results = get_results_from_stream(5, 5, StringIO(TEST_PAGING_VCF))
    assert len(results.variants) == 1
    assert results.variants[0].name == "id_21"

def test_negative_paging():
    """Requesting a page past the end of the data (page 6 of a 21-record,
    5-per-page stream) returns an empty variant list while still reporting
    the true total record count."""
    results = get_results_from_stream(5, 6, StringIO(TEST_PAGING_VCF))
    assert len(results.variants) == 0
    assert results.metadata.pagination.total == 21


@pytest.mark.skip(reason="Used to test against a real VCF file")
def test_get_results_with_file_and_dump():
    """Developer-only smoke test against a real VCF on local disk.

    Parses the first page of a real VEP output file, writes the serialized
    response to dump.json for manual inspection, and sanity-checks the
    pagination metadata. Skipped in CI because the hard-coded path only
    exists on a developer machine.
    """
    vcf_path = "/Users/jon/Programming/ensembl-web-tools-api/test_VEP.vcf.gz"
    results = get_results_from_path(100, 1, vcf_path)

    # Dump the full response for eyeballing the parsed structure.
    with open("dump.json", "w", encoding="utf-8") as test_dump:
        test_dump.write(results.json())

    assert results.metadata.pagination.total == 1
    assert len(results.variants) == 1
58 changes: 39 additions & 19 deletions app/vep/utils/vcf_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
object as defined in APISpecification"""

import vcfpy
from typing import List, Dict, Any
from typing import List, Dict, Any, IO

from vep.models import vcf_results_model as model
from app.vep.models import vcf_results_model as model

TARGET_COLUMNS = [
"Allele",
Expand Down Expand Up @@ -51,9 +51,8 @@ def _set_allele_type(alt_one_bp: bool, ref_one_bp: bool, ref_alt_equal_bp: bool)


def _get_prediction_index_map(
csq_header: str,
target_columns: List[str] = None
) -> Dict:
csq_header: str, target_columns: List[str] = None
) -> Dict:
"""Creates a dictionary of column indexes based
on the CSQ info description"""
if not target_columns:
Expand Down Expand Up @@ -101,7 +100,7 @@ def _get_alt_allele_details(
if len(cons) == 0:
cons = []
else:
cons = cons.split('&')
cons = cons.split("&")
if csq_values[index_map["Feature_type"]] == "Transcript":
is_cononical = (
_get_csq_value(csq_values, "CANONICAL", "NO", index_map) == "YES"
Expand Down Expand Up @@ -147,9 +146,9 @@ def get_results_from_path(
) -> model.VepResultsResponse:
"""Helper method that converts a file path to a stream
for use with get_results_from_stream"""
# Check file file exists
with open(vcf_path, encoding="utf-8") as vcf_stream:
return get_results_from_stream(page_size, page, vcf_stream)
# Todo check file file exists
vcf_records = vcfpy.Reader.from_path(vcf_path)
return _get_results_from_vcfpy(page_size, page, vcf_records)


def get_results_from_stream(
Expand All @@ -160,6 +159,14 @@ def get_results_from_stream(

# Load vcf
vcf_records = vcfpy.Reader.from_stream(vcf_stream)
return _get_results_from_vcfpy(page_size, page, vcf_records)


def _get_results_from_vcfpy(
page_size: int, page: int, vcf_records: vcfpy.Reader
) -> model.VepResultsResponse:
"""Generates a page of VCF data in the format described in
APISpecification.yaml for a given VCFPY reader"""

# Parse csq header
prediction_index_map = _get_prediction_index_map(
Expand All @@ -168,7 +175,9 @@ def get_results_from_stream(

# handle offset
count = 0
offset = page_size * page
if page < 1:
page = 1
offset = page_size * (page - 1)

# This is very slow. We need to find a better way of handling this.
# vcfpy __next__ might be the key as it reads lines
Expand All @@ -177,10 +186,21 @@ def get_results_from_stream(
count += 1
if count >= offset:
break

#user asked for a page out of range
if offset > count:
return model.VepResultsResponse(
metadata=model.Metadata(
pagination=model.PaginationMetadata(
page=page, per_page=page_size, total=count
)
),
variants=[],
)

# build page
variants = []
count = 0
page_count = 0

# populate page
for record in vcf_records:
Expand Down Expand Up @@ -209,30 +229,30 @@ def get_results_from_stream(
model.Variant(
name=";".join(record.ID) if len(record.ID) > 0 else ".",
location=location,
reference_allele=model.ReferenceVariantAllele(allele_sequence=record.REF),
reference_allele=model.ReferenceVariantAllele(
allele_sequence=record.REF
),
alternative_alleles=alt_alleles,
allele_type=_set_allele_type(
longest_alt < 2, ref_len < 2, longest_alt == ref_len
)[0],
)
)

count += 1
if count >= page_size:
page_count += 1
if page_count >= page_size:
break

# Also very slow. We could compute this and add it to the VCF header
total = offset + count
total = offset + page_count
for _r in vcf_records:
total += 1

return model.VepResultsResponse(
metadata=model.Metadata(
pagination=model.PaginationMetadata(
page=page,
per_page=page_size,
total=total
page=page, per_page=page_size, total=total
)
),
variants=variants
variants=variants,
)
54 changes: 45 additions & 9 deletions app/vep/vep_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from requests import HTTPError
from starlette.responses import JSONResponse, PlainTextResponse, FileResponse
from fastapi import Request, status, APIRouter
from enum import Enum

from core.error_response import response_error_handler
from core.logging import InterceptHandler
Expand All @@ -33,13 +34,20 @@
)
from vep.models.upload_vcf_files import Streamer, MaxBodySizeException
from vep.utils.nextflow import launch_workflow, get_workflow_status
from vep.utils.vcf_results import get_results_from_path
import json
from pydantic import FilePath

logging.getLogger().handlers = [InterceptHandler()]

router = APIRouter()

class VepStatus(str, Enum):
    """Lifecycle states reported for a VEP submission workflow.

    Mixes in ``str`` so members compare equal to the raw status strings
    (e.g. ``"SUCCEEDED"``) returned by the workflow status endpoint.
    """

    submitted = "SUBMITTED"
    running = "RUNNING"
    succeeded = "SUCCEEDED"
    failed = "FAILED"
    cancelled = "CANCELLED"

@router.post("/submissions", name="submit_vep")
async def submit_vep(request: Request):
Expand Down Expand Up @@ -95,23 +103,23 @@ async def vep_status(request: Request, submission_id: str):
logging.debug(e)
return response_error_handler(result={"status": 500})

def get_vep_results_file_path(input_vcf_file):
    """Derive the expected VEP results path from the submitted VCF path.

    The results file is expected alongside the input, named
    ``<input stem>_VEP.vcf.gz`` (e.g. ``input.vcf`` -> ``input_VEP.vcf.gz``).
    NOTE(review): assumes the pipeline writes its output under that naming
    scheme — confirm against the nextflow workflow configuration.

    :param input_vcf_file: path (str or PathLike) of the submitted VCF
    :return: ``Path`` to the expected ``*_VEP.vcf.gz`` results file
    """
    # Stdlib Path is sufficient here; pydantic's FilePath is a validation
    # annotation type and adds nothing when constructed directly.
    from pathlib import Path

    input_vcf_path = Path(input_vcf_file)
    # Strip the FULL extension before appending "_VEP". Path.stem removes
    # only the last suffix, so for "input.vcf.gz" the stem is "input.vcf"
    # and the original with_name(...).with_suffix(".vcf.gz") chain replaced
    # the ".vcf_VEP" pseudo-suffix, rebuilding the INPUT path itself
    # ("input.vcf.gz") instead of the results path.
    base_name = input_vcf_path.name
    for suffix in reversed(input_vcf_path.suffixes):
        base_name = base_name.removesuffix(suffix)
    return input_vcf_path.with_name(base_name + "_VEP.vcf.gz")


@router.get("/submissions/{submission_id}/download", name="download_results")
async def download_results(request: Request, submission_id: str):
try:
workflow_status = await get_workflow_status(submission_id)
submission_status = PipelineStatus(
submission_id=submission_id, status=workflow_status["workflow"]["status"]
submission_id=submission_id, status=workflow_status
)
if submission_status.status == "SUCCEEDED":
if submission_status.status == VepStatus.succeeded:
input_vcf_file = workflow_status["workflow"]["params"]["vcf"]
input_vcf_path = FilePath(input_vcf_file)
results_file_path = input_vcf_path.with_name("_VEP").with_suffix(".vcf.gz")
return FileResponse(
results_file_path,
media_type="application/gzip",
filename=results_file_path.name,
)
results_file_path = get_vep_results_file_path(input_vcf_file)

return FileResponse(results_file_path, media_type="application/gzip", filename=results_file_path.name)
else:
response_msg = {
"details": f"A submission with id {submission_id} is not yet finished",
Expand All @@ -135,3 +143,31 @@ async def download_results(request: Request, submission_id: str):
except Exception as e:
logging.debug(e)
return response_error_handler(result={"status": 500})

@router.get("/submissions/{submission_id}/results", name="view_results")
async def fetch_results(request: Request, submission_id: str, page: int, per_page: int):
    """Return one page of parsed VEP results for a finished submission.

    :param submission_id: workflow submission identifier
    :param page: 1-based page number of results to return
    :param per_page: number of variants per page
    :return: a VepResultsResponse page when the workflow has succeeded,
        an informational JSON response while it is still running, or an
        error response (404 for unknown submissions, 500 otherwise)
    """
    try:
        workflow_status = await get_workflow_status(submission_id)
        submission_status = PipelineStatus(
            submission_id=submission_id, status=workflow_status
        )
        if submission_status.status == VepStatus.succeeded:
            input_vcf_file = workflow_status["workflow"]["params"]["vcf"]
            results_file_path = get_vep_results_file_path(input_vcf_file)
            return get_results_from_path(
                vcf_path=results_file_path, page=page, page_size=per_page
            )

        # Previously an unfinished submission fell through and implicitly
        # returned None (an empty 200 body); report it explicitly instead.
        # NOTE(review): 202 chosen to mirror the "not yet finished" message
        # in download_results — confirm the status code used there matches.
        response_msg = {
            "details": f"A submission with id {submission_id} is not yet finished",
        }
        return JSONResponse(
            content=response_msg, status_code=status.HTTP_202_ACCEPTED
        )

    except HTTPError as http_error:
        # The workflow service answers 400/403 for unknown submissions;
        # surface both as a 404 to the client.
        if http_error.response.status_code in [403, 400]:
            response_msg = json.dumps(
                {
                    "status_code": status.HTTP_404_NOT_FOUND,
                    "details": f"A submission with id {submission_id} was not found",
                }
            )
            return JSONResponse(
                content=response_msg, status_code=status.HTTP_404_NOT_FOUND
            )
        return response_error_handler(
            result={"status": http_error.response.status_code}
        )
    except Exception as e:
        logging.debug(e)
        return response_error_handler(result={"status": 500})

0 comments on commit 054ab9e

Please sign in to comment.