forked from Hua-CM/HuaSmallTools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
parse_uniprot_header.py
92 lines (82 loc) · 3.96 KB
/
parse_uniprot_header.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# -*- coding: utf-8 -*-
# @Time : 2020/11/15 13:49
# @Author : Zhongyi Hua
# @FileName: parse_uniprot_header.py
# @Usage:
# @Note:
# @E-mail: [email protected]
import pandas as pd
import re
class UniprotParse:
def __init__(self, _input_fasta):
self.input = _input_fasta
self.output = None
def parse(self):
with open(self.input, 'r', encoding='utf-8') as f:
_header_list = [line.strip('>') for line in f if line.startswith('>')]
_header_parse_list = []
for _header in _header_list:
_ele_dict = {}
prefix_pre = None
for _ele in _header.split():
prefix_now = re.match('sp\||OX=|OS=|GN=|PE=|SV=', _ele)
if prefix_now:
prefix_pre = prefix_now.group().strip("=")
try:
_ele_dict[prefix_pre] = [_ele.split('=')[1]]
except IndexError:
_ele_dict[prefix_pre] = [_ele]
else:
_ele_dict[prefix_pre].append(_ele)
_ele_dict2 = {}
for _key, _value in _ele_dict.items():
if _key == 'sp|':
_ele_dict2['ID'], _ele_dict2['Entry'] = _value[0].split('|')[1:3]
_ele_dict2['Description'] = ' '.join(_value[1:])
else:
_ele_dict2[_key] = ' '.join(_value)
_header_parse_list.append(_ele_dict2)
self.output = pd.DataFrame(_header_parse_list)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description="This is the script to get Uniprot fasta header information"
"and use it intrepret BLAST results")
sub_parser = parser.add_subparsers(title='', dest='interpret/parse')
sub_parser.required = True
parse_parser = sub_parser.add_parser(
'parse', help='Parse Uniprot fasta headers to a table')
parse_parser.add_argument('-i', '--input_file', required=True,
help='<filepath> The uniprot fasta')
parse_parser.add_argument('-o', '--output_file', required=True,
help='<filepath> The output path')
parse_parser.set_defaults(subcmd="parse")
interpret_parser = sub_parser.add_parser(
'interpret', help='Interpret BLAST results')
interpret_parser.add_argument('-i', '--input_file', required=True,
help='<filepath> The BLAST result, only format six is acceptable')
interpret_parser.add_argument('-u', '--uniprot', required=True,
help='<filepath> The niprot fasta header information generated by "parse" function')
interpret_parser.add_argument('-c', '--column', required=True, type=int,
help='<int> Specify which column in BLAST result contains the identifier of Uniprot')
interpret_parser.add_argument('-o', '--output_file', required=True,
help='<filepath> The output path')
interpret_parser.set_defaults(subcmd="interpret")
args = parser.parse_args()
if args.subcmd == "parse":
uni = UniprotParse(args.input_file)
uni.parse()
uni.output.to_csv(args.output_file, index=False, sep='\t')
if args.subcmd == "interpret":
blast_result = pd.read_table(args.input_file, header=None)
uniprot_info = pd.read_table(args.uniprot)
try:
blast_result[args.column-1] = blast_result[args.column-1].apply(lambda x: x.split('|')[1])
except:
pass
finally:
result = pd.merge(blast_result, uniprot_info[['ID', 'GN', 'Description']],
left_on=args.column-1,
right_on='ID',
how='left')
result.drop('ID', axis=1, inplace=True)
result.to_csv(args.output_file, header=False, index=False, sep='\t', float_format='%.3g')