Over time we have added significantly improved the support for objects to be propagated by Citus as to make scaling out the database more seamless. It became evident that there was a lot of code duplication that got into the codebase to implement the propagation.
This PR tries to reduce the amount of repeated code that is at most only slightly different. To make things worse, most of the differences were actually oversights instead of correct.
This Patch introduces 3 reusable sets of pre/post processing steps for respectively
- create
- alter
- drop
With the use of the common functionality we should have more coherent behaviour between different supported object by Citus.
Some steps either omit the Pre or Post processing step if they would not make sense to include.
All tests pass, only 1 test needed changing, foreign servers, as the dropping of foreign servers didn't implement support for dropping multiple foreign servers at once. Given the common approach correctly supports dropping of multiple objects, either distributed or not, the test that assumed it wouldn't work was now obsolete.
DESCRIPTION: Add GUC to control ddl creation behaviour in transactions
Historically we would _not_ propagate objects when we are in a transaction block. Creation of distributed tables would not always work in sequential mode, hence objects created in the same transaction as distributing a table that would use the just created object wouldn't work. The benefit was that the user could still benefit from parallelism.
Now that the creation of distributed tables is supported in sequential mode it would make sense for users to force transactional consistency of ddl commands for distributed tables. A transaction could switch more aggressively to sequential mode when creating new objects in a transaction.
We don't change the default behaviour just yet.
Also, many objects would not even propagate their creation when the transaction was already set to sequential, leaving the probability of a self deadlock. The new policy checks solve this discrepancy between objects as well.
With this commit, rebalancer backends are identified by application_name = citus_rebalancer
and the regular internal backends are identified by application_name = citus_internal
In addition to starting a new transaction, we also need to tell other
backends --including the ones spawned for connections opened to
localhost to build indexes on shards of this relation-- that concurrent
index builds can safely ignore us.
Normally, DefineIndex() only does that if index doesn't have any
predicates (i.e.: where clause) and no index expressions at all.
However, now that we already called standard process utility, index
build on the shell table is finished anyway.
The reason behind doing so is that we cannot guarantee not grabbing any
snapshots via adaptive executor, and the backends creating indexes on
local shards (if any) might block on waiting for current xact of the
current backend to finish, which would cause self deadlocks that are not
detectable.
Before that PR we were updating citus.pg_dist_object metadata, which keeps
the metadata related to objects on Citus, only on the coordinator node. In
order to allow using those object from worker nodes (or erroring out with
proper error message) we've started to propagate that metedata to worker
nodes as well.
In the past, we allowed users to manually switch to 1PC
(e.g., one phase commit). However, with this commit, we
don't. All multi-shard modifications are done via 2PC.
Add/fix tests
Fix creating partitions
Add test for mx - partition creating case
Enable cascading to partitioned tables
Fix mx partition adding test
Fix cascading through fkeys
Style
Disable converting with non-inherited fkeys
Fix detach bug
Early return in case of cascade & Add tests
Style
Fix undistribute_table bug & Fix test outputs
Remove RemovePartitionRelationIds
Test with undistribute_table
Add test for mx+convert+undistribute
Remove redundant usage of CreatePartitionedCitusLocalTable
Add some comments
Introduce bulk functions for generating attach/detach partition commands
Fix: Convert partitioned tables after adding fkey
Change the error message for partitions
Introduce function ErrorIfPartitionTableAddedToMetadata
Polish attach/detach command generation functions
Use time_partitions for testing
Move mx tests to citus_local_tables_mx
Add new partitioned table to cascade test
Add test with time series management UDFs
Fix test output
Fix: Assertion fail on relation access tracking
Style
Refactor creating partitioned citus local tables
Remove CreatePartitionedCitusLocalTable
Style
Error out if converting multi-level table
Revert some old tests
Error out adding partitioned partition
Polish
Polish/address
Fix create table partition of case
Use CascadeOperationForRelationIdList if no cascade needed
Fix create partition bug
Revert / Add new tests to mx
Style
Fix dropping fkey bug
Add test with IF NOT EXISTS
Convert to CLT when doing ATTACH PARTITION
Add comments
Add more tests with time series management
Edit the error message for converting the child
Use OR instead of AND in ErrorIfUnsupportedAlterTableStmt
Edit/improve tests
Disable ddl prop when dropping default column definitions
Disable/enable ddl prop just before/after the command
Add comment
Add sequence test
Add trigger test
Remove NeedCascadeViaForeignKeys
Add one more insert to sequence test
Add comment
Style
Fix test output shard ids
Update comments
Disable creating fkey on partitions
Move partition check to CreateCitusLocalTable
Add comment
Add check for attachingmulti-level partition
Add test for pg_constraint
Check pg_dist_partition in tests
Add test inserting on the worker
New macros: standard_ProcessUtility_compat, ProcessUtility_compat, ColumnarProcessUtility_compat, PrevProcessUtilityHook_compat
The functions now have a new bool parameter: readOnlyTree
These new macros give us the ability to use this new parameter for PG14 and it doesn't give the parameter for previous versions
In multi_ProcessUtility and ColumnarProcessUtility, before doing anything else, we check if readOnlyTree parameter is true and create a copy of pstmt
Existing readOnlyTree parameters are set to false since we already handle the read only case at multi_ProcessUtility and ColumnarProcessUtility
Relevant PG commit:
7c337b6b527b7052e6a751f966d5734c56f668b5
AlterTableStmt's relkind field is changed into objtype
New AlterTableStmtObjType macro uses the appropriate one
Relevant PG commit:
cc35d8933a211d9965eb1c1d2749a903d5735db2
ConnParams(AuthInfo and PoolInfo) gets a snapshot, which will block the
remote connectinos to localhost. And the release of snapshot will be
blocked by the snapshot. This leads to a deadlock.
We warm up the conn params hash before starting a new transaction so
that the entries will already be there when we start a new transaction.
Hence GetConnParams will not get a snapshot.
For certaion purposes, we drop and recreate the foreign
keys. As we acquire exclusive locks on the tables in between
drop and re-create, we can safely skip validation phase of
the foreign keys. The reason is purely being performance as
foreign key validation could take a long value.
* Rethrow original concurrent index creation failure message
* Alter test outputs for concurrent index creation
* Detect duplicate table failure in concurrent index creation
* Add test for conc. index creation w/out duplicates
When distributing a columnar table, as well as changing options on a distributed columnar table, this patch will forward the settings from the coordinator to the workers.
For propagating options changes on an already distributed table this change is pretty straight forward. Before applying the change in options locally we will create a `DDLJob` that contains a call to `alter_columnar_table_set(...)` for every shard placement with all settings of the current table. This goes both for setting an option as well as resetting. This will reset the values to the defaults configured on the coordinator. Having the effect that the coordinator is authoritative on the settings and makes sure the shards have the same settings set as the table on the coordinator.
When a columnar table is distributed it is using the `TableDDLCommand` infra structure to create a new kind of `TableDDLCommand`. This new type, called a `TableDDLCommandFunction` contains a context and 2 function pointers to execute. One function returns the command as applied on the table, the second function will return the sql command to apply to a shard with a given shard id. The schema name is ignored as it will use the fully qualified name of the shard in the same schema as the base table.
This commit brings following features:
Foreign key support from citus local tables to reference tables
* Foreign key support from reference tables to citus local tables
(only with RESTRICT & NO ACTION behavior)
* ALTER TABLE ENABLE/DISABLE trigger command support
* CREATE/DROP/ALTER trigger command support
and disallows:
* ALTER TABLE ATTACH/DETACH PARTITION commands
* CREATE TABLE <postgres table> ATTACH PARTITION <citus local table>
commands
* Foreign keys from postgres tables to citus local tables
(the other way was already disallowed)
for citus local tables.
This commit mostly adds pg_get_triggerdef_command to our ruleutils_13.
This doesn't add anything extra for ruleutils 13 so it is basically a copy
of the change on ruleutils_12
Postgres introduced QueryCompletion struct. Hence a compat utility is
added to finish query completion for older versions and pg >= 13.
The commit on Postgres side:
2f9661311b83dc481fc19f6e3bda015392010a40
We should check the remove type in IsDropCitusStmt because if the remove
type is not OBJECT_EXTENSION then the stored objects in
dropStmt->objects may not be of type Value. This was crashing PG-13.
Also rename the method as IsDropCitusExtensionStmt.
This PR removes ExecuteUtilityTaskListWithoutResults and uses the same
path for local execution via ExecuteTaskListExtended.
ExecuteUtilityTaskList is added. ExecuteLocalTaskListExtended now has a
parameter for utility commands so that it can call the right method. In
order not to change the existing calls,
ExecuteTaskListExtendedInternal is added, which is the main method that
runs the execution, via local and remote execution.
DESCRIPTION: Alter role only works for citus managed roles
Alter role was implemented before we implemented good role management that hooks into the object propagation framework. This is a refactor of all alter role commands that have been implemented to
- be on by default
- only work for supported roles
- make the citus extension owner a supported role
Instead of distributing the alter role commands for roles at the beginning of the node activation role it now _only_ executes the alter role commands for all users in all databases and in the current database.
In preparation of full role support small refactors have been done in the deparser.
Earlier tests targeting other roles than the citus extension owner have been either slightly changed or removed to be put back where we have full role support.
Fixes#2549
With this commit, we're introducing a new infrastructure to throttle
connections to the worker nodes. This infrastructure is useful for
multi-shard queries, router queries are have not been affected by this.
The goal is to prevent establishing more than citus.max_shared_pool_size
number of connections per worker node in total, across sessions.
To do that, we've introduced a new connection flag OPTIONAL_CONNECTION.
The idea is that some connections are optional such as the second
(and further connections) for the adaptive executor. A single connection
is enough to finish the distributed execution, the others are useful to
execute the query faster. Thus, they can be consider as optional connections.
When an optional connection is not allowed to the adaptive executor, it
simply skips it and continues the execution with the already established
connections. However, it'll keep retrying to establish optional
connections, in case some slots are open again.
For shardplacements, we were setting nodeid, nodename, nodeport and
nodegroup manually. This makes it very error prone, and it seems that we
already forgot to set some of them. This would mean that they would have
their default values, e.g group id would be 0 when its group id is not
0.
So the implication is that we would have inconsistent worker metadata.
A new method is introduced, and we call the method to set those fields
now, so that as long as we call this method, we won't be setting
inconsistent metadata.
It probably makes sense to have a struct for these fields. We already
have NodeMetadata but it doesn't have nodename or nodeport. So that
could be done over another refactor to make things simpler.
In PostgreSQL, user defaults for config parameters can be changed by
ALTER ROLE .. SET statements. We wish to propagate those defaults
accross the Citus cluster so that the behaviour will be similar in
different workers.
The defaults can either be set in a specific database, or the whole
cluster, similarly they can be set for a single role or all roles.
We propagate the ALTER ROLE .. SET if all the conditions below are met:
- The query affects the current database, or all databases
- The user is already created in worker nodes
Sometimes we have concatenated query strings for a task. However,
when we want to find each query string, it is not a trivial task.
Therefore, it makes sense to store this in task so that when we need
each query string we can easily get it.
* reimplement ExecuteUtilityTaskListWithoutResults for local utility command execution
* introduce new functions for local execution of utility commands
* change ErrorIfTransactionAccessedPlacementsLocally logic for local utility command execution
* enable local execution for TRUNCATE command on distributed & reference tables
* update existing tests for local utility command execution
* enable local execution for DDL commands on distributed & reference tables
* enable local execution for DROP command on distributed & reference tables
* add normalization rules for cascaded commands
* add new tests for local utility command execution
Semmle reported quite some places where we use a value that could be NULL. Most of these are not actually a real issue, but better to be on the safe side with these things and make the static analysis happy.
- Stop the daemon when citus extension is dropped
- Bail on maintenance daemon startup if myDbData is started with a non-zero pid
- Stop maintenance daemon from spawning itself
- Don't use postgres die, just wrap proc_exit(0)
- Assert(myDbData->workerPid == MyProcPid)
The two issues were that multiple daemons could be running for a database,
or that a daemon would be leftover after DROP EXTENSION citus
Mark existing objects that are not included in distributed object infrastructure
in older versions of Citus (but now should be) as distributed, after updating
Citus successfully.
* Update shardPlacement->nodeId to uint
As the source of the shardPlacement->nodeId is always workerNode->nodeId,
and that is uint32.
We had this hack because of: 0ea4e52df5 (r266421409)
And, that is gone with: 90056f7d3c (diff-c532177d74c72d3f0e7cd10e448ab3c6L1123)
So, we're safe to do it now.
* Relax the restrictions on using the local execution
Previously, whenever any local execution happens, we disabled further
commands to do any remote queries. The basic motivation for doing that
is to prevent any accesses in the same transaction block to access the
same placements over multiple sessions: one is local session the other
is remote session to the same placement.
However, the current implementation does not distinguish local accesses
being to a placement or not. For example, we could have local accesses
that only touches intermediate results. In that case, we should not
implement the same restrictions as they become useless.
So, this is a pre-requisite for executing the intermediate result only
queries locally.
* Update the error messages
As the underlying implementation has changed, reflect it in the error
messages.
* Keep track of connections to local node
With this commit, we're adding infrastructure to track if any connection
to the same local host is done or not.
The main motivation for doing this is that we've previously were more
conservative about not choosing local execution. Simply, we disallowed
local execution if any connection to any remote node is done. However,
if we want to use local execution for intermediate result only queries,
this'd be annoying because we expect all queries to touch remote node
before the final query.
Note that this approach is still limiting in Citus MX case, but for now
we can ignore that.
* Formalize the concept of Local Node
Also some minor refactoring while creating the dummy placement
* Write intermediate results locally when the results are only needed locally
Before this commit, Citus used to always broadcast all the intermediate
results to remote nodes. However, it is possible to skip pushing
the results to remote nodes always.
There are two notable cases for doing that:
(a) When the query consists of only intermediate results
(b) When the query is a zero shard query
In both of the above cases, we don't need to access any data on the shards. So,
it is a valuable optimization to skip pushing the results to remote nodes.
The pattern mentioned in (a) is actually a common patterns that Citus users
use in practice. For example, if you have the following query:
WITH cte_1 AS (...), cte_2 AS (....), ... cte_n (...)
SELECT ... FROM cte_1 JOIN cte_2 .... JOIN cte_n ...;
The final query could be operating only on intermediate results. With this patch,
the intermediate results of the ctes are not unnecessarily pushed to remote
nodes.
* Add specific regression tests
As there are edge cases in Citus MX and with round-robin policy,
use the same queries on those cases as well.
* Fix failure tests
By forcing not to use local execution for intermediate results since
all the tests expects the results to be pushed remotely.
* Fix flaky test
* Apply code-review feedback
Mostly style changes
* Limit the max value of pg_dist_node_seq to reserve for internal use
Deparsing and parsing a query can be heavy on CPU. When locally executing
the query we don't need to do this in theory most of the time.
This PR is the first step in allowing to skip deparsing and parsing
the query in these cases, by lazily creating the query string and
storing the query in the task. Future commits will make use of this and
not deparse and parse the query anymore, but use the one from the task
directly.
* Remove unused executor codes
All of the codes of real-time executor. Some functions
in router executor still remains there because there
are common functions. We'll move them to accurate places
in the follow-up commits.
* Move GUCs to transaction mngnt and remove unused struct
* Update test output
* Get rid of references of real-time executor from code
* Warn if real-time executor is picked
* Remove lots of unused connection codes
* Removed unused code for connection restrictions
Real-time and router executors cannot handle re-using of the existing
connections within a transaction block.
Adaptive executor and COPY can re-use the connections. So, there is no
reason to keep the code around for applying the restrictions in the
placement connection logic.
DESCRIPTION: Disallow distributed functions for functions depending on an extension
Functions depending on an extension cannot (yet) be distributed by citus. If we would allow this it would cause issues with our dependency following mechanism as we stop following objects depending on an extension.
By not allowing functions to be distributed when they depend on an extension as well as not allowing to make distributed functions depend on an extension we won't break the ability to add new nodes. Allowing functions depending on extensions to be distributed at the moment could cause problems in that area.
DESCRIPTION: Propagate CREATE OR REPLACE FUNCTION
Distributed functions could be replaced, which should be propagated to the workers to keep the function in sync between all nodes.
Due to the complexity of deparsing the `CreateFunctionStmt` we actually produce the plan during the processing phase of our utilityhook. Since the changes have already been made in the catalog tables we can reuse `pg_get_functiondef` to get us the generated `CREATE OR REPLACE` sql.
DESCRIPTION: Propagate ALTER FUNCTION statements for distributed functions
Using the implemented deparser for function statements to propagate changes to both functions and procedures that are previously distributed.
When a function is marked as colocated with a distributed table,
we try delegating queries of kind "SELECT func(...)" to workers.
We currently only support this simple form, and don't delegate
forms like "SELECT f1(...), f2(...)", "SELECT f1(...) FROM ...",
or function calls inside transactions.
As a side effect, we also fix the transactional semantics of DO blocks.
Previously we didn't consider a DO block a multi-statement transaction.
Now we do.
Co-authored-by: Marco Slot <marco@citusdata.com>
Co-authored-by: serprex <serprex@users.noreply.github.com>
Co-authored-by: pykello <hadi.moshayedi@microsoft.com>
We started copying parse trees by default further on in `multi_ProcessUtility`. That's not a problem for maintenance command, but might register for things like `PREPARE` and `EXECUTE`, which might happen thousands of times per second. Add a few common commands to the check at the start.
DESCRIPTION: Distribute Types to worker nodes
When to propagate
==============
There are two logical moments that types could be distributed to the worker nodes
- When they get used ( just in time distribution )
- When they get created ( proactive distribution )
The just in time distribution follows the model used by how schema's get created right before we are going to create a table in that schema, for types this would be when the table uses a type as its column.
The proactive distribution is suitable for situations where it is benificial to have the type on the worker nodes directly. They can later on be used in queries where an intermediate result gets created with a cast to this type.
Just in time creation is always the last resort, you cannot create a distributed table before the type gets created. A good example use case is; you have an existing postgres server that needs to scale out. By adding the citus extension, add some nodes to the cluster, and distribute the table. The type got created before citus existed. There was no moment where citus could have propagated the creation of a type.
Proactive is almost always a good option. Types are not resource intensive objects, there is no performance overhead of having 100's of types. If you want to use them in a query to represent an intermediate result (which happens in our test suite) they just work.
There is however a moment when proactive type distribution is not beneficial; in transactions where the type is used in a distributed table.
Lets assume the following transaction:
```sql
BEGIN;
CREATE TYPE tt1 AS (a int, b int);
CREATE TABLE t1 AS (a int PRIMARY KEY, b tt1);
SELECT create_distributed_table('t1', 'a');
\copy t1 FROM bigdata.csv
```
Types are node scoped objects; meaning the type exists once per worker. Shards however have best performance when they are created over their own connection. For the type to be visible on all connections it needs to be created and committed before we try to create the shards. Here the just in time situation is most beneficial and follows how we create schema's on the workers. Outside of a transaction block we will just use 1 connection to propagate the creation.
How propagation works
=================
Just in time
-----------
Just in time propagation hooks into the infrastructure introduced in #2882. It adds types as a supported object in `SupportedDependencyByCitus`. This will make sure that any object being distributed by citus that depends on types will now cascade into types. When types are depending them self on other objects they will get created first.
Creation later works by getting the ddl commands to create the object by its `ObjectAddress` in `GetDependencyCreateDDLCommands` which will dispatch types to `CreateTypeDDLCommandsIdempotent`.
For the correct walking of the graph we follow array types, when later asked for the ddl commands for array types we return `NIL` (empty list) which makes that the object will not be recorded as distributed, (its an internal type, dependant on the user type).
Proactive distribution
---------------------
When the user creates a type (composite or enum) we will have a hook running in `multi_ProcessUtility` after the command has been applied locally. Running after running locally makes that we already have an `ObjectAddress` for the type. This is required to mark the type as being distributed.
Keeping the type up to date
====================
For types that are recorded in `pg_dist_object` (eg. `IsObjectDistributed` returns true for the `ObjectAddress`) we will intercept the utility commands that alter the type.
- `AlterTableStmt` with `relkind` set to `OBJECT_TYPE` encapsulate changes to the fields of a composite type.
- `DropStmt` with removeType set to `OBJECT_TYPE` encapsulate `DROP TYPE`.
- `AlterEnumStmt` encapsulates changes to enum values.
Enum types can not be changed transactionally. When the execution on a worker fails a warning will be shown to the user the propagation was incomplete due to worker communication failure. An idempotent command is shown for the user to re-execute when the worker communication is fixed.
Keeping types up to date is done via the executor. Before the statement is executed locally we create a plan on how to apply it on the workers. This plan is executed after we have applied the statement locally.
All changes to types need to be done in the same transaction for types that have already been distributed and will fail with an error if parallel queries have already been executed in the same transaction. Much like foreign keys to reference tables.
DESCRIPTION: Fix schema leak on CREATE INDEX statement
When a CREATE INDEX is cached between execution we might leak the schema name onto the cached statement of an earlier execution preventing the right index to be created.
Even though the cache is cleared when the search_path changes we can trigger this behaviour by having the schema already on the search path before a colliding table is created in a schema earlier on the `search_path`. When calling an unqualified create index via a function (used to trigger the caching behaviour) we see that the index is created on the wrong table after the schema leaked onto the statement.
By copying the complete `PlannedStmt` and `utilityStmt` during our planning phase for distributed ddls we make sure we are not leaking the schema name onto a cached data structure.
Caveat; COPY statements already have a lot of parsestree copying ongoing without directly putting it back on the `pstmt`. We should verify that copies modify the statement and potentially copy the complete `pstmt` there already.
/*
* local_executor.c
*
* The scope of the local execution is locally executing the queries on the
* shards. In other words, local execution does not deal with any local tables
* that are not shards on the node that the query is being executed. In that sense,
* the local executor is only triggered if the node has both the metadata and the
* shards (e.g., only Citus MX worker nodes).
*
* The goal of the local execution is to skip the unnecessary network round-trip
* happening on the node itself. Instead, identify the locally executable tasks and
* simply call PostgreSQL's planner and executor.
*
* The local executor is an extension of the adaptive executor. So, the executor uses
* adaptive executor's custom scan nodes.
*
* One thing to note that Citus MX is only supported with replication factor = 1, so
* keep that in mind while continuing the comments below.
*
* On the high level, there are 3 slightly different ways of utilizing local execution:
*
* (1) Execution of local single shard queries of a distributed table
*
* This is the simplest case. The executor kicks at the start of the adaptive
* executor, and since the query is only a single task the execution finishes
* without going to the network at all.
*
* Even if there is a transaction block (or recursively planned CTEs), as long
* as the queries hit the shards on the same, the local execution will kick in.
*
* (2) Execution of local single queries and remote multi-shard queries
*
* The rule is simple. If a transaction block starts with a local query execution,
* all the other queries in the same transaction block that touch any local shard
* have to use the local execution. Although this sounds restrictive, we prefer to
* implement in this way, otherwise we'd end-up with as complex scenarious as we
* have in the connection managements due to foreign keys.
*
* See the following example:
* BEGIN;
* -- assume that the query is executed locally
* SELECT count(*) FROM test WHERE key = 1;
*
* -- at this point, all the shards that reside on the
* -- node is executed locally one-by-one. After those finishes
* -- the remaining tasks are handled by adaptive executor
* SELECT count(*) FROM test;
*
*
* (3) Modifications of reference tables
*
* Modifications to reference tables have to be executed on all nodes. So, after the
* local execution, the adaptive executor keeps continuing the execution on the other
* nodes.
*
* Note that for read-only queries, after the local execution, there is no need to
* kick in adaptive executor.
*
* There are also few limitations/trade-offs that is worth mentioning. First, the
* local execution on multiple shards might be slow because the execution has to
* happen one task at a time (e.g., no parallelism). Second, if a transaction
* block/CTE starts with a multi-shard command, we do not use local query execution
* since local execution is sequential. Basically, we do not want to lose parallelism
* across local tasks by switching to local execution. Third, the local execution
* currently only supports queries. In other words, any utility commands like TRUNCATE,
* fails if the command is executed after a local execution inside a transaction block.
* Forth, the local execution cannot be mixed with the executors other than adaptive,
* namely task-tracker, real-time and router executors. Finally, related with the
* previous item, COPY command cannot be mixed with local execution in a transaction.
* The implication of that any part of INSERT..SELECT via coordinator cannot happen
* via the local execution.
*/