From 5e71e7981b6901f653aa7157cdeb2b3264b98de7 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Thu, 11 Jan 2024 14:29:47 +0000 Subject: [PATCH] Separate retry policies for transient errors and setup errors (close #24) --- config/config.azure.reference.hocon | 16 +- config/config.kinesis.reference.hocon | 16 +- config/config.pubsub.reference.hocon | 16 +- .../core/src/main/resources/reference.conf | 8 +- .../Alert.scala | 2 + .../Config.scala | 10 +- .../Environment.scala | 2 +- .../Monitoring.scala | 9 +- .../processing/Channel.scala | 40 ++-- .../processing/JdbcTransactor.scala | 46 +++- .../processing/SnowflakeRetrying.scala | 72 +++++- .../processing/TableManager.scala | 14 +- .../processing/ChannelProviderSpec.scala | 105 ++++++++- .../processing/JdbcTransactorSpec.scala | 207 ++++++++++++++++++ .../processing/ProcessingSpec.scala | 15 +- 15 files changed, 513 insertions(+), 65 deletions(-) create mode 100644 modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactorSpec.scala diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon index 6176c46..c95f865 100644 --- a/config/config.azure.reference.hocon +++ b/config/config.azure.reference.hocon @@ -81,8 +81,20 @@ # Retry configuration for Snowflake operation failures "retries": { - # Starting backoff period - "backoff": "30 seconds" + + # -- Configures exponential backoff on errors related to how Snowflake is set up for this loader. + # -- Examples include authentication errors and permissions errors. + # -- This class of errors is reported periodically to the monitoring webhook. + "setupErrors": { + "delay": "30 seconds" + } + + # -- Configures exponential backoff on errors that are likely to be transient. + # -- Examples include server errors and network errors + "transientErrors": { + "delay": "1 second" + "attempts": 5 + } } # -- Schemas that won't be loaded to Snowflake. 
Optional, default value [] diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon index 261cdde..2aa9450 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.reference.hocon @@ -102,8 +102,20 @@ # Retry configuration for Snowflake operation failures "retries": { - # Starting backoff period - "backoff": "30 seconds" + + # -- Configures exponential backoff on errors related to how Snowflake is set up for this loader. + # -- Examples include authentication errors and permissions errors. + # -- This class of errors is reported periodically to the monitoring webhook. + "setupErrors": { + "delay": "30 seconds" + } + + # -- Configures exponential backoff on errors that are likely to be transient. + # -- Examples include server errors and network errors + "transientErrors": { + "delay": "1 second" + "attempts": 5 + } } # -- Schemas that won't be loaded to Snowflake. Optional, default value [] diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon index b124c82..c00136f 100644 --- a/config/config.pubsub.reference.hocon +++ b/config/config.pubsub.reference.hocon @@ -82,8 +82,20 @@ # Retry configuration for Snowflake operation failures "retries": { - # Starting backoff period - "backoff": "30 seconds" + + # -- Configures exponential backoff on errors related to how Snowflake is set up for this loader. + # -- Examples include authentication errors and permissions errors. + # -- This class of errors is reported periodically to the monitoring webhook. + "setupErrors": { + "delay": "30 seconds" + } + + # -- Configures exponential backoff on errors that are likely to be transient. + # -- Examples include server errors and network errors + "transientErrors": { + "delay": "1 second" + "attempts": 5 + } } # -- Schemas that won't be loaded to Snowflake. 
Optional, default value [] diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index eaabbe9..dd04fc7 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -17,7 +17,13 @@ } "retries": { - "backoff": "30 seconds" + "setupErrors": { + "delay": "30 seconds" + } + "transientErrors": { + "delay": "1 second" + "attempts": 5 + } } "skipSchemas": [] diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Alert.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Alert.scala index 2d4a4e0..3f5dce4 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Alert.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Alert.scala @@ -19,6 +19,7 @@ object Alert { final case class FailedToCreateEventsTable(cause: Throwable) extends Alert final case class FailedToAddColumns(columns: List[String], cause: Throwable) extends Alert final case class FailedToOpenSnowflakeChannel(cause: Throwable) extends Alert + final case class FailedToParsePrivateKey(cause: Throwable) extends Alert def toSelfDescribingJson( alert: Alert, @@ -40,6 +41,7 @@ object Alert { case FailedToCreateEventsTable(cause) => show"Failed to create events table: $cause" case FailedToAddColumns(columns, cause) => show"Failed to add columns: ${columns.mkString("[", ",", "]")}. 
Cause: $cause" case FailedToOpenSnowflakeChannel(cause) => show"Failed to open Snowflake channel: $cause" + case FailedToParsePrivateKey(cause) => show"Failed to parse private key: $cause" } full.take(MaxAlertPayloadLength) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala index dacc1bc..f9f60b9 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala @@ -88,7 +88,13 @@ object Config { final case class Webhook(endpoint: Uri, tags: Map[String, String]) - case class Retries(backoff: FiniteDuration) + case class SetupErrorRetries(delay: FiniteDuration) + case class TransientErrorRetries(delay: FiniteDuration, attempts: Int) + + case class Retries( + setupErrors: SetupErrorRetries, + transientErrors: TransientErrorRetries + ) implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = { implicit val configuration = Configuration.default.withDiscriminator("type") @@ -116,6 +122,8 @@ object Config { implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe] implicit val webhookDecoder = deriveConfiguredDecoder[Webhook] implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring] + implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries] + implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries] implicit val retriesDecoder = deriveConfiguredDecoder[Retries] deriveConfiguredDecoder[Config[Source, Sink]] } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala index d13879f..ce19371 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala +++ 
b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala @@ -58,7 +58,7 @@ object Environment { badSink <- toSink(config.output.bad.sink) metrics <- Resource.eval(Metrics.build(config.monitoring.metrics)) tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries, monitoring)) - channelOpener <- Channel.opener(config.output.good, config.batching) + channelOpener <- Channel.opener(config.output.good, config.batching, config.retries, monitoring, appHealth) channelProvider <- Channel.provider(channelOpener, config.retries, appHealth, monitoring) } yield Environment( appInfo = appInfo, diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Monitoring.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Monitoring.scala index 4f9dbe0..132ada2 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Monitoring.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Monitoring.scala @@ -29,7 +29,8 @@ object Monitoring { config match { case Some(webhookConfig) => val request = buildHttpRequest(webhookConfig, message) - executeHttpRequest(webhookConfig, httpClient, request) + Logger[F].info(show"Sending alert to ${webhookConfig.endpoint} with details of the setup error...") *> + executeHttpRequest(webhookConfig, httpClient, request) case None => Logger[F].debug("Webhook monitoring is not configured, skipping alert") } @@ -48,11 +49,13 @@ object Monitoring { .use { response => if (response.status.isSuccess) Sync[F].unit else { - response.as[String].flatMap(body => Logger[F].error(s"Webhook ${webhookConfig.endpoint} returned non-2xx response:\n$body")) + response + .as[String] + .flatMap(body => Logger[F].error(show"Webhook ${webhookConfig.endpoint} returned non-2xx response:\n$body")) } } .handleErrorWith { e => - Logger[F].error(e)(s"Webhook ${webhookConfig.endpoint} resulted in exception without a response") 
+ Logger[F].error(e)(show"Webhook ${webhookConfig.endpoint} resulted in exception without a response") } } } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala index c8d16a0..1f37136 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala @@ -95,9 +95,15 @@ object Channel { } - def opener[F[_]: Async](config: Config.Snowflake, batchingConfig: Config.Batching): Resource[F, Opener[F]] = + def opener[F[_]: Async]( + config: Config.Snowflake, + batchingConfig: Config.Batching, + retriesConfig: Config.Retries, + monitoring: Monitoring[F], + appHealth: AppHealth[F] + ): Resource[F, Opener[F]] = for { - client <- createClient(config, batchingConfig) + client <- createClient(config, batchingConfig, retriesConfig, monitoring, appHealth) } yield new Opener[F] { def open: F[CloseableChannel[F]] = createChannel[F](config, client).map(impl[F]) } @@ -118,11 +124,8 @@ object Channel { ): Resource[F, Channel[F]] = { def make(poll: Poll[F]) = poll { - SnowflakeRetrying.retryIndefinitely(health, retries) { + SnowflakeRetrying.withRetries(health, retries, monitoring, Alert.FailedToOpenSnowflakeChannel(_)) { opener.open - .onError { cause => - monitoring.alert(Alert.FailedToOpenSnowflakeChannel(cause)) - } } } @@ -208,18 +211,25 @@ object Channel { props } - private def createClient[F[_]: Sync]( + private def createClient[F[_]: Async]( config: Config.Snowflake, - batchingConfig: Config.Batching + batchingConfig: Config.Batching, + retriesConfig: Config.Retries, + monitoring: Monitoring[F], + appHealth: AppHealth[F] ): Resource[F, SnowflakeStreamingIngestClient] = { - val make = Sync[F].delay { - SnowflakeStreamingIngestClientFactory - .builder("snowplow") // client name is not important 
- .setProperties(channelProperties(config, batchingConfig)) - // .setParameterOverrides(Map.empty.asJava) // Not needed, as all params can also be set with Properties - .build + def make(poll: Poll[F]) = poll { + SnowflakeRetrying.withRetries(appHealth, retriesConfig, monitoring, Alert.FailedToOpenSnowflakeChannel(_)) { + Sync[F].blocking { + SnowflakeStreamingIngestClientFactory + .builder("snowplow") // client name is not important + .setProperties(channelProperties(config, batchingConfig)) + // .setParameterOverrides(Map.empty.asJava) // Not needed, as all params can also be set with Properties + .build + } + } } - Resource.fromAutoCloseable(make) + Resource.makeFull(make)(client => Sync[F].blocking(client.close())) } /** diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactor.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactor.scala index 79572c0..5075b0c 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactor.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactor.scala @@ -5,35 +5,59 @@ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 */ -package com.snowplowanalytics.snowplow.snowflake +package com.snowplowanalytics.snowplow.snowflake.processing import cats.effect.{Async, Sync} import cats.implicits._ import doobie.Transactor import net.snowflake.ingest.utils.{Utils => SnowflakeSdkUtils} +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger import java.security.PrivateKey import java.util.Properties +import com.snowplowanalytics.snowplow.snowflake.{Alert, AppHealth, Config, Monitoring} + object JdbcTransactor { private val driver: String = "net.snowflake.client.jdbc.SnowflakeDriver" - def make[F[_]: Async](config: Config.Snowflake): F[Transactor[F]] = + private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] + + def make[F[_]: Async]( + config: Config.Snowflake, + monitoring: Monitoring[F], + appHealth: AppHealth[F] + ): F[Transactor[F]] = for { - privateKey <- parsePrivateKey[F](config) + privateKey <- parsePrivateKey[F](config, monitoring, appHealth) props = jdbcProperties(config, privateKey) } yield Transactor.fromDriverManager[F](driver, config.url.getJdbcUrl, props, None) - private def parsePrivateKey[F[_]: Sync](config: Config.Snowflake): F[PrivateKey] = - Sync[F].delay { // Wrap in Sync because these can raise exceptions - config.privateKeyPassphrase match { - case Some(passphrase) => - SnowflakeSdkUtils.parseEncryptedPrivateKey(config.privateKey, passphrase) - case None => - SnowflakeSdkUtils.parsePrivateKey(config.privateKey) + private def parsePrivateKey[F[_]: Async]( + config: Config.Snowflake, + monitoring: Monitoring[F], + appHealth: AppHealth[F] + ): F[PrivateKey] = + Sync[F] + .delay { // Wrap in Sync because these can raise exceptions + config.privateKeyPassphrase match { + case Some(passphrase) => + SnowflakeSdkUtils.parseEncryptedPrivateKey(config.privateKey, passphrase) + case None => + 
SnowflakeSdkUtils.parsePrivateKey(config.privateKey) + } + } + .onError { e => + Logger[F].error(e)("Could not parse the Snowflake private key. Will do nothing but wait for loader to be killed") *> + appHealth.setServiceHealth(AppHealth.Service.Snowflake, false) *> + // This is a type of "setup" error, so we send a monitoring alert + monitoring.alert(Alert.FailedToParsePrivateKey(e)) *> + // We don't want to crash and exit, because we don't want to spam Sentry with exceptions about setup errors. + // But there's no point in continuing or retrying. Instead we just block the fiber so the health probe appears unhealthy. + Async[F].never } - } private def jdbcProperties(config: Config.Snowflake, privateKey: PrivateKey): Properties = { val props = new Properties() diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala index 7f33760..161fd38 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala @@ -1,37 +1,99 @@ package com.snowplowanalytics.snowplow.snowflake.processing +import cats.Applicative import cats.effect.Sync import cats.implicits._ -import com.snowplowanalytics.snowplow.snowflake.{AppHealth, Config} import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import retry._ import retry.implicits.retrySyntaxError +import net.snowflake.ingest.connection.IngestResponseException + +import java.lang.SecurityException + +import com.snowplowanalytics.snowplow.snowflake.{Alert, AppHealth, Config, Monitoring} object SnowflakeRetrying { private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] - def retryIndefinitely[F[_]: Sync: Sleep, A](appHealth: AppHealth[F], config: Config.Retries)(action: F[A]): 
F[A] = - retryUntilSuccessful(appHealth, config, action) <* + def withRetries[F[_]: Sync: Sleep, A]( + appHealth: AppHealth[F], + config: Config.Retries, + monitoring: Monitoring[F], + toAlert: Throwable => Alert + )( + action: F[A] + ): F[A] = + retryUntilSuccessful(appHealth, config, monitoring, toAlert, action) <* appHealth.setServiceHealth(AppHealth.Service.Snowflake, isHealthy = true) private def retryUntilSuccessful[F[_]: Sync: Sleep, A]( appHealth: AppHealth[F], config: Config.Retries, + monitoring: Monitoring[F], + toAlert: Throwable => Alert, action: F[A] ): F[A] = action .onError(_ => appHealth.setServiceHealth(AppHealth.Service.Snowflake, isHealthy = false)) + .retryingOnSomeErrors( + isWorthRetrying = isSetupError[F](_), + policy = policyForSetupErrors[F](config), + onError = logErrorAndSendAlert[F](monitoring, toAlert, _, _) + ) .retryingOnAllErrors( - policy = RetryPolicies.exponentialBackoff[F](config.backoff), - onError = (error, details) => Logger[F].error(error)(s"Executing Snowflake command failed. ${extractRetryDetails(details)}") + policy = policyForTransientErrors[F](config), + onError = logError[F](_, _) ) + /** Is an error associated with setting up Snowflake as a destination */ + private def isSetupError[F[_]: Sync](t: Throwable): F[Boolean] = t match { + case CausedByIngestResponseException(ire) if ire.getErrorCode === 403 => + true.pure[F] + case _: SecurityException => + // Authentication failure, i.e. 
user unrecognized or bad private key + true.pure[F] + case sql: java.sql.SQLException if sql.getErrorCode === 2003 => + // Object does not exist or not authorized + true.pure[F] + case _ => + false.pure[F] + } + + private def policyForSetupErrors[F[_]: Applicative](config: Config.Retries): RetryPolicy[F] = + RetryPolicies.exponentialBackoff[F](config.setupErrors.delay) + + private def policyForTransientErrors[F[_]: Applicative](config: Config.Retries): RetryPolicy[F] = + RetryPolicies.fullJitter[F](config.transientErrors.delay).join(RetryPolicies.limitRetries(config.transientErrors.attempts - 1)) + + private def logErrorAndSendAlert[F[_]: Sync]( + monitoring: Monitoring[F], + toAlert: Throwable => Alert, + error: Throwable, + details: RetryDetails + ): F[Unit] = + logError(error, details) *> monitoring.alert(toAlert(error)) + + private def logError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] = + Logger[F].error(error)(s"Executing Snowflake command failed. ${extractRetryDetails(details)}") + private def extractRetryDetails(details: RetryDetails): String = details match { case RetryDetails.GivingUp(totalRetries, totalDelay) => s"Giving up on retrying, total retries: $totalRetries, total delay: ${totalDelay.toSeconds} seconds" case RetryDetails.WillDelayAndRetry(nextDelay, retriesSoFar, cumulativeDelay) => s"Will retry in ${nextDelay.toSeconds} seconds, retries so far: $retriesSoFar, total delay so far: ${cumulativeDelay.toSeconds} seconds" } + + private object CausedByIngestResponseException { + def unapply(t: Throwable): Option[IngestResponseException] = + t match { + case ire: IngestResponseException => Some(ire) + case _ => + Option(t.getCause) match { + case Some(cause) => unapply(cause) + case _ => None + } + } + } } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala index 
3a2d361..554a944 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala @@ -9,7 +9,7 @@ package com.snowplowanalytics.snowplow.snowflake.processing import cats.effect.{Async, Sync} import cats.implicits._ -import com.snowplowanalytics.snowplow.snowflake.{Alert, AppHealth, Config, JdbcTransactor, Monitoring} +import com.snowplowanalytics.snowplow.snowflake.{Alert, AppHealth, Config, Monitoring} import doobie.implicits._ import doobie.{ConnectionIO, Fragment} import net.snowflake.client.jdbc.SnowflakeSQLException @@ -36,25 +36,19 @@ object TableManager { retriesConfig: Config.Retries, monitoring: Monitoring[F] ): F[TableManager[F]] = - JdbcTransactor.make(config).map { transactor => + JdbcTransactor.make(config, monitoring, appHealth).map { transactor => new TableManager[F] { override def initializeEventsTable(): F[Unit] = - SnowflakeRetrying.retryIndefinitely(appHealth, retriesConfig) { + SnowflakeRetrying.withRetries(appHealth, retriesConfig, monitoring, Alert.FailedToCreateEventsTable(_)) { Logger[F].info(s"Opening JDBC connection to ${config.url.getJdbcUrl}") *> executeInitTableQuery() - .onError { cause => - monitoring.alert(Alert.FailedToCreateEventsTable(cause)) - } } override def addColumns(columns: List[String]): F[Unit] = - SnowflakeRetrying.retryIndefinitely(appHealth, retriesConfig) { + SnowflakeRetrying.withRetries(appHealth, retriesConfig, monitoring, Alert.FailedToAddColumns(columns, _)) { Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *> executeAddColumnsQuery(columns) - .onError { cause => - monitoring.alert(Alert.FailedToAddColumns(columns, cause)) - } } def executeInitTableQuery(): F[Unit] = { diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProviderSpec.scala 
b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProviderSpec.scala index 63d3469..aab46f9 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProviderSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProviderSpec.scala @@ -15,6 +15,8 @@ import cats.effect.testing.specs2.CatsEffect import cats.effect.testkit.TestControl import scala.concurrent.duration.{DurationLong, FiniteDuration} +import java.sql.SQLException + import com.snowplowanalytics.snowplow.snowflake.{Alert, AppHealth, Config, Monitoring} import com.snowplowanalytics.snowplow.runtime.HealthProbe import com.snowplowanalytics.snowplow.snowflake.AppHealth.Service.{BadSink, Snowflake} @@ -28,9 +30,11 @@ class ChannelProviderSpec extends Specification with CatsEffect { Make no actions if the provider is never used $e1 Manage channel lifecycle after a channel is opened $e2 Manage channel lifecycle after an exception using the channel $e3 - Retry opening a channel when there is an exception opening the channel $e4 - Retry according to a single backoff policy when multiple concurrent fibers want to open a channel $e5 - Become healthy after recovering from an earlier failure $e6 + Retry opening a channel and send alerts when there is a setup exception opening the channel $e4 + Retry opening a channel if there is a transient exception opening the channel, with limited number of attempts and no monitoring alerts $e5 + Retry setup error according to a single backoff policy when multiple concurrent fibers want to open a channel $e6 + Become healthy after recovering from an earlier setup error $e7 + Become healthy after recovering from an earlier transient error $e8 """ def e1 = control.flatMap { c => @@ -91,7 +95,8 @@ class ChannelProviderSpec extends Specification with CatsEffect { def e4 = control.flatMap { c => // An channel opener that throws an exception when trying to 
open a channel val throwingOpener = new Channel.Opener[IO] { - def open: IO[Channel.CloseableChannel[IO]] = goBOOM + def open: IO[Channel.CloseableChannel[IO]] = + c.channelOpener.open *> raiseForSetupError } val io = Channel.provider(throwingOpener, retriesConfig, c.appHealth, c.monitoring).use { provider => @@ -99,9 +104,13 @@ class ChannelProviderSpec extends Specification with CatsEffect { } val expectedState = Vector( + Action.OpenedChannel, Action.SentAlert(0L), + Action.OpenedChannel, Action.SentAlert(30L), + Action.OpenedChannel, Action.SentAlert(90L), + Action.OpenedChannel, Action.SentAlert(210L) ) @@ -120,9 +129,41 @@ class ChannelProviderSpec extends Specification with CatsEffect { } def e5 = control.flatMap { c => + // An channel opener that throws an exception when trying to open a channel + val throwingOpener = new Channel.Opener[IO] { + def open: IO[Channel.CloseableChannel[IO]] = + c.channelOpener.open *> goBOOM + } + + val io = Channel.provider(throwingOpener, retriesConfig, c.appHealth, c.monitoring).use { provider => + provider.opened.use_ + } + + val expectedState = Vector( + Action.OpenedChannel, + Action.OpenedChannel, + Action.OpenedChannel, + Action.OpenedChannel, + Action.OpenedChannel + ) + + val test = for { + _ <- io.voidError + state <- c.state.get + health <- c.appHealth.status() + } yield List( + state should beEqualTo(expectedState), + health should beUnhealthy + ).reduce(_ and _) + + TestControl.executeEmbed(test) + } + + def e6 = control.flatMap { c => // An opener that throws an exception when trying to open a channel val throwingOpener = new Channel.Opener[IO] { - def open: IO[Channel.CloseableChannel[IO]] = goBOOM + def open: IO[Channel.CloseableChannel[IO]] = + c.channelOpener.open *> raiseForSetupError } // Three concurrent fibers wanting to open the channel: @@ -136,9 +177,13 @@ class ChannelProviderSpec extends Specification with CatsEffect { } val expectedState = Vector( + Action.OpenedChannel, Action.SentAlert(0L), + 
Action.OpenedChannel, Action.SentAlert(30L), + Action.OpenedChannel, Action.SentAlert(90L), + Action.OpenedChannel, Action.SentAlert(210L) ) @@ -155,14 +200,14 @@ class ChannelProviderSpec extends Specification with CatsEffect { TestControl.executeEmbed(test) } - def e6 = control.flatMap { c => + def e7 = control.flatMap { c => // An channel opener that throws an exception *once* and is healthy thereafter val throwingOnceOpener = Ref[IO].of(false).map { hasThrownException => new Channel.Opener[IO] { def open: IO[Channel.CloseableChannel[IO]] = hasThrownException.get.flatMap { case false => - hasThrownException.set(true) *> goBOOM + hasThrownException.set(true) *> c.channelOpener.open *> raiseForSetupError case true => c.channelOpener.open } @@ -176,6 +221,7 @@ class ChannelProviderSpec extends Specification with CatsEffect { } val expectedState = Vector( + Action.OpenedChannel, Action.SentAlert(0L), Action.OpenedChannel, Action.ClosedChannel @@ -192,6 +238,43 @@ class ChannelProviderSpec extends Specification with CatsEffect { TestControl.executeEmbed(test) } + def e8 = control.flatMap { c => + // An channel opener that throws an exception *once* and is healthy thereafter + val throwingOnceOpener = Ref[IO].of(false).map { hasThrownException => + new Channel.Opener[IO] { + def open: IO[Channel.CloseableChannel[IO]] = + hasThrownException.get.flatMap { + case false => + hasThrownException.set(true) *> c.channelOpener.open *> goBOOM + case true => + c.channelOpener.open + } + } + } + + val io = throwingOnceOpener.flatMap { channelOpener => + Channel.provider(channelOpener, retriesConfig, c.appHealth, c.monitoring).use { provider => + provider.opened.use_ + } + } + + val expectedState = Vector( + Action.OpenedChannel, + Action.OpenedChannel, + Action.ClosedChannel + ) + + val test = for { + _ <- io + state <- c.state.get + health <- c.appHealth.status() + } yield List( + state should beEqualTo(expectedState), + health should beHealthy + ).reduce(_ and _) + 
TestControl.executeEmbed(test) + } + /** Convenience matchers for health probe * */ def beHealthy: org.specs2.matcher.Matcher[HealthProbe.Status] = { (status: HealthProbe.Status) => @@ -228,7 +311,7 @@ object ChannelProviderSpec { monitoring: Monitoring[IO] ) - def retriesConfig = Config.Retries(backoff = 30.seconds) + def retriesConfig = Config.Retries(Config.SetupErrorRetries(30.seconds), Config.TransientErrorRetries(1.second, 5)) def control: IO[Control] = for { @@ -274,4 +357,10 @@ object ChannelProviderSpec { t } + // Raise a known exception that indicates a problem with the warehouse setup + def raiseForSetupError[A]: IO[A] = IO.raiseError(new SQLException("boom!", "02000", 2003)).adaptError { t => + t.setStackTrace(Array()) // don't clutter our test logs + t + } + } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactorSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactorSpec.scala new file mode 100644 index 0000000..f03aa04 --- /dev/null +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/JdbcTransactorSpec.scala @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.snowflake.processing + +import cats.Id +import cats.effect.{IO, Ref} +import cats.effect.kernel.Outcome +import doobie.Transactor +import org.specs2.Specification +import org.specs2.matcher.MatchResult +import cats.effect.testing.specs2.CatsEffect +import cats.effect.testkit.TestControl +import net.snowflake.ingest.utils.SnowflakeURL + +import com.snowplowanalytics.snowplow.runtime.HealthProbe +import com.snowplowanalytics.snowplow.snowflake.{Alert, AppHealth, Config, MockEnvironment, Monitoring} + +import scala.concurrent.duration.DurationLong + +class JdbcTransactorSpec extends Specification with CatsEffect { + import JdbcTransactorSpec._ + + def is = s2""" + The JdbcTransactor when given a valid config should + Provide a transactor ${healthy1(goodConfig)} + Provide a transactor when key is encrypted ${healthy1(goodConfigWithEncryptedKey)} + Not send any alerts ${healthy2(goodConfig)} ${healthy2(goodConfigWithEncryptedKey)} + The JdbcTransactor when given an invalid config should + Sleep forever instead of providing a transactor ${unhealthy1(badConfigWithEncryptedKey)} ${unhealthy1(badConfigWithMissingPassphrase)} + Send a monitoring alert ${unhealthy2(badConfigWithEncryptedKey)} ${unhealthy2(badConfigWithMissingPassphrase)} + Change app health from healthy to unhealthy ${unhealthy3(badConfigWithEncryptedKey)} ${unhealthy3(badConfigWithMissingPassphrase)} + """ + + def healthy1(config: Config.Snowflake) = MockEnvironment.build(Nil, MockEnvironment.Mocks.default).use { c => + val io = for { + monitoring <- testMonitoring + result <- JdbcTransactor.make(config, monitoring, c.environment.appHealth) + } yield result + + afterOneDay(io) { case Some(Outcome.Succeeded(_: Transactor[IO])) => + ok + } + } + + def healthy2(config: Config.Snowflake) = MockEnvironment.build(Nil, 
MockEnvironment.Mocks.default).use { 
c => + val io = for { + monitoring <- testMonitoring + _ <- JdbcTransactor.make(config, monitoring, c.environment.appHealth) + alerts <- monitoring.ref.get + } yield alerts + + afterOneDay(io) { case Some(Outcome.Succeeded(alerts)) => + alerts must beEmpty + } + } + + def unhealthy1(config: Config.Snowflake) = MockEnvironment.build(Nil, MockEnvironment.Mocks.default).use { c => + val io = for { + monitoring <- testMonitoring + _ <- JdbcTransactor.make(config, monitoring, c.environment.appHealth) + } yield () + + afterOneDay(io) { case None => + ok + } + } + + def unhealthy2(config: Config.Snowflake) = MockEnvironment.build(Nil, MockEnvironment.Mocks.default).use { c => + val io = for { + monitoring <- testMonitoring + _ <- JdbcTransactor.make(config, monitoring, c.environment.appHealth).start + _ <- IO.sleep(6.hours) + alerts <- monitoring.ref.get + } yield alerts + + afterOneDay(io) { case Some(Outcome.Succeeded(alerts)) => + alerts must beLike { case Vector(Alert.FailedToParsePrivateKey(_)) => + ok + } + } + } + + def unhealthy3(config: Config.Snowflake) = MockEnvironment.build(Nil, MockEnvironment.Mocks.default).use { c => + val io = for { + monitoring <- testMonitoring + _ <- c.environment.appHealth.setServiceHealth(AppHealth.Service.Snowflake, true) + healthBeforeTest <- c.environment.appHealth.status() + _ <- JdbcTransactor.make(config, monitoring, c.environment.appHealth).start + _ <- IO.sleep(6.hours) + healthAfterTest <- c.environment.appHealth.status() + } yield (healthBeforeTest, healthAfterTest) + + afterOneDay(io) { case Some(Outcome.Succeeded((healthBeforeTest, healthAfterTest))) => + (healthBeforeTest, healthAfterTest) must beLike { case (HealthProbe.Healthy, HealthProbe.Unhealthy(_)) => + ok + } + } + } + + def afterOneDay[A](io: IO[A])(pf: PartialFunction[Option[Outcome[Id, Throwable, A]], MatchResult[Any]]): IO[MatchResult[Any]] = + TestControl.execute(io).flatMap { ioControl => + for { + _ <- ioControl.tickFor(1.day) + results <- 
ioControl.results + } yield results must beLike(pf) + } + +} + +object JdbcTransactorSpec { + + val goodUnencryptedKey = """-----BEGIN PRIVATE KEY----- + |MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDDcm38cgshuiF2 + |IbilphDODAluv3r8paiPiKUGcwxLt6hY1kooVjvvu2dpKlvd34g3pfppneN1yJhc + |2+Me/iJD644TyBcp9fa6/F/bfczwXxbDPZ7Lc5OUfnGo4IVuDw2HaE1AAuT/Jgnq + |CUnx6RAF83AAvm1TtFQdRnbeFln1ril4zKQdD6w6sxu8ucbF9egTh4FpUDgNp0Hg + |K4SdjONp4cPt9LrN/0vGh3yh6svv9HApktBh9K2Wjjaxntmd9MlKpGCUrLWXDMun + |i16OIQ3bcOFh7aAP1SMRSHKAJiHqZIUBIJMlVJGKQCHfrMQ6CJSxO/sDJ1vC+CNd + |pTKy5JF9AgMBAAECggEADRznTjGkl42yYweeKNb8d6aNF3YXXU4MAh1L3SPo5keb + |LuKptQ0cFlh/dqnV4gv2Sq2DIITsVuGvf0NteI3aZK4wKRmanEEZXbBDCinljxcC + |IvVGayYE98iH/amaqiiuYrBXxnyrOocl0SLwaB+X6J5NnG8qTJxjrFcm8H2VaYs4 + |zCe3oJO7waZF/HrBeZZkG2zfae+p/EqqD72Pf/MVh431ApseWecEg+MPpgfBRita + |BieY6jEjQbiWXJgfS9bMZWVCLNcclwdQ7PS3yXV4uzxEbYiCcHKUwDap/2U8mena + |rJAmNykQSbgk1SwjgDMjUQ/ctAUASJZY3DQXACYmIQKBgQDmEr8neVBxs0rsB64D + |dzWLYzUaiuK9QVXITfF6dy8MCOwEm62yOvBVdhJR0cwUgxiPfmV/OV9NirbMtKEM + |+/ssWHFqic4J/nDVFrEeWrrzWk/onIrDgNGWf3XktVE2DmfKdCrssryDqZC0corl + |7w8OTMLGUybdTMp4U/35kGRNCQKBgQDZeMUhVMcRsX/YcpJuk0kzTc/v0SYL44LE + |mFyq3qlxdDP0CY0ie1c/vLdVe+h+mv378fAuLf664UpljYb5wbNEjfYz7RoW4+Um + |d19NSlK0ZqNZbiyK4FkfiF3OtLGjp+khz7EUAKNhnOPpPrs+KfvwsX14OzoOs0Y2 + |J7LMIVPx1QKBgQCmCHtYgkzScOAtq3Eh4SKL/8Ev8XClwYOldNJCXcZe+gVRYgOc + |rroIApg/4ZZUazMLQtz+Tin/rI409lmPJD1kCEN47g/52FwW+zRAwptNySwHowjl + |A468/CjZLxx3VTgDu4fKn0Y6AeGCx3KDcty7phudwh428BbhdUPAmTo4+QKBgARH + |1kmDq69zePq/tpYqnARAgdlMmp0dS1OnVBug6mDrUqJ5FagaGWuNwWYTXE4xqtIs + |vveJvDvdd2NsV73OzEKLMM9w2VSeA8KwEtYoolwesRRvkLzjEZ4HRyFseRqpkXMy + |7V9ha9XeCrZqn7Dnjqf8NmYJdGkZqkYinehat5ZJAoGBAICsQjZieYf01Z1KXzCc + |BC16iq6Mulic4JCDncPfMP/1bUtNiqm5vbRjt4rUJYt6GM78hvjQu9++TSCbeg9E + |zs8wGxJAaWbrvWSqXHwQyj35u0UkUkf58Dqb2bBepwEAyBdiFSgzJPdR7Y6rWFQk + |isyi39rDQSFfHqyi9WCjRzcg + |-----END PRIVATE KEY-----""".stripMargin + + val goodEncryptedKey = """-----BEGIN ENCRYPTED PRIVATE KEY----- + 
|MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQIq95GfE2zqkcCAggA + |MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECHuOkUb6DmoHBIIEyIO0xBYFGyFY + |fSEnQdfKCh2UnzrkxOVjCQyKake91BNmd3VREkeu8axSTkM9GGkuY8LzD/qCBegP + |weFEI2ij7xCKrvTiXpvGRkUhXMimh6i+bngkxWfn3oJg2zL15W34DVYZ50MzhPsf + |XUtFXFOdG3oDku94SqtwTk6FpQPd2oQyb1ERtbTXhYOneIpDEVoJ48/sNluMOrbe + |/pMjZy8rFBJKZgKTwKAjHqJW7dF4hfXvWqWFEkzZzO+fnJx4s9ufdk1ftbW0iO45 + |xfqzMTN4mlD9DZYktM2YGuoIaLqKphz6vSmoawzNKCJmAYBxjuPXFpoLSyozZ+NH + |42u9Q9muaZuT3RaJ6myvP+TbmI11zupecmqGalH4xFQDWyz6y154Gl09qQMry9VO + |Lb++2v/ZvsgkV/YL5bN3WtSKnXdCATFna16WStzw4E25cEUuDxCd24WMYyq7jqKK + |qMWHfNG/yoTKFiCkazEf5/04DFEBZJrpiIsv8K85dy2uPggYT6q1HrJIY0cu84ub + |lk2Ev8Xk89H/tdXbchRzjkw4WC2jwpkunUVqW4Rz58QsIfeaYZju+P8AWof07nn/ + |O361CDuM4pvE2F1oaKt6tRoQgqpnAaYkqLxi3U5VAw6uOWuHOY06Srht1aGn7lC/ + |TXtjBdeN0MYrUpNy8QXbZZ2zKedxLQyUEICa8wPeJd1+gmP3w5UJET2LNIY/Pesh + |SqDyQD7BzAGoz+QINYL+vRmR/8q+KyfX1orsmjIP3d9bCyq9chr18hgiSfWLIm8d + |woh2SDibjA3L8RBESukvPFifoU+n8E3AGZ9s7uuIzhkaSYHTVp2TeLRME4OqB4Cp + |EVMSiNqIOjrtv1EviUCTUyiPF/GzYzYFlXyvH5f2YuxqXvjvbAhv7VbXJoAhKF/q + |L4Ndsa5LxNNh6xLJKuTAeOYYlgSoAXRkRMwkhOlU73yonIeroeqqXYYjK45BCTEE + |YPvAER37Ttg5UrHwpVPzu8ltnZKnyRff190HK1GRvXHVScwSL5h1E+YcX/bIoVoN + |LcGFjqXhXsc1M/meJ4VnriwXQPJfQtk0ndfPiQoDrGNjHE5MqoOdTwSHT08qlkOT + |1AERbJTc3OzYtWKKgVOcqTeVeUfubWLdD0oRjxBuTi5iynZn9RjdpZ1lzzT8zajq + |Eh7APzc6ILLDU09MeKZp4HaiPvKEUumo0dxpgsWRKke9xcMO+KKgr06+gANHhwnC + |4FA8II9JZIoxSUgoaAysExaA8GD4KmZg/43i8ulZNgbEUV0VyV0rN3g3IkCKfImU + |muF9/r5e8meuj5jibvLMQxiDe5DEMiBkxqteMdXtLIMr9FUIwahhC1kQSgwF5Vqj + |lppulhtXWMx/gRF6ERn0Wk+IBYcl2FKvvIGFsP07BbyAEFQE05trtuZ+a/N1E9Ie + |V+SKr8V7hpAio1uHgWJf18rbXLQ5vkTvlKLduHcdgWusVo4Uv57/GKlY7cwqg3tT + |E45BWdNvJZdat3XEc78U0jw4qEhOeVyEZy6Ak9AzLiYHAhwadenWeeOYLCz3LdMv + |E5DRdBmWrUx8XDnblikYGvbQyc1h5c6Rf0VvaEp7W2phTW8dJLOwsI25LTBOwfj9 + |fYTrkl7M6H3hYijvsBW6Fg== + |-----END ENCRYPTED PRIVATE KEY-----""".stripMargin + + val goodEncryptedKeyPassphrase = "password1" + + val goodConfig: 
Config.Snowflake = Config.Snowflake( + url = new SnowflakeURL("https://abcdefg-example.snowflakecomputing.com"), + user = "snowplow", + privateKey = goodUnencryptedKey, + privateKeyPassphrase = None, + role = None, + database = "snowplow", + schema = "snowplow", + table = "events", + channel = "snowplow", + jdbcLoginTimeout = 60.seconds, + jdbcNetworkTimeout = 60.seconds, + jdbcQueryTimeout = 60.seconds + ) + + val goodConfigWithEncryptedKey = goodConfig.copy(privateKey = goodEncryptedKey, privateKeyPassphrase = Some(goodEncryptedKeyPassphrase)) + val badConfigWithEncryptedKey = goodConfigWithEncryptedKey.copy(privateKeyPassphrase = Some("nonsese")) + val badConfigWithMissingPassphrase = goodConfigWithEncryptedKey.copy(privateKeyPassphrase = None) + + class TestMonitoring(val ref: Ref[IO, Vector[Alert]]) extends Monitoring[IO] { + override def alert(message: Alert): IO[Unit] = + ref.update(_ :+ message) + } + + def testMonitoring: IO[TestMonitoring] = + for { + ref <- Ref[IO].of(Vector.empty[Alert]) + } yield new TestMonitoring(ref) +} diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala index 7f0cdb6..2001a60 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala @@ -106,7 +106,7 @@ class ProcessingSpec extends Specification with CatsEffect { Response.Success( Channel.WriteResult.WriteFailures( List( - Channel.WriteFailure(0L, List("unstruct_event_xyz_1", "contexts_abc_2"), new SFException(ErrorCode.INVALID_FORMAT_ROW)) + Channel.WriteFailure(0L, List("unstruct_event_xyz_1", "contexts_abc_2"), newSFException(ErrorCode.INVALID_FORMAT_ROW)) ) ) ), @@ -141,7 +141,7 @@ class ProcessingSpec extends Specification with CatsEffect { 
Response.Success( Channel.WriteResult.WriteFailures( List( - Channel.WriteFailure(0L, Nil, new SFException(ErrorCode.INVALID_FORMAT_ROW)) + Channel.WriteFailure(0L, Nil, newSFException(ErrorCode.INVALID_FORMAT_ROW)) ) ) ), @@ -173,7 +173,7 @@ class ProcessingSpec extends Specification with CatsEffect { Response.Success( Channel.WriteResult.WriteFailures( List( - Channel.WriteFailure(0L, Nil, new SFException(ErrorCode.INTERNAL_ERROR)) + Channel.WriteFailure(0L, Nil, newSFException(ErrorCode.INTERNAL_ERROR)) ) ) ), @@ -287,7 +287,7 @@ class ProcessingSpec extends Specification with CatsEffect { Response.Success( Channel.WriteResult.WriteFailures( List( - Channel.WriteFailure(0L, List.empty, new SFException(ErrorCode.INTERNAL_ERROR)) + Channel.WriteFailure(0L, List.empty, newSFException(ErrorCode.INTERNAL_ERROR)) ) ) ) @@ -347,4 +347,11 @@ object ProcessingSpec { TokenedEvents(serialized, token, None) } + // Helper to create a SFException, and remove the stacktrace so we don't clutter our test logs. + private def newSFException(errorCode: ErrorCode): SFException = { + val t = new SFException(errorCode) + t.setStackTrace(Array()) // don't clutter our test logs + t + } + }