Setup Guide
Snowflake Loader consists of two independent applications:
- Snowplow Snowflake Transformer - Spark job responsible for transforming enriched events into a Snowflake-compatible format
- Snowplow Snowflake Loader - CLI application responsible for loading the Snowflake-compatible enriched events into the Snowflake DB
Both applications communicate through a DynamoDB table, called the processing manifest, which maintains the pipeline state.
While Snowflake Loader is not publicly available, the assets are called "strawberry-loader" and "strawberry-transformer" instead of "snowplow-snowflake-loader" and "snowplow-snowflake-transformer" respectively.
During setup we'll refer to the following configuration properties:
- $AWS_ACCESS_KEY_ID - AWS Access Key. Used to access DynamoDB and S3
- $AWS_SECRET_KEY - AWS Secret Key. Used to access DynamoDB and S3
- $AWS_REGION - AWS Region used by Transformer to access S3 and DynamoDB
- $DYNAMODB_MANIFEST - AWS DynamoDB table name with the processing manifest. Needs to be created manually
- $STAGE_NAME - Arbitrary name for the Snowflake Stage. The stage is created automatically during the setup step (e.g. snowplow_stage)
- $TRANSFORMER_OUTPUT - URL for the Snowflake stage. This is usually the S3 output dir of Snowflake Transformer (e.g. s3://com-acme-snowplow/snowflake)
- $SNOWFLAKE_REGION - AWS Region used by Snowflake to access its endpoint. Recommended to be the same as $AWS_REGION
- $SNOWFLAKE_USER - Snowflake username. Must be obtained from Snowflake
- $SNOWFLAKE_PASSWORD - Snowflake password. Must be obtained from Snowflake
- $SNOWFLAKE_ACCOUNT - Snowflake account name. Must be obtained from Snowflake
- $SNOWFLAKE_WAREHOUSE - Arbitrary name for the Snowflake Warehouse. The warehouse is created automatically during the setup step (e.g. snowplow_wh)
- $SNOWFLAKE_DB - Snowflake Database name. The database must be created manually
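For convenience, you could export these as shell environment variables before running the commands below (the values here are placeholders and examples, not real credentials):
$ export AWS_ACCESS_KEY_ID="..."                    # placeholder
$ export AWS_SECRET_KEY="..."                       # placeholder
$ export AWS_REGION="us-east-1"                     # example
$ export DYNAMODB_MANIFEST="snowflake-run-manifest" # example table name
$ export STAGE_NAME="snowplow_stage"
$ export TRANSFORMER_OUTPUT="s3://com-acme-snowplow/snowflake"
$ export SNOWFLAKE_REGION="us-east-1"
$ export SNOWFLAKE_USER="..."                       # placeholder
$ export SNOWFLAKE_PASSWORD="..."                   # placeholder
$ export SNOWFLAKE_ACCOUNT="..."                    # placeholder
$ export SNOWFLAKE_WAREHOUSE="snowplow_wh"
$ export SNOWFLAKE_DB="..."                         # placeholder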
The Transformer accepts some of the above options (all except the Snowflake-specific ones) plus $TRANSFORMER_INPUT, which is the enriched events archive.
Snowflake Loader provides a quick setup action that automatically creates the following entities:
- atomic database schema
- atomic.events table to store enriched events
- File format - entity to describe how Snowplow enriched data should be processed
- External Stage in the atomic schema - a reference to the S3 path that is the output of Snowplow Snowflake Transformer
- Virtual Warehouse - the computing entity of Snowflake; smallest (X-Small) by default
All of the above can safely keep their default settings. The warehouse can be scaled up manually.
The two things you need to create manually are the Snowflake database and the DynamoDB table. After the database is created, you can run the automatic setup.
To do this, use the setup CLI action of Snowflake Loader:
$ aws s3 cp s3://snowplow-hosted-assets/4-storage/strawberry-loader/strawberry-loader-0.2.0.jar .
$ java -jar strawberry-loader-0.2.0.jar \
setup \
--aws-access-key-id "${AWS_ACCESS_KEY_ID}" \
--aws-secret-access-key "${AWS_SECRET_KEY}" \
--aws-region "${AWS_REGION}" \
--manifest-table "${DYNAMODB_MANIFEST}" \
--snowflake-region "${SNOWFLAKE_REGION}" \
--stage-name "${STAGE_NAME}" \
--stage-url "${TRANSFORMER_OUTPUT}" \
--user "${SNOWFLAKE_USER}" \
--password "${SNOWFLAKE_PASSWORD}" \
--account "${SNOWFLAKE_ACCOUNT}" \
--warehouse "${SNOWFLAKE_WAREHOUSE}" \
--db "${SNOWFLAKE_DB}"
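Remember that the Snowflake database passed via --db must already exist; setup does not create it. A minimal sketch of creating it, assuming the SnowSQL CLI is installed and the password is supplied via SNOWSQL_PWD:
$ export SNOWSQL_PWD="$SNOWFLAKE_PASSWORD"
$ snowsql -a "$SNOWFLAKE_ACCOUNT" -u "$SNOWFLAKE_USER" \
    -q "CREATE DATABASE IF NOT EXISTS ${SNOWFLAKE_DB}"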
To use a DynamoDB table as the processing manifest, you need to create a table with a partition key RunId of string type. We refer to the table name as $DYNAMODB_MANIFEST.
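A minimal sketch of creating the manifest table with the AWS CLI; the provisioned throughput values are illustrative and should be adjusted to your volume:
$ aws dynamodb create-table \
    --table-name "$DYNAMODB_MANIFEST" \
    --attribute-definitions AttributeName=RunId,AttributeType=S \
    --key-schema AttributeName=RunId,KeyType=HASH \
    --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
    --region "$AWS_REGION"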
Snowplow data in Snowflake is stored in a single fat table called atomic.events. The schema can be optionally configured via the --schema option.
The initial atomic.events DDL for Snowflake can be found in atomic-def.sql.
Dataflow Runner is used to run the Snowplow Snowflake Transformer Spark job on an EMR cluster. (It could also run the loader, but then we would have to expose Snowflake credentials.)
The EMR cluster has a default configuration; only ec2.keyName and logUri must be changed. Everything else is optional. Edit and save the following as cluster.json:
{
"schema":"iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
"data":{
"name":"dataflow-runner - snowflake transformer",
"logUri":"s3://snowplow-snowflake-test/logs/",
"region":"us-east-1",
"credentials":{
"accessKeyId":"env",
"secretAccessKey":"env"
},
"roles":{
"jobflow":"EMR_EC2_DefaultRole",
"service":"EMR_DefaultRole"
},
"ec2":{
"amiVersion":"5.5.0",
"keyName":"key-name",
"location":{
"vpc":{
"subnetId":null
}
},
"instances":{
"master":{
"type":"m2.xlarge"
},
"core":{
"type":"m2.xlarge",
"count":1
},
"task":{
"type":"m1.medium",
"count":0,
"bid":"0.015"
}
}
},
"tags":[ ],
"bootstrapActionConfigs":[ ],
"configurations":[
{
"classification":"core-site",
"properties":{
"Io.file.buffer.size":"65536"
}
},
{
"classification":"mapred-site",
"properties":{
"Mapreduce.user.classpath.first":"true"
}
},
{
"classification":"yarn-site",
"properties":{
"yarn.resourcemanager.am.max-attempts":"1"
}
},
{
"classification":"spark",
"properties":{
"maximizeResourceAllocation":"true"
}
}
],
"applications":[ "Hadoop", "Spark" ]
}
}
Edit and save the following as playbook.json:
{
"schema":"iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
"data":{
"region":"{{.awsRegion}}",
"credentials":{
"accessKeyId":"env",
"secretAccessKey":"env"
},
"steps":[
{
"type":"CUSTOM_JAR",
"name":"Snowflake Transformer",
"actionOnFailure":"CANCEL_AND_WAIT",
"jar":"command-runner.jar",
"arguments":[
"spark-submit",
"--deploy-mode",
"cluster",
"--class",
"com.snowplowanalytics.snowflake.transformer.Main",
"s3://snowplow-hosted-assets/4-storage/strawberry-transformer/strawberry-transformer-0.2.0.jar",
"--input",
"{{.transformerInput}}",
"--output",
"{{.transformerOutput}}",
"--aws-access-key-id",
"{{.awsAccessKeyId}}",
"--aws-region",
"{{.awsRegion}}",
"--aws-secret-access-key",
"{{.awsSecretAccessKey}}",
"--manifest-table",
"{{.snowflakeManifest}}"
]
}
],
"tags":[ ]
}
}
To run the above configuration you can use the following command:
$ dataflow-runner run-transient \
    --emr-config cluster.json \
    --emr-playbook playbook.json \
    --vars transformerInput,$TRANSFORMER_INPUT,transformerOutput,$TRANSFORMER_OUTPUT,awsAccessKeyId,$AWS_ACCESS_KEY_ID,awsSecretAccessKey,$AWS_SECRET_KEY,snowflakeManifest,$DYNAMODB_MANIFEST,awsRegion,$AWS_REGION
This will start the Transformer on an EMR cluster, which will process the enriched data and populate the DynamoDB manifest.
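If you want to confirm that the manifest was populated before loading, you could scan the table (a quick sanity check, not part of the official workflow):
$ aws dynamodb scan --table-name "$DYNAMODB_MANIFEST" --region "$AWS_REGION" --max-items 10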
The next step is to run the loader, which will load all pre-processed data:
$ java -jar strawberry-loader-0.2.0.jar \
load \
--aws-access-key-id $AWS_ACCESS_KEY_ID \
--aws-secret-access-key $AWS_SECRET_KEY \
--aws-region $AWS_REGION \
--manifest-table $DYNAMODB_MANIFEST \
--snowflake-region $SNOWFLAKE_REGION \
--stage-name $STAGE_NAME \
--user $SNOWFLAKE_USER \
--password $SNOWFLAKE_PASSWORD \
--account $SNOWFLAKE_ACCOUNT \
--warehouse $SNOWFLAKE_WAREHOUSE \
--db $SNOWFLAKE_DB
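As an optional check after the load finishes, you could count the loaded rows with SnowSQL (same assumption as in the database-creation sketch above, with SNOWSQL_PWD exported):
$ snowsql -a "$SNOWFLAKE_ACCOUNT" -u "$SNOWFLAKE_USER" \
    -d "$SNOWFLAKE_DB" -w "$SNOWFLAKE_WAREHOUSE" \
    -q "SELECT count(*) FROM atomic.events"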
In order to pre-populate the manifest with run ids that should never be loaded, you can use the backfill.py script.
The script requires Python 3, Snowplow Python Analytics SDK 0.2.3+ and boto3:
$ pip install boto3 snowplow_analytics_sdk
$ wget https://raw.githubusercontent.com/snowplow-product/snowplow-snowflake-loader/release/0.2.0/backfill.py # Won't actually be downloaded as repository is private
The script accepts 6 required arguments. Note startdate: this is the date since which (inclusive) the transformer should process run ids:
$ ./backfill.py \
--startdate 2017-08-22-01-01-01 \
--region $AWS_REGION \
--manifest-table-name $DYNAMODB_MANIFEST \
--enriched-archive $TRANSFORMER_INPUT \
--aws-access-key-id=$AWS_ACCESS_KEY_ID \
--aws-secret-access-key=$AWS_SECRET_KEY
- To update the Snowflake infrastructure another setup must be launched - it will create a new file format, snowplow_enriched_json
- Both setup and load subcommands now accept the required --snowflake-region option