diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c
index a28d1008c..118104299 100644
--- a/src/backend/distributed/executor/multi_client_executor.c
+++ b/src/backend/distributed/executor/multi_client_executor.c
@@ -20,6 +20,7 @@
 #include "commands/dbcommands.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/connection_management.h"
+#include "distributed/multi_executor.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_server_executor.h"
 #include "distributed/placement_connection.h"
@@ -188,7 +189,17 @@ MultiClientPlacementConnectStart(List *placementAccessList, const char *userName
 	MultiConnection *connection = NULL;
 	ConnStatusType connStatusType = CONNECTION_OK;
 	int32 connectionId = AllocateConnectionId();
-	int connectionFlags = CONNECTION_PER_PLACEMENT; /* no cached connections for now */
+	int connectionFlags = 0;
+
+	/*
+	 * Although we're opening connections for SELECT queries, we rely on the
+	 * multi_shard_modify_mode GUC. The name of the GUC is unfortunate, but
+	 * adding one more GUC (or renaming this one) would make the UX even worse.
+	 */
+	if (MultiShardConnectionType == PARALLEL_CONNECTION)
+	{
+		connectionFlags = CONNECTION_PER_PLACEMENT;
+	}
 
 	if (connectionId == INVALID_CONNECTION_ID)
 	{
diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c
index 7a2f256ce..a89193656 100644
--- a/src/backend/distributed/executor/multi_executor.c
+++ b/src/backend/distributed/executor/multi_executor.c
@@ -40,7 +40,10 @@
 #include "utils/memutils.h"
 
 
-/* controls the connection type for multi shard update/delete queries */
+/*
+ * Controls the connection type for multi-shard modifications, DDL,
+ * TRUNCATE, and real-time SELECT queries.
+ */
 int MultiShardConnectionType = PARALLEL_CONNECTION;
 
 
diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c
index 4d4f94993..26509b018 100644
--- a/src/backend/distributed/executor/multi_real_time_executor.c
+++ b/src/backend/distributed/executor/multi_real_time_executor.c
@@ -901,23 +901,33 @@ LookupWorkerForTask(HTAB *workerHash, Task *task, TaskExecution *taskExecution)
 /*
  * WorkerConnectionsExhausted determines if the current query has exhausted the
  * maximum number of open connections that can be made to a worker.
+ *
+ * Note that the function also takes sequential execution of queries into
+ * account. In other words, in sequential mode, connections are considered
+ * exhausted as soon as one connection is already open to the given worker.
  */
 static bool
 WorkerConnectionsExhausted(WorkerNodeState *workerNodeState)
 {
 	bool reachedLimit = false;
 
-	/*
-	 * A worker cannot accept more than max_connections connections. If we have a
-	 * small number of workers with many shards, then a single query could exhaust
-	 * max_connections unless we throttle here. We use the value of max_connections
-	 * on the master as a proxy for the worker configuration to avoid introducing a
-	 * new configuration value.
-	 */
-	if (workerNodeState->openConnectionCount >= MaxConnections)
+	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION &&
+		workerNodeState->openConnectionCount >= 1)
 	{
 		reachedLimit = true;
 	}
+	else if (MultiShardConnectionType == PARALLEL_CONNECTION &&
+			 workerNodeState->openConnectionCount >= MaxConnections)
+	{
+		/*
+		 * A worker cannot accept more than max_connections connections. If we have a
+		 * small number of workers with many shards, then a single query could exhaust
+		 * max_connections unless we throttle here. We use the value of max_connections
+		 * on the master as a proxy for the worker configuration to avoid introducing a
+		 * new configuration value.
+		 */
+		reachedLimit = true;
+	}
 
 	return reachedLimit;
 }
diff --git a/src/test/regress/expected/multi_real_time_transaction.out b/src/test/regress/expected/multi_real_time_transaction.out
index 110fc51a3..a1f0ee882 100644
--- a/src/test/regress/expected/multi_real_time_transaction.out
+++ b/src/test/regress/expected/multi_real_time_transaction.out
@@ -340,6 +340,22 @@ ROLLBACK;
 BEGIN;
 SELECT id, pg_advisory_lock(15) FROM test_table;
 ERROR:  canceling the transaction since it was involved in a distributed deadlock
+ROLLBACK;
+-- sequential real-time queries should execute successfully
+-- since the queries are sent over the same connection
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC;
+ id | pg_advisory_lock 
+----+------------------
+  6 | 
+  5 | 
+  4 | 
+  3 | 
+  2 | 
+  1 | 
+(6 rows)
+
 ROLLBACK;
 SET client_min_messages TO DEFAULT;
 alter system set deadlock_timeout TO DEFAULT;
diff --git a/src/test/regress/expected/multi_real_time_transaction_0.out b/src/test/regress/expected/multi_real_time_transaction_0.out
index 33147487a..27a3d7603 100644
--- a/src/test/regress/expected/multi_real_time_transaction_0.out
+++ b/src/test/regress/expected/multi_real_time_transaction_0.out
@@ -348,6 +348,22 @@ ROLLBACK;
 BEGIN;
 SELECT id, pg_advisory_lock(15) FROM test_table;
 ERROR:  canceling the transaction since it was involved in a distributed deadlock
+ROLLBACK;
+-- sequential real-time queries should execute successfully
+-- since the queries are sent over the same connection
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC;
+ id | pg_advisory_lock 
+----+------------------
+  6 | 
+  5 | 
+  4 | 
+  3 | 
+  2 | 
+  1 | 
+(6 rows)
+
 ROLLBACK;
 SET client_min_messages TO DEFAULT;
 alter system set deadlock_timeout TO DEFAULT;
diff --git a/src/test/regress/expected/with_transactions.out b/src/test/regress/expected/with_transactions.out
index 5aebf2a83..1e0759b99 100644
--- a/src/test/regress/expected/with_transactions.out
+++ b/src/test/regress/expected/with_transactions.out
@@ -105,6 +105,23 @@ SELECT count(*) FROM second_raw_table;
  count 
 -------
      0
 (1 row)
 
+-- sequential insert followed by a sequential real-time query should be fine
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+WITH ids_inserted AS
+(
+	INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id
+)
+SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3;
+DEBUG:  data-modifying statements are not supported in the WITH clauses of distributed queries
+DEBUG:  generating subplan 17_1 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING raw_table.tenant_id
+DEBUG:  Plan 17 query after replacing subqueries and CTEs: SELECT income FROM with_transactions.second_raw_table WHERE (tenant_id OPERATOR(pg_catalog.=) ANY (SELECT ids_inserted.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_inserted)) ORDER BY income DESC LIMIT 3
+DEBUG:  push down of limit count: 3
+ income 
+--------
+(0 rows)
+
+ROLLBACK;
 RESET client_min_messages;
 RESET citus.shard_count;
 DROP SCHEMA with_transactions CASCADE;
diff --git a/src/test/regress/sql/multi_real_time_transaction.sql b/src/test/regress/sql/multi_real_time_transaction.sql
index 26e40b455..4c07be7df 100644
--- a/src/test/regress/sql/multi_real_time_transaction.sql
+++ b/src/test/regress/sql/multi_real_time_transaction.sql
@@ -213,6 +213,13 @@ BEGIN;
 SELECT id, pg_advisory_lock(15) FROM test_table;
 ROLLBACK;
 
+-- sequential real-time queries should execute successfully
+-- since the queries are sent over the same connection
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC;
+ROLLBACK;
+
 SET client_min_messages TO DEFAULT;
 alter system set deadlock_timeout TO DEFAULT;
 SELECT pg_reload_conf();
diff --git a/src/test/regress/sql/with_transactions.sql b/src/test/regress/sql/with_transactions.sql
index d85edd6d0..bb23ce195 100644
--- a/src/test/regress/sql/with_transactions.sql
+++ b/src/test/regress/sql/with_transactions.sql
@@ -70,6 +70,16 @@ SELECT count(*) FROM raw_table;
 SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00';
 SELECT count(*) FROM second_raw_table;
 
+-- sequential insert followed by a sequential real-time query should be fine
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+WITH ids_inserted AS
+(
+	INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id
+)
+SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3;
+ROLLBACK;
+
 RESET client_min_messages;
 RESET citus.shard_count;
 DROP SCHEMA with_transactions CASCADE;