Skip to content
This repository has been archived by the owner on Jun 13, 2023. It is now read-only.

Commit

Permalink
fix(kinesis): add useful info for kinesis.putRecords trace (#460)
Browse files Browse the repository at this point in the history
* fix(kinesis): add useful info for kinesis.putRecords trace

* fix(kinesis): add total_record_count in kinesis trigger lambda

* fix(kinesis): put back the letter A in the comment

* fix(kinesis): failed record count is nullable

* fix(kinesis): fix syntax because nodejs8x

Co-authored-by: Daniele Frasca <[email protected]>
Co-authored-by: Ran Ribenzaft <[email protected]>
  • Loading branch information
3 people authored Mar 18, 2021
1 parent 6aa1922 commit 8291b78
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 15 deletions.
41 changes: 26 additions & 15 deletions src/events/aws_sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@ const tryRequire = require('../try_require');

const AWS = tryRequire('aws-sdk');

const AWS_TYPES = {
kinesis: {
putRecord: 'putRecord',
putRecords: 'putRecords',
},
};

const s3EventCreator = {
/**
* Updates an event with the appropriate fields from a S3 request
Expand Down Expand Up @@ -94,15 +87,26 @@ const kinesisEventCreator = {
*/
requestHandler(request, event) {
const parameters = request.params || {};
const { operation } = request;
const resource = event.getResource();

resource.setName(`${parameters.StreamName}`);
if (request.operation !== AWS_TYPES.kinesis.putRecords) {
resource.setName(`${parameters.StreamName}` || 'Kinesis');
switch (operation) {
case 'putRecord':
eventInterface.addToMetadata(event, {
partition_key: `${parameters.PartitionKey}`,
}, {
data: `${parameters.Data}`,
});
break;
case 'putRecords':
eventInterface.addToMetadata(event, {
total_record_count: `${parameters.Records.length}`,
}, {
data: JSON.stringify(parameters.Records),
});
break;
default:
break;
}
},

Expand All @@ -112,18 +116,25 @@ const kinesisEventCreator = {
* @param {proto.event_pb.Event} event The event to update the data on
*/
responseHandler(response, event) {
let errorMessages = '';
let errorMessagesCount = 0;
switch (response.request.operation) {
case AWS_TYPES.kinesis.putRecord:
case 'putRecord':
eventInterface.addToMetadata(event, {
shard_id: `${response.data.ShardId}`,
sequence_number: `${response.data.SequenceNumber}`,
});
break;
case AWS_TYPES.kinesis.putRecords:
case 'putRecords':
if (response.data.FailedRecordCount && response.data.FailedRecordCount > 0) {
errorMessages = JSON.stringify(response.data.Records
.map(item => item.ErrorMessage));
errorMessagesCount = response.data.FailedRecordCount;
}
eventInterface.addToMetadata(event, {
failed_record_count: `${response.data.FailedRecordCount}`,
shard_id: `${response.data.Records[0].ShardId}`,
sequence_number: `${response.data.Records[0].SequenceNumber}`,
total_record_count: `${response.data.Records.length}`,
failed_record_count: `${errorMessagesCount}`,
kinesis_error_messages: errorMessages,
});
break;
default:
Expand Down
1 change: 1 addition & 0 deletions src/triggers/aws_lambda.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ function createKinesisTrigger(event, trigger) {
invoke_identity: event.Records[0].invokeIdentityArn,
sequence_number: event.Records[0].kinesis.sequenceNumber,
partition_key: event.Records[0].kinesis.partitionKey,
total_record_count: event.Records.length,
});
}

Expand Down

0 comments on commit 8291b78

Please sign in to comment.