Multi-row INSERTs use local execution when placements are local

Multi-row execution already uses sequential execution. When shards
are local, using local execution is profitable as it avoids
an extra connection establishment to the local node.
pull/4374/head
Onder Kalaci 2020-12-01 21:37:59 +03:00
parent ea79ca0e5e
commit f7e1aa3f22
6 changed files with 26 additions and 11 deletions

View File

@ -581,7 +581,6 @@ static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLev
static void StartDistributedExecution(DistributedExecution *execution); static void StartDistributedExecution(DistributedExecution *execution);
static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution); static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution);
static void RunDistributedExecution(DistributedExecution *execution); static void RunDistributedExecution(DistributedExecution *execution);
static bool ShouldRunTasksSequentially(List *taskList);
static void SequentialRunDistributedExecution(DistributedExecution *execution); static void SequentialRunDistributedExecution(DistributedExecution *execution);
static void FinishDistributedExecution(DistributedExecution *execution); static void FinishDistributedExecution(DistributedExecution *execution);
@ -2105,7 +2104,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
* true sequential execution, concurrent multi-row upserts could easily form * true sequential execution, concurrent multi-row upserts could easily form
* a distributed deadlock when the upserts touch the same rows. * a distributed deadlock when the upserts touch the same rows.
*/ */
static bool bool
ShouldRunTasksSequentially(List *taskList) ShouldRunTasksSequentially(List *taskList)
{ {
if (list_length(taskList) < 2) if (list_length(taskList) < 2)

View File

@ -80,6 +80,7 @@
#include "distributed/pg_version_constants.h" #include "distributed/pg_version_constants.h"
#include "distributed/adaptive_executor.h"
#include "distributed/commands/utility_hook.h" #include "distributed/commands/utility_hook.h"
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
@ -764,11 +765,14 @@ ShouldExecuteTasksLocally(List *taskList)
{ {
/* /*
* For multi-task executions, we prefer to use connections for parallelism, * For multi-task executions, we prefer to use connections for parallelism,
* except when in a multi-statement transaction since there could be other * except for two cases. First, when in a multi-statement transaction since
* commands that require local execution. * there could be other commands that require local execution. Second, the
* task list already requires sequential execution. In that case, connection
* establishment becomes an unnecessary operation.
*/ */
return IsMultiStatementTransaction() && AnyTaskAccessesLocalNode(taskList); return (IsMultiStatementTransaction() || ShouldRunTasksSequentially(taskList)) &&
AnyTaskAccessesLocalNode(taskList);
} }
return false; return false;

View File

@ -11,6 +11,7 @@ extern bool EnableBinaryProtocol;
/* GUC, number of ms to wait between opening connections to the same worker */ /* GUC, number of ms to wait between opening connections to the same worker */
extern int ExecutorSlowStartInterval; extern int ExecutorSlowStartInterval;
extern bool ShouldRunTasksSequentially(List *taskList);
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported); extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize, extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize,
bool localExecutionSupported); bool localExecutionSupported);

View File

@ -831,6 +831,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES
(3, '(''test3'', 3)'), (4, '(''test4'', 4)'), (7, '(''test7'', 7)'), (3, '(''test3'', 3)'), (4, '(''test4'', 4)'), (7, '(''test7'', 7)'),
(9, '(''test9'', 9)'), (11, '(''test11'', 11)'), (12, '(''test12'', 12)'), (9, '(''test9'', 9)'), (11, '(''test11'', 11)'), (12, '(''test12'', 12)'),
(14, '(''test14'', 14)'), (16, '(''test16'', 16)'); (14, '(''test14'', 14)'), (16, '(''test16'', 16)');
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(''test3'',3)'::coordinator_evaluation_combinations_modify.user_data), (4,'(''test4'',4)'::coordinator_evaluation_combinations_modify.user_data), (7,'(''test7'',7)'::coordinator_evaluation_combinations_modify.user_data), (14,'(''test14'',14)'::coordinator_evaluation_combinations_modify.user_data), (16,'(''test16'',16)'::coordinator_evaluation_combinations_modify.user_data)
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(''test9'',9)'::coordinator_evaluation_combinations_modify.user_data), (11,'(''test11'',11)'::coordinator_evaluation_combinations_modify.user_data), (12,'(''test12'',12)'::coordinator_evaluation_combinations_modify.user_data)
-- make sure that it is also true for fast-path router queries with paramaters -- make sure that it is also true for fast-path router queries with paramaters
PREPARE fast_path_router_with_param(int) AS DELETE FROM user_info_data WHERE user_id = $1 RETURNING user_id, u_data; PREPARE fast_path_router_with_param(int) AS DELETE FROM user_info_data WHERE user_id = $1 RETURNING user_id, u_data;
execute fast_path_router_with_param(3); execute fast_path_router_with_param(3);
@ -893,6 +895,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES
(3, '(''test'', 2)'), (4, '(''test'', 2)'), (7, '(''test'', 2)'), (3, '(''test'', 2)'), (4, '(''test'', 2)'), (7, '(''test'', 2)'),
(9, '(''test'', 9)'), (11, '(''test'', 2)'), (12, '(''test'', 2)'), (9, '(''test'', 9)'), (11, '(''test'', 2)'), (12, '(''test'', 2)'),
(14, '(''test'', 2)'), (16, '(''test'', 2)'); (14, '(''test'', 2)'), (16, '(''test'', 2)');
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (4,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (7,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (14,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (16,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data)
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(''test'',9)'::coordinator_evaluation_combinations_modify.user_data), (11,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (12,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data)
-- make sure that it is also true for fast-path router queries with paramaters -- make sure that it is also true for fast-path router queries with paramaters
PREPARE fast_path_router_with_param_and_func(int) AS DELETE FROM user_info_data WHERE u_data = ('''test''', get_constant_stable())::user_data AND user_id = $1 RETURNING user_id, u_data; PREPARE fast_path_router_with_param_and_func(int) AS DELETE FROM user_info_data WHERE u_data = ('''test''', get_constant_stable())::user_data AND user_id = $1 RETURNING user_id, u_data;
execute fast_path_router_with_param_and_func(3); execute fast_path_router_with_param_and_func(3);
@ -1142,6 +1146,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES
(3, ('test', 2)), (4, ('test', 2)), (7, ('test', 2)), (3, ('test', 2)), (4, ('test', 2)), (7, ('test', 2)),
(9, ('test', 2)), (11, ('test', 2)), (12, ('test', 2)), (9, ('test', 2)), (11, ('test', 2)), (12, ('test', 2)),
(14, ('test', 2)), (16, ('test', 2)); (14, ('test', 2)), (16, ('test', 2));
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (4,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (7,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (14,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (16,'(test,2)'::coordinator_evaluation_combinations_modify.user_data)
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (11,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (12,'(test,2)'::coordinator_evaluation_combinations_modify.user_data)
PREPARE fast_path_router_with_two_params(user_data, int) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = $2 RETURNING user_id, u_data; PREPARE fast_path_router_with_two_params(user_data, int) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = $2 RETURNING user_id, u_data;
EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 3); EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 3);
NOTICE: executing the command locally: DELETE FROM coordinator_evaluation_combinations_modify.user_info_data_1180001 user_info_data WHERE ((u_data OPERATOR(pg_catalog.=) '(test,2)'::coordinator_evaluation_combinations_modify.user_data) AND (user_id OPERATOR(pg_catalog.=) 3)) RETURNING user_id, u_data NOTICE: executing the command locally: DELETE FROM coordinator_evaluation_combinations_modify.user_info_data_1180001 user_info_data WHERE ((u_data OPERATOR(pg_catalog.=) '(test,2)'::coordinator_evaluation_combinations_modify.user_data) AND (user_id OPERATOR(pg_catalog.=) 3)) RETURNING user_id, u_data
@ -1319,6 +1325,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES
(3, '(''test3'', 3)'), (4, '(''test4'', 4)'), (7, '(''test7'', 7)'), (3, '(''test3'', 3)'), (4, '(''test4'', 4)'), (7, '(''test7'', 7)'),
(9, '(''test9'', 9)'), (11, '(''test11'', 11)'), (12, '(''test12'', 12)'), (9, '(''test9'', 9)'), (11, '(''test11'', 11)'), (12, '(''test12'', 12)'),
(14, '(''test14'', 14)'), (16, '(''test16'', 16)'); (14, '(''test14'', 14)'), (16, '(''test16'', 16)');
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(''test3'',3)'::coordinator_evaluation_combinations_modify.user_data), (4,'(''test4'',4)'::coordinator_evaluation_combinations_modify.user_data), (7,'(''test7'',7)'::coordinator_evaluation_combinations_modify.user_data), (14,'(''test14'',14)'::coordinator_evaluation_combinations_modify.user_data), (16,'(''test16'',16)'::coordinator_evaluation_combinations_modify.user_data)
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(''test9'',9)'::coordinator_evaluation_combinations_modify.user_data), (11,'(''test11'',11)'::coordinator_evaluation_combinations_modify.user_data), (12,'(''test12'',12)'::coordinator_evaluation_combinations_modify.user_data)
-- make sure that it is also true for fast-path router queries with paramaters -- make sure that it is also true for fast-path router queries with paramaters
PREPARE router_with_param(int) AS DELETE FROM user_info_data WHERE user_id = $1 AND user_id = $1 RETURNING user_id, u_data; PREPARE router_with_param(int) AS DELETE FROM user_info_data WHERE user_id = $1 AND user_id = $1 RETURNING user_id, u_data;
execute router_with_param(3); execute router_with_param(3);
@ -1381,6 +1389,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES
(3, '(''test'', 2)'), (4, '(''test'', 2)'), (7, '(''test'', 2)'), (3, '(''test'', 2)'), (4, '(''test'', 2)'), (7, '(''test'', 2)'),
(9, '(''test'', 9)'), (11, '(''test'', 2)'), (12, '(''test'', 2)'), (9, '(''test'', 9)'), (11, '(''test'', 2)'), (12, '(''test'', 2)'),
(14, '(''test'', 2)'), (16, '(''test'', 2)'); (14, '(''test'', 2)'), (16, '(''test'', 2)');
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (4,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (7,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (14,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (16,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data)
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(''test'',9)'::coordinator_evaluation_combinations_modify.user_data), (11,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (12,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data)
-- make sure that it is also true for fast-path router queries with paramaters -- make sure that it is also true for fast-path router queries with paramaters
PREPARE router_with_param_and_func(int) AS DELETE FROM user_info_data WHERE u_data = ('''test''', get_constant_stable())::user_data AND user_id = $1 AND user_id = $1 RETURNING user_id, u_data; PREPARE router_with_param_and_func(int) AS DELETE FROM user_info_data WHERE u_data = ('''test''', get_constant_stable())::user_data AND user_id = $1 AND user_id = $1 RETURNING user_id, u_data;
execute router_with_param_and_func(3); execute router_with_param_and_func(3);
@ -1630,6 +1640,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES
(3, ('test', 2)), (4, ('test', 2)), (7, ('test', 2)), (3, ('test', 2)), (4, ('test', 2)), (7, ('test', 2)),
(9, ('test', 2)), (11, ('test', 2)), (12, ('test', 2)), (9, ('test', 2)), (11, ('test', 2)), (12, ('test', 2)),
(14, ('test', 2)), (16, ('test', 2)); (14, ('test', 2)), (16, ('test', 2));
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (4,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (7,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (14,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (16,'(test,2)'::coordinator_evaluation_combinations_modify.user_data)
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (11,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (12,'(test,2)'::coordinator_evaluation_combinations_modify.user_data)
PREPARE router_with_two_params(user_data, int) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = $2 AND user_id = $2 RETURNING user_id, u_data; PREPARE router_with_two_params(user_data, int) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = $2 AND user_id = $2 RETURNING user_id, u_data;
EXECUTE router_with_two_params(('test', 2)::user_data, 3); EXECUTE router_with_two_params(('test', 2)::user_data, 3);
NOTICE: executing the command locally: DELETE FROM coordinator_evaluation_combinations_modify.user_info_data_1180001 user_info_data WHERE ((u_data OPERATOR(pg_catalog.=) $1::coordinator_evaluation_combinations_modify.user_data) AND (user_id OPERATOR(pg_catalog.=) $2) AND (user_id OPERATOR(pg_catalog.=) $2)) RETURNING user_id, u_data NOTICE: executing the command locally: DELETE FROM coordinator_evaluation_combinations_modify.user_info_data_1180001 user_info_data WHERE ((u_data OPERATOR(pg_catalog.=) $1::coordinator_evaluation_combinations_modify.user_data) AND (user_id OPERATOR(pg_catalog.=) $2) AND (user_id OPERATOR(pg_catalog.=) $2)) RETURNING user_id, u_data

View File

@ -1035,10 +1035,10 @@ NOTICE: executing the command locally: INSERT INTO local_shard_execution.distri
5 | 55 | 22 5 | 55 | 22
(2 rows) (2 rows)
-- distributed execution of multi-rows INSERTs, where some part of the execution -- distributed execution of multi-rows INSERTs, where executor
-- could have been done via local execution but the executor choose the other way around -- is smart enough to execute local tasks via local execution
-- because the command is a multi-shard query
INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4,'44',44),(5,'55',55) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *; INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4,'44',44),(5,'55',55) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *;
NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'11'::text,'21'::bigint), (5,'55'::text,'55'::bigint) ON CONFLICT(key) DO UPDATE SET value = (((excluded.value)::integer OPERATOR(pg_catalog.+) 1))::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age
key | value | age key | value | age
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 12 | 21 1 | 12 | 21

View File

@ -562,9 +562,8 @@ INSERT INTO reference_table VALUES (1),(2),(3),(4),(5),(6) RETURNING *;
INSERT INTO distributed_table VALUES (1, '11',21), (5,'55',22) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *; INSERT INTO distributed_table VALUES (1, '11',21), (5,'55',22) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *;
-- distributed execution of multi-rows INSERTs, where some part of the execution -- distributed execution of multi-rows INSERTs, where executor
-- could have been done via local execution but the executor choose the other way around -- is smart enough to execute local tasks via local execution
-- because the command is a multi-shard query
INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4,'44',44),(5,'55',55) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *; INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4,'44',44),(5,'55',55) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *;