     ManifestInputTargetMixin, convert_to_manifest_input_if_necessary, remove_manifest_target_if_exists
 )
 from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
-from edx.analytics.tasks.util.url import get_target_from_url, url_path_join
+from edx.analytics.tasks.util.url import UncheckedExternalURL, get_target_from_url, url_path_join


 _file_path_to_package_meta_path = {}

@@ -163,31 +163,42 @@ class PathSelectionTaskSpark(EventLogSelectionDownstreamMixin, luigi.WrapperTask
     """
     Path selection task with manifest feature for spark
     """
-    requirements = None
+    targets = None
     manifest_id = luigi.Parameter(
         description='File name for manifest'
     )
+    manifest_dir = luigi.Parameter(
+        description='Directory for manifest files'
+    )
+    pyspark_logger = luigi.Parameter(
+        description='Pyspark logger',
+        default=None
+    )

     def requires(self):
-        yield PathSelectionByDateIntervalTask(
+        if not self.targets:
+            if self.pyspark_logger:
+                self.pyspark_logger.warn("PathSelectionTaskSpark=> targets not found, refreshing!")
+            self.targets = self._get_targets()
+        else:
+            if self.pyspark_logger:
+                self.pyspark_logger.warn("PathSelectionTaskSpark=> targets already exist")
+        return self.targets
+
+    def _get_targets(self):
+        input = PathSelectionByDateIntervalTask(
             source=self.source,
             interval=self.interval,
             pattern=self.pattern,
             date_pattern=self.date_pattern
+        ).output()
+        targets = luigi.task.flatten(
+            convert_to_manifest_input_if_necessary(self.manifest_id, input, self.manifest_dir)
         )
-
-    def get_target_paths(self):
-        log.warn("PathSelectionTaskSpark: checking requirements {}".format(self.manifest_id))
-        if not self.requirements:
-            log.warn("PathSelectionTaskSpark: requirements not found, refreshing!!")
-            targets = luigi.task.flatten(
-                convert_to_manifest_input_if_necessary(self.manifest_id, self.input())
-            )
-            self.requirements = targets
-        return self.requirements
+        return [UncheckedExternalURL(target.path) for target in targets]

     def output(self):
-        return self.get_target_paths()
+        return [target.output() for target in self.requires()]


 class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin):
@@ -240,19 +251,26 @@ def get_log_schema(self):
         return event_log_schema

     def get_input_rdd(self, *args):
-        manifest_target = self.get_manifest_path(*args)
-        self.log.warn("PYSPARK LOGGER : Getting input rdd ---> target : {}".format(manifest_target.path))
-        if manifest_target.exists():
+        manifest_path = self.get_config_from_args('manifest_path', *args, default_value='')
+        targets = PathSelectionTaskSpark(
+            source=self.source,
+            interval=self.interval,
+            pattern=self.pattern,
+            date_pattern=self.date_pattern,
+            manifest_id=self.manifest_id,
+            manifest_dir=manifest_path,
+            pyspark_logger=self.log
+        ).output()
+        if len(targets) and 'manifest' in targets[0].path:
             # Reading manifest as rdd with spark is alot faster as compared to hadoop.
             # Currently, we're getting only 1 manifest file per request, so we will create a single rdd from it.
             # If there are multiple manifest files, each file can be read as rdd and then union it with other manifest rdds
-            self.log.warn("PYSPARK LOGGER: Reading manifest file :: {} ".format(manifest_target.path))
-            source_rdd = self._spark.sparkContext.textFile(manifest_target.path)
+            self.log.warn("PYSPARK LOGGER: Reading manifest file :: {} ".format(targets[0].path))
+            source_rdd = self._spark.sparkContext.textFile(targets[0].path, 1)
         else:
             # maybe we only need to broadcast it ( on cluster ) and not create rdd. lets see
             self.log.warn("PYSPARK LOGGER: Reading normal targets")
-            input_targets = luigi.task.flatten(self.input())
-            source_rdd = self._spark.sparkContext.parallelize([target.path for target in input_targets])
+            source_rdd = self._spark.sparkContext.parallelize([target.path for target in targets])
         return source_rdd

     def get_event_log_dataframe(self, spark, *args, **kwargs):
@@ -309,7 +327,7 @@ def manifest_id(self):
             'interval': self.interval,
             'pattern': self.pattern,
             'date_pattern': self.date_pattern,
-            'spark':'for_some_difference_with_hadoop_manifest'
+            'spark': 'for_some_difference_with_hadoop_manifest'
         }
         return str(hash(frozenset(params.items()))).replace('-', 'n')

@@ -322,15 +340,6 @@ def get_manifest_path(self, *args):
             )
         )

-    def requires(self):
-        yield PathSelectionTaskSpark(
-            source=self.source,
-            interval=self.interval,
-            pattern=self.pattern,
-            date_pattern=self.date_pattern,
-            manifest_id=self.manifest_id
-        )
-
     def spark_job(self):
         """
         Spark code for the job
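
For reference, here is a minimal, standalone PySpark sketch (not part of this PR) of the branch the reworked get_input_rdd takes: read a manifest file as a single-partition RDD of paths when one exists, otherwise parallelize the resolved target paths directly. The paths below are hypothetical stand-ins for the output of PathSelectionTaskSpark.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('manifest-input-sketch').getOrCreate()

# Hypothetical stand-in for PathSelectionTaskSpark(...).output(): either a
# single manifest file listing one event-log path per line, or the raw paths.
target_paths = ['s3://example-bucket/manifests/abc123.manifest']

if target_paths and 'manifest' in target_paths[0]:
    # The manifest already enumerates the inputs, so read it as a
    # single-partition RDD of paths (mirrors textFile(path, 1) in the diff).
    source_rdd = spark.sparkContext.textFile(target_paths[0], 1)
else:
    # No manifest: distribute the resolved paths directly.
    source_rdd = spark.sparkContext.parallelize(target_paths)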