adaptiveExecutor_DestReceiver
SaitTalhaNisanci 2020-02-12 22:27:58 +03:00
parent fc1fe0244e
commit f4828f547f
1 changed files with 23 additions and 9 deletions

View File

@ -163,6 +163,8 @@
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/timestamp.h"
#include "executor/tstoreReceiver.h"
#include "executor/tuptable.h"
/*
@ -191,7 +193,7 @@ typedef struct DistributedExecution
/* Tuple descriptor and destination for result. Can be NULL. */
TupleDesc tupleDescriptor;
Tuplestorestate *tupleStore;
DestReceiver* destReceiver;
/* list of workers involved in the execution */
@ -547,7 +549,7 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel
bool hasReturning,
ParamListInfo paramListInfo,
TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore,
DestReceiver *destReceiver,
int targetPoolSize,
TransactionProperties *
xactProperties,
@ -633,6 +635,7 @@ AdaptiveExecutor(CitusScanState *scanState)
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
DestReceiver *tupleStoreDestReceiever = CreateTuplestoreDestReceiver();
bool randomAccess = true;
bool interTransactions = false;
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
@ -668,6 +671,10 @@ AdaptiveExecutor(CitusScanState *scanState)
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
SetTuplestoreDestReceiverParams(tupleStoreDestReceiever,
scanState->tuplestorestate,
CurrentMemoryContext, false);
TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(
distributedPlan->modLevel, taskList,
hasDependentJobs);
@ -679,7 +686,7 @@ AdaptiveExecutor(CitusScanState *scanState)
distributedPlan->hasReturning,
paramListInfo,
tupleDescriptor,
scanState->tuplestorestate,
tupleStoreDestReceiever,
targetPoolSize,
&xactProperties,
jobIdList);
@ -889,9 +896,14 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
targetPoolSize = 1;
}
DestReceiver *tupleStoreDestReceiever = CreateTuplestoreDestReceiver();
SetTuplestoreDestReceiverParams(tupleStoreDestReceiever,
tupleStore,
CurrentMemoryContext, false);
DistributedExecution *execution =
CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo,
tupleDescriptor, tupleStore, targetPoolSize,
tupleDescriptor, tupleStoreDestReceiever, targetPoolSize,
xactProperties, jobIdList);
StartDistributedExecution(execution);
@ -910,7 +922,7 @@ static DistributedExecution *
CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
bool hasReturning,
ParamListInfo paramListInfo, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, int targetPoolSize,
DestReceiver *destReceiver, int targetPoolSize,
TransactionProperties *xactProperties, List *jobIdList)
{
DistributedExecution *execution =
@ -928,7 +940,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
(DistributedExecutionStats *) palloc0(sizeof(DistributedExecutionStats));
execution->paramListInfo = paramListInfo;
execution->tupleDescriptor = tupleDescriptor;
execution->tupleStore = tupleStore;
execution->destReceiver = destReceiver;
execution->workerList = NIL;
execution->sessionList = NIL;
@ -1114,6 +1126,8 @@ StartDistributedExecution(DistributedExecution *execution)
{
RecordParallelRelationAccessForTaskList(execution->tasksToExecute);
}
//execution->destReceiver->rStartup(execution->destReceiver, , execution->tupleDescriptor);
}
@ -1472,6 +1486,7 @@ FinishDistributedExecution(DistributedExecution *execution)
/* prevent copying shards in same transaction */
XactModificationLevel = XACT_MODIFICATION_DATA;
}
execution->destReceiver->rDestroy(execution->destReceiver);
}
@ -3204,7 +3219,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
AttInMetadata *attributeInputMetadata = execution->attributeInputMetadata;
uint32 expectedColumnCount = 0;
char **columnArray = execution->columnArray;
Tuplestorestate *tupleStore = execution->tupleStore;
DestReceiver *destReceiver = execution->destReceiver;
if (tupleDescriptor != NULL)
{
@ -3328,8 +3343,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
columnArray);
MemoryContextSwitchTo(oldContextPerRow);
tuplestore_puttuple(tupleStore, heapTuple);
//destReceiver->receiveSlot(TupleDescGetSlot(), destReceiver);
MemoryContextReset(ioContext);
execution->rowsProcessed++;