diff --git a/CHANGELOG b/CHANGELOG index cf783fd..1e055c5 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,9 @@ +Version 0.2.2 (2024-04-05) +-------------------------- +Treat all client errors as setup errors when opening channel (#37) +Missing atomic column should be treated as a setup error (#38) +Use TIMESTAMP_TZ types when creating the table (#36) + Version 0.2.1 (2024-02-26) -------------------------- Change the channel’s app name to Snowplow_Streaming (#34) diff --git a/README.md b/README.md index dca0c41..eaf2c58 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ Basic usage: ```bash docker run \ -v /path/to/config.hocon:/var/config.hocon \ - snowplow/snowflake-loader-kafka:0.2.1 \ + snowplow/snowflake-loader-kafka:0.2.2 \ --config /var/config.hocon ``` @@ -30,7 +30,7 @@ The GCP snowflake loader reads the stream of enriched events from Pubsub. ```bash docker run \ -v /path/to/config.hocon:/var/config.hocon \ - snowplow/snowflake-loader-pubsub:0.2.1 \ + snowplow/snowflake-loader-pubsub:0.2.2 \ --config /var/config.hocon ``` @@ -41,7 +41,7 @@ The AWS snowflake loader reads the stream of enriched events from Kinesis. ```bash docker run \ -v /path/to/config.hocon:/var/config.hocon \ - snowplow/snowflake-loader-kinesis:0.2.1 \ + snowplow/snowflake-loader-kinesis:0.2.2 \ --config /var/config.hocon ``` @@ -71,7 +71,7 @@ Licensed under the [Snowplow Limited Use License Agreement][license]. _(If you a [build-image]: https://github.com/snowplow-incubator/snowplow-snowflake-streaming-loader/workflows/CI/badge.svg [build]: https://github.com/snowplow-incubator/snowplow-snowflake-streaming-loader/actions/workflows/ci.yml -[release-image]: https://img.shields.io/badge/release-0.2.1-blue.svg?style=flat +[release-image]: https://img.shields.io/badge/release-0.2.2-blue.svg?style=flat [releases]: https://github.com/snowplow-incubator/snowplow-snowflake-streaming-loader/releases [license]: https://docs.snowplow.io/limited-use-license-1.0 diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Alert.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Alert.scala index 949a039..ba88cca 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Alert.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Alert.scala @@ -30,6 +30,7 @@ object Alert { final case class FailedToAddColumns(columns: List[String], cause: Throwable) extends Alert final case class FailedToOpenSnowflakeChannel(cause: Throwable) extends Alert final case class FailedToParsePrivateKey(cause: Throwable) extends Alert + final case class TableIsMissingAtomicColumn(columnName: String) extends Alert def toSelfDescribingJson( alert: Alert, @@ -52,25 +53,35 @@ object Alert { case FailedToAddColumns(columns, cause) => show"Failed to add columns: ${columns.mkString("[", ",", "]")}. Cause: $cause" case FailedToOpenSnowflakeChannel(cause) => show"Failed to open Snowflake channel: $cause" case FailedToParsePrivateKey(cause) => show"Failed to parse private key: $cause" + case TableIsMissingAtomicColumn(colName) => show"Table is missing required column $colName" } full.take(MaxAlertPayloadLength) } private implicit def throwableShow: Show[Throwable] = { - def go(acc: List[String], next: Throwable): String = { - val nextMessage = next match { + def removeDuplicateMessages(in: List[String]): List[String] = + in match { + case h :: t :: rest => + if (h.contains(t)) removeDuplicateMessages(h :: rest) + else if (t.contains(h)) removeDuplicateMessages(t :: rest) + else h :: removeDuplicateMessages(t :: rest) + case fewer => fewer + } + + def accumulateMessages(t: Throwable): List[String] = { + val nextMessage = t match { case t: SQLException => Some(s"${t.getMessage} = SqlState: ${t.getSQLState}") case t => Option(t.getMessage) } - val msgs = nextMessage.filterNot(msg => acc.headOption.contains(msg)) ++: acc - - Option(next.getCause) match { - case Some(cause) => go(msgs, cause) - case None => msgs.reverse.mkString(": ") + Option(t.getCause) match { + case Some(cause) => nextMessage.toList ::: accumulateMessages(cause) + case None => nextMessage.toList } } - Show.show(go(Nil, _)) + Show.show { t => + removeDuplicateMessages(accumulateMessages(t)).mkString(": ") + } } } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala index 57de785..b2efd78 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala @@ -59,8 +59,11 @@ object SnowflakeRetrying { /** Is an error associated with setting up Snowflake as a destination */ private def isSetupError[F[_]: Sync](t: Throwable): F[Boolean] = t match { - case CausedByIngestResponseException(ire) if ire.getErrorCode === 403 => - true.pure[F] + case CausedByIngestResponseException(ire) => + if (ire.getErrorCode >= 400 && ire.getErrorCode < 500) + true.pure[F] + else + false.pure[F] case _: SecurityException => // Authentication failure, i.e. user unrecognized or bad private key true.pure[F] diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala index 80c4233..7d6b650 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala @@ -19,6 +19,8 @@ import net.snowflake.client.jdbc.SnowflakeSQLException import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger +import com.snowplowanalytics.snowplow.loaders.transform.AtomicFields + import scala.util.matching.Regex trait TableManager[F[_]] { @@ -50,8 +52,11 @@ object TableManager { override def addColumns(columns: List[String]): F[Unit] = SnowflakeRetrying.withRetries(appHealth, retriesConfig, monitoring, Alert.FailedToAddColumns(columns, _)) { - Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *> - executeAddColumnsQuery(columns) + for { + _ <- Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") + addable <- columns.traverse(addableColumn(appHealth, monitoring, _)) + _ <- executeAddColumnsQuery(addable) + } yield () } def executeInitTableQuery(): F[Unit] = { @@ -68,31 +73,58 @@ object TableManager { } } - def executeAddColumnsQuery(columns: List[String]): F[Unit] = + def executeAddColumnsQuery(columns: List[AddableColumn]): F[Unit] = transactor.rawTrans.apply { columns.traverse_ { column => sqlAlterTable(config, column).update.run.void .recoverWith { case e: SnowflakeSQLException if e.getErrorCode === 1430 => - Logger[ConnectionIO].info(show"Column already exists: $column") + Logger[ConnectionIO].info(show"Column already exists: ${column.name}") } } } } } + private sealed trait AddableColumn { + def name: String + } + private object AddableColumn { + case class UnstructEvent(name: String) extends AddableColumn + case class Contexts(name: String) extends AddableColumn + } + + private def addableColumn[F[_]: Async]( + appHealth: AppHealth[F], + monitoring: Monitoring[F], + name: String + ): F[AddableColumn] = + name match { + case reUnstruct() => Sync[F].pure(AddableColumn.UnstructEvent(name)) + case reContext() => Sync[F].pure(AddableColumn.Contexts(name)) + case other if AtomicFields.withLoadTstamp.exists(_.name === other) => + Logger[F].error(s"Table is missing required field $name. Will do nothing but wait for loader to be killed") *> + appHealth.setServiceHealth(AppHealth.Service.Snowflake, false) *> + // This is a type of "setup" error, so we send a monitoring alert + monitoring.alert(Alert.TableIsMissingAtomicColumn(name)) *> + // We don't want to crash and exit, because we don't want to spam Sentry with exceptions about setup errors. + // But there's no point in continuing or retrying. Instead we just block the fiber so the health probe appears unhealthy. + Async[F].never + case other => + Sync[F].raiseError(new IllegalStateException(s"Cannot alter table to add unrecognized column $other")) + } + private val reUnstruct: Regex = "^unstruct_event_.*$".r private val reContext: Regex = "^contexts_.*$".r - private def sqlAlterTable(config: Config.Snowflake, colName: String): Fragment = { + private def sqlAlterTable(config: Config.Snowflake, addableColumn: AddableColumn): Fragment = { val tableName = fqTableName(config) - val colType = colName match { - case reUnstruct() => "OBJECT" - case reContext() => "ARRAY" - case other => throw new IllegalStateException(s"Cannot alter table to add column $other") + val colType = addableColumn match { + case AddableColumn.UnstructEvent(_) => "OBJECT" + case AddableColumn.Contexts(_) => "ARRAY" } val colTypeFrag = Fragment.const0(colType) - val colNameFrag = Fragment.const0(colName) + val colNameFrag = Fragment.const0(addableColumn.name) sql""" ALTER TABLE identifier($tableName) ADD COLUMN $colNameFrag $colTypeFrag @@ -108,16 +140,16 @@ object TableManager { CREATE TABLE IF NOT EXISTS identifier($tableName) ( app_id VARCHAR, platform VARCHAR, - etl_tstamp TIMESTAMP, - collector_tstamp TIMESTAMP NOT NULL, - dvce_created_tstamp TIMESTAMP, + etl_tstamp TIMESTAMP_NTZ, + collector_tstamp TIMESTAMP_NTZ NOT NULL, + dvce_created_tstamp TIMESTAMP_NTZ, event VARCHAR, event_id VARCHAR NOT NULL UNIQUE, txn_id INTEGER, name_tracker VARCHAR, v_tracker VARCHAR, - v_collector VARCHAR NOT NULL, - v_etl VARCHAR NOT NULL, + v_collector VARCHAR NOT NULL, + v_etl VARCHAR NOT NULL, user_id VARCHAR, user_ipaddress VARCHAR, user_fingerprint VARCHAR, @@ -223,18 +255,18 @@ object TableManager { mkt_clickid VARCHAR, mkt_network VARCHAR, etl_tags VARCHAR, - dvce_sent_tstamp TIMESTAMP, + dvce_sent_tstamp TIMESTAMP_NTZ, refr_domain_userid VARCHAR, - refr_dvce_tstamp TIMESTAMP, + refr_dvce_tstamp TIMESTAMP_NTZ, domain_sessionid VARCHAR, - derived_tstamp TIMESTAMP, + derived_tstamp TIMESTAMP_NTZ, event_vendor VARCHAR, event_name VARCHAR, event_format VARCHAR, event_version VARCHAR, event_fingerprint VARCHAR, - true_tstamp TIMESTAMP, - load_tstamp TIMESTAMP, + true_tstamp TIMESTAMP_NTZ, + load_tstamp TIMESTAMP_NTZ, CONSTRAINT event_id_pk PRIMARY KEY(event_id) ) """