Merge pull request #4374 from citusdata/sequential_execution_use_lcao

Multi-row INSERTs use local execution when placements are local
pull/4305/head^2
Önder Kalacı 2020-12-01 22:45:35 +03:00 committed by GitHub
commit 48d6266fd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 26 additions and 11 deletions

View File

@ -581,7 +581,6 @@ static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLev
static void StartDistributedExecution(DistributedExecution *execution);
static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution);
static void RunDistributedExecution(DistributedExecution *execution);
static bool ShouldRunTasksSequentially(List *taskList);
static void SequentialRunDistributedExecution(DistributedExecution *execution);
static void FinishDistributedExecution(DistributedExecution *execution);
@ -2105,7 +2104,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
* true sequential execution, concurrent multi-row upserts could easily form
* a distributed deadlock when the upserts touch the same rows.
*/
static bool
bool
ShouldRunTasksSequentially(List *taskList)
{
if (list_length(taskList) < 2)

View File

@ -80,6 +80,7 @@
#include "distributed/pg_version_constants.h"
#include "distributed/adaptive_executor.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_ruleutils.h"
@ -764,11 +765,14 @@ ShouldExecuteTasksLocally(List *taskList)
{
/*
* For multi-task executions, we prefer to use connections for parallelism,
* except when in a multi-statement transaction since there could be other
* commands that require local execution.
* except for two cases. First, when in a multi-statement transaction since
* there could be other commands that require local execution. Second, the
* task list already requires sequential execution. In that case, connection
* establishment becomes an unnecessary operation.
*/
return IsMultiStatementTransaction() && AnyTaskAccessesLocalNode(taskList);
return (IsMultiStatementTransaction() || ShouldRunTasksSequentially(taskList)) &&
AnyTaskAccessesLocalNode(taskList);
}
return false;

View File

@ -11,6 +11,7 @@ extern bool EnableBinaryProtocol;
/* GUC, number of ms to wait between opening connections to the same worker */
extern int ExecutorSlowStartInterval;
extern bool ShouldRunTasksSequentially(List *taskList);
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize,
bool localExecutionSupported);

View File

@ -831,6 +831,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES
(3, '(''test3'', 3)'), (4, '(''test4'', 4)'), (7, '(''test7'', 7)'),
(9, '(''test9'', 9)'), (11, '(''test11'', 11)'), (12, '(''test12'', 12)'),
(14, '(''test14'', 14)'), (16, '(''test16'', 16)');
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(''test3'',3)'::coordinator_evaluation_combinations_modify.user_data), (4,'(''test4'',4)'::coordinator_evaluation_combinations_modify.user_data), (7,'(''test7'',7)'::coordinator_evaluation_combinations_modify.user_data), (14,'(''test14'',14)'::coordinator_evaluation_combinations_modify.user_data), (16,'(''test16'',16)'::coordinator_evaluation_combinations_modify.user_data)
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(''test9'',9)'::coordinator_evaluation_combinations_modify.user_data), (11,'(''test11'',11)'::coordinator_evaluation_combinations_modify.user_data), (12,'(''test12'',12)'::coordinator_evaluation_combinations_modify.user_data)
-- make sure that it is also true for fast-path router queries with paramaters
PREPARE fast_path_router_with_param(int) AS DELETE FROM user_info_data WHERE user_id = $1 RETURNING user_id, u_data;
execute fast_path_router_with_param(3);
@ -893,6 +895,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES
(3, '(''test'', 2)'), (4, '(''test'', 2)'), (7, '(''test'', 2)'),
(9, '(''test'', 9)'), (11, '(''test'', 2)'), (12, '(''test'', 2)'),
(14, '(''test'', 2)'), (16, '(''test'', 2)');
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (4,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (7,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (14,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (16,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data)
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(''test'',9)'::coordinator_evaluation_combinations_modify.user_data), (11,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (12,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data)
-- make sure that it is also true for fast-path router queries with paramaters
PREPARE fast_path_router_with_param_and_func(int) AS DELETE FROM user_info_data WHERE u_data = ('''test''', get_constant_stable())::user_data AND user_id = $1 RETURNING user_id, u_data;
execute fast_path_router_with_param_and_func(3);
@ -1142,6 +1146,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES
(3, ('test', 2)), (4, ('test', 2)), (7, ('test', 2)),
(9, ('test', 2)), (11, ('test', 2)), (12, ('test', 2)),
(14, ('test', 2)), (16, ('test', 2));
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (4,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (7,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (14,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (16,'(test,2)'::coordinator_evaluation_combinations_modify.user_data)
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (11,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (12,'(test,2)'::coordinator_evaluation_combinations_modify.user_data)
PREPARE fast_path_router_with_two_params(user_data, int) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = $2 RETURNING user_id, u_data;
EXECUTE fast_path_router_with_two_params(('test', 2)::user_data, 3);
NOTICE: executing the command locally: DELETE FROM coordinator_evaluation_combinations_modify.user_info_data_1180001 user_info_data WHERE ((u_data OPERATOR(pg_catalog.=) '(test,2)'::coordinator_evaluation_combinations_modify.user_data) AND (user_id OPERATOR(pg_catalog.=) 3)) RETURNING user_id, u_data
@ -1319,6 +1325,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES
(3, '(''test3'', 3)'), (4, '(''test4'', 4)'), (7, '(''test7'', 7)'),
(9, '(''test9'', 9)'), (11, '(''test11'', 11)'), (12, '(''test12'', 12)'),
(14, '(''test14'', 14)'), (16, '(''test16'', 16)');
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(''test3'',3)'::coordinator_evaluation_combinations_modify.user_data), (4,'(''test4'',4)'::coordinator_evaluation_combinations_modify.user_data), (7,'(''test7'',7)'::coordinator_evaluation_combinations_modify.user_data), (14,'(''test14'',14)'::coordinator_evaluation_combinations_modify.user_data), (16,'(''test16'',16)'::coordinator_evaluation_combinations_modify.user_data)
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(''test9'',9)'::coordinator_evaluation_combinations_modify.user_data), (11,'(''test11'',11)'::coordinator_evaluation_combinations_modify.user_data), (12,'(''test12'',12)'::coordinator_evaluation_combinations_modify.user_data)
-- make sure that it is also true for fast-path router queries with paramaters
PREPARE router_with_param(int) AS DELETE FROM user_info_data WHERE user_id = $1 AND user_id = $1 RETURNING user_id, u_data;
execute router_with_param(3);
@ -1381,6 +1389,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES
(3, '(''test'', 2)'), (4, '(''test'', 2)'), (7, '(''test'', 2)'),
(9, '(''test'', 9)'), (11, '(''test'', 2)'), (12, '(''test'', 2)'),
(14, '(''test'', 2)'), (16, '(''test'', 2)');
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (4,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (7,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (14,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (16,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data)
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(''test'',9)'::coordinator_evaluation_combinations_modify.user_data), (11,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data), (12,'(''test'',2)'::coordinator_evaluation_combinations_modify.user_data)
-- make sure that it is also true for fast-path router queries with paramaters
PREPARE router_with_param_and_func(int) AS DELETE FROM user_info_data WHERE u_data = ('''test''', get_constant_stable())::user_data AND user_id = $1 AND user_id = $1 RETURNING user_id, u_data;
execute router_with_param_and_func(3);
@ -1630,6 +1640,8 @@ INSERT INTO user_info_data (user_id, u_data) VALUES
(3, ('test', 2)), (4, ('test', 2)), (7, ('test', 2)),
(9, ('test', 2)), (11, ('test', 2)), (12, ('test', 2)),
(14, ('test', 2)), (16, ('test', 2));
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180001 AS citus_table_alias (user_id, u_data) VALUES (3,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (4,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (7,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (14,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (16,'(test,2)'::coordinator_evaluation_combinations_modify.user_data)
NOTICE: executing the command locally: INSERT INTO coordinator_evaluation_combinations_modify.user_info_data_1180003 AS citus_table_alias (user_id, u_data) VALUES (9,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (11,'(test,2)'::coordinator_evaluation_combinations_modify.user_data), (12,'(test,2)'::coordinator_evaluation_combinations_modify.user_data)
PREPARE router_with_two_params(user_data, int) AS DELETE FROM user_info_data WHERE u_data = $1 AND user_id = $2 AND user_id = $2 RETURNING user_id, u_data;
EXECUTE router_with_two_params(('test', 2)::user_data, 3);
NOTICE: executing the command locally: DELETE FROM coordinator_evaluation_combinations_modify.user_info_data_1180001 user_info_data WHERE ((u_data OPERATOR(pg_catalog.=) $1::coordinator_evaluation_combinations_modify.user_data) AND (user_id OPERATOR(pg_catalog.=) $2) AND (user_id OPERATOR(pg_catalog.=) $2)) RETURNING user_id, u_data

View File

@ -1035,10 +1035,10 @@ NOTICE: executing the command locally: INSERT INTO local_shard_execution.distri
5 | 55 | 22
(2 rows)
-- distributed execution of multi-rows INSERTs, where some part of the execution
-- could have been done via local execution but the executor choose the other way around
-- because the command is a multi-shard query
-- distributed execution of multi-rows INSERTs, where executor
-- is smart enough to execute local tasks via local execution
INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4,'44',44),(5,'55',55) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *;
NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1,'11'::text,'21'::bigint), (5,'55'::text,'55'::bigint) ON CONFLICT(key) DO UPDATE SET value = (((excluded.value)::integer OPERATOR(pg_catalog.+) 1))::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age
key | value | age
---------------------------------------------------------------------
1 | 12 | 21

View File

@ -562,9 +562,8 @@ INSERT INTO reference_table VALUES (1),(2),(3),(4),(5),(6) RETURNING *;
INSERT INTO distributed_table VALUES (1, '11',21), (5,'55',22) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *;
-- distributed execution of multi-rows INSERTs, where some part of the execution
-- could have been done via local execution but the executor choose the other way around
-- because the command is a multi-shard query
-- distributed execution of multi-rows INSERTs, where executor
-- is smart enough to execute local tasks via local execution
INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4,'44',44),(5,'55',55) ON CONFLICT(key) DO UPDATE SET value = (EXCLUDED.value::int + 1)::text RETURNING *;