forked from b-cube/semantics_pipeline
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bagofwords_workflow.py
67 lines (51 loc) · 1.67 KB
/
bagofwords_workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import luigi
import glob
import os
from tasks.text_tasks import BagOfWordsFromParsedTask
from tasks.text_tasks import BagOfWordsFromXMLTask
from tasks.task_helpers import parse_yaml
from tasks.task_helpers import run_init
class BowWorkflow(luigi.Task):
    """Fan out one BagOfWordsFromParsedTask per JSON file in a directory.

    The ``*.json`` files found directly under ``doc_dir`` are sorted by
    path and the slice ``[start_index:end_index]`` of that list is turned
    into sub-tasks, so a large directory can be worked through in
    repeatable batches.

    NOTE(review): ``output()`` points at ``log.txt`` in the working
    directory but ``run()`` never writes that file, so this task does not
    produce its own declared target -- confirm that this is intended.
    """

    # Directory searched (non-recursively) for ``*.json`` input files.
    doc_dir = luigi.Parameter()
    # Path of the YAML configuration forwarded to every sub-task.
    yaml_file = luigi.Parameter()
    # Slice bounds into the sorted file list. IntParameter parses values
    # given on the command line into ints; a plain Parameter leaves them as
    # strings, which cannot be used as slice indices.
    start_index = luigi.IntParameter(default=0)
    end_index = luigi.IntParameter(default=1000)

    def requires(self):
        """Return one BagOfWordsFromParsedTask per selected input file."""
        return [
            BagOfWordsFromParsedTask(
                input_file=path,
                yaml_file=self.yaml_file
            ) for path in self._iterator()
        ]

    def output(self):
        """Return the local target ``log.txt`` declared for this task."""
        return luigi.LocalTarget('log.txt')

    def run(self):
        """Print a marker; the actual work is done by the required tasks."""
        # Parenthesised form prints the same text under Python 2 and also
        # parses under Python 3.
        print('running')

    def _iterator(self):
        """Yield the JSON paths of the requested batch, in sorted order."""
        pattern = os.path.join(self.doc_dir, '*.json')
        # glob.glob() returns paths in arbitrary order; sort first so that
        # a given (start_index, end_index) pair always selects the same
        # files.
        paths = sorted(glob.glob(pattern))
        for path in paths[self.start_index:self.end_index]:
            yield path

    def _configure(self):
        """Parse the YAML file and pass the result to ``run_init``.

        Not called from within this class.
        """
        config = parse_yaml(self.yaml_file)
        run_init(config)
class BowFromXmlWorkflow(luigi.Task):
    """Fan out one BagOfWordsFromXMLTask per JSON file in a directory.

    The ``*.json`` files found directly under ``doc_dir`` are sorted by
    path and the slice ``[start_index:end_index]`` of that list is turned
    into sub-tasks, so a large directory can be worked through in
    repeatable batches.

    NOTE(review): inputs are globbed as ``*.json`` even though the
    sub-task is the XML variant -- presumably the JSON documents carry the
    XML payload; confirm against BagOfWordsFromXMLTask.

    NOTE(review): ``output()`` points at ``log.txt`` in the working
    directory but ``run()`` never writes that file, so this task does not
    produce its own declared target -- confirm that this is intended.
    """

    # Directory searched (non-recursively) for ``*.json`` input files.
    doc_dir = luigi.Parameter()
    # Path of the YAML configuration forwarded to every sub-task.
    yaml_file = luigi.Parameter()
    # Slice bounds into the sorted file list. IntParameter parses values
    # given on the command line into ints; a plain Parameter leaves them as
    # strings, which cannot be used as slice indices.
    start_index = luigi.IntParameter(default=0)
    end_index = luigi.IntParameter(default=1000)

    def requires(self):
        """Return one BagOfWordsFromXMLTask per selected input file."""
        return [
            BagOfWordsFromXMLTask(
                input_file=path,
                yaml_file=self.yaml_file
            ) for path in self._iterator()
        ]

    def output(self):
        """Return the local target ``log.txt`` declared for this task."""
        return luigi.LocalTarget('log.txt')

    def run(self):
        """Print a marker; the actual work is done by the required tasks."""
        # Parenthesised form prints the same text under Python 2 and also
        # parses under Python 3.
        print('running')

    def _iterator(self):
        """Yield the JSON paths of the requested batch, in sorted order."""
        pattern = os.path.join(self.doc_dir, '*.json')
        # glob.glob() returns paths in arbitrary order; sort first so that
        # a given (start_index, end_index) pair always selects the same
        # files.
        paths = sorted(glob.glob(pattern))
        for path in paths[self.start_index:self.end_index]:
            yield path

    def _configure(self):
        """Parse the YAML file and pass the result to ``run_init``.

        Not called from within this class.
        """
        config = parse_yaml(self.yaml_file)
        run_init(config)