Hi,
Thank you for your project. I wanted to add a few comments about the schema in ClickHouse.
signoz-otel-collector/cmd/signozschemamigrator/schema_migrator/squashed_metrics_migrations.go
Line 237 in 421ade2
| ShardingKey: "cityHash64(env, temporality, metric_name, fingerprint)", |
A sharding key is used to solve two problems:
- Equal distribution of data across many shards (on INSERT)
- Control over the number of instances selected for a query (on SELECT)
The first problem has several solutions.
1. INSERT directly to local tables.
Distribution is fully controlled by the ingestion service via two decisions:
- Which CH node to write data to
- The size of each individual data batch
The pro of this method is zero overhead for redistributing data across shards and zero fanout.
The con is that the ingestor needs to round-robin across many CH servers, or there must be a sufficient number of ingestors with load distributed across them, so that "sharding" happens before the ingestor layer. Data distribution is also random.
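A minimal sketch of the round-robin variant, assuming equal-sized batches. The node names and the `assign_batches` helper are hypothetical and only illustrate how the ingestor, not ClickHouse, decides where each batch goes:

```python
from itertools import cycle


def assign_batches(batches, nodes):
    """Pair each batch with the next node in round-robin order."""
    node_cycle = cycle(nodes)
    return [(next(node_cycle), batch) for batch in batches]


# Hypothetical node names and synthetic batches, for illustration only.
nodes = ["ch-node-1", "ch-node-2", "ch-node-3"]
batches = [[f"row-{b}-{i}" for i in range(1000)] for b in range(6)]

plan = assign_batches(batches, nodes)

# Count how many rows each node would receive.
rows_per_node = {node: 0 for node in nodes}
for node, batch in plan:
    rows_per_node[node] += len(batch)

print(rows_per_node)
```

With batches of equal size, the row counts per node stay equal; with uneven batch sizes the balance depends on the second decision above (batch size).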
2. INSERT to a Distributed table with no sharding key + insert_distributed_one_random_shard
Basically, it allows any number of ingestors to stick to particular CH nodes, relying on batch size + CH servers forwarding each INSERT to a random shard + the "law of large numbers" to distribute data equally across nodes.
The cons of this approach are more processing on the CH side (sending data to another shard) and random data distribution between shards.
3. rand()
The main difference from method 2 is that the "distribution" happens at the level of individual rows, which theoretically should lead to a more equal row distribution across shards (regardless of batch size). In practice it has a higher performance impact (each insert now translates into N smaller inserts, one per shard) without any perceivable improvement over method 2 if the batch size is reasonable.
The cons are a higher cost compared to method 2 and, again, random distribution of data.
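The trade-off between per-batch and per-row random placement can be sketched with a small simulation. This is an illustration under assumed numbers (4 shards, 50 batches of 1000 rows), not a model of ClickHouse internals; `simulate` is a hypothetical helper:

```python
import random

NUM_SHARDS = 4
BATCH_SIZE = 1000
NUM_BATCHES = 50


def simulate(per_row, seed=0):
    """Return (number of per-shard sub-inserts, rows per shard)."""
    rng = random.Random(seed)
    rows_per_shard = [0] * NUM_SHARDS
    sub_inserts = 0
    for _ in range(NUM_BATCHES):
        counts = [0] * NUM_SHARDS
        if per_row:
            # rand()-style: every row picks its own shard.
            for _ in range(BATCH_SIZE):
                counts[rng.randrange(NUM_SHARDS)] += 1
        else:
            # One random shard per batch: the whole batch goes to one shard.
            counts[rng.randrange(NUM_SHARDS)] = BATCH_SIZE
        # Each shard that receives rows costs one sub-insert.
        sub_inserts += sum(1 for c in counts if c)
        for shard, c in enumerate(counts):
            rows_per_shard[shard] += c
    return sub_inserts, rows_per_shard


batch_inserts, batch_rows = simulate(per_row=False)
row_inserts, row_rows = simulate(per_row=True)

print("per-batch:", batch_inserts, "sub-inserts, rows per shard:", batch_rows)
print("per-row:  ", row_inserts, "sub-inserts, rows per shard:", row_rows)
```

Per-batch placement produces one sub-insert per batch, while per-row placement splits each batch across the shards, which is the extra write cost described above.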
4. cityHash64(column_x)
Compared to method 3, it should have the same or lower performance impact on writes. The impact is lower if the rows in your batch share the same column_x value, or have values that result in the same cityHash64 value / shard id.
But now it has the benefit that data with the same column_x resides on the same shard.
So you get better compression (if the table is sorted by column_x) and a smaller query scope (if the table is sorted by column_x or a skip index is created), as only one shard will have the data and the others will just return OK quickly after an index scan.
This also applies to all columns correlated with column_x: filtering by them also benefits (in terms of the amount of data read) from sharding by column_x, even if those columns are not part of the sharding key.
In certain cases it can be a disadvantage: if a large amount of data with the same column_x value is stored on a single shard, queries become slower because the data is processed entirely on a single node.
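The locality property of a hash-based sharding key can be shown in a few lines. This sketch assumes equal shard weights, so the shard is the hash modulo the shard count; `shard_for` is a hypothetical helper, and blake2b is only a stand-in because cityHash64 is not in the Python standard library:

```python
import hashlib

NUM_SHARDS = 4


def shard_for(*key_columns):
    """Stand-in for cityHash64(...) % NUM_SHARDS (blake2b is NOT cityHash64)."""
    data = "\x00".join(str(c) for c in key_columns).encode()
    digest = hashlib.blake2b(data, digest_size=8).digest()
    return int.from_bytes(digest, "big") % NUM_SHARDS


# Synthetic rows with five distinct column_x values.
rows = [{"column_x": f"x{i % 5}", "value": i} for i in range(100)]

# Collect the set of shards each column_x value is routed to.
shards_by_x = {}
for row in rows:
    shards_by_x.setdefault(row["column_x"], set()).add(shard_for(row["column_x"]))

for x, shards in sorted(shards_by_x.items()):
    print(x, "->", sorted(shards))
```

Every column_x value maps to exactly one shard, which is what makes both the compression and the query-scope benefits possible.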
5. cityHash64(column_x, column_y)
The behavior changes completely if you have multiple columns in the sharding key.
First, the chance that all data from a single batch lands on a single instance (and so has a lower performance impact) gets lower, as multiple columns now need to have the same value across the batch.
But it also affects the behavior of queries.
Let's consider 3 examples:
- SELECT * FROM table WHERE column_x = A
If column_x = A has multiple values of column_y, the query may need to read data from several shards (as the cityHash64 values will be completely different).
- SELECT * FROM table WHERE column_y = B
The same logic applies here.
- SELECT * FROM table WHERE column_x = A AND column_y = B
Only this query variant benefits from a sharding key with 2 columns in it.
Let's compare it with the case where we shard by column_x only:
- SELECT * FROM table WHERE column_x = A
Only one shard has data with column_x = A, so only one shard will actually read data.
- SELECT * FROM table WHERE column_y = B
Because column_y is not part of the sharding key, 2 cases are possible:
- It's actually a value dependent on column_x (like a user_id-device_id relationship), so sharding by column_y happens implicitly as well. If we have, let's say, a bloom filter index on column_y, again only one shard will actually read data.
- It's a completely independent value, so we fall back to the case where we need to read all shards.
- SELECT * FROM table WHERE column_x = A AND column_y = B
Like case 1, because only one shard has column_x = A, only one shard will read data, and this works regardless of whether a condition on column_y exists.
Practically, it gives the user better guarantees for more query patterns than having both columns in the sharding key, as it works for 2 of the query patterns above (maybe 3 if the columns are correlated) instead of 1.
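The comparison between the two sharding keys can be checked with a small experiment that counts which shards hold rows matching column_x = 'A'. It assumes 8 equally weighted shards and 200 distinct column_y values; `shard_for` is a hypothetical helper and blake2b is a stand-in for cityHash64:

```python
import hashlib

NUM_SHARDS = 8


def shard_for(*key_columns):
    """Stand-in for cityHash64(...) % NUM_SHARDS (blake2b is NOT cityHash64)."""
    data = "\x00".join(str(c) for c in key_columns).encode()
    digest = hashlib.blake2b(data, digest_size=8).digest()
    return int.from_bytes(digest, "big") % NUM_SHARDS


# All rows share column_x = 'A' but have different column_y values.
rows = [("A", f"y{i}") for i in range(200)]

# Shards holding data for WHERE column_x = 'A' under each sharding key.
touched_x = {shard_for(x) for x, _ in rows}
touched_xy = {shard_for(x, y) for x, y in rows}

# WHERE column_x = 'A' AND column_y = 'y7' under the two-column key.
point_lookup_shard = shard_for("A", "y7")

print("sharded by (column_x):          ", sorted(touched_x))
print("sharded by (column_x, column_y):", sorted(touched_xy))
print("point lookup on both columns:   ", point_lookup_shard)
```

With the single-column key the matching rows sit on one shard; with the two-column key they are spread out, and only the lookup that fixes both columns narrows down to a single shard.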
The only downside of this approach is the possibility that column_x has a low number of distinct values, which can lead to bad distribution of data across shards. Usually it's not a big concern, as we can select a column with a large number of unique values (fingerprint_id, as the highest-cardinality column, for example).
It's also usually a good idea to have "orthogonal" SHARDING and ORDER BY keys for a table.
I.e., for example, shard by fingerprint_id and ORDER BY (metric_name) (or (env, temporality, metric_name, fingerprint_id); env and temporality have little effect here, as they are low-cardinality columns (commonly/always used in WHERE) and will not harm distribution too much).
Basically, it means that for a query like
SELECT * FROM table WHERE metric_name = 'xxxx' AND fingerprint_id = 'xxxx' AND env = 'xxx' AND temporality = 'xxxx'
only one shard will be selected (by fingerprint_id = 'xxxx'), and on this shard, for each partition, only one mark range will be selected (by metric_name = 'xxxx' AND env = 'xxx' AND temporality = 'xxxx').
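As a rough analogy for the two-step narrowing, the sketch below routes rows to shards by fingerprint_id and keeps each shard sorted by metric_name, then answers a query by picking one shard and one contiguous sorted range. It is a toy model with synthetic data: binary search over a sorted list stands in for mark-range selection, blake2b stands in for cityHash64, and partitions are ignored:

```python
import bisect
import hashlib

NUM_SHARDS = 4


def shard_for(value):
    """Stand-in for cityHash64(fingerprint_id) % NUM_SHARDS (NOT cityHash64)."""
    digest = hashlib.blake2b(str(value).encode(), digest_size=8).digest()
    return int.from_bytes(digest, "big") % NUM_SHARDS


# Shard by fingerprint_id, then sort each shard by (metric_name, fingerprint_id).
shards = [[] for _ in range(NUM_SHARDS)]
for m in range(20):
    for f in range(50):
        row = (f"metric_{m:02d}", f"fp_{f:03d}")
        shards[shard_for(row[1])].append(row)
for part in shards:
    part.sort()


def query(metric_name, fingerprint_id):
    """Return (shard id, rows scanned, matching rows)."""
    shard_id = shard_for(fingerprint_id)          # step 1: one shard
    part = shards[shard_id]
    metric_names = [row[0] for row in part]
    lo = bisect.bisect_left(metric_names, metric_name)
    hi = bisect.bisect_right(metric_names, metric_name)
    scanned = part[lo:hi]                         # step 2: one sorted range
    matches = [row for row in scanned if row[1] == fingerprint_id]
    return shard_id, len(scanned), matches


shard_id, scanned_rows, matches = query("metric_03", "fp_007")
print("shard:", shard_id, "rows scanned:", scanned_rows, "of", len(shards[shard_id]))
print("matches:", matches)
```

Because the two keys narrow along different dimensions, the query reads a small slice of a single shard rather than a slice of every shard.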
Another option to consider is ~2-level sharding.
Let's say there is an organization with many projects (or tenants); some of them are big, some are small.
Dashboards usually exist/query within the context of a single project.
So, if there is not too much data for a particular project, it would be really great if queries for it touched only one shard (or a smaller subset of shards in a big cluster). A query for a particular panel can fetch multiple metrics for a single instance or multiple instances for a single metric; both cases imply that if we shard by any column, several shards could be queried, which is bad as it exposes us to tail latencies (this starts to be a problem when dozens of shards are queried by a single query). But if we shard our data by project_id, any query for this project will read only one shard.
But sharding by project_id wouldn't work well for big projects, as we want to distribute their data across many shards to parallelize processing.
So the idea is to define an "explicit" sharding_key column, whose value is populated on the ingestor or in ClickHouse by something like (this example is for the ClickHouse side) cityHash64(if(dictGetUInt8('projects', 'is_big', project_id), fingerprint_id, project_id)).
This gives us good locality for small projects and enough parallelization for huge ones.
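For the ingestor-side variant, the same expression could look roughly like the sketch below. The `BIG_PROJECTS` set is a hypothetical stand-in for the `projects` dictionary lookup, the project names are made up, and blake2b replaces cityHash64, so the shard numbers would not match what ClickHouse computes:

```python
import hashlib

NUM_SHARDS = 8

# Hypothetical stand-in for dictGetUInt8('projects', 'is_big', project_id).
BIG_PROJECTS = {"big-project"}


def stable_hash(value):
    """64-bit stable hash; a stand-in for cityHash64, NOT the same function."""
    digest = hashlib.blake2b(str(value).encode(), digest_size=8).digest()
    return int.from_bytes(digest, "big")


def sharding_key(project_id, fingerprint_id):
    """Hash fingerprint_id for big projects, project_id for small ones."""
    if project_id in BIG_PROJECTS:
        return stable_hash(fingerprint_id)
    return stable_hash(project_id)


def shard_for(project_id, fingerprint_id):
    return sharding_key(project_id, fingerprint_id) % NUM_SHARDS


small_shards = {shard_for("small-project", f"fp{i}") for i in range(500)}
big_shards = {shard_for("big-project", f"fp{i}") for i in range(500)}

print("small project shards:", sorted(small_shards))
print("big project shards:  ", sorted(big_shards))
```

All series of the small project land on one shard, while the big project is spread across shards by fingerprint. Computing the key on the ingestor avoids the dictionary lookup in ClickHouse, at the price of keeping the list of big projects in sync on the ingestor side.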