Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add collector to target latency metrics #50

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
}

# Retry configuration for Snowflake operation failures
"retries": {

Expand All @@ -95,7 +95,7 @@
"delay": "1 second"
"attempts": 5
}
}
}

# -- Schemas that won't be loaded to Snowflake. Optional, default value []
"skipSchemas": [
Expand All @@ -104,7 +104,7 @@
"iglu:com.acme/skipped3/jsonschema/1-*-*",
"iglu:com.acme/skipped4/jsonschema/*-*-*"
]

"monitoring": {
"metrics": {

Expand Down Expand Up @@ -135,16 +135,25 @@
"myTag": "xyz"
}
}
# -- Report alerts to the webhook

# -- Report alerts and heartbeats to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
}
# How often to send the heartbeat event
"heartbeat": "60.minutes"
}
}

# -- Configuration of internal http client used for alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

# -- Optional, configure telemetry
Expand Down
25 changes: 15 additions & 10 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
"maxRecords": 1000
}

# -- The number of batches of events which are pre-fetched from kinesis.
# -- Increasing this above 1 is not known to improve performance.
"bufferSize": 1

}

"output": {
Expand All @@ -46,7 +42,7 @@
"privateKey": ${SNOWFLAKE_PRIVATE_KEY}

# -- optional, passphrase for the private key
"privateKeyPassphrase": ${?SNOWFLAKE_PRIVATE_KEY_PASSPHRASE}
"privateKeyPassphrase": ${?SNOWFLAKE_PRIVATE_KEY_PASSPHRASE}

# -- optional, snowflake role which the snowflake user should assume
"role": "snowplow_loader"
Expand Down Expand Up @@ -99,7 +95,7 @@
# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
}

# Retry configuration for Snowflake operation failures
"retries": {

Expand All @@ -116,7 +112,7 @@
"delay": "1 second"
"attempts": 5
}
}
}

# -- Schemas that won't be loaded to Snowflake. Optional, default value []
"skipSchemas": [
Expand All @@ -125,7 +121,7 @@
"iglu:com.acme/skipped3/jsonschema/1-*-*",
"iglu:com.acme/skipped4/jsonschema/*-*-*"
]

"monitoring": {
"metrics": {

Expand Down Expand Up @@ -156,15 +152,24 @@
"myTag": "xyz"
}
}
# -- Report alerts to the webhook

# -- Report alerts and heartbeats to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
# How often to send the heartbeat event
"heartbeat": "60.minutes"
}
}

# -- Configuration of internal http client used for alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

Expand Down
30 changes: 22 additions & 8 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
# -- pubsub subscription for the source of enriched events
"subscription": "projects/myproject/subscriptions/snowplow-enriched"

# -- How many threads are used by the pubsub client library for fetching events
"parallelPullCount": 3
# -- Controls how many threads are used internally by the pubsub client library for fetching events.
# -- The number of threads is equal to this factor multiplied by the number of available cpu cores
"parallelPullFactor": 0.5

# -- How many bytes can be buffered by the loader app before blocking the pubsub client library
# -- from fetching more events.
Expand All @@ -18,6 +19,10 @@
# -- The actual value used is guided by runtime statistics collected by the pubsub client library.
"minDurationPerAckExtension": "60 seconds"
"maxDurationPerAckExtension": "600 seconds"

# -- The maximum number of streaming pulls we allow on a single GRPC transport channel before opening another channel.
  # -- This advanced setting is only relevant on extremely large VMs, or with a high value of `parallelPullFactor`.
"maxPullsPerTransportChannel": 16
}

"output": {
Expand Down Expand Up @@ -79,7 +84,7 @@
# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
}

# Retry configuration for Snowflake operation failures
"retries": {

Expand All @@ -96,7 +101,7 @@
"delay": "1 second"
"attempts": 5
}
}
}

# -- Schemas that won't be loaded to Snowflake. Optional, default value []
"skipSchemas": [
Expand All @@ -105,7 +110,7 @@
"iglu:com.acme/skipped3/jsonschema/1-*-*",
"iglu:com.acme/skipped4/jsonschema/*-*-*"
]

"monitoring": {
"metrics": {

Expand Down Expand Up @@ -136,16 +141,25 @@
"myTag": "xyz"
}
}
# -- Report alerts to the webhook

# -- Report alerts and heartbeats to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
}
# How often to send the heartbeat event
"heartbeat": "60.minutes"
}
}

# -- Configuration of internal http client used for alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

# -- Optional, configure telemetry
Expand Down
9 changes: 7 additions & 2 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"maxDelay": "1 second"
"uploadConcurrency": 3
}

"retries": {
"setupErrors": {
"delay": "30 seconds"
Expand All @@ -33,7 +33,7 @@
"attempts": 5
}
}

"skipSchemas": []

"monitoring": {
Expand All @@ -43,6 +43,7 @@
"prefix": "snowplow.snowflake-loader"
}
}
"webhook": ${snowplow.defaults.webhook}
"sentry": {
"tags": {
}
Expand All @@ -54,4 +55,8 @@
}

"telemetry": ${snowplow.defaults.telemetry}

"http": {
"client": ${snowplow.defaults.http.client}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,78 +10,39 @@

package com.snowplowanalytics.snowplow.snowflake

import cats.implicits._
import cats.Show
import cats.implicits.showInterpolator
import com.snowplowanalytics.iglu.core.circe.implicits.igluNormalizeDataJson
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.runtime.AppInfo
import io.circe.Json
import io.circe.syntax.EncoderOps

import java.sql.SQLException
import com.snowplowanalytics.snowplow.runtime.SetupExceptionMessages

sealed trait Alert
object Alert {

/** Restrict the length of an alert message to be compliant with alert iglu schema */
private val MaxAlertPayloadLength = 4096

final case class FailedToCreateEventsTable(cause: Throwable) extends Alert
final case class FailedToAddColumns(columns: List[String], cause: Throwable) extends Alert
final case class FailedToOpenSnowflakeChannel(cause: Throwable) extends Alert
final case class FailedToParsePrivateKey(cause: Throwable) extends Alert
final case class FailedToConnectToSnowflake(cause: SetupExceptionMessages) extends Alert
final case class FailedToShowTables(
database: String,
schema: String,
cause: SetupExceptionMessages
) extends Alert
final case class FailedToCreateEventsTable(
database: String,
schema: String,
cause: SetupExceptionMessages
) extends Alert
final case class FailedToAddColumns(columns: List[String], cause: SetupExceptionMessages) extends Alert
final case class FailedToOpenSnowflakeChannel(cause: SetupExceptionMessages) extends Alert
final case class FailedToParsePrivateKey(cause: SetupExceptionMessages) extends Alert
final case class TableIsMissingAtomicColumn(columnName: String) extends Alert

def toSelfDescribingJson(
alert: Alert,
appInfo: AppInfo,
tags: Map[String, String]
): Json =
SelfDescribingData(
schema = SchemaKey("com.snowplowanalytics.monitoring.loader", "alert", "jsonschema", SchemaVer.Full(1, 0, 0)),
data = Json.obj(
"appName" -> appInfo.name.asJson,
"appVersion" -> appInfo.version.asJson,
"message" -> getMessage(alert).asJson,
"tags" -> tags.asJson
)
).normalize

private def getMessage(alert: Alert): String = {
val full = alert match {
case FailedToCreateEventsTable(cause) => show"Failed to create events table: $cause"
case FailedToAddColumns(columns, cause) => show"Failed to add columns: ${columns.mkString("[", ",", "]")}. Cause: $cause"
case FailedToOpenSnowflakeChannel(cause) => show"Failed to open Snowflake channel: $cause"
case FailedToParsePrivateKey(cause) => show"Failed to parse private key: $cause"
case TableIsMissingAtomicColumn(colName) => show"Table is missing required column $colName"
}

full.take(MaxAlertPayloadLength)
implicit def showAlert: Show[Alert] = Show {
case FailedToConnectToSnowflake(cause) => show"Failed to connect to Snowflake: $cause"
case FailedToShowTables(db, schema, cause) => show"Failed to SHOW tables in Snowflake schema $db.$schema: $cause"
case FailedToCreateEventsTable(db, schema, cause) => show"Failed to create table in schema $db.$schema: $cause"
case FailedToAddColumns(columns, cause) => show"Failed to add columns: ${columns.mkString("[", ",", "]")}. Cause: $cause"
case FailedToOpenSnowflakeChannel(cause) => show"Failed to open Snowflake channel: $cause"
case FailedToParsePrivateKey(cause) => show"Failed to parse private key: $cause"
case TableIsMissingAtomicColumn(colName) => show"Existing table is incompatible with Snowplow: Missing required column $colName"
}

private implicit def throwableShow: Show[Throwable] = {
def removeDuplicateMessages(in: List[String]): List[String] =
in match {
case h :: t :: rest =>
if (h.contains(t)) removeDuplicateMessages(h :: rest)
else if (t.contains(h)) removeDuplicateMessages(t :: rest)
else h :: removeDuplicateMessages(t :: rest)
case fewer => fewer
}

def accumulateMessages(t: Throwable): List[String] = {
val nextMessage = t match {
case t: SQLException => Some(s"${t.getMessage} = SqlState: ${t.getSQLState}")
case t => Option(t.getMessage)
}
Option(t.getCause) match {
case Some(cause) => nextMessage.toList ::: accumulateMessages(cause)
case None => nextMessage.toList
}
}

Show.show { t =>
removeDuplicateMessages(accumulateMessages(t)).mkString(": ")
}
}
}
Loading
Loading