DESCRIPTION: satisfy static analysis tool for a nullptr dereference
During the static analysis project on the codebase this code has been flagged as having the potential for a null pointer dereference. Funnily enough the author had already made a comment of it in the code this was not possible due to us setting the schema name before we pass in the statement. If we want to reuse this code in a later setting this comment might not always apply and we could actually run into null pointer dereference.
This patch changes a bit of the code around to first of all make sure there is no NULL pointer dereference in this code anymore.
Secondly we allow for better deparsing by setting and adhering to the `if_not_exists` flag on the statement.
And finally add support for all syntax described in the documentation of postgres (FROM was missing).
Makees VacuumTaskList function even with other TaskList creator functions.
Also, previously we were generating per-shard vacuum command strings via
unconventional usage of StringInfo struct (setting the stringInfo->len field
manually) which could cause unexepected memory errors (that I cannot foresee now).
If the generated column does not come at the end of the column list,
columnNameList doesn't line up with the column indexes. Seek past
CREATE TABLE test_table (
test_id int PRIMARY KEY,
gen_n int GENERATED ALWAYS AS (1) STORED,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
SELECT create_distributed_table('test_table', 'test_id');
Would raise ERROR: cannot cast 23 to 1184
Semmle reported quite some places where we use a value that could be NULL. Most of these are not actually a real issue, but better to be on the safe side with these things and make the static analysis happy.
- Stop the daemon when citus extension is dropped
- Bail on maintenance daemon startup if myDbData is started with a non-zero pid
- Stop maintenance daemon from spawning itself
- Don't use postgres die, just wrap proc_exit(0)
- Assert(myDbData->workerPid == MyProcPid)
The two issues were that multiple daemons could be running for a database,
or that a daemon would be leftover after DROP EXTENSION citus
Mark existing objects that are not included in distributed object infrastructure
in older versions of Citus (but now should be) as distributed, after updating
Citus successfully.
* Update shardPlacement->nodeId to uint
As the source of the shardPlacement->nodeId is always workerNode->nodeId,
and that is uint32.
We had this hack because of: 0ea4e52df5 (r266421409)
And, that is gone with: 90056f7d3c (diff-c532177d74c72d3f0e7cd10e448ab3c6L1123)
So, we're safe to do it now.
* Relax the restrictions on using the local execution
Previously, whenever any local execution happens, we disabled further
commands to do any remote queries. The basic motivation for doing that
is to prevent any accesses in the same transaction block to access the
same placements over multiple sessions: one is local session the other
is remote session to the same placement.
However, the current implementation does not distinguish local accesses
being to a placement or not. For example, we could have local accesses
that only touches intermediate results. In that case, we should not
implement the same restrictions as they become useless.
So, this is a pre-requisite for executing the intermediate result only
queries locally.
* Update the error messages
As the underlying implementation has changed, reflect it in the error
messages.
* Keep track of connections to local node
With this commit, we're adding infrastructure to track if any connection
to the same local host is done or not.
The main motivation for doing this is that we've previously were more
conservative about not choosing local execution. Simply, we disallowed
local execution if any connection to any remote node is done. However,
if we want to use local execution for intermediate result only queries,
this'd be annoying because we expect all queries to touch remote node
before the final query.
Note that this approach is still limiting in Citus MX case, but for now
we can ignore that.
* Formalize the concept of Local Node
Also some minor refactoring while creating the dummy placement
* Write intermediate results locally when the results are only needed locally
Before this commit, Citus used to always broadcast all the intermediate
results to remote nodes. However, it is possible to skip pushing
the results to remote nodes always.
There are two notable cases for doing that:
(a) When the query consists of only intermediate results
(b) When the query is a zero shard query
In both of the above cases, we don't need to access any data on the shards. So,
it is a valuable optimization to skip pushing the results to remote nodes.
The pattern mentioned in (a) is actually a common patterns that Citus users
use in practice. For example, if you have the following query:
WITH cte_1 AS (...), cte_2 AS (....), ... cte_n (...)
SELECT ... FROM cte_1 JOIN cte_2 .... JOIN cte_n ...;
The final query could be operating only on intermediate results. With this patch,
the intermediate results of the ctes are not unnecessarily pushed to remote
nodes.
* Add specific regression tests
As there are edge cases in Citus MX and with round-robin policy,
use the same queries on those cases as well.
* Fix failure tests
By forcing not to use local execution for intermediate results since
all the tests expects the results to be pushed remotely.
* Fix flaky test
* Apply code-review feedback
Mostly style changes
* Limit the max value of pg_dist_node_seq to reserve for internal use
Deparsing and parsing a query can be heavy on CPU. When locally executing
the query we don't need to do this in theory most of the time.
This PR is the first step in allowing to skip deparsing and parsing
the query in these cases, by lazily creating the query string and
storing the query in the task. Future commits will make use of this and
not deparse and parse the query anymore, but use the one from the task
directly.
DESCRIPTION: Fixes a problem when adding a new node due to tables referenced in a functions body
Fixes#3378
It was reported that `master_add_node` would fail if a distributed function has a table name referenced in its declare section of the body. By default postgres validates the body of a function on creation. This is not a problem in the normal case as tables are replicated to the workers when we distribute functions.
However when a new node is added we first create dependencies on the workers before we try to create any tables, and the original tables get created out of bound when the metadata gets synced to the new node. This causes the function body validator to raise an error the table is not on the worker.
To mitigate this issue we set `check_function_bodies` to `off` right before we are creating the function.
The added test shows this does resolve the issue. (issue can be reproduced on the commit without the fix)
In two places I've made code more straight forward by using ROUTINE in our own codegen
Two changes which may seem extraneous:
AppendFunctionName was updated to not use pg_get_function_identity_arguments.
This is because that function includes ORDER BY when printing an aggregate like my_rank.
While ALTER AGGREGATE my_rank(x "any" ORDER BY y "any") is accepted by postgres,
ALTER ROUTINE my_rank(x "any" ORDER BY y "any") is not.
Tests were updated to use macaddr over integer. Using integer is flaky, our logic
could sometimes end up on tables like users_table. I originally wanted to use money,
but money isn't hashable.
* WIP
* wip
* add basic logic to run a single job with repartioning joins with adaptive executor
* fix some warnings and return in ExecuteDependedTasks if there is none
* Add the logic to run depended jobs in adaptive executor
The execution of depended tasks logic is changed. With the current
logic:
- All tasks are created from the top level task list.
- At one iteration:
- CurTasks whose dependencies are executed are found.
- CurTasks are executed in parallel with adapter executor main
logic.
- The iteration is repeated until all tasks are completed.
* Separate adaptive executor repartioning logic
* Remove duplicate parts
* cleanup directories and schemas
* add basic repartion tests for adaptive executor
* Use the first placement to fetch data
In task tracker, when there are replicas, we try to fetch from a replica
for which a map task is succeeded. TaskExecution is used for this,
however TaskExecution is not used in adaptive executor. So we cannot use
the same thing as task tracker.
Since adaptive executor fails when a map task fails (There is no retry
logic yet). We know that if we try to execute a fetch task, all of its
map tasks already succeeded, so we can just use the first one to fetch
from.
* fix clean directories logic
* do not change the search path while creating a udf
* Enable repartition joins with adaptive executor with only enable_reparitition_joins guc
* Add comments to adaptive_executor_repartition
* dont run adaptive executor repartition test in paralle with other tests
* execute cleanup only in the top level execution
* do cleanup only in the top level ezecution
* not begin a transaction if repartition query is used
* use new connections for repartititon specific queries
New connections are opened to send repartition specific queries. The
opened connections will be closed at the FinishDistributedExecution.
While sending repartition queries no transaction is begun so that
we can see all changes.
* error if a modification was done prior to repartition execution
* not start a transaction if a repartition query and sql task, and clean temporary files and schemas at each subplan level
* fix cleanup logic
* update tests
* add missing function comments
* add test for transaction with DDL before repartition query
* do not close repartition connections in adaptive executor
* rollback instead of commit in repartition join test
* use close connection instead of shutdown connection
* remove unnecesary connection list, ensure schema owner before removing directory
* rename ExecuteTaskListRepartition
* put fetch query string in planner not executor as we currently support only replication factor = 1 with adaptive executor and repartition query and we know the query string in the planner phase in that case
* split adaptive executor repartition to DAG execution logic and repartition logic
* apply review items
* apply review items
* use an enum for remote transaction state and fix cleanup for repartition
* add outside transaction flag to find connections that are unclaimed instead of always opening a new transaction
* fix style
* wip
* rename removejobdir to partition cleanup
* do not close connections at the end of repartition queries
* do repartition cleanup in pg catch
* apply review items
* decide whether to use transaction or not at execution creation
* rename isOutsideTransaction and add missing comment
* not error in pg catch while doing cleanup
* use replication factor of the creation time, not current time to decide if task tracker should be chosen
* apply review items
* apply review items
* apply review item
Use partition column's collation for range distributed tables
Don't allow non deterministic collations for hash distributed tables
CoPartitionedTables: don't compare unequal types
* Improve extension command propagation tests
* patch for hardcoded citus extension name
(cherry picked from commit 0bb3dbac0afabda10e8928f9c17eda048dc4361a)
* Remove unused executor codes
All of the codes of real-time executor. Some functions
in router executor still remains there because there
are common functions. We'll move them to accurate places
in the follow-up commits.
* Move GUCs to transaction mngnt and remove unused struct
* Update test output
* Get rid of references of real-time executor from code
* Warn if real-time executor is picked
* Remove lots of unused connection codes
* Removed unused code for connection restrictions
Real-time and router executors cannot handle re-using of the existing
connections within a transaction block.
Adaptive executor and COPY can re-use the connections. So, there is no
reason to keep the code around for applying the restrictions in the
placement connection logic.
This completely hides `ListCell` to the user of the loop
Example usage:
```c
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) {
// Do stuff with workerNode
}
```
Instead of:
```c
ListCell *workerNodeCell = NULL;
foreach(cell, workerNodeList) {
WorkerNode *workerNode = lfirst(workerNodeCell);
// Do stuff with workerNode
}
```
DESCRIPTION: Fix order for enum values and correctly support pg12
PG 12 introduces `ALTER TYPE ... ADD VALUE ...` during transactions. Earlier versions would error out when called in a transaction, hence we connect to workers outside of the transaction which could cause inconsistencies on pg12 now that postgres doesn't error with this syntax anymore.
During the implementation of this fix it became apparent there was an error with the ordering of enum labels when the type was recreated. A patch and test have been included.
Objectives:
(a) both super user and regular user should have the correct owner for the function on the worker
(b) The transactional semantics would work fine for both super user and regular user
(c) non-super-user and non-function owner would get a reasonable error message if tries to distribute the function
Co-authored-by: @serprex
DESCRIPTION: Disallow distributed functions for functions depending on an extension
Functions depending on an extension cannot (yet) be distributed by citus. If we would allow this it would cause issues with our dependency following mechanism as we stop following objects depending on an extension.
By not allowing functions to be distributed when they depend on an extension as well as not allowing to make distributed functions depend on an extension we won't break the ability to add new nodes. Allowing functions depending on extensions to be distributed at the moment could cause problems in that area.
DESCRIPTION: Propagate CREATE OR REPLACE FUNCTION
Distributed functions could be replaced, which should be propagated to the workers to keep the function in sync between all nodes.
Due to the complexity of deparsing the `CreateFunctionStmt` we actually produce the plan during the processing phase of our utilityhook. Since the changes have already been made in the catalog tables we can reuse `pg_get_functiondef` to get us the generated `CREATE OR REPLACE` sql.
DESCRIPTION: Propagate ALTER FUNCTION statements for distributed functions
Using the implemented deparser for function statements to propagate changes to both functions and procedures that are previously distributed.
When a function is marked as colocated with a distributed table,
we try delegating queries of kind "SELECT func(...)" to workers.
We currently only support this simple form, and don't delegate
forms like "SELECT f1(...), f2(...)", "SELECT f1(...) FROM ...",
or function calls inside transactions.
As a side effect, we also fix the transactional semantics of DO blocks.
Previously we didn't consider a DO block a multi-statement transaction.
Now we do.
Co-authored-by: Marco Slot <marco@citusdata.com>
Co-authored-by: serprex <serprex@users.noreply.github.com>
Co-authored-by: pykello <hadi.moshayedi@microsoft.com>
We started copying parse trees by default further on in `multi_ProcessUtility`. That's not a problem for maintenance command, but might register for things like `PREPARE` and `EXECUTE`, which might happen thousands of times per second. Add a few common commands to the check at the start.
Since the distributed functions are useful when the workers have
metadata, we automatically sync it.
Also, after master_add_node(). We do it lazily and let the deamon
sync it. That's mainly because the metadata syncing cannot be done
in transaction blocks, and we don't want to add lots of transactional
limitations to master_add_node() and create_distributed_function().
With this commit, we're changing the API for create_distributed_function()
such that users can provide the distribution argument and the colocation
information.
DESCRIPTION: Provide a GUC to turn of the new dependency propagation functionality
In the case the dependency propagation functionality introduced in 9.0 causes issues to a cluster of a user they can turn it off almost completely. The only dependency that will still be propagated and kept track of is the schema to emulate the old behaviour.
GUC to change is `citus.enable_object_propagation`. When set to `false` the functionality will be mostly turned off. Be aware that objects marked as distributed in `pg_dist_object` will still be kept in the catalog as a distributed object. Alter statements to these objects will not be propagated to workers and may cause desynchronisation.
DESCRIPTION: Rename remote types during type propagation
To prevent data to be destructed when a remote type differs from the type on the coordinator during type propagation we wanted to rename the type instead of `DROP CASCADE`.
This patch removes the `DROP` logic and adds the creation of a rename statement to a free name.
DESCRIPTION: Add feature flag to turn off create type propagation
When `citus.enable_create_type_propagation` is set to `false` citus will not propagate `CREATE TYPE` statements to the workers. Types are still distributed when tables that depend on these types are distributed.
This PR aims to add the minimal set of changes required to start
distributing functions. You can use create_distributed_function(regproc)
UDF to distribute a function.
SELECT create_distributed_function('add(int,int)');
The function definition should include the param types to properly
identify the correct function that we wish to distribute
DESCRIPTION: Distribute Types to worker nodes
When to propagate
==============
There are two logical moments that types could be distributed to the worker nodes
- When they get used ( just in time distribution )
- When they get created ( proactive distribution )
The just in time distribution follows the model used by how schema's get created right before we are going to create a table in that schema, for types this would be when the table uses a type as its column.
The proactive distribution is suitable for situations where it is benificial to have the type on the worker nodes directly. They can later on be used in queries where an intermediate result gets created with a cast to this type.
Just in time creation is always the last resort, you cannot create a distributed table before the type gets created. A good example use case is; you have an existing postgres server that needs to scale out. By adding the citus extension, add some nodes to the cluster, and distribute the table. The type got created before citus existed. There was no moment where citus could have propagated the creation of a type.
Proactive is almost always a good option. Types are not resource intensive objects, there is no performance overhead of having 100's of types. If you want to use them in a query to represent an intermediate result (which happens in our test suite) they just work.
There is however a moment when proactive type distribution is not beneficial; in transactions where the type is used in a distributed table.
Lets assume the following transaction:
```sql
BEGIN;
CREATE TYPE tt1 AS (a int, b int);
CREATE TABLE t1 AS (a int PRIMARY KEY, b tt1);
SELECT create_distributed_table('t1', 'a');
\copy t1 FROM bigdata.csv
```
Types are node scoped objects; meaning the type exists once per worker. Shards however have best performance when they are created over their own connection. For the type to be visible on all connections it needs to be created and committed before we try to create the shards. Here the just in time situation is most beneficial and follows how we create schema's on the workers. Outside of a transaction block we will just use 1 connection to propagate the creation.
How propagation works
=================
Just in time
-----------
Just in time propagation hooks into the infrastructure introduced in #2882. It adds types as a supported object in `SupportedDependencyByCitus`. This will make sure that any object being distributed by citus that depends on types will now cascade into types. When types are depending them self on other objects they will get created first.
Creation later works by getting the ddl commands to create the object by its `ObjectAddress` in `GetDependencyCreateDDLCommands` which will dispatch types to `CreateTypeDDLCommandsIdempotent`.
For the correct walking of the graph we follow array types, when later asked for the ddl commands for array types we return `NIL` (empty list) which makes that the object will not be recorded as distributed, (its an internal type, dependant on the user type).
Proactive distribution
---------------------
When the user creates a type (composite or enum) we will have a hook running in `multi_ProcessUtility` after the command has been applied locally. Running after running locally makes that we already have an `ObjectAddress` for the type. This is required to mark the type as being distributed.
Keeping the type up to date
====================
For types that are recorded in `pg_dist_object` (eg. `IsObjectDistributed` returns true for the `ObjectAddress`) we will intercept the utility commands that alter the type.
- `AlterTableStmt` with `relkind` set to `OBJECT_TYPE` encapsulate changes to the fields of a composite type.
- `DropStmt` with removeType set to `OBJECT_TYPE` encapsulate `DROP TYPE`.
- `AlterEnumStmt` encapsulates changes to enum values.
Enum types can not be changed transactionally. When the execution on a worker fails a warning will be shown to the user the propagation was incomplete due to worker communication failure. An idempotent command is shown for the user to re-execute when the worker communication is fixed.
Keeping types up to date is done via the executor. Before the statement is executed locally we create a plan on how to apply it on the workers. This plan is executed after we have applied the statement locally.
All changes to types need to be done in the same transaction for types that have already been distributed and will fail with an error if parallel queries have already been executed in the same transaction. Much like foreign keys to reference tables.
DESCRIPTION: Fix schema leak on CREATE INDEX statement
When a CREATE INDEX is cached between execution we might leak the schema name onto the cached statement of an earlier execution preventing the right index to be created.
Even though the cache is cleared when the search_path changes we can trigger this behaviour by having the schema already on the search path before a colliding table is created in a schema earlier on the `search_path`. When calling an unqualified create index via a function (used to trigger the caching behaviour) we see that the index is created on the wrong table after the schema leaked onto the statement.
By copying the complete `PlannedStmt` and `utilityStmt` during our planning phase for distributed ddls we make sure we are not leaking the schema name onto a cached data structure.
Caveat; COPY statements already have a lot of parsestree copying ongoing without directly putting it back on the `pstmt`. We should verify that copies modify the statement and potentially copy the complete `pstmt` there already.
/*
* local_executor.c
*
* The scope of the local execution is locally executing the queries on the
* shards. In other words, local execution does not deal with any local tables
* that are not shards on the node that the query is being executed. In that sense,
* the local executor is only triggered if the node has both the metadata and the
* shards (e.g., only Citus MX worker nodes).
*
* The goal of the local execution is to skip the unnecessary network round-trip
* happening on the node itself. Instead, identify the locally executable tasks and
* simply call PostgreSQL's planner and executor.
*
* The local executor is an extension of the adaptive executor. So, the executor uses
* adaptive executor's custom scan nodes.
*
* One thing to note that Citus MX is only supported with replication factor = 1, so
* keep that in mind while continuing the comments below.
*
* On the high level, there are 3 slightly different ways of utilizing local execution:
*
* (1) Execution of local single shard queries of a distributed table
*
* This is the simplest case. The executor kicks at the start of the adaptive
* executor, and since the query is only a single task the execution finishes
* without going to the network at all.
*
* Even if there is a transaction block (or recursively planned CTEs), as long
* as the queries hit the shards on the same, the local execution will kick in.
*
* (2) Execution of local single queries and remote multi-shard queries
*
* The rule is simple. If a transaction block starts with a local query execution,
* all the other queries in the same transaction block that touch any local shard
* have to use the local execution. Although this sounds restrictive, we prefer to
* implement in this way, otherwise we'd end-up with as complex scenarious as we
* have in the connection managements due to foreign keys.
*
* See the following example:
* BEGIN;
* -- assume that the query is executed locally
* SELECT count(*) FROM test WHERE key = 1;
*
* -- at this point, all the shards that reside on the
* -- node is executed locally one-by-one. After those finishes
* -- the remaining tasks are handled by adaptive executor
* SELECT count(*) FROM test;
*
*
* (3) Modifications of reference tables
*
* Modifications to reference tables have to be executed on all nodes. So, after the
* local execution, the adaptive executor keeps continuing the execution on the other
* nodes.
*
* Note that for read-only queries, after the local execution, there is no need to
* kick in adaptive executor.
*
* There are also few limitations/trade-offs that is worth mentioning. First, the
* local execution on multiple shards might be slow because the execution has to
* happen one task at a time (e.g., no parallelism). Second, if a transaction
* block/CTE starts with a multi-shard command, we do not use local query execution
* since local execution is sequential. Basically, we do not want to lose parallelism
* across local tasks by switching to local execution. Third, the local execution
* currently only supports queries. In other words, any utility commands like TRUNCATE,
* fails if the command is executed after a local execution inside a transaction block.
* Forth, the local execution cannot be mixed with the executors other than adaptive,
* namely task-tracker, real-time and router executors. Finally, related with the
* previous item, COPY command cannot be mixed with local execution in a transaction.
* The implication of that any part of INSERT..SELECT via coordinator cannot happen
* via the local execution.
*/
DESCRIPTION: Refactor ensure schema exists to dependency exists
Historically we only supported schema's as table dependencies to be created on the workers before a table gets distributed. This PR puts infrastructure in place to walk pg_depend to figure out which dependencies to create on the workers. Currently only schema's are supported as objects to create before creating a table.
We also keep track of dependencies that have been created in the cluster. When we add a new node to the cluster we use this catalog to know which objects need to be created on the worker.
Side effect of knowing which objects are already distributed is that we don't have debug messages anymore when creating schema's that are already created on the workers.
Before this commit, we've recorded the relation accesses in 3 different
places
- FindPlacementListConnection -- applies all executor in tx block
- StartPlacementExecutionOnSession() -- adaptive executor only
- StartPlacementListConnection() -- router/real-time only
This is different than Citus 8.2, and could lead to query execution times
increase considerably on multi-shard commands in transaction block
that are on partitioned tables.
Benchmarks:
```
1+8 c5.4xlarge cluster
Empty distributed partitioned table with 365 partitions: https://gist.github.com/onderkalaci/1edace4ed6bd6f061c8a15594865bb51#file-partitions_365-sql
./pgbench -f /tmp/multi_shard.sql -c10 -j10 -P 1 -T 120 postgres://citus:w3r6KLJpv3mxe9E-NIUeJw@c.fy5fkjcv45vcepaogqcaskmmkee.db.citusdata.com:5432/citus?sslmode=require
cat /tmp/multi_shard.sql
BEGIN;
DELETE FROM collections_list;
DELETE FROM collections_list;
DELETE FROM collections_list;
COMMIT;
cat /tmp/single_shard.sql
BEGIN;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list WHERE key = :aid;
COMMIT;
cat /tmp/mix.sql
BEGIN;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list;
DELETE FROM collections_list;
DELETE FROM collections_list;
COMMIT;
```
The table shows `latency average` of pgbench runs explained above, so we have a pretty solid improvement even over 8.2.2.
| Test | Citus 8.2.2 | Citus 8.3.1 | Citus 8.3.2 (this branch) | Citus 8.3.1 (FKEYs disabled via GUC) |
| ------------- | ------------- | ------------- |------------- | ------------- |
|multi_shard | 2370.083 ms |3605.040 ms |1324.094 ms |1247.255 ms |
| single_shard | 85.338 ms |120.934 ms |73.216 ms | 78.765 ms |
| mix | 2434.459 ms | 3727.080 ms |1306.456 ms | 1280.326 ms |