Regression using Flink 1.20 relating to timestamp precision #143

Open
davidradl opened this issue Feb 3, 2025 · 5 comments


davidradl commented Feb 3, 2025

I have a test case:

 CREATE TABLE `source_1`
 (
     `customerId`                   STRING,
     `param_string_date_time`       TIMESTAMP_LTZ(9),
     `ts` TIMESTAMP(3),
     WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE
 )
 WITH (
     'connector' = 'filesystem',
     'path' = '/Users/davidradley/temp/regressionissue.txt',
     'format' = 'json'
 );

with the file contents:

{ "orderId": 1, "customerId": "1", "param_string_date_time": "2022-11-20 00:00:00.000Z", "ts": "2020-04-15 08:05:00.000" }
{ "orderId": 2, "customerId": "2", "param_string_date_time": "2022-11-21 00:00:00.000Z", "ts": "2020-04-15 08:07:00.000" }
{ "orderId": 3, "customerId": "3", "param_string_date_time": "2022-11-22 00:00:00.000Z", "ts": "2020-04-15 08:09:00.000" }
{ "orderId": 4, "customerId": "4", "param_string_date_time": "2022-11-23 00:00:00.000Z", "ts": "2020-04-15 08:11:00.000" }

I then create a view:

CREATE TEMPORARY VIEW `api_1_source_1__API` AS
SELECT *, PROCTIME() AS `proc_time`
FROM `source_1`; 

and a lookup table:

CREATE TEMPORARY TABLE `api_1_lookup__API`
(
    `customerId`           STRING,
    `param_string_date_time` TIMESTAMP_LTZ(6)
)
WITH (
    'connector' = 'rest-lookup',
    'url' = 'http://localhost:8089/api1',
    'format' = 'json',
    'asyncPolling' = 'false',
    'lookup-method' = 'GET',
    'gid.connector.http.source.lookup.header.Content-Type' = 'application/json',
    'gid.connector.http.source.lookup.header.Origin' = '*',
    'gid.connector.http.source.lookup.header.X-Content-Type-Options' = 'nosniff',
    'gid.connector.http.source.lookup.request.timeout' = '30',
    'gid.connector.http.source.lookup.request.thread-pool.size' = '8',
    'gid.connector.http.source.lookup.response.thread-pool.size' = '4'
);

and then run a lookup join:

SELECT
    `api_1_source_1__API`.`customerId` AS `customerId`,
    `api_1_lookup__API`.`param_string_date_time`  AS `param_string_date_time`
FROM `api_1_source_1__API`
JOIN `api_1_lookup__API` FOR SYSTEM_TIME AS OF `api_1_source_1__API`.`proc_time` ON
    `api_1_lookup__API`.`customerId`=`api_1_source_1__API`.`customerId` AND
    `api_1_lookup__API`.`param_string_date_time`=`api_1_source_1__API`.`param_string_date_time`;

At Flink 1.19.1, the HTTP request that is sent has two query parameters: customerId and param_string_date_time.
At Flink 1.20, the request has only one query parameter: customerId.

This is caused by a Flink PR that introduces logic to compare timestamps at the maximum precision. As a result, the table planner sees TIMESTAMP_LTZ(6) and TIMESTAMP_LTZ(9) as two different types in CommonPhysicalLookupJoin.analyzeLookupKeys, so it decides the timestamp equality is not going to be a lookup key and instead treats it as a join condition.

Unfortunately, the HTTP connector does not currently look at join conditions, so this predicate is lost.

The JDBC connector, by contrast, supports join conditions in its lookup, so the lookup join still succeeds there.

Some thoughts / observations:

  • the HTTP lookup connector could look for equality join conditions, using logic similar to the JDBC connector's, and then proceed with such an equality condition as if it were a lookup key.
  • alternatively, the lookup connector could be more explicit about which conditions should be treated as lookup keys and which as join conditions; this would allow non-equality join conditions to be handled in the connector as a new capability.
  • unfortunately, applyFilters comes in the interface SupportsFilterPushDown, which is only driven for scan sources (see the sketch below).
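
For context, a minimal sketch of what that scan-side ability looks like. The class below is hypothetical; SupportsFilterPushDown and applyFilters are the real Flink interfaces, and the planner only invokes them for scan sources:

import java.util.List;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

// Hypothetical HTTP scan source: only a ScanTableSource is ever offered
// applyFilters(), where the demoted timestamp predicate could be turned
// into an HTTP query parameter.
public class HttpScanTableSource implements ScanTableSource, SupportsFilterPushDown {

    private List<ResolvedExpression> pushedFilters = List.of();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        // Keep the predicates for request building, but hand them all back
        // as "remaining" so the planner still verifies them; always safe.
        this.pushedFilters = filters;
        return Result.of(List.of(), filters);
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public DynamicTableSource copy() {
        HttpScanTableSource copied = new HttpScanTableSource();
        copied.pushedFilters = this.pushedFilters;
        return copied;
    }

    @Override
    public String asSummaryString() {
        return "HTTP scan source (sketch)";
    }
}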

@grzegorz8 @kzarzycki WDYT?


grzegorz8 commented Mar 20, 2025

Thanks for reporting the issue! Today I'll try to focus on this.

Is there an easy workaround for that? What if you cast param_string_date_time from TIMESTAMP_LTZ(9) to TIMESTAMP_LTZ(6)?
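
Something like this on the original query (an untested sketch; the CAST brings the probe side down to the lookup table's TIMESTAMP_LTZ(6)):

SELECT
    `api_1_source_1__API`.`customerId` AS `customerId`,
    `api_1_lookup__API`.`param_string_date_time` AS `param_string_date_time`
FROM `api_1_source_1__API`
JOIN `api_1_lookup__API` FOR SYSTEM_TIME AS OF `api_1_source_1__API`.`proc_time` ON
    `api_1_lookup__API`.`customerId` = `api_1_source_1__API`.`customerId` AND
    `api_1_lookup__API`.`param_string_date_time` =
        CAST(`api_1_source_1__API`.`param_string_date_time` AS TIMESTAMP_LTZ(6));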


EDIT: I made my own example to understand this better.

CREATE TABLE Orders (
    user_id INTEGER,
    order_time TIMESTAMP(6),
    proc_time AS proctime()
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '5',
  'rows-per-second' = '1'
);

CREATE TABLE `user_data` (
    `user_id` INTEGER,
    `date_from` TIMESTAMP(6), -- or TIMESTAMP(3)
    `first_name` STRING,
    `last_name` STRING
)
WITH (
    'connector' = 'rest-lookup',
    'url' = 'http://http-service:8000/dummy',
    'format' = 'json',
    'asyncPolling' = 'false',
    'lookup-method' = 'GET'
);

SELECT *
FROM Orders o
LEFT JOIN user_data
FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id AND o.order_time = u.date_from
;

Then I run EXPLAIN twice: (1) with date_from as TIMESTAMP(6), and (2) with date_from as TIMESTAMP(3).
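
(The plans below are presumably produced by prefixing the join with EXPLAIN:)

EXPLAIN
SELECT *
FROM Orders o
LEFT JOIN user_data
FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id AND o.order_time = u.date_from;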

(1) date_from TIMESTAMP(6):

== Abstract Syntax Tree ==
LogicalProject(user_id=[$0], order_time=[$1], proc_time=[$2], user_id0=[$3], date_from=[$4], first_name=[$5], last_name=[$6])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1, 2}])
   :- LogicalProject(user_id=[$0], order_time=[$1], proc_time=[PROCTIME()])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
   +- LogicalFilter(condition=[AND(=($cor0.user_id, $0), =($cor0.order_time, $1))])
      +- LogicalSnapshot(period=[$cor0.proc_time])
         +- LogicalTableScan(table=[[default_catalog, default_database, user_data]])

== Optimized Physical Plan ==
Calc(select=[user_id, order_time, PROCTIME_MATERIALIZE(proc_time) AS proc_time, user_id0, date_from, first_name, last_name])
+- LookupJoin(table=[default_catalog.default_database.user_data], joinType=[LeftOuterJoin], lookup=[user_id=user_id, date_from=order_time], select=[user_id, order_time, proc_time, user_id0, date_from, first_name, last_name])
   +- Calc(select=[user_id, order_time, PROCTIME() AS proc_time])
      +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[user_id, order_time])

== Optimized Execution Plan ==
Calc(select=[user_id, order_time, PROCTIME_MATERIALIZE(proc_time) AS proc_time, user_id0, date_from, first_name, last_name])
+- LookupJoin(table=[default_catalog.default_database.user_data], joinType=[LeftOuterJoin], lookup=[user_id=user_id, date_from=order_time], select=[user_id, order_time, proc_time, user_id0, date_from, first_name, last_name])
   +- Calc(select=[user_id, order_time, PROCTIME() AS proc_time])
      +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[user_id, order_time])

(2) date_from TIMESTAMP(3):

== Abstract Syntax Tree ==
LogicalProject(user_id=[$0], order_time=[$1], proc_time=[$2], user_id0=[$3], date_from=[$4], first_name=[$5], last_name=[$6])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1, 2}])
   :- LogicalProject(user_id=[$0], order_time=[$1], proc_time=[PROCTIME()])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
   +- LogicalFilter(condition=[AND(=($cor0.user_id, $0), =($cor0.order_time, CAST($1):TIMESTAMP(6)))])
      +- LogicalSnapshot(period=[$cor0.proc_time])
         +- LogicalTableScan(table=[[default_catalog, default_database, user_data]])

== Optimized Physical Plan ==
Calc(select=[user_id, order_time, PROCTIME_MATERIALIZE(proc_time) AS proc_time, user_id0, date_from, first_name, last_name])
+- LookupJoin(table=[default_catalog.default_database.user_data], joinType=[LeftOuterJoin], lookup=[user_id=user_id], joinCondition=[=(order_time, date_from0)], select=[user_id, order_time, proc_time, user_id, date_from, first_name, last_name, CAST(date_from AS TIMESTAMP(6)) AS date_from0])
   +- Calc(select=[user_id, order_time, PROCTIME() AS proc_time])
      +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[user_id, order_time])

== Optimized Execution Plan ==
Calc(select=[user_id, order_time, PROCTIME_MATERIALIZE(proc_time) AS proc_time, user_id0, date_from, first_name, last_name])
+- LookupJoin(table=[default_catalog.default_database.user_data], joinType=[LeftOuterJoin], lookup=[user_id=user_id], joinCondition=[(order_time = date_from0)], select=[user_id, order_time, proc_time, user_id, date_from, first_name, last_name, CAST(date_from AS TIMESTAMP(6)) AS date_from0])
   +- Calc(select=[user_id, order_time, PROCTIME() AS proc_time])
      +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[user_id, order_time])

Important diff (the lookup=[...] section); in (1) both equalities are lookup keys, while in (2) the timestamp equality has been demoted to a joinCondition:

+- LookupJoin(table=[default_catalog.default_database.user_data], joinType=[LeftOuterJoin], lookup=[user_id=user_id, date_from=order_time], select=[user_id, order_time, proc_time, user_id0, date_from, first_name, last_name])
+- LookupJoin(table=[default_catalog.default_database.user_data], joinType=[LeftOuterJoin], lookup=[user_id=user_id], joinCondition=[(order_time = date_from0)], select=[user_id, order_time, proc_time, user_id, date_from, first_name, last_name, CAST(date_from AS TIMESTAMP(6)) AS date_from0])

grzegorz8 commented:

@davidradl I am afraid that none of the options mentioned is feasible. At least this is how I see it.


> the HTTP lookup connector could look for equality join conditions, using logic similar to the JDBC connector's, and then proceed with such an equality condition as if it were a lookup key.

How can we extract information about the other join conditions? Unfortunately, LookupContext has only the getKeys() method:

public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext)
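
For illustration, a tiny (hypothetical) helper that prints everything the context exposes: the index paths of the lookup keys, and nothing about conditions the planner moved into joinCondition=[...]:

import java.util.Arrays;

import org.apache.flink.table.connector.source.LookupTableSource;

// Hypothetical helper; getKeys() is the real API. For the first EXPLAIN
// above it would report both key columns, for the second only user_id;
// the demoted timestamp equality is invisible here.
final class LookupKeyInspector {

    private LookupKeyInspector() {}

    static String describeKeys(LookupTableSource.LookupContext context) {
        StringBuilder sb = new StringBuilder("lookup keys by index path:");
        for (int[] path : context.getKeys()) {
            sb.append(' ').append(Arrays.toString(path));
        }
        return sb.toString();
    }
}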

> alternatively, the lookup connector could be more explicit about which conditions should be treated as lookup keys and which as join conditions; this would allow non-equality join conditions to be handled in the connector as a new capability.

Could you please elaborate on how we can influence the query planner's decision about which conditions are treated as lookup conditions?


> unfortunately, applyFilters comes in the interface SupportsFilterPushDown, which is only driven for scan sources.

You are right. It seems that applyFilters() is called only for scan sources.

davidradl commented:

@grzegorz8 yes, I think the only real option would be to implement the scan interface. At the moment we have a workaround: we know what the user's filters are from the request mapping in the config. This will be in the code I am currently working on for #99.

grzegorz8 commented:

> @grzegorz8 yes, I think the only real option would be to implement the scan interface. At the moment we have a workaround: we know what the user's filters are from the request mapping in the config. This will be in the code I am currently working on for #99.

Adding scan capability requires a lot of effort. It would be a very nice feature, but it does not solve this issue: the planner can still decide to use a lookup instead of a scan, unless you specify the join type via a hint.

What works for me is to explicitly cast the timestamp on the left side to the precision defined in the lookup table. In my example, the query looks like this:

SELECT *
FROM Orders o
LEFT JOIN user_data
FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id AND CAST(o.order_time AS TIMESTAMP(3)) = u.date_from
;

davidradl commented:

@grzegorz8 yes, thanks for looking at this. We are also looking at aligning the precisions so the timestamp comparison would come through as a lookup key.
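
A sketch of that alignment on the original source DDL (assuming nanosecond precision is not actually needed); with both sides declared as TIMESTAMP_LTZ(6), the planner keeps the timestamp equality as a lookup key:

CREATE TABLE `source_1`
(
    `customerId`             STRING,
    `param_string_date_time` TIMESTAMP_LTZ(6), -- was TIMESTAMP_LTZ(9)
    `ts` TIMESTAMP(3),
    WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE
)
WITH (
    'connector' = 'filesystem',
    'path' = '/Users/davidradley/temp/regressionissue.txt',
    'format' = 'json'
);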
