Merge pull request #3871 from citusdata/tupledest

Implement TupleDestination to allow custom processing of task results.
pull/3882/head
Hadi Moshayedi 2020-06-06 10:50:02 -07:00 committed by GitHub
commit 797405f3d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 443 additions and 85 deletions

View File

@ -155,6 +155,7 @@
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/subplan_execution.h" #include "distributed/subplan_execution.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/tuple_destination.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "lib/ilist.h" #include "lib/ilist.h"
@ -193,14 +194,15 @@ typedef struct DistributedExecution
*/ */
bool expectResults; bool expectResults;
/*
* If a task specific destination is not provided for a task, then use
* defaultTupleDest.
*/
TupleDestination *defaultTupleDest;
/* Parameters for parameterized plans. Can be NULL. */ /* Parameters for parameterized plans. Can be NULL. */
ParamListInfo paramListInfo; ParamListInfo paramListInfo;
/* Tuple descriptor and destination for result. Can be NULL. */
TupleDesc tupleDescriptor;
Tuplestorestate *tupleStore;
/* list of workers involved in the execution */ /* list of workers involved in the execution */
List *workerList; List *workerList;
@ -284,7 +286,7 @@ typedef struct DistributedExecution
* contexts. The benefit of keeping it here is to avoid allocating the array * contexts. The benefit of keeping it here is to avoid allocating the array
* over and over again. * over and over again.
*/ */
AttInMetadata *attributeInputMetadata; uint32 allocatedColumnCount;
char **columnArray; char **columnArray;
/* /*
@ -478,6 +480,9 @@ typedef struct ShardCommandExecution
/* description of the task */ /* description of the task */
Task *task; Task *task;
/* cached AttInMetadata for task */
AttInMetadata **attributeInputMetadata;
/* order in which the command should be replicated on replicas */ /* order in which the command should be replicated on replicas */
PlacementExecutionOrder executionOrder; PlacementExecutionOrder executionOrder;
@ -525,6 +530,12 @@ typedef struct TaskPlacementExecution
/* state of the execution of the command on the placement */ /* state of the execution of the command on the placement */
TaskPlacementExecutionState executionState; TaskPlacementExecutionState executionState;
/*
* Task query can contain multiple queries. queryIndex tracks results of
* which query we are waiting for.
*/
uint32 queryIndex;
/* worker pool on which the placement needs to be executed */ /* worker pool on which the placement needs to be executed */
WorkerPool *workerPool; WorkerPool *workerPool;
@ -553,9 +564,9 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel
List *taskList, List *taskList,
bool expectResults, bool expectResults,
ParamListInfo paramListInfo, ParamListInfo paramListInfo,
TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore,
int targetPoolSize, int targetPoolSize,
TupleDestination *
defaultTupleDest,
TransactionProperties * TransactionProperties *
xactProperties, xactProperties,
List *jobIdList); List *jobIdList);
@ -665,7 +676,6 @@ AdaptiveExecutor(CitusScanState *scanState)
DistributedPlan *distributedPlan = scanState->distributedPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
EState *executorState = ScanStateGetExecutorState(scanState); EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info; ParamListInfo paramListInfo = executorState->es_param_list_info;
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
bool randomAccess = true; bool randomAccess = true;
bool interTransactions = false; bool interTransactions = false;
int targetPoolSize = MaxAdaptiveExecutorPoolSize; int targetPoolSize = MaxAdaptiveExecutorPoolSize;
@ -677,6 +687,13 @@ AdaptiveExecutor(CitusScanState *scanState)
/* we should only call this once before the scan finished */ /* we should only call this once before the scan finished */
Assert(!scanState->finishedRemoteScan); Assert(!scanState->finishedRemoteScan);
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
TupleDestination *defaultTupleDest =
CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor);
bool hasDependentJobs = HasDependentJobs(job); bool hasDependentJobs = HasDependentJobs(job);
if (hasDependentJobs) if (hasDependentJobs)
{ {
@ -689,9 +706,6 @@ AdaptiveExecutor(CitusScanState *scanState)
targetPoolSize = 1; targetPoolSize = 1;
} }
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(
distributedPlan->modLevel, taskList, distributedPlan->modLevel, taskList,
hasDependentJobs); hasDependentJobs);
@ -702,9 +716,8 @@ AdaptiveExecutor(CitusScanState *scanState)
taskList, taskList,
distributedPlan->expectResults, distributedPlan->expectResults,
paramListInfo, paramListInfo,
tupleDescriptor,
scanState->tuplestorestate,
targetPoolSize, targetPoolSize,
defaultTupleDest,
&xactProperties, &xactProperties,
jobIdList); jobIdList);
@ -794,7 +807,7 @@ RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList, uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList,
estate->es_param_list_info, estate->es_param_list_info,
scanState->distributedPlan, scanState->distributedPlan,
scanState->tuplestorestate, execution->defaultTupleDest,
isUtilityCommand); isUtilityCommand);
/* /*
@ -921,6 +934,17 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
List *localTaskList = NIL; List *localTaskList = NIL;
List *remoteTaskList = NIL; List *remoteTaskList = NIL;
TupleDestination *defaultTupleDest = NULL;
if (executionParams->tupleDescriptor != NULL)
{
defaultTupleDest = CreateTupleStoreTupleDest(executionParams->tupleStore,
executionParams->tupleDescriptor);
}
else
{
defaultTupleDest = CreateTupleDestNone();
}
if (executionParams->localExecutionSupported && ShouldExecuteTasksLocally( if (executionParams->localExecutionSupported && ShouldExecuteTasksLocally(
executionParams->taskList)) executionParams->taskList))
{ {
@ -952,8 +976,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
} }
else else
{ {
locallyProcessedRows += ExecuteLocalTaskList(localTaskList, locallyProcessedRows += ExecuteLocalTaskList(localTaskList, defaultTupleDest);
executionParams->tupleStore);
} }
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
@ -965,8 +988,8 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
CreateDistributedExecution( CreateDistributedExecution(
executionParams->modLevel, remoteTaskList, executionParams->modLevel, remoteTaskList,
executionParams->expectResults, paramListInfo, executionParams->expectResults, paramListInfo,
executionParams->tupleDescriptor, executionParams->tupleStore, executionParams->targetPoolSize, defaultTupleDest,
executionParams->targetPoolSize, &executionParams->xactProperties, &executionParams->xactProperties,
executionParams->jobIdList); executionParams->jobIdList);
StartDistributedExecution(execution); StartDistributedExecution(execution);
@ -1009,10 +1032,10 @@ CreateBasicExecutionParams(RowModifyLevel modLevel,
*/ */
static DistributedExecution * static DistributedExecution *
CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
bool expectResults, bool expectResults, ParamListInfo paramListInfo,
ParamListInfo paramListInfo, TupleDesc tupleDescriptor, int targetPoolSize, TupleDestination *defaultTupleDest,
Tuplestorestate *tupleStore, int targetPoolSize, TransactionProperties *xactProperties,
TransactionProperties *xactProperties, List *jobIdList) List *jobIdList)
{ {
DistributedExecution *execution = DistributedExecution *execution =
(DistributedExecution *) palloc0(sizeof(DistributedExecution)); (DistributedExecution *) palloc0(sizeof(DistributedExecution));
@ -1028,12 +1051,10 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
execution->executionStats = execution->executionStats =
(DistributedExecutionStats *) palloc0(sizeof(DistributedExecutionStats)); (DistributedExecutionStats *) palloc0(sizeof(DistributedExecutionStats));
execution->paramListInfo = paramListInfo; execution->paramListInfo = paramListInfo;
execution->tupleDescriptor = tupleDescriptor;
execution->tupleStore = tupleStore;
execution->workerList = NIL; execution->workerList = NIL;
execution->sessionList = NIL; execution->sessionList = NIL;
execution->targetPoolSize = targetPoolSize; execution->targetPoolSize = targetPoolSize;
execution->defaultTupleDest = defaultTupleDest;
execution->totalTaskCount = list_length(taskList); execution->totalTaskCount = list_length(taskList);
execution->unfinishedTaskCount = list_length(taskList); execution->unfinishedTaskCount = list_length(taskList);
@ -1046,18 +1067,12 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
execution->jobIdList = jobIdList; execution->jobIdList = jobIdList;
/* allocate execution specific data once, on the ExecutorState memory context */ /*
if (tupleDescriptor != NULL) * Since task can have multiple queries, we are not sure how many columns we should
{ * allocate for. We start with 16, and reallocate when we need more.
execution->attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); */
execution->columnArray = execution->allocatedColumnCount = 16;
(char **) palloc0(tupleDescriptor->natts * sizeof(char *)); execution->columnArray = palloc0(execution->allocatedColumnCount * sizeof(char *));
}
else
{
execution->attributeInputMetadata = NULL;
execution->columnArray = NULL;
}
if (ShouldExecuteTasksLocally(taskList)) if (ShouldExecuteTasksLocally(taskList))
{ {
@ -1709,6 +1724,23 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
sizeof(TaskPlacementExecution *)); sizeof(TaskPlacementExecution *));
shardCommandExecution->placementExecutionCount = placementExecutionCount; shardCommandExecution->placementExecutionCount = placementExecutionCount;
TupleDestination *tupleDest = task->tupleDest ?
task->tupleDest :
execution->defaultTupleDest;
uint32 queryCount = task->queryCount;
shardCommandExecution->attributeInputMetadata = palloc0(queryCount *
sizeof(AttInMetadata *));
for (uint32 queryIndex = 0; queryIndex < queryCount; queryIndex++)
{
TupleDesc tupleDescriptor = tupleDest->tupleDescForQuery(tupleDest,
queryIndex);
shardCommandExecution->attributeInputMetadata[queryIndex] =
tupleDescriptor ?
TupleDescGetAttInMetadata(tupleDescriptor) :
NULL;
}
shardCommandExecution->expectResults = shardCommandExecution->expectResults =
expectResults && !task->partiallyLocalOrRemote; expectResults && !task->partiallyLocalOrRemote;
@ -1731,6 +1763,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
placementExecution->shardPlacement = taskPlacement; placementExecution->shardPlacement = taskPlacement;
placementExecution->workerPool = workerPool; placementExecution->workerPool = workerPool;
placementExecution->placementExecutionIndex = placementExecutionIndex; placementExecution->placementExecutionIndex = placementExecutionIndex;
placementExecution->queryIndex = 0;
if (placementExecutionReady) if (placementExecutionReady)
{ {
@ -3412,16 +3445,13 @@ ReceiveResults(WorkerSession *session, bool storeRows)
WorkerPool *workerPool = session->workerPool; WorkerPool *workerPool = session->workerPool;
DistributedExecution *execution = workerPool->distributedExecution; DistributedExecution *execution = workerPool->distributedExecution;
DistributedExecutionStats *executionStats = execution->executionStats; DistributedExecutionStats *executionStats = execution->executionStats;
TupleDesc tupleDescriptor = execution->tupleDescriptor; TaskPlacementExecution *placementExecution = session->currentTask;
AttInMetadata *attributeInputMetadata = execution->attributeInputMetadata; ShardCommandExecution *shardCommandExecution =
uint32 expectedColumnCount = 0; placementExecution->shardCommandExecution;
char **columnArray = execution->columnArray; Task *task = placementExecution->shardCommandExecution->task;
Tuplestorestate *tupleStore = execution->tupleStore; TupleDestination *tupleDest = task->tupleDest ?
task->tupleDest :
if (tupleDescriptor != NULL) execution->defaultTupleDest;
{
expectedColumnCount = tupleDescriptor->natts;
}
/* /*
* We use this context while converting each row fetched from remote node * We use this context while converting each row fetched from remote node
@ -3452,8 +3482,6 @@ ReceiveResults(WorkerSession *session, bool storeRows)
{ {
char *currentAffectedTupleString = PQcmdTuples(result); char *currentAffectedTupleString = PQcmdTuples(result);
int64 currentAffectedTupleCount = 0; int64 currentAffectedTupleCount = 0;
ShardCommandExecution *shardCommandExecution =
session->currentTask->shardCommandExecution;
/* if there are multiple replicas, make sure to consider only one */ /* if there are multiple replicas, make sure to consider only one */
if (!shardCommandExecution->gotResults && *currentAffectedTupleString != '\0') if (!shardCommandExecution->gotResults && *currentAffectedTupleString != '\0')
@ -3466,9 +3494,9 @@ ReceiveResults(WorkerSession *session, bool storeRows)
PQclear(result); PQclear(result);
/* no more results, break out of loop and free allocated memory */ /* task query might contain multiple queries, so fetch until we reach NULL */
fetchDone = true; placementExecution->queryIndex++;
break; continue;
} }
else if (resultStatus == PGRES_TUPLES_OK) else if (resultStatus == PGRES_TUPLES_OK)
{ {
@ -3479,8 +3507,9 @@ ReceiveResults(WorkerSession *session, bool storeRows)
Assert(PQntuples(result) == 0); Assert(PQntuples(result) == 0);
PQclear(result); PQclear(result);
fetchDone = true; /* task query might contain multiple queries, so fetch until we reach NULL */
break; placementExecution->queryIndex++;
continue;
} }
else if (resultStatus != PGRES_SINGLE_TUPLE) else if (resultStatus != PGRES_SINGLE_TUPLE)
{ {
@ -3497,8 +3526,16 @@ ReceiveResults(WorkerSession *session, bool storeRows)
continue; continue;
} }
uint32 queryIndex = placementExecution->queryIndex;
TupleDesc tupleDescriptor = tupleDest->tupleDescForQuery(tupleDest, queryIndex);
if (tupleDescriptor == NULL)
{
continue;
}
rowsProcessed = PQntuples(result); rowsProcessed = PQntuples(result);
uint32 columnCount = PQnfields(result); uint32 columnCount = PQnfields(result);
uint32 expectedColumnCount = tupleDescriptor->natts;
if (columnCount != expectedColumnCount) if (columnCount != expectedColumnCount)
{ {
@ -3507,6 +3544,16 @@ ReceiveResults(WorkerSession *session, bool storeRows)
columnCount, expectedColumnCount))); columnCount, expectedColumnCount)));
} }
if (columnCount > execution->allocatedColumnCount)
{
pfree(execution->columnArray);
execution->allocatedColumnCount = columnCount;
execution->columnArray = palloc0(execution->allocatedColumnCount *
sizeof(char *));
}
char **columnArray = execution->columnArray;
for (uint32 rowIndex = 0; rowIndex < rowsProcessed; rowIndex++) for (uint32 rowIndex = 0; rowIndex < rowsProcessed; rowIndex++)
{ {
memset(columnArray, 0, columnCount * sizeof(char *)); memset(columnArray, 0, columnCount * sizeof(char *));
@ -3536,12 +3583,16 @@ ReceiveResults(WorkerSession *session, bool storeRows)
*/ */
MemoryContext oldContextPerRow = MemoryContextSwitchTo(ioContext); MemoryContext oldContextPerRow = MemoryContextSwitchTo(ioContext);
HeapTuple heapTuple = BuildTupleFromCStrings(attributeInputMetadata, AttInMetadata *attInMetadata =
columnArray); shardCommandExecution->attributeInputMetadata[queryIndex];
HeapTuple heapTuple = BuildTupleFromCStrings(attInMetadata, columnArray);
MemoryContextSwitchTo(oldContextPerRow); MemoryContextSwitchTo(oldContextPerRow);
tuplestore_puttuple(tupleStore, heapTuple); tupleDest->putTuple(tupleDest, task,
placementExecution->placementExecutionIndex, queryIndex,
heapTuple);
MemoryContextReset(ioContext); MemoryContextReset(ioContext);
execution->rowsProcessed++; execution->rowsProcessed++;

View File

@ -117,11 +117,12 @@ static void SplitLocalAndRemotePlacements(List *taskPlacementList,
List **localTaskPlacementList, List **localTaskPlacementList,
List **remoteTaskPlacementList); List **remoteTaskPlacementList);
static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
Tuplestorestate *tupleStoreState, ParamListInfo TupleDestination *tupleDest, Task *task,
paramListInfo); ParamListInfo paramListInfo);
static void LogLocalCommand(Task *task); static void LogLocalCommand(Task *task);
static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings, static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings,
Tuplestorestate *tupleStoreState); TupleDestination *tupleDest,
Task *task);
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
Oid **parameterTypes, Oid **parameterTypes,
const char ***parameterValues); const char ***parameterValues);
@ -139,7 +140,7 @@ static void EnsureTransitionPossible(LocalExecutionStatus from,
* The function returns totalRowsProcessed. * The function returns totalRowsProcessed.
*/ */
uint64 uint64
ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState) ExecuteLocalTaskList(List *taskList, TupleDestination *defaultTupleDest)
{ {
if (list_length(taskList) == 0) if (list_length(taskList) == 0)
{ {
@ -149,7 +150,7 @@ ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState)
ParamListInfo paramListInfo = NULL; ParamListInfo paramListInfo = NULL;
bool isUtilityCommand = false; bool isUtilityCommand = false;
return ExecuteLocalTaskListExtended(taskList, paramListInfo, distributedPlan, return ExecuteLocalTaskListExtended(taskList, paramListInfo, distributedPlan,
tupleStoreState, isUtilityCommand); defaultTupleDest, isUtilityCommand);
} }
@ -167,10 +168,10 @@ ExecuteLocalUtilityTaskList(List *utilityTaskList)
} }
DistributedPlan *distributedPlan = NULL; DistributedPlan *distributedPlan = NULL;
ParamListInfo paramListInfo = NULL; ParamListInfo paramListInfo = NULL;
Tuplestorestate *tupleStoreState = NULL; TupleDestination *defaultTupleDest = CreateTupleDestNone();
bool isUtilityCommand = true; bool isUtilityCommand = true;
return ExecuteLocalTaskListExtended(utilityTaskList, paramListInfo, distributedPlan, return ExecuteLocalTaskListExtended(utilityTaskList, paramListInfo, distributedPlan,
tupleStoreState, isUtilityCommand); defaultTupleDest, isUtilityCommand);
} }
@ -188,7 +189,7 @@ uint64
ExecuteLocalTaskListExtended(List *taskList, ExecuteLocalTaskListExtended(List *taskList,
ParamListInfo orig_paramListInfo, ParamListInfo orig_paramListInfo,
DistributedPlan *distributedPlan, DistributedPlan *distributedPlan,
Tuplestorestate *tupleStoreState, TupleDestination *defaultTupleDest,
bool isUtilityCommand) bool isUtilityCommand)
{ {
ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
@ -207,14 +208,13 @@ ExecuteLocalTaskListExtended(List *taskList,
numParams = paramListInfo->numParams; numParams = paramListInfo->numParams;
} }
if (tupleStoreState == NULL)
{
tupleStoreState = tuplestore_begin_heap(true, false, work_mem);
}
Task *task = NULL; Task *task = NULL;
foreach_ptr(task, taskList) foreach_ptr(task, taskList)
{ {
TupleDestination *tupleDest = task->tupleDest ?
task->tupleDest :
defaultTupleDest;
/* /*
* If we have a valid shard id, a distributed table will be accessed * If we have a valid shard id, a distributed table will be accessed
* during execution. Record it to apply the restrictions related to * during execution. Record it to apply the restrictions related to
@ -278,7 +278,8 @@ ExecuteLocalTaskListExtended(List *taskList,
List *queryStringList = task->taskQuery.data.queryStringList; List *queryStringList = task->taskQuery.data.queryStringList;
totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries( totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries(
queryStringList, queryStringList,
tupleStoreState); tupleDest,
task);
continue; continue;
} }
@ -312,8 +313,8 @@ ExecuteLocalTaskListExtended(List *taskList,
} }
totalRowsProcessed += totalRowsProcessed +=
ExecuteLocalTaskPlan(localPlan, shardQueryString, tupleStoreState, ExecuteLocalTaskPlan(localPlan, shardQueryString,
paramListInfo); tupleDest, task, paramListInfo);
} }
return totalRowsProcessed; return totalRowsProcessed;
@ -325,7 +326,8 @@ ExecuteLocalTaskListExtended(List *taskList,
* one by one. * one by one.
*/ */
static uint64 static uint64
LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleStoreState) LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tupleDest,
Task *task)
{ {
char *queryString = NULL; char *queryString = NULL;
uint64 totalProcessedRows = 0; uint64 totalProcessedRows = 0;
@ -338,7 +340,7 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleS
ParamListInfo paramListInfo = NULL; ParamListInfo paramListInfo = NULL;
PlannedStmt *localPlan = planner(shardQuery, cursorOptions, paramListInfo); PlannedStmt *localPlan = planner(shardQuery, cursorOptions, paramListInfo);
totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString, totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString,
tupleStoreState, tupleDest, task,
paramListInfo); paramListInfo);
} }
return totalProcessedRows; return totalProcessedRows;
@ -538,9 +540,9 @@ SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacement
*/ */
static uint64 static uint64
ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
Tuplestorestate *tupleStoreState, ParamListInfo paramListInfo) TupleDestination *tupleDest, Task *task,
ParamListInfo paramListInfo)
{ {
DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore);
ScanDirection scanDirection = ForwardScanDirection; ScanDirection scanDirection = ForwardScanDirection;
QueryEnvironment *queryEnv = create_queryEnv(); QueryEnvironment *queryEnv = create_queryEnv();
int eflags = 0; int eflags = 0;
@ -550,14 +552,15 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
* Use the tupleStore provided by the scanState because it is shared accross * Use the tupleStore provided by the scanState because it is shared accross
* the other task executions and the adaptive executor. * the other task executions and the adaptive executor.
*/ */
SetTuplestoreDestReceiverParams(tupleStoreDestReceiver, DestReceiver *destReceiver = tupleDest ?
tupleStoreState, CreateTupleDestDestReceiver(tupleDest, task,
CurrentMemoryContext, false); LOCAL_PLACEMENT_INDEX) :
CreateDestReceiver(DestNone);
/* Create a QueryDesc for the query */ /* Create a QueryDesc for the query */
QueryDesc *queryDesc = CreateQueryDesc(taskPlan, queryString, QueryDesc *queryDesc = CreateQueryDesc(taskPlan, queryString,
GetActiveSnapshot(), InvalidSnapshot, GetActiveSnapshot(), InvalidSnapshot,
tupleStoreDestReceiver, paramListInfo, destReceiver, paramListInfo,
queryEnv, 0); queryEnv, 0);
ExecutorStart(queryDesc, eflags); ExecutorStart(queryDesc, eflags);

View File

@ -0,0 +1,230 @@
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "pgstat.h"
#include <sys/stat.h>
#include <unistd.h>
#include "distributed/tuple_destination.h"
/*
 * TupleStoreTupleDestination is the internal representation of a
 * TupleDestination which forwards tuples to a tuple store.
 *
 * "pub" has to stay the first member: CreateTupleStoreTupleDest() hands the
 * struct out as a TupleDestination pointer, and the callbacks cast that
 * pointer back to this type.
 */
typedef struct TupleStoreTupleDestination
{
TupleDestination pub;
/* tuple store that receives every tuple passed to putTuple() */
Tuplestorestate *tupleStore;
/* what do the tuples look like? Returned as-is by tupleDescForQuery() */
TupleDesc tupleDesc;
} TupleStoreTupleDestination;
/*
 * TupleDestDestReceiver is the internal representation of a DestReceiver
 * which forwards tuples to a tuple destination.
 *
 * "pub" has to stay the first member: CreateTupleDestDestReceiver() hands the
 * struct out as a DestReceiver pointer, and the receive callback casts that
 * pointer back to this type.
 */
typedef struct TupleDestDestReceiver
{
DestReceiver pub;
/* destination whose putTuple() is called for each received slot */
TupleDestination *tupleDest;
/* parameters to pass to tupleDest->putTuple() */
Task *task;
int placementIndex;
} TupleDestDestReceiver;
/*
 * Forward declarations for local functions.
 *
 * Parameter names match the ones used in the definitions below (the receiver
 * callbacks take "destReceiver", not "copyDest").
 */

/* TupleDestination callbacks for the tuple-store backed destination */
static void TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task,
										int placementIndex, int queryNumber,
										HeapTuple heapTuple);
static TupleDesc TupleStoreTupleDestTupleDescForQuery(TupleDestination *self,
													  int queryNumber);

/* TupleDestination callbacks for the destination that ignores tuples */
static void TupleDestNonePutTuple(TupleDestination *self, Task *task,
								  int placementIndex, int queryNumber,
								  HeapTuple heapTuple);
static TupleDesc TupleDestNoneTupleDescForQuery(TupleDestination *self,
												int queryNumber);

/* DestReceiver callbacks for the receiver that forwards to a TupleDestination */
static void TupleDestDestReceiverStartup(DestReceiver *destReceiver, int operation,
										 TupleDesc inputTupleDesc);
static bool TupleDestDestReceiverReceive(TupleTableSlot *slot,
										 DestReceiver *destReceiver);
static void TupleDestDestReceiverShutdown(DestReceiver *destReceiver);
static void TupleDestDestReceiverDestroy(DestReceiver *destReceiver);
/*
 * CreateTupleStoreTupleDest builds a TupleDestination whose putTuple()
 * appends every tuple to the given tupleStore and whose tupleDescForQuery()
 * reports the given tupleDescriptor.
 *
 * Both arguments are stored by pointer, not copied. The returned object is
 * allocated with palloc0().
 */
TupleDestination *
CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
{
	TupleStoreTupleDestination *storeDest =
		palloc0(sizeof(TupleStoreTupleDestination));

	/* wire up the callbacks of the public part */
	storeDest->pub.tupleDescForQuery = TupleStoreTupleDestTupleDescForQuery;
	storeDest->pub.putTuple = TupleStoreTupleDestPutTuple;

	/* remember where tuples go and what they look like */
	storeDest->tupleDesc = tupleDescriptor;
	storeDest->tupleStore = tupleStore;

	/* pub is the first member, so this is the same address as storeDest */
	return &storeDest->pub;
}
/*
 * TupleStoreTupleDestPutTuple implements TupleDestination->putTuple for
 * TupleStoreTupleDestination.
 *
 * The tuple is appended to the destination's tuple store. The task,
 * placementIndex and queryNumber arguments are not used by this
 * implementation.
 */
static void
TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task,
							int placementIndex, int queryNumber,
							HeapTuple heapTuple)
{
	TupleStoreTupleDestination *storeDest = (TupleStoreTupleDestination *) self;
	Tuplestorestate *targetStore = storeDest->tupleStore;

	tuplestore_puttuple(targetStore, heapTuple);
}
/*
 * TupleStoreTupleDestTupleDescForQuery implements
 * TupleDestination->tupleDescForQuery for TupleStoreTupleDestination.
 *
 * Only one tuple descriptor is stored in the destination, so the only query
 * number expected here is 0 (checked with an Assert).
 */
static TupleDesc
TupleStoreTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber)
{
	Assert(queryNumber == 0);

	return ((TupleStoreTupleDestination *) self)->tupleDesc;
}
/*
 * CreateTupleDestNone builds a TupleDestination that discards everything:
 * its putTuple() does nothing and its tupleDescForQuery() returns NULL.
 *
 * The returned object is allocated with palloc0().
 */
TupleDestination *
CreateTupleDestNone(void)
{
	TupleDestination *noneDest = palloc0(sizeof(TupleDestination));

	noneDest->tupleDescForQuery = TupleDestNoneTupleDescForQuery;
	noneDest->putTuple = TupleDestNonePutTuple;

	return noneDest;
}
/*
 * TupleDestNonePutTuple implements TupleDestination->putTuple for the
 * no-op tuple destination created by CreateTupleDestNone().
 *
 * All arguments are ignored; the tuple is neither stored nor freed here.
 */
static void
TupleDestNonePutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple)
{
/* nothing to do */
}
/*
 * TupleDestNoneTupleDescForQuery implements
 * TupleDestination->tupleDescForQuery for the no-op tuple destination
 * created by CreateTupleDestNone().
 *
 * It returns NULL for every query number, i.e. there is no tuple descriptor
 * for any result.
 */
static TupleDesc
TupleDestNoneTupleDescForQuery(TupleDestination *self, int queryNumber)
{
return NULL;
}
/*
 * CreateTupleDestDestReceiver builds a DestReceiver that hands every slot it
 * receives to tupleDest->putTuple(), passing along the given task and
 * placementIndex.
 *
 * The arguments are stored by pointer/value, nothing is copied. The returned
 * object is allocated with palloc0(), so any DestReceiver field that is not
 * set explicitly below stays zero.
 */
DestReceiver *
CreateTupleDestDestReceiver(TupleDestination *tupleDest, Task *task, int placementIndex)
{
	TupleDestDestReceiver *receiver = palloc0(sizeof(TupleDestDestReceiver));

	/* pub is the first member, so this is the same address as receiver */
	DestReceiver *base = &receiver->pub;

	/* arguments forwarded to putTuple() for each received slot */
	receiver->placementIndex = placementIndex;
	receiver->task = task;
	receiver->tupleDest = tupleDest;

	/* DestReceiver callbacks */
	base->rStartup = TupleDestDestReceiverStartup;
	base->receiveSlot = TupleDestDestReceiverReceive;
	base->rShutdown = TupleDestDestReceiverShutdown;
	base->rDestroy = TupleDestDestReceiverDestroy;

	return base;
}
/*
 * TupleDestDestReceiverStartup implements DestReceiver->rStartup for
 * TupleDestDestReceiver.
 *
 * It is a no-op: operation and inputTupleDesc are ignored.
 */
static void
TupleDestDestReceiverStartup(DestReceiver *destReceiver, int operation,
TupleDesc inputTupleDesc)
{
/* nothing to do */
}
/*
 * TupleDestDestReceiverReceive implements DestReceiver->receiveSlot for
 * TupleDestDestReceiver.
 *
 * The slot is converted to a heap tuple and handed to the wrapped tuple
 * destination's putTuple(), together with the task and placement index that
 * were given to CreateTupleDestDestReceiver(). The query number passed along
 * is always 0. Always returns true.
 */
static bool
TupleDestDestReceiverReceive(TupleTableSlot *slot,
							 DestReceiver *destReceiver)
{
	TupleDestDestReceiver *receiver = (TupleDestDestReceiver *) destReceiver;
	const int queryNumber = 0;

	/*
	 * DestReceiver doesn't support multiple result sets with different shapes.
	 */
	Assert(receiver->task->queryCount == 1);

	/*
	 * The slot-to-heap-tuple API differs between PostgreSQL versions.
	 * NOTE(review): the "true" argument presumably asks for a materialized
	 * tuple, and NULL means we do not ask whether to free it -- confirm
	 * against PostgreSQL's tuptable.h.
	 */
#if PG_VERSION_NUM >= PG_VERSION_12
	HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
#else
	HeapTuple tuple = ExecFetchSlotTuple(slot);
#endif

	receiver->tupleDest->putTuple(receiver->tupleDest, receiver->task,
								  receiver->placementIndex, queryNumber, tuple);

	return true;
}
/*
 * TupleDestDestReceiverShutdown implements DestReceiver->rShutdown for
 * TupleDestDestReceiver.
 *
 * It is a no-op.
 */
static void
TupleDestDestReceiverShutdown(DestReceiver *destReceiver)
{
/* nothing to do */
}
/*
 * TupleDestDestReceiverDestroy implements DestReceiver->rDestroy for
 * TupleDestDestReceiver.
 *
 * It is a no-op. Note that the receiver object allocated by
 * CreateTupleDestDestReceiver() is not freed here.
 */
static void
TupleDestDestReceiverDestroy(DestReceiver *destReceiver)
{
/* nothing to do */
}

View File

@ -428,6 +428,7 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
{ {
task->taskQuery.queryType = TASK_QUERY_OBJECT; task->taskQuery.queryType = TASK_QUERY_OBJECT;
task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query; task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query;
task->queryCount = 1;
return; return;
} }
@ -446,11 +447,13 @@ SetTaskQueryString(Task *task, char *queryString)
if (queryString == NULL) if (queryString == NULL)
{ {
task->taskQuery.queryType = TASK_QUERY_NULL; task->taskQuery.queryType = TASK_QUERY_NULL;
task->queryCount = 0;
} }
else else
{ {
task->taskQuery.queryType = TASK_QUERY_TEXT; task->taskQuery.queryType = TASK_QUERY_TEXT;
task->taskQuery.data.queryStringLazy = queryString; task->taskQuery.data.queryStringLazy = queryString;
task->queryCount = 1;
} }
} }
@ -464,6 +467,7 @@ SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList)
Assert(perPlacementQueryStringList != NIL); Assert(perPlacementQueryStringList != NIL);
task->taskQuery.queryType = TASK_QUERY_TEXT_PER_PLACEMENT; task->taskQuery.queryType = TASK_QUERY_TEXT_PER_PLACEMENT;
task->taskQuery.data.perPlacementQueryStrings = perPlacementQueryStringList; task->taskQuery.data.perPlacementQueryStrings = perPlacementQueryStringList;
task->queryCount = 1;
} }
@ -476,6 +480,7 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
Assert(queryStringList != NIL); Assert(queryStringList != NIL);
task->taskQuery.queryType = TASK_QUERY_TEXT_LIST; task->taskQuery.queryType = TASK_QUERY_TEXT_LIST;
task->taskQuery.data.queryStringList = queryStringList; task->taskQuery.data.queryStringList = queryStringList;
task->queryCount = list_length(queryStringList);
} }

View File

@ -327,6 +327,8 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_NODE_FIELD(rowValuesLists); COPY_NODE_FIELD(rowValuesLists);
COPY_SCALAR_FIELD(partiallyLocalOrRemote); COPY_SCALAR_FIELD(partiallyLocalOrRemote);
COPY_SCALAR_FIELD(parametersInQueryStringResolved); COPY_SCALAR_FIELD(parametersInQueryStringResolved);
COPY_SCALAR_FIELD(tupleDest);
COPY_SCALAR_FIELD(queryCount);
} }

View File

@ -12,6 +12,13 @@
#define LOCAL_EXECUTION_H #define LOCAL_EXECUTION_H
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/tuple_destination.h"
/*
* Placement index handed to TupleDestination->putTuple for tuples that
* come from locally executed tasks.
*/
#define LOCAL_PLACEMENT_INDEX (-1)
/* enabled with GUCs*/ /* enabled with GUCs*/
extern bool EnableLocalExecution; extern bool EnableLocalExecution;
@ -27,13 +34,12 @@ typedef enum LocalExecutionStatus
extern enum LocalExecutionStatus CurrentLocalExecutionStatus; extern enum LocalExecutionStatus CurrentLocalExecutionStatus;
/* extern function declarations */ /* extern function declarations */
extern uint64 ExecuteLocalTaskList(List *taskList, extern uint64 ExecuteLocalTaskList(List *taskList, TupleDestination *defaultTupleDest);
Tuplestorestate *tupleStoreState);
extern uint64 ExecuteLocalUtilityTaskList(List *utilityTaskList); extern uint64 ExecuteLocalUtilityTaskList(List *utilityTaskList);
extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo
orig_paramListInfo, orig_paramListInfo,
DistributedPlan *distributedPlan, DistributedPlan *distributedPlan,
Tuplestorestate *tupleStoreState, TupleDestination *defaultTupleDest,
bool isUtilityCommand); bool isUtilityCommand);
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
List **localTaskList, List **remoteTaskList); List **localTaskList, List **remoteTaskList);

View File

@ -262,6 +262,8 @@ typedef struct TaskQuery
}data; }data;
}TaskQuery; }TaskQuery;
typedef struct TupleDestination TupleDestination;
typedef struct Task typedef struct Task
{ {
CitusNode type; CitusNode type;
@ -275,6 +277,12 @@ typedef struct Task
*/ */
TaskQuery taskQuery; TaskQuery taskQuery;
/*
* A task can have multiple queries, in which case queryCount will be > 1. If
* a task has more than one query, then taskQuery.queryType == TASK_QUERY_TEXT_LIST.
*/
int queryCount;
Oid anchorDistributedTableId; /* only applies to insert tasks */ Oid anchorDistributedTableId; /* only applies to insert tasks */
uint64 anchorShardId; /* only applies to compute tasks */ uint64 anchorShardId; /* only applies to compute tasks */
List *taskPlacementList; /* only applies to compute tasks */ List *taskPlacementList; /* only applies to compute tasks */
@ -323,6 +331,12 @@ typedef struct Task
* query. * query.
*/ */
bool parametersInQueryStringResolved; bool parametersInQueryStringResolved;
/*
* Destination of tuples generated as a result of executing this task. Can be
* NULL, in which case executor might use a default destination.
*/
TupleDestination *tupleDest;
} Task; } Task;

View File

@ -0,0 +1,47 @@
/*-------------------------------------------------------------------------
*
* tuple_destination.h
* Tuple destination generic struct.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef TUPLE_DESTINATION_H
#define TUPLE_DESTINATION_H
#include "access/tupdesc.h"
#include "distributed/multi_physical_planner.h"
#include "tcop/dest.h"
#include "utils/tuplestore.h"
/*
 * Forward typedef so that the method signatures inside the struct can refer
 * to TupleDestination before the struct body is complete.
 */
typedef struct TupleDestination TupleDestination;

/*
 * TupleDestination provides a generic interface for where to send tuples.
 *
 * Users of the executor can set task->tupleDest for custom processing of
 * the result tuples.
 *
 * Since a task can have multiple queries, methods of TupleDestination also
 * accept a queryNumber parameter which denotes the index of the query that
 * tuple belongs to.
 *
 * The struct is defined without a second typedef on purpose: the name is
 * already introduced by the forward typedef above, and repeating a typedef
 * is only valid starting with C11.
 */
struct TupleDestination
{
	/*
	 * putTuple implements custom processing of a tuple.
	 *
	 *   self           - the destination the method is invoked on
	 *   task           - the task whose execution produced the tuple
	 *   placementIndex - index of the placement the tuple came from;
	 *                    LOCAL_PLACEMENT_INDEX is used for local tasks
	 *   queryNumber    - index of the query (within the task) that the
	 *                    tuple belongs to
	 *   tuple          - the tuple to process
	 */
	void (*putTuple)(TupleDestination *self, Task *task,
					 int placementIndex, int queryNumber,
					 HeapTuple tuple);

	/* tupleDescForQuery returns tuple descriptor for a query number. Can return NULL. */
	TupleDesc (*tupleDescForQuery)(TupleDestination *self, int queryNumber);
};
/*
 * Constructors for TupleDestination values and a DestReceiver adapter.
 * The implementations are not part of this header.
 *
 * NOTE(review): judging by the names and parameters, CreateTupleStoreTupleDest
 * presumably builds a destination backed by the given tuple store and
 * descriptor, and CreateTupleDestNone one that keeps no tuples -- confirm
 * against the definitions.
 */
extern TupleDestination * CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc
tupleDescriptor);
extern TupleDestination * CreateTupleDestNone(void);
/*
 * Returns a DestReceiver for the given tuple destination, task and placement
 * index. NOTE(review): presumably received tuples are forwarded to
 * tupleDest->putTuple -- confirm against the definition.
 */
extern DestReceiver * CreateTupleDestDestReceiver(TupleDestination *tupleDest,
Task *task, int placementIndex);
#endif