feat: add QuixDatalakeSink for writing Kafka data to blob storage in Avro format #950


Closed
ovv wants to merge 5 commits from the ovv/quixdatalakesink branch

Conversation

@ovv ovv commented Jun 23, 2025


  • Introduce QuixDatalakeSink and QuixDatalakeSinkConfig for persisting raw Kafka topic data to blob storage (S3, ADLS, GCS, etc.) using fsspec.
  • Store data in Avro format with Hive-style partitioning and maintain Parquet metadata indexes for efficient querying.
  • Add integration with Quix Data Catalog for cache refresh.
  • Update pyproject.toml with new dependencies for datalake support.
  • Add comprehensive tests for sink functionality, including partitioning, headers, multiple flushes, and empty flushes.
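The Hive-style partition layout mentioned above can be sketched as a small path-building helper. This is illustrative only: the path template, field names, and `partition_path` function are assumptions, not the PR's actual scheme.

```python
from datetime import datetime, timezone

def partition_path(prefix: str, topic: str, partition: int, ts_ms: int) -> str:
    """Build an assumed Hive-style partition path for an Avro file.

    Illustrative layout: <prefix>/topic=<topic>/partition=<n>/date=<YYYY-MM-DD>
    """
    # Kafka timestamps are epoch milliseconds; bucket files by UTC day.
    day = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d")
    return f"{prefix}/topic={topic}/partition={partition}/date={day}"

print(partition_path("s3://lake/raw", "orders", 3, 1_750_000_000_000))
# → s3://lake/raw/topic=orders/partition=3/date=2025-06-15
```

Because each partition value is embedded in the directory name (`key=value`), query engines that understand Hive partitioning can prune directories without opening the Avro files.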

@ovv ovv requested a review from Copilot June 23, 2025 15:24
Copilot AI left a comment

Pull Request Overview

This PR introduces a new sink, QuixDatalakeSink, for writing raw Kafka topic data in Avro format to blob storage with Hive-style partitioning and Parquet metadata, along with integration for refreshing a Quix Data Catalog cache. It also adds supporting tests and updates project dependencies.

  • Add QuixDatalakeSink and QuixDatalakeSinkConfig, implement Avro file writing, Parquet metadata indexing, and optional Data Catalog notification
  • Provide a dummy fsspec filesystem and comprehensive tests covering partitioning, headers, multiple records, and multiple flushes
  • Update pyproject.toml to include new datalake dependencies and import ignores

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Summary per file:

  • quixstreams/sinks/core/datalake/sink.py — Implements QuixDatalakeSink, configuration, Avro/Parquet I/O, and Data Catalog notification
  • tests/test_quixstreams/test_sinks/test_core/test_datalake_sink.py — Adds end-to-end tests for basic writes, partition handling, headers, multi-record writes, and flush logic
  • quixstreams/sinks/core/datalake/__init__.py — Exports QuixDatalakeSink and its config in the package
  • quixstreams/sinks/__init__.py — Registers the new sink and config in the main sinks module
  • pyproject.toml — Adds the datalake extra requirements and mypy ignores for fsspec
Comments suppressed due to low confidence (3)

quixstreams/sinks/core/datalake/sink.py:139

  • [nitpick] The list comprehension unpacks header tuples using k, which shadows the outer variable k for the record key; use distinct names (e.g., header_key, header_value) to avoid confusion.
                "headers": [{"key": k, "value": v} for k, v in headers],
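One way to resolve the shadowing nitpick is simply to rename the loop variables, as sketched below. The names `header_key`/`header_value` are the reviewer's suggestion; the surrounding record structure is assumed for illustration.

```python
# The record key `k` is already in scope; unpacking headers as (k, v)
# would shadow it inside the comprehension. Distinct names keep both
# the record key and each header key unambiguous:
k = b"record-key"
headers = [(b"trace-id", b"abc"), (b"source", b"svc")]

serialized = {
    "key": k,
    "headers": [
        {"key": header_key, "value": header_value}
        for header_key, header_value in headers
    ],
}
```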

quixstreams/sinks/core/datalake/sink.py:238

  • There are no tests covering the Data Catalog notification logic; consider adding tests for scenarios when datacatalog_api_url is set and when the notification succeeds or fails.
    def _notify_datacatalog(self, topic: str) -> None:
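A test along the lines the reviewer suggests could inject a mock HTTP session and assert on the calls. Everything below is a stand-in: `_StubSink`, its constructor arguments, and the `/refresh` endpoint are assumptions used to show the test shape, not the PR's real code.

```python
import unittest.mock as mock

class _StubSink:
    """Stand-in for QuixDatalakeSink's notification path (names assumed)."""
    def __init__(self, datacatalog_api_url=None, session=None):
        self.datacatalog_api_url = datacatalog_api_url
        self._session = session

    def _notify_datacatalog(self, topic: str) -> None:
        # Notification is optional: skip silently when no catalog URL is set.
        if not self.datacatalog_api_url:
            return
        self._session.post(
            f"{self.datacatalog_api_url}/refresh", json={"topic": topic}
        )

session = mock.Mock()

# Configured URL: exactly one POST with the topic payload.
_StubSink("http://catalog", session)._notify_datacatalog("orders")
session.post.assert_called_once_with("http://catalog/refresh", json={"topic": "orders"})

# No URL configured: no additional call is made.
_StubSink(None, session)._notify_datacatalog("orders")
assert session.post.call_count == 1
```

A failure-path test (e.g. the mocked `post` raising) would round this out, depending on whether the sink is meant to swallow or propagate notification errors.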

quixstreams/sinks/core/datalake/sink.py:104

  • [nitpick] Private methods like _get_protocol lack docstrings; consider adding brief docstrings to explain their purpose and parameters.
    def _get_protocol(self, url: str) -> str:

ovv and others added 4 commits June 23, 2025 17:46
- Remove QuixDatalakeSinkConfig dataclass; pass config as direct arguments to QuixDatalakeSink.
- Refactor sink to maintain a per-key/partition index and write Parquet index files accordingly.
- Update flush logic to persist Avro files and Parquet index files per key/partition.
- Ensure Data Catalog notification is called once per topic after flush.
- Update tests to match new config and index logic.
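The per-key/partition buffering described in these commit notes can be sketched as below. The class name, method names, and the returned grouping are assumptions inferred from the notes; the real sink writes Avro files and Parquet index files rather than returning a dict.

```python
from collections import defaultdict

class _IndexBuffer:
    """Sketch of per-(key, partition) buffering (structure assumed)."""
    def __init__(self):
        # Each (key, partition) pair accumulates its own record list,
        # so flush can emit one Avro file + one index entry per pair.
        self._buffers = defaultdict(list)

    def add(self, key, partition, record):
        self._buffers[(key, partition)].append(record)

    def flush(self):
        # The real sink would write each group out here; this sketch
        # just returns the grouping and resets the buffers.
        groups = dict(self._buffers)
        self._buffers.clear()
        return groups

buf = _IndexBuffer()
buf.add(b"a", 0, {"v": 1})
buf.add(b"a", 0, {"v": 2})
buf.add(b"b", 1, {"v": 3})
flushed = buf.flush()
```

Clearing the buffers on flush matches the "empty flush" test case in the PR: a second flush with no new records produces no files.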
@ovv ovv closed this Jun 24, 2025
@ovv ovv deleted the ovv/quixdatalakesink branch June 24, 2025 13:41