@@ -195,32 +195,62 @@ def get_event_log_dataframe(self, spark, *args, **kwargs):
         return dataframe
 
 
-class SparkJobTask(OverwriteOutputMixin, PySparkTask):
-    """
-    Wrapper for spark task
-    """
-
-    _spark = None
-    _spark_context = None
-    _sql_context = None
-    _hive_context = None
-    _tmp_dir = None
-
+class SparkConfigurationMixin(object):
+    """Configuration parameters for spark task."""
     driver_memory = luigi.Parameter(
         config_path={'section': 'spark', 'name': 'driver-memory'},
         description='Memory for spark driver',
         significant=False,
     )
+    driver_cores = luigi.Parameter(
+        config_path={'section': 'spark', 'name': 'driver-cores'},
+        description='Number of cores for driver',
+        significant=False,
+    )
     executor_memory = luigi.Parameter(
         config_path={'section': 'spark', 'name': 'executor-memory'},
         description='Memory for each executor',
         significant=False,
     )
     executor_cores = luigi.Parameter(
         config_path={'section': 'spark', 'name': 'executor-cores'},
-        description='No. of cores for each executor',
+        description='Number of cores for each executor',
+        significant=False,
+    )
+    num_executors = luigi.Parameter(
+        config_path={'section': 'spark', 'name': 'num-executors'},
+        description='Number of executors to launch',
+        significant=False,
+    )
+    master = luigi.Parameter(
+        config_path={'section': 'spark', 'name': 'master'},
+        description='Master url for spark job',
         significant=False,
     )
+    deploy_mode = luigi.Parameter(
+        config_path={'section': 'spark', 'name': 'deploy-mode'},
+        description='Deploy mode for driver program',
+        significant=False,
+    )
+    conf = luigi.Parameter(
+        config_path={'section': 'spark', 'name': 'conf'},
+        description='Spark configuration',
+        default=None,
+        significant=False,
+    )
+
+
+class SparkJobTask(SparkConfigurationMixin, OverwriteOutputMixin, PySparkTask):
+    """
+    Wrapper for spark task
+    """
+
+    _spark = None
+    _spark_context = None
+    _sql_context = None
+    _hive_context = None
+    _tmp_dir = None
+
     always_log_stderr = False  # log stderr if spark fails, True for verbose log
 
     def init_spark(self, sc):
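A usage sketch (not part of this diff): because every parameter above declares a config_path into the [spark] section, any task that mixes in SparkConfigurationMixin picks its values up from the Luigi configuration file. The task name ShowSparkSettings and the sample configuration values below are hypothetical, and SparkConfigurationMixin is assumed to be imported from the module being changed here.

# Hypothetical example built on the mixin added in this diff.
# Assumes a Luigi config (e.g. luigi.cfg) containing something like:
#   [spark]
#   driver-memory = 4g
#   driver-cores = 2
#   executor-memory = 3g
#   executor-cores = 2
#   num-executors = 10
#   master = yarn
#   deploy-mode = cluster
import luigi

# from <module changed in this diff> import SparkConfigurationMixin


class ShowSparkSettings(SparkConfigurationMixin, luigi.Task):
    """Prints the Spark settings that the mixin resolves from configuration."""

    def run(self):
        # Each attribute is a luigi.Parameter backed by the [spark] section.
        print(self.master, self.deploy_mode, self.driver_memory,
              self.executor_memory, self.executor_cores, self.num_executors)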