Skip to content

Commit 5bd0fb3

Browse files
Updated ncbi api processing to use multiprocessing instead of threading
Split apiworker into a separate ncbi library module to allow functionality with importlib Updated processing library to only remove added system paths after module execution
1 parent 3e06798 commit 5bd0fb3

File tree

3 files changed

+249
-214
lines changed

3 files changed

+249
-214
lines changed

dataSources/ncbi/llib/apiWorker.py

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
import sys
2+
import logging
3+
import requests
4+
from multiprocessing import Queue
5+
from requests.adapters import HTTPAdapter, Retry
6+
7+
def apiWorker(queue: Queue, id: int, apiKey: str, recordsPerCall: int, accessions: list[str]):
8+
headers = {
9+
"accept": "application/json",
10+
"api-key": apiKey
11+
}
12+
13+
params = {
14+
"page_size": recordsPerCall
15+
}
16+
17+
summaryFields = {
18+
"assembly_name": "asm_name",
19+
"pa_accession": "gbrs_paired_asm",
20+
"total_number_of_chromosomes": "replicon_count",
21+
"number_of_scaffolds": "scaffold_count",
22+
"number_of_component_sequences": "contig_count",
23+
"provider": "annotation_provider",
24+
"name": "annotation_name",
25+
"assembly_type": "assembly_type",
26+
"gc_percent": "gc_percent",
27+
"total_gene_count": "total_gene_count",
28+
"protein_coding_gene_count": "protein_coding_gene_count",
29+
"non_coding_gene_count": "non_coding_gene_count"
30+
}
31+
32+
# Suppress logs about retrying urls
33+
logging.getLogger("requests").setLevel(logging.CRITICAL)
34+
logging.getLogger("urllib3").setLevel(logging.CRITICAL)
35+
36+
session = requests.Session()
37+
retries = Retry(total=5, backoff_factor=0.1)
38+
session.mount("https://", HTTPAdapter(max_retries=retries))
39+
40+
collectionAmount = (len(accessions) / recordsPerCall).__ceil__()
41+
accessionStrings = []
42+
for collectionNumber in range(collectionAmount):
43+
accessionStrings.append("%2C".join(accessions[collectionNumber*recordsPerCall:(collectionNumber+1)*recordsPerCall]))
44+
45+
try:
46+
for string in accessionStrings:
47+
url = f"https://api.ncbi.nlm.nih.gov/datasets/v2alpha/genome/accession/{string}/dataset_report"
48+
response = session.get(url, headers=headers, params=params)
49+
data = response.json()
50+
records = data.get("reports", [])
51+
for record in records:
52+
queue.put(parseRecord(record, list(summaryFields)))
53+
except KeyboardInterrupt:
54+
pass
55+
56+
queue.put(id)
57+
58+
def parseRecord(record: dict, excludeFields: list) -> dict:
59+
def _extractKeys(d: dict, keys: list[str], prefix: str = "", suffix: str = "") -> dict:
60+
retVal = {}
61+
for key, value in d.items():
62+
if key not in keys:
63+
continue
64+
65+
if prefix and not key.startswith(prefix):
66+
key = f"{prefix}_{key}"
67+
68+
if suffix and not key.endswith(suffix):
69+
key = f"{key}_{suffix}"
70+
71+
retVal |= {key: value}
72+
73+
return retVal
74+
75+
def _extractListKeys(l: list[dict], keys: list[str], prefix: str = "", suffix: str = "") -> list:
76+
retVal = []
77+
for item in l:
78+
retVal.append(_extractKeys(item, keys, prefix, suffix))
79+
return retVal
80+
81+
def _extract(item: any, keys: list[str], prefix: str = "", suffix: str = "") -> any:
82+
if isinstance(item, list):
83+
return _extractListKeys(item, keys, prefix, suffix)
84+
elif isinstance(item, dict):
85+
return _extractKeys(item, keys, prefix, suffix)
86+
else:
87+
raise Exception(f"Unexpected item: {item}")
88+
89+
# Annotation info
90+
annotationInfo = record.get("annotation_info", {})
91+
annotationFields = [
92+
"busco", # - busco
93+
"method", # - method
94+
"name", # - name
95+
"pipeline", # - pipeline
96+
"provider", # - provider
97+
"release_date", # - releaseDate
98+
"release_version", # - releaseVersion ?
99+
"software_version", # - softwareVersion
100+
"stats", # - stats
101+
"status" # - status
102+
]
103+
104+
annotationSubFields = {
105+
"busco": [ # - busco
106+
"busco_lineage", # - buscoLineage
107+
"busco_ver", # - buscoVer
108+
"complete" # - complete
109+
],
110+
"stats": { # - stats
111+
"gene_counts": [ # - geneCounts
112+
"non_coding", # - nonCoding
113+
"other", # - other
114+
"protein_coding", # - proteinCoding
115+
"pseudogene", # - pseudogene
116+
"total" # - total
117+
]
118+
}
119+
}
120+
121+
annotationInfo: dict = _extract(annotationInfo, annotationFields)
122+
annotationInfo.update(_extract(annotationInfo.pop("busco", {}), annotationSubFields["busco"], "busco"))
123+
annotationInfo.update(_extract(annotationInfo.pop("stats", {}).get("gene_counts", {}), annotationSubFields["stats"]["gene_counts"], suffix="gene_count"))
124+
125+
# Assembly info
126+
assemblyInfo = record.get("assembly_info", {})
127+
assemblyFields = [
128+
"assembly_name", # - assemblyName
129+
"assembly_status", # - assemblyStatus
130+
"assembly_type", # - assemblyType
131+
"description", # - description ?
132+
"synonym", # - synonym ?
133+
"paired_assembly", # - pairedAssembly
134+
"linked_assemblies", # - linkedAssemblies repeated ?
135+
"diploid_role", # - diploidRole ?
136+
"atypical", # - atypical ?
137+
"genome_notes", # - genomeNotes repeated
138+
"sequencing_tech", # - sequencingTech
139+
"assembly_method", # - assemblyMethod
140+
"comments", # - comments
141+
"suppression_reason" # - suppressionReason ?
142+
]
143+
144+
assemblySubFields = {
145+
"paired_assembly": [ # - pairedAssembly
146+
"accession", # - accession
147+
"only_genbank", # - onlyGenbank
148+
"only_refseq", # - onlyRefseq ?
149+
"changed", # - Changed ?
150+
"manual_diff", # - manualDiff ?
151+
"status", # - status
152+
],
153+
"linked_assemblies": [ # - linkedAssemblies repeated ?
154+
"linked_assembly", # - linkedAssembly ?
155+
"assembly_type" # - assemblyType ?
156+
],
157+
"atypical": [ # - atypical ?
158+
"is_atypical", # - isAtypical ?
159+
"warnings" # - warnings repeated ?
160+
]
161+
}
162+
163+
assemblyInfo: dict = _extract(assemblyInfo, assemblyFields)
164+
assemblyInfo["comments"] = assemblyInfo.get("comments", "").replace("\n", "").replace("\t", "")
165+
166+
assemblyInfo.update(_extract(assemblyInfo.pop("paired_assembly", {}), assemblySubFields["paired_assembly"], "pa"))
167+
assemblyInfo.update(_extract(assemblyInfo.pop("linked_assemblies", {}), assemblySubFields["linked_assemblies"], "la"))
168+
assemblyInfo.update(_extract(assemblyInfo.pop("atypical", {}), assemblySubFields["atypical"], "at"))
169+
170+
assemblyStats = record.get("assembly_stats", {}) # Unpack normally
171+
172+
currentAccession = {"current_accession": record.get("current_accession", "")} # Should always exist
173+
174+
# May not exist
175+
organelleInfo = record.get("organelle_info", []) # - organelleInfo ?
176+
organelleInfoFields = [
177+
"description", # - description ?
178+
"submitter", # - submitter ?
179+
"total_seq_length", # - totalSeqLength ?
180+
"bioproject" # - Bioproject related
181+
]
182+
183+
organelleData = {}
184+
for info in _extract(organelleInfo, organelleInfoFields, "organelle"):
185+
organelleData[info.pop("description", "Unknown")] = info
186+
187+
typeMaterial = record.get("type_material", {}) # - typeMaterial ?
188+
typeMaterialFields = [
189+
"type_label", # - typeLabel
190+
"type_display_text", # - typeDisplayText
191+
]
192+
193+
typeMaterial = _extract(typeMaterial, typeMaterialFields)
194+
recordData = annotationInfo | assemblyInfo | assemblyStats | currentAccession | organelleData | typeMaterial
195+
return {key: value for key, value in recordData.items() if key not in excludeFields}

0 commit comments

Comments
 (0)