Use TupleDestination API for partitioning in insert/select.

pull/4037/head
Hadi Moshayedi 2020-07-16 14:25:09 -07:00
parent f323033ce8
commit 13003d8d05
15 changed files with 206 additions and 238 deletions

View File

@ -28,6 +28,7 @@
#include "distributed/adaptive_executor.h" #include "distributed/adaptive_executor.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/shard_pruning.h" #include "distributed/shard_pruning.h"
#include "distributed/tuple_destination.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_log_messages.h" #include "distributed/worker_log_messages.h"
@ -193,8 +194,8 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize, ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize,
localExecutionSupported localExecutionSupported
); );
executionParams->tupleStore = tupleStore; executionParams->tupleDestination = CreateTupleStoreTupleDest(tupleStore,
executionParams->tupleDescriptor = tupleDesc; tupleDesc);
executionParams->expectResults = expectResults; executionParams->expectResults = expectResults;
executionParams->xactProperties = xactProperties; executionParams->xactProperties = xactProperties;
ExecuteTaskListExtended(executionParams); ExecuteTaskListExtended(executionParams);

View File

@ -930,6 +930,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
); );
executionParams->xactProperties = DecideTransactionPropertiesForTaskList( executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
modLevel, taskList, false); modLevel, taskList, false);
return ExecuteTaskListExtended(executionParams); return ExecuteTaskListExtended(executionParams);
} }
@ -939,8 +940,8 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
* for some of the arguments. * for some of the arguments.
*/ */
uint64 uint64
ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, TupleDestination *tupleDest,
bool expectResults) bool expectResults)
{ {
int targetPoolSize = MaxAdaptiveExecutorPoolSize; int targetPoolSize = MaxAdaptiveExecutorPoolSize;
@ -952,8 +953,7 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
executionParams->xactProperties = DecideTransactionPropertiesForTaskList( executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
modLevel, taskList, false); modLevel, taskList, false);
executionParams->expectResults = expectResults; executionParams->expectResults = expectResults;
executionParams->tupleStore = tupleStore; executionParams->tupleDestination = tupleDest;
executionParams->tupleDescriptor = tupleDescriptor;
return ExecuteTaskListExtended(executionParams); return ExecuteTaskListExtended(executionParams);
} }
@ -971,16 +971,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
List *localTaskList = NIL; List *localTaskList = NIL;
List *remoteTaskList = NIL; List *remoteTaskList = NIL;
TupleDestination *defaultTupleDest = NULL; TupleDestination *defaultTupleDest = executionParams->tupleDestination;
if (executionParams->tupleDescriptor != NULL)
{
defaultTupleDest = CreateTupleStoreTupleDest(executionParams->tupleStore,
executionParams->tupleDescriptor);
}
else
{
defaultTupleDest = CreateTupleDestNone();
}
if (executionParams->localExecutionSupported && ShouldExecuteTasksLocally( if (executionParams->localExecutionSupported && ShouldExecuteTasksLocally(
executionParams->taskList)) executionParams->taskList))
@ -1052,8 +1043,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel,
executionParams->targetPoolSize = targetPoolSize; executionParams->targetPoolSize = targetPoolSize;
executionParams->localExecutionSupported = localExecutionSupported; executionParams->localExecutionSupported = localExecutionSupported;
executionParams->tupleStore = NULL; executionParams->tupleDestination = CreateTupleDestNone();
executionParams->tupleDescriptor = NULL;
executionParams->expectResults = false; executionParams->expectResults = false;
executionParams->isUtilityCommand = false; executionParams->isUtilityCommand = false;
executionParams->jobIdList = NIL; executionParams->jobIdList = NIL;
@ -3495,9 +3485,7 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement); List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
int querySent = 0; int querySent = 0;
char *queryString = TaskQueryStringForPlacement(task, char *queryString = TaskQueryString(task);
placementExecution->
placementExecutionIndex);
if (execution->transactionProperties->useRemoteTransactionBlocks != if (execution->transactionProperties->useRemoteTransactionBlocks !=
TRANSACTION_BLOCKS_DISALLOWED) TRANSACTION_BLOCKS_DISALLOWED)

View File

@ -18,6 +18,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "port.h" #include "port.h"
#include "access/htup_details.h"
#include "access/tupdesc.h" #include "access/tupdesc.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
@ -28,6 +29,7 @@
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/tuple_destination.h"
#include "distributed/tuplestore.h" #include "distributed/tuplestore.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "tcop/pquery.h" #include "tcop/pquery.h"
@ -36,6 +38,24 @@
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
/*
* PartitioningTupleDest is the internal representation of a TupleDestination
* which consumes the results of queries constructed in WrapTasksForPartitioning.
*/
typedef struct PartitioningTupleDest
{
/*
* Base destination. Kept as the first member because this struct is cast to
* and from TupleDestination * by its callbacks and by ExecutePartitionTaskList.
*/
TupleDestination pub;
/* target relation, handed to TupleToDistributedResultFragment for every received tuple */
CitusTableCacheEntry *targetRelation;
/* list of DistributedResultFragment pointers, appended to by PartitioningTupleDestPutTuple */
List *fragmentList;
/* descriptor of the tuples passed to putTuple; also what tupleDescForQuery returns */
TupleDesc tupleDesc;
} PartitioningTupleDest;
/* /*
* NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer. * NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer.
* It is a separate struct to use it as a key in a hash table. * It is a separate struct to use it as a key in a hash table.
@ -59,22 +79,33 @@ typedef struct NodeToNodeFragmentsTransfer
/* forward declarations of local functions */ /* forward declarations of local functions */
static void WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, static List * WrapTasksForPartitioning(const char *resultIdPrefix,
List *selectTaskList,
int partitionColumnIndex, int partitionColumnIndex,
CitusTableCacheEntry *targetRelation, CitusTableCacheEntry *targetRelation,
bool binaryFormat); bool binaryFormat);
static List * ExecutePartitionTaskList(List *partitionTaskList, static List * ExecutePartitionTaskList(List *partitionTaskList,
CitusTableCacheEntry *targetRelation); CitusTableCacheEntry *targetRelation);
static PartitioningTupleDest * CreatePartitioningTupleDest(
CitusTableCacheEntry *targetRelation);
static void PartitioningTupleDestPutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple, uint64 tupleLibpqSize);
static TupleDesc PartitioningTupleDestTupleDescForQuery(TupleDestination *self, int
queryNumber);
static ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int static ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int
datumCount, Oid typeId); datumCount, Oid typeId);
static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount, static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount,
Oid intervalTypeId, ArrayType **minValueArray, Oid intervalTypeId, ArrayType **minValueArray,
ArrayType **maxValueArray); ArrayType **maxValueArray);
static char * SourceShardPrefix(const char *resultPrefix, uint64 shardId); static char * SourceShardPrefix(const char *resultPrefix, uint64 shardId);
static DistributedResultFragment * TupleToDistributedResultFragment( static DistributedResultFragment * TupleToDistributedResultFragment(HeapTuple heapTuple,
TupleTableSlot *tupleSlot, CitusTableCacheEntry *targetRelation); TupleDesc tupleDesc,
static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList, CitusTableCacheEntry *
TupleDesc resultDescriptor, targetRelation,
uint32 sourceNodeId);
static void ExecuteSelectTasksIntoTupleDest(List *taskList,
TupleDestination *tupleDestination,
bool errorOnAnyFailure); bool errorOnAnyFailure);
static List ** ColocateFragmentsWithRelation(List *fragmentList, static List ** ColocateFragmentsWithRelation(List *fragmentList,
CitusTableCacheEntry *targetRelation); CitusTableCacheEntry *targetRelation);
@ -157,7 +188,7 @@ PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList,
*/ */
UseCoordinatedTransaction(); UseCoordinatedTransaction();
WrapTasksForPartitioning(resultIdPrefix, selectTaskList, selectTaskList = WrapTasksForPartitioning(resultIdPrefix, selectTaskList,
partitionColumnIndex, targetRelation, partitionColumnIndex, targetRelation,
binaryFormat); binaryFormat);
return ExecutePartitionTaskList(selectTaskList, targetRelation); return ExecutePartitionTaskList(selectTaskList, targetRelation);
@ -169,12 +200,13 @@ PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList,
* to worker_partition_query_result(). Target list of the wrapped query should * to worker_partition_query_result(). Target list of the wrapped query should
* match the tuple descriptor in ExecutePartitionTaskList(). * match the tuple descriptor in ExecutePartitionTaskList().
*/ */
static void static List *
WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
int partitionColumnIndex, int partitionColumnIndex,
CitusTableCacheEntry *targetRelation, CitusTableCacheEntry *targetRelation,
bool binaryFormat) bool binaryFormat)
{ {
List *wrappedTaskList = NIL;
ShardInterval **shardIntervalArray = targetRelation->sortedShardIntervalArray; ShardInterval **shardIntervalArray = targetRelation->sortedShardIntervalArray;
int shardCount = targetRelation->shardIntervalArrayLength; int shardCount = targetRelation->shardIntervalArrayLength;
@ -200,41 +232,105 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
Task *selectTask = NULL; Task *selectTask = NULL;
foreach_ptr(selectTask, selectTaskList) foreach_ptr(selectTask, selectTaskList)
{ {
List *shardPlacementList = selectTask->taskPlacementList;
char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId); char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId);
char *partitionMethodString = targetRelation->partitionMethod == 'h' ? char *partitionMethodString = targetRelation->partitionMethod == 'h' ?
"hash" : "range"; "hash" : "range";
const char *binaryFormatString = binaryFormat ? "true" : "false"; const char *binaryFormatString = binaryFormat ? "true" : "false";
List *perPlacementQueries = NIL; List *perPlacementQueries = NIL;
/* Task *wrappedSelectTask = copyObject(selectTask);
* We need to know which placement could successfully execute the query,
* so we form a different query per placement, each of which returning
* the node id of the placement.
*/
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
StringInfo wrappedQuery = makeStringInfo(); StringInfo wrappedQuery = makeStringInfo();
appendStringInfo(wrappedQuery, appendStringInfo(wrappedQuery,
"SELECT %u::int, partition_index" "SELECT partition_index"
", %s || '_' || partition_index::text " ", %s || '_' || partition_index::text "
", rows_written " ", rows_written "
"FROM worker_partition_query_result" "FROM worker_partition_query_result"
"(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0", "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0",
shardPlacement->nodeId,
quote_literal_cstr(taskPrefix), quote_literal_cstr(taskPrefix),
quote_literal_cstr(taskPrefix), quote_literal_cstr(taskPrefix),
quote_literal_cstr(TaskQueryStringForAllPlacements( quote_literal_cstr(TaskQueryString(selectTask)),
selectTask)),
partitionColumnIndex, partitionColumnIndex,
quote_literal_cstr(partitionMethodString), quote_literal_cstr(partitionMethodString),
minValuesString->data, maxValuesString->data, minValuesString->data, maxValuesString->data,
binaryFormatString); binaryFormatString);
perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data); perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data);
SetTaskQueryString(wrappedSelectTask, wrappedQuery->data);
wrappedTaskList = lappend(wrappedTaskList, wrappedSelectTask);
} }
SetTaskPerPlacementQueryStrings(selectTask, perPlacementQueries);
return wrappedTaskList;
} }
/*
* CreatePartitioningTupleDest creates a TupleDestination which consumes results of
* tasks constructed in WrapTasksForPartitioning.
*
* The destination is set up for three-column tuples: partition_index (int4),
* result_id (text) and rows_written (int8). fragmentList is not assigned here;
* it relies on palloc0 zero-filling the struct.
*/
static PartitioningTupleDest *
CreatePartitioningTupleDest(CitusTableCacheEntry *targetRelation)
{
TupleDesc tupleDescriptor = NULL;
int resultColumnCount = 3;
/* PostgreSQL 12 and later take only the column count; older versions take an extra boolean */
#if PG_VERSION_NUM >= PG_VERSION_12
tupleDescriptor = CreateTemplateTupleDesc(resultColumnCount);
#else
tupleDescriptor = CreateTemplateTupleDesc(resultColumnCount, false);
#endif
/*
* Column order and types mirror the SELECT list of the wrapped query built in
* WrapTasksForPartitioning: partition_index, the result id, rows_written.
*/
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 1, "partition_index",
INT4OID, -1, 0);
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 2, "result_id",
TEXTOID, -1, 0);
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 3, "rows_written",
INT8OID, -1, 0);
PartitioningTupleDest *tupleDest = palloc0(sizeof(PartitioningTupleDest));
tupleDest->targetRelation = targetRelation;
tupleDest->tupleDesc = tupleDescriptor;
/* install the callbacks through which tuples reach this destination */
tupleDest->pub.putTuple = PartitioningTupleDestPutTuple;
tupleDest->pub.tupleDescForQuery =
PartitioningTupleDestTupleDescForQuery;
return tupleDest;
}
/*
* PartitioningTupleDestPutTuple implements TupleDestination->putTuple for
* PartitioningTupleDest. It converts the received tuple into a
* DistributedResultFragment and appends it to the destination's fragmentList.
* The queryNumber and tupleLibpqSize arguments are not used.
*/
static void
PartitioningTupleDestPutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple, uint64 tupleLibpqSize)
{
PartitioningTupleDest *tupleDest = (PartitioningTupleDest *) self;
/*
* The source node id is taken from the placement at placementIndex in the
* task's placement list, not from a column of the tuple.
*/
ShardPlacement *placement = list_nth(task->taskPlacementList, placementIndex);
DistributedResultFragment *fragment =
TupleToDistributedResultFragment(heapTuple, tupleDest->tupleDesc,
tupleDest->targetRelation,
placement->nodeId);
/*
* NOTE(review): the fragment and the list cell are presumably allocated in
* whatever memory context is current when putTuple is called; confirm that
* context lives until ExecutePartitionTaskList returns fragmentList.
*/
tupleDest->fragmentList = lappend(tupleDest->fragmentList, fragment);
}
/*
* PartitioningTupleDestTupleDescForQuery implements TupleDestination->tupleDescForQuery
* for PartitioningTupleDest.
*/
static TupleDesc
PartitioningTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber)
{
Assert(queryNumber == 0);
PartitioningTupleDest *tupleDest = (PartitioningTupleDest *) self;
return tupleDest->tupleDesc;
} }
@ -323,43 +419,13 @@ CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int datumCount, Oid t
static List * static List *
ExecutePartitionTaskList(List *taskList, CitusTableCacheEntry *targetRelation) ExecutePartitionTaskList(List *taskList, CitusTableCacheEntry *targetRelation)
{ {
TupleDesc resultDescriptor = NULL; PartitioningTupleDest *tupleDest = CreatePartitioningTupleDest(targetRelation);
Tuplestorestate *resultStore = NULL;
int resultColumnCount = 4;
#if PG_VERSION_NUM >= PG_VERSION_12
resultDescriptor = CreateTemplateTupleDesc(resultColumnCount);
#else
resultDescriptor = CreateTemplateTupleDesc(resultColumnCount, false);
#endif
TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "node_id",
INT4OID, -1, 0);
TupleDescInitEntry(resultDescriptor, (AttrNumber) 2, "partition_index",
INT4OID, -1, 0);
TupleDescInitEntry(resultDescriptor, (AttrNumber) 3, "result_id",
TEXTOID, -1, 0);
TupleDescInitEntry(resultDescriptor, (AttrNumber) 4, "rows_written",
INT8OID, -1, 0);
bool errorOnAnyFailure = false; bool errorOnAnyFailure = false;
resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor, ExecuteSelectTasksIntoTupleDest(taskList, (TupleDestination *) tupleDest,
errorOnAnyFailure); errorOnAnyFailure);
List *fragmentList = NIL; return tupleDest->fragmentList;
TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor,
&TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(resultStore, true, false, slot))
{
DistributedResultFragment *distributedResultFragment =
TupleToDistributedResultFragment(slot, targetRelation);
fragmentList = lappend(fragmentList, distributedResultFragment);
ExecClearTuple(slot);
}
return fragmentList;
} }
@ -368,14 +434,15 @@ ExecutePartitionTaskList(List *taskList, CitusTableCacheEntry *targetRelation)
* WrapTasksForPartitioning() to a DistributedResultFragment. * WrapTasksForPartitioning() to a DistributedResultFragment.
*/ */
static DistributedResultFragment * static DistributedResultFragment *
TupleToDistributedResultFragment(TupleTableSlot *tupleSlot, TupleToDistributedResultFragment(HeapTuple tuple,
CitusTableCacheEntry *targetRelation) TupleDesc tupleDesc,
CitusTableCacheEntry *targetRelation,
uint32 sourceNodeId)
{ {
bool isNull = false; bool isNull = false;
uint32 sourceNodeId = DatumGetUInt32(slot_getattr(tupleSlot, 1, &isNull)); uint32 targetShardIndex = DatumGetUInt32(heap_getattr(tuple, 1, tupleDesc, &isNull));
uint32 targetShardIndex = DatumGetUInt32(slot_getattr(tupleSlot, 2, &isNull)); text *resultId = DatumGetTextP(heap_getattr(tuple, 2, tupleDesc, &isNull));
text *resultId = DatumGetTextP(slot_getattr(tupleSlot, 3, &isNull)); int64 rowCount = DatumGetInt64(heap_getattr(tuple, 3, tupleDesc, &isNull));
int64 rowCount = DatumGetInt64(slot_getattr(tupleSlot, 4, &isNull));
Assert(targetShardIndex < targetRelation->shardIntervalArrayLength); Assert(targetShardIndex < targetRelation->shardIntervalArrayLength);
ShardInterval *shardInterval = ShardInterval *shardInterval =
@ -395,26 +462,21 @@ TupleToDistributedResultFragment(TupleTableSlot *tupleSlot,
/* /*
* ExecuteSelectTasksIntoTupleStore executes the given tasks and returns a tuple * ExecuteSelectTasksIntoTupleDest executes the given tasks and forwards its result
* store containing its results. * to the given destination.
*/ */
static Tuplestorestate * static void
ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, ExecuteSelectTasksIntoTupleDest(List *taskList, TupleDestination *tupleDestination,
bool errorOnAnyFailure) bool errorOnAnyFailure)
{ {
bool expectResults = true; bool expectResults = true;
int targetPoolSize = MaxAdaptiveExecutorPoolSize; int targetPoolSize = MaxAdaptiveExecutorPoolSize;
bool randomAccess = true;
bool interTransactions = false;
TransactionProperties xactProperties = { TransactionProperties xactProperties = {
.errorOnAnyFailure = errorOnAnyFailure, .errorOnAnyFailure = errorOnAnyFailure,
.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED, .useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED,
.requires2PC = false .requires2PC = false
}; };
Tuplestorestate *resultStore = tuplestore_begin_heap(randomAccess, interTransactions,
work_mem);
/* /*
* Local execution is not supported because here we use perPlacementQueryStrings. * Local execution is not supported because here we use perPlacementQueryStrings.
* Local execution does not know how to handle it. One solution is to extract and set * Local execution does not know how to handle it. One solution is to extract and set
@ -425,14 +487,11 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
ExecutionParams *executionParams = CreateBasicExecutionParams( ExecutionParams *executionParams = CreateBasicExecutionParams(
ROW_MODIFY_READONLY, taskList, targetPoolSize, localExecutionSupported ROW_MODIFY_READONLY, taskList, targetPoolSize, localExecutionSupported
); );
executionParams->tupleDescriptor = resultDescriptor; executionParams->tupleDestination = tupleDestination;
executionParams->tupleStore = resultStore;
executionParams->xactProperties = xactProperties; executionParams->xactProperties = xactProperties;
executionParams->expectResults = expectResults; executionParams->expectResults = expectResults;
ExecuteTaskListExtended(executionParams); ExecuteTaskListExtended(executionParams);
return resultStore;
} }
@ -622,7 +681,6 @@ static void
ExecuteFetchTaskList(List *taskList) ExecuteFetchTaskList(List *taskList)
{ {
TupleDesc resultDescriptor = NULL; TupleDesc resultDescriptor = NULL;
Tuplestorestate *resultStore = NULL;
int resultColumnCount = 1; int resultColumnCount = 1;
#if PG_VERSION_NUM >= PG_VERSION_12 #if PG_VERSION_NUM >= PG_VERSION_12
@ -633,15 +691,8 @@ ExecuteFetchTaskList(List *taskList)
TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "byte_count", INT8OID, -1, 0); TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "byte_count", INT8OID, -1, 0);
TupleDestination *tupleDestination = CreateTupleDestNone();
bool errorOnAnyFailure = true; bool errorOnAnyFailure = true;
resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor, ExecuteSelectTasksIntoTupleDest(taskList, tupleDestination, errorOnAnyFailure);
errorOnAnyFailure);
TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor,
&TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(resultStore, true, false, slot))
{
ExecClearTuple(slot);
}
} }

View File

@ -200,9 +200,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
scanState->tuplestorestate = scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem); tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, TupleDestination *tupleDest = CreateTupleStoreTupleDest(
taskList, tupleDescriptor, scanState->tuplestorestate, tupleDescriptor);
scanState->tuplestorestate, uint64 rowsInserted = ExecuteTaskListIntoTupleDest(ROW_MODIFY_COMMUTATIVE,
taskList, tupleDest,
hasReturning); hasReturning);
executorState->es_processed = rowsInserted; executorState->es_processed = rowsInserted;
@ -262,9 +263,11 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
tuplestore_begin_heap(randomAccess, interTransactions, work_mem); tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList, TupleDestination *tupleDest = CreateTupleStoreTupleDest(
tupleDescriptor, scanState->tuplestorestate, scanState->tuplestorestate, tupleDescriptor);
hasReturning);
ExecuteTaskListIntoTupleDest(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
tupleDest, hasReturning);
if (SortReturning && hasReturning) if (SortReturning && hasReturning)
{ {
@ -832,7 +835,7 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
StringInfo wrappedQuery = makeStringInfo(); StringInfo wrappedQuery = makeStringInfo();
appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery", appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery",
projectedColumnsString->data, projectedColumnsString->data,
TaskQueryStringForAllPlacements(task)); TaskQueryString(task));
SetTaskQueryString(task, wrappedQuery->data); SetTaskQueryString(task, wrappedQuery->data);
} }
} }

View File

@ -228,7 +228,7 @@ ExecuteLocalTaskListExtended(List *taskList,
if (isUtilityCommand) if (isUtilityCommand)
{ {
LocallyExecuteUtilityTask(TaskQueryStringForAllPlacements(task)); LocallyExecuteUtilityTask(TaskQueryString(task));
continue; continue;
} }
@ -280,7 +280,7 @@ ExecuteLocalTaskListExtended(List *taskList,
continue; continue;
} }
Query *shardQuery = ParseQueryString(TaskQueryStringForAllPlacements(task), Query *shardQuery = ParseQueryString(TaskQueryString(task),
taskParameterTypes, taskParameterTypes,
taskNumParams); taskNumParams);
@ -301,7 +301,7 @@ ExecuteLocalTaskListExtended(List *taskList,
char *shardQueryString = NULL; char *shardQueryString = NULL;
if (GetTaskQueryType(task) == TASK_QUERY_TEXT) if (GetTaskQueryType(task) == TASK_QUERY_TEXT)
{ {
shardQueryString = TaskQueryStringForAllPlacements(task); shardQueryString = TaskQueryString(task);
} }
else else
{ {
@ -431,7 +431,7 @@ LogLocalCommand(Task *task)
} }
ereport(NOTICE, (errmsg("executing the command locally: %s", ereport(NOTICE, (errmsg("executing the command locally: %s",
ApplyLogRedaction(TaskQueryStringForAllPlacements(task))))); ApplyLogRedaction(TaskQueryString(task)))));
} }

View File

@ -1579,8 +1579,7 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task)
*/ */
StringInfo sqlTaskQueryString = makeStringInfo(); StringInfo sqlTaskQueryString = makeStringInfo();
char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements( char *escapedTaskQueryString = quote_literal_cstr(TaskQueryString(task));
task));
if (BinaryMasterCopyFormat) if (BinaryMasterCopyFormat)
{ {
@ -1616,7 +1615,7 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task)
/* wrap a task assignment query outside the original query */ /* wrap a task assignment query outside the original query */
StringInfo taskAssignmentQuery = StringInfo taskAssignmentQuery =
TaskAssignmentQuery(task, TaskQueryStringForAllPlacements(task)); TaskAssignmentQuery(task, TaskQueryString(task));
TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId, TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId,
task->taskId); task->taskId);
@ -2753,7 +2752,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
{ {
/* assign through task tracker to manage resource utilization */ /* assign through task tracker to manage resource utilization */
StringInfo jobCleanupQuery = TaskAssignmentQuery( StringInfo jobCleanupQuery = TaskAssignmentQuery(
jobCleanupTask, TaskQueryStringForAllPlacements(jobCleanupTask)); jobCleanupTask, TaskQueryString(jobCleanupTask));
jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId, jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId,
jobCleanupQuery->data); jobCleanupQuery->data);
@ -2832,7 +2831,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
nodeName, nodePort, (int) queryStatus), nodeName, nodePort, (int) queryStatus),
errhint("Manually clean job resources on node " errhint("Manually clean job resources on node "
"\"%s:%u\" by running \"%s\" ", nodeName, "\"%s:%u\" by running \"%s\" ", nodeName,
nodePort, TaskQueryStringForAllPlacements( nodePort, TaskQueryString(
jobCleanupTask)))); jobCleanupTask))));
} }
else else
@ -2851,7 +2850,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
nodePort, (int) resultStatus), nodePort, (int) resultStatus),
errhint("Manually clean job resources on node " errhint("Manually clean job resources on node "
"\"%s:%u\" by running \"%s\" ", nodeName, "\"%s:%u\" by running \"%s\" ", nodeName,
nodePort, TaskQueryStringForAllPlacements( nodePort, TaskQueryString(
jobCleanupTask)))); jobCleanupTask))));
} }
else else

View File

@ -450,8 +450,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
* connect to that node to drop the shard placement over that * connect to that node to drop the shard placement over that
* remote connection. * remote connection.
*/ */
const char *dropShardPlacementCommand = TaskQueryStringForAllPlacements( const char *dropShardPlacementCommand = TaskQueryString(task);
task);
ExecuteDropShardPlacementCommandRemotely(shardPlacement, ExecuteDropShardPlacementCommandRemotely(shardPlacement,
relationName, relationName,
dropShardPlacementCommand); dropShardPlacementCommand);

View File

@ -41,7 +41,6 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
static bool ShouldLazyDeparseQuery(Task *task); static bool ShouldLazyDeparseQuery(Task *task);
static char * DeparseTaskQuery(Task *task, Query *query); static char * DeparseTaskQuery(Task *task, Query *query);
static bool IsEachPlacementQueryStringDifferent(Task *task);
/* /*
@ -117,8 +116,7 @@ RebuildQueryStrings(Job *workerJob)
ereport(DEBUG4, (errmsg("query before rebuilding: %s", ereport(DEBUG4, (errmsg("query before rebuilding: %s",
!isQueryObjectOrText !isQueryObjectOrText
? "(null)" ? "(null)"
: ApplyLogRedaction(TaskQueryStringForAllPlacements( : ApplyLogRedaction(TaskQueryString(task)))));
task)))));
UpdateTaskQueryString(query, relationId, valuesRTE, task); UpdateTaskQueryString(query, relationId, valuesRTE, task);
@ -129,8 +127,7 @@ RebuildQueryStrings(Job *workerJob)
task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved; task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
ereport(DEBUG4, (errmsg("query after rebuilding: %s", ereport(DEBUG4, (errmsg("query after rebuilding: %s",
ApplyLogRedaction(TaskQueryStringForAllPlacements( ApplyLogRedaction(TaskQueryString(task)))));
task)))));
} }
} }
@ -464,19 +461,6 @@ SetTaskQueryString(Task *task, char *queryString)
} }
/*
* SetTaskPerPlacementQueryStrings set the perPlacementQueryString for the given task.
*/
void
SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList)
{
Assert(perPlacementQueryStringList != NIL);
task->taskQuery.queryType = TASK_QUERY_TEXT_PER_PLACEMENT;
task->taskQuery.data.perPlacementQueryStrings = perPlacementQueryStringList;
task->queryCount = 1;
}
/* /*
* SetTaskQueryStringList sets the queryStringList of the given task. * SetTaskQueryStringList sets the queryStringList of the given task.
*/ */
@ -530,14 +514,14 @@ GetTaskQueryType(Task *task)
/* /*
* TaskQueryStringForAllPlacements generates task query string text if missing. * TaskQueryString generates task query string text if missing.
* *
* For performance reasons, the queryString is generated lazily. For example * For performance reasons, the queryString is generated lazily. For example
* for local queries it is usually not needed to generate it, so this way we * for local queries it is usually not needed to generate it, so this way we
* can skip the expensive deparsing+parsing. * can skip the expensive deparsing+parsing.
*/ */
char * char *
TaskQueryStringForAllPlacements(Task *task) TaskQueryString(Task *task)
{ {
int taskQueryType = GetTaskQueryType(task); int taskQueryType = GetTaskQueryType(task);
if (taskQueryType == TASK_QUERY_NULL) if (taskQueryType == TASK_QUERY_NULL)
@ -562,8 +546,6 @@ TaskQueryStringForAllPlacements(Task *task)
/* /*
* At this point task query type should be TASK_QUERY_OBJECT. * At this point task query type should be TASK_QUERY_OBJECT.
* if someone calls this method inappropriately with TASK_QUERY_TEXT_PER_PLACEMENT case
* (instead of TaskQueryStringForPlacement), they will hit this assert.
*/ */
Assert(task->taskQuery.queryType == TASK_QUERY_OBJECT && Assert(task->taskQuery.queryType == TASK_QUERY_OBJECT &&
jobQueryReferenceForLazyDeparsing != NULL); jobQueryReferenceForLazyDeparsing != NULL);
@ -584,32 +566,3 @@ TaskQueryStringForAllPlacements(Task *task)
SetTaskQueryString(task, queryString); SetTaskQueryString(task, queryString);
return task->taskQuery.data.queryStringLazy; return task->taskQuery.data.queryStringLazy;
} }
/*
 * TaskQueryStringForPlacement picks the query string to run on the placement
 * identified by placementIndex.
 *
 * When the task carries one query string per placement, the entry at
 * placementIndex in that list is returned. Otherwise the call is delegated
 * to TaskQueryStringForAllPlacements().
 */
char *
TaskQueryStringForPlacement(Task *task, int placementIndex)
{
	/* no per-placement strings: use the string shared by all placements */
	if (!IsEachPlacementQueryStringDifferent(task))
	{
		return TaskQueryStringForAllPlacements(task);
	}

	List *placementQueries = task->taskQuery.data.perPlacementQueryStrings;

	/* the list must contain an entry for the requested placement */
	Assert(list_length(placementQueries) > placementIndex);

	return list_nth(placementQueries, placementIndex);
}
/*
 * IsEachPlacementQueryStringDifferent reports whether the task's query type
 * is TASK_QUERY_TEXT_PER_PLACEMENT, i.e. whether the task stores a separate
 * query string for each placement.
 */
static bool
IsEachPlacementQueryStringDifferent(Task *task)
{
	int taskQueryType = GetTaskQueryType(task);

	return taskQueryType == TASK_QUERY_TEXT_PER_PLACEMENT;
}

View File

@ -606,9 +606,7 @@ FetchRemoteExplainFromWorkers(Task *task, ExplainState *es)
RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0( RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0(
sizeof(RemoteExplainPlan)); sizeof(RemoteExplainPlan));
StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements( StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryString(task), es);
task),
es);
/* /*
* Use a coordinated transaction to ensure that we open a transaction block * Use a coordinated transaction to ensure that we open a transaction block
@ -694,7 +692,7 @@ ExplainTask(CitusScanState *scanState, Task *task, int placementIndex,
if (es->verbose) if (es->verbose)
{ {
const char *queryText = TaskQueryStringForAllPlacements(task); const char *queryText = TaskQueryString(task);
ExplainPropertyText("Query", queryText, es); ExplainPropertyText("Query", queryText, es);
} }
@ -1312,7 +1310,7 @@ ExplainAnalyzeTaskList(List *originalTaskList,
} }
Task *explainAnalyzeTask = copyObject(originalTask); Task *explainAnalyzeTask = copyObject(originalTask);
const char *queryString = TaskQueryStringForAllPlacements(explainAnalyzeTask); const char *queryString = TaskQueryString(explainAnalyzeTask);
char *wrappedQuery = WrapQueryForExplainAnalyze(queryString, tupleDesc); char *wrappedQuery = WrapQueryForExplainAnalyze(queryString, tupleDesc);
char *fetchQuery = char *fetchQuery =
"SELECT explain_analyze_output FROM worker_last_saved_explain_analyze()"; "SELECT explain_analyze_output FROM worker_last_saved_explain_analyze()";

View File

@ -4534,7 +4534,7 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask,
/* wrap repartition query string around filter query string */ /* wrap repartition query string around filter query string */
StringInfo mapQueryString = makeStringInfo(); StringInfo mapQueryString = makeStringInfo();
char *filterQueryString = TaskQueryStringForAllPlacements(filterTask); char *filterQueryString = TaskQueryString(filterTask);
char *filterQueryEscapedText = quote_literal_cstr(filterQueryString); char *filterQueryEscapedText = quote_literal_cstr(filterQueryString);
PartitionType partitionType = mapMergeJob->partitionType; PartitionType partitionType = mapMergeJob->partitionType;

View File

@ -284,12 +284,6 @@ CopyTaskQuery(Task *newnode, Task *from)
break; break;
} }
case TASK_QUERY_TEXT_PER_PLACEMENT:
{
COPY_STRING_LIST(taskQuery.data.perPlacementQueryStrings);
break;
}
case TASK_QUERY_TEXT_LIST: case TASK_QUERY_TEXT_LIST:
{ {
COPY_STRING_LIST(taskQuery.data.queryStringList); COPY_STRING_LIST(taskQuery.data.queryStringList);

View File

@ -490,12 +490,6 @@ static void WriteTaskQuery(OUTFUNC_ARGS) {
break; break;
} }
case TASK_QUERY_TEXT_PER_PLACEMENT:
{
WRITE_NODE_FIELD(taskQuery.data.perPlacementQueryStrings);
break;
}
case TASK_QUERY_TEXT_LIST: case TASK_QUERY_TEXT_LIST:
{ {
WRITE_NODE_FIELD(taskQuery.data.queryStringList); WRITE_NODE_FIELD(taskQuery.data.queryStringList);

View File

@ -26,10 +26,7 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query); extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryString(Task *task, char *queryString);
extern void SetTaskQueryStringList(Task *task, List *queryStringList); extern void SetTaskQueryStringList(Task *task, List *queryStringList);
extern void SetTaskPerPlacementQueryStrings(Task *task, extern char * TaskQueryString(Task *task);
List *perPlacementQueryStringList);
extern char * TaskQueryStringForAllPlacements(Task *task);
extern char * TaskQueryStringForPlacement(Task *task, int placementIndex);
extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList); extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList);
extern int GetTaskQueryType(Task *task); extern int GetTaskQueryType(Task *task);

View File

@ -17,6 +17,7 @@
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/tuple_destination.h"
/* managed via guc.c */ /* managed via guc.c */
@ -87,11 +88,8 @@ typedef struct ExecutionParams
/* taskList contains the tasks for the execution.*/ /* taskList contains the tasks for the execution.*/
List *taskList; List *taskList;
/* tupleDescriptor contains the description for the result tuples.*/ /* where to forward each tuple received */
TupleDesc tupleDescriptor; TupleDestination *tupleDestination;
/* tupleStore is where the results will be stored for this execution */
Tuplestorestate *tupleStore;
/* expectResults is true if this execution will return some result. */ /* expectResults is true if this execution will return some result. */
bool expectResults; bool expectResults;
@ -120,9 +118,8 @@ ExecutionParams * CreateBasicExecutionParams(RowModifyLevel modLevel,
bool localExecutionSupported); bool localExecutionSupported);
extern uint64 ExecuteTaskListExtended(ExecutionParams *executionParams); extern uint64 ExecuteTaskListExtended(ExecutionParams *executionParams);
extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, extern uint64 ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, TupleDestination *tupleDest,
Tuplestorestate *tupleStore,
bool expectResults); bool expectResults);
extern bool IsCitusCustomState(PlanState *planState); extern bool IsCitusCustomState(PlanState *planState);
extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * CitusExecScan(CustomScanState *node);

View File

@ -208,7 +208,6 @@ typedef enum TaskQueryType
TASK_QUERY_NULL, TASK_QUERY_NULL,
TASK_QUERY_TEXT, TASK_QUERY_TEXT,
TASK_QUERY_OBJECT, TASK_QUERY_OBJECT,
TASK_QUERY_TEXT_PER_PLACEMENT,
TASK_QUERY_TEXT_LIST TASK_QUERY_TEXT_LIST
} TaskQueryType; } TaskQueryType;
@ -238,20 +237,15 @@ typedef struct TaskQuery
/* /*
* In almost all cases queryStringLazy should be read only indirectly by * In almost all cases queryStringLazy should be read only indirectly by
* using TaskQueryStringForAllPlacements(). This will populate the field only if the * using TaskQueryString(). This will populate the field only if the
* jobQueryReferenceForLazyDeparsing field is not NULL. * jobQueryReferenceForLazyDeparsing field is not NULL.
* *
* This field should only be set by using SetTaskQueryString() (or as a * This field should only be set by using SetTaskQueryString() (or as a
* side effect from TaskQueryStringForAllPlacements()). Otherwise it might not be in sync * side effect from TaskQueryString()). Otherwise it might not be in sync
* with jobQueryReferenceForLazyDeparsing. * with jobQueryReferenceForLazyDeparsing.
*/ */
char *queryStringLazy; char *queryStringLazy;
/*
* perPlacementQueryStrings is used when we have different query strings for each placement.
*/
List *perPlacementQueryStrings;
/* /*
* queryStringList contains query strings. They should be * queryStringList contains query strings. They should be
* run sequentially. The concatenated version of this list * run sequentially. The concatenated version of this list