* Remove unused executor codes
All of the codes of real-time executor. Some functions
in router executor still remains there because there
are common functions. We'll move them to accurate places
in the follow-up commits.
* Move GUCs to transaction mngnt and remove unused struct
* Update test output
* Get rid of references of real-time executor from code
* Warn if real-time executor is picked
* Remove lots of unused connection codes
* Removed unused code for connection restrictions
Real-time and router executors cannot handle re-using of the existing
connections within a transaction block.
Adaptive executor and COPY can re-use the connections. So, there is no
reason to keep the code around for applying the restrictions in the
placement connection logic.
We've changed the logic for pulling RTE_RELATIONs in #3109 and
non-colocated subquery joins and partitioned tables.
@onurctirtir found this steps where I traced back and found the issues.
While looking into it in more detail, we decided to expand the list in a
way that the callers get all the relevant RTE_RELATIONs RELKIND_RELATION,
RELKIND_PARTITIONED_TABLE, RELKIND_FOREIGN_TABLE and RELKIND_MATVIEW.
These are all relation kinds that Citus planner is aware of.
This completely hides `ListCell` to the user of the loop
Example usage:
```c
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) {
// Do stuff with workerNode
}
```
Instead of:
```c
ListCell *workerNodeCell = NULL;
foreach(cell, workerNodeList) {
WorkerNode *workerNode = lfirst(workerNodeCell);
// Do stuff with workerNode
}
```
This is an improvement over #2512.
This adds the boolean shouldhaveshards column to pg_dist_node. When it's false, create_distributed_table for new collocation groups will not create shards on that node. Reference tables will still be created on nodes where it is false.
Areas for further optimization:
- Don't save subquery results to a local file on the coordinator when the subquery is not in the having clause
- Push the the HAVING with subquery to the workers if there's a group by on the distribution column
- Don't push down the results to the workers when we don't push down the HAVING clause, only the coordinator needs it
Fixes#520Fixes#756Closes#2047
Objectives:
(a) both super user and regular user should have the correct owner for the function on the worker
(b) The transactional semantics would work fine for both super user and regular user
(c) non-super-user and non-function owner would get a reasonable error message if tries to distribute the function
Co-authored-by: @serprex
DESCRIPTION: Disallow distributed functions for functions depending on an extension
Functions depending on an extension cannot (yet) be distributed by citus. If we would allow this it would cause issues with our dependency following mechanism as we stop following objects depending on an extension.
By not allowing functions to be distributed when they depend on an extension as well as not allowing to make distributed functions depend on an extension we won't break the ability to add new nodes. Allowing functions depending on extensions to be distributed at the moment could cause problems in that area.
DESCRIPTION: Propagate CREATE OR REPLACE FUNCTION
Distributed functions could be replaced, which should be propagated to the workers to keep the function in sync between all nodes.
Due to the complexity of deparsing the `CreateFunctionStmt` we actually produce the plan during the processing phase of our utilityhook. Since the changes have already been made in the catalog tables we can reuse `pg_get_functiondef` to get us the generated `CREATE OR REPLACE` sql.
DESCRIPTION: Propagate ALTER FUNCTION statements for distributed functions
Using the implemented deparser for function statements to propagate changes to both functions and procedures that are previously distributed.
This PR aims to add all the necessary logic to qualify and deparse all possible `{ALTER|DROP} .. {FUNCTION|PROCEDURE}` queries.
As Procedures are introduced in PG11, the code contains many PG version checks. I tried my best to make it easy to clean up once we drop PG10 support.
Here are some caveats:
- I assumed that the parse tree is a valid one. There are some queries that are not allowed, but still are parsed successfully by postgres planner. Such queries will result in errors in execution time. (e.g. `ALTER PROCEDURE p STRICT` -> `STRICT` action is valid for functions but not procedures. Postgres decides to parse them nevertheless.)
When a function is marked as colocated with a distributed table,
we try delegating queries of kind "SELECT func(...)" to workers.
We currently only support this simple form, and don't delegate
forms like "SELECT f1(...), f2(...)", "SELECT f1(...) FROM ...",
or function calls inside transactions.
As a side effect, we also fix the transactional semantics of DO blocks.
Previously we didn't consider a DO block a multi-statement transaction.
Now we do.
Co-authored-by: Marco Slot <marco@citusdata.com>
Co-authored-by: serprex <serprex@users.noreply.github.com>
Co-authored-by: pykello <hadi.moshayedi@microsoft.com>
Since the distributed functions are useful when the workers have
metadata, we automatically sync it.
Also, after master_add_node(). We do it lazily and let the deamon
sync it. That's mainly because the metadata syncing cannot be done
in transaction blocks, and we don't want to add lots of transactional
limitations to master_add_node() and create_distributed_function().
DESCRIPTION: Provide a GUC to turn of the new dependency propagation functionality
In the case the dependency propagation functionality introduced in 9.0 causes issues to a cluster of a user they can turn it off almost completely. The only dependency that will still be propagated and kept track of is the schema to emulate the old behaviour.
GUC to change is `citus.enable_object_propagation`. When set to `false` the functionality will be mostly turned off. Be aware that objects marked as distributed in `pg_dist_object` will still be kept in the catalog as a distributed object. Alter statements to these objects will not be propagated to workers and may cause desynchronisation.
DESCRIPTION: Rename remote types during type propagation
To prevent data to be destructed when a remote type differs from the type on the coordinator during type propagation we wanted to rename the type instead of `DROP CASCADE`.
This patch removes the `DROP` logic and adds the creation of a rename statement to a free name.
DESCRIPTION: Add feature flag to turn off create type propagation
When `citus.enable_create_type_propagation` is set to `false` citus will not propagate `CREATE TYPE` statements to the workers. Types are still distributed when tables that depend on these types are distributed.
This PR simply adds the columns to pg_dist_object and
implements the necessary metadata changes to keep track of
distribution argument of the functions/procedures.
DESCRIPTION: Distribute Types to worker nodes
When to propagate
==============
There are two logical moments that types could be distributed to the worker nodes
- When they get used ( just in time distribution )
- When they get created ( proactive distribution )
The just in time distribution follows the model used by how schema's get created right before we are going to create a table in that schema, for types this would be when the table uses a type as its column.
The proactive distribution is suitable for situations where it is benificial to have the type on the worker nodes directly. They can later on be used in queries where an intermediate result gets created with a cast to this type.
Just in time creation is always the last resort, you cannot create a distributed table before the type gets created. A good example use case is; you have an existing postgres server that needs to scale out. By adding the citus extension, add some nodes to the cluster, and distribute the table. The type got created before citus existed. There was no moment where citus could have propagated the creation of a type.
Proactive is almost always a good option. Types are not resource intensive objects, there is no performance overhead of having 100's of types. If you want to use them in a query to represent an intermediate result (which happens in our test suite) they just work.
There is however a moment when proactive type distribution is not beneficial; in transactions where the type is used in a distributed table.
Lets assume the following transaction:
```sql
BEGIN;
CREATE TYPE tt1 AS (a int, b int);
CREATE TABLE t1 AS (a int PRIMARY KEY, b tt1);
SELECT create_distributed_table('t1', 'a');
\copy t1 FROM bigdata.csv
```
Types are node scoped objects; meaning the type exists once per worker. Shards however have best performance when they are created over their own connection. For the type to be visible on all connections it needs to be created and committed before we try to create the shards. Here the just in time situation is most beneficial and follows how we create schema's on the workers. Outside of a transaction block we will just use 1 connection to propagate the creation.
How propagation works
=================
Just in time
-----------
Just in time propagation hooks into the infrastructure introduced in #2882. It adds types as a supported object in `SupportedDependencyByCitus`. This will make sure that any object being distributed by citus that depends on types will now cascade into types. When types are depending them self on other objects they will get created first.
Creation later works by getting the ddl commands to create the object by its `ObjectAddress` in `GetDependencyCreateDDLCommands` which will dispatch types to `CreateTypeDDLCommandsIdempotent`.
For the correct walking of the graph we follow array types, when later asked for the ddl commands for array types we return `NIL` (empty list) which makes that the object will not be recorded as distributed, (its an internal type, dependant on the user type).
Proactive distribution
---------------------
When the user creates a type (composite or enum) we will have a hook running in `multi_ProcessUtility` after the command has been applied locally. Running after running locally makes that we already have an `ObjectAddress` for the type. This is required to mark the type as being distributed.
Keeping the type up to date
====================
For types that are recorded in `pg_dist_object` (eg. `IsObjectDistributed` returns true for the `ObjectAddress`) we will intercept the utility commands that alter the type.
- `AlterTableStmt` with `relkind` set to `OBJECT_TYPE` encapsulate changes to the fields of a composite type.
- `DropStmt` with removeType set to `OBJECT_TYPE` encapsulate `DROP TYPE`.
- `AlterEnumStmt` encapsulates changes to enum values.
Enum types can not be changed transactionally. When the execution on a worker fails a warning will be shown to the user the propagation was incomplete due to worker communication failure. An idempotent command is shown for the user to re-execute when the worker communication is fixed.
Keeping types up to date is done via the executor. Before the statement is executed locally we create a plan on how to apply it on the workers. This plan is executed after we have applied the statement locally.
All changes to types need to be done in the same transaction for types that have already been distributed and will fail with an error if parallel queries have already been executed in the same transaction. Much like foreign keys to reference tables.
/*
* local_executor.c
*
* The scope of the local execution is locally executing the queries on the
* shards. In other words, local execution does not deal with any local tables
* that are not shards on the node that the query is being executed. In that sense,
* the local executor is only triggered if the node has both the metadata and the
* shards (e.g., only Citus MX worker nodes).
*
* The goal of the local execution is to skip the unnecessary network round-trip
* happening on the node itself. Instead, identify the locally executable tasks and
* simply call PostgreSQL's planner and executor.
*
* The local executor is an extension of the adaptive executor. So, the executor uses
* adaptive executor's custom scan nodes.
*
* One thing to note that Citus MX is only supported with replication factor = 1, so
* keep that in mind while continuing the comments below.
*
* On the high level, there are 3 slightly different ways of utilizing local execution:
*
* (1) Execution of local single shard queries of a distributed table
*
* This is the simplest case. The executor kicks at the start of the adaptive
* executor, and since the query is only a single task the execution finishes
* without going to the network at all.
*
* Even if there is a transaction block (or recursively planned CTEs), as long
* as the queries hit the shards on the same, the local execution will kick in.
*
* (2) Execution of local single queries and remote multi-shard queries
*
* The rule is simple. If a transaction block starts with a local query execution,
* all the other queries in the same transaction block that touch any local shard
* have to use the local execution. Although this sounds restrictive, we prefer to
* implement in this way, otherwise we'd end-up with as complex scenarious as we
* have in the connection managements due to foreign keys.
*
* See the following example:
* BEGIN;
* -- assume that the query is executed locally
* SELECT count(*) FROM test WHERE key = 1;
*
* -- at this point, all the shards that reside on the
* -- node is executed locally one-by-one. After those finishes
* -- the remaining tasks are handled by adaptive executor
* SELECT count(*) FROM test;
*
*
* (3) Modifications of reference tables
*
* Modifications to reference tables have to be executed on all nodes. So, after the
* local execution, the adaptive executor keeps continuing the execution on the other
* nodes.
*
* Note that for read-only queries, after the local execution, there is no need to
* kick in adaptive executor.
*
* There are also few limitations/trade-offs that is worth mentioning. First, the
* local execution on multiple shards might be slow because the execution has to
* happen one task at a time (e.g., no parallelism). Second, if a transaction
* block/CTE starts with a multi-shard command, we do not use local query execution
* since local execution is sequential. Basically, we do not want to lose parallelism
* across local tasks by switching to local execution. Third, the local execution
* currently only supports queries. In other words, any utility commands like TRUNCATE,
* fails if the command is executed after a local execution inside a transaction block.
* Forth, the local execution cannot be mixed with the executors other than adaptive,
* namely task-tracker, real-time and router executors. Finally, related with the
* previous item, COPY command cannot be mixed with local execution in a transaction.
* The implication of that any part of INSERT..SELECT via coordinator cannot happen
* via the local execution.
*/
DESCRIPTION: Refactor ensure schema exists to dependency exists
Historically we only supported schema's as table dependencies to be created on the workers before a table gets distributed. This PR puts infrastructure in place to walk pg_depend to figure out which dependencies to create on the workers. Currently only schema's are supported as objects to create before creating a table.
We also keep track of dependencies that have been created in the cluster. When we add a new node to the cluster we use this catalog to know which objects need to be created on the worker.
Side effect of knowing which objects are already distributed is that we don't have debug messages anymore when creating schema's that are already created on the workers.
* Add tuplestore helpers
* More detailed error messages in tuplestore
* Add CreateTupleDescCopy to SetupTuplestore
* Use new SetupTuplestore helper function
* Remove unnecessary copy
* Remove comment about undefined behaviour
See a9c35cf85c
clang raises a warning due to FunctionCall2InfoData technically being variable sized
This is fine, as the struct is the size we want it to be. So silence the warning
master_deactivate_node is updated to decrement the replication factor
Otherwise deactivation could have create_reference_table produce a second record
UpdateColocationGroupReplicationFactor is renamed UpdateColocationGroupReplicationFactorForReferenceTables
& the implementation looks up the record based on distributioncolumntype == InvalidOid, rather than by id
Otherwise the record's replication factor fails to be maintained when there are no reference tables
Before this commit, we've recorded the relation accesses in 3 different
places
- FindPlacementListConnection -- applies all executor in tx block
- StartPlacementExecutionOnSession() -- adaptive executor only
- StartPlacementListConnection() -- router/real-time only
This is different than Citus 8.2, and could lead to query execution times
increase considerably on multi-shard commands in transaction block
that are on partitioned tables.
Benchmarks:
```
1+8 c5.4xlarge cluster
Empty distributed partitioned table with 365 partitions: https://gist.github.com/onderkalaci/1edace4ed6bd6f061c8a15594865bb51#file-partitions_365-sql
./pgbench -f /tmp/multi_shard.sql -c10 -j10 -P 1 -T 120 postgres://citus:w3r6KLJpv3mxe9E-NIUeJw@c.fy5fkjcv45vcepaogqcaskmmkee.db.citusdata.com:5432/citus?sslmode=require
cat /tmp/multi_shard.sql
BEGIN;
DELETE FROM collections_list;
DELETE FROM collections_list;
DELETE FROM collections_list;
COMMIT;
cat /tmp/single_shard.sql
BEGIN;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list WHERE key = :aid;
COMMIT;
cat /tmp/mix.sql
BEGIN;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list;
DELETE FROM collections_list;
DELETE FROM collections_list;
COMMIT;
```
The table shows `latency average` of pgbench runs explained above, so we have a pretty solid improvement even over 8.2.2.
| Test | Citus 8.2.2 | Citus 8.3.1 | Citus 8.3.2 (this branch) | Citus 8.3.1 (FKEYs disabled via GUC) |
| ------------- | ------------- | ------------- |------------- | ------------- |
|multi_shard | 2370.083 ms |3605.040 ms |1324.094 ms |1247.255 ms |
| single_shard | 85.338 ms |120.934 ms |73.216 ms | 78.765 ms |
| mix | 2434.459 ms | 3727.080 ms |1306.456 ms | 1280.326 ms |
This causes no behaviorial changes, only organizes better to implement modifying CTEs
Also rename ExtactInsertRangeTableEntry to ExtractResultRelationRTE,
as the source of this function didn't match the documentation
Remove Task's upsertQuery in favor of ROW_MODIFY_NONCOMMUTATIVE
Split up AcquireExecutorShardLock into more internal functions
Tests: Normalize multi_reference_table multi_create_table_constraints
With this commit, we're introducing the Adaptive Executor.
The commit message consists of two distinct sections. The first part explains
how the executor works. The second part consists of the commit messages of
the individual smaller commits that resulted in this commit. The readers
can search for the each of the smaller commit messages on
https://github.com/citusdata/citus and can learn more about the history
of the change.
/*-------------------------------------------------------------------------
*
* adaptive_executor.c
*
* The adaptive executor executes a list of tasks (queries on shards) over
* a connection pool per worker node. The results of the queries, if any,
* are written to a tuple store.
*
* The concepts in the executor are modelled in a set of structs:
*
* - DistributedExecution:
* Execution of a Task list over a set of WorkerPools.
* - WorkerPool
* Pool of WorkerSessions for the same worker which opportunistically
* executes "unassigned" tasks from a queue.
* - WorkerSession:
* Connection to a worker that is used to execute "assigned" tasks
* from a queue and may execute unasssigned tasks from the WorkerPool.
* - ShardCommandExecution:
* Execution of a Task across a list of placements.
* - TaskPlacementExecution:
* Execution of a Task on a specific placement.
* Used in the WorkerPool and WorkerSession queues.
*
* Every connection pool (WorkerPool) and every connection (WorkerSession)
* have a queue of tasks that are ready to execute (readyTaskQueue) and a
* queue/set of pending tasks that may become ready later in the execution
* (pendingTaskQueue). The tasks are wrapped in a ShardCommandExecution,
* which keeps track of the state of execution and is referenced from a
* TaskPlacementExecution, which is the data structure that is actually
* added to the queues and describes the state of the execution of a task
* on a particular worker node.
*
* When the task list is part of a bigger distributed transaction, the
* shards that are accessed or modified by the task may have already been
* accessed earlier in the transaction. We need to make sure we use the
* same connection since it may hold relevant locks or have uncommitted
* writes. In that case we "assign" the task to a connection by adding
* it to the task queue of specific connection (in
* AssignTasksToConnections). Otherwise we consider the task unassigned
* and add it to the task queue of a worker pool, which means that it
* can be executed over any connection in the pool.
*
* A task may be executed on multiple placements in case of a reference
* table or a replicated distributed table. Depending on the type of
* task, it may not be ready to be executed on a worker node immediately.
* For instance, INSERTs on a reference table are executed serially across
* placements to avoid deadlocks when concurrent INSERTs take conflicting
* locks. At the beginning, only the "first" placement is ready to execute
* and therefore added to the readyTaskQueue in the pool or connection.
* The remaining placements are added to the pendingTaskQueue. Once
* execution on the first placement is done the second placement moves
* from pendingTaskQueue to readyTaskQueue. The same approach is used to
* fail over read-only tasks to another placement.
*
* Once all the tasks are added to a queue, the main loop in
* RunDistributedExecution repeatedly does the following:
*
* For each pool:
* - ManageWorkPool evaluates whether to open additional connections
* based on the number unassigned tasks that are ready to execute
* and the targetPoolSize of the execution.
*
* Poll all connections:
* - We use a WaitEventSet that contains all (non-failed) connections
* and is rebuilt whenever the set of active connections or any of
* their wait flags change.
*
* We almost always check for WL_SOCKET_READABLE because a session
* can emit notices at any time during execution, but it will only
* wake up WaitEventSetWait when there are actual bytes to read.
*
* We check for WL_SOCKET_WRITEABLE just after sending bytes in case
* there is not enough space in the TCP buffer. Since a socket is
* almost always writable we also use WL_SOCKET_WRITEABLE as a
* mechanism to wake up WaitEventSetWait for non-I/O events, e.g.
* when a task moves from pending to ready.
*
* For each connection that is ready:
* - ConnectionStateMachine handles connection establishment and failure
* as well as command execution via TransactionStateMachine.
*
* When a connection is ready to execute a new task, it first checks its
* own readyTaskQueue and otherwise takes a task from the worker pool's
* readyTaskQueue (on a first-come-first-serve basis).
*
* In cases where the tasks finish quickly (e.g. <1ms), a single
* connection will often be sufficient to finish all tasks. It is
* therefore not necessary that all connections are established
* successfully or open a transaction (which may be blocked by an
* intermediate pgbouncer in transaction pooling mode). It is therefore
* essential that we take a task from the queue only after opening a
* transaction block.
*
* When a command on a worker finishes or the connection is lost, we call
* PlacementExecutionDone, which then updates the state of the task
* based on whether we need to run it on other placements. When a
* connection fails or all connections to a worker fail, we also call
* PlacementExecutionDone for all queued tasks to try the next placement
* and, if necessary, mark shard placements as inactive. If a task fails
* to execute on all placements, the execution fails and the distributed
* transaction rolls back.
*
* For multi-row INSERTs, tasks are executed sequentially by
* SequentialRunDistributedExecution instead of in parallel, which allows
* a high degree of concurrency without high risk of deadlocks.
* Conversely, multi-row UPDATE/DELETE/DDL commands take aggressive locks
* which forbids concurrency, but allows parallelism without high risk
* of deadlocks. Note that this is unrelated to SEQUENTIAL_CONNECTION,
* which indicates that we should use at most one connection per node, but
* can run tasks in parallel across nodes. This is used when there are
* writes to a reference table that has foreign keys from a distributed
* table.
*
* Execution finishes when all tasks are done, the query errors out, or
* the user cancels the query.
*
*-------------------------------------------------------------------------
*/
All the commits involved here:
* Initial unified executor prototype
* Latest changes
* Fix rebase conflicts to master branch
* Add missing variable for assertion
* Ensure that master_modify_multiple_shards() returns the affectedTupleCount
* Adjust intermediate result sizes
The real-time executor uses COPY command to get the results
from the worker nodes. Unified executor avoids that which
results in less data transfer. Simply adjust the tests to lower
sizes.
* Force one connection per placement (or co-located placements) when requested
The existing executors (real-time and router) always open 1 connection per
placement when parallel execution is requested.
That might be useful under certain circumstances:
(a) User wants to utilize as much as CPUs on the workers per
distributed query
(b) User has a transaction block which involves COPY command
Also, lots of regression tests rely on this execution semantics.
So, we'd enable few of the tests with this change as well.
* For parameters to be resolved before using them
For the details, see PostgreSQL's copyParamList()
* Unified executor sorts the returning output
* Ensure that unified executor doesn't ignore sequential execution of DDLJob's
Certain DDL commands, mainly creating foreign keys to reference tables,
should be executed sequentially. Otherwise, we'd end up with a self
distributed deadlock.
To overcome this situaiton, we set a flag `DDLJob->executeSequentially`
and execute it sequentially. Note that we have to do this because
the command might not be called within a transaction block, and
we cannot call `SetLocalMultiShardModifyModeToSequential()`.
This fixes at least two test: multi_insert_select_on_conflit.sql and
multi_foreign_key.sql
Also, I wouldn't mind scattering local `targetPoolSize` variables within
the code. The reason is that we'll soon have a GUC (or a global
variable based on a GUC) that'd set the pool size. In that case, we'd
simply replace `targetPoolSize` with the global variables.
* Fix 2PC conditions for DDL tasks
* Improve closing connections that are not fully established in unified execution
* Support foreign keys to reference tables in unified executor
The idea for supporting foreign keys to reference tables is simple:
Keep track of the relation accesses within a transaction block.
- If a parallel access happens on a distributed table which
has a foreign key to a reference table, one cannot modify
the reference table in the same transaction. Otherwise,
we're very likely to end-up with a self-distributed deadlock.
- If an access to a reference table happens, and then a parallel
access to a distributed table (which has a fkey to the reference
table) happens, we switch to sequential mode.
Unified executor misses the function calls that marks the relation
accesses during the execution. Thus, simply add the necessary calls
and let the logic kick in.
* Make sure to close the failed connections after the execution
* Improve comments
* Fix savepoints in unified executor.
* Rebuild the WaitEventSet only when necessary
* Unclaim connections on all errors.
* Improve failure handling for unified executor
- Implement the notion of errorOnAnyFailure. This is similar to
Critical Connections that the connection managament APIs provide
- If the nodes inside a modifying transaction expand, activate 2PC
- Fix few bugs related to wait event sets
- Mark placement INACTIVE during the execution as much as possible
as opposed to we do in the COMMIT handler
- Fix few bugs related to scheduling next placement executions
- Improve decision on when to use 2PC
Improve the logic to start a transaction block for distributed transactions
- Make sure that only reference table modifications are always
executed with distributed transactions
- Make sure that stored procedures and functions are executed
with distributed transactions
* Move waitEventSet to DistributedExecution
This could also be local to RunDistributedExecution(), but in that case
we had to mark it as "volatile" to avoid PG_TRY()/PG_CATCH() issues, and
cast it to non-volatile when doing WaitEventSetFree(). We thought that
would make code a bit harder to read than making this non-local, so we
move it here. See comments for PG_TRY() in postgres/src/include/elog.h
and "man 3 siglongjmp" for more context.
* Fix multi_insert_select test outputs
Two things:
1) One complex transaction block is now supported. Simply update
the test output
2) Due to dynamic nature of the unified executor, the orders of
the errors coming from the shards might change (e.g., all of
the queries on the shards would fail, but which one appears
on the error message?). To fix that, we simply added it to
our shardId normalization tool which happens just before diff.
* Fix subeury_and_cte test
The error message is updated from:
failed to execute task
To:
more than one row returned by a subquery or an expression
which is a lot clearer to the user.
* Fix intermediate_results test outputs
Simply update the error message from:
could not receive query results
to
result "squares" does not exist
which makes a lot more sense.
* Fix multi_function_in_join test
The error messages update from:
Failed to execute task XXX
To:
function f(..) does not exist
* Fix multi_query_directory_cleanup test
The unified executor does not create any intermediate files.
* Fix with_transactions test
A test case that just started to work fine
* Fix multi_router_planner test outputs
The error message is update from:
Could not receive query results
To:
Relation does not exists
which is a lot more clearer for the users
* Fix multi_router_planner_fast_path test
The error message is update from:
Could not receive query results
To:
Relation does not exists
which is a lot more clearer for the users
* Fix isolation_copy_placement_vs_modification by disabling select_opens_transaction_block
* Fix ordering in isolation_multi_shard_modify_vs_all
* Add executor locks to unified executor
* Make sure to allocate enought WaitEvents
The previous code was missing the waitEvents for the latch and
postmaster death.
* Fix rebase conflicts for master rebase
* Make sure that TRUNCATE relies on unified executor
* Implement true sequential execution for multi-row INSERTS
Execute the individual tasks executed one by one. Note that this is different than
MultiShardConnectionType == SEQUENTIAL_CONNECTION case (e.g., sequential execution
mode). In that case, running the tasks across the nodes in parallel is acceptable
and implemented in that way.
However, the executions that are qualified here would perform poorly if the
tasks across the workers are executed in parallel. We currently qualify only
one class of distributed queries here, multi-row INSERTs. If we do not enforce
true sequential execution, concurrent multi-row upserts could easily form
a distributed deadlock when the upserts touch the same rows.
* Remove SESSION_LIFESPAN flag in unified_executor
* Apply failure test updates
We've changed the failure behaviour a bit, and also the error messages
that show up to the user. This PR covers majority of the updates.
* Unified executor honors citus.node_connection_timeout
With this commit, unified executor errors out if even
a single connection cannot be established within
citus.node_connection_timeout.
And, as a side effect this fixes failure_connection_establishment
test.
* Properly increment/decrement pool size variables
Before this commit, the idle and active connection
counts were not properly calculated.
* insert_select_executor goes through unified executor.
* Add missing file for task tracker
* Modify ExecuteTaskListExtended()'s signature
* Sort output of INSERT ... SELECT ... RETURNING
* Take partition locks correctly in unified executor
* Alternative implementation for force_max_query_parallelization
* Fix compile warnings in unified executor
* Fix style issues
* Decrement idleConnectionCount when idle connection is lost
* Always rebuild the wait event sets
In the previous implementation, on waitFlag changes, we were only
modifying the wait events. However, we've realized that it might
be an over optimization since (a) we couldn't see any performance
benefits (b) we see some errors on failures and because of (a)
we prefer to disable it now.
* Make sure to allocate enough sized waitEventSet
With multi-row INSERTs, we might have more sessions than
task*workerCount after few calls of RunDistributedExecution()
because the previous sessions would also be alive.
Instead, re-allocate events when the connectino set changes.
* Implement SELECT FOR UPDATE on reference tables
On master branch, we do two extra things on SELECT FOR UPDATE
queries on reference tables:
- Acquire executor locks
- Execute the query on all replicas
With this commit, we're implementing the same logic on the
new executor.
* SELECT FOR UPDATE opens transaction block even if SelectOpensTransactionBlock disabled
Otherwise, users would be very confused and their logic is very likely
to break.
* Fix build error
* Fix the newConnectionCount calculation in ManageWorkerPool
* Fix rebase conflicts
* Fix minor test output differences
* Fix citus indent
* Remove duplicate sorts that is added with rebase
* Create distributed table via executor
* Fix wait flags in CheckConnectionReady
* failure_savepoints output for unified executor.
* failure_vacuum output (pg 10) for unified executor.
* Fix WaitEventSetWait timeout in unified executor
* Stabilize failure_truncate test output
* Add an ORDER BY to multi_upsert
* Fix regression test outputs after rebase to master
* Add executor.c comment
* Rename executor.c to adaptive_executor.c
* Do not schedule tasks if the failed placement is not ready to execute
Before the commit, we were blindly scheduling the next placement executions
even if the failed placement is not on the ready queue. Now, we're ensuring
that if failed placement execution is on a failed pool or session where the
execution is on the pendingQueue, we do not schedule the next task. Because
the other placement execution should be already running.
* Implement a proper custom scan node for adaptive executor
- Switch between the executors, add GUC to set the pool size
- Add non-adaptive regression test suites
- Enable CIRCLE CI for non-adaptive tests
- Adjust test output files
* Add slow start interval to the executor
* Expose max_cached_connection_per_worker to user
* Do not start slow when there are cached connections
* Consider ExecutorSlowStartInterval in NextEventTimeout
* Fix memory issues with ReceiveResults().
* Disable executor via TaskExecutorType
* Make sure to execute the tests with the other executor
* Use task_executor_type to enable-disable adaptive executor
* Remove useless code
* Adjust the regression tests
* Add slow start regression test
* Rebase to master
* Fix test failures in adaptive executor.
* Rebase to master - 2
* Improve comments & debug messages
* Set force_max_query_parallelization in isolation_citus_dist_activity
* Force max parallelization for creating shards when asked to use exclusive connection.
* Adjust the default pool size
* Expand description of max_adaptive_executor_pool_size GUC
* Update warnings in FinishRemoteTransactionCommit()
* Improve session clean up at the end of execution
Explicitly list all the states that the execution might end,
otherwise warn.
* Remove MULTI_CONNECTION_WAIT_RETRY which is not used at all
* Add more ORDER BYs to multi_mx_partitioning
- All the schema creations on the workers will now be via superuser connections
- If a shard is being repaired or a shard is replicated, we will create the
schema only in the relevant worker; and in all the other cases where a schema
creation is needed, we will block operations until we ensure the schema exists
in all the workers
When `master_update_node` is called to update a node's location it waits for appropriate locks to become available. This is useful during normal operation as new operations will be blocked till after the metadata update while running operations have time to finish.
When `master_update_node` is called after a node failure it is less useful to wait for running operations to finish as they can't. The lock being held indicates an operation that once attempted to commit will fail as the machine already failed. Now the downside is the failover is postponed till the termination point of the operation. This has been observed by users to take a significant amount of time causing the rest of the system to be observed unavailable.
With this patch it is possible in such situations to invoke `master_update_node` with 2 optional arguments:
- `force` (bool defaults to `false`): When called with true the update of the metadata will be forced to proceed by terminating conflicting backends. A cancel is not enough as the backend might be in idle time (eg. an interactive session, or going back and forth between an appliaction), therefore a more intrusive solution of termination is used here.
- `lock_cooldown` (int defaults to `10000`): This is the time in milliseconds before conflicting backends are terminated. This is to allow the backends to finish cleanly before terminating them. This allows the user to set an upperbound to the expected time to complete the metadata update, eg. performing the failover.
The functionality is implemented by spawning a background worker that has the task of helping a certain backend in acquiring its locks. The backend is either terminated on successful execution of the metadata update, or once the memory context of the expression gets reset, eg. on a cancel of the statement.
Adds support for propagation of SET LOCAL commands to all workers
involved in a query. For now, SET SESSION (i.e. plain SET) is not
supported whatsoever, though this code is intended as somewhat of a
base for implementing such support in the future.
As SET LOCAL modifications are scoped to the body of a BEGIN/END xact
block, queries wishing to use SET LOCAL propagation must be within such
a block. In addition, subsequent modifications after e.g. any SAVEPOINT
or ROLLBACK statements will correspondingly push or pop variable mod-
ifications onto an internal stack such that the behavior of changed
values across the cluster will be identical to such behavior on e.g.
single-node PostgreSQL (or equivalently, what values are visible to
the end user by running SHOW on such variables on the coordinator).
If nodes enter the set of participants at some point after SET LOCAL
modifications (or SAVEPOINT, ROLLBACK, etc.) have occurred, the SET
variable state is eagerly propagated to them upon their entrance (this
is identical to, and indeed just augments, the existing logic for the
propagation of the SAVEPOINT "stack").
A new GUC (citus.propagate_set_commands) has been added to control this
behavior. Though the code suggests the valid settings are 'none', 'local',
'session', and 'all', only 'none' (the default) and 'local' are presently
implemented: attempting to use other values will result in an error.
This is a preperation for the new executor, where creating shards
would go through the executor. So, explicitly generate the commands
for further processing.
If a query is router executable, it hits a single shard and therefore has a
single task associated with it. Therefore there is no need to sort the task list
that has a single element.
Also we already have a list of active shard placements, sending it in param
and reuse it.
Instead of scattering the code around, we move all the
logic into a single function.
This will help supporting foreign keys to reference tables
in the unified executor with a single line of change, just
calling this function.
The feature is only intended for getting consistent outputs for the regression tests.
RETURNING does not have any ordering gurantees and with unified executor, the ordering
of query executions on the shards are also becoming unpredictable. Thus, we're enforcing
ordering when a GUC is set.
We implicitly add an `ORDER BY` something equivalent of
`
RETURNING expr1, expr2, .. ,exprN
ORDER BY expr1, expr2, .. ,exprN
`
As described in the code comments as well, this is probably not the most
performant approach we could implement. However, since we're only
targeting regression tests, I don't see any issues with that. If we
decide to expand this to a feature to users, we should revisit the
implementation and improve the performance.
Having DATA-segment string literals made blindly freeing the keywords/
values difficult, so I've switched to allocating all in the provided
context; because of this (and with the knowledge of the end point of
the global parameters), we can safely pfree non-global parameters when
we come across an invalid connection parameter entry.
Do it in two ways (a) re-use the rte list as much as possible instead of
re-calculating over and over again (b) Limit the recursion to the relevant
parts of the query tree
Before this commit, shardPlacements were identified with shardId, nodeName
and nodeport. Instead of using nodeName and nodePort, we now use nodeId
since it apparently has performance benefits in several places in the
code.
Before this commit, round-robin task assignment policy was relying
on the taskId. Thus, even inside a transaction, the tasks were
assigned to different nodes. This was especially problematic
while reading from reference tables within transaction blocks.
Because, we had to expand the distributed transaction to many
nodes that are not necessarily already in the distributed transaction.
In this context, we define "Fast Path Planning for SELECT" as trivial
queries where Citus can skip relying on the standard_planner() and
handle all the planning.
For router planner, standard_planner() is mostly important to generate
the necessary restriction information. Later, the restriction information
generated by the standard_planner is used to decide whether all the shards
that a distributed query touches reside on a single worker node. However,
standard_planner() does a lot of extra things such as cost estimation and
execution path generations which are completely unnecessary in the context
of distributed planning.
There are certain types of queries where Citus could skip relying on
standard_planner() to generate the restriction information. For queries
in the following format, Citus does not need any information that the
standard_planner() generates:
SELECT ... FROM single_table WHERE distribution_key = X; or
DELETE FROM single_table WHERE distribution_key = X; or
UPDATE single_table SET value_1 = value_2 + 1 WHERE distribution_key = X;
Note that the queries might not be as simple as the above such that
GROUP BY, WINDOW FUNCIONS, ORDER BY or HAVING etc. are all acceptable. The
only rule is that the query is on a single distributed (or reference) table
and there is a "distribution_key = X;" in the WHERE clause. With that, we
could use to decide the shard that a distributed query touches reside on
a worker node.
Postgresql loads shared libraries before calculating MaxBackends.
However, Citus relies on MaxBackends being set. Thus, with this
commit we use the same steps to calculate MaxBackends while
Citus is being loaded (e.g., PG_Init is called).
Note that this is safe since all the elements that are used to
calculate MaxBackends are PGC_POSTMASTER gucs and a constant
value.
Before this commit, Citus supported INSERT...SELECT queries with
ON CONFLICT or RETURNING clauses only for pushdownable ones, since
queries supported via coordinator were utilizing COPY infrastructure
of PG to send selected tuples to the target worker nodes.
After this PR, INSERT...SELECT queries with ON CONFLICT or RETURNING
clauses will be performed in two phases via coordinator. In the first
phase selected tuples will be saved to the intermediate table which
is colocated with target table of the INSERT...SELECT query. Note that,
a utility function to save results to the colocated intermediate result
also implemented as a part of this commit. In the second phase, INSERT..
SELECT query is directly run on the worker node using the intermediate
table as the source table.
Description: Support round-robin `task_assignment_policy` for queries to reference tables.
This PR allows users to query multiple placements of shards in a round robin fashion. When `citus.task_assignment_policy` is set to `'round-robin'` the planner will use a round robin scheduling feature when multiple shard placements are available.
The primary use-case is spreading the load of reference table queries to all the nodes in the cluster instead of hammering only the first placement of the reference table. Since reference tables share the same path for selecting the shards with single shard queries that have multiple placements (`citus.shard_replication_factor > 1`) this setting also allows users to spread the query load on these shards.
For modifying queries we do not apply a round-robin strategy. This would be negated by an extra reordering step in the executor for such queries where a `first-replica` strategy is enforced.
The file handling the utility functions (DDL) for citus organically grew over time and became unreasonably large. This refactor takes that file and refactored the functionality into separate files per command. Initially modeled after the directory and file layout that can be found in postgres.
Although the size of the change is quite big there are barely any code changes. Only one two functions have been added for readability purposes:
- PostProcessIndexStmt which is extracted from PostProcessUtility
- PostProcessAlterTableStmt which is extracted from multi_ProcessUtility
A README.md has been added to `src/backend/distributed/commands` describing the contents of the module and every file in the module.
We need more documentation around the overloading of the COPY command, for now the boilerplate has been added for people with better knowledge to fill out.
Drop schema command fails in mx mode if there
is a partitioned table with active partitions.
This is due to fact that sql drop trigger receives
all the dropped objects including partitions. When
we call drop table on parent partition, it also drops
the partitions on the mx node. This causes the drop
table command on partitions to fail on mx node because
they are already dropped when the partition parent was
dropped.
With this work we did not require the table to exist on
worker_drop_distributed_table.
With this commit, we all partitioned distributed tables with
replication factor > 1. However, we also have many restrictions.
In summary, we disallow all kinds of modifications (including DDLs)
on the partition tables. Instead, the user is allowed to run the
modifications over the parent table.
The necessity for such a restriction have two aspects:
- We need to acquire shard resource locks appropriately
- We need to handle marking partitions INVALID in case
of any failures. Note that, in theory, the parent table
should also become INVALID, which is too aggressive.
We acquire distributed lock on all mx nodes for truncated
tables before actually doing truncate operation.
This is needed for distributed serialization of the truncate
command without causing a deadlock.
This commit uses *_walker instead of *_mutator for performance reasons.
Given that we're only updating a functionId in the tree, the approach
seems fine.
In the distributed deadlock detection design, we concluded that prepared transactions
cannot be part of a distributed deadlock. The idea is that (a) when the transaction
is prepared it already acquires all the locks, so cannot be part of a deadlock
(b) even if some other processes blocked on the prepared transaction, prepared transactions
would eventually be committed (or rollbacked) and the system will continue operating.
With the above in mind, we probably had a mistake in terms of memory allocations. For each
backend initialized, we keep a `BackendData` struct. The bug we've introduced is that, we
assumed there would only be `MaxBackend` number of backends. However, `MaxBackends` doesn't
include the prepared transactions and axuliary processes. When you check Postgres' InitProcGlobal`
you'd see that `TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts;`
This commit aligns with total procs processed with that.
With this commit, we implement two views that are very similar
to pg_stat_activity, but showing queries that are involved in
distributed queries:
- citus_dist_stat_activity: Shows all the distributed queries
- citus_worker_stat_activity: Shows all the queries on the shards
that are initiated by distributed queries.
Both views have the same columns in the outputs. In very basic terms, both of the views
are meant to provide some useful insights about the distributed
transactions within the cluster. As the names reveal, both views are similar to pg_stat_activity.
Also note that these views can be pretty useful on Citus MX clusters.
Note that when the views are queried from the worker nodes, they'd not show the distributed
transactions that are initiated from the coordinator node. The reason is that the worker
nodes do not know the host/port of the coordinator. Thus, it is advisable to query the
views from the coordinator.
If we bucket the columns that the views returns, we'd end up with the following:
- Hostnames and ports:
- query_hostname, query_hostport: The node that the query is running
- master_query_host_name, master_query_host_port: The node in the cluster
initiated the query.
Note that for citus_dist_stat_activity view, the query_hostname-query_hostport
is always the same with master_query_host_name-master_query_host_port. The
distinction is mostly relevant for citus_worker_stat_activity. For example,
on Citus MX, a users starts a transaction on Node-A, which starts worker
transactions on Node-B and Node-C. In that case, the query hostnames would be
Node-B and Node-C whereas the master_query_host_name would Node-A.
- Distributed transaction related things:
This is mostly the process_id, distributed transactionId and distributed transaction
number.
- pg_stat_activity columns:
These two views get all the columns from pg_stat_activity. We're basically joining
pg_stat_activity with get_all_active_transactions on process_id.
We previously implemented OTHER_WORKERS_WITH_METADATA tag. However,
that was wrong. See the related discussion:
https://github.com/citusdata/citus/issues/2320
Instead, we switched using OTHER_WORKER_NODES and make the command
that we're running optional such that even if the node is not a
metadata node, we won't be in trouble.
This commit enables support for TRUNCATE on both
distributed table and reference tables.
The basic idea is to acquire lock on the relation by sending
the TRUNCATE command to all metedata worker nodes. We only
skip sending the TRUNCATE command to the node that actually
executus the command to prevent a self-distributed-deadlock.
This commit by default enables hiding shard names on MX workers
by simple replacing `pg_table_is_visible()` calls with
`citus_table_is_visible()` calls on the MX worker nodes. The latter
function filters out tables that are known to be shards.
The main motivation of this change is a better UX. The functionality
can be opted out via a GUC.
We also added two views, namely citus_shards_on_worker and
citus_shard_indexes_on_worker such that users can query
them to see the shards and their corresponding indexes.
We also added debug messages such that the filtered tables can
be interactively seen by setting the level to DEBUG1.
Add ability to understand whether a table is a
known shard on MX workers. Note that this is only useful
and applicable for hiding shards on MX worker nodes given
that we can have metadata only there.
When a hash distributed table have a foreign key to a reference
table, there are few restrictions we have to apply in order to
prevent distributed deadlocks or reading wrong results.
The necessity to apply the restrictions arise from cascading
nature of foreign keys. When a foreign key on a reference table
cascades to a distributed table, a single operation over a single
connection can acquire locks on multiple shards of the distributed
table. Thus, any parallel operation on that distributed table, in the
same transaction should not open parallel connections to the shards.
Otherwise, we'd either end-up with a self-distributed deadlock or
read wrong results.
As briefly described above, the restrictions that we apply is done
by tracking the distributed/reference relation accesses inside
transaction blocks, and act accordingly when necessary.
The two main rules are as follows:
- Whenever a parallel distributed relation access conflicts
with a consecutive reference relation access, Citus errors
out
- Whenever a reference relation access is followed by a
conflicting parallel relation access, the execution mode
is switched to sequential mode.
There are also some other notes to mention:
- If the user does SET LOCAL citus.multi_shard_modify_mode
TO 'sequential';, all the queries should simply work with
using one connection per worker and sequentially executing
the commands. That's obviously a slower approach than Citus'
usual parallel execution. However, we've at least have a way
to run all commands successfully.
- If an unrelated parallel query executed on any distributed
table, we cannot switch to sequential mode. Because, the essense
of sequential mode is using one connection per worker. However,
in the presence of a parallel connection, the connection manager
picks those connections to execute the commands. That contradicts
with our purpose, thus we error out.
- COPY to a distributed table cannot be executed in sequential mode.
Thus, if we switch to sequential mode and COPY is executed, the
operation fails and there is currently no way of implementing that.
Note that, when the local table is not empty and create_distributed_table
is used, citus uses COPY internally. Thus, in those cases,
create_distributed_table() will also fail.
- There is a GUC called citus.enforce_foreign_key_restrictions
to disable all the checks. We added that GUC since the restrictions
we apply is sometimes a bit more restrictive than its necessary.
The user might want to relax those. Similarly, if you don't have
CASCADEing reference tables, you might consider disabling all the
checks.
-[x] drop constraint
-[x] drop column
-[x] alter column type
-[x] truncate
are sequentialized if there is a foreign constraint from
a distributed table to a reference table on the affected relations
by the above commands.
Make sure that intermediate results use a connection that is
not associated with any placement. That is useful in two ways:
- More complex queries can be executed with CTEs
- Safely use the same connections when there is a foreign key
to reference table from a distributed table, which needs to
use the same connection for modifications since the reference
table might cascade to the distributed table.
To support more flexible (i.e. not at compile-time) specification of
libpq connection parameters, this change adds a new GUC, node_conninfo,
which must be a space-separated string of key-value pairs suitable for
parsing by libpq's connection establishment methods.
To avoid rebuilding and parsing these values at connection time, this
change also adds a cache in front of the configuration params to permit
immediate use of any previously-calculated parameters.
After this commit DDL commands honour `citus.multi_shard_modify_mode`.
We preferred using the code-path that executes single task router
queries (e.g., ExecuteSingleModifyTask()) in order not to invent
a new executor that is only applicable for DDL commands that require
sequential execution.
* Change worker_hash_partition_table() such that the
divergence between Citus planner's hashing and
worker_hash_partition_table() becomes the same.
* Rename single partitioning to single range partitioning.
* Add single hash repartitioning. Basically, logical planner
treats single hash and range partitioning almost equally.
Physical planner, on the other hand, treats single hash and
dual hash repartitioning almost equally (except for JoinPruning).
* Add a new GUC to enable this feature
- changes in ruleutils_11.c is reflected
- vacuum statement api change is handled. We now allow
multi-table vacuum commands.
- some other function header changes are reflected
- api conflicts between PG11 and earlier versions
are handled by adding shims in version_compat.h
- various regression tests are fixed due output and
functionality in PG1
- no change is made to support new features in PG11
they need to be handled by new commit
Before this commit, we had a divergence among
the creation of master/worker extended op nodes.
This commit moves the related parts into a single place
and allows the creation of master/extended op nodes to
share a common data structure.
After this commit large_table_shard_count wont be used to
check whether broadcast join, which is renamed as reference
join, can be applied. Reference join can only be applied over
reference tables.
Previously, we prevented creation of partitioned tables on Citus MX.
We decided to not focus on this feature until there is a need. Since
now there are requests for this feature, we are implementing support
for partitioned tables on Citus MX.
After this change all the logic related to shard data fetch logic
will be removed. Planner won't plan any ShardFetchTask anymore.
Shard fetch related steps in real time executor and task-tracker
executor have been removed.
Pushing down limit and order by into workers may produce
wrong output when distinct on() clause has expressions,
aggregates, or window functions.
This checking allows pushing down of limits only if
distinct clause is a superset of group by clause. i.e. it contains all clauses in group by.
This is the first of series of window function work.
We can now support window functions that can be pushed down to workers.
Window function must have distribution column in the partition clause
to be pushed down.
With #1804 (and related PRs), Citus gained the ability to
plan subqueries that are not safe to pushdown.
There are two high-level requirements for pushing down subqueries:
* Individual subqueries that require a merge step (i.e., GROUP BY
on non-distribution key, or LIMIT in the subquery etc). We've
handled such subqueries via #1876.
* Combination of subqueries that are not joined on distribution keys.
This commit aims to recursively plan some of such subqueries to make
the whole query safe to pushdown.
The main logic behind non colocated subquery joins is that we pick
an anchor range table entry and check for distribution key equality
of any other subqueries in the given query. If for a given subquery,
we cannot find distribution key equality with the anchor rte, we
recursively plan that subquery.
We also used a hacky solution for picking relations as the anchor range
table entries. The hack is that we wrap them into a subquery. This is only
necessary since some of the attribute equivalance checks are based on
queries rather than range table entries.
We use PostgreSQL hooks to accumulate the join restrictions
and PostgreSQL gives us all the join paths it tries while
deciding on the join order. Thus, for queries that have many
joins, this function is likely to remove lots of duplicate join
restrictions. This becomes relevant for Citus on query pushdown
check peformance.
VLAs aren't supported by Visual Studio.
- Remove all existing instances of VLAs.
- Add a flag, -Werror=vla, which makes gcc refuse to compile if we add
VLAs in the future.
* Don't use expressions inside compound statements
* Don't depend on __builtin_constant_p
* Remove reliance on S_ISLNK
* Replace use of __func__: older mcvs doesn't support this builtin
This commit introduces a new GUC to limit the intermediate
result size which we handle when we use read_intermediate_result
function for CTEs and complex subqueries.
With this commit, Citus recursively plans subqueries that
are not safe to pushdown, in other words, requires a merge
step.
The algorithm is simple: Recursively traverse the query from bottom
up (i.e., bottom meaning the leaf queries). On each level, check
whether the query is safe to pushdown (or a single repartition
subquery). If the answer is yes, do not touch that subquery. If the
answer is no, plan the subquery seperately (i.e., create a subPlan
for it) and replace the subquery with a call to
`read_intermediate_results(planId, subPlanId)`. During the the
execution, run the subPlans first, and make them avaliable to the
next query executions.
Some of the queries hat this change allows us:
* Subqueries with LIMIT
* Subqueries with GROUP BY/DISTINCT on non-partition keys
* Subqueries involving re-partition joins, router queries
* Mixed usage of subqueries and CTEs (i.e., use CTEs in
subqueries as well). Nested subqueries as long as we
support the subquery inside the nested subquery.
* Subqueries with local tables (i.e., those subqueries
has the limitation that they have to be leaf subqueries)
* VIEWs on the distributed tables just works (i.e., the
limitations mentioned below still applies to views)
Some of the queries that is still NOT supported:
* Corrolated subqueries that are not safe to pushdown
* Window function on non-partition keys
* Recursively planned subqueries or CTEs on the outer
side of an outer join
* Only recursively planned subqueries and CTEs in the FROM
(i.e., not any distributed tables in the FROM) and subqueries
in WHERE clause
* Subquery joins that are not on the partition columns (i.e., each
subquery is individually joined on partition keys but not the upper
level subquery.)
* Any limitation that logical planner applies such as aggregate
distincts (except for count) when GROUP BY is on non-partition key,
or array_agg with ORDER BY
Postgres provides OS agnosting formatting macros for
formatting 64 bit numbers. Replaced %ld %lu with
INT64_FORMAT and UINT64_FORMAT respectively.
Also found some incorrect usages of formatting
flags and fixed them.
In subquery pushdown, we first ensure that each relation is joined with at least
on another relation on the partition keys. That's fine given that the decision
is binary: pushdown the query at all or not.
With recursive planning, we'd want to check whether any specific part
of the query can be pushded down or not. Thus, we need the ability to
understand which part(s) of the subquery is safe to pushdown. This commit
adds the infrastructure for doing that.
Store pointers to shared hashes in process-local variables. Previously
pointers to shared hashes were put into shared memory. This causes
problems on EXEC_BACKEND because everybody calls execve and receives a
brand new address space; the shared hash will be in a different place
for every backend. (normally we call fork, which gives you a copy of the
address space, so these pointers remain constant)
It's possible to build INSERT SELECT queries which include implicit
casts, currently we attempt to support these by adding explicit casts to
the SELECT query, but this sometimes crashes because we don't update all
nodes with the new types. (SortClauses, for instance)
This commit removes those explicit casts and passes an unmodified SELECT
query to the COPY executor (how we implement INSERT SELECT under the
scenes). In lieu of those cases, COPY has been given some extra logic to
inspect queries, notice that the types don't line up with the table it's
supposed to be inserting into, and "manually" casting every tuple before
sending them to workers.
This patch adds --with-reports-host configure option, which sets the
REPORTS_BASE_URL constant. The default is reports.citusdata.com.
It also enables stats collection in tests.
Sends a request to /v1/releases/latest?flavor=$CITUS_EDITION once a day,
which returns a response similar to {"version": "7.1.0", "major": 7,
"minor": 1, "patch": 0}. Then compares it with current Citus version,
and if the latest release is newer, logs a LOG message.
This includes:
(1) Wrap everything inside a StartTransactionCommand()/CommitTransactionCommand().
This is so we can access the database. This also switches to a new memory context
and releases it, so we don't have to do our own memory management.
(2) LockCitusExtension() so the extension cannot be dropped or created concurrently.
(3) Check CitusHasBeenLoaded() && CheckCitusVersion() before doing any work.
(4) Do not PG_TRY() inside a loop.
By this commit, citus minds the replica identity of the table when
we distribute the table. So the shards of the distributed table
have the same replica identity with the local table.
This change introduces the `pg_dist_node_metadata` which has a single jsonb value. When creating
the extension, a random server id is generated and stored in there. Everything in the metadata table
is added as a nested objected to the json payload that is sent to the reports server.
This will provide the full project name (i.e. Citus/Citus Enterprise),
and the host system, compiler, and architecture word size.
I wanted to limit the number of copied files in 'config', so I added
only config.guess and call it manually, rather than using the macro
AC_CANONICAL_HOST, which requires several other files.
Eclipse apparently doesn't scan build output looking for -D flags, so
having the value actually appear in a header is nicer for those of us
using IDEs.
Adds ```citus.enable_statistics_collection``` GUC variable, which ```true``` by default, unless built without libcurl. If statistics collection is enabled, sends basic usage data to Citus servers every 24 hours.
The data that is collected consists of:
- Citus version
- OS name & release
- Hardware Id
- Number of tables, rounded to next power of 2
- Size of data, rounded to next power of 2
- Number of workers
This commit provides the support for window functions in subquery and insert
into select queries. Note that our support for window functions is still limited
because it must have a partition by clause on the distribution key. This commit
makes changes in the files insert_select_planner and multi_logical_planner. The
required tests are also added with files multi_subquery_window_functions.out
and multi_insert_select_window.out.
We sent multiple commands to worker when starting a transaction.
Previously we only checked the result of the first command that
is transaction 'BEGIN' which always succeeds. Any failure on
following commands were not checked.
With this commit, we make sure all command results are checked.
If there is any error we report the first error found.
Citus can handle INSERT INTO ... SELECT queries if the query inserts
into local table by reading data from distributed table. The opposite
way is not correct. With this commit we warn the user if the latter
option is used.
When a NULL connection is provided to PQerrorMessage(), the
returned error message is a static text. Modifying that static
text, which doesn't necessarly be in a writeable memory, is
dangreous and might cause a segfault.
This change adds support for SAVEPOINT, ROLLBACK TO SAVEPOINT, and RELEASE SAVEPOINT.
When transaction connections are not established yet, savepoints are kept in a stack and sent to the worker when the connection is later established. After establishing connections, savepoint commands are sent as they arrive.
This change fixes#1493 .
We added a new field to the transaction id that is set to true only
for the transactions initialized on the coordinator. This is only
useful for MX in order to distinguish the transaction that started
the distributed transaction on the coordinator where we could
have the same transactions' worker queries on the same node.
With this commit, the maintenance deamon starts to check for
distributed deadlocks.
We also introduced a GUC variable (distributed_deadlock_detection_factor)
whose value is multiplied with Postgres' deadlock_timeout. Setting
it to -1 disables the distributed deadlock detection.
This commit adds all the necessary pieces to do the distributed
deadlock detection.
Each distributed transaction is already assigned with distributed
transaction ids introduced with
3369f3486f. The dependency among the
distributed transactions are gathered with
80ea233ec1.
With this commit, we implement a DFS (depth first seach) on the
dependency graph and search for cycles. Finding a cycle reveals
a distributed deadlock.
Once we find the deadlock, we examine the path that the cycle exists
and cancel the youngest distributed transaction.
Note that, we're not yet enabling the deadlock detection by default
with this commit.
This GUC has two settings, 'always' and 'never'. When it's set to
'never' all behavior stays exactly as it was prior to this commit. When
it's set to 'always' only SELECT queries are allowed to run, and only
secondary nodes are used when processing those queries.
Add some helper functions:
- WorkerNodeIsSecondary(), checks the noderole of the worker node
- WorkerNodeIsReadable(), returns whether we're currently allowed to
read from this node
- ActiveReadableNodeList(), some functions (namely, the ones on the
SELECT path) don't require working with Primary Nodes. They should call
this function instead of ActivePrimaryNodeList(), because the latter
will error out in contexts where we're not allowed to write to nodes.
- ActiveReadableNodeCount(), like the above, replaces
ActivePrimaryNodeCount().
- EnsureModificationsCanRun(), error out if we're not currently allowed
to run queries which modify data. (Either we're in read-only mode or
use_secondary_nodes is set)
Some parts of the code were switched over to use readable nodes instead
of primary nodes:
- Deadlock detection
- DistributedTableSize,
- the router, real-time, and task tracker executors
- ShardPlacement resolution
This change declares two new functions:
`master_update_table_statistics` updates the statistics of shards belong
to the given table as well as its colocated tables.
`get_colocated_shard_array` returns the ids of colocated shards of a
given shard.
This is a pretty substantial refactoring of the existing modify path
within the router executor and planner. In particular, we now hunt for
all VALUES range table entries in INSERT statements and group the rows
contained therein by shard identifier. These rows are stashed away for
later in "ModifyRoute" elements. During deparse, the appropriate RTE
is extracted from the Query and its values list is replaced by these
rows before any SQL is generated.
In this way, we can create multiple Tasks, but only one per shard, to
piecemeal execute a multi-row INSERT. The execution of jobs containing
such tasks now exclusively go through the "multi-router executor" which
was previously used for e.g. INSERT INTO ... SELECT.
By piggybacking onto that executor, we participate in ongoing trans-
actions, get rollback-ability, etc. In short order, the only remaining
use of the "single modify" router executor will be for bare single-
row INSERT statements (i.e. those not in a transaction).
This change appropriately handles deferred pruning as well as master-
evaluated functions.
We use the backend shared memory lock for preventing
new backends to be part of a new distributed transaction
or an existing backend to leave a distributed transaction
while we're reading the all backends' data.
The primary goal is to provide consistent view of the
current distributed transactions while doing the
deadlock detection.
With this PR, Citus starts to support all possible ways to create
distributed partitioned tables. These are;
- Distributing already created partitioning hierarchy
- CREATE TABLE ... PARTITION OF a distributed_table
- ALTER TABLE distributed_table ATTACH PARTITION non_distributed_table
- ALTER TABLE distributed_table ATTACH PARTITION distributed_table
We also support DETACHing partitions from partitioned tables and propogating
TRUNCATE and DDL commands to distributed partitioned tables.
This PR also refactors some parts of distributed table creation logic.
- master_activate_node and master_disable_node correctly toggle
isActive, without crashing
- master_add_node rejects duplicate nodes, even if they're in different
clusters
- master_remove_node allows removing nodes in different clusters
This commit is preperation for introducing distributed partitioned
table support. We want to clean and refactor some code in distributed
table creation logic so that we can handle partitioned tables in more
robust way.
maxTaskStringSize determines the size of worker query string.
It was originally hard coded to a specific value. This has caused
issues at some users. Since it determines initial shared memory
allocation, we did not want to set it to an arbitrary higher number.
Instead made it configurable.
This commit introduces a new GUC variable max_task_string_size
Changes in this variable requires restart to be in effect.
In this commit, we add ability to convert global wait edges
into adjacency list with the following format:
[transactionId] = [transactionNode->waitsFor {list of waiting transaction nodes}]
This change adds a general purpose infrastructure to log and monitor
process about long running progresses. It uses
`pg_stat_get_progress_info` infrastructure, introduced with PostgreSQL
9.6 and used for tracking `VACUUM` commands.
This patch only handles the creation of a memory space in dynamic shared
memory, putting its info in `pg_stat_get_progress_info`, fetching the
progress monitors on demand and finalizing the progress tracking.
- master_add_node enforces that there is only one primary per group
- there's also a trigger on pg_dist_node to prevent multiple primaries
per group
- functions in metadata cache only return primary nodes
- Rename ActiveWorkerNodeList -> ActivePrimaryNodeList
- Rename WorkerGetLive{Node->Group}Count()
- Refactor WorkerGetRandomCandidateNode
- master_remove_node only complains about active shard placements if the
node being removed is a primary.
- master_remove_node only deletes all reference table placements in the
group if the node being removed is the primary.
- Rename {Node->NodeGroup}HasShardPlacements, this reflects the behavior it
already had.
- Rename DeleteAllReferenceTablePlacementsFrom{Node->NodeGroup}. This also
reflects the behavior it already had, but the new signature forces the
caller to pass in a groupId
- Rename {WorkerGetLiveGroup->ActivePrimaryNode}Count
This commit adds distributed transaction id infrastructure in
the scope of distributed deadlock detection.
In general, the distributed transaction id consists of a tuple
in the form of: `(databaseId, initiatorNodeIdentifier, transactionId,
timestamp)`.
Briefly, we add a shared memory block on each node, which holds some
information per backend (i.e., an array `BackendData backends[MaxBackends]`).
Later, on each coordinated transaction, Citus sends
`SELECT assign_distributed_transaction_id()` right after `BEGIN`.
For that backend on the worker, the distributed transaction id is set to
the values assigned via the function call.
The aim of the above is to correlate the transactions on the coordinator
to the transactions on the worker nodes.
Comes with a few changes:
- Change the signature of some functions to accept groupid
- InsertShardPlacementRow
- DeleteShardPlacementRow
- UpdateShardPlacementState
- NodeHasActiveShardPlacements returns true if the group the node is a
part of has any active shard placements
- TupleToShardPlacement now returns ShardPlacements which have NULL
nodeName and nodePort.
- Populate (nodeName, nodePort) when creating ShardPlacements
- Disallow removing a node if it contains any shard placements
- DeleteAllReferenceTablePlacementsFromNode matches based on group. This
doesn't change behavior for now (while there is only one node per
group), but means in the future callers should be careful about
calling it on a secondary node, it'll delete placements on the primary.
- Create concept of a GroupShardPlacement, which represents an actual
tuple in pg_dist_placement and is distinct from a ShardPlacement,
which has been resolved to a specific node. In the future
ShardPlacement should be renamed to NodeShardPlacement.
- Create some triggers which allow existing code to continue to insert
into and update pg_dist_shard_placement as if it still existed.
These functions are holdovers from pg_shard and were created for unit
testing c-level functions (like InsertShardPlacementRow) which our
regression tests already test quite effectively. Removing because it
makes refactoring the signatures of those c-level functions
unnecessarily difficult.
- create_healthy_local_shard_placement_row
- update_shard_placement_row_state
- delete_shard_placement_row
This commit is intended to be a base for supporting declarative partitioning
on distributed tables. Here we add the following utility functions and their
unit tests:
* Very basic functions including differnentiating partitioned tables and
partitions, listing the partitions
* Generating the PARTITION BY (expr) and adding this to the DDL events
of partitioned tables
* Ability to generate text representations of the ranges for partitions
* Ability to generate the `ALTER TABLE parent_table ATTACH PARTITION
partition_table FOR VALUES value_range`
* Ability to apply add shard ids to the above command using
`worker_apply_inter_shard_ddl_command()`
* Ability to generate `ALTER TABLE parent_table DETACH PARTITION`
Adds support for PostgreSQL 10 by copying in the requisite ruleutils
and updating all API usages to conform with changes in PostgreSQL 10.
Most changes are fairly minor but they are numerous. One particular
obstacle was the change in \d behavior in PostgreSQL 10's psql; I had
to add SQL implementations (views, mostly) to mimic the pre-10 output.
Add a second implementation of INSERT INTO distributed_table SELECT ... that is used if
the query cannot be pushed down. The basic idea is to execute the SELECT query separately
and pass the results into the distributed table using a CopyDestReceiver, which is also
used for COPY and create_distributed_table. When planning the SELECT, we go through
planner hooks again, which means the SELECT can also be a distributed query.
EXPLAIN is supported, but EXPLAIN ANALYZE is not because preventing double execution was
a lot more complicated in this case.
During version update, we indirectly calld CheckInstalledVersion via
ChackCitusVersions. This obviously fails because during version update it is
expected to have version mismatch between installed version and binary version.
Thus, we remove that ChackCitusVersions. We now only call ChackAvailableVersion.
Before this commit, we were erroring out at almost all queries if there is a
version mismatch. With this commit, we started to error out only requested
operation touches distributed tables.
Normally we would need to use distributed cache to understand whether a table
is distributed or not. However, it is not safe to read our metadata tables when
there is a version mismatch, thus it is not safe to create distributed cache.
Therefore for this specific occasion, we directly read from pg_dist_partition
table. However; reading from catalog is costly and we should not use this
method in other places as much as possible.
Distributed query planning for subquery pushdown is done on the original
query. This prevents the usage of external parameters on the execution.
To overcome this, we manually replace the parameters on the original
query.
* Enabling physical planner for subquery pushdown changes
This commit applies the logic that exists in INSERT .. SELECT
planning to the subquery pushdown changes.
The main algorithm is followed as :
- pick an anchor relation (i.e., target relation)
- per each target shard interval
- add the target shard interval's shard range
as a restriction to the relations (if all relations
joined on the partition keys)
- Check whether the query is router plannable per
target shard interval.
- If router plannable, create a task
* Add union support within the JOINS
This commit adds support for UNION/UNION ALL subqueries that are
in the following form:
.... (Q1 UNION Q2 UNION ...) as union_query JOIN (QN) ...
In other words, we currently do NOT support the queries that are
in the following form where union query is not JOINed with
other relations/subqueries :
.... (Q1 UNION Q2 UNION ...) as union_query ....
* Subquery pushdown planner uses original query
With this commit, we change the input to the logical planner for
subquery pushdown. Before this commit, the planner was relying
on the query tree that is transformed by the postgresql planner.
After this commit, the planner uses the original query. The main
motivation behind this change is the simplify deparsing of
subqueries.
* Enable top level subquery join queries
This work enables
- Top level subquery joins
- Joins between subqueries and relations
- Joins involving more than 2 range table entries
A new regression test file is added to reflect enabled test cases
* Add top level union support
This commit adds support for UNION/UNION ALL subqueries that are
in the following form:
.... (Q1 UNION Q2 UNION ...) as union_query ....
In other words, Citus supports allow top level
unions being wrapped into aggregations queries
and/or simple projection queries that only selects
some fields from the lower level queries.
* Disallow subqueries without a relation in the range table list for subquery pushdown
This commit disallows subqueries without relation in the range table
list. This commit is only applied for subquery pushdown. In other words,
we do not add this limitation for single table re-partition subqueries.
The reasoning behind this limitation is that if we allow pushing down
such queries, the result would include (shardCount * expectedResults)
where in a non distributed world the result would be (expectedResult)
only.
* Disallow subqueries without a relation in the range table list for INSERT .. SELECT
This commit disallows subqueries without relation in the range table
list. This commit is only applied for INSERT.. SELECT queries.
The reasoning behind this limitation is that if we allow pushing down
such queries, the result would include (shardCount * expectedResults)
where in a non distributed world the result would be (expectedResult)
only.
* Change behaviour of subquery pushdown flag (#1315)
This commit changes the behaviour of the citus.subquery_pushdown flag.
Before this commit, the flag is used to enable subquery pushdown logic. But,
with this commit, that behaviour is enabled by default. In other words, the
flag is now useless. We prefer to keep the flag since we don't want to break
the backward compatibility. Also, we may consider using that flag for other
purposes in the next commits.
* Require subquery_pushdown when limit is used in subquery
Using limit in subqueries may cause returning incorrect
results. Therefore we allow limits in subqueries only
if user explicitly set subquery_pushdown flag.
* Evaluate expressions on the LIMIT clause (#1333)
Subquery pushdown uses orignal query, the LIMIT and OFFSET clauses
are not evaluated. However, logical optimizer expects these expressions
are already evaluated by the standard planner. This commit manually
evaluates the functions on the logical planner for subquery pushdown.
* Better format subquery regression tests (#1340)
* Style fix for subquery pushdown regression tests
With this commit we intented a more consistent style for the
regression tests we've added in the
- multi_subquery_union.sql
- multi_subquery_complex_queries.sql
- multi_subquery_behavioral_analytics.sql
* Enable the tests that are temporarily commented
This commit enables some of the regression tests that were commented
out until all the development is done.
* Fix merge conflicts (#1347)
- Update regression tests to meet the changes in the regression
test output.
- Replace Ifs with Asserts given that the check is already done
- Update shard pruning outputs
* Add view regression tests for increased subquery coverage (#1348)
- joins between views and tables
- joins between views
- union/union all queries involving views
- views with limit
- explain queries with view
* Improve btree operators for the subquery tests
This commit adds the missing comprasion for subquery composite key
btree comparator.
It semms that GEQO optimizations, when it is set to on, create their own memory context
and free it after when it is no longer necessary. In join multi_join_restriction_hook
we allocate our variables in the CurrentMemoryContext, which is GEQO's memory context
if it is active. To prevent deallocation of our variables when GEQO's memory context is
freed, we started to allocate memory fo these variables in separate MemoryContext.
So far citus used postgres' predicate proofing logic for shard
pruning, except for INSERT and COPY which were already optimized for
speed. That turns out to be too slow:
* Shard pruning for SELECTs is currently O(#shards), because
PruneShardList calls predicate_refuted_by() for every
shard. Obviously using an O(N) type algorithm for general pruning
isn't good.
* predicate_refuted_by() is quite expensive on its own right. That's
primarily because it's optimized for doing a single refutation
proof, rather than performing the same proof over and over.
* predicate_refuted_by() does not keep persistent state (see 2.) for
function calls, which means that a lot of syscache lookups will be
performed. That's particularly bad if the partitioning key is a
composite key, because without a persistent FunctionCallInfo
record_cmp() has to repeatedly look-up the type definition of the
composite key. That's quite expensive.
Thus replace this with custom-code that works in two phases:
1) Search restrictions for constraints that can be pruned upon
2) Use those restrictions to search for matching shards in the most
efficient manner available:
a) Binary search / Hash Lookup in case of hash partitioned tables
b) Binary search for equal clauses in case of range or append
tables without overlapping shards.
c) Binary search for inequality clauses, searching for both lower
and upper boundaries, again in case of range or append
tables without overlapping shards.
d) exhaustive search testing each ShardInterval
My measurements suggest that we are considerably, often orders of
magnitude, faster than the previous solution, even if we have to fall
back to exhaustive pruning.
This determines whether it's possible to perform binary search on
sortedShardIntervalArray or not. If e.g. two shards have overlapping
ranges, that'd be prohibitive.
That'll be useful in later commit introducing faster shard pruning.
That's useful when comparing values a hash-partitioned table is
filtered by. The existing shardIntervalCompareFunction is about
comparing hashed values, not unhashed ones.
The added btree opclass function is so we can get a comparator
back. This should be changed much more widely, but is not necessary so
far.
Previously we, unnecessarily, used a the first shard's type
information to to look up the comparison function. But that
information is already available, so use it. That's helpful because
we sometimes want to access the comparator function even if there's no
shards.
With this commit, we started to send explain queries within a savepoint. After
running explain query, we rollback to savepoint. This saves us from side effects
of EXPLAIN ANALYZE on DML queries.
All callers fetch a cache entry and extract/compute arguments for the
eventual FindShardInterval call, so it makes more sense to refactor
into that function itself; this solves the use-after-free bug, too.
With this change we add an option to add a node without replicating all reference
tables to that node. If a node is added with this option, we mark the node as
inactive and no queries will sent to that node.
We also added two new UDFs;
- master_activate_node(host, port):
- marks node as active and replicates all reference tables to that node
- master_add_inactive_node(host, port):
- only adds node to pg_dist_node
In this PR, we aim to deduce whether each of the RTE_RELATION
is joined with at least on another RTE_RELATION on their partition keys. If each
RTE_RELATION follows the above rule, we can conclude that all RTE_RELATIONs are
joined on their partition keys.
In order to do that, we invented a new equivalence class namely:
AttributeEquivalenceClass. In very simple words, a AttributeEquivalenceClass is
identified by an unique id and consists of a list of AttributeEquivalenceMembers.
Each AttributeEquivalenceMember is designed to identify attributes uniquely within the
whole query. The necessity of this arise since varno attributes are defined within
a single level of a query. Instead, here we want to identify each RTE_RELATION uniquely
and try to find equality among each RTE_RELATION's partition key.
Whenever we find an equality clause A = B, where both A and B originates from
relation attributes (i.e., not random expressions), we create an
AttributeEquivalenceClass to record this knowledge. If we later find another
equivalence B = C, we create another AttributeEquivalenceClass. Finally, we can
apply transitity rules and generate a new AttributeEquivalenceClass which includes
A, B and C.
Note that equality among the members are identified by the varattno and rteIdentity.
Each equality among RTE_RELATION is saved using an AttributeEquivalenceClass where
each member attribute is identified by a AttributeEquivalenceMember. In the final
step, we try generate a common attribute equivalence class that holds as much as
AttributeEquivalenceMembers whose attributes are a partition keys.
The use of a bare src/ rather than $srcdir caused configure to fail
during VPATH builds. With our additional dependency upon AWK, we need
to call AC_PROG_AWK, otherwise environments may not have $AWK set.
Finally, citus_version.h should be in .gitignore.
With this change, we start to error out if loaded citus binaries does not match
the available major version or installed citus extension version. In this case
we force user to restart the server or run ALTER EXTENSION depending on the
situation
Thought this looked slightly nicer than the default behavior.
Changed preventTransaction to concurrent to be clearer that this code
path presently affects CONCURRENTLY code only.
Custom Scan is a node in the planned statement which helps external providers
to abstract data scan not just for foreign data wrappers but also for regular
relations so you can benefit your version of caching or hardware optimizations.
This sounds like only an abstraction on the data scan layer, but we can use it
as an abstraction for our distributed queries. The only thing we need to do is
to find distributable parts of the query, plan for them and replace them with
a Citus Custom Scan. Then, whenever PostgreSQL hits this custom scan node in
its Vulcano style execution, it will call our callback functions which run
distributed plan and provides tuples to the upper node as it scans a regular
relation. This means fewer code changes, fewer bugs and more supported features
for us!
First, in the distributed query planner phase, we create a Custom Scan which
wraps the distributed plan. For real-time and task-tracker executors, we add
this custom plan under the master query plan. For router executor, we directly
pass the custom plan because there is not any master query. Then, we simply let
the PostgreSQL executor run this plan. When it hits the custom scan node, we
call the related executor parts for distributed plan, fill the tuple store in
the custom scan and return results to PostgreSQL executor in Vulcano style,
a tuple per XXX_ExecScan() call.
* Modify planner to utilize Custom Scan node.
* Create different scan methods for different executors.
* Use native PostgreSQL Explain for master part of queries.
This change ignores `citus.replication_model` setting and uses the
statement based replication in
- Tables distributed via the old `master_create_distributed_table` function
- Append and range partitioned tables, even if created via
`create_distributed_table` function
This seems like the easiest solution to #1191, without changing the existing
behavior and harming existing users with custom scripts.
This change also prevents RF>1 on streaming replicated tables on `master_create_worker_shards`
Prior to this change, `master_create_worker_shards` command was not checking
the replication model of the target table, thus allowing RF>1 with streaming
replicated tables. With this change, `master_create_worker_shards` errors
out on the case.
- Break CheckShardPlacements into multiple functions (The most important
is MarkFailedShardPlacements), so that we can get rid of the global
CoordinatedTransactionUses2PC.
- Call MarkFailedShardPlacements in the router executor, so we mark
shards as invalid and stop using them while inside transaction blocks.
This UDF returns a shard placement from cache given shard id and placement id. At the
moment it iterates over all shard placements of given shard by ShardPlacementList and
searches given placement id in that list, which is not a good solution performance-wise.
However, currently, this function will be used only when there is a failed transaction.
If a need arises we can optimize this function in the future.
All router, real-time, task-tracker plannable queries should now have
full prepared statement support (and even use router when possible),
unless they don't go through the custom plan interface (which
basically just affects LANGUAGE SQL (not plpgsql) functions).
This is achieved by forcing postgres' planner to always choose a
custom plan, by assigning very low costs to plans with bound
parameters (i.e. ones were the postgres planner replanned the query
upon EXECUTE with all parameter values provided), instead of the
generic one.
This requires some trickery, because for custom plans to work the
costs for a non-custom plan have to be known, which means we can't
error out when planning the generic plan. Instead we have to return a
"faux" plan, that'd trigger an error message if executed. But due to
the custom plan logic that plan will likely (unless called by an SQL
function, or because we can't support that query for some reason) not
be executed; instead the custom plan will be chosen.
So far router planner had encapsulated different functionality in
MultiRouterPlanCreate. Modifications always go through router, selects
sometimes. Modifications always error out if the query is unsupported,
selects return NULL. Especially the error handling is a problem for
the upcoming extension of prepared statement support.
Split MultiRouterPlanCreate into CreateRouterPlan and
CreateModifyPlan, and change them to not throw errors.
Instead errors are now reported by setting the new
MultiPlan->plannigError.
Callers of router planner functionality now have to throw errors
themselves if desired, but also can skip doing so.
This is a pre-requisite for expanding prepared statement support.
While touching all those lines, improve a number of error messages by
getting them closer to the postgres error message guidelines.
It can be useful, e.g. in the upcoming prepared statement support, to
be able to return an error from a function that is not raised
immediately, but can later be thrown. That allows e.g. to attempt to
plan a statment using different methods and to create good error
messages in each planner, but to only error out after all planners
have been run.
To enable that create support for deferred error messages that can be
created (supporting errorcode, message, detail, hint) in one function,
and then thrown in different place.
This adds a replication_model GUC which is used as the replication
model for any new distributed table that is not a reference table.
With this change, tables with replication factor 1 are no longer
implicitly MX tables.
The GUC is similarly respected during empty shard creation for e.g.
existing append-partitioned tables. If the model is set to streaming
while replication factor is greater than one, table and shard creation
routines will error until this invalid combination is corrected.
Changing this parameter requires superuser permissions.
If any placements fail it doesn't update shard statistics on those placements.
A minor enabling refactor: Make CoordinatedTransactionUses2PC public (it used to be CoordinatedTransactionUse2PC but that symbol already existed, so renamed it as well)
This enables proper transactional behaviour for copy and relaxes some
restrictions like combining COPY with single-row modifications. It
also provides the basis for relaxing restrictions further, and for
optionally allowing connection caching.
They make fixing explain for prepared statement harder, and they don't
really fit into EXPLAIN in the first place. Additionally they're
currently not exercised in any tests.
This change adds support for serial columns to be used with MX tables.
Prior to this change, sequences of serial columns were created in all
workers (for being able to create shards) but never used. With MX, we
need to set the sequences so that sequences in each worker create
unique values. This is done by setting the MINVALUE, MAXVALUE and
START values of the sequence.
A small refactor which pulls some code out of `RecoverWorkerTransactions`
and into `remote_commands.c`. This code block currently only occurs in
`RecoverWorkerTransactions` but will be useful to other functions
shortly.
Unfortunately we couldn't call it `ExecuteRemoteCommand`, that name was
already taken.
With this change, we start to delete placement of reference tables at given worker node
after master_remove_node UDF call. We remove placement metadata at master node but we do
not drop actual shard from the worker node. There are two reasons for that decision,
first, it is not critical to DROP the shards in the workers because Citus will ignore them
as long as node is removed from cluster and if we add that node back to cluster we will
DROP and recreate all reference tables. Second, if node is unreachable, it becomes
complicated to cover failure cases and have a transaction support.
Enables use views within distributed queries.
User can create and use a view on distributed tables/queries
as he/she would use with regular queries.
After this change router queries will have full support for views,
insert into select queries will support reading from views, not
writing into. Outer joins would have a limited support, and would
error out at certain cases such as when a view is in the inner side
of the outer join.
Although PostgreSQL supports writing into views under certain circumstances.
We disallowed that for distributed views.
So far we've reloaded them frequently. Besides avoiding that cost -
noticeable for some workloads with large shard counts - it makes it
easier to add information to ShardPlacements that help us make
placement_connection.c colocation aware.
Remove the router specific transaction and shard management, and
replace it with the new placement connection API. This mostly leaves
behaviour alone, except that it is now, inside a transaction, legal to
select from a shard to which no pre-existing connection exists.
To simplify code the code handling task executions for select and
modify has been split into two - the previous coding was starting to
get confusing due to the amount of only conditionally applicable code.
Modification connections & transactions are now always established in
parallel, not just for reference tables.
Currently there are several places in citus that map placements to
connections and that manage placement health. Centralize this
knowledge. Because of the centralized knowledge about which
connection has previously been used for which shard/placement, this
also provides the basis for relaxing restrictions around combining
various forms of DDL/DML.
Connections for a placement can now be acquired using
GetPlacementConnection(). If the connection is used for DML or DDL the
FOR_DDL/DML flags should be used respectively. If an individual
remote transaction fails (but the transaction on the master succeeds)
and FOR_DDL/DML have been specified, the placement is marked as
invalid, unless that'd mark all placements for a shard as invalid.
With this change, we start to replicate all reference tables to the new node when new node
is added to the cluster with master_add_node command. We also update replication factor
of reference table's colocation group.
With this change we introduce new UDF, upgrade_to_reference_table, which can be used to
upgrade existing broadcast tables reference tables. For upgrading, we require that given
table contains only one shard.
Renamed FindShardIntervalIndex() to ShardIndex() and added binary search
capability. It used to assume that hash partition tables are always
uniformly distributed which is not true if upcoming tenant isolation
feature is applied. This commit also reduces code duplication.
With this commit, we implemented some basic features of reference tables.
To start with, a reference table is
* a distributed table whithout a distribution column defined on it
* the distributed table is single sharded
* and the shard is replicated to all nodes
Reference tables follows the same code-path with a single sharded
tables. Thus, broadcast JOINs are applicable to reference tables.
But, since the table is replicated to all nodes, table fetching is
not required any more.
Reference tables support the uniqueness constraints for any column.
Reference tables can be used in INSERT INTO .. SELECT queries with
the following rules:
* If a reference table is in the SELECT part of the query, it is
safe join with another reference table and/or hash partitioned
tables.
* If a reference table is in the INSERT part of the query, all
other participating tables should be reference tables.
Reference tables follow the regular co-location structure. Since
all reference tables are single sharded and replicated to all nodes,
they are always co-located with each other.
Queries involving only reference tables always follows router planner
and executor.
Reference tables can have composite typed columns and there is no need
to create/define the necessary support functions.
All modification queries, master_* UDFs, EXPLAIN, DDLs, TRUNCATE,
sequences, transactions, COPY, schema support works on reference
tables as expected. Plus, all the pre-requisites associated with
distribution columns are dismissed.
We used to disable router planner and executor
when task executor is set to task-tracker.
This change enables router planning and execution
at all times regardless of task execution mode.
We are introducing a hidden flag enable_router_execution
to enable/disable router execution. Its default value is
true. User may disable router planning by setting it to false.
Adds support for VACUUM and ANALYZE commands which target a specific
distributed table. After grabbing the appropriate locks, this imple-
mentation sends VACUUM commands to each placement (using one connec-
tion per placement). These commands are sent in parallel, so users
with large tables will benefit from sharding. Except for VERBOSE, all
VACUUM and ANALYZE options are supported, including the explicit
column list used by ANALYZE.
As with many of our utility commands, the local command also runs. In
the VACUUM/ANALYZE case, the local command is executed before any re-
mote propagation. Because error handling is managed after local proc-
essing, this can result in a VACUUM completing locally but erroring
out when distributed processing commences: a minor technicality in all
cases, as there isn't really much reason to ever roll back a VACUUM (an
impossibility in any case, as VACUUM cannot run within a transaction).
Remote propagation of targeted VACUUM/ANALYZE is controlled by the
enable_ddl_propagation setting; warnings are emitted if such a command
is attempted when DDL propagation is disabled. Unqualified VACUUM or
ANALYZE is not handled, but a warning message informs the user of this.
Implementation note: this commit adds a "BARE" value to MultiShard-
CommitProtocol. When active, no BEGIN command is ever sent to remote
nodes, useful for commands such as VACUUM/ANALYZE which must not run in
a transaction block. This value is not user-facing and is reset at
transaction end.
This change adds `start_metadata_sync_to_node` UDF which copies the metadata about nodes and MX tables
from master to the specified worker, sets its local group ID and marks its hasmetadata to true to
allow it receive future DDL changes.