Refactor ExecuteModifyTasksSequentially.

pull/2781/head
Hadi Moshayedi 2019-06-20 17:23:12 +02:00
parent 6741ffd716
commit 2e6d04df7b
2 changed files with 45 additions and 64 deletions

View File

@ -91,8 +91,6 @@ bool SortReturning = false;
/* functions needed during run phase */
static void AcquireMetadataLocks(List *taskList);
static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType);
static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType
operation, bool alwaysThrowErrorOnFailure, bool
expectResults);
@ -732,59 +730,13 @@ RouterSequentialModifyExecScan(CustomScanState *node)
bool hasReturning = distributedPlan->hasReturning;
Job *workerJob = distributedPlan->workerJob;
List *taskList = workerJob->taskList;
ListCell *taskCell = NULL;
bool multipleTasks = list_length(taskList) > 1;
EState *executorState = ScanStateGetExecutorState(scanState);
bool taskListRequires2PC = TaskListRequires2PC(taskList);
bool alwaysThrowErrorOnFailure = false;
CmdType operation = scanState->distributedPlan->operation;
Assert(!scanState->finishedRemoteScan);
/*
* We could naturally handle function-based transactions (i.e. those using
* PL/pgSQL or similar) by checking the type of queryDesc->dest, but some
* customers already use functions that touch multiple shards from within
* a function, so we'll ignore functions for now.
*/
if (IsMultiStatementTransaction() || multipleTasks || taskListRequires2PC)
{
BeginOrContinueCoordinatedTransaction();
/*
* Although using two phase commit protocol is an independent decision than
* failing on any error, we prefer to couple them. Our motivation is that
* the failures are rare, and we prefer to avoid marking placements invalid
* in case of failures.
*
* For reference tables, we always set alwaysThrowErrorOnFailure since we
* absolutely want to avoid marking any placements invalid.
*
* We also cannot handle failures when there is RETURNING and there are more
* than one task to execute.
*/
if (taskListRequires2PC)
{
CoordinatedTransactionUse2PC();
alwaysThrowErrorOnFailure = true;
}
else if (multipleTasks && hasReturning)
{
alwaysThrowErrorOnFailure = true;
}
}
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
bool expectResults = (hasReturning || task->relationRowLockList != NIL);
executorState->es_processed +=
ExecuteSingleModifyTask(scanState, task, operation,
alwaysThrowErrorOnFailure, expectResults);
}
executorState->es_processed +=
ExecuteModifyTasksSequentially(scanState, taskList, operation, hasReturning);
}
@ -1056,7 +1008,7 @@ BuildPlacementAccessList(int32 groupId, List *relationShardList,
* CreatePlacementAccess returns a new ShardPlacementAccess for the given placement
* and access type.
*/
static ShardPlacementAccess *
ShardPlacementAccess *
CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType)
{
ShardPlacementAccess *placementAccess = NULL;
@ -1379,7 +1331,18 @@ ExecuteModifyTasksWithoutResults(List *taskList)
/*
* ExecuteModifyTasksSequentiallyWithoutResults basically calls ExecuteSingleModifyTask in
* ExecuteModifyTasksSequentiallyWithoutResults calls ExecuteModifyTasksSequentially
* and ignores the results.
*/
int64
ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
{
return ExecuteModifyTasksSequentially(NULL, taskList, operation, false);
}
/*
* ExecuteModifyTasksSequentially basically calls ExecuteSingleModifyTask in
* a loop in order to simulate sequential execution of a list of tasks. Useful
* in cases where issuing commands in parallel before waiting for results could
* result in deadlocks (such as foreign key creation to reference tables).
@ -1388,43 +1351,57 @@ ExecuteModifyTasksWithoutResults(List *taskList)
* returns 0.
*/
int64
ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
CmdType operation, bool hasReturning)
{
ListCell *taskCell = NULL;
bool multipleTasks = list_length(taskList) > 1;
bool expectResults = false;
int64 affectedTupleCount = 0;
bool alwaysThrowErrorOnFailure = true;
bool alwaysThrowErrorOnFailure = false;
bool taskListRequires2PC = TaskListRequires2PC(taskList);
/* decide on whether to use coordinated transaction and 2PC */
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
{
alwaysThrowErrorOnFailure = true;
/* we don't run CREATE INDEX CONCURRENTLY in a distributed transaction */
}
else if (IsMultiStatementTransaction() || multipleTasks)
else if (IsMultiStatementTransaction() || multipleTasks || taskListRequires2PC)
{
BeginOrContinueCoordinatedTransaction();
/*
* Although using two phase commit protocol is an independent decision than
* failing on any error, we prefer to couple them. Our motivation is that
* the failures are rare, and we prefer to avoid marking placements invalid
* in case of failures.
*
* For reference tables, we always set alwaysThrowErrorOnFailure since we
* absolutely want to avoid marking any placements invalid.
*
* We also cannot handle failures when there is RETURNING and there are more
* than one task to execute.
*/
if (taskListRequires2PC)
{
CoordinatedTransactionUse2PC();
alwaysThrowErrorOnFailure = true;
}
else if (multipleTasks && hasReturning)
{
alwaysThrowErrorOnFailure = true;
}
}
else if (!multipleTasks && taskListRequires2PC)
{
/* DDL on a reference table should also use 2PC */
BeginOrContinueCoordinatedTransaction();
CoordinatedTransactionUse2PC();
}
/* now that we've decided on the transaction status, execute the tasks */
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
bool expectResults = (hasReturning || task->relationRowLockList != NIL);
affectedTupleCount +=
ExecuteSingleModifyTask(NULL, task, operation, alwaysThrowErrorOnFailure,
ExecuteSingleModifyTask(scanState, task, operation, alwaysThrowErrorOnFailure,
expectResults);
}

View File

@ -45,9 +45,13 @@ extern TupleTableSlot * RouterModifyExecScan(CustomScanState *node);
extern void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults);
int64 ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
CmdType operation, bool hasReturning);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
CmdType operation);
extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType);
/* helper functions */
extern bool TaskListRequires2PC(List *taskList);