Skip to content

Commit 4a99726

Browse files
committed
PDP-1324 - Handle initialization errors for lake loader Delta Azure
1 parent dc7e9a7 commit 4a99726

File tree

2 files changed

+34
-1
lines changed
  • modules

2 files changed

+34
-1
lines changed

modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@
1010

1111
package com.snowplowanalytics.snowplow.lakes
1212

13+
import java.net.UnknownHostException
14+
1315
import scala.reflect._
1416

17+
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException
18+
1519
import com.snowplowanalytics.snowplow.sources.kafka.{KafkaSource, KafkaSourceConfig}
1620
import com.snowplowanalytics.snowplow.sinks.kafka.{KafkaSink, KafkaSinkConfig}
1721
import com.snowplowanalytics.snowplow.azure.AzureAuthenticationCallbackHandler
@@ -35,5 +39,23 @@ object AzureApp extends LoaderApp[KafkaSourceConfig, KafkaSinkConfig](BuildInfo)
3539

3640
override def badSink: SinkProvider = KafkaSink.resource(_, classTag[SinkAuthHandler])
3741

38-
override def isDestinationSetupError: DestinationSetupErrorCheck = TableFormatSetupError.check
42+
override def isDestinationSetupError: DestinationSetupErrorCheck = {
43+
// Authentication issue (wrong OAuth endpoint, wrong client id, wrong secret..)
44+
case e: AbfsRestOperationException if e.getStatusCode == -1 =>
45+
e.getErrorMessage()
46+
// Wrong container name
47+
case e: AbfsRestOperationException if e.getStatusCode() == 404 =>
48+
s"${e.getErrorMessage()} (e.g. wrong container name)"
49+
// Service principal missing permissions for container (role assignement missing or wrong role)
50+
case e: AbfsRestOperationException if e.getStatusCode() == 403 =>
51+
s"Missing permissions for the destination (needs \"Storage Blob Data Contributor\" assigned to the service principal for the container)"
52+
// Soft delete not disabled
53+
case e: AbfsRestOperationException if e.getStatusCode() == 409 =>
54+
"Blob soft delete must be disabled on the storage account"
55+
case _: UnknownHostException =>
56+
"Wrong storage name"
57+
// Exceptions common to the table format - Delta/Iceberg/Hudi
58+
case TableFormatSetupError.check(t) =>
59+
t
60+
}
3961
}

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
package com.snowplowanalytics.snowplow.lakes.tables
1212

13+
import java.net.InetAddress
14+
1315
import cats.implicits._
1416
import cats.effect.Sync
1517
import org.typelevel.log4cats.Logger
@@ -55,7 +57,16 @@ class DeltaWriter(config: Config.Delta) extends Writer {
5557
.build()
5658
}: Unit
5759

60+
// For Azure a wrong storage name means an invalid hostname and infinite retries when creating the Delta table
61+
// If the hostname is invalid, UnknownHostException gets thrown
62+
val checkHostname =
63+
if (List("abfs", "abfss").contains(config.location.getScheme))
64+
Sync[F].blocking(InetAddress.getByName(config.location.getHost()))
65+
else
66+
Sync[F].unit
67+
5868
Logger[F].info(s"Creating Delta table ${config.location} if it does not already exist...") >>
69+
checkHostname >>
5970
Sync[F]
6071
.blocking(builder.execute())
6172
.void

0 commit comments

Comments
 (0)