Skip to content

Commit

Permalink
PDP-1323 Handle initialization errors for GCP
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Oct 25, 2024
1 parent 6fefeef commit 81aa6fd
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

package com.snowplowanalytics.snowplow.lakes

import com.google.api.client.googleapis.json.GoogleJsonResponseException

import com.snowplowanalytics.snowplow.sources.pubsub.{PubsubSource, PubsubSourceConfig}
import com.snowplowanalytics.snowplow.sinks.pubsub.{PubsubSink, PubsubSinkConfig}

Expand All @@ -19,5 +21,15 @@ object GcpApp extends LoaderApp[PubsubSourceConfig, PubsubSinkConfig](BuildInfo)

override def badSink: SinkProvider = PubsubSink.resource(_)

override def isDestinationSetupError: DestinationSetupErrorCheck = TableFormatSetupError.check
override def isDestinationSetupError: DestinationSetupErrorCheck = {
// Destination bucket doesn't exist
case e: GoogleJsonResponseException if e.getDetails.getCode == 404 =>
"The specified bucket does not exist"
// Permissions missing for Cloud Storage
case e: GoogleJsonResponseException if e.getDetails.getCode == 403 =>
e.getDetails.getMessage
// Exceptions common to the table format - Delta/Iceberg/Hudi
case TableFormatSetupError.check(t) =>
t
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ package com.snowplowanalytics.snowplow.lakes

import org.apache.iceberg.exceptions.{ForbiddenException => IcebergForbiddenException, NotFoundException => IcebergNotFoundException}

import org.apache.spark.sql.delta.DeltaAnalysisException

object TableFormatSetupError {

// Check if given exception is specific to iceberg format
Expand All @@ -22,5 +24,7 @@ object TableFormatSetupError {
case e: IcebergForbiddenException =>
// No permission to create a table in Glue catalog
e.getMessage
case e: DeltaAnalysisException if e.errorClass == Some("DELTA_CREATE_TABLE_WITH_NON_EMPTY_LOCATION") =>
"Destination not empty and not a Delta table"
}
}
6 changes: 3 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ object Dependencies {

val icebergDeltaRuntimeDependencies = Seq(
iceberg,
delta % Runtime,
Spark.coreForIcebergDelta % Runtime,
Spark.sqlForIcebergDelta % Runtime
delta,
Spark.coreForIcebergDelta,
Spark.sqlForIcebergDelta
)

val coreDependencies = Seq(
Expand Down

0 comments on commit 81aa6fd

Please sign in to comment.