@@ -195,32 +195,65 @@ def get_event_log_dataframe(self, spark, *args, **kwargs):
         return dataframe
 
 
-class SparkJobTask(OverwriteOutputMixin, PySparkTask):
-    """
-    Wrapper for spark task
-    """
-
-    _spark = None
-    _spark_context = None
-    _sql_context = None
-    _hive_context = None
-    _tmp_dir = None
-
+class SparkConfigurationMixin(object):
+    """Configuration parameters for spark task."""
     driver_memory = luigi.Parameter(
         config_path={'section': 'spark', 'name': 'driver-memory'},
         description='Memory for spark driver',
         significant=False,
     )
+    driver_cores = luigi.Parameter(
+        config_path={'section': 'spark', 'name': 'driver-cores'},
+        description='Number of cores for driver',
+        significant=False,
+    )
     executor_memory = luigi.Parameter(
         config_path={'section': 'spark', 'name': 'executor-memory'},
         description='Memory for each executor',
         significant=False,
     )
     executor_cores = luigi.Parameter(
         config_path={'section': 'spark', 'name': 'executor-cores'},
-        description='No. of cores for each executor',
+        description='Number of cores for each executor',
+        significant=False,
+    )
+    num_executors = luigi.Parameter(
+        config_path={'section': 'spark', 'name': 'num-executors'},
+        description='Number of executors to launch',
+        significant=False,
+    )
+    master = luigi.Parameter(
+        config_path={'section': 'spark', 'name': 'master'},
+        description='Master url for spark job',
         significant=False,
     )
+    deploy_mode = luigi.Parameter(
+        config_path={'section': 'spark', 'name': 'deploy-mode'},
+        description='Deploy mode for driver program',
+        significant=False,
+    )
+    spark_config = luigi.Parameter(
+        config_path={'section': 'spark', 'name': 'conf'},
+        description='Spark configuration',
+        default=[]
+    )
+
+    @property
+    def conf(self):
+        return self._dict_config(self.spark_config)
+
+
+class SparkJobTask(SparkConfigurationMixin, OverwriteOutputMixin, PySparkTask):
+    """
+    Wrapper for spark task
+    """
+
+    _spark = None
+    _spark_context = None
+    _sql_context = None
+    _hive_context = None
+    _tmp_dir = None
+
     always_log_stderr = False  # log stderr if spark fails, True for verbose log
 
     def init_spark(self, sc):
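
Each parameter above declares a config_path, so luigi falls back to the matching option in the [spark] section of the luigi configuration file when no value is supplied on the command line. A minimal sketch of such a section, with purely illustrative values (none of them come from this change):

[spark]
driver-memory = 4g
driver-cores = 2
executor-memory = 4g
executor-cores = 2
num-executors = 4
master = yarn
deploy-mode = cluster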
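
The conf property hands spark_config to self._dict_config, a helper that is not part of this hunk. Assuming spark_config holds an iterable of 'key=value' strings (an assumption; neither the helper nor the expected format of the 'conf' option is shown here), a minimal sketch of that helper, written as a free function for brevity, could be:

def _dict_config(config):
    """Hypothetical sketch: map 'key=value' entries to a plain dict."""
    if not config:
        return {}
    # Split on the first '=' only, so values may themselves contain '='.
    return dict(entry.split('=', 1) for entry in config)

# Example: _dict_config(['spark.ui.port=20000', 'spark.speculation=true'])
# returns {'spark.ui.port': '20000', 'spark.speculation': 'true'}.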