Merge branch 'master' into velioglu/table_wo_seq_prototype

ref_on_buraks
Burak Velioglu 2022-01-26 01:20:03 +03:00
commit 048be241c9
3 changed files with 123 additions and 20 deletions

View File

@@ -301,6 +301,14 @@ typedef struct DistributedExecution
     * do cleanup for repartition queries.
     */
    List *jobIdList;
+
+   /*
+    * Indicates whether we can execute tasks locally during distributed
+    * execution. In other words, this flag must be set to false when
+    * executing a command that we surely know that local execution would
+    * fail, such as CREATE INDEX CONCURRENTLY.
+    */
+   bool localExecutionSupported;
 } DistributedExecution;
@@ -545,6 +553,12 @@ typedef struct ShardCommandExecution
    bool gotResults;
    TaskExecutionState executionState;
+
+   /*
+    * Indicates whether given shard command can be executed locally on
+    * placements. Normally determined by DistributedExecution's same field.
+    */
+   bool localExecutionSupported;
 } ShardCommandExecution;

 /*
@@ -1112,6 +1126,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
    execution->jobIdList = jobIdList;
+   execution->localExecutionSupported = localExecutionSupported;
+
    /*
     * Since task can have multiple queries, we are not sure how many columns we should
     * allocate for. We start with 16, and reallocate when we need more.
@@ -1140,7 +1156,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
        }
    }

-   if (localExecutionSupported && ShouldExecuteTasksLocally(taskList))
+   if (execution->localExecutionSupported &&
+       ShouldExecuteTasksLocally(taskList))
    {
        bool readOnlyPlan = !TaskListModifiesDatabase(modLevel, taskList);
        ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &execution->localTaskList,
@@ -2004,6 +2021,8 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
        shardCommandExecution->task = task;
        shardCommandExecution->executionOrder = ExecutionOrderForTask(modLevel, task);
        shardCommandExecution->executionState = TASK_EXECUTION_NOT_FINISHED;
+       shardCommandExecution->localExecutionSupported =
+           execution->localExecutionSupported;
        shardCommandExecution->placementExecutions =
            (TaskPlacementExecution **) palloc0(placementExecutionCount *
                                                sizeof(TaskPlacementExecution *));
@@ -2841,14 +2860,18 @@ ManageWorkerPool(WorkerPool *workerPool)

    if (execution->failed)
    {
+       const char *errHint =
+           execution->localExecutionSupported ?
+           "This command supports local execution. Consider enabling "
+           "local execution using SET citus.enable_local_execution "
+           "TO true;" :
+           "Consider using a higher value for max_connections";
+
        ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
-                       errmsg(
-                           "could not establish any connections to the node %s:%d "
-                           "when local execution is also disabled.",
-                           workerPool->nodeName,
-                           workerPool->nodePort),
-                       errhint("Enable local execution via SET "
-                               "citus.enable_local_execution TO true;")));
+                       errmsg("the total number of connections on the "
+                              "server is more than max_connections(%d)",
+                              MaxConnections),
+                       errhint("%s", errHint)));
    }

    return;
@@ -5034,6 +5057,12 @@ CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution *placementExecution)
        return false;
    }

+   if (!placementExecution->shardCommandExecution->localExecutionSupported)
+   {
+       /* cannot execute given task locally */
+       return false;
+   }
+
    if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
    {
        /*
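Taken together, the executor hunks above thread a single capability flag from execution creation down to the per-placement failover check. The condensed sketch below is illustrative only: the two struct fields mirror the diff, while the helper names (PropagateLocalExecutionFlag, CanFailoverToLocal) are invented for the example and do not exist in Citus.

#include <stdbool.h>

/* Sketch only: the shape implied by the hunks above, not the actual Citus source. */
typedef struct DistributedExecution
{
    /* ... other fields ... */
    bool localExecutionSupported;   /* false for e.g. CREATE INDEX CONCURRENTLY */
} DistributedExecution;

typedef struct ShardCommandExecution
{
    /* ... other fields ... */
    bool localExecutionSupported;   /* inherited from DistributedExecution */
} ShardCommandExecution;

/* Task assignment copies the flag down to each shard command (hypothetical helper). */
static void
PropagateLocalExecutionFlag(ShardCommandExecution *shardCommand,
                            DistributedExecution *execution)
{
    shardCommand->localExecutionSupported = execution->localExecutionSupported;
}

/* Failover to local execution bails out early when the flag is unset (hypothetical helper). */
static bool
CanFailoverToLocal(ShardCommandExecution *shardCommand)
{
    if (!shardCommand->localExecutionSupported)
    {
        /* e.g. CREATE INDEX CONCURRENTLY: report the connection error instead */
        return false;
    }

    /* the real code continues with GUC and placement-locality checks */
    return true;
}

The same flag also selects the hint attached to the connection-failure error above: when local execution is supported, the hint suggests SET citus.enable_local_execution TO true; otherwise it points at max_connections.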

View File

@@ -5,6 +5,8 @@ SET citus.shard_replication_factor TO 1;
 SET citus.next_shard_id TO 90630500;
 -- Ensure tuple data in explain analyze output is the same on all PG versions
 SET citus.enable_binary_protocol = TRUE;
+-- do not cache any connections for now, will enable it back soon
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
 -- adding the coordinator as inactive is disallowed
 SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
 ERROR: coordinator node cannot be added as inactive node
@@ -33,6 +35,49 @@ SELECT count(*) FROM pg_dist_node;
 (1 row)

 -- there are no workers now, but we should still be able to create Citus tables
+-- force local execution when creating the index
+ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
+-- Postmaster might not ack SIGHUP signal sent by pg_reload_conf() immediately,
+-- so we need to sleep for some amount of time to do our best to ensure that
+-- postmaster reflects GUC changes.
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT pg_sleep(0.1);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE failover_to_local (a int);
+SELECT create_distributed_table('failover_to_local', 'a', shard_count=>32);
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE INDEX CONCURRENTLY ON failover_to_local(a);
+WARNING: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index.
+Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index.
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: Consider using a higher value for max_connections
+-- reset global GUC changes
+ALTER SYSTEM RESET citus.local_shared_pool_size;
+ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SET client_min_messages TO WARNING;
+DROP TABLE failover_to_local;
+RESET client_min_messages;
+-- so that we don't have to update rest of the test output
+SET citus.next_shard_id TO 90630500;
 CREATE TABLE ref(x int, y int);
 SELECT create_reference_table('ref');
  create_reference_table
@@ -2163,24 +2208,24 @@ NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.anothe
 -- local execution and the queries would fail
 SET citus.enable_local_execution TO false;
 SELECT count(*) from another_schema_table;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 UPDATE another_schema_table SET b = b;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 INSERT INTO another_schema_table SELECT * FROM another_schema_table;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
 SELECT count(*) FROM cte_1;
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
-ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
-HINT: Enable local execution via SET citus.enable_local_execution TO true;
+ERROR: the total number of connections on the server is more than max_connections(100)
+HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
 -- copy fails if local execution is disabled and there is no connection slot
 COPY another_schema_table(a) FROM PROGRAM 'seq 32';
 ERROR: could not find an available connection

View File

@@ -7,6 +7,9 @@ SET citus.next_shard_id TO 90630500;
 -- Ensure tuple data in explain analyze output is the same on all PG versions
 SET citus.enable_binary_protocol = TRUE;
+-- do not cache any connections for now, will enable it back soon
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
+
 -- adding the coordinator as inactive is disallowed
 SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
@@ -25,6 +28,32 @@ SELECT count(*) FROM pg_dist_node;

 -- there are no workers now, but we should still be able to create Citus tables
+-- force local execution when creating the index
+ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
+
+-- Postmaster might not ack SIGHUP signal sent by pg_reload_conf() immediately,
+-- so we need to sleep for some amount of time to do our best to ensure that
+-- postmaster reflects GUC changes.
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+
+CREATE TABLE failover_to_local (a int);
+SELECT create_distributed_table('failover_to_local', 'a', shard_count=>32);
+
+CREATE INDEX CONCURRENTLY ON failover_to_local(a);
+
+-- reset global GUC changes
+ALTER SYSTEM RESET citus.local_shared_pool_size;
+ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
+SELECT pg_reload_conf();
+
+SET client_min_messages TO WARNING;
+DROP TABLE failover_to_local;
+RESET client_min_messages;
+
+-- so that we don't have to update rest of the test output
+SET citus.next_shard_id TO 90630500;
+
 CREATE TABLE ref(x int, y int);
 SELECT create_reference_table('ref');