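# Streaming reader for zstd-compressed JSONL data files. Documents are
# yielded in chunks of num_per_chunk so that no file is ever fully loaded
# into memory.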
import glob
import io
import json
import os
import random
from typing import Iterator, List, Tuple, Union

import zstandard as zstd
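# zstandard is the python-zstandard package (pip install zstandard).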


# Performs line formatting. Nothing really needs to be done except extract the
# relevant 'text' field from the JSON dict.
def _read_line(line: str) -> Union[str, None]:
    line = line.strip()
    # The line might not parse as JSON (this shouldn't happen with the
    # TextIOWrapper set-up below, but just in case).
    try:
        data = json.loads(line)
    except json.JSONDecodeError:
        return None
    # If parsing succeeded, data is a dictionary and we only want the text;
    # get() avoids a KeyError on records that lack a 'text' field.
    return data.get('text')
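# Example (the 'meta' field here is illustrative; only 'text' is used):
#   _read_line('{"text": "hello world", "meta": {"source": "web"}}')
#   -> "hello world"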


# Reads up to num_per_chunk documents from a file, so that the whole file is
# never loaded into memory at once.
def _read_file_chunk(text_stream: io.TextIOWrapper, lines: List[str], num_per_chunk: int) -> Tuple[List[str], bool]:
    full_chunk = False
    # Add parsed documents to the list of lines.
    for line in text_stream:
        line = _read_line(line)
        if line is not None:
            lines.append(line)
        if len(lines) == num_per_chunk:
            full_chunk = True
            break
    return lines, full_chunk
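# Note that `lines` is both passed in and returned, so a partially filled
# chunk carries over from one file to the next; read_data_folder below
# decides when a chunk is complete enough to yield.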


def read_data_folder(data_folder: str, num_per_chunk: int = 20000, randomize_files: bool = True) -> Iterator[List[str]]:
    # Get the files in the folder (os.path.join copes with a data_folder that
    # lacks a trailing separator, which `data_folder + "*"` would not).
    file_paths = glob.glob(os.path.join(data_folder, "*"))
    # Randomize the file order.
    if randomize_files:
        random.shuffle(file_paths)
    cur_file = 0        # index of the file being read
    lines = []          # stores the documents
    text_stream = None  # currently opened stream
    full_chunk = False  # flag indicating whether the chunk is full
    # Loop until every file has been read.
    while True:
        # If the chunk is not full, the current file is exhausted (or nothing
        # is open yet), so move on to the next file.
        if not full_chunk:
            # Close the previous file, which has been read completely.
            if text_stream:
                text_stream.close()
            # If there are no more files, exit, but first yield any leftover
            # incomplete chunk.
            if cur_file == len(file_paths):
                if lines:
                    yield lines
                break
            # Open the compressed file.
            compressed_file = open(file_paths[cur_file], 'rb')
            # Create a decompressor.
            decompressor = zstd.ZstdDecompressor()
            # Create a decompressed byte stream.
            stream_reader = decompressor.stream_reader(compressed_file)
            # Wrap it in a text stream.
            text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8')
            cur_file += 1
        # Try to read one full chunk, or read until the end of the current file.
        lines, full_chunk = _read_file_chunk(text_stream, lines, num_per_chunk)
        # Only yield when the chunk is full. This is required because a file
        # can end while the chunk is still incomplete.
        if full_chunk:
            yield lines
            lines = []
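

# A minimal sketch of how a compatible test file could be generated (the path
# and document contents are up to the caller and purely illustrative). Useful
# for exercising read_data_folder without the real dataset:
# ZstdCompressor().stream_writer wraps a binary file handle the same way
# ZstdDecompressor().stream_reader does above.
def _write_test_file(path: str, num_docs: int = 5) -> None:
    compressor = zstd.ZstdCompressor()
    with open(path, 'wb') as f, compressor.stream_writer(f) as writer:
        for i in range(num_docs):
            # One JSON document per line, matching the format _read_line expects.
            writer.write((json.dumps({'text': f'document {i}'}) + '\n').encode('utf-8'))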


# Some tests.
if __name__ == "__main__":
    from paths import dir_val, dir_train

    # Print each chunk's index and size, plus the first ten characters of its
    # first document.
    for i, chunk in enumerate(read_data_folder(dir_val)):
        print(i, len(chunk))
        print(chunk[0][:10])
    for i, chunk in enumerate(read_data_folder(dir_train)):
        print(i, len(chunk))
        print(chunk[0][:10])