Description
Stream Reactor version: 10.0.3
Kafka Connect version: confluentinc/cp-kafka-connect 7.9.2
I’m trying to use the Azure Data Lake sink connector. With the fix for #1842 applied, the DataLakeFileClient.readWithResponse(...) call now succeeds, but the connector fails when it attempts to write data to the index file it creates in the Data Lake. Here is the complete stack trace:
org.apache.kafka.connect.errors.ConnectException: error writing file ({"owner":"a1d4e117-43c2-4d1b-9907-5e1e245dfa37","committedOffset":null,"pendingState":null}) Status code 412, "{"error":{"code":"ConditionNotMet","message":"The condition specified using HTTP conditional header(s) is not met.
RequestId:09e0ab5f-d01f-005f-078a-47e85d000000
Time:2025-10-27T21:43:41.7192953Z"}}"
at io.lenses.streamreactor.connect.cloud.common.sink.CloudSinkTask.handleErrors(CloudSinkTask.scala:114)
at io.lenses.streamreactor.connect.cloud.common.sink.CloudSinkTask.open(CloudSinkTask.scala:261)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:661)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:78)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:752)
at org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker.invokePartitionsAssigned(ConsumerRebalanceListenerInvoker.java:65)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:425)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:504)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:415)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:511)
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateAssignmentMetadataIfNeeded(ClassicKafkaConsumer.java:657)
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:616)
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:596)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:498)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:340)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.azure.storage.file.datalake.models.DataLakeStorageException: Status code 412, "{"error":{"code":"ConditionNotMet","message":"The condition specified using HTTP conditional header(s) is not met.
RequestId:09e0ab5f-d01f-005f-078a-47e85d000000
Time:2025-10-27T21:43:41.7192953Z"}}"
at com.azure.storage.file.datalake.implementation.util.ModelHelper.mapToDataLakeStorageException(ModelHelper.java:210)
at reactor.core.publisher.Mono.lambda$onErrorMap$30(Mono.java:3797)
at reactor.core.publisher.Mono.lambda$onErrorResume$32(Mono.java:3887)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:142)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2196)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2070)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
at reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2666)
at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
at reactor.core.publisher.MonoUsing$MonoUsingSubscriber.onNext(MonoUsing.java:232)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:126)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:191)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:129)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:439)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:493)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:796)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at com.azure.core.http.netty.implementation.AzureSdkHandler.channelRead(AzureSdkHandler.java:224)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:434)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:249)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1537)
at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1408)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1448)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1429)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:167)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.handle(AbstractNioChannel.java:445)
at io.netty.channel.nio.NioIoHandler$DefaultNioRegistration.handle(NioIoHandler.java:381)
at io.netty.channel.nio.NioIoHandler.processSelectedKey(NioIoHandler.java:575)
at io.netty.channel.nio.NioIoHandler.processSelectedKeysOptimized(NioIoHandler.java:550)
at io.netty.channel.nio.NioIoHandler.processSelectedKeys(NioIoHandler.java:491)
at io.netty.channel.nio.NioIoHandler.run(NioIoHandler.java:468)
at io.netty.channel.SingleThreadIoEventLoop.runIo(SingleThreadIoEventLoop.java:206)
at io.netty.channel.SingleThreadIoEventLoop.run(SingleThreadIoEventLoop.java:177)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:1073)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:100)
at reactor.core.publisher.Mono.block(Mono.java:1742)
at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:144)
at com.azure.storage.file.datalake.DataLakeFileClient.flushWithResponse(DataLakeFileClient.java:1034)
at com.azure.storage.file.datalake.DataLakeFileClient.flushWithResponse(DataLakeFileClient.java:979)
at io.lenses.streamreactor.connect.datalake.storage.DatalakeStorageInterface.$anonfun$writeBlobToFile$1(DatalakeStorageInterface.scala:407)
at scala.util.Try$.apply(Try.scala:217)
at io.lenses.streamreactor.connect.datalake.storage.DatalakeStorageInterface.tryWriteBlob$1(DatalakeStorageInterface.scala:389)
at io.lenses.streamreactor.connect.datalake.storage.DatalakeStorageInterface.writeBlobToFile(DatalakeStorageInterface.scala:412)
at io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerV2.$anonfun$createNewIndexFileNoOverwrite$1(IndexManagerV2.scala:228)
at scala.util.Either.flatMap(Either.scala:360)
at io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerV2.createNewIndexFileNoOverwrite(IndexManagerV2.scala:221)
at io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerV2.$anonfun$open$6(IndexManagerV2.scala:122)
at scala.util.Either.flatMap(Either.scala:360)
at io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerV2.$anonfun$open$5(IndexManagerV2.scala:118)
at scala.util.Either.flatMap(Either.scala:360)
at io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerV2.open(IndexManagerV2.scala:116)
at io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerV2.$anonfun$open$2(IndexManagerV2.scala:92)
at cats.effect.IOFiber.runLoop(IOFiber.scala:357)
at cats.effect.IOFiber.execR(IOFiber.scala:1397)
at cats.effect.IOFiber.run(IOFiber.scala:122)
at cats.effect.unsafe.WorkerThread.run(WorkerThread.scala:851)
I think the most relevant parts of this stack trace are:
Status code 412, "{"error":{"code":"ConditionNotMet","message":"The condition specified using HTTP conditional header(s) is not met.
and
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:100)
at reactor.core.publisher.Mono.block(Mono.java:1742)
at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:144)
at com.azure.storage.file.datalake.DataLakeFileClient.flushWithResponse(DataLakeFileClient.java:1034)
at com.azure.storage.file.datalake.DataLakeFileClient.flushWithResponse(DataLakeFileClient.java:979)
at io.lenses.streamreactor.connect.datalake.storage.DatalakeStorageInterface.$anonfun$writeBlobToFile$1(DatalakeStorageInterface.scala:407)
at scala.util.Try$.apply(Try.scala:217)
at io.lenses.streamreactor.connect.datalake.storage.DatalakeStorageInterface.tryWriteBlob$1(DatalakeStorageInterface.scala:389)
at io.lenses.streamreactor.connect.datalake.storage.DatalakeStorageInterface.writeBlobToFile(DatalakeStorageInterface.scala:412)
at io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerV2.$anonfun$createNewIndexFileNoOverwrite$1(IndexManagerV2.scala:228)
at scala.util.Either.flatMap(Either.scala:360)
at io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerV2.createNewIndexFileNoOverwrite(IndexManagerV2.scala:221)
at io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerV2.$anonfun$open$6(IndexManagerV2.scala:122)
at scala.util.Either.flatMap(Either.scala:360)
at io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerV2.$anonfun$open$5(IndexManagerV2.scala:118)
at scala.util.Either.flatMap(Either.scala:360)
at io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerV2.open(IndexManagerV2.scala:116)
at io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerV2.$anonfun$open$2(IndexManagerV2.scala:92)
at cats.effect.IOFiber.runLoop(IOFiber.scala:357)
at cats.effect.IOFiber.execR(IOFiber.scala:1397)
at cats.effect.IOFiber.run(IOFiber.scala:122)
at cats.effect.unsafe.WorkerThread.run(WorkerThread.scala:851)
I think the problem is that writeBlobToFile first creates the file (so the path now exists and has an ETag), and then calls DataLakeFileClient.flushWithResponse with a request condition that only allows the operation when the target has no ETag, i.e. when the file does not already exist. Since the file was created moments earlier, the precondition can never hold and the service returns 412 ConditionNotMet.
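If that reading is right, the failure can be reproduced outside the connector with the Azure SDK alone. The sketch below is a minimal, hypothetical reproduction, not the connector's actual code: `fs`, `path`, and `bytes` are placeholders, and I'm assuming the condition in question is an If-None-Match: "*" style precondition.

```scala
import java.io.ByteArrayInputStream
import com.azure.storage.file.datalake.DataLakeFileSystemClient
import com.azure.storage.file.datalake.models.DataLakeRequestConditions

// Hypothetical reproduction of the failure as I understand it.
def writeIndexFile(fs: DataLakeFileSystemClient, path: String, bytes: Array[Byte]): Unit = {
  // 1. Create the file. From this point the path exists and has an ETag.
  val file = fs.createFile(path, true) // overwrite = true
  // 2. Stage the index payload.
  file.append(new ByteArrayInputStream(bytes), 0L, bytes.length.toLong)
  // 3. Flush with an "only if the target has no ETag" precondition
  //    (If-None-Match: *). The target was just created, so the precondition
  //    fails and the service answers 412 ConditionNotMet.
  val requireNoExistingFile = new DataLakeRequestConditions().setIfNoneMatch("*")
  file.flushWithResponse(bytes.length.toLong, true, true, null, requireNoExistingFile, null, null)
  ()
}
```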
To fix this, I'd propose changing the createFile call to createFileIfNotExists, so that an existing index file is never overwritten, and removing the flush condition: the create-if-not-exists call already protects any existing file when this method is asked to create a new index file. I've opened a PR with these proposed changes.
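In terms of the SDK calls, the change would look roughly like the following. This is again only a sketch reusing the placeholder names from the reproduction above, not the exact diff in the PR:

```scala
// Sketch of the proposed behaviour; fs, path and bytes are placeholders.
val file = fs.createFileIfNotExists(path) // leaves an existing index file untouched
file.append(new ByteArrayInputStream(bytes), 0L, bytes.length.toLong)
// Flush unconditionally: the create-if-not-exists step already guarantees
// a pre-existing index file is never clobbered, so the ETag precondition
// is no longer needed and the 412 goes away.
file.flush(bytes.length.toLong, true)
```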