From 81aa6fda2e72df5e16c2590c0b3377a7b153e255 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Thu, 24 Oct 2024 12:07:33 +0200 Subject: [PATCH] PDP-1323 Handle initialization errors for GCP --- .../GcpApp.scala | 14 +++++++++++++- .../TableFormatSetupError.scala | 4 ++++ project/Dependencies.scala | 6 +++--- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala b/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala index aa504ac8..ffb48284 100644 --- a/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala +++ b/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala @@ -10,6 +10,8 @@ package com.snowplowanalytics.snowplow.lakes +import com.google.api.client.googleapis.json.GoogleJsonResponseException + import com.snowplowanalytics.snowplow.sources.pubsub.{PubsubSource, PubsubSourceConfig} import com.snowplowanalytics.snowplow.sinks.pubsub.{PubsubSink, PubsubSinkConfig} @@ -19,5 +21,15 @@ object GcpApp extends LoaderApp[PubsubSourceConfig, PubsubSinkConfig](BuildInfo) override def badSink: SinkProvider = PubsubSink.resource(_) - override def isDestinationSetupError: DestinationSetupErrorCheck = TableFormatSetupError.check + override def isDestinationSetupError: DestinationSetupErrorCheck = { + // Destination bucket doesn't exist + case e: GoogleJsonResponseException if e.getDetails.getCode == 404 => + "The specified bucket does not exist" + // Permissions missing for Cloud Storage + case e: GoogleJsonResponseException if e.getDetails.getCode == 403 => + e.getDetails.getMessage + // Exceptions common to the table format - Delta/Iceberg/Hudi + case TableFormatSetupError.check(t) => + t + } } diff --git a/packaging/delta-iceberg/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala b/packaging/delta-iceberg/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala index ddd354e8..5fff87f5 100644 --- a/packaging/delta-iceberg/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala +++ b/packaging/delta-iceberg/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala @@ -12,6 +12,8 @@ package com.snowplowanalytics.snowplow.lakes import org.apache.iceberg.exceptions.{ForbiddenException => IcebergForbiddenException, NotFoundException => IcebergNotFoundException} +import org.apache.spark.sql.delta.DeltaAnalysisException + object TableFormatSetupError { // Check if given exception is specific to iceberg format @@ -22,5 +24,7 @@ object TableFormatSetupError { case e: IcebergForbiddenException => // No permission to create a table in Glue catalog e.getMessage + case e: DeltaAnalysisException if e.errorClass == Some("DELTA_CREATE_TABLE_WITH_NON_EMPTY_LOCATION") => + "Destination not empty and not a Delta table" } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1464d464..c438e369 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -132,9 +132,9 @@ object Dependencies { val icebergDeltaRuntimeDependencies = Seq( iceberg, - delta % Runtime, - Spark.coreForIcebergDelta % Runtime, - Spark.sqlForIcebergDelta % Runtime + delta, + Spark.coreForIcebergDelta, + Spark.sqlForIcebergDelta ) val coreDependencies = Seq(