Clean up transaction block usage logic in adaptive executor

pull/3288/head
Marco Slot 2019-12-11 12:52:39 +01:00
parent bfc3d2eb90
commit f4031dd477
4 changed files with 228 additions and 85 deletions


@ -174,9 +174,20 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
task->relationShardList = NIL;
task->taskPlacementList = placementList;
/*
* We are delegating the distributed transaction to the worker, so we
* should not run the CALL in a transaction block.
*/
TransactionProperties xactProperties = {
.errorOnAnyFailure = true,
.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_DISALLOWED,
.requires2PC = false
};
ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task),
tupleDesc, tupleStore, hasReturning,
MaxAdaptiveExecutorPoolSize);
MaxAdaptiveExecutorPoolSize,
&xactProperties);
while (tuplestore_gettupleslot(tupleStore, true, false, slot))
{


@ -242,18 +242,12 @@ typedef struct DistributedExecution
*/
bool raiseInterrupts;
/*
* Flag to indicate whether the query is running in a distributed
* transaction.
*/
bool isTransaction;
/* transactional properties of the current execution */
TransactionProperties *transactionProperties;
/* indicates whether distributed execution has failed */
bool failed;
/* set to true when we prefer to bail out early */
bool errorOnAnyFailure;
/*
* For SELECT commands or INSERT/UPDATE/DELETE commands with RETURNING,
* the total number of rows received from the workers. For
@ -541,7 +535,12 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel
ParamListInfo paramListInfo,
TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore,
int targetPoolSize);
int targetPoolSize,
TransactionProperties *
xactProperties);
static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel
modLevel,
List *taskList);
static void StartDistributedExecution(DistributedExecution *execution);
static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution);
static void RunDistributedExecution(DistributedExecution *execution);
@ -556,8 +555,9 @@ static void AcquireExecutorShardLocksForExecution(DistributedExecution *executio
static void AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *
execution);
static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution);
static bool IsMultiShardModification(RowModifyLevel modLevel, List *taskList);
static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
static bool DistributedExecutionRequiresRollback(DistributedExecution *execution);
static bool DistributedExecutionRequiresRollback(List *taskList);
static bool TaskListRequires2PC(List *taskList);
static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
static void AssignTasksToConnections(DistributedExecution *execution);
@ -621,7 +621,6 @@ AdaptiveExecutor(CitusScanState *scanState)
bool interTransactions = false;
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
Job *job = distributedPlan->workerJob;
List *taskList = job->taskList;
@ -646,13 +645,18 @@ AdaptiveExecutor(CitusScanState *scanState)
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
TransactionProperties xactProperties =
DecideTransactionPropertiesForTaskList(distributedPlan->modLevel, taskList);
DistributedExecution *execution = CreateDistributedExecution(
distributedPlan->modLevel, taskList,
distributedPlan->
hasReturning, paramListInfo,
distributedPlan->modLevel,
taskList,
distributedPlan->hasReturning,
paramListInfo,
tupleDescriptor,
scanState->
tuplestorestate, targetPoolSize);
scanState->tuplestorestate,
targetPoolSize,
&xactProperties);
/*
* Make sure that we acquire the appropriate locks even if the local tasks
@ -771,8 +775,32 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize)
Tuplestorestate *tupleStore = NULL;
bool hasReturning = false;
TransactionProperties xactProperties =
DecideTransactionPropertiesForTaskList(modLevel, taskList);
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize);
tupleStore, hasReturning, targetPoolSize,
&xactProperties);
}
/*
* ExecuteTaskListIntoTupleStore is a proxy to ExecuteTaskListExtended() with defaults
* for some of the arguments.
*/
uint64
ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,
bool hasReturning)
{
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
TransactionProperties xactProperties =
DecideTransactionPropertiesForTaskList(modLevel, taskList);
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize,
&xactProperties);
}
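
The proxy keeps the transaction-property plumbing out of callers that only
need rows in a tuple store. A minimal usage sketch, assuming taskList,
tupleDescriptor, and hasReturning are already in scope (the variable names
here are illustrative, not part of this patch):

/* sketch: run a task list and collect its results into a tuple store */
bool randomAccess = false;
bool interTransactions = false;
Tuplestorestate *tupleStore = tuplestore_begin_heap(randomAccess,
													interTransactions, work_mem);
uint64 rowsProcessed = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE,
													 taskList, tupleDescriptor,
													 tupleStore, hasReturning);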
@ -783,7 +811,8 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize)
uint64
ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize)
bool hasReturning, int targetPoolSize,
TransactionProperties *xactProperties)
{
ParamListInfo paramListInfo = NULL;
@ -800,7 +829,8 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
DistributedExecution *execution =
CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo,
tupleDescriptor, tupleStore, targetPoolSize);
tupleDescriptor, tupleStore, targetPoolSize,
xactProperties);
StartDistributedExecution(execution);
RunDistributedExecution(execution);
@ -817,7 +847,8 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
static DistributedExecution *
CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning,
ParamListInfo paramListInfo, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, int targetPoolSize)
Tuplestorestate *tupleStore, int targetPoolSize,
TransactionProperties *xactProperties)
{
DistributedExecution *execution =
(DistributedExecution *) palloc0(sizeof(DistributedExecution));
@ -825,6 +856,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu
execution->modLevel = modLevel;
execution->tasksToExecute = taskList;
execution->hasReturning = hasReturning;
execution->transactionProperties = xactProperties;
execution->localTaskList = NIL;
execution->remoteTaskList = NIL;
@ -873,6 +905,97 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu
}
/*
* DecideTransactionPropertiesForTaskList decides whether to use remote transaction
* blocks, whether to use 2PC for the given task list, and whether to error on any
* failure.
*
* Since these decisions depend on one another (e.g., 2PC implies
* errorOnAnyFailure, but not the other way around), we keep them in one place.
*/
static TransactionProperties
DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList)
{
TransactionProperties xactProperties = {
.errorOnAnyFailure = false,
.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_ALLOWED,
.requires2PC = false
};
if (taskList == NIL)
{
/* nothing to do, return defaults */
return xactProperties;
}
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
{
/*
* We prefer to error out on any failure for CREATE INDEX
* CONCURRENTLY and VACUUM/VACUUM ANALYZE, i.e., the commands that run
* under COMMIT_PROTOCOL_BARE.
*/
xactProperties.errorOnAnyFailure = true;
xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_DISALLOWED;
return xactProperties;
}
if (LocalExecutionHappened)
{
/*
* When LocalExecutionHappened is set, we force the executor to use 2PC.
* The primary motivation is that at this point we are definitely expanding
* the set of nodes participating in the transaction. Moreover, re-generating
* the remote task lists during local query execution might prevent the
* adaptive executor from kicking in 2PC (or even from starting a coordinated
* transaction), which is why we prefer adding this check here instead of in
* Activate2PCIfModifyingTransactionExpandsToNewNode().
*/
xactProperties.errorOnAnyFailure = true;
xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED;
xactProperties.requires2PC = true;
return xactProperties;
}
if (DistributedExecutionRequiresRollback(taskList))
{
/* transaction blocks are required if the task list needs to roll back */
xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED;
if (TaskListRequires2PC(taskList))
{
/*
* Although using the two-phase commit protocol is a decision independent
* of failing on any error, we prefer to couple them. Our motivation is
* that failures are rare, and we prefer to avoid marking placements
* invalid in case of failures.
*/
xactProperties.errorOnAnyFailure = true;
xactProperties.requires2PC = true;
}
else if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC &&
IsMultiShardModification(modLevel, taskList))
{
/*
* Even if we're not using 2PC, we prefer to error out
* on any failures during multi-shard modifications/DDLs.
*/
xactProperties.errorOnAnyFailure = true;
}
}
else if (InCoordinatedTransaction())
{
/*
* If we are already in a coordinated transaction, then we use transaction
* blocks even if the current execution does not strictly require them.
*/
xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED;
}
return xactProperties;
}
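
To make the branches above concrete: a multi-shard modification that requires
rollback, with citus.multi_shard_commit_protocol left at 1PC and
TaskListRequires2PC() returning false, comes out as follows. This is an
illustrative trace of the function above, not additional API:

/* illustrative result for a rollback-requiring, multi-shard, 1PC modification */
TransactionProperties xactProperties = {
	.errorOnAnyFailure = true,          /* multi-shard modification branch */
	.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED, /* needs rollback */
	.requires2PC = false                /* TaskListRequires2PC() returned false */
};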
/*
* StartDistributedExecution sets up the coordinated transaction and 2PC for
* the execution whenever necessary. It also keeps track of parallel relation
@ -882,54 +1005,16 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu
void
StartDistributedExecution(DistributedExecution *execution)
{
List *taskList = execution->tasksToExecute;
TransactionProperties *xactProperties = execution->transactionProperties;
if (MultiShardCommitProtocol != COMMIT_PROTOCOL_BARE)
if (xactProperties->useRemoteTransactionBlocks == TRANSACTION_BLOCKS_REQUIRED)
{
/*
* When LocalExecutionHappened is set, we simply force the executor to use 2PC.
* The primary motivation is that at this point we are definitely expanding
* the set of nodes participating in the transaction. Moreover, re-generating
* the remote task lists during local query execution might prevent the
* adaptive executor from kicking in 2PC (or even from starting a coordinated
* transaction), which is why we prefer adding this check here instead of in
* Activate2PCIfModifyingTransactionExpandsToNewNode().
*/
if (DistributedExecutionRequiresRollback(execution) || LocalExecutionHappened)
{
UseCoordinatedTransaction();
if (TaskListRequires2PC(taskList) || LocalExecutionHappened)
{
/*
* Although using the two-phase commit protocol is a decision independent
* of failing on any error, we prefer to couple them. Our motivation is
* that failures are rare, and we prefer to avoid marking placements
* invalid in case of failures.
*/
CoordinatedTransactionUse2PC();
execution->errorOnAnyFailure = true;
}
else if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC &&
list_length(taskList) > 1 &&
DistributedExecutionModifiesDatabase(execution))
{
/*
* Even if we're not using 2PC, we prefer to error out
* on any failures during multi-shard modifications/DDLs.
*/
execution->errorOnAnyFailure = true;
}
}
UseCoordinatedTransaction();
}
else
if (xactProperties->requires2PC)
{
/*
* We prefer to error on any failures for CREATE INDEX
* CONCURRENTLY or VACUUM/VACUUM ANALYZE (i.e., under COMMIT_PROTOCOL_BARE).
*/
execution->errorOnAnyFailure = true;
CoordinatedTransactionUse2PC();
}
/*
@ -943,12 +1028,6 @@ StartDistributedExecution(DistributedExecution *execution)
*/
AcquireExecutorShardLocksForExecution(execution);
/*
* If the current or previous execution in the current transaction requires
* rollback then we should use transaction blocks.
*/
execution->isTransaction = InCoordinatedTransaction();
/*
* We should not record parallel access if the target pool size is less than 2.
* The reason is that we define parallel access as at least two connections
@ -961,7 +1040,7 @@ StartDistributedExecution(DistributedExecution *execution)
*/
if (execution->targetPoolSize > 1)
{
RecordParallelRelationAccessForTaskList(taskList);
RecordParallelRelationAccessForTaskList(execution->tasksToExecute);
}
}
@ -988,6 +1067,17 @@ DistributedPlanModifiesDatabase(DistributedPlan *plan)
}
/*
* IsMultiShardModification returns true if the given task list modifies the
* database and spans more than one shard.
*/
static bool
IsMultiShardModification(RowModifyLevel modLevel, List *taskList)
{
return list_length(taskList) > 1 && TaskListModifiesDatabase(modLevel, taskList);
}
/*
* TaskListModifiesDatabase is a helper function for DistributedExecutionModifiesDatabase and
* DistributedPlanModifiesDatabase.
@ -1023,9 +1113,8 @@ TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList)
* involved in the distributed execution.
*/
static bool
DistributedExecutionRequiresRollback(DistributedExecution *execution)
DistributedExecutionRequiresRollback(List *taskList)
{
List *taskList = execution->tasksToExecute;
int taskCount = list_length(taskList);
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
@ -2175,7 +2264,7 @@ CheckConnectionTimeout(WorkerPool *workerPool)
* has two different placements, we'd warn the user, fail the pool and continue
* with the next placement.
*/
if (execution->errorOnAnyFailure || execution->failed)
if (execution->transactionProperties->errorOnAnyFailure || execution->failed)
{
logLevel = ERROR;
}
@ -2424,7 +2513,8 @@ ConnectionStateMachine(WorkerSession *session)
* The execution may have failed as a result of WorkerSessionFailed
* or WorkerPoolFailed.
*/
if (execution->failed || execution->errorOnAnyFailure)
if (execution->failed ||
execution->transactionProperties->errorOnAnyFailure)
{
/* a task has failed due to this connection failure */
ReportConnectionError(connection, ERROR);
@ -2561,12 +2651,12 @@ static bool
TransactionModifiedDistributedTable(DistributedExecution *execution)
{
/*
* We need to explicitly check for isTransaction due to
* We need to explicitly check for a coordinated transaction due to the
* citus.function_opens_transaction_block flag. When it is set to false, we
* should not pretend that we're in a coordinated transaction even if
* XACT_MODIFICATION_DATA is set. That's why we implemented this workaround.
*/
return execution->isTransaction && XactModificationLevel == XACT_MODIFICATION_DATA;
return InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;
}
@ -2578,6 +2668,8 @@ TransactionStateMachine(WorkerSession *session)
{
WorkerPool *workerPool = session->workerPool;
DistributedExecution *execution = workerPool->distributedExecution;
TransactionBlocksUsage useRemoteTransactionBlocks =
execution->transactionProperties->useRemoteTransactionBlocks;
MultiConnection *connection = session->connection;
RemoteTransaction *transaction = &(connection->remoteTransaction);
@ -2596,7 +2688,7 @@ TransactionStateMachine(WorkerSession *session)
{
case REMOTE_TRANS_NOT_STARTED:
{
if (execution->isTransaction)
if (useRemoteTransactionBlocks == TRANSACTION_BLOCKS_REQUIRED)
{
/* if we're expanding the nodes in a transaction, use 2PC */
Activate2PCIfModifyingTransactionExpandsToNewNode(session);
@ -2684,7 +2776,7 @@ TransactionStateMachine(WorkerSession *session)
UpdateConnectionWaitFlags(session,
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
if (execution->isTransaction)
if (transaction->beginSent)
{
transaction->transactionState = REMOTE_TRANS_STARTED;
}
@ -3404,7 +3496,8 @@ ScheduleNextPlacementExecution(TaskPlacementExecution *placementExecution, bool
static bool
ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution)
{
if (!DistributedExecutionModifiesDatabase(execution) || execution->errorOnAnyFailure)
if (!DistributedExecutionModifiesDatabase(execution) ||
execution->transactionProperties->errorOnAnyFailure)
{
/*
* Failures that do not modify the database (e.g., mainly SELECTs) should


@ -196,9 +196,9 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
tupleDescriptor, scanState->tuplestorestate,
hasReturning, MaxAdaptiveExecutorPoolSize);
ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
tupleDescriptor, scanState->tuplestorestate,
hasReturning);
if (SortReturning && hasReturning)
{


@ -26,6 +26,40 @@ typedef enum
SEQUENTIAL_CONNECTION = 1
} MultiShardConnectionTypes;
/*
* TransactionBlocksUsage indicates whether to use remote transaction
* blocks according to one of the following policies:
* - opening a remote transaction is required
* - opening a remote transaction does not matter, so it is allowed but not required.
* - opening a remote transaction is disallowed
*/
typedef enum TransactionBlocksUsage
{
TRANSACTION_BLOCKS_REQUIRED,
TRANSACTION_BLOCKS_ALLOWED,
TRANSACTION_BLOCKS_DISALLOWED,
} TransactionBlocksUsage;
/*
* TransactionProperties reflects how we should execute a task list
* given previous commands in the transaction and the type of task list.
*/
typedef struct TransactionProperties
{
/* if true, any failure on the worker causes the execution to end immediately */
bool errorOnAnyFailure;
/*
* Determines whether transaction blocks on workers are required, disallowed, or
* allowed (will use them if already in a coordinated transaction).
*/
TransactionBlocksUsage useRemoteTransactionBlocks;
/* if true, the current execution requires 2PC to be globally enabled */
bool requires2PC;
} TransactionProperties;
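
As a reading aid, the three policies map to opening a transaction block on the
worker roughly as sketched below. In this patch the ALLOWED case is actually
resolved earlier, in DecideTransactionPropertiesForTaskList(), which upgrades
it to REQUIRED when a coordinated transaction is already open; the helper name
here is hypothetical:

/* hypothetical sketch: when a remote BEGIN would be sent under each policy */
static bool
ShouldOpenRemoteTransactionBlock(TransactionBlocksUsage usage)
{
	switch (usage)
	{
		case TRANSACTION_BLOCKS_REQUIRED:
		{
			return true;
		}

		case TRANSACTION_BLOCKS_ALLOWED:
		{
			/* only when a coordinated transaction is already open */
			return InCoordinatedTransaction();
		}

		case TRANSACTION_BLOCKS_DISALLOWED:
		default:
		{
			return false;
		}
	}
}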
extern int MultiShardConnectionType;
extern bool WritableStandbyCoordinator;
extern bool ForceMaxQueryParallelization;
@ -42,7 +76,12 @@ extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState);
extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize);
bool hasReturning, int targetPoolSize,
TransactionProperties *xactProperties);
extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore,
bool hasReturning);
extern void ExecuteUtilityTaskListWithoutResults(List *taskList);
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int
targetPoolSize);