import ast
import json
import os
import tempfile
import zipfile
import luigi.configuration
from luigi.contrib.spark import PySparkTask
from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, PathSelectionByDateIntervalTask
from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin

_file_path_to_package_meta_path = {}


def get_package_metadata_paths():
    """
    Build a mapping from installed file paths to the package metadata directories
    that need to be shipped along with them to the EMR cluster.
    """
    from distlib.database import DistributionPath

    if len(_file_path_to_package_meta_path) > 0:
        return _file_path_to_package_meta_path

    dist_path = DistributionPath(include_egg=True)
    for distribution in dist_path.get_distributions():
        metadata_path = distribution.path
        for installed_file_path, _hash, _size in distribution.list_installed_files():
            absolute_installed_file_path = installed_file_path
            if not os.path.isabs(installed_file_path):
                absolute_installed_file_path = os.path.join(os.path.dirname(metadata_path), installed_file_path)
            normalized_file_path = os.path.realpath(absolute_installed_file_path)
            _file_path_to_package_meta_path[normalized_file_path] = metadata_path

    return _file_path_to_package_meta_path
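
# The mapping built above is keyed by symlink-resolved absolute file paths. An entry might
# look roughly like the following (hypothetical paths, shown only for illustration):
#     '/usr/lib/python2.7/site-packages/six.py' -> '/usr/lib/python2.7/site-packages/six-1.11.0.dist-info'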


def dereference(f):
    if os.path.islink(f):
        # by joining with the dirname we are certain to get the absolute path
        return dereference(os.path.join(os.path.dirname(f), os.readlink(f)))
    else:
        return f


def create_packages_archive(packages, archive_dir_path):
    """
    Create a zip archive of all the packages listed in `packages` and return the list of archive file locations.
    """
    archives_list = []
    package_metadata_paths = get_package_metadata_paths()
    metadata_to_add = dict()

    package_zip_path = os.path.join(archive_dir_path, 'packages.zip')
    package_zip = zipfile.ZipFile(package_zip_path, "w", compression=zipfile.ZIP_DEFLATED)
    archives_list.append(package_zip_path)

    def add(src, dst, package_name):
        # Ensure any entry points and other egg-info metadata is also transmitted along with
        # this file. If it is associated with any egg-info directories, ship them too.
        metadata_path = package_metadata_paths.get(os.path.realpath(src))
        if metadata_path:
            metadata_to_add[package_name] = metadata_path
        package_zip.write(src, dst)

    def add_files_for_package(sub_package_path, root_package_path, root_package_name, package_name):
        for root, dirs, files in os.walk(sub_package_path):
            if '.svn' in dirs:
                dirs.remove('.svn')
            for f in files:
                if not f.endswith(".pyc") and not f.startswith("."):
                    add(dereference(root + "/" + f),
                        root.replace(root_package_path, root_package_name) + "/" + f,
                        package_name)

    for package in packages:
        # Archive each package
        if not getattr(package, "__path__", None) and '.' in package.__name__:
            package = __import__(package.__name__.rpartition('.')[0], None, None, 'non_empty')

        n = package.__name__.replace(".", "/")

        # Check length of path, because the attribute may exist and be an empty list.
        if len(getattr(package, "__path__", [])) > 0:
            # TODO: (BUG) picking only the first path does not
            # properly deal with namespaced packages in different
            # directories
            p = package.__path__[0]

            if p.endswith('.egg') and os.path.isfile(p):
                raise RuntimeError('Not going to archive egg files!!!')
                # Add the entire egg file
                # p = p[:p.find('.egg') + 4]
                # add(dereference(p), os.path.basename(p))
            else:
                # include __init__ files from parent projects
                root = []
                for parent in package.__name__.split('.')[0:-1]:
                    root.append(parent)
                    module_name = '.'.join(root)
                    directory = '/'.join(root)

                    add(dereference(__import__(module_name, None, None, 'non_empty').__path__[0] + "/__init__.py"),
                        directory + "/__init__.py",
                        package.__name__)

                add_files_for_package(p, p, n, package.__name__)
        else:
            f = package.__file__
            if f.endswith("pyc"):
                f = f[:-3] + "py"
            if n.find(".") == -1:
                add(dereference(f), os.path.basename(f), package.__name__)
            else:
                add(dereference(f), n + ".py", package.__name__)

        # include metadata in the same zip file
        metadata_path = metadata_to_add.get(package.__name__)
        if metadata_path is not None:
            add_files_for_package(metadata_path, metadata_path, os.path.basename(metadata_path), package.__name__)

    return archives_list
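
# Illustrative sketch (not executed here) of how the archive helper above is typically driven;
# the module choices are arbitrary examples, and the same pattern is used by
# SparkJobTask._load_internal_dependency_on_cluster further below:
#
#     import tempfile
#     import six
#     import stevedore
#     archive_paths = create_packages_archive([six, stevedore], tempfile.mkdtemp())
#     # archive_paths == ['<tmp dir>/packages.zip'], suitable for SparkContext.addPyFile()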


class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin):
    """
    Extract events corresponding to a specified time interval.
    """
    path_targets = None

    def __init__(self, *args, **kwargs):
        """
        Compute the interval bounds used to filter event log records by date.
        """
        super(EventLogSelectionMixinSpark, self).__init__(*args, **kwargs)
        self.lower_bound_date_string = self.interval.date_a.strftime('%Y-%m-%d')  # pylint: disable=no-member
        self.upper_bound_date_string = self.interval.date_b.strftime('%Y-%m-%d')  # pylint: disable=no-member

    def get_log_schema(self):
        """
        Get spark based schema for processing event logs
        :return: Spark schema
        """
        from pyspark.sql.types import StructType, StringType

        event_schema = StructType().add("POST", StringType(), True).add("GET", StringType(), True)
        module_schema = StructType().add("display_name", StringType(), True) \
            .add("original_usage_key", StringType(), True) \
            .add("original_usage_version", StringType(), True) \
            .add("usage_key", StringType(), True)
        context_schema = StructType().add("command", StringType(), True) \
            .add("course_id", StringType(), True) \
            .add("module", module_schema) \
            .add("org_id", StringType(), True) \
            .add("path", StringType(), True) \
            .add("user_id", StringType(), True)

        event_log_schema = StructType() \
            .add("username", StringType(), True) \
            .add("event_type", StringType(), True) \
            .add("ip", StringType(), True) \
            .add("agent", StringType(), True) \
            .add("host", StringType(), True) \
            .add("referer", StringType(), True) \
            .add("accept_language", StringType(), True) \
            .add("event", event_schema) \
            .add("event_source", StringType(), True) \
            .add("context", context_schema) \
            .add("time", StringType(), True) \
            .add("name", StringType(), True) \
            .add("page", StringType(), True) \
            .add("session", StringType(), True)

        return event_log_schema

    def get_event_log_dataframe(self, spark, *args, **kwargs):
        from pyspark.sql.functions import to_date, udf, struct, date_format

        path_targets = PathSelectionByDateIntervalTask(
            source=self.source,
            interval=self.interval,
            pattern=self.pattern,
            date_pattern=self.date_pattern,
        ).output()
        self.path_targets = [task.path for task in path_targets]

        dataframe = spark.read.format('json').load(self.path_targets, schema=self.get_log_schema())
        dataframe = dataframe.filter(dataframe['time'].isNotNull()) \
            .withColumn('event_date', date_format(to_date(dataframe['time']), 'yyyy-MM-dd'))
        dataframe = dataframe.filter(
            (dataframe['event_date'] >= self.lower_bound_date_string) &
            (dataframe['event_date'] < self.upper_bound_date_string)
        )
        return dataframe
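
# Illustrative usage (hypothetical subclass, not part of this module): a task mixing in
# EventLogSelectionMixinSpark can pull a schema-validated dataframe of raw events inside
# its spark_job() and aggregate it, for example:
#
#     events = self.get_event_log_dataframe(self._spark)
#     daily_counts = events.groupBy('event_date', 'event_type').count()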


class SparkJobTask(OverwriteOutputMixin, PySparkTask):
    """
    Wrapper around luigi's PySparkTask for running Spark jobs.
    """
    _spark = None
    _spark_context = None
    _sql_context = None
    _hive_context = None
    _tmp_dir = None

    driver_memory = luigi.Parameter(
        config_path={'section': 'spark', 'name': 'driver-memory'},
        description='Memory for the Spark driver',
        significant=False,
    )
    executor_memory = luigi.Parameter(
        config_path={'section': 'spark', 'name': 'executor-memory'},
        description='Memory for each executor',
        significant=False,
    )
    executor_cores = luigi.Parameter(
        config_path={'section': 'spark', 'name': 'executor-cores'},
        description='Number of cores for each executor',
        significant=False,
    )

    always_log_stderr = False  # log stderr if spark fails; set to True for verbose logs

    def init_spark(self, sc):
        """
        Initialize the Spark session and the SQL and Hive contexts.
        :param sc: Spark context
        """
        from pyspark.sql import SparkSession, SQLContext, HiveContext
        self._sql_context = SQLContext(sc)
        self._spark_context = sc
        self._spark = SparkSession.builder.getOrCreate()
        self._hive_context = HiveContext(sc)

    def spark_job(self, *args):
        """
        Spark code for the job; subclasses must override this.
        """
        raise NotImplementedError

    def get_config_from_args(self, key, *args, **kwargs):
        """
        Return the value of `key` from the first positional argument, which is expected to be
        the string representation of the options dict produced by `app_options`.
        """
        default_value = kwargs.get('default_value', None)
        str_arg = args[0]
        config_dict = ast.literal_eval(str_arg)
        value = config_dict.get(key, default_value)
        return value
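
    # Example of the expected argument format (illustrative values only): app_options() below
    # returns a single dict, which reaches main() as its string representation, so a lookup
    # made from within spark_job(*args) would resemble:
    #
    #     self.get_config_from_args('cluster_dependencies',
    #                               "{'cluster_dependencies': '[\"/tmp/extra.zip\"]'}",
    #                               default_value=None)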

    def _load_internal_dependency_on_cluster(self, *args):
        """
        Create a zip archive of the required packages and load it onto the Spark worker nodes.

        Shipping dependencies via luigi's configuration does not work, as luigi creates a tar
        file whereas Spark does not load tar files.
        """
        # import packages to be loaded on cluster
        import edx
        import luigi
        import opaque_keys
        import stevedore
        import bson
        import ccx_keys
        import cjson
        import boto
        import filechunkio
        import ciso8601
        import chardet
        import urllib3
        import certifi
        import idna
        import requests
        import six

        dependencies_list = []
        # get cluster dependencies from *args
        cluster_dependencies = self.get_config_from_args('cluster_dependencies', *args, default_value=None)
        if cluster_dependencies is not None:
            cluster_dependencies = json.loads(cluster_dependencies)
            if isinstance(cluster_dependencies, list):
                dependencies_list += cluster_dependencies

        packages = [edx, luigi, opaque_keys, stevedore, bson, ccx_keys, cjson, boto, filechunkio, ciso8601, chardet,
                    urllib3, certifi, idna, requests, six]
        self._tmp_dir = tempfile.mkdtemp()
        dependencies_list += create_packages_archive(packages, self._tmp_dir)
        if len(dependencies_list) > 0:
            for file_path in dependencies_list:
                self._spark_context.addPyFile(file_path)

    def get_luigi_configuration(self):
        """
        Return the luigi configuration for this task as a dict.

        The luigi configuration cannot be retrieved directly via luigi's `get_config` method
        inside the Spark task, so subclasses can override this to supply whatever they need.
        """
        return None

    def app_options(self):
        """
        List of options that need to be passed to the Spark task.
        """
        options = {}
        task_config = self.get_luigi_configuration()  # load task dependencies first, if any
        if isinstance(task_config, dict):
            options = task_config
        configuration = luigi.configuration.get_config()
        cluster_dependencies = configuration.get('spark', 'edx_egg_files', None)  # spark worker nodes dependency
        if cluster_dependencies is not None:
            options['cluster_dependencies'] = cluster_dependencies
        return [options]
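
    # Note (based on how get_config_from_args() parses its input): the single dict returned
    # above reaches the driver-side main() as one string argument, so spark_job()
    # implementations recover individual settings from *args via get_config_from_args().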

    def _clean(self):
        """Do any cleanup after job here"""
        import shutil
        shutil.rmtree(self._tmp_dir)

    def main(self, sc, *args):
        self.init_spark(sc)  # initialize spark contexts
        self._load_internal_dependency_on_cluster(*args)  # load packages on EMR cluster for spark worker nodes
        self.spark_job(*args)  # execute spark job
        self._clean()  # cleanup after spark job
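
# A minimal sketch of a concrete task built on the pieces above (hypothetical; the class and
# parameter names are examples, get_target_from_url is assumed to come from
# edx.analytics.tasks.util.url, and a configured [spark] section in luigi.cfg is assumed):
#
#     class ExampleEventCountTask(EventLogSelectionMixinSpark, SparkJobTask):
#         """Count tracking-log events per day over the configured interval."""
#
#         output_root = luigi.Parameter()
#
#         def output(self):
#             return get_target_from_url(self.output_root)
#
#         def spark_job(self, *args):
#             events = self.get_event_log_dataframe(self._spark)
#             events.groupBy('event_date').count() \
#                 .write.csv(self.output_root, mode='overwrite', header=True)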