Not fail over to local execution when it's not supported (#5625)

We fall back to local execution if we cannot establish any more
connections to local node. However, we should not do that for the
commands that we don't know how to execute locally (or we know we
shouldn't execute locally). To fix that, we take localExecutionSupported
take into account in CanFailoverPlacementExecutionToLocalExecution too.

Moreover, we also prompt a more accurate hint message to inform user
about whether the execution is failed because local execution is
disabled by them, or because local execution wasn't possible for given
command.
pull/5653/head
Onur Tirtir 2022-01-25 18:43:21 +03:00 committed by GitHub
parent ff3913ad99
commit 8c8d696621
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 123 additions and 20 deletions

View File

@ -301,6 +301,14 @@ typedef struct DistributedExecution
* do cleanup for repartition queries.
*/
List *jobIdList;
/*
* Indicates whether we can execute tasks locally during distributed
* execution. In other words, this flag must be set to false when
* executing a command that we surely know that local execution would
* fail, such as CREATE INDEX CONCURRENTLY.
*/
bool localExecutionSupported;
} DistributedExecution;
@ -545,6 +553,12 @@ typedef struct ShardCommandExecution
bool gotResults;
TaskExecutionState executionState;
/*
* Indicates whether given shard command can be executed locally on
* placements. Normally determined by DistributedExecution's same field.
*/
bool localExecutionSupported;
} ShardCommandExecution;
/*
@ -1112,6 +1126,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
execution->jobIdList = jobIdList;
execution->localExecutionSupported = localExecutionSupported;
/*
* Since task can have multiple queries, we are not sure how many columns we should
* allocate for. We start with 16, and reallocate when we need more.
@ -1140,7 +1156,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
}
}
if (localExecutionSupported && ShouldExecuteTasksLocally(taskList))
if (execution->localExecutionSupported &&
ShouldExecuteTasksLocally(taskList))
{
bool readOnlyPlan = !TaskListModifiesDatabase(modLevel, taskList);
ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &execution->localTaskList,
@ -2004,6 +2021,8 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
shardCommandExecution->task = task;
shardCommandExecution->executionOrder = ExecutionOrderForTask(modLevel, task);
shardCommandExecution->executionState = TASK_EXECUTION_NOT_FINISHED;
shardCommandExecution->localExecutionSupported =
execution->localExecutionSupported;
shardCommandExecution->placementExecutions =
(TaskPlacementExecution **) palloc0(placementExecutionCount *
sizeof(TaskPlacementExecution *));
@ -2841,14 +2860,18 @@ ManageWorkerPool(WorkerPool *workerPool)
if (execution->failed)
{
const char *errHint =
execution->localExecutionSupported ?
"This command supports local execution. Consider enabling "
"local execution using SET citus.enable_local_execution "
"TO true;" :
"Consider using a higher value for max_connections";
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg(
"could not establish any connections to the node %s:%d "
"when local execution is also disabled.",
workerPool->nodeName,
workerPool->nodePort),
errhint("Enable local execution via SET "
"citus.enable_local_execution TO true;")));
errmsg("the total number of connections on the "
"server is more than max_connections(%d)",
MaxConnections),
errhint("%s", errHint)));
}
return;
@ -5034,6 +5057,12 @@ CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution *placementE
return false;
}
if (!placementExecution->shardCommandExecution->localExecutionSupported)
{
/* cannot execute given task locally */
return false;
}
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
{
/*

View File

@ -5,6 +5,8 @@ SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 90630500;
-- Ensure tuple data in explain analyze output is the same on all PG versions
SET citus.enable_binary_protocol = TRUE;
-- do not cache any connections for now, will enable it back soon
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
-- adding the coordinator as inactive is disallowed
SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
ERROR: coordinator node cannot be added as inactive node
@ -33,6 +35,49 @@ SELECT count(*) FROM pg_dist_node;
(1 row)
-- there are no workers now, but we should still be able to create Citus tables
-- force local execution when creating the index
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
-- Postmaster might not ack SIGHUP signal sent by pg_reload_conf() immediately,
-- so we need to sleep for some amount of time to do our best to ensure that
-- postmaster reflects GUC changes.
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
CREATE TABLE failover_to_local (a int);
SELECT create_distributed_table('failover_to_local', 'a', shard_count=>32);
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE INDEX CONCURRENTLY ON failover_to_local(a);
WARNING: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index.
Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index.
ERROR: the total number of connections on the server is more than max_connections(100)
HINT: Consider using a higher value for max_connections
-- reset global GUC changes
ALTER SYSTEM RESET citus.local_shared_pool_size;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SET client_min_messages TO WARNING;
DROP TABLE failover_to_local;
RESET client_min_messages;
-- so that we don't have to update rest of the test output
SET citus.next_shard_id TO 90630500;
CREATE TABLE ref(x int, y int);
SELECT create_reference_table('ref');
create_reference_table
@ -2163,24 +2208,24 @@ NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.anothe
-- local execution and the queries would fail
SET citus.enable_local_execution TO false;
SELECT count(*) from another_schema_table;
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
HINT: Enable local execution via SET citus.enable_local_execution TO true;
ERROR: the total number of connections on the server is more than max_connections(100)
HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
UPDATE another_schema_table SET b = b;
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
HINT: Enable local execution via SET citus.enable_local_execution TO true;
ERROR: the total number of connections on the server is more than max_connections(100)
HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
INSERT INTO another_schema_table SELECT * FROM another_schema_table;
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
HINT: Enable local execution via SET citus.enable_local_execution TO true;
ERROR: the total number of connections on the server is more than max_connections(100)
HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
HINT: Enable local execution via SET citus.enable_local_execution TO true;
ERROR: the total number of connections on the server is more than max_connections(100)
HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
SELECT count(*) FROM cte_1;
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
HINT: Enable local execution via SET citus.enable_local_execution TO true;
ERROR: the total number of connections on the server is more than max_connections(100)
HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
HINT: Enable local execution via SET citus.enable_local_execution TO true;
ERROR: the total number of connections on the server is more than max_connections(100)
HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
-- copy fails if local execution is disabled and there is no connection slot
COPY another_schema_table(a) FROM PROGRAM 'seq 32';
ERROR: could not find an available connection

View File

@ -7,6 +7,9 @@ SET citus.next_shard_id TO 90630500;
-- Ensure tuple data in explain analyze output is the same on all PG versions
SET citus.enable_binary_protocol = TRUE;
-- do not cache any connections for now, will enable it back soon
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
-- adding the coordinator as inactive is disallowed
SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
@ -25,6 +28,32 @@ SELECT count(*) FROM pg_dist_node;
-- there are no workers now, but we should still be able to create Citus tables
-- force local execution when creating the index
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
-- Postmaster might not ack SIGHUP signal sent by pg_reload_conf() immediately,
-- so we need to sleep for some amount of time to do our best to ensure that
-- postmaster reflects GUC changes.
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
CREATE TABLE failover_to_local (a int);
SELECT create_distributed_table('failover_to_local', 'a', shard_count=>32);
CREATE INDEX CONCURRENTLY ON failover_to_local(a);
-- reset global GUC changes
ALTER SYSTEM RESET citus.local_shared_pool_size;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
SET client_min_messages TO WARNING;
DROP TABLE failover_to_local;
RESET client_min_messages;
-- so that we don't have to update rest of the test output
SET citus.next_shard_id TO 90630500;
CREATE TABLE ref(x int, y int);
SELECT create_reference_table('ref');