From f33de24877002c5591b08b8fdd09f74332c60faf Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 20 Aug 2021 11:13:58 +0200 Subject: [PATCH] Fetch tuples in small batches in adaptive executor where possible --- .../distributed/executor/adaptive_executor.c | 178 +++++++++++++----- .../distributed/executor/citus_custom_scan.c | 21 ++- src/include/distributed/adaptive_executor.h | 5 + src/include/distributed/citus_custom_scan.h | 8 + src/include/distributed/multi_executor.h | 2 - 5 files changed, 163 insertions(+), 51 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ed4ad6b07..0cbee76af 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -266,7 +266,7 @@ typedef struct DistributedExecution bool raiseInterrupts; /* transactional properties of the current execution */ - TransactionProperties *transactionProperties; + TransactionProperties transactionProperties; /* indicates whether distributed execution has failed */ bool failed; @@ -282,6 +282,13 @@ typedef struct DistributedExecution */ uint64 rowsProcessed; + /* + * RunDistributedExecution can be called multiple time to perform partial + * execution. In that case, rowsReceivedInCurrentRun contains the number + * of rows received. + */ + uint64 rowsReceivedInCurrentRun; + /* * The following fields are used while receiving results from remote nodes. * We store this information here to avoid re-allocating it every time. @@ -299,6 +306,11 @@ typedef struct DistributedExecution * do cleanup for repartition queries. */ List *jobIdList; + + /* + * Memory context for the execution. + */ + MemoryContext memoryContext; } DistributedExecution; @@ -610,7 +622,7 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel int targetPoolSize, TupleDestination * defaultTupleDest, - TransactionProperties * + TransactionProperties xactProperties, List *jobIdList, bool localExecutionSupported); @@ -621,7 +633,7 @@ static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLev exludeFromTransaction); static void StartDistributedExecution(DistributedExecution *execution); static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution); -static void RunDistributedExecution(DistributedExecution *execution); +static void RunDistributedExecution(DistributedExecution *execution, bool toCompletion); static void SequentialRunDistributedExecution(DistributedExecution *execution); static void FinishDistributedExecution(DistributedExecution *execution); static void CleanUpSessions(DistributedExecution *execution); @@ -751,11 +763,9 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState) * first call of CitusExecScan. The function fills the tupleStore * of the input scanScate. */ -TupleTableSlot * -AdaptiveExecutor(CitusScanState *scanState) +void +AdaptiveExecutorStart(CitusScanState *scanState) { - TupleTableSlot *resultSlot = NULL; - DistributedPlan *distributedPlan = scanState->distributedPlan; EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = executorState->es_param_list_info; @@ -770,14 +780,10 @@ AdaptiveExecutor(CitusScanState *scanState) /* we should only call this once before the scan finished */ Assert(!scanState->finishedRemoteScan); - MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, - "AdaptiveExecutor", - ALLOCSET_DEFAULT_SIZES); - MemoryContext oldContext = MemoryContextSwitchTo(localContext); - - - /* Reset Task fields that are only valid for a single execution */ - ResetExplainAnalyzeData(taskList); + MemoryContext memoryContext = AllocSetContextCreate(executorState->es_query_cxt, + "AdaptiveExecutor", + ALLOCSET_DEFAULT_SIZES); + MemoryContext oldContext = MemoryContextSwitchTo(memoryContext); scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); @@ -786,6 +792,9 @@ AdaptiveExecutor(CitusScanState *scanState) TupleDestination *defaultTupleDest = CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor); + /* Reset Task fields that are only valid for a single execution */ + ResetExplainAnalyzeData(taskList); + if (RequestedForExplainAnalyze(scanState)) { /* @@ -820,7 +829,7 @@ AdaptiveExecutor(CitusScanState *scanState) paramListInfo, targetPoolSize, defaultTupleDest, - &xactProperties, + xactProperties, jobIdList, localExecutionSupported); @@ -830,13 +839,53 @@ AdaptiveExecutor(CitusScanState *scanState) */ StartDistributedExecution(execution); + /* store the execution in the custom scan state */ + scanState->execution = execution; + + execution->memoryContext = MemoryContextSwitchTo(oldContext); +} + + +bool +AdaptiveExecutorRun(CitusScanState *scanState) +{ + DistributedExecution *execution = scanState->execution; + DistributedPlan *distributedPlan = scanState->distributedPlan; + Job *job = distributedPlan->workerJob; + CmdType commandType = job->jobQuery->commandType; + + Assert(execution != NULL); + + MemoryContext oldContext = MemoryContextSwitchTo(execution->memoryContext); + + EState *executorState = ScanStateGetExecutorState(scanState); + bool sortTupleStore = false; + + if (SortReturning && distributedPlan->expectResults && commandType != CMD_SELECT) + { + /* sort the tuple store to get consistent DML output in tests */ + sortTupleStore = true; + } + if (ShouldRunTasksSequentially(execution->remoteTaskList)) { + /* sequential execution always runs to completion */ SequentialRunDistributedExecution(execution); } else { - RunDistributedExecution(execution); + /* if we need to sort the whole tuple store, run to completino */ + bool runToCompletion = sortTupleStore; + + RunDistributedExecution(execution, runToCompletion); + + if (execution->unfinishedTaskCount > 0) + { + MemoryContextSwitchTo(oldContext); + return false; + } + + /* done with remote tasks, finish the execution */ } /* execute tasks local to the node (if any) */ @@ -846,7 +895,6 @@ AdaptiveExecutor(CitusScanState *scanState) RunLocalExecution(scanState, execution); } - CmdType commandType = job->jobQuery->commandType; if (commandType != CMD_SELECT) { executorState->es_processed = execution->rowsProcessed; @@ -854,19 +902,14 @@ AdaptiveExecutor(CitusScanState *scanState) FinishDistributedExecution(execution); - if (hasDependentJobs) - { - DoRepartitionCleanup(jobIdList); - } - - if (SortReturning && distributedPlan->expectResults && commandType != CMD_SELECT) + if (sortTupleStore) { SortTupleStore(scanState); } MemoryContextSwitchTo(oldContext); - return resultSlot; + return true; } @@ -1014,7 +1057,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) CreateDistributedExecution( executionParams->modLevel, executionParams->taskList, paramListInfo, executionParams->targetPoolSize, - defaultTupleDest, &executionParams->xactProperties, + defaultTupleDest, executionParams->xactProperties, executionParams->jobIdList, executionParams->localExecutionSupported); /* @@ -1032,7 +1075,10 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) /* run the remote execution */ StartDistributedExecution(execution); - RunDistributedExecution(execution); + + bool runToCompletion = true; + RunDistributedExecution(execution, runToCompletion); + FinishDistributedExecution(execution); /* now, switch back to the local execution */ @@ -1083,7 +1129,7 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, ParamListInfo paramListInfo, int targetPoolSize, TupleDestination *defaultTupleDest, - TransactionProperties *xactProperties, + TransactionProperties xactProperties, List *jobIdList, bool localExecutionSupported) { DistributedExecution *execution = @@ -1254,7 +1300,7 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, void StartDistributedExecution(DistributedExecution *execution) { - TransactionProperties *xactProperties = execution->transactionProperties; + TransactionProperties *xactProperties = &(execution->transactionProperties); if (xactProperties->useRemoteTransactionBlocks == TRANSACTION_BLOCKS_REQUIRED) { @@ -1296,6 +1342,20 @@ StartDistributedExecution(DistributedExecution *execution) */ RecordParallelRelationAccessForTaskList(execution->remoteAndLocalTaskList); } + + /* + * We skip AssignTasksToConnectionsOrWorkerPool for sequential executions, + * because we do it separately for each task in SequentialRunDistributedExecution. + */ + if (!ShouldRunTasksSequentially(execution->remoteTaskList)) + { + /* + * If a (co-located) shard placement was accessed over a session earier in the + * transaction, assign the task to the same session. Otherwise, assign it to + * the general worker pool(s). + */ + AssignTasksToConnectionsOrWorkerPool(execution); + } } @@ -1639,11 +1699,24 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution) static void FinishDistributedExecution(DistributedExecution *execution) { + /* + * Sequential executions unclaim connections separately. + */ + if (!ShouldRunTasksSequentially(execution->remoteTaskList)) + { + CleanUpSessions(execution); + } + if (DistributedExecutionModifiesDatabase(execution)) { /* prevent copying shards in same transaction */ XactModificationLevel = XACT_MODIFICATION_DATA; } + + if (list_length(execution->jobIdList) > 0) + { + DoRepartitionCleanup(execution->jobIdList); + } } @@ -1827,7 +1900,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) List *placementAccessList = PlacementAccessListForTask(task, taskPlacement); MultiConnection *connection = NULL; - if (execution->transactionProperties->useRemoteTransactionBlocks != + if (execution->transactionProperties.useRemoteTransactionBlocks != TRANSACTION_BLOCKS_DISALLOWED) { /* @@ -2260,8 +2333,20 @@ SequentialRunDistributedExecution(DistributedExecution *execution) break; } + /* + * We skipped AssignTasksToConnectionsOrWorkerPool in StartDistributedExecution + * when all the tasks were in the execution. Do it now instead. + */ + AssignTasksToConnectionsOrWorkerPool(execution); + /* simply call the regular execution function */ - RunDistributedExecution(execution); + bool runToCompletion = true; + RunDistributedExecution(execution, runToCompletion); + + /* + * Unclaim connections since the current execution is technically finished. + */ + CleanUpSessions(execution); } /* set back the original execution mode */ @@ -2278,12 +2363,10 @@ SequentialRunDistributedExecution(DistributedExecution *execution) * has an event. */ void -RunDistributedExecution(DistributedExecution *execution) +RunDistributedExecution(DistributedExecution *execution, bool toCompletion) { WaitEvent *events = NULL; - AssignTasksToConnectionsOrWorkerPool(execution); - PG_TRY(); { /* Preemptively step state machines in case of immediate errors */ @@ -2299,6 +2382,10 @@ RunDistributedExecution(DistributedExecution *execution) /* always (re)build the wait event set the first time */ execution->rebuildWaitEventSet = true; + execution->rowsReceivedInCurrentRun = 0; + + /* TODO: GUC? be smart? */ + int maxBatchSize = 10000; /* * Iterate until all the tasks are finished. Once all the tasks @@ -2318,8 +2405,10 @@ RunDistributedExecution(DistributedExecution *execution) * irrespective of the current status of the tasks or the connections. */ while (!cancellationReceived && - (execution->unfinishedTaskCount > 0 || - HasIncompleteConnectionEstablishment(execution))) + ((execution->unfinishedTaskCount > 0 || + HasIncompleteConnectionEstablishment(execution)) && + (toCompletion || + execution->rowsReceivedInCurrentRun < maxBatchSize))) { WorkerPool *workerPool = NULL; foreach_ptr(workerPool, execution->workerList) @@ -2390,8 +2479,6 @@ RunDistributedExecution(DistributedExecution *execution) FreeWaitEventSet(execution->waitEventSet); execution->waitEventSet = NULL; } - - CleanUpSessions(execution); } PG_CATCH(); { @@ -2593,7 +2680,7 @@ ManageWorkerPool(WorkerPool *workerPool) /* increase the open rate every cycle (like TCP slow start) */ workerPool->maxNewConnectionsPerCycle += 1; - OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties); + OpenNewConnections(workerPool, newConnectionCount, &execution->transactionProperties); /* * Cannot establish new connections to the local host, most probably because the @@ -3089,7 +3176,7 @@ CheckConnectionTimeout(WorkerPool *workerPool) */ logLevel = DEBUG1; } - else if (execution->transactionProperties->errorOnAnyFailure || + else if (execution->transactionProperties.errorOnAnyFailure || execution->failed) { /* @@ -3436,7 +3523,7 @@ ConnectionStateMachine(WorkerSession *session) * or WorkerPoolFailed. */ if (execution->failed || - (execution->transactionProperties->errorOnAnyFailure && + (execution->transactionProperties.errorOnAnyFailure && workerPool->failureState != WORKER_POOL_FAILED_OVER_TO_LOCAL)) { /* a task has failed due to this connection failure */ @@ -3633,7 +3720,7 @@ TransactionModifiedDistributedTable(DistributedExecution *execution) * should not be pretending that we're in a coordinated transaction even * if XACT_MODIFICATION_DATA is set. That's why we implemented this workaround. */ - return execution->transactionProperties->useRemoteTransactionBlocks == + return execution->transactionProperties.useRemoteTransactionBlocks == TRANSACTION_BLOCKS_REQUIRED && XactModificationLevel == XACT_MODIFICATION_DATA; } @@ -3648,7 +3735,7 @@ TransactionStateMachine(WorkerSession *session) WorkerPool *workerPool = session->workerPool; DistributedExecution *execution = workerPool->distributedExecution; TransactionBlocksUsage useRemoteTransactionBlocks = - execution->transactionProperties->useRemoteTransactionBlocks; + execution->transactionProperties.useRemoteTransactionBlocks; MultiConnection *connection = session->connection; RemoteTransaction *transaction = &(connection->remoteTransaction); @@ -4068,7 +4155,7 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, ShardPlacement *taskPlacement = placementExecution->shardPlacement; List *placementAccessList = PlacementAccessListForTask(task, taskPlacement); - if (execution->transactionProperties->useRemoteTransactionBlocks != + if (execution->transactionProperties.useRemoteTransactionBlocks != TRANSACTION_BLOCKS_DISALLOWED) { /* @@ -4421,6 +4508,7 @@ ReceiveResults(WorkerSession *session, bool storeRows) MemoryContextReset(rowContext); execution->rowsProcessed++; + execution->rowsReceivedInCurrentRun++; } PQclear(result); @@ -4926,7 +5014,7 @@ static bool ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution) { if (!DistributedExecutionModifiesDatabase(execution) || - execution->transactionProperties->errorOnAnyFailure) + execution->transactionProperties.errorOnAnyFailure) { /* * Failures that do not modify the database (e.g., mainly SELECTs) should diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index a06f060f3..ff97f1af8 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -14,6 +14,7 @@ #include "miscadmin.h" #include "commands/copy.h" +#include "distributed/adaptive_executor.h" #include "distributed/backend_data.h" #include "distributed/citus_clauses.h" #include "distributed/citus_custom_scan.h" @@ -219,14 +220,25 @@ CitusExecScan(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; - if (!scanState->finishedRemoteScan) + if (!scanState->executionStarted) { - AdaptiveExecutor(scanState); + AdaptiveExecutorStart(scanState); - scanState->finishedRemoteScan = true; + scanState->executionStarted = true; } - return ReturnTupleFromTuplestore(scanState); + TupleTableSlot *resultSlot = ReturnTupleFromTuplestore(scanState); + if (TupIsNull(resultSlot) && !scanState->finishedRemoteScan) + { + /* clear the tuple store for the next batch */ + tuplestore_clear(scanState->tuplestorestate); + + scanState->finishedRemoteScan = AdaptiveExecutorRun(scanState); + + resultSlot = ReturnTupleFromTuplestore(scanState); + } + + return resultSlot; } @@ -582,6 +594,7 @@ AdaptiveExecutorCreateScan(CustomScan *scan) scanState->finishedPreScan = false; scanState->finishedRemoteScan = false; + scanState->executionStarted = false; return (Node *) scanState; } diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index 0a3768177..90200f70b 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -1,8 +1,10 @@ #ifndef ADAPTIVE_EXECUTOR_H #define ADAPTIVE_EXECUTOR_H +#include "distributed/citus_custom_scan.h" #include "distributed/multi_physical_planner.h" + /* GUC, determining whether Citus opens 1 connection per task */ extern bool ForceMaxQueryParallelization; extern int MaxAdaptiveExecutorPoolSize; @@ -14,6 +16,9 @@ extern int ExecutorSlowStartInterval; extern bool EnableCostBasedConnectionEstablishment; extern bool PreventIncompleteConnectionEstablishment; +extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState); +extern void AdaptiveExecutorStart(CitusScanState *scanState); +extern bool AdaptiveExecutorRun(CitusScanState *scanState); extern bool ShouldRunTasksSequentially(List *taskList); extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported); extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize, diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index 92301fceb..7ce0fa603 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -15,6 +15,10 @@ #include "executor/execdesc.h" #include "nodes/plannodes.h" + +struct DistributedExecution; + + typedef struct CitusScanState { CustomScanState customScanState; /* underlying custom scan node */ @@ -26,7 +30,11 @@ typedef struct CitusScanState DistributedPlan *distributedPlan; /* distributed execution plan */ MultiExecutorType executorType; /* distributed executor type */ bool finishedRemoteScan; /* flag to check if remote scan is finished */ + bool executionStarted; /* flag to check whether execution started */ Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */ + + /* execution state when using adaptive executor */ + struct DistributedExecution *execution; } CitusScanState; diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 0054c958a..9306aa21c 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -73,8 +73,6 @@ extern int ExecutorLevel; extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags); extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once); -extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState); -extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState); /*