
Merge pull request #8 from tomsmaddox/master
Configurable stream sources
IanMeyers committed Mar 21, 2016
2 parents a929426 + e576645 commit cb7c93e
Showing 7 changed files with 242 additions and 45 deletions.
29 changes: 22 additions & 7 deletions README.md
@@ -11,25 +11,40 @@ In order to effectively use this function, you should already have configured an

# Configuration

This AWS Lambda function uses either Tag information from the Amazon Kinesis Stream, or a naming convention, to determine which Delivery Stream to forward to. If Amazon Kinesis Streams are the source, the Delivery Stream can have any name, and Tags are used to specify the Delivery Stream target. To Tag the Stream for Amazon Kinesis Firehose delivery, simply run the ```tagKinesisStream.sh``` script:
This Lambda function can map stream sources to Kinesis Firehose Delivery Streams in a few different ways (listed in order of preference):
* Manually specified configuration (see [index.js:63](index.js#L59))
* A DynamoDB stream naming convention to determine which Delivery Stream to forward to
* A Kinesis Stream Tagging convention
* (Optionally) A default delivery stream.
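The preference order above can be sketched as a plain function. This is an illustrative sketch only — the function and parameter names here are not the module's actual API:

```
// Sketch of the resolution order described above (illustrative names only).
// manualMapping: the hand-edited deliveryStreamMapping object
// isDynamoDB:    whether the event source is a DynamoDB Stream
// tagValue:      value of the ForwardToFirehoseStream Kinesis tag, if any
// useDefault:    the USE_DEFAULT_DELIVERY_STREAMS switch
function resolveDeliveryStream(streamName, manualMapping, isDynamoDB, tagValue, useDefault) {
    if (manualMapping[streamName]) {
        return manualMapping[streamName];   // 1. manually specified configuration
    }
    if (isDynamoDB) {
        return streamName;                  // 2. DynamoDB table-name convention
    }
    if (tagValue) {
        return tagValue;                    // 3. Kinesis Stream Tagging convention
    }
    if (useDefault) {
        return manualMapping['DEFAULT'];    // 4. optional default delivery stream
    }
    throw new Error('No delivery stream mapping for ' + streamName);
}
```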

## Using the Default Delivery Stream
In order to make sure that data will always be accepted by a Kinesis Firehose Delivery Stream, this Lambda function can fall back to a default Delivery Stream if no manual configuration or other lookup produces a result.

This can be particularly helpful when developing and testing the integration of new data sources. In such cases you could use the Default Delivery Stream to forward data to an S3 bucket with a one day retention period as specified in an [S3 Lifecycle Policy](http://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html).

The Default Delivery Stream is enabled by default in the Lambda function; however, to use it there must be a Kinesis Firehose Delivery Stream with a matching name. You can use the [createDefaultDeliveryStream.sh](createDefaultDeliveryStream.sh) script to orchestrate its creation.

*Note: We recommend using default delivery streams only for non-production workloads. They can be disabled by setting ```USE_DEFAULT_DELIVERY_STREAMS = false``` (see [index.js:61](index.js#L61))*

## Specifying a Delivery Stream for a Kinesis Stream Source
If Amazon Kinesis Streams are the source, the Delivery Stream can be specified in configuration, or Tags can be used to specify the Delivery Stream target. To Tag the Stream for Amazon Kinesis Firehose delivery, simply run the ```tagKinesisStream.sh``` script:

```
tagStream.sh <My Kinesis Stream> <My Firehose Delivery Stream> <region>
where
<My Kinesis Stream> - The Amazon Kinesis Stream for which an event source has been created to the Forwarder Lambda function
<My Firehose Delivery Stream> - The Amazon Kinesis Firehose Delivery Stream which you've configured to deliver to the required destination
<region> - The region in which the Kinesis Stream & Firehose Delivery Stream have been created. Today only single region operation is permitted
```

This will add a new Stream Tag named ```ForwardToFirehoseStream``` on the Kinesis Stream with the value you supply. This is limited to delivery in the same region as the Kinesis Stream or DynamoDB table. You can run the script any time to update this value. To view the Tags configured on the Stream, simply run ```aws kinesis list-tags-for-stream --stream-name <My Kinesis Stream> --region <region>```
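Internally, the function scans the Tags returned for the stream and picks out the ```ForwardToFirehoseStream``` value. A minimal sketch of that lookup — the helper name is hypothetical, and the input is the ```Tags``` array shape returned by the Kinesis ```listTagsForStream``` API:

```
// Hypothetical helper: given a Tags array of { Key, Value } items (as
// returned by kinesis.listTagsForStream), return the delivery stream
// named by the ForwardToFirehoseStream tag, or undefined if absent.
function deliveryStreamFromTags(tags) {
    var target;
    tags.forEach(function (item) {
        if (item.Key === 'ForwardToFirehoseStream') {
            target = item.Value;
        }
    });
    return target;
}
```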

If you are using Amazon DynamoDB, then *the Firehose Delivery Stream must be the same name as the Amazon DynamoDB Table*.

Only single region deployments are supported today.
## Specifying a Delivery Stream for a DynamoDB Stream Source
If you are using Amazon DynamoDB, then either manual configuration can be used, or the Firehose Delivery Stream must have the same name as the Amazon DynamoDB Table.
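Under this convention, the table name embedded in the DynamoDB Stream ARN doubles as the Delivery Stream name. A sketch of that extraction — the helper name and ARN value are illustrative, not the module's actual parsing code:

```
// Hypothetical helper: extract the table name from a DynamoDB Stream ARN,
// e.g. arn:aws:dynamodb:eu-west-1:123456789012:table/MyTable/stream/2016-03-21T00:00:00.000
// Under the naming convention, this is also the Firehose delivery stream name.
function tableNameFromDynamoStreamArn(arn) {
    var match = arn.match(/:table\/([^\/]+)\/stream\//);
    return match ? match[1] : undefined;
}
```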

# Deploying

To use this function, simply deploy the [LambdaStreamToFirehose-1.2.0.zip](https://github.com/awslabs/lambda-streams-to-firehose/blob/master/dist/LambdaStreamToFirehose-1.2.0.zip) to AWS Lambda with handler `index.handler`. You must ensure that it is deployed with an invocation role that includes the ability to write Amazon CloudWatch Logs, Read from Amazon Kinesis or Amazon DynamoDB Streams, and Write to Amazon Kinesis Firehose:
To use this function, simply deploy the [LambdaStreamToFirehose-1.3.0.zip](https://github.com/awslabs/lambda-streams-to-firehose/blob/master/dist/LambdaStreamToFirehose-1.3.0.zip) to AWS Lambda with handler `index.handler`. You must ensure that it is deployed with an invocation role that includes the ability to write Amazon CloudWatch Logs, Read from Amazon Kinesis or Amazon DynamoDB Streams, and Write to Amazon Kinesis Firehose:

```
{
@@ -92,7 +107,7 @@ To use this function, simply deploy the [LambdaStreamToFirehose-1.2.0.zip](https
}
```

You may choose to restrict the IAM role to be specific to a subset of Kinesis or DynamoDB Update Streams and Firehose endpoints.

Finally, create an Event Source (http://docs.aws.amazon.com/lambda/latest/dg/intro-core-components.html) for this function from the Stream to be forwarded to Firehose.

2 changes: 2 additions & 0 deletions build.sh
@@ -22,6 +22,8 @@ functionName=LambdaStreamToFirehose
filename=$functionName-$version.zip
region=eu-west-1

npm install

rm $filename 2>&1 >> /dev/null

zip -r $filename index.js package.json node_modules/ README.md LICENSE && mv -f $filename dist/$filename
117 changes: 117 additions & 0 deletions createDefaultDeliveryStream.sh
@@ -0,0 +1,117 @@
#!/bin/bash

# Kinesis Streams to Firehose
#
# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

region=eu-west-1

s3BucketNamePrefix="LambdaStreamsDefaultDeliveryBucket"
iamRoleName="LambdaStreamsDefaultDeliveryRole"
iamPolicyName="LambdaStreamsDefaultDeliveryPolicy"
deliveryStreamName="LambdaStreamsDefaultDeliveryStream"

which aws > /dev/null 2>&1

if [ $? != 0 ]; then
echo "This utility requires the AWS Cli, which can be installed using instructions found at http://docs.aws.amazon.com/cli/latest/userguide/installing.html"
exit 2
fi

randomChars=$(openssl rand -base64 8 | tr -dc 'a-zA-Z0-9')
suggestedBucketName="$s3BucketNamePrefix-$randomChars"
read -p "Please enter default stream destination bucket name [$suggestedBucketName]: " bucketName
bucketName=${bucketName:-$suggestedBucketName}

echo -n "Creating S3 Bucket: $bucketName.. "
aws s3 mb --region $region s3://$bucketName --output text
if [ $? -ne 0 ]; then
exit 1
fi
echo "OK"

echo -n "Creating IAM Role: $iamRoleName.. "
roleArn=$(aws iam create-role --query "Role.Arn" --output text \
--role-name $iamRoleName \
--assume-role-policy-document "{
\"Version\": \"2012-10-17\",
\"Statement\": [
{
\"Sid\": \"PermitFirehoseAccess\",
\"Effect\": \"Allow\",
\"Principal\": {
\"Service\": \"firehose.amazonaws.com\"
},
\"Action\": \"sts:AssumeRole\"
}
]
}")
if [ $? -ne 0 ]; then
aws s3api delete-bucket --region $region --bucket $bucketName
exit 1
fi
echo "OK"

echo -n "Creating IAM Policy: $iamPolicyName.. "
aws iam put-role-policy \
--role-name $iamRoleName \
--policy-name $iamPolicyName \
--policy-document "{
\"Version\": \"2012-10-17\",
\"Statement\": [
{
\"Sid\": \"PermitFirehoseUsage\",
\"Effect\": \"Allow\",
\"Action\": [
\"s3:AbortMultipartUpload\",
\"s3:GetBucketLocation\",
\"s3:GetObject\",
\"s3:ListBucket\",
\"s3:ListBucketMultipartUploads\",
\"s3:PutObject\"
],
\"Resource\": [
\"arn:aws:s3:::$bucketName\",
\"arn:aws:s3:::$bucketName/*\"
]
}
]
}"
if [ $? -ne 0 ]; then
aws iam delete-role --role-name $iamRoleName
aws s3api delete-bucket --region $region --bucket $bucketName
exit 1
fi
echo "OK"


echo "Waiting..."
sleep 30

echo -n "Creating Kinesis Firehose Delivery Stream: $deliveryStreamName with role arn $roleArn and bucket $bucketName.. "
deliveryStreamArn=$(aws firehose create-delivery-stream --region $region --query "DeliveryStreamARN" --output text \
--delivery-stream-name $deliveryStreamName \
--s3-destination-configuration "RoleARN=$roleArn,BucketARN=arn:aws:s3:::$bucketName")
if [ $? -ne 0 ]; then
aws iam delete-role-policy \
--role-name $iamRoleName \
--policy-name $iamPolicyName
aws iam delete-role --role-name $iamRoleName
aws s3api delete-bucket --region $region --bucket $bucketName
exit 1
fi
echo "OK"

echo "Delivery Stream ARN: $deliveryStreamArn"
Binary file added dist/LambdaStreamToFirehose-1.1.1.zip
Binary file not shown.
Binary file added dist/LambdaStreamToFirehose-1.3.0.zip
Binary file not shown.
135 changes: 99 additions & 36 deletions index.js
@@ -42,7 +42,23 @@ var FIREHOSE_MAX_BATCH_BYTES = 4 * 1024 * 1024;
// should KPL checksums be calculated?
var computeChecksums = true;

var deliveryStreamMapping = {};
/*
* If the source Kinesis Stream's tags or DynamoDB Stream Name don't resolve to
* an existing Firehose, allow usage of a default delivery stream, or fail with
* an error.
*/
var USE_DEFAULT_DELIVERY_STREAMS = true;
/*
* Delivery stream mappings can be specified here to overwrite values provided
* by Kinesis Stream tags or DynamoDB stream name. (Helpful for debugging)
* Format:
* DDBStreamName: deliveryStreamName
* Or:
* FORWARD_TO_FIREHOSE_STREAM tag value: deliveryStreamName
*/
var deliveryStreamMapping = {
'DEFAULT': 'LambdaStreamsDefaultDeliveryStream'
};

var start;

@@ -227,6 +243,9 @@ exports.handler = function(event, context) {
* record.
*/
exports.processTransformedRecords = function(transformed, streamName, deliveryStreamName) {
if (debug) {
console.log('Processing transformed records');
}
// get the set of batch offsets based on the transformed record sizes
var batches = exports.getBatchRanges(transformed);

@@ -270,6 +289,9 @@
* stream
*/
exports.writeToFirehose = function(firehoseBatch, streamName, deliveryStreamName, callback) {
if (debug) {
console.log('Writing to firehose');
}
// write the batch to firehose with putRecordBatch
var putRecordBatchParams = {
DeliveryStreamName : deliveryStreamName,
@@ -296,7 +318,10 @@
* requests to forward to Firehose
*/
exports.processEvent = function(event, serviceName, streamName) {
// look up the delivery stream name in the mapping cache
if (debug) {
console.log('Processing event');
}
// look up the delivery stream name in the mapping cache
var deliveryStreamName = deliveryStreamMapping[streamName];

if (debug) {
@@ -378,11 +403,19 @@
* specified Kinesis Stream Name, using Tags
*/
exports.buildDeliveryMap = function(streamName, serviceName, event, callback) {
if (serviceName === DDB_SERVICE_NAME) {
if (debug) {
console.log('Building delivery stream mapping');
}
if (deliveryStreamMapping[streamName]) {
// A delivery stream has already been specified in configuration
// This could be indicative of debug usage.
USE_DEFAULT_DELIVERY_STREAMS = false;
exports.verifyDeliveryStreamMapping(streamName, event, callback);
} else if (serviceName === DDB_SERVICE_NAME) {
// dynamodb streams need the firehose delivery stream to match
// the table name
deliveryStreamMapping[streamName] = streamName;
callback();
exports.verifyDeliveryStreamMapping(streamName, event, callback);
} else {
// get the delivery stream name from Kinesis tag
exports.kinesis.listTagsForStream({
@@ -395,45 +428,73 @@
// name item
data.Tags.map(function(item) {
if (item.Key === FORWARD_TO_FIREHOSE_STREAM) {
/* Disable fallback to a default delivery stream as
* a FORWARD_TO_FIREHOSE_STREAM has been specifically
* set.
*/
USE_DEFAULT_DELIVERY_STREAMS = false;
deliveryStreamMapping[streamName] = item.Value;
}
});

if (!deliveryStreamMapping[streamName]) {
// fail as the stream isn't tagged for delivery, but since
// there is an event source configured we think this should
// have been done and is probably a misconfiguration
finish(event, ERROR, "Warning: Kinesis Stream " + streamName + " not tagged for Firehose delivery with Tag name " + FORWARD_TO_FIREHOSE_STREAM);
} else {
// validate the delivery stream name provided
var params = {
DeliveryStreamName : deliveryStreamMapping[streamName]
};
exports.firehose.describeDeliveryStream(params, function(err, data) {
if (err) {
// do not continue with the cached mapping
var deliveryStream = deliveryStreamMapping[streamName];
delete deliveryStreamMapping[streamName];

finish(event, ERROR, "Delivery Stream " + deliveryStream + " does not exist in region " + setRegion);
} else {
// call the specified callback - should have
// already been prepared by the calling function
callback();
}
});
}
exports.verifyDeliveryStreamMapping(streamName, event, callback);
}
});
}

};

/*
* if (debug) { console.log(JSON.stringify(event)); }
*/
exports.verifyDeliveryStreamMapping = function(streamName, event, callback) {
if (debug) {
console.log('Verifying delivery stream');
}
if (!deliveryStreamMapping[streamName]) {
if (USE_DEFAULT_DELIVERY_STREAMS) {
/* No delivery stream has been specified, probably as it's not
* configured in stream tags. Using default delivery stream.
* To prevent accidental forwarding of streams to a firehose set
* USE_DEFAULT_DELIVERY_STREAMS = false.
*/
deliveryStreamMapping[streamName] = deliveryStreamMapping['DEFAULT'];
} else {
/*
* Fail as no delivery stream mapping has been specified and we
* have not configured to use a default.
* Kinesis Streams should be tagged with
* ForwardToFirehoseStream = <DeliveryStreamName>
*/
finish(event, ERROR, "Warning: Kinesis Stream " + streamName + " not tagged for Firehose delivery with Tag name " + FORWARD_TO_FIREHOSE_STREAM);
return;
}
}
// validate the delivery stream name provided
var params = {
DeliveryStreamName : deliveryStreamMapping[streamName]
};
exports.firehose.describeDeliveryStream(params, function(err, data) {
if (err) {
// do not continue with the cached mapping
delete deliveryStreamMapping[streamName];

if (!USE_DEFAULT_DELIVERY_STREAMS || deliveryStreamMapping[streamName] == deliveryStreamMapping['DEFAULT']) {
finish(event, ERROR, "Could not find suitable delivery stream for " + streamName + " and the " +
"default delivery stream (" + deliveryStreamMapping['DEFAULT'] + ") either doesn't exist or is disabled.");
} else {
deliveryStreamMapping[streamName] = deliveryStreamMapping['DEFAULT'];
exports.verifyDeliveryStreamMapping(streamName, event, callback);
}
} else {
// call the specified callback - should have already
// been prepared by the calling function
callback();
}
});
}

/** End Runtime Functions */
if (debug) {
console.log(JSON.stringify(event));
}

// fail the function if the wrong event source type is being sent, or if
// there is no data, etc
@@ -467,8 +528,10 @@
// parse the stream name out of the event
var streamName = exports.getStreamName(event.Records[0].eventSourceARN);

if (!streamName) {
finish(event, ERROR, "Malformed Kinesis Stream ARN");
if (!deliveryStreamMapping[streamName]) {
// no delivery stream cached so far, so add this stream's tag value
// to the delivery map, and continue with processEvent
exports.buildDeliveryMap(streamName, serviceName, event, exports.processEvent.bind(undefined, event, serviceName, streamName));
} else {

if (deliveryStreamMapping.length === 0 || !deliveryStreamMapping[streamName]) {
4 changes: 2 additions & 2 deletions package.json
@@ -1,7 +1,7 @@
{
"name": "lambda-stream-to-firehose",
"description": "An AWS Lambda function that forwards data from a Kinesis or DynamoDB Update Stream to a Kinesis Firehose Delivery Stream",
"version": "1.2.0",
"version": "1.3.0",
"dependencies": {
"async":"1.5.2",
"aws-kpl-deagg":"2.1.1"
@@ -28,4 +28,4 @@
"LICENSE",
"NOTICE.txt"
]
}
}
