Creating a stream using schema inference by ID and using a custom timestamp does not work #10370

filpano commented Jul 10, 2024

Describe the bug
I have the following stream:

CREATE OR REPLACE STREAM MYSTREAM WITH (KAFKA_TOPIC='my-topic', TIMESTAMP='bookingEndedAt', VALUE_FORMAT='AVRO');

This works. However, for reproducible migrations, I wish to explicitly state the schema ID as follows:

CREATE OR REPLACE STREAM MYSTREAM WITH (KAFKA_TOPIC='my-topic', TIMESTAMP='bookingEndedAt', VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID=1);

However, this gives an error similar to the following:

The TIMESTAMP column set in the WITH clause does not exist in the schema: 'BOOKINGENDEDAT'

Changing the casing of the column name in the TIMESTAMP property has no effect on the error.
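
For example, variants along these lines (an illustrative sketch, not necessarily the exact statements I ran) all fail identically:

-- camelCase, matching the field name in the Avro schema
CREATE OR REPLACE STREAM MYSTREAM WITH (KAFKA_TOPIC='my-topic', TIMESTAMP='bookingEndedAt', VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID=1);

-- upper case, matching the name as it appears in the error message
CREATE OR REPLACE STREAM MYSTREAM WITH (KAFKA_TOPIC='my-topic', TIMESTAMP='BOOKINGENDEDAT', VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID=1);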

To Reproduce
ksql> version
Version: 7.6.1

  1. Create schema that contains a custom timestamp field
  2. Try to create a stream using both the TIMESTAMP and VALUE_SCHEMA_ID stream properties

A full example (including the schema) is available in this Stack Overflow question about the same issue: https://stackoverflow.com/questions/78447435/custom-timestamp-in-ksqldb-stream-with-avro-in-schema-registry. To prevent link rot, it is reproduced below:

Avro schema:

{
  "type": "record",
  "name": "Price",
  "namespace": "dh762",
  "fields": [
    {
      "name": "node_id",
      "type": "string"
    },
    {
      "name": "timestamp",
      "type": "string"
    }
  ]
}

ksql statement:

CREATE STREAM stream_1 (
    node_id VARCHAR KEY
) WITH (
    KAFKA_TOPIC='prices',
    VALUE_FORMAT='AVRO',
    PARTITIONS=3,
    VALUE_SCHEMA_ID=1,
    TIMESTAMP='timestamp',
    TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ss.SSSX'
); 
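
For comparison, dropping VALUE_SCHEMA_ID from the otherwise identical statement succeeds and the timestamp column resolves, just as with my first MYSTREAM statement above. The following is a sketch assuming the same topic and registered schema; the stream name stream_1_no_id is only a placeholder, and DESCRIBE is used merely to confirm which columns were inferred:

CREATE STREAM stream_1_no_id (
    node_id VARCHAR KEY
) WITH (
    KAFKA_TOPIC='prices',
    VALUE_FORMAT='AVRO',
    PARTITIONS=3,
    TIMESTAMP='timestamp',
    TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ss.SSSX'
);

DESCRIBE stream_1_no_id;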

Expected behavior
The stream should be created using the custom timestamp field.

Actual behavior
An error message:

The TIMESTAMP column set in the WITH clause does not exist in the schema: 'BOOKINGENDEDAT'

The following stacktrace is visible in the ksqldb server logs:

[2024-07-10 14:55:48,933] INFO Processed unsuccessfully: KsqlRequest{configOverrides={ksql.streams.consumer.isolation.level=read_committed, ksql.streams.producer.compression.type=zstd, processing.guarantee=exactly_once_v2, auto.offset.reset=earliest}, requestProperties={}, commandSequenceNumber=Optional[18]} (daf40317-c0c3-337e-96c4-78c6d9460c2d): CREATE OR REPLACE STREAM stream1 WITH (KAFKA_TOPIC='[string]', TIMESTAMP='[string]', VALUE_FORMAT='[string]', VALUE_SCHEMA_ID='0'); (io.confluent.ksql.logging.query.QueryLogger)
2024-07-10T14:55:48.933889112Z io.confluent.ksql.util.KsqlStatementException: The TIMESTAMP column set in the WITH clause does not exist in the schema: 'BOOKINGENDEDAT'
2024-07-10T14:55:48.933890822Z 	at io.confluent.ksql.engine.EngineExecutor.plan(EngineExecutor.java:688)
2024-07-10T14:55:48.933892397Z 	at io.confluent.ksql.engine.SandboxedExecutionContext.plan(SandboxedExecutionContext.java:159)
2024-07-10T14:55:48.933894021Z 	at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.createForPlannedQuery(ValidatedCommandFactory.java:290)
2024-07-10T14:55:48.933895618Z 	at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.createCommand(ValidatedCommandFactory.java:140)
2024-07-10T14:55:48.933897173Z 	at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.create(ValidatedCommandFactory.java:87)
2024-07-10T14:55:48.933898690Z 	at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:177)
2024-07-10T14:55:48.933900212Z 	at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:129)
2024-07-10T14:55:48.933901730Z 	at io.confluent.ksql.rest.server.resources.KsqlResource.handleKsqlStatements(KsqlResource.java:310)
2024-07-10T14:55:48.933903245Z 	at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeKsqlRequest$2(KsqlServerEndpoints.java:183)
2024-07-10T14:55:48.933904771Z 	at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOldApiEndpointOnWorker$24(KsqlServerEndpoints.java:348)
2024-07-10T14:55:48.933906370Z 	at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOnWorker$23(KsqlServerEndpoints.java:334)
2024-07-10T14:55:48.933907923Z 	at io.vertx.core.impl.ContextBase.lambda$null$0(ContextBase.java:137)
2024-07-10T14:55:48.933909411Z 	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
2024-07-10T14:55:48.933911107Z 	at io.vertx.core.impl.ContextBase.lambda$executeBlocking$1(ContextBase.java:135)
2024-07-10T14:55:48.933912612Z 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2024-07-10T14:55:48.933914114Z 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2024-07-10T14:55:48.933915619Z 	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
2024-07-10T14:55:48.933917135Z 	at java.base/java.lang.Thread.run(Thread.java:829)
2024-07-10T14:55:48.933918677Z Caused by: io.confluent.ksql.util.KsqlException: The TIMESTAMP column set in the WITH clause does not exist in the schema: 'BOOKINGENDEDAT'
2024-07-10T14:55:48.933920249Z 	at io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory.lambda$create$0(TimestampExtractionPolicyFactory.java:60)
2024-07-10T14:55:48.933921839Z 	at java.base/java.util.Optional.orElseThrow(Optional.java:408)
2024-07-10T14:55:48.933923323Z 	at io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory.create(TimestampExtractionPolicyFactory.java:58)
2024-07-10T14:55:48.933926370Z 	at io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory.validateTimestampColumn(TimestampExtractionPolicyFactory.java:42)
2024-07-10T14:55:48.933927966Z 	at io.confluent.ksql.ddl.commands.CreateSourceFactory.buildTimestampColumn(CreateSourceFactory.java:313)
2024-07-10T14:55:48.933929511Z 	at io.confluent.ksql.ddl.commands.CreateSourceFactory.createStreamCommand(CreateSourceFactory.java:121)
2024-07-10T14:55:48.933931039Z 	at io.confluent.ksql.ddl.commands.CommandFactories.handleCreateStream(CommandFactories.java:134)
2024-07-10T14:55:48.933932581Z 	at io.confluent.ksql.util.HandlerMaps$BuilderR2.lambda$castHandler2$2(HandlerMaps.java:840)
2024-07-10T14:55:48.933934093Z 	at io.confluent.ksql.ddl.commands.CommandFactories.create(CommandFactories.java:112)
2024-07-10T14:55:48.933935590Z 	at io.confluent.ksql.engine.EngineContext.createDdlCommand(EngineContext.java:233)
2024-07-10T14:55:48.933937110Z 	at io.confluent.ksql.engine.EngineExecutor.plan(EngineExecutor.java:634)
2024-07-10T14:55:48.933938623Z 	... 17 more
2024-07-10T14:55:48.934484457Z 

What is a bit odd is that the logs contain the following query:

(daf40317-c0c3-337e-96c4-78c6d9460c2d): CREATE OR REPLACE STREAM stream1 WITH (KAFKA_TOPIC='[string]', TIMESTAMP='[string]', VALUE_FORMAT='[string]', VALUE_SCHEMA_ID='0'); (io.confluent.ksql.logging.query.QueryLogger)

which I'm guessing is some kind of obfuscation in the logs (note that the stream is called stream1 in the logs, and VALUE_SCHEMA_ID='0' is not the ID I passed during creation).

Additional context
I think this functionality is a hard requirement for reproducible deployments and migration scenarios. The initial creation of the stream should use, e.g., version 1 of a schema, as in my example above. If VALUE_SCHEMA_ID is omitted from the CREATE STREAM statement, the stream uses the current, latest version of the schema backing the topic, which might already be at version 2, causing potential compatibility problems. This is somewhat mitigated in my case because we use the BACKWARD_TRANSITIVE compatibility mode, but it still means there are situations where deployments are not necessarily deterministic.
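
As an interim workaround (an untested sketch, and one that gives up schema inference by ID entirely), the value columns could be declared explicitly so that the TIMESTAMP column is validated against the declared schema rather than one inferred from the registry. The column type below is an assumption (adjust it to whatever version 1 of the schema actually defines), and the remaining columns would have to be written out by hand:

CREATE OR REPLACE STREAM MYSTREAM (
    bookingEndedAt BIGINT
    -- ...plus the remaining value columns from version 1 of the schema
) WITH (
    KAFKA_TOPIC='my-topic',
    VALUE_FORMAT='AVRO',
    TIMESTAMP='bookingEndedAt'
);

The obvious downside is that the declared columns must then be kept in sync with schema version 1 by hand, which is exactly the duplication that VALUE_SCHEMA_ID is meant to avoid.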
