
24.8.14 Backport of #79369 -- Ignore parse error in system.distributed_ddl_queue #843
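Backport of #79369 to the 24.8.14 customizations branch. Previously, a single DDL queue entry whose query text could not be parsed caused every SELECT from system.distributed_ddl_queue to throw SYNTAX_ERROR, hiding all other entries. With this change, clusterNameFromDDLQuery() catches the parse error, logs it, and returns an empty cluster name, so the broken entry is still listed alongside the rest. The new integration test plants a deliberately corrupted copy of a real DDL entry in ZooKeeper (TABLE rewritten to TUBLE) and checks that it remains visible via the system table with cluster = ''.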


Open
wants to merge 1 commit into base: customizations/24.8.14
30 changes: 25 additions & 5 deletions src/Storages/System/StorageSystemDDLWorkerQueue.cpp
@@ -21,6 +21,11 @@ namespace fs = std::filesystem;
namespace DB
{

namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
}

enum class Status : uint8_t
{
INACTIVE,
@@ -54,7 +59,7 @@ ColumnsDescription StorageSystemDDLWorkerQueue::getColumnsDescription()
{"entry_version", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "Version of the entry."},
{"initiator_host", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Host that initiated the DDL operation."},
{"initiator_port", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt16>()), "Port used by the initiator."},
{"cluster", std::make_shared<DataTypeString>(), "Cluster name."},
{"cluster", std::make_shared<DataTypeString>(), "Cluster name, empty if not determined."},
{"query", std::make_shared<DataTypeString>(), "Query executed."},
{"settings", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>()), "Settings used in the DDL operation."},
{"query_create_time", std::make_shared<DataTypeDateTime>(), "Query created time."},
@@ -77,10 +82,25 @@ static String clusterNameFromDDLQuery(ContextPtr context, const DDLTask & task)

String description = fmt::format("from {}", task.entry_path);
ParserQuery parser_query(end, settings.allow_settings_after_format_in_insert);
ASTPtr query = parseQuery(parser_query, begin, end, description,
settings.max_query_size,
settings.max_parser_depth,
settings.max_parser_backtracks);
ASTPtr query;

try
{
query = parseQuery(parser_query, begin, end, description,
settings.max_query_size,
settings.max_parser_depth,
settings.max_parser_backtracks);
}
catch (const Exception & e)
{
LOG_INFO(getLogger("StorageSystemDDLWorkerQueue"), "Failed to determine cluster");
if (e.code() == ErrorCodes::SYNTAX_ERROR)
{
/// ignore parse error and present available information
return "";
}
throw;
}

String cluster_name;
if (const auto * query_on_cluster = dynamic_cast<const ASTQueryWithOnCluster *>(query.get()))
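
The visible effect of the change above: an entry whose DDL text fails to parse no longer aborts reads of the system table; it surfaces with an empty cluster column instead. A minimal sketch of how to spot such entries (mirroring the assertion in the integration test further down):

    -- entries whose query text could not be parsed are kept, with cluster = ''
    SELECT entry, initiator_host, query
    FROM system.distributed_ddl_queue
    WHERE cluster = '';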
@@ -25,4 +25,5 @@
</shard>
</test_cluster>
</remote_servers>
<allow_zookeeper_write>1</allow_zookeeper_write>
</clickhouse>
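
The config change above enables writes to the system.zookeeper table, which the new test needs in order to plant its corrupted DDL entry under /clickhouse/task_queue/ddl. A sketch of the kind of insert the test issues (the name, path, and value here are illustrative placeholders, not the test's exact data):

    -- requires <allow_zookeeper_write>1</allow_zookeeper_write> in the server config
    INSERT INTO system.zookeeper (name, path, value)
    VALUES ('query-artificial-0', '/clickhouse/task_queue/ddl', 'ALTER TUBLE ...');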
128 changes: 107 additions & 21 deletions tests/integration/test_system_ddl_worker_queue/test.py
@@ -1,4 +1,5 @@
import pytest
import time

from helpers.cluster import ClickHouseCluster

Expand All @@ -25,46 +26,131 @@ def started_cluster():
try:
cluster.start()

for i, node in enumerate([node1, node2]):
node.query("CREATE DATABASE testdb")
node.query(
"""CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table1', '{}') ORDER BY id;""".format(
i
)
)
for i, node in enumerate([node3, node4]):
node.query("CREATE DATABASE testdb")
node.query(
"""CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table2', '{}') ORDER BY id;""".format(
i
)
)
yield cluster

finally:
cluster.shutdown()


def maintain_test_table(test_table):
tmark = time.time() # to guarantee ZK path uniqueness

for i, node in enumerate([node1, node2]):
node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC")
node.query("DROP DATABASE IF EXISTS testdb")

node.query("CREATE DATABASE testdb")
node.query(
f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}1-{tmark}', '{i}') ORDER BY id;"
)
for i, node in enumerate([node3, node4]):
node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC")
node.query("DROP DATABASE IF EXISTS testdb")

node.query("CREATE DATABASE testdb")
node.query(
f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}2-{tmark}', '{i}') ORDER BY id;"
)


def test_distributed_ddl_queue(started_cluster):
test_table = "test_table"
maintain_test_table(test_table)
node1.query(
"INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)"
f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)"
)
node3.query(
"INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)"
f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)"
)
node2.query("SYSTEM SYNC REPLICA testdb.test_table")
node4.query("SYSTEM SYNC REPLICA testdb.test_table")
node2.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
node4.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")

node1.query(
"ALTER TABLE testdb.test_table ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
settings={"replication_alter_partitions_sync": "2"},
)
for node in nodes:
node.query("SYSTEM SYNC REPLICA testdb.test_table")
assert node.query("SELECT somecolumn FROM testdb.test_table LIMIT 1") == "0\n"
node.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
assert (
node.query(f"SELECT somecolumn FROM testdb.{test_table} LIMIT 1") == "0\n"
)
assert (
node.query(
"SELECT If((SELECT count(*) FROM system.distributed_ddl_queue WHERE cluster='test_cluster' AND entry='query-0000000000') > 0, 'ok', 'fail')"
)
== "ok\n"
)

node1.query(
f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somecolumn",
settings={"replication_alter_partitions_sync": "2"},
)


def test_distributed_ddl_rubbish(started_cluster):
test_table = "test_table_rubbish"
maintain_test_table(test_table)
node1.query(
f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somenewcolumn UInt8 AFTER val",
settings={"replication_alter_partitions_sync": "2"},
)

zk_content = node1.query(
"SELECT name, value, path FROM system.zookeeper WHERE path LIKE '/clickhouse/task_queue/ddl%' SETTINGS allow_unrestricted_reads_from_keeper=true",
parse=True,
).to_dict("records")

original_query = ""
new_query = "query-artificial-" + str(time.monotonic_ns())

# Copy information about query (one that added 'somenewcolumn') with new query ID
# and broken query text (TABLE => TUBLE)
for row in zk_content:
if row["value"].find("somenewcolumn") >= 0:
original_query = row["name"]
break

rows_to_insert = []

for row in zk_content:
if row["name"] == original_query:
rows_to_insert.append(
{
"name": new_query,
"path": row["path"],
"value": row["value"].replace("TABLE", "TUBLE"),
}
)
continue
pos = row["path"].find(original_query)
if pos >= 0:
rows_to_insert.append(
{
"name": row["name"],
"path": row["path"].replace(original_query, new_query),
"value": row["value"],
}
)

# Ingest it to ZK
for row in rows_to_insert:
node1.query(
"insert into system.zookeeper (name, path, value) values ('{}', '{}', '{}')".format(
row["name"], row["path"], row["value"]
)
)

# Ensure that data is visible via system.distributed_ddl_queue
assert (
int(
node1.query(
f"SELECT count(1) FROM system.distributed_ddl_queue WHERE entry='{new_query}' AND cluster=''"
)
)
== 4
)

node1.query(
f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somenewcolumn",
settings={"replication_alter_partitions_sync": "2"},
)