Commit Graph

125 Commits (44204ec9f1c85796950af115002bb2b9c50039e0)

Author SHA1 Message Date
Philip Dubé cc50682158 Fix typos. Spurred spotting "connectios" in logs 2021-10-25 13:54:09 +00:00
Onder Kalaci ce4c4540c5 Simplify 2PC decision in the executor
It seems like the decision for 2PC is more complicated than
it should be.

With this change, we do one behavioral change. In essense,
before this commit, when a SELECT task with replication factor > 1
is executed, the executor was triggering 2PC. And, in fact,
the transaction manager (`ConnectionModifiedPlacement()`) was
able to understand not to trigger 2PC when no modification happens.

However, for transaction blocks like:
BEGIN;
-- a command that triggers 2PC
-- A SELECT command on replication > 1
..
COMMIT;

The SELECT was used to be qualified as required 2PC. And, as a side-effect
the executor was setting `xactProperties.errorOnAnyFailure = true;`

So, the commands was failing at the time of execution. Now, they fail at
the end of the transaction.
2021-10-23 09:06:28 +02:00
Onder Kalaci 575bb6dde9 Drop support for Inactive Shard placements
Given that we do all operations via 2PC, there is no way
for any placement to be marked as INACTIVE.
2021-10-22 18:03:35 +02:00
Önder Kalacı b3299de81c
Drop support for citus.multi_shard_commit_protocol (#5380)
In the past, we allowed users to manually switch to 1PC
(e.g., one phase commit). However, with this commit, we
don't. All multi-shard modifications are done via 2PC.
2021-10-21 14:01:28 +02:00
Önder Kalacı 3f726c72e0
When replication factor > 1, all modifications are done via 2PC (#5379)
With Citus 9.0, we introduced `citus.single_shard_commit_protocol` which
defaults to 2PC.

With this commit, we prevent any user to set it to 1PC and drop support
for `citus.single_shard_commit_protocol`.

Although this might add some overhead for users, it is already the default
behaviour (so less likely) and marking placements as INVALID is much
worse.
2021-10-20 01:39:03 -07:00
Halil Ozan Akgul 43d5853b6d Fixes function names in comments 2021-10-06 09:24:43 +03:00
Jelte Fennema bb5c494104 Enable binary encoding by default on PG14
Since PG14 we can now use binary encoding for arrays and composite types
that contain user defined types. This was fixed in this commit in
Postgres: 670c0a1d47

This change starts using that knowledge, by not necessarily falling back
to text encoding anymore for those types.

While doing this and testing a bit more I found various cases where
binary encoding would fail that our checks didn't cover. This fixes
those cases and adds tests for those. It also fixes EXPLAIN ANALYZE
never using binary encoding, which was a leftover of workaround that
was not necessary anymore.

Finally, it changes the default for both `citus.enable_binary_protocol`
and `citus.binary_worker_copy_format` to `true` for PG14 and up. In our
cloud offering `binary_worker_copy_format` already was true by default.
`enable_binary_protocol` had some bug with MX and user defined types,
this bug was fixed by the above mentioned fixes.
2021-09-06 10:27:29 +02:00
Onder Kalaci 86bd28b92c Guard against hard WaitEvenSet errors
In short, add wrappers around Postgres' AddWaitEventToSet() and
ModifyWaitEvent().

AddWaitEventToSet()/ModifyWaitEvent*() may throw hard errors. For
example, when the underlying socket for a connection is closed by
the remote server and already reflected by the OS, however
Citus hasn't had a chance to get this information. In that case,
if replication factor is >1, Citus can failover to other nodes
for executing the query. Even if replication factor = 1, Citus
can give much nicer errors.

So CitusAddWaitEventSetToSet()/CitusModifyWaitEvent() simply puts
AddWaitEventToSet()/ModifyWaitEvent() into a PG_TRY/PG_CATCH block
in order to catch any hard errors, and returns this information to
the caller.
2021-08-10 09:35:03 +02:00
SaitTalhaNisanci a4944a2102
Rename CoordinatedTransactionShouldUse2PC (#4995) 2021-05-21 18:57:42 +03:00
Onder Kalaci 926069a859 Wait until all connections are successfully established
Comment from the code:
/*
 * Iterate until all the tasks are finished. Once all the tasks
 * are finished, ensure that that all the connection initializations
 * are also finished. Otherwise, those connections are terminated
 * abruptly before they are established (or failed). Instead, we let
 * the ConnectionStateMachine() to properly handle them.
 *
 * Note that we could have the connections that are not established
 * as a side effect of slow-start algorithm. At the time the algorithm
 * decides to establish new connections, the execution might have tasks
 * to finish. But, the execution might finish before the new connections
 * are established.
 */

 Note that the abruptly terminated connections lead to the following errors:

2020-11-16 21:09:09.800 CET [16633] LOG:  could not accept SSL connection: Connection reset by peer
2020-11-16 21:09:09.872 CET [16657] LOG:  could not accept SSL connection: Undefined error: 0
2020-11-16 21:09:09.894 CET [16667] LOG:  could not accept SSL connection: Connection reset by peer

To easily reproduce the issue:

- Create a single node Citus
- Add the coordinator to the metadata
- Create a distributed table with shards on the coordinator
- f.sql:  select count(*) from test;
- pgbench -f /tmp/f.sql postgres -T 12 -c 40 -P 1  or pgbench -f /tmp/f.sql postgres -T 12 -c 40 -P 1 -C
2021-05-19 15:59:13 +02:00
Onder Kalaci 995adf1a19 Executor takes connection establishment and task execution costs into account
With this commit, the executor becomes smarter about refrain to open
new connections. The very basic example is that, if the connection
establishments take 1000ms and task executions as 5 msecs, the executor
becomes smart enough to not establish new connections.
2021-05-19 15:48:07 +02:00
Onder Kalaci 28b0b4ebd1 Move slow start increment to generic place 2021-05-19 14:31:20 +02:00
Marco Slot 00792831ad Add execution memory contexts and free after local query execution 2021-05-18 16:11:43 +02:00
SaitTalhaNisanci ff2a125a5b
Lookup hostname before execution (#4976)
We lookup the hostname just before the execution so that even if there are cached entries in the prepared statement cache we use the updated entries.
2021-05-18 16:46:31 +03:00
Onder Kalaci cc4870a635 Remove wrong PG_USED_FOR_ASSERTS_ONLY 2021-05-11 12:58:37 +02:00
Onder Kalaci 5482d5822f Keep more statistics about connection establishment times
When DEBUG4 enabled, Citus now prints per connection establishment
time.
2021-04-16 14:56:31 +02:00
Onder Kalaci 5b78f6cd63 Keep more execution statistics
When DEBUG4 enabled, Citus now prints per task execution times.
2021-04-16 14:45:00 +02:00
Onder Kalaci e65e72130d Rename use -> shouldUse
Because setting the flag doesn't necessarily mean that we'll
use 2PC. If connections are read-only, we will not use 2PC.
In other words, we'll use 2PC only for connections that modified
any placements.
2021-03-12 08:29:43 +00:00
Onder Kalaci 6a7ed7b309 Do not trigger 2PC for reads on local execution
Before this commit, Citus used 2PC no matter what kind of
local query execution happens.

For example, if the coordinator has shards (and the workers as well),
even a simple SELECT query could start 2PC:
```SQL

WITH cte_1 AS (SELECT * FROM test LIMIT 10) SELECT count(*) FROM cte_1;
```

In this query, the local execution of the shards (and also intermediate
result reads) triggers the 2PC.

To prevent that, Citus now distinguishes local reads and local writes.
And, Citus switches to 2PC only if a modification happens. This may
still lead to unnecessary 2PCs when there is a local modification
and remote SELECTs only. Though, we handle that separately
via #4587.
2021-03-12 08:29:43 +00:00
Philip Dubé 4e22f02997 Fix various typos due to zealous repetition 2021-03-04 19:28:15 +00:00
Nils Dijk d127516dc8
Mitigate segfault in connection statemachine (#4551)
As described in the comment, we have observed crashes in production
due to a segfault caused by the dereference of a NULL pointer in our
connection statemachine.

As a mitigation, preventing system crashes, we provide an error with
a small explanation of the issue. Unfortunately the case is not
reliably reproduced yet, hence the inability to add tests.

DESCRIPTION: Prevent segfaults when SAVEPOINT handling cannot recover from connection failures
2021-01-25 15:55:04 +01:00
Onder Kalaci c546ec5e78 Local node connection management
When Citus needs to parallelize queries on the local node (e.g., the node
executing the distributed query and the shards are the same), we need to
be mindful about the connection management. The reason is that the client
backends that are running distributed queries are competing with the client
backends that Citus initiates to parallelize the queries in order to get
a slot on the max_connections.

In that regard, we implemented a "failover" mechanism where if the distributed
queries cannot get a connection, the execution failovers the tasks to the local
execution.

The failover logic is follows:

- As the connection manager if it is OK to get a connection
	- If yes, we are good.
	- If no, we fail the workerPool and the failure triggers
	  the failover of the tasks to local execution queue

The decision of getting a connection is follows:

/*
 * For local nodes, solely relying on citus.max_shared_pool_size or
 * max_connections might not be sufficient. The former gives us
 * a preview of the future (e.g., we let the new connections to establish,
 * but they are not established yet). The latter gives us the close to
 * precise view of the past (e.g., the active number of client backends).
 *
 * Overall, we want to limit both of the metrics. The former limit typically
 * kics in under regular loads, where the load of the database increases in
 * a reasonable pace. The latter limit typically kicks in when the database
 * is issued lots of concurrent sessions at the same time, such as benchmarks.
 */
2020-12-03 14:16:13 +03:00
Onder Kalaci f7e1aa3f22 Multi-row INSERTs use local execution when placements are local
Multi-row execution already uses sequential execution. When shards
are local, using local execution is profitable as it avoids
an extra connection establishment to the local node.
2020-12-01 21:37:59 +03:00
Önder Kalacı c760cd3470
Move local execution after remote execution (#4301)
* Move local execution after the remote execution

Before this commit, when both local and remote tasks
exist, the executor was starting the execution with
local execution. There is no strict requirements on
this.

Especially considering the adaptive connection management
improvements that we plan to roll soon, moving the local
execution after to the remote execution makes more sense.

The adaptive connection management for single node Citus
would look roughly as follows:

   - Try to connect back to the coordinator for running
     parallel queries.
        - If succeeds, go on and execute tasks in parallel
        - If fails, fallback to the local execution

So, we'll use local execution as a fallback mechanism. And,
moving it after to the remote execution allows us to implement
such further scenarios.
2020-11-24 13:43:38 +01:00
Önder Kalacı 532b457554
Solidify the slow-start algorithm (#4318)
The adaptive executor emulates the TCP's slow start algorithm.
Whenever the executor needs new connections, it doubles the number
of connections established in the previous iteration.

This approach is powerful. When the remote queries are very short
(like index lookup with < 1ms), even a single connection is sufficent
most of the time. When the remote queries are long, the executor
can quickly establish necessary number of connections.

One missing piece on our implementation seems that the executor
keeps doubling the number of connections even if the previous
connection attempts have been finalized. Instead, we should
wait until all the attempts are finalized. This is how TCP's
slow-start works. Plus, it decreases the unnecessary pressure
on the remote nodes.
2020-11-23 19:20:13 +01:00
Onder Kalaci c433c66f2b Do not execute subplans multiple times with cursors
Before this commit, we let AdaptiveExecutorPreExecutorRun()
to be effective multiple times on every FETCH on cursors.
That does not affect the correctness of the query results,
but adds significant overhead.
2020-11-20 10:43:56 +01:00
Onur Tirtir f80f4839ad Remove unused functions that cppcheck found 2020-10-19 13:50:52 +03:00
Onder Kalaci fe3caf3bc8 Local execution considers intermediate result size limit
With this commit, we make sure that local execution adds the
intermediate result size as the distributed execution adds. Plus,
it enforces the citus.max_intermediate_result_size value.
2020-10-15 17:18:55 +02:00
Sait Talha Nisanci ecde6c6eef Introduce GetCurrentLocalExecutionStatus wrapper
We should not access CurrentLocalExecutionStatus directly because that
would mean that we could also set it directly, which we shouldn't
because we have checks to see if the new state is possible, otherwise we
error.
2020-10-15 15:38:19 +03:00
Onder Kalaci 56ca256374 Forcefully terminate connections after citus.node_connection_timeout
After the connection timeout, we fail the session/pool. However, the
underlying connection can still be trying to connect. That is dangerous
because the new placement executions have already been in place. The
executor cannot handle the situation where multiple of
EXECUTION_ORDER_ANY task executions succeeds.

Adding a regression test doesn't seem easily doable. To reproduce the issue
- Add 2 worker nodes
- create a reference table
- set citus.node_connection_timeout to 1ms (requires code change)
- Continiously execute `SELECT count(*) FROM ref_table`
- Sometime later, you hit an out-of-array access in
  `ScheduleNextPlacementExecution()` hence crashing.
- The reason for that is sometimes the first connection
  successfully established while the executor is already
  trying to execute the query on the second node.
2020-09-30 18:24:24 +02:00
SaitTalhaNisanci 366461ccdb
Introduce cache entry/table utilities (#4132)
Introduce table entry utility functions

Citus table cache entry utilities are introduced so that we can easily
extend existing functionality with minimum changes, specifically changes
to these functions. For example IsNonDistributedTableCacheEntry can be
extended for citus local tables without the need to scan the whole
codebase and update each relevant part.

* Introduce utility functions to find the type of tables

A table type can be a reference table, a hash/range/append distributed
table. Utility methods are created so that we don't have to worry about
how a table is considered as a reference table etc. This also makes it
easy to extend the table types.

* Add IsCitusTableType utilities

* Rename IsCacheEntryCitusTableType -> IsCitusTableTypeCacheEntry

* Change citus table types in some checks
2020-09-02 22:26:05 +03:00
Hadi Moshayedi 7b74eca22d Support EXPLAIN EXECUTE ANALYZE. 2020-08-10 13:44:30 -07:00
Hanefi Onaldi 5be8287989
Fix comments of helper functions that set local config values (#4100) 2020-08-07 11:20:38 +03:00
Onder Kalaci eeb8c81de2 Implement shared connection count reservation & enable `citus.max_shared_pool_size` for COPY
With this patch, we introduce `locally_reserved_shared_connections.c/h` files
which are responsible for reserving some space in shared memory counters
upfront.

We sometimes need to reserve connections, but not necessarily
establish them. For example:
-  COPY command should reserve connections as it cannot know which
   connections it needs in which order. COPY establishes connections
   as any input data hits the workers. For example, for router COPY
   command, it only establishes 1 connection.

   As discussed here (https://github.com/citusdata/citus/pull/3849#pullrequestreview-431792473),
   COPY needs to reserve connections up-front, otherwise we can end
   up with resource starvation/un-detected deadlocks.
2020-08-03 18:51:40 +02:00
SaitTalhaNisanci 64469708af
separate the logic in ManageWorkerPool (#3298) 2020-07-23 13:47:35 +03:00
Onder Kalaci 52c0fccb08 Move executor specific logic to a function
Because as we're planning to use the same logic, it'd be nice to
use the exact same functions.
2020-07-22 15:09:47 +02:00
Onder Kalaci ff6555299c Unify node sort ordering
The executor relies on WorkerPool, and many other places rely on WorkerNode.
With this commit, we make sure that they are sorted via the same function/logic.
2020-07-22 11:03:25 +02:00
SaitTalhaNisanci b3af63c8ce
Remove task tracker executor (#3850)
* use adaptive executor even if task-tracker is set

* Update check-multi-mx tests for adaptive executor

Basically repartition joins are enabled where necessary. For parallel
tests max adaptive executor pool size is decresed to 2, otherwise we
would get too many clients error.

* Update limit_intermediate_size test

It seems that when we use adaptive executor instead of task tracker, we
exceed the intermediate result size less in the test. Therefore updated
the tests accordingly.

* Update multi_router_planner

It seems that there is one problem with multi_router_planner when we use
adaptive executor, we should fix the following error:
+ERROR:  relation "authors_range_840010" does not exist
+CONTEXT:  while executing command on localhost:57637

* update repartition join tests for check-multi

* update isolation tests for repartitioning

* Error out if shard_replication_factor > 1 with repartitioning

As we are removing the task tracker, we cannot switch to it if
shard_replication_factor > 1. In that case, we simply error out.

* Remove MULTI_EXECUTOR_TASK_TRACKER

* Remove multi_task_tracker_executor

Some utility methods are moved to task_execution_utils.c.

* Remove task tracker protocol methods

* Remove task_tracker.c methods

* remove unused methods from multi_server_executor

* fix style

* remove task tracker specific tests from worker_schedule

* comment out task tracker udf calls in tests

We were using task tracker udfs to test permissions in
multi_multiuser.sql. We should find some other way to test them, then we
should remove the commented out task tracker calls.

* remove task tracker test from follower schedule

* remove task tracker tests from multi mx schedule

* Remove task-tracker specific functions from worker functions

* remove multi task tracker extra schedule

* Remove unused methods from multi physical planner

* remove task_executor_type related things in tests

* remove LoadTuplesIntoTupleStore

* Do initial cleanup for repartition leftovers

During startup, task tracker would call TrackerCleanupJobDirectories and
TrackerCleanupJobSchemas to clean up leftover directories and job
schemas. With adaptive executor, while doing repartitions it is possible
to leak these things as well. We don't retry cleanups, so it is possible
to have leftover in case of errors.

TrackerCleanupJobDirectories is renamed as
RepartitionCleanupJobDirectories since it is repartition specific now,
however TrackerCleanupJobSchemas cannot be used currently because it is
task tracker specific. The thing is that this function is a no-op
currently.

We should add cleaning up intermediate schemas to DoInitialCleanup
method when that problem is solved(We might want to solve it in this PR
as well)

* Revert "remove task tracker tests from multi mx schedule"

This reverts commit 03ecc0a681.

* update multi mx repartition parallel tests

* not error with task_tracker_conninfo_cache_invalidate

* not run 4 repartition queries in parallel

It seems that when we run 4 repartition queries in parallel we get too
many clients error on CI even though we don't get it locally. Our guess
is that, it is because we open/close many connections without doing some
work and postgres has some delay to close the connections. Hence even
though connections are removed from the pg_stat_activity, they might
still not be closed. If the above assumption is correct, it is unlikely
for it to happen in practice because:
- There is some network latency in clusters, so this leaves some times
for connections to be able to close
- Repartition joins return some data and that also leaves some time for
connections to be fully closed.

As we don't get this error in our local, we currently assume that it is
not a bug. Ideally this wouldn't happen when we get rid of the
task-tracker repartition methods because they don't do any pruning and
might be opening more connections than necessary.

If this still gives us "too many clients" error, we can try to increase
the max_connections in our test suite(which is 100 by default).

Also there are different places where this error is given in postgres,
but adding some backtrace it seems that we get this from
ProcessStartupPacket. The backtraces can be found in this link:
https://circleci.com/gh/citusdata/citus/138702

* Set distributePlan->relationIdList when it is needed

It seems that we were setting the distributedPlan->relationIdList after
JobExecutorType is called, which would choose task-tracker if
replication factor > 1 and there is a repartition query. However, it
uses relationIdList to decide if the query has a repartition query, and
since it was not set yet, it would always think it is not a repartition
query and would choose adaptive executor when it should choose
task-tracker.

* use adaptive executor even with shard_replication_factor > 1

It seems that we were already using adaptive executor when
replication_factor > 1. So this commit removes the check.

* remove multi_resowner.c and deprecate some settings

* remove TaskExecution related leftovers

* change deprecated API error message

* not recursively plan single relatition repartition subquery

* recursively plan single relation repartition subquery

* test depreceated task tracker functions

* fix overlapping shard intervals in range-distributed test

* fix error message for citus_metadata_container

* drop task-tracker deprecated functions

* put the implemantation back to worker_cleanup_job_schema_cachesince citus cloud uses it

* drop some functions, add downgrade script

Some deprecated functions are dropped.
Downgrade script is added.
Some gucs are deprecated.
A new guc for repartition joins bucket size is added.

* order by a test to fix flappiness
2020-07-18 13:11:36 +03:00
Hadi Moshayedi 13003d8d05 Use TupleDestination API for partitioning in insert/select. 2020-07-17 09:43:46 -07:00
SaitTalhaNisanci b8830d063f
remove no-op check in TaskListRequires2PC (#4018)
We already return true if replication model is REPLICATION_MODEL_2PC at
the very beginning of the function, hence the check later is not used.
2020-07-10 14:16:23 +03:00
Hadi Moshayedi 23fa421639 Fix task->fetchedExplainAnalyzePlan memory issue. 2020-07-07 07:58:02 -07:00
citus bot bdfeb380d3 Fix some more master->coordinator comments 2020-07-07 10:37:53 +02:00
Onder Kalaci aa8a2866f3 Fix default value of EnableBinaryProtocol 2020-07-02 13:44:56 +02:00
Onder Kalaci 88c473e007 Sort WorkerPool in executions
We sort the workerList because adaptive connection management
(e.g., OPTIONAL_CONNECTION) requires any concurrent executions
to wait for the connections in the same order to prevent any
starvation. If we don't sort, we might end up with:
      Execution 1: Get connection for worker 1, wait for worker 2
      Execution 2: Get connection for worker 2, wait for worker 1

and, none could proceed. Instead, we enforce every execution establish
the required connections to workers in the same order.
2020-06-22 16:39:27 +02:00
Jelte Fennema 0259815d3a
Fix EXPLAIN ANALYZE received data counter issues (#3917)
In #3901 the "Data received from worker(s)" sections were added to EXPLAIN
ANALYZE. After merging @pykello posted some review comments. This addresses
those comments as well as fixing a other issues that I found while addressing 
them. The things this does:

1. Fix `EXPLAIN ANALYZE EXECUTE p1` to not increase received data on every
   execution
2. Fix `EXPLAIN ANALYZE EXECUTE p1(1)` to not return 0 bytes as received data
   allways.
3. Move `EXPLAIN ANALYZE` specific logic to `multi_explain.c` from
   `adaptive_executor.c`
4. Change naming of new explain sections to `Tuple data received from node(s)`.
   Firstly because a task can reference the coordinator too, so "worker(s)" was
   incorrect. Secondly to indicate that this is tuple data and not all network
   traffic that was performed.
5. Rename `totalReceivedData` in our codebase to `totalReceivedTupleData` to
   make it clearer that it's a tuple data counter, not all network traffic.
6. Actually add `binary_protocol` test to `multi_schedule` (woops)
7. Fix a randomly failing test in `local_shard_execution.sql`.
2020-06-17 11:33:38 +02:00
Jelte Fennema 927de6d187
Show amount of data received in EXPLAIN ANALYZE (#3901)
Sadly this does not actually work yet for binary protocol data, because
when doing EXPLAIN ANALYZE we send two commands at the same time. This
means we cannot use `SendRemoteCommandParams`, and thus cannot use the
binary protocol. This can still be useful though when using the text
protocol, to find out that a lot of data is being sent.
2020-06-15 16:01:05 +02:00
Jelte Fennema 0e12d045b1
Support use of binary protocol in between nodes (#3877)
This can save a lot of data to be sent in some cases, thus improving
performance for which inter query bandwidth is the bottleneck.
There's some issues with enabling this as default, so that's currently not done.
2020-06-12 15:02:51 +02:00
Philip Dubé 1722d8ac8b Allow routing modifying CTEs
We still recursively plan some cases, eg:
- INSERTs
- SELECT FOR UPDATE when reference tables in query
- Everything must be same single shard & replication model
2020-06-11 15:14:06 +00:00
Hadi Moshayedi bb96ef5047 Does the EXPLAIN ANALYZE at the same time as execution, so avoids executing twice.
We wrap worker tasks in worker_save_query_explain_analyze() so we can fetch
their explain output later by a call worker_last_saved_explain_analyze().

Fixes #3519
Fixes #2347
Fixes #2613
Fixes #621
2020-06-11 01:55:57 -07:00
Marco Slot 1243b6a948 Execute shard creation as utility tasks 2020-06-10 11:29:49 +02:00