Skip to content

Commit

Permalink
Separate retry policies for transient errors and setup errors (close #24
Browse files Browse the repository at this point in the history
)
  • Loading branch information
istreeter authored Jan 11, 2024
1 parent c8ae9aa commit 5e71e79
Show file tree
Hide file tree
Showing 15 changed files with 513 additions and 65 deletions.
16 changes: 14 additions & 2 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,20 @@

# Retry configuration for Snowflake operation failures
"retries": {
# Starting backoff period
"backoff": "30 seconds"

# -- Configures exponential backoff on errors related to how Snowflake is set up for this loader.
# -- Examples include authentication errors and permissions errors.
# -- This class of errors are reported periodically to the monitoring webhook.
"setupErrors": {
"delay": "30 seconds"
}

# -- Configures exponential backoff errors that are likely to be transient.
# -- Examples include server errors and network errors
"transientErrors": {
"delay": "1 second"
"attempts": 5
}
}

# -- Schemas that won't be loaded to Snowflake. Optional, default value []
Expand Down
16 changes: 14 additions & 2 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,20 @@

# Retry configuration for Snowflake operation failures
"retries": {
# Starting backoff period
"backoff": "30 seconds"

# -- Configures exponential backoff on errors related to how Snowflake is set up for this loader.
# -- Examples include authentication errors and permissions errors.
# -- This class of errors are reported periodically to the monitoring webhook.
"setupErrors": {
"delay": "30 seconds"
}

# -- Configures exponential backoff errors that are likely to be transient.
# -- Examples include server errors and network errors
"transientErrors": {
"delay": "1 second"
"attempts": 5
}
}

# -- Schemas that won't be loaded to Snowflake. Optional, default value []
Expand Down
16 changes: 14 additions & 2 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,20 @@

# Retry configuration for Snowflake operation failures
"retries": {
# Starting backoff period
"backoff": "30 seconds"

# -- Configures exponential backoff on errors related to how Snowflake is set up for this loader.
# -- Examples include authentication errors and permissions errors.
# -- This class of errors are reported periodically to the monitoring webhook.
"setupErrors": {
"delay": "30 seconds"
}

# -- Configures exponential backoff errors that are likely to be transient.
# -- Examples include server errors and network errors
"transientErrors": {
"delay": "1 second"
"attempts": 5
}
}

# -- Schemas that won't be loaded to Snowflake. Optional, default value []
Expand Down
8 changes: 7 additions & 1 deletion modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@
}

"retries": {
"backoff": "30 seconds"
"setupErrors": {
"delay": "30 seconds"
}
"transientErrors": {
"delay": "1 second"
"attempts: 5
}
}

"skipSchemas": []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ object Alert {
final case class FailedToCreateEventsTable(cause: Throwable) extends Alert
final case class FailedToAddColumns(columns: List[String], cause: Throwable) extends Alert
final case class FailedToOpenSnowflakeChannel(cause: Throwable) extends Alert
final case class FailedToParsePrivateKey(cause: Throwable) extends Alert

def toSelfDescribingJson(
alert: Alert,
Expand All @@ -40,6 +41,7 @@ object Alert {
case FailedToCreateEventsTable(cause) => show"Failed to create events table: $cause"
case FailedToAddColumns(columns, cause) => show"Failed to add columns: ${columns.mkString("[", ",", "]")}. Cause: $cause"
case FailedToOpenSnowflakeChannel(cause) => show"Failed to open Snowflake channel: $cause"
case FailedToParsePrivateKey(cause) => show"Failed to parse private key: $cause"
}

full.take(MaxAlertPayloadLength)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,13 @@ object Config {

final case class Webhook(endpoint: Uri, tags: Map[String, String])

case class Retries(backoff: FiniteDuration)
case class SetupErrorRetries(delay: FiniteDuration)
case class TransientErrorRetries(delay: FiniteDuration, attempts: Int)

case class Retries(
setupErrors: SetupErrorRetries,
transientErrors: TransientErrorRetries
)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
implicit val configuration = Configuration.default.withDiscriminator("type")
Expand Down Expand Up @@ -116,6 +122,8 @@ object Config {
implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
implicit val webhookDecoder = deriveConfiguredDecoder[Webhook]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries]
implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
deriveConfiguredDecoder[Config[Source, Sink]]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object Environment {
badSink <- toSink(config.output.bad.sink)
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries, monitoring))
channelOpener <- Channel.opener(config.output.good, config.batching)
channelOpener <- Channel.opener(config.output.good, config.batching, config.retries, monitoring, appHealth)
channelProvider <- Channel.provider(channelOpener, config.retries, appHealth, monitoring)
} yield Environment(
appInfo = appInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ object Monitoring {
config match {
case Some(webhookConfig) =>
val request = buildHttpRequest(webhookConfig, message)
executeHttpRequest(webhookConfig, httpClient, request)
Logger[F].info(show"Sending alert to ${webhookConfig.endpoint} with details of the setup error...") *>
executeHttpRequest(webhookConfig, httpClient, request)
case None =>
Logger[F].debug("Webhook monitoring is not configured, skipping alert")
}
Expand All @@ -48,11 +49,13 @@ object Monitoring {
.use { response =>
if (response.status.isSuccess) Sync[F].unit
else {
response.as[String].flatMap(body => Logger[F].error(s"Webhook ${webhookConfig.endpoint} returned non-2xx response:\n$body"))
response
.as[String]
.flatMap(body => Logger[F].error(show"Webhook ${webhookConfig.endpoint} returned non-2xx response:\n$body"))
}
}
.handleErrorWith { e =>
Logger[F].error(e)(s"Webhook ${webhookConfig.endpoint} resulted in exception without a response")
Logger[F].error(e)(show"Webhook ${webhookConfig.endpoint} resulted in exception without a response")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,15 @@ object Channel {

}

def opener[F[_]: Async](config: Config.Snowflake, batchingConfig: Config.Batching): Resource[F, Opener[F]] =
def opener[F[_]: Async](
config: Config.Snowflake,
batchingConfig: Config.Batching,
retriesConfig: Config.Retries,
monitoring: Monitoring[F],
appHealth: AppHealth[F]
): Resource[F, Opener[F]] =
for {
client <- createClient(config, batchingConfig)
client <- createClient(config, batchingConfig, retriesConfig, monitoring, appHealth)
} yield new Opener[F] {
def open: F[CloseableChannel[F]] = createChannel[F](config, client).map(impl[F])
}
Expand All @@ -118,11 +124,8 @@ object Channel {
): Resource[F, Channel[F]] = {

def make(poll: Poll[F]) = poll {
SnowflakeRetrying.retryIndefinitely(health, retries) {
SnowflakeRetrying.withRetries(health, retries, monitoring, Alert.FailedToOpenSnowflakeChannel(_)) {
opener.open
.onError { cause =>
monitoring.alert(Alert.FailedToOpenSnowflakeChannel(cause))
}
}
}

Expand Down Expand Up @@ -208,18 +211,25 @@ object Channel {
props
}

private def createClient[F[_]: Sync](
private def createClient[F[_]: Async](
config: Config.Snowflake,
batchingConfig: Config.Batching
batchingConfig: Config.Batching,
retriesConfig: Config.Retries,
monitoring: Monitoring[F],
appHealth: AppHealth[F]
): Resource[F, SnowflakeStreamingIngestClient] = {
val make = Sync[F].delay {
SnowflakeStreamingIngestClientFactory
.builder("snowplow") // client name is not important
.setProperties(channelProperties(config, batchingConfig))
// .setParameterOverrides(Map.empty.asJava) // Not needed, as all params can also be set with Properties
.build
def make(poll: Poll[F]) = poll {
SnowflakeRetrying.withRetries(appHealth, retriesConfig, monitoring, Alert.FailedToOpenSnowflakeChannel(_)) {
Sync[F].blocking {
SnowflakeStreamingIngestClientFactory
.builder("snowplow") // client name is not important
.setProperties(channelProperties(config, batchingConfig))
// .setParameterOverrides(Map.empty.asJava) // Not needed, as all params can also be set with Properties
.build
}
}
}
Resource.fromAutoCloseable(make)
Resource.makeFull(make)(client => Sync[F].blocking(client.close()))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,59 @@
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.snowflake
package com.snowplowanalytics.snowplow.snowflake.processing

import cats.effect.{Async, Sync}
import cats.implicits._
import doobie.Transactor
import net.snowflake.ingest.utils.{Utils => SnowflakeSdkUtils}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.security.PrivateKey
import java.util.Properties

import com.snowplowanalytics.snowplow.snowflake.{Alert, AppHealth, Config, Monitoring}

object JdbcTransactor {

private val driver: String = "net.snowflake.client.jdbc.SnowflakeDriver"

def make[F[_]: Async](config: Config.Snowflake): F[Transactor[F]] =
private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

def make[F[_]: Async](
config: Config.Snowflake,
monitoring: Monitoring[F],
appHealth: AppHealth[F]
): F[Transactor[F]] =
for {
privateKey <- parsePrivateKey[F](config)
privateKey <- parsePrivateKey[F](config, monitoring, appHealth)
props = jdbcProperties(config, privateKey)
} yield Transactor.fromDriverManager[F](driver, config.url.getJdbcUrl, props, None)

private def parsePrivateKey[F[_]: Sync](config: Config.Snowflake): F[PrivateKey] =
Sync[F].delay { // Wrap in Sync because these can raise exceptions
config.privateKeyPassphrase match {
case Some(passphrase) =>
SnowflakeSdkUtils.parseEncryptedPrivateKey(config.privateKey, passphrase)
case None =>
SnowflakeSdkUtils.parsePrivateKey(config.privateKey)
private def parsePrivateKey[F[_]: Async](
config: Config.Snowflake,
monitoring: Monitoring[F],
appHealth: AppHealth[F]
): F[PrivateKey] =
Sync[F]
.delay { // Wrap in Sync because these can raise exceptions
config.privateKeyPassphrase match {
case Some(passphrase) =>
SnowflakeSdkUtils.parseEncryptedPrivateKey(config.privateKey, passphrase)
case None =>
SnowflakeSdkUtils.parsePrivateKey(config.privateKey)
}
}
.onError { e =>
Logger[F].error(e)("Could not parse the Snowflake private key. Will do nothing but wait for loader to be killed") *>
appHealth.setServiceHealth(AppHealth.Service.Snowflake, false) *>
// This is a type of "setup" error, so we send a monitoring alert
monitoring.alert(Alert.FailedToParsePrivateKey(e)) *>
// We don't want to crash and exit, because we don't want to spam Sentry with exceptions about setup errors.
// But there's no point in continuing or retrying. Instead we just block the fiber so the health probe appears unhealthy.
Async[F].never
}
}

private def jdbcProperties(config: Config.Snowflake, privateKey: PrivateKey): Properties = {
val props = new Properties()
Expand Down
Loading

0 comments on commit 5e71e79

Please sign in to comment.