Fetch tuples in small batches in adaptive executor where possible

pull/5195/head
Marco Slot 2021-08-20 11:13:58 +02:00
parent 262f89359e
commit f33de24877
5 changed files with 163 additions and 51 deletions

View File

@ -266,7 +266,7 @@ typedef struct DistributedExecution
bool raiseInterrupts;
/* transactional properties of the current execution */
TransactionProperties *transactionProperties;
TransactionProperties transactionProperties;
/* indicates whether distributed execution has failed */
bool failed;
@ -282,6 +282,13 @@ typedef struct DistributedExecution
*/
uint64 rowsProcessed;
/*
* RunDistributedExecution can be called multiple time to perform partial
* execution. In that case, rowsReceivedInCurrentRun contains the number
* of rows received.
*/
uint64 rowsReceivedInCurrentRun;
/*
* The following fields are used while receiving results from remote nodes.
* We store this information here to avoid re-allocating it every time.
@ -299,6 +306,11 @@ typedef struct DistributedExecution
* do cleanup for repartition queries.
*/
List *jobIdList;
/*
* Memory context for the execution.
*/
MemoryContext memoryContext;
} DistributedExecution;
@ -610,7 +622,7 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel
int targetPoolSize,
TupleDestination *
defaultTupleDest,
TransactionProperties *
TransactionProperties
xactProperties,
List *jobIdList,
bool localExecutionSupported);
@ -621,7 +633,7 @@ static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLev
exludeFromTransaction);
static void StartDistributedExecution(DistributedExecution *execution);
static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution);
static void RunDistributedExecution(DistributedExecution *execution);
static void RunDistributedExecution(DistributedExecution *execution, bool toCompletion);
static void SequentialRunDistributedExecution(DistributedExecution *execution);
static void FinishDistributedExecution(DistributedExecution *execution);
static void CleanUpSessions(DistributedExecution *execution);
@ -751,11 +763,9 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
* first call of CitusExecScan. The function fills the tupleStore
* of the input scanScate.
*/
TupleTableSlot *
AdaptiveExecutor(CitusScanState *scanState)
void
AdaptiveExecutorStart(CitusScanState *scanState)
{
TupleTableSlot *resultSlot = NULL;
DistributedPlan *distributedPlan = scanState->distributedPlan;
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
@ -770,14 +780,10 @@ AdaptiveExecutor(CitusScanState *scanState)
/* we should only call this once before the scan finished */
Assert(!scanState->finishedRemoteScan);
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"AdaptiveExecutor",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
/* Reset Task fields that are only valid for a single execution */
ResetExplainAnalyzeData(taskList);
MemoryContext memoryContext = AllocSetContextCreate(executorState->es_query_cxt,
"AdaptiveExecutor",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(memoryContext);
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
@ -786,6 +792,9 @@ AdaptiveExecutor(CitusScanState *scanState)
TupleDestination *defaultTupleDest =
CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor);
/* Reset Task fields that are only valid for a single execution */
ResetExplainAnalyzeData(taskList);
if (RequestedForExplainAnalyze(scanState))
{
/*
@ -820,7 +829,7 @@ AdaptiveExecutor(CitusScanState *scanState)
paramListInfo,
targetPoolSize,
defaultTupleDest,
&xactProperties,
xactProperties,
jobIdList,
localExecutionSupported);
@ -830,13 +839,53 @@ AdaptiveExecutor(CitusScanState *scanState)
*/
StartDistributedExecution(execution);
/* store the execution in the custom scan state */
scanState->execution = execution;
execution->memoryContext = MemoryContextSwitchTo(oldContext);
}
bool
AdaptiveExecutorRun(CitusScanState *scanState)
{
DistributedExecution *execution = scanState->execution;
DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *job = distributedPlan->workerJob;
CmdType commandType = job->jobQuery->commandType;
Assert(execution != NULL);
MemoryContext oldContext = MemoryContextSwitchTo(execution->memoryContext);
EState *executorState = ScanStateGetExecutorState(scanState);
bool sortTupleStore = false;
if (SortReturning && distributedPlan->expectResults && commandType != CMD_SELECT)
{
/* sort the tuple store to get consistent DML output in tests */
sortTupleStore = true;
}
if (ShouldRunTasksSequentially(execution->remoteTaskList))
{
/* sequential execution always runs to completion */
SequentialRunDistributedExecution(execution);
}
else
{
RunDistributedExecution(execution);
/* if we need to sort the whole tuple store, run to completino */
bool runToCompletion = sortTupleStore;
RunDistributedExecution(execution, runToCompletion);
if (execution->unfinishedTaskCount > 0)
{
MemoryContextSwitchTo(oldContext);
return false;
}
/* done with remote tasks, finish the execution */
}
/* execute tasks local to the node (if any) */
@ -846,7 +895,6 @@ AdaptiveExecutor(CitusScanState *scanState)
RunLocalExecution(scanState, execution);
}
CmdType commandType = job->jobQuery->commandType;
if (commandType != CMD_SELECT)
{
executorState->es_processed = execution->rowsProcessed;
@ -854,19 +902,14 @@ AdaptiveExecutor(CitusScanState *scanState)
FinishDistributedExecution(execution);
if (hasDependentJobs)
{
DoRepartitionCleanup(jobIdList);
}
if (SortReturning && distributedPlan->expectResults && commandType != CMD_SELECT)
if (sortTupleStore)
{
SortTupleStore(scanState);
}
MemoryContextSwitchTo(oldContext);
return resultSlot;
return true;
}
@ -1014,7 +1057,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
CreateDistributedExecution(
executionParams->modLevel, executionParams->taskList,
paramListInfo, executionParams->targetPoolSize,
defaultTupleDest, &executionParams->xactProperties,
defaultTupleDest, executionParams->xactProperties,
executionParams->jobIdList, executionParams->localExecutionSupported);
/*
@ -1032,7 +1075,10 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
/* run the remote execution */
StartDistributedExecution(execution);
RunDistributedExecution(execution);
bool runToCompletion = true;
RunDistributedExecution(execution, runToCompletion);
FinishDistributedExecution(execution);
/* now, switch back to the local execution */
@ -1083,7 +1129,7 @@ static DistributedExecution *
CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
ParamListInfo paramListInfo,
int targetPoolSize, TupleDestination *defaultTupleDest,
TransactionProperties *xactProperties,
TransactionProperties xactProperties,
List *jobIdList, bool localExecutionSupported)
{
DistributedExecution *execution =
@ -1254,7 +1300,7 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList,
void
StartDistributedExecution(DistributedExecution *execution)
{
TransactionProperties *xactProperties = execution->transactionProperties;
TransactionProperties *xactProperties = &(execution->transactionProperties);
if (xactProperties->useRemoteTransactionBlocks == TRANSACTION_BLOCKS_REQUIRED)
{
@ -1296,6 +1342,20 @@ StartDistributedExecution(DistributedExecution *execution)
*/
RecordParallelRelationAccessForTaskList(execution->remoteAndLocalTaskList);
}
/*
* We skip AssignTasksToConnectionsOrWorkerPool for sequential executions,
* because we do it separately for each task in SequentialRunDistributedExecution.
*/
if (!ShouldRunTasksSequentially(execution->remoteTaskList))
{
/*
* If a (co-located) shard placement was accessed over a session earier in the
* transaction, assign the task to the same session. Otherwise, assign it to
* the general worker pool(s).
*/
AssignTasksToConnectionsOrWorkerPool(execution);
}
}
@ -1639,11 +1699,24 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
static void
FinishDistributedExecution(DistributedExecution *execution)
{
/*
* Sequential executions unclaim connections separately.
*/
if (!ShouldRunTasksSequentially(execution->remoteTaskList))
{
CleanUpSessions(execution);
}
if (DistributedExecutionModifiesDatabase(execution))
{
/* prevent copying shards in same transaction */
XactModificationLevel = XACT_MODIFICATION_DATA;
}
if (list_length(execution->jobIdList) > 0)
{
DoRepartitionCleanup(execution->jobIdList);
}
}
@ -1827,7 +1900,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
MultiConnection *connection = NULL;
if (execution->transactionProperties->useRemoteTransactionBlocks !=
if (execution->transactionProperties.useRemoteTransactionBlocks !=
TRANSACTION_BLOCKS_DISALLOWED)
{
/*
@ -2260,8 +2333,20 @@ SequentialRunDistributedExecution(DistributedExecution *execution)
break;
}
/*
* We skipped AssignTasksToConnectionsOrWorkerPool in StartDistributedExecution
* when all the tasks were in the execution. Do it now instead.
*/
AssignTasksToConnectionsOrWorkerPool(execution);
/* simply call the regular execution function */
RunDistributedExecution(execution);
bool runToCompletion = true;
RunDistributedExecution(execution, runToCompletion);
/*
* Unclaim connections since the current execution is technically finished.
*/
CleanUpSessions(execution);
}
/* set back the original execution mode */
@ -2278,12 +2363,10 @@ SequentialRunDistributedExecution(DistributedExecution *execution)
* has an event.
*/
void
RunDistributedExecution(DistributedExecution *execution)
RunDistributedExecution(DistributedExecution *execution, bool toCompletion)
{
WaitEvent *events = NULL;
AssignTasksToConnectionsOrWorkerPool(execution);
PG_TRY();
{
/* Preemptively step state machines in case of immediate errors */
@ -2299,6 +2382,10 @@ RunDistributedExecution(DistributedExecution *execution)
/* always (re)build the wait event set the first time */
execution->rebuildWaitEventSet = true;
execution->rowsReceivedInCurrentRun = 0;
/* TODO: GUC? be smart? */
int maxBatchSize = 10000;
/*
* Iterate until all the tasks are finished. Once all the tasks
@ -2318,8 +2405,10 @@ RunDistributedExecution(DistributedExecution *execution)
* irrespective of the current status of the tasks or the connections.
*/
while (!cancellationReceived &&
(execution->unfinishedTaskCount > 0 ||
HasIncompleteConnectionEstablishment(execution)))
((execution->unfinishedTaskCount > 0 ||
HasIncompleteConnectionEstablishment(execution)) &&
(toCompletion ||
execution->rowsReceivedInCurrentRun < maxBatchSize)))
{
WorkerPool *workerPool = NULL;
foreach_ptr(workerPool, execution->workerList)
@ -2390,8 +2479,6 @@ RunDistributedExecution(DistributedExecution *execution)
FreeWaitEventSet(execution->waitEventSet);
execution->waitEventSet = NULL;
}
CleanUpSessions(execution);
}
PG_CATCH();
{
@ -2593,7 +2680,7 @@ ManageWorkerPool(WorkerPool *workerPool)
/* increase the open rate every cycle (like TCP slow start) */
workerPool->maxNewConnectionsPerCycle += 1;
OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties);
OpenNewConnections(workerPool, newConnectionCount, &execution->transactionProperties);
/*
* Cannot establish new connections to the local host, most probably because the
@ -3089,7 +3176,7 @@ CheckConnectionTimeout(WorkerPool *workerPool)
*/
logLevel = DEBUG1;
}
else if (execution->transactionProperties->errorOnAnyFailure ||
else if (execution->transactionProperties.errorOnAnyFailure ||
execution->failed)
{
/*
@ -3436,7 +3523,7 @@ ConnectionStateMachine(WorkerSession *session)
* or WorkerPoolFailed.
*/
if (execution->failed ||
(execution->transactionProperties->errorOnAnyFailure &&
(execution->transactionProperties.errorOnAnyFailure &&
workerPool->failureState != WORKER_POOL_FAILED_OVER_TO_LOCAL))
{
/* a task has failed due to this connection failure */
@ -3633,7 +3720,7 @@ TransactionModifiedDistributedTable(DistributedExecution *execution)
* should not be pretending that we're in a coordinated transaction even
* if XACT_MODIFICATION_DATA is set. That's why we implemented this workaround.
*/
return execution->transactionProperties->useRemoteTransactionBlocks ==
return execution->transactionProperties.useRemoteTransactionBlocks ==
TRANSACTION_BLOCKS_REQUIRED &&
XactModificationLevel == XACT_MODIFICATION_DATA;
}
@ -3648,7 +3735,7 @@ TransactionStateMachine(WorkerSession *session)
WorkerPool *workerPool = session->workerPool;
DistributedExecution *execution = workerPool->distributedExecution;
TransactionBlocksUsage useRemoteTransactionBlocks =
execution->transactionProperties->useRemoteTransactionBlocks;
execution->transactionProperties.useRemoteTransactionBlocks;
MultiConnection *connection = session->connection;
RemoteTransaction *transaction = &(connection->remoteTransaction);
@ -4068,7 +4155,7 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
ShardPlacement *taskPlacement = placementExecution->shardPlacement;
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
if (execution->transactionProperties->useRemoteTransactionBlocks !=
if (execution->transactionProperties.useRemoteTransactionBlocks !=
TRANSACTION_BLOCKS_DISALLOWED)
{
/*
@ -4421,6 +4508,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
MemoryContextReset(rowContext);
execution->rowsProcessed++;
execution->rowsReceivedInCurrentRun++;
}
PQclear(result);
@ -4926,7 +5014,7 @@ static bool
ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution)
{
if (!DistributedExecutionModifiesDatabase(execution) ||
execution->transactionProperties->errorOnAnyFailure)
execution->transactionProperties.errorOnAnyFailure)
{
/*
* Failures that do not modify the database (e.g., mainly SELECTs) should

View File

@ -14,6 +14,7 @@
#include "miscadmin.h"
#include "commands/copy.h"
#include "distributed/adaptive_executor.h"
#include "distributed/backend_data.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_custom_scan.h"
@ -219,14 +220,25 @@ CitusExecScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
if (!scanState->finishedRemoteScan)
if (!scanState->executionStarted)
{
AdaptiveExecutor(scanState);
AdaptiveExecutorStart(scanState);
scanState->finishedRemoteScan = true;
scanState->executionStarted = true;
}
return ReturnTupleFromTuplestore(scanState);
TupleTableSlot *resultSlot = ReturnTupleFromTuplestore(scanState);
if (TupIsNull(resultSlot) && !scanState->finishedRemoteScan)
{
/* clear the tuple store for the next batch */
tuplestore_clear(scanState->tuplestorestate);
scanState->finishedRemoteScan = AdaptiveExecutorRun(scanState);
resultSlot = ReturnTupleFromTuplestore(scanState);
}
return resultSlot;
}
@ -582,6 +594,7 @@ AdaptiveExecutorCreateScan(CustomScan *scan)
scanState->finishedPreScan = false;
scanState->finishedRemoteScan = false;
scanState->executionStarted = false;
return (Node *) scanState;
}

View File

@ -1,8 +1,10 @@
#ifndef ADAPTIVE_EXECUTOR_H
#define ADAPTIVE_EXECUTOR_H
#include "distributed/citus_custom_scan.h"
#include "distributed/multi_physical_planner.h"
/* GUC, determining whether Citus opens 1 connection per task */
extern bool ForceMaxQueryParallelization;
extern int MaxAdaptiveExecutorPoolSize;
@ -14,6 +16,9 @@ extern int ExecutorSlowStartInterval;
extern bool EnableCostBasedConnectionEstablishment;
extern bool PreventIncompleteConnectionEstablishment;
extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState);
extern void AdaptiveExecutorStart(CitusScanState *scanState);
extern bool AdaptiveExecutorRun(CitusScanState *scanState);
extern bool ShouldRunTasksSequentially(List *taskList);
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize,

View File

@ -15,6 +15,10 @@
#include "executor/execdesc.h"
#include "nodes/plannodes.h"
struct DistributedExecution;
typedef struct CitusScanState
{
CustomScanState customScanState; /* underlying custom scan node */
@ -26,7 +30,11 @@ typedef struct CitusScanState
DistributedPlan *distributedPlan; /* distributed execution plan */
MultiExecutorType executorType; /* distributed executor type */
bool finishedRemoteScan; /* flag to check if remote scan is finished */
bool executionStarted; /* flag to check whether execution started */
Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */
/* execution state when using adaptive executor */
struct DistributedExecution *execution;
} CitusScanState;

View File

@ -73,8 +73,6 @@ extern int ExecutorLevel;
extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);
extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
bool execute_once);
extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState);
extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState);
/*