Merged
23 commits
a6ac6d6
Fix bad rows resizing (#68)
istreeter Jul 12, 2024
8073c8a
Extend health probe to report unhealthy on more error scenarios (#69)
istreeter Jul 12, 2024
ba90853
Allow disregarding Iglu field's nullability when creating output colu…
istreeter Jul 15, 2024
fbce127
Bump hudi to 0.15.0 (#70)
istreeter Jul 18, 2024
056d9a8
Bump aws-hudi to 1.0.0-beta2 (#71)
istreeter Jul 22, 2024
30e9e2a
Implement alerting and retrying mechanisms
spenes Jul 22, 2024
c2c088a
Add alert & retry for delta/s3 initialization (#74)
oguzhanunlu Aug 1, 2024
cd920c4
Hudi loader should fail early if missing permissions on Glue catalog …
istreeter Aug 2, 2024
8feb154
Make alert messages more human-readable (#75)
istreeter Aug 5, 2024
2fe599a
Iceberg fail fast if missing permissions on the catalog (#76)
istreeter Aug 7, 2024
b67cb86
Create table concurrently with subscribing to stream of events (#77)
istreeter Aug 8, 2024
89b0647
common-streams 0.8.x with refactored health monitoring (#78)
istreeter Aug 30, 2024
e186cf5
Add option to exit on missing Iglu schemas (#79)
istreeter Sep 5, 2024
972e471
Avoid error on duplicate view name (#80)
istreeter Sep 9, 2024
ebb087e
Upgrade common-streams 0.8.0-M4 (#81)
istreeter Sep 20, 2024
cd32b9b
Delete files asynchronously (#82)
istreeter Sep 20, 2024
dc7e9a7
Upgrade common-streams to 0.8.0-M5 (#83)
istreeter Oct 6, 2024
6fefeef
PDP-1324 - Handle initialization errors for lake loader Delta Azure
benjben Oct 17, 2024
159afd6
PDP-1323 Handle initialization errors for GCP
benjben Oct 24, 2024
e39d411
DynamoDB runtime dependencies for Delta S3 multi-writer
istreeter Oct 22, 2024
3952d13
Upgrade dependencies (#92)
oguzhanunlu Oct 31, 2024
c598236
Disable asynchronous deletes for Hudi (#94)
istreeter Oct 31, 2024
0b2d505
Prepare for 0.5.0 release
oguzhanunlu Oct 31, 2024
25 changes: 25 additions & 0 deletions CHANGELOG
@@ -1,3 +1,28 @@
Version 0.5.0 (2024-11-01)
--------------------------
Disable asynchronous deletes for Hudi (#94)
Upgrade dependencies (#92)
DynamoDB runtime dependencies for Delta S3 multi-writer
PDP-1323 Handle initialization errors for GCP
PDP-1324 - Handle initialization errors for lake loader Delta Azure
Upgrade common-streams to 0.8.0-M5 (#83)
Delete files asynchronously (#82)
Upgrade common-streams 0.8.0-M4 (#81)
Avoid error on duplicate view name (#80)
Add option to exit on missing Iglu schemas (#79)
common-streams 0.8.x with refactored health monitoring (#78)
Create table concurrently with subscribing to stream of events (#77)
Iceberg fail fast if missing permissions on the catalog (#76)
Make alert messages more human-readable (#75)
Hudi loader should fail early if missing permissions on Glue catalog (#72)
Add alert & retry for delta/s3 initialization (#74)
Implement alerting and retrying mechanisms
Bump aws-hudi to 1.0.0-beta2 (#71)
Bump hudi to 0.15.0 (#70)
Allow disregarding Iglu field's nullability when creating output columns (#66)
Extend health probe to report unhealthy on more error scenarios (#69)
Fix bad rows resizing (#68)

Version 0.4.1 (2024-06-04)
--------------------------
Fix missing s3 dependency for Delta
10 changes: 5 additions & 5 deletions README.md
@@ -8,7 +8,7 @@

This project contains applications required to load Snowplow data into Open Table Formats.

Lake Loader 0.4.1 supports [Delta](https://docs.delta.io/latest/index.html), [Iceberg](https://iceberg.apache.org/docs/latest/) and [Hudi](https://hudi.apache.org/docs/overview/) as output formats.
Lake Loader 0.5.0 supports [Delta](https://docs.delta.io/latest/index.html), [Iceberg](https://iceberg.apache.org/docs/latest/) and [Hudi](https://hudi.apache.org/docs/overview/) as output formats.

Check out [the example config files](./config) for how to configure your lake loader.

@@ -22,7 +22,7 @@ Basic usage:
docker run \
-v /path/to/config.hocon:/var/config.hocon \
-v /path/to/iglu.json:/var/iglu.json \
snowplow/lake-loader-azure:0.4.1 \
snowplow/lake-loader-azure:0.5.0 \
--config /var/config.hocon \
--iglu-config /var/iglu.json
```
@@ -35,7 +35,7 @@ The GCP lake loader reads the stream of enriched events from Pubsub and writes t
docker run \
-v /path/to/config.hocon:/var/config.hocon \
-v /path/to/iglu.json:/var/iglu.json \
snowplow/lake-loader-gcp:0.4.1 \
snowplow/lake-loader-gcp:0.5.0 \
--config /var/config.hocon \
--iglu-config /var/iglu.json
```
@@ -48,7 +48,7 @@ The AWS lake loader reads the stream of enriched events from Kinesis and writes t
docker run \
-v /path/to/config.hocon:/var/config.hocon \
-v /path/to/iglu.json:/var/iglu.json \
snowplow/lake-loader-aws:0.4.1 \
snowplow/lake-loader-aws:0.5.0 \
--config /var/config.hocon \
--iglu-config /var/iglu.json
```
@@ -79,7 +79,7 @@ Licensed under the [Snowplow Limited Use License Agreement][license]. _(If you a
[build-image]: https://github.com/snowplow-incubator/snowplow-lake-loader/workflows/CI/badge.svg
[build]: https://github.com/snowplow-incubator/snowplow-lake-loader/actions/workflows/ci.yml

[release-image]: https://img.shields.io/badge/release-0.4.1-blue.svg?style=flat
[release-image]: https://img.shields.io/badge/release-0.5.0-blue.svg?style=flat
[releases]: https://github.com/snowplow-incubator/snowplow-lake-loader/releases

[license]: https://docs.snowplow.io/limited-use-license-1.0
25 changes: 15 additions & 10 deletions build.sbt
@@ -32,25 +32,25 @@ lazy val core: Project = project
lazy val azure: Project = project
.in(file("modules/azure"))
.settings(BuildSettings.azureSettings)
.settings(libraryDependencies ++= Dependencies.azureDependencies ++ Dependencies.spark35RuntimeDependencies)
.dependsOn(core)
.settings(libraryDependencies ++= Dependencies.azureDependencies ++ Dependencies.icebergDeltaRuntimeDependencies)
.dependsOn(core, deltaIceberg)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)

lazy val gcp: Project = project
.in(file("modules/gcp"))
.settings(BuildSettings.gcpSettings)
.settings(libraryDependencies ++= Dependencies.gcpDependencies ++ Dependencies.spark35RuntimeDependencies)
.dependsOn(core)
.settings(libraryDependencies ++= Dependencies.gcpDependencies ++ Dependencies.icebergDeltaRuntimeDependencies)
.dependsOn(core, deltaIceberg)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)

lazy val aws: Project = project
.in(file("modules/aws"))
.settings(BuildSettings.awsSettings)
.settings(libraryDependencies ++= Dependencies.awsDependencies ++ Dependencies.spark35RuntimeDependencies)
.dependsOn(core)
.settings(libraryDependencies ++= Dependencies.awsDependencies ++ Dependencies.icebergDeltaRuntimeDependencies)
.dependsOn(core, deltaIceberg)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)

/** Packaging: Extra runtime dependencies for alternative assets * */
/** Packaging: Extra runtime dependencies for alternative assets */

lazy val hudi: Project = project
.in(file("packaging/hudi"))
@@ -64,6 +64,11 @@ lazy val biglake: Project = project
.settings(BuildSettings.commonSettings ++ BuildSettings.biglakeSettings)
.settings(libraryDependencies ++= Dependencies.biglakeDependencies)

lazy val deltaIceberg: Project = project
.in(file("packaging/delta-iceberg"))
.settings(BuildSettings.commonSettings)
.settings(libraryDependencies ++= Dependencies.icebergDeltaRuntimeDependencies)

/**
* Packaging: Alternative assets
*
@@ -79,7 +84,7 @@ lazy val awsHudi: Project = project
.settings(libraryDependencies ++= Dependencies.awsDependencies ++ Dependencies.hudiAwsDependencies)
.dependsOn(core)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)
.dependsOn(hudi % "runtime->runtime")
.dependsOn(hudi % "runtime->runtime;compile->compile")

lazy val gcpHudi: Project = project
.in(file("modules/gcp"))
@@ -89,7 +94,7 @@ lazy val gcpHudi: Project = project
.settings(libraryDependencies ++= Dependencies.gcpDependencies)
.dependsOn(core)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)
.dependsOn(hudi % "runtime->runtime")
.dependsOn(hudi % "runtime->runtime;compile->compile")

lazy val azureHudi: Project = project
.in(file("modules/azure"))
@@ -99,7 +104,7 @@ lazy val azureHudi: Project = project
.settings(libraryDependencies ++= Dependencies.azureDependencies)
.dependsOn(core)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)
.dependsOn(hudi % "runtime->runtime")
.dependsOn(hudi % "runtime->runtime;compile->compile")

lazy val gcpBiglake: Project = gcp
.withId("gcpBiglake")
53 changes: 49 additions & 4 deletions config/config.aws.reference.hocon
@@ -34,10 +34,6 @@
"maxRecords": 1000
}

# -- The number of batches of events which are pre-fetched from kinesis.
# -- Increasing this above 1 is not known to improve performance.
"bufferSize": 3

# -- Name of this KCL worker used in the dynamodb lease table
"workerIdentifier": ${HOSTNAME}

@@ -181,6 +177,24 @@
"writerParallelismFraction": 0.5
}

# Retry configuration for lake operation failures
"retries": {

# -- Configures exponential backoff on errors related to how the lake is set up for this loader.
# -- Examples include authentication errors and permissions errors.
# -- This class of errors is reported periodically to the monitoring webhook.
"setupErrors": {
"delay": "30 seconds"
}

# -- Configures exponential backoff on errors that are likely to be transient.
# -- Examples include server errors and network errors
"transientErrors": {
"delay": "1 second"
"attempts": 5
}
}

# -- Schemas that won't be loaded to the lake. Optional, default value []
"skipSchemas": [
"iglu:com.acme/skipped1/jsonschema/1-0-0"
@@ -189,6 +203,25 @@
"iglu:com.acme/skipped4/jsonschema/*-*-*"
]

# -- Whether the loader should crash and exit if it fails to resolve an Iglu Schema.
# -- We recommend `true` because Snowplow enriched events have already passed validation, so a missing schema normally
# -- indicates an error that needs addressing.
# -- Change to `false` so events go to the failed events stream instead of crashing the loader.
"exitOnMissingIgluSchema": true

# -- Whether the output parquet files should declare nested fields as non-nullable according to the Iglu schema.
# -- When true (default), nested fields are nullable only if they are not required fields according to the Iglu schema.
# -- When false, all nested fields are defined as nullable in the output table's schemas
# -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
"respectIgluNullability": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

@@ -225,6 +258,18 @@
}
}

# -- Report alerts and heartbeats to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
# How often to send the heartbeat event
"heartbeat": "60.minutes"
}

# -- Open a HTTP server that returns OK only if the app is healthy
"healthProbe": {
"port": 8000
49 changes: 49 additions & 0 deletions config/config.azure.reference.hocon
@@ -152,6 +152,24 @@
"writerParallelismFraction": 0.5
}

# Retry configuration for lake operation failures
"retries": {

# -- Configures exponential backoff on errors related to how the lake is set up for this loader.
# -- Examples include authentication errors and permissions errors.
# -- This class of errors is reported periodically to the monitoring webhook.
"setupErrors": {
"delay": "30 seconds"
}

# -- Configures exponential backoff on errors that are likely to be transient.
# -- Examples include server errors and network errors
"transientErrors": {
"delay": "1 second"
"attempts": 5
}
}

# -- Schemas that won't be loaded to the lake. Optional, default value []
"skipSchemas": [
"iglu:com.acme/skipped1/jsonschema/1-0-0"
@@ -160,6 +178,25 @@
"iglu:com.acme/skipped4/jsonschema/*-*-*"
]

# -- Whether the loader should crash and exit if it fails to resolve an Iglu Schema.
# -- We recommend `true` because Snowplow enriched events have already passed validation, so a missing schema normally
# -- indicates an error that needs addressing.
# -- Change to `false` so events go to the failed events stream instead of crashing the loader.
"exitOnMissingIgluSchema": true

# -- Whether the output parquet files should declare nested fields as non-nullable according to the Iglu schema.
# -- When true (default), nested fields are nullable only if they are not required fields according to the Iglu schema.
# -- When false, all nested fields are defined as nullable in the output table's schemas
# -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
"respectIgluNullability": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

@@ -196,6 +233,18 @@
}
}

# -- Report alerts and heartbeats to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
# How often to send the heartbeat event
"heartbeat": "60.minutes"
}

# -- Open a HTTP server that returns OK only if the app is healthy
"healthProbe": {
"port": 8000
49 changes: 49 additions & 0 deletions config/config.gcp.reference.hocon
@@ -160,6 +160,24 @@
"writerParallelismFraction": 0.5
}

# Retry configuration for lake operation failures
"retries": {

# -- Configures exponential backoff on errors related to how the lake is set up for this loader.
# -- Examples include authentication errors and permissions errors.
# -- This class of errors is reported periodically to the monitoring webhook.
"setupErrors": {
"delay": "30 seconds"
}

# -- Configures exponential backoff on errors that are likely to be transient.
# -- Examples include server errors and network errors
"transientErrors": {
"delay": "1 second"
"attempts": 5
}
}

# -- Schemas that won't be loaded to the lake. Optional, default value []
"skipSchemas": [
"iglu:com.acme/skipped1/jsonschema/1-0-0"
@@ -168,6 +186,25 @@
"iglu:com.acme/skipped4/jsonschema/*-*-*"
]

# -- Whether the loader should crash and exit if it fails to resolve an Iglu Schema.
# -- We recommend `true` because Snowplow enriched events have already passed validation, so a missing schema normally
# -- indicates an error that needs addressing.
# -- Change to `false` so events go to the failed events stream instead of crashing the loader.
"exitOnMissingIgluSchema": true

# -- Whether the output parquet files should declare nested fields as non-nullable according to the Iglu schema.
# -- When true (default), nested fields are nullable only if they are not required fields according to the Iglu schema.
# -- When false, all nested fields are defined as nullable in the output table's schemas
# -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
"respectIgluNullability": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

@@ -204,6 +241,18 @@
}
}

# -- Report alerts and heartbeats to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
# How often to send the heartbeat event
"heartbeat": "60.minutes"
}

# -- Open a HTTP server that returns OK only if the app is healthy
"healthProbe": {
"port": 8000
2 changes: 1 addition & 1 deletion modules/aws/src/main/resources/application.conf
@@ -16,7 +16,7 @@
}
"spark": {
"conf": {
"fs.s3a.aws.credentials.provider": "com.snowplowanalytics.snowplow.lakes.AssumedRoleCredentialsProviderV1"
"fs.s3a.aws.credentials.provider": "com.snowplowanalytics.snowplow.lakes.AssumedRoleCredentialsProvider"
"fs.s3a.assumed.role.session.name": "snowplow-lake-loader"
"fs.s3a.assumed.role.session.duration": "1h"
}