File tree Expand file tree Collapse file tree 2 files changed +29
-10
lines changed
source/packages/services/assetlibraryhistory Expand file tree Collapse file tree 2 files changed +29
-10
lines changed Original file line number Diff line number Diff line change @@ -135,11 +135,13 @@ Resources:
135135 Properties :
136136 TopicRulePayload :
137137 Actions :
138- - Lambda :
139- FunctionArn : !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${EventsLambdaFunction}'
138+ - Kinesis :
139+ PartitionKey : ${topic(4)}
140+ RoleArn : !GetAtt LambdaExecutionRole.Arn
141+ StreamName : !Ref EventsKinesisStream
140142 Description : ' Saves AssetLibrary configuration changes'
141143 AwsIotSqlVersion : ' 2016-03-23'
142- RuleDisabled : ' false'
144+ RuleDisabled : false
143145 Sql : !Sub "SELECT * FROM '${AssetLibraryEventsTopic}'"
144146
145147 LambdaInvocationPermission :
@@ -285,6 +287,13 @@ Resources:
285287 ReadCapacityUnits : ' 5'
286288 WriteCapacityUnits : ' 5'
287289
290+ EventsKinesisStream :
291+ Type : AWS::Kinesis::Stream
292+ Properties :
293+ Name : !Sub 'cdf-assetlibrary-history-stream-${Environment}'
294+ StreamModeDetails :
295+ StreamMode : ' on-demand'
296+
288297 EventsLambdaFunction :
289298 Type : AWS::Serverless::Function
290299 Properties :
@@ -301,6 +310,15 @@ Resources:
301310 APP_CONFIG : !Ref ApplicationConfigurationOverride
302311 AWS_DYNAMODB_TABLE_EVENTS : !Ref HistoryTable
303312 Tracing : Active
313+ Events :
314+ Stream :
315+ Type : Kinesis
316+ Properties :
317+ Stream : !GetAtt EventsKinesisStream.Arn
318+ BatchSize : 100 # What should this be?
319+ MaximumBatchingWindowInSeconds : 30
320+ StartingPosition : LATEST
321+ MaximumRetryAttempts : 1
304322
305323 RESTLambdaFunction :
306324 Type : AWS::Serverless::Function
Original file line number Diff line number Diff line change @@ -15,17 +15,18 @@ import 'reflect-metadata';
1515import { logger } from '@awssolutions/simple-cdf-logger' ;
1616import { container } from './di/inversify.config' ;
1717import { TYPES } from './di/types' ;
18- import { EventModel } from './events/events.models' ;
1918import { EventsService } from './events/events.service' ;
2019
2120const eventsService : EventsService = container . get ( TYPES . EventsService ) ;
2221
23- exports . iot_rule_handler = async ( event : EventModel , _context : unknown ) => {
22+ exports . iot_rule_handler = async ( event : any , _context : unknown ) => {
2423 logger . debug ( `events.service create: in: event: ${ JSON . stringify ( event ) } ` ) ;
2524
26- // TODO validation
27-
28- await eventsService . create ( event ) ;
29-
30- logger . debug ( 'events.service create: exit:' ) ;
25+ for ( const record of event . Records ) {
26+ // Kinesis record data is base64 encoded
27+ const payload = JSON . parse ( Buffer . from ( record . kinesis . data , 'base64' ) . toString ( 'ascii' ) ) ;
28+ // TODO validation
29+ await eventsService . create ( payload ) ;
30+ logger . debug ( 'events.service create: exit:' ) ;
31+ }
3132} ;
You can’t perform that action at this time.
0 commit comments