diff --git a/source/common/changes/@awssolutions/cdf-assetlibrary-history/fix_assetlibrary_history_batching_2023-09-17-14-03.json b/source/common/changes/@awssolutions/cdf-assetlibrary-history/fix_assetlibrary_history_batching_2023-09-17-14-03.json new file mode 100644 index 000000000..82cb2c584 --- /dev/null +++ b/source/common/changes/@awssolutions/cdf-assetlibrary-history/fix_assetlibrary_history_batching_2023-09-17-14-03.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@awssolutions/cdf-assetlibrary-history", + "comment": "added Kinesis batching for assetlibraryhistory", + "type": "none" + } + ], + "packageName": "@awssolutions/cdf-assetlibrary-history" +} \ No newline at end of file diff --git a/source/common/changes/@awssolutions/cdf-device-monitoring/fix_assetlibrary_history_batching_2023-09-17-14-03.json b/source/common/changes/@awssolutions/cdf-device-monitoring/fix_assetlibrary_history_batching_2023-09-17-14-03.json new file mode 100644 index 000000000..374b6f04b --- /dev/null +++ b/source/common/changes/@awssolutions/cdf-device-monitoring/fix_assetlibrary_history_batching_2023-09-17-14-03.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@awssolutions/cdf-device-monitoring", + "comment": "", + "type": "none" + } + ], + "packageName": "@awssolutions/cdf-device-monitoring" +} \ No newline at end of file diff --git a/source/packages/services/assetlibraryhistory/infrastructure/cfn-assetLibraryHistory.yml b/source/packages/services/assetlibraryhistory/infrastructure/cfn-assetLibraryHistory.yml index 36ad0666e..c427a6c83 100644 --- a/source/packages/services/assetlibraryhistory/infrastructure/cfn-assetLibraryHistory.yml +++ b/source/packages/services/assetlibraryhistory/infrastructure/cfn-assetLibraryHistory.yml @@ -135,21 +135,43 @@ Resources: Properties: TopicRulePayload: Actions: - - Lambda: - FunctionArn: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${EventsLambdaFunction}' + - Kinesis: + PartitionKey: ${topic(4)} + RoleArn: !GetAtt KinesisExecutionRole.Arn + StreamName: !Ref EventsKinesisStream Description: 'Saves AssetLibrary configuration changes' AwsIotSqlVersion: '2016-03-23' - RuleDisabled: 'false' + RuleDisabled: false Sql: !Sub "SELECT * FROM '${AssetLibraryEventsTopic}'" - LambdaInvocationPermission: - Type: AWS::Lambda::Permission + KinesisExecutionRole: + Type: AWS::IAM::Role Properties: - SourceArn: !Sub 'arn:aws:iot:${AWS::Region}:${AWS::AccountId}:rule/${AssetLibraryEventsRule}' - Action: lambda:InvokeFunction - Principal: iot.amazonaws.com - FunctionName: !GetAtt EventsLambdaFunction.Arn - SourceAccount: !Ref AWS::AccountId + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - iot.amazonaws.com + Action: sts:AssumeRole + Path: '/cdf/assetlibraryhistory/' + ManagedPolicyArns: + - !Ref KinesisPolicy + + KinesisPolicy: + Type: 'AWS::IAM::ManagedPolicy' + Properties: + Description: 'Role policy for IoT rule engine' + Path: '/cdf/assetlibraryhistory/' + PolicyDocument: + Version: '2012-10-17' + Statement: + - Action: + - 'kinesis:PutRecord' + Effect: Allow + Resource: + - !GetAtt EventsKinesisStream.Arn LambdaExecutionRole: Type: AWS::IAM::Role @@ -186,6 +208,21 @@ Resources: Resource: - !GetAtt HistoryTable.Arn - !Sub '${HistoryTable.Arn}/index/type-time-index' + - PolicyName: 'Kinesis' + PolicyDocument: + Version: '2012-10-17' + Statement: + - Action: + - 'kinesis:DescribeStream' + - 'kinesis:DescribeStreamSummary' + - 'kinesis:GetRecords' + - 'kinesis:GetShardIterator' + - 'kinesis:ListShards' + - 'kinesis:ListStreams' + - 'kinesis:SubscribeToShard' + Effect: Allow + Resource: + - !GetAtt EventsKinesisStream.Arn RESTLambdaExecutionRole: Type: AWS::IAM::Role @@ -285,6 +322,13 @@ Resources: ReadCapacityUnits: '5' WriteCapacityUnits: '5' + EventsKinesisStream: + Type: AWS::Kinesis::Stream + Properties: + Name: !Sub 'cdf-assetlibrary-history-stream-${Environment}' + StreamModeDetails: + StreamMode: 'ON_DEMAND' + EventsLambdaFunction: Type: AWS::Serverless::Function Properties: @@ -294,13 +338,22 @@ Resources: MemorySize: 512 Role: !GetAtt LambdaExecutionRole.Arn Runtime: nodejs18.x - Timeout: 30 + Timeout: 120 Environment: Variables: APP_CONFIG_DIR: 'config' APP_CONFIG: !Ref ApplicationConfigurationOverride AWS_DYNAMODB_TABLE_EVENTS: !Ref HistoryTable Tracing: Active + Events: + Stream: + Type: Kinesis + Properties: + Stream: !GetAtt EventsKinesisStream.Arn + BatchSize: 1000 + MaximumBatchingWindowInSeconds: 60 + StartingPosition: LATEST + MaximumRetryAttempts: 1 RESTLambdaFunction: Type: AWS::Serverless::Function diff --git a/source/packages/services/assetlibraryhistory/src/events/events.models.ts b/source/packages/services/assetlibraryhistory/src/events/events.models.ts index e6f961ad2..7b3e0f7fc 100644 --- a/source/packages/services/assetlibraryhistory/src/events/events.models.ts +++ b/source/packages/services/assetlibraryhistory/src/events/events.models.ts @@ -21,6 +21,27 @@ export interface EventModel { attributes: { [key: string]: string }; } +export interface KinesisRecord { + kinesis: { + kinesisSchemaVersion: string; + partitionKey: string; + sequenceNumber: string; + data: string; + approimateArrivalTimestamp: number; + }; + eventSource: string; + eventVersion: string; + eventID: string; + eventName: string; + invokeIdentityArn: string; + awsRegion: string; + eventSourceARN: string; +} + +export interface KinesisRecords { + Records: KinesisRecord[]; +} + export interface StateHistoryModel { objectId: string; type: Category; diff --git a/source/packages/services/assetlibraryhistory/src/lambda_iot_rule.ts b/source/packages/services/assetlibraryhistory/src/lambda_iot_rule.ts index 268d8d9b6..4b7944262 100644 --- a/source/packages/services/assetlibraryhistory/src/lambda_iot_rule.ts +++ b/source/packages/services/assetlibraryhistory/src/lambda_iot_rule.ts @@ -12,20 +12,24 @@ *********************************************************************************************************************/ import 'reflect-metadata'; -import { container } from './di/inversify.config'; import { logger } from '@awssolutions/simple-cdf-logger'; +import { container } from './di/inversify.config'; import { TYPES } from './di/types'; -import { EventModel } from './events/events.models'; +import { EventModel, KinesisRecords } from './events/events.models'; import { EventsService } from './events/events.service'; const eventsService: EventsService = container.get(TYPES.EventsService); -exports.iot_rule_handler = async (event: EventModel, _context: unknown) => { +exports.iot_rule_handler = async (event: KinesisRecords, _context: unknown) => { logger.debug(`events.service create: in: event: ${JSON.stringify(event)}`); - // TODO validation - - await eventsService.create(event); - - logger.debug('events.service create: exit:'); + for (const record of event.Records) { + // Kinesis record data is base64 encoded + const payload: EventModel = JSON.parse( + Buffer.from(record.kinesis.data, 'base64').toString('ascii') + ); + // TODO validation + await eventsService.create(payload); + logger.debug('events.service create: exit:'); + } }; diff --git a/source/packages/services/device-monitoring/README.md b/source/packages/services/device-monitoring/README.md index 3ada3ed25..1e9f358cf 100644 --- a/source/packages/services/device-monitoring/README.md +++ b/source/packages/services/device-monitoring/README.md @@ -4,7 +4,7 @@ A device monitoring module provides near real-time device status - connected and ## Introduction -The device monitoring module utilizes [AWS IoT Lifecycle Events](https://docs.aws.amazon.com/iot/latest/developerguide/life-cycle-events.html) feature. Whenever the device connects or disconnects, AWS IoT Core emits a smaple event that is then picked up by AWS Lambda. It parses the eventType and then updating the device attributes in Asset Library to connected as `true` or `false` +The device monitoring module utilizes [AWS IoT Lifecycle Events](https://docs.aws.amazon.com/iot/latest/developerguide/life-cycle-events.html) feature. Whenever the device connects or disconnects, AWS IoT Core emits a sample event that is then picked up by AWS Lambda. It parses the eventType and then updating the device attributes in Asset Library to connected as `true` or `false` The following sample represents the schema that AWS IoT core emits when a device connects or disconnects