Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 0.2.2 #39

Merged
merged 4 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
Version 0.2.2 (2024-04-05)
--------------------------
Treat all client errors as setup errors when opening channel (#37)
Missing atomic column should be treated as a setup error (#38)
Use TIMESTAMP_NTZ types when creating the table (#36)

Version 0.2.1 (2024-02-26)
--------------------------
Change the channel’s app name to Snowplow_Streaming (#34)
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Basic usage:
```bash
docker run \
-v /path/to/config.hocon:/var/config.hocon \
snowplow/snowflake-loader-kafka:0.2.1 \
snowplow/snowflake-loader-kafka:0.2.2 \
--config /var/config.hocon
```

Expand All @@ -30,7 +30,7 @@ The GCP snowflake loader reads the stream of enriched events from Pubsub.
```bash
docker run \
-v /path/to/config.hocon:/var/config.hocon \
snowplow/snowflake-loader-pubsub:0.2.1 \
snowplow/snowflake-loader-pubsub:0.2.2 \
--config /var/config.hocon
```

Expand All @@ -41,7 +41,7 @@ The AWS snowflake loader reads the stream of enriched events from Kinesis.
```bash
docker run \
-v /path/to/config.hocon:/var/config.hocon \
snowplow/snowflake-loader-kinesis:0.2.1 \
snowplow/snowflake-loader-kinesis:0.2.2 \
--config /var/config.hocon
```

Expand Down Expand Up @@ -71,7 +71,7 @@ Licensed under the [Snowplow Limited Use License Agreement][license]. _(If you a
[build-image]: https://github.com/snowplow-incubator/snowplow-snowflake-streaming-loader/workflows/CI/badge.svg
[build]: https://github.com/snowplow-incubator/snowplow-snowflake-streaming-loader/actions/workflows/ci.yml

[release-image]: https://img.shields.io/badge/release-0.2.1-blue.svg?style=flat
[release-image]: https://img.shields.io/badge/release-0.2.2-blue.svg?style=flat
[releases]: https://github.com/snowplow-incubator/snowplow-snowflake-streaming-loader/releases

[license]: https://docs.snowplow.io/limited-use-license-1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ object Alert {
final case class FailedToAddColumns(columns: List[String], cause: Throwable) extends Alert
final case class FailedToOpenSnowflakeChannel(cause: Throwable) extends Alert
final case class FailedToParsePrivateKey(cause: Throwable) extends Alert
final case class TableIsMissingAtomicColumn(columnName: String) extends Alert

def toSelfDescribingJson(
alert: Alert,
Expand All @@ -52,25 +53,35 @@ object Alert {
case FailedToAddColumns(columns, cause) => show"Failed to add columns: ${columns.mkString("[", ",", "]")}. Cause: $cause"
case FailedToOpenSnowflakeChannel(cause) => show"Failed to open Snowflake channel: $cause"
case FailedToParsePrivateKey(cause) => show"Failed to parse private key: $cause"
case TableIsMissingAtomicColumn(colName) => show"Table is missing required column $colName"
}

full.take(MaxAlertPayloadLength)
}

private implicit def throwableShow: Show[Throwable] = {
def go(acc: List[String], next: Throwable): String = {
val nextMessage = next match {
/* Collapses adjacent messages where one already contains the other, keeping
 * the more informative (containing) text. Exception chains frequently embed
 * the cause's message inside the wrapper's message, so without this the same
 * text would be printed twice in the rendered alert.
 */
def removeDuplicateMessages(in: List[String]): List[String] =
  in.foldLeft(List.empty[String]) { (acc, msg) =>
    acc match {
      case prev :: _ if prev.contains(msg) => acc
      case prev :: rest if msg.contains(prev) => msg :: rest
      case _ => msg :: acc
    }
  }.reverse

def accumulateMessages(t: Throwable): List[String] = {
val nextMessage = t match {
case t: SQLException => Some(s"${t.getMessage} = SqlState: ${t.getSQLState}")
case t => Option(t.getMessage)
}
val msgs = nextMessage.filterNot(msg => acc.headOption.contains(msg)) ++: acc

Option(next.getCause) match {
case Some(cause) => go(msgs, cause)
case None => msgs.reverse.mkString(": ")
Option(t.getCause) match {
case Some(cause) => nextMessage.toList ::: accumulateMessages(cause)
case None => nextMessage.toList
}
}

Show.show(go(Nil, _))
Show.show { t =>
removeDuplicateMessages(accumulateMessages(t)).mkString(": ")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ object SnowflakeRetrying {

/** Is an error associated with setting up Snowflake as a destination */
private def isSetupError[F[_]: Sync](t: Throwable): F[Boolean] = t match {
case CausedByIngestResponseException(ire) if ire.getErrorCode === 403 =>
true.pure[F]
case CausedByIngestResponseException(ire) =>
if (ire.getErrorCode >= 400 && ire.getErrorCode < 500)
true.pure[F]
else
false.pure[F]
case _: SecurityException =>
// Authentication failure, i.e. user unrecognized or bad private key
true.pure[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import net.snowflake.client.jdbc.SnowflakeSQLException
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.snowplow.loaders.transform.AtomicFields

import scala.util.matching.Regex

trait TableManager[F[_]] {
Expand Down Expand Up @@ -50,8 +52,11 @@ object TableManager {

override def addColumns(columns: List[String]): F[Unit] =
SnowflakeRetrying.withRetries(appHealth, retriesConfig, monitoring, Alert.FailedToAddColumns(columns, _)) {
Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]") *>
executeAddColumnsQuery(columns)
for {
_ <- Logger[F].info(s"Altering table to add columns [${columns.mkString(", ")}]")
addable <- columns.traverse(addableColumn(appHealth, monitoring, _))
_ <- executeAddColumnsQuery(addable)
} yield ()
}

def executeInitTableQuery(): F[Unit] = {
Expand All @@ -68,31 +73,58 @@ object TableManager {
}
}

def executeAddColumnsQuery(columns: List[String]): F[Unit] =
def executeAddColumnsQuery(columns: List[AddableColumn]): F[Unit] =
transactor.rawTrans.apply {
columns.traverse_ { column =>
sqlAlterTable(config, column).update.run.void
.recoverWith {
case e: SnowflakeSQLException if e.getErrorCode === 1430 =>
Logger[ConnectionIO].info(show"Column already exists: $column")
Logger[ConnectionIO].info(show"Column already exists: ${column.name}")
}
}
}
}
}

/** A column the loader is permitted to add to the events table.
  *
  * Only the two self-describing column families may ever be added; atomic
  * columns are fixed at table-creation time, so restricting alters to this
  * ADT prevents accidentally "fixing" a broken table by adding atomic columns.
  */
private sealed trait AddableColumn {
  /** The raw column name as it appears in the database. */
  def name: String
}
private object AddableColumn {
  /** Column for a self-describing (unstruct) event; created with Snowflake OBJECT type. */
  case class UnstructEvent(name: String) extends AddableColumn
  /** Column for contexts/entities; created with Snowflake ARRAY type. */
  case class Contexts(name: String) extends AddableColumn
}

/** Classifies a requested column name into a column the loader may add.
  *
  * `unstruct_event_*` and `contexts_*` names are accepted. If the name matches
  * a known *atomic* column (i.e. the table is missing a required column), this
  * is treated as a setup error: the Snowflake service is marked unhealthy, a
  * monitoring alert is sent, and the returned effect never completes, blocking
  * the calling fiber so the health probe reports unhealthy without crashing.
  * Any other name is a programmer error and raises IllegalStateException.
  *
  * @param appHealth  used to flag the Snowflake service unhealthy on setup errors
  * @param monitoring used to send the setup-error alert
  * @param name       column name requested to be added
  * @return the classified column, or a never-completing effect for setup errors
  */
private def addableColumn[F[_]: Async](
appHealth: AppHealth[F],
monitoring: Monitoring[F],
name: String
): F[AddableColumn] =
name match {
case reUnstruct() => Sync[F].pure(AddableColumn.UnstructEvent(name))
case reContext() => Sync[F].pure(AddableColumn.Contexts(name))
case other if AtomicFields.withLoadTstamp.exists(_.name === other) =>
Logger[F].error(s"Table is missing required field $name. Will do nothing but wait for loader to be killed") *>
appHealth.setServiceHealth(AppHealth.Service.Snowflake, false) *>
// This is a type of "setup" error, so we send a monitoring alert
monitoring.alert(Alert.TableIsMissingAtomicColumn(name)) *>
// We don't want to crash and exit, because we don't want to spam Sentry with exceptions about setup errors.
// But there's no point in continuing or retrying. Instead we just block the fiber so the health probe appears unhealthy.
Async[F].never
case other =>
Sync[F].raiseError(new IllegalStateException(s"Cannot alter table to add unrecognized column $other"))
}

// Column names carrying a self-describing (unstruct) event payload.
private val reUnstruct: Regex = "^unstruct_event_.*$".r
// Column names carrying contexts/entities payloads.
private val reContext: Regex = "^contexts_.*$".r

private def sqlAlterTable(config: Config.Snowflake, colName: String): Fragment = {
private def sqlAlterTable(config: Config.Snowflake, addableColumn: AddableColumn): Fragment = {
val tableName = fqTableName(config)
val colType = colName match {
case reUnstruct() => "OBJECT"
case reContext() => "ARRAY"
case other => throw new IllegalStateException(s"Cannot alter table to add column $other")
val colType = addableColumn match {
case AddableColumn.UnstructEvent(_) => "OBJECT"
case AddableColumn.Contexts(_) => "ARRAY"
}
val colTypeFrag = Fragment.const0(colType)
val colNameFrag = Fragment.const0(colName)
val colNameFrag = Fragment.const0(addableColumn.name)
sql"""
ALTER TABLE identifier($tableName)
ADD COLUMN $colNameFrag $colTypeFrag
Expand All @@ -108,16 +140,16 @@ object TableManager {
CREATE TABLE IF NOT EXISTS identifier($tableName) (
app_id VARCHAR,
platform VARCHAR,
etl_tstamp TIMESTAMP,
collector_tstamp TIMESTAMP NOT NULL,
dvce_created_tstamp TIMESTAMP,
etl_tstamp TIMESTAMP_NTZ,
collector_tstamp TIMESTAMP_NTZ NOT NULL,
dvce_created_tstamp TIMESTAMP_NTZ,
event VARCHAR,
event_id VARCHAR NOT NULL UNIQUE,
txn_id INTEGER,
name_tracker VARCHAR,
v_tracker VARCHAR,
v_collector VARCHAR NOT NULL,
v_etl VARCHAR NOT NULL,
v_collector VARCHAR NOT NULL,
v_etl VARCHAR NOT NULL,
user_id VARCHAR,
user_ipaddress VARCHAR,
user_fingerprint VARCHAR,
Expand Down Expand Up @@ -223,18 +255,18 @@ object TableManager {
mkt_clickid VARCHAR,
mkt_network VARCHAR,
etl_tags VARCHAR,
dvce_sent_tstamp TIMESTAMP,
dvce_sent_tstamp TIMESTAMP_NTZ,
refr_domain_userid VARCHAR,
refr_dvce_tstamp TIMESTAMP,
refr_dvce_tstamp TIMESTAMP_NTZ,
domain_sessionid VARCHAR,
derived_tstamp TIMESTAMP,
derived_tstamp TIMESTAMP_NTZ,
event_vendor VARCHAR,
event_name VARCHAR,
event_format VARCHAR,
event_version VARCHAR,
event_fingerprint VARCHAR,
true_tstamp TIMESTAMP,
load_tstamp TIMESTAMP,
true_tstamp TIMESTAMP_NTZ,
load_tstamp TIMESTAMP_NTZ,
CONSTRAINT event_id_pk PRIMARY KEY(event_id)
)
"""
Expand Down
Loading