Merge pull request #7 from snowplow-incubator/lake-loader-local
Lake loader local
miike authored Dec 9, 2024
2 parents ebed39b + 64575f8 commit 23324f3
Showing 3 changed files with 67 additions and 5 deletions.
8 changes: 7 additions & 1 deletion README.md
@@ -69,7 +69,7 @@ If you would like to (optionally) run the Snowflake streaming loader as well you
1. Configure your Snowflake private key and warehouse details in your `.env` file. You will need a [private key](https://docs.snowflake.com/en/user-guide/key-pair-auth) set up rather than a username / password as this is what the app uses for authentication.
2. Launch docker compose with the warehouse you would like:
* For the Snowflake streaming loader use `docker-compose --profile snowflake-loader up`, which will launch the normal components plus the Snowflake Kinesis loader.
* For the Lake loader use `--profile lake-loader`.
* For the Lake loader use `--profile lake-loader` (the Lake loader can load to a remote blob storage service, e.g., S3, or locally using Localstack; see the example after this list).
* For the BigQuery loader use `--profile bigquery-loader`.
3. Send some events!
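
For example, bringing up the pipeline with the Lake loader profile looks like the following (a minimal sketch; the other loader profiles work the same way):

```bash
# Launch the core pipeline plus the Lake loader (with this commit it writes to Localstack S3)
docker-compose --profile lake-loader up

# Or run it in the background
docker-compose --profile lake-loader up -d
```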

@@ -142,6 +142,12 @@ e.g., for BASH use
export SERVICE_ACCOUNT_CREDENTIALS=$(cat /path/to/your/service-account-key.json)
```

### Lake loader

The Lake loader can use a remote object store (e.g., AWS S3, GCS, or Azure Blob Storage), but it works equally well writing to Localstack S3. An example configuration can be found in `loaders/lake_loader_config_iceberg_s3.hocon`.

If you wish to load to a different (local) bucket, ensure that the resource is created in `init-aws.sh` before attempting to run the loader. Once loading has been set up, you can view the data that the Lake loader writes out in your browser at `https://snowplow-lake-loader.s3.localhost.localstack.cloud:4566/`, or the equivalent URL for your bucket.
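
A rough sketch of what that might look like, assuming `init-aws.sh` uses Localstack's `awslocal` CLI (as Localstack init scripts typically do) and a hypothetical bucket name `my-lake-bucket`:

```bash
# Hypothetical addition to init-aws.sh: create the target bucket in Localstack
awslocal s3 mb s3://my-lake-bucket

# Once the loader has run, list what it has written to the bucket
awslocal s3 ls s3://my-lake-bucket --recursive
```

The bucket name in the loader's `output.good.location` (and in the browser URL above) would need to change to match.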

## Incomplete events

Currently, incomplete events load into the same table as successful events. This is deliberate, but it can be overridden by specifying a different table in the `incomplete` loader HOCON configuration file.
11 changes: 7 additions & 4 deletions docker-compose.yml
@@ -17,7 +17,7 @@ services:

  stream-collector:
    container_name: "snowplow-stream-collector"
    image: snowplow/scala-stream-collector-kinesis:3.2.0-distroless
    image: snowplow/scala-stream-collector-kinesis:3.2.1-distroless
    command: ["--config", "/snowplow/collector/config.hocon"]
    depends_on:
      - localstack
@@ -221,19 +221,22 @@ services:
      - enrich
      - iglu-server
    volumes:
      - "./loaders/lake_loader_config.hocon:/loaders/lake_loader_config.hocon"
      - "./loaders/lake_loader_config_iceberg_s3.hocon:/loaders/lake_loader_config.hocon"
      - "./iglu-client:/snowplow/iglu-client"
    profiles: [lake-loader]
    environment:
      - "ACCEPT_LICENSE=${ACCEPT_LICENSE}"
      - "AWS_ENDPOINT_URL=http://localhost.localstack.cloud:4566"
      - "AWS_ENDPOINT_URL_S3=http://s3.localhost.localstack.cloud:4566"
      - "AWS_ACCOUNT_ID=000000000000"
      - "AWS_ACCESS_KEY_ID=localstack"
      - "AWS_SECRET_ACCESS_KEY=doesntmatter"
      - "AWS_REGION=ap-southeast-2"
      - "AWS_ENDPOINT_S3=http://localhost.localstack.cloud:4566"
      - "AWS_ENDPOINT_URL=http://localhost.localstack.cloud:4566"

    extra_hosts:
      - "localhost.localstack.cloud:host-gateway"
      - "s3.localhost.localstack.cloud:host-gateway"
      - "snowplow-lake-loader.s3.localhost.localstack.cloud:host-gateway"

  bigquery-loader:
    container_name: bigquery-loader
53 changes: 53 additions & 0 deletions loaders/lake_loader_config_iceberg_s3.hocon
@@ -0,0 +1,53 @@
{
  "license": {
    "accept": ${ACCEPT_LICENSE}
  },
  "input": {
    "streamName": "enriched-good"
    "appName": "lake-loader-iceberg"
    "customEndpoint": "http://localhost.localstack.cloud:4566"
    "initialPosition": {
      "type": "LATEST"
    }
    "retrievalMode": {
      "type": "FanOut"
    }
    "workerIdentifier": ${HOSTNAME}
    "leaseDuration": "10 seconds"
  }

  "output": {

    "good": {
      "type": "Iceberg",
      "database": "atomic"
      "table": "events"
      "location": "s3a://snowplow-lake-loader/events" # this can also be S3!
      "catalog": {
        "type": "Hadoop"
      }
    }

    "bad": {
      "streamName": "bad"
    }
  }

  "spark": {

    # -- How many times a Spark task should be retried in case of failure.
    "taskRetries": 3
    # -- Any valid spark configuration key/value.
    # -- This can be blank in most setups because the loader already sets sensible defaults.
    # https://github.com/apache/iceberg/commit/fa6403b1888847ce04de50c6b159dd43fdcb4590
    "conf": {
      "fs.s3a.endpoint": "http://s3.localhost.localstack.cloud:4566"
    }
  }

  "windowing": "15 seconds" # deliberately short period for frequent writes
  "inMemBatchBytes": 1000
  "telemetry": {
    "disable": true
  }
}
