-
Notifications
You must be signed in to change notification settings - Fork 0
/
import.py
87 lines (64 loc) · 2.28 KB
/
import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from os import listdir
import json
import html
BULK_SIZE = 1000
INDEX = 'authors'
def get_actions_from_line(line: dict) -> list:
    """Convert one parsed JSON line describing an author into a list of
    Elasticsearch bulk-index actions.

    Returns an empty list when the line carries no 'name' key. HTML
    entities in the author name, paper titles and coauthor names are
    unescaped; list-valued paper fields are flattened to newline-separated
    strings; absent optional fields become None.
    """
    if 'name' not in line:
        return []
    # NOTE(review): assumes every named line also carries an 'id' — a
    # missing key raises KeyError here; confirm against the input data.
    author_id = line['id']
    papers = []
    for paper in line.get('papers', []):
        papers.append({
            'title': html.unescape(paper['title']),
            'fields_of_study': (
                '\n'.join(paper['fieldsOfStudy'])
                if 'fieldsOfStudy' in paper else None),
            'coauthors': (
                '\n'.join(html.unescape(coauthor)
                          for coauthor in paper['coauthors'])
                if 'coauthors' in paper else None),
        })
    author = {
        'name': html.unescape(line['name']),
        'other_names': None,
        # An author without papers is indexed with papers=None, not [].
        'papers': papers or None,
    }
    return [{
        '_index': INDEX,
        '_id': author_id,
        '_source': author,
    }]
def main():
    """Stream the JSON-lines files in ./data/out into Elasticsearch.

    Each line of each file is parsed, converted to bulk actions via
    get_actions_from_line, and flushed to the index whenever the buffer
    reaches BULK_SIZE actions, with a final flush of the remainder.
    """
    es = Elasticsearch()
    files = listdir('./data/out')
    for i, file in enumerate(files, start=1):
        print('Processing file {} ({}/{})'.format(file, i, len(files)))
        with open('./data/out/{}'.format(file)) as f:
            line_count = 0
            bulk_actions = []
            # Accumulate actions line by line; once the buffer holds at
            # least BULK_SIZE actions, send them to the index in one bulk.
            for line in f:
                line = json.loads(line)
                bulk_actions += get_actions_from_line(line)
                line_count += 1
                if len(bulk_actions) >= BULK_SIZE:
                    print('Bulk after line {}, bulk size: {}'.format(
                        line_count, len(bulk_actions)))
                    helpers.bulk(es, bulk_actions)
                    bulk_actions.clear()
            # Flush the final partial bulk (usually smaller than BULK_SIZE).
            if bulk_actions:
                print('Bulk after line {}, bulk size: {}'.format(
                    line_count, len(bulk_actions)))
                helpers.bulk(es, bulk_actions)


if __name__ == '__main__':
    main()