Local execution should not change hasReturning for distributed tables (#3160)

The logic that prevents RETURNING from producing duplicate entries for reference tables (where the same rows would come back from both the local and the remote execution) also kicked in for distributed tables, causing some of their tuples to go missing.

With this PR, we ensure that this logic kicks in for reference tables only.
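
A minimal sketch of the per-task decision this change introduces, assuming a simplified task representation (the TaskSketch type and RemoteTaskExpectsResults helper below are illustrative only, not part of Citus):

#include <stdbool.h>

/* Hypothetical, simplified stand-in for the fields consulted in the diff below. */
typedef struct TaskSketch
{
	bool hasReturning;            /* the statement has a RETURNING clause */
	bool partiallyLocalOrRemote;  /* placements split between local and remote execution */
	bool readOnly;                /* stands in for modLevel == ROW_MODIFY_READONLY */
} TaskSketch;

/*
 * Previously, local execution flipped execution->hasReturning to false for the
 * whole execution, so remote shards of a distributed table never shipped their
 * RETURNING tuples back. After this change, only tasks that were split between
 * a local and a remote part (e.g. reference table modifications, which have a
 * placement on every node) skip remote results, since the local part already
 * produced them.
 */
static bool
RemoteTaskExpectsResults(const TaskSketch *task)
{
	return (task->hasReturning && !task->partiallyLocalOrRemote) || task->readOnly;
}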
pull/3165/head^2
Önder Kalacı 2019-11-08 12:49:56 +01:00 committed by GitHub
parent 9a31837647
commit 0b3d4e55d9
9 changed files with 87 additions and 9 deletions

View File

@@ -738,12 +738,6 @@ RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
 static void
 AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution)
 {
-	/*
-	 * Local execution already stores the tuples for returning, so we should not
-	 * store again.
-	 */
-	execution->hasReturning = false;
-
 	/* we only need to execute the remote tasks */
 	execution->tasksToExecute = execution->remoteTaskList;
@@ -1467,8 +1461,9 @@ AssignTasksToConnections(DistributedExecution *execution)
 											sizeof(TaskPlacementExecution *));
 		shardCommandExecution->placementExecutionCount = placementExecutionCount;

-		shardCommandExecution->expectResults = hasReturning ||
-											   modLevel == ROW_MODIFY_READONLY;
+		shardCommandExecution->expectResults =
+			(hasReturning && !task->partiallyLocalOrRemote) ||
+			modLevel == ROW_MODIFY_READONLY;

 		foreach(taskPlacementCell, task->taskPlacementList)
 		{

View File

@@ -241,7 +241,7 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
 		}
 		else
 		{
-			Task *localTask = copyObject(task);
+			Task *localTask = NULL;
 			Task *remoteTask = NULL;

 			/*
@@ -250,6 +250,9 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
			 * prefer to use local placement, and require remote placements only for
			 * modifications.
			 */
+			task->partiallyLocalOrRemote = true;
+
+			localTask = copyObject(task);
			localTask->taskPlacementList = localTaskPlacementList;
			*localTaskList = lappend(*localTaskList, localTask);

View File

@ -1616,6 +1616,7 @@ CreateTask(TaskType taskType)
task->relationRowLockList = NIL; task->relationRowLockList = NIL;
task->modifyWithSubquery = false; task->modifyWithSubquery = false;
task->partiallyLocalOrRemote = false;
task->relationShardList = NIL; task->relationShardList = NIL;
return task; return task;

View File

@@ -262,6 +262,7 @@ CopyNodeTask(COPYFUNC_ARGS)
 	COPY_NODE_FIELD(relationShardList);
 	COPY_NODE_FIELD(relationRowLockList);
 	COPY_NODE_FIELD(rowValuesLists);
+	COPY_SCALAR_FIELD(partiallyLocalOrRemote);
 }

View File

@@ -476,6 +476,7 @@ OutTask(OUTFUNC_ARGS)
 	WRITE_NODE_FIELD(relationShardList);
 	WRITE_NODE_FIELD(relationRowLockList);
 	WRITE_NODE_FIELD(rowValuesLists);
+	WRITE_BOOL_FIELD(partiallyLocalOrRemote);
 }

View File

@@ -392,6 +392,7 @@ ReadTask(READFUNC_ARGS)
 	READ_NODE_FIELD(relationShardList);
 	READ_NODE_FIELD(relationRowLockList);
 	READ_NODE_FIELD(rowValuesLists);
+	READ_BOOL_FIELD(partiallyLocalOrRemote);

 	READ_DONE();
 }

View File

@@ -213,6 +213,15 @@ typedef struct Task
 	List *relationShardList;
 	List *rowValuesLists; /* rows to use when building multi-row INSERT */
+
+	/*
+	 * Used only when local execution happens. Indicates that this task is part of
+	 * both local and remote executions. We use "or" in the field name because this
+	 * is set to true for both the remote and local tasks generated for such
+	 * executions. The most common example is modifications to reference tables,
+	 * where the task is split into local and remote tasks.
+	 */
+	bool partiallyLocalOrRemote;
 } Task;

View File

@@ -1088,6 +1088,48 @@ LOG: executing the command locally: INSERT INTO local_shard_execution.reference
  1000
 (1 row)

+-- clean the table for the next tests
+SET search_path TO local_shard_execution;
+TRUNCATE distributed_table CASCADE;
+NOTICE: truncate cascades to table "second_distributed_table"
+-- load some data on a remote shard
+INSERT INTO reference_table (key) VALUES (1), (2);
+LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 AS citus_table_alias (key) VALUES (1), (2)
+INSERT INTO distributed_table (key) VALUES (2);
+BEGIN;
+-- local execution followed by a distributed query
+INSERT INTO distributed_table (key) VALUES (1);
+LOG: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 (key) VALUES (1)
+DELETE FROM distributed_table RETURNING key;
+LOG: executing the command locally: DELETE FROM local_shard_execution.distributed_table_1470001 distributed_table RETURNING key
+LOG: executing the command locally: DELETE FROM local_shard_execution.distributed_table_1470003 distributed_table RETURNING key
+ key
+-----
+   1
+   2
+(2 rows)
+
+COMMIT;
+-- a similar test with a reference table
+TRUNCATE reference_table CASCADE;
+NOTICE: truncate cascades to table "distributed_table"
+NOTICE: truncate cascades to table "second_distributed_table"
+-- load some data on a remote shard
+INSERT INTO reference_table (key) VALUES (2);
+LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (2)
+BEGIN;
+-- local execution followed by a distributed query
+INSERT INTO reference_table (key) VALUES (1);
+LOG: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (1)
+DELETE FROM reference_table RETURNING key;
+LOG: executing the command locally: DELETE FROM local_shard_execution.reference_table_1470000 reference_table RETURNING key
+ key
+-----
+   1
+   2
+(2 rows)
+
+COMMIT;
 \c - - - :master_port
 -- local execution with custom type
 SET citus.replication_model TO "streaming";

View File

@@ -621,6 +621,31 @@ COMMIT;
 -- Citus currently doesn't allow using task_assignment_policy for intermediate results
 WITH distributed_local_mixed AS (INSERT INTO reference_table VALUES (1000) RETURNING *) SELECT * FROM distributed_local_mixed;

+-- clean the table for the next tests
+SET search_path TO local_shard_execution;
+TRUNCATE distributed_table CASCADE;
+
+-- load some data on a remote shard
+INSERT INTO reference_table (key) VALUES (1), (2);
+INSERT INTO distributed_table (key) VALUES (2);
+
+BEGIN;
+-- local execution followed by a distributed query
+INSERT INTO distributed_table (key) VALUES (1);
+DELETE FROM distributed_table RETURNING key;
+COMMIT;
+
+-- a similar test with a reference table
+TRUNCATE reference_table CASCADE;
+-- load some data on a remote shard
+INSERT INTO reference_table (key) VALUES (2);
+
+BEGIN;
+-- local execution followed by a distributed query
+INSERT INTO reference_table (key) VALUES (1);
+DELETE FROM reference_table RETURNING key;
+COMMIT;
+
 \c - - - :master_port
 -- local execution with custom type