@AdityaTeltia AdityaTeltia commented Jan 11, 2026

Summary

Implements Apache Flink Catalog integration for Apicurio Registry, enabling Flink SQL jobs to resolve table schemas directly from the registry.

Closes #7002

Design Notes

Architecture

The implementation follows Flink's Catalog SPI pattern:

  • Read-only catalog - Tables are resolved from registry artifacts, not created via DDL
  • Lazy schema resolution - Schemas are fetched on-demand with configurable TTL caching
  • Clean separation - Converters are isolated from catalog logic for extensibility
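As a rough illustration of the lazy, TTL-cached resolution described above, here is a minimal sketch. It is not the code in this PR: `TtlSchemaCache`, the injected clock, and the `loader` function (standing in for a registry call) are hypothetical names, and schemas are plain strings to keep the example free of Flink and Apicurio dependencies.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical TTL cache sketch; names are illustrative, not the PR's classes. */
final class TtlSchemaCache {
    private static final class Entry {
        final String schema;
        final long loadedAtNanos;
        Entry(String schema, long loadedAtNanos) {
            this.schema = schema;
            this.loadedAtNanos = loadedAtNanos;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final long ttlNanos;
    private final LongSupplier clock;              // injectable so tests can control time
    private final Function<String, String> loader; // assumption: stands in for a registry fetch

    TtlSchemaCache(Duration ttl, LongSupplier clock, Function<String, String> loader) {
        this.ttlNanos = ttl.toNanos();
        this.clock = clock;
        this.loader = loader;
    }

    /** Fetches on first access, then reuses the cached value until the TTL elapses. */
    String get(String artifactKey) {
        long now = clock.getAsLong();
        Entry entry = entries.compute(artifactKey, (key, old) ->
                (old == null || now - old.loadedAtNanos >= ttlNanos)
                        ? new Entry(loader.apply(key), now)
                        : old);
        return entry.schema;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};
        int[] loads = {0};
        TtlSchemaCache cache = new TtlSchemaCache(
                Duration.ofSeconds(60),
                () -> fakeNow[0],
                key -> { loads[0]++; return "schema-for-" + key; });

        cache.get("orders");                            // first access: loads
        cache.get("orders");                            // within TTL: served from cache
        fakeNow[0] = Duration.ofSeconds(61).toNanos();  // advance the fake clock past the TTL
        cache.get("orders");                            // expired: loads again
        System.out.println("registry loads: " + loads[0]); // prints "registry loads: 2"
    }
}
```

Injecting the clock keeps expiry testable without sleeping; a real catalog would more likely pass `System::nanoTime`.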

Key Decisions

  1. Groups → Databases mapping - Natural fit since both represent namespaces
  2. Artifacts → Tables mapping - Each artifact version represents a table schema
  3. Builder pattern for CatalogConfig - Avoids a constructor with many parameters and keeps the class checkstyle compliant
  4. State schemas in separate group - Uses {groupId}-state suffix to isolate state schemas from table schemas
  5. Avro-based compatibility checking - Leverages Avro's built-in SchemaCompatibility for reliable evolution validation
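To make decisions 3 and 4 concrete, here is a minimal sketch of a builder-style config and the `{groupId}-state` naming rule. The class name `CatalogConfigSketch`, its fields, and the default values are assumptions for illustration; the actual `CatalogConfig` in this PR may expose different options.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical stand-in for CatalogConfig; fields and defaults are assumptions. */
final class CatalogConfigSketch {
    private final String registryUrl;
    private final String defaultGroup;
    private final Duration cacheTtl;

    private CatalogConfigSketch(Builder b) {
        this.registryUrl = b.registryUrl;
        this.defaultGroup = b.defaultGroup;
        this.cacheTtl = b.cacheTtl;
    }

    static Builder builder() {
        return new Builder();
    }

    String registryUrl() { return registryUrl; }
    String defaultGroup() { return defaultGroup; }
    Duration cacheTtl() { return cacheTtl; }

    /** Decision 4: state schemas are kept in a sibling group named "{groupId}-state". */
    static String stateGroupFor(String groupId) {
        return groupId + "-state";
    }

    static final class Builder {
        private String registryUrl;
        private String defaultGroup = "default";           // illustrative default
        private Duration cacheTtl = Duration.ofMinutes(5); // illustrative default

        Builder registryUrl(String value) { this.registryUrl = value; return this; }
        Builder defaultGroup(String value) { this.defaultGroup = value; return this; }
        Builder cacheTtl(Duration value) { this.cacheTtl = value; return this; }

        CatalogConfigSketch build() {
            // Validate required settings once, at build time.
            Objects.requireNonNull(registryUrl, "registryUrl is required");
            return new CatalogConfigSketch(this);
        }
    }

    public static void main(String[] args) {
        CatalogConfigSketch config = CatalogConfigSketch.builder()
                .registryUrl("http://localhost:8080/apis/registry/v3")
                .cacheTtl(Duration.ofSeconds(30))
                .build();
        System.out.println(config.registryUrl());
        System.out.println(stateGroupFor("orders")); // prints "orders-state"
    }
}
```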

Future Considerations

  • Protobuf schema support (converter needed)
  • Watermark/timestamp column extraction from schema metadata
  • Kafka connector deep integration

Changes

Core Catalog

  • ApicurioCatalog - Flink Catalog implementation backed by Apicurio Registry
  • ApicurioCatalogFactory - SPI factory for SQL CLI configuration
  • ApicurioCatalogOptions - Configuration options (URL, auth, caching)

Schema Converters

  • AvroFlinkTypeConverter - Converts Avro schemas to Flink row types
  • JsonSchemaFlinkTypeConverter - Converts JSON Schema to Flink row types
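For readers unfamiliar with what such a converter does, the sketch below maps Avro primitive type names to the Flink SQL types they conventionally correspond to and assembles a `ROW<...>` type string. It is a dependency-free illustration: `AvroPrimitiveMappingSketch` is a hypothetical name, it works on type names rather than real `Schema` objects, and the converters in this PR may handle more cases (unions, logical types, nested records) or differ in detail.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical sketch of an Avro-primitive to Flink SQL type mapping. */
final class AvroPrimitiveMappingSketch {

    /** Maps an Avro primitive type name to a Flink SQL type name. */
    static String toFlinkSqlType(String avroPrimitive) {
        switch (avroPrimitive) {
            case "boolean": return "BOOLEAN";
            case "int":     return "INT";
            case "long":    return "BIGINT";
            case "float":   return "FLOAT";
            case "double":  return "DOUBLE";
            case "bytes":   return "BYTES";
            case "string":  return "STRING";
            default:
                throw new IllegalArgumentException(
                        "Unsupported Avro type in this sketch: " + avroPrimitive);
        }
    }

    /** Builds a ROW type string from field name to Avro primitive, preserving field order. */
    static String toRowType(Map<String, String> avroFields) {
        StringJoiner row = new StringJoiner(", ", "ROW<", ">");
        for (Map.Entry<String, String> field : avroFields.entrySet()) {
            row.add(field.getKey() + " " + toFlinkSqlType(field.getValue()));
        }
        return row.toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("id", "long");
        fields.put("name", "string");
        System.out.println(toRowType(fields)); // prints "ROW<id BIGINT, name STRING>"
    }
}
```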

State Schema Evolution

  • StateSchemaService - Register and track Flink state schemas
  • SchemaCompatibilityChecker - Validate BACKWARD/FORWARD/FULL compatibility
  • SavepointValidator - Validate savepoint compatibility before job restart
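The three compatibility modes differ only in which schema plays the reader and which the writer. The sketch below shows that relationship with a deliberately toy schema model (a set of required field names) and a toy `canRead` rule; both are assumptions for illustration. Per the design notes, the PR delegates the real check to Avro's SchemaCompatibility, and `CompatibilitySketch` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of how BACKWARD, FORWARD and FULL relate to one another. */
final class CompatibilitySketch {
    enum Mode { BACKWARD, FORWARD, FULL }

    /**
     * Toy rule: a reader can decode a writer's data when every field the
     * reader requires was written. Real Avro resolution also considers
     * defaults, type promotion and aliases.
     */
    static boolean canRead(Set<String> readerRequired, Set<String> writerFields) {
        return writerFields.containsAll(readerRequired);
    }

    static boolean isCompatible(Mode mode, Set<String> oldSchema, Set<String> newSchema) {
        boolean backward = canRead(newSchema, oldSchema); // new code reads old state
        boolean forward = canRead(oldSchema, newSchema);  // old code reads new state
        switch (mode) {
            case BACKWARD: return backward;
            case FORWARD:  return forward;
            case FULL:     return backward && forward;
            default:       throw new IllegalStateException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        Set<String> v1 = new HashSet<>(Arrays.asList("id"));
        Set<String> v2 = new HashSet<>(Arrays.asList("id", "name")); // adds a required field

        System.out.println(isCompatible(Mode.BACKWARD, v1, v2)); // prints "false"
        System.out.println(isCompatible(Mode.FORWARD, v1, v2));  // prints "true"
        System.out.println(isCompatible(Mode.FULL, v1, v2));     // prints "false"
    }
}
```

For restoring a job from a savepoint, BACKWARD is the direction that matters most: the new job's schema has to read state written under the old one.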

Authentication

  • Basic auth (username/password)
  • OAuth2 (token endpoint, client ID, client secret)
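For reference, the two auth modes typically come down to the request pieces sketched below: an HTTP Basic `Authorization` header, and a form body for an OAuth2 token request. This is an assumption-laden sketch, not this PR's code: `AuthSketch` is a hypothetical name, and the use of the client-credentials grant is inferred from the listed options (token endpoint, client ID, client secret) rather than confirmed by the description.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical sketch of the request pieces behind Basic auth and OAuth2. */
final class AuthSketch {

    /** Builds the value of an HTTP Basic Authorization header. */
    static String basicAuthHeader(String username, String password) {
        String credentials = username + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    /**
     * Builds a form-encoded body for a token request.
     * Assumption: the client-credentials grant is used.
     */
    static String clientCredentialsForm(String clientId, String clientSecret) {
        return "grant_type=client_credentials"
                + "&client_id=" + URLEncoder.encode(clientId, StandardCharsets.UTF_8)
                + "&client_secret=" + URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(basicAuthHeader("user", "pass")); // prints "Basic dXNlcjpwYXNz"
        System.out.println(clientCredentialsForm("flink-job", "s3cret"));
    }
}
```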

Concept Mapping

Apicurio Registry | Flink Catalog
Groups            | Databases
Artifacts         | Tables
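In code, this mapping amounts to reading a Flink `database.table` path as a registry `groupId` plus `artifactId`. The sketch below is hypothetical: `ConceptMappingSketch` and `ArtifactRef` are illustrative names, and splitting on the first dot is an assumption made for brevity (a real catalog receives the database and table names separately).

```java
/** Hypothetical sketch of the database/table to group/artifact mapping. */
final class ConceptMappingSketch {

    /** Registry coordinates for one table. */
    static final class ArtifactRef {
        final String groupId;
        final String artifactId;

        ArtifactRef(String groupId, String artifactId) {
            this.groupId = groupId;
            this.artifactId = artifactId;
        }
    }

    /** A Flink database name is used as the group ID, a table name as the artifact ID. */
    static ArtifactRef toArtifactRef(String databaseName, String tableName) {
        return new ArtifactRef(databaseName, tableName);
    }

    /** Parses "database.table", splitting on the first dot only. */
    static ArtifactRef parse(String qualifiedTable) {
        int dot = qualifiedTable.indexOf('.');
        if (dot <= 0 || dot == qualifiedTable.length() - 1) {
            throw new IllegalArgumentException(
                    "Expected database.table but got: " + qualifiedTable);
        }
        return toArtifactRef(
                qualifiedTable.substring(0, dot),
                qualifiedTable.substring(dot + 1));
    }

    public static void main(String[] args) {
        ArtifactRef ref = parse("sales.orders");
        System.out.println(ref.groupId + " / " + ref.artifactId); // prints "sales / orders"
    }
}
```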

Testing

  • 21 unit tests passing
  • Integration test (ApicurioCatalogIT) for live registry testing
  • 0 checkstyle violations

Usage

CREATE CATALOG my_registry WITH (
  'type' = 'apicurio',
  'registry.url' = 'http://localhost:8080/apis/registry/v3'
);
USE CATALOG my_registry;
SHOW DATABASES;

@EricWittmann
Member

Wow, thanks for working on this! I'll try to dig into this to understand what you've done asap. 👍

@AdityaTeltia
Author

Hi @EricWittmann, just checking if you’ve had a chance to take an initial look. I can patch the PR over the weekend based on your feedback.

Development

Successfully merging this pull request may close these issues.

[Feature] Apache Flink Catalog Integration
