Skip to content

Commit 14cfbef

Browse files
committed
Loader to start healthy if no privilege to create tables but table already exists (close #29)
1 parent febedd0 commit 14cfbef

File tree

4 files changed

+11
-2
lines changed

4 files changed

+11
-2
lines changed

modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ object Processing {
3737
def stream[F[_]: Async](env: Environment[F]): Stream[F, Nothing] = {
3838
val eventProcessingConfig = EventProcessingConfig(EventProcessingConfig.NoWindowing)
3939
Stream.eval(env.tableManager.initializeEventsTable()) *>
40+
Stream.eval(env.channel.opened.use_) *>
4041
env.source.stream(eventProcessingConfig, eventProcessor(env))
4142
}
4243

modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/SnowflakeRetrying.scala

+4-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ object SnowflakeRetrying {
5555
// Authentication failure, i.e. user unrecognized or bad private key
5656
true.pure[F]
5757
case sql: java.sql.SQLException if sql.getErrorCode === 2003 =>
58-
// Object does not exist or not authorized
58+
// Object does not exist or not authorized to view it
59+
true.pure[F]
60+
case sql: java.sql.SQLException if sql.getErrorCode === 3001 =>
61+
// Insufficient privileges
5962
true.pure[F]
6063
case _ =>
6164
false.pure[F]

modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/TableManager.scala

+4
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ object TableManager {
5959
Logger[ConnectionIO].info(s"Creating table $tableName if it does not already exist...") *>
6060
sqlCreateTable(tableName).update.run.void
6161
}
62+
.recoverWith {
63+
case sql: java.sql.SQLException if sql.getErrorCode === 3001 =>
64+
Logger[F].info(s"Access denied when trying to create table. Will ignore error and assume table already exists.")
65+
}
6266
}
6367

6468
def executeAddColumnsQuery(columns: List[String]): F[Unit] =

modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ class ProcessingSpec extends Specification with CatsEffect {
6767
} yield state should beEqualTo(
6868
Vector(
6969
Action.InitEventsTable,
70+
Action.OpenedChannel,
7071
Action.SentToBad(6),
7172
Action.AddedGoodCountMetric(0),
7273
Action.AddedBadCountMetric(6),
@@ -240,9 +241,9 @@ class ProcessingSpec extends Specification with CatsEffect {
240241
} yield state should beEqualTo(
241242
Vector(
242243
Action.InitEventsTable,
244+
Action.OpenedChannel,
243245
Action.SetLatencyMetric(42123),
244246
Action.SetLatencyMetric(42123),
245-
Action.OpenedChannel,
246247
Action.WroteRowsToSnowflake(4),
247248
Action.AddedGoodCountMetric(4),
248249
Action.AddedBadCountMetric(0),

0 commit comments

Comments
 (0)