diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/README.md b/packages/aws-cdk-lib/aws-lambda-event-sources/README.md
index 57aa2b3fe3e5c..a2b0d9f9483ab 100644
--- a/packages/aws-cdk-lib/aws-lambda-event-sources/README.md
+++ b/packages/aws-cdk-lib/aws-lambda-event-sources/README.md
@@ -93,7 +93,7 @@ fn.addEventSource(new S3EventSource(bucket, {
 ```
 
 In the example above, `S3EventSource` is accepting `Bucket` type as parameter.
-However, Functions like `from_bucket_name` and `from_bucket_arn` will return `IBucket` 
+However, Functions like `from_bucket_name` and `from_bucket_arn` will return `IBucket`
 and is not compliant with `S3EventSource`. If this is the case, please consider using
 `S3EventSourceV2` instead, this class accepts `IBucket`.
 
@@ -189,7 +189,7 @@ fn.addEventSource(new DynamoEventSource(table, {
 }));
 ```
 
-The following code sets up a Lambda function with a DynamoDB event source. A filter is applied to only send DynamoDB events to 
+The following code sets up a Lambda function with a DynamoDB event source. A filter is applied to only send DynamoDB events to
 the Lambda function when the `id` column is a boolean that equals `true`.
 
 ```ts
@@ -232,7 +232,7 @@ behavior:
 * __onFailure__: In the event a record fails and consumes all retries, the record will be sent to SQS queue or SNS topic that is specified here
 * __parallelizationFactor__: The number of batches to concurrently process on each shard.
 * __retryAttempts__: The maximum number of times a record should be retried in the event of failure.
-* __startingPosition__: Will determine where to begin consumption. 'LATEST' will start at the most recent record and ignore all records that arrived prior to attaching the event source, 'TRIM_HORIZON' will start at the oldest record and ensure you process all available data, while 'AT_TIMESTAMP' will start reading records from a specified time stamp. Note that 'AT_TIMESTAMP' is only supported for Amazon Kinesis streams.
+* __startingPosition__: Will determine where to begin consumption. 'LATEST' will start at the most recent record and ignore all records that arrived prior to attaching the event source, 'TRIM_HORIZON' will start at the oldest record and ensure you process all available data, while 'AT_TIMESTAMP' will start reading records from a specified time stamp.
 * __startingPositionTimestamp__: The time stamp from which to start reading. Used in conjunction with __startingPosition__ when set to 'AT_TIMESTAMP'.
 * __tumblingWindow__: The duration in seconds of a processing window when using streams.
 * __enabled__: If the DynamoDB Streams event source mapping should be enabled. The default is true.
@@ -252,7 +252,14 @@ myFunction.addEventSource(new KinesisEventSource(stream, {
 
 ## Kafka
 
-You can write Lambda functions to process data either from [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) or a [self managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster.
+You can write Lambda functions to process data either from [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) or a [self-managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster. The following parameters will impact the polling behavior:
+
+* __startingPosition__: Will determine where to begin consumption. 'LATEST' will start at the most recent record and ignore all records that arrived prior to attaching the event source, 'TRIM_HORIZON' will start at the oldest record and ensure you process all available data, while 'AT_TIMESTAMP' will start reading records from a specified time stamp.
+* __startingPositionTimestamp__: The time stamp from which to start reading. Used in conjunction with __startingPosition__ when set to 'AT_TIMESTAMP'.
+* __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
+* __maxBatchingWindow__: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of possibly delaying processing.
+* __onFailure__: In the event a record fails and consumes all retries, the record will be sent to SQS queue or SNS topic that is specified here
+* __enabled__: If the Kafka event source mapping should be enabled. The default is true.
 
 The following code sets up Amazon MSK as an event source for a lambda function. Credentials will need to be configured to access the MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html).
 
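As a quick illustration of the polling parameters documented in the README hunk above, a minimal sketch of wiring them onto a `ManagedKafkaEventSource` might look like the following. The cluster ARN, topic name, and tuning values are placeholders for illustration only, not taken from the diff:

```ts
import { Duration } from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

declare const myFunction: lambda.Function;

// Hypothetical MSK cluster ARN and topic name.
const clusterArn = 'arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/abc';
const topic = 'some-topic';

myFunction.addEventSource(new ManagedKafkaEventSource({
  clusterArn,
  topic,
  // Begin at the oldest record so no pre-existing data is skipped.
  startingPosition: lambda.StartingPosition.TRIM_HORIZON,
  // Buffer up to 100 records per invocation...
  batchSize: 100,
  // ...or invoke after at most one minute, whichever comes first.
  maxBatchingWindow: Duration.minutes(1),
  // The mapping is enabled by default; shown here for completeness.
  enabled: true,
}));
```

As the bullets note, `batchSize` and `maxBatchingWindow` trade latency for invocation efficiency: a larger window makes a full batch more likely but can delay processing of the records gathered first.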
diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts
index 3c75a45a51447..95fcace8848c8 100644
--- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts
+++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts
@@ -55,6 +55,13 @@ export interface KafkaEventSourceProps extends BaseStreamEventSourceProps {
    * @default - discarded records are ignored
    */
   readonly onFailure?: lambda.IEventSourceDlq;
+
+  /**
+   * The time from which to start reading, in Unix time seconds.
+   *
+   * @default - no timestamp
+   */
+  readonly startingPositionTimestamp?: number;
 }
 
 /**
@@ -148,6 +155,15 @@ export class ManagedKafkaEventSource extends StreamEventSource {
 
   constructor(props: ManagedKafkaEventSourceProps) {
     super(props);
+
+    if (props.startingPosition === lambda.StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) {
+      throw new Error('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP');
+    }
+
+    if (props.startingPosition !== lambda.StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) {
+      throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP');
+    }
+
     this.innerProps = props;
   }
 
@@ -159,6 +175,7 @@ export class ManagedKafkaEventSource extends StreamEventSource {
         filters: this.innerProps.filters,
         filterEncryption: this.innerProps.filterEncryption,
         startingPosition: this.innerProps.startingPosition,
+        startingPositionTimestamp: this.innerProps.startingPositionTimestamp,
         sourceAccessConfigurations: this.sourceAccessConfigurations(),
         kafkaTopic: this.innerProps.topic,
         kafkaConsumerGroupId: this.innerProps.consumerGroupId,
@@ -239,6 +256,15 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
     } else if (!props.secret) {
       throw new Error('secret must be set if Kafka brokers accessed over Internet');
     }
+
+    if (props.startingPosition === lambda.StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) {
+      throw new Error('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP');
+    }
+
+    if (props.startingPosition !== lambda.StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) {
+      throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP');
+    }
+
     this.innerProps = props;
   }
 
@@ -253,6 +279,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
         kafkaTopic: this.innerProps.topic,
         kafkaConsumerGroupId: this.innerProps.consumerGroupId,
         startingPosition: this.innerProps.startingPosition,
+        startingPositionTimestamp: this.innerProps.startingPositionTimestamp,
         sourceAccessConfigurations: this.sourceAccessConfigurations(),
         onFailure: this.innerProps.onFailure,
         supportS3OnFailureDestination: true,
diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts
index d5b1df2c9a657..074e34aeb938a 100644
--- a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts
+++ b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts
@@ -308,6 +308,48 @@ describe('KafkaEventSource', () => {
       });
     });
 
+    test('AT_TIMESTAMP starting position', () => {
+      const stack = new cdk.Stack();
+      const fn = new TestFunction(stack, 'Fn');
+      const clusterArn = 'some-arn';
+      const kafkaTopic = 'some-topic';
+
+      fn.addEventSource(new sources.ManagedKafkaEventSource({
+        clusterArn,
+        topic: kafkaTopic,
+        startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
+        startingPositionTimestamp: 1640995200,
+      }),
+      );
+
+      Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
+        StartingPosition: 'AT_TIMESTAMP',
+        StartingPositionTimestamp: 1640995200,
+      });
+    });
+
+    test('startingPositionTimestamp missing throws error', () => {
+      const clusterArn = 'some-arn';
+      const kafkaTopic = 'some-topic';
+
+      expect(() => new sources.ManagedKafkaEventSource({
+        clusterArn,
+        topic: kafkaTopic,
+        startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
+      })).toThrow(/startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP/);
+    });
+
+    test('startingPositionTimestamp without AT_TIMESTAMP throws error', () => {
+      const clusterArn = 'some-arn';
+      const kafkaTopic = 'some-topic';
+
+      expect(() => new sources.ManagedKafkaEventSource({
+        clusterArn,
+        topic: kafkaTopic,
+        startingPosition: lambda.StartingPosition.LATEST,
+        startingPositionTimestamp: 1640995200,
+      })).toThrow(/startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP/);
+    });
   });
 
   describe('self-managed kafka', () => {
@@ -998,5 +1040,56 @@ describe('KafkaEventSource', () => {
       expect(mskEventMapping.eventSourceMappingId).toBeDefined();
       expect(mskEventMapping.eventSourceMappingArn).toBeDefined();
     });
+
+    test('AT_TIMESTAMP starting position', () => {
+      const stack = new cdk.Stack();
+      const fn = new TestFunction(stack, 'Fn');
+      const bootstrapServers = ['kafka-broker:9092'];
+      const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
+      const kafkaTopic = 'some-topic';
+
+      fn.addEventSource(new sources.SelfManagedKafkaEventSource({
+        bootstrapServers,
+        secret: secret,
+        topic: kafkaTopic,
+        startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
+        startingPositionTimestamp: 1640995200,
+      }),
+      );
+
+      Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
+        StartingPosition: 'AT_TIMESTAMP',
+        StartingPositionTimestamp: 1640995200,
+      });
+    });
+
+    test('startingPositionTimestamp missing throws error', () => {
+      const stack = new cdk.Stack();
+      const bootstrapServers = ['kafka-broker:9092'];
+      const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
+      const kafkaTopic = 'some-topic';
+
+      expect(() => new sources.SelfManagedKafkaEventSource({
+        bootstrapServers,
+        secret: secret,
+        topic: kafkaTopic,
+        startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
+      })).toThrow(/startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP/);
+    });
+
+    test('startingPositionTimestamp without AT_TIMESTAMP throws error', () => {
+      const stack = new cdk.Stack();
+      const bootstrapServers = ['kafka-broker:9092'];
+      const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
+      const kafkaTopic = 'some-topic';
+
+      expect(() => new sources.SelfManagedKafkaEventSource({
+        bootstrapServers,
+        secret: secret,
+        topic: kafkaTopic,
+        startingPosition: lambda.StartingPosition.LATEST,
+        startingPositionTimestamp: 1640995200,
+      })).toThrow(/startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP/);
+    });
   });
 });
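Taken together, the changes above let a Kafka event source start reading from a point in time. A minimal usage sketch follows, assuming the construct is consumed as in the existing README examples; the cluster ARN is a placeholder, and the timestamp is Unix time in seconds (1640995200 is 2022-01-01T00:00:00Z, the value used in the tests):

```ts
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

declare const myFunction: lambda.Function;

// Hypothetical MSK cluster ARN and topic name.
const clusterArn = 'arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/abc';

myFunction.addEventSource(new ManagedKafkaEventSource({
  clusterArn,
  topic: 'some-topic',
  // AT_TIMESTAMP requires startingPositionTimestamp; any other starting
  // position combined with a timestamp is rejected in the constructor.
  startingPosition: lambda.StartingPosition.AT_TIMESTAMP,
  startingPositionTimestamp: 1640995200,
}));
```

The same pair of properties, and the same constructor validation, applies to `SelfManagedKafkaEventSource`.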