From f4828f547f73e07d06cf8580342f6b4a85849cc0 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Wed, 12 Feb 2020 22:27:58 +0300 Subject: [PATCH] wip --- .../distributed/executor/adaptive_executor.c | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 415c26d2a..8ae383af8 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -163,6 +163,8 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/timestamp.h" +#include "executor/tstoreReceiver.h" +#include "executor/tuptable.h" /* @@ -191,7 +193,7 @@ typedef struct DistributedExecution /* Tuple descriptor and destination for result. Can be NULL. */ TupleDesc tupleDescriptor; - Tuplestorestate *tupleStore; + DestReceiver* destReceiver; /* list of workers involved in the execution */ @@ -547,7 +549,7 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel bool hasReturning, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore, + DestReceiver *destReceiver, int targetPoolSize, TransactionProperties * xactProperties, @@ -633,6 +635,7 @@ AdaptiveExecutor(CitusScanState *scanState) EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = executorState->es_param_list_info; TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); + DestReceiver *tupleStoreDestReceiever = CreateTuplestoreDestReceiver(); bool randomAccess = true; bool interTransactions = false; int targetPoolSize = MaxAdaptiveExecutorPoolSize; @@ -668,6 +671,10 @@ AdaptiveExecutor(CitusScanState *scanState) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); + SetTuplestoreDestReceiverParams(tupleStoreDestReceiever, + scanState->tuplestorestate, + CurrentMemoryContext, false); + TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( distributedPlan->modLevel, taskList, hasDependentJobs); @@ -679,7 +686,7 @@ AdaptiveExecutor(CitusScanState *scanState) distributedPlan->hasReturning, paramListInfo, tupleDescriptor, - scanState->tuplestorestate, + tupleStoreDestReceiever, targetPoolSize, &xactProperties, jobIdList); @@ -889,9 +896,14 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, targetPoolSize = 1; } + DestReceiver *tupleStoreDestReceiever = CreateTuplestoreDestReceiver(); + SetTuplestoreDestReceiverParams(tupleStoreDestReceiever, + tupleStore, + CurrentMemoryContext, false); + DistributedExecution *execution = CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo, - tupleDescriptor, tupleStore, targetPoolSize, + tupleDescriptor, tupleStoreDestReceiever, targetPoolSize, xactProperties, jobIdList); StartDistributedExecution(execution); @@ -910,7 +922,7 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore, int targetPoolSize, + DestReceiver *destReceiver, int targetPoolSize, TransactionProperties *xactProperties, List *jobIdList) { DistributedExecution *execution = @@ -928,7 +940,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, (DistributedExecutionStats *) palloc0(sizeof(DistributedExecutionStats)); execution->paramListInfo = paramListInfo; execution->tupleDescriptor = tupleDescriptor; - execution->tupleStore = tupleStore; + execution->destReceiver = destReceiver; execution->workerList = NIL; execution->sessionList = NIL; @@ -1114,6 +1126,8 @@ StartDistributedExecution(DistributedExecution *execution) { RecordParallelRelationAccessForTaskList(execution->tasksToExecute); } + + //execution->destReceiver->rStartup(execution->destReceiver, , execution->tupleDescriptor); } @@ -1472,6 +1486,7 @@ FinishDistributedExecution(DistributedExecution *execution) /* prevent copying shards in same transaction */ XactModificationLevel = XACT_MODIFICATION_DATA; } + execution->destReceiver->rDestroy(execution->destReceiver); } @@ -3204,7 +3219,7 @@ ReceiveResults(WorkerSession *session, bool storeRows) AttInMetadata *attributeInputMetadata = execution->attributeInputMetadata; uint32 expectedColumnCount = 0; char **columnArray = execution->columnArray; - Tuplestorestate *tupleStore = execution->tupleStore; + DestReceiver *destReceiver = execution->destReceiver; if (tupleDescriptor != NULL) { @@ -3328,8 +3343,7 @@ ReceiveResults(WorkerSession *session, bool storeRows) columnArray); MemoryContextSwitchTo(oldContextPerRow); - - tuplestore_puttuple(tupleStore, heapTuple); + //destReceiver->receiveSlot(TupleDescGetSlot(), destReceiver); MemoryContextReset(ioContext); execution->rowsProcessed++;