Add shard move description

pull/7226/head
Marco Slot 2023-09-22 16:10:55 +02:00 committed by Önder Kalacı
parent 810ec5e4e7
commit d3d70e4f7d
1 changed files with 41 additions and 11 deletions

View File

@ -2111,12 +2111,40 @@ In the past we had some bugs where we had a `palloc` failure while holding `Spin
# Rebalancing
A high-level overview of the rebalancer is given in [this rebalancer blog post](https://www.citusdata.com/blog/2021/03/13/scaling-out-postgres-with-citus-open-source-shard-rebalancer/).
## Rebalancing algorithm
## Shard moves
Shard moves move a shard group placement to a different node (group). Moves are orchestrated by the `citus_move_shard_placement` UDF, which is also the function that the rebalancer runs to move a shard.
We implement blocking and non-blocking shard splits. Non-blocking shard moves use logical replication, which has an important limitation. If the (distributed) table does not have a replica identity (usually the primary key), then update/delete commands will error out once we create a publication. That means using a non-blocking move without a replica identity does incur some downtime. Since a blocking move is generally faster (in part because it forces out regular work), it may be less invasive. We therefore force the user to choose when trying to move a shard group that includes a table without a replica identity by supplying `shard_transfer_mode := 'force_logical'` or `shard_transfer_mode := 'block_writes'`.
- **Create the new shard group placement on the target node**. We also create constraints that do not involve an index and set up ownership and access control.
- **Create publication(s) on the source node**. We create publications containing the shards in the source shard group placement. We create one publications per table owner, mainly because we need one subscription per table owner to prevent privilege escalation issues on older versions of PostgreSQL (15 and below).
- **Create replication slot(s) and export snapshot(s)**. We create a slot per table owner because we use a separate subscription per table owner, similar to publications. Subscriptions can create the replication slot, but we (nowadays) copy the data outside of the subscription because we apply several optimizations.
- **Create subscription(s) in disabled state**. We create subscriptions upfront in case there are any errors (e.g. hitting resource limits, connectivity issues). We create one subscription per table owner, and we set the subscription owner to the table owner. The logical replication will happen with the permissions of the subscription owner.
- **Copy the data from the source to target** by calling `worker_shard_copy` function for each source shard placement via the executor. The `worker_shard_copy` function makes a single pass over the source shard and pushes it into the target shard via `COPY`. We found this to be faster than using the `copy_data` option in the subscription because we can benefit from binary copy, optimizations in the adaptive executor, uses fewer replication slots, and it simplifies the flow. Some of these optimizations might be obsolete as of PostgreSQL 16.
- **Enable subscription(s)**, which starts the replication of writes that happened on the source shard group placement during the data copy.
- **Wait for subscription(s) to catch up to the current source LSN**. This can take a while since many writes could have happened during the data copy.
- **Create indexes, unique/exclusion constraints, statistics, etc.**. For efficiency, we create these objects after copying the data and catching up to concurrent writes.
- **Wait for subscription(s) to catch up to the current source LSN**. This can take a while since many writes could have happened during the index creation.
- **Block writes to the split parent by acquiring metadata locks globally**. At this point, we wait for any ongoing INSERT/UPDATE/DELETE/COPY/MERGE to finish and block new ones. Once we acquire the locks we try to quickly finalize the split.
- **Wait for subscription(s) to catch up to the current source LSN**. Some writes could still have happened before acquiring locks, we wait for those writes to be replicated.
- **Update the metadata**. We globally update the `pg_dist_placement` record to point to the new node. Writes tactically acquire metadata locks just before reading from `pg_dist_placement`, so they will see the new placement as soon as we commit.
- **Create foreign keys on the target shard group placement**. Creating foreign keys is deferred until the replication is fully done, because we used multiple subscriptions for tables with different owners and this is the first time that the data is guaranteed to be consistent between shards. We avoid rechecking foreign keys by using the `citus.skip_constraint_validation` setting on the session.
- **Final cleanup of connections, resources**. We primarily lean on "Resource cleanup" to drop publications, replication slots, subscriptions, which ensures they are removed both in case of success and failure. The source shard group placement is dropped once all ongoing (read-only) queries are done, by repeatedly dropping with a short lock timeout until it succeeds.
It is worth noting that the final commit happens in a 2PC, with all the characteristics of a 2PC. If the commit phase fails on one of the nodes, writes on the shell table remain blocked on that node until the prepared transaction is recovered, after which they will see the updated placement. The data movement generally happens outside of the 2PC, so the 2PC failing on the target node does not necessarily prevent access to the shard.
A similar operation to shard moves is `citus_copy_shard_placement`, which can be used to add a replica to a shard group. We also use this function to replicate reference tables without blocking. The main difference is that dropping the old shard group placement is skipped.
A workaround for the replica identity problem is to always assign REPLICA IDENTITY FULL to distributed tables / shards if they have no other replica identity. However, prior to PostgreSQL 16, replication of updates and delete to a table with replica identity full could be extremely slow (it does a sequential scan per tuple). As of PostgreSQL 16, the logical replication worker can use a regular btree index to find a matching tuple (if one exists). Even for distributed tables without any indexes, and without a replica identity, we could tactically set REPLICA IDENTITY FULL on the shards, and create a suitable index on the target shard group placement for the duration of the move. Once we implement this, we could avoid erroring for distributed tables without a replica identity.
## Shard splits
Shard splits convert one shard group ("split parent") into two or more shard groups ("split children") by splitting the hash range. The new shard groups can be placed on the node itself, or on other nodes. We currently implement blocking and non-blocking shard splits. The blocking variant is mostly a simplified version of non-blocking, so we only cover non-blocking here.
Shard splits convert one shard group ("split parent") into two or more shard groups ("split children") by splitting the hash range. The new shard groups can be placed on the node itself, or on other nodes. We implement blocking and non-blocking shard splits. The blocking variant is mostly a simplified version of non-blocking, so we only cover non-blocking here. Shard splits have many similarities to shard moves, and have the same `shard_transfer_mode` choice.
The shard split is a lengthy process performed by the `NonBlockingShardSplit` function, supported by a custom output plugin to handle writes that happen during the split. There are a few different entry-points in this logic, namely: `citus_split_shard_by_split_points`, `create_distributed_table_concurrently`, and `isolate_tenant_to_node`.
@ -2127,23 +2155,25 @@ A split involves the following steps:
- **Create the new shard groups ("split children") on the target node(s)**. We also create constraints that do not involve an index and set up ownership and access control.
- **Create "dummy" shard groups on the source node**, unless the split child is also on the source node. The reason for creating the dummy shards is primarily to make the pgoutput output plug-in happy. Our output plug-in maps changes to the split parent into changes to split children before calling pgouput, and those tables need to exist for pgoutput to be able to interpret and emit the change, even when that table is not actually used on the source node.
- **Create replica identities on dummy shards**. This is also needed to keep pgoutput happy, because for updates and deletes it emits the values in the replica identity columns, so it needs to know what the replica identity is.
- **Create publications on the source node**, which include both the parent and children. We add the split parent for our own output plug-in to recognize which shard group it should split, and we add the split children for pgoutput to recognize that it should emit them.
- **Create publication(s) on the source node**, which include both the parent and children. We add the split parent for our own output plug-in to recognize which shard group it should split, and we add the split children for pgoutput to recognize that it should emit them. We might make multiple publications per shard group because we use a separate publication and subscription per table owner, to prevent privilege escalation issues on older versions of PostgreSQL (15 and below).
- **Set up the shard split output plug-in**. We configure our output plug-in on the source node via `worker_split_shard_replication_setup`, which sets up a dynamic shared memory (DSM) segment that the output plug-in will read from. We currently only have one DSM segment, which would need to changed to support concurrent splits from the same node.
- **Create replication slots and export snapshots**. We cannot perform any write to the database before this step, because this step waits for all transactions that perform writes to finish. We create multiple slots because we use a separate slot & subscription per table owner, mainly to prevent privilege escalation issues on older versions of PostgreSQL (15 and below).
- **Create subscriptions in disabled state**. Once enabled, the subscriptions will activate the replication of writes that happen during the initial data copy. We create them upfront in case there are any errors (e.g. hitting resource limits, connectivity issues).
- **Split the data in the split parent into the split children** using `worker_split_copy` with the exported snapshot. The `worker_split_copy` function makes a single pass over the table and pushes it into the split children via `COPY` into the split children, either via a connection to another node or by invoking the COPY logic locally when the split children are on the same node. Internally, it uses the DestReceiver APIS and effectively it layers the DestReceiver used in re-partition operations on top of the DestReceiver used by `worker_shard_copy` in shard moves. We run a separate `worker_split_copy` task for every shard in the shard group and execute them via the adaptive executor, which may elect to parallelize them.
- **Enable subscriptions**, which starts the replication of writes that happened on the split parent during the data copy into the split children.
- **Wait for subscriptions to catch up to the current source LSN**. This can take a while since many writes could have happened during the data copy.
- **Create replication slot(s) and export snapshot(s)**. We cannot perform any write to the database before this step, because this step waits for all transactions that perform writes to finish. We create multiple slots because we use a separate slot per table owner, similar to publications.
- **Create subscription(s) in disabled state**. We create subscriptions upfront in case there are any errors (e.g. hitting resource limits, connectivity issues). We create a slot per table owner because we use a separate subscription per table owner, similar to publications. The logical replication will happen with the permissions of the subscription owner.
- **Split the data in the split parent into the split children** using `worker_split_copy` with the exported snapshot. The `worker_split_copy` function makes a single pass over the parent shard and pushes it into the split children via `COPY`, either via a connection to another node or by invoking the COPY logic locally when the split children are on the same node. Internally, it uses the DestReceiver APIS and effectively it layers the DestReceiver used in re-partition operations on top of the DestReceiver used by `worker_shard_copy` in shard moves. We run a separate `worker_split_copy` task for every shard in the shard group and execute them via the adaptive executor, which may elect to parallelize them.
- **Enable subscription(s)**, which starts the replication of writes that happened on the split parent during the data copy into the split children.
- **Wait for subscription(s) to catch up to the current source LSN**. This can take a while since many writes could have happened during the data copy.
- **Create indexes, unique/exclusion constraints, statistics, etc.**. For efficiency, we create these objects after copying the data and catching up to concurrent writes.
- **Wait for subscriptions to catch up to the current source LSN**. This can take a while since many writes could have happened during the index creation.
- **Block writes to the split parent by acquiring metadata locks**. At this point, we wait for any ongoing INSERT/UPDATE/DELETE/COPY/MERGE to finish and block new ones. Once we acquire the locks we try to quickly finalize the split.
- **Wait for subscriptions to catch up to the current source LSN**. Some writes could still have happened before acquiring locks, we wait for those writes to be replicated.
- **Wait for subscription(s) to catch up to the current source LSN**. This can take a while since many writes could have happened during the index creation.
- **Block writes to the split parent by acquiring metadata locks globally**. At this point, we wait for any ongoing INSERT/UPDATE/DELETE/COPY/MERGE to finish and block new ones. Once we acquire the locks we try to quickly finalize the split.
- **Wait for subscription(s) to catch up to the current source LSN**. Some writes could still have happened before acquiring locks, we wait for those writes to be replicated.
- **Update the metadata**. We globally delete the metadata of the split parent, and insert the metadata of the split children. In case of `create_distributed_table_concurrently` we also update `pg_dist_partition` and `pg_dist_colocation`.
- **Create partitioning hierarchy and foreign keys on the split children**. Creating these relationships is deferred until the replication is fully done, because we used multiple subscriptions for tables with different owners and this is the first time that the data is guaranteed to be consistent between shards. We avoid rechecking foreign keys by using the `citus.skip_constraint_validation` setting on the session.
- **Final cleanup of DSM, connections, resources**. We clean up all the resources we created such as publications, subscriptions, replication slots, dummy shards, as well as the split parent (in case of success) or split children (in case of failure). We currently do not clean up the DSM in case of failure, but we always idempotently reset it when doing another split.
- **Final cleanup of DSM, connections, resources**. We clean up all the resources we created such as publications, subscriptions, replication slots, dummy shards via "Resource lceanup", as well as the split parent (deferred, in case of success) or split children (in case of failure). We currently do not clean up the DSM in case of failure, but we always idempotently reset it when doing another split.
A difference between splits and moves is that the old shard ID disappears. In case of a move, only the placement changes and for writes we always look up placement in the executor after acquiring locks that conflict with moves (wait until move is done). In case of a split, the query changes in more fundamental ways, and a single-shard query might actually become a multi-shard queryif it were replanned. When a writes get to the executor, after acquiring locks that conflict with the shard split (wait until split is done), we check whether the shard still exists _in the metadata_ and in case of fast path queries (which are strictly single shard), we try to reroute in `TryToRerouteFastPathModifyQuery`. Otherwise, we error in `EnsureAnchorShardsInJobExist`. In case of reads, we lean on the deferred drop logic to let the read proceed on the old shard placement.
## Background tasks
## Resource cleanup
During a shard move/split, some PostgreSQL objects can be created that live outside of the scope of any transaction or are committed early. We need to make sure those objects are dropped once the shard move ends, either through failure or success. For instance, subscriptions and publications used for logical replication need to be dropped in case of failure, but also the target shard (in case of failure) and source shard (in case of success).