Skip to content

Http Sink Not Pushing into deadletterqueue topic #1819

@sahilpaudel-pe

Description

@sahilpaudel-pe

Hi There,

Below is my Http Sink Connector config, even after retry is exhausted it is not pushing into the dlq topic.

{
    "connector.class": "io.lenses.streamreactor.connect.http.sink.HttpSinkConnector",
    "tasks.max": "3",
    "topics": "prod-labso-dbzm.LABSO.dbo.tbl_ResultDetails",
    "connect.http.method": "POST",
    "connect.http.endpoint": "https://prod-labxpert.private.thyrocare.com/api/v1/cdc/result-change-event",
    "connect.http.connection.timeout.ms": 1000,
    "connect.http.retries.max.retries": 5,
    "connect.http.retry.mode": "exponential",
    "connect.http.json.tidy": true,
    "connect.http.batch.count": 1,
    "connect.http.null.payload.handler": "custom",
    "connect.http.null.payload.handler.custom": "",
    "connect.http.sink.error.policy": "DLQ",
    "connect.http.sink.dlq.policy": "topic",
    "connect.http.sink.dlq.topic.name": "prod-labso-dbzm.labso.cdc-failed-events",
    "consumer.override.auto.offset.reset": "latest",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": false,
    "connect.http.request.headers": "Content-Type:application/json",
    "connect.http.request.content": "{{value}}",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.deadletterqueue.topic.name": "prod-labso-dbzm.labso.cdc-failed-events"
}

Error i see in the application is

Request threw an exception on attempt #5. Retrying after 7439 milliseconds   [org.http4s.client.middleware.Retry]
java.lang.IllegalStateException: Dispatcher already closed
	at async_ @ org.http4s.jdkhttpclient.package$.$anonfun$fromCompletableFuture$1(package.scala:39)
	at delay @ org.http4s.jdkhttpclient.JdkHttpClient$.$anonfun$apply$21(JdkHttpClient.scala:246)
	at use @ fs2.Compiler$Target.compile(Compiler.scala:157)
	at tupled @ org.http4s.jdkhttpclient.JdkHttpClient$.convertResponse$1(JdkHttpClient.scala:177)
2025-08-04 04:51:30,532 INFO   ||  10.86.85.198 - - [04/Aug/2025:04:51:30 +0000] "GET /connectors HTTP/1.1" 200 83 "-" "kube-probe/1.31+" 0   [org.apache.kafka.connect.runtime.rest.RestServer]
2025-08-04 04:51:30,689 INFO   ||  10.86.93.237 - - [04/Aug/2025:04:51:30 +0000] "GET /connectors/labs-prod-results-http-sink-connector-v1 HTTP/1.1" 200 2840 "-" "PostmanRuntime/7.44.1" 1   [org.apache.kafka.connect.runtime.rest.RestServer]
2025-08-04 04:51:32,464 INFO   ||  Request method=POST uri=https://prod-labxpert.private.thyrocare.com/api/v1/cdc/result-change-event headers=Content-Length: 6205,Content-Type: application/json threw an exception on attempt #6. Giving up.   [org.http4s.client.middleware.Retry]
java.lang.IllegalStateException: Dispatcher already closed
	at async_ @ org.http4s.jdkhttpclient.package$.$anonfun$fromCompletableFuture$1(package.scala:39)
	at delay @ org.http4s.jdkhttpclient.JdkHttpClient$.$anonfun$apply$21(JdkHttpClient.scala:246)
	at use @ fs2.Compiler$Target.compile(Compiler.scala:157)
	at tupled @ org.http4s.jdkhttpclient.JdkHttpClient$.convertResponse$1(JdkHttpClient.scala:177)
2025-08-04 04:51:32,465 ERROR  ||  Error in HttpWriter but not reached threshold so ignoring   [io.lenses.streamreactor.connect.http.sink.HttpWriter]
io.lenses.streamreactor.connect.http.sink.client.HttpResponseFailure: Dispatcher already closed
	at io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender.$anonfun$executeRequestAndHandleErrors$3(HttpRequestSender.scala:229)
	at apply @ io.lenses.streamreactor.connect.http.sink.HttpWriter.reportResult(HttpWriter.scala:161)
	at map @ io.lenses.streamreactor.connect.http.sink.HttpWriter.$anonfun$flush$2(HttpWriter.scala:139)
	at delay @ io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender.$anonfun$sendHttpRequest$19(HttpRequestSender.scala:177)
	at map @ io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender.$anonfun$sendHttpRequest$19(HttpRequestSender.scala:177)
	at flatMap @ io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender.$anonfun$sendHttpRequest$15(HttpRequestSender.scala:173)
	at modify @ fs2.internal.Scope.close(Scope.scala:262)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
Caused by: java.lang.IllegalStateException: Dispatcher already closed
	at async_ @ org.http4s.jdkhttpclient.package$.$anonfun$fromCompletableFuture$1(package.scala:39)
	at delay @ org.http4s.jdkhttpclient.JdkHttpClient$.$anonfun$apply$21(JdkHttpClient.scala:246)
	at use @ fs2.Compiler$Target.compile(Compiler.scala:157)
	at tupled @ org.http4s.jdkhttpclient.JdkHttpClient$.convertResponse$1(JdkHttpClient.scala:177)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions