diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 652437589..cc53687cf 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -738,12 +738,6 @@ RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution) static void AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution) { - /* - * Local execution already stores the tuples for returning, so we should not - * store again. - */ - execution->hasReturning = false; - /* we only need to execute the remote tasks */ execution->tasksToExecute = execution->remoteTaskList; @@ -1467,8 +1461,9 @@ AssignTasksToConnections(DistributedExecution *execution) sizeof(TaskPlacementExecution *)); shardCommandExecution->placementExecutionCount = placementExecutionCount; - shardCommandExecution->expectResults = hasReturning || - modLevel == ROW_MODIFY_READONLY; + shardCommandExecution->expectResults = + (hasReturning && !task->partiallyLocalOrRemote) || + modLevel == ROW_MODIFY_READONLY; foreach(taskPlacementCell, task->taskPlacementList) { diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 476903141..179ec498b 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -241,7 +241,7 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, } else { - Task *localTask = copyObject(task); + Task *localTask = NULL; Task *remoteTask = NULL; /* @@ -250,6 +250,9 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, * prefer to use local placement, and require remote placements only for * modifications. */ + task->partiallyLocalOrRemote = true; + + localTask = copyObject(task); localTask->taskPlacementList = localTaskPlacementList; *localTaskList = lappend(*localTaskList, localTask); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 30a866127..a3855b6f3 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1616,6 +1616,7 @@ CreateTask(TaskType taskType) task->relationRowLockList = NIL; task->modifyWithSubquery = false; + task->partiallyLocalOrRemote = false; task->relationShardList = NIL; return task; diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 8d0d26c24..a2fdc8c10 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -262,6 +262,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_NODE_FIELD(relationShardList); COPY_NODE_FIELD(relationRowLockList); COPY_NODE_FIELD(rowValuesLists); + COPY_SCALAR_FIELD(partiallyLocalOrRemote); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 1ec507856..d293e2a00 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -476,6 +476,7 @@ OutTask(OUTFUNC_ARGS) WRITE_NODE_FIELD(relationShardList); WRITE_NODE_FIELD(relationRowLockList); WRITE_NODE_FIELD(rowValuesLists); + WRITE_BOOL_FIELD(partiallyLocalOrRemote); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 265da3598..bea14e73f 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -392,6 +392,7 @@ ReadTask(READFUNC_ARGS) READ_NODE_FIELD(relationShardList); READ_NODE_FIELD(relationRowLockList); READ_NODE_FIELD(rowValuesLists); + READ_BOOL_FIELD(partiallyLocalOrRemote); READ_DONE(); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 9e0048596..a3ddd1a94 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -213,6 +213,15 @@ typedef struct Task List *relationShardList; List *rowValuesLists; /* rows to use when building multi-row INSERT */ + + /* + * Used only when local execution happens. Indicates that this task is part of + * both local and remote executions. We use "or" in the field name because this + * is set to true for both the remote and local tasks generated for such + * executions. The most common example is modifications to reference tables where + * the task splitted into local and remote tasks. + */ + bool partiallyLocalOrRemote; } Task; diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index c487b2416..f38219205 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -1088,6 +1088,48 @@ LOG: executing the command locally: INSERT INTO local_shard_execution.reference 1000 (1 row) +-- clean the table for the next tests +SET search_path TO local_shard_execution; +TRUNCATE distributed_table CASCADE; +NOTICE: truncate cascades to table "second_distributed_table" +-- load some data on a remote shard +INSERT INTO reference_table (key) VALUES (1), (2); +LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 AS citus_table_alias (key) VALUES (1), (2) +INSERT INTO distributed_table (key) VALUES (2); +BEGIN; + -- local execution followed by a distributed query + INSERT INTO distributed_table (key) VALUES (1); +LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 (key) VALUES (1) + DELETE FROM distributed_table RETURNING key; +LOG: executing the command locally: DELETE FROM local_shard_execution.distributed_table_1470001 distributed_table RETURNING key +LOG: executing the command locally: DELETE FROM local_shard_execution.distributed_table_1470003 distributed_table RETURNING key + key +----- + 1 + 2 +(2 rows) + +COMMIT; +-- a similar test with a reference table +TRUNCATE reference_table CASCADE; +NOTICE: truncate cascades to table "distributed_table" +NOTICE: truncate cascades to table "second_distributed_table" +-- load some data on a remote shard +INSERT INTO reference_table (key) VALUES (2); +LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (2) +BEGIN; + -- local execution followed by a distributed query + INSERT INTO reference_table (key) VALUES (1); +LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (1) + DELETE FROM reference_table RETURNING key; +LOG: executing the command locally: DELETE FROM local_shard_execution.reference_table_1470000 reference_table RETURNING key + key +----- + 1 + 2 +(2 rows) + +COMMIT; \c - - - :master_port -- local execution with custom type SET citus.replication_model TO "streaming"; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 7099690cf..3f04dedc6 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -621,6 +621,31 @@ COMMIT; -- Citus currently doesn't allow using task_assignment_policy for intermediate results WITH distributed_local_mixed AS (INSERT INTO reference_table VALUES (1000) RETURNING *) SELECT * FROM distributed_local_mixed; +-- clean the table for the next tests +SET search_path TO local_shard_execution; +TRUNCATE distributed_table CASCADE; + +-- load some data on a remote shard +INSERT INTO reference_table (key) VALUES (1), (2); +INSERT INTO distributed_table (key) VALUES (2); +BEGIN; + -- local execution followed by a distributed query + INSERT INTO distributed_table (key) VALUES (1); + DELETE FROM distributed_table RETURNING key; +COMMIT; + +-- a similar test with a reference table +TRUNCATE reference_table CASCADE; + +-- load some data on a remote shard +INSERT INTO reference_table (key) VALUES (2); +BEGIN; + -- local execution followed by a distributed query + INSERT INTO reference_table (key) VALUES (1); + DELETE FROM reference_table RETURNING key; +COMMIT; + + \c - - - :master_port -- local execution with custom type