Skip to content

Commit 46a232e

Browse files
committed
chore(weave): add call-id sharding to distributed cluster migration
1 parent 10d7f23 commit 46a232e

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

tests/trace_server/test_clickhouse_trace_server_migrator.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,16 @@ def test_create_distributed_table_sql():
387387
assert sql.strip() == expected.strip()
388388

389389

390+
def test_create_distributed_table_sql_id_sharded():
391+
"""Check the distributed-table DDL generated for an ID-sharded table.

For calls_complete, the generated Distributed engine clause is expected to
use sipHash64(id) as its sharding key.
"""
392+
distributed_migrator = DistributedClickHouseTraceServerMigrator(
393+
# NOTE(review): Mock() stands in for the first constructor argument
# (presumably the ClickHouse client) -- confirm against the constructor.
Mock(), replicated_cluster="test_cluster", migration_dir=DEFAULT_MIGRATION_DIR
394+
)
395+
sql = distributed_migrator._create_distributed_table_sql("calls_complete")
396+
expected = "CREATE TABLE IF NOT EXISTS calls_complete ON CLUSTER test_cluster\n AS calls_complete_local\n ENGINE = Distributed(test_cluster, currentDatabase(), calls_complete_local, sipHash64(id))"
397+
# Only leading/trailing whitespace is ignored; whitespace inside the
# statement must match the generated SQL exactly.
assert sql.strip() == expected.strip()
398+
399+
390400
def test_format_distributed_sql():
391401
"""Test distributed SQL formatting for CREATE TABLE and other DDL."""
392402
distributed_migrator = DistributedClickHouseTraceServerMigrator(

weave/trace_server/clickhouse_trace_server_migrator.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@
8585
# Constants for table naming conventions
8686
VIEW_SUFFIX = "_view"
8787

88+
# Tables that use ID-based sharding (sipHash64(<field>)) instead of random
89+
# (rand()) sharding in distributed mode. Maps the distributed table name (the
90+
# name without the local-table suffix) to the field hashed for the sharding
91+
# key, so all data for a specific ID goes to the same shard, enabling
# efficient point lookups.
92+
ID_SHARDED_TABLES: dict[str, str] = {"calls_complete": "id"}
93+
8894

8995
@dataclass(frozen=True)
9096
class PostMigrationHookContext:
@@ -781,12 +787,21 @@ def _format_distributed_sql(self, sql_query: str) -> DistributedTransformResult:
781787
)
782788

783789
def _create_distributed_table_sql(self, table_name: str) -> str:
784-
"""Generate SQL to create a distributed table."""
790+
"""Generate SQL to create a distributed table.
791+
792+
For tables in ID_SHARDED_TABLES, uses sipHash64(field) as the sharding key
793+
to ensure all data for a specific ID goes to the same shard, enabling
794+
efficient point lookups. Other tables use rand() for even distribution.

The distributed table is named ``table_name`` and is defined over the
local table ``table_name + ch_settings.LOCAL_TABLE_SUFFIX`` on
``self.replicated_cluster`` in the current database.

Returns:
    A ``CREATE TABLE IF NOT EXISTS ... ON CLUSTER`` statement using the
    ``Distributed`` engine. The string is wrapped in newlines.

NOTE(review): because the statement uses IF NOT EXISTS, a distributed
table that already exists presumably keeps whatever sharding key it was
created with -- confirm before relying on this to re-shard a table.
795+
"""
785796
local_table_name = table_name + ch_settings.LOCAL_TABLE_SUFFIX
797+
# Truthiness check: a table missing from ID_SHARDED_TABLES, or mapped to an
# empty field name, falls through to random sharding.
if shard_field := ID_SHARDED_TABLES.get(table_name):
798+
sharding_key = f"sipHash64({shard_field})"
799+
else:
800+
sharding_key = "rand()"
786801
return f"""
787802
CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER {self.replicated_cluster}
788803
AS {local_table_name}
789-
ENGINE = Distributed({self.replicated_cluster}, currentDatabase(), {local_table_name}, rand())
804+
ENGINE = Distributed({self.replicated_cluster}, currentDatabase(), {local_table_name}, {sharding_key})
790805
"""
791806

792807
@staticmethod

0 commit comments

Comments
 (0)