From 8c8d69662174c0f4c8a8d7ff60f2a272f23ff34e Mon Sep 17 00:00:00 2001
From: Onur Tirtir
Date: Tue, 25 Jan 2022 18:43:21 +0300
Subject: [PATCH] Do not fail over to local execution when it's not supported
 (#5625)

We fall back to local execution if we cannot establish any more
connections to the local node. However, we should not do that for
commands that we don't know how to execute locally (or that we know we
shouldn't execute locally). To fix that, we now take
localExecutionSupported into account in
CanFailoverPlacementExecutionToLocalExecution too.

Moreover, we now emit a more accurate hint message to tell the user
whether the execution failed because they disabled local execution or
because local execution wasn't possible for the given command.
---
 .../distributed/executor/adaptive_executor.c | 45 +++++++++---
 src/test/regress/expected/single_node.out    | 69 +++++++++++++++----
 src/test/regress/sql/single_node.sql         | 29 ++++++++
 3 files changed, 123 insertions(+), 20 deletions(-)

diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 6161b7d70..9ff53cf37 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -301,6 +301,14 @@ typedef struct DistributedExecution
 	 * do cleanup for repartition queries.
 	 */
 	List *jobIdList;
+
+	/*
+	 * Indicates whether we can execute tasks locally during distributed
+	 * execution. In other words, this flag must be set to false when
+	 * executing a command for which we know local execution would fail,
+	 * such as CREATE INDEX CONCURRENTLY.
+	 */
+	bool localExecutionSupported;
 } DistributedExecution;
 
 
@@ -545,6 +553,12 @@ typedef struct ShardCommandExecution
 	bool gotResults;
 
 	TaskExecutionState executionState;
+
+	/*
+	 * Indicates whether the given shard command can be executed locally on
+	 * placements. Normally determined by DistributedExecution's same field.
+	 */
+	bool localExecutionSupported;
 } ShardCommandExecution;
 
 /*
@@ -1112,6 +1126,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
 
 	execution->jobIdList = jobIdList;
 
+	execution->localExecutionSupported = localExecutionSupported;
+
 	/*
 	 * Since task can have multiple queries, we are not sure how many columns we should
 	 * allocate for. We start with 16, and reallocate when we need more.
@@ -1140,7 +1156,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
 		}
 	}
 
-	if (localExecutionSupported && ShouldExecuteTasksLocally(taskList))
+	if (execution->localExecutionSupported &&
+		ShouldExecuteTasksLocally(taskList))
 	{
 		bool readOnlyPlan = !TaskListModifiesDatabase(modLevel, taskList);
 		ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &execution->localTaskList,
@@ -2004,6 +2021,8 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
 		shardCommandExecution->task = task;
 		shardCommandExecution->executionOrder = ExecutionOrderForTask(modLevel, task);
 		shardCommandExecution->executionState = TASK_EXECUTION_NOT_FINISHED;
+		shardCommandExecution->localExecutionSupported =
+			execution->localExecutionSupported;
 		shardCommandExecution->placementExecutions =
 			(TaskPlacementExecution **) palloc0(placementExecutionCount *
 												sizeof(TaskPlacementExecution *));
@@ -2841,14 +2860,18 @@ ManageWorkerPool(WorkerPool *workerPool)
 
 	if (execution->failed)
 	{
+		const char *errHint =
+			execution->localExecutionSupported ?
+			"This command supports local execution. Consider enabling "
+			"local execution using SET citus.enable_local_execution "
+			"TO true;" :
+			"Consider using a higher value for max_connections";
+
 		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
-				errmsg(
-					"could not establish any connections to the node %s:%d "
-					"when local execution is also disabled.",
-					workerPool->nodeName,
-					workerPool->nodePort),
-				errhint("Enable local execution via SET "
-						"citus.enable_local_execution TO true;")));
+				errmsg("the total number of connections on the "
+					   "server is more than max_connections(%d)",
+					   MaxConnections),
+				errhint("%s", errHint)));
 	}
 
 	return;
@@ -5034,6 +5057,12 @@ CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution *placementE
 		return false;
 	}
 
+	if (!placementExecution->shardCommandExecution->localExecutionSupported)
+	{
+		/* cannot execute given task locally */
+		return false;
+	}
+
 	if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
 	{
 		/*
diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out
index 9715232a2..6b92e7bba 100644
--- a/src/test/regress/expected/single_node.out
+++ b/src/test/regress/expected/single_node.out
@@ -5,6 +5,8 @@ SET citus.shard_replication_factor TO 1;
 SET citus.next_shard_id TO 90630500;
 -- Ensure tuple data in explain analyze output is the same on all PG versions
 SET citus.enable_binary_protocol = TRUE;
+-- do not cache any connections for now, will enable it back soon
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
 -- adding the coordinator as inactive is disallowed
 SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
 ERROR: coordinator node cannot be added as inactive node
@@ -33,6 +35,49 @@ SELECT count(*) FROM pg_dist_node;
 (1 row)
 
 -- there are no workers now, but we should still be able to create Citus tables
+-- force local execution when creating the index
+ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
+-- Postmaster might not ack SIGHUP signal sent by pg_reload_conf() immediately,
+-- so we need to sleep for some amount of time to do our best to ensure that
+-- postmaster reflects GUC changes.
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT pg_sleep(0.1);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE failover_to_local (a int);
+SELECT create_distributed_table('failover_to_local', 'a', shard_count=>32);
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE INDEX CONCURRENTLY ON failover_to_local(a);
+WARNING: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index.
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: Consider using a higher value for max_connections
+-- reset global GUC changes
+ALTER SYSTEM RESET citus.local_shared_pool_size;
+ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SET client_min_messages TO WARNING;
+DROP TABLE failover_to_local;
+RESET client_min_messages;
+-- so that we don't have to update rest of the test output
+SET citus.next_shard_id TO 90630500;
 CREATE TABLE ref(x int, y int);
 SELECT create_reference_table('ref');
  create_reference_table
@@ -2163,24 +2208,24 @@ NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.anothe
 -- local execution and the queries would fail
 SET citus.enable_local_execution TO false;
 SELECT count(*) from another_schema_table;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 UPDATE another_schema_table SET b = b;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 INSERT INTO another_schema_table SELECT * FROM another_schema_table;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
 SELECT count(*) FROM cte_1;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 -- copy fails if local execution is disabled and there is no connection slot
 COPY another_schema_table(a) FROM PROGRAM 'seq 32';
 ERROR: could not find an available connection
diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql
index c21066424..5aba05770 100644
--- a/src/test/regress/sql/single_node.sql
+++ b/src/test/regress/sql/single_node.sql
@@ -7,6 +7,9 @@ SET citus.next_shard_id TO 90630500;
 
 -- Ensure tuple data in explain analyze output is the same on all PG versions
 SET citus.enable_binary_protocol = TRUE;
+-- do not cache any connections for now, will enable it back soon
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
+
 -- adding the coordinator as inactive is disallowed
 SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
 
@@ -25,6 +28,32 @@ SELECT count(*) FROM pg_dist_node;
 
 -- there are no workers now, but we should still be able to create Citus tables
 
+-- force local execution when creating the index
+ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
+
+-- Postmaster might not ack SIGHUP signal sent by pg_reload_conf() immediately,
+-- so we need to sleep for some amount of time to do our best to ensure that
+-- postmaster reflects GUC changes.
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+
+CREATE TABLE failover_to_local (a int);
+SELECT create_distributed_table('failover_to_local', 'a', shard_count=>32);
+
+CREATE INDEX CONCURRENTLY ON failover_to_local(a);
+
+-- reset global GUC changes
+ALTER SYSTEM RESET citus.local_shared_pool_size;
+ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
+SELECT pg_reload_conf();
+
+SET client_min_messages TO WARNING;
+DROP TABLE failover_to_local;
+RESET client_min_messages;
+
+-- so that we don't have to update rest of the test output
+SET citus.next_shard_id TO 90630500;
+
 CREATE TABLE ref(x int, y int);
 SELECT create_reference_table('ref');
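
For quick reference, here is a condensed psql-style sketch of the two hints introduced above, distilled from the regression test output (the table names, the single-node setup with citus.local_shared_pool_size set to -1, and the max_connections value of 100 are simply the values the test happens to use):

-- The command could run locally, but the user turned local execution off,
-- so the hint points at citus.enable_local_execution:
SET citus.enable_local_execution TO false;
SELECT count(*) FROM another_schema_table;
-- ERROR: the total number of connections on the server is more than max_connections(100)
-- HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;

-- CREATE INDEX CONCURRENTLY cannot be executed locally at all, so the executor
-- no longer tries to fail over to local execution and the hint only mentions max_connections:
CREATE INDEX CONCURRENTLY ON failover_to_local(a);
-- ERROR: the total number of connections on the server is more than max_connections(100)
-- HINT: Consider using a higher value for max_connections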