
Commit 159afd6

PDP-1323 Handle initialization errors for GCP

1 parent: 6fefeef

4 files changed: +27 -7 lines


modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala

+5 -3

@@ -14,6 +14,8 @@ import java.net.UnknownHostException
 
 import scala.reflect._
 
+import cats.implicits._
+
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException
 
 import com.snowplowanalytics.snowplow.sources.kafka.{KafkaSource, KafkaSourceConfig}
@@ -44,13 +46,13 @@ object AzureApp extends LoaderApp[KafkaSourceConfig, KafkaSinkConfig](BuildInfo)
     case AuthenticationError(e) =>
       e
     // Wrong container name
-    case e: AbfsRestOperationException if e.getStatusCode == 404 =>
+    case e: AbfsRestOperationException if e.getStatusCode === 404 =>
       s"The specified filesystem does not exist (e.g. wrong container name)"
     // Service principal missing permissions for container (role assignement missing or wrong role)
-    case e: AbfsRestOperationException if e.getStatusCode == 403 =>
+    case e: AbfsRestOperationException if e.getStatusCode === 403 =>
       s"Missing permissions for the destination (needs \"Storage Blob Data Contributor\" assigned to the service principal for the container)"
     // Soft delete not disabled
-    case e: AbfsRestOperationException if e.getStatusCode == 409 =>
+    case e: AbfsRestOperationException if e.getStatusCode === 409 =>
       "Blob soft delete must be disabled on the storage account"
     case _: UnknownHostException =>
       "Wrong storage name"

modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala

+15 -1

@@ -10,6 +10,10 @@
 
 package com.snowplowanalytics.snowplow.lakes
 
+import cats.implicits._
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException
+
 import com.snowplowanalytics.snowplow.sources.pubsub.{PubsubSource, PubsubSourceConfig}
 import com.snowplowanalytics.snowplow.sinks.pubsub.{PubsubSink, PubsubSinkConfig}
 
@@ -19,5 +23,15 @@ object GcpApp extends LoaderApp[PubsubSourceConfig, PubsubSinkConfig](BuildInfo)
 
   override def badSink: SinkProvider = PubsubSink.resource(_)
 
-  override def isDestinationSetupError: DestinationSetupErrorCheck = TableFormatSetupError.check
+  override def isDestinationSetupError: DestinationSetupErrorCheck = {
+    // Destination bucket doesn't exist
+    case e: GoogleJsonResponseException if e.getDetails.getCode === 404 =>
+      "The specified bucket does not exist"
+    // Permissions missing for Cloud Storage
+    case e: GoogleJsonResponseException if e.getDetails.getCode === 403 =>
+      e.getDetails.getMessage
+    // Exceptions common to the table format - Delta/Iceberg/Hudi
+    case TableFormatSetupError.check(t) =>
+      t
+  }
 }
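A sketch of how the new GCP check could be exercised, assuming `DestinationSetupErrorCheck` is an alias for `PartialFunction[Throwable, String]` (consistent with the pattern-match literal above); the `gcsError` helper is hypothetical and uses google-api-client's public constructors to fabricate the exceptions:

```scala
import com.google.api.client.googleapis.json.{GoogleJsonError, GoogleJsonResponseException}
import com.google.api.client.http.{HttpHeaders, HttpResponseException}

object GcpCheckDemo extends App {
  // Hypothetical helper: fabricate a GoogleJsonResponseException with a given status code
  def gcsError(code: Int, message: String): GoogleJsonResponseException = {
    val details = new GoogleJsonError()
    details.setCode(code)
    details.setMessage(message)
    val builder = new HttpResponseException.Builder(code, message, new HttpHeaders())
    new GoogleJsonResponseException(builder, details)
  }

  val check = GcpApp.isDestinationSetupError

  // 404 maps to the friendly bucket message; 403 surfaces Google's own message
  assert(check.lift(gcsError(404, "Not Found")).contains("The specified bucket does not exist"))
  assert(check.lift(gcsError(403, "Forbidden")).contains("Forbidden"))
  // Anything unrecognized falls through and is not treated as a setup error
  assert(check.lift(new RuntimeException("boom")).isEmpty)
}
```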

packaging/delta-iceberg/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala

+4 -0

@@ -12,6 +12,8 @@ package com.snowplowanalytics.snowplow.lakes
 
 import org.apache.iceberg.exceptions.{ForbiddenException => IcebergForbiddenException, NotFoundException => IcebergNotFoundException}
 
+import org.apache.spark.sql.delta.DeltaAnalysisException
+
 object TableFormatSetupError {
 
   // Check if given exception is specific to iceberg format
@@ -22,5 +24,7 @@ object TableFormatSetupError {
     case e: IcebergForbiddenException =>
       // No permission to create a table in Glue catalog
       e.getMessage
+    case e: DeltaAnalysisException if e.errorClass == Some("DELTA_CREATE_TABLE_WITH_NON_EMPTY_LOCATION") =>
+      "Destination not empty and not a Delta table"
   }
 }
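For context, `DeltaAnalysisException` extends Spark's `AnalysisException`, whose `errorClass` is an `Option[String]` (which is what the `== Some(...)` comparison above relies on). An equivalent guard can use `Option.contains`; a standalone sketch, with the `deltaSetupError` name being illustrative rather than part of the commit:

```scala
import org.apache.spark.sql.delta.DeltaAnalysisException

object DeltaCheckSketch {
  // Same dispatch as the new case above, written with Option.contains:
  // map Delta's error class to a setup-error message, match nothing else
  val deltaSetupError: PartialFunction[Throwable, String] = {
    case e: DeltaAnalysisException if e.errorClass.contains("DELTA_CREATE_TABLE_WITH_NON_EMPTY_LOCATION") =>
      "Destination not empty and not a Delta table"
  }
}
```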

project/Dependencies.scala

+3 -3

@@ -132,9 +132,9 @@ object Dependencies {
 
   val icebergDeltaRuntimeDependencies = Seq(
     iceberg,
-    delta % Runtime,
-    Spark.coreForIcebergDelta % Runtime,
-    Spark.sqlForIcebergDelta % Runtime
+    delta,
+    Spark.coreForIcebergDelta,
+    Spark.sqlForIcebergDelta
   )
 
   val coreDependencies = Seq(
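Dropping `% Runtime` is what lets the `TableFormatSetupError` change compile: that file now pattern-matches on `DeltaAnalysisException` at compile time, so Delta and the Spark SQL classes it extends must sit on the compile classpath, not only the runtime one. In plain sbt terms (the coordinates below are illustrative, not the project's actual definitions):

```scala
// Runtime scope: present when the app runs, invisible to the compiler, so an
// `import org.apache.spark.sql.delta.DeltaAnalysisException` would fail to compile
libraryDependencies += "io.delta" %% "delta-spark" % "3.0.0" % Runtime

// Default (Compile) scope: on both the compile and runtime classpaths
libraryDependencies += "io.delta" %% "delta-spark" % "3.0.0"
```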
