diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index cb782bce3..b36cf93ae 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -581,7 +581,6 @@ static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLev static void StartDistributedExecution(DistributedExecution *execution); static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution); static void RunDistributedExecution(DistributedExecution *execution); -static bool ShouldRunTasksSequentially(List *taskList); static void SequentialRunDistributedExecution(DistributedExecution *execution); static void FinishDistributedExecution(DistributedExecution *execution); @@ -2105,7 +2104,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection) * true sequential execution, concurrent multi-row upserts could easily form * a distributed deadlock when the upserts touch the same rows. */ -static bool +bool ShouldRunTasksSequentially(List *taskList) { if (list_length(taskList) < 2) diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index c5efcaaab..d49a0d44d 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -80,6 +80,7 @@ #include "distributed/pg_version_constants.h" +#include "distributed/adaptive_executor.h" #include "distributed/commands/utility_hook.h" #include "distributed/citus_custom_scan.h" #include "distributed/citus_ruleutils.h" @@ -764,11 +765,14 @@ ShouldExecuteTasksLocally(List *taskList) { /* * For multi-task executions, we prefer to use connections for parallelism, - * except when in a multi-statement transaction since there could be other - * commands that require local execution. + * except for two cases. First, when in a multi-statement transaction since + * there could be other commands that require local execution. Second, the + * task list already requires sequential execution. In that case, connection + * establishment becomes an unnecessary operation. */ - return IsMultiStatementTransaction() && AnyTaskAccessesLocalNode(taskList); + return (IsMultiStatementTransaction() || ShouldRunTasksSequentially(taskList)) && + AnyTaskAccessesLocalNode(taskList); } return false; diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index ae28e4385..3affd1877 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -11,6 +11,7 @@ extern bool EnableBinaryProtocol; /* GUC, number of ms to wait between opening connections to the same worker */ extern int ExecutorSlowStartInterval; +extern bool ShouldRunTasksSequentially(List *taskList); extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported); extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize, bool localExecutionSupported); diff --git a/src/test/regress/expected/coordinator_evaluation_modify.out b/src/test/regress/expected/coordinator_evaluation_modify.out index 60d2b35c3..588466fa1 100644 --- a/src/test/regress/expected/coordinator_evaluation_modify.out +++ b/src/test/regress/expected/coordinator_evaluation_modify.out @@ -831,6 +831,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test3'', 3)'), (4, '(''test4'', 4)'), (7, '(''test7'', 7)'), (9, '(''test9'', 9)'), (11, '(''test11'', 11)'), (12, '(''test12'', 12)'), (14, '(''test14'', 14)'), (16, '(''test16'', 16)'); +NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(''test3'',3)'::coordinator_evaluation_combinations_modify.user_data), (4,'(''test4'',4)'::coordinator_evaluation_combinations_modify.user_data), (7,'(''test7'',7)'::coordinator_evaluation_combinations_modify.user_data), (14,'(''test14'',14)'::coordinator_evaluation_combinations_modify.user_data), (16,'(''test16'',16)'::coordinator_evaluation_combinations_modify.user_data) +NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(''test9'',9)'::coordinator_evaluation_combinations_modify.user_data), (11,'(''test11'',11)'::coordinator_evaluation_combinations_modify.user_data), (12,'(''test12'',12)'::coordinator_evaluation_combinations_modify.user_data) -- make sure that it is also true for fast-path router queries with paramaters PREPARE fast_path_router_with_param(int) AS DELETE FROM user_info_data WHERE user_id = $1 RETURNING user_id, u_data; execute fast_path_router_with_param(3); @@ -893,6 +895,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)'), (4, '(''test'', 2)'), (7, '(''test'', 2)'), (9, '(''test'', 9)'), (11, '(''test'', 2)'), (12, '(''test'', 2)'), (14, '(''test'', 2)'), (16, '(''test'', 2)'); +NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (4,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (7,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (14,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (16,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data) +NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(''test'',9)'::coordinator_evaluation_combinations_modify.user_data), (11,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (12,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data) -- make sure that it is also true for fast-path router queries with paramaters PREPARE fast_path_router_with_param_and_func(int) AS DELETE FROM user_info_data WHERE u_data = ('''test''', get_constant_stable())::user_data AND user_id = $1 RETURNING user_id, u_data; execute fast_path_router_with_param_and_func(3); @@ -1142,6 +1146,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)), (4, ('test', 2)), (7, ('test', 2)), (9, ('test', 2)), (11, ('test', 2)), (12, ('test', 2)), (14, ('test', 2)), (16, ('test', 2)); +NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (4,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (7,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (14,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (16,'(test,2)'::coordinator_evaluation_combinations_modify.user_data) +NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (11,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (12,'(test,2)'::coordinator_evaluation_combinations_modify.user_data) PREPARE fast_path_router_with_two_params(user_data, int) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = $2 RETURNING user_id, u_data; EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 3); NOTICE: executing the command locally: DELETE FROM coordinator_evaluation_combinations_modify.user_info_data_1180001 user_info_data WHERE ((u_data OPERATOR(pg_catalog.=) '(test,2)'::coordinator_evaluation_combinations_modify.user_data) AND (user_id OPERATOR(pg_catalog.=) 3)) RETURNING user_id, u_data @@ -1319,6 +1325,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test3'', 3)'), (4, '(''test4'', 4)'), (7, '(''test7'', 7)'), (9, '(''test9'', 9)'), (11, '(''test11'', 11)'), (12, '(''test12'', 12)'), (14, '(''test14'', 14)'), (16, '(''test16'', 16)'); +NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(''test3'',3)'::coordinator_evaluation_combinations_modify.user_data), (4,'(''test4'',4)'::coordinator_evaluation_combinations_modify.user_data), (7,'(''test7'',7)'::coordinator_evaluation_combinations_modify.user_data), (14,'(''test14'',14)'::coordinator_evaluation_combinations_modify.user_data), (16,'(''test16'',16)'::coordinator_evaluation_combinations_modify.user_data) +NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(''test9'',9)'::coordinator_evaluation_combinations_modify.user_data), (11,'(''test11'',11)'::coordinator_evaluation_combinations_modify.user_data), (12,'(''test12'',12)'::coordinator_evaluation_combinations_modify.user_data) -- make sure that it is also true for fast-path router queries with paramaters PREPARE router_with_param(int) AS DELETE FROM user_info_data WHERE user_id = $1 AND user_id = $1 RETURNING user_id, u_data; execute router_with_param(3); @@ -1381,6 +1389,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES (3, '(''test'', 2)'), (4, '(''test'', 2)'), (7, '(''test'', 2)'), (9, '(''test'', 9)'), (11, '(''test'', 2)'), (12, '(''test'', 2)'), (14, '(''test'', 2)'), (16, '(''test'', 2)'); +NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (4,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (7,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (14,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (16,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data) +NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(''test'',9)'::coordinator_evaluation_combinations_modify.user_data), (11,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (12,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data) -- make sure that it is also true for fast-path router queries with paramaters PREPARE router_with_param_and_func(int) AS DELETE FROM user_info_data WHERE u_data = ('''test''', get_constant_stable())::user_data AND user_id = $1 AND user_id = $1 RETURNING user_id, u_data; execute router_with_param_and_func(3); @@ -1630,6 +1640,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES (3, ('test', 2)), (4, ('test', 2)), (7, ('test', 2)), (9, ('test', 2)), (11, ('test', 2)), (12, ('test', 2)), (14, ('test', 2)), (16, ('test', 2)); +NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (4,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (7,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (14,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (16,'(test,2)'::coordinator_evaluation_combinations_modify.user_data) +NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (11,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (12,'(test,2)'::coordinator_evaluation_combinations_modify.user_data) PREPARE router_with_two_params(user_data, int) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = $2 AND user_id = $2 RETURNING user_id, u_data; EXECUTE router_with_two_params(('test', 2)::user_data, 3); NOTICE: executing the command locally: DELETE FROM coordinator_evaluation_combinations_modify.user_info_data_1180001 user_info_data WHERE ((u_data OPERATOR(pg_catalog.=) $1::coordinator_evaluation_combinations_modify.user_data) AND (user_id OPERATOR(pg_catalog.=) $2) AND (user_id OPERATOR(pg_catalog.=) $2)) RETURNING user_id, u_data diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 7ebf60597..241ce0673 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -1035,10 +1035,10 @@ NOTICE: executing the command locally: INSERT INTO local_shard_execution.distri 5 | 55 | 22 (2 rows) --- distributed execution of multi-rows INSERTs, where some part of the execution --- could have been done via local execution but the executor choose the other way around --- because the command is a multi-shard query +-- distributed execution of multi-rows INSERTs, where executor +-- is smart enough to execute local tasks via local execution INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4,'44',44),(5,'55',55) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *; +NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'11'::text,'21'::bigint), (5,'55'::text,'55'::bigint) ON CONFLICT(key) DO UPDATE SET value = (((excluded.value)::integer OPERATOR(pg_catalog.+) 1))::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age key | value | age --------------------------------------------------------------------- 1 | 12 | 21 diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 8b485c182..8fdb42149 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -562,9 +562,8 @@ INSERT INTO reference_table VALUES (1),(2),(3),(4),(5),(6) RETURNING *; INSERT INTO distributed_table VALUES (1, '11',21), (5,'55',22) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *; --- distributed execution of multi-rows INSERTs, where some part of the execution --- could have been done via local execution but the executor choose the other way around --- because the command is a multi-shard query +-- distributed execution of multi-rows INSERTs, where executor +-- is smart enough to execute local tasks via local execution INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4,'44',44),(5,'55',55) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *;