diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index e4080cb0c..1a204cfe4 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -28,6 +28,7 @@ #include "distributed/adaptive_executor.h" #include "distributed/remote_commands.h" #include "distributed/shard_pruning.h" +#include "distributed/tuple_destination.h" #include "distributed/version_compat.h" #include "distributed/worker_manager.h" #include "distributed/worker_log_messages.h" @@ -193,8 +194,8 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize, localExecutionSupported ); - executionParams->tupleStore = tupleStore; - executionParams->tupleDescriptor = tupleDesc; + executionParams->tupleDestination = CreateTupleStoreTupleDest(tupleStore, + tupleDesc); executionParams->expectResults = expectResults; executionParams->xactProperties = xactProperties; ExecuteTaskListExtended(executionParams); diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index daaa75548..bc46b5364 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -930,6 +930,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, ); executionParams->xactProperties = DecideTransactionPropertiesForTaskList( modLevel, taskList, false); + return ExecuteTaskListExtended(executionParams); } @@ -939,9 +940,9 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, * for some of the arguments. 
*/ uint64 -ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, - TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, - bool expectResults) +ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList, + TupleDestination *tupleDest, + bool expectResults) { int targetPoolSize = MaxAdaptiveExecutorPoolSize; bool localExecutionSupported = true; @@ -952,8 +953,7 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, executionParams->xactProperties = DecideTransactionPropertiesForTaskList( modLevel, taskList, false); executionParams->expectResults = expectResults; - executionParams->tupleStore = tupleStore; - executionParams->tupleDescriptor = tupleDescriptor; + executionParams->tupleDestination = tupleDest; return ExecuteTaskListExtended(executionParams); } @@ -971,16 +971,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) List *localTaskList = NIL; List *remoteTaskList = NIL; - TupleDestination *defaultTupleDest = NULL; - if (executionParams->tupleDescriptor != NULL) - { - defaultTupleDest = CreateTupleStoreTupleDest(executionParams->tupleStore, - executionParams->tupleDescriptor); - } - else - { - defaultTupleDest = CreateTupleDestNone(); - } + TupleDestination *defaultTupleDest = executionParams->tupleDestination; if (executionParams->localExecutionSupported && ShouldExecuteTasksLocally( executionParams->taskList)) @@ -1052,8 +1043,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel, executionParams->targetPoolSize = targetPoolSize; executionParams->localExecutionSupported = localExecutionSupported; - executionParams->tupleStore = NULL; - executionParams->tupleDescriptor = NULL; + executionParams->tupleDestination = CreateTupleDestNone(); executionParams->expectResults = false; executionParams->isUtilityCommand = false; executionParams->jobIdList = NIL; @@ -3495,9 +3485,7 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, List *placementAccessList = 
PlacementAccessListForTask(task, taskPlacement); int querySent = 0; - char *queryString = TaskQueryStringForPlacement(task, - placementExecution-> - placementExecutionIndex); + char *queryString = TaskQueryString(task); if (execution->transactionProperties->useRemoteTransactionBlocks != TRANSACTION_BLOCKS_DISALLOWED) diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index c4cc3e3c4..f28f82862 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -18,6 +18,7 @@ #include "miscadmin.h" #include "port.h" +#include "access/htup_details.h" #include "access/tupdesc.h" #include "catalog/pg_type.h" #include "distributed/deparse_shard_query.h" @@ -28,6 +29,7 @@ #include "distributed/multi_executor.h" #include "distributed/multi_physical_planner.h" #include "distributed/transaction_management.h" +#include "distributed/tuple_destination.h" #include "distributed/tuplestore.h" #include "distributed/worker_protocol.h" #include "tcop/pquery.h" @@ -36,6 +38,24 @@ #include "utils/lsyscache.h" +/* + * PartitioningTupleDest is the internal representation of a TupleDestination + * which consumes the results of queries constructed in WrapTasksForPartitioning. + */ +typedef struct PartitioningTupleDest +{ + TupleDestination pub; + + CitusTableCacheEntry *targetRelation; + + /* list of DistributedResultFragment pointers */ + List *fragmentList; + + /* what do tuples look like */ + TupleDesc tupleDesc; +} PartitioningTupleDest; + + /* * NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer. * It is a separate struct to use it as a key in a hash table.
@@ -59,23 +79,34 @@ typedef struct NodeToNodeFragmentsTransfer /* forward declarations of local functions */ -static void WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, - int partitionColumnIndex, - CitusTableCacheEntry *targetRelation, - bool binaryFormat); +static List * WrapTasksForPartitioning(const char *resultIdPrefix, + List *selectTaskList, + int partitionColumnIndex, + CitusTableCacheEntry *targetRelation, + bool binaryFormat); static List * ExecutePartitionTaskList(List *partitionTaskList, CitusTableCacheEntry *targetRelation); +static PartitioningTupleDest * CreatePartitioningTupleDest( + CitusTableCacheEntry *targetRelation); +static void PartitioningTupleDestPutTuple(TupleDestination *self, Task *task, + int placementIndex, int queryNumber, + HeapTuple heapTuple, uint64 tupleLibpqSize); +static TupleDesc PartitioningTupleDestTupleDescForQuery(TupleDestination *self, int + queryNumber); static ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int datumCount, Oid typeId); static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount, Oid intervalTypeId, ArrayType **minValueArray, ArrayType **maxValueArray); static char * SourceShardPrefix(const char *resultPrefix, uint64 shardId); -static DistributedResultFragment * TupleToDistributedResultFragment( - TupleTableSlot *tupleSlot, CitusTableCacheEntry *targetRelation); -static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList, - TupleDesc resultDescriptor, - bool errorOnAnyFailure); +static DistributedResultFragment * TupleToDistributedResultFragment(HeapTuple heapTuple, + TupleDesc tupleDesc, + CitusTableCacheEntry * + targetRelation, + uint32 sourceNodeId); +static void ExecuteSelectTasksIntoTupleDest(List *taskList, + TupleDestination *tupleDestination, + bool errorOnAnyFailure); static List ** ColocateFragmentsWithRelation(List *fragmentList, CitusTableCacheEntry *targetRelation); static List * 
ColocationTransfers(List *fragmentList, @@ -157,9 +188,9 @@ PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList, */ UseCoordinatedTransaction(); - WrapTasksForPartitioning(resultIdPrefix, selectTaskList, - partitionColumnIndex, targetRelation, - binaryFormat); + selectTaskList = WrapTasksForPartitioning(resultIdPrefix, selectTaskList, + partitionColumnIndex, targetRelation, + binaryFormat); return ExecutePartitionTaskList(selectTaskList, targetRelation); } @@ -169,12 +200,13 @@ PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList, * to worker_partition_query_result(). Target list of the wrapped query should * match the tuple descriptor in ExecutePartitionTaskList(). */ -static void +static List * WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, int partitionColumnIndex, CitusTableCacheEntry *targetRelation, bool binaryFormat) { + List *wrappedTaskList = NIL; ShardInterval **shardIntervalArray = targetRelation->sortedShardIntervalArray; int shardCount = targetRelation->shardIntervalArrayLength; @@ -200,41 +232,105 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, Task *selectTask = NULL; foreach_ptr(selectTask, selectTaskList) { - List *shardPlacementList = selectTask->taskPlacementList; char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId); char *partitionMethodString = targetRelation->partitionMethod == 'h' ? "hash" : "range"; const char *binaryFormatString = binaryFormat ? "true" : "false"; List *perPlacementQueries = NIL; - /* - * We need to know which placement could successfully execute the query, - * so we form a different query per placement, each of which returning - * the node id of the placement. 
- */ - ShardPlacement *shardPlacement = NULL; - foreach_ptr(shardPlacement, shardPlacementList) - { - StringInfo wrappedQuery = makeStringInfo(); - appendStringInfo(wrappedQuery, - "SELECT %u::int, partition_index" - ", %s || '_' || partition_index::text " - ", rows_written " - "FROM worker_partition_query_result" - "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0", - shardPlacement->nodeId, - quote_literal_cstr(taskPrefix), - quote_literal_cstr(taskPrefix), - quote_literal_cstr(TaskQueryStringForAllPlacements( - selectTask)), - partitionColumnIndex, - quote_literal_cstr(partitionMethodString), - minValuesString->data, maxValuesString->data, - binaryFormatString); - perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data); - } - SetTaskPerPlacementQueryStrings(selectTask, perPlacementQueries); + Task *wrappedSelectTask = copyObject(selectTask); + + StringInfo wrappedQuery = makeStringInfo(); + appendStringInfo(wrappedQuery, + "SELECT partition_index" + ", %s || '_' || partition_index::text " + ", rows_written " + "FROM worker_partition_query_result" + "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0", + quote_literal_cstr(taskPrefix), + quote_literal_cstr(taskPrefix), + quote_literal_cstr(TaskQueryString(selectTask)), + partitionColumnIndex, + quote_literal_cstr(partitionMethodString), + minValuesString->data, maxValuesString->data, + binaryFormatString); + perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data); + + SetTaskQueryString(wrappedSelectTask, wrappedQuery->data); + wrappedTaskList = lappend(wrappedTaskList, wrappedSelectTask); } + + return wrappedTaskList; +} + + +/* + * CreatePartitioningTupleDest creates a TupleDestination which consumes results of + * tasks constructed in WrapTasksForPartitioning. 
+ */ +static PartitioningTupleDest * +CreatePartitioningTupleDest(CitusTableCacheEntry *targetRelation) +{ + TupleDesc tupleDescriptor = NULL; + int resultColumnCount = 3; + +#if PG_VERSION_NUM >= PG_VERSION_12 + tupleDescriptor = CreateTemplateTupleDesc(resultColumnCount); +#else + tupleDescriptor = CreateTemplateTupleDesc(resultColumnCount, false); +#endif + + TupleDescInitEntry(tupleDescriptor, (AttrNumber) 1, "partition_index", + INT4OID, -1, 0); + TupleDescInitEntry(tupleDescriptor, (AttrNumber) 2, "result_id", + TEXTOID, -1, 0); + TupleDescInitEntry(tupleDescriptor, (AttrNumber) 3, "rows_written", + INT8OID, -1, 0); + + + PartitioningTupleDest *tupleDest = palloc0(sizeof(PartitioningTupleDest)); + tupleDest->targetRelation = targetRelation; + tupleDest->tupleDesc = tupleDescriptor; + tupleDest->pub.putTuple = PartitioningTupleDestPutTuple; + tupleDest->pub.tupleDescForQuery = + PartitioningTupleDestTupleDescForQuery; + + return tupleDest; +} + + +/* + * PartitioningTupleDestPutTuple implements TupleDestination->putTuple for + * PartitioningTupleDest. + */ +static void +PartitioningTupleDestPutTuple(TupleDestination *self, Task *task, + int placementIndex, int queryNumber, + HeapTuple heapTuple, uint64 tupleLibpqSize) +{ + PartitioningTupleDest *tupleDest = (PartitioningTupleDest *) self; + ShardPlacement *placement = list_nth(task->taskPlacementList, placementIndex); + + DistributedResultFragment *fragment = + TupleToDistributedResultFragment(heapTuple, tupleDest->tupleDesc, + tupleDest->targetRelation, + placement->nodeId); + tupleDest->fragmentList = lappend(tupleDest->fragmentList, fragment); +} + + +/* + * PartitioningTupleDestTupleDescForQuery implements TupleDestination->tupleDescForQuery + * for PartitioningTupleDest.
+ */ +static TupleDesc +PartitioningTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber) +{ + Assert(queryNumber == 0); + + PartitioningTupleDest *tupleDest = (PartitioningTupleDest *) self; + + return tupleDest->tupleDesc; } @@ -323,43 +419,13 @@ CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int datumCount, Oid t static List * ExecutePartitionTaskList(List *taskList, CitusTableCacheEntry *targetRelation) { - TupleDesc resultDescriptor = NULL; - Tuplestorestate *resultStore = NULL; - int resultColumnCount = 4; - -#if PG_VERSION_NUM >= PG_VERSION_12 - resultDescriptor = CreateTemplateTupleDesc(resultColumnCount); -#else - resultDescriptor = CreateTemplateTupleDesc(resultColumnCount, false); -#endif - - TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "node_id", - INT4OID, -1, 0); - TupleDescInitEntry(resultDescriptor, (AttrNumber) 2, "partition_index", - INT4OID, -1, 0); - TupleDescInitEntry(resultDescriptor, (AttrNumber) 3, "result_id", - TEXTOID, -1, 0); - TupleDescInitEntry(resultDescriptor, (AttrNumber) 4, "rows_written", - INT8OID, -1, 0); + PartitioningTupleDest *tupleDest = CreatePartitioningTupleDest(targetRelation); bool errorOnAnyFailure = false; - resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor, - errorOnAnyFailure); + ExecuteSelectTasksIntoTupleDest(taskList, (TupleDestination *) tupleDest, + errorOnAnyFailure); - List *fragmentList = NIL; - TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor, - &TTSOpsMinimalTuple); - while (tuplestore_gettupleslot(resultStore, true, false, slot)) - { - DistributedResultFragment *distributedResultFragment = - TupleToDistributedResultFragment(slot, targetRelation); - - fragmentList = lappend(fragmentList, distributedResultFragment); - - ExecClearTuple(slot); - } - - return fragmentList; + return tupleDest->fragmentList; } @@ -368,14 +434,15 @@ ExecutePartitionTaskList(List *taskList, CitusTableCacheEntry *targetRelation) * 
WrapTasksForPartitioning() to a DistributedResultFragment. */ static DistributedResultFragment * -TupleToDistributedResultFragment(TupleTableSlot *tupleSlot, - CitusTableCacheEntry *targetRelation) +TupleToDistributedResultFragment(HeapTuple tuple, + TupleDesc tupleDesc, + CitusTableCacheEntry *targetRelation, + uint32 sourceNodeId) { bool isNull = false; - uint32 sourceNodeId = DatumGetUInt32(slot_getattr(tupleSlot, 1, &isNull)); - uint32 targetShardIndex = DatumGetUInt32(slot_getattr(tupleSlot, 2, &isNull)); - text *resultId = DatumGetTextP(slot_getattr(tupleSlot, 3, &isNull)); - int64 rowCount = DatumGetInt64(slot_getattr(tupleSlot, 4, &isNull)); + uint32 targetShardIndex = DatumGetUInt32(heap_getattr(tuple, 1, tupleDesc, &isNull)); + text *resultId = DatumGetTextP(heap_getattr(tuple, 2, tupleDesc, &isNull)); + int64 rowCount = DatumGetInt64(heap_getattr(tuple, 3, tupleDesc, &isNull)); Assert(targetShardIndex < targetRelation->shardIntervalArrayLength); ShardInterval *shardInterval = @@ -395,26 +462,21 @@ TupleToDistributedResultFragment(TupleTableSlot *tupleSlot, /* - * ExecuteSelectTasksIntoTupleStore executes the given tasks and returns a tuple - * store containing its results. + * ExecuteSelectTasksIntoTupleDest executes the given tasks and forwards its result + * to the given destination. 
*/ -static Tuplestorestate * -ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, - bool errorOnAnyFailure) +static void +ExecuteSelectTasksIntoTupleDest(List *taskList, TupleDestination *tupleDestination, + bool errorOnAnyFailure) { bool expectResults = true; int targetPoolSize = MaxAdaptiveExecutorPoolSize; - bool randomAccess = true; - bool interTransactions = false; TransactionProperties xactProperties = { .errorOnAnyFailure = errorOnAnyFailure, .useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED, .requires2PC = false }; - Tuplestorestate *resultStore = tuplestore_begin_heap(randomAccess, interTransactions, - work_mem); - /* * Local execution is not supported because here we use perPlacementQueryStrings. * Local execution does not know how to handle it. One solution is to extract and set @@ -425,14 +487,11 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, ExecutionParams *executionParams = CreateBasicExecutionParams( ROW_MODIFY_READONLY, taskList, targetPoolSize, localExecutionSupported ); - executionParams->tupleDescriptor = resultDescriptor; - executionParams->tupleStore = resultStore; + executionParams->tupleDestination = tupleDestination; executionParams->xactProperties = xactProperties; executionParams->expectResults = expectResults; ExecuteTaskListExtended(executionParams); - - return resultStore; } @@ -622,7 +681,6 @@ static void ExecuteFetchTaskList(List *taskList) { TupleDesc resultDescriptor = NULL; - Tuplestorestate *resultStore = NULL; int resultColumnCount = 1; #if PG_VERSION_NUM >= PG_VERSION_12 @@ -633,15 +691,8 @@ ExecuteFetchTaskList(List *taskList) TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "byte_count", INT8OID, -1, 0); + TupleDestination *tupleDestination = CreateTupleDestNone(); + bool errorOnAnyFailure = true; - resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor, - errorOnAnyFailure); - - TupleTableSlot *slot = 
MakeSingleTupleTableSlotCompat(resultDescriptor, - &TTSOpsMinimalTuple); - - while (tuplestore_gettupleslot(resultStore, true, false, slot)) - { - ExecClearTuple(slot); - } + ExecuteSelectTasksIntoTupleDest(taskList, tupleDestination, errorOnAnyFailure); } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index ac47f0109..3d4dbf3b0 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -200,10 +200,11 @@ NonPushableInsertSelectExecScan(CustomScanState *node) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); - uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, - taskList, tupleDescriptor, - scanState->tuplestorestate, - hasReturning); + TupleDestination *tupleDest = CreateTupleStoreTupleDest( + scanState->tuplestorestate, tupleDescriptor); + uint64 rowsInserted = ExecuteTaskListIntoTupleDest(ROW_MODIFY_COMMUTATIVE, + taskList, tupleDest, + hasReturning); executorState->es_processed = rowsInserted; } @@ -262,9 +263,11 @@ NonPushableInsertSelectExecScan(CustomScanState *node) tuplestore_begin_heap(randomAccess, interTransactions, work_mem); TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); - ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList, - tupleDescriptor, scanState->tuplestorestate, - hasReturning); + TupleDestination *tupleDest = CreateTupleStoreTupleDest( + scanState->tuplestorestate, tupleDescriptor); + + ExecuteTaskListIntoTupleDest(ROW_MODIFY_COMMUTATIVE, prunedTaskList, + tupleDest, hasReturning); if (SortReturning && hasReturning) { @@ -832,7 +835,7 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries) StringInfo wrappedQuery = makeStringInfo(); appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) 
subquery", projectedColumnsString->data, - TaskQueryStringForAllPlacements(task)); + TaskQueryString(task)); SetTaskQueryString(task, wrappedQuery->data); } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index fd5bd48c7..5dce24ac7 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -228,7 +228,7 @@ ExecuteLocalTaskListExtended(List *taskList, if (isUtilityCommand) { - LocallyExecuteUtilityTask(TaskQueryStringForAllPlacements(task)); + LocallyExecuteUtilityTask(TaskQueryString(task)); continue; } @@ -280,7 +280,7 @@ ExecuteLocalTaskListExtended(List *taskList, continue; } - Query *shardQuery = ParseQueryString(TaskQueryStringForAllPlacements(task), + Query *shardQuery = ParseQueryString(TaskQueryString(task), taskParameterTypes, taskNumParams); @@ -301,7 +301,7 @@ ExecuteLocalTaskListExtended(List *taskList, char *shardQueryString = NULL; if (GetTaskQueryType(task) == TASK_QUERY_TEXT) { - shardQueryString = TaskQueryStringForAllPlacements(task); + shardQueryString = TaskQueryString(task); } else { @@ -431,7 +431,7 @@ LogLocalCommand(Task *task) } ereport(NOTICE, (errmsg("executing the command locally: %s", - ApplyLogRedaction(TaskQueryStringForAllPlacements(task))))); + ApplyLogRedaction(TaskQueryString(task))))); } diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 54ad1e8a6..a9df07251 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -1579,8 +1579,7 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task) */ StringInfo sqlTaskQueryString = makeStringInfo(); - char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements( - task)); + char *escapedTaskQueryString = quote_literal_cstr(TaskQueryString(task)); if 
(BinaryMasterCopyFormat) { @@ -1616,7 +1615,7 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task) /* wrap a task assignment query outside the original query */ StringInfo taskAssignmentQuery = - TaskAssignmentQuery(task, TaskQueryStringForAllPlacements(task)); + TaskAssignmentQuery(task, TaskQueryString(task)); TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId, task->taskId); @@ -2753,7 +2752,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) { /* assign through task tracker to manage resource utilization */ StringInfo jobCleanupQuery = TaskAssignmentQuery( - jobCleanupTask, TaskQueryStringForAllPlacements(jobCleanupTask)); + jobCleanupTask, TaskQueryString(jobCleanupTask)); jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId, jobCleanupQuery->data); @@ -2832,7 +2831,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) nodeName, nodePort, (int) queryStatus), errhint("Manually clean job resources on node " "\"%s:%u\" by running \"%s\" ", nodeName, - nodePort, TaskQueryStringForAllPlacements( + nodePort, TaskQueryString( jobCleanupTask)))); } else @@ -2851,7 +2850,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) nodePort, (int) resultStatus), errhint("Manually clean job resources on node " "\"%s:%u\" by running \"%s\" ", nodeName, - nodePort, TaskQueryStringForAllPlacements( + nodePort, TaskQueryString( jobCleanupTask)))); } else diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c index a70d5506a..1fb6b54bc 100644 --- a/src/backend/distributed/operations/delete_protocol.c +++ b/src/backend/distributed/operations/delete_protocol.c @@ -450,8 +450,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName, * connect to that node to drop the shard placement over that * remote connection. 
*/ - const char *dropShardPlacementCommand = TaskQueryStringForAllPlacements( - task); + const char *dropShardPlacementCommand = TaskQueryString(task); ExecuteDropShardPlacementCommandRemotely(shardPlacement, relationName, dropShardPlacementCommand); diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 79fb80b5a..d568ff3c7 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -41,7 +41,6 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId, static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); static bool ShouldLazyDeparseQuery(Task *task); static char * DeparseTaskQuery(Task *task, Query *query); -static bool IsEachPlacementQueryStringDifferent(Task *task); /* @@ -117,8 +116,7 @@ RebuildQueryStrings(Job *workerJob) ereport(DEBUG4, (errmsg("query before rebuilding: %s", !isQueryObjectOrText ? "(null)" - : ApplyLogRedaction(TaskQueryStringForAllPlacements( - task))))); + : ApplyLogRedaction(TaskQueryString(task))))); UpdateTaskQueryString(query, relationId, valuesRTE, task); @@ -129,8 +127,7 @@ RebuildQueryStrings(Job *workerJob) task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved; ereport(DEBUG4, (errmsg("query after rebuilding: %s", - ApplyLogRedaction(TaskQueryStringForAllPlacements( - task))))); + ApplyLogRedaction(TaskQueryString(task))))); } } @@ -464,19 +461,6 @@ SetTaskQueryString(Task *task, char *queryString) } -/* - * SetTaskPerPlacementQueryStrings set the perPlacementQueryString for the given task. 
- */ -void -SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList) -{ - Assert(perPlacementQueryStringList != NIL); - task->taskQuery.queryType = TASK_QUERY_TEXT_PER_PLACEMENT; - task->taskQuery.data.perPlacementQueryStrings = perPlacementQueryStringList; - task->queryCount = 1; -} - - /* * SetTaskQueryStringList sets the queryStringList of the given task. */ @@ -530,14 +514,14 @@ GetTaskQueryType(Task *task) /* - * TaskQueryStringForAllPlacements generates task query string text if missing. + * TaskQueryString generates task query string text if missing. * * For performance reasons, the queryString is generated lazily. For example * for local queries it is usually not needed to generate it, so this way we * can skip the expensive deparsing+parsing. */ char * -TaskQueryStringForAllPlacements(Task *task) +TaskQueryString(Task *task) { int taskQueryType = GetTaskQueryType(task); if (taskQueryType == TASK_QUERY_NULL) @@ -562,8 +546,6 @@ TaskQueryStringForAllPlacements(Task *task) /* * At this point task query type should be TASK_QUERY_OBJECT. - * if someone calls this method inappropriately with TASK_QUERY_TEXT_PER_PLACEMENT case - * (instead of TaskQueryStringForPlacement), they will hit this assert. */ Assert(task->taskQuery.queryType == TASK_QUERY_OBJECT && jobQueryReferenceForLazyDeparsing != NULL); @@ -584,32 +566,3 @@ TaskQueryStringForAllPlacements(Task *task) SetTaskQueryString(task, queryString); return task->taskQuery.data.queryStringLazy; } - - -/* - * TaskQueryStringForPlacement returns the query string that should be executed - * on the placement with the given placementIndex. 
- */ -char * -TaskQueryStringForPlacement(Task *task, int placementIndex) -{ - if (IsEachPlacementQueryStringDifferent(task)) - { - List *perPlacementQueryStringList = - task->taskQuery.data.perPlacementQueryStrings; - Assert(list_length(perPlacementQueryStringList) > placementIndex); - return list_nth(perPlacementQueryStringList, placementIndex); - } - return TaskQueryStringForAllPlacements(task); -} - - -/* - * IsEachPlacementQueryStringDifferent returns true if each placement - * has a different query string. - */ -static bool -IsEachPlacementQueryStringDifferent(Task *task) -{ - return GetTaskQueryType(task) == TASK_QUERY_TEXT_PER_PLACEMENT; -} diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 1eb3354b1..e3020d1b4 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -606,9 +606,7 @@ FetchRemoteExplainFromWorkers(Task *task, ExplainState *es) RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0( sizeof(RemoteExplainPlan)); - StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements( - task), - es); + StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryString(task), es); /* * Use a coordinated transaction to ensure that we open a transaction block @@ -694,7 +692,7 @@ ExplainTask(CitusScanState *scanState, Task *task, int placementIndex, if (es->verbose) { - const char *queryText = TaskQueryStringForAllPlacements(task); + const char *queryText = TaskQueryString(task); ExplainPropertyText("Query", queryText, es); } @@ -1312,7 +1310,7 @@ ExplainAnalyzeTaskList(List *originalTaskList, } Task *explainAnalyzeTask = copyObject(originalTask); - const char *queryString = TaskQueryStringForAllPlacements(explainAnalyzeTask); + const char *queryString = TaskQueryString(explainAnalyzeTask); char *wrappedQuery = WrapQueryForExplainAnalyze(queryString, tupleDesc); char *fetchQuery = "SELECT 
explain_analyze_output FROM worker_last_saved_explain_analyze()"; diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index df8c4ec9b..7545267ba 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4534,7 +4534,7 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, /* wrap repartition query string around filter query string */ StringInfo mapQueryString = makeStringInfo(); - char *filterQueryString = TaskQueryStringForAllPlacements(filterTask); + char *filterQueryString = TaskQueryString(filterTask); char *filterQueryEscapedText = quote_literal_cstr(filterQueryString); PartitionType partitionType = mapMergeJob->partitionType; diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 3314fd8a0..e676dabe8 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -284,12 +284,6 @@ CopyTaskQuery(Task *newnode, Task *from) break; } - case TASK_QUERY_TEXT_PER_PLACEMENT: - { - COPY_STRING_LIST(taskQuery.data.perPlacementQueryStrings); - break; - } - case TASK_QUERY_TEXT_LIST: { COPY_STRING_LIST(taskQuery.data.queryStringList); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 1dd20a825..903b94e94 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -490,12 +490,6 @@ static void WriteTaskQuery(OUTFUNC_ARGS) { break; } - case TASK_QUERY_TEXT_PER_PLACEMENT: - { - WRITE_NODE_FIELD(taskQuery.data.perPlacementQueryStrings); - break; - } - case TASK_QUERY_TEXT_LIST: { WRITE_NODE_FIELD(taskQuery.data.queryStringList); diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index 7eef84675..9ab4cdbff 100644 --- 
a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -26,10 +26,7 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query); extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryStringList(Task *task, List *queryStringList); -extern void SetTaskPerPlacementQueryStrings(Task *task, - List *perPlacementQueryStringList); -extern char * TaskQueryStringForAllPlacements(Task *task); -extern char * TaskQueryStringForPlacement(Task *task, int placementIndex); +extern char * TaskQueryString(Task *task); extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList); extern int GetTaskQueryType(Task *task); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 1aed861b0..675734db9 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -17,6 +17,7 @@ #include "distributed/citus_custom_scan.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_server_executor.h" +#include "distributed/tuple_destination.h" /* managed via guc.c */ @@ -87,11 +88,8 @@ typedef struct ExecutionParams /* taskList contains the tasks for the execution.*/ List *taskList; - /* tupleDescriptor contains the description for the result tuples.*/ - TupleDesc tupleDescriptor; - - /* tupleStore is where the results will be stored for this execution */ - Tuplestorestate *tupleStore; + /* where to forward each tuple received */ + TupleDestination *tupleDestination; /* expectResults is true if this execution will return some result. 
*/ bool expectResults; @@ -120,10 +118,9 @@ ExecutionParams * CreateBasicExecutionParams(RowModifyLevel modLevel, bool localExecutionSupported); extern uint64 ExecuteTaskListExtended(ExecutionParams *executionParams); -extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, - TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore, - bool expectResults); +extern uint64 ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList, + TupleDestination *tupleDest, + bool expectResults); extern bool IsCitusCustomState(PlanState *planState); extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index c21659088..cb7f160d2 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -208,7 +208,6 @@ typedef enum TaskQueryType TASK_QUERY_NULL, TASK_QUERY_TEXT, TASK_QUERY_OBJECT, - TASK_QUERY_TEXT_PER_PLACEMENT, TASK_QUERY_TEXT_LIST } TaskQueryType; @@ -238,20 +237,15 @@ typedef struct TaskQuery /* * In almost all cases queryStringLazy should be read only indirectly by - * using TaskQueryStringForAllPlacements(). This will populate the field if only the + * using TaskQueryString(). This will populate the field if only the * jobQueryReferenceForLazyDeparsing field is not NULL. * * This field should only be set by using SetTaskQueryString() (or as a - * side effect from TaskQueryStringForAllPlacements()). Otherwise it might not be in sync + * side effect from TaskQueryString()). Otherwise it might not be in sync * with jobQueryReferenceForLazyDeparsing. */ char *queryStringLazy; - /* - * perPlacementQueryStrings is used when we have different query strings for each placement. - */ - List *perPlacementQueryStrings; - /* * queryStringList contains query strings. 
They should be * run sequentially. The concatenated version of this list