Skip to content

Commit

Permalink
Bump common-streams to 0.8.0-M6
Browse files Browse the repository at this point in the history
Compared to common-streams 0.8.0-M2, this version adds:

- Re-implemented Kinesis source without fs2-kinesis
- Pubsub source opens more transport channels when necessary
- A new default webhook heartbeat period of 5 minutes
- Http4s Client with configuration appropriate for common-streams apps

Other changes to common-streams are not relevant to the Snowflake loader,
so they are not mentioned here.
  • Loading branch information
istreeter committed Oct 21, 2024
1 parent 2920f55 commit 5d5c5b2
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 44 deletions.
7 changes: 7 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@
}
}

# -- Configuration of the internal HTTP client used for alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

# -- Optional, configure telemetry
# -- All the fields are optional
"telemetry": {
Expand Down
11 changes: 7 additions & 4 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
"maxRecords": 1000
}

# -- The number of batches of events which are pre-fetched from kinesis.
# -- Increasing this above 1 is not known to improve performance.
"bufferSize": 1

}

"output": {
Expand Down Expand Up @@ -170,6 +166,13 @@
}
}

# -- Configuration of the internal HTTP client used for alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

# -- Optional, configure telemetry
# -- All the fields are optional
"telemetry": {
Expand Down
11 changes: 11 additions & 0 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
# -- The actual value used is guided by runtime statistics collected by the pubsub client library.
"minDurationPerAckExtension": "60 seconds"
"maxDurationPerAckExtension": "600 seconds"

# -- The maximum number of streaming pulls we allow on a single gRPC transport channel before opening another channel.
# -- This advanced setting is only relevant on extremely large VMs, or with a high value of `parallelPullCount`.
"maxPullsPerTransportChannel": 16
}

"output": {
Expand Down Expand Up @@ -151,6 +155,13 @@
}
}

# -- Configuration of the internal HTTP client used for alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

# -- Optional, configure telemetry
# -- All the fields are optional
"telemetry": {
Expand Down
4 changes: 4 additions & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@
}

"telemetry": ${snowplow.defaults.telemetry}

"http": {
"client": ${snowplow.defaults.http.client}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDeco

import scala.concurrent.duration.FiniteDuration
import scala.util.Try
import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{HttpClient, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._

case class Config[+Source, +Sink](
Expand All @@ -32,7 +32,8 @@ case class Config[+Source, +Sink](
retries: Config.Retries,
skipSchemas: List[SchemaCriterion],
telemetry: Telemetry.Config,
monitoring: Config.Monitoring
monitoring: Config.Monitoring,
http: Config.Http
)

object Config {
Expand Down Expand Up @@ -99,6 +100,8 @@ object Config {
transientErrors: Retrying.Config.ForTransient
)

/** Settings for the internal HTTP client; the reference config describes it as used for alerts and telemetry. */
case class Http(client: HttpClient.Config)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
implicit val configuration = Configuration.default.withDiscriminator("type")
implicit val urlDecoder = Decoder.decodeString.emapTry { str =>
Expand All @@ -125,6 +128,7 @@ object Config {
implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
implicit val httpDecoder = deriveConfiguredDecoder[Http]
deriveConfiguredDecoder[Config[Source, Sink]]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.implicits._
import cats.effect.unsafe.implicits.global
import cats.effect.{Async, Resource}
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, HttpClient, Webhook}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManager}
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client

case class Environment[F[_]](
Expand Down Expand Up @@ -48,7 +46,7 @@ object Environment {
sourceAndAck <- Resource.eval(toSource(config.input))
sourceReporter = sourceAndAck.isHealthy(config.monitoring.healthProbe.unhealthyLatency).map(_.showIfUnhealthy)
appHealth <- Resource.eval(AppHealth.init[F, Alert, RuntimeService](List(sourceReporter)))
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
httpClient <- HttpClient.resource[F](config.http.client)
_ <- HealthProbe.resource(config.monitoring.healthProbe.port, appHealth)
_ <- Webhook.resource(config.monitoring.webhook, appInfo, httpClient, appHealth)
badSink <- toSink(config.output.bad.sink).onError(_ => Resource.eval(appHealth.beUnhealthyForRuntimeService(RuntimeService.BadSink)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import cats.effect.{ExitCode, IO}
import com.comcast.ip4s.Port
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.kafka.KafkaSinkConfig
import com.snowplowanalytics.snowplow.snowflake.Config.Snowflake
import com.snowplowanalytics.snowplow.sources.kafka.KafkaSourceConfig
Expand Down Expand Up @@ -130,8 +130,9 @@ object KafkaConfigSpec {
metrics = Config.Metrics(None),
sentry = None,
healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes),
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 60.minutes)
)
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 5.minutes)
),
http = Config.Http(HttpClient.Config(maxConnectionsPerServer = 4))
)

/**
Expand Down Expand Up @@ -229,6 +230,7 @@ object KafkaConfigSpec {
),
webhook =
Webhook.Config(endpoint = Some(uri"https://webhook.acme.com"), tags = Map("pipeline" -> "production"), heartbeat = 60.minutes)
)
),
http = Config.Http(HttpClient.Config(maxConnectionsPerServer = 4))
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import cats.effect.{ExitCode, IO}
import com.comcast.ip4s.Port
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig}
import com.snowplowanalytics.snowplow.snowflake.Config.Snowflake
import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig
import eu.timepit.refined.types.all.PosInt
import org.http4s.implicits.http4sLiteralsSyntax
import org.specs2.Specification

Expand Down Expand Up @@ -67,7 +66,6 @@ object KinesisConfigSpec {
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
Expand Down Expand Up @@ -128,8 +126,9 @@ object KinesisConfigSpec {
metrics = Config.Metrics(None),
sentry = None,
healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes),
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 60.minutes)
)
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 5.minutes)
),
http = Config.Http(HttpClient.Config(maxConnectionsPerServer = 4))
)

/**
Expand All @@ -142,7 +141,6 @@ object KinesisConfigSpec {
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
Expand Down Expand Up @@ -223,6 +221,7 @@ object KinesisConfigSpec {
),
webhook =
Webhook.Config(endpoint = Some(uri"https://webhook.acme.com"), tags = Map("pipeline" -> "production"), heartbeat = 60.minutes)
)
),
http = Config.Http(HttpClient.Config(maxConnectionsPerServer = 4))
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.comcast.ip4s.Port
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.pubsub.PubsubSinkConfig
import com.snowplowanalytics.snowplow.snowflake.Config.Snowflake
import com.snowplowanalytics.snowplow.sources.pubsub.PubsubSourceConfig
Expand Down Expand Up @@ -62,14 +62,15 @@ class PubsubConfigSpec extends Specification with CatsEffect {
object PubsubConfigSpec {
private val minimalConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
input = PubsubSourceConfig(
subscription = PubsubSourceConfig.Subscription("myproject", "snowplow-enriched"),
parallelPullFactor = 0.5,
bufferMaxBytes = 10000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = GcpUserAgent("Snowplow OSS", "snowflake-loader"),
shutdownTimeout = 30.seconds
subscription = PubsubSourceConfig.Subscription("myproject", "snowplow-enriched"),
parallelPullFactor = 0.5,
bufferMaxBytes = 10000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = GcpUserAgent("Snowplow OSS", "snowflake-loader"),
shutdownTimeout = 30.seconds,
maxPullsPerTransportChannel = 16
),
output = Config.Output(
good = Config.Snowflake(
Expand Down Expand Up @@ -125,23 +126,25 @@ object PubsubConfigSpec {
metrics = Config.Metrics(None),
sentry = None,
healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes),
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 60.minutes)
)
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 5.minutes)
),
http = Config.Http(HttpClient.Config(maxConnectionsPerServer = 4))
)

/**
* Environment variables for Snowflake private key and passphrase are set in BuildSettings.scala
*/
private val extendedConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
input = PubsubSourceConfig(
subscription = PubsubSourceConfig.Subscription("myproject", "snowplow-enriched"),
parallelPullFactor = 0.5,
bufferMaxBytes = 1000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = GcpUserAgent("Snowplow OSS", "snowflake-loader"),
shutdownTimeout = 30.seconds
subscription = PubsubSourceConfig.Subscription("myproject", "snowplow-enriched"),
parallelPullFactor = 0.5,
bufferMaxBytes = 1000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = GcpUserAgent("Snowplow OSS", "snowflake-loader"),
shutdownTimeout = 30.seconds,
maxPullsPerTransportChannel = 16
),
output = Config.Output(
good = Config.Snowflake(
Expand Down Expand Up @@ -217,6 +220,7 @@ object PubsubConfigSpec {
),
webhook =
Webhook.Config(endpoint = Some(uri"https://webhook.acme.com"), tags = Map("pipeline" -> "production"), heartbeat = 60.minutes)
)
),
http = Config.Http(HttpClient.Config(maxConnectionsPerServer = 4))
)
}
4 changes: 1 addition & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ object Dependencies {
val protobuf = "3.25.5" // Version override

// Snowplow
val streams = "0.8.0-M2"
val streams = "0.8.0-M6"

// tests
val specs2 = "4.20.0"
val catsEffectSpecs2 = "1.5.0"

}

val blazeClient = "org.http4s" %% "http4s-blaze-client" % V.http4s
val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4s
val decline = "com.monovore" %% "decline-effect" % V.decline
val circeGenericExtra = "io.circe" %% "circe-generic-extras" % V.circe
Expand Down Expand Up @@ -81,7 +80,6 @@ object Dependencies {
streamsCore,
loaders,
runtime,
blazeClient,
http4sCirce,
decline,
sentry,
Expand Down

0 comments on commit 5d5c5b2

Please sign in to comment.