Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@awssolutions/cdf-assetlibrary-history",
"comment": "added Kinesis batching for assetlibraryhistory",
"type": "none"
}
],
"packageName": "@awssolutions/cdf-assetlibrary-history"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@awssolutions/cdf-device-monitoring",
"comment": "",
"type": "none"
}
],
"packageName": "@awssolutions/cdf-device-monitoring"
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,43 @@ Resources:
Properties:
TopicRulePayload:
Actions:
- Lambda:
FunctionArn: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${EventsLambdaFunction}'
- Kinesis:
PartitionKey: ${topic(4)}
RoleArn: !GetAtt KinesisExecutionRole.Arn
StreamName: !Ref EventsKinesisStream
Description: 'Saves AssetLibrary configuration changes'
AwsIotSqlVersion: '2016-03-23'
RuleDisabled: 'false'
RuleDisabled: false
Sql: !Sub "SELECT * FROM '${AssetLibraryEventsTopic}'"

LambdaInvocationPermission:
Type: AWS::Lambda::Permission
KinesisExecutionRole:
Type: AWS::IAM::Role
Properties:
SourceArn: !Sub 'arn:aws:iot:${AWS::Region}:${AWS::AccountId}:rule/${AssetLibraryEventsRule}'
Action: lambda:InvokeFunction
Principal: iot.amazonaws.com
FunctionName: !GetAtt EventsLambdaFunction.Arn
SourceAccount: !Ref AWS::AccountId
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- iot.amazonaws.com
Action: sts:AssumeRole
Path: '/cdf/assetlibraryhistory/'
ManagedPolicyArns:
- !Ref KinesisPolicy

KinesisPolicy:
Type: 'AWS::IAM::ManagedPolicy'
Properties:
Description: 'Role policy for IoT rule engine'
Path: '/cdf/assetlibraryhistory/'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Action:
- 'kinesis:PutRecord'
Effect: Allow
Resource:
- !GetAtt EventsKinesisStream.Arn

LambdaExecutionRole:
Type: AWS::IAM::Role
Expand Down Expand Up @@ -186,6 +208,21 @@ Resources:
Resource:
- !GetAtt HistoryTable.Arn
- !Sub '${HistoryTable.Arn}/index/type-time-index'
- PolicyName: 'Kinesis'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Action:
- 'kinesis:DescribeStream'
- 'kinesis:DescribeStreamSummary'
- 'kinesis:GetRecords'
- 'kinesis:GetShardIterator'
- 'kinesis:ListShards'
- 'kinesis:ListStreams'
- 'kinesis:SubscribeToShard'
Effect: Allow
Resource:
- !GetAtt EventsKinesisStream.Arn

RESTLambdaExecutionRole:
Type: AWS::IAM::Role
Expand Down Expand Up @@ -285,6 +322,13 @@ Resources:
ReadCapacityUnits: '5'
WriteCapacityUnits: '5'

EventsKinesisStream:
Type: AWS::Kinesis::Stream
Properties:
Name: !Sub 'cdf-assetlibrary-history-stream-${Environment}'
StreamModeDetails:
StreamMode: 'ON_DEMAND'

EventsLambdaFunction:
Type: AWS::Serverless::Function
Properties:
Expand All @@ -294,13 +338,22 @@ Resources:
MemorySize: 512
Role: !GetAtt LambdaExecutionRole.Arn
Runtime: nodejs18.x
Timeout: 30
Timeout: 120
Environment:
Variables:
APP_CONFIG_DIR: 'config'
APP_CONFIG: !Ref ApplicationConfigurationOverride
AWS_DYNAMODB_TABLE_EVENTS: !Ref HistoryTable
Tracing: Active
Events:
Stream:
Type: Kinesis
Properties:
Stream: !GetAtt EventsKinesisStream.Arn
BatchSize: 1000
MaximumBatchingWindowInSeconds: 60
StartingPosition: LATEST
MaximumRetryAttempts: 1

RESTLambdaFunction:
Type: AWS::Serverless::Function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@ export interface EventModel {
attributes: { [key: string]: string };
}

export interface KinesisRecord {
kinesis: {
kinesisSchemaVersion: string;
partitionKey: string;
sequenceNumber: string;
data: string;
approimateArrivalTimestamp: number;
};
eventSource: string;
eventVersion: string;
eventID: string;
eventName: string;
invokeIdentityArn: string;
awsRegion: string;
eventSourceARN: string;
}

export interface KinesisRecords {
Records: KinesisRecord[];
}

export interface StateHistoryModel {
objectId: string;
type: Category;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,24 @@
*********************************************************************************************************************/
import 'reflect-metadata';

import { container } from './di/inversify.config';
import { logger } from '@awssolutions/simple-cdf-logger';
import { container } from './di/inversify.config';
import { TYPES } from './di/types';
import { EventModel } from './events/events.models';
import { EventModel, KinesisRecords } from './events/events.models';
import { EventsService } from './events/events.service';

const eventsService: EventsService = container.get(TYPES.EventsService);

exports.iot_rule_handler = async (event: EventModel, _context: unknown) => {
exports.iot_rule_handler = async (event: KinesisRecords, _context: unknown) => {
logger.debug(`events.service create: in: event: ${JSON.stringify(event)}`);

// TODO validation

await eventsService.create(event);

logger.debug('events.service create: exit:');
for (const record of event.Records) {
// Kinesis record data is base64 encoded
const payload: EventModel = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('ascii')
);
// TODO validation
await eventsService.create(payload);
logger.debug('events.service create: exit:');
}
};
2 changes: 1 addition & 1 deletion source/packages/services/device-monitoring/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ A device monitoring module provides near real-time device status - connected and

## Introduction

The device monitoring module utilizes [AWS IoT Lifecycle Events](https://docs.aws.amazon.com/iot/latest/developerguide/life-cycle-events.html) feature. Whenever the device connects or disconnects, AWS IoT Core emits a smaple event that is then picked up by AWS Lambda. It parses the eventType and then updating the device attributes in Asset Library to connected as `true` or `false`
The device monitoring module utilizes [AWS IoT Lifecycle Events](https://docs.aws.amazon.com/iot/latest/developerguide/life-cycle-events.html) feature. Whenever the device connects or disconnects, AWS IoT Core emits a sample event that is then picked up by AWS Lambda. It parses the eventType and then updating the device attributes in Asset Library to connected as `true` or `false`

The following sample represents the schema that AWS IoT core emits when a device connects or disconnects

Expand Down