Merge pull request #2781 from citusdata/refactor_ExecuteModifyTasksSequentially

Refactor ExecuteModifyTasksSequentially.
pull/2780/head
Hadi Moshayedi 2019-06-20 18:45:33 +02:00 committed by GitHub
commit 17d4d3e5ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 64 deletions

View File

@ -91,8 +91,6 @@ bool SortReturning = false;
/* functions needed during run phase */ /* functions needed during run phase */
static void AcquireMetadataLocks(List *taskList); static void AcquireMetadataLocks(List *taskList);
static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType);
static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType
operation, bool alwaysThrowErrorOnFailure, bool operation, bool alwaysThrowErrorOnFailure, bool
expectResults); expectResults);
@ -732,59 +730,13 @@ RouterSequentialModifyExecScan(CustomScanState *node)
bool hasReturning = distributedPlan->hasReturning; bool hasReturning = distributedPlan->hasReturning;
Job *workerJob = distributedPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
List *taskList = workerJob->taskList; List *taskList = workerJob->taskList;
ListCell *taskCell = NULL;
bool multipleTasks = list_length(taskList) > 1;
EState *executorState = ScanStateGetExecutorState(scanState); EState *executorState = ScanStateGetExecutorState(scanState);
bool taskListRequires2PC = TaskListRequires2PC(taskList);
bool alwaysThrowErrorOnFailure = false;
CmdType operation = scanState->distributedPlan->operation; CmdType operation = scanState->distributedPlan->operation;
Assert(!scanState->finishedRemoteScan); Assert(!scanState->finishedRemoteScan);
/* executorState->es_processed +=
* We could naturally handle function-based transactions (i.e. those using ExecuteModifyTasksSequentially(scanState, taskList, operation, hasReturning);
* PL/pgSQL or similar) by checking the type of queryDesc->dest, but some
* customers already use functions that touch multiple shards from within
* a function, so we'll ignore functions for now.
*/
if (IsMultiStatementTransaction() || multipleTasks || taskListRequires2PC)
{
BeginOrContinueCoordinatedTransaction();
/*
* Although using two phase commit protocol is an independent decision than
* failing on any error, we prefer to couple them. Our motivation is that
* the failures are rare, and we prefer to avoid marking placements invalid
* in case of failures.
*
* For reference tables, we always set alwaysThrowErrorOnFailure since we
* absolutely want to avoid marking any placements invalid.
*
* We also cannot handle failures when there is RETURNING and there are more
* than one task to execute.
*/
if (taskListRequires2PC)
{
CoordinatedTransactionUse2PC();
alwaysThrowErrorOnFailure = true;
}
else if (multipleTasks && hasReturning)
{
alwaysThrowErrorOnFailure = true;
}
}
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
bool expectResults = (hasReturning || task->relationRowLockList != NIL);
executorState->es_processed +=
ExecuteSingleModifyTask(scanState, task, operation,
alwaysThrowErrorOnFailure, expectResults);
}
} }
@ -1056,7 +1008,7 @@ BuildPlacementAccessList(int32 groupId, List *relationShardList,
* CreatePlacementAccess returns a new ShardPlacementAccess for the given placement * CreatePlacementAccess returns a new ShardPlacementAccess for the given placement
* and access type. * and access type.
*/ */
static ShardPlacementAccess * ShardPlacementAccess *
CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType) CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType)
{ {
ShardPlacementAccess *placementAccess = NULL; ShardPlacementAccess *placementAccess = NULL;
@ -1379,7 +1331,18 @@ ExecuteModifyTasksWithoutResults(List *taskList)
/* /*
* ExecuteModifyTasksSequentiallyWithoutResults basically calls ExecuteSingleModifyTask in * ExecuteModifyTasksSequentiallyWithoutResults calls ExecuteModifyTasksSequentially
* and ignores the results.
*/
int64
ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
{
return ExecuteModifyTasksSequentially(NULL, taskList, operation, false);
}
/*
* ExecuteModifyTasksSequentially basically calls ExecuteSingleModifyTask in
* a loop in order to simulate sequential execution of a list of tasks. Useful * a loop in order to simulate sequential execution of a list of tasks. Useful
* in cases where issuing commands in parallel before waiting for results could * in cases where issuing commands in parallel before waiting for results could
* result in deadlocks (such as foreign key creation to reference tables). * result in deadlocks (such as foreign key creation to reference tables).
@ -1388,43 +1351,57 @@ ExecuteModifyTasksWithoutResults(List *taskList)
* returns 0. * returns 0.
*/ */
int64 int64
ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation) ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
CmdType operation, bool hasReturning)
{ {
ListCell *taskCell = NULL; ListCell *taskCell = NULL;
bool multipleTasks = list_length(taskList) > 1; bool multipleTasks = list_length(taskList) > 1;
bool expectResults = false;
int64 affectedTupleCount = 0; int64 affectedTupleCount = 0;
bool alwaysThrowErrorOnFailure = true; bool alwaysThrowErrorOnFailure = false;
bool taskListRequires2PC = TaskListRequires2PC(taskList); bool taskListRequires2PC = TaskListRequires2PC(taskList);
/* decide on whether to use coordinated transaction and 2PC */
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE) if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
{ {
alwaysThrowErrorOnFailure = true;
/* we don't run CREATE INDEX CONCURRENTLY in a distributed transaction */ /* we don't run CREATE INDEX CONCURRENTLY in a distributed transaction */
} }
else if (IsMultiStatementTransaction() || multipleTasks) else if (IsMultiStatementTransaction() || multipleTasks || taskListRequires2PC)
{ {
BeginOrContinueCoordinatedTransaction(); BeginOrContinueCoordinatedTransaction();
/*
* Although using two phase commit protocol is an independent decision than
* failing on any error, we prefer to couple them. Our motivation is that
* the failures are rare, and we prefer to avoid marking placements invalid
* in case of failures.
*
* For reference tables, we always set alwaysThrowErrorOnFailure since we
* absolutely want to avoid marking any placements invalid.
*
* We also cannot handle failures when there is RETURNING and there are more
* than one task to execute.
*/
if (taskListRequires2PC) if (taskListRequires2PC)
{ {
CoordinatedTransactionUse2PC(); CoordinatedTransactionUse2PC();
alwaysThrowErrorOnFailure = true;
}
else if (multipleTasks && hasReturning)
{
alwaysThrowErrorOnFailure = true;
} }
}
else if (!multipleTasks && taskListRequires2PC)
{
/* DDL on a reference table should also use 2PC */
BeginOrContinueCoordinatedTransaction();
CoordinatedTransactionUse2PC();
} }
/* now that we've decided on the transaction status, execute the tasks */ /* now that we've decided on the transaction status, execute the tasks */
foreach(taskCell, taskList) foreach(taskCell, taskList)
{ {
Task *task = (Task *) lfirst(taskCell); Task *task = (Task *) lfirst(taskCell);
bool expectResults = (hasReturning || task->relationRowLockList != NIL);
affectedTupleCount += affectedTupleCount +=
ExecuteSingleModifyTask(NULL, task, operation, alwaysThrowErrorOnFailure, ExecuteSingleModifyTask(scanState, task, operation, alwaysThrowErrorOnFailure,
expectResults); expectResults);
} }

View File

@ -45,9 +45,13 @@ extern TupleTableSlot * RouterModifyExecScan(CustomScanState *node);
extern void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, extern void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults); bool isModificationQuery, bool expectResults);
int64 ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
CmdType operation, bool hasReturning);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList); extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
CmdType operation); CmdType operation);
extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType);
/* helper functions */ /* helper functions */
extern bool TaskListRequires2PC(List *taskList); extern bool TaskListRequires2PC(List *taskList);