diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index b3952d2b8..ae2ae34b7 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -155,6 +155,7 @@ #include "distributed/resource_lock.h" #include "distributed/subplan_execution.h" #include "distributed/transaction_management.h" +#include "distributed/tuple_destination.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" #include "lib/ilist.h" @@ -193,14 +194,15 @@ typedef struct DistributedExecution */ bool expectResults; + /* + * If a task specific destination is not provided for a task, then use + * defaultTupleDest. + */ + TupleDestination *defaultTupleDest; + /* Parameters for parameterized plans. Can be NULL. */ ParamListInfo paramListInfo; - /* Tuple descriptor and destination for result. Can be NULL. */ - TupleDesc tupleDescriptor; - Tuplestorestate *tupleStore; - - /* list of workers involved in the execution */ List *workerList; @@ -284,7 +286,7 @@ typedef struct DistributedExecution * contexts. The benefit of keeping it here is to avoid allocating the array * over and over again. */ - AttInMetadata *attributeInputMetadata; + uint32 allocatedColumnCount; char **columnArray; /* @@ -478,6 +480,9 @@ typedef struct ShardCommandExecution /* description of the task */ Task *task; + /* cached AttInMetadata for task */ + AttInMetadata **attributeInputMetadata; + /* order in which the command should be replicated on replicas */ PlacementExecutionOrder executionOrder; @@ -525,6 +530,12 @@ typedef struct TaskPlacementExecution /* state of the execution of the command on the placement */ TaskPlacementExecutionState executionState; + /* + * Task query can contain multiple queries. queryIndex tracks results of + * which query we are waiting for. 
+ */ + uint32 queryIndex; + /* worker pool on which the placement needs to be executed */ WorkerPool *workerPool; @@ -553,9 +564,9 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel List *taskList, bool expectResults, ParamListInfo paramListInfo, - TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore, int targetPoolSize, + TupleDestination * + defaultTupleDest, TransactionProperties * xactProperties, List *jobIdList); @@ -665,7 +676,6 @@ AdaptiveExecutor(CitusScanState *scanState) DistributedPlan *distributedPlan = scanState->distributedPlan; EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = executorState->es_param_list_info; - TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); bool randomAccess = true; bool interTransactions = false; int targetPoolSize = MaxAdaptiveExecutorPoolSize; @@ -677,6 +687,13 @@ AdaptiveExecutor(CitusScanState *scanState) /* we should only call this once before the scan finished */ Assert(!scanState->finishedRemoteScan); + scanState->tuplestorestate = + tuplestore_begin_heap(randomAccess, interTransactions, work_mem); + + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); + TupleDestination *defaultTupleDest = + CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor); + bool hasDependentJobs = HasDependentJobs(job); if (hasDependentJobs) { @@ -689,9 +706,6 @@ AdaptiveExecutor(CitusScanState *scanState) targetPoolSize = 1; } - scanState->tuplestorestate = - tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( distributedPlan->modLevel, taskList, hasDependentJobs); @@ -702,9 +716,8 @@ AdaptiveExecutor(CitusScanState *scanState) taskList, distributedPlan->expectResults, paramListInfo, - tupleDescriptor, - scanState->tuplestorestate, targetPoolSize, + defaultTupleDest, &xactProperties, jobIdList); @@ -794,7 +807,7 @@ 
RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution) uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList, estate->es_param_list_info, scanState->distributedPlan, - scanState->tuplestorestate, + execution->defaultTupleDest, isUtilityCommand); /* @@ -921,6 +934,17 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) List *localTaskList = NIL; List *remoteTaskList = NIL; + TupleDestination *defaultTupleDest = NULL; + if (executionParams->tupleDescriptor != NULL) + { + defaultTupleDest = CreateTupleStoreTupleDest(executionParams->tupleStore, + executionParams->tupleDescriptor); + } + else + { + defaultTupleDest = CreateTupleDestNone(); + } + if (executionParams->localExecutionSupported && ShouldExecuteTasksLocally( executionParams->taskList)) { @@ -952,8 +976,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) } else { - locallyProcessedRows += ExecuteLocalTaskList(localTaskList, - executionParams->tupleStore); + locallyProcessedRows += ExecuteLocalTaskList(localTaskList, defaultTupleDest); } if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) @@ -965,8 +988,8 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) CreateDistributedExecution( executionParams->modLevel, remoteTaskList, executionParams->expectResults, paramListInfo, - executionParams->tupleDescriptor, executionParams->tupleStore, - executionParams->targetPoolSize, &executionParams->xactProperties, + executionParams->targetPoolSize, defaultTupleDest, + &executionParams->xactProperties, executionParams->jobIdList); StartDistributedExecution(execution); @@ -1009,10 +1032,10 @@ CreateBasicExecutionParams(RowModifyLevel modLevel, */ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, - bool expectResults, - ParamListInfo paramListInfo, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore, int targetPoolSize, - TransactionProperties *xactProperties, List *jobIdList) + bool 
expectResults, ParamListInfo paramListInfo, + int targetPoolSize, TupleDestination *defaultTupleDest, + TransactionProperties *xactProperties, + List *jobIdList) { DistributedExecution *execution = (DistributedExecution *) palloc0(sizeof(DistributedExecution)); @@ -1028,12 +1051,10 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, execution->executionStats = (DistributedExecutionStats *) palloc0(sizeof(DistributedExecutionStats)); execution->paramListInfo = paramListInfo; - execution->tupleDescriptor = tupleDescriptor; - execution->tupleStore = tupleStore; - execution->workerList = NIL; execution->sessionList = NIL; execution->targetPoolSize = targetPoolSize; + execution->defaultTupleDest = defaultTupleDest; execution->totalTaskCount = list_length(taskList); execution->unfinishedTaskCount = list_length(taskList); @@ -1046,18 +1067,12 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, execution->jobIdList = jobIdList; - /* allocate execution specific data once, on the ExecutorState memory context */ - if (tupleDescriptor != NULL) - { - execution->attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); - execution->columnArray = - (char **) palloc0(tupleDescriptor->natts * sizeof(char *)); - } - else - { - execution->attributeInputMetadata = NULL; - execution->columnArray = NULL; - } + /* + * Since task can have multiple queries, we are not sure how many columns we should + * allocate for. We start with 16, and reallocate when we need more. + */ + execution->allocatedColumnCount = 16; + execution->columnArray = palloc0(execution->allocatedColumnCount * sizeof(char *)); if (ShouldExecuteTasksLocally(taskList)) { @@ -1709,6 +1724,23 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) sizeof(TaskPlacementExecution *)); shardCommandExecution->placementExecutionCount = placementExecutionCount; + TupleDestination *tupleDest = task->tupleDest ? 
+ task->tupleDest : + execution->defaultTupleDest; + uint32 queryCount = task->queryCount; + shardCommandExecution->attributeInputMetadata = palloc0(queryCount * + sizeof(AttInMetadata *)); + + for (uint32 queryIndex = 0; queryIndex < queryCount; queryIndex++) + { + TupleDesc tupleDescriptor = tupleDest->tupleDescForQuery(tupleDest, + queryIndex); + shardCommandExecution->attributeInputMetadata[queryIndex] = + tupleDescriptor ? + TupleDescGetAttInMetadata(tupleDescriptor) : + NULL; + } + shardCommandExecution->expectResults = expectResults && !task->partiallyLocalOrRemote; @@ -1731,6 +1763,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) placementExecution->shardPlacement = taskPlacement; placementExecution->workerPool = workerPool; placementExecution->placementExecutionIndex = placementExecutionIndex; + placementExecution->queryIndex = 0; if (placementExecutionReady) { @@ -3412,16 +3445,13 @@ ReceiveResults(WorkerSession *session, bool storeRows) WorkerPool *workerPool = session->workerPool; DistributedExecution *execution = workerPool->distributedExecution; DistributedExecutionStats *executionStats = execution->executionStats; - TupleDesc tupleDescriptor = execution->tupleDescriptor; - AttInMetadata *attributeInputMetadata = execution->attributeInputMetadata; - uint32 expectedColumnCount = 0; - char **columnArray = execution->columnArray; - Tuplestorestate *tupleStore = execution->tupleStore; - - if (tupleDescriptor != NULL) - { - expectedColumnCount = tupleDescriptor->natts; - } + TaskPlacementExecution *placementExecution = session->currentTask; + ShardCommandExecution *shardCommandExecution = + placementExecution->shardCommandExecution; + Task *task = placementExecution->shardCommandExecution->task; + TupleDestination *tupleDest = task->tupleDest ? 
+ task->tupleDest : + execution->defaultTupleDest; /* * We use this context while converting each row fetched from remote node @@ -3452,8 +3482,6 @@ ReceiveResults(WorkerSession *session, bool storeRows) { char *currentAffectedTupleString = PQcmdTuples(result); int64 currentAffectedTupleCount = 0; - ShardCommandExecution *shardCommandExecution = - session->currentTask->shardCommandExecution; /* if there are multiple replicas, make sure to consider only one */ if (!shardCommandExecution->gotResults && *currentAffectedTupleString != '\0') @@ -3466,9 +3494,9 @@ ReceiveResults(WorkerSession *session, bool storeRows) PQclear(result); - /* no more results, break out of loop and free allocated memory */ - fetchDone = true; - break; + /* task query might contain multiple queries, so fetch until we reach NULL */ + placementExecution->queryIndex++; + continue; } else if (resultStatus == PGRES_TUPLES_OK) { @@ -3479,8 +3507,9 @@ ReceiveResults(WorkerSession *session, bool storeRows) Assert(PQntuples(result) == 0); PQclear(result); - fetchDone = true; - break; + /* task query might contain multiple queries, so fetch until we reach NULL */ + placementExecution->queryIndex++; + continue; } else if (resultStatus != PGRES_SINGLE_TUPLE) { @@ -3497,8 +3526,16 @@ ReceiveResults(WorkerSession *session, bool storeRows) continue; } + uint32 queryIndex = placementExecution->queryIndex; + TupleDesc tupleDescriptor = tupleDest->tupleDescForQuery(tupleDest, queryIndex); + if (tupleDescriptor == NULL) + { + continue; + } + rowsProcessed = PQntuples(result); uint32 columnCount = PQnfields(result); + uint32 expectedColumnCount = tupleDescriptor->natts; if (columnCount != expectedColumnCount) { @@ -3507,6 +3544,16 @@ ReceiveResults(WorkerSession *session, bool storeRows) columnCount, expectedColumnCount))); } + if (columnCount > execution->allocatedColumnCount) + { + pfree(execution->columnArray); + execution->allocatedColumnCount = columnCount; + execution->columnArray = 
palloc0(execution->allocatedColumnCount * + sizeof(char *)); + } + + char **columnArray = execution->columnArray; + for (uint32 rowIndex = 0; rowIndex < rowsProcessed; rowIndex++) { memset(columnArray, 0, columnCount * sizeof(char *)); @@ -3536,12 +3583,16 @@ ReceiveResults(WorkerSession *session, bool storeRows) */ MemoryContext oldContextPerRow = MemoryContextSwitchTo(ioContext); - HeapTuple heapTuple = BuildTupleFromCStrings(attributeInputMetadata, - columnArray); + AttInMetadata *attInMetadata = + shardCommandExecution->attributeInputMetadata[queryIndex]; + HeapTuple heapTuple = BuildTupleFromCStrings(attInMetadata, columnArray); MemoryContextSwitchTo(oldContextPerRow); - tuplestore_puttuple(tupleStore, heapTuple); + tupleDest->putTuple(tupleDest, task, + placementExecution->placementExecutionIndex, queryIndex, + heapTuple); + MemoryContextReset(ioContext); execution->rowsProcessed++; diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index f8a8c14e1..7da48f5d6 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -117,11 +117,12 @@ static void SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList, List **remoteTaskPlacementList); static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, - Tuplestorestate *tupleStoreState, ParamListInfo - paramListInfo); + TupleDestination *tupleDest, Task *task, + ParamListInfo paramListInfo); static void LogLocalCommand(Task *task); static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings, - Tuplestorestate *tupleStoreState); + TupleDestination *tupleDest, + Task *task); static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); @@ -139,7 +140,7 @@ static void EnsureTransitionPossible(LocalExecutionStatus from, * The function returns totalRowsProcessed. 
*/ uint64 -ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState) +ExecuteLocalTaskList(List *taskList, TupleDestination *defaultTupleDest) { if (list_length(taskList) == 0) { @@ -149,7 +150,7 @@ ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState) ParamListInfo paramListInfo = NULL; bool isUtilityCommand = false; return ExecuteLocalTaskListExtended(taskList, paramListInfo, distributedPlan, - tupleStoreState, isUtilityCommand); + defaultTupleDest, isUtilityCommand); } @@ -167,10 +168,10 @@ ExecuteLocalUtilityTaskList(List *utilityTaskList) } DistributedPlan *distributedPlan = NULL; ParamListInfo paramListInfo = NULL; - Tuplestorestate *tupleStoreState = NULL; + TupleDestination *defaultTupleDest = CreateTupleDestNone(); bool isUtilityCommand = true; return ExecuteLocalTaskListExtended(utilityTaskList, paramListInfo, distributedPlan, - tupleStoreState, isUtilityCommand); + defaultTupleDest, isUtilityCommand); } @@ -188,7 +189,7 @@ uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, DistributedPlan *distributedPlan, - Tuplestorestate *tupleStoreState, + TupleDestination *defaultTupleDest, bool isUtilityCommand) { ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); @@ -207,14 +208,13 @@ ExecuteLocalTaskListExtended(List *taskList, numParams = paramListInfo->numParams; } - if (tupleStoreState == NULL) - { - tupleStoreState = tuplestore_begin_heap(true, false, work_mem); - } - Task *task = NULL; foreach_ptr(task, taskList) { + TupleDestination *tupleDest = task->tupleDest ? + task->tupleDest : + defaultTupleDest; + /* * If we have a valid shard id, a distributed table will be accessed * during execution. 
Record it to apply the restrictions related to @@ -278,7 +278,8 @@ ExecuteLocalTaskListExtended(List *taskList, List *queryStringList = task->taskQuery.data.queryStringList; totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries( queryStringList, - tupleStoreState); + tupleDest, + task); continue; } @@ -312,8 +313,8 @@ ExecuteLocalTaskListExtended(List *taskList, } totalRowsProcessed += - ExecuteLocalTaskPlan(localPlan, shardQueryString, tupleStoreState, - paramListInfo); + ExecuteLocalTaskPlan(localPlan, shardQueryString, + tupleDest, task, paramListInfo); } return totalRowsProcessed; @@ -325,7 +326,8 @@ ExecuteLocalTaskListExtended(List *taskList, * one by one. */ static uint64 -LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleStoreState) +LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tupleDest, + Task *task) { char *queryString = NULL; uint64 totalProcessedRows = 0; @@ -338,7 +340,7 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleS ParamListInfo paramListInfo = NULL; PlannedStmt *localPlan = planner(shardQuery, cursorOptions, paramListInfo); totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString, - tupleStoreState, + tupleDest, task, paramListInfo); } return totalProcessedRows; @@ -538,9 +540,9 @@ SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacement */ static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, - Tuplestorestate *tupleStoreState, ParamListInfo paramListInfo) + TupleDestination *tupleDest, Task *task, + ParamListInfo paramListInfo) { - DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore); ScanDirection scanDirection = ForwardScanDirection; QueryEnvironment *queryEnv = create_queryEnv(); int eflags = 0; @@ -550,14 +552,15 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, * Use the tupleStore provided by the scanState because it is shared accross * the other 
task executions and the adaptive executor. */ - SetTuplestoreDestReceiverParams(tupleStoreDestReceiver, - tupleStoreState, - CurrentMemoryContext, false); + DestReceiver *destReceiver = tupleDest ? + CreateTupleDestDestReceiver(tupleDest, task, + LOCAL_PLACEMENT_INDEX) : + CreateDestReceiver(DestNone); /* Create a QueryDesc for the query */ QueryDesc *queryDesc = CreateQueryDesc(taskPlan, queryString, GetActiveSnapshot(), InvalidSnapshot, - tupleStoreDestReceiver, paramListInfo, + destReceiver, paramListInfo, queryEnv, 0); ExecutorStart(queryDesc, eflags); diff --git a/src/backend/distributed/executor/tuple_destination.c b/src/backend/distributed/executor/tuple_destination.c new file mode 100644 index 000000000..f4f12fbd0 --- /dev/null +++ b/src/backend/distributed/executor/tuple_destination.c @@ -0,0 +1,230 @@ +#include "postgres.h" +#include "funcapi.h" +#include "libpq-fe.h" +#include "miscadmin.h" +#include "pgstat.h" + +#include +#include + +#include "distributed/tuple_destination.h" + +/* + * TupleStoreTupleDestination is internal representation of a TupleDestination + * which forwards tuples to a tuple store. + */ +typedef struct TupleStoreTupleDestination +{ + TupleDestination pub; + + /* destination of tuples */ + Tuplestorestate *tupleStore; + + /* what do the tuples look like? */ + TupleDesc tupleDesc; +} TupleStoreTupleDestination; + +/* + * TupleDestDestReceiver is internal representation of a DestReceiver which + * forwards tuples to a tuple destination. 
+ */ +typedef struct TupleDestDestReceiver +{ + DestReceiver pub; + TupleDestination *tupleDest; + + /* parameters to pass to tupleDest->putTuple() */ + Task *task; + int placementIndex; +} TupleDestDestReceiver; + + +/* forward declarations for local functions */ +static void TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task, + int placementIndex, int queryNumber, + HeapTuple heapTuple); +static TupleDesc TupleStoreTupleDestTupleDescForQuery(TupleDestination *self, int + queryNumber); +static void TupleDestNonePutTuple(TupleDestination *self, Task *task, + int placementIndex, int queryNumber, + HeapTuple heapTuple); +static TupleDesc TupleDestNoneTupleDescForQuery(TupleDestination *self, int queryNumber); +static void TupleDestDestReceiverStartup(DestReceiver *copyDest, int operation, + TupleDesc inputTupleDesc); +static bool TupleDestDestReceiverReceive(TupleTableSlot *slot, + DestReceiver *copyDest); +static void TupleDestDestReceiverShutdown(DestReceiver *destReceiver); +static void TupleDestDestReceiverDestroy(DestReceiver *destReceiver); + + +/* + * CreateTupleStoreTupleDest creates a TupleDestination which forwards tuples to + * a tupleStore. + */ +TupleDestination * +CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) +{ + TupleStoreTupleDestination *tupleStoreTupleDest = palloc0( + sizeof(TupleStoreTupleDestination)); + tupleStoreTupleDest->tupleStore = tupleStore; + tupleStoreTupleDest->tupleDesc = tupleDescriptor; + tupleStoreTupleDest->pub.putTuple = TupleStoreTupleDestPutTuple; + tupleStoreTupleDest->pub.tupleDescForQuery = + TupleStoreTupleDestTupleDescForQuery; + + return (TupleDestination *) tupleStoreTupleDest; +} + + +/* + * TupleStoreTupleDestPutTuple implements TupleDestination->putTuple for + * TupleStoreTupleDestination. 
+ */ +static void +TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task, + int placementIndex, int queryNumber, + HeapTuple heapTuple) +{ + TupleStoreTupleDestination *tupleDest = (TupleStoreTupleDestination *) self; + tuplestore_puttuple(tupleDest->tupleStore, heapTuple); +} + + +/* + * TupleStoreTupleDestTupleDescForQuery implements TupleDestination->tupleDescForQuery + * for TupleStoreTupleDestination. + */ +static TupleDesc +TupleStoreTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber) +{ + Assert(queryNumber == 0); + + TupleStoreTupleDestination *tupleDest = (TupleStoreTupleDestination *) self; + + return tupleDest->tupleDesc; +} + + +/* + * CreateTupleDestNone creates a tuple destination which ignores the tuples. + */ +TupleDestination * +CreateTupleDestNone(void) +{ + TupleDestination *tupleDest = palloc0( + sizeof(TupleDestination)); + tupleDest->putTuple = TupleDestNonePutTuple; + tupleDest->tupleDescForQuery = TupleDestNoneTupleDescForQuery; + + return (TupleDestination *) tupleDest; +} + + +/* + * TupleDestNonePutTuple implements TupleDestination->putTuple for + * no-op tuple destination. + */ +static void +TupleDestNonePutTuple(TupleDestination *self, Task *task, + int placementIndex, int queryNumber, + HeapTuple heapTuple) +{ + /* nothing to do */ +} + + +/* + * TupleDestNoneTupleDescForQuery implements TupleDestination->tupleDescForQuery + * for no-op tuple destination. + */ +static TupleDesc +TupleDestNoneTupleDescForQuery(TupleDestination *self, int queryNumber) +{ + return NULL; +} + + +/* + * CreateTupleDestDestReceiver creates a dest receiver which forwards tuples + * to a tuple destination. 
+ */ +DestReceiver * +CreateTupleDestDestReceiver(TupleDestination *tupleDest, Task *task, int placementIndex) +{ + TupleDestDestReceiver *destReceiver = palloc0(sizeof(TupleDestDestReceiver)); + destReceiver->pub.rStartup = TupleDestDestReceiverStartup; + destReceiver->pub.receiveSlot = TupleDestDestReceiverReceive; + destReceiver->pub.rShutdown = TupleDestDestReceiverShutdown; + destReceiver->pub.rDestroy = TupleDestDestReceiverDestroy; + + destReceiver->tupleDest = tupleDest; + destReceiver->task = task; + destReceiver->placementIndex = placementIndex; + + return (DestReceiver *) destReceiver; +} + + +/* + * TupleDestDestReceiverStartup implements DestReceiver->rStartup for + * TupleDestDestReceiver. + */ +static void +TupleDestDestReceiverStartup(DestReceiver *destReceiver, int operation, + TupleDesc inputTupleDesc) +{ + /* nothing to do */ +} + + +/* + * TupleDestDestReceiverReceive implements DestReceiver->receiveSlot for + * TupleDestDestReceiver. + */ +static bool +TupleDestDestReceiverReceive(TupleTableSlot *slot, + DestReceiver *destReceiver) +{ + TupleDestDestReceiver *tupleDestReceiver = (TupleDestDestReceiver *) destReceiver; + TupleDestination *tupleDest = tupleDestReceiver->tupleDest; + Task *task = tupleDestReceiver->task; + int placementIndex = tupleDestReceiver->placementIndex; + + /* + * DestReceiver doesn't support multiple result sets with different shapes. + */ + Assert(task->queryCount == 1); + int queryNumber = 0; + +#if PG_VERSION_NUM >= PG_VERSION_12 + HeapTuple heapTuple = ExecFetchSlotHeapTuple(slot, true, NULL); +#else + HeapTuple heapTuple = ExecFetchSlotTuple(slot); +#endif + + tupleDest->putTuple(tupleDest, task, placementIndex, queryNumber, heapTuple); + + return true; +} + + +/* + * TupleDestDestReceiverShutdown implements DestReceiver->rShutdown for + * TupleDestDestReceiver. 
+ */ +static void +TupleDestDestReceiverShutdown(DestReceiver *destReceiver) +{ + /* nothing to do */ +} + + +/* + * TupleDestDestReceiverDestroy implements DestReceiver->rDestroy for + * TupleDestDestReceiver. + */ +static void +TupleDestDestReceiverDestroy(DestReceiver *destReceiver) +{ + /* nothing to do */ +} diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 82b1ec539..137c506ec 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -428,6 +428,7 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) { task->taskQuery.queryType = TASK_QUERY_OBJECT; task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query; + task->queryCount = 1; return; } @@ -446,11 +447,13 @@ SetTaskQueryString(Task *task, char *queryString) if (queryString == NULL) { task->taskQuery.queryType = TASK_QUERY_NULL; + task->queryCount = 0; } else { task->taskQuery.queryType = TASK_QUERY_TEXT; task->taskQuery.data.queryStringLazy = queryString; + task->queryCount = 1; } } @@ -464,6 +467,7 @@ SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList) Assert(perPlacementQueryStringList != NIL); task->taskQuery.queryType = TASK_QUERY_TEXT_PER_PLACEMENT; task->taskQuery.data.perPlacementQueryStrings = perPlacementQueryStringList; + task->queryCount = 1; } @@ -476,6 +480,7 @@ SetTaskQueryStringList(Task *task, List *queryStringList) Assert(queryStringList != NIL); task->taskQuery.queryType = TASK_QUERY_TEXT_LIST; task->taskQuery.data.queryStringList = queryStringList; + task->queryCount = list_length(queryStringList); } diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 80afd89b2..6c5f16883 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -327,6 +327,8 @@ CopyNodeTask(COPYFUNC_ARGS) 
COPY_NODE_FIELD(rowValuesLists); COPY_SCALAR_FIELD(partiallyLocalOrRemote); COPY_SCALAR_FIELD(parametersInQueryStringResolved); + COPY_SCALAR_FIELD(tupleDest); + COPY_SCALAR_FIELD(queryCount); } diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 37a5696f0..74c8a7a1d 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -12,6 +12,13 @@ #define LOCAL_EXECUTION_H #include "distributed/citus_custom_scan.h" +#include "distributed/tuple_destination.h" + +/* + * Used as TupleDestination->putTuple's placementIndex when executing + * local tasks. + */ +#define LOCAL_PLACEMENT_INDEX -1 /* enabled with GUCs*/ extern bool EnableLocalExecution; @@ -27,13 +34,12 @@ typedef enum LocalExecutionStatus extern enum LocalExecutionStatus CurrentLocalExecutionStatus; /* extern function declarations */ -extern uint64 ExecuteLocalTaskList(List *taskList, - Tuplestorestate *tupleStoreState); +extern uint64 ExecuteLocalTaskList(List *taskList, TupleDestination *defaultTupleDest); extern uint64 ExecuteLocalUtilityTaskList(List *utilityTaskList); extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, DistributedPlan *distributedPlan, - Tuplestorestate *tupleStoreState, + TupleDestination *defaultTupleDest, bool isUtilityCommand); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, List **localTaskList, List **remoteTaskList); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 78faf41c3..a2af0a480 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -262,6 +262,8 @@ typedef struct TaskQuery }data; }TaskQuery; +typedef struct TupleDestination TupleDestination; + typedef struct Task { CitusNode type; @@ -275,6 +277,12 @@ typedef struct Task */ TaskQuery taskQuery; + /* + * A task can have multiple 
queries, in which case queryCount will be > 1. If + * a task has more than one query, then taskQuery.queryType == TASK_QUERY_TEXT_LIST. + */ + int queryCount; + Oid anchorDistributedTableId; /* only applies to insert tasks */ uint64 anchorShardId; /* only applies to compute tasks */ List *taskPlacementList; /* only applies to compute tasks */ @@ -323,6 +331,12 @@ typedef struct Task * query. */ bool parametersInQueryStringResolved; + + /* + * Destination of tuples generated as a result of executing this task. Can be + * NULL, in which case the executor might use a default destination. + */ + TupleDestination *tupleDest; } Task; diff --git a/src/include/distributed/tuple_destination.h b/src/include/distributed/tuple_destination.h new file mode 100644 index 000000000..5fd9cd859 --- /dev/null +++ b/src/include/distributed/tuple_destination.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * tuple_destination.h + * Tuple destination generic struct. + * + * Copyright (c) Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#ifndef TUPLE_DESTINATION_H +#define TUPLE_DESTINATION_H + +#include "access/tupdesc.h" +#include "distributed/multi_physical_planner.h" +#include "tcop/dest.h" +#include "utils/tuplestore.h" + +typedef struct TupleDestination TupleDestination; + +/* + * TupleDestination provides a generic interface for where to send tuples. + * + * Users of the executor can set task->tupleDest for custom processing of + * the result tuples. + * + * Since a task can have multiple queries, methods of TupleDestination also + * accept a queryNumber parameter which denotes the index of the query that + * the tuple belongs to. 
+ */ +typedef struct TupleDestination +{ + /* putTuple implements custom processing of a tuple */ + void (*putTuple)(TupleDestination *self, Task *task, + int placementIndex, int queryNumber, + HeapTuple tuple); + + /* tupleDescForQuery returns tuple descriptor for a query number. Can return NULL. */ + TupleDesc (*tupleDescForQuery)(TupleDestination *self, int queryNumber); +} TupleDestination; + +extern TupleDestination * CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc + tupleDescriptor); +extern TupleDestination * CreateTupleDestNone(void); +extern DestReceiver * CreateTupleDestDestReceiver(TupleDestination *tupleDest, + Task *task, int placementIndex); + +#endif