From 7070ecd655cc2f27137477573a4237be7fcb93d8 Mon Sep 17 00:00:00 2001 From: Albern S Date: Fri, 2 Jul 2021 22:49:48 +0700 Subject: [PATCH] Refactoring and optimization Refactor by applying combinations of refactoring methods to unoptimized code, then identify which combination of methods yields the best-optimized result. --- scripts/add_branch_labels.py | 26 +- scripts/add_labels.py | 127 +- scripts/developer_scripts/parse_metadata.py | 1142 +++++++++-------- .../developer_scripts/parse_new_sequences.py | 176 ++- scripts/explicit_translation.py | 8 +- scripts/mutation_summary.py | 8 +- scripts/sanitize_metadata.py | 2 +- scripts/sanitize_sequences.py | 2 +- 8 files changed, 779 insertions(+), 712 deletions(-) diff --git a/scripts/add_branch_labels.py b/scripts/add_branch_labels.py index 613d28df3..944b5d744 100644 --- a/scripts/add_branch_labels.py +++ b/scripts/add_branch_labels.py @@ -12,11 +12,11 @@ def extract_spike_mutations(node_data): return data def extract_clade_labels(node_data): - data = {} - for name, node in node_data["nodes"].items(): - if "clade_annotation" in node: - data[name] = node["clade_annotation"] - return data + return { + name: node["clade_annotation"] + for name, node in node_data["nodes"].items() + if "clade_annotation" in node + } if __name__ == '__main__': parser = argparse.ArgumentParser( @@ -44,14 +44,14 @@ def extract_clade_labels(node_data): def attach_labels(n): # closure if n["name"] in spike_mutations or n["name"] in clade_labels: - if "branch_attrs" not in n: - n["branch_attrs"]={} - if "labels" not in n["branch_attrs"]: - n["branch_attrs"]["labels"]={} - if n["name"] in spike_mutations: - n["branch_attrs"]["labels"]["spike_mutations"] = spike_mutations[n["name"]] - if n["name"] in clade_labels: - n["branch_attrs"]["labels"]["emerging_lineage"] = clade_labels[n["name"]] + if "branch_attrs" not in n: + n["branch_attrs"]={} + if "labels" not in n["branch_attrs"]: + n["branch_attrs"]["labels"]={} + if n["name"] in spike_mutations: + n["branch_attrs"]["labels"]["spike_mutations"] = spike_mutations[n["name"]] + if n["name"] in clade_labels: + n["branch_attrs"]["labels"]["emerging_lineage"] = clade_labels[n["name"]] if "children" in n: for c in n["children"]:
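[Editor's note: the extract_clade_labels() change just above is the simplest transformation in this patch, an accumulate-and-return loop collapsed into a single dict comprehension. A minimal runnable sketch of that before/after pattern follows; the node_data sample is invented for illustration and is not taken from the repository.]

# Illustrative sketch only -- node_data below is made-up sample input.
node_data = {
    "nodes": {
        "NODE_A": {"clade_annotation": "20A"},
        "NODE_B": {},  # no annotation: must be skipped
        "NODE_C": {"clade_annotation": "20B"},
    }
}

def extract_labels_loop(node_data):
    # Pre-refactor shape: build the dict with an explicit loop.
    data = {}
    for name, node in node_data["nodes"].items():
        if "clade_annotation" in node:
            data[name] = node["clade_annotation"]
    return data

def extract_labels_comprehension(node_data):
    # Post-refactor shape: a single dict comprehension, same result.
    return {
        name: node["clade_annotation"]
        for name, node in node_data["nodes"].items()
        if "clade_annotation" in node
    }

assert extract_labels_loop(node_data) == extract_labels_comprehension(node_data)

[Both forms return identical dicts; the comprehension mainly removes boilerplate, and in CPython it is typically slightly faster because the dict is built inside a dedicated comprehension loop.]

diff --git a/scripts/add_labels.py b/scripts/add_labels.py index f98363a90..294572fde 100644 --- a/scripts/add_labels.py +++ b/scripts/add_labels.py @@ -1,65 +1,94 @@ import argparse -import json -from Bio import Phylo -from collections import defaultdict +from augur.utils import read_metadata +from Bio import SeqIO +import csv +import sys -def attach_labels(d, labeled_nodes): - if "children" in d: - for c in d["children"]: - if c["name"] in labeled_nodes: - if "labels" not in c["branch_attrs"]: - c["branch_attrs"]["labels"] = {} - c['branch_attrs']['labels']['mlabel'] = labeled_nodes[c["name"]][0] - print(c['branch_attrs']['labels']) - attach_labels(c, labeled_nodes) +EMPTY = '' +# This script was written in preparation for a future augur where commands +# may take multiple metadata files, thus making this script unnecessary! +# +# Merging logic: +# - Order of supplied TSVs matters +# - All columns are included (i.e. union of all columns present) +# - The last non-empty value read (from different TSVs) is used. I.e. values are overwritten.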
+# - Missing data is represented by an empty string +# +# We use one-hot encoding to specify which origin(s) a piece of metadata came from
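[Editor's note: the merge rules in the comment block above are the core of the new script. Here is a minimal, self-contained sketch of the precedence rule (union of columns; the last non-empty value read wins) applied to two rows; the origin names, columns, and values are hypothetical and the sketch does not use augur.]

# Illustrative sketch only -- rows, columns, and values are invented.
EMPTY = ''

row_from_first_tsv = {"strain": "Wombat/2021", "date": "2021-01-02", "age": EMPTY}
row_from_second_tsv = {"strain": "Wombat/2021", "date": EMPTY, "age": "34"}

merged = dict(row_from_first_tsv)  # the first origin seeds the merged row
for column, new_value in row_from_second_tsv.items():
    # Later origins overwrite, but only with a non-empty, different value.
    if new_value != EMPTY and new_value != merged.get(column, EMPTY):
        merged[column] = new_value

print(merged)  # {'strain': 'Wombat/2021', 'date': '2021-01-02', 'age': '34'}

[This mirrors the overwrite condition in the diff below, where a non-empty new value replaces any existing value and a warning is printed when a non-empty value is overwritten.]

-if __name__ == '__main__': +def parse_args(): parser = argparse.ArgumentParser( - description="Remove extraneous colorings", + description=""" + Custom script to combine metadata files from different origins. + In the case where metadata files specify different values, the latter provided file will take priority. + Columns will be added for each origin with values "yes" or "no" to identify the input source (origin) of each sample. + """, formatter_class=argparse.ArgumentDefaultsHelpFormatter ) + parser.add_argument('--metadata', required=True, nargs='+', metavar="TSV", help="Metadata files") + parser.add_argument('--origins', required=True, nargs='+', metavar="STR", help="Names of origins (order should match provided metadata)") + parser.add_argument('--output', required=True, metavar="TSV", help="Output (merged) metadata") + return parser.parse_args() - parser.add_argument('--input', type=str, metavar="JSON", required=True, help="input Auspice JSON") - parser.add_argument('--tree', type=str, required=True, help="tree file") - parser.add_argument('--clades', type=str, required=True, help="clades") - parser.add_argument('--mutations', type=str, required=True, help="mutations") - parser.add_argument('--output', type=str, metavar="JSON", required=True, help="output Auspice JSON") - args = parser.parse_args() - - T = Phylo.read(args.tree, 'newick') +if __name__ == '__main__': + args = parse_args() + try: + assert(len(args.metadata)==len(args.origins)) + assert(len(args.origins)>1) + except AssertionError: + print("Error. Please check your inputs - there must be the same number of metadata files as origins provided, and there must be more than one of each!") + sys.exit(2) - with open(args.mutations, "r") as f: - mutation_json = json.load(f)['nodes'] + # READ IN METADATA FILES + metadata = [] + for (origin, fname) in zip(args.origins, args.metadata): + data, columns = read_metadata(fname) + metadata.append({'origin': origin, "fname": fname, 'data': data, 'columns': columns, 'strains': {s for s in data.keys()}}) - with open(args.clades, "r") as f: - clades_json = json.load(f)['nodes'] + # SUMMARISE INPUT METADATA + print(f"Parsed {len(metadata)} metadata TSVs") + for m in metadata: + print(f"\t{m['origin']} ({m['fname']}): {len(m['data'].keys())} strains x {len(m['columns'])} columns") - with open(args.input, "r") as f: - input_json = json.load(f) + # BUILD UP COLUMN NAMES FROM MULTIPLE INPUTS TO PRESERVE ORDER + combined_columns = [] + for m in metadata: + combined_columns.extend([c for c in m['columns'] if c not in combined_columns]) + combined_columns.extend(list(args.origins)) - nodes = {} - for n in T.find_clades(order='postorder'): - if n.is_terminal(): - n.tip_count=1 - else: - n.tip_count = sum([c.tip_count for c in n]) - nodes[n.name] = {'tip_count':n.tip_count} + # ADD IN VALUES ONE BY ONE, OVERWRITING AS NECESSARY + combined_data = metadata[0]['data'] + for strain in combined_data: + for column in combined_columns: + if column not in combined_data[strain]: + combined_data[strain][column] = EMPTY - labels = defaultdict(list) - for node in nodes: - for m in mutation_json[node]['muts']: - if m[0] in 'ACGT' and m[-1] in 'ACGT': - clade = clades_json[node]['clade_membership'] - tmp_label = (clade, m) - labels[tmp_label].append((node, nodes[node]['tip_count'])) + for idx in range(1, len(metadata)): + for strain, row in metadata[idx]['data'].items(): + if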
strain not in combined_data: + combined_data[strain] = {c:EMPTY for c in combined_columns} + for column in combined_columns: + if column in row: + existing_value = combined_data[strain][column] + new_value = row[column] + # overwrite _ANY_ existing value if the overwriting value is non empty (and different)! + if new_value != EMPTY and new_value != existing_value: + if existing_value != EMPTY: + print(f"[{strain}::{column}] Overwriting {combined_data[strain][column]} with {new_value}") + combined_data[strain][column] = new_value - labeled_nodes = defaultdict(list) - for label in labels: - node = sorted(labels[label], key=lambda x:-x[1])[0] - labeled_nodes[node[0]].append('/'.join(label)) + # one-hot encoding for origin + # note that we use "yes" / "no" here as Booleans are problematic for `augur filter` + for metadata_entry in metadata: + origin = metadata_entry['origin'] + for strain in combined_data: + combined_data[strain][origin] = "yes" if strain in metadata_entry['strains'] else "no" - attach_labels(input_json["tree"], labeled_nodes) + print(f"Combined metadata: {len(combined_data.keys())} strains x {len(combined_columns)} columns") - with open(args.output, 'w') as f: - json.dump(input_json, f, indent=2) + with open(args.output, 'w') as fh: + tsv_writer = csv.writer(fh, delimiter='\t') + tsv_writer.writerow(combined_columns) + for row in combined_data.values(): + tsv_writer.writerow([row[column] for column in combined_columns]) diff --git a/scripts/developer_scripts/parse_metadata.py b/scripts/developer_scripts/parse_metadata.py index 4a54c25b6..00ff9dda2 100644 --- a/scripts/developer_scripts/parse_metadata.py +++ b/scripts/developer_scripts/parse_metadata.py @@ -2,98 +2,103 @@ from difflib import SequenceMatcher from pathlib import Path - # Things to make things recogised as Cruise ships & ignored/special treatment -cruise_abbrev = ["Grand Princess", "Cruise", "cruise", "Diamond Princess"] +cruise_abbrev = [ "Grand Princess", "Cruise", "cruise", "Diamond Princess" ] -#path to files used in the script +# path to files used in the script path_to_config_files = "scripts/developer_scripts/config_files_parse_metadata/" path_to_output_files = "scripts/developer_scripts/output_files_parse_metadata/" -Path(path_to_output_files).mkdir(parents=True, exist_ok=True) +Path(path_to_output_files).mkdir(parents = True, exist_ok = True) + def bold(s): - return('\033[1m' + s + '\033[0m') + return ('\033[1m' + s + '\033[0m') + ################################################################################ # Utils for reading files ################################################################################ # Read files which store duplicates, variants etc. -def read_local_file(file_name): #TODO: how will final file structure look like? Also, combine everything into one file for compactness? +def read_local_file( + file_name): # TODO: how will final file structure look like? Also, combine everything into one file for compactness? 
path_file_name = path_to_config_files + file_name with open(path_file_name) as myfile: file_content = myfile.readlines() - first_files = [path_to_config_files+fi for fi in ["duplicates.txt", "accepted_exposure_additions.txt"]] + first_files = [ path_to_config_files + fi for fi in [ "duplicates.txt", "accepted_exposure_additions.txt" ] ] - if path_file_name in first_files: #simple list - return [line.strip() for line in file_content[1:]] + if path_file_name in first_files: # simple list + return [ line.strip() for line in file_content[ 1: ] ] - second_files = [path_to_config_files+fi for fi in ["wrong_regions.txt", "abbreviations.txt", "false_divisions.txt"] ] + second_files = [ path_to_config_files + fi for fi in + [ "wrong_regions.txt", "abbreviations.txt", "false_divisions.txt" ] ] - if path_file_name in second_files: #dictionary, keys seperated from content with tabs + if path_file_name in second_files: # dictionary, keys seperated from content with tabs content = {} - for line in file_content[1:]: + for line in file_content[ 1: ]: l = line.strip().split("\t") - if l[0] in content: - print("Attention, duplicate found while reading " + file_name + ": " + l[0] + " -> " + l[1] + ", " + content[l[0]]) - content[l[0]] = l[1] + if l[ 0 ] in content: + print("Attention, duplicate found while reading " + file_name + ": " + l[ 0 ] + " -> " + l[ 1 ] + ", " + + content[ l[ 0 ] ]) + content[ l[ 0 ] ] = l[ 1 ] return content - third_files = [path_to_config_files+fi for fi in ["variants.txt", "international_exceptions.txt"] ] + third_files = [ path_to_config_files + fi for fi in [ "variants.txt", "international_exceptions.txt" ] ] - if path_file_name in third_files: #need two level-dict - if path_file_name == path_to_config_files+"variants.txt": + if path_file_name in third_files: # need two level-dict + if path_file_name == path_to_config_files + "variants.txt": content = {'location': {}, 'division': {}, 'country': {}, 'region': {}} - if path_file_name == path_to_config_files+"international_exceptions.txt": + if path_file_name == path_to_config_files + "international_exceptions.txt": content = {'location': {}, 'division': {}} - for line in file_content[1:]: + for line in file_content[ 1: ]: if line == "\n": continue l = line.strip().split("\t") if line.endswith("\t\n"): - l = [l[0], l[1], ""] #allow empty assignment of hierarchy (e.g. set location to blank) - entry = l[2] + l = [ l[ 0 ], l[ 1 ], "" ] # allow empty assignment of hierarchy (e.g. set location to blank) + entry = l[ 2 ] if len(l) == 4: - entry = (l[2],l[3]) - if l[0] not in content: - content[l[0]] = {} - if l[1] not in content[l[0]]: # allow duplicates (e.g. multiple "San Rafael" in different divisions) - content[l[0]][l[1]] = [] - else: #check whether already existing variant has hierarchical ordering or not + entry = (l[ 2 ], l[ 3 ]) + if l[ 0 ] not in content: + content[ l[ 0 ] ] = {} + if l[ 1 ] not in content[ l[ 0 ] ]: # allow duplicates (e.g. 
multiple "San Rafael" in different divisions) + content[ l[ 0 ] ][ l[ 1 ] ] = [ ] + else: # check whether already existing variant has hierarchical ordering or not conflict = False - for c in content[l[0]][l[1]]: + for c in content[ l[ 0 ] ][ l[ 1 ] ]: if type(c) is not tuple: - print("Warning: Variant " + str(entry) + " can not be applied due to the presence of another instance of this name in variants.txt without hierarchical ordering.") + print("Warning: Variant " + str( + entry) + " can not be applied due to the presence of another instance of this name in variants.txt without hierarchical ordering.") conflict = True - if conflict: + if conflict: continue - content[l[0]][l[1]].append(entry) + content[ l[ 0 ] ][ l[ 1 ] ].append(entry) return content - fourth_files = [path_to_config_files + fi for fi in ["manual_adjustments.txt"]] + fourth_files = [ path_to_config_files + fi for fi in [ "manual_adjustments.txt" ] ] - if path_file_name in fourth_files: # / and tab as separator + if path_file_name in fourth_files: # / and tab as separator content = {} - for line in file_content[1:]: + for line in file_content[ 1: ]: if line == "\n": continue - l = line.strip().split("\t")[0].split("/") + line.strip().split("\t")[1].split("/") + l = line.strip().split("\t")[ 0 ].split("/") + line.strip().split("\t")[ 1 ].split("/") if len(l) < 8: - for i in range(8-len(l)): + for i in range(8 - len(l)): l.append("") - k = "/".join(l[:4]) - c = "/".join(l[4:]) + k = "/".join(l[ :4 ]) + c = "/".join(l[ 4: ]) if k in content: - print("Attention, duplicate found while reading " + file_name + ": " + k + " -> " + c + ", " + content[k]) - content[k] = c + print("Attention, duplicate found while reading " + file_name + ": " + k + " -> " + c + ", " + content[ + k ]) + content[ k ] = c return content - # Read ordering and lat_longs file and return as dictionary: def read_geography_file(file_name, hierarchical = False): lat_longs = ("lat_longs" in file_name) @@ -106,38 +111,40 @@ def read_geography_file(file_name, hierarchical = False): data = {"location": {}, "division": {}, "country": {}, "region": {}} else: # dictionary containing all locations, divisions etc. as lists - data = {"location": [], "division": [], "country": [], "region": []} + data = {"location": [ ], "division": [ ], "country": [ ], "region": [ ]} color_ordering_other = {} for line in data_file: if line == "\n": continue l = line.strip().split("\t") - if l[0][:1] == "#": #if a comment - ignore! + if l[ 0 ][ :1 ] == "#": # if a comment - ignore! continue - type = l[0] #location, division etc - name = l[1] + type = l[ 0 ] # location, division etc + name = l[ 1 ] if lat_longs: - if name not in data[type]: - data[type][name] = (float(l[2]), float(l[3])) + if name not in data[ type ]: + data[ type ][ name ] = (float(l[ 2 ]), float(l[ 3 ])) else: - print("Duplicate in lat_longs? (" + l[0] + " " + l[1] + ")\n") # if already in the dictionary, print warning + print("Duplicate in lat_longs? (" + l[ 0 ] + " " + l[ + 1 ] + ")\n") # if already in the dictionary, print warning else: if type in data: - if name not in data[type]: - data[type].append(name) + if name not in data[ type ]: + data[ type ].append(name) else: - print("Duplicate in color_ordering? (" + l[0] + " " + l[1] + ")\n") # if already in the dictionary, print warning + print("Duplicate in color_ordering? 
(" + l[ 0 ] + " " + l[ + 1 ] + ")\n") # if already in the dictionary, print warning else: if type not in color_ordering_other: - color_ordering_other[type] = [] - color_ordering_other[type].append(name) + color_ordering_other[ type ] = [ ] + color_ordering_other[ type ].append(name) if lat_longs: return data else: return data, color_ordering_other - else: #hierarchical structure of ordering for checking similar names only in the same country + else: # hierarchical structure of ordering for checking similar names only in the same country data = {"Asia": {}, "Oceania": {}, "Africa": {}, "Europe": {}, "South America": {}, "North America": {}} region = "" @@ -149,101 +156,67 @@ def read_geography_file(file_name, hierarchical = False): continue if line.startswith("###"): if len(line.split("### ")) > 1: # country - country = line.strip().split("### ")[1] - if country not in data[region]: - data[region][country] = {} + country = line.strip().split("### ")[ 1 ] + if country not in data[ region ]: + data[ region ][ country ] = {} else: if line.startswith("#"): if len(line.split("# ")) > 1: # region or division - place = line.strip().split("# ")[1] + place = line.strip().split("# ")[ 1 ] if place in data: region = place else: division = place - if division not in data[region][country]: - data[region][country][division] = [] + if division not in data[ region ][ country ]: + data[ region ][ country ][ division ] = [ ] else: l = line.strip().split("\t") - type = l[0] # location, division etc - place = l[1] + type = l[ 0 ] # location, division etc + place = l[ 1 ] if type == "division": division = place - if division not in data[region][country]: - data[region][country][division] = [] + if division not in data[ region ][ country ]: + data[ region ][ country ][ division ] = [ ] if type == "location": location = place - if location not in data[region][country][division]: - data[region][country][division].append(location) + if location not in data[ region ][ country ][ division ]: + data[ region ][ country ][ division ].append(location) return data -replace_special_char = { - "é":"e", - "è":"e", - "ü":"ue", - "ä":"ae", - "ö":"oe", - "í":"i", - "ó":"o", - "ç":"c", - "á":"a", - "'":" ", - "â":"a", - "š":"s", - "ť":"t", - "ñ":"n", - "ř":"r", - "ž":"z", - "ů":"u", - "ý":"y", - "ě":"e", - "ň":"n", - "ã":"a", - "ê":"e", - "č":"c", - "ô":"o", - "ı":"i", - "ú": "u", - "ś":"s", - "ą":"q", - "à":"a", - "å":"a", - "ł":"l", - "-":" ", - "î": "i", - "ŕ": "r", - "ľ": "l", - "ď": "d" -} +replace_special_char = {"é": "e", "è": "e", "ü": "ue", "ä": "ae", "ö": "oe", "í": "i", "ó": "o", "ç": "c", "á": "a", + "'": " ", "â": "a", "š": "s", "ť": "t", "ñ": "n", "ř": "r", "ž": "z", "ů": "u", "ý": "y", "ě": "e", "ň": "n", + "ã": "a", "ê": "e", "č": "c", "ô": "o", "ı": "i", "ú": "u", "ś": "s", "ą": "q", "à": "a", "å": "a", "ł": "l", + "-": " ", "î": "i", "ŕ": "r", "ľ": "l", "ď": "d"} def clean_string(s): s = s.lower() for c in replace_special_char: - s = s.replace(c, replace_special_char[c]) + s = s.replace(c, replace_special_char[ c ]) return s def pre_sort_lat_longs(lat_longs): - dataset = {"location": [], "division": [], "country": [], "region": []} - regions = ["Africa", "Asia", "Europe", "North America", "Oceania", "South America"] + dataset = {"location": [ ], "division": [ ], "country": [ ], "region": [ ]} + regions = [ "Africa", "Asia", "Europe", "North America", "Oceania", "South America" ] for line in lat_longs: if line == "\n": continue - dataset[line.split("\t")[0]].append(line) + dataset[ line.split("\t")[ 0 ] 
].append(line) - lat_longs_sorted = [] + lat_longs_sorted = [ ] - regions_list = [] + regions_list = [ ] for type in dataset: - no_special_char = {clean_string(dataset[type][i].split("\t")[1]): i for i in range(len(dataset[type]))} + no_special_char = {clean_string(dataset[ type ][ i ].split("\t")[ 1 ]): i for i in range(len(dataset[ type ]))} for line in sorted(no_special_char): - i = no_special_char[line] - line_orig = dataset[type][i] - if line_orig.startswith("country") and line_orig.split("\t")[1] in regions: + i = no_special_char[ line ] + line_orig = dataset[ type ][ i ] + if line_orig.startswith("country") and line_orig.split("\t")[ 1 ] in regions: regions_list.append(line_orig) continue lat_longs_sorted.append(line_orig) @@ -255,9 +228,8 @@ def pre_sort_lat_longs(lat_longs): return lat_longs_sorted -#Function to support supervised addition of new entries into lat_longs. The user must review every new entry and approve it to be written into the lat_longs file. Ground truth lat_longs is not overwritten, but a copy is made in the developer_scripts folder. +# Function to support supervised addition of new entries into lat_longs. The user must review every new entry and approve it to be written into the lat_longs file. Ground truth lat_longs is not overwritten, but a copy is made in the developer_scripts folder. def auto_add_lat_longs(new_lat_longs): - with open("defaults/lat_longs.tsv") as f: lat_longs = f.readlines() lat_longs = pre_sort_lat_longs(lat_longs) @@ -266,22 +238,23 @@ def auto_add_lat_longs(new_lat_longs): continue correct_hierarchy = False for i in range(len(lat_longs)): - if lat_longs[i] == "\n" and not correct_hierarchy: + if lat_longs[ i ] == "\n" and not correct_hierarchy: continue - if lat_longs[i] != "\n" and entry[:4] != lat_longs[i][:4]: #first characters correspond to country, division, location etc. + if lat_longs[ i ] != "\n" and entry[ :4 ] != lat_longs[ i ][ + :4 ]: # first characters correspond to country, division, location etc. 
continue correct_hierarchy = True - if lat_longs[i] != "\n" and clean_string(entry) > clean_string(lat_longs[i]): + if lat_longs[ i ] != "\n" and clean_string(entry) > clean_string(lat_longs[ i ]): continue print("\n") for k in range(3): - print(lat_longs[i-3+k].strip()) + print(lat_longs[ i - 3 + k ].strip()) print(bold(entry)) for k in range(3): - print(lat_longs[i+k].strip()) + print(lat_longs[ i + k ].strip()) answer = input("Approve of this new entry (y)?") if answer == "y": - lat_longs = lat_longs[:i] + [entry + "\n" ] + lat_longs[i:] + lat_longs = lat_longs[ :i ] + [ entry + "\n" ] + lat_longs[ i: ] break local_file = path_to_output_files + "lat_longs.tsv" @@ -290,7 +263,6 @@ def auto_add_lat_longs(new_lat_longs): f.write(line) - ################################################################################ # Step 1: Collection of data from metadata file in hierarchical manner ################################################################################ @@ -301,38 +273,38 @@ def auto_add_lat_longs(new_lat_longs): def read_metadata(metadata): data = {} - for line in metadata[1:]: + for line in metadata[ 1: ]: l = line.split("\t") - region = l[5] - country = l[6] - division = l[7] - location = l[8] - id = l[2] - strain = l[0] - - host = l[14] - if host == "Neovison vison" or host == "Mustela lutreola": + region = l[ 5 ] + country = l[ 6 ] + division = l[ 7 ] + location = l[ 8 ] + id = l[ 2 ] + strain = l[ 0 ] + + host = l[ 14 ] + if host == "Neovison vison" or host == "Mustela lutreola": print("Adjust host " + host + " to Mink") additions_to_annotation.append(strain + "\t" + id + "\thost\tMink # previously " + host) - problematic_char = ["'", "`"] + problematic_char = [ "'", "`" ] for c in problematic_char: if c in strain: strain2 = strain.replace(c, "-") print("Adjust strain " + strain + " to " + strain2) additions_to_annotation.append(strain + "\t" + id + "\tstrain\t" + strain2 + " # previously " + strain) - if region not in data: - data[region] = {} - if country not in data[region]: - data[region][country] = {} - if division not in data[region][country]: - data[region][country][division] = {} - if location not in data[region][country][division]: - data[region][country][division][location] = [] - data[region][country][division][location].append(strain + "\t" + id) # store strain and id of each seq with this combination of region/country/division/location + data[ region ] = {} + if country not in data[ region ]: + data[ region ][ country ] = {} + if division not in data[ region ][ country ]: + data[ region ][ country ][ division ] = {} + if location not in data[ region ][ country ][ division ]: + data[ region ][ country ][ division ][ location ] = [ ] + data[ region ][ country ][ division ][ location ].append( + strain + "\t" + id) # store strain and id of each seq with this combination of region/country/division/location return data @@ -344,20 +316,20 @@ def read_exposure(data, metadata): accepted_additions = read_local_file("accepted_exposure_additions.txt") print("\n=============================\n") - #print("Travel history includes:") + # print("Travel history includes:") bad_div = {} bad_ctry = {} - for line in metadata[1:]: + for line in metadata[ 1: ]: l = line.split("\t") - region2 = l[9] - country2 = l[10] - division2 = l[11] - id = l[2] - strain = l[0] + region2 = l[ 9 ] + country2 = l[ 10 ] + division2 = l[ 11 ] + id = l[ 2 ] + strain = l[ 0 ] - if region2 == "United Kingdom": #TODO: separate this, make it more applicable for other countries + if region2 == "United 
Kingdom": # TODO: separate this, make it more applicable for other countries region2 = "Europe" division2 = country2 country2 = "United Kingdom" @@ -369,35 +341,35 @@ def read_exposure(data, metadata): s2 = country2 + " (" + region2 + ")" if s2 in bad_ctry: - bad_ctry[s2].append(line.strip()) + bad_ctry[ s2 ].append(line.strip()) else: - if country2 not in data[region2]: + if country2 not in data[ region2 ]: if s2 not in accepted_additions and country2 != region2: - bad_ctry[s2] = [line.strip()] + bad_ctry[ s2 ] = [ line.strip() ] else: - data[region2][country2] = {} - #print("Added country " + bold(s2) + " to the dataset") #optional confirmation of added countries + data[ region2 ][ + country2 ] = {} # print("Added country " + bold(s2) + " to the dataset") #optional confirmation of added countries if s in bad_div: - bad_div[s].append(line.strip()) + bad_div[ s ].append(line.strip()) else: - if country2 in data[region2]: - if division2 not in data[region2][country2]: + if country2 in data[ region2 ]: + if division2 not in data[ region2 ][ country2 ]: if s not in accepted_additions and division2 != country2: - bad_div[s] = [line.strip()] + bad_div[ s ] = [ line.strip() ] else: - data[region2][country2][division2] = {} - #print("Added division " + bold(s) + " to the dataset") #optional confirmation of added divisions + data[ region2 ][ country2 ][ + division2 ] = {} # print("Added division " + bold(s) + " to the dataset") #optional confirmation of added divisions print("\n\nUnchecked travel histories: (consider adding to accepted_exposure_additions.txt)\n") for division in bad_div: print("Strains with unknown division " + bold(division)) - for l in bad_div[division]: + for l in bad_div[ division ]: print(l) print() print() for country in bad_ctry: print("Strains with unknown country " + bold(country)) - for l in bad_ctry[country]: + for l in bad_ctry[ country ]: print(l) print() print("\n=============================\n") @@ -412,117 +384,131 @@ def read_exposure(data, metadata): # Correct the metadata dictionary in a given manner # e.g. switch all locations and strains from a misspelled division to the correct division # e.g. turn a certain false division into a location below the correct division, and move all connected strains -def correct_data(data, type, corrections, add_annotations = True): #TODO: add region correction (e.g. for Turkey, Georgia) +def correct_data(data, type, corrections, + add_annotations = True): # TODO: add region correction (e.g. 
for Turkey, Georgia) if type == "region": for (region, region_correct) in corrections: if region_correct not in data: - data[region_correct] = {} - for country in data[region]: - if country not in data[region_correct]: - data[region_correct][country] = {} - for division in data[region][country]: - if division not in data[region_correct][country]: - data[region_correct][country][division] = {} - for location in data[region][country][division]: - if location not in data[region_correct][country][division]: - data[region_correct][country][division][location] = [] - for strain in data[region][country][division][location]: + data[ region_correct ] = {} + for country in data[ region ]: + if country not in data[ region_correct ]: + data[ region_correct ][ country ] = {} + for division in data[ region ][ country ]: + if division not in data[ region_correct ][ country ]: + data[ region_correct ][ country ][ division ] = {} + for location in data[ region ][ country ][ division ]: + if location not in data[ region_correct ][ country ][ division ]: + data[ region_correct ][ country ][ division ][ location ] = [ ] + for strain in data[ region ][ country ][ division ][ location ]: if region != region_correct: if add_annotations: - additions_to_annotation.append(strain + "\tregion\t" + region_correct + " # previously " + region) - data[region_correct][country][division][location].append(strain) - del data[region] + additions_to_annotation.append( + strain + "\tregion\t" + region_correct + " # previously " + region) + data[ region_correct ][ country ][ division ][ location ].append(strain) + del data[ region ] if type == "country": for (region, country, region_correct, country_correct) in corrections: - if country_correct not in data[region_correct]: - data[region_correct][country_correct] = {} - for division in data[region][country]: - if division not in data[region_correct][country_correct]: - data[region_correct][country_correct][division] = {} - for location in data[region][country][division]: - if location not in data[region_correct][country_correct][division]: - data[region_correct][country_correct][division][location] = [] - for strain in data[region][country][division][location]: + if country_correct not in data[ region_correct ]: + data[ region_correct ][ country_correct ] = {} + for division in data[ region ][ country ]: + if division not in data[ region_correct ][ country_correct ]: + data[ region_correct ][ country_correct ][ division ] = {} + for location in data[ region ][ country ][ division ]: + if location not in data[ region_correct ][ country_correct ][ division ]: + data[ region_correct ][ country_correct ][ division ][ location ] = [ ] + for strain in data[ region ][ country ][ division ][ location ]: if country != country_correct: if add_annotations: - additions_to_annotation.append(strain + "\tcountry\t" + country_correct + " # previously " + country) + additions_to_annotation.append( + strain + "\tcountry\t" + country_correct + " # previously " + country) if region != region_correct: if add_annotations: - additions_to_annotation.append(strain + "\tregion\t" + region_correct + " # previously " + region) - data[region_correct][country_correct][division][location].append(strain) - del data[region][country] + additions_to_annotation.append( + strain + "\tregion\t" + region_correct + " # previously " + region) + data[ region_correct ][ country_correct ][ division ][ location ].append(strain) + del data[ region ][ country ] if type == "division": for (region, country, division, 
region_correct, country_correct, division_correct) in corrections: - if country_correct not in data[region_correct]: - data[region_correct][country_correct] = {} - if division_correct not in data[region_correct][country_correct]: - data[region_correct][country_correct][division_correct] = {} - for location in data[region][country][division]: - if location not in data[region_correct][country_correct][division_correct]: - data[region_correct][country_correct][division_correct][location] = [] - for strain in data[region][country][division][location]: + if country_correct not in data[ region_correct ]: + data[ region_correct ][ country_correct ] = {} + if division_correct not in data[ region_correct ][ country_correct ]: + data[ region_correct ][ country_correct ][ division_correct ] = {} + for location in data[ region ][ country ][ division ]: + if location not in data[ region_correct ][ country_correct ][ division_correct ]: + data[ region_correct ][ country_correct ][ division_correct ][ location ] = [ ] + for strain in data[ region ][ country ][ division ][ location ]: if division != division_correct: if add_annotations: - additions_to_annotation.append(strain + "\tdivision\t" + division_correct + " # previously " + division) + additions_to_annotation.append( + strain + "\tdivision\t" + division_correct + " # previously " + division) if country != country_correct: if add_annotations: - additions_to_annotation.append(strain + "\tcountry\t" + country_correct + " # previously " + country) + additions_to_annotation.append( + strain + "\tcountry\t" + country_correct + " # previously " + country) if region != region_correct: if add_annotations: - additions_to_annotation.append(strain + "\tregion\t" + region_correct + " # previously " + region) - data[region_correct][country_correct][division_correct][location].append(strain) - del data[region][country][division] + additions_to_annotation.append( + strain + "\tregion\t" + region_correct + " # previously " + region) + data[ region_correct ][ country_correct ][ division_correct ][ location ].append(strain) + del data[ region ][ country ][ division ] if type == "location": - for (region, country, division, location, region_correct, country_correct, division_correct, location_correct) in corrections: - if country_correct not in data[region_correct]: - data[region_correct][country_correct] = {} - if division_correct not in data[region_correct][country_correct]: - data[region_correct][country_correct][division_correct] = {} - if location_correct not in data[region_correct][country_correct][division_correct]: - data[region_correct][country_correct][division_correct][location_correct] = [] - for strain in data[region][country][division][location]: + for (region, country, division, location, region_correct, country_correct, division_correct, + location_correct) in corrections: + if country_correct not in data[ region_correct ]: + data[ region_correct ][ country_correct ] = {} + if division_correct not in data[ region_correct ][ country_correct ]: + data[ region_correct ][ country_correct ][ division_correct ] = {} + if location_correct not in data[ region_correct ][ country_correct ][ division_correct ]: + data[ region_correct ][ country_correct ][ division_correct ][ location_correct ] = [ ] + for strain in data[ region ][ country ][ division ][ location ]: if location != location_correct: if add_annotations: - additions_to_annotation.append(strain + "\tlocation\t" + location_correct + " # previously " + location) + additions_to_annotation.append( + 
strain + "\tlocation\t" + location_correct + " # previously " + location) if division != division_correct: if add_annotations: - additions_to_annotation.append(strain + "\tdivision\t" + division_correct + " # previously " + division) + additions_to_annotation.append( + strain + "\tdivision\t" + division_correct + " # previously " + division) if country != country_correct: if add_annotations: - additions_to_annotation.append(strain + "\tcountry\t" + country_correct + " # previously " + country) + additions_to_annotation.append( + strain + "\tcountry\t" + country_correct + " # previously " + country) if region != region_correct: if add_annotations: - additions_to_annotation.append(strain + "\tregion\t" + region_correct + " # previously " + region) - data[region_correct][country_correct][division_correct][location_correct].append(strain) - del data[region][country][division][location] - if data[region][country][division] == {}: - del data[region][country][division] - if data[region][country] == {}: - del data[region][country] + additions_to_annotation.append( + strain + "\tregion\t" + region_correct + " # previously " + region) + data[ region_correct ][ country_correct ][ division_correct ][ location_correct ].append(strain) + del data[ region ][ country ][ division ][ location ] + if data[ region ][ country ][ division ] == {}: + del data[ region ][ country ][ division ] + if data[ region ][ country ] == {}: + del data[ region ][ country ] if type == "div_to_loc": for location in corrections: - (region, country, division) = corrections[location] - if division not in data[region][country]: - data[region][country][division] = {} - for sub_location in data[region][country][location]: + (region, country, division) = corrections[ location ] + if division not in data[ region ][ country ]: + data[ region ][ country ][ division ] = {} + for sub_location in data[ region ][ country ][ location ]: if sub_location != "": print("Attention, additional location assigned to false division: " + sub_location) - if location not in data[region][country][division]: - data[region][country][division][location] = [] - for strain in data[region][country][location][sub_location]: + if location not in data[ region ][ country ][ division ]: + data[ region ][ country ][ division ][ location ] = [ ] + for strain in data[ region ][ country ][ location ][ sub_location ]: if add_annotations: - additions_to_annotation.append(strain + "\tdivision\t" + division + " # previously false division " + location) + additions_to_annotation.append( + strain + "\tdivision\t" + division + " # previously false division " + location) additions_to_annotation.append(strain + "\tlocation\t" + location) - data[region][country][division][location].append(strain) - del data[region][country][location] + data[ region ][ country ][ division ][ location ].append(strain) + del data[ region ][ country ][ location ] return data + # Search the ordering file for a similar name as the one given, and return it if the score is above a fixed threshold def check_similar(ordering, name, type): diff_max = 0 @@ -546,11 +532,10 @@ def check_similar(ordering, name, type): ##### Step 2.0: -def adjust_to_database(data): #TODO: temporary solution, needs reworking +def adjust_to_database(data): # TODO: temporary solution, needs reworking for region in data: - for country in data[region]: + for country in data[ region ]: if country + ".txt" in listdir(path_to_config_files + "country_ordering/"): - variants = {} with open(path_to_config_files + "country_ordering/" + 
country + "_variants.txt") as myfile: country_variants = myfile.readlines() @@ -558,81 +543,100 @@ def adjust_to_database(data): #TODO: temporary solution, needs reworking if line == "\n": continue l = line.strip().split("\t") - variants[l[0]] = l[1] + variants[ l[ 0 ] ] = l[ 1 ] with open(path_to_config_files + "country_ordering/" + country + ".txt") as myfile: country_ordering = myfile.readlines() arrondissement_to_location = {} location_to_arrondissement = {} - provinces = [] + provinces = [ ] duplicates = {} for line in country_ordering: if line == "\n" or "------" in line: continue if line.startswith("### "): - province = clean_string(line.strip()[4:]) + province = clean_string(line.strip()[ 4: ]) provinces.append(province) continue if line.startswith("# "): - arrondissement = line.strip()[2:] - arrondissement_to_location[clean_string(arrondissement)] = [] + arrondissement = line.strip()[ 2: ] + arrondissement_to_location[ clean_string(arrondissement) ] = [ ] continue location = line.strip() - if location not in arrondissement_to_location[clean_string(arrondissement)]: - arrondissement_to_location[clean_string(arrondissement)].append(location) + if location not in arrondissement_to_location[ clean_string(arrondissement) ]: + arrondissement_to_location[ clean_string(arrondissement) ].append(location) if clean_string(location) in location_to_arrondissement: - if location_to_arrondissement[clean_string(location)] != arrondissement: - duplicates[clean_string(location)] = (arrondissement, location_to_arrondissement[clean_string(location)]) - location_to_arrondissement[clean_string(location)] = arrondissement + if location_to_arrondissement[ clean_string(location) ] != arrondissement: + duplicates[ clean_string(location) ] = ( + arrondissement, location_to_arrondissement[ clean_string(location) ]) + location_to_arrondissement[ clean_string(location) ] = arrondissement - for division in data[region][country]: - - for location in data[region][country][division]: + for division in data[ region ][ country ]: + for location in data[ region ][ country ][ division ]: division_c = clean_string(division) if division == country: continue - + if division_c in provinces and location == "": continue # division appears two times in country ordering - advise to pick one if division_c in duplicates: - print("Attention duplicate: " + bold(division) + " found in " + bold(duplicates[division_c][0]) + " and " + bold(duplicates[division_c][1])) + print("Attention duplicate: " + bold(division) + " found in " + bold( + duplicates[ division_c ][ 0 ]) + " and " + bold(duplicates[ division_c ][ 1 ])) print("Suggestion: check additional info for zip code") print("Suggestion: check additional info for zip code") continue ### location given if location != "": - # consistent with dataset - if clean_string(location) in location_to_arrondissement and division == location_to_arrondissement[clean_string(location)]: + if clean_string(location) in location_to_arrondissement and division == \ + location_to_arrondissement[ clean_string(location) ]: continue # other way around (in case of duplicates overwriting each other in location_to_arrondissement) - if division_c in arrondissement_to_location and location in arrondissement_to_location[division_c]: + if division_c in arrondissement_to_location and location in arrondissement_to_location[ + division_c ]: continue # location given, but with wrong division - adjust to correct division - if clean_string(location) in location_to_arrondissement and division != 
location_to_arrondissement[clean_string(location)]: + if clean_string(location) in location_to_arrondissement and division != \ + location_to_arrondissement[ clean_string(location) ]: print("Wrong division " + bold(division) + " given for location " + bold(location)) - print("Suggestion: add [" + "/".join([region, country, division, location]) + "\t" + "/".join([region, country, location_to_arrondissement[clean_string(location)], location]) + "] to manual_adjustments.txt") + print("Suggestion: add [" + "/".join( + [ region, country, division, location ]) + "\t" + "/".join( + [ region, country, location_to_arrondissement[ clean_string(location) ], + location ]) + "] to manual_adjustments.txt") continue # location given, but with wrong spelling. Division is correct - adjust to correct location - if location in variants and clean_string(variants[location]) in location_to_arrondissement and division == location_to_arrondissement[clean_string(variants[location])]: - print("Location " + bold(location) + " should be adjusted to " + bold(variants[location])) - print("Suggestion: add [" + "/".join([region, country, division, location]) + "\t" + "/".join([region, country, division, variants[location]]) + "] to manual_adjustments.txt") + if location in variants and clean_string( + variants[ location ]) in location_to_arrondissement and division == \ + location_to_arrondissement[ clean_string(variants[ location ]) ]: + print("Location " + bold(location) + " should be adjusted to " + bold( + variants[ location ])) + print("Suggestion: add [" + "/".join( + [ region, country, division, location ]) + "\t" + "/".join( + [ region, country, division, + variants[ location ] ]) + "] to manual_adjustments.txt") continue # location given, but with wrong spelling. Division false - adjust both location and division - if location in variants and clean_string(variants[location]) in location_to_arrondissement and division != location_to_arrondissement[clean_string(variants[location])]: - print("Location " + bold(location) + " should be adjusted to " + bold(variants[location]) + ". Wrong division " + bold(division) + " given for location " + bold(variants[location])) - print("Suggestion: add [" + "/".join([region, country, division, location]) + "\t" + "/".join([region, country, location_to_arrondissement[clean_string(variants[location])], variants[location]]) + "] to manual_adjustments.txt") + if location in variants and clean_string( + variants[ location ]) in location_to_arrondissement and division != \ + location_to_arrondissement[ clean_string(variants[ location ]) ]: + print("Location " + bold(location) + " should be adjusted to " + bold( + variants[ location ]) + ". 
Wrong division " + bold( + division) + " given for location " + bold(variants[ location ])) + print("Suggestion: add [" + "/".join( + [ region, country, division, location ]) + "\t" + "/".join( + [ region, country, location_to_arrondissement[ clean_string(variants[ location ]) ], + variants[ location ] ]) + "] to manual_adjustments.txt") continue @@ -643,27 +647,42 @@ def adjust_to_database(data): #TODO: temporary solution, needs reworking continue # given division is proper, but misspelled - adjust spelling - if division in variants and (clean_string(variants[division]) in provinces or clean_string(variants[division]) in arrondissement_to_location): - print("Division " + bold(division) + " should be adjusted to " + bold(variants[division])) - print("Suggestion: add [" + "/".join([region, country, division, location]) + "\t" + "/".join([region, country, variants[division], location]) + "] to manual_adjustments.txt") + if division in variants and ( + clean_string(variants[ division ]) in provinces or clean_string( + variants[ division ]) in arrondissement_to_location): + print("Division " + bold(division) + " should be adjusted to " + bold( + variants[ division ])) + print("Suggestion: add [" + "/".join( + [ region, country, division, location ]) + "\t" + "/".join( + [ region, country, variants[ division ], + location ]) + "] to manual_adjustments.txt") continue # given division is actually a location if division_c in location_to_arrondissement: - print("Given division " + bold(division) + " is actually a location within division " + bold(location_to_arrondissement[division_c])) - print("Suggestion: add [" + "/".join([region, country, division, location]) + "\t" + "/".join([region, country, location_to_arrondissement[division_c], division]) + "] to manual_adjustments.txt") + print("Given division " + bold( + division) + " is actually a location within division " + bold( + location_to_arrondissement[ division_c ])) + print("Suggestion: add [" + "/".join( + [ region, country, division, location ]) + "\t" + "/".join( + [ region, country, location_to_arrondissement[ division_c ], + division ]) + "] to manual_adjustments.txt") continue # given division is misspelled and location - if division in variants and clean_string(variants[division]) in location_to_arrondissement: - print("Given division " + bold(division) + " is a misspelled location " + bold(variants[division]) + " within division " + bold(location_to_arrondissement[clean_string(variants[division])])) - print("Suggestion: add [" + "/".join([region, country, division, location]) + "\t" + "/".join([region, country, location_to_arrondissement[clean_string(variants[division])], variants[division]]) + "] to manual_adjustments.txt") + if division in variants and clean_string( + variants[ division ]) in location_to_arrondissement: + print("Given division " + bold(division) + " is a misspelled location " + bold( + variants[ division ]) + " within division " + bold( + location_to_arrondissement[ clean_string(variants[ division ]) ])) + print("Suggestion: add [" + "/".join( + [ region, country, division, location ]) + "\t" + "/".join( + [ region, country, location_to_arrondissement[ clean_string(variants[ division ]) ], + variants[ division ] ]) + "] to manual_adjustments.txt") continue - print("Missing combination in " + country + " database: " + bold(division + ", " + location)) - print("\n=============================\n") return data @@ -672,14 +691,15 @@ def adjust_to_database(data): #TODO: temporary solution, needs reworking def 
manual_adjustments(data): manual_adjustments = read_local_file("manual_adjustments.txt") - seqs_to_correct = [] + seqs_to_correct = [ ] for region in data: - for country in data[region]: - for division in data[region][country]: - for location in data[region][country][division]: + for country in data[ region ]: + for division in data[ region ][ country ]: + for location in data[ region ][ country ][ division ]: for g in manual_adjustments: (region2, country2, division2, location2) = g.split("/") - (region_correct, country_correct, division_correct, location_correct) = manual_adjustments[g].split("/") + (region_correct, country_correct, division_correct, location_correct) = manual_adjustments[ + g ].split("/") if region2 == "*": region2 = region if region_correct == "*": @@ -698,33 +718,36 @@ def manual_adjustments(data): location_correct = location if region == region2 and country == country2 and division == division2 and location == location2: - seqs_to_correct.append((region, country, division, location, region_correct, country_correct, division_correct, location_correct)) - print("Manual adjustment: " + bold("/".join([region, country, division, location])) + " -> " + bold("/".join([region_correct, country_correct, division_correct, location_correct]))) + seqs_to_correct.append(( + region, country, division, location, region_correct, country_correct, division_correct, + location_correct)) + print("Manual adjustment: " + bold( + "/".join([ region, country, division, location ])) + " -> " + bold( + "/".join([ region_correct, country_correct, division_correct, location_correct ]))) data = correct_data(data, "location", seqs_to_correct) print("\n=============================\n") return data - ##### Step 2.1: Apply all known variants stored in an external file variants.txt def apply_variants(data): variants = read_local_file("variants.txt") - countries_to_switch = [] + countries_to_switch = [ ] for region in data: - for country in data[region]: - if country in variants['country']: + for country in data[ region ]: + if country in variants[ 'country' ]: match_found = False # if the first entry has no specified hierarchy, all other entries of this place name are ignored - if type(variants['country'][country][0]) is not tuple: + if type(variants[ 'country' ][ country ][ 0 ]) is not tuple: match_found = True - country_correct = variants['country'][country][0] + country_correct = variants[ 'country' ][ country ][ 0 ] else: - for country_option in variants['country'][country]: - if country_option[1] == "(" + region + ")": + for country_option in variants[ 'country' ][ country ]: + if country_option[ 1 ] == "(" + region + ")": match_found = True - country_correct = country_option[0] + country_correct = country_option[ 0 ] break if match_found: print("Apply variant (country): " + bold(country) + " -> " + bold(country_correct)) @@ -732,20 +755,20 @@ def apply_variants(data): data = correct_data(data, "country", countries_to_switch) - divisions_to_switch = [] + divisions_to_switch = [ ] for region in data: - for country in data[region]: - for division in data[region][country]: - if division in variants['division']: + for country in data[ region ]: + for division in data[ region ][ country ]: + if division in variants[ 'division' ]: match_found = False - if type(variants['division'][division][0]) is not tuple: + if type(variants[ 'division' ][ division ][ 0 ]) is not tuple: match_found = True - division_correct = variants['division'][division][0] + division_correct = variants[ 'division' ][ division ][ 0 
] else: - for division_option in variants['division'][division]: - if division_option[1] == "(" + region + ", " + country + ")": + for division_option in variants[ 'division' ][ division ]: + if division_option[ 1 ] == "(" + region + ", " + country + ")": match_found = True - division_correct = division_option[0] + division_correct = division_option[ 0 ] break if match_found: print("Apply variant (division): " + bold(division) + " -> " + bold(division_correct)) @@ -753,177 +776,197 @@ def apply_variants(data): data = correct_data(data, "division", divisions_to_switch) - locations_to_switch = [] + locations_to_switch = [ ] for region in data: - for country in data[region]: - for division in data[region][country]: - for location in data[region][country][division]: - if location in variants['location']: + for country in data[ region ]: + for division in data[ region ][ country ]: + for location in data[ region ][ country ][ division ]: + if location in variants[ 'location' ]: match_found = False - if type(variants['location'][location][0]) is not tuple: + if type(variants[ 'location' ][ location ][ 0 ]) is not tuple: match_found = True - location_correct = variants['location'][location][0] + location_correct = variants[ 'location' ][ location ][ 0 ] else: - for location_option in variants['location'][location]: - if location_option[1] == "(" + region + ", " + country + ", " + division + ")": + for location_option in variants[ 'location' ][ location ]: + if location_option[ 1 ] == "(" + region + ", " + country + ", " + division + ")": match_found = True - location_correct = location_option[0] + location_correct = location_option[ 0 ] break if match_found: print("Apply variant (location): " + bold(location) + " -> " + bold(location_correct)) - locations_to_switch.append((region, country, division, location, region, country, division, location_correct)) + locations_to_switch.append( + (region, country, division, location, region, country, division, location_correct)) data = correct_data(data, "location", locations_to_switch) print("\n=============================\n") return data -def apply_typical_errors(data): #TODO: rename, maybe join with UK as region? also use correct_data() + +def apply_typical_errors(data): # TODO: rename, maybe join with UK as region? also use correct_data() wrong_regions = read_local_file("wrong_regions.txt") - countries_to_switch = [] + countries_to_switch = [ ] for country in wrong_regions: - region_correct = wrong_regions[country] + region_correct = wrong_regions[ country ] for region in data: if region == region_correct: continue - if country in data[region]: - print("Found incorrect region " + bold(region) + " for country " + bold(country) + " (correct region: " + bold(region_correct) + ")" ) + if country in data[ region ]: + print("Found incorrect region " + bold(region) + " for country " + bold( + country) + " (correct region: " + bold(region_correct) + ")") countries_to_switch.append((region, country, region_correct, country)) data = correct_data(data, "country", countries_to_switch) - - print("\nAdjustments made to avoid international duplicates (e.g. cruise ships) for generation of color_ordering.tsv:\n") - divisions_to_switch = [] - locations_to_switch = [] + print( + "\nAdjustments made to avoid international duplicates (e.g. 
cruise ships) for generation of color_ordering.tsv:\n") + divisions_to_switch = [ ] + locations_to_switch = [ ] international_exceptions = read_local_file("international_exceptions.txt") for region in data: - for country in data[region]: - for division in data[region][country]: - if division in international_exceptions["division"]: - (region_correct, country_correct) = tuple(international_exceptions["division"][division][0].split(", ")) + for country in data[ region ]: + for division in data[ region ][ country ]: + if division in international_exceptions[ "division" ]: + (region_correct, country_correct) = tuple( + international_exceptions[ "division" ][ division ][ 0 ].split(", ")) if region == region_correct and country == country_correct: continue - print("division " + division + ": " + region + ", " + country + " => " + region_correct + ", " + country_correct) + print( + "division " + division + ": " + region + ", " + country + " => " + region_correct + ", " + country_correct) divisions_to_switch.append((region, country, division, region_correct, country_correct, division)) - for location in data[region][country][division]: - if location in international_exceptions["location"]: - (region_correct, country_correct, division_correct) = tuple(international_exceptions["location"][location][0].split(", ")) + for location in data[ region ][ country ][ division ]: + if location in international_exceptions[ "location" ]: + (region_correct, country_correct, division_correct) = tuple( + international_exceptions[ "location" ][ location ][ 0 ].split(", ")) if region_correct == region and country_correct == country and division_correct == division: continue - print("location " + location + ": " + region + ", " + country + ", " + division + " => " + region_correct + ", " + country_correct + ", " + division_correct) - locations_to_switch.append((region, country, division, location, region_correct, country_correct, division_correct, location)) - data = correct_data(data, "division", divisions_to_switch, add_annotations = False) #Changes only needed for generation of color_ordering to avoid international duplicates, should stay in original metadata + print( + "location " + location + ": " + region + ", " + country + ", " + division + " => " + region_correct + ", " + country_correct + ", " + division_correct) + locations_to_switch.append(( + region, country, division, location, region_correct, country_correct, division_correct, + location)) + data = correct_data(data, "division", divisions_to_switch, + add_annotations = False) # Changes only needed for generation of color_ordering to avoid international duplicates, should stay in original metadata data = correct_data(data, "location", locations_to_switch, add_annotations = False) print() return data + ##### Step 2.2 Check for "false" division that appear as location elsewhere (known cases stored in false_divisions.txt as well as checking for new cases) def check_false_divisions(data): - # Known false divisions div_as_loc_known = {} known_false_divisions = read_local_file("false_divisions.txt") for region in data: - for country in data[region]: - for division in data[region][country]: + for country in data[ region ]: + for division in data[ region ][ country ]: if division in known_false_divisions: - div_as_loc_known[division] = (region, country, known_false_divisions[division]) - print("False division corrected: " + bold(division) + " (true division: " + bold(known_false_divisions[division]) + ")") + div_as_loc_known[ division ] = (region, country, 
known_false_divisions[ division ]) + print("False division corrected: " + bold(division) + " (true division: " + bold( + known_false_divisions[ division ]) + ")") data = correct_data(data, "div_to_loc", div_as_loc_known) - # Check for unknown cases: div_as_loc = {} for region in data: - for country in data[region]: - for division in data[region][country]: + for country in data[ region ]: + for division in data[ region ][ country ]: if division != "": - for location in data[region][country][division]: + for location in data[ region ][ country ][ division ]: if location != "": - if location in data[region][country] and location != division: - div_as_loc[location] = (region, country, division) - print("Unknown location found as division: " + bold(location) + " (true division: " + bold(division) + ")") - print("(Suggestion: add " + "[" + "/".join([region, country, location, ""]) + "\t" + "/".join([region, country, division, location]) + "]" + " to manual_adjustments.txt)") - if list(data[region][country][location]) != [""]: - print("Attention: location(s) " + ", ".join(data[region][country][location]) + " would be lost.") + if location in data[ region ][ country ] and location != division: + div_as_loc[ location ] = (region, country, division) + print("Unknown location found as division: " + bold( + location) + " (true division: " + bold(division) + ")") + print("(Suggestion: add " + "[" + "/".join( + [ region, country, location, "" ]) + "\t" + "/".join( + [ region, country, division, location ]) + "]" + " to manual_adjustments.txt)") + if list(data[ region ][ country ][ location ]) != [ "" ]: + print("Attention: location(s) " + ", ".join( + data[ region ][ country ][ location ]) + " would be lost.") print("\n=============================\n") + ##### Step 2.3: Check for duplicate divisions/locations in different countries/divisions (known cases stored in duplicates.txt as well as checking for new cases) def check_duplicate(data): - - #Check known duplicates + # Check known duplicates # TODO: Only locations covered properly (divisions: only alert) duplicates = read_local_file("duplicates.txt") abbreviations = read_local_file("abbreviations.txt") - duplicate_locations = [] + duplicate_locations = [ ] for region in data: - for country in data[region]: - for division in data[region][country]: - for location in data[region][country][division]: + for country in data[ region ]: + for division in data[ region ][ country ]: + for location in data[ region ][ country ][ division ]: if location in duplicates: print("Known duplicate detected: " + bold(location)) if abbreviations.get(division) is not None: - print("Please add [" + "/".join([region, country, division, location]) + "\t" + "/".join([region, country, division, location + " " + abbreviations[division]]) + "] to manual_adjustments.txt") - location_correct = location + " " + abbreviations[division] - duplicate_locations.append((region, country, division, location, region, country, division, location_correct)) + print("Please add [" + "/".join([ region, country, division, location ]) + "\t" + "/".join( + [ region, country, division, + location + " " + abbreviations[ division ] ]) + "] to manual_adjustments.txt") + location_correct = location + " " + abbreviations[ division ] + duplicate_locations.append( + (region, country, division, location, region, country, division, location_correct)) else: - print("No abbreviation for " + division + ", please add one to abbreviations.txt and rerun.") + print( + "No abbreviation for " + division + ", please add 
one to abbreviations.txt and rerun.") data = correct_data(data, "location", duplicate_locations) - #Check for new cases + # Check for new cases division_to_country = {} location_to_division = {} for region in data: - for country in data[region]: - for division in data[region][country]: + for country in data[ region ]: + for division in data[ region ][ country ]: if division == "": continue if division not in division_to_country: - division_to_country[division] = [] - division_to_country[division].append((country, region)) - for location in data[region][country][division]: + division_to_country[ division ] = [ ] + division_to_country[ division ].append((country, region)) + for location in data[ region ][ country ][ division ]: if location == "": continue if location not in location_to_division: - location_to_division[location] = [] - location_to_division[location].append((division, country, region)) + location_to_division[ location ] = [ ] + location_to_division[ location ].append((division, country, region)) print() cruise_ship_duplicates = 0 - #TODO: a bit chaotic, go over it again + # TODO: a bit chaotic, go over it again for division in division_to_country: - if len(division_to_country[division]) > 1: - if not any(x in division for x in cruise_abbrev): #ignore cruise ship ones - if division_to_country[division][0][1] == division_to_country[division][1][1]: - s = ", ".join([country for (country, region) in division_to_country[division]]) + if len(division_to_country[ division ]) > 1: + if not any(x in division for x in cruise_abbrev): # ignore cruise ship ones + if division_to_country[ division ][ 0 ][ 1 ] == division_to_country[ division ][ 1 ][ 1 ]: + s = ", ".join([ country for (country, region) in division_to_country[ division ] ]) else: - s = ", ".join([country + " (" + region + ")" for (country, region) in division_to_country[division]]) + s = ", ".join( + [ country + " (" + region + ")" for (country, region) in division_to_country[ division ] ]) print("New duplicate division detected: " + bold(division + " (" + s + ")")) else: cruise_ship_duplicates = cruise_ship_duplicates + 1 - if cruise_ship_duplicates: print("("+str(cruise_ship_duplicates)+" cruise ship entries ignored for duplicate divisions)") - + if cruise_ship_duplicates: print( + "(" + str(cruise_ship_duplicates) + " cruise ship entries ignored for duplicate divisions)") cruise_ship_duplicates = 0 cruis_ship_abbrev = 0 for location in location_to_division: - if len(location_to_division[location]) > 1: - if location_to_division[location][0][1] == location_to_division[location][1][1]: - if location_to_division[location][0][2] == location_to_division[location][1][2]: - s = ", ".join([division for (division, country, region) in location_to_division[location]]) + if len(location_to_division[ location ]) > 1: + if location_to_division[ location ][ 0 ][ 1 ] == location_to_division[ location ][ 1 ][ 1 ]: + if location_to_division[ location ][ 0 ][ 2 ] == location_to_division[ location ][ 1 ][ 2 ]: + s = ", ".join([ division for (division, country, region) in location_to_division[ location ] ]) else: - s = ", ".join([division + " (" + country + ", " + region + ")" for (division, country, region) in location_to_division[location]]) + s = ", ".join([ division + " (" + country + ", " + region + ")" for (division, country, region) in + location_to_division[ location ] ]) else: - s = ", ".join([division + " (" + country + ")" for (division, country, region) in location_to_division[location]]) - + s = ", ".join([ division + " (" + 
country + ")" for (division, country, region) in + location_to_division[ location ] ]) if "Cruise" in location: cruise_ship_duplicates = cruise_ship_duplicates + 1 @@ -931,223 +974,239 @@ def check_duplicate(data): print("New duplicate location detected: " + bold(location + " (in both " + s + ")")) print("Suggestion: Add " + location + " to duplicates.txt") - - for (division, country, region) in location_to_division[location]: + for (division, country, region) in location_to_division[ location ]: if division not in abbreviations: if not any(x in division for x in cruise_abbrev): - print("Attention: Missing abbreviation for " + bold(division) + " (Suggestion: add to abbreviations.txt)") + print("Attention: Missing abbreviation for " + bold( + division) + " (Suggestion: add to abbreviations.txt)") else: cruis_ship_abbrev = cruis_ship_abbrev + 1 - - if cruise_ship_duplicates: print("("+str(cruise_ship_duplicates)+" cruise ship entries ignored for duplicate locations)") - if cruis_ship_abbrev: print("("+ str(cruis_ship_abbrev) + " cruise ship entries ignored for missing state abbreviations)") + if cruise_ship_duplicates: print( + "(" + str(cruise_ship_duplicates) + " cruise ship entries ignored for duplicate locations)") + if cruis_ship_abbrev: print( + "(" + str(cruis_ship_abbrev) + " cruise ship entries ignored for missing state abbreviations)") print("\n=============================\n") + ##### Step 2.4: Check for missing names in ordering and lat_longs as well as return a clean, reduced version of the metadata def check_for_missing(data): data_clean = {} - missing = {"country": [], "division": {}, "location": {}} - clean_missing = {"country": [], "division": {}, "location": {}} # Same as above, but without formatting or notes + missing = {"country": [ ], "division": {}, "location": {}} + clean_missing = {"country": [ ], "division": {}, "location": {}} # Same as above, but without formatting or notes for region in data: - data_clean[region] = {} + data_clean[ region ] = {} - for country in data[region]: - - if country not in ordering["country"] or country not in lat_longs["country"]: + for country in data[ region ]: + if country not in ordering[ "country" ] or country not in lat_longs[ "country" ]: s = bold(country) - if country not in ordering["country"] and country in lat_longs["country"]: + if country not in ordering[ "country" ] and country in lat_longs[ "country" ]: s = s + " (only missing in ordering => auto-added to color_ordering.tsv)" - data_clean[region][country] = {} + data_clean[ region ][ country ] = {} else: - if country in ordering["country"] and country not in lat_longs["country"]: + if country in ordering[ "country" ] and country not in lat_longs[ "country" ]: s = s + " (only missing in lat_longs)" else: - if country in ordering["division"] or country in lat_longs["division"]: + if country in ordering[ "division" ] or country in lat_longs[ "division" ]: s = s + " (present as division)" - missing["country"].append(s) + missing[ "country" ].append(s) if "(only missing in ordering" not in s: - clean_missing["country"].append(country) + clean_missing[ "country" ].append(country) else: - data_clean[region][country] = {} - + data_clean[ region ][ country ] = {} - for division in data[region][country]: + for division in data[ region ][ country ]: if division == "": continue - if division not in ordering["division"] or division not in lat_longs["division"]: + if division not in ordering[ "division" ] or division not in lat_longs[ "division" ]: s = bold(division) name0 = "" if 
country in hierarchical_ordering.get(region, ""): - name0 = check_similar(hierarchical_ordering[region][country], division, "division") - if division not in ordering["division"] and division in lat_longs["division"]: + name0 = check_similar(hierarchical_ordering[ region ][ country ], division, "division") + if division not in ordering[ "division" ] and division in lat_longs[ "division" ]: s = s + " (only missing in ordering => auto-added to color_ordering.tsv)" - if country not in data_clean[region]: - print("Conflict: division " + division + " should be added to color_ordering.tsv, but country " + country + " is missing from dataset") + if country not in data_clean[ region ]: + print( + "Conflict: division " + division + " should be added to color_ordering.tsv, but country " + country + " is missing from dataset") else: - data_clean[region][country][division] = [] - else: #only check for additional hints like "similar name" or "present as location" if not auto-added to color_ordering - if division in ordering["division"] and division not in lat_longs["division"]: + data_clean[ region ][ country ][ division ] = [ ] + else: # only check for additional hints like "similar name" or "present as location" if not auto-added to color_ordering + if division in ordering[ "division" ] and division not in lat_longs[ "division" ]: s = s + " (only missing in lat_longs)" else: if name0 != "": - s += " (similar name in same country: " + bold(name0) + " - consider adding " + "[" + "/".join([region, country, division, "*"]) + "\t" + "/".join([region, country, name0, "*"]) + "]" + " to manual_adjustments.txt)" - if division in ordering["location"] or division in lat_longs["location"]: + s += " (similar name in same country: " + bold( + name0) + " - consider adding " + "[" + "/".join( + [ region, country, division, "*" ]) + "\t" + "/".join( + [ region, country, name0, "*" ]) + "]" + " to manual_adjustments.txt)" + if division in ordering[ "location" ] or division in lat_longs[ "location" ]: s = s + " (present as location)" - if country not in missing["division"]: - missing["division"][country] = [] - clean_missing["division"][country] = [] - missing["division"][country].append(s) + if country not in missing[ "division" ]: + missing[ "division" ][ country ] = [ ] + clean_missing[ "division" ][ country ] = [ ] + missing[ "division" ][ country ].append(s) if "(only missing in ordering" not in s: - clean_missing["division"][country].append(division) + clean_missing[ "division" ][ country ].append(division) else: - if country not in data_clean[region]: - print("Conflict: division " + division + " should be added to color_ordering.tsv, but country " + country + " is missing from dataset") + if country not in data_clean[ region ]: + print( + "Conflict: division " + division + " should be added to color_ordering.tsv, but country " + country + " is missing from dataset") else: - data_clean[region][country][division] = [] + data_clean[ region ][ country ][ division ] = [ ] - for location in data[region][country][division]: + for location in data[ region ][ country ][ division ]: if location == "": continue - if location not in ordering["location"] or location not in lat_longs["location"]: + if location not in ordering[ "location" ] or location not in lat_longs[ "location" ]: s = bold(location) - name0 = check_similar(hierarchical_ordering[region][country][division], location, "location") if hierarchical_ordering[region].get(country) is not None and hierarchical_ordering[region][country].get(division) else "" - if 
location not in ordering["location"] and location in lat_longs["location"]: + name0 = check_similar(hierarchical_ordering[ region ][ country ][ division ], location, + "location") if hierarchical_ordering[ region ].get(country) is not None and \ + hierarchical_ordering[ region ][ country ].get(division) else "" + if location not in ordering[ "location" ] and location in lat_longs[ "location" ]: s = s + " (only missing in ordering => auto-added to color_ordering.tsv)" - if country not in data_clean[region]: - print("Conflict: location " + location + " should be added to color_ordering.tsv, but country " + country + " is missing from dataset") + if country not in data_clean[ region ]: + print( + "Conflict: location " + location + " should be added to color_ordering.tsv, but country " + country + " is missing from dataset") else: - if division not in data_clean[region][country]: - if not any(x in location for x in cruise_abbrev) and not any(x in division for x in cruise_abbrev): - print("Conflict: location " + location + " should be added to color_ordering.tsv, but division " + division + " is missing from dataset") + if division not in data_clean[ region ][ country ]: + if not any(x in location for x in cruise_abbrev) and not any( + x in division for x in cruise_abbrev): + print( + "Conflict: location " + location + " should be added to color_ordering.tsv, but division " + division + " is missing from dataset") else: - data_clean[region][country][division].append(location) - else: #only check for additional hints like "similar name" or "present as division" if not auto-added to color_ordering + data_clean[ region ][ country ][ division ].append(location) + else: # only check for additional hints like "similar name" or "present as division" if not auto-added to color_ordering if name0 != "": - s += " (similar name in same division: " + bold(name0) + " - consider adding " + "[" + "/".join([region, country, division, location]) + "\t" + "/".join([region, country, division, name0]) + "]" + " to manual_adjustments.txt)" - if location in ordering["location"] and location not in lat_longs["location"]: + s += " (similar name in same division: " + bold( + name0) + " - consider adding " + "[" + "/".join( + [ region, country, division, location ]) + "\t" + "/".join( + [ region, country, division, name0 ]) + "]" + " to manual_adjustments.txt)" + if location in ordering[ "location" ] and location not in lat_longs[ "location" ]: s = s + " (only missing in lat_longs)" - if location in ordering["division"] or location in lat_longs["division"]: + if location in ordering[ "division" ] or location in lat_longs[ "division" ]: s = s + " (present as division)" if country == "USA" and "County" not in location: - s = s + " (correction to County might be necessary using [" + "/".join([region, country, division, location]) + "\t" + "/".join([region, country, division, location + " County"]) + "]" - - if country not in missing["location"]: - missing["location"][country] = {} - clean_missing["location"][country] = {} - if division not in missing["location"][country]: + s = s + " (correction to County might be necessary using [" + "/".join( + [ region, country, division, location ]) + "\t" + "/".join( + [ region, country, division, location + " County" ]) + "]" + + if country not in missing[ "location" ]: + missing[ "location" ][ country ] = {} + clean_missing[ "location" ][ country ] = {} + if division not in missing[ "location" ][ country ]: if any(x in division for x in cruise_abbrev): - print("Cruise-associated 
division ignored ("+division+")") + print("Cruise-associated division ignored (" + division + ")") else: - missing["location"][country][division] = [] - clean_missing["location"][country][division] = [] - if not any(x in location for x in cruise_abbrev) and not any(x in division for x in cruise_abbrev): - missing["location"][country][division].append(s) + missing[ "location" ][ country ][ division ] = [ ] + clean_missing[ "location" ][ country ][ division ] = [ ] + if not any(x in location for x in cruise_abbrev) and not any( + x in division for x in cruise_abbrev): + missing[ "location" ][ country ][ division ].append(s) if "(only missing in ordering" not in s: - clean_missing["location"][country][division].append(location) + clean_missing[ "location" ][ country ][ division ].append(location) else: - print("Cruise-associated location ignored ("+location+")") + print("Cruise-associated location ignored (" + location + ")") else: - if country not in data_clean[region]: - print("Conflict: location " + location + " should be added to color_ordering.tsv, but country " + country + " is missing from dataset") + if country not in data_clean[ region ]: + print( + "Conflict: location " + location + " should be added to color_ordering.tsv, but country " + country + " is missing from dataset") else: - if division not in data_clean[region][country]: - if not any(x in location for x in cruise_abbrev) and not any(x in division for x in cruise_abbrev): - print("Conflict: location " + location + " should be added to color_ordering.tsv, but division " + division + " is missing from dataset") + if division not in data_clean[ region ][ country ]: + if not any(x in location for x in cruise_abbrev) and not any( + x in division for x in cruise_abbrev): + print( + "Conflict: location " + location + " should be added to color_ordering.tsv, but division " + division + " is missing from dataset") else: - data_clean[region][country][division].append(location) + data_clean[ region ][ country ][ division ].append(location) - if missing['location']: + if missing[ 'location' ]: print("\n\nMissing locations:") - for country in missing["location"]: + for country in missing[ "location" ]: print("# " + country + " #") - for division in missing["location"][country]: + for division in missing[ "location" ][ country ]: print(division) - for location in missing["location"][country][division]: + for location in missing[ "location" ][ country ][ division ]: print("\tlocation\t" + location) print() else: print("No missing locations") - if missing['division']: + if missing[ 'division' ]: print("\nMissing divisions:") - for country in missing["division"]: + for country in missing[ "division" ]: print("# " + country + " #") - for division in missing["division"][country]: + for division in missing[ "division" ][ country ]: print("division\t" + division) print() else: print("No missing divisions") - if missing['country']: + if missing[ 'country' ]: print("\nMissing countries:") - for country in missing["country"]: + for country in missing[ "country" ]: print("country\t" + country) else: print("No missing countries") - ##### Ask user if they want to look for lat-longs now, or end script for time being. - find_lat_longs = input("\n\nWould you like to look for lat-longs for these places now? y or n \n(it's suggested to make any necessary file additions before this step): ") + find_lat_longs = input( + "\n\nWould you like to look for lat-longs for these places now? 
y or n \n(it's suggested to make any necessary file additions before this step): ") if find_lat_longs.lower() == 'y': - from geopy.geocoders import Nominatim - geolocator = Nominatim(user_agent="hello@nextstrain.org") - new_lat_longs = [] + geolocator = Nominatim(user_agent = "hello@nextstrain.org") + new_lat_longs = [ ] print("Getting lat-long for missing places:\n") - for country in clean_missing["location"]: + for country in clean_missing[ "location" ]: print("# " + country + " #") - for division in clean_missing["location"][country]: - print("\ndivision: "+division) - for location in clean_missing["location"][country][division]: + for division in clean_missing[ "location" ][ country ]: + print("\ndivision: " + division) + for location in clean_missing[ "location" ][ country ][ division ]: if any(x in location for x in cruise_abbrev): - print(" One cruise ship location ignored ("+ location +").") + print(" One cruise ship location ignored (" + location + ").") continue - full_location = location +", "+ division+", "+country + full_location = location + ", " + division + ", " + country new_lat_longs.append(find_place("location", location, full_location, geolocator)) print() - - for country in clean_missing["division"]: + for country in clean_missing[ "division" ]: print("# " + country + " #") - for division in clean_missing["division"][country]: + for division in clean_missing[ "division" ][ country ]: print("division\t" + division) - full_division = division+", "+country + full_division = division + ", " + country new_lat_longs.append(find_place("division", division, full_division, geolocator)) print() - for country in clean_missing["country"]: + for country in clean_missing[ "country" ]: print(country) new_lat_longs.append(find_place("country", country, country, geolocator)) print("\nNew locations to be written out: ") - print(*new_lat_longs, sep='\n') + print(*new_lat_longs, sep = '\n') - with open(path_to_output_files+"new_lat-longs.tsv", 'w') as out: + with open(path_to_output_files + "new_lat-longs.tsv", 'w') as out: out.write("\n".join(new_lat_longs)) - print("New lat-longs written out to "+path_to_output_files+"new_lat-longs.tsv") + print("New lat-longs written out to " + path_to_output_files + "new_lat-longs.tsv") answer = input("Would you like to use auto-sort for these lat_longs? y or n") if answer == "y": auto_add_lat_longs(new_lat_longs) - print("\n=============================\n") return data_clean @@ -1155,9 +1214,10 @@ def check_for_missing(data): # Get the geo-locator to find a possible location - returns result # call with ex: 'Dallas, Texas, USA', geolocator def ask_geocoder(full_unknown_place, geolocator): - new_place = geolocator.geocode(full_unknown_place, language='en') + new_place = geolocator.geocode(full_unknown_place, language = 'en') return new_place + # Allows user to try typing different locations to get lat-long, or tell to leave blank # Call with ex: 'location', 'Dallas', 'Dallas, Texas, USA', geolocator def find_place(geo_level, place, full_place, geolocator): @@ -1189,7 +1249,7 @@ def find_place(geo_level, place, full_place, geolocator): print("\nCurrent place for missing {}:\t".format(geo_level) + full_place_string) - print("Geopy suggestion: "+ new_place_string) + print("Geopy suggestion: " + new_place_string) answer = input('Is this the right place? 
Type y or n: ') if answer.lower() == 'y': @@ -1198,8 +1258,8 @@ def find_place(geo_level, place, full_place, geolocator): else: # Let the user correct/have more detail for what's typed - print("For: "+full_place) - typed_place = input("Type a more specific place name or 'NA' to leave blank: ") + print("For: " + full_place) + typed_place = input("Type a more specific place name or 'NA' to leave blank: ") if typed_place.lower() == 'na': print("Writing out a line with blank lat-long to be filled by hand") answer = (geo_level + "\t" + place + "\t") @@ -1208,6 +1268,7 @@ def find_place(geo_level, place, full_place, geolocator): print(answer) return answer + ################################################################################ # Step 3: Storage of locations, divisions etc hierarchical manner ################################################################################ @@ -1220,7 +1281,7 @@ def sort_by_coordinates(data, coordinates): max_long = -150 min_long = 150 for hierarchy in data: - (lat, long) = coordinates[hierarchy] + (lat, long) = coordinates[ hierarchy ] max_lat = max(max_lat, lat) min_lat = min(min_lat, lat) max_long = max(max_long, long) @@ -1233,18 +1294,19 @@ def sort_by_coordinates(data, coordinates): loc_per_coord = {} for loc in data: if loc in coordinates: - coord = coordinates[loc][index] - if coordinates[loc][index] in loc_per_coord: - loc_per_coord[coord].append(loc) + coord = coordinates[ loc ][ index ] + if coordinates[ loc ][ index ] in loc_per_coord: + loc_per_coord[ coord ].append(loc) else: - loc_per_coord[coord] = [loc] + loc_per_coord[ coord ] = [ loc ] else: print("Missing coordinates: " + bold(loc)) - sorted_locs = [] + sorted_locs = [ ] for coord in sorted(loc_per_coord): - sorted_locs.extend(loc_per_coord[coord]) + sorted_locs.extend(loc_per_coord[ coord ]) return sorted_locs + # Write a given hierarchy (location, division, country, region, recency) into the new ordering file. # Sort locations and divisions by coordinates to retain proximity coloring def write_ordering(data, hierarchy): @@ -1252,52 +1314,48 @@ def write_ordering(data, hierarchy): if hierarchy == "location": mode = "w" - with open(path_to_output_files+"color_ordering.tsv", mode) as out: - if hierarchy not in ["region", "country", "division", "location"]: - for l in data[hierarchy]: + with open(path_to_output_files + "color_ordering.tsv", mode) as out: + if hierarchy not in [ "region", "country", "division", "location" ]: + for l in data[ hierarchy ]: out.write(hierarchy + "\t" + l + "\n") out.write("\n################\n\n\n") return # Give fixed order of regions to retain the usual coloring order - region_order = ["Asia", - "Oceania", - "Africa", - "Europe", - "South America", - "North America"] + region_order = [ "Asia", "Oceania", "Africa", "Europe", "South America", "North America" ] for region in region_order: - if hierarchy == "region": out.write("region\t" + region + "\n") continue out.write("\n# " + region + "\n") - for country in sort_by_coordinates(data[region], lat_longs["country"]): #TODO: would be nice to sort this by coordinate too, but would need to add most lat_longs first! + for country in sort_by_coordinates(data[ region ], lat_longs[ + "country" ]): # TODO: would be nice to sort this by coordinate too, but would need to add most lat_longs first! 
if hierarchy == "country": out.write("country\t" + country + "\n") continue if hierarchy == "location": - if sum([len(data[region][country][d]) for d in data[region][country]]) > 0: # only write country as a comment if there is data following it + if sum([ len(data[ region ][ country ][ d ]) for d in data[ region ][ + country ] ]) > 0: # only write country as a comment if there is data following it out.write("\n### " + country) if hierarchy == "division": - if len(data[region][country]) > 0: + if len(data[ region ][ country ]) > 0: out.write("\n### " + country + "\n") - for division in sort_by_coordinates(data[region][country], lat_longs["division"]): - + for division in sort_by_coordinates(data[ region ][ country ], lat_longs[ "division" ]): if hierarchy == "division": out.write("division\t" + division + "\n") continue - if len(data[region][country][division]) > 0: # only write division as a comment if there is data following it + if len(data[ region ][ country ][ + division ]) > 0: # only write division as a comment if there is data following it out.write("\n# " + division + "\n") - for location in sort_by_coordinates(data[region][country][division], lat_longs["location"]): + for location in sort_by_coordinates(data[ region ][ country ][ division ], lat_longs[ "location" ]): out.write("location\t" + location + "\n") if hierarchy == "location" or hierarchy == "division": @@ -1311,23 +1369,25 @@ def auto_add_annotations(additions_to_annotation): with open("../ncov-ingest/source-data/gisaid_annotations.tsv") as myfile: annotations = myfile.readlines() - types = {"geography": ["location", "division", "country", "region", "division_exposure", "country_exposure", "region_exposure"], "special": ["sampling_strategy", "date", "host", "strain"], "paper": ["title", "paper_url"], "genbank": ["genbank_accession"]} - sections = {"comments": [], "geography": [], "special": [], "paper": [], "genbank": []} + types = {"geography": [ "location", "division", "country", "region", "division_exposure", "country_exposure", + "region_exposure" ], "special": [ "sampling_strategy", "date", "host", "strain" ], + "paper": [ "title", "paper_url" ], "genbank": [ "genbank_accession" ]} + sections = {"comments": [ ], "geography": [ ], "special": [ ], "paper": [ ], "genbank": [ ]} print("The following annotations have unknown type:") - for list in [annotations, additions_to_annotation]: + for list in [ annotations, additions_to_annotation ]: for line in list: if not line.endswith("\n"): line = line + "\n" if line.startswith("#"): - sections["comments"].append(line) + sections[ "comments" ].append(line) continue - t1 = line.split("\t")[2] + t1 = line.split("\t")[ 2 ] type_found = False for t in types: - if t1 in types[t]: - if line not in sections[t]: - sections[t].append(line) + if t1 in types[ t ]: + if line not in sections[ t ]: + sections[ t ].append(line) type_found = True break if not type_found: @@ -1335,7 +1395,7 @@ def auto_add_annotations(additions_to_annotation): with open(path_to_output_files + "gisaid_annotations.tsv", "w") as out: for t in sections: - for l in sorted(sections[t]): + for l in sorted(sections[ t ]): out.write(l) print("New annotation auto-added to " + path_to_output_files + "gisaid_annotations.tsv") @@ -1345,43 +1405,38 @@ def auto_add_annotations(additions_to_annotation): with open(path_to_output_files + "gisaid_annotations.tsv") as list: for line in list: if not line.startswith("#"): - t = line.split("\t")[2] - if t not in types["paper"] and t not in types["genbank"]: - epi = 
line.split("\t")[1] + t = line.split("\t")[ 2 ] + if t not in types[ "paper" ] and t not in types[ "genbank" ]: + epi = line.split("\t")[ 1 ] if t not in duplicate_check: - duplicate_check[t] = [] - if epi not in duplicate_check[t]: - duplicate_check[t].append(epi) + duplicate_check[ t ] = [ ] + if epi not in duplicate_check[ t ]: + duplicate_check[ t ].append(epi) else: print("Attention: Duplicate annotation for " + epi + ", " + t) - - - - if __name__ == '__main__': - ################################################################################ # Step 0: Read data ################################################################################ # Read current metadata - #path_to_ncov = "../../" # TODO: adjust file structure properly + # path_to_ncov = "../../" # TODO: adjust file structure properly with open("data/downloaded_gisaid.tsv") as myfile: metadata = myfile.readlines() # Read orderings and lat_longs - ordering, ordering_other = read_geography_file("defaults/color_ordering.tsv") #TODO: combine with read_local_files()? + ordering, ordering_other = read_geography_file( + "defaults/color_ordering.tsv") # TODO: combine with read_local_files()? hierarchical_ordering = read_geography_file("defaults/color_ordering.tsv", True) lat_longs = read_geography_file("defaults/lat_longs.tsv") # List that will contain all proposed annotations collected throughout the script - additions_to_annotation = [] + additions_to_annotation = [ ] with open("../ncov-ingest/source-data/gisaid_annotations.tsv") as myfile: annotations = myfile.read() - ################################################################################ # Step 1: Collection of data from metadata file in hierarchical manner ################################################################################ @@ -1392,7 +1447,6 @@ def auto_add_annotations(additions_to_annotation): # Each location (also empty ones) hold a list of all strains & GISAID IDs with this region+country+division+location data = read_metadata(metadata) - ##### Step 1.2: Collection of regions, countries and divisions of exposure # In case some geographic units are only found in the exposure information of the metadata, iterate again over the metadata and add to the dataset # Since travel history related entries are prone to errors, check for each entry whether it collides with already existing data. @@ -1400,7 +1454,6 @@ def auto_add_annotations(additions_to_annotation): # TODO: Currently commented out due to numerous inconsistencies data = read_exposure(data, metadata) - ################################################################################ # Step 2: Clean up data ################################################################################ @@ -1413,7 +1466,7 @@ def auto_add_annotations(additions_to_annotation): data = adjust_to_database(data) ##### Step 2.1: Apply all known variants stored in an external file variants.txt - data = apply_typical_errors(data) #TODO: do this earlier (before reading metadata), join with UK as region? + data = apply_typical_errors(data) # TODO: do this earlier (before reading metadata), join with UK as region? 
data = apply_variants(data) ##### Step 2.2 Check for "false" division that appear as location elsewhere (known cases stored in false_divisions.txt as well as checking for new cases) @@ -1423,7 +1476,8 @@ def auto_add_annotations(additions_to_annotation): check_duplicate(data) ##### Step 2.4: Check for missing names in ordering and lat_longs as well as return a clean, reduced version of the metadata - data = check_for_missing(data) # =====> From here on, strains are dropped, only region/country/division/location remain + data = check_for_missing( + data) # =====> From here on, strains are dropped, only region/country/division/location remain ################################################################################ # Step 3: Storage of locations, divisions etc hierarchical manner @@ -1437,7 +1491,7 @@ def auto_add_annotations(additions_to_annotation): write_ordering(ordering_other, type) ##### Bonus step: Print out all collected annotations - if considered correct, they can be copied by the user to annotations.tsv - with open(path_to_output_files+"new_annotations.tsv", 'w') as out: + with open(path_to_output_files + "new_annotations.tsv", 'w') as out: out.write("\n".join(sorted(additions_to_annotation))) print("New annotation additions written out to " + path_to_output_files + "new_annotations.tsv") @@ -1445,20 +1499,22 @@ def auto_add_annotations(additions_to_annotation): # Only print line if not yet present # Print warning if this GISAID ID is already in the file - lines_exclude = ["title", "authors", "paper_url", "genbank_accession", "sampling_strategy"] - annot_lines_to_write = [] + lines_exclude = [ "title", "authors", "paper_url", "genbank_accession", "sampling_strategy" ] + annot_lines_to_write = [ ] for line in additions_to_annotation: if line in annotations: continue - #print(line) + # print(line) if len(line.split("\t")) == 4: - epi = line.split("\t")[1] + epi = line.split("\t")[ 1 ] if epi in annotations: - number_of_occurences = annotations.count(line.split("\t")[1]) - irrelevant_occurences = sum([(line.split("\t")[1] + "\t" + s) in annotations for s in lines_exclude]) + number_of_occurences = annotations.count(line.split("\t")[ 1 ]) + irrelevant_occurences = sum( + [ (line.split("\t")[ 1 ] + "\t" + s) in annotations for s in lines_exclude ]) if number_of_occurences > irrelevant_occurences: for l in annotations.split("\n"): if epi in l: if not l.startswith("#"): - print("Warning: " + epi + " already exists in annotations! (" + bold(line.split("\t")[2]) + " " + line.split("\t")[3] + " vs " + bold(l.split("\t")[2]) + " " + l.split("\t")[3] + ")") - + print("Warning: " + epi + " already exists in annotations! (" + bold( + line.split("\t")[ 2 ]) + " " + line.split("\t")[ 3 ] + " vs " + bold( + l.split("\t")[ 2 ]) + " " + l.split("\t")[ 3 ] + ")") diff --git a/scripts/developer_scripts/parse_new_sequences.py b/scripts/developer_scripts/parse_new_sequences.py index 5c64db400..f869e7eb6 100644 --- a/scripts/developer_scripts/parse_new_sequences.py +++ b/scripts/developer_scripts/parse_new_sequences.py @@ -154,19 +154,19 @@ def check_for_recency(counts, list_of_strains, lab_collection, path_to_metadata, print("\nSearching for twitter handles... 
") - rare_countries = [] - for c in counts: - if c != "United Kingdom": - if c not in countries or countries[c] <= 20: - rare_countries.append(c) + rare_countries = [ + c + for c in counts + if c != "United Kingdom" and (c not in countries or countries[c] <= 20) + ] lab_dictionary = read_excel_lab_file(table_file_name) lab_collection_present = {} - for country in subm_labs: + for country, value in subm_labs.items(): if country not in lab_collection_present: lab_collection_present[country] = {} - for lab in subm_labs[country]: + for lab in value: n = subm_labs[country][lab] if country in lab_dictionary and lab.lower() in lab_dictionary[country]: k = lab_dictionary[country][lab.lower()] @@ -268,27 +268,22 @@ def check_dates(data, today): data.pop(id) continue - #Check for early dates - #if (year == 2020 and (month == 2 or month == 1)) or year == 2019: - #suspicious_sample_date[strain] = date - clade = data[id]["Nextstrain_clade"] dev = data[id]["clock_deviation"] if clade == "": print("Clade missing for sequence " + id) + elif clade not in clade_dates: + print("Unknown clade " + clade + " for sequence " + id) else: - if clade not in clade_dates: - print("Unknown clade " + clade + " for sequence " + id) - else: - clade_day = clade_dates[clade] - day_clade = int(clade_day[8:]) - month_clade = int(clade_day[5:7]) - year_clade = int(clade_day[:4]) - - if (year < year_clade) or (year == year_clade and month < month_clade) or (year == year_clade and month == month_clade and day < day_clade): - suspicious_sample_date[strain] = date + " (" + clade + ", clock deviation = " + dev + ")" - data.pop(id) - continue + clade_day = clade_dates[clade] + day_clade = int(clade_day[8:]) + month_clade = int(clade_day[5:7]) + year_clade = int(clade_day[:4]) + + if (year < year_clade) or (year == year_clade and month < month_clade) or (year == year_clade and month == month_clade and day < day_clade): + suspicious_sample_date[strain] = date + " (" + clade + ", clock deviation = " + dev + ")" + data.pop(id) + continue invalid_dates_by_country = {} @@ -304,15 +299,15 @@ def check_dates(data, today): print("\n----------------------------------------------\n") print("Invalid sample dates (automatically excluded from total counts):") - for country in invalid_dates_by_country: + for country, value_ in invalid_dates_by_country.items(): print(country) for date in invalid_dates_by_country[country]: - print(date + " (" + str(invalid_dates_by_country[country][date]) + ")") + print(date + " (" + str(value_[date]) + ")") print("") print("\nSample date before clade (automatically excluded from total counts):") - for strain in suspicious_sample_date: - print(strain + ": " + suspicious_sample_date[strain]) + for strain, value in suspicious_sample_date.items(): + print(strain + ": " + value) return data @@ -322,11 +317,10 @@ def check_dates(data, today): # Check for certain unique properties and potentially exclude (e.g. 
all sequences from a certain submission lab) def check_flagged_properties(data): - flagged_strains = {} - for p in flagged_properties: - flagged_strains[p] = {} - for name in flagged_properties[p]: - flagged_strains[p][name] = [] + flagged_strains = { + p: {name: [] for name in flagged_properties[p]} + for p in flagged_properties + } seqs_found = False for id in list(data.keys()): @@ -349,10 +343,10 @@ def check_flagged_properties(data): with open(path_to_outputs + "sequences_exclude.txt", "w") as out: out.write("\n\nStrains to add to exclude (based on flagged properties):\n") - for p in flagged_strains: + for p, value in flagged_strains.items(): for name in flagged_properties[p]: out.write(p + " = \"" + name + "\":\n") - for strain in flagged_strains[p][name]: + for strain in value[name]: out.write(strain + "\n") out.write("\n") @@ -401,26 +395,33 @@ def print_counts(data): counts[country][division] += 1 sum_total = 0 - for country in counts: - sum_country = 0 - for division in counts[country]: - sum_country += counts[country][division] + for country, value_ in counts.items(): + sum_country = sum(value_[division] for division in counts[country]) sum_total += sum_country print("\n----------------------------------------------\n") print("Total counts: " + str(sum_total)) with open(path_to_outputs + "tweet_resources.txt", "w") as out: - for country in counts: + for country, value in counts.items(): s = country + ": " sum_country = 0 for division in counts[country]: - sum_country += counts[country][division] + sum_country += value[division] s = s + str(sum_country) if len(counts[country]) == 1: s = s + " (" + division + ")" else: - s = s + " (" + ", ".join([str(counts[country][division]) + " " + division for division in counts[country]]) + ")" + s = ( + s + + " (" + + ", ".join( + str(counts[country][division]) + " " + division + for division in counts[country] + ) + + ")" + ) + print(s) out.write(s + "\n") out.write("\n\n\n") @@ -455,12 +456,9 @@ def collect_labs(data, table_file_name): originating_lab = data[id]["originating_lab"] author = data[id]["authors"] - if region not in submitting_labs: - submitting_labs[region] = {} - if country not in submitting_labs[region]: - submitting_labs[region][country] = [] - if submitting_lab not in submitting_labs[region][country]: - submitting_labs[region][country].append(submitting_lab) + _extracted_from_collect_labs_13( + region, submitting_labs, country, submitting_lab + ) if region not in originating_labs: originating_labs[region] = {} @@ -469,20 +467,13 @@ def collect_labs(data, table_file_name): if originating_lab not in originating_labs[region][country] and originating_lab != submitting_lab: originating_labs[region][country].append(originating_lab) - if region not in authors: - authors[region] = {} - if country not in authors[region]: - authors[region][country] = [] - if author not in authors[region][country]: - authors[region][country].append(author) - - + _extracted_from_collect_labs_13(region, authors, country, author) lab_dictionary = read_excel_lab_file(table_file_name) lab_UK = lab_dictionary["United Kingdom"]["COVID-19 Genomics UK Consortium".lower()] lab_collection = {} print("\nSubmitting labs:\n(Note: small differences in spelling might cause lab to not be identified. 
Consider adjusting the spelling in the spreadsheet!)\n") - for region in submitting_labs: + for region, value_ in submitting_labs.items(): if region not in lab_collection: lab_collection[region] = {} for country in sorted(submitting_labs[region]): @@ -490,7 +481,7 @@ def collect_labs(data, table_file_name): lab_collection[region][country] = [] s = country + ":\n" - for lab in submitting_labs[region][country]: + for lab in value_[country]: s += lab + ": " if country in lab_dictionary and lab.lower() in lab_dictionary[country]: k = lab_dictionary[country][lab.lower()] @@ -508,10 +499,10 @@ def collect_labs(data, table_file_name): print("----------------------------------------------\n") print("Originating labs (only printed if found in excel sheet):\n") - for region in originating_labs: + for region, value__ in originating_labs.items(): for country in originating_labs[region]: s = country + ":\n" - for lab in originating_labs[region][country]: + for lab in value__[country]: if country in lab_dictionary and lab.lower() in lab_dictionary[country]: s += lab s += ": " @@ -529,8 +520,8 @@ def collect_labs(data, table_file_name): print("----------------------------------------------\n") print("Authors (only printed if found in excel sheet):\n") - for region in authors: - for country in authors[region]: + for region, value in authors.items(): + for country in value: s = country + ":\n" for author in authors[region][country]: if country in lab_dictionary and author.lower() in lab_dictionary[country]: @@ -548,12 +539,22 @@ def collect_labs(data, table_file_name): print(s) - if "Europe" in lab_collection: - if "United Kingdom" in lab_collection["Europe"]: - lab_collection["Europe"]["United Kingdom"] = [lab_UK] + if ( + "Europe" in lab_collection + and "United Kingdom" in lab_collection["Europe"] + ): + lab_collection["Europe"]["United Kingdom"] = [lab_UK] return lab_collection +def _extracted_from_collect_labs_13(region, arg1, country, arg3): + if region not in arg1: + arg1[region] = {} + if country not in arg1[region]: + arg1[region][country] = [] + if arg3 not in arg1[region][country]: + arg1[region][country].append(arg3) + @@ -574,9 +575,9 @@ def overview_with_dates(data, file_name): with open(file_name, "w") as myfile: myfile.write("strain\tsampling date\tsubmission date\n") - for country in data_sorted: + for country, value in data_sorted.items(): myfile.write(country + "\n") - for s in data_sorted[country]: + for s in value: myfile.write(s + "\n") myfile.write("\n") @@ -598,10 +599,10 @@ def filter_for_date_region(data, path_to_outputs, params): with open(path_to_outputs + "special_check_" + region + "_" + str(month) + ".txt", "w") as myfile: myfile.write("New sequences from " + region + " after month " + str(month) + "\n\n") - for country in special_strains: + for country, value in special_strains.items(): myfile.write(country + "\n") for date in sorted(special_strains[country]): - myfile.write(date + ": " + str(special_strains[country][date]) + "\n") + myfile.write(date + ": " + str(value[date]) + "\n") myfile.write("\n") def prepare_tweet(counts, total_lab_collection, lab_collection): @@ -634,7 +635,7 @@ def prepare_tweet(counts, total_lab_collection, lab_collection): the = ["USA", "United Kingdom", "Democratic Republic of the Congo"] counts_country = {region: {country: sum(counts[country].values()) for country in total_lab_collection[region]} for region in total_lab_collection} - total = sum([sum(counts_country[region].values()) for region in counts_country]) + total = 
sum(sum(counts_country[region].values()) for region in counts_country) start_tweet = "Thanks to #opendata sharing via @GISAID, we've updated nextstrain.org/ncov with " + str( total) + " new #COVID19 #SARSCoV2 sequences!" @@ -649,13 +650,13 @@ def prepare_tweet(counts, total_lab_collection, lab_collection): length_prediction = [len(country) + len(", ".join(lab_collection[region][country])) for country in lab_collection[region]] if sum(length_prediction) > char_available: countries_extra = [] #extra large countries - while len(length_prediction) > 0 and max(length_prediction) > char_available: + while length_prediction and max(length_prediction) > char_available: i = np.argmax(length_prediction) countries_extra.append([countries_list[i]]) countries_list.pop(i) length_prediction.pop(i) - if len(countries_list) > 0: + if countries_list: countries = [] while(sum(length_prediction) > char_available): @@ -666,12 +667,11 @@ def prepare_tweet(counts, total_lab_collection, lab_collection): length_prediction = length_prediction[k:] countries.append(countries_list) - countries = countries + countries_extra + countries += countries_extra else: countries = countries_extra - i = 1 - for countries_list in countries: + for i, countries_list in enumerate(countries, start=1): h = [] for country in countries_list: @@ -683,8 +683,6 @@ def prepare_tweet(counts, total_lab_collection, lab_collection): if i > 1: r += str(i) tweet_collection_split[r] = (c, h) - i += 1 - else: h = [] for country in lab_collection[region]: @@ -696,10 +694,8 @@ def prepare_tweet(counts, total_lab_collection, lab_collection): tweet_collection_full[region] = (c, h) lengths[region] = len(", ".join(c)) + len(", ".join(h)) + len(links.get(region, "")) - tweet = [] - tweet.append((start_tweet + "\n\n", "\n\n[pic_Global]")) - - while len(lengths) > 0: + tweet = [(start_tweet + "\n\n", "\n\n[pic_Global]")] + while lengths: current_region = min(lengths, key=lengths.get) best_partner = "" current_length = lengths[current_region] @@ -723,16 +719,8 @@ def prepare_tweet(counts, total_lab_collection, lab_collection): l += " and " + links[best_partner] p += " " + "[pic_" + best_partner.replace(" ", "") + "]" - if len(c) > 1: - c = ", ".join(c[:-1]) + " and " + c[-1] - else: - c = c[0] - - if current_length > char_available: - h = " ".join(h) - else: - h = ", ".join(h) - + c = ", ".join(c[:-1]) + " and " + c[-1] if len(c) > 1 else c[0] + h = " ".join(h) if current_length > char_available else ", ".join(h) starter = random.choice(starters) s = starter[0] + c + starter[1] + l + ".\n\n" s += "(Thanks to " + h + ")\n\n" @@ -748,11 +736,7 @@ def prepare_tweet(counts, total_lab_collection, lab_collection): else: starter = random.choice(starters_split) l = "" - if len(c) > 1: - c = ", ".join(c[:-1]) + " and " + c[-1] - else: - c = c[0] - + c = ", ".join(c[:-1]) + " and " + c[-1] if len(c) > 1 else c[0] if len(", ".join(c)) + len(", ".join(h)) + len(l) > char_available: h = " ".join(h) else: @@ -780,7 +764,7 @@ def prepare_tweet_new_format(counts, rare_labs): counts_country = {region: {country: sum(counts[country].values()) for country in lab_collection[region]} for region in lab_collection} - total = sum([sum(counts_country[region].values()) for region in counts_country]) + total = sum(sum(counts_country[region].values()) for region in counts_country) start_tweet = "Thanks to #opendata sharing by @GISAID, we've updated nextstrain.org/ncov with " + str( total) + " new #COVID19 #SARSCoV2 sequences!" 
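The collect_labs() hunks above deduplicate the nested region/country bookkeeping through the auto-extracted helper _extracted_from_collect_labs_13(). A minimal sketch of the same extract-method pattern follows, with a reordered signature and descriptive names substituted for the generated region/arg1/country/arg3; the names below are illustrative only and not part of the patch:

def append_unique_nested(tree, region, country, item):
    # Equivalent of _extracted_from_collect_labs_13: create the
    # tree[region][country] levels on demand, then append item
    # only if it is not already present.
    if region not in tree:
        tree[region] = {}
    if country not in tree[region]:
        tree[region][country] = []
    if item not in tree[region][country]:
        tree[region][country].append(item)

submitting_labs = {}
append_unique_nested(submitting_labs, "Europe", "Germany", "Example Lab")
append_unique_nested(submitting_labs, "Europe", "Germany", "Example Lab")  # duplicate, ignored
print(submitting_labs)  # {'Europe': {'Germany': ['Example Lab']}}

tree.setdefault(region, {}).setdefault(country, []) would collapse the first two membership checks into one expression, but the patch keeps the explicit form that the extraction tool produced.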
diff --git a/scripts/explicit_translation.py b/scripts/explicit_translation.py index ed0afc646..745effff5 100644 --- a/scripts/explicit_translation.py +++ b/scripts/explicit_translation.py @@ -1,4 +1,3 @@ - import argparse import json from Bio import Phylo, SeqIO @@ -25,12 +24,7 @@ node_data = {} for gene, translation in zip(genes, translations): - seqs = [] - for s in SeqIO.parse(translation, 'fasta'): - if s.id in leafs: - seqs.append(s) - - + seqs = [s for s in SeqIO.parse(translation, 'fasta') if s.id in leafs] tt = TreeAnc(tree=T, aln=MultipleSeqAlignment(seqs), alphabet='aa') tt.infer_ancestral_sequences(reconstruct_tip_states=True) diff --git a/scripts/mutation_summary.py b/scripts/mutation_summary.py index 627f776e9..b98d38b72 100644 --- a/scripts/mutation_summary.py +++ b/scripts/mutation_summary.py @@ -12,7 +12,7 @@ def read_reference(fname, genemap): ref = str(SeqIO.read(fname, 'fasta').seq) except: with open(fname, 'r') as fh: - ref = "".join([x.strip() for x in fh]) + ref = "".join(x.strip() for x in fh) translations = {} with open(genemap, 'r') as fh: @@ -47,7 +47,11 @@ def to_mutations(aln_file, ref, aa=False): for si, (name, seq) in enumerate(SimpleFastaParser(fh)): if si%1000==0 and si: print(f"sequence {si}") - res[name] = ",".join([f"{a}{p}{d}" for a,p,d in get_differences(ref, seq, ambiguous)]) + res[name] = ",".join( + f"{a}{p}{d}" + for a, p, d in get_differences(ref, seq, ambiguous) + ) + return res diff --git a/scripts/sanitize_metadata.py b/scripts/sanitize_metadata.py index 15c7891f3..6ac8797f9 100644 --- a/scripts/sanitize_metadata.py +++ b/scripts/sanitize_metadata.py @@ -143,7 +143,7 @@ def resolve_duplicates(metadata, strain_field, error_on_duplicates=False): # "?"), we end up returning the last record for a given strain as a # reasonable default. sort_fields = [strain_field] - if len(accession_fields) > 0: + if accession_fields: sort_fields.extend(accession_fields) # Return the last record from each group after sorting by strain and diff --git a/scripts/sanitize_sequences.py b/scripts/sanitize_sequences.py index 41e0a5957..bb3b61ef5 100644 --- a/scripts/sanitize_sequences.py +++ b/scripts/sanitize_sequences.py @@ -77,7 +77,7 @@ def drop_duplicate_sequences(sequences, error_on_duplicates=False): yield sequence # Report names of duplicate strains with different sequences when requested. - if len(duplicate_strains) > 0 and error_on_duplicates: + if duplicate_strains and error_on_duplicates: raise DuplicateSequenceError(", ".join(duplicate_strains))
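The final four hunks (explicit_translation.py, mutation_summary.py, sanitize_metadata.py, sanitize_sequences.py) repeat the same two micro-refactorings: list comprehensions that feed directly into join() become generator expressions, and len(x) > 0 guards become plain truthiness tests, since empty containers are falsy in Python. A standalone sketch of both patterns, loosely mirroring the drop_duplicate_sequences() change; the data and names here are invented for illustration:

def drop_duplicate_names(names, error_on_duplicates=False):
    """Yield each name once; optionally fail if any name repeats."""
    seen = set()
    duplicates = []
    for name in names:
        if name in seen:
            duplicates.append(name)
        else:
            seen.add(name)
            yield name
    # Truthiness test replaces `len(duplicates) > 0 and error_on_duplicates`.
    if duplicates and error_on_duplicates:
        raise ValueError("duplicate names: " + ", ".join(duplicates))

unique = list(drop_duplicate_names(["alpha", "beta", "alpha"]))
# Generator expression replaces ",".join([...]); no intermediate list is built.
print(",".join(n.upper() for n in unique))  # ALPHA,BETA

The behaviour matches the patched generator in sanitize_sequences.py: the error, when requested, is raised only after all unique items have been yielded.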