Support gzip compression for S3 source connector #1339

Open

coolstone1985 opened this issue Aug 8, 2024 · 0 comments
What version of the Stream Reactor are you reporting this issue for?

7.4.1

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Yes. We are running Kafka version 3.6.0

Do you have a supported version of the data source/sink, e.g. Cassandra 3.0.9?

Yes. We are using S3 as the source.

Have you read the docs?

Yes.

What is the expected behaviour?

If files in S3 are compressed (e.g. gzip, snappy, etc.), the connector should decompress the data and send it to Kafka. If the data is already uncompressed, it should be sent as is.
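For illustration, a compression-aware reader could sniff the gzip magic bytes and wrap the S3 object stream accordingly. This is a minimal Java sketch of the expected behaviour; the class and method names are hypothetical, not Stream Reactor APIs:

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

public final class CompressionAwareStream {

    // Gzip files start with the two magic bytes 0x1f 0x8b.
    private static final int GZIP_MAGIC_1 = 0x1f;
    private static final int GZIP_MAGIC_2 = 0x8b;

    // Wraps the raw S3 object stream in a GZIPInputStream when the
    // gzip magic bytes are present; otherwise returns the stream as is.
    public static InputStream maybeDecompress(InputStream raw) throws IOException {
        BufferedInputStream in = new BufferedInputStream(raw);
        in.mark(2);
        int b1 = in.read();
        int b2 = in.read();
        in.reset();
        if (b1 == GZIP_MAGIC_1 && b2 == GZIP_MAGIC_2) {
            return new GZIPInputStream(in);
        }
        return in;
    }
}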

What was observed?

If the data is gzipped, for example gzip-compressed JSON, the connector is unable to send the data correctly: it attempts JSON parsing on the raw blob it fetches from S3 and fails.
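The failure mode can be reproduced outside the connector. The sketch below (using Jackson, which is an assumption about the parser involved) feeds a gzipped JSON payload straight to a JSON parser and fails on the gzip header bytes:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public final class GzipJsonRepro {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write("{\"key\":\"value\"}".getBytes(StandardCharsets.UTF_8));
        }
        byte[] gzipped = buf.toByteArray();
        // Throws a JsonParseException: the input starts with the gzip
        // magic bytes (0x1f 0x8b), not a valid JSON token.
        new ObjectMapper().readTree(gzipped);
    }
}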

What is your Connect cluster configuration (connect-avro-distributed.properties)?

We use MSK Connect. Our corresponding worker config is:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
consumer.auto.offset.reset=latest
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

What is your connector properties configuration (my-connector.properties)?

connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
topics=sinkConnectorParams.topicRegex
tasks.max=8
connect.s3.aws.region=us-east-2
connect.s3.compression.codec=GZIP
connect.s3.kcql=insert into ${topicName} select * from ${bucketName}:${prefix} STOREAS \`JSON\` COMPRESSION \`GZIP\`
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
errors.log.enable=true
connect.s3.aws.auth.mode=Default
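For readability, here is the same KCQL line with the deployment placeholders replaced by concrete values (the topic, bucket, and prefix names are illustrative only):

connect.s3.kcql=insert into events-topic select * from my-data-bucket:incoming/ STOREAS `JSON` COMPRESSION `GZIP`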
kancko pushed a commit to kancko/stream-reactor that referenced this issue Dec 18, 2024
Add JsonStreamReader in order to properly process Json data compressed with GZIP compression codec

Resolves: lensesio#1339
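A plausible shape for such a reader (an assumption for illustration, not the code from the referenced commit) is a line-oriented JSON reader that transparently decompresses GZIP:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

public final class GzipJsonStreamReader implements AutoCloseable {
    private final BufferedReader reader;

    public GzipJsonStreamReader(InputStream s3ObjectStream) throws IOException {
        // Decompress first, then decode as UTF-8 text.
        this.reader = new BufferedReader(new InputStreamReader(
            new GZIPInputStream(s3ObjectStream), StandardCharsets.UTF_8));
    }

    // Returns the next JSON document (one per line), or null at end of stream.
    public String nextJsonLine() throws IOException {
        return reader.readLine();
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}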