Add shard split description

pull/7226/head
Marco Slot 2023-09-22 14:59:27 +02:00 committed by Önder Kalacı
parent 736a5892e6
commit ae4df08245
1 changed files with 32 additions and 2 deletions

View File

@ -1502,7 +1502,7 @@ Citus primarily hooks into the PostgreSQL executor by producing a query plan wit
- PostgreSQL executor
- ExecutorRun_hook
- Subplans are executed before regular execution
- CustomScan functions are invoked is part of overall scan tree
- CustomScan functions are invoked as part of overall scan tree
- BeginCustomScan (which steps are included depends on the query)
- Function calls & distribution column parameters are evaluated
- Deferred shard pruning
@ -2114,6 +2114,36 @@ In the past we had some bugs where we had a `palloc` failure while holding `Spin
## Shard moves
## Shard splits
Shard splits convert one shard group ("split parent") into two or more shard groups ("split children") by splitting the hash range. The new shard groups can be placed on the node itself, or on other nodes. We currently implement blocking and non-blocking shard splits. The blocking variant is mostly a simplified version of non-blocking, so we only cover non-blocking here.
The shard split is a lengthy process performed by the `NonBlockingShardSplit` function, supported by a custom output plugin to handle writes that happen during the split. There are a few different entry-points in this logic, namely: `citus_split_shard_by_split_points`, `create_distributed_table_concurrently`, and `isolate_tenant_to_node`.
We currently do not build a separate .so file for the output plug-in, so it is part of citus.so and therefore the name of the output plug-in is "citus". The purpose of the output plug-in is to translate changes to the original shard group to changes to the split children, and emit them in pgoutput format (by calling the original pgoutput plug-in). In some cases, the schema of the split parent can be subtly different from the split children. In particular, some columns may have been dropped on the parent. Dropped columns remain in the metadata and remaining columns are not renumbered, but when we create the split children we only create it with current columns. When this scenario occurs, we convert the tuple in `GetTupleForTargetSchema`.
A split involves the following steps:
- **Create the new shard groups ("split children") on the target node(s)**. We also create constraints that do not involve an index and set up ownership and access control.
- **Create "dummy" shard groups on the source node**, unless the split child is also on the source node. The reason for creating the dummy shards is primarily to make the pgoutput output plug-in happy. Our output plug-in maps changes to the split parent into changes to split children before calling pgouput, and those tables need to exist for pgoutput to be able to interpret and emit the change, even when that table is not actually used on the source node.
- **Create replica identities on dummy shards**. This is also needed to keep pgoutput happy, because for updates and deletes it emits the values in the replica identity columns, so it needs to know what the replica identity is.
- **Create publications on the source node**, which include both the parent and children. We add the split parent for our own output plug-in to recognize which shard group it should split, and we add the split children for pgoutput to recognize that it should emit them.
- Set up the shard split output plug-in**. We configure our output plug-in on the source node via `worker_split_shard_replication_setup`, which sets up a dynamic shared memory (DSM) segment that the output plug-in will read from. We currently only have one DSM segment, which would need to changed to support concurrent splits from the same node.
- **Create replication slots and export snapshots**. We cannot perform any write to the database before this step, because this step waits for all transactions that perform writes to finish. We create multiple slots because we use a separate slot & subscription per table owner, mainly to prevent privilege escalation issues on older versions of PostgreSQL (15 and below).
- **Create subscriptions in disabled state**. Once enabled, the subscriptions will activate the replication of writes that happen during the initial data copy. We create them upfront in case there are any errors (e.g. hitting resource limits, connectivity issues).
- **Split the data in the split parent into the split children** using `worker_split_copy` with the exported snapshot. The `worker_split_copy` function makes a single pass over the table and pushes it into the split children via `COPY` into the split children, either via a connection to another node or by invoking the COPY logic locally when the split children are on the same node. Internally, it uses the DestReceiver APIS and effectively it layers the DestReceiver used in re-partition operations on top of the DestReceiver used by `worker_shard_copy` in shard moves. We run a separate `worker_split_copy` task for every shard in the shard group and execute them via the adaptive executor, which may elect to parallelize them.
- **Enable subscriptions**, which starts the replication of writes that happened on the split parent during the data copy into the split children.
- **Wait for subscriptions to catch up to the current source LSN**. This can take a while since many writes could have happened during the data copy.
- **Create indexes, unique/exclusion constraints, statistics, etc.**. For efficiency, we create these objects after copying the data and catching up to concurrent writes.
- **Wait for subscriptions to catch up to the current source LSN**. This can take a while since many writes could have happened during the index creation.
- **Block writes to the split parent by acquiring metadata locks**. At this point, we wait for any ongoing INSERT/UPDATE/DELETE/COPY/MERGE to finish and block new ones. Once we acquire the locks we try to quickly finalize the split.
- **Wait for subscriptions to catch up to the current source LSN**. Some writes could still have happened before acquiring locks, we wait for those writes to be replicated.
- **Update the metadata**. We globally delete the metadata of the split parent, and insert the metadata of the split children. In case of `create_distributed_table_concurrently` we also update `pg_dist_partition` and `pg_dist_colocation`.
- **Create partitioning hierarchy and foreign keys on the split children**. Creating these relationships is deferred until the replication is fully done, because we used multiple subscriptions for tables with different owners and this is the first time that the data is guaranteed to be consistent between shards. We avoid rechecking foreign keys by using the `citus.skip_constraint_validation` setting on the session.
- **Final cleanup of DSM, connections, resources**. We clean up all the resources we created such as publications, subscriptions, replication slots, dummy shards, as well as the split parent (in case of success) or split children (in case of failure). We currently do not clean up the DSM in case of failure, but we always idempotently reset it when doing another split.
A difference between splits and moves is that the old shard ID disappears. In case of a move, only the placement changes and for writes we always look up placement in the executor after acquiring locks that conflict with moves (wait until move is done). In case of a split, the query changes in more fundamental ways, and a single-shard query might actually become a multi-shard queryif it were replanned. When a writes get to the executor, after acquiring locks that conflict with the shard split (wait until split is done), we check whether the shard still exists _in the metadata_ and in case of fast path queries (which are strictly single shard), we try to reroute in `TryToRerouteFastPathModifyQuery`. Otherwise, we error in `EnsureAnchorShardsInJobExist`. In case of reads, we lean on the deferred drop logic to let the read proceed on the old shard placement.
## Resource cleanup
During a shard move/split, some PostgreSQL objects can be created that live outside of the scope of any transaction or are committed early. We need to make sure those objects are dropped once the shard move ends, either through failure or success. For instance, subscriptions and publications used for logical replication need to be dropped in case of failure, but also the target shard (in case of failure) and source shard (in case of success).
@ -2216,4 +2246,4 @@ Some distributed databases distinguish the Query Nodes and Data Nodes. As the na
<img alt="Dedicated query nodes benchmarks" src="../../../images/mx-dedicated-query-nodes.png" width="500">
If this discussion comes up again, we suggest running some more benchmarks and ensuring the performance characteristics do not change dramatically. We do not foresee any architectural problems with that. It mostly comes down to price, performance, and product discussions. Note that you can quickly test this by disallowing certain nodes to have shards on them. Some important considerations is whether reference tables should still be present on query nodes, and whether there are any behavioural differences between query nodes and the coordinator.
If this discussion comes up again, we suggest running some more benchmarks and ensuring the performance characteristics do not change dramatically. We do not foresee any architectural problems with that. It mostly comes down to price, performance, and product discussions. Note that you can quickly test this by disallowing certain nodes to have shards on them. You should also consider whether reference tables should still be present on query nodes, and whether there are any behavioural differences between query nodes and the coordinator.