From a5370f5bb0ea3f68fbb3982432941c0fdbd9f69c Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 5 Jun 2018 13:52:54 +0300 Subject: [PATCH] Realtime executor honours multi_shard_modify_mode We're relying on multi_shard_modify_mode GUC for real-time SELECTs. The name of the GUC is unfortunate, but, adding one more GUC (or renaming the GUC) would make the UX even worse. Given that this mode is mostly important for transaction blocks that involve modification /DDL queries along with real-time SELECTs, we can live with the confusion. --- .../executor/multi_client_executor.c | 13 +++++++++- .../distributed/executor/multi_executor.c | 5 +++- .../executor/multi_real_time_executor.c | 26 +++++++++++++------ .../expected/multi_real_time_transaction.out | 16 ++++++++++++ .../multi_real_time_transaction_0.out | 16 ++++++++++++ .../regress/expected/with_transactions.out | 17 ++++++++++++ .../sql/multi_real_time_transaction.sql | 7 +++++ src/test/regress/sql/with_transactions.sql | 10 +++++++ 8 files changed, 100 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index a28d1008c..118104299 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -20,6 +20,7 @@ #include "commands/dbcommands.h" #include "distributed/metadata_cache.h" #include "distributed/connection_management.h" +#include "distributed/multi_executor.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_server_executor.h" #include "distributed/placement_connection.h" @@ -188,7 +189,17 @@ MultiClientPlacementConnectStart(List *placementAccessList, const char *userName MultiConnection *connection = NULL; ConnStatusType connStatusType = CONNECTION_OK; int32 connectionId = AllocateConnectionId(); - int connectionFlags = CONNECTION_PER_PLACEMENT; /* no cached connections for now */ + int connectionFlags = 0; + + /* + * Although we're opening connections for SELECT queries, we're relying + * on multi_shard_modify_mode GUC. The name of the GUC is unfortunate, but, + * adding one more GUC (or renaming the GUC) would make the UX even worse. + */ + if (MultiShardConnectionType == PARALLEL_CONNECTION) + { + connectionFlags = CONNECTION_PER_PLACEMENT; + } if (connectionId == INVALID_CONNECTION_ID) { diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 7a2f256ce..a89193656 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -40,7 +40,10 @@ #include "utils/memutils.h" -/* controls the connection type for multi shard update/delete queries */ +/* + * Controls the connection type for multi shard modifications, DDLs + * TRUNCATE and real-time SELECT queries. + */ int MultiShardConnectionType = PARALLEL_CONNECTION; diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 4d4f94993..26509b018 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -901,23 +901,33 @@ LookupWorkerForTask(HTAB *workerHash, Task *task, TaskExecution *taskExecution) /* * WorkerConnectionsExhausted determines if the current query has exhausted the * maximum number of open connections that can be made to a worker. + * + * Note that the function takes sequential exection of the queries into account + * as well. In other words, in the sequential mode, the connections are considered + * to be exahusted when there is already a connection opened to the given worker. */ static bool WorkerConnectionsExhausted(WorkerNodeState *workerNodeState) { bool reachedLimit = false; - /* - * A worker cannot accept more than max_connections connections. If we have a - * small number of workers with many shards, then a single query could exhaust - * max_connections unless we throttle here. We use the value of max_connections - * on the master as a proxy for the worker configuration to avoid introducing a - * new configuration value. - */ - if (workerNodeState->openConnectionCount >= MaxConnections) + if (MultiShardConnectionType == SEQUENTIAL_CONNECTION && + workerNodeState->openConnectionCount >= 1) { reachedLimit = true; } + else if (MultiShardConnectionType == PARALLEL_CONNECTION && + workerNodeState->openConnectionCount >= MaxConnections) + { + /* + * A worker cannot accept more than max_connections connections. If we have a + * small number of workers with many shards, then a single query could exhaust + * max_connections unless we throttle here. We use the value of max_connections + * on the master as a proxy for the worker configuration to avoid introducing a + * new configuration value. + */ + reachedLimit = true; + } return reachedLimit; } diff --git a/src/test/regress/expected/multi_real_time_transaction.out b/src/test/regress/expected/multi_real_time_transaction.out index 110fc51a3..a1f0ee882 100644 --- a/src/test/regress/expected/multi_real_time_transaction.out +++ b/src/test/regress/expected/multi_real_time_transaction.out @@ -340,6 +340,22 @@ ROLLBACK; BEGIN; SELECT id, pg_advisory_lock(15) FROM test_table; ERROR: canceling the transaction since it was involved in a distributed deadlock +ROLLBACK; +-- sequential real-time queries should be successfully executed +-- since the queries are sent over the same connection +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC; + id | pg_advisory_lock +----+------------------ + 6 | + 5 | + 4 | + 3 | + 2 | + 1 | +(6 rows) + ROLLBACK; SET client_min_messages TO DEFAULT; alter system set deadlock_timeout TO DEFAULT; diff --git a/src/test/regress/expected/multi_real_time_transaction_0.out b/src/test/regress/expected/multi_real_time_transaction_0.out index 33147487a..27a3d7603 100644 --- a/src/test/regress/expected/multi_real_time_transaction_0.out +++ b/src/test/regress/expected/multi_real_time_transaction_0.out @@ -348,6 +348,22 @@ ROLLBACK; BEGIN; SELECT id, pg_advisory_lock(15) FROM test_table; ERROR: canceling the transaction since it was involved in a distributed deadlock +ROLLBACK; +-- sequential real-time queries should be successfully executed +-- since the queries are sent over the same connection +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC; + id | pg_advisory_lock +----+------------------ + 6 | + 5 | + 4 | + 3 | + 2 | + 1 | +(6 rows) + ROLLBACK; SET client_min_messages TO DEFAULT; alter system set deadlock_timeout TO DEFAULT; diff --git a/src/test/regress/expected/with_transactions.out b/src/test/regress/expected/with_transactions.out index 5aebf2a83..1e0759b99 100644 --- a/src/test/regress/expected/with_transactions.out +++ b/src/test/regress/expected/with_transactions.out @@ -105,6 +105,23 @@ SELECT count(*) FROM second_raw_table; 0 (1 row) +-- sequential insert followed by a sequential real-time query should be fine +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +WITH ids_inserted AS +( + INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id +) +SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3; +DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries +DEBUG: generating subplan 17_1 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING raw_table.tenant_id +DEBUG: Plan 17 query after replacing subqueries and CTEs: SELECT income FROM with_transactions.second_raw_table WHERE (tenant_id OPERATOR(pg_catalog.=) ANY (SELECT ids_inserted.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_inserted)) ORDER BY income DESC LIMIT 3 +DEBUG: push down of limit count: 3 + income +-------- +(0 rows) + +ROLLBACK; RESET client_min_messages; RESET citus.shard_count; DROP SCHEMA with_transactions CASCADE; diff --git a/src/test/regress/sql/multi_real_time_transaction.sql b/src/test/regress/sql/multi_real_time_transaction.sql index 26e40b455..4c07be7df 100644 --- a/src/test/regress/sql/multi_real_time_transaction.sql +++ b/src/test/regress/sql/multi_real_time_transaction.sql @@ -213,6 +213,13 @@ BEGIN; SELECT id, pg_advisory_lock(15) FROM test_table; ROLLBACK; +-- sequential real-time queries should be successfully executed +-- since the queries are sent over the same connection +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC; +ROLLBACK; + SET client_min_messages TO DEFAULT; alter system set deadlock_timeout TO DEFAULT; SELECT pg_reload_conf(); diff --git a/src/test/regress/sql/with_transactions.sql b/src/test/regress/sql/with_transactions.sql index d85edd6d0..bb23ce195 100644 --- a/src/test/regress/sql/with_transactions.sql +++ b/src/test/regress/sql/with_transactions.sql @@ -70,6 +70,16 @@ SELECT count(*) FROM raw_table; SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00'; SELECT count(*) FROM second_raw_table; +-- sequential insert followed by a sequential real-time query should be fine +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +WITH ids_inserted AS +( + INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id +) +SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3; +ROLLBACK; + RESET client_min_messages; RESET citus.shard_count; DROP SCHEMA with_transactions CASCADE;