-
Notifications
You must be signed in to change notification settings - Fork 112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-1569005: Deduplicate queries and post actions with building queries from binary plan #2090
SNOW-1569005: Deduplicate queries and post actions with building queries from binary plan #2090
Conversation
…d-for-binary-operator
@@ -637,8 +637,15 @@ def build_binary( | |||
if post_action not in post_actions: | |||
post_actions.append(copy.copy(post_action)) | |||
else: | |||
merged_queries = select_left.queries[:-1] + select_right.queries[:-1] | |||
post_actions = select_left.post_actions + select_right.post_actions | |||
merged_queries = select_left.queries[:-1] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can merge the code for query merge here for the if else branch, we only need a parameter protection for referenced_ctes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
referenced_ctes are only enabled when we have _query_compilation_stage_enabled=True
right. This is the False branch.
What kind of protection are you thinking of exaclty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what i mean is the following code is common for both branch under
merged_queries = select_left.queries[:-1].copy()
for query in select_right.queries[:-1]:
if query not in merged_queries:
merged_queries.append(copy.copy(query))
post_actions = select_left.post_actions.copy()
for post_action in select_right.post_actions:
if post_action not in post_actions:
post_actions.append(copy.copy(post_action))
so we can merge the code under the if else branch, and we can have the following
merged_queries = select_left.queries[:-1].copy()
for query in select_right.queries[:-1]:
if query not in merged_queries:
merged_queries.append(copy.copy(query))
post_actions = select_left.post_actions.copy()
for post_action in select_right.post_actions:
if post_action not in post_actions:
post_actions.append(copy.copy(post_action))
if (
self.session.cte_optimization_enabled
and self.session._query_compilation_stage_enabled
):
referenced_ctes.update(select_left.referenced_ctes)
referenced_ctes.update(select_right.referenced_ctes)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sfc-gh-aalam can you trigger the snowpark daily test with your pr to see if it passed with sql simplifier off
https://github.com/snowflakedb/snowpark-python/actions/runs/10395188222 |
query_count=18, | ||
query_count=13, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
without dedup
0. ls '@"TESTDB_SNOWPARK_PYTHON"."TESTSCHEMA_SNOWPARK_PYTHON".SNOWPARK_TEMP_STAGE_6VYBSG222X'
1. SELECT "name" FROM ( SELECT * FROM TABLE ( RESULT_SCAN('01b67b2d-0001-9f54-0000-047d100fa0ea')))
2. CREATE TEMPORARY FUNCTION "TESTDB_SNOWPARK_PYTHON"."TESTSCHEMA_SNOWPARK_PYTHON".SNOWPARK_TEMP_TABLE_FUNCTION_28BQP90FQN(a...
3. CREATE OR REPLACE SCOPED TEMPORARY TABLE SNOWPARK_TEMP_TABLE_44LOL2QE6K("__index__" BIGINT, "A" DOUBLE, "B" DOUBLE, "C" DOUBLE, "__row_position__" BIGINT)
4. INSERT INTO SNOWPARK_TEMP_TABLE_44LOL2QE6K("__index__", "A", "B", "C", "__row_position__") VALUES (?, ?, ?, ?, ?)
5. CREATE TEMPORARY TABLE "TESTDB_SNOWPARK_PYTHON"."TESTSCHEMA_SNOWPARK_PYTHON"."SNOWPARK_TEMP_TABLE_LE4JZ1D6QF"("__row_position__" BIGINT, "LABEL" STRING(16777216), "VALUE" VARIANT) AS SELECT * FROM ( SELECT "__row_position__", "LABEL", "VALUE" FROM ( SELECT T_RIGHT."__row_position__", T_RIGHT."LABEL", T_RIGHT."VALUE" FROM ( SELECT "__index__", "A", "B", "C", "__row_position__", round(("__row_position__" / 1000 :: INT), 0) AS "partition_id" FROM ( SELECT * FROM (SNOWPARK_TEMP_TABLE_44LOL2QE6K))) AS T_LEFT JOIN TABLE ("TESTDB_SNOWPARK_PYTHON"."TESTSCHEMA_SNOWPARK_PYTHON".SNOWPARK_TEMP_TABLE_FUNCTION_28BQP90FQN("__row_position__", "A", "B", "C") OVER (PARTITION BY "partition_id" )) AS T_RIGHT))
6. DROP TABLE If EXISTS SNOWPARK_TEMP_TABLE_44LOL2QE6K
7. CREATE OR REPLACE SCOPED TEMPORARY TABLE SNOWPARK_TEMP_TABLE_44LOL2QE6K("__index__" BIGINT, "A" DOUBLE, "B" DOUBLE, "C" DOUBLE, "__row_position__" BIGINT)
8. alter session set query_tag = ' File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/modin/plugin/_internal/frame.py", line 706, in num_rows\n return count_rows(self.ordered_dataframe)\n File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/modin/plugin/_internal/utils.py", line 1668, in count_rows\n df.select(df.row_count_snowflake_quoted_identifier)\n File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/_internal/telemetry.py", line 151, in wrap\n result = func(*args, **kwargs)\n'
9. INSERT INTO SNOWPARK_TEMP_TABLE_44LOL2QE6K("__index__", "A", "B", "C", "__row_position__") VALUES (?, ?, ?, ?, ?)
10. alter session unset query_tag
11. CREATE OR REPLACE SCOPED TEMPORARY TABLE SNOWPARK_TEMP_TABLE_44LOL2QE6K("__index__" BIGINT, "A" DOUBLE, "B" DOUBLE, "C" DOUBLE, "__row_position__" BIGINT)
12. alter session set query_tag = ' File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/modin/plugin/_internal/frame.py", line 706, in num_rows\n return count_rows(self.ordered_dataframe)\n File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/modin/plugin/_internal/utils.py", line 1668, in count_rows\n df.select(df.row_count_snowflake_quoted_identifier)\n File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/_internal/telemetry.py", line 151, in wrap\n result = func(*args, **kwargs)\n'
13. INSERT INTO SNOWPARK_TEMP_TABLE_44LOL2QE6K("__index__", "A", "B", "C", "__row_position__") VALUES (?, ?, ?, ?, ?)
14. alter session unset query_tag
15. SELECT count(1) OVER ( ) AS "__row_count__" FROM ( SELECT...
16. DROP TABLE If EXISTS SNOWPARK_TEMP_TABLE_44LOL2QE6K
17. DROP TABLE If EXISTS SNOWPARK_TEMP_TABLE_44LOL2QE6K
with dedup
0. ls '@"TESTDB_SNOWPARK_PYTHON"."TESTSCHEMA_SNOWPARK_PYTHON".SNOWPARK_TEMP_STAGE_Y9M0URDKK1'
1. SELECT "name" FROM ( SELECT * FROM TABLE ( RESULT_SCAN('01b67b31-0001-9f56-0000-047d100f939a')))
2. CREATE TEMPORARY FUNCTION "TESTDB_SNOWPARK_PYTHON"."TESTSCHEMA_SNOWPARK_PYTHON".SNOWPARK_TEMP_TABLE_FUNCTION_1AHR3RQ86H...
3. CREATE OR REPLACE SCOPED TEMPORARY TABLE SNOWPARK_TEMP_TABLE_Y9C70KYN3S("__index__" BIGINT, "A" DOUBLE, "B" DOUBLE, "C" DOUBLE, "__row_position__" BIGINT)
4. INSERT INTO SNOWPARK_TEMP_TABLE_Y9C70KYN3S("__index__", "A", "B", "C", "__row_position__") VALUES (?, ?, ?, ?, ?)
5. CREATE TEMPORARY TABLE "TESTDB_SNOWPARK_PYTHON"."TESTSCHEMA_SNOWPARK_PYTHON"."SNOWPARK_TEMP_TABLE_2S5501TCU3"("__row_position__" BIGINT, "LABEL" STRING(16777216), "VALUE" VARIANT) AS SELECT ...
6. DROP TABLE If EXISTS SNOWPARK_TEMP_TABLE_Y9C70KYN3S
7. CREATE OR REPLACE SCOPED TEMPORARY TABLE SNOWPARK_TEMP_TABLE_Y9C70KYN3S("__index__" BIGINT, "A" DOUBLE, "B" DOUBLE, "C" DOUBLE, "__row_position__" BIGINT)
8. alter session set query_tag = ' File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/modin/plugin/_internal/frame.py", line 706, in num_rows\n return count_rows(self.ordered_dataframe)\n File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/modin/plugin/_internal/utils.py", line 1668, in count_rows\n df.select(df.row_count_snowflake_quoted_identifier)\n File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/_internal/telemetry.py", line 151, in wrap\n result = func(*args, **kwargs)\n'
9. INSERT INTO SNOWPARK_TEMP_TABLE_Y9C70KYN3S("__index__", "A", "B", "C", "__row_position__") VALUES (?, ?, ?, ?, ?)
10. alter session unset query_tag
11. SELECT count(1) OVER ( ) AS "__row_count__" FROM ( SELECT ...
12. DROP TABLE If EXISTS SNOWPARK_TEMP_TABLE_Y9C70KYN3S
11 queries includes 5 queries to prepare the temp table for df, including create, insert, | ||
drop the temp table (3.). and alter session to set and unset query_tag (2) and one select query. | ||
6 queries includes queries to create, insert, and drop the temp table (3), alter session | ||
to set and unset query_tag (2) and one select query. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
0: CREATE OR REPLACE SCOPED TEMPORARY TABLE SNOWPARK_TEMP_TABLE_5GYC8B2HIQ("__c0__" BIGINT, "c1" BIGINT, "__row_position__" >
1: alter session set query_tag = ' File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/modin/plugin/_internal/ut>
2: INSERT INTO SNOWPARK_TEMP_TABLE_5GYC8B2HIQ("__c0__", "c1", "__row_position__") VALUES (?, ?, ?)
3: alter session unset query_tag
4: SELECT "__c0__", "c1" FROM ( SELECT * FROM (( SELECT "__index__" AS "__index__", "int_value_jpcc" AS "int_value_jpcc", "__r>
5: DROP TABLE If EXISTS SNOWPARK_TEMP_TABLE_5GYC8B2HIQ
---------------------------------------------------------------------
0: CREATE OR REPLACE SCOPED TEMPORARY TABLE SNOWPARK_TEMP_TABLE_NU1M8EN9NU("__index__" BIGINT, "__reduced__" DOUBLE, "__row_position__" BIGINT)
1: alter session set query_tag = ' File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/modin/plugin/_internal/utils.py", line 1332, in snowpark_to_pandas_helper\n pandas_df = ordered_dataframe.to_pandas(statement_params=statement_params, **kwargs)\n File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py", line 1740, in to_pandas\n return snowpark_dataframe.to_pandas(\n File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/_internal/telemetry.py", line 151, in wrap\n result = func(*args, **kwargs)\n'
2: INSERT INTO SNOWPARK_TEMP_TABLE_NU1M8EN9NU("__index__", "__reduced__", "__row_position__") VALUES (?, ?, ?)
3: alter session unset query_tag
4: CREATE OR REPLACE SCOPED TEMPORARY TABLE SNOWPARK_TEMP_TABLE_C3E4S98KC9("__c0__" BIGINT, "c1" BIGINT, "__row_position__" BIGINT)
5: alter session set query_tag = ' File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/modin/plugin/_internal/utils.py", line 1332, in snowpark_to_pandas_helper\n pandas_df = ordered_dataframe.to_pandas(statement_params=statement_params, **kwargs)\n File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py", line 1740, in to_pandas\n return snowpark_dataframe.to_pandas(\n File "/Users/aalam/Projects/snowpark-python/src/snowflake/snowpark/_internal/telemetry.py", line 151, in wrap\n result = func(*args, **kwargs)\n'
6: INSERT INTO SNOWPARK_TEMP_TABLE_C3E4S98KC9("__c0__", "c1", "__row_position__") VALUES (?, ?, ?)
7: alter session unset query_tag
8: SELECT "__c0__", "c1" FROM ( SELECT * FROM (( SELECT "__index__" AS "__index__", "int_value_uwjn" AS "int_value_uwjn", "__row_position__" AS "__row_position__", "__row_position___2nnd" AS "__row_position___2nnd" FROM ( SELECT "__index__", iff(("int_value" < 0 :: INT), ("int_value" + "__row_count__"), "int_value") AS "int_value_uwjn", "__row_position__", "__row_position___2nnd" FROM ( SELECT * FROM (( SELECT "__index__" AS "__index__", "int_value" AS "int_value", "__row_position__" AS "__row_position__" FROM ( SELECT "__index__", trunc("__reduced___1v84", 0) AS "int_value", "__row_position__" FROM ( SELECT "__index__", "__reduced__", "__row_position__", "__reduced__" AS "__reduced___1v84" FROM ( SELECT * FROM (SNOWPARK_TEMP_TABLE_NU1M8EN9NU))))) AS SNOWPARK_LEFT CROSS JOIN ( SELECT "__row_position_ouyp__" AS "__row_position_ouyp__", "__row_count__" AS "__row_count__", "__row_position___2nnd" AS "__row_position___2nnd" FROM ( SELECT (row_number() OVER ( ORDER BY "__row_position__" ASC NULLS LAST ) - 1 :: INT) AS "__row_position_ouyp__", "__row_count__", "__row_position__" AS "__row_position___2nnd" FROM ( SELECT "__c0__", "c1", "__row_position__", count(1) OVER ( ) AS "__row_count__" FROM ( SELECT * FROM (SNOWPARK_TEMP_TABLE_C3E4S98KC9)) LIMIT 1))) AS SNOWPARK_RIGHT)))) AS SNOWPARK_LEFT INNER JOIN ( SELECT "__c0__" AS "__c0__", "c1" AS "c1", "__row_position___6i0y" AS "__row_position___6i0y" FROM ( SELECT "__c0__", "c1", "__row_position__" AS "__row_position___6i0y" FROM ( SELECT * FROM (SNOWPARK_TEMP_TABLE_C3E4S98KC9)))) AS SNOWPARK_RIGHT ON EQUAL_NULL("int_value_uwjn", "__row_position___6i0y"))) ORDER BY "__row_position__" ASC NULLS LAST, "__row_position___2nnd" ASC NULLS LAST, "__row_position___6i0y" ASC NULLS LAST
9: DROP TABLE If EXISTS SNOWPARK_TEMP_TABLE_NU1M8EN9NU
0: DROP TABLE If EXISTS SNOWPARK_TEMP_TABLE_C3E4S98KC9
Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-1569005
Fill out the following pre-review checklist:
Please describe how your code solves the related issue.
When building plans from binary or set statements, we currently simple merge all queries. This PR fixes by deduplicating the queries.