Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump common-streams to 0.8.0-M6 #51

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@
}
}

# -- Configuration of internal http client used for alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

# -- Optional, configure telemetry
# -- All the fields are optional
"telemetry": {
Expand Down
11 changes: 7 additions & 4 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
"maxRecords": 1000
}

# -- The number of batches of events which are pre-fetched from kinesis.
# -- Increasing this above 1 is not known to improve performance.
"bufferSize": 1

}

"output": {
Expand Down Expand Up @@ -170,6 +166,13 @@
}
}

# -- Configuration of internal http client used for alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

# -- Optional, configure telemetry
# -- All the fields are optional
"telemetry": {
Expand Down
11 changes: 11 additions & 0 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
# -- The actual value used is guided by runtime statistics collected by the pubsub client library.
"minDurationPerAckExtension": "60 seconds"
"maxDurationPerAckExtension": "600 seconds"

# -- The maximum number of streaming pulls we allow on a single GRPC transport channel before opening another channel.
# -- This advanced setting is only relevant on extremely large VMs, or with a high value of `parallelPullCount`.
"maxPullsPerTransportChannel": 16
}

"output": {
Expand Down Expand Up @@ -151,6 +155,13 @@
}
}

# -- Configuration of internal http client used for alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

# -- Optional, configure telemetry
# -- All the fields are optional
"telemetry": {
Expand Down
4 changes: 4 additions & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@
}

"telemetry": ${snowplow.defaults.telemetry}

"http": {
"client": ${snowplow.defaults.http.client}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDeco

import scala.concurrent.duration.FiniteDuration
import scala.util.Try
import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{HttpClient, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._

case class Config[+Source, +Sink](
Expand All @@ -32,7 +32,8 @@ case class Config[+Source, +Sink](
retries: Config.Retries,
skipSchemas: List[SchemaCriterion],
telemetry: Telemetry.Config,
monitoring: Config.Monitoring
monitoring: Config.Monitoring,
http: Config.Http
)

object Config {
Expand Down Expand Up @@ -99,6 +100,8 @@ object Config {
transientErrors: Retrying.Config.ForTransient
)

case class Http(client: HttpClient.Config)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
implicit val configuration = Configuration.default.withDiscriminator("type")
implicit val urlDecoder = Decoder.decodeString.emapTry { str =>
Expand All @@ -125,6 +128,7 @@ object Config {
implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
implicit val httpDecoder = deriveConfiguredDecoder[Http]
deriveConfiguredDecoder[Config[Source, Sink]]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.implicits._
import cats.effect.unsafe.implicits.global
import cats.effect.{Async, Resource}
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, HttpClient, Webhook}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManager}
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client

case class Environment[F[_]](
Expand Down Expand Up @@ -48,7 +46,7 @@ object Environment {
sourceAndAck <- Resource.eval(toSource(config.input))
sourceReporter = sourceAndAck.isHealthy(config.monitoring.healthProbe.unhealthyLatency).map(_.showIfUnhealthy)
appHealth <- Resource.eval(AppHealth.init[F, Alert, RuntimeService](List(sourceReporter)))
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
httpClient <- HttpClient.resource[F](config.http.client)
_ <- HealthProbe.resource(config.monitoring.healthProbe.port, appHealth)
_ <- Webhook.resource(config.monitoring.webhook, appInfo, httpClient, appHealth)
badSink <- toSink(config.output.bad.sink).onError(_ => Resource.eval(appHealth.beUnhealthyForRuntimeService(RuntimeService.BadSink)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import cats.effect.{ExitCode, IO}
import com.comcast.ip4s.Port
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.kafka.KafkaSinkConfig
import com.snowplowanalytics.snowplow.snowflake.Config.Snowflake
import com.snowplowanalytics.snowplow.sources.kafka.KafkaSourceConfig
Expand Down Expand Up @@ -130,8 +130,9 @@ object KafkaConfigSpec {
metrics = Config.Metrics(None),
sentry = None,
healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes),
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 60.minutes)
)
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 5.minutes)
),
http = Config.Http(HttpClient.Config(maxConnectionsPerServer = 4))
)

/**
Expand Down Expand Up @@ -229,6 +230,7 @@ object KafkaConfigSpec {
),
webhook =
Webhook.Config(endpoint = Some(uri"https://webhook.acme.com"), tags = Map("pipeline" -> "production"), heartbeat = 60.minutes)
)
),
http = Config.Http(HttpClient.Config(maxConnectionsPerServer = 4))
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import cats.effect.{ExitCode, IO}
import com.comcast.ip4s.Port
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig}
import com.snowplowanalytics.snowplow.snowflake.Config.Snowflake
import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig
import eu.timepit.refined.types.all.PosInt
import org.http4s.implicits.http4sLiteralsSyntax
import org.specs2.Specification

Expand Down Expand Up @@ -67,7 +66,6 @@ object KinesisConfigSpec {
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
Expand Down Expand Up @@ -128,8 +126,9 @@ object KinesisConfigSpec {
metrics = Config.Metrics(None),
sentry = None,
healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes),
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 60.minutes)
)
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 5.minutes)
),
http = Config.Http(HttpClient.Config(maxConnectionsPerServer = 4))
)

/**
Expand All @@ -142,7 +141,6 @@ object KinesisConfigSpec {
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
Expand Down Expand Up @@ -223,6 +221,7 @@ object KinesisConfigSpec {
),
webhook =
Webhook.Config(endpoint = Some(uri"https://webhook.acme.com"), tags = Map("pipeline" -> "production"), heartbeat = 60.minutes)
)
),
http = Config.Http(HttpClient.Config(maxConnectionsPerServer = 4))
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.comcast.ip4s.Port
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.pubsub.PubsubSinkConfig
import com.snowplowanalytics.snowplow.snowflake.Config.Snowflake
import com.snowplowanalytics.snowplow.sources.pubsub.PubsubSourceConfig
Expand Down Expand Up @@ -62,14 +62,15 @@ class PubsubConfigSpec extends Specification with CatsEffect {
object PubsubConfigSpec {
private val minimalConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
input = PubsubSourceConfig(
subscription = PubsubSourceConfig.Subscription("myproject", "snowplow-enriched"),
parallelPullFactor = 0.5,
bufferMaxBytes = 10000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = GcpUserAgent("Snowplow OSS", "snowflake-loader"),
shutdownTimeout = 30.seconds
subscription = PubsubSourceConfig.Subscription("myproject", "snowplow-enriched"),
parallelPullFactor = 0.5,
bufferMaxBytes = 10000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = GcpUserAgent("Snowplow OSS", "snowflake-loader"),
shutdownTimeout = 30.seconds,
maxPullsPerTransportChannel = 16
),
output = Config.Output(
good = Config.Snowflake(
Expand Down Expand Up @@ -125,23 +126,25 @@ object PubsubConfigSpec {
metrics = Config.Metrics(None),
sentry = None,
healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes),
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 60.minutes)
)
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 5.minutes)
),
http = Config.Http(HttpClient.Config(maxConnectionsPerServer = 4))
)

/**
* Environment variables for Snowflake private key and passphrase are set in BuildSettings.scala
*/
private val extendedConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
input = PubsubSourceConfig(
subscription = PubsubSourceConfig.Subscription("myproject", "snowplow-enriched"),
parallelPullFactor = 0.5,
bufferMaxBytes = 1000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = GcpUserAgent("Snowplow OSS", "snowflake-loader"),
shutdownTimeout = 30.seconds
subscription = PubsubSourceConfig.Subscription("myproject", "snowplow-enriched"),
parallelPullFactor = 0.5,
bufferMaxBytes = 1000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = GcpUserAgent("Snowplow OSS", "snowflake-loader"),
shutdownTimeout = 30.seconds,
maxPullsPerTransportChannel = 16
),
output = Config.Output(
good = Config.Snowflake(
Expand Down Expand Up @@ -217,6 +220,7 @@ object PubsubConfigSpec {
),
webhook =
Webhook.Config(endpoint = Some(uri"https://webhook.acme.com"), tags = Map("pipeline" -> "production"), heartbeat = 60.minutes)
)
),
http = Config.Http(HttpClient.Config(maxConnectionsPerServer = 4))
)
}
4 changes: 1 addition & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ object Dependencies {
val protobuf = "3.25.5" // Version override

// Snowplow
val streams = "0.8.0-M2"
val streams = "0.8.0-M6"

// tests
val specs2 = "4.20.0"
val catsEffectSpecs2 = "1.5.0"

}

val blazeClient = "org.http4s" %% "http4s-blaze-client" % V.http4s
val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4s
val decline = "com.monovore" %% "decline-effect" % V.decline
val circeGenericExtra = "io.circe" %% "circe-generic-extras" % V.circe
Expand Down Expand Up @@ -81,7 +80,6 @@ object Dependencies {
streamsCore,
loaders,
runtime,
blazeClient,
http4sCirce,
decline,
sentry,
Expand Down
Loading