-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapache_beam_etl_pipeline.py
57 lines (43 loc) · 1.8 KB
/
apache_beam_etl_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# -*- coding: utf-8 -*-
# Install the dependency first (shell / notebook command, not Python):
#   pip install apache-beam[interactive]
import apache_beam as beam
import re
import apache_beam.runners.interactive.interactive_beam as ib
# Pipeline object that the transform chain further down is attached to; it is
# run by the ``p1.run()`` call at the end of the file.
p1 = beam.Pipeline()
class FormatElements(beam.DoFn):
    """Explode a ``(year, producers)`` record into one row per producer."""

    def process(self, record):
        """Return one ``"<producer>,<year>"`` string per producer in *record*.

        Args:
            record: A two-item sequence ``(year, producers)``. ``producers``
                is a single string naming one or more producers separated by
                ``", "``, ``" and "`` or ``", and "``.

        Returns:
            A list of ``"<producer>,<year>"`` strings, one per non-empty
            producer name, in the order the names appear in ``producers``.
        """
        year, producers = record
        # Normalise every supported separator to a bare comma so that a
        # single split yields the individual names.
        normalized = re.sub(r'\s*, and | and \s*|, ', ',', producers)
        # Strip stray padding and drop empty fragments (e.g. produced by a
        # trailing separator) so the same producer always maps to the same
        # string and no ",<year>" rows with a blank name are emitted.
        names = (name.strip() for name in normalized.split(','))
        return [f"{name},{year}" for name in names if name]
class CalculateYearsInterval(beam.DoFn):
    """Compute the gap between the two most recent years of a grouped key."""

    def process(self, record):
        """Return the interval between the two latest years in *record*.

        Args:
            record: A two-item sequence ``(key, value)``. ``value`` is an
                iterable of items whose element at index 1 is a year
                (anything ``int()`` accepts).

        Returns:
            A one-element list holding the tuple
            ``(key, previous_win, following_win, interval)``, where
            ``following_win`` is the largest year, ``previous_win`` the
            second largest and ``interval`` their difference. An empty list
            is returned when fewer than two years are present, because no
            interval can be computed.

        Raises:
            ValueError: If a year cannot be converted with ``int()``.
        """
        key, value = record
        # Convert before sorting so the ordering is numeric rather than
        # lexicographic on the raw strings.
        years = sorted(int(win[1]) for win in value)
        if len(years) < 2:
            return []
        previous_win, following_win = years[-2:]
        interval = following_win - previous_win
        return [(key, previous_win, following_win, interval)]
# Build the transform chain on ``p1``. Each input line is split on ';' and
# the code treats column 0 as the year, column 3 as the producers string and
# column 4 as the winner flag.
load_csv = (
    p1
    | "Import movie list" >> beam.io.ReadFromText("movielist.csv", skip_header_lines=1)
    | "Split by semicolon" >> beam.Map(lambda record: record.split(';'))
    # Rows too short to carry a winner column are skipped instead of raising
    # IndexError; padding around the flag is ignored.
    | "Filter by winners" >> beam.Filter(
        lambda record: len(record) > 4 and record[4].strip() == "yes")
    | "Map producers and year columns" >> beam.Map(lambda record: [record[0], record[3]])
    | "Format elements" >> beam.ParDo(FormatElements())
    | "Split Producer and Year by comma" >> beam.Map(lambda producers: producers.split(','))
    | "Group By producer's key" >> beam.GroupBy(lambda producer: producer[0])
    # An interval needs two wins, so keep only groups with at least two rows.
    | "Filter producers with at least 2 awards" >> beam.Filter(lambda record: len(record[1]) > 1)
    | "Calculate Years Interval" >> beam.ParDo(CalculateYearsInterval())
    # To persist the output instead of printing it, swap the print step for:
    # | "Write to Text" >> beam.io.WriteToText('results.txt')
    | "Print Results Year" >> beam.Map(print)
)
# Block until the pipeline has finished so the script does not exit (or move
# on) while a non-blocking runner is still processing.
p1.run().wait_until_finish()