From 5e955a6f09e68a38b978d70b51779222ae49ff68 Mon Sep 17 00:00:00 2001
From: rao-abdul-mannan
Date: Tue, 13 Mar 2018 18:50:31 +0500
Subject: [PATCH] Use spark task in production jobs

---
 config/devstack.cfg                           |   5 +
 edx/analytics/tasks/common/spark.py           | 324 ++++++++++++++++++
 .../insights/tests/test_user_activity.py      |  51 ++-
 edx/analytics/tasks/insights/user_activity.py |  88 ++++-
 edx/analytics/tasks/util/constants.py         |   7 +
 edx/analytics/tasks/util/spark_util.py        |  78 +++++
 6 files changed, 516 insertions(+), 37 deletions(-)
 create mode 100644 edx/analytics/tasks/common/spark.py
 create mode 100644 edx/analytics/tasks/util/constants.py
 create mode 100644 edx/analytics/tasks/util/spark_util.py

diff --git a/config/devstack.cfg b/config/devstack.cfg
index 264373fa23..32d84ed1ee 100644
--- a/config/devstack.cfg
+++ b/config/devstack.cfg
@@ -131,3 +131,8 @@ api_root_url = http://localhost:8000/api/courses/v1/courses/
 
 [course-blocks]
 api_root_url = http://localhost:8000/api/courses/v1/blocks/
+
+[spark]
+driver-memory=3g
+executor-memory=3g
+executor-cores=1
\ No newline at end of file
diff --git a/edx/analytics/tasks/common/spark.py b/edx/analytics/tasks/common/spark.py
new file mode 100644
index 0000000000..b86dd9d15d
--- /dev/null
+++ b/edx/analytics/tasks/common/spark.py
@@ -0,0 +1,324 @@
+import ast
+import json
+import os
+import tempfile
+import zipfile
+
+import luigi.configuration
+from luigi.contrib.spark import PySparkTask
+
+from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, PathSelectionByDateIntervalTask
+from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
+
+_file_path_to_package_meta_path = {}
+
+
+def get_package_metadata_paths():
+    """
+    List of package metadata to be loaded on EMR cluster
+    """
+    from distlib.database import DistributionPath
+
+    if len(_file_path_to_package_meta_path) > 0:
+        return _file_path_to_package_meta_path
+
+    dist_path = DistributionPath(include_egg=True)
+    for distribution in dist_path.get_distributions():
+        metadata_path = distribution.path
+        for installed_file_path, _hash, _size in distribution.list_installed_files():
+            absolute_installed_file_path = installed_file_path
+            if not os.path.isabs(installed_file_path):
+                absolute_installed_file_path = os.path.join(os.path.dirname(metadata_path), installed_file_path)
+            normalized_file_path = os.path.realpath(absolute_installed_file_path)
+            _file_path_to_package_meta_path[normalized_file_path] = metadata_path
+
+    return _file_path_to_package_meta_path
+
+
+def dereference(f):
+    if os.path.islink(f):
+        # by joining with the dirname we are certain to get the absolute path
+        return dereference(os.path.join(os.path.dirname(f), os.readlink(f)))
+    else:
+        return f
+
+
+def create_packages_archive(packages, archive_dir_path):
+    """
+    Create a zip archive for all the packages listed in packages and return the list of zip file locations.
+    """
+    import zipfile
+    archives_list = []
+    package_metadata_paths = get_package_metadata_paths()
+    metadata_to_add = dict()
+
+    package_zip_path = os.path.join(archive_dir_path, 'packages.zip')
+    package_zip = zipfile.ZipFile(package_zip_path, "w", compression=zipfile.ZIP_DEFLATED)
+    archives_list.append(package_zip_path)
+
+    def add(src, dst, package_name):
+        # Ensure any entry points and other egg-info metadata is also transmitted along with
+        # this file. If it is associated with any egg-info directories, ship them too.
+        metadata_path = package_metadata_paths.get(os.path.realpath(src))
+        if metadata_path:
+            metadata_to_add[package_name] = metadata_path
+
+        package_zip.write(src, dst)
+
+    def add_files_for_package(sub_package_path, root_package_path, root_package_name, package_name):
+        for root, dirs, files in os.walk(sub_package_path):
+            if '.svn' in dirs:
+                dirs.remove('.svn')
+            for f in files:
+                if not f.endswith(".pyc") and not f.startswith("."):
+                    add(dereference(root + "/" + f),
+                        root.replace(root_package_path, root_package_name) + "/" + f,
+                        package_name)
+
+    for package in packages:
+        # Archive each package
+        if not getattr(package, "__path__", None) and '.' in package.__name__:
+            package = __import__(package.__name__.rpartition('.')[0], None, None, 'non_empty')
+
+        n = package.__name__.replace(".", "/")
+
+        # Check length of path, because the attribute may exist and be an empty list.
+        if len(getattr(package, "__path__", [])) > 0:
+            # TODO: (BUG) picking only the first path does not
+            # properly deal with namespaced packages in different
+            # directories
+            p = package.__path__[0]
+
+            if p.endswith('.egg') and os.path.isfile(p):
+                raise RuntimeError('Not going to archive egg files!!!')
+                # Add the entire egg file
+                # p = p[:p.find('.egg') + 4]
+                # add(dereference(p), os.path.basename(p))
+
+            else:
+                # include __init__ files from parent projects
+                root = []
+                for parent in package.__name__.split('.')[0:-1]:
+                    root.append(parent)
+                    module_name = '.'.join(root)
+                    directory = '/'.join(root)
+
+                    add(dereference(__import__(module_name, None, None, 'non_empty').__path__[0] + "/__init__.py"),
+                        directory + "/__init__.py",
+                        package.__name__)
+
+                add_files_for_package(p, p, n, package.__name__)
+
+        else:
+            f = package.__file__
+            if f.endswith("pyc"):
+                f = f[:-3] + "py"
+            if n.find(".") == -1:
+                add(dereference(f), os.path.basename(f), package.__name__)
+            else:
+                add(dereference(f), n + ".py", package.__name__)
+
+        # include metadata in the same zip file
+        metadata_path = metadata_to_add.get(package.__name__)
+        if metadata_path is not None:
+            add_files_for_package(metadata_path, metadata_path, os.path.basename(metadata_path), package.__name__)
+
+    return archives_list
+
+
+class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin):
+    """
+    Extract events corresponding to a specified time interval.
+ """ + path_targets = None + + def __init__(self, *args, **kwargs): + """ + Call path selection task to get list of log files matching the pattern + """ + super(EventLogSelectionDownstreamMixin, self).__init__(*args, **kwargs) + self.lower_bound_date_string = self.interval.date_a.strftime('%Y-%m-%d') # pylint: disable=no-member + self.upper_bound_date_string = self.interval.date_b.strftime('%Y-%m-%d') # pylint: disable=no-member + + def get_log_schema(self): + """ + Get spark based schema for processing event logs + :return: Spark schema + """ + from pyspark.sql.types import StructType, StringType + event_schema = StructType().add("POST", StringType(), True).add("GET", StringType(), True) + module_schema = StructType().add("display_name", StringType(), True) \ + .add("original_usage_key", StringType(), True) \ + .add("original_usage_version", StringType(), True) \ + .add("usage_key", StringType(), True) + context_schema = StructType().add("command", StringType(), True) \ + .add("course_id", StringType(), True) \ + .add("module", module_schema) \ + .add("org_id", StringType(), True) \ + .add("path", StringType(), True) \ + .add("user_id", StringType(), True) + + event_log_schema = StructType() \ + .add("username", StringType(), True) \ + .add("event_type", StringType(), True) \ + .add("ip", StringType(), True) \ + .add("agent", StringType(), True) \ + .add("host", StringType(), True) \ + .add("referer", StringType(), True) \ + .add("accept_language", StringType(), True) \ + .add("event", event_schema) \ + .add("event_source", StringType(), True) \ + .add("context", context_schema) \ + .add("time", StringType(), True) \ + .add("name", StringType(), True) \ + .add("page", StringType(), True) \ + .add("session", StringType(), True) + + return event_log_schema + + def get_event_log_dataframe(self, spark, *args, **kwargs): + from pyspark.sql.functions import to_date, udf, struct, date_format + path_targets = PathSelectionByDateIntervalTask( + source=self.source, + interval=self.interval, + pattern=self.pattern, + date_pattern=self.date_pattern, + ).output() + self.path_targets = [task.path for task in path_targets] + dataframe = spark.read.format('json').load(self.path_targets, schema=self.get_log_schema()) + dataframe = dataframe.filter(dataframe['time'].isNotNull()) \ + .withColumn('event_date', date_format(to_date(dataframe['time']), 'yyyy-MM-dd')) + dataframe = dataframe.filter( + (dataframe['event_date'] >= self.lower_bound_date_string) & + (dataframe['event_date'] < self.upper_bound_date_string) + ) + return dataframe + + +class SparkJobTask(OverwriteOutputMixin, PySparkTask): + """ + Wrapper for spark task + """ + + _spark = None + _spark_context = None + _sql_context = None + _hive_context = None + _tmp_dir = None + + driver_memory = luigi.Parameter( + config_path={'section': 'spark', 'name': 'driver-memory'}, + description='Memory for spark driver', + significant=False, + ) + executor_memory = luigi.Parameter( + config_path={'section': 'spark', 'name': 'executor-memory'}, + description='Memory for each executor', + significant=False, + ) + executor_cores = luigi.Parameter( + config_path={'section': 'spark', 'name': 'executor-cores'}, + description='No. 
+        description='Number of cores for each executor',
+        significant=False,
+    )
+    always_log_stderr = False  # log stderr if spark fails, True for verbose log
+
+    def init_spark(self, sc):
+        """
+        Initialize spark, sql and hive contexts
+        :param sc: Spark context
+        """
+        from pyspark.sql import SparkSession, SQLContext, HiveContext
+        self._sql_context = SQLContext(sc)
+        self._spark_context = sc
+        self._spark = SparkSession.builder.getOrCreate()
+        self._hive_context = HiveContext(sc)
+
+    def spark_job(self):
+        """
+        Spark code for the job
+        """
+        raise NotImplementedError
+
+    def get_config_from_args(self, key, *args, **kwargs):
+        """
+        Returns the value of `key` after parsing the string argument
+        """
+        default_value = kwargs.get('default_value', None)
+        str_arg = args[0]
+        config_dict = ast.literal_eval(str_arg)
+        value = config_dict.get(key, default_value)
+        return value
+
+    def _load_internal_dependency_on_cluster(self, *args):
+        """
+        Creates a zip of each package and loads it on spark worker nodes.
+        Loading via luigi configuration does not work as it creates a tar file, whereas spark does not load tar files
+        """
+
+        # import packages to be loaded on cluster
+        import edx
+        import luigi
+        import opaque_keys
+        import stevedore
+        import bson
+        import ccx_keys
+        import cjson
+        import boto
+        import filechunkio
+        import ciso8601
+        import chardet
+        import urllib3
+        import certifi
+        import idna
+        import requests
+        import six
+
+        dependencies_list = []
+        # get cluster dependencies from *args
+        cluster_dependencies = self.get_config_from_args('cluster_dependencies', *args, default_value=None)
+        if cluster_dependencies is not None:
+            cluster_dependencies = json.loads(cluster_dependencies)
+            if isinstance(cluster_dependencies, list):
+                dependencies_list += cluster_dependencies
+
+        packages = [edx, luigi, opaque_keys, stevedore, bson, ccx_keys, cjson, boto, filechunkio, ciso8601, chardet,
+                    urllib3, certifi, idna, requests, six]
+        self._tmp_dir = tempfile.mkdtemp()
+        dependencies_list += create_packages_archive(packages, self._tmp_dir)
+        if len(dependencies_list) > 0:
+            for file in dependencies_list:
+                self._spark_context.addPyFile(file)
+
+    def get_luigi_configuration(self):
+        """
+        Return luigi configuration as dict for spark task.
+        luigi configuration cannot be retrieved directly from luigi's get_config method inside a spark task
+        """
+
+        return None
+
+    def app_options(self):
+        """
+        List of options that need to be passed to the spark task
+        """
+        options = {}
+        task_config = self.get_luigi_configuration()  # load task dependencies first if any
+        if isinstance(task_config, dict):
+            options = task_config
+        configuration = luigi.configuration.get_config()
+        cluster_dependencies = configuration.get('spark', 'edx_egg_files', None)  # spark worker nodes dependency
+        if cluster_dependencies is not None:
+            options['cluster_dependencies'] = cluster_dependencies
+        return [options]
+
+    def _clean(self):
+        """Do any cleanup after job here"""
+        import shutil
+        shutil.rmtree(self._tmp_dir)
+
+    def main(self, sc, *args):
+        self.init_spark(sc)  # initialize spark contexts
+        self._load_internal_dependency_on_cluster(*args)  # load packages on EMR cluster for spark worker nodes
+        self.spark_job(*args)  # execute spark job
+        self._clean()  # cleanup after spark job
diff --git a/edx/analytics/tasks/insights/tests/test_user_activity.py b/edx/analytics/tasks/insights/tests/test_user_activity.py
index fe431543f7..0afdd23276 100644
--- a/edx/analytics/tasks/insights/tests/test_user_activity.py
+++ b/edx/analytics/tasks/insights/tests/test_user_activity.py
@@ -6,15 +6,14 @@
 import json
 from unittest import TestCase
 
-from ddt import data, ddt, unpack
 from luigi import date_interval
-from mock import Mock, call
+from ddt import data, ddt, unpack
 
 from edx.analytics.tasks.common.tests.map_reduce_mixins import MapperTestMixin, ReducerTestMixin
-from edx.analytics.tasks.insights.user_activity import (
-    ACTIVE_LABEL, PLAY_VIDEO_LABEL, POST_FORUM_LABEL, PROBLEM_LABEL, InsertToMysqlCourseActivityTask, UserActivityTask
-)
+from edx.analytics.tasks.insights.user_activity import InsertToMysqlCourseActivityTask, UserActivityTask
+from edx.analytics.tasks.util.constants import PredicateLabels
 from edx.analytics.tasks.util.tests.opaque_key_mixins import InitializeLegacyKeysMixin, InitializeOpaqueKeysMixin
+from mock import Mock, call
 
 
 @ddt
@@ -91,21 +90,21 @@ def test_whitespace_username(self):
 
     def test_good_dummy_event(self):
         line = self.create_event_log_line()
         event = tuple(self.task.mapper(line))
-        expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)),)
+        expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)),)
         self.assertEquals(event, expected)
 
     def test_play_video_event(self):
         line = self.create_event_log_line(event_source='browser', event_type='play_video')
         event = tuple(self.task.mapper(line))
-        expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)),
-                    (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PLAY_VIDEO_LABEL)))
+        expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)),
+                    (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.PLAY_VIDEO_LABEL)))
         self.assertEquals(event, expected)
 
     def test_problem_event(self):
         line = self.create_event_log_line(event_source='server', event_type='problem_check')
         event = tuple(self.task.mapper(line))
-        expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)),
-                    (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PROBLEM_LABEL)))
+        expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)),
+                    (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.PROBLEM_LABEL)))
         self.assertEquals(event, expected)
 
     @data(('edx.forum.thread.created', True), ('edx.forum.response.created', True), ('edx.forum.comment.created', True),
@@ -115,11 +114,11 @@ def test_post_forum_event(self, event_type, is_labeled_forum):
         line = self.create_event_log_line(event_source='server', event_type=event_type)
         event = tuple(self.task.mapper(line))
         if is_labeled_forum:
-            expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)),
-                        (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, POST_FORUM_LABEL)))
+            expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)),
+                        (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.POST_FORUM_LABEL)))
         else:
activity" and thus does not get the POST_FORUM_LABEL - expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)),) + expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)),) self.assertEquals(event, expected) def test_exclusion_of_events_by_source(self): @@ -154,13 +153,13 @@ def test_multiple(self): outputs.append(output) expected = ( - (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)), - (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PLAY_VIDEO_LABEL)), - (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)), - (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PLAY_VIDEO_LABEL)), - ('2013-12-24', (self.encoded_course_id, self.username, '2013-12-24', ACTIVE_LABEL)), - ('2013-12-24', (self.encoded_course_id, self.username, '2013-12-24', PROBLEM_LABEL)), - ('2013-12-16', (self.encoded_course_id, self.username, '2013-12-16', ACTIVE_LABEL)), + (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)), + (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.PLAY_VIDEO_LABEL)), + (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)), + (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.PLAY_VIDEO_LABEL)), + ('2013-12-24', (self.encoded_course_id, self.username, '2013-12-24', PredicateLabels.ACTIVE_LABEL)), + ('2013-12-24', (self.encoded_course_id, self.username, '2013-12-24', PredicateLabels.PROBLEM_LABEL)), + ('2013-12-16', (self.encoded_course_id, self.username, '2013-12-16', PredicateLabels.ACTIVE_LABEL)), ) self.assertItemsEqual(outputs, expected) @@ -183,10 +182,10 @@ def setUp(self): def test_multiple(self): values = ( - (self.encoded_course_id, self.username, '2013-12-01', ACTIVE_LABEL), - (self.encoded_course_id, self.username, '2013-12-01', ACTIVE_LABEL), - (self.encoded_course_id, self.username, '2013-12-01', PLAY_VIDEO_LABEL), - (self.encoded_course_id, self.username, '2013-12-01', PLAY_VIDEO_LABEL), + (self.encoded_course_id, self.username, '2013-12-01', PredicateLabels.ACTIVE_LABEL), + (self.encoded_course_id, self.username, '2013-12-01', PredicateLabels.ACTIVE_LABEL), + (self.encoded_course_id, self.username, '2013-12-01', PredicateLabels.PLAY_VIDEO_LABEL), + (self.encoded_course_id, self.username, '2013-12-01', PredicateLabels.PLAY_VIDEO_LABEL), ) mock_output_file = Mock() @@ -194,9 +193,9 @@ def test_multiple(self): self.task.multi_output_reducer('2013-12-01', values, mock_output_file) self.assertEquals(len(mock_output_file.write.mock_calls), 4) - expected_string = '\t'.join((self.encoded_course_id, self.username, '2013-12-01', ACTIVE_LABEL, '2')) + expected_string = '\t'.join((self.encoded_course_id, self.username, '2013-12-01', PredicateLabels.ACTIVE_LABEL, '2')) self.assertIn(call(expected_string), mock_output_file.write.mock_calls) - expected_string = '\t'.join((self.encoded_course_id, self.username, '2013-12-01', PLAY_VIDEO_LABEL, '2')) + expected_string = '\t'.join((self.encoded_course_id, self.username, '2013-12-01', PredicateLabels.PLAY_VIDEO_LABEL, '2')) 
         self.assertIn(call(expected_string), mock_output_file.write.mock_calls)
diff --git a/edx/analytics/tasks/insights/user_activity.py b/edx/analytics/tasks/insights/user_activity.py
index 67f55eb8cf..e7c28984ae 100644
--- a/edx/analytics/tasks/insights/user_activity.py
+++ b/edx/analytics/tasks/insights/user_activity.py
@@ -11,7 +11,9 @@
 from edx.analytics.tasks.common.mapreduce import MapReduceJobTaskMixin, MultiOutputMapReduceJobTask
 from edx.analytics.tasks.common.mysql_load import MysqlInsertTask
 from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, EventLogSelectionMixin
+from edx.analytics.tasks.common.spark import EventLogSelectionMixinSpark, SparkJobTask
 from edx.analytics.tasks.insights.calendar_task import CalendarTableTask
+from edx.analytics.tasks.util.constants import PredicateLabels
 from edx.analytics.tasks.util.decorators import workflow_entry_point
 from edx.analytics.tasks.util.hive import BareHiveTableTask, HivePartitionTask, WarehouseMixin, hive_database_name
 from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
@@ -20,11 +22,6 @@
 
 log = logging.getLogger(__name__)
 
-ACTIVE_LABEL = "ACTIVE"
-PROBLEM_LABEL = "ATTEMPTED_PROBLEM"
-PLAY_VIDEO_LABEL = "PLAYED_VIDEO"
-POST_FORUM_LABEL = "POSTED_FORUM"
-
 
 class UserActivityTask(OverwriteOutputMixin, WarehouseMixin, EventLogSelectionMixin, MultiOutputMapReduceJobTask):
     """
@@ -73,18 +70,18 @@ def get_predicate_labels(self, event):
         if event_type.startswith('edx.course.enrollment.'):
             return []
 
-        labels = [ACTIVE_LABEL]
+        labels = [PredicateLabels.ACTIVE_LABEL]
 
         if event_source == 'server':
             if event_type == 'problem_check':
-                labels.append(PROBLEM_LABEL)
+                labels.append(PredicateLabels.PROBLEM_LABEL)
 
             if event_type.startswith('edx.forum.') and event_type.endswith('.created'):
-                labels.append(POST_FORUM_LABEL)
+                labels.append(PredicateLabels.POST_FORUM_LABEL)
 
         if event_source in ('browser', 'mobile'):
             if event_type == 'play_video':
-                labels.append(PLAY_VIDEO_LABEL)
+                labels.append(PredicateLabels.PLAY_VIDEO_LABEL)
 
         return labels
 
@@ -144,6 +141,76 @@ def run(self):
         return super(UserActivityTask, self).run()
 
 
+class UserActivityTaskSpark(EventLogSelectionMixinSpark, WarehouseMixin, SparkJobTask):
+    """
+    UserActivityTask converted to spark
+
+    """
+
+    output_root = None
+    marker = luigi.Parameter(
+        config_path={'section': 'map-reduce', 'name': 'marker'},
+        significant=False,
+        description='A URL location to a directory where a marker file will be written on task completion.',
+    )
+
+    def output_dir(self):
+        """
+        Output directory for spark task
+        """
+        return get_target_from_url(url_path_join(self.warehouse_path, 'user_activity'))
+
+    def output(self):
+        marker_url = url_path_join(self.marker, str(hash(self)))
+        return get_target_from_url(marker_url, marker=True)
+
+    def output_paths(self):
+        """
+        Output partition paths
+        """
+        table_path = url_path_join(self.warehouse_path, 'user_activity')
+        return map(
+            lambda date: get_target_from_url(
+                url_path_join(
+                    table_path, 'dt={}'.format(date.isoformat())
+                )
+            ),
+            self.interval
+        )
+
+    def on_success(self):  # pragma: no cover
+        """Overload the success method to touch the _SUCCESS file.  Any class that uses a separate Marker file from the
+        data file will need to override the base on_success() call to create this marker."""
+        self.output().touch_marker()
+
+    def run(self):
+        self.remove_output_on_overwrite()
+        removed_partitions = [target.remove() for target in self.output_paths() if target.exists()]
+        super(UserActivityTaskSpark, self).run()
+
+    def spark_job(self, *args):
+        from edx.analytics.tasks.util.spark_util import get_event_predicate_labels, get_course_id
+        from pyspark.sql.functions import udf, struct, split, explode, lit
+        from pyspark.sql.types import StringType
+        df = self.get_event_log_dataframe(self._spark)
+        # register udfs
+        get_labels = udf(get_event_predicate_labels, StringType())
+        get_courseid = udf(get_course_id, StringType())
+        df = df.filter(
+            (df['event_source'] != 'task') &
+            ~ df['event_type'].startswith('edx.course.enrollment.') &
+            (df['username'] != '')
+        )
+        df = df.withColumn('all_labels', get_labels(df['event_type'], df['event_source'])) \
+            .withColumn('course_id', get_courseid(df['context']))
+        df = df.filter(df['course_id'] != '')  # remove rows with empty course_id
+        df = df.withColumn('label', explode(split(df['all_labels'], ',')))
+        result = df.select('course_id', 'username', 'event_date', 'label') \
+            .groupBy('course_id', 'username', 'event_date', 'label').count()
+        result = result.withColumn('dt', lit(result['event_date']))  # generate extra column for partitioning
+        result.coalesce(1).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')
+
+
 class UserActivityDownstreamMixin(WarehouseMixin, EventLogSelectionDownstreamMixin, MapReduceJobTaskMixin):
     """All parameters needed to run the UserActivityTableTask task."""
 
@@ -169,10 +236,9 @@ def requires(self):
         overwrite_from_date = self.date - datetime.timedelta(days=self.overwrite_n_days)
         overwrite_interval = luigi.date_interval.Custom(overwrite_from_date, self.date)
 
-        yield UserActivityTask(
+        yield UserActivityTaskSpark(
             interval=overwrite_interval,
             warehouse_path=self.warehouse_path,
-            n_reduce_tasks=self.n_reduce_tasks,
             overwrite=True,
         )
 
diff --git a/edx/analytics/tasks/util/constants.py b/edx/analytics/tasks/util/constants.py
new file mode 100644
index 0000000000..1c7e8bf1b5
--- /dev/null
+++ b/edx/analytics/tasks/util/constants.py
@@ -0,0 +1,7 @@
+class PredicateLabels(object):
+    """Constants for predicate labels."""
+
+    ACTIVE_LABEL = "ACTIVE"
+    PROBLEM_LABEL = "ATTEMPTED_PROBLEM"
+    PLAY_VIDEO_LABEL = "PLAYED_VIDEO"
+    POST_FORUM_LABEL = "POSTED_FORUM"
diff --git a/edx/analytics/tasks/util/spark_util.py b/edx/analytics/tasks/util/spark_util.py
new file mode 100644
index 0000000000..7669d68190
--- /dev/null
+++ b/edx/analytics/tasks/util/spark_util.py
@@ -0,0 +1,78 @@
+"""Support for spark tasks"""
+import edx.analytics.tasks.util.opaque_key_util as opaque_key_util
+from edx.analytics.tasks.util.constants import PredicateLabels
+
+
+def get_event_predicate_labels(event_type, event_source):
+    """
+    Creates labels by applying hardcoded predicates to a single event.
+    Don't pass whole event row to any spark UDF as it generates a different output than expected
+    """
+    # We only want the explicit event, not the implicit form.
+    # return 'test'
+
+    labels = PredicateLabels.ACTIVE_LABEL
+
+    # task & enrollment events are filtered out by spark later as it speeds up due to less # of records
+
+    if event_source == 'server':
+        if event_type == 'problem_check':
+            labels += ',' + PredicateLabels.PROBLEM_LABEL
+
+        if event_type.startswith('edx.forum.') and event_type.endswith('.created'):
+            labels += ',' + PredicateLabels.POST_FORUM_LABEL
+
+    if event_source in ('browser', 'mobile'):
+        if event_type == 'play_video':
+            labels += ',' + PredicateLabels.PLAY_VIDEO_LABEL
+
+    return labels
+
+
+def get_key_value_from_event(event, key, default_value=None):
+    """
+    Get value from event dict by key
+    Pyspark does not support dict.get() method, so this approach seems reasonable
+    """
+    try:
+        default_value = event[key]
+    except KeyError:
+        pass
+    return default_value
+
+
+def get_course_id(event_context, from_url=False):
+    """
+    Gets course_id from event's data.
+    Don't pass whole event row to any spark UDF as it generates a different output than expected
+    """
+    if event_context == '' or event_context is None:
+        # Assume it's old, and not worth logging...
+        return ''
+
+    # Get the course_id from the data, and validate.
+    course_id = opaque_key_util.normalize_course_id(get_key_value_from_event(event_context, 'course_id', ''))
+    if course_id:
+        if opaque_key_util.is_valid_course_id(course_id):
+            return course_id
+
+    return ''
+
+    # TODO : make it work with url as well
+    # Try to get the course_id from the URLs in `event_type` (for implicit
+    # server events) and `page` (for browser events).
+    # if from_url:
+    #     source = get_key_value_from_event(event, 'event_source')
+    #
+    #     if source == 'server':
+    #         url = get_key_value_from_event(event, 'event_type', '')
+    #     elif source == 'browser':
+    #         url = get_key_value_from_event(event, 'page', '')
+    #     else:
+    #         url = ''
+    #
+    #     course_key = opaque_key_util.get_course_key_from_url(url)
+    #     if course_key:
+    #         return unicode(course_key)
+    #
+    #     return ''
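
A quick, dependency-free sanity check of the predicate-label helper added in edx/analytics/tasks/util/spark_util.py. This is an illustrative sketch, not part of the patch; it only assumes the package is importable locally, and the expected strings follow directly from the PredicateLabels constants and the comma-joined labels that get_event_predicate_labels returns.

    # Sanity-check the label strings produced for a few representative events.
    from edx.analytics.tasks.util.spark_util import get_event_predicate_labels

    # Every event gets at least ACTIVE; server/browser/mobile sources add more specific labels.
    assert get_event_predicate_labels('problem_check', 'server') == 'ACTIVE,ATTEMPTED_PROBLEM'
    assert get_event_predicate_labels('edx.forum.thread.created', 'server') == 'ACTIVE,POSTED_FORUM'
    assert get_event_predicate_labels('play_video', 'mobile') == 'ACTIVE,PLAYED_VIDEO'
    assert get_event_predicate_labels('page_view', 'browser') == 'ACTIVE'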
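The labelling and aggregation that UserActivityTaskSpark.spark_job performs on the cluster can also be exercised against a small local SparkSession. The following is a minimal sketch under the assumption that pyspark is installed in the local virtualenv; the column names mirror those used in spark_job, but the input rows are invented purely for illustration.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode, split, udf
    from pyspark.sql.types import StringType

    from edx.analytics.tasks.util.spark_util import get_event_predicate_labels

    spark = SparkSession.builder.master('local[1]').appName('user_activity_label_check').getOrCreate()
    rows = [
        ('course-v1:edX+DemoX+2018', 'staff', '2018-03-13', 'problem_check', 'server'),
        ('course-v1:edX+DemoX+2018', 'staff', '2018-03-13', 'play_video', 'browser'),
    ]
    df = spark.createDataFrame(rows, ['course_id', 'username', 'event_date', 'event_type', 'event_source'])

    # Register the helper as a UDF, explode the comma-separated labels, and count,
    # mirroring the groupBy in UserActivityTaskSpark.spark_job.
    get_labels = udf(get_event_predicate_labels, StringType())
    labelled = df.withColumn('all_labels', get_labels(df['event_type'], df['event_source']))
    exploded = labelled.withColumn('label', explode(split(labelled['all_labels'], ',')))
    exploded.groupBy('course_id', 'username', 'event_date', 'label').count().show()
    spark.stop()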