CMDTAG_SELECT exists in PG12 hence defining a MACRO such as
CMDTAG_SELECT -> "SELECT" is not possible. I chose CMDTAG_SELECT_COMPAT
because with the COMPAT suffix it is explicit that it maps to different
things in different versions and also has a less chance of mapping
something irrevelant. For example if we used SELECT as a macro, then it
would map every SELECT to whatever it is mapping to, which might have
unexpected/undesired behaviour.
This commit mostly adds pg_get_triggerdef_command to our ruleutils_13.
This doesn't add anything extra for ruleutils 13 so it is basically a copy
of the change on ruleutils_12
Postgres introduced QueryCompletion struct. Hence a compat utility is
added to finish query completion for older versions and pg >= 13.
The commit on Postgres side:
2f9661311b83dc481fc19f6e3bda015392010a40
With PG13 heap_* (heap_open, heap_close etc) are replaced with table_*
(table_open, table_close etc).
It is better to use the new table access methods in the codebase and
define the macros for the previous versions as we can easily remove the
macro without having to change the codebase when we drop the support for
the old version.
Commits that introduced this change on Postgres:
f25968c49697db673f6cd2a07b3f7626779f1827
e0c4ec07284db817e1f8d9adfb3fffc952252db0
4b21acf522d751ba5b6679df391d5121b6c4a35f
Command to see relevant commits on Postgres side:
git log --all --grep="heap_open"
Pass the list to lnext API
lnext API now expects the list as well.
The commit on Postgres that introduced the change: 1cff1b95ab6ddae32faa3efe0d95a820dbfdc164
lnext_compat and list_delete_cell_compat macros are introduced so that
we can use these macros in the codebase without having to use #if
directives in the codebase.
Related commit on postgres:
1cff1b95ab6ddae32faa3efe0d95a820dbfdc164
Command to search in postgres:
git log --all --grep="list_delete_cell"
add ListCellAndListWrapper
When iterating a list in separate function calls, we need both the list
and the current cell starting from PG13, therefore
ListCellAndListWrapper is added to store both as a wrapper.
Use ListCellAndListWrapper in foreign key test udfs
As we iterate a list in these udfs using a functionContext, we need to
use the wrapper to be able to access both the list and the current cell.
With this patch, we introduce `locally_reserved_shared_connections.c/h` files
which are responsible for reserving some space in shared memory counters
upfront.
We sometimes need to reserve connections, but not necessarily
establish them. For example:
- COPY command should reserve connections as it cannot know which
connections it needs in which order. COPY establishes connections
as any input data hits the workers. For example, for router COPY
command, it only establishes 1 connection.
As discussed here (https://github.com/citusdata/citus/pull/3849#pullrequestreview-431792473),
COPY needs to reserve connections up-front, otherwise we can end
up with resource starvation/un-detected deadlocks.
* Use CalculateUniformHashRangeIndex in HashPartitionId
INT32_MIN definition can change among different platforms hence it is
possible to get overflow, we would see crashes because of this in debian
distros. We have already solved a similar problem with introducing
CalculateUniformHashRangeIndex method, hence to solve it we can use the
same method, this also removes some duplication and has a single place
to decide that.
* Use PG_INT32_XX instead of INT32_XX to be safer
1) Rename CONNECTION_PER_PLACEMENT to REQUIRE_CLEAN_CONNECTION. This is
mostly to make things clear as the new name reveals more.
2) We also make sure that mark all the copy connections critical,
even if they are accessed earlier in the transction
* use adaptive executor even if task-tracker is set
* Update check-multi-mx tests for adaptive executor
Basically repartition joins are enabled where necessary. For parallel
tests max adaptive executor pool size is decresed to 2, otherwise we
would get too many clients error.
* Update limit_intermediate_size test
It seems that when we use adaptive executor instead of task tracker, we
exceed the intermediate result size less in the test. Therefore updated
the tests accordingly.
* Update multi_router_planner
It seems that there is one problem with multi_router_planner when we use
adaptive executor, we should fix the following error:
+ERROR: relation "authors_range_840010" does not exist
+CONTEXT: while executing command on localhost:57637
* update repartition join tests for check-multi
* update isolation tests for repartitioning
* Error out if shard_replication_factor > 1 with repartitioning
As we are removing the task tracker, we cannot switch to it if
shard_replication_factor > 1. In that case, we simply error out.
* Remove MULTI_EXECUTOR_TASK_TRACKER
* Remove multi_task_tracker_executor
Some utility methods are moved to task_execution_utils.c.
* Remove task tracker protocol methods
* Remove task_tracker.c methods
* remove unused methods from multi_server_executor
* fix style
* remove task tracker specific tests from worker_schedule
* comment out task tracker udf calls in tests
We were using task tracker udfs to test permissions in
multi_multiuser.sql. We should find some other way to test them, then we
should remove the commented out task tracker calls.
* remove task tracker test from follower schedule
* remove task tracker tests from multi mx schedule
* Remove task-tracker specific functions from worker functions
* remove multi task tracker extra schedule
* Remove unused methods from multi physical planner
* remove task_executor_type related things in tests
* remove LoadTuplesIntoTupleStore
* Do initial cleanup for repartition leftovers
During startup, task tracker would call TrackerCleanupJobDirectories and
TrackerCleanupJobSchemas to clean up leftover directories and job
schemas. With adaptive executor, while doing repartitions it is possible
to leak these things as well. We don't retry cleanups, so it is possible
to have leftover in case of errors.
TrackerCleanupJobDirectories is renamed as
RepartitionCleanupJobDirectories since it is repartition specific now,
however TrackerCleanupJobSchemas cannot be used currently because it is
task tracker specific. The thing is that this function is a no-op
currently.
We should add cleaning up intermediate schemas to DoInitialCleanup
method when that problem is solved(We might want to solve it in this PR
as well)
* Revert "remove task tracker tests from multi mx schedule"
This reverts commit 03ecc0a681.
* update multi mx repartition parallel tests
* not error with task_tracker_conninfo_cache_invalidate
* not run 4 repartition queries in parallel
It seems that when we run 4 repartition queries in parallel we get too
many clients error on CI even though we don't get it locally. Our guess
is that, it is because we open/close many connections without doing some
work and postgres has some delay to close the connections. Hence even
though connections are removed from the pg_stat_activity, they might
still not be closed. If the above assumption is correct, it is unlikely
for it to happen in practice because:
- There is some network latency in clusters, so this leaves some times
for connections to be able to close
- Repartition joins return some data and that also leaves some time for
connections to be fully closed.
As we don't get this error in our local, we currently assume that it is
not a bug. Ideally this wouldn't happen when we get rid of the
task-tracker repartition methods because they don't do any pruning and
might be opening more connections than necessary.
If this still gives us "too many clients" error, we can try to increase
the max_connections in our test suite(which is 100 by default).
Also there are different places where this error is given in postgres,
but adding some backtrace it seems that we get this from
ProcessStartupPacket. The backtraces can be found in this link:
https://circleci.com/gh/citusdata/citus/138702
* Set distributePlan->relationIdList when it is needed
It seems that we were setting the distributedPlan->relationIdList after
JobExecutorType is called, which would choose task-tracker if
replication factor > 1 and there is a repartition query. However, it
uses relationIdList to decide if the query has a repartition query, and
since it was not set yet, it would always think it is not a repartition
query and would choose adaptive executor when it should choose
task-tracker.
* use adaptive executor even with shard_replication_factor > 1
It seems that we were already using adaptive executor when
replication_factor > 1. So this commit removes the check.
* remove multi_resowner.c and deprecate some settings
* remove TaskExecution related leftovers
* change deprecated API error message
* not recursively plan single relatition repartition subquery
* recursively plan single relation repartition subquery
* test depreceated task tracker functions
* fix overlapping shard intervals in range-distributed test
* fix error message for citus_metadata_container
* drop task-tracker deprecated functions
* put the implemantation back to worker_cleanup_job_schema_cachesince citus cloud uses it
* drop some functions, add downgrade script
Some deprecated functions are dropped.
Downgrade script is added.
Some gucs are deprecated.
A new guc for repartition joins bucket size is added.
* order by a test to fix flappiness
This can save a lot of data to be sent in some cases, thus improving
performance for which inter query bandwidth is the bottleneck.
There's some issues with enabling this as default, so that's currently not done.
We have two variables that are related to local execution status.
TransactionAccessedLocalPlacement and
TransactionConnectedToLocalGroup. Only one of these fields should be
set, however we didn't have any check for this contraint and it was
error prone.
What those two variables are used is that we are trying to understand if
we should use local execution, the current session, or if we should be
using a connection to execute the current query, therefore the tasks. In
the enum, now it is more clear what these variables mean.
Also, now we have a method to change the local execution status. The
method will error if we are trying to transition from a state to a wrong
state. This will help us avoid problems.
* explicitly return false if transaction connected to local node
* not set TransactionConnectedToLocalGroup if we are writing to a file
We use TransactionConnectedToLocalGroup to prevent local execution from
happening as that might cause visibility problems. As files are visible
to all transactions, we shouldn't set this variable if we are writing to
a file.
We have special logic to copy into intermediate results and we use a
custom format for that, "result" copy format. Postgres internally does
not know this format and if we use this locally it will error saying
that it does not know this format.
Files are visible to all transactions, which means that we can use any
connection to access files. In order to use the existing logic, it makes
sense that in case we have intermediate results, which means we will
write the results to a file, we preserve the same behavior, which is
opening connections to localhost. Therefore if we have intermediate
results we return false in ShouldExecuteCopyLocally.
If current transaction is connected to local group we should not use
local copy, because we might not see some of the changes that are made
over the connection to the local group.
A copy will be executed locally if
- Local execution is enabled and current transaction accessed a local placement
- Local execution is enabled and we are inside a transaction block.
So even if local execution is enabled but we are not in a transaction block, the copy will not be run locally.
This will not run locally:
```
COPY distributed_table FROM STDIN;
....
```
This will run locally:
```
SET citus.enable_local_execution to 'on';
BEGIN;
COPY distributed_table FROM STDIN;
COMMIT;
....
```
.
There are 3 ways to do a copy in postgres programmatically:
- from a file
- from a program
- from a callback function
I have chosen to implement it with a callback function, which means that we write the rows of copy from a callback function to the output buffer, which is used to insert tuples into the actual table.
For each shard id, we have a buffer that keeps the current rows to be written, we perform the actual copy operation either when:
- copy buffer for the given shard id reaches to a threshold, which is currently 512KB
- we reach to the end of the copy
The buffer size is debatable(512KB). At a given time, we might allocate (local placement * buffer size) memory at most.
The local copy uses the same copy format as remote copy, which means that we serialize the data in the same format as remote copy and send it locally.
There was also the option to use ExecSimpleRelationInsert to insert
slots one by one, which would avoid the extra
serialization/deserialization but doing some benchmarks it seems that
using buffers are significantly better in terms of the performance.
You can see this comment for more details: https://github.com/citusdata/citus/pull/3557#discussion_r389499054
If the generated column does not come at the end of the column list,
columnNameList doesn't line up with the column indexes. Seek past
CREATE TABLE test_table (
test_id int PRIMARY KEY,
gen_n int GENERATED ALWAYS AS (1) STORED,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
SELECT create_distributed_table('test_table', 'test_id');
Would raise ERROR: cannot cast 23 to 1184
* Update shardPlacement->nodeId to uint
As the source of the shardPlacement->nodeId is always workerNode->nodeId,
and that is uint32.
We had this hack because of: 0ea4e52df5 (r266421409)
And, that is gone with: 90056f7d3c (diff-c532177d74c72d3f0e7cd10e448ab3c6L1123)
So, we're safe to do it now.
* Relax the restrictions on using the local execution
Previously, whenever any local execution happens, we disabled further
commands to do any remote queries. The basic motivation for doing that
is to prevent any accesses in the same transaction block to access the
same placements over multiple sessions: one is local session the other
is remote session to the same placement.
However, the current implementation does not distinguish local accesses
being to a placement or not. For example, we could have local accesses
that only touches intermediate results. In that case, we should not
implement the same restrictions as they become useless.
So, this is a pre-requisite for executing the intermediate result only
queries locally.
* Update the error messages
As the underlying implementation has changed, reflect it in the error
messages.
* Keep track of connections to local node
With this commit, we're adding infrastructure to track if any connection
to the same local host is done or not.
The main motivation for doing this is that we've previously were more
conservative about not choosing local execution. Simply, we disallowed
local execution if any connection to any remote node is done. However,
if we want to use local execution for intermediate result only queries,
this'd be annoying because we expect all queries to touch remote node
before the final query.
Note that this approach is still limiting in Citus MX case, but for now
we can ignore that.
* Formalize the concept of Local Node
Also some minor refactoring while creating the dummy placement
* Write intermediate results locally when the results are only needed locally
Before this commit, Citus used to always broadcast all the intermediate
results to remote nodes. However, it is possible to skip pushing
the results to remote nodes always.
There are two notable cases for doing that:
(a) When the query consists of only intermediate results
(b) When the query is a zero shard query
In both of the above cases, we don't need to access any data on the shards. So,
it is a valuable optimization to skip pushing the results to remote nodes.
The pattern mentioned in (a) is actually a common patterns that Citus users
use in practice. For example, if you have the following query:
WITH cte_1 AS (...), cte_2 AS (....), ... cte_n (...)
SELECT ... FROM cte_1 JOIN cte_2 .... JOIN cte_n ...;
The final query could be operating only on intermediate results. With this patch,
the intermediate results of the ctes are not unnecessarily pushed to remote
nodes.
* Add specific regression tests
As there are edge cases in Citus MX and with round-robin policy,
use the same queries on those cases as well.
* Fix failure tests
By forcing not to use local execution for intermediate results since
all the tests expects the results to be pushed remotely.
* Fix flaky test
* Apply code-review feedback
Mostly style changes
* Limit the max value of pg_dist_node_seq to reserve for internal use
* Remove unused executor codes
All of the codes of real-time executor. Some functions
in router executor still remains there because there
are common functions. We'll move them to accurate places
in the follow-up commits.
* Move GUCs to transaction mngnt and remove unused struct
* Update test output
* Get rid of references of real-time executor from code
* Warn if real-time executor is picked
* Remove lots of unused connection codes
* Removed unused code for connection restrictions
Real-time and router executors cannot handle re-using of the existing
connections within a transaction block.
Adaptive executor and COPY can re-use the connections. So, there is no
reason to keep the code around for applying the restrictions in the
placement connection logic.
/*
* local_executor.c
*
* The scope of the local execution is locally executing the queries on the
* shards. In other words, local execution does not deal with any local tables
* that are not shards on the node that the query is being executed. In that sense,
* the local executor is only triggered if the node has both the metadata and the
* shards (e.g., only Citus MX worker nodes).
*
* The goal of the local execution is to skip the unnecessary network round-trip
* happening on the node itself. Instead, identify the locally executable tasks and
* simply call PostgreSQL's planner and executor.
*
* The local executor is an extension of the adaptive executor. So, the executor uses
* adaptive executor's custom scan nodes.
*
* One thing to note that Citus MX is only supported with replication factor = 1, so
* keep that in mind while continuing the comments below.
*
* On the high level, there are 3 slightly different ways of utilizing local execution:
*
* (1) Execution of local single shard queries of a distributed table
*
* This is the simplest case. The executor kicks at the start of the adaptive
* executor, and since the query is only a single task the execution finishes
* without going to the network at all.
*
* Even if there is a transaction block (or recursively planned CTEs), as long
* as the queries hit the shards on the same, the local execution will kick in.
*
* (2) Execution of local single queries and remote multi-shard queries
*
* The rule is simple. If a transaction block starts with a local query execution,
* all the other queries in the same transaction block that touch any local shard
* have to use the local execution. Although this sounds restrictive, we prefer to
* implement in this way, otherwise we'd end-up with as complex scenarious as we
* have in the connection managements due to foreign keys.
*
* See the following example:
* BEGIN;
* -- assume that the query is executed locally
* SELECT count(*) FROM test WHERE key = 1;
*
* -- at this point, all the shards that reside on the
* -- node is executed locally one-by-one. After those finishes
* -- the remaining tasks are handled by adaptive executor
* SELECT count(*) FROM test;
*
*
* (3) Modifications of reference tables
*
* Modifications to reference tables have to be executed on all nodes. So, after the
* local execution, the adaptive executor keeps continuing the execution on the other
* nodes.
*
* Note that for read-only queries, after the local execution, there is no need to
* kick in adaptive executor.
*
* There are also few limitations/trade-offs that is worth mentioning. First, the
* local execution on multiple shards might be slow because the execution has to
* happen one task at a time (e.g., no parallelism). Second, if a transaction
* block/CTE starts with a multi-shard command, we do not use local query execution
* since local execution is sequential. Basically, we do not want to lose parallelism
* across local tasks by switching to local execution. Third, the local execution
* currently only supports queries. In other words, any utility commands like TRUNCATE,
* fails if the command is executed after a local execution inside a transaction block.
* Forth, the local execution cannot be mixed with the executors other than adaptive,
* namely task-tracker, real-time and router executors. Finally, related with the
* previous item, COPY command cannot be mixed with local execution in a transaction.
* The implication of that any part of INSERT..SELECT via coordinator cannot happen
* via the local execution.
*/
Before this commit, we've recorded the relation accesses in 3 different
places
- FindPlacementListConnection -- applies all executor in tx block
- StartPlacementExecutionOnSession() -- adaptive executor only
- StartPlacementListConnection() -- router/real-time only
This is different than Citus 8.2, and could lead to query execution times
increase considerably on multi-shard commands in transaction block
that are on partitioned tables.
Benchmarks:
```
1+8 c5.4xlarge cluster
Empty distributed partitioned table with 365 partitions: https://gist.github.com/onderkalaci/1edace4ed6bd6f061c8a15594865bb51#file-partitions_365-sql
./pgbench -f /tmp/multi_shard.sql -c10 -j10 -P 1 -T 120 postgres://citus:w3r6KLJpv3mxe9E-NIUeJw@c.fy5fkjcv45vcepaogqcaskmmkee.db.citusdata.com:5432/citus?sslmode=require
cat /tmp/multi_shard.sql
BEGIN;
DELETE FROM collections_list;
DELETE FROM collections_list;
DELETE FROM collections_list;
COMMIT;
cat /tmp/single_shard.sql
BEGIN;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list WHERE key = :aid;
COMMIT;
cat /tmp/mix.sql
BEGIN;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list WHERE key = :aid;
DELETE FROM collections_list;
DELETE FROM collections_list;
DELETE FROM collections_list;
COMMIT;
```
The table shows `latency average` of pgbench runs explained above, so we have a pretty solid improvement even over 8.2.2.
| Test | Citus 8.2.2 | Citus 8.3.1 | Citus 8.3.2 (this branch) | Citus 8.3.1 (FKEYs disabled via GUC) |
| ------------- | ------------- | ------------- |------------- | ------------- |
|multi_shard | 2370.083 ms |3605.040 ms |1324.094 ms |1247.255 ms |
| single_shard | 85.338 ms |120.934 ms |73.216 ms | 78.765 ms |
| mix | 2434.459 ms | 3727.080 ms |1306.456 ms | 1280.326 ms |
With this commit, we're introducing the Adaptive Executor.
The commit message consists of two distinct sections. The first part explains
how the executor works. The second part consists of the commit messages of
the individual smaller commits that resulted in this commit. The readers
can search for the each of the smaller commit messages on
https://github.com/citusdata/citus and can learn more about the history
of the change.
/*-------------------------------------------------------------------------
*
* adaptive_executor.c
*
* The adaptive executor executes a list of tasks (queries on shards) over
* a connection pool per worker node. The results of the queries, if any,
* are written to a tuple store.
*
* The concepts in the executor are modelled in a set of structs:
*
* - DistributedExecution:
* Execution of a Task list over a set of WorkerPools.
* - WorkerPool
* Pool of WorkerSessions for the same worker which opportunistically
* executes "unassigned" tasks from a queue.
* - WorkerSession:
* Connection to a worker that is used to execute "assigned" tasks
* from a queue and may execute unasssigned tasks from the WorkerPool.
* - ShardCommandExecution:
* Execution of a Task across a list of placements.
* - TaskPlacementExecution:
* Execution of a Task on a specific placement.
* Used in the WorkerPool and WorkerSession queues.
*
* Every connection pool (WorkerPool) and every connection (WorkerSession)
* have a queue of tasks that are ready to execute (readyTaskQueue) and a
* queue/set of pending tasks that may become ready later in the execution
* (pendingTaskQueue). The tasks are wrapped in a ShardCommandExecution,
* which keeps track of the state of execution and is referenced from a
* TaskPlacementExecution, which is the data structure that is actually
* added to the queues and describes the state of the execution of a task
* on a particular worker node.
*
* When the task list is part of a bigger distributed transaction, the
* shards that are accessed or modified by the task may have already been
* accessed earlier in the transaction. We need to make sure we use the
* same connection since it may hold relevant locks or have uncommitted
* writes. In that case we "assign" the task to a connection by adding
* it to the task queue of specific connection (in
* AssignTasksToConnections). Otherwise we consider the task unassigned
* and add it to the task queue of a worker pool, which means that it
* can be executed over any connection in the pool.
*
* A task may be executed on multiple placements in case of a reference
* table or a replicated distributed table. Depending on the type of
* task, it may not be ready to be executed on a worker node immediately.
* For instance, INSERTs on a reference table are executed serially across
* placements to avoid deadlocks when concurrent INSERTs take conflicting
* locks. At the beginning, only the "first" placement is ready to execute
* and therefore added to the readyTaskQueue in the pool or connection.
* The remaining placements are added to the pendingTaskQueue. Once
* execution on the first placement is done the second placement moves
* from pendingTaskQueue to readyTaskQueue. The same approach is used to
* fail over read-only tasks to another placement.
*
* Once all the tasks are added to a queue, the main loop in
* RunDistributedExecution repeatedly does the following:
*
* For each pool:
* - ManageWorkPool evaluates whether to open additional connections
* based on the number unassigned tasks that are ready to execute
* and the targetPoolSize of the execution.
*
* Poll all connections:
* - We use a WaitEventSet that contains all (non-failed) connections
* and is rebuilt whenever the set of active connections or any of
* their wait flags change.
*
* We almost always check for WL_SOCKET_READABLE because a session
* can emit notices at any time during execution, but it will only
* wake up WaitEventSetWait when there are actual bytes to read.
*
* We check for WL_SOCKET_WRITEABLE just after sending bytes in case
* there is not enough space in the TCP buffer. Since a socket is
* almost always writable we also use WL_SOCKET_WRITEABLE as a
* mechanism to wake up WaitEventSetWait for non-I/O events, e.g.
* when a task moves from pending to ready.
*
* For each connection that is ready:
* - ConnectionStateMachine handles connection establishment and failure
* as well as command execution via TransactionStateMachine.
*
* When a connection is ready to execute a new task, it first checks its
* own readyTaskQueue and otherwise takes a task from the worker pool's
* readyTaskQueue (on a first-come-first-serve basis).
*
* In cases where the tasks finish quickly (e.g. <1ms), a single
* connection will often be sufficient to finish all tasks. It is
* therefore not necessary that all connections are established
* successfully or open a transaction (which may be blocked by an
* intermediate pgbouncer in transaction pooling mode). It is therefore
* essential that we take a task from the queue only after opening a
* transaction block.
*
* When a command on a worker finishes or the connection is lost, we call
* PlacementExecutionDone, which then updates the state of the task
* based on whether we need to run it on other placements. When a
* connection fails or all connections to a worker fail, we also call
* PlacementExecutionDone for all queued tasks to try the next placement
* and, if necessary, mark shard placements as inactive. If a task fails
* to execute on all placements, the execution fails and the distributed
* transaction rolls back.
*
* For multi-row INSERTs, tasks are executed sequentially by
* SequentialRunDistributedExecution instead of in parallel, which allows
* a high degree of concurrency without high risk of deadlocks.
* Conversely, multi-row UPDATE/DELETE/DDL commands take aggressive locks
* which forbids concurrency, but allows parallelism without high risk
* of deadlocks. Note that this is unrelated to SEQUENTIAL_CONNECTION,
* which indicates that we should use at most one connection per node, but
* can run tasks in parallel across nodes. This is used when there are
* writes to a reference table that has foreign keys from a distributed
* table.
*
* Execution finishes when all tasks are done, the query errors out, or
* the user cancels the query.
*
*-------------------------------------------------------------------------
*/
All the commits involved here:
* Initial unified executor prototype
* Latest changes
* Fix rebase conflicts to master branch
* Add missing variable for assertion
* Ensure that master_modify_multiple_shards() returns the affectedTupleCount
* Adjust intermediate result sizes
The real-time executor uses COPY command to get the results
from the worker nodes. Unified executor avoids that which
results in less data transfer. Simply adjust the tests to lower
sizes.
* Force one connection per placement (or co-located placements) when requested
The existing executors (real-time and router) always open 1 connection per
placement when parallel execution is requested.
That might be useful under certain circumstances:
(a) User wants to utilize as much as CPUs on the workers per
distributed query
(b) User has a transaction block which involves COPY command
Also, lots of regression tests rely on this execution semantics.
So, we'd enable few of the tests with this change as well.
* For parameters to be resolved before using them
For the details, see PostgreSQL's copyParamList()
* Unified executor sorts the returning output
* Ensure that unified executor doesn't ignore sequential execution of DDLJob's
Certain DDL commands, mainly creating foreign keys to reference tables,
should be executed sequentially. Otherwise, we'd end up with a self
distributed deadlock.
To overcome this situaiton, we set a flag `DDLJob->executeSequentially`
and execute it sequentially. Note that we have to do this because
the command might not be called within a transaction block, and
we cannot call `SetLocalMultiShardModifyModeToSequential()`.
This fixes at least two test: multi_insert_select_on_conflit.sql and
multi_foreign_key.sql
Also, I wouldn't mind scattering local `targetPoolSize` variables within
the code. The reason is that we'll soon have a GUC (or a global
variable based on a GUC) that'd set the pool size. In that case, we'd
simply replace `targetPoolSize` with the global variables.
* Fix 2PC conditions for DDL tasks
* Improve closing connections that are not fully established in unified execution
* Support foreign keys to reference tables in unified executor
The idea for supporting foreign keys to reference tables is simple:
Keep track of the relation accesses within a transaction block.
- If a parallel access happens on a distributed table which
has a foreign key to a reference table, one cannot modify
the reference table in the same transaction. Otherwise,
we're very likely to end-up with a self-distributed deadlock.
- If an access to a reference table happens, and then a parallel
access to a distributed table (which has a fkey to the reference
table) happens, we switch to sequential mode.
Unified executor misses the function calls that marks the relation
accesses during the execution. Thus, simply add the necessary calls
and let the logic kick in.
* Make sure to close the failed connections after the execution
* Improve comments
* Fix savepoints in unified executor.
* Rebuild the WaitEventSet only when necessary
* Unclaim connections on all errors.
* Improve failure handling for unified executor
- Implement the notion of errorOnAnyFailure. This is similar to
Critical Connections that the connection managament APIs provide
- If the nodes inside a modifying transaction expand, activate 2PC
- Fix few bugs related to wait event sets
- Mark placement INACTIVE during the execution as much as possible
as opposed to we do in the COMMIT handler
- Fix few bugs related to scheduling next placement executions
- Improve decision on when to use 2PC
Improve the logic to start a transaction block for distributed transactions
- Make sure that only reference table modifications are always
executed with distributed transactions
- Make sure that stored procedures and functions are executed
with distributed transactions
* Move waitEventSet to DistributedExecution
This could also be local to RunDistributedExecution(), but in that case
we had to mark it as "volatile" to avoid PG_TRY()/PG_CATCH() issues, and
cast it to non-volatile when doing WaitEventSetFree(). We thought that
would make code a bit harder to read than making this non-local, so we
move it here. See comments for PG_TRY() in postgres/src/include/elog.h
and "man 3 siglongjmp" for more context.
* Fix multi_insert_select test outputs
Two things:
1) One complex transaction block is now supported. Simply update
the test output
2) Due to dynamic nature of the unified executor, the orders of
the errors coming from the shards might change (e.g., all of
the queries on the shards would fail, but which one appears
on the error message?). To fix that, we simply added it to
our shardId normalization tool which happens just before diff.
* Fix subeury_and_cte test
The error message is updated from:
failed to execute task
To:
more than one row returned by a subquery or an expression
which is a lot clearer to the user.
* Fix intermediate_results test outputs
Simply update the error message from:
could not receive query results
to
result "squares" does not exist
which makes a lot more sense.
* Fix multi_function_in_join test
The error messages update from:
Failed to execute task XXX
To:
function f(..) does not exist
* Fix multi_query_directory_cleanup test
The unified executor does not create any intermediate files.
* Fix with_transactions test
A test case that just started to work fine
* Fix multi_router_planner test outputs
The error message is update from:
Could not receive query results
To:
Relation does not exists
which is a lot more clearer for the users
* Fix multi_router_planner_fast_path test
The error message is update from:
Could not receive query results
To:
Relation does not exists
which is a lot more clearer for the users
* Fix isolation_copy_placement_vs_modification by disabling select_opens_transaction_block
* Fix ordering in isolation_multi_shard_modify_vs_all
* Add executor locks to unified executor
* Make sure to allocate enought WaitEvents
The previous code was missing the waitEvents for the latch and
postmaster death.
* Fix rebase conflicts for master rebase
* Make sure that TRUNCATE relies on unified executor
* Implement true sequential execution for multi-row INSERTS
Execute the individual tasks executed one by one. Note that this is different than
MultiShardConnectionType == SEQUENTIAL_CONNECTION case (e.g., sequential execution
mode). In that case, running the tasks across the nodes in parallel is acceptable
and implemented in that way.
However, the executions that are qualified here would perform poorly if the
tasks across the workers are executed in parallel. We currently qualify only
one class of distributed queries here, multi-row INSERTs. If we do not enforce
true sequential execution, concurrent multi-row upserts could easily form
a distributed deadlock when the upserts touch the same rows.
* Remove SESSION_LIFESPAN flag in unified_executor
* Apply failure test updates
We've changed the failure behaviour a bit, and also the error messages
that show up to the user. This PR covers majority of the updates.
* Unified executor honors citus.node_connection_timeout
With this commit, unified executor errors out if even
a single connection cannot be established within
citus.node_connection_timeout.
And, as a side effect this fixes failure_connection_establishment
test.
* Properly increment/decrement pool size variables
Before this commit, the idle and active connection
counts were not properly calculated.
* insert_select_executor goes through unified executor.
* Add missing file for task tracker
* Modify ExecuteTaskListExtended()'s signature
* Sort output of INSERT ... SELECT ... RETURNING
* Take partition locks correctly in unified executor
* Alternative implementation for force_max_query_parallelization
* Fix compile warnings in unified executor
* Fix style issues
* Decrement idleConnectionCount when idle connection is lost
* Always rebuild the wait event sets
In the previous implementation, on waitFlag changes, we were only
modifying the wait events. However, we've realized that it might
be an over optimization since (a) we couldn't see any performance
benefits (b) we see some errors on failures and because of (a)
we prefer to disable it now.
* Make sure to allocate enough sized waitEventSet
With multi-row INSERTs, we might have more sessions than
task*workerCount after few calls of RunDistributedExecution()
because the previous sessions would also be alive.
Instead, re-allocate events when the connectino set changes.
* Implement SELECT FOR UPDATE on reference tables
On master branch, we do two extra things on SELECT FOR UPDATE
queries on reference tables:
- Acquire executor locks
- Execute the query on all replicas
With this commit, we're implementing the same logic on the
new executor.
* SELECT FOR UPDATE opens transaction block even if SelectOpensTransactionBlock disabled
Otherwise, users would be very confused and their logic is very likely
to break.
* Fix build error
* Fix the newConnectionCount calculation in ManageWorkerPool
* Fix rebase conflicts
* Fix minor test output differences
* Fix citus indent
* Remove duplicate sorts that is added with rebase
* Create distributed table via executor
* Fix wait flags in CheckConnectionReady
* failure_savepoints output for unified executor.
* failure_vacuum output (pg 10) for unified executor.
* Fix WaitEventSetWait timeout in unified executor
* Stabilize failure_truncate test output
* Add an ORDER BY to multi_upsert
* Fix regression test outputs after rebase to master
* Add executor.c comment
* Rename executor.c to adaptive_executor.c
* Do not schedule tasks if the failed placement is not ready to execute
Before the commit, we were blindly scheduling the next placement executions
even if the failed placement is not on the ready queue. Now, we're ensuring
that if failed placement execution is on a failed pool or session where the
execution is on the pendingQueue, we do not schedule the next task. Because
the other placement execution should be already running.
* Implement a proper custom scan node for adaptive executor
- Switch between the executors, add GUC to set the pool size
- Add non-adaptive regression test suites
- Enable CIRCLE CI for non-adaptive tests
- Adjust test output files
* Add slow start interval to the executor
* Expose max_cached_connection_per_worker to user
* Do not start slow when there are cached connections
* Consider ExecutorSlowStartInterval in NextEventTimeout
* Fix memory issues with ReceiveResults().
* Disable executor via TaskExecutorType
* Make sure to execute the tests with the other executor
* Use task_executor_type to enable-disable adaptive executor
* Remove useless code
* Adjust the regression tests
* Add slow start regression test
* Rebase to master
* Fix test failures in adaptive executor.
* Rebase to master - 2
* Improve comments & debug messages
* Set force_max_query_parallelization in isolation_citus_dist_activity
* Force max parallelization for creating shards when asked to use exclusive connection.
* Adjust the default pool size
* Expand description of max_adaptive_executor_pool_size GUC
* Update warnings in FinishRemoteTransactionCommit()
* Improve session clean up at the end of execution
Explicitly list all the states that the execution might end,
otherwise warn.
* Remove MULTI_CONNECTION_WAIT_RETRY which is not used at all
* Add more ORDER BYs to multi_mx_partitioning