[WIP] common-streams 0.8.x with refactored health monitoring
istreeter committed Aug 16, 2024
1 parent 27e44b6 · commit 306c97a
Showing 24 changed files with 287 additions and 675 deletions.
6 changes: 4 additions & 2 deletions config/config.azure.reference.hocon
@@ -136,15 +136,17 @@
       }
     }

-    # -- Report alerts to the webhook
+    # -- Report alerts and heartbeats to the webhook
     "webhook": {
       # An actual HTTP endpoint
       "endpoint": "https://webhook.acme.com",
       # Set of arbitrary key-value pairs attached to the payload
       "tags": {
         "pipeline": "production"
       }
     }
+    # How often to send the heartbeat event
+    "heartbeat": "60.minutes"
   }
 }

# -- Optional, configure telemetry
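The heartbeat setting is the substance of this commit's config changes: besides alerts, the webhook receiver now also gets a periodic heartbeat event, so it can tell a healthy pipeline apart from one that has gone silent. The same change lands in the kinesis and pubsub reference configs below. As a consolidated sketch, following the nesting shown in the kinesis file, the resulting webhook block reads (endpoint and tag values are the reference placeholders):

"monitoring": {
  # -- Report alerts and heartbeats to the webhook
  "webhook": {
    "endpoint": "https://webhook.acme.com"
    "tags": { "pipeline": "production" }
    # How often to send the heartbeat event
    "heartbeat": "60.minutes"
  }
}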
4 changes: 3 additions & 1 deletion config/config.kinesis.reference.hocon
Original file line number	Diff line number	Diff line change
@@ -157,14 +157,16 @@
       }
     }

-    # -- Report alerts to the webhook
+    # -- Report alerts and heartbeats to the webhook
     "webhook": {
       # An actual HTTP endpoint
       "endpoint": "https://webhook.acme.com",
       # Set of arbitrary key-value pairs attached to the payload
       "tags": {
         "pipeline": "production"
       }
+      # How often to send the heartbeat event
+      "heartbeat": "60.minutes"
     }
   }

11 changes: 7 additions & 4 deletions config/config.pubsub.reference.hocon
@@ -3,8 +3,9 @@
   # -- pubsub subscription for the source of enriched events
   "subscription": "projects/myproject/subscriptions/snowplow-enriched"

-  # -- How many threads are used by the pubsub client library for fetching events
-  "parallelPullCount": 3
+  # -- Controls how many threads are used internally by the pubsub client library for fetching events.
+  # -- The number of threads is equal to this factor multiplied by the number of available cpu cores
+  "parallelPullFactor": 0.5

   # -- How many bytes can be buffered by the loader app before blocking the pubsub client library
   # -- from fetching more events.
@@ -137,15 +138,17 @@
       }
     }

-    # -- Report alerts to the webhook
+    # -- Report alerts and heartbeats to the webhook
     "webhook": {
       # An actual HTTP endpoint
       "endpoint": "https://webhook.acme.com",
       # Set of arbitrary key-value pairs attached to the payload
       "tags": {
         "pipeline": "production"
       }
     }
+    # How often to send the heartbeat event
+    "heartbeat": "60.minutes"
   }
 }

# -- Optional, configure telemetry
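The comment above pins down the arithmetic: the pubsub client's fetch-thread count is this factor multiplied by the number of available CPU cores, so parallelism now scales with machine size instead of being fixed at 3. A minimal Scala sketch of that calculation; the rounding and the floor of one thread are assumptions for illustration, not taken from this diff:

// Thread count implied by parallelPullFactor, per the comment above.
// Rounding behaviour is an assumption, not copied from the loader.
val parallelPullFactor: Double = 0.5
val cpuCores: Int = Runtime.getRuntime.availableProcessors
val parallelPullCount: Int = math.max(1, (parallelPullFactor * cpuCores).toInt)
// e.g. a factor of 0.5 on an 8-core machine gives 4 fetching threads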
1 change: 1 addition & 0 deletions modules/core/src/main/resources/reference.conf
@@ -43,6 +43,7 @@
     "prefix": "snowplow.snowflake-loader"
   }
 }
+"webhook": ${snowplow.defaults.webhook}
 "sentry": {
   "tags": {
   }
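The ${snowplow.defaults.webhook} value is a HOCON substitution: instead of declaring its own webhook defaults, the loader's reference.conf now inherits whatever common-streams ships under snowplow.defaults, and user config merges on top. A hypothetical sketch of how the two layers compose; the defaults block shown here is illustrative, not copied from common-streams:

# Hypothetical defaults shipped by the common-streams reference.conf
"snowplow": {
  "defaults": {
    "webhook": {
      "tags": {}
      "heartbeat": "60.minutes"
    }
  }
}

# The loader aliases them, so users only override the fields they need
"webhook": ${snowplow.defaults.webhook}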
modules/core/src/main/scala/com/snowplowanalytics/snowplow/snowflake/Alert.scala
@@ -10,78 +10,27 @@

 package com.snowplowanalytics.snowplow.snowflake

-import cats.implicits._
+import cats.Show
+import cats.implicits.showInterpolator
-import com.snowplowanalytics.iglu.core.circe.implicits.igluNormalizeDataJson
-import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
-import com.snowplowanalytics.snowplow.runtime.AppInfo
-import io.circe.Json
-import io.circe.syntax.EncoderOps
-
-import java.sql.SQLException
+import com.snowplowanalytics.snowplow.runtime.SetupExceptionMessages

 sealed trait Alert
 object Alert {

-  /** Restrict the length of an alert message to be compliant with alert iglu schema */
-  private val MaxAlertPayloadLength = 4096
-
-  final case class FailedToCreateEventsTable(cause: Throwable) extends Alert
-  final case class FailedToAddColumns(columns: List[String], cause: Throwable) extends Alert
-  final case class FailedToOpenSnowflakeChannel(cause: Throwable) extends Alert
-  final case class FailedToParsePrivateKey(cause: Throwable) extends Alert
+  final case class FailedToCreateEventsTable(cause: SetupExceptionMessages) extends Alert
+  final case class FailedToAddColumns(columns: List[String], cause: SetupExceptionMessages) extends Alert
+  final case class FailedToOpenSnowflakeChannel(cause: SetupExceptionMessages) extends Alert
+  final case class FailedToParsePrivateKey(cause: SetupExceptionMessages) extends Alert
   final case class TableIsMissingAtomicColumn(columnName: String) extends Alert

-  def toSelfDescribingJson(
-    alert: Alert,
-    appInfo: AppInfo,
-    tags: Map[String, String]
-  ): Json =
-    SelfDescribingData(
-      schema = SchemaKey("com.snowplowanalytics.monitoring.loader", "alert", "jsonschema", SchemaVer.Full(1, 0, 0)),
-      data = Json.obj(
-        "appName" -> appInfo.name.asJson,
-        "appVersion" -> appInfo.version.asJson,
-        "message" -> getMessage(alert).asJson,
-        "tags" -> tags.asJson
-      )
-    ).normalize
-
-  private def getMessage(alert: Alert): String = {
-    val full = alert match {
-      case FailedToCreateEventsTable(cause)    => show"Failed to create events table: $cause"
-      case FailedToAddColumns(columns, cause)  => show"Failed to add columns: ${columns.mkString("[", ",", "]")}. Cause: $cause"
-      case FailedToOpenSnowflakeChannel(cause) => show"Failed to open Snowflake channel: $cause"
-      case FailedToParsePrivateKey(cause)      => show"Failed to parse private key: $cause"
-      case TableIsMissingAtomicColumn(colName) => show"Table is missing required column $colName"
-    }
-
-    full.take(MaxAlertPayloadLength)
-  }
+  implicit def showAlert: Show[Alert] = Show {
+    case FailedToCreateEventsTable(cause)    => show"Failed to create events table: $cause"
+    case FailedToAddColumns(columns, cause)  => show"Failed to add columns: ${columns.mkString("[", ",", "]")}. Cause: $cause"
+    case FailedToOpenSnowflakeChannel(cause) => show"Failed to open Snowflake channel: $cause"
+    case FailedToParsePrivateKey(cause)      => show"Failed to parse private key: $cause"
+    case TableIsMissingAtomicColumn(colName) => show"Table is missing required column $colName"
+  }

-  private implicit def throwableShow: Show[Throwable] = {
-    def removeDuplicateMessages(in: List[String]): List[String] =
-      in match {
-        case h :: t :: rest =>
-          if (h.contains(t)) removeDuplicateMessages(h :: rest)
-          else if (t.contains(h)) removeDuplicateMessages(t :: rest)
-          else h :: removeDuplicateMessages(t :: rest)
-        case fewer => fewer
-      }
-
-    def accumulateMessages(t: Throwable): List[String] = {
-      val nextMessage = t match {
-        case t: SQLException => Some(s"${t.getMessage} = SqlState: ${t.getSQLState}")
-        case t               => Option(t.getMessage)
-      }
-      Option(t.getCause) match {
-        case Some(cause) => nextMessage.toList ::: accumulateMessages(cause)
-        case None        => nextMessage.toList
-      }
-    }
-
-    Show.show { t =>
-      removeDuplicateMessages(accumulateMessages(t)).mkString(": ")
-    }
-  }
 }
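With Alert reduced to case classes plus a Show instance, rendering an alert is ordinary cats code, and cross-cutting concerns such as truncating the message to the alert schema's length limit can live in common-streams instead of here. A hedged usage sketch: it assumes SetupExceptionMessages is a simple wrapper over the flattened list of exception-cause messages, which is how the case classes above read:

import cats.implicits._
import com.snowplowanalytics.snowplow.runtime.SetupExceptionMessages
import com.snowplowanalytics.snowplow.snowflake.Alert

// Assumption: SetupExceptionMessages wraps a List[String] of cause messages
val cause = SetupExceptionMessages(List("insufficient privileges"))
val alert: Alert = Alert.FailedToAddColumns(List("geo_city"), cause)

// Rendered via the implicit showAlert above; expected shape:
// "Failed to add columns: [geo_city]. Cause: insufficient privileges"
println(alert.show)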

This file was deleted.

modules/core/src/main/scala/com/snowplowanalytics/snowplow/snowflake/Config.scala
@@ -11,7 +11,6 @@
 package com.snowplowanalytics.snowplow.snowflake

 import cats.Id
-import cats.syntax.either._
 import io.circe.Decoder
 import io.circe.generic.extras.semiauto._
 import io.circe.generic.extras.Configuration
@@ -23,9 +22,8 @@ import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDeco

 import scala.concurrent.duration.FiniteDuration
 import scala.util.Try
-import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics, Telemetry}
+import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
 import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._
-import org.http4s.{ParseFailure, Uri}

 case class Config[+Source, +Sink](
   input: Source,
@@ -90,17 +88,15 @@ object Config {
     metrics: Metrics,
     sentry: Option[Sentry],
     healthProbe: HealthProbe,
-    webhook: Option[Webhook]
+    webhook: Webhook.Config
   )

-  final case class Webhook(endpoint: Uri, tags: Map[String, String])
-
   case class SetupErrorRetries(delay: FiniteDuration)
   case class TransientErrorRetries(delay: FiniteDuration, attempts: Int)

   case class Retries(
-    setupErrors: SetupErrorRetries,
-    transientErrors: TransientErrorRetries
+    setupErrors: Retrying.Config.ForSetup,
+    transientErrors: Retrying.Config.ForTransient
   )

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
@@ -125,15 +121,9 @@
       case SentryM(None, _) =>
         None
     }
-    implicit val http4sUriDecoder: Decoder[Uri] =
-      Decoder[String].emap(s => Either.catchOnly[ParseFailure](Uri.unsafeFromString(s)).leftMap(_.toString))
-
     implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
     implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
-    implicit val webhookDecoder = deriveConfiguredDecoder[Webhook]
     implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
-    implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries]
-    implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries]
     implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
     deriveConfiguredDecoder[Config[Source, Sink]]
   }
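Retrying.Config.ForSetup and Retrying.Config.ForTransient come with ready-made circe decoders in common-streams, which is why the hand-written Uri, webhook, and retry decoders above could all be deleted. Judging by the case classes they replace (a delay for setup errors; a delay plus an attempt cap for transient errors), the expected retries block is something like the following sketch, where the field names mirror the old case classes and the values are placeholders:

"retries": {
  "setupErrors": { "delay": "30 seconds" }
  "transientErrors": { "delay": "1 second", "attempts": 5 }
}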
modules/core/src/main/scala/com/snowplowanalytics/snowplow/snowflake/Environment.scala
@@ -10,12 +10,12 @@

 package com.snowplowanalytics.snowplow.snowflake

 import cats.implicits._
 import cats.effect.unsafe.implicits.global
 import cats.effect.{Async, Resource}
 import com.snowplowanalytics.iglu.core.SchemaCriterion
-import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe}
+import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, Webhook}
 import com.snowplowanalytics.snowplow.sinks.Sink
-import com.snowplowanalytics.snowplow.snowflake.AppHealth.Service
 import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManager}
 import com.snowplowanalytics.snowplow.sources.SourceAndAck
 import org.http4s.blaze.client.BlazeClientBuilder
@@ -29,19 +29,14 @@ case class Environment[F[_]](
   tableManager: TableManager[F],
   channel: Channel.Provider[F],
   metrics: Metrics[F],
-  appHealth: AppHealth[F],
+  appHealth: AppHealth.Interface[F, Alert, RuntimeService],
   batching: Config.Batching,
   schemasToSkip: List[SchemaCriterion],
   badRowMaxSize: Int
 )

 object Environment {

-  private val initialAppHealth: Map[Service, Boolean] = Map(
-    Service.Snowflake -> false,
-    Service.BadSink -> true
-  )
-
   def fromConfig[F[_]: Async, SourceConfig, SinkConfig](
     config: Config[SourceConfig, SinkConfig],
     appInfo: AppInfo,
@@ -51,18 +46,16 @@ object Environment {
     for {
       _ <- Sentry.capturingAnyException(appInfo, config.monitoring.sentry)
       sourceAndAck <- Resource.eval(toSource(config.input))
-      appHealth <- Resource.eval(AppHealth.init(config.monitoring.healthProbe.unhealthyLatency, sourceAndAck, initialAppHealth))
-      _ <- HealthProbe.resource(
-             config.monitoring.healthProbe.port,
-             appHealth.status()
-           )
+      sourceReporter = sourceAndAck.isHealthy(config.monitoring.healthProbe.unhealthyLatency).map(_.showIfUnhealthy)
+      appHealth <- Resource.eval(AppHealth.init[F, Alert, RuntimeService](List(sourceReporter)))
       httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
-      monitoring <- Monitoring.create[F](config.monitoring.webhook, appInfo, httpClient)
+      _ <- HealthProbe.resource(config.monitoring.healthProbe.port, appHealth)
+      _ <- Webhook.resource(config.monitoring.webhook, appInfo, httpClient, appHealth)
       badSink <- toSink(config.output.bad.sink)
       metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
-      tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries, monitoring))
-      channelOpener <- Channel.opener(config.output.good, config.batching, config.retries, monitoring, appHealth)
-      channelProvider <- Channel.provider(channelOpener, config.retries, appHealth, monitoring)
+      tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries))
+      channelOpener <- Channel.opener(config.output.good, config.batching, config.retries, appHealth)
+      channelProvider <- Channel.provider(channelOpener, config.retries, appHealth)
     } yield Environment(
       appInfo = appInfo,
       source = sourceAndAck,
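The rewiring above replaces two separate mechanisms, a seeded Map[Service, Boolean] plus a Monitoring service threaded into every component, with one shared AppHealth value: TableManager and Channel report into it, while HealthProbe.resource and Webhook.resource read from it. A sketch of roughly the shape such an interface takes; the method names are assumed from how the call sites read, not copied from common-streams:

// Hedged sketch of AppHealth.Interface[F, Alert, RuntimeService].
// Method names are illustrative assumptions, not the actual API.
trait Interface[F[_], -SetupAlert, -RuntimeService] {
  // Setup problems (bad credentials, missing table) carry an alert
  // payload that the webhook turns into an alert event
  def becomeHealthyForSetup: F[Unit]
  def becomeUnhealthyForSetup(alert: SetupAlert): F[Unit]
  // Runtime problems (e.g. the bad-rows sink failing) flip the HTTP
  // health probe so an orchestrator can restart the app
  def becomeHealthyForRuntimeService(service: RuntimeService): F[Unit]
  def becomeUnhealthyForRuntimeService(service: RuntimeService): F[Unit]
}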