Commit

Merge pull request #377 from chuan-wang/master
Remove index orientation checker
chuan-wang authored Oct 16, 2024
2 parents d2815b3 + 5056d2f commit 38d73ae
Showing 2 changed files with 5 additions and 206 deletions.
4 changes: 4 additions & 0 deletions VERSIONLOG.md
@@ -1,5 +1,9 @@
# Scilifelab_epps Version Log

## 20241016.1

Remove index orientation checker

## 20241015.1

Improve project validator EPP
207 changes: 1 addition & 206 deletions scripts/index_distance_checker.py
@@ -6,10 +6,9 @@
import sys
from argparse import ArgumentParser

import psycopg2
import yaml
from genologics.config import BASEURI, PASSWORD, USERNAME
from genologics.entities import Process, Project
from genologics.entities import Process
from genologics.lims import Lims

from data.Chromium_10X_indexes import Chromium_10X_indexes
@@ -79,209 +78,6 @@ def verify_indexes(data):
return message


def verify_orientation(data):
message = []
connection = psycopg2.connect(
user=config["username"],
host=config["url"],
database=config["db"],
password=config["password"],
)
cursor = connection.cursor()
query = (
"select reagenttype.name from reagenttype " "where reagenttype.name like '{}%';"
)
# We only search against part of the index sets that exist in LIMS
index_sets_10nt = [
"AmpliconUD_UDP_",
"IDT_10nt_UD_",
"NexteraUD_UDP_",
"QIAseq_UX_UDI_",
"v2_IDT_10nt_UD_",
"v3_Illumina_10nt_UD_",
"xGen_Normalase_10nt_UDI_",
"%_SI-NN-",
"%_SI-NT-",
"%_SI-TN-",
"%_SI-TS-",
"%_SI-TT-",
]
index_sets_8nt = [
"NexteraCD",
"NexteraXT_",
"Nextera16S_",
"Nextera FS Dual",
"SMARTerDNA_",
"SMARTerV2_",
"SMARTer_RNA_UD_",
"Swift_SNAP_",
"TAKARA_8nt_UDI_",
"TruSeqUDv2-UDI_",
"v2_Illumina_TruSeq_8nt_UD_",
"v2_NexteraXT_",
"xGen_8nt_UDI_",
]
pools = {x["pool"] for x in data}
for p in sorted(pools):
subset = [
i for i in data if i["pool"] == p and not is_special_idx(i["idx_name"])
]
if not subset:
continue
subset = sorted(subset, key=lambda d: d["sn"])
if NGISAMPLE_PAT.findall(subset[0].get("sn", "")):
project_id = subset[0]["sn"].split("_")[0]
project_info = Project(lims, id=project_id)
seq_platform = project_info.udf.get("Sequencing platform")
else:
# The error message is skipped here since the verify_samplename function will check the names of all samples
seq_platform = ""
idx1_len = list(set([len(i["idx1"]) for i in subset if i["idx1"]]))
idx2_len = list(set([len(i["idx2"]) for i in subset if i["idx2"]]))
if len(idx1_len) == len(idx2_len) == 1 and idx1_len[0] == idx2_len[0] == 8:
search_index_sets = index_sets_8nt
elif len(idx1_len) == len(idx2_len) == 1 and idx1_len[0] == idx2_len[0] == 10:
search_index_sets = index_sets_10nt
else:
message.append(
f"Unable to check index orientations due to index length for pool {p}"
)
continue
# Search through the index sets for the first and last samples in the pool to save time
flag_idx_search = False
for idx_set in search_index_sets:
cursor.execute(query.format(idx_set))
query_output = cursor.fetchall()
flag_first_sample = ""
flag_last_sample = ""
for out in query_output:
index1 = IDX_PAT.findall(out[0])[0][0]
index2 = IDX_PAT.findall(out[0])[0][1]
# Convert index 2 to RC for MiSeq projects
if seq_platform:
if "MISEQ" in seq_platform.upper():
index2 = rc(index2)
# Check the first sample
if subset[0]["idx1"] == index1 and subset[0]["idx2"] == index2:
flag_first_sample = "CORRECT"
elif subset[0]["idx1"] == rc(index1) and subset[0]["idx2"] == index2:
flag_first_sample = "Index1_RC"
elif subset[0]["idx1"] == index1 and subset[0]["idx2"] == rc(index2):
flag_first_sample = "Index2_RC"
elif subset[0]["idx1"] == rc(index1) and subset[0]["idx2"] == rc(
index2
):
flag_first_sample = "Index1_and_Index2_RC"
elif subset[0]["idx1"] == index2 and subset[0]["idx2"] == index1:
flag_first_sample = "Index1_and_Index2_Swapped"
elif subset[0]["idx1"] == rc(index2) and subset[0]["idx2"] == index1:
flag_first_sample = "Index1_and_Index2_Swapped_plus_Index1_RC"
elif subset[0]["idx1"] == index2 and subset[0]["idx2"] == rc(index1):
flag_first_sample = "Index1_and_Index2_Swapped_plus_Index2_RC"
elif subset[0]["idx1"] == rc(index2) and subset[0]["idx2"] == rc(
index1
):
flag_first_sample = (
"Index1_and_Index2_Swapped_plus_Index1_and_Index2_RC"
)
# Check the last sample
if subset[-1]["idx1"] == index1 and subset[-1]["idx2"] == index2:
flag_last_sample = "CORRECT"
elif subset[-1]["idx1"] == rc(index1) and subset[-1]["idx2"] == index2:
flag_last_sample = "Index1_RC"
elif subset[-1]["idx1"] == index1 and subset[-1]["idx2"] == rc(index2):
flag_last_sample = "Index2_RC"
elif subset[-1]["idx1"] == rc(index1) and subset[-1]["idx2"] == rc(
index2
):
flag_last_sample = "Index1_and_Index2_RC"
elif subset[-1]["idx1"] == index2 and subset[-1]["idx2"] == index1:
flag_last_sample = "Index1_and_Index2_Swapped"
elif subset[-1]["idx1"] == rc(index2) and subset[-1]["idx2"] == index1:
flag_last_sample = "Index1_and_Index2_Swapped_plus_Index1_RC"
elif subset[-1]["idx1"] == index2 and subset[-1]["idx2"] == rc(index1):
flag_last_sample = "Index1_and_Index2_Swapped_plus_Index2_RC"
elif subset[-1]["idx1"] == rc(index2) and subset[-1]["idx2"] == rc(
index1
):
flag_last_sample = (
"Index1_and_Index2_Swapped_plus_Index1_and_Index2_RC"
)
# Make a conclusion
if flag_first_sample == flag_last_sample == "CORRECT":
flag_idx_search = True
break
elif flag_first_sample == flag_last_sample == "Index1_RC":
message.append(
f"Seems that Index 1 needs to be converted to RC for pool {p}"
)
flag_idx_search = True
break
elif flag_first_sample == flag_last_sample == "Index2_RC":
message.append(
f"Seems that Index 2 needs to be converted to RC for pool {p}"
)
flag_idx_search = True
break
elif flag_first_sample == flag_last_sample == "Index1_and_Index2_RC":
message.append(
f"Seems that both Index 1 and Index 2 need to be converted to RC for pool {p}"
)
flag_idx_search = True
break
elif flag_first_sample == flag_last_sample == "Index1_and_Index2_Swapped":
message.append(
f"Seems that Index 1 and Index 2 are swapped for pool {p}"
)
flag_idx_search = True
break
elif (
flag_first_sample
== flag_last_sample
== "Index1_and_Index2_Swapped_plus_Index1_RC"
):
message.append(
f"Seems that Index 1 and Index 2 are swapped, and Index 1 needs to be converted to RC for pool {p}"
)
flag_idx_search = True
break
elif (
flag_first_sample
== flag_last_sample
== "Index1_and_Index2_Swapped_plus_Index2_RC"
):
message.append(
f"Seems that Index 1 and Index 2 are swapped, and Index 2 needs to be converted to RC for pool {p}"
)
flag_idx_search = True
break
elif (
flag_first_sample
== flag_last_sample
== "Index1_and_Index2_Swapped_plus_Index1_and_Index2_RC"
):
message.append(
f"Seems that Index 1 and Index 2 are swapped, and both Index 1 and Index 2 need to be converted to RC for pool {p}"
)
flag_idx_search = True
break
elif flag_first_sample != flag_last_sample:
message.append(f"Inconsistent Index pattern detected for pool {p}")
flag_idx_search = True
break
if not flag_idx_search:
message.append(
f"Unable to find matched index set to check orientation for pool {p}"
)

# Close connections
if connection:
cursor.close()
connection.close()

return message


def is_special_idx(idx_name):
if (
TENX_DUAL_PAT.findall(idx_name)
@@ -612,7 +408,6 @@ def main(lims, pid, auto):
message += verify_placement(data)
message += verify_indexes(data)
message += verify_samplename(data)
message += verify_orientation(data)
else:
message = check_index_distance(data)
warning_start = "**Warnings from Verify Indexes and Placement EPP: **\n"
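
Note on the removed logic: verify_orientation compared each pool's first and last sample indexes against LIMS reagent-type sequences in both their stored and reverse-complement (RC) orientations via an rc() helper defined elsewhere in index_distance_checker.py and not shown in this diff. As a minimal sketch only, assuming a plain ACGTN alphabet and not reflecting the script's actual implementation, such a reverse-complement helper could look like:

# Hypothetical sketch of a reverse-complement helper such as rc();
# the real helper in index_distance_checker.py may differ.
COMPLEMENT = {"A": "T", "T": "A", "C": "G", "G": "C", "N": "N"}

def rc(sequence):
    # Complement each base, then reverse the order, e.g. "AACG" -> "CGTT".
    return "".join(COMPLEMENT.get(base, "N") for base in reversed(sequence.upper()))

Under this convention, an outcome such as Index2_RC means the submitted index 2 only matches a known index set after reverse-complementing, i.e. the pool was likely entered in the opposite orientation.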