diff --git a/VERSIONLOG.md b/VERSIONLOG.md index 2dee0ca9..88f37f98 100644 --- a/VERSIONLOG.md +++ b/VERSIONLOG.md @@ -1,5 +1,9 @@ # Scilifelab_epps Version Log +## 20241016.1 + +Remove index orientation checker + ## 20241015.1 Improve project validator EPP diff --git a/scripts/index_distance_checker.py b/scripts/index_distance_checker.py index e2139eca..9572d899 100644 --- a/scripts/index_distance_checker.py +++ b/scripts/index_distance_checker.py @@ -6,10 +6,9 @@ import sys from argparse import ArgumentParser -import psycopg2 import yaml from genologics.config import BASEURI, PASSWORD, USERNAME -from genologics.entities import Process, Project +from genologics.entities import Process from genologics.lims import Lims from data.Chromium_10X_indexes import Chromium_10X_indexes @@ -79,209 +78,6 @@ def verify_indexes(data): return message -def verify_orientation(data): - message = [] - connection = psycopg2.connect( - user=config["username"], - host=config["url"], - database=config["db"], - password=config["password"], - ) - cursor = connection.cursor() - query = ( - "select reagenttype.name from reagenttype " "where reagenttype.name like '{}%';" - ) - # We only search against part of the index sets that exist in LIMS - index_sets_10nt = [ - "AmpliconUD_UDP_", - "IDT_10nt_UD_", - "NexteraUD_UDP_", - "QIAseq_UX_UDI_", - "v2_IDT_10nt_UD_", - "v3_Illumina_10nt_UD_", - "xGen_Normalase_10nt_UDI_", - "%_SI-NN-", - "%_SI-NT-", - "%_SI-TN-", - "%_SI-TS-", - "%_SI-TT-", - ] - index_sets_8nt = [ - "NexteraCD", - "NexteraXT_", - "Nextera16S_", - "Nextera FS Dual", - "SMARTerDNA_", - "SMARTerV2_", - "SMARTer_RNA_UD_", - "Swift_SNAP_", - "TAKARA_8nt_UDI_", - "TruSeqUDv2-UDI_", - "v2_Illumina_TruSeq_8nt_UD_", - "v2_NexteraXT_", - "xGen_8nt_UDI_", - ] - pools = {x["pool"] for x in data} - for p in sorted(pools): - subset = [ - i for i in data if i["pool"] == p and not is_special_idx(i["idx_name"]) - ] - if not subset: - continue - subset = sorted(subset, key=lambda d: d["sn"]) - if NGISAMPLE_PAT.findall(subset[0].get("sn", "")): - project_id = subset[0]["sn"].split("_")[0] - project_info = Project(lims, id=project_id) - seq_platform = project_info.udf.get("Sequencing platform") - else: - # The error message is skipped here since the verify_samplename function will check the names of all samples - seq_platform = "" - idx1_len = list(set([len(i["idx1"]) for i in subset if i["idx1"]])) - idx2_len = list(set([len(i["idx2"]) for i in subset if i["idx2"]])) - if len(idx1_len) == len(idx2_len) == 1 and idx1_len[0] == idx2_len[0] == 8: - search_index_sets = index_sets_8nt - elif len(idx1_len) == len(idx2_len) == 1 and idx1_len[0] == idx2_len[0] == 10: - search_index_sets = index_sets_10nt - else: - message.append( - f"Unable to check index orientations due to index length for pool {p}" - ) - continue - # Search through the index sets for the first and last samples in the pool to save time - flag_idx_search = False - for idx_set in search_index_sets: - cursor.execute(query.format(idx_set)) - query_output = cursor.fetchall() - flag_first_sample = "" - flag_last_sample = "" - for out in query_output: - index1 = IDX_PAT.findall(out[0])[0][0] - index2 = IDX_PAT.findall(out[0])[0][1] - # Convert index 2 to RC for MiSeq projects - if seq_platform: - if "MISEQ" in seq_platform.upper(): - index2 = rc(index2) - # Check the first sample - if subset[0]["idx1"] == index1 and subset[0]["idx2"] == index2: - flag_first_sample = "CORRECT" - elif subset[0]["idx1"] == rc(index1) and subset[0]["idx2"] == index2: - flag_first_sample = "Index1_RC" - elif subset[0]["idx1"] == index1 and subset[0]["idx2"] == rc(index2): - flag_first_sample = "Index2_RC" - elif subset[0]["idx1"] == rc(index1) and subset[0]["idx2"] == rc( - index2 - ): - flag_first_sample = "Index1_and_Index2_RC" - elif subset[0]["idx1"] == index2 and subset[0]["idx2"] == index1: - flag_first_sample = "Index1_and_Index2_Swapped" - elif subset[0]["idx1"] == rc(index2) and subset[0]["idx2"] == index1: - flag_first_sample = "Index1_and_Index2_Swapped_plus_Index1_RC" - elif subset[0]["idx1"] == index2 and subset[0]["idx2"] == rc(index1): - flag_first_sample = "Index1_and_Index2_Swapped_plus_Index2_RC" - elif subset[0]["idx1"] == rc(index2) and subset[0]["idx2"] == rc( - index1 - ): - flag_first_sample = ( - "Index1_and_Index2_Swapped_plus_Index1_and_Index2_RC" - ) - # Check the last sample - if subset[-1]["idx1"] == index1 and subset[-1]["idx2"] == index2: - flag_last_sample = "CORRECT" - elif subset[-1]["idx1"] == rc(index1) and subset[-1]["idx2"] == index2: - flag_last_sample = "Index1_RC" - elif subset[-1]["idx1"] == index1 and subset[-1]["idx2"] == rc(index2): - flag_last_sample = "Index2_RC" - elif subset[-1]["idx1"] == rc(index1) and subset[-1]["idx2"] == rc( - index2 - ): - flag_last_sample = "Index1_and_Index2_RC" - elif subset[0]["idx1"] == index2 and subset[0]["idx2"] == index1: - flag_last_sample = "Index1_and_Index2_Swapped" - elif subset[0]["idx1"] == rc(index2) and subset[0]["idx2"] == index1: - flag_last_sample = "Index1_and_Index2_Swapped_plus_Index1_RC" - elif subset[0]["idx1"] == index2 and subset[0]["idx2"] == rc(index1): - flag_last_sample = "Index1_and_Index2_Swapped_plus_Index2_RC" - elif subset[0]["idx1"] == rc(index2) and subset[0]["idx2"] == rc( - index1 - ): - flag_last_sample = ( - "Index1_and_Index2_Swapped_plus_Index1_and_Index2_RC" - ) - # Make a conclusion - if flag_first_sample == flag_last_sample == "CORRECT": - flag_idx_search = True - break - elif flag_first_sample == flag_last_sample == "Index1_RC": - message.append( - f"Seems that Index 1 needs to be converted to RC for pool {p}" - ) - flag_idx_search = True - break - elif flag_first_sample == flag_last_sample == "Index2_RC": - message.append( - f"Seems that Index 2 needs to be converted to RC for pool {p}" - ) - flag_idx_search = True - break - elif flag_first_sample == flag_last_sample == "Index1_and_Index2_RC": - message.append( - f"Seems that both Index 1 and Index 2 need to be converted to RC for pool {p}" - ) - flag_idx_search = True - break - elif flag_first_sample == flag_last_sample == "Index1_and_Index2_Swapped": - message.append( - f"Seems that Index 1 and Index 2 are swapped for pool {p}" - ) - flag_idx_search = True - break - elif ( - flag_first_sample - == flag_last_sample - == "Index1_and_Index2_Swapped_plus_Index1_RC" - ): - message.append( - f"Seems that Index 1 and Index 2 are swapped, and Index 1 needs to be converted to RC for pool {p}" - ) - flag_idx_search = True - break - elif ( - flag_first_sample - == flag_last_sample - == "Index1_and_Index2_Swapped_plus_Index2_RC" - ): - message.append( - f"Seems that Index 1 and Index 2 are swapped, and Index 2 needs to be converted to RC for pool {p}" - ) - flag_idx_search = True - break - elif ( - flag_first_sample - == flag_last_sample - == "Index1_and_Index2_Swapped_plus_Index1_and_Index2_RC" - ): - message.append( - f"Seems that Index 1 and Index 2 are swapped, and both Index 1 and Index 2 need to be converted to RC for pool {p}" - ) - flag_idx_search = True - break - elif flag_first_sample != flag_last_sample: - message.append(f"Inconsistent Index pattern detected for pool {p}") - flag_idx_search = True - break - if not flag_idx_search: - message.append( - f"Unable to find matched index set to check orientation for pool {p}" - ) - - # Close connections - if connection: - cursor.close() - connection.close() - - return message - - def is_special_idx(idx_name): if ( TENX_DUAL_PAT.findall(idx_name) @@ -612,7 +408,6 @@ def main(lims, pid, auto): message += verify_placement(data) message += verify_indexes(data) message += verify_samplename(data) - message += verify_orientation(data) else: message = check_index_distance(data) warning_start = "**Warnings from Verify Indexes and Placement EPP: **\n"