KafkaConnector authentication with Apicurio Azure AD #6032

@seb2020

Description

Hi,

I am using Debezium on OpenShift with Apicurio (the Red Hat builds of both). I have configured a KafkaConnector to use the schema registry, and it needs to log in with OAuth through Azure AD.

It seems that the KafkaConnector never performs the authentication and therefore has no access to the registry. The firewall or proxy log shows no connection to Azure AD while the KafkaConnector is running, and without such a connection the authentication cannot be taking place.

I followed https://www.apicur.io/blog/2024/09/23/registry-azure-entraid-tutorial and, with the same client ID that the KafkaConnector uses, I can interact with the registry API. So the Apicurio side seems to work correctly with Azure.
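
A manual check of this kind can be sketched as follows. This is only a minimal sketch, assuming the standard OAuth2 client-credentials grant against the Azure AD v2.0 token endpoint. The function names are illustrative, `/search/artifacts` is just one registry endpoint chosen for the check, and `<id>` and `xxx` are the same placeholders as in the configuration below.

```python
import json
import urllib.parse
import urllib.request

# Placeholders only: substitute your own tenant id and registry host.
TOKEN_ENDPOINT = "https://login.microsoftonline.com/<id>/oauth2/v2.0/token"
REGISTRY_URL = "https://xxx/apis/registry/v2"


def build_token_request(client_id, client_secret, scope,
                        token_endpoint=TOKEN_ENDPOINT):
    """Build (but do not send) a client-credentials token request."""
    form = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": scope,
    }).encode("ascii")
    return urllib.request.Request(
        token_endpoint,
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_registry_request(access_token, path="/search/artifacts",
                           registry_url=REGISTRY_URL):
    """Build (but do not send) a registry call carrying the bearer token."""
    return urllib.request.Request(
        registry_url + path,
        headers={"Authorization": "Bearer " + access_token},
    )


def fetch_access_token(token_request):
    """Send the token request and return the access token from the JSON reply."""
    with urllib.request.urlopen(token_request) as response:
        return json.load(response)["access_token"]


def check_registry_access(client_id, client_secret, scope):
    """Fetch a token, call the registry with it, and return the HTTP status."""
    token = fetch_access_token(
        build_token_request(client_id, client_secret, scope))
    with urllib.request.urlopen(build_registry_request(token)) as response:
        return response.status
```

Calling `check_registry_access("xxx", "xxx", "xxx/.default")` with real values should produce a connection to Azure AD in the firewall or proxy log, followed by a registry request that carries an `Authorization: Bearer ...` header. Those are the two things that are missing when the KafkaConnector runs.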

Environment

OpenShift 4.16
Apicurio 2.6.8+0.1739457262.p
Streams for Apache Kafka 2.7.0-7

Steps to Reproduce

KafkaConnector configuration:

    key.converter: io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url: https://xxx/apis/registry/v2
    key.converter.apicurio.rest.request.ssl.ca-bundle.location: /opt/kafka/external-configuration/ca-bundle/ca-certificates.crt
    key.converter.apicurio.registry.auto-register: true
    key.converter.apicurio.registry.find-latest: true
    key.converter.apicurio.registry.auth.service.token.endpoint: https://login.microsoftonline.com/<id>/oauth2/v2.0/token/
    key.converter.apicurio.registry.auth.client.id: xxx
    key.converter.apicurio.registry.auth.client.secret: xxx
    key.converter.apicurio.registry.auth.client.scope: xxx/.default

But it doesn't work. After enabling debug logging in Apicurio, I see the following in its log when the KafkaConnector is configured:

2025-02-25 06:16:13 DEBUG <> [io.quarkus.oidc.runtime.OidcAuthenticationMechanism$3] (executor-thread-196) Resolved OIDC tenant id: Default
2025-02-25 06:16:13 DEBUG <> [io.quarkus.oidc.runtime.BearerAuthenticationMechanism] (executor-thread-196) Starting a bearer access token authentication
2025-02-25 06:16:13 DEBUG <> [io.quarkus.oidc.runtime.OidcUtils] (executor-thread-196) Looking for a token in the authorization header
2025-02-25 06:16:13 DEBUG <> [io.quarkus.oidc.runtime.BearerAuthenticationMechanism] (executor-thread-196) Bearer access token is not available

And this is the log on the KafkaConnector side:

2025-02-27 07:26:26,466 ERROR [kafka-connector-ulysse-int-postgres-apicurio|task-0] WorkerSourceTask{id=kafka-connector-ulysse-int-postgres-apicurio-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-kafka-connector-ulysse-int-postgres-apicurio-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:491)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:402)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.apicurio.rest.client.auth.exception.NotAuthorizedException: Authentication exception
    at io.apicurio.registry.rest.client.impl.ErrorHandler.handleErrorResponse(ErrorHandler.java:54)
    at io.apicurio.rest.client.handler.BodyHandler.lambda$toSupplierOfType$1(BodyHandler.java:55)
    at io.apicurio.rest.client.JdkHttpClient.sendRequest(JdkHttpClient.java:204)
    at io.apicurio.registry.rest.client.impl.RegistryClientImpl.createArtifact(RegistryClientImpl.java:311)
    at io.apicurio.registry.rest.client.RegistryClient.createArtifact(RegistryClient.java:147)
    at io.apicurio.registry.resolver.DefaultSchemaResolver.lambda$handleAutoCreateArtifact$4(DefaultSchemaResolver.java:280)
    at io.apicurio.registry.resolver.ERCache.lambda$getValue$0(ERCache.java:201)
    at io.apicurio.registry.resolver.ERCache.retry(ERCache.java:254)
    at io.apicurio.registry.resolver.ERCache.getValue(ERCache.java:200)
    at io.apicurio.registry.resolver.ERCache.getByContent(ERCache.java:175)
    at io.apicurio.registry.resolver.DefaultSchemaResolver.handleAutoCreateArtifact(DefaultSchemaResolver.java:278)
    at io.apicurio.registry.resolver.DefaultSchemaResolver.getSchemaFromRegistry(DefaultSchemaResolver.java:125)
    at io.apicurio.registry.resolver.DefaultSchemaResolver.lambda$resolveSchema$0(DefaultSchemaResolver.java:93)
    at java.base/java.util.Optional.orElseGet(Optional.java:364)
    at io.apicurio.registry.resolver.DefaultSchemaResolver.resolveSchema(DefaultSchemaResolver.java:93)
    at io.apicurio.registry.serde.AbstractKafkaSerializer.serialize(AbstractKafkaSerializer.java:83)
    at io.apicurio.registry.utils.converter.SerdeBasedConverter.fromConnectData(SerdeBasedConverter.java:122)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$5(AbstractWorkerSourceTask.java:491)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 13 more

Expected vs Actual Behaviour

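Expected: the KafkaConnector authenticates against Azure AD and sends a bearer token to the registry.

Actual: the KafkaConnector doesn't do the authentication, and the registry rejects its request with NotAuthorizedException.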
