
Commit f65be75 (1 parent: 93cef66)

convert internal reporting user activity partition task to spark

3 files changed: +113, -4 lines

edx/analytics/tasks/common/spark.py (+2, -1)

@@ -260,6 +260,7 @@ def _load_internal_dependency_on_cluster(self, *args):
         import certifi
         import idna
         import requests
+        import six

         dependencies_list = []
         # get cluster dependencies from *args
@@ -270,7 +271,7 @@ def _load_internal_dependency_on_cluster(self, *args):
         dependencies_list += cluster_dependencies

         packages = [edx, luigi, opaque_keys, stevedore, bson, ccx_keys, cjson, boto, filechunkio, ciso8601, chardet,
-                    urllib3, certifi, idna, requests]
+                    urllib3, certifi, idna, requests, six]
         self._tmp_dir = tempfile.mkdtemp()
         dependencies_list += create_packages_archive(packages, self._tmp_dir)
         if len(dependencies_list) > 0:
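
Note: `_load_internal_dependency_on_cluster` collects the imported packages (now including `six`) and hands them to `create_packages_archive`, whose archives are then shipped to the executors. A minimal sketch of that general pattern, using a hypothetical helper name `archive_module` and assuming the archives are later registered with `SparkContext.addPyFile` (the real helper is not shown in this diff):

# Illustrative sketch only: `archive_module` is a hypothetical stand-in for the
# repo's create_packages_archive helper, shown to clarify why `six` is appended
# to the `packages` list above.
import os
import tempfile
import zipfile

import six


def archive_module(module, tmp_dir):
    """Zip an imported module (package directory or single file) so Spark can ship it."""
    source = module.__file__.replace('.pyc', '.py')
    archive_path = os.path.join(tmp_dir, module.__name__ + '.zip')
    with zipfile.ZipFile(archive_path, 'w') as archive:
        if os.path.basename(source) == '__init__.py':
            # A package: add every .py file, keeping paths relative to the parent dir.
            package_dir = os.path.dirname(source)
            parent_dir = os.path.dirname(package_dir)
            for root, _, files in os.walk(package_dir):
                for name in files:
                    if name.endswith('.py'):
                        full_path = os.path.join(root, name)
                        archive.write(full_path, os.path.relpath(full_path, parent_dir))
        else:
            # A single-file module such as six: ship it as one top-level file.
            archive.write(source, os.path.basename(source))
    return archive_path


# On the driver, each archive would then be made importable on every executor, e.g.:
#     spark_context.addPyFile(archive_module(six, tempfile.mkdtemp()))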

edx/analytics/tasks/insights/user_activity.py (+2, -2)

@@ -213,7 +213,7 @@ def spark_job(self, *args):
         result = df.select('course_id', 'username', 'event_date', 'label') \
             .groupBy('course_id', 'username', 'event_date', 'label').count()
         result = result.withColumn('dt', lit(result['event_date']))  # generate extra column for partitioning
-        result.coalesce(1).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')
+        result.coalesce(2).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')


 class UserActivityDownstreamMixin(WarehouseMixin, EventLogSelectionDownstreamMixin, MapReduceJobTaskMixin):
@@ -388,7 +388,7 @@ def spark_job(self, *args):
             interval_end=self.interval.date_b.isoformat(),
         )
         result = self._sql_context.sql(query)
-        result.coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t')
+        result.coalesce(2).write.csv(self.output().path, mode='overwrite', sep='\t')
         # with dataframe
         # from pyspark.sql.functions import concat, lit, countDistinct
         # user_activity_df = user_activity_df.filter(
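
Note: both writes previously used `coalesce(1)`, which funnels the entire result through a single write task and produces one output file (one per `dt=` directory in the partitioned case). Raising it to `coalesce(2)` lets two tasks write in parallel, at the cost of up to two files per output. A standalone sketch of the same write pattern, with a hypothetical SparkSession, sample rows, and a local output path:

# Standalone illustration of the coalesced, partitioned CSV write used above.
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.appName('coalesce_partition_demo').getOrCreate()

rows = [
    ('course-v1:edX+DemoX+2018', 'alice', '2018-01-01', 'ACTIVE', 3),
    ('course-v1:edX+DemoX+2018', 'bob', '2018-01-02', 'PLAYED_VIDEO', 1),
]
df = spark.createDataFrame(rows, ['course_id', 'username', 'event_date', 'label', 'count'])

# Copy the event date into a dedicated partition column, then cap the job at two
# write tasks so each dt= directory holds at most two CSV files.
result = df.withColumn('dt', lit(df['event_date']))
result.coalesce(2).write.partitionBy('dt').csv('/tmp/user_activity_demo', mode='append', sep='\t')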

edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py (+109, -1)

@@ -3,13 +3,16 @@

 On the roadmap is to write a task that runs validation queries on the aggregated Hive data pre-load.
 """
+import datetime
 import logging

 import luigi.date_interval

+from edx.analytics.tasks.common.spark import SparkJobTask
 from edx.analytics.tasks.common.vertica_load import VerticaCopyTask, VerticaCopyTaskMixin
 from edx.analytics.tasks.insights.database_imports import ImportAuthUserTask
-from edx.analytics.tasks.insights.user_activity import InsertToMysqlCourseActivityTask, UserActivityTableTask
+from edx.analytics.tasks.insights.user_activity import InsertToMysqlCourseActivityTask, UserActivityTableTask, \
+    UserActivityTaskSpark
 from edx.analytics.tasks.util.hive import (
     BareHiveTableTask, HivePartition, HivePartitionTask, WarehouseMixin, hive_database_name
 )
@@ -42,6 +45,111 @@ def columns(self):
         ]


+class InternalReportingUserActivityPartitionTaskSpark(WarehouseMixin, SparkJobTask):
+    """Spark alternative of InternalReportingUserActivityPartitionTask"""
+
+    date = luigi.DateParameter()
+    overwrite_n_days = luigi.IntParameter(
+        config_path={'section': 'user-activity', 'name': 'overwrite_n_days'},
+        significant=False,
+    )
+
+    def run(self):
+        self.remove_output_on_overwrite()
+        super(InternalReportingUserActivityPartitionTaskSpark, self).run()
+
+    def requires(self):
+        required_tasks = [
+            ImportAuthUserTask(overwrite=False, destination=self.warehouse_path)
+        ]
+        if self.overwrite_n_days > 0:
+            overwrite_from_date = self.date - datetime.timedelta(days=self.overwrite_n_days)
+            overwrite_interval = luigi.date_interval.Custom(overwrite_from_date, self.date)
+            required_tasks.append(
+                UserActivityTaskSpark(
+                    interval=overwrite_interval,
+                    warehouse_path=self.warehouse_path,
+                    output_root=self._get_user_activity_hive_table_path(),
+                    overwrite=True,
+                )
+            )
+        yield required_tasks
+
+    def _get_auth_user_hive_table_path(self):
+        import_date = datetime.datetime.utcnow().date()  # we only need to join import date's data with user activity
+        return url_path_join(
+            self.warehouse_path,
+            'auth_user',
+            'dt={}'.format(import_date.isoformat())
+        )
+
+    def _get_auth_user_table_schema(self):
+        from pyspark.sql.types import StructType, StringType
+        schema = StructType().add("id", StringType(), True) \
+            .add("username", StringType(), True) \
+            .add("last_login", StringType(), True) \
+            .add("date_joined", StringType(), True) \
+            .add("is_active", StringType(), True) \
+            .add("is_superuser", StringType(), True) \
+            .add("is_staff", StringType(), True) \
+            .add("email", StringType(), True) \
+            .add("dt", StringType(), True)
+        return schema
+
+    def _get_user_activity_hive_table_path(self, *args):
+        return url_path_join(
+            self.warehouse_path,
+            'user_activity'
+        )
+
+    def _get_user_activity_table_schema(self):
+        from pyspark.sql.types import StructType, StringType
+        schema = StructType().add("course_id", StringType(), True) \
+            .add("username", StringType(), True) \
+            .add("date", StringType(), True) \
+            .add("category", StringType(), True) \
+            .add("count", StringType(), True) \
+            .add("dt", StringType(), True)
+        return schema
+
+    def spark_job(self, *args):
+        auth_user_df = self._spark.read.csv(
+            self._get_auth_user_hive_table_path(),
+            schema=self._get_auth_user_table_schema(),
+            sep='\x01',
+            nullValue='\N'
+        )
+        user_activity_df = self._spark.read.csv(
+            self._get_user_activity_hive_table_path(*args),
+            sep='\t',
+            schema=self._get_user_activity_table_schema()
+        )
+        self._sql_context.registerDataFrameAsTable(auth_user_df, 'auth_user')
+        self._sql_context.registerDataFrameAsTable(user_activity_df, 'user_activity')
+        query = """
+            SELECT
+                au.id,
+                ua.course_id,
+                ua.date,
+                ua.category,
+                ua.count
+            FROM auth_user au
+            JOIN user_activity ua
+                ON au.username = ua.username
+        """
+        result = self._sql_context.sql(query)
+        result.coalesce(2).write.csv(self.output().path, mode='overwrite', sep='\t')
+
+    def output(self):
+        return get_target_from_url(
+            url_path_join(
+                self.warehouse_path,
+                'internal_reporting_user_activity',
+                'dt={}'.format(self.date.isoformat())
+            )
+        )
+
+
 class InternalReportingUserActivityPartitionTask(HivePartitionTask):
     """Aggregate the user activity table in Hive."""
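
Note: scheduling-wise, the task plugs into luigi like the rest of the pipeline: `requires` re-runs `UserActivityTaskSpark` over the trailing `overwrite_n_days` window and imports `auth_user`, and the joined output lands under `internal_reporting_user_activity/dt=<date>`. A hedged sketch of driving it through luigi's Python API (parameter values are hypothetical; production runs go through the pipeline's own launcher and config):

# Hypothetical invocation via luigi's Python API; parameter values are examples only.
import datetime

import luigi

from edx.analytics.tasks.warehouse.load_internal_reporting_user_activity import (
    InternalReportingUserActivityPartitionTaskSpark
)

luigi.build(
    [
        InternalReportingUserActivityPartitionTaskSpark(
            date=datetime.date(2018, 4, 1),            # partition date for the output
            overwrite_n_days=3,                        # rebuild the trailing three days of user activity
            warehouse_path='s3://example-warehouse/',  # from WarehouseMixin; normally set in config
        )
    ],
    local_scheduler=True,
)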