Skip to content
This repository was archived by the owner on May 1, 2024. It is now read-only.

Commit 2ab865d

Browse files
Add initial Spark task for computing user location
1 parent 0b8b937 commit 2ab865d

File tree

2 files changed

+105
-0
lines changed

2 files changed

+105
-0
lines changed

edx/analytics/tasks/insights/location_per_course.py

+93
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from edx.analytics.tasks.common.pathutil import (
1616
EventLogSelectionDownstreamMixin, EventLogSelectionMixin, PathSelectionByDateIntervalTask
1717
)
18+
from edx.analytics.tasks.common.spark import EventLogSelectionMixinSpark, SparkJobTask
1819
from edx.analytics.tasks.insights.database_imports import ImportStudentCourseEnrollmentTask
1920
from edx.analytics.tasks.util import eventlog
2021
from edx.analytics.tasks.util.decorators import workflow_entry_point
@@ -163,6 +164,98 @@ def run(self):
163164
target.open("w").close() # touch the file
164165

165166

167+
class LastDailyIpAddressOfUserTaskSpark(EventLogSelectionMixinSpark, WarehouseMixin, SparkJobTask):
    """
    Spark alternative of LastDailyIpAddressOfUserTask.

    For each (date, user, course) triple seen in the event logs over the task
    interval, records the ip address attached to that user's most recent event,
    writing one tab-separated, date-partitioned output under the warehouse path.
    """

    # Name of the directory (and hive-style partition root) under warehouse_path.
    output_parent_dir = 'last_ip_of_user_id'

    marker = luigi.Parameter(
        config_path={'section': 'map-reduce', 'name': 'marker'},
        significant=False,
        description='A URL location to a directory where a marker file will be written on task completion.',
    )

    def output_dir(self):
        """
        Return the target for the root output directory of this spark task.
        """
        return get_target_from_url(
            url_path_join(
                self.warehouse_path,
                self.output_parent_dir
            )
        )

    def output(self):
        """
        Return the marker target whose existence signals task completion.

        The marker name is derived from hash(self), so a task with different
        significant parameters produces a different marker.
        """
        marker_url = url_path_join(self.marker, str(hash(self)))
        return get_target_from_url(marker_url, marker=True)

    def output_paths(self):
        """
        Return one partition target per date in the task interval.
        """
        # The single-argument url_path_join() wrapper in the original was a
        # no-op; hive_partition_path already yields the full partition URL.
        return [
            get_target_from_url(
                self.hive_partition_path(self.output_parent_dir, date.isoformat())
            )
            for date in self.interval
        ]

    def on_success(self):  # pragma: no cover
        """Touch the marker file so subsequent runs see this task as complete."""
        self.output().touch_marker()

    def run(self):
        """Remove stale outputs, then delegate to the spark job runner."""
        self.remove_output_on_overwrite()
        # Clear any pre-existing partition directories so the append-mode
        # write in spark_job() does not mix old and new data.
        for target in self.output_paths():
            if target.exists():
                target.remove()
        super(LastDailyIpAddressOfUserTaskSpark, self).run()

    def spark_job(self, *args):
        """
        Compute, for each (event_date, user_id, course_id), the ip address of
        the user's most recent event, and write the result partitioned by date.
        """
        # Imports are deferred so the module loads without pyspark installed.
        # Unused imports from the original (struct, split, explode, lit, col,
        # Window, ArrayType, get_event_predicate_labels) have been dropped.
        from edx.analytics.tasks.util.spark_util import get_course_id, get_event_time_string
        from pyspark.sql.functions import udf
        from pyspark.sql.types import StringType

        df = self.get_event_log_dataframe(self._spark)
        # register udfs
        get_event_time = udf(get_event_time_string, StringType())
        get_courseid = udf(get_course_id, StringType())
        # Drop server 'task' events, enrollment events, and events with no user.
        df = df.filter(
            (df['event_source'] != 'task') &
            ~ df['event_type'].startswith('edx.course.enrollment.') &
            (df['context.user_id'] != '')
        )
        df = df.withColumn('course_id', get_courseid(df['context'])) \
            .withColumn('timestamp', get_event_time(df['time']))
        # NOTE(review): 'or' keeps rows where ANY of the three fields is
        # non-empty. If the intent (as in the Hadoop parent task) is to
        # require all three to be present, this should be 'and' -- confirm.
        df = df.filter("course_id != '' or timestamp != '' or ip != ''")
        df.createOrReplaceTempView('location')
        # Rank events within each (event_date, user, course) by descending
        # timestamp and keep only the newest one.
        # NOTE(review): 'event_date' is assumed to be a column supplied by
        # get_event_log_dataframe (e.g. from input partitioning) -- confirm.
        query = """
                SELECT
                    timestamp,
                    ip,
                    user_id,
                    course_id,
                    dt
                FROM (
                    SELECT
                        event_date as dt,
                        context.user_id as user_id,
                        course_id,
                        timestamp,
                        ip,
                        ROW_NUMBER() over ( PARTITION BY event_date, context.user_id, course_id ORDER BY timestamp desc) as rank
                    FROM location
                ) user_location
                WHERE rank <= 1
                ORDER BY user_id
                """
        result = self._spark.sql(query)
        # One file per partition, tab-separated, hive-partitioned by date.
        result.coalesce(1).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')
257+
258+
166259
class LastCountryOfUserDownstreamMixin(
167260
WarehouseMixin,
168261
OverwriteOutputMixin,

edx/analytics/tasks/util/spark_util.py

+12
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,18 @@ def get_key_value_from_event(event, key, default_value=None):
4141
return default_value
4242

4343

44+
def get_event_time_string(event_time):
    """
    Return the time of the event as an ISO8601 formatted string, or None.

    Any timezone suffix introduced by '+' is discarded; a missing fractional
    second component is padded with '.000000' so every result has microseconds.
    Malformed (e.g. non-string) input yields None rather than raising.
    """
    try:
        # partition() splits at the first '+', dropping timezone information.
        stamp, _, _ = event_time.partition('+')
        if '.' in stamp:
            return stamp
        return '{datetime}.000000'.format(datetime=stamp)
    except Exception:  # pylint: disable=broad-except
        # Best-effort parsing: treat anything unexpected as "no timestamp".
        return None
54+
55+
4456
def get_course_id(event_context, from_url=False):
4557
"""
4658
Gets course_id from event's data.

0 commit comments

Comments
 (0)