Update README.md

pull/7226/head
Marco Slot 2023-09-18 13:02:23 +02:00 committed by Önder Kalacı
parent 0c7969c855
commit 60f99bf32c
1 changed files with 59 additions and 27 deletions

View File

@ -1,7 +1,6 @@
# Citus Technical Documentation
The purpose of this document is to provide comprehensive technical documentation for Citus.
The purpose of this document is to provide comprehensive technical documentation for Citus, in particular the distributed database implementation.
# Citus Concepts
@ -1273,7 +1272,7 @@ ERROR: complex joins are only supported when all distributed tables are joined
1. **Repartition Joins**:
- Repartition joins are generally incompatible with both recursive and pushdown planning. If a query uses recursive planning, it can't also use repartition joins.
- Repartition joins are generally incompatible with both recursive and pushdown planning. If a query uses recursive planning, it can't also use repartition joins. However, re-partition joins can be in a CTE that is recursively planned.
@ -1496,12 +1495,12 @@ In summary, the Recurring Tuples concept in Citus helps in managing and identify
# Executor
The overall hierarchy of where Citus hooks into the executor looks like this:
Citus primarily hooks into the PostgreSQL executor by producing a query plan with a CustomScan. The overall hierarchy of where Citus hooks into the executor looks like this:
- PostgreSQL executor
- ExecutorRun_hook
- Subplans are executed before regular execution
- CustomScan functions are invoked is part of overall scan tree in regular ExecutorRun
- CustomScan functions are invoked is part of overall scan tree
- BeginCustomScan (which steps are included depends on the query)
- Function calls & distribution column parameters are evaluated
- Deferred shard pruning
@ -1590,9 +1589,9 @@ The current structure is “less than ideal”, but by now it is battle hardened
### Local plan caching
We currently only take advantage of plan caching for shard queries when querying a single local shard groups that uses deferred pruning (described in the previous section) or is a DML. This avoids reparsing or replanning the query on the local shard. That works well in combination with smart clients that immediately connect to the right node, function call delegation, triggers, and Citus local tables, which are meant to behave like regular PG tables.
We currently only take advantage of plan caching for shard queries that access a single local shard groups and use deferred pruning (described in the previous section). This avoids reparsing or replanning the query on the local shard. That works well in combination with smart clients that immediately connect to the right node, function call delegation, triggers, and Citus local tables.
We can only know whether we are dealing with a local shard group after evaluating parameters and functions. Immediately after that, we plan the query on the local shard group and store the resulting (regular PG) plan in the distributed plan data structure (Job). The reason we store it in the distributed plan is that it is already cached by PostgreSQL, so anything we add to the plan will be cached along with it, with the correct lifecycle. We store a list of local plans, one for each shard plans.
We can only know whether we are dealing with a local shard group after evaluating parameters and functions. Immediately after that, we plan the query on the local shard group and store the resulting (regular PG) plan in the distributed plan data structure (Job). The reason we store it in the distributed plan is that it is already cached by PostgreSQL, so anything we add to the plan will be cached along with it, with the correct lifecycle. We store a list of local plans, one for each shard plan.
Local plan caching quite significantly improves performance for certain workloads, but it comes with a subtle caveat. For queries with deferred pruning, we only know whether the shard query is on a local shard query after evaluating parameters and function calls, which we do by replacing them in the query tree. However, to obtain a cacheable generic plan, we need to use the original query tree which still has the original function calls and parameters. That means re-execute those function calls when executing the shard query, which is unusual since we usually only execute them in the BeginCustomScan hook. Since we only do this for local execution, the function calls will still run in the same process and will therefore have the same effect, but it means we sometimes evaluate function calls twice. That is acceptable for stable functions, but not for volatile functions. We therefore skip caching when there are calls to volatile functions.
@ -1636,25 +1635,19 @@ The 10ms was chosen to be higher than a typical connection-establishment time, b
**The citus.max_shared_pool_size setting can be used to limit the pool sizes globally**. Its important to reiterate that the adaptive executor operates in the context of a single process. Each coordinating process has its own pools of connections to other nodes. This would lead to issues if e.g. the client makes 200 connections which each make 4 connections per node (800 total) concurrently while max_connections is 500. Therefore, there is a global limit on the number of connections configured by max_shared_pool_size. The citus.max_shared_pool_size is implemented in the connection management layer rather than the executor. Refer to the connection management section for details.
**The comment on top of adaptive_executor.c has a detailed description of the underlying data structures.** While these data structures are complex and this might look like an area technical debt, the current data structures and algorithm have proven to be a relatively elegant and robust way to meet all the different requirements. It is worth noting that a significant part of the complexity comes from dealing with replication, and shard replication is mostly a deprecated feature, but keep in mind that reference tables are also replicated tables and most of the same logic applies.
citus/src/backend/distributed/executor/adaptive_executor.c at main · citusdata/citus (github.com)
**The comment on top of [adaptive_executor.c](executor/adaptive_executor.c) has a detailed description of the underlying data structures.** While these data structures are complex and this might look like an area technical debt, the current data structures and algorithm have proven to be a relatively elegant and robust way to meet all the different requirements. It is worth noting that a significant part of the complexity comes from dealing with replication, and shard replication is mostly a deprecated feature, but keep in mind that reference tables are also replicated tables and most of the same logic applies.
## Local execution
When the adaptive executor completes all of its remote tasks, the final step is to perform local execution. We formally see this as part of the adaptive executor, though the code is largely separate (in local_executor.c).
When the adaptive executor completes all of its remote tasks, the final step is to perform local execution. We formally see this as part of the adaptive executor, though the code is largely separate (in local_executor.c). Local execution is essentially just executing the shard queries on local shards directly by invoking the planner & executor. In other words, there is no additional backends or connections are established for local execution.
Local execution is essentially using the client backend that user established. In other words, there is no additional backends or connections are established for local execution.
Some queries strictly require local execution. In particular, queries that depend on objects (types, tables) created by the current transaction, or joins between regular tables and Citus local or reference tables.
This sometimes causes interesting side-effects. For example, CREATE INDEX CONCURRENTLY requires all active transactions to be committed to proceed. Now, the command on the shards needs to wait the command on the shell table to be finished. However, all happening over the same backend. To avoid these edge cases, Citus sometimes injects some additional (sub)transactions withing the local execution code-paths.
In case of a multi-shard query, a downside of local execution is that there is no parallelism across shards. Therefore, the executor tries to avoid local execution for simple multi-shard queries outside of a transaction block. Instead, it will open multiple connections to localhost to run queries in parallel. In a multi-statement transaction, the executor always prefers local execution even for multi-shard queries, since the tranasaction might also perform operations that require local execution.
In some cases, say a query on a Citus local table on the coordinator, the execution only does a local execution. In some cases, say a query on a distributed table from a worker node, then some shards are queried over remote execution and some over local execution.
Some queries cannot local execution. For instance, we cannot use CREATE INDEX CONCURRENTLY as part of a bigger transaction, and we have not implemented a local version of EXPLAIN ANALYZE. We also cannot perform replication commands like creating a subscription via local execution. For the most part, these commands are typically executed outside of a transaction block or as internal commands, so it does not significantly affect the user experience.
When the execution switches to local execution, there is no further parallelization that can be done for the local tasks. All commands executed over the same backend.
The reason Citus does the local execution after remote execution is that if there are any problems with the remote execution, Citus can still switch back (e.g., failover) to local execution.
All transactions that access to local placements switch to local execution automatically. This is required otherwise a command in the same transaction that accesses a local object say a local postgres table would cause transaction visibility/deadlock problems.
The executor always does the local execution after remote execution. That way, if there are any problems with the remote execution, Citus can still switch back (e.g., failover) to local execution.
## Subplans
@ -1736,13 +1729,35 @@ Merge command the same principles as INSERT .. SELECT processing. However, due t
# DDL
DDL commands are primarily handled via the ProcessUtility hook, which gets the parse tree of the DDL command. For supported DDL commands, we always follow the same sequence of steps:
1. Qualify the table names in the parse tree (simplifies deparsing, avoids sensitivity to search_path changes)
2. Pre-process logic
3. Call original ProcessUtility to execute the command on the local shell table
4. Post-process logic
5. Execute command on all other nodes
6. Execute command on shards (in case of table DDL)
## Table DDL
Either the pre-process or post-process step generates a "Distributed DDL Job", which contains a task list to run in steps 4 & 5 (via adaptive executor).
## Object DDL
In general pre-process should:
- Acquire any locks that are needed beyond the ones PostgreSQL will acquire in step 3
- Perform upfront error checks (e.g. is this unique constrained allowed on a distributed table?)
Post-process should:
- Ensure dependencies of the current object exist on worker nodes (e.g. types used in parameters when creating a function)
- Deparse the DDL parse tree to a string
- Generate a task list using the deparsed DDL command
The reason for handling dependencies and deparsing in post-process step is that in case of a CREATE/ALTER, the object exist in its intended form at that point. In case of a DROP, the opposite is true and the pre-process should be used. Most commands have either a pre-process or post-process function. We have not been particularly careful about defining what should be done in pre-process vs. post-process, so the steps are not always the same across different commands.
Not all table DDL is currently deparsed. In that case, the original command sent by the client is used. That is a shortcoming in our DDL logic that causes user-facing issues and should be addressed. We do not directly construct a separate DDL command for each shard. Instead, we call the `worker_apply_shard_ddl_command(shardid bigint, ddl_command text)` function which parses the DDL command, replaces the table names with shard names in the parse tree according to the shard ID, and then executes the command. That also has some shortcomings, because we cannot support more complex DDL commands in this manner (e.g. adding multiple foreign keys). Ideally, all DDL would be deparsed, and for table DDL the deparsed query string would have shard names, similar to regular queries.
## Object & dependency propagation
TODO
## Foreign keys
@ -1790,7 +1805,24 @@ The connection management logic is divided into two parts:
## Connection management
Connection management tracks all the connections made by the current backend to other worker nodes. The connections can exist for the lifetime of the transaction, or longer when they are cached. The connections are kept in a hash that is keyed by hostname, port, user, database, and replication (yes/no). Each hash entry has a list of connections, since there can be multiple when the executor decides to parallelize a multi-shard query.
Citus operations that need a connection call `StartNodeUserDatabaseConnection` (or a wrapper), which either returns an existing connection or a new one. the caller should wait for the connection to be fully established.
When a Citus operation needs a connection to a worker node (hostname, port, user, database, replication), it can ask for it in a few different ways via flags:
- Pick any connection (default), open a new one if none exists
- Force a new connection (FORCE_NEW_CONNECTION), even if connections already exist
- Pick a connection outside of a transaction (OUTSIDE_TRANSACTION), or open a new one if none exists
- Pick the connection used for metadata syncing (REQUIRE_METADATA_CONNECTION), or open a new one if none exists and mark it for metadata syncing
In addition, the caller can claim a connection exclusively, in which case it will not be returned until it is unclaimed (or transaction end). For instance, the adaptive executor claims connections it uses exclusively. When it calls `StartNodeUserDatabaseConnection` again, it will always get a new connection that it can use to parallelize the query.
It is important that global commands like creating a type, or a function, or changing Citus metadata, all use the same connection. Otherwise, we might end up creating a type over one connection, and a function that depends on it over another. The use of the REQUIRE_METADATA_CONNECTION flag prevents this.
The FORCE_NEW_CONNECTION and OUTSIDE_TRANSACTION flags can BOTH be used to execute (and commit) commands outside of the current transaction. Many usages of the older FORCE_NEW_CONNECTION flag could perhaps be replaced by OUTSIDE_TRANSACTION. A benefit of FORCE_NEW_CONNECTION is that it can provide a more intuitive way to parallelize commands than claiming connections exclusively. For instance, the `run_command_on_shards` uses FORCE_NEW_CONNECTION for this purpose.
It is worth noting that Citus currently always opens a new connection when switching to a different user (e.g. via SET ROLE), rather than propagating the SET ROLE command. That can lead to some inconsistent behaviour (e.g. cannot see uncommitted writes after SET ROLE).
## Placement connection tracking
@ -1804,17 +1836,17 @@ We sometimes allow the same shard group placement to be accessed from different
A downside of the current placement connection tracking logic is that it does not consider foreign keys to reference tables, and the fact that writes and locks can cascade from a write to a reference table. We have a separate subsystem for error checking those scenarios (relation_access_tracking.c), but it would be nice if they can be unified.
## max_cached_connections_per_worker
## citus.max_cached_connections_per_worker
An important part of the connection management is caching at least 1 outgoing connection per worker node in the session. Establishing a new connection for every query is quite expensive due to SSL establishment, forking a process on the worker node, and rebuilding caches. Transactional workloads that have a high rate of short-running queries benefit a lot from caching connections. For analytical queries that take hundreds of milliseconds or more, the relative benefit is smaller, but often still noticeable.
At the end of a transaction, the connection management logic decides which connections to keep. It keeps at most `citus.max_cached_connections_per_worker` regular connections that are in a healthy state, unless they are open for more than `citus.max_cached_connection_lifetime` (10 minutes by default). For workloads with a high rate of multi-shard queries, it can be beneficial to increase `citus.max_cached_connections_per_worker`.
## max_shared_pool_size
## citus.max_shared_pool_size
**The citus.max_shared_pool_size setting can be used to limit the number of outgoing connections across processes **. Each session has its own set of connections to other nodes. We often make multiple connections to the same worker node from the same session to parallelize analytical queries, but if all session are doing that we might overload the worker nodes with connections. That is prevented by setting citus.max_shared_pool_size, which should be at least `citus.max_client_connections` on coordinator node, and at most `max_connections - citus.max_client_connections` on worker node.
The principle behind citus.max_shared_pool_size is that under high concurrency (all client connections used) it converges to each process having 1 connection per node. To do so, we distinguish between “optional” and “required” connections. When the executor asks the connection management layer for a connection, the first connection to a node is always required, and other connections are optional. If all connection slots are in use, the connection manager blocks until one is available when asking for a required connection, or returns NULL when asking for an optional connection. That signals to the executor that it cannot currently expand its pool. It may try again later. Most Citus code paths are tweaked to be able to complete their operation with 1 connection per node, and use local execution for local shards.
The principle behind `citus.max_shared_pool_size` is that under high concurrency (all client connections used) it converges to each process having 1 connection per node. To do so, we distinguish between “optional” and “required” connections. When the executor asks the connection management layer for a connection, the first connection to a node is always required, and other connections are optional. If all connection slots are in use, the connection manager blocks until one is available when asking for a required connection, or returns NULL when asking for an optional connection. That signals to the executor that it cannot currently expand its pool. It may try again later. Most Citus code paths are tweaked to be able to complete their operation with 1 connection per node, and use local execution for local shards.
Note that `citus.max_shared_pool_size` can only control the number of outgoing connections on a single node. When there are many nodes, the number of possible inbound internal connections is the sum of the `citus.max_shared_pool_size` on all other nodes. To ensure this does not exceed max_connections, we recommend that `sum(citus.max_client_connections) < max_connections`.