http source select exception #108

Closed
731879846kk opened this issue Jul 17, 2024 · 3 comments

@731879846kk

Flink SQL> CREATE TABLE Customers (

      name STRING, 
      age INT,
      memoLong BIGINT,
      memoFloat STRING,
      memoDouble STRING,
      memoBigDecimal STRING,
      memoBoolean BOOLEAN,
      memoNull STRING

) WITH (
'connector' = 'rest-lookup',
'format' = 'json',
'url' = 'localhost:8080/getresult',
'asyncPolling' = 'true'
);
[INFO] Execute statement succeed.

Flink SQL> select * from Customers;

[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE].
Missing conversion is FlinkLogicalTableSourceScan[convention: LOGICAL -> STREAM_PHYSICAL]
There is 1 empty subset: rel#135:RelSubset#2.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows
119:FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, Customers]], fields=[name, age, memoLong, memoFloat, memoDouble, memoBigDecimal, memoBoolean, memoNull])

Root: rel#133:RelSubset#3.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]
Original rel:
FlinkLogicalSink(subset=[rel#117:RelSubset#1.LOGICAL.any.None: 0.[NONE].[NONE]], table=[anonymous_collect$1], fields=[name, age, memoLong, memoFloat, memoDouble, memoBigDecimal, memoBoolean, memoNull]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 121
FlinkLogicalTableSourceScan(subset=[rel#120:RelSubset#0.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, Customers]], fields=[name, age, memoLong, memoFloat, memoDouble, memoBigDecimal, memoBoolean, memoNull]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 7.3E9 io, 0.0 network, 0.0 memory}, id = 119

Sets:
Set#2, type: RecordType(VARCHAR(2147483647) name, INTEGER age, BIGINT memoLong, VARCHAR(2147483647) memoFloat, VARCHAR(2147483647) memoDouble, VARCHAR(2147483647) memoBigDecimal, BOOLEAN memoBoolean, VARCHAR(2147483647) memoNull)
rel#130:RelSubset#2.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#119
rel#119:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, Customers],fields=name, age, memoLong, memoFloat, memoDouble, memoBigDecimal, memoBoolean, memoNull), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 7.3E9 io, 0.0 network, 0.0 memory}
rel#135:RelSubset#2.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
Set#3, type: RecordType(VARCHAR(2147483647) name, INTEGER age, BIGINT memoLong, VARCHAR(2147483647) memoFloat, VARCHAR(2147483647) memoDouble, VARCHAR(2147483647) memoBigDecimal, BOOLEAN memoBoolean, VARCHAR(2147483647) memoNull)
rel#132:RelSubset#3.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#131
rel#131:FlinkLogicalSink.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#130,table=anonymous_collect$1,fields=name, age, memoLong, memoFloat, memoDouble, memoBigDecimal, memoBoolean, memoNull), rowcount=1.0E8, cumulative cost={2.0E8 rows, 2.0E8 cpu, 7.3E9 io, 0.0 network, 0.0 memory}
rel#133:RelSubset#3.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
rel#134:AbstractConverter.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#132,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
rel#136:StreamPhysicalSink.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#135,table=anonymous_collect$1,fields=name, age, memoLong, memoFloat, memoDouble, memoBigDecimal, memoBoolean, memoNull), rowcount=1.0E8, cumulative cost={inf}

Graphviz:
digraph G {
root [style=filled,label="Root"];
subgraph cluster2{
label="Set 2 RecordType(VARCHAR(2147483647) name, INTEGER age, BIGINT memoLong, VARCHAR(2147483647) memoFloat, VARCHAR(2147483647) memoDouble, VARCHAR(2147483647) memoBigDecimal, BOOLEAN memoBoolean, VARCHAR(2147483647) memoNull)";
rel119 [label="rel#119:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, Customers],fields=name, age, memoLong, memoFloat, memoDouble, memoBigDecimal, memoBoolean, memoNull\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 7.3E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
subset130 [label="rel#130:RelSubset#2.LOGICAL.any.None: 0.[NONE].[NONE]"]
subset135 [label="rel#135:RelSubset#2.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]",color=red]
}
subgraph cluster3{
label="Set 3 RecordType(VARCHAR(2147483647) name, INTEGER age, BIGINT memoLong, VARCHAR(2147483647) memoFloat, VARCHAR(2147483647) memoDouble, VARCHAR(2147483647) memoBigDecimal, BOOLEAN memoBoolean, VARCHAR(2147483647) memoNull)";
rel131 [label="rel#131:FlinkLogicalSink\ninput=RelSubset#130,table=anonymous_collect$1,fields=name, age, memoLong, memoFloat, memoDouble, memoBigDecimal, memoBoolean, memoNull\nrows=1.0E8, cost={2.0E8 rows, 2.0E8 cpu, 7.3E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel134 [label="rel#134:AbstractConverter\ninput=RelSubset#132,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, cost={inf}",shape=box]
rel136 [label="rel#136:StreamPhysicalSink\ninput=RelSubset#135,table=anonymous_collect$1,fields=name, age, memoLong, memoFloat, memoDouble, memoBigDecimal, memoBoolean, memoNull\nrows=1.0E8, cost={inf}",shape=box]
subset132 [label="rel#132:RelSubset#3.LOGICAL.any.None: 0.[NONE].[NONE]"]
subset133 [label="rel#133:RelSubset#3.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
}
root -> subset133;
subset130 -> rel119[color=blue];
subset132 -> rel131[color=blue]; rel131 -> subset130[color=blue];
subset133 -> rel134; rel134 -> subset132;
subset133 -> rel136; rel136 -> subset135;
}
I don't know how to deal with it.

@731879846kk
Author

The endpoint localhost:8080/getresult returns a result in my local environment. I don't know where the problem is.

@grzegorz8
Member

HTTP Connector implements only lookup capability:

public class HttpLookupTableSource
    implements LookupTableSource, SupportsProjectionPushDown, SupportsLimitPushDown 

HttpLookupTableSource does not implement ScanTableSource, so you cannot run a plain select * query, which is a scan operation. You need to use the lookup join syntax (https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/joins/#lookup-join).
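
As a rough sketch of what that could look like with the Customers table from this issue: the lookup table is joined against a driving table that has a processing-time attribute. The Orders table, its datagen settings, and the choice of name as the join key are all assumptions made up for illustration; which column the REST endpoint actually accepts as a lookup parameter depends on your service.

```sql
-- Hypothetical driving table (not from the issue): any source with a
-- processing-time attribute works; datagen is used here only for illustration.
CREATE TABLE Orders (
  customer_name STRING,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.customer_name.length' = '4'
);

-- Lookup join: for each Orders row, the connector is asked for the matching
-- Customers row. Using c.name as the key is an assumption about the endpoint.
SELECT o.customer_name, c.age, c.memoLong
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_name = c.name;
```

The FOR SYSTEM_TIME AS OF clause is what makes the planner treat Customers as a lookup source instead of trying to scan it, which is the step that fails in the error above.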

@kristoffSC
Collaborator

Exactly as @grzegorz8 said.
This connector supports only a Lookup source and an Async Sink. Although it would be cool to see a Scan source feature here, I think it would be impractical - take a look at this here

@kristoffSC kristoffSC self-assigned this Aug 1, 2024
@grzegorz8 grzegorz8 changed the title from "http source select excpetion" to "http source select exception" Aug 14, 2024