Add back bullets to readme (#7208)

pull/7226/head
Marco Slot 2023-09-18 10:32:10 +02:00 committed by Önder Kalacı
parent c4b72306f2
commit 8437c8ea49
1 changed file with 82 additions and 121 deletions

@@ -1,9 +1,8 @@
# Citus Technical Documentation
The purpose of this document is to provide comprehensive technical documentation for Citus.
# Citus Concepts
**Citus table**: A table managed by Citus through PostgreSQL hooks. Whenever the table is involved in a command, the command is handled by Citus.
@@ -18,11 +17,9 @@ There are several types of Citus tables:
**Distributed tables** are created using SELECT create_distributed_table(..). They have a distribution column and for each row the value in the distribution column determines to which shard the row is assigned. There are 3 different partitioning schemes for distributed tables, though only the first is supported:
- Hash-distributed tables have a range of hash values in shardminvalue/shardmaxvalue in pg_dist_shard
- Range-distributed tables (deprecated) have shards with a distinct range of values in pg_dist_shard
- Append-distributed tables (deprecated) have shards with a range of values in pg_dist_shard, though the ranges can overlap.
Hash-distributed tables can be **co-located** with each other, such that the shards with the same hash value range are always on the same node. From the planner point-of-view, range-distributed tables can also be colocated, but shards are not managed by Citus.
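For illustration, a minimal sketch of creating co-located hash-distributed tables (the table and column names here are hypothetical):
```sql
-- Hash-distribute two tables by tenant_id; colocate_with ensures that shards
-- covering the same hash range end up in the same shard group on the same node.
SELECT create_distributed_table('accounts', 'tenant_id');
SELECT create_distributed_table('orders', 'tenant_id', colocate_with => 'accounts');
```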
@@ -76,47 +73,35 @@ In the query planner, we use the following terminology:
Use cases:
- Multi-tenant apps are the primary use case for Citus, which we can scale through distributing and co-locating by tenant ID, or through schema-based sharding. Citus is reasonably complete for this use case, but there are still SQL and operational improvements that can be made. Database sharding is also useful.
- Real-time analytics is another popular use case due to the combination of parallel distributed queries with indexes & in-database materialization (ETL). Improvement areas are automated time partitioning, better columnar storage (perf and update/delete), and incremental materialized views.
- Citus works well for CRUD use cases, but would be far easier to use if we introduced a load balancer and DDL from any node (no explicit coordinator), and more performant with better use of connection pooling (e.g. outbound pgbouncers).
- Marketplace use cases could work well if we made it easier to distribute tables twice by different dimensions or made it easier to keep paired tables in sync.
Schema management:
- Our goal is for all DDL commands on Citus tables to work transparently, and for global DDL commands (e.g. CREATE TYPE) to be propagated to all nodes. Not all DDL is implemented yet and may either error or not propagate.
- Since we cannot define custom DDL commands for sharding operations, we use functions that are called from a SELECT query (see the sketch below).
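For example, a sketch of what this looks like in practice (node name and tables are hypothetical):
```sql
-- Regular DDL on a Citus table is propagated to its shards transparently
ALTER TABLE accounts ADD COLUMN plan text;
-- Sharding operations are exposed as functions invoked via SELECT rather than custom DDL
SELECT citus_add_node('worker-2', 5432);
SELECT rebalance_table_shards();
```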
Query layer:
- No incompatibilities with PostgreSQL: any query on a Citus table is supported on an equivalent PostgreSQL table.
- We optimize for maximum pushdown (& performance) over complete compatibility, but our long-term goal is for all queries to be supported in all cases.
- For single-shard queries, it is useful to avoid detailed query analysis through the fast path planner (simple, single table) and router planner (co-located joins) layers. However, multi-shard queries can go through disparate code paths that were added out of expediency and should eventually be unified.
Transactional semantics:
- Transactions scoped to a single node follow the same semantics as PostgreSQL.
- Transactions across nodes are atomic, durable, and consistent, but do not have full snapshot isolation: A multi-shard query may see a concurrently committing transaction as committed on one node, but not yet committed on another node.
- Read-your-writes consistency should be preserved.
- Monotonic read consistency should be preserved for tables without replication, but this may not always be the case for replicated/reference tables.
Replication model:
- High availability is achieved through hot standby nodes.
- Read replicas are Citus clusters in which each node is a physical replica of a node in another Citus cluster, since the read replica cannot have its own metadata.
- Hot standby nodes are, at the time of writing, not in the metadata. Instead, the hostname/IP is replaced or rerouted at failover time.
- The deprecated “statement based” replication is (as of Citus 11.0+) only useful for providing read scalability, not for HA, as all modifications are done via 2PC. Reference tables do use statement-based replication.
# Use of hooks
@@ -1513,33 +1498,20 @@ In summary, the Recurring Tuples concept in Citus helps in managing and identify
The overall hierarchy of where Citus hooks into the executor looks like this:
- PostgreSQL executor
  - ExecutorRun_hook
    - Subplans are executed before regular execution
    - CustomScan functions are invoked as part of the overall scan tree in regular ExecutorRun
      - BeginCustomScan (which steps are included depends on the query)
        - Function calls & distribution column parameters are evaluated
        - Deferred shard pruning
        - Lock shards to prevent concurrent move (write only)
        - Find placements for shards
      - ExecCustomScan
        - Adaptive Executor executes a list of tasks and concatenates the results into a tuple store
          - Re-partition jobs are executed
          - Remote tasks are executed
          - Local tasks are executed
We describe each part in more detail below.
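For orientation, the CustomScan node in this hierarchy is what appears in EXPLAIN output for a query on a distributed table (a sketch; the exact plan shape varies by query and version):
```sql
EXPLAIN SELECT count(*) FROM accounts;
-- Aggregate
--   ->  Custom Scan (Citus Adaptive)
--         Task Count: 32
--         Tasks Shown: One of 32
--         ->  Task
--               Node: host=worker-1 port=5432 dbname=postgres
--               ->  Aggregate
--                     ->  Seq Scan on accounts_102008 accounts
```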
@@ -1561,11 +1533,9 @@ Whether a function call should be evaluated once on the coordinator, or many tim
So far, the function evaluation logic does not distinguish between different contexts within queries. Instead, we follow a simple policy (illustrated by the sketch after this list):
- For inserts, evaluate all function calls, including calls to volatile functions, but disallow stable/volatile functions in RETURNING
- For update/delete, evaluate all function calls, but disallow volatile functions
- For select, do not evaluate function calls on coordinator (not entirely correct)
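A sketch of how this policy plays out, assuming a hypothetical distributed table events(id bigint, created_at timestamptz):
```sql
-- Insert: stable/volatile calls such as now() are evaluated on the coordinator first
INSERT INTO events (id, created_at) VALUES (1, now());
-- Update: function calls are evaluated, but only non-volatile ones are allowed
UPDATE events SET created_at = now() WHERE id = 1;
-- UPDATE events SET id = (random() * 100)::int;   -- would be rejected: volatile function
-- Select: function calls are not evaluated on the coordinator and are pushed down as-is
SELECT * FROM events WHERE created_at < now();
```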
When DML commands appear in a CTE, the restriction only applies to the CTE. In many cases, the CTE will in that case be planned and executed separately through recursive planning.
@@ -1583,15 +1553,11 @@ The plan of a prepared statement is only cached when the same prepared statement
There are a few important cases to distinguish in case of Citus:
- Multi-shard queries vs. single shard (Fast path & router)
- Custom plan vs. Generic plan
- Parameter in a filter on the distribution column vs. only on other columns
- Local vs. remote execution
- Combinations of parameters & function evaluation
Let's start with the simplest case: Multi-shard queries. These queries have complex planning logic, and it would be even more complex if the planner did not know the values of parameters. Therefore, we dissuade PostgreSQL from using a generic plan by returning a mock PlannedStmt with an extremely high cost when asked for a generic plan (see DissuadePlannerFromUsingPlan). That will cause PostgreSQL to keep using a custom plan with known parameter values. In addition, we replace any Params that appear in the query tree with their Const values in ResolveExternalParams before distributed planning, so the remaining planner logic does not need to concern itself with query parameters.
@@ -1599,9 +1565,8 @@ For single shard queries, the story is a lot more complex. An important question
We do not precisely distinguish all possible cases, but rather have a simple distinction:
- Fast path queries are simple queries on a single table with a <distribution column> = <Param or Const> filter (or single row inserts). We know that they prune to at most 1 shard regardless of the parameter value. The case of “distcol = NULL” is false/null by definition (unlike “distcol IS NULL”) and therefore prunes to 0 shards.
- Router queries are arbitrarily complex queries that prune down to a single shard at planning time based on the RestrictInfo data structures obtained from the postgres planner.
We can only decide whether a query is a router query in the planner, because if it is not a router query, we need to fall back to the multi-shard query planning code path. Hence, we can only support generic router plans when all distribution column filters are constant, or there are only single shard/reference tables in the query. The router planner cannot prune based on unbound parameters and will therefore return a soft error. When the planner sees a soft error, we return a mock plan with a high cost, similar to multi-shard queries.
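As a rough illustration (hypothetical tables, with accounts and orders hash-distributed and co-located by tenant_id):
```sql
-- Fast path: single table, distribution column compared to a parameter or constant
PREPARE get_account(int) AS SELECT * FROM accounts WHERE tenant_id = $1;

-- Router: more complex, but the constant distribution column filter lets it prune
-- to a single shard group at planning time
SELECT a.name, count(*)
FROM accounts a JOIN orders o USING (tenant_id)
WHERE a.tenant_id = 42
GROUP BY a.name;

-- Neither: no distribution column filter, handled by the multi-shard code path
SELECT count(*) FROM orders;
```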
@@ -1609,17 +1574,15 @@ Fast path queries prune to a single shard regardless of the parameter values. If
For both fast path queries and router queries, the job query tree for single shard queries still has all the parameters when we get to the executor. We resolve the parameters in the query tree before deparsing when:
- pruning is deferred (has WHERE distcol = $1 …)
- the query is a DML that contains function calls that need to be resolved
The latter happens primarily because function evaluation also resolves parameters. Otherwise, it would not be able to resolve expressions like stable_fn($1). If the parameters are not resolved in the executor, they are passed on to the worker node using the libpq functions that take parameters.
Both fast path and router query plans can be cached by PostgreSQL (plancache.c) if they are executed at least 5 times. In that case, either:
- the query pruned to a single shard in the planner, the task is therefore static (always goes to the same shard group, with same query string)
- the query uses deferred pruning, the shard group is therefore decided in the executor (not cached, query string rebuilt)
Both scenarios reduce compute cycles in terms of planning the distributed query, but the plan for the shard query is never cached, except in the local execution case, which is described in the next section.
@@ -1647,7 +1610,7 @@ Historically, Citus executed single shard queries via a single connection per wo
**The executor should gracefully handle failures**. One of the more challenging parts of doing remote, concurrent query execution is handling a variety of failures, including timeouts, failed connections, and query errors. The handling can be different for reads and writes, since reads on replicated tables can fail over to a different placement.
**The executor should consider replicated shards**. Writes to reference tables (or replicated shards) need to be sent to all nodes, while reads can fail over to other replicas. Update/delete can be performed in parallel due to the exclusive lock on the shard, while inserts need to run in a consistent order to avoid deadlocks in case of constraint violations. The executor also needs to consider that replicas may be on the local node and use local execution.
To fulfill the first two requirements, the adaptive executor uses a (process-local) **pool of connections per node**, which typically starts at 1 connection, but can grow based on the runtime of the query. Queries on shard groups that were already modified are assigned to the connection that performed the modification(s), while remaining queries are assigned to the pool (to be parallelized at will).
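A sketch of how the pool growth can be tuned, assuming the citus.max_adaptive_executor_pool_size and citus.executor_slow_start_interval settings (the values shown are illustrative):
```sql
-- Upper bound on the number of connections the executor may open per worker node
SET citus.max_adaptive_executor_pool_size = 16;
-- How long the executor waits before opening additional connections for a slow query
SET citus.executor_slow_start_interval = '10ms';
```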
@@ -1767,9 +1730,7 @@ INSERT..SELECT via the coordinator logic uses the COPY code path to write results
## Merge command
The MERGE command follows the same principles as INSERT .. SELECT processing. However, due to the nature of distributed systems, there are a few additional limitations on top of the INSERT .. SELECT processing. The [MERGE blog post](https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge/) dives deep on this topic.
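A sketch of a MERGE that follows this model, assuming accounts and account_updates are co-located distributed tables keyed by tenant_id (PostgreSQL 15+ syntax):
```sql
MERGE INTO accounts a
USING account_updates u
  ON a.tenant_id = u.tenant_id AND a.account_id = u.account_id
WHEN MATCHED THEN
  UPDATE SET balance = u.balance
WHEN NOT MATCHED THEN
  INSERT (tenant_id, account_id, balance)
  VALUES (u.tenant_id, u.account_id, u.balance);
```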
TODO:
@@ -1793,15 +1754,11 @@ There is one tricky behavior regarding transactions when there is a foreign key
Each client session makes “internal” connections to other nodes in the cluster. Connection management is an important part of our overall execution logic. The design largely comes from the need to achieve a few different goals:
- Cache and reuse connections for low latency.
- Parallelize commands over multiple connections per node, to use multiple cores.
- Multi-statement transactions have locks and uncommitted state that is only visible over a particular connection. We therefore need to make sure that:
  - After a write to a shard, any access to that shard group should use the same connection as the write. We need to cover the whole shard group because writes and locks can cascade to other shards in the shard group via foreign keys, and they might be used together in a join.
  - After a write to a reference table, any subsequent read of a reference table, including joins between distributed table shards and reference tables, should use the same connection as the write.
The key function that deals with this logic is GetConnectionIfPlacementAccessedInXact().
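A sketch of why this matters, with hypothetical co-located tables distributed by tenant_id:
```sql
BEGIN;
-- The write goes over some connection C1 to the worker holding tenant 42's shard group
UPDATE accounts SET balance = 0 WHERE tenant_id = 42;
-- This read must reuse C1: it touches the same shard group and needs to see the
-- uncommitted update (and must not block on the locks it took)
SELECT * FROM accounts a JOIN orders o USING (tenant_id) WHERE a.tenant_id = 42;
COMMIT;
```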
@@ -1828,9 +1785,8 @@ The primary goal of the connection management layer is not to solve all these pr
The connection management logic is divided into two parts:
- **connection_management.c** tracks all connections for various purposes and concerns itself with connection establishment, caching, and error handling.
- **placement_connections.c** concerns itself with finding the right connection for a given shard placement access based on preceding commands in the transaction.
## Connection management
@@ -1840,11 +1796,9 @@ placement_connections.c concerns itself with finding the right connection for a
The placement connection tracking logic stores which shard group placements were accessed over which connections during the current transaction, and whether they performed a SELECT, DML, or DDL. It considers whether to use the same connection for accesses to the same shard group placement in the following cases:
- SELECT after SELECT - can use a different connection
- DML after SELECT - can use a different connection
- All other cases - must use the same connection
We sometimes allow the same shard group placement to be accessed from different connections (first two cases). Consider a transaction that does a query on a reference table followed by a join between a distributed table and a reference table. Currently Citus would parallelize the second query, but that implicitly causes the reference table to be accessed from multiple connections. After that, we can still perform writes on the reference table (second case), because they do not conflict with the reads. However, we cannot perform most DDL commands involving the reference table because the locks would conflict with the reads, such that it would self-deadlock (blocked waiting for itself). We throw an error to prevent the self-deadlock and suggest setting citus.multi_shard_modify_mode to sequential. Probably some DDL commands that take weaker locks would still be permissible, but we currently treat them all the same way.
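A sketch of that scenario, assuming countries is a reference table and accounts a distributed table:
```sql
BEGIN;
-- Reference table read, on a single connection
SELECT count(*) FROM countries;
-- Parallel multi-shard join: the reference table is now read over multiple connections
SELECT count(*) FROM accounts a JOIN countries c ON a.country_code = c.code;
-- DDL on the reference table would conflict with those reads and self-deadlock,
-- so Citus raises an error and suggests sequential mode instead
ALTER TABLE countries ADD COLUMN region text;
COMMIT;
```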
@@ -1885,12 +1839,20 @@ Multi-node transactions provide atomicity, consistency, and durability guarantee
An example anomaly that can occur is two distributed transactions:
Two inserts in a transaction block into two different shards
```sql
BEGIN;
INSERT INTO test (key, value) VALUES (1,2);
INSERT INTO test (key, value) VALUES (2,2);
END;
```
An update across shards
```sql
UPDATE test SET value = 3 WHERE value = 2;
```
@@ -1900,12 +1862,14 @@ If Citus provided serializability, there could only be 2 outcomes (a happens fir
This can happen because the inserts commit using a 2PC if the shards are on different nodes, and therefore they might not become visible at exactly the same time. Since the commits happen in parallel, there are no guarantees w.r.t. which insert becomes visible first. The update could see either insert as committed, or none, or both, depending on exact timings. Hence, there is no well-defined order between a and b; they are intertwined.
If the inserts depend on the update, there may be even more possible outcomes. For instance, if there is a unique constraint on (key, value), and we do upserts concurrently with the multi-shard update:
```sql
BEGIN;
INSERT INTO test (key, value) VALUES (1,2) ON CONFLICT DO NOTHING;
INSERT INTO test (key, value) VALUES (2,2) ON CONFLICT DO NOTHING;
END;
```
Now, whether the insert proceeds or does nothing depends on whether the update is already committed or not. Hence, this scenario has 6 possible outcomes.
It is hard for users to understand these semantics and their implications. Therefore, many database researchers and engineers have a strong preference for serializability. Having fewer possible outcomes means less potential for bugs and unintended situations. On the other hand, the performance impacts of snapshot isolation are generally significant, and we have not seen a lot of problems due to the lack of snapshot isolation in practice. The types of transactional workloads that scale well and therefore benefit from Citus are the types of workloads that scope their transactions to a single node and therefore get all the usual PostgreSQL guarantees.
@@ -1993,25 +1957,20 @@ Cleanup records always need to be committed before creating the actual object. I
# Logical decoding / CDC
PostgreSQL supports change data capture (CDC) via the logical decoding interface. The basic idea behind logical decoding is that you make a replication connection (a special type of postgres connection), start replication, and then the backend process reads through the WAL and decodes the WAL records and emits it over the wire in a format defined by the output plugin. If we were to use regular logical decoding on the nodes of a Citus cluster, we would see the name of the shard in each write, and internal data transfers such as shard moves would result in inserts being emitted. We use several techniques to avoid this.
All writes in PostgreSQL are marked with a replication origin (0 by default) and the decoder can make decisions on whether to emit the change based on the replication origin. We use this to filter out internal data transfers. If `citus.enable_change_data_capture` is enabled, all internal data transfers are marked with the special DoNotReplicateId replication origin by calling the `citus_internal_start_replication_origin_tracking()` UDF before writing the data. This replication origin ID is special in the sense that it does not need to be created (which prevents locking issues, especially when dropping replication origins). It is still up to the output plugin to decide what to do with changes marked as DoNotReplicateId.
We have very minimal control over replication commands like `CREATE_REPLICATION_SLOT`, since there are no direct hooks, and decoder names (e.g. “pgoutput”) are typically hard-coded in the client. The only method we found of overriding logical decoding behaviour is to overload the output plugin name in the dynamic library path.
For specific output plugins, we supply a wrapper .so that has the same name as the existing output plugin, but in a separate directory in the PostgreSQL installation folder, and we automatically prefix the `dynamic_library_path` with this folder such that PostgreSQL will load our wrapper. The wrapper internally loads the original output plugin, and calls the original output plugin functions after applying two transformations:
- Shard OIDs are mapped to distributed table OIDs
- Changes marked with DoNotReplicateId are skipped
Mapping the shard OIDs to distributed table OIDs not only makes the output understandable for users, but also simplifies our implementation of the CREATE PUBLICATION command, which is used to configure the pgoutput plugin (used by logical replication). We create the same publication on all nodes using the same distributed table names. Since the original pgoutput plugin only sees changes to distributed tables, it can relate those to the set of distributed tables in the publication.
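For example, a CDC client might do something along these lines (slot, publication, and table names are hypothetical):
```sql
-- Create the publication using distributed table names; Citus propagates it to all nodes
CREATE PUBLICATION accounts_cdc FOR TABLE accounts, orders;
-- On each node, consume changes through the wrapped output plugin (e.g. wal2json);
-- the emitted changes reference distributed tables rather than shard names
SELECT pg_create_logical_replication_slot('accounts_cdc_slot', 'wal2json');
SELECT data FROM pg_logical_slot_get_changes('accounts_cdc_slot', NULL, NULL);
```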
We have to build a .so for each wrapper separately. We currently build wrappers for [pgoutput and wal2json](https://github.com/citusdata/citus/blob/main/src/backend/distributed/cdc/Makefile#L8).
This approach fulfills our main requirements, though we currently have a small correctness issue. Logical decoding always deals with a situation in the past, and to do so it builds a historical snapshot of the PostgreSQL catalogs. Tables may have been renamed or dropped since the change happened, but the historical snapshot shows the schema as it was at the time of the change. However, we cannot build a historical snapshot of the Citus catalogs, and we therefore rely on the present values. The main issue that can arise is that the shard may have been dropped, in which case the change might be emitted using its original shard name, since it's not recognized as a shard name. In many cases, this issue is avoided by caching the Citus catalogs.
@@ -2021,6 +1980,8 @@ An open issue with CDC is that there is no good way to get a consistent snapshot
When you have a multi-node cluster, clients should connect to each node and combine the changes. It is important to note that there are no guarantees with regard to when and in what order changes will be emitted between nodes. It is especially important to understand that changes cannot be reordered (e.g. based on timestamp or transaction ID), because only the node-level order is correct. The lack of distributed snapshot isolation in Citus means that changes can be interleaved (a happens before b on node 1, b happens before a on node 2). The node-level decoder output will reflect that as it happened.
_Do not reorder changes based on timestamp or distributed transaction ID. It is not correct._
# Global PID
The global PID (gpid) is used to give each client connection to the cluster a unique process identifier, and to understand which internal connections belong to a specific client connection. A gpid consists of the combination of the node ID and the PID of the coordinating process (i.e. the process serving a client connection). It can be seen in various monitoring views:
@@ -2041,7 +2002,7 @@ Additional details: Monitor distributed Postgres activity with citus_stat_activi
One of the downsides of multi-statement transactions in a distributed database is the extra network round trips involved in each individual statement, even when each statement goes to the same worker node. In Citus this can be solved by marking a function or stored procedure as distributed. A distributed function has a distribution argument and can be co-located with distributed tables. It can be created using:
```sql
SELECT create_distributed_function('delivery(int,int)', '$1');
```
When a distributed function is called, the argument is treated as a distribution column filter on a co-located distributed table and delegated to the worker node that stores the corresponding shards. Ideally, every statement in the function uses the distribution argument as a distribution column filter and only accesses co-located tables, such that the transaction remains local to the worker node. Otherwise, the worker assumes the role of coordinator and performs a distributed transaction. Function call delegation is especially useful in multi-tenant applications that involve complex transactions, as those transactions can be handled in a single network round-trip and with almost no overhead on the coordinator.
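A fuller sketch (the delivery procedure body and the orders table are hypothetical):
```sql
CREATE PROCEDURE delivery(tenant_id int, order_id int)
LANGUAGE plpgsql AS $$
BEGIN
  -- Only touches co-located shards of this tenant, so the whole call can be
  -- delegated to the worker node that stores the tenant's shard group
  UPDATE orders o SET delivered_at = now()
  WHERE o.tenant_id = delivery.tenant_id AND o.order_id = delivery.order_id;
END;
$$;

SELECT create_distributed_function('delivery(int,int)', '$1', colocate_with := 'orders');

-- Runs as a single round trip to the worker node holding tenant 42's shards
CALL delivery(42, 1234);
```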