nlp_utils.py
import collections
import io

import numpy

import chainer
from chainer.backends import cuda

def split_text(text, char_based=False):
    # Tokenize into characters or into whitespace-separated words.
    if char_based:
        return list(text)
    else:
        return text.split()

def normalize_text(text):
    # Lowercase and strip surrounding whitespace.
    return text.strip().lower()

def make_vocab(dataset, max_vocab_size=200000, min_freq=2):
    """Build a word-id vocabulary from ``(tokens, label)`` pairs.

    Ids 0 and 1 are reserved for the special tokens ``<eos>`` and
    ``<unk>``; remaining words are added in order of descending
    frequency (ties broken alphabetically) until ``max_vocab_size``
    is reached or the count drops below ``min_freq``.
    """
    counts = collections.defaultdict(int)
    for tokens, _ in dataset:
        for token in tokens:
            counts[token] += 1

    special = ['<eos>', '<unk>']
    vocab = {x: i for i, x in enumerate(special)}
    for w, c in sorted(counts.items(), key=lambda x: (-x[1], x[0])):
        if len(vocab) >= max_vocab_size or c < min_freq:
            break
        vocab[w] = len(vocab)
    return vocab
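
# A minimal sketch of vocabulary construction (the toy dataset below is
# hypothetical, not from this repository):
#
#     >>> dataset = [(['the', 'cat', 'sat'], 0), (['the', 'dog', 'sat'], 1)]
#     >>> vocab = make_vocab(dataset, min_freq=1)
#     >>> vocab['<eos>'], vocab['<unk>'], len(vocab)
#     (0, 1, 6)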

def read_vocab_list(path, max_vocab_size=200000):
    # Read one word per line from ``path``; ids 0 and 1 are reserved for
    # ``<eos>`` and ``<unk>``, and blank lines and duplicates are skipped.
    vocab = {'<eos>': 0, '<unk>': 1}
    with io.open(path, encoding='utf-8', errors='ignore') as f:
        for l in f:
            w = l.strip()
            if w not in vocab and w:
                vocab[w] = len(vocab)
            if len(vocab) >= max_vocab_size:
                break
    return vocab

def make_array(tokens, vocab, add_eos=True):
    # Map tokens to ids, falling back to ``<unk>`` for out-of-vocabulary
    # words and optionally appending ``<eos>``.
    unk_id = vocab['<unk>']
    eos_id = vocab['<eos>']
    ids = [vocab.get(token, unk_id) for token in tokens]
    if add_eos:
        ids.append(eos_id)
    return numpy.array(ids, numpy.int32)
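
# Sketch of id conversion with a hypothetical three-entry vocab; 'dog' is
# out of vocabulary and maps to <unk>, and <eos> is appended:
#
#     >>> make_array(['cat', 'dog'], {'<eos>': 0, '<unk>': 1, 'cat': 2})
#     array([2, 1, 0], dtype=int32)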

def transform_to_array(dataset, vocab, with_label=True):
    # Convert a tokenized dataset into int32 id arrays, keeping each
    # label as a length-1 array when ``with_label`` is True.
    if with_label:
        return [(make_array(tokens, vocab), numpy.array([cls], numpy.int32))
                for tokens, cls in dataset]
    else:
        return [make_array(tokens, vocab)
                for tokens in dataset]

def convert_seq(batch, device=None, with_label=True):
    """Converter that moves a minibatch of variable-length sequences to
    ``device``.

    On GPU the arrays are concatenated, transferred in a single copy,
    and split back into per-example arrays, which is cheaper than
    transferring each array separately.
    """
    def to_device_batch(batch):
        if device is None:
            return batch
        elif device < 0:
            return [chainer.dataset.to_device(device, x) for x in batch]
        else:
            xp = cuda.cupy.get_array_module(*batch)
            concat = xp.concatenate(batch, axis=0)
            sections = numpy.cumsum([len(x)
                                     for x in batch[:-1]], dtype=numpy.int32)
            concat_dev = chainer.dataset.to_device(device, concat)
            batch_dev = cuda.cupy.split(concat_dev, sections)
            return batch_dev

    if with_label:
        return {'xs': to_device_batch([x for x, _ in batch]),
                'ys': to_device_batch([y for _, y in batch])}
    else:
        return to_device_batch([x for x in batch])
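
# Usage sketch: ``convert_seq`` is meant to be passed as the ``converter``
# argument of a Chainer updater or evaluator (the iterator and optimizer
# names below are placeholders):
#
#     updater = chainer.training.updaters.StandardUpdater(
#         train_iter, optimizer, converter=convert_seq, device=0)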

def transform_snli_to_array(dataset, vocab, with_label=True):
    # Like ``transform_to_array``, but for SNLI-style
    # (premise, hypothesis[, label]) examples.
    if with_label:
        return [(make_array(premise, vocab),
                 make_array(hypothesis, vocab),
                 numpy.array([cls], numpy.int32))
                for premise, hypothesis, cls in dataset]
    else:
        return [(make_array(premise, vocab), make_array(hypothesis, vocab))
                for premise, hypothesis in dataset]

def convert_snli_seq(batch, device=None, with_label=True):
    # Same device-transfer strategy as ``convert_seq``, applied to the
    # premise and hypothesis sequences of each SNLI example.
    def to_device_batch(batch):
        if device is None:
            return batch
        elif device < 0:
            return [chainer.dataset.to_device(device, x) for x in batch]
        else:
            xp = cuda.cupy.get_array_module(*batch)
            concat = xp.concatenate(batch, axis=0)
            sections = numpy.cumsum([len(x)
                                     for x in batch[:-1]], dtype=numpy.int32)
            concat_dev = chainer.dataset.to_device(device, concat)
            batch_dev = cuda.cupy.split(concat_dev, sections)
            return batch_dev

    if with_label:
        return {'xs': (to_device_batch([x0 for x0, _, _ in batch]),
                       to_device_batch([x1 for _, x1, _ in batch])),
                'ys': to_device_batch([y for _, _, y in batch])}
    else:
        return (to_device_batch([x0 for x0, _ in batch]),
                to_device_batch([x1 for _, x1 in batch]))
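
# Sketch of the SNLI pipeline on CPU (hypothetical toy data; with
# device=None the arrays are returned unmoved):
#
#     vocab = {'<eos>': 0, '<unk>': 1, 'a': 2, 'cat': 3}
#     data = transform_snli_to_array([(['a', 'cat'], ['an', 'animal'], 0)],
#                                    vocab)
#     inputs = convert_snli_seq(data, device=None)
#     # inputs['xs'] -> ([premise ids], [hypothesis ids])
#     # inputs['ys'] -> [label array]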