
Commit b1bd9a7 (merge of 2 parents: b8f422f + 1b2e9a5)

Author: EC2 Default User

Adding support for workflow cron expressions. Some changes to how coarse IAM permissions are granted to reduce LakeFormation location permission redundancy.

File tree

7 files changed: +75 -27 lines


baseline_binaries/ojdbc8.jar

+1 line

@@ -0,0 +1 @@
+YOU NEED TO DOWNLOAD ojdbc8.jar FROM https://www.oracle.com/database/technologies/appdev/jdbc-ucp-19c-downloads.html and replace this file with the file you downloaded.

bin/aws.ts

+9 lines

@@ -50,6 +50,7 @@ const exisitingResourceImportStack = new cdk.Stack(app, 'resourceImportStack', {
 
 
 
+
 // Grant permissions:
 
 // const exampleExistingIamUser = iam.User.fromUserName(exisitingResourceImportStack, 'exampleUserGrantee', '--- EXISTING IAM USERNAME GOES HERE --' );
@@ -59,6 +60,14 @@ const exisitingResourceImportStack = new cdk.Stack(app, 'resourceImportStack', {
 // exampleS3DataSet.grantIamRead(exampleExistingIamRole);
 
 
+// chemblStack.grantIamRead(analyticsStack.NotebookRole);
+// openTargetsStack.grantIamRead(analyticsStack.NotebookRole);
+// bindingDBStack.grantIamRead(analyticsStack.NotebookRole);
+
+
+// const exampleUser = iam.User.fromUserName(coreDataLake, 'exampleGrantee', 'paul1' );
+
+
 // var exampleGrant: DataLakeEnrollment.TablePermissionGrant = {
 //     tables: ["association_data", "evidence_data","target_list","disease_list"],
 //     DatabasePermissions: [DataLakeEnrollment.DatabasePermission.Alter, DataLakeEnrollment.DatabasePermission.CreateTable, DataLakeEnrollment.DatabasePermission.Drop],
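For context, these commented-out lines become live grants once the referenced stacks exist. A minimal sketch of the intended wiring, assuming chemblStack, openTargetsStack, bindingDBStack, and analyticsStack are instantiated earlier in bin/aws.ts (all names come from the commented code above; that grantIamRead accepts an imported user as well as a role is an assumption here):

// Sketch: grant the analytics notebook role coarse read access to each enrolled data set.
chemblStack.grantIamRead(analyticsStack.NotebookRole);
openTargetsStack.grantIamRead(analyticsStack.NotebookRole);
bindingDBStack.grantIamRead(analyticsStack.NotebookRole);

// Assumed: the same grant pattern for an imported IAM user ('paul1' is a placeholder username).
const exampleUser = iam.User.fromUserName(coreDataLake, 'exampleGrantee', 'paul1');
chemblStack.grantIamRead(exampleUser);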

lib/constructs/data-lake-enrollment.ts

+4 lines

@@ -18,13 +18,16 @@ export class DataLakeEnrollment extends cdk.Construct {
     public CoarseAthenaAccessPolicy: iam.ManagedPolicy;
     private CoarseResourceAccessPolicy: iam.ManagedPolicy;
     private CoarseIamPolciesApplied: boolean;
+    private WorkflowCronScheduleExpression?: string;
+
 
     constructor(scope: cdk.Construct, id: string, props: DataLakeEnrollment.DataLakeEnrollmentProps) {
         super(scope, id);
 
 
         this.DataSetName = props.DataSetName;
         this.CoarseIamPolciesApplied = false;
+        this.WorkflowCronScheduleExpression = props.WorkflowCronScheduleExpression;
 
     }
 
@@ -542,6 +545,7 @@ export namespace DataLakeEnrollment
         GlueScriptPath: string;
         GlueScriptArguments: any;
         DataSetName: string;
+        WorkflowCronScheduleExpression?: string;
     }
 
     export interface DatabasePermissionGrant {
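To show how the new optional property is consumed, here is a minimal hypothetical sketch of enrollment props that opt into a scheduled workflow. Only fields visible in this diff are used, the values are placeholders, and the schedule string uses the six-field cron syntax that AWS Glue triggers accept:

// Hypothetical values; WorkflowCronScheduleExpression is the field this commit adds.
const enrollmentProps = {
    DataSetName: 'example_dataset',
    GlueScriptPath: 'scripts/glue.s3import.fullcopy.s3.py',
    GlueScriptArguments: {},
    // Run the enrollment workflow daily at 06:00 UTC.
    // Omit this field entirely to keep the start trigger ON_DEMAND.
    WorkflowCronScheduleExpression: 'cron(0 6 * * ? *)'
};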

lib/constructs/data-set-enrollment.ts

+40 -15 lines

@@ -154,6 +154,7 @@ export interface DataSetEnrollmentProps extends cdk.StackProps {
     GlueScriptArguments: any;
     SourceAccessPolicy?: iam.Policy;
     MaxDPUs: number;
+    WorkflowCronScheduleExpression?: string;
 }
 
 
@@ -258,7 +259,7 @@ export class DataSetEnrollment extends cdk.Construct {
             },
             name: `${props.dataSetName}_src_to_dl_etl`,
             timeout: 2880,
-            glueVersion: "1.0",
+            glueVersion: "2.0",
             maxCapacity: props.MaxDPUs,
             command: {
                 scriptLocation: `s3://${glueScript.s3BucketName}/${glueScript.s3ObjectKey}`,
@@ -292,12 +293,13 @@ export class DataSetEnrollment extends cdk.Construct {
             workfowName: `${props.dataSetName}_DataLakeEnrollmentWorkflow`,
             srcCrawler: sourceCrawler,
             etlJob: etl_job,
-            datalakeCrawler: datalake_crawler
-
+            datalakeCrawler: datalake_crawler,
+            WorkflowCronScheduleExpression: props.WorkflowCronScheduleExpression
         });
 
-    }
+    }
 
+
 
     }
 
@@ -307,32 +309,55 @@ export interface DataLakeEnrollmentWorkflowProps {
     srcCrawler: glue.CfnCrawler,
     etlJob: glue.CfnJob,
     datalakeCrawler: glue.CfnCrawler
+    WorkflowCronScheduleExpression?: string;
 }
 
 export class DataLakeEnrollmentWorkflow extends cdk.Construct {
 
-    public readonly StartTrigger: glue.CfnTrigger;
+    public StartTrigger: glue.CfnTrigger;
     public readonly SrcCrawlerCompleteTrigger: glue.CfnTrigger;
     public readonly ETLCompleteTrigger: glue.CfnTrigger;
     public readonly Workflow: glue.CfnWorkflow;
+    private readonly sourceCrawler: glue.CfnCrawler;
 
     constructor(scope: cdk.Construct, id: string, props: DataLakeEnrollmentWorkflowProps) {
         super(scope, id);
 
         this.Workflow = new glue.CfnWorkflow(this, "etlWorkflow", {
             name: props.workfowName
         });
+
+        this.sourceCrawler = props.srcCrawler;
 
-        this.StartTrigger = new glue.CfnTrigger(this,"startTrigger",{
-            actions: [
-                {
-                    crawlerName: props.srcCrawler.name
-                }
-            ],
-            type: "ON_DEMAND",
-            name: `startWorkflow-${this.Workflow.name}`,
-            workflowName: this.Workflow.name
-        });
+        if(props.WorkflowCronScheduleExpression == null){
+            this.StartTrigger = new glue.CfnTrigger(this,"startTrigger",{
+                actions: [
+                    {
+                        crawlerName: props.srcCrawler.name
+                    }
+                ],
+                type: "ON_DEMAND",
+                name: `startWorkflow-${this.Workflow.name}`,
+                workflowName: this.Workflow.name
+            });
+
+        }else{
+
+            this.StartTrigger = new glue.CfnTrigger(this,"startTrigger",{
+                actions: [
+                    {
+                        crawlerName: this.sourceCrawler.name
+                    }
+                ],
+                type: "SCHEDULED",
+                schedule: props.WorkflowCronScheduleExpression,
+                name: `startWorkflow-${this.Workflow.name}`,
+                workflowName: this.Workflow.name
+            });
+        }
+
+
 
         this.SrcCrawlerCompleteTrigger = new glue.CfnTrigger(this,"srcCrawlerCompleteTrigger",{
             predicate: {
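A few details worth noting in this file: the branch tests props.WorkflowCronScheduleExpression == null with loose equality, so both null and an omitted (undefined) value fall back to the ON_DEMAND trigger; StartTrigger drops its readonly modifier, though TypeScript allows assigning a readonly field in either branch of a constructor, so that change looks stylistic rather than necessary; and raising glueVersion from "1.0" to "2.0" moves the ETL job onto the Glue 2.0 Spark runtime, which AWS documents as having substantially faster job start times.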

lib/constructs/s3-data-set-enrollment.ts

+15 -6 lines

@@ -81,13 +81,21 @@ export class S3dataSetEnrollment extends DataLakeEnrollment{
             var tableFolderName = prefixFolders[prefixFolders.length-2]
             var tableFolderName = tableFolderName.toLowerCase().replace(/\./g,"_").replace(/-/g,"_");
 
-            s3DataLakePaths.push({
-                path: `s3://${props.dataLakeBucket.bucketName}/${dataSetName}/${tableFolderName}/`
-            });
+            if(props.sourceBucketDataPrefixes.length > 1){
+                s3DataLakePaths.push({
+                    path: `s3://${props.dataLakeBucket.bucketName}/${dataSetName}/${tableFolderName}/`
+                });
+            }else{
+                s3DataLakePaths.push({
+                    path: `s3://${props.dataLakeBucket.bucketName}/${dataSetName}/`
+                });
+            }
+
+
         }
+
 
-
-
+
         this.DataEnrollment = new DataSetEnrollment(this, `${props.DataSetName}-s3Enrollment`, {
             dataLakeBucket: props.dataLakeBucket,
             dataSetName: dataSetName,
@@ -100,7 +108,8 @@ export class S3dataSetEnrollment extends DataLakeEnrollment{
             DataLakeTargets: {
                 s3Targets: s3DataLakePaths
             },
-            GlueScriptArguments: props.GlueScriptArguments
+            GlueScriptArguments: props.GlueScriptArguments,
+            WorkflowCronScheduleExpression: props.WorkflowCronScheduleExpression
         });
 
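This branch appears to be the "reduce LakeFormation location permission redundancy" change from the commit message: when a data set has only one source prefix, the data lake target becomes the data set root (s3://<bucket>/<dataSetName>/) rather than a per-table subfolder, so a single registered location covers the whole data set instead of one location per table folder.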

scripts/glue.s3import.fullcopy.rds.py

+3 -3 lines

@@ -45,11 +45,11 @@
 
 for table in tables:
 
-    datasource = glueContext.create_dynamic_frame.from_catalog(database = glue_database, table_name = table)
-    dropnullfields = DropNullFields.apply(frame = datasource, transformation_ctx = "dropnullfields1")
+    datasource = glueContext.create_dynamic_frame.from_catalog(database = glue_database, table_name = table, transformation_ctx = "datasource")
+    dropnullfields = DropNullFields.apply(frame = datasource, transformation_ctx = "dropnullfields")
 
     try:
-        datasink = glueContext.write_dynamic_frame.from_options(frame = dropnullfields, connection_type = "s3", connection_options = {"path": "s3://"+dataLakeBucket + dataLakePrefix + table}, format = target_format)
+        datasink = glueContext.write_dynamic_frame.from_options(frame = dropnullfields, connection_type = "s3", connection_options = {"path": "s3://"+dataLakeBucket + dataLakePrefix + table}, format = target_format, transformation_ctx = "datasink")
     except:
         print("Unable to write" + table)
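Supplying a transformation_ctx on each DynamicFrame operation is what lets AWS Glue job bookmarks track processed data between runs, a sensible companion to the new cron-scheduled workflows: an unattended recurring job benefits from not reprocessing data it has already handled. The identical change is applied to glue.s3import.fullcopy.s3.py below.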

scripts/glue.s3import.fullcopy.s3.py

+3 -3 lines

@@ -45,11 +45,11 @@
 
 for table in tables:
 
-    datasource = glueContext.create_dynamic_frame.from_catalog(database = glue_database, table_name = table)
-    dropnullfields = DropNullFields.apply(frame = datasource, transformation_ctx = "dropnullfields1")
+    datasource = glueContext.create_dynamic_frame.from_catalog(database = glue_database, table_name = table, transformation_ctx = "datasource")
+    dropnullfields = DropNullFields.apply(frame = datasource, transformation_ctx = "dropnullfields")
 
     try:
-        datasink = glueContext.write_dynamic_frame.from_options(frame = dropnullfields, connection_type = "s3", connection_options = {"path": "s3://"+dataLakeBucket + dataLakePrefix + table}, format = target_format)
+        datasink = glueContext.write_dynamic_frame.from_options(frame = dropnullfields, connection_type = "s3", connection_options = {"path": "s3://"+dataLakeBucket + dataLakePrefix + table}, format = target_format, transformation_ctx = "datasink")
     except:
         print("Unable to write" + table)
