# -*- coding: utf-8 -*-
# pylint: disable=C0325
# pylint: disable=C0103
# pylint: disable=R1710
"""
@author = Ivan Pokrovac
pylint global evaluation = 9.76/10
"""
import pandas as pd
import numpy as np

# silence pandas' SettingWithCopyWarning for the slice assignments below
pd.options.mode.chained_assignment = None

class MuseFCSCreator():
    """
    Parses a Muse .fcs file: reads every HEADER, TEXT, and DATA segment and
    assembles them into pandas structures (.data, .meta, .meta_information)
    """
def __init__(self):
self.samples = None
self.data = None
self.meta = None
self.channel_names = None
self.header = None
self.text = None
self.meta_information = None
    def read_all_headers(self, buf):
        """
        Reads all headers in a given .fcs file
        Each header provides:
        [1] Where TEXT begins (byte offset)
        [2] How long TEXT is (bytes)
        [3] Where DATA begins (byte offset)
        [4] How long DATA is (bytes)
        This information is stored in a pandas DataFrame
        Parameters
        ----------
        buf : a buffer-like object opened in read-binary mode.
        Returns
        -------
        Pandas DataFrame of shape n x 4 whose columns are where TEXT begins, how
        long it is, where DATA begins, and how long it is
        This dataframe is fed into "self.header" attribute
        """
        def header_read(buf, begin=0):
            """
            Reads a single header of a given .fcs file
            Values of the header are recorded in the list "listvar"
            The position of the next header is found by taking the byte at
            which DATA ends and adding 1
            Parameters
            ----------
            buf : a buffer-like object opened in read-binary mode
            begin : byte offset. The default is 0.
            Returns
            -------
            Next offset - the position of the next header
            List of [TEXT BEGIN, TEXT SIZE, DATA BEGIN, DATA SIZE] in bytes
            """
            buf.seek(begin)  # starting at the given offset
            stringvar = str(buf.read(56))  # reading the 56-byte header
            listvar = stringvar.split()  # splitting header into fields
            listvar.pop(0)  # the first element is the "FCS" version string and is not needed
            while len(listvar) > 4:  # listvar needs only 4 elements; extras are
                listvar.pop()        # removed from the tail until 4 remain
            # offsets are converted from strings into integers
            listvar = [int(x) for x in listvar]
            next_offset = listvar[-1]+1  # the next header starts right after DATA ends
            text_begin = listvar[0]
            # the difference of BEGIN and END gives size-1
            text_size = listvar[1]-listvar[0]
            data_begin = listvar[2]
            # the difference of BEGIN and END gives size-1
            data_size = listvar[3]-listvar[2]
            listvar = [text_begin, text_size, data_begin, data_size]
            return(next_offset, listvar)
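        # An illustrative sketch (the raw values are hypothetical, the layout
        # follows the FCS standard): a header read this way looks like
        #   "FCS3.0    58    2044    2108    204907"
        # and header_read would return
        #   next_offset = 204908
        #   listvar = [58, 1986, 2108, 202799]  # [TEXT BEGIN, TEXT SIZE, DATA BEGIN, DATA SIZE]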
n = 0
offsets = [n]
list_of_lists = []
        while True:  # this loop ensures that the entire .fcs file is read
            try:
                # the first header of the .fcs file starts at byte 0
                next_off, listvar = header_read(buf, n)
                n = next_off+n  # offsets are summed together
                offsets.append(n)  # and put in a list
                list_of_lists.append(listvar)
            except ValueError:
                break  # no further header could be parsed; end the loop
header = pd.DataFrame(list_of_lists, columns=[
"text begin", "text size", "data begin", "data size"]) # header is loaded into dataframe
offsets.pop() # last offset is removed, as it is unnecessary
offsets = np.array(offsets)
# adding offsets to begin is necessary because
header["text begin"] = header["text begin"]+offsets
# MUSE does not have proper $NEXTDATA start
header["data begin"] = header["data begin"]+offsets
self.header = header
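    # A minimal sketch of the resulting self.header table (values hypothetical):
    #
    #    text begin  text size  data begin  data size
    # 0          58       1986        2108     202799
    # 1      204966       1986      207016     202799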
    def read_texts(self, buf):
        """
        Creates a dataframe that contains all TEXT data
        The dataframe is of shape N x M where N is the number of samples and M
        is the number of TEXT keywords
        Parameters
        ----------
        buf : a buffer-like object opened in read-binary mode
        Returns
        -------
        Dataframe of shape N x M where N is the number of samples and M is the
        number of TEXT keywords.
        Dataframe is fed into "self.text" attribute
        """
        def dictionary_make(textstring, delimiter_text="/"):
            """
            Creates a proper dictionary out of the TEXT part of a .fcs file
            Parameters
            ----------
            textstring : string representation of the TEXT segment (b'...')
                that is split on the delimiter.
            delimiter_text : string, the delimiter between keywords and values.
                The default is "/".
            Returns
            -------
            Dictionary of keyword : value pairs
            """
first_list = textstring.split(delimiter_text)
first_list.pop(0) # remove b'
first_list.pop() # remove '
n = 0
dic = {}
while n < len(first_list):
key = first_list[n]
n = n+1
value = first_list[n]
n = n+1
dic[key] = value
return(dic)
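        # An illustrative example (the TEXT snippet is hypothetical, the keys
        # are ones this parser actually uses):
        #   dictionary_make("b'/$PAR/7/$TOT/5000/'")
        # returns {'$PAR': '7', '$TOT': '5000'}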
        # FIRST CRITICAL POINT: TEXT segments are located via self.header,
        # so read_all_headers must have run before this method
        header = self.header
        dic_list = []
        begins = header["text begin"]  # where TEXT segments begin
        sizes = header["text size"]  # how long they are
for i in range(len(begins)):
begin = begins[i]
size = sizes[i]+1
buf.seek(begin)
textstring = str(buf.read(size))
dic_list.append(dictionary_make(textstring))
textdf = pd.DataFrame(dic_list) # textdf is made from dictionary list
textdf["$BEGINDATA"] = header["data begin"]
textdf["$DATASIZE"] = header["data size"]
self.text = textdf
        # for every parameter i in 1..$PAR, take the (unique) value of the
        # "$PiS" keyword, which holds that channel's name
        ch_names = [str(list(dict.fromkeys((textdf["$P"+str(i)+"S"])))[0])
                    for i in range(1, int(list(dict.fromkeys((textdf["$PAR"])))[0])+1)]
        self.channel_names = ch_names
    def read_data(self, buf):
        """
        Reads the DATA segment of the .FCS file
        For this to work, the names of all channels, the number of recorded
        parameters ("$PAR"), and the total number of events ("$TOT") must be
        known for each dataset
        All of that information comes from TEXT, so TEXT must be read first
        Parameters
        ----------
        buf : a buffer-like object opened in read-binary mode.
        Returns
        -------
        Dataframe of shape (N x TOT(N)) x PAR+1 where N is the number of samples,
        TOT(N) the number of events recorded per sample, and PAR the number of
        recorded parameters
        The first column of the dataframe is titled "Sample" and identifies
        which sample each data point comes from
        This dataframe is fed into "self.data" attribute
        """
        # SECOND CRITICAL POINT: this relies on self.text and
        # self.channel_names, so read_texts must have run before this method
        textdf = self.text
        names = self.channel_names
        relevant = textdf[["GTI$SAMPLEID", "$BEGINDATA", "$DATASIZE"]]
        relevant["$PAR"] = textdf["$PAR"].astype(int)
        relevant["$TOT"] = textdf["$TOT"].astype(int)
segment_storage = []
for i in range(len(relevant)):
buf.seek(relevant["$BEGINDATA"][i])
segment = buf.read(relevant["$DATASIZE"][i]+1)
            # THIS IS THE MOST IMPORTANT PART:
            # the raw DATA bytes are interpreted as 32-bit floats
            segment_value = np.frombuffer(segment, dtype=np.float32)
            # reshaping the flat array into $TOT rows by $PAR columns
            segment_value = segment_value.reshape(
                (relevant["$TOT"][i], relevant["$PAR"][i]))
            segment_frame = pd.DataFrame(data=segment_value, columns=names)
            segment_frame.insert(0, "Sample", relevant["GTI$SAMPLEID"][i])
            segment_storage.append(segment_frame)
        # ignore_index resets the row index so it runs continuously across samples
        data = pd.concat(segment_storage, ignore_index=True)
        self.data = data
self.samples = list(relevant["GTI$SAMPLEID"])
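    # A sketch of self.data after read_data (sample IDs and values are
    # hypothetical; channel names are shown in the short form they take
    # after unify_channel_names):
    #
    #      Sample  FSC-HLin  ...  Time
    # 0  Sample 1    1023.5  ...   0.0
    # 1  Sample 1     998.2  ...   0.1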
    def fix_metadata(self, buf):
        """
        Reshapes metadata into a digestible dictionary form
        Information about differences between samples is stored under
        "#SAMPLE INFORMATION", and the log of events that MUSE records
        is stored under "#LOG OF EVENTS"
        Parameters
        ----------
        buf : a buffer-like object opened in read-binary mode.
        Returns
        -------
        Dictionary of meta information
        This dictionary is fed into "self.meta_information" attribute
        """
meta_df = self.text.copy(deep=True)
together_dic = {}
for column in meta_df.columns:
if len(set(meta_df[column])) == 1 or np.sum(meta_df[column].isna()) > 0:
together_dic[column] = meta_df[column][0]
meta_df.drop(column, axis=1, inplace=True)
together_dic["#SAMPLE INFORMATION"] = meta_df
beglog = int(together_dic["GTI$BEGINLOG"])
endlog = int(together_dic["GTI$ENDLOG"])
buf.seek(beglog)
log = [str(x) for x in buf.read(endlog).splitlines()]
together_dic["#LOG OF EVENTS"] = log
self.meta_information = together_dic
        self.meta = self.text
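    # The resulting self.meta_information is a plain dictionary, roughly:
    #   {<keyword shared by all samples>: <value>, ...,
    #    "#SAMPLE INFORMATION": <DataFrame of per-sample differences>,
    #    "#LOG OF EVENTS": [<log lines>]}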
    def unify_channel_names(self):
        """
        Ensures that all channel names are uniformly named
        A MUSE assay changes the displayed names of channels, but the
        underlying channels are always the same:
        FSC-HLin, FSC-HLog, RED-HLin, RED-HLog, YEL-HLin, YEL-HLog, FSC-W,
        as well as Time
        Returns
        -------
        Modifies the self.data segment of the object and updates the
        channel_names attribute
        """
channel_dic = {}
for item in self.channel_names:
channel_dic[item] = item[item.find("(")+1: item.find(")")]
if item == "Time":
channel_dic[item] = item
self.channel_names = [channel_dic[item] for item in self.channel_names]
self.data.rename(channel_dic, inplace=True, axis="columns")
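        # Example of the renaming rule (the long name is a hypothetical Muse
        # display name): "Forward Scatter (FSC-HLin)" -> "FSC-HLin";
        # "Time" is kept as-is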
    def name_dataset_from_path(self, path):
        """
        Inserts a column titled "Name" into the dataset
        The default value of this column is the name of the .FCS file being read
        Parameters
        ----------
        path : .FCS file location.
        Returns
        -------
        Modifies the self.data segment of the object, inserting a "Name" column
        """
elements = path.split("/")
while len(elements[-1]) == 0:
elements.pop()
data_name = elements[-1]
self.data.insert(0, "Name", data_name)
return(data_name)
    def operate(self, path):
        """
        Executes all the methods of the class in the required order
        After execution, the object has .data and .meta attributes
        """
        with open(path, "rb") as buf:
            self.read_all_headers(buf)
            self.read_texts(buf)
            self.read_data(buf)
            self.fix_metadata(buf)
        self.unify_channel_names()
        self.name_dataset_from_path(path)
# END
def parse(path, what="data"):
"""
Parses .FCS file from a given path
Parameters
----------
path : path to the .FCS file.
what : What is returned :
"data" -> just data
"full" -> meta, data
"obj" ->museparser object
".
"""
creat = MuseFCSCreator()
creat.operate(path)
if what == "data":
return(creat.data)
if what == "full":
return((creat.meta, creat.data))
if what == "obj":
return(creat)
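# A minimal usage sketch ("experiment.FCS" is a hypothetical path):
#   data = parse("experiment.FCS")                # just the event data
#   meta, data = parse("experiment.FCS", "full")  # metadata and data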
def text_explanation(path):
    """
    Renames samples using a .TXT file (named the same as the .FCS file, minus
    the extension) that contains an explanation for each sample entry, one
    line per sample
    Parameters
    ----------
    path : location of the .FCS file
    Returns
    -------
    .FCS data with Sample, Replicate, and Name columns set according to the .TXT file
    """
    text_path = path.replace(".FCS", ".txt")
    with open(text_path, "r") as file:
        lines = file.read().splitlines()
    ts_dic = {}
    new_lines = []
    for item in lines:
        # count how many times each sample name has occurred so far,
        # producing replicate labels such as "control:1", "control:2"
        if item not in ts_dic.keys():
            ts_dic[item] = 1
        else:
            ts_dic[item] = ts_dic[item]+1
        line = item+":"+str(ts_dic[item])
        new_lines.append(line)
sample_lines = lines
replicate_lines = new_lines
data = parse(path)
    keys = list(dict.fromkeys(data["Sample"]))  # unique sample IDs, in order
sample_dictionary = dict(zip(keys, sample_lines))
replicate_dictionary = dict(zip(keys, replicate_lines))
data["Replicate"] = data["Sample"].map(replicate_dictionary)
data["Sample"] = data["Sample"].map(sample_dictionary)
return(data)
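

if __name__ == "__main__":
    # A minimal demonstration, assuming a Muse export named "experiment.FCS"
    # (a hypothetical path) sits next to this script
    df = parse("experiment.FCS")
    print(df.head())
    print(df["Sample"].unique())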