diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 6161b7d70..9ff53cf37 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -301,6 +301,14 @@ typedef struct DistributedExecution
	 * do cleanup for repartition queries.
	 */
	List *jobIdList;
+
+	/*
+	 * Indicates whether we can execute tasks locally during distributed
+	 * execution. In other words, this flag must be set to false when
+	 * executing a command for which we know that local execution would
+	 * fail, such as CREATE INDEX CONCURRENTLY.
+	 */
+	bool localExecutionSupported;
 } DistributedExecution;
@@ -545,6 +553,12 @@ typedef struct ShardCommandExecution
	bool gotResults;
 
	TaskExecutionState executionState;
+
+	/*
+	 * Indicates whether the given shard command can be executed locally on
+	 * placements. Normally this is determined by the same field of
+	 * DistributedExecution.
+	 */
+	bool localExecutionSupported;
 } ShardCommandExecution;
 
 /*
@@ -1112,6 +1126,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
 
	execution->jobIdList = jobIdList;
 
+	execution->localExecutionSupported = localExecutionSupported;
+
	/*
	 * Since task can have multiple queries, we are not sure how many columns we should
	 * allocate for. We start with 16, and reallocate when we need more.
@@ -1140,7 +1156,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
		}
	}
 
-	if (localExecutionSupported && ShouldExecuteTasksLocally(taskList))
+	if (execution->localExecutionSupported &&
+		ShouldExecuteTasksLocally(taskList))
	{
		bool readOnlyPlan = !TaskListModifiesDatabase(modLevel, taskList);
		ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &execution->localTaskList,
@@ -2004,6 +2021,8 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
		shardCommandExecution->task = task;
		shardCommandExecution->executionOrder = ExecutionOrderForTask(modLevel, task);
		shardCommandExecution->executionState = TASK_EXECUTION_NOT_FINISHED;
+		shardCommandExecution->localExecutionSupported =
+			execution->localExecutionSupported;
		shardCommandExecution->placementExecutions =
			(TaskPlacementExecution **) palloc0(placementExecutionCount *
												sizeof(TaskPlacementExecution *));
@@ -2841,14 +2860,18 @@ ManageWorkerPool(WorkerPool *workerPool)
 
		if (execution->failed)
		{
+			const char *errHint =
+				execution->localExecutionSupported ?
+				"This command supports local execution. Consider enabling "
+				"local execution using SET citus.enable_local_execution "
+				"TO true;" :
+				"Consider using a higher value for max_connections";
+
			ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
-							errmsg(
-								"could not establish any connections to the node %s:%d "
-								"when local execution is also disabled.",
-								workerPool->nodeName,
-								workerPool->nodePort),
-							errhint("Enable local execution via SET "
-									"citus.enable_local_execution TO true;")));
+							errmsg("the total number of connections on the "
+								   "server is more than max_connections(%d)",
+								   MaxConnections),
+							errhint("%s", errHint)));
		}
 
		return;
@@ -5034,6 +5057,12 @@ CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution *placementE
		return false;
	}
 
+	if (!placementExecution->shardCommandExecution->localExecutionSupported)
+	{
+		/* cannot execute given task locally */
+		return false;
+	}
+
	if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
	{
		/*
diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out
index 9715232a2..6b92e7bba 100644
--- a/src/test/regress/expected/single_node.out
+++ b/src/test/regress/expected/single_node.out
@@ -5,6 +5,8 @@ SET citus.shard_replication_factor TO 1;
 SET citus.next_shard_id TO 90630500;
 -- Ensure tuple data in explain analyze output is the same on all PG versions
 SET citus.enable_binary_protocol = TRUE;
+-- do not cache any connections for now, we will re-enable them later
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
 -- adding the coordinator as inactive is disallowed
 SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
 ERROR: coordinator node cannot be added as inactive node
@@ -33,6 +35,49 @@ SELECT count(*) FROM pg_dist_node;
 (1 row)
 
 -- there are no workers now, but we should still be able to create Citus tables
+-- force local execution when creating the index
+ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
+-- Postmaster might not ack SIGHUP signal sent by pg_reload_conf() immediately,
+-- so we need to sleep for some amount of time to do our best to ensure that
+-- postmaster reflects GUC changes.
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT pg_sleep(0.1);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE failover_to_local (a int);
+SELECT create_distributed_table('failover_to_local', 'a', shard_count=>32);
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE INDEX CONCURRENTLY ON failover_to_local(a);
+WARNING: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index.
+ Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index.
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: Consider using a higher value for max_connections
+-- reset global GUC changes
+ALTER SYSTEM RESET citus.local_shared_pool_size;
+ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SET client_min_messages TO WARNING;
+DROP TABLE failover_to_local;
+RESET client_min_messages;
+-- so that we don't have to update the rest of the test output
+SET citus.next_shard_id TO 90630500;
 CREATE TABLE ref(x int, y int);
 SELECT create_reference_table('ref');
 create_reference_table
@@ -2163,24 +2208,24 @@ NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.anothe
 -- local execution and the queries would fail
 SET citus.enable_local_execution TO false;
 SELECT count(*) from another_schema_table;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 UPDATE another_schema_table SET b = b;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 INSERT INTO another_schema_table SELECT * FROM another_schema_table;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
 SELECT count(*) FROM cte_1;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 -- copy fails if local execution is disabled and there is no connection slot
 COPY another_schema_table(a) FROM PROGRAM 'seq 32';
 ERROR: could not find an available connection
diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql
index c21066424..5aba05770 100644
--- a/src/test/regress/sql/single_node.sql
+++ b/src/test/regress/sql/single_node.sql
@@ -7,6 +7,9 @@ SET citus.next_shard_id TO 90630500;
 -- Ensure tuple data in explain analyze output is the same on all PG versions
 SET citus.enable_binary_protocol = TRUE;
 
+-- do not cache any connections for now, we will re-enable them later
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
+
 -- adding the coordinator as inactive is disallowed
 SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
 
@@ -25,6 +28,32 @@ SELECT count(*) FROM pg_dist_node;
 
 -- there are no workers now, but we should still be able to create Citus tables
 
+-- force local execution when creating the index
+ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
+
+-- Postmaster might not ack SIGHUP signal sent by pg_reload_conf() immediately,
+-- so we need to sleep for some amount of time to do our best to ensure that
+-- postmaster reflects GUC changes.
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+
+CREATE TABLE failover_to_local (a int);
+SELECT create_distributed_table('failover_to_local', 'a', shard_count=>32);
+
+CREATE INDEX CONCURRENTLY ON failover_to_local(a);
+
+-- reset global GUC changes
+ALTER SYSTEM RESET citus.local_shared_pool_size;
+ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
+SELECT pg_reload_conf();
+
+SET client_min_messages TO WARNING;
+DROP TABLE failover_to_local;
+RESET client_min_messages;
+
+-- so that we don't have to update the rest of the test output
+SET citus.next_shard_id TO 90630500;
+
 CREATE TABLE ref(x int, y int);
 SELECT create_reference_table('ref');
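
Reviewer note (not part of the patch): the failover behavior and the new error/hint can be reproduced interactively with a sketch like the one below. It assumes a single-node cluster where the coordinator has been added as a node (as in single_node.sql); the GUC settings mirror the new regression test, the table name failover_demo is illustrative, and the exact error text depends on max_connections.

-- Minimal sketch, assuming a single-node Citus coordinator; values mirror the test above.
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;  -- disallow connections to the local node
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);  -- give the postmaster a moment to apply the change

CREATE TABLE failover_demo (a int);
SELECT create_distributed_table('failover_demo', 'a', shard_count => 32);

-- Regular queries are expected to fail over to local execution and succeed:
SELECT count(*) FROM failover_demo;

-- CREATE INDEX CONCURRENTLY cannot use local execution, so with no connection
-- slots available it should fail with the new message:
--   ERROR: the total number of connections on the server is more than max_connections(...)
--   HINT: Consider using a higher value for max_connections
CREATE INDEX CONCURRENTLY ON failover_demo (a);

-- clean up
ALTER SYSTEM RESET citus.local_shared_pool_size;
SELECT pg_reload_conf();
DROP TABLE failover_demo;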