Skip to content

Commit

Permalink
Update Citus Technical Documentation about the rebalancer (#7638)
Browse files Browse the repository at this point in the history
The sections about the rebalancer algorithm and the backround tasks were
empty.

---------

Co-authored-by: Marco Slot <[email protected]>
Co-authored-by: Steven Sheehy <[email protected]>
  • Loading branch information
3 people authored Jun 27, 2024
1 parent aaaf637 commit 58fef24
Showing 1 changed file with 190 additions and 8 deletions.
198 changes: 190 additions & 8 deletions src/backend/distributed/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ The purpose of this document is to provide comprehensive technical documentation
- [Rebalancing algorithm](#rebalancing-algorithm)
- [Shard moves](#shard-moves)
- [Shard splits](#shard-splits)
- [Background tasks](#background-tasks)
- [Background task runner](#background-task-runner)
- [Resource cleanup](#resource-cleanup)
- [Logical decoding / CDC](#logical-decoding--cdc)
- [CDC ordering](#cdc-ordering)
Expand Down Expand Up @@ -2344,17 +2344,164 @@ In the past we had some bugs where we had a `palloc` failure while holding `Spin

# Rebalancing

A high-level overview of the rebalancer is given in [this rebalancer blog post](https://www.citusdata.com/blog/2021/03/13/scaling-out-postgres-with-citus-open-source-shard-rebalancer/).
A high-level overview of the shard rebalancer is given in [this rebalancer blog post][rebalancer-post]. It is a bit outdated though, specifically that it uses `rebalance_table_shards()` instead of the newer `citus_rebalance_start()`.

The shard rebalancer consists of 4 main parts:

1. The rebalancing algorithm: Decides what moves/splits it should do to make
the cluster balanced.
2. The background task runner: Runs a full rebalance according to a plan
created by the planner.
3. A shard group moves/split: These are the smallest units of work that the
rebalancer does, if this fails midway through the move is aborted and the
shard group remains unchanged.
4. Deferred cleanup: The source shards stay present for a while after a move to
let long-running read queries continue, eventually they need to be cleaned
up.

These parts interact, but they are pretty self-contained. Usually it's only
necessary to change one of them to add a feature/fix a bug.

[rebalancer-post]: https://www.citusdata.com/blog/2021/03/13/scaling-out-postgres-with-citus-open-source-shard-rebalancer/

## Rebalancing algorithm

The rebalancing algorithm tries to find an optimal placement of shard groups
across nodes. This is not an easy job, because this is a [co-NP-complete
problem](https://en.wikipedia.org/wiki/Knapsack_problem). So instead of going for
the fully optimal solution it uses a greedy approach to reach a local
optimum, which so far has proved effective in getting to a pretty optimal
solution.

Even though it won't result in the perfect balance, the greedy approach has two
important practical benefits over a perfect solution:
1. It's relatively easy to understand why the algorithm decided on a certain move.
2. Every move makes the balance better. So if the rebalance is cancelled midway
through, the cluster will always be in a better situation than before.

As described in the [this rebalancer blog post][rebalance-post] the algorithm
takes three inputs from the function in the `pg_dist_rebalance_strategy` table:

1. Is a shard group allowed on a certain node?
2. What is the "cost" of a shard group, relative to the other shard groups?
3. What is the "capacity" of a node, relative to the other nodes?

Cost and capacity are vague on purpose, this way users can choose their own
way to determine cost of a shard group, but **in practice "cost" is usually
disk size** (because `by_disk_size` is the default rebalance strategy).
Capacity is almost always set to 1, because almost all Citus clusters are
homogeneous (they contain the same nodes, except for maybe the coordinator). The
main usage for "Is a shard group allowed on a certain node?" is to be able to pin a
specific shard group to a specific node.

There is one last definition that you should know to understand the algorithm
and that is "utilization". Utilization is the total cost of all shard groups
divided by capacity. In practice this means that utilization is almost always
the same as cost because as explained above capacity is almost always 1. So if
you see "utilization" in the algorithm, for all intents and purposes you can
read it as "cost".

The way the general algorithm works is fairly straightforward. It starts by
creating an in-memory representation of the cluster, and then it tries to
improve that in-memory representation by making theoretical moves. So to be
clear the algorithm doesn't actually do any shard group moves, it only does
those moves to its in-memory representation. The way it determines what
theoretical moves to make is as follows (updating utilization of in-memory
nodes after every move):

1. Find all shard groups that are on a node where they are not allowed (due to
"Is a shard group allowed on a certain node?")
2. Order those nodes by cost
3. Move them one-by one to nodes with the lowest utilization where they are
allowed.
4. If the cluster is balanced we are done.
5. Take the most utilized node (A) and take the least utilized node (B).
6. Try moving the shard group with the highest cost from A to B.
7. If the balance is "better" commit this move and continue from step 4. (See subsection below for
what is "better")
8. If the balance is worse/equal try again from step 6 with the shard group
with the next highest cost on node A. If this was the lowest cost shard on
node A, then try with the highest cost shard again but on the next least
utilized node after node B. If no moves helped with the balance, try with
the next most utilized node after node A. If we tried all moves for all
nodes like this, we are done (we cannot get a better balance).


Of course, the devil is in the details though.

### When is the balance better?

The main way to determine if the balance is better is by comparing the
utilization of node A and B, before and after the move and seeing if they are
net closer to the average utilization of the nodes in the cluster. The easiest
way to explain this is with a simple example:

We have two nodes A and B. A has a utilization of 100GB and B has a utilization
of 70GB. So we will move a shard from A to B. A move of 15GB is obviously best,
it results in perfect balance (A=85GB, B=85GB). A move of a 10GB is still
great, both improved in balance (A=90GB, B=80GB). A move of 20GB is also good,
the result is the same as a move of 10GB only with the nodes swapped (A=80GB,
B=90GB).

The 10GB vs 20GB move shows a limitation of the current algorithm. The
algorithm mostly makes choices based on the end state, not on the cost of
moving a shard. This is usually not a huge problem in practice though.

### Thresholds

The algorithm is full of thresholds, the main reason these exist is because
moving shards around isn't free.

- `threshold`: Used to determine if the cluster is in a good enough state. For
the `by_disk_size` rebalance strategy this is 10%, so if all nodes are at
most 10% above or 10% below the average utilization then no moves are
necessary anymore (i.e. the nodes are balanced enough). The main reason for
this threshold is that these small differences in utilization are not
necessarily problematic and might very well resolve automatically over time. For example, consider a scenario in which
one shard gets mostly written in during the weekend, while another one during
the week. Moving shards on Monday and that you then have to move back on
Friday is not very helpful given the overhead of moving data around.
- `improvement_threshold`: This is used in cases where a shard group move from
node A to B swaps which node now has the highest utilization (so afterwards B
will have higher utilization than A). As described above this can still
result in better balance. This threshold is meant to work around a
particularly bad situation where we move a lot of data for very little
benefit. Imagine this situation: A=200GB and B=99, thus moving a 100GB shard
from A to B would bring their utilization closer to the average (A=100GB,
B=199GB). But obviously that's a tiny gain for a move of 100GB, which
probably takes lots of resources and time. The `improvement_threshold` is set
to 50% for the `by_disk_size` rebalance strategy. This means that this move
is only chosen if the utilization improvement is larger than 50% of the
utilization that the shard group causes on its current node.

### How do multiple colocation groups impact the rebalancer algorithm?

The previous section glossed over colocation groups a bit. The main reason for
that is that the algorithm doesn't handle multiple colocation groups very well.
If there are multiple colocation groups each colocation group gets balanced
completely separately. For the utilization calculations only the costs are used
for the shard groups in the colocation group that is currently being rebalanced.
The reasoning for this is that if you have two colocation groups, you probably
want to spread the shard groups from both colocation groups across multiple
nodes. And not have shard groups from colocation group 1 only be on node A and
shard groups from colocation group 2 only be on node B.

There is an important caveat here though for colocation groups that have fewer
shard groups than the number of nodes in the cluster (in practice these are
usually colocation groups used by schema based sharding, i.e. with a single
shard group): The rebalancer algorithm balances the shard groups from these
colocation groups as if they are all all part of a single colocation group.
The main reason for this is to make sure that schemas for schema based sharding
are spread evenly across the nodes.


## Shard moves

Shard moves move a shard group placement to a different node (group). Moves are orchestrated by the `citus_move_shard_placement` UDF, which is also the function that the rebalancer runs to move a shard.
Shard moves move a shard group placement to a different node (group). It would be more correct if these were called "shard **group** moves", but in many places we don't due to historical reasons. Moves are orchestrated by the `citus_move_shard_placement` UDF, which is also the function that the rebalancer runs to move a shard.

We implement blocking and non-blocking shard splits. Non-blocking shard moves use logical replication, which has an important limitation. If the (distributed) table does not have a replica identity (usually the primary key), then update/delete commands will error out once we create a publication. That means using a non-blocking move without a replica identity does incur some downtime. Since a blocking move is generally faster (in part because it forces out regular work), it may be less invasive. We therefore force the user to choose when trying to move a shard group that includes a table without a replica identity by supplying `shard_transfer_mode := 'force_logical'` or `shard_transfer_mode := 'block_writes'`.

The blocking-move is mostly a simplified variant of the non-blocking move (with locks taken upfront). A non-blocking move involves the following steps:
The blocking-move is mostly a simplified variant of the non-blocking move, where the write locks are taken upfront so that no catch-up using logical replication is needed. A non-blocking move involves the following steps:

- **Create the new shard group placement on the target node**. We also create constraints that do not involve an index and set up ownership and access control.
- **Create publication(s) on the source node**. We create publications containing the shards in the source shard group placement. We create one publications per table owner, mainly because we need one subscription per table owner to prevent privilege escalation issues on older versions of PostgreSQL (15 and below).
Expand All @@ -2379,7 +2526,7 @@ A workaround for the replica identity problem is to always assign REPLICA IDENTI

## Shard splits

Shard splits convert one shard group ("split parent") into two or more shard groups ("split children") by splitting the hash range. The new shard groups can be placed on the node itself, or on other nodes. We implement blocking and non-blocking shard splits. The blocking variant is mostly a simplified version of non-blocking, so we only cover non-blocking here. Shard splits have many similarities to shard moves, and have the same `shard_transfer_mode` choice.
Shard splits convert one shard group ("split parent") into two or more shard groups ("split children") by splitting the hash range. Just like with shard moves it would be more correct to call these "shard **group** splits", but again we often don't. The new shard groups can be placed on the node itself, or on other nodes. We implement blocking and non-blocking shard splits. The blocking variant is mostly a simplified version of non-blocking, so we only cover non-blocking here. Shard splits have many similarities to shard moves, and have the same `shard_transfer_mode` choice.

The shard split is a lengthy process performed by the `NonBlockingShardSplit` function, supported by a custom output plugin to handle writes that happen during the split. There are a few different entry-points in this logic, namely: `citus_split_shard_by_split_points`, `create_distributed_table_concurrently`, and `isolate_tenant_to_node`.

Expand Down Expand Up @@ -2409,19 +2556,54 @@ A difference between splits and moves is that the old shard ID disappears. In ca

## Background tasks

In the past the only way to trigger a rebalance was to call
`rebalance_table_shards()`, this function run the rebalance using the current
session. This has the huge downside that the connection needs to be kept open
until the rebalance completes. So eventually we [introduced
`citus_rebalance_start()`](https://www.citusdata.com/blog/2022/09/19/citus-11-1-shards-postgres-tables-without-interruption/#rebalance-background),
which uses a background worker to do the rebalancing, so users can disconnect
their client and the rebalance continues. It even automatically retries moves
if they failed for some reason.

The way this works is using a general background job infrastructure that Citus
has in the tables `pg_dist_backround_job` and `pg_dist_background_task`.
A job (often) contains multiple tasks. In case of the rebalancer, the job is
the full rebalance, and each of its tasks are separate shard group moves.

### Parallel background task execution

A big benefit of the background task infrastructure is that it can execute tasks
and jobs in parallel. This can make rebalancing go much faster especially in
clusters with many nodes. To ensure that we're not doing too many tasks in
parallel though we have a few ways to limit concurrency:

1. Tasks can depend on each other. This makes sure that one task doesn't start
before all the ones that it depends on have finished.
2. The maximum number of parallel tasks being executed at the same time can be
limited using `citus.max_background_task_executors`. The default for
this is 4.
3. Tasks can specify which nodes are involved in the task, that way we can
control that a single node is not involved into too many tasks. The
rebalancer specifies both the source and target node as being involved in
the task. That together with the default of 1 for
`citus.max_background_task_executors_per_node` makes sure that a node
doesn't handle more than a single shard move at once, while still allowing
moves involving different nodes to happen in parallel. For larger machines
it can be beneficial to increase the default a bit.

## Resource cleanup

During a shard move/split, some PostgreSQL objects can be created that live outside of the scope of any transaction or are committed early. We need to make sure those objects are dropped once the shard move ends, either through failure or success. For instance, subscriptions and publications used for logical replication need to be dropped in case of failure, but also the target shard (in case of failure) and source shard (in case of success).

To achieve that, we write records to pg_dist_cleanup before creating an object to remember that we need to clean it. We distinguish between a few scenarios:

**Cleanup-always**: For most resources that require cleanup records, cleanup should happen regardless of whether the operation succeeds or fails. For instance, subscriptions and publications should always be dropped. We achieve cleanup always by writing pg_dist_cleanup records in a subtransaction, and at the end of the operation we try to clean up object immediately and if it succeeds delete the record. If cleanup fails, we do not fail the whole operation, but instead leave the pg_dist_cleanup record in place for the maintenance daemon.

**Cleanup-on-failure**: Cleanup should only happen if the operation fails. The main example is the target shard of a move/split. We achieve cleanup-on-failure by writing pg_dist_cleanup records in a subtransaction (transaction on a localhost connection that commits immediately) and deleting them in the outer transaction that performs the move/split. That way, they remain in pg_dist_cleanup in case of failure, but disappear in case of success.

**Cleanup-deferred-on-success**: Cleanup should only happen after the operation (move/split) succeeds. We use this to clean the source shards of a shard move. We previously dropped shards immediately as part of the transaction, but this frequently led to deadlocks at the end of a shard move. We achieve cleanup-on-success by writing pg_dist_cleanup records as part of the outer transaction that performs the move/split.

**Cleanup-always**: For most resources that require cleanup records, cleanup should happen regardless of whether the operation succeeds or fails. For instance, subscriptions and publications should always be dropped. We achieve cleanup always by writing pg_dist_cleanup records in a subtransaction, and at the end of the operation we try to clean up object immediately and if it succeeds delete the record. If cleanup fails, we do not fail the whole operation, but instead leave the pg_dist_cleanup record in place for the maintenance daemon.

Resource cleaner (currently shard_cleaner.c) is part of the maintenance daemon and periodically checks pg_dist_cleanup for cleanup tasks. It’s important to prevent cleanup of operations that are already running. Therefore, each operation has a unique operation ID (from a sequence) and takes an advisory lock on the operation ID. The resource cleaner learns the operation ID from pg_dist_cleanup and attempts to acquire this lock. If it cannot acquire the lock, the operation is not done and cleanup is skipped. If it can, the operation is done, and the resource cleaner rechecks whether the record still exists, since it could have been deleted by the operation.
Resource cleaner (currently shard_cleaner.c) is part of the maintenance daemon and periodically checks pg_dist_cleanup for cleanup tasks. It’s important to prevent cleanup of operations that are still running. Therefore, each operation has a unique operation ID (from a sequence) and takes an advisory lock on the operation ID. The resource cleaner learns the operation ID from pg_dist_cleanup and attempts to acquire this lock. If it cannot acquire the lock, the operation is not done and cleanup is skipped. If it can, the operation is done, and the resource cleaner rechecks whether the record still exists, since it could have been deleted by the operation.

Cleanup records always need to be committed before creating the actual object. It’s also important for the cleanup operation to be idempotent, since the server might crash immediately after committing a cleanup record, but before actually creating the object. Hence, the object might not exist when trying to clean it up. In that case, the cleanup is seen as successful, and the cleanup record removed.

Expand Down

0 comments on commit 58fef24

Please sign in to comment.