From 0b3d4e55d92184f1bcb53e49af75514ae5bde2e1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=96nder=20Kalac=C4=B1?=
Date: Fri, 8 Nov 2019 12:49:56 +0100
Subject: [PATCH] Local execution should not change hasReturning for distributed tables (#3160)

The logic that prevents RETURNING on reference tables from producing
duplicate entries (tuples arriving from both the local and the remote
execution) was also kicking in for distributed tables, causing some
tuples to go missing from RETURNING. With this PR, we ensure that the
logic applies to reference tables only.
---
 .../distributed/executor/adaptive_executor.c  | 11 ++---
 .../distributed/executor/local_executor.c     |  5 ++-
 .../planner/multi_router_planner.c            |  1 +
 .../distributed/utils/citus_copyfuncs.c       |  1 +
 .../distributed/utils/citus_outfuncs.c        |  1 +
 .../distributed/utils/citus_readfuncs.c       |  1 +
 .../distributed/multi_physical_planner.h      |  9 ++++
 .../expected/local_shard_execution.out        | 42 +++++++++++++++++++
 .../regress/sql/local_shard_execution.sql     | 25 +++++
 9 files changed, 87 insertions(+), 9 deletions(-)

diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 652437589..cc53687cf 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -738,12 +738,6 @@ RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
 static void
 AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution)
 {
-    /*
-     * Local execution already stores the tuples for returning, so we should not
-     * store again.
-     */
-    execution->hasReturning = false;
-
     /* we only need to execute the remote tasks */
     execution->tasksToExecute = execution->remoteTaskList;
 
@@ -1467,8 +1461,9 @@ AssignTasksToConnections(DistributedExecution *execution)
                                                  sizeof(TaskPlacementExecution *));
         shardCommandExecution->placementExecutionCount = placementExecutionCount;
 
-        shardCommandExecution->expectResults = hasReturning ||
-                                               modLevel == ROW_MODIFY_READONLY;
+        shardCommandExecution->expectResults =
+            (hasReturning && !task->partiallyLocalOrRemote) ||
+            modLevel == ROW_MODIFY_READONLY;
 
         foreach(taskPlacementCell, task->taskPlacementList)
         {
diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c
index 476903141..179ec498b 100644
--- a/src/backend/distributed/executor/local_executor.c
+++ b/src/backend/distributed/executor/local_executor.c
@@ -241,7 +241,7 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
     }
     else
     {
-        Task *localTask = copyObject(task);
+        Task *localTask = NULL;
         Task *remoteTask = NULL;
 
         /*
@@ -250,6 +250,9 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
         * prefer to use local placement, and require remote placements only for
         * modifications.
         */
+        task->partiallyLocalOrRemote = true;
+
+        localTask = copyObject(task);
         localTask->taskPlacementList = localTaskPlacementList;
         *localTaskList = lappend(*localTaskList, localTask);
 
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 30a866127..a3855b6f3 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -1616,6 +1616,7 @@ CreateTask(TaskType taskType)
     task->relationRowLockList = NIL;
     task->modifyWithSubquery = false;
+    task->partiallyLocalOrRemote = false;
     task->relationShardList = NIL;
 
     return task;
diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c
index 8d0d26c24..a2fdc8c10 100644
--- a/src/backend/distributed/utils/citus_copyfuncs.c
+++ b/src/backend/distributed/utils/citus_copyfuncs.c
@@ -262,6 +262,7 @@ CopyNodeTask(COPYFUNC_ARGS)
     COPY_NODE_FIELD(relationShardList);
     COPY_NODE_FIELD(relationRowLockList);
     COPY_NODE_FIELD(rowValuesLists);
+    COPY_SCALAR_FIELD(partiallyLocalOrRemote);
 }
 
diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c
index 1ec507856..d293e2a00 100644
--- a/src/backend/distributed/utils/citus_outfuncs.c
+++ b/src/backend/distributed/utils/citus_outfuncs.c
@@ -476,6 +476,7 @@ OutTask(OUTFUNC_ARGS)
     WRITE_NODE_FIELD(relationShardList);
     WRITE_NODE_FIELD(relationRowLockList);
     WRITE_NODE_FIELD(rowValuesLists);
+    WRITE_BOOL_FIELD(partiallyLocalOrRemote);
 }
 
diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c
index 265da3598..bea14e73f 100644
--- a/src/backend/distributed/utils/citus_readfuncs.c
+++ b/src/backend/distributed/utils/citus_readfuncs.c
@@ -392,6 +392,7 @@ ReadTask(READFUNC_ARGS)
     READ_NODE_FIELD(relationShardList);
     READ_NODE_FIELD(relationRowLockList);
     READ_NODE_FIELD(rowValuesLists);
+    READ_BOOL_FIELD(partiallyLocalOrRemote);
 
     READ_DONE();
 }
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index 9e0048596..a3ddd1a94 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -213,6 +213,15 @@ typedef struct Task
     List *relationShardList;
     List *rowValuesLists; /* rows to use when building multi-row INSERT */
+
+    /*
+     * Used only when local execution happens. Indicates that this task is part of
+     * both local and remote executions. We use "or" in the field name because this
+     * is set to true for both the remote and local tasks generated for such
+     * executions. The most common example is modifications to reference tables where
+     * the task is split into local and remote tasks.
+     */
+    bool partiallyLocalOrRemote;
 } Task;
 
diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out
index c487b2416..f38219205 100644
--- a/src/test/regress/expected/local_shard_execution.out
+++ b/src/test/regress/expected/local_shard_execution.out
@@ -1088,6 +1088,48 @@ LOG: executing the command locally: INSERT INTO local_shard_execution.reference
   1000
 (1 row)
 
+-- clean the table for the next tests
+SET search_path TO local_shard_execution;
+TRUNCATE distributed_table CASCADE;
+NOTICE: truncate cascades to table "second_distributed_table"
+-- load some data on a remote shard
+INSERT INTO reference_table (key) VALUES (1), (2);
+LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 AS citus_table_alias (key) VALUES (1), (2)
+INSERT INTO distributed_table (key) VALUES (2);
+BEGIN;
+    -- local execution followed by a distributed query
+    INSERT INTO distributed_table (key) VALUES (1);
+LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 (key) VALUES (1)
+    DELETE FROM distributed_table RETURNING key;
+LOG: executing the command locally: DELETE FROM local_shard_execution.distributed_table_1470001 distributed_table RETURNING key
+LOG: executing the command locally: DELETE FROM local_shard_execution.distributed_table_1470003 distributed_table RETURNING key
+ key
+-----
+   1
+   2
+(2 rows)
+
+COMMIT;
+-- a similar test with a reference table
+TRUNCATE reference_table CASCADE;
+NOTICE: truncate cascades to table "distributed_table"
+NOTICE: truncate cascades to table "second_distributed_table"
+-- load some data on a remote shard
+INSERT INTO reference_table (key) VALUES (2);
+LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (2)
+BEGIN;
+    -- local execution followed by a distributed query
+    INSERT INTO reference_table (key) VALUES (1);
+LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (1)
+    DELETE FROM reference_table RETURNING key;
+LOG: executing the command locally: DELETE FROM local_shard_execution.reference_table_1470000 reference_table RETURNING key
+ key
+-----
+   1
+   2
+(2 rows)
+
+COMMIT;
 \c - - - :master_port
 -- local execution with custom type
 SET citus.replication_model TO "streaming";
diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql
index 7099690cf..3f04dedc6 100644
--- a/src/test/regress/sql/local_shard_execution.sql
+++ b/src/test/regress/sql/local_shard_execution.sql
@@ -621,6 +621,31 @@ COMMIT;
 -- Citus currently doesn't allow using task_assignment_policy for intermediate results
 WITH distributed_local_mixed AS (INSERT INTO reference_table VALUES (1000) RETURNING *) SELECT * FROM distributed_local_mixed;
+
+-- clean the table for the next tests
+SET search_path TO local_shard_execution;
+TRUNCATE distributed_table CASCADE;
+
+-- load some data on a remote shard
+INSERT INTO reference_table (key) VALUES (1), (2);
+INSERT INTO distributed_table (key) VALUES (2);
+BEGIN;
+    -- local execution followed by a distributed query
+    INSERT INTO distributed_table (key) VALUES (1);
+    DELETE FROM distributed_table RETURNING key;
+COMMIT;
+
+-- a similar test with a reference table
+TRUNCATE reference_table CASCADE;
+
+-- load some data on a remote shard
+INSERT INTO reference_table (key) VALUES (2);
+BEGIN;
+    -- local execution followed by a distributed query
+    INSERT INTO reference_table (key) VALUES (1);
+    DELETE FROM reference_table RETURNING key;
+COMMIT;
+
 
 \c - - - :master_port
 -- local execution with custom type
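
Not part of the patch: a condensed sketch of the behavior this change fixes, assuming the local_shard_execution schema exercised by the new regression test above (distributed_table sharded on key, with one shard placement on the local node and at least one on a remote node). Before the change, once a statement ran through local execution, hasReturning was cleared for the whole distributed execution, so a subsequent multi-shard modification silently dropped the remote shards' rows from RETURNING; after the change, only tasks whose placements are split between local and remote execution (the reference-table case) skip remote result collection.

-- Sketch only; table and column names reuse the regression schema above.
BEGIN;
    -- Routed to the local shard placement, switching the transaction to local execution.
    INSERT INTO distributed_table (key) VALUES (1);
    -- Multi-shard modification: RETURNING should include rows from both the local
    -- and the remote shard placements. Previously the remote rows went missing.
    DELETE FROM distributed_table RETURNING key;
COMMIT;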