8db79a8 to 5e955a6 (compare)
Just noting that the Travis builds failed: "ERROR: /edx/app/analytics-pipeline/edx/analytics/tasks/insights/tests/test_user_activity.py Imports are incorrectly sorted."
@brianhw I sorted it with isort, but the error did not go away.
I'm not sure if this is really going to master, or if the proposal was to run this in its own branch. If just the latter, then most of my nit-picking comments can be ignored. (I have different standards for master than for experimental branches.)
The main question I have is scope -- should this also include all the Hive replacement work that I thought you wrote? I only see the initial history task included here.
But if we were to run the Hive replacement workflow, then I think it should be defined separately and pulled into its own file, possibly inheriting from and overriding some of the original classes as necessary to keep it DRY.
""" | ||
Create a zip archive for all the packages listed in packages and returns the list of zip file location. | ||
""" | ||
import zipfile |
Do we need this here? It's already imported at the top of the file.
archives_list.append(package_zip_path)

def add(src, dst, package_name):
    # Ensure any entry points and other egg-info metadata is also transmitted along with
nit: s/is/are/
.add("time", StringType(), True) \ | ||
.add("name", StringType(), True) \ | ||
.add("page", StringType(), True) \ | ||
.add("session", StringType(), True) |
So there are a bunch of schema entries here that we do not use for any of our workflows. I would suggest that this base version be more minimal. If there are other entries that are needed in special cases, this can be overridden, or added to using a super() call.
I would suggest that all we need here are: user_id, course_id, org_id from context, and username, event_type, ip, event_source, context, and time from the base level.
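A rough sketch of what that pared-down schema might look like, following the pyspark StructType builder style already used in this diff; the helper name, the nested context struct, and the all-string column types are assumptions based on the suggestion above, not code from this PR:

```python
from pyspark.sql.types import StringType, StructType


def get_minimal_event_log_schema():
    """Minimal event log schema: only the fields our workflows actually read."""
    context_schema = StructType() \
        .add("user_id", StringType(), True) \
        .add("course_id", StringType(), True) \
        .add("org_id", StringType(), True)

    return StructType() \
        .add("username", StringType(), True) \
        .add("event_type", StringType(), True) \
        .add("ip", StringType(), True) \
        .add("event_source", StringType(), True) \
        .add("context", context_schema, True) \
        .add("time", StringType(), True)
```

Special-case workflows could then override this or extend the returned schema with further `.add()` calls, as suggested.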
Get spark based schema for processing event logs
:return: Spark schema
"""
from pyspark.sql.types import StructType, StringType
Should this be at the top of the file?
return event_log_schema

def get_event_log_dataframe(self, spark, *args, **kwargs):
    from pyspark.sql.functions import to_date, udf, struct, date_format
Should this be at the top of the file?
from edx.analytics.tasks.util.tests.opaque_key_mixins import InitializeLegacyKeysMixin, InitializeOpaqueKeysMixin
from mock import Mock, call
Not clear on the reorganization here, but it breaks the isort test in the quality build. We separate standard-library imports, third-party imports, and our local code imports into separate groups. Presumably the ddt and mock imports should go back where they were.
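For illustration, this is roughly the grouping isort expects; the standard-library import shown is only a placeholder, while the ddt, mock, and edx imports are the ones discussed in this diff:

```python
# Group 1: standard library
import json

# Group 2: third party
import luigi
from ddt import data, ddt
from mock import Mock, call

# Group 3: local project code
from edx.analytics.tasks.util.tests.opaque_key_mixins import InitializeLegacyKeysMixin, InitializeOpaqueKeysMixin
```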
Isort fails the test in both cases, whether they are organized this way or not. Though when sorted locally, this is how isort rearranges it.
@@ -11,7 +11,9 @@
from edx.analytics.tasks.common.mapreduce import MapReduceJobTaskMixin, MultiOutputMapReduceJobTask
from edx.analytics.tasks.common.mysql_load import MysqlInsertTask
from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, EventLogSelectionMixin
from edx.analytics.tasks.common.spark import EventLogSelectionMixinSpark, SparkJobTask
I see, so this may answer my question about imports. If we put them at the top of common/spark.py, and this is included in our regular code, then we're making everything dependent on spark code. I think it should be pulled out into a separate workflow.
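To make the trade-off concrete, a minimal sketch of the deferred-import pattern versus a module-level import; the class and method names here are placeholders, not the real SparkJobTask. With the deferred form, merely importing the module does not require pyspark to be installed:

```python
# If this import sat at module level, every task module that imports this file
# would need pyspark installed, even tasks that never launch a Spark job:
# from pyspark.sql.types import StringType, StructType


class SparkSchemaSketch(object):
    """Placeholder class illustrating the deferred pyspark import."""

    def get_event_log_schema(self):
        # Deferred import: only evaluated when a Spark job actually runs.
        from pyspark.sql.types import StringType, StructType
        return StructType().add("event_type", StringType(), True)
```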
[spark]
driver-memory=3g
executor-memory=3g
executor-cores=1
nit: Add newline.
options = {}
task_config = self.get_luigi_configuration()  # load task dependencies first if any
if isinstance(task_config, dict):
    options = task_config
Should these three lines above be removed? Was it an experiment that didn't work out?
No, let me explain these functions in more detail.
Since luigi configuration cannot be retrieved inside a Spark task, any Spark task that needs access to those configuration values should override the get_luigi_configuration method and return the required luigi configuration as a dictionary.
The app_options method will then convert that dictionary and pass the values to the Spark task as command-line arguments via spark-submit. Again, these methods aren't used in this task, but they will be used later with other Spark tasks.
def app_options(self):
    """
    List of options that need to be passed to the Spark task
Where does this get called? Also, maybe provide more explanation as to why the call to Luigi's configuration is being done here. It's not clear to me why we couldn't put this logic into _load_internal_dependency_on_cluster().
Luigi configuration cannot be retrieved directly inside a Spark job. Because of this, any configuration that the Spark job requires is provided as a command-line argument to spark-submit. This method is called directly by luigi to map custom parameters to those arguments.
This method, along with get_luigi_configuration, isn't used in this particular workflow, but both will be used for other Spark tasks, as in #476.
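For context, a sketch of how the Spark application side might recover those values, assuming they arrive as ordinary command-line arguments from spark-submit; the flag name and parsing style are assumptions for illustration, not what this PR implements:

```python
import argparse
import sys


def parse_spark_job_args(argv):
    """Recover luigi-provided configuration from the spark-submit arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--output-root')  # hypothetical argument produced by app_options()
    return parser.parse_args(argv)


if __name__ == '__main__':
    args = parse_spark_job_args(sys.argv[1:])
```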
The purpose of this PR is to convert an existing Hadoop task to Spark, run it in production for at least a month to see how it performs, and figure out any issues encountered during the trial run.
Related ticket DE-573