With this commit, we're introducing the Adaptive Executor.
The commit message consists of two distinct sections. The first part explains
how the executor works. The second part consists of the commit messages of
the individual smaller commits that resulted in this commit. The readers
can search for the each of the smaller commit messages on
https://github.com/citusdata/citus and can learn more about the history
of the change.
/*-------------------------------------------------------------------------
*
* adaptive_executor.c
*
* The adaptive executor executes a list of tasks (queries on shards) over
* a connection pool per worker node. The results of the queries, if any,
* are written to a tuple store.
*
* The concepts in the executor are modelled in a set of structs:
*
* - DistributedExecution:
* Execution of a Task list over a set of WorkerPools.
* - WorkerPool
* Pool of WorkerSessions for the same worker which opportunistically
* executes "unassigned" tasks from a queue.
* - WorkerSession:
* Connection to a worker that is used to execute "assigned" tasks
* from a queue and may execute unasssigned tasks from the WorkerPool.
* - ShardCommandExecution:
* Execution of a Task across a list of placements.
* - TaskPlacementExecution:
* Execution of a Task on a specific placement.
* Used in the WorkerPool and WorkerSession queues.
*
* Every connection pool (WorkerPool) and every connection (WorkerSession)
* have a queue of tasks that are ready to execute (readyTaskQueue) and a
* queue/set of pending tasks that may become ready later in the execution
* (pendingTaskQueue). The tasks are wrapped in a ShardCommandExecution,
* which keeps track of the state of execution and is referenced from a
* TaskPlacementExecution, which is the data structure that is actually
* added to the queues and describes the state of the execution of a task
* on a particular worker node.
*
* When the task list is part of a bigger distributed transaction, the
* shards that are accessed or modified by the task may have already been
* accessed earlier in the transaction. We need to make sure we use the
* same connection since it may hold relevant locks or have uncommitted
* writes. In that case we "assign" the task to a connection by adding
* it to the task queue of specific connection (in
* AssignTasksToConnections). Otherwise we consider the task unassigned
* and add it to the task queue of a worker pool, which means that it
* can be executed over any connection in the pool.
*
* A task may be executed on multiple placements in case of a reference
* table or a replicated distributed table. Depending on the type of
* task, it may not be ready to be executed on a worker node immediately.
* For instance, INSERTs on a reference table are executed serially across
* placements to avoid deadlocks when concurrent INSERTs take conflicting
* locks. At the beginning, only the "first" placement is ready to execute
* and therefore added to the readyTaskQueue in the pool or connection.
* The remaining placements are added to the pendingTaskQueue. Once
* execution on the first placement is done the second placement moves
* from pendingTaskQueue to readyTaskQueue. The same approach is used to
* fail over read-only tasks to another placement.
*
* Once all the tasks are added to a queue, the main loop in
* RunDistributedExecution repeatedly does the following:
*
* For each pool:
* - ManageWorkPool evaluates whether to open additional connections
* based on the number unassigned tasks that are ready to execute
* and the targetPoolSize of the execution.
*
* Poll all connections:
* - We use a WaitEventSet that contains all (non-failed) connections
* and is rebuilt whenever the set of active connections or any of
* their wait flags change.
*
* We almost always check for WL_SOCKET_READABLE because a session
* can emit notices at any time during execution, but it will only
* wake up WaitEventSetWait when there are actual bytes to read.
*
* We check for WL_SOCKET_WRITEABLE just after sending bytes in case
* there is not enough space in the TCP buffer. Since a socket is
* almost always writable we also use WL_SOCKET_WRITEABLE as a
* mechanism to wake up WaitEventSetWait for non-I/O events, e.g.
* when a task moves from pending to ready.
*
* For each connection that is ready:
* - ConnectionStateMachine handles connection establishment and failure
* as well as command execution via TransactionStateMachine.
*
* When a connection is ready to execute a new task, it first checks its
* own readyTaskQueue and otherwise takes a task from the worker pool's
* readyTaskQueue (on a first-come-first-serve basis).
*
* In cases where the tasks finish quickly (e.g. <1ms), a single
* connection will often be sufficient to finish all tasks. It is
* therefore not necessary that all connections are established
* successfully or open a transaction (which may be blocked by an
* intermediate pgbouncer in transaction pooling mode). It is therefore
* essential that we take a task from the queue only after opening a
* transaction block.
*
* When a command on a worker finishes or the connection is lost, we call
* PlacementExecutionDone, which then updates the state of the task
* based on whether we need to run it on other placements. When a
* connection fails or all connections to a worker fail, we also call
* PlacementExecutionDone for all queued tasks to try the next placement
* and, if necessary, mark shard placements as inactive. If a task fails
* to execute on all placements, the execution fails and the distributed
* transaction rolls back.
*
* For multi-row INSERTs, tasks are executed sequentially by
* SequentialRunDistributedExecution instead of in parallel, which allows
* a high degree of concurrency without high risk of deadlocks.
* Conversely, multi-row UPDATE/DELETE/DDL commands take aggressive locks
* which forbids concurrency, but allows parallelism without high risk
* of deadlocks. Note that this is unrelated to SEQUENTIAL_CONNECTION,
* which indicates that we should use at most one connection per node, but
* can run tasks in parallel across nodes. This is used when there are
* writes to a reference table that has foreign keys from a distributed
* table.
*
* Execution finishes when all tasks are done, the query errors out, or
* the user cancels the query.
*
*-------------------------------------------------------------------------
*/
All the commits involved here:
* Initial unified executor prototype
* Latest changes
* Fix rebase conflicts to master branch
* Add missing variable for assertion
* Ensure that master_modify_multiple_shards() returns the affectedTupleCount
* Adjust intermediate result sizes
The real-time executor uses COPY command to get the results
from the worker nodes. Unified executor avoids that which
results in less data transfer. Simply adjust the tests to lower
sizes.
* Force one connection per placement (or co-located placements) when requested
The existing executors (real-time and router) always open 1 connection per
placement when parallel execution is requested.
That might be useful under certain circumstances:
(a) User wants to utilize as much as CPUs on the workers per
distributed query
(b) User has a transaction block which involves COPY command
Also, lots of regression tests rely on this execution semantics.
So, we'd enable few of the tests with this change as well.
* For parameters to be resolved before using them
For the details, see PostgreSQL's copyParamList()
* Unified executor sorts the returning output
* Ensure that unified executor doesn't ignore sequential execution of DDLJob's
Certain DDL commands, mainly creating foreign keys to reference tables,
should be executed sequentially. Otherwise, we'd end up with a self
distributed deadlock.
To overcome this situaiton, we set a flag `DDLJob->executeSequentially`
and execute it sequentially. Note that we have to do this because
the command might not be called within a transaction block, and
we cannot call `SetLocalMultiShardModifyModeToSequential()`.
This fixes at least two test: multi_insert_select_on_conflit.sql and
multi_foreign_key.sql
Also, I wouldn't mind scattering local `targetPoolSize` variables within
the code. The reason is that we'll soon have a GUC (or a global
variable based on a GUC) that'd set the pool size. In that case, we'd
simply replace `targetPoolSize` with the global variables.
* Fix 2PC conditions for DDL tasks
* Improve closing connections that are not fully established in unified execution
* Support foreign keys to reference tables in unified executor
The idea for supporting foreign keys to reference tables is simple:
Keep track of the relation accesses within a transaction block.
- If a parallel access happens on a distributed table which
has a foreign key to a reference table, one cannot modify
the reference table in the same transaction. Otherwise,
we're very likely to end-up with a self-distributed deadlock.
- If an access to a reference table happens, and then a parallel
access to a distributed table (which has a fkey to the reference
table) happens, we switch to sequential mode.
Unified executor misses the function calls that marks the relation
accesses during the execution. Thus, simply add the necessary calls
and let the logic kick in.
* Make sure to close the failed connections after the execution
* Improve comments
* Fix savepoints in unified executor.
* Rebuild the WaitEventSet only when necessary
* Unclaim connections on all errors.
* Improve failure handling for unified executor
- Implement the notion of errorOnAnyFailure. This is similar to
Critical Connections that the connection managament APIs provide
- If the nodes inside a modifying transaction expand, activate 2PC
- Fix few bugs related to wait event sets
- Mark placement INACTIVE during the execution as much as possible
as opposed to we do in the COMMIT handler
- Fix few bugs related to scheduling next placement executions
- Improve decision on when to use 2PC
Improve the logic to start a transaction block for distributed transactions
- Make sure that only reference table modifications are always
executed with distributed transactions
- Make sure that stored procedures and functions are executed
with distributed transactions
* Move waitEventSet to DistributedExecution
This could also be local to RunDistributedExecution(), but in that case
we had to mark it as "volatile" to avoid PG_TRY()/PG_CATCH() issues, and
cast it to non-volatile when doing WaitEventSetFree(). We thought that
would make code a bit harder to read than making this non-local, so we
move it here. See comments for PG_TRY() in postgres/src/include/elog.h
and "man 3 siglongjmp" for more context.
* Fix multi_insert_select test outputs
Two things:
1) One complex transaction block is now supported. Simply update
the test output
2) Due to dynamic nature of the unified executor, the orders of
the errors coming from the shards might change (e.g., all of
the queries on the shards would fail, but which one appears
on the error message?). To fix that, we simply added it to
our shardId normalization tool which happens just before diff.
* Fix subeury_and_cte test
The error message is updated from:
failed to execute task
To:
more than one row returned by a subquery or an expression
which is a lot clearer to the user.
* Fix intermediate_results test outputs
Simply update the error message from:
could not receive query results
to
result "squares" does not exist
which makes a lot more sense.
* Fix multi_function_in_join test
The error messages update from:
Failed to execute task XXX
To:
function f(..) does not exist
* Fix multi_query_directory_cleanup test
The unified executor does not create any intermediate files.
* Fix with_transactions test
A test case that just started to work fine
* Fix multi_router_planner test outputs
The error message is update from:
Could not receive query results
To:
Relation does not exists
which is a lot more clearer for the users
* Fix multi_router_planner_fast_path test
The error message is update from:
Could not receive query results
To:
Relation does not exists
which is a lot more clearer for the users
* Fix isolation_copy_placement_vs_modification by disabling select_opens_transaction_block
* Fix ordering in isolation_multi_shard_modify_vs_all
* Add executor locks to unified executor
* Make sure to allocate enought WaitEvents
The previous code was missing the waitEvents for the latch and
postmaster death.
* Fix rebase conflicts for master rebase
* Make sure that TRUNCATE relies on unified executor
* Implement true sequential execution for multi-row INSERTS
Execute the individual tasks executed one by one. Note that this is different than
MultiShardConnectionType == SEQUENTIAL_CONNECTION case (e.g., sequential execution
mode). In that case, running the tasks across the nodes in parallel is acceptable
and implemented in that way.
However, the executions that are qualified here would perform poorly if the
tasks across the workers are executed in parallel. We currently qualify only
one class of distributed queries here, multi-row INSERTs. If we do not enforce
true sequential execution, concurrent multi-row upserts could easily form
a distributed deadlock when the upserts touch the same rows.
* Remove SESSION_LIFESPAN flag in unified_executor
* Apply failure test updates
We've changed the failure behaviour a bit, and also the error messages
that show up to the user. This PR covers majority of the updates.
* Unified executor honors citus.node_connection_timeout
With this commit, unified executor errors out if even
a single connection cannot be established within
citus.node_connection_timeout.
And, as a side effect this fixes failure_connection_establishment
test.
* Properly increment/decrement pool size variables
Before this commit, the idle and active connection
counts were not properly calculated.
* insert_select_executor goes through unified executor.
* Add missing file for task tracker
* Modify ExecuteTaskListExtended()'s signature
* Sort output of INSERT ... SELECT ... RETURNING
* Take partition locks correctly in unified executor
* Alternative implementation for force_max_query_parallelization
* Fix compile warnings in unified executor
* Fix style issues
* Decrement idleConnectionCount when idle connection is lost
* Always rebuild the wait event sets
In the previous implementation, on waitFlag changes, we were only
modifying the wait events. However, we've realized that it might
be an over optimization since (a) we couldn't see any performance
benefits (b) we see some errors on failures and because of (a)
we prefer to disable it now.
* Make sure to allocate enough sized waitEventSet
With multi-row INSERTs, we might have more sessions than
task*workerCount after few calls of RunDistributedExecution()
because the previous sessions would also be alive.
Instead, re-allocate events when the connectino set changes.
* Implement SELECT FOR UPDATE on reference tables
On master branch, we do two extra things on SELECT FOR UPDATE
queries on reference tables:
- Acquire executor locks
- Execute the query on all replicas
With this commit, we're implementing the same logic on the
new executor.
* SELECT FOR UPDATE opens transaction block even if SelectOpensTransactionBlock disabled
Otherwise, users would be very confused and their logic is very likely
to break.
* Fix build error
* Fix the newConnectionCount calculation in ManageWorkerPool
* Fix rebase conflicts
* Fix minor test output differences
* Fix citus indent
* Remove duplicate sorts that is added with rebase
* Create distributed table via executor
* Fix wait flags in CheckConnectionReady
* failure_savepoints output for unified executor.
* failure_vacuum output (pg 10) for unified executor.
* Fix WaitEventSetWait timeout in unified executor
* Stabilize failure_truncate test output
* Add an ORDER BY to multi_upsert
* Fix regression test outputs after rebase to master
* Add executor.c comment
* Rename executor.c to adaptive_executor.c
* Do not schedule tasks if the failed placement is not ready to execute
Before the commit, we were blindly scheduling the next placement executions
even if the failed placement is not on the ready queue. Now, we're ensuring
that if failed placement execution is on a failed pool or session where the
execution is on the pendingQueue, we do not schedule the next task. Because
the other placement execution should be already running.
* Implement a proper custom scan node for adaptive executor
- Switch between the executors, add GUC to set the pool size
- Add non-adaptive regression test suites
- Enable CIRCLE CI for non-adaptive tests
- Adjust test output files
* Add slow start interval to the executor
* Expose max_cached_connection_per_worker to user
* Do not start slow when there are cached connections
* Consider ExecutorSlowStartInterval in NextEventTimeout
* Fix memory issues with ReceiveResults().
* Disable executor via TaskExecutorType
* Make sure to execute the tests with the other executor
* Use task_executor_type to enable-disable adaptive executor
* Remove useless code
* Adjust the regression tests
* Add slow start regression test
* Rebase to master
* Fix test failures in adaptive executor.
* Rebase to master - 2
* Improve comments & debug messages
* Set force_max_query_parallelization in isolation_citus_dist_activity
* Force max parallelization for creating shards when asked to use exclusive connection.
* Adjust the default pool size
* Expand description of max_adaptive_executor_pool_size GUC
* Update warnings in FinishRemoteTransactionCommit()
* Improve session clean up at the end of execution
Explicitly list all the states that the execution might end,
otherwise warn.
* Remove MULTI_CONNECTION_WAIT_RETRY which is not used at all
* Add more ORDER BYs to multi_mx_partitioning
This is necessary for multi-row INSERTs for the same reasons we use it
in e.g. UPSERTs: if the range table list has more than one entry, then
PostgreSQL's deparse logic requires that vars be prefixed by the name
of their corresponding range table entry. This of course doesn't affect
single-row INSERTs, but since multi-row INSERTs have a VALUE RTE, they
were affected.
The piece of ruleutils which builds range table names wasn't modified
to handle shard extension; instead UPSERT/INSERT INTO ... SELECT added
an alias to the RTE. When present, this alias is favored. Doing the
same in the multi-row INSERT case fixes RETURNING for such commands.
This is a pretty substantial refactoring of the existing modify path
within the router executor and planner. In particular, we now hunt for
all VALUES range table entries in INSERT statements and group the rows
contained therein by shard identifier. These rows are stashed away for
later in "ModifyRoute" elements. During deparse, the appropriate RTE
is extracted from the Query and its values list is replaced by these
rows before any SQL is generated.
In this way, we can create multiple Tasks, but only one per shard, to
piecemeal execute a multi-row INSERT. The execution of jobs containing
such tasks now exclusively go through the "multi-router executor" which
was previously used for e.g. INSERT INTO ... SELECT.
By piggybacking onto that executor, we participate in ongoing trans-
actions, get rollback-ability, etc. In short order, the only remaining
use of the "single modify" router executor will be for bare single-
row INSERT statements (i.e. those not in a transaction).
This change appropriately handles deferred pruning as well as master-
evaluated functions.
This commit adds INSERT INTO ... SELECT feature for distributed tables.
We implement INSERT INTO ... SELECT by pushing down the SELECT to
each shard. To compute that we use the router planner, by adding
an "uninstantiated" constraint that the partition column be equal to a
certain value. standard_planner() distributes that constraint to all
the tables where it knows how to push the restriction safely. An example
is that the tables that are connected via equi joins.
The router planner then iterates over the target table's shards,
for each we replace the "uninstantiated" restriction, with one that
PruneShardList() handles. Do so by replacing the partitioning qual
parameter added in multi_planner() with the current shard's
actual boundary values. Also, add the current shard's boundary values to the
top level subquery to ensure that even if the partitioning qual is
not distributed to all the tables, we never run the queries on the shards
that don't match with the current shard boundaries. Finally, perform the
normal shard pruning to decide on whether to push the query to the
current shard or not.
We do not support certain SQLs on the subquery, which are described/commented
on ErrorIfInsertSelectQueryNotSupported().
We also added some locking on the router executor. When an INSERT/SELECT command
runs on a distributed table with replication factor >1, we need to ensure that
it sees the same result on each placement of a shard. So we added the ability
such that router executor takes exclusive locks on shards from which the SELECT
in an INSERT/SELECT reads in order to prevent concurrent changes. This is not a
very optimal solution, but it's simple and correct. The
citus.all_modifications_commutative can be used to avoid aggressive locking.
An INSERT/SELECT whose filters are known to exclude any ongoing writes can be
marked as commutative. See RequiresConsistentSnapshot() for the details.
We also moved the decison of whether the multiPlan should be executed on
the router executor or not to the planning phase. This allowed us to
integrate multi task router executor tasks to the router executor smoothly.
This adds support for SERIAL/BIGSERIAL column types. Because we now can
evaluate functions on the master (during execution), adding this is a
matter of ensuring the table creation step works properly.
To accomplish this, I've added some logic to detect sequences owned by
a table (i.e. those related to its columns). Simply creating a sequence
and using it in a default value is insufficient; users who do so must
ensure the sequence is owned by the column using it.
Fortunately, this is exactly what SERIAL and BIGSERIAL do, which is the
use case we're targeting with this feature. While testing this, I found
that worker_apply_shard_ddl_command actually adds shard identifiers to
sequence names, though I found no places that use or test this path. I
removed that code so that sequence names are not mutated and will match
those used by a SERIAL default value expression.
Our use of the new-to-9.5 CREATE SEQUENCE IF NOT EXISTS syntax means we
are dropping support for 9.4 (which is being done regardless, but makes
this change simpler). I've removed 9.4 from the Travis build matrix.
Some edge cases are possible in ALTER SEQUENCE, COPY FROM (on workers),
and CREATE SEQUENCE OWNED BY. I've added errors for each so that users
understand when and why certain operations are prohibited.
- Enables using VOLATILE functions (like nextval()) in INSERT queries
- Enables using STABLE functions (like now()) targetLists and joinTrees
UPDATE and INSERT can now contain non-immutable functions. INSERT can contain any kind of
expression, while UPDATE can contain any STABLE function, so long as a Var is not passed
into the STABLE function, even indirectly. UPDATE TagetEntry's can now also include Vars.
There's an exception, CASE/COALESCE statements may not contain mutable functions.
Functions calls in master_modify_multiple_shards are also evaluated.
Since we now short-circuit on certain remote errors, we want to ensure
we preserve the old behavior of not modifying any placement states if
a non-short-circuiting error occurs on all placements.
Fixes#271
This change sets ShardIds and JobIds for each test case. Before this change,
when a new test that somehow increments Job or Shard IDs is added, then
the tests after the new test should be updated.
ShardID and JobID sequences are set at the beginning of each file with the
following commands:
```
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 290000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 290000;
```
ShardIds and JobIds are multiples of 10000. Exceptions are:
- multi_large_shardid: shardid and jobid sequences are set to much larger values
- multi_fdw_large_shardid: same as above
- multi_join_pruning: Causes a race condition with multi_hash_pruning since
they are run in parallel.
Allow references to columns in UPDATE statements
Queries like "UPDATE tbl SET column = column + 1" are now allowed, so long as you don't use any IMMUTABLE functions.
- non-router plannable queries can be executed
by router executor if they satisfy the criteria
- router executor is removed from configuration,
now task executor can not be set to router
- removed some tests that error out for router executor
There already exist tests that locally embed knowledge about port
numbers, and there's more tests requiring that. Instead of copying
\set's to several tests, make these port number variables available to
all tests.
All citusdb references in
- extension, binary names
- file headers
- all configuration name prefixes
- error/warning messages
- some functions names
- regression tests
are changed to be citus.
This entirely removes any restriction on the type of partitioning
during DML planning and execution. Though there aren't actually any
technical limitations preventing DML commands against append- (or even
range-) partitioned tables, we had initially forbidden this, as any
future stage operation could cause shards to overlap, banning all
subsequent DML operations to partition values contained within more
than one shards. This ended up mostly restricting us, so we're now
removing that restriction.