From 08665ae8cb6b04c0bcbd0bd971ec83be86266bf2 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Mon, 16 Oct 2023 14:43:54 +0100 Subject: [PATCH] Add health probe endpoint --- .../core/src/main/resources/reference.conf | 4 ++++ .../Config.scala | 14 ++++++++++--- .../Environment.scala | 21 ++++++++++++++----- .../LoaderApp.scala | 2 +- .../Run.scala | 6 +++--- .../MockEnvironment.scala | 4 +++- project/Dependencies.scala | 3 +-- 7 files changed, 39 insertions(+), 15 deletions(-) diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index f651b6b..c8548e9 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -29,6 +29,10 @@ "tags": { } } + "healthProbe": { + "port": "8000" + "unhealthyLatency": "5 minutes" + } } "telemetry": { diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala index 3f39cba..9119048 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala @@ -13,6 +13,7 @@ import io.circe.generic.extras.semiauto._ import io.circe.generic.extras.Configuration import io.circe.config.syntax._ import net.snowflake.ingest.utils.SnowflakeURL +import com.comcast.ip4s.Port import scala.concurrent.duration.FiniteDuration import scala.util.Try @@ -82,9 +83,12 @@ object Config { type Sentry = SentryM[Id] + case class HealthProbe(port: Port, unhealthyLatency: FiniteDuration) + case class Monitoring( metrics: Metrics, - sentry: Option[Sentry] + sentry: Option[Sentry], + healthProbe: HealthProbe ) implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = { @@ -104,8 +108,12 @@ object Config { case SentryM(None, _) => None } - implicit val metricsDecoder = deriveConfiguredDecoder[Metrics] - implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring] + implicit val metricsDecoder = deriveConfiguredDecoder[Metrics] + implicit val portDecoder = Decoder.decodeInt.emap { port => + Port.fromInt(port).toRight("Invalid port") + } + implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe] + implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring] deriveConfiguredDecoder[Config[Source, Sink]] } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala index cdd8399..21d2803 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala @@ -8,6 +8,7 @@ package com.snowplowanalytics.snowplow.snowflake import cats.implicits._ +import cats.Functor import cats.effect.{Async, Resource, Sync} import cats.effect.unsafe.implicits.global import org.http4s.client.Client @@ -17,7 +18,7 @@ import io.sentry.Sentry import com.snowplowanalytics.snowplow.sources.SourceAndAck import com.snowplowanalytics.snowplow.sinks.Sink import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager} -import com.snowplowanalytics.snowplow.loaders.runtime.AppInfo +import com.snowplowanalytics.snowplow.loaders.runtime.{AppInfo, HealthProbe} case class Environment[F[_]]( appInfo: AppInfo, @@ -35,21 +36,23 @@ object Environment { def fromConfig[F[_]: Async, SourceConfig, SinkConfig]( config: Config[SourceConfig, SinkConfig], appInfo: AppInfo, - source: SourceConfig => SourceAndAck[F], - sink: SinkConfig => Resource[F, Sink[F]] + toSource: SourceConfig => F[SourceAndAck[F]], + toSink: SinkConfig => Resource[F, Sink[F]] ): Resource[F, Environment[F]] = for { _ <- enableSentry[F](appInfo, config.monitoring.sentry) httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource - badSink <- sink(config.output.bad) + badSink <- toSink(config.output.bad) metrics <- Resource.eval(Metrics.build(config.monitoring.metrics)) xa <- Resource.eval(SQLUtils.transactor[F](config.output.good)) _ <- Resource.eval(SQLUtils.createTable(config.output.good, xa)) tblManager = TableManager.fromTransactor(config.output.good, xa) channelProvider <- ChannelProvider.make(config.output.good, config.batching) + sourceAndAck <- Resource.eval(toSource(config.input)) + _ <- HealthProbe.resource(config.monitoring.healthProbe.port, isHealthy(config.monitoring.healthProbe, sourceAndAck)) } yield Environment( appInfo = appInfo, - source = source(config.input), + source = sourceAndAck, badSink = badSink, httpClient = httpClient, tblManager = tblManager, @@ -79,4 +82,12 @@ object Environment { Resource.unit[F] } + private def isHealthy[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] = + source.processingLatency.map { latency => + if (latency > config.unhealthyLatency) + HealthProbe.Unhealthy(show"Processing latency is $latency") + else + HealthProbe.Healthy + } + } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/LoaderApp.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/LoaderApp.scala index 642df87..5a02d4b 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/LoaderApp.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/LoaderApp.scala @@ -26,7 +26,7 @@ abstract class LoaderApp[SourceConfig: Decoder, SinkConfig: Decoder]( super.runtimeConfig.copy(cpuStarvationCheckInterval = 10.seconds) type SinkProvider = SinkConfig => Resource[IO, Sink[IO]] - type SourceProvider = SourceConfig => SourceAndAck[IO] + type SourceProvider = SourceConfig => IO[SourceAndAck[IO]] def source: SourceProvider def badSink: SinkProvider diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Run.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Run.scala index 3f0bd1d..7bc783b 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Run.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Run.scala @@ -28,7 +28,7 @@ object Run { def fromCli[F[_]: Async, SourceConfig: Decoder, SinkConfig: Decoder]( appInfo: AppInfo, - toSource: SourceConfig => SourceAndAck[F], + toSource: SourceConfig => F[SourceAndAck[F]], toBadSink: SinkConfig => Resource[F, Sink[F]] ): Opts[F[ExitCode]] = { val configPathOpt = Opts.option[Path]("config", help = "path to config file") @@ -37,7 +37,7 @@ object Run { private def fromConfigPaths[F[_]: Async, SourceConfig: Decoder, SinkConfig: Decoder]( appInfo: AppInfo, - toSource: SourceConfig => SourceAndAck[F], + toSource: SourceConfig => F[SourceAndAck[F]], toBadSink: SinkConfig => Resource[F, Sink[F]], pathToConfig: Path ): F[ExitCode] = { @@ -55,7 +55,7 @@ object Run { private def fromConfig[F[_]: Async, SourceConfig, SinkConfig]( appInfo: AppInfo, - toSource: SourceConfig => SourceAndAck[F], + toSource: SourceConfig => F[SourceAndAck[F]], toBadSink: SinkConfig => Resource[F, Sink[F]], config: Config[SourceConfig, SinkConfig] ): F[ExitCode] = diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala index f5d1e85..8f97d90 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala @@ -17,7 +17,7 @@ import com.snowplowanalytics.snowplow.sinks.Sink import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager} import com.snowplowanalytics.snowplow.loaders.runtime.AppInfo -import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} case class MockEnvironment(state: Ref[IO, Vector[MockEnvironment.Action]], environment: Environment[IO]) @@ -91,6 +91,8 @@ object MockEnvironment { state.update(_ :+ Checkpointed(chunk.toList)) } .drain + + def processingLatency: IO[FiniteDuration] = IO.pure(Duration.Zero) } private def testSink(ref: Ref[IO, Vector[Action]]): Sink[IO] = Sink[IO] { batch => diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 8032d98..a2dfb6a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -27,7 +27,7 @@ object Dependencies { val awsSdk2 = "2.20.135" // Snowplow - val streams = "0.1.0-M4" + val streams = "0.1.0-M5" // tests val specs2 = "4.20.0" @@ -50,7 +50,6 @@ object Dependencies { val jaxb = "javax.xml.bind" % "jaxb-api" % V.jaxb val stsSdk2 = "software.amazon.awssdk" % "sts" % V.awsSdk2 - // snowplow: Note jackson-databind 2.14.x is incompatible with Spark val streamsCore = "com.snowplowanalytics" %% "streams-core" % V.streams val kinesis = "com.snowplowanalytics" %% "kinesis" % V.streams val kafka = "com.snowplowanalytics" %% "kafka" % V.streams