Implement TupleDestination interface.

Implements a new `TupleDestination` interface to allow custom tuple processing per task.

This can be especially useful if a task contains multiple queries. An example is EXPLAIN
ANALYZE, where we need to add some UDF calls to the query to fetch the EXPLAIN output
from the worker after fetching the actual query results.
pull/3871/head
Hadi Moshayedi 2020-06-04 11:35:57 -07:00
parent d0f47eb338
commit 0bfd39ea52
8 changed files with 443 additions and 85 deletions
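
To make the description above concrete, here is a hedged sketch (not part of this diff) of a custom `TupleDestination` for a multi-query task: each query's tuples are routed to their own tuple store, for example the actual query results to one store and the EXPLAIN output fetched by a follow-up UDF call to another. The `MultiQueryTupleDestination` type and its constructor are hypothetical; only the `TupleDestination` interface itself comes from this commit.

```c
#include "postgres.h"

#include "funcapi.h"

#include "distributed/tuple_destination.h"

/* hypothetical destination: one tuple store per query of the task */
typedef struct MultiQueryTupleDestination
{
	TupleDestination pub;

	/* one store and one descriptor per query, indexed by query number */
	Tuplestorestate **tupleStores;
	TupleDesc *tupleDescs;
} MultiQueryTupleDestination;

/* route each tuple to the store of the query it belongs to */
static void
MultiQueryPutTuple(TupleDestination *self, Task *task,
				   int placementIndex, int queryNumber, HeapTuple tuple)
{
	MultiQueryTupleDestination *dest = (MultiQueryTupleDestination *) self;

	tuplestore_puttuple(dest->tupleStores[queryNumber], tuple);
}

/* report the descriptor of the given query so the executor can parse its rows */
static TupleDesc
MultiQueryTupleDescForQuery(TupleDestination *self, int queryNumber)
{
	MultiQueryTupleDestination *dest = (MultiQueryTupleDestination *) self;

	return dest->tupleDescs[queryNumber];
}

/* hypothetical constructor wiring up the interface callbacks */
static TupleDestination *
CreateMultiQueryTupleDest(Tuplestorestate **tupleStores, TupleDesc *tupleDescs)
{
	MultiQueryTupleDestination *dest = palloc0(sizeof(MultiQueryTupleDestination));

	dest->pub.putTuple = MultiQueryPutTuple;
	dest->pub.tupleDescForQuery = MultiQueryTupleDescForQuery;
	dest->tupleStores = tupleStores;
	dest->tupleDescs = tupleDescs;

	return (TupleDestination *) dest;
}
```

A caller would set `task->tupleDest` to such a destination (and `task->queryCount` to the number of queries) before handing the task list to the executor; tasks that leave `tupleDest` NULL keep using the executor's default destination, as the diff below shows.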

View File

@ -155,6 +155,7 @@
#include "distributed/resource_lock.h"
#include "distributed/subplan_execution.h"
#include "distributed/transaction_management.h"
#include "distributed/tuple_destination.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "lib/ilist.h"
@ -193,14 +194,15 @@ typedef struct DistributedExecution
*/
bool expectResults;
/*
* If a task-specific destination is not provided for a task, then use
* defaultTupleDest.
*/
TupleDestination *defaultTupleDest;
/* Parameters for parameterized plans. Can be NULL. */
ParamListInfo paramListInfo;
/* Tuple descriptor and destination for result. Can be NULL. */
TupleDesc tupleDescriptor;
Tuplestorestate *tupleStore;
/* list of workers involved in the execution */
List *workerList;
@ -284,7 +286,7 @@ typedef struct DistributedExecution
* contexts. The benefit of keeping it here is to avoid allocating the array
* over and over again.
*/
AttInMetadata *attributeInputMetadata;
uint32 allocatedColumnCount;
char **columnArray;
/*
@ -478,6 +480,9 @@ typedef struct ShardCommandExecution
/* description of the task */
Task *task;
/* cached AttInMetadata for task */
AttInMetadata **attributeInputMetadata;
/* order in which the command should be replicated on replicas */
PlacementExecutionOrder executionOrder;
@ -525,6 +530,12 @@ typedef struct TaskPlacementExecution
/* state of the execution of the command on the placement */
TaskPlacementExecutionState executionState;
/*
* A task query can contain multiple queries. queryIndex tracks which
* query's results we are currently waiting for.
*/
uint32 queryIndex;
/* worker pool on which the placement needs to be executed */
WorkerPool *workerPool;
@ -553,9 +564,9 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel
List *taskList,
bool expectResults,
ParamListInfo paramListInfo,
TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore,
int targetPoolSize,
TupleDestination *
defaultTupleDest,
TransactionProperties *
xactProperties,
List *jobIdList);
@ -665,7 +676,6 @@ AdaptiveExecutor(CitusScanState *scanState)
DistributedPlan *distributedPlan = scanState->distributedPlan;
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
bool randomAccess = true;
bool interTransactions = false;
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
@ -677,6 +687,13 @@ AdaptiveExecutor(CitusScanState *scanState)
/* we should only call this once before the scan finishes */
Assert(!scanState->finishedRemoteScan);
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
TupleDestination *defaultTupleDest =
CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor);
bool hasDependentJobs = HasDependentJobs(job);
if (hasDependentJobs)
{
@ -689,9 +706,6 @@ AdaptiveExecutor(CitusScanState *scanState)
targetPoolSize = 1;
}
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(
distributedPlan->modLevel, taskList,
hasDependentJobs);
@ -702,9 +716,8 @@ AdaptiveExecutor(CitusScanState *scanState)
taskList,
distributedPlan->expectResults,
paramListInfo,
tupleDescriptor,
scanState->tuplestorestate,
targetPoolSize,
defaultTupleDest,
&xactProperties,
jobIdList);
@ -794,7 +807,7 @@ RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList,
estate->es_param_list_info,
scanState->distributedPlan,
scanState->tuplestorestate,
execution->defaultTupleDest,
isUtilityCommand);
/*
@ -921,6 +934,17 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
List *localTaskList = NIL;
List *remoteTaskList = NIL;
TupleDestination *defaultTupleDest = NULL;
if (executionParams->tupleDescriptor != NULL)
{
defaultTupleDest = CreateTupleStoreTupleDest(executionParams->tupleStore,
executionParams->tupleDescriptor);
}
else
{
defaultTupleDest = CreateTupleDestNone();
}
if (executionParams->localExecutionSupported && ShouldExecuteTasksLocally(
executionParams->taskList))
{
@ -952,8 +976,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
}
else
{
locallyProcessedRows += ExecuteLocalTaskList(localTaskList,
executionParams->tupleStore);
locallyProcessedRows += ExecuteLocalTaskList(localTaskList, defaultTupleDest);
}
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
@ -965,8 +988,8 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
CreateDistributedExecution(
executionParams->modLevel, remoteTaskList,
executionParams->expectResults, paramListInfo,
executionParams->tupleDescriptor, executionParams->tupleStore,
executionParams->targetPoolSize, &executionParams->xactProperties,
executionParams->targetPoolSize, defaultTupleDest,
&executionParams->xactProperties,
executionParams->jobIdList);
StartDistributedExecution(execution);
@ -1009,10 +1032,10 @@ CreateBasicExecutionParams(RowModifyLevel modLevel,
*/
static DistributedExecution *
CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
bool expectResults,
ParamListInfo paramListInfo, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, int targetPoolSize,
TransactionProperties *xactProperties, List *jobIdList)
bool expectResults, ParamListInfo paramListInfo,
int targetPoolSize, TupleDestination *defaultTupleDest,
TransactionProperties *xactProperties,
List *jobIdList)
{
DistributedExecution *execution =
(DistributedExecution *) palloc0(sizeof(DistributedExecution));
@ -1028,12 +1051,10 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
execution->executionStats =
(DistributedExecutionStats *) palloc0(sizeof(DistributedExecutionStats));
execution->paramListInfo = paramListInfo;
execution->tupleDescriptor = tupleDescriptor;
execution->tupleStore = tupleStore;
execution->workerList = NIL;
execution->sessionList = NIL;
execution->targetPoolSize = targetPoolSize;
execution->defaultTupleDest = defaultTupleDest;
execution->totalTaskCount = list_length(taskList);
execution->unfinishedTaskCount = list_length(taskList);
@ -1046,18 +1067,12 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
execution->jobIdList = jobIdList;
/* allocate execution specific data once, on the ExecutorState memory context */
if (tupleDescriptor != NULL)
{
execution->attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
execution->columnArray =
(char **) palloc0(tupleDescriptor->natts * sizeof(char *));
}
else
{
execution->attributeInputMetadata = NULL;
execution->columnArray = NULL;
}
/*
* Since a task can have multiple queries, we do not know in advance how many
* columns to allocate for. We start with 16 and reallocate when we need more.
*/
execution->allocatedColumnCount = 16;
execution->columnArray = palloc0(execution->allocatedColumnCount * sizeof(char *));
if (ShouldExecuteTasksLocally(taskList))
{
@ -1709,6 +1724,23 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
sizeof(TaskPlacementExecution *));
shardCommandExecution->placementExecutionCount = placementExecutionCount;
TupleDestination *tupleDest = task->tupleDest ?
task->tupleDest :
execution->defaultTupleDest;
uint32 queryCount = task->queryCount;
shardCommandExecution->attributeInputMetadata = palloc0(queryCount *
sizeof(AttInMetadata *));
for (uint32 queryIndex = 0; queryIndex < queryCount; queryIndex++)
{
TupleDesc tupleDescriptor = tupleDest->tupleDescForQuery(tupleDest,
queryIndex);
shardCommandExecution->attributeInputMetadata[queryIndex] =
tupleDescriptor ?
TupleDescGetAttInMetadata(tupleDescriptor) :
NULL;
}
shardCommandExecution->expectResults =
expectResults && !task->partiallyLocalOrRemote;
@ -1731,6 +1763,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
placementExecution->shardPlacement = taskPlacement;
placementExecution->workerPool = workerPool;
placementExecution->placementExecutionIndex = placementExecutionIndex;
placementExecution->queryIndex = 0;
if (placementExecutionReady)
{
@ -3412,16 +3445,13 @@ ReceiveResults(WorkerSession *session, bool storeRows)
WorkerPool *workerPool = session->workerPool;
DistributedExecution *execution = workerPool->distributedExecution;
DistributedExecutionStats *executionStats = execution->executionStats;
TupleDesc tupleDescriptor = execution->tupleDescriptor;
AttInMetadata *attributeInputMetadata = execution->attributeInputMetadata;
uint32 expectedColumnCount = 0;
char **columnArray = execution->columnArray;
Tuplestorestate *tupleStore = execution->tupleStore;
if (tupleDescriptor != NULL)
{
expectedColumnCount = tupleDescriptor->natts;
}
TaskPlacementExecution *placementExecution = session->currentTask;
ShardCommandExecution *shardCommandExecution =
placementExecution->shardCommandExecution;
Task *task = placementExecution->shardCommandExecution->task;
TupleDestination *tupleDest = task->tupleDest ?
task->tupleDest :
execution->defaultTupleDest;
/*
* We use this context while converting each row fetched from remote node
@ -3452,8 +3482,6 @@ ReceiveResults(WorkerSession *session, bool storeRows)
{
char *currentAffectedTupleString = PQcmdTuples(result);
int64 currentAffectedTupleCount = 0;
ShardCommandExecution *shardCommandExecution =
session->currentTask->shardCommandExecution;
/* if there are multiple replicas, make sure to consider only one */
if (!shardCommandExecution->gotResults && *currentAffectedTupleString != '\0')
@ -3466,9 +3494,9 @@ ReceiveResults(WorkerSession *session, bool storeRows)
PQclear(result);
/* no more results, break out of loop and free allocated memory */
fetchDone = true;
break;
/* task query might contain multiple queries, so fetch until we reach NULL */
placementExecution->queryIndex++;
continue;
}
else if (resultStatus == PGRES_TUPLES_OK)
{
@ -3479,8 +3507,9 @@ ReceiveResults(WorkerSession *session, bool storeRows)
Assert(PQntuples(result) == 0);
PQclear(result);
fetchDone = true;
break;
/* task query might contain multiple queries, so fetch until we reach NULL */
placementExecution->queryIndex++;
continue;
}
else if (resultStatus != PGRES_SINGLE_TUPLE)
{
@ -3497,8 +3526,16 @@ ReceiveResults(WorkerSession *session, bool storeRows)
continue;
}
uint32 queryIndex = placementExecution->queryIndex;
TupleDesc tupleDescriptor = tupleDest->tupleDescForQuery(tupleDest, queryIndex);
if (tupleDescriptor == NULL)
{
continue;
}
rowsProcessed = PQntuples(result);
uint32 columnCount = PQnfields(result);
uint32 expectedColumnCount = tupleDescriptor->natts;
if (columnCount != expectedColumnCount)
{
@ -3507,6 +3544,16 @@ ReceiveResults(WorkerSession *session, bool storeRows)
columnCount, expectedColumnCount)));
}
if (columnCount > execution->allocatedColumnCount)
{
pfree(execution->columnArray);
execution->allocatedColumnCount = columnCount;
execution->columnArray = palloc0(execution->allocatedColumnCount *
sizeof(char *));
}
char **columnArray = execution->columnArray;
for (uint32 rowIndex = 0; rowIndex < rowsProcessed; rowIndex++)
{
memset(columnArray, 0, columnCount * sizeof(char *));
@ -3536,12 +3583,16 @@ ReceiveResults(WorkerSession *session, bool storeRows)
*/
MemoryContext oldContextPerRow = MemoryContextSwitchTo(ioContext);
HeapTuple heapTuple = BuildTupleFromCStrings(attributeInputMetadata,
columnArray);
AttInMetadata *attInMetadata =
shardCommandExecution->attributeInputMetadata[queryIndex];
HeapTuple heapTuple = BuildTupleFromCStrings(attInMetadata, columnArray);
MemoryContextSwitchTo(oldContextPerRow);
tuplestore_puttuple(tupleStore, heapTuple);
tupleDest->putTuple(tupleDest, task,
placementExecution->placementExecutionIndex, queryIndex,
heapTuple);
MemoryContextReset(ioContext);
execution->rowsProcessed++;

View File

@ -117,11 +117,12 @@ static void SplitLocalAndRemotePlacements(List *taskPlacementList,
List **localTaskPlacementList,
List **remoteTaskPlacementList);
static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
Tuplestorestate *tupleStoreState, ParamListInfo
paramListInfo);
TupleDestination *tupleDest, Task *task,
ParamListInfo paramListInfo);
static void LogLocalCommand(Task *task);
static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings,
Tuplestorestate *tupleStoreState);
TupleDestination *tupleDest,
Task *task);
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
@ -139,7 +140,7 @@ static void EnsureTransitionPossible(LocalExecutionStatus from,
* The function returns totalRowsProcessed.
*/
uint64
ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState)
ExecuteLocalTaskList(List *taskList, TupleDestination *defaultTupleDest)
{
if (list_length(taskList) == 0)
{
@ -149,7 +150,7 @@ ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState)
ParamListInfo paramListInfo = NULL;
bool isUtilityCommand = false;
return ExecuteLocalTaskListExtended(taskList, paramListInfo, distributedPlan,
tupleStoreState, isUtilityCommand);
defaultTupleDest, isUtilityCommand);
}
@ -167,10 +168,10 @@ ExecuteLocalUtilityTaskList(List *utilityTaskList)
}
DistributedPlan *distributedPlan = NULL;
ParamListInfo paramListInfo = NULL;
Tuplestorestate *tupleStoreState = NULL;
TupleDestination *defaultTupleDest = CreateTupleDestNone();
bool isUtilityCommand = true;
return ExecuteLocalTaskListExtended(utilityTaskList, paramListInfo, distributedPlan,
tupleStoreState, isUtilityCommand);
defaultTupleDest, isUtilityCommand);
}
@ -188,7 +189,7 @@ uint64
ExecuteLocalTaskListExtended(List *taskList,
ParamListInfo orig_paramListInfo,
DistributedPlan *distributedPlan,
Tuplestorestate *tupleStoreState,
TupleDestination *defaultTupleDest,
bool isUtilityCommand)
{
ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
@ -207,14 +208,13 @@ ExecuteLocalTaskListExtended(List *taskList,
numParams = paramListInfo->numParams;
}
if (tupleStoreState == NULL)
{
tupleStoreState = tuplestore_begin_heap(true, false, work_mem);
}
Task *task = NULL;
foreach_ptr(task, taskList)
{
TupleDestination *tupleDest = task->tupleDest ?
task->tupleDest :
defaultTupleDest;
/*
* If we have a valid shard id, a distributed table will be accessed
* during execution. Record it to apply the restrictions related to
@ -278,7 +278,8 @@ ExecuteLocalTaskListExtended(List *taskList,
List *queryStringList = task->taskQuery.data.queryStringList;
totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries(
queryStringList,
tupleStoreState);
tupleDest,
task);
continue;
}
@ -312,8 +313,8 @@ ExecuteLocalTaskListExtended(List *taskList,
}
totalRowsProcessed +=
ExecuteLocalTaskPlan(localPlan, shardQueryString, tupleStoreState,
paramListInfo);
ExecuteLocalTaskPlan(localPlan, shardQueryString,
tupleDest, task, paramListInfo);
}
return totalRowsProcessed;
@ -325,7 +326,8 @@ ExecuteLocalTaskListExtended(List *taskList,
* one by one.
*/
static uint64
LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleStoreState)
LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tupleDest,
Task *task)
{
char *queryString = NULL;
uint64 totalProcessedRows = 0;
@ -338,7 +340,7 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleS
ParamListInfo paramListInfo = NULL;
PlannedStmt *localPlan = planner(shardQuery, cursorOptions, paramListInfo);
totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString,
tupleStoreState,
tupleDest, task,
paramListInfo);
}
return totalProcessedRows;
@ -538,9 +540,9 @@ SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacement
*/
static uint64
ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
Tuplestorestate *tupleStoreState, ParamListInfo paramListInfo)
TupleDestination *tupleDest, Task *task,
ParamListInfo paramListInfo)
{
DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore);
ScanDirection scanDirection = ForwardScanDirection;
QueryEnvironment *queryEnv = create_queryEnv();
int eflags = 0;
@ -550,14 +552,15 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
* Use the tupleStore provided by the scanState because it is shared across
* the other task executions and the adaptive executor.
*/
SetTuplestoreDestReceiverParams(tupleStoreDestReceiver,
tupleStoreState,
CurrentMemoryContext, false);
DestReceiver *destReceiver = tupleDest ?
CreateTupleDestDestReceiver(tupleDest, task,
LOCAL_PLACEMENT_INDEX) :
CreateDestReceiver(DestNone);
/* Create a QueryDesc for the query */
QueryDesc *queryDesc = CreateQueryDesc(taskPlan, queryString,
GetActiveSnapshot(), InvalidSnapshot,
tupleStoreDestReceiver, paramListInfo,
destReceiver, paramListInfo,
queryEnv, 0);
ExecutorStart(queryDesc, eflags);

View File

@ -0,0 +1,230 @@
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "pgstat.h"
#include <sys/stat.h>
#include <unistd.h>
#include "distributed/tuple_destination.h"
/*
* TupleStoreTupleDestination is the internal representation of a TupleDestination
* that forwards tuples to a tuple store.
*/
typedef struct TupleStoreTupleDestination
{
TupleDestination pub;
/* destination of tuples */
Tuplestorestate *tupleStore;
/* what do the tuples look like? */
TupleDesc tupleDesc;
} TupleStoreTupleDestination;
/*
* TupleDestDestReceiver is the internal representation of a DestReceiver that
* forwards tuples to a tuple destination.
*/
typedef struct TupleDestDestReceiver
{
DestReceiver pub;
TupleDestination *tupleDest;
/* parameters to pass to tupleDest->putTuple() */
Task *task;
int placementIndex;
} TupleDestDestReceiver;
/* forward declarations for local functions */
static void TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple);
static TupleDesc TupleStoreTupleDestTupleDescForQuery(TupleDestination *self, int
queryNumber);
static void TupleDestNonePutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple);
static TupleDesc TupleDestNoneTupleDescForQuery(TupleDestination *self, int queryNumber);
static void TupleDestDestReceiverStartup(DestReceiver *copyDest, int operation,
TupleDesc inputTupleDesc);
static bool TupleDestDestReceiverReceive(TupleTableSlot *slot,
DestReceiver *copyDest);
static void TupleDestDestReceiverShutdown(DestReceiver *destReceiver);
static void TupleDestDestReceiverDestroy(DestReceiver *destReceiver);
/*
* CreateTupleStoreTupleDest creates a TupleDestination which forwards tuples to
* a tupleStore.
*/
TupleDestination *
CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
{
TupleStoreTupleDestination *tupleStoreTupleDest = palloc0(
sizeof(TupleStoreTupleDestination));
tupleStoreTupleDest->tupleStore = tupleStore;
tupleStoreTupleDest->tupleDesc = tupleDescriptor;
tupleStoreTupleDest->pub.putTuple = TupleStoreTupleDestPutTuple;
tupleStoreTupleDest->pub.tupleDescForQuery =
TupleStoreTupleDestTupleDescForQuery;
return (TupleDestination *) tupleStoreTupleDest;
}
/*
* TupleStoreTupleDestPutTuple implements TupleDestination->putTuple for
* TupleStoreTupleDestination.
*/
static void
TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple)
{
TupleStoreTupleDestination *tupleDest = (TupleStoreTupleDestination *) self;
tuplestore_puttuple(tupleDest->tupleStore, heapTuple);
}
/*
* TupleStoreTupleDestTupleDescForQuery implements TupleDestination->tupleDescForQuery
* for TupleStoreTupleDestination.
*/
static TupleDesc
TupleStoreTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber)
{
Assert(queryNumber == 0);
TupleStoreTupleDestination *tupleDest = (TupleStoreTupleDestination *) self;
return tupleDest->tupleDesc;
}
/*
* CreateTupleDestNone creates a tuple destination which ignores the tuples.
*/
TupleDestination *
CreateTupleDestNone(void)
{
TupleDestination *tupleDest = palloc0(
sizeof(TupleDestination));
tupleDest->putTuple = TupleDestNonePutTuple;
tupleDest->tupleDescForQuery = TupleDestNoneTupleDescForQuery;
return (TupleDestination *) tupleDest;
}
/*
* TupleDestNonePutTuple implements TupleDestination->putTuple for the
* no-op tuple destination.
*/
static void
TupleDestNonePutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple)
{
/* nothing to do */
}
/*
* TupleDestNoneTupleDescForQuery implements TupleDestination->tupleDescForQuery
* for the no-op tuple destination.
*/
static TupleDesc
TupleDestNoneTupleDescForQuery(TupleDestination *self, int queryNumber)
{
return NULL;
}
/*
* CreateTupleDestDestReceiver creates a dest receiver which forwards tuples
* to a tuple destination.
*/
DestReceiver *
CreateTupleDestDestReceiver(TupleDestination *tupleDest, Task *task, int placementIndex)
{
TupleDestDestReceiver *destReceiver = palloc0(sizeof(TupleDestDestReceiver));
destReceiver->pub.rStartup = TupleDestDestReceiverStartup;
destReceiver->pub.receiveSlot = TupleDestDestReceiverReceive;
destReceiver->pub.rShutdown = TupleDestDestReceiverShutdown;
destReceiver->pub.rDestroy = TupleDestDestReceiverDestroy;
destReceiver->tupleDest = tupleDest;
destReceiver->task = task;
destReceiver->placementIndex = placementIndex;
return (DestReceiver *) destReceiver;
}
/*
* TupleDestDestReceiverStartup implements DestReceiver->rStartup for
* TupleDestDestReceiver.
*/
static void
TupleDestDestReceiverStartup(DestReceiver *destReceiver, int operation,
TupleDesc inputTupleDesc)
{
/* nothing to do */
}
/*
* TupleDestDestReceiverReceive implements DestReceiver->receiveSlot for
* TupleDestDestReceiver.
*/
static bool
TupleDestDestReceiverReceive(TupleTableSlot *slot,
DestReceiver *destReceiver)
{
TupleDestDestReceiver *tupleDestReceiver = (TupleDestDestReceiver *) destReceiver;
TupleDestination *tupleDest = tupleDestReceiver->tupleDest;
Task *task = tupleDestReceiver->task;
int placementIndex = tupleDestReceiver->placementIndex;
/*
* DestReceiver doesn't support multiple result sets with different shapes.
*/
Assert(task->queryCount == 1);
int queryNumber = 0;
#if PG_VERSION_NUM >= PG_VERSION_12
HeapTuple heapTuple = ExecFetchSlotHeapTuple(slot, true, NULL);
#else
HeapTuple heapTuple = ExecFetchSlotTuple(slot);
#endif
tupleDest->putTuple(tupleDest, task, placementIndex, queryNumber, heapTuple);
return true;
}
/*
* TupleDestDestReceiverShutdown implements DestReceiver->rShutdown for
* TupleDestDestReceiver.
*/
static void
TupleDestDestReceiverShutdown(DestReceiver *destReceiver)
{
/* nothing to do */
}
/*
* TupleDestDestReceiverDestroy implements DestReceiver->rDestroy for
* TupleDestDestReceiver.
*/
static void
TupleDestDestReceiverDestroy(DestReceiver *destReceiver)
{
/* nothing to do */
}

View File

@ -428,6 +428,7 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
{
task->taskQuery.queryType = TASK_QUERY_OBJECT;
task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query;
task->queryCount = 1;
return;
}
@ -446,11 +447,13 @@ SetTaskQueryString(Task *task, char *queryString)
if (queryString == NULL)
{
task->taskQuery.queryType = TASK_QUERY_NULL;
task->queryCount = 0;
}
else
{
task->taskQuery.queryType = TASK_QUERY_TEXT;
task->taskQuery.data.queryStringLazy = queryString;
task->queryCount = 1;
}
}
@ -464,6 +467,7 @@ SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList)
Assert(perPlacementQueryStringList != NIL);
task->taskQuery.queryType = TASK_QUERY_TEXT_PER_PLACEMENT;
task->taskQuery.data.perPlacementQueryStrings = perPlacementQueryStringList;
task->queryCount = 1;
}
@ -476,6 +480,7 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
Assert(queryStringList != NIL);
task->taskQuery.queryType = TASK_QUERY_TEXT_LIST;
task->taskQuery.data.queryStringList = queryStringList;
task->queryCount = list_length(queryStringList);
}

View File

@ -327,6 +327,8 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_NODE_FIELD(rowValuesLists);
COPY_SCALAR_FIELD(partiallyLocalOrRemote);
COPY_SCALAR_FIELD(parametersInQueryStringResolved);
COPY_SCALAR_FIELD(tupleDest);
COPY_SCALAR_FIELD(queryCount);
}

View File

@ -12,6 +12,13 @@
#define LOCAL_EXECUTION_H
#include "distributed/citus_custom_scan.h"
#include "distributed/tuple_destination.h"
/*
* Used as TupleDestination->putTuple's placementIndex when executing
* local tasks.
*/
#define LOCAL_PLACEMENT_INDEX -1
/* enabled with GUCs*/
extern bool EnableLocalExecution;
@ -27,13 +34,12 @@ typedef enum LocalExecutionStatus
extern enum LocalExecutionStatus CurrentLocalExecutionStatus;
/* extern function declarations */
extern uint64 ExecuteLocalTaskList(List *taskList,
Tuplestorestate *tupleStoreState);
extern uint64 ExecuteLocalTaskList(List *taskList, TupleDestination *defaultTupleDest);
extern uint64 ExecuteLocalUtilityTaskList(List *utilityTaskList);
extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo
orig_paramListInfo,
DistributedPlan *distributedPlan,
Tuplestorestate *tupleStoreState,
TupleDestination *defaultTupleDest,
bool isUtilityCommand);
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
List **localTaskList, List **remoteTaskList);

View File

@ -262,6 +262,8 @@ typedef struct TaskQuery
}data;
}TaskQuery;
typedef struct TupleDestination TupleDestination;
typedef struct Task
{
CitusNode type;
@ -275,6 +277,12 @@ typedef struct Task
*/
TaskQuery taskQuery;
/*
* A task can have multiple queries, in which case queryCount will be > 1. If
* a task has more than one query, then taskQuery->queryType == TASK_QUERY_TEXT_LIST.
*/
int queryCount;
Oid anchorDistributedTableId; /* only applies to insert tasks */
uint64 anchorShardId; /* only applies to compute tasks */
List *taskPlacementList; /* only applies to compute tasks */
@ -323,6 +331,12 @@ typedef struct Task
* query.
*/
bool parametersInQueryStringResolved;
/*
* Destination of the tuples generated as a result of executing this task. Can be
* NULL, in which case the executor falls back to a default destination.
*/
TupleDestination *tupleDest;
} Task;

View File

@ -0,0 +1,47 @@
/*-------------------------------------------------------------------------
*
* tuple_destination.h
* Tuple destination generic struct.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef TUPLE_DESTINATION_H
#define TUPLE_DESTINATION_H
#include "access/tupdesc.h"
#include "distributed/multi_physical_planner.h"
#include "tcop/dest.h"
#include "utils/tuplestore.h"
typedef struct TupleDestination TupleDestination;
/*
* TupleDestination provides a generic interface for where to send tuples.
*
* Users of the executor can set task->tupleDest for custom processing of
* the result tuples.
*
* Since a task can have multiple queries, methods of TupleDestination also
* accept a queryNumber parameter which denotes the index of the query that
* the tuple belongs to.
*/
typedef struct TupleDestination
{
/* putTuple implements custom processing of a tuple */
void (*putTuple)(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple tuple);
/* tupleDescForQuery returns tuple descriptor for a query number. Can return NULL. */
TupleDesc (*tupleDescForQuery)(TupleDestination *self, int queryNumber);
} TupleDestination;
extern TupleDestination * CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc
tupleDescriptor);
extern TupleDestination * CreateTupleDestNone(void);
extern DestReceiver * CreateTupleDestDestReceiver(TupleDestination *tupleDest,
Task *task, int placementIndex);
#endif
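
As a usage note on the two built-in destinations declared above: the tuple-store-backed destination is used when the caller wants the rows, and the no-op destination when it does not (its `tupleDescForQuery` returns NULL, so the executor skips deserializing rows entirely). Below is a hedged sketch of how a caller might pick between them, mirroring the `ExecuteTaskListExtended` change in this commit; the helper name is hypothetical.

```c
#include "postgres.h"

#include "distributed/tuple_destination.h"

/*
 * ChooseDefaultTupleDest (hypothetical helper, not part of this commit):
 * fall back to a tuple-store-backed destination when the caller provided a
 * store and a descriptor, otherwise discard the rows with the no-op
 * destination.
 */
static TupleDestination *
ChooseDefaultTupleDest(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
{
	if (tupleStore == NULL || tupleDescriptor == NULL)
	{
		return CreateTupleDestNone();
	}

	return CreateTupleStoreTupleDest(tupleStore, tupleDescriptor);
}
```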