mirror of https://github.com/citusdata/citus.git
Merge pull request #4037 from citusdata/remove_per_placement_query
Refactor: Use TupleDestination API for partitioning in insert/select.pull/3850/head^2
commit
339d43357c
|
@ -28,6 +28,7 @@
|
||||||
#include "distributed/adaptive_executor.h"
|
#include "distributed/adaptive_executor.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
#include "distributed/shard_pruning.h"
|
#include "distributed/shard_pruning.h"
|
||||||
|
#include "distributed/tuple_destination.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_log_messages.h"
|
#include "distributed/worker_log_messages.h"
|
||||||
|
@ -193,8 +194,8 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
|
||||||
ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize,
|
ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize,
|
||||||
localExecutionSupported
|
localExecutionSupported
|
||||||
);
|
);
|
||||||
executionParams->tupleStore = tupleStore;
|
executionParams->tupleDestination = CreateTupleStoreTupleDest(tupleStore,
|
||||||
executionParams->tupleDescriptor = tupleDesc;
|
tupleDesc);
|
||||||
executionParams->expectResults = expectResults;
|
executionParams->expectResults = expectResults;
|
||||||
executionParams->xactProperties = xactProperties;
|
executionParams->xactProperties = xactProperties;
|
||||||
ExecuteTaskListExtended(executionParams);
|
ExecuteTaskListExtended(executionParams);
|
||||||
|
|
|
@ -930,6 +930,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
|
||||||
);
|
);
|
||||||
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
|
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
|
||||||
modLevel, taskList, false);
|
modLevel, taskList, false);
|
||||||
|
|
||||||
return ExecuteTaskListExtended(executionParams);
|
return ExecuteTaskListExtended(executionParams);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -939,8 +940,8 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
|
||||||
* for some of the arguments.
|
* for some of the arguments.
|
||||||
*/
|
*/
|
||||||
uint64
|
uint64
|
||||||
ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
|
ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList,
|
||||||
TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,
|
TupleDestination *tupleDest,
|
||||||
bool expectResults)
|
bool expectResults)
|
||||||
{
|
{
|
||||||
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
|
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
|
||||||
|
@ -952,8 +953,7 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
|
||||||
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
|
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
|
||||||
modLevel, taskList, false);
|
modLevel, taskList, false);
|
||||||
executionParams->expectResults = expectResults;
|
executionParams->expectResults = expectResults;
|
||||||
executionParams->tupleStore = tupleStore;
|
executionParams->tupleDestination = tupleDest;
|
||||||
executionParams->tupleDescriptor = tupleDescriptor;
|
|
||||||
|
|
||||||
return ExecuteTaskListExtended(executionParams);
|
return ExecuteTaskListExtended(executionParams);
|
||||||
}
|
}
|
||||||
|
@ -971,16 +971,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
|
||||||
List *localTaskList = NIL;
|
List *localTaskList = NIL;
|
||||||
List *remoteTaskList = NIL;
|
List *remoteTaskList = NIL;
|
||||||
|
|
||||||
TupleDestination *defaultTupleDest = NULL;
|
TupleDestination *defaultTupleDest = executionParams->tupleDestination;
|
||||||
if (executionParams->tupleDescriptor != NULL)
|
|
||||||
{
|
|
||||||
defaultTupleDest = CreateTupleStoreTupleDest(executionParams->tupleStore,
|
|
||||||
executionParams->tupleDescriptor);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
defaultTupleDest = CreateTupleDestNone();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (executionParams->localExecutionSupported && ShouldExecuteTasksLocally(
|
if (executionParams->localExecutionSupported && ShouldExecuteTasksLocally(
|
||||||
executionParams->taskList))
|
executionParams->taskList))
|
||||||
|
@ -1052,8 +1043,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel,
|
||||||
executionParams->targetPoolSize = targetPoolSize;
|
executionParams->targetPoolSize = targetPoolSize;
|
||||||
executionParams->localExecutionSupported = localExecutionSupported;
|
executionParams->localExecutionSupported = localExecutionSupported;
|
||||||
|
|
||||||
executionParams->tupleStore = NULL;
|
executionParams->tupleDestination = CreateTupleDestNone();
|
||||||
executionParams->tupleDescriptor = NULL;
|
|
||||||
executionParams->expectResults = false;
|
executionParams->expectResults = false;
|
||||||
executionParams->isUtilityCommand = false;
|
executionParams->isUtilityCommand = false;
|
||||||
executionParams->jobIdList = NIL;
|
executionParams->jobIdList = NIL;
|
||||||
|
@ -3495,9 +3485,7 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
|
||||||
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
|
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
|
||||||
int querySent = 0;
|
int querySent = 0;
|
||||||
|
|
||||||
char *queryString = TaskQueryStringForPlacement(task,
|
char *queryString = TaskQueryString(task);
|
||||||
placementExecution->
|
|
||||||
placementExecutionIndex);
|
|
||||||
|
|
||||||
if (execution->transactionProperties->useRemoteTransactionBlocks !=
|
if (execution->transactionProperties->useRemoteTransactionBlocks !=
|
||||||
TRANSACTION_BLOCKS_DISALLOWED)
|
TRANSACTION_BLOCKS_DISALLOWED)
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
#include "port.h"
|
#include "port.h"
|
||||||
|
|
||||||
|
#include "access/htup_details.h"
|
||||||
#include "access/tupdesc.h"
|
#include "access/tupdesc.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
#include "distributed/deparse_shard_query.h"
|
#include "distributed/deparse_shard_query.h"
|
||||||
|
@ -28,6 +29,7 @@
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/transaction_management.h"
|
#include "distributed/transaction_management.h"
|
||||||
|
#include "distributed/tuple_destination.h"
|
||||||
#include "distributed/tuplestore.h"
|
#include "distributed/tuplestore.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "tcop/pquery.h"
|
#include "tcop/pquery.h"
|
||||||
|
@ -36,6 +38,24 @@
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PartitioningTupleDest is internal representation of a TupleDestination
|
||||||
|
* which consumes queries constructed in WrapTasksForPartitioning.
|
||||||
|
*/
|
||||||
|
typedef struct PartitioningTupleDest
|
||||||
|
{
|
||||||
|
TupleDestination pub;
|
||||||
|
|
||||||
|
CitusTableCacheEntry *targetRelation;
|
||||||
|
|
||||||
|
/* list of DistributedResultFragment pointer */
|
||||||
|
List *fragmentList;
|
||||||
|
|
||||||
|
/* what do tuples look like */
|
||||||
|
TupleDesc tupleDesc;
|
||||||
|
} PartitioningTupleDest;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer.
|
* NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer.
|
||||||
* It is a separate struct to use it as a key in a hash table.
|
* It is a separate struct to use it as a key in a hash table.
|
||||||
|
@ -59,22 +79,33 @@ typedef struct NodeToNodeFragmentsTransfer
|
||||||
|
|
||||||
|
|
||||||
/* forward declarations of local functions */
|
/* forward declarations of local functions */
|
||||||
static void WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
|
static List * WrapTasksForPartitioning(const char *resultIdPrefix,
|
||||||
|
List *selectTaskList,
|
||||||
int partitionColumnIndex,
|
int partitionColumnIndex,
|
||||||
CitusTableCacheEntry *targetRelation,
|
CitusTableCacheEntry *targetRelation,
|
||||||
bool binaryFormat);
|
bool binaryFormat);
|
||||||
static List * ExecutePartitionTaskList(List *partitionTaskList,
|
static List * ExecutePartitionTaskList(List *partitionTaskList,
|
||||||
CitusTableCacheEntry *targetRelation);
|
CitusTableCacheEntry *targetRelation);
|
||||||
|
static PartitioningTupleDest * CreatePartitioningTupleDest(
|
||||||
|
CitusTableCacheEntry *targetRelation);
|
||||||
|
static void PartitioningTupleDestPutTuple(TupleDestination *self, Task *task,
|
||||||
|
int placementIndex, int queryNumber,
|
||||||
|
HeapTuple heapTuple, uint64 tupleLibpqSize);
|
||||||
|
static TupleDesc PartitioningTupleDestTupleDescForQuery(TupleDestination *self, int
|
||||||
|
queryNumber);
|
||||||
static ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int
|
static ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int
|
||||||
datumCount, Oid typeId);
|
datumCount, Oid typeId);
|
||||||
static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount,
|
static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount,
|
||||||
Oid intervalTypeId, ArrayType **minValueArray,
|
Oid intervalTypeId, ArrayType **minValueArray,
|
||||||
ArrayType **maxValueArray);
|
ArrayType **maxValueArray);
|
||||||
static char * SourceShardPrefix(const char *resultPrefix, uint64 shardId);
|
static char * SourceShardPrefix(const char *resultPrefix, uint64 shardId);
|
||||||
static DistributedResultFragment * TupleToDistributedResultFragment(
|
static DistributedResultFragment * TupleToDistributedResultFragment(HeapTuple heapTuple,
|
||||||
TupleTableSlot *tupleSlot, CitusTableCacheEntry *targetRelation);
|
TupleDesc tupleDesc,
|
||||||
static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList,
|
CitusTableCacheEntry *
|
||||||
TupleDesc resultDescriptor,
|
targetRelation,
|
||||||
|
uint32 sourceNodeId);
|
||||||
|
static void ExecuteSelectTasksIntoTupleDest(List *taskList,
|
||||||
|
TupleDestination *tupleDestination,
|
||||||
bool errorOnAnyFailure);
|
bool errorOnAnyFailure);
|
||||||
static List ** ColocateFragmentsWithRelation(List *fragmentList,
|
static List ** ColocateFragmentsWithRelation(List *fragmentList,
|
||||||
CitusTableCacheEntry *targetRelation);
|
CitusTableCacheEntry *targetRelation);
|
||||||
|
@ -157,7 +188,7 @@ PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList,
|
||||||
*/
|
*/
|
||||||
UseCoordinatedTransaction();
|
UseCoordinatedTransaction();
|
||||||
|
|
||||||
WrapTasksForPartitioning(resultIdPrefix, selectTaskList,
|
selectTaskList = WrapTasksForPartitioning(resultIdPrefix, selectTaskList,
|
||||||
partitionColumnIndex, targetRelation,
|
partitionColumnIndex, targetRelation,
|
||||||
binaryFormat);
|
binaryFormat);
|
||||||
return ExecutePartitionTaskList(selectTaskList, targetRelation);
|
return ExecutePartitionTaskList(selectTaskList, targetRelation);
|
||||||
|
@ -169,12 +200,13 @@ PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList,
|
||||||
* to worker_partition_query_result(). Target list of the wrapped query should
|
* to worker_partition_query_result(). Target list of the wrapped query should
|
||||||
* match the tuple descriptor in ExecutePartitionTaskList().
|
* match the tuple descriptor in ExecutePartitionTaskList().
|
||||||
*/
|
*/
|
||||||
static void
|
static List *
|
||||||
WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
|
WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
|
||||||
int partitionColumnIndex,
|
int partitionColumnIndex,
|
||||||
CitusTableCacheEntry *targetRelation,
|
CitusTableCacheEntry *targetRelation,
|
||||||
bool binaryFormat)
|
bool binaryFormat)
|
||||||
{
|
{
|
||||||
|
List *wrappedTaskList = NIL;
|
||||||
ShardInterval **shardIntervalArray = targetRelation->sortedShardIntervalArray;
|
ShardInterval **shardIntervalArray = targetRelation->sortedShardIntervalArray;
|
||||||
int shardCount = targetRelation->shardIntervalArrayLength;
|
int shardCount = targetRelation->shardIntervalArrayLength;
|
||||||
|
|
||||||
|
@ -200,41 +232,105 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
|
||||||
Task *selectTask = NULL;
|
Task *selectTask = NULL;
|
||||||
foreach_ptr(selectTask, selectTaskList)
|
foreach_ptr(selectTask, selectTaskList)
|
||||||
{
|
{
|
||||||
List *shardPlacementList = selectTask->taskPlacementList;
|
|
||||||
char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId);
|
char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId);
|
||||||
char *partitionMethodString = targetRelation->partitionMethod == 'h' ?
|
char *partitionMethodString = targetRelation->partitionMethod == 'h' ?
|
||||||
"hash" : "range";
|
"hash" : "range";
|
||||||
const char *binaryFormatString = binaryFormat ? "true" : "false";
|
const char *binaryFormatString = binaryFormat ? "true" : "false";
|
||||||
List *perPlacementQueries = NIL;
|
List *perPlacementQueries = NIL;
|
||||||
|
|
||||||
/*
|
Task *wrappedSelectTask = copyObject(selectTask);
|
||||||
* We need to know which placement could successfully execute the query,
|
|
||||||
* so we form a different query per placement, each of which returning
|
|
||||||
* the node id of the placement.
|
|
||||||
*/
|
|
||||||
ShardPlacement *shardPlacement = NULL;
|
|
||||||
foreach_ptr(shardPlacement, shardPlacementList)
|
|
||||||
{
|
|
||||||
StringInfo wrappedQuery = makeStringInfo();
|
StringInfo wrappedQuery = makeStringInfo();
|
||||||
appendStringInfo(wrappedQuery,
|
appendStringInfo(wrappedQuery,
|
||||||
"SELECT %u::int, partition_index"
|
"SELECT partition_index"
|
||||||
", %s || '_' || partition_index::text "
|
", %s || '_' || partition_index::text "
|
||||||
", rows_written "
|
", rows_written "
|
||||||
"FROM worker_partition_query_result"
|
"FROM worker_partition_query_result"
|
||||||
"(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0",
|
"(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0",
|
||||||
shardPlacement->nodeId,
|
|
||||||
quote_literal_cstr(taskPrefix),
|
quote_literal_cstr(taskPrefix),
|
||||||
quote_literal_cstr(taskPrefix),
|
quote_literal_cstr(taskPrefix),
|
||||||
quote_literal_cstr(TaskQueryStringForAllPlacements(
|
quote_literal_cstr(TaskQueryString(selectTask)),
|
||||||
selectTask)),
|
|
||||||
partitionColumnIndex,
|
partitionColumnIndex,
|
||||||
quote_literal_cstr(partitionMethodString),
|
quote_literal_cstr(partitionMethodString),
|
||||||
minValuesString->data, maxValuesString->data,
|
minValuesString->data, maxValuesString->data,
|
||||||
binaryFormatString);
|
binaryFormatString);
|
||||||
perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data);
|
perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data);
|
||||||
|
|
||||||
|
SetTaskQueryString(wrappedSelectTask, wrappedQuery->data);
|
||||||
|
wrappedTaskList = lappend(wrappedTaskList, wrappedSelectTask);
|
||||||
}
|
}
|
||||||
SetTaskPerPlacementQueryStrings(selectTask, perPlacementQueries);
|
|
||||||
}
|
return wrappedTaskList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreatePartitioningTupleDest creates a TupleDestination which consumes results of
|
||||||
|
* tasks constructed in WrapTasksForPartitioning.
|
||||||
|
*/
|
||||||
|
static PartitioningTupleDest *
|
||||||
|
CreatePartitioningTupleDest(CitusTableCacheEntry *targetRelation)
|
||||||
|
{
|
||||||
|
TupleDesc tupleDescriptor = NULL;
|
||||||
|
int resultColumnCount = 3;
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_12
|
||||||
|
tupleDescriptor = CreateTemplateTupleDesc(resultColumnCount);
|
||||||
|
#else
|
||||||
|
tupleDescriptor = CreateTemplateTupleDesc(resultColumnCount, false);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 1, "partition_index",
|
||||||
|
INT4OID, -1, 0);
|
||||||
|
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 2, "result_id",
|
||||||
|
TEXTOID, -1, 0);
|
||||||
|
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 3, "rows_written",
|
||||||
|
INT8OID, -1, 0);
|
||||||
|
|
||||||
|
|
||||||
|
PartitioningTupleDest *tupleDest = palloc0(sizeof(PartitioningTupleDest));
|
||||||
|
tupleDest->targetRelation = targetRelation;
|
||||||
|
tupleDest->tupleDesc = tupleDescriptor;
|
||||||
|
tupleDest->pub.putTuple = PartitioningTupleDestPutTuple;
|
||||||
|
tupleDest->pub.tupleDescForQuery =
|
||||||
|
PartitioningTupleDestTupleDescForQuery;
|
||||||
|
|
||||||
|
return tupleDest;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PartitioningTupleDestTupleDescForQuery implements TupleDestination->putTuple for
|
||||||
|
* PartitioningTupleDest.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
PartitioningTupleDestPutTuple(TupleDestination *self, Task *task,
|
||||||
|
int placementIndex, int queryNumber,
|
||||||
|
HeapTuple heapTuple, uint64 tupleLibpqSize)
|
||||||
|
{
|
||||||
|
PartitioningTupleDest *tupleDest = (PartitioningTupleDest *) self;
|
||||||
|
ShardPlacement *placement = list_nth(task->taskPlacementList, placementIndex);
|
||||||
|
|
||||||
|
DistributedResultFragment *fragment =
|
||||||
|
TupleToDistributedResultFragment(heapTuple, tupleDest->tupleDesc,
|
||||||
|
tupleDest->targetRelation,
|
||||||
|
placement->nodeId);
|
||||||
|
tupleDest->fragmentList = lappend(tupleDest->fragmentList, fragment);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PartitioningTupleDestTupleDescForQuery implements TupleDestination->TupleDescForQuery
|
||||||
|
* for PartitioningTupleDest.
|
||||||
|
*/
|
||||||
|
static TupleDesc
|
||||||
|
PartitioningTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber)
|
||||||
|
{
|
||||||
|
Assert(queryNumber == 0);
|
||||||
|
|
||||||
|
PartitioningTupleDest *tupleDest = (PartitioningTupleDest *) self;
|
||||||
|
|
||||||
|
return tupleDest->tupleDesc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -323,43 +419,13 @@ CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int datumCount, Oid t
|
||||||
static List *
|
static List *
|
||||||
ExecutePartitionTaskList(List *taskList, CitusTableCacheEntry *targetRelation)
|
ExecutePartitionTaskList(List *taskList, CitusTableCacheEntry *targetRelation)
|
||||||
{
|
{
|
||||||
TupleDesc resultDescriptor = NULL;
|
PartitioningTupleDest *tupleDest = CreatePartitioningTupleDest(targetRelation);
|
||||||
Tuplestorestate *resultStore = NULL;
|
|
||||||
int resultColumnCount = 4;
|
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_12
|
|
||||||
resultDescriptor = CreateTemplateTupleDesc(resultColumnCount);
|
|
||||||
#else
|
|
||||||
resultDescriptor = CreateTemplateTupleDesc(resultColumnCount, false);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "node_id",
|
|
||||||
INT4OID, -1, 0);
|
|
||||||
TupleDescInitEntry(resultDescriptor, (AttrNumber) 2, "partition_index",
|
|
||||||
INT4OID, -1, 0);
|
|
||||||
TupleDescInitEntry(resultDescriptor, (AttrNumber) 3, "result_id",
|
|
||||||
TEXTOID, -1, 0);
|
|
||||||
TupleDescInitEntry(resultDescriptor, (AttrNumber) 4, "rows_written",
|
|
||||||
INT8OID, -1, 0);
|
|
||||||
|
|
||||||
bool errorOnAnyFailure = false;
|
bool errorOnAnyFailure = false;
|
||||||
resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor,
|
ExecuteSelectTasksIntoTupleDest(taskList, (TupleDestination *) tupleDest,
|
||||||
errorOnAnyFailure);
|
errorOnAnyFailure);
|
||||||
|
|
||||||
List *fragmentList = NIL;
|
return tupleDest->fragmentList;
|
||||||
TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor,
|
|
||||||
&TTSOpsMinimalTuple);
|
|
||||||
while (tuplestore_gettupleslot(resultStore, true, false, slot))
|
|
||||||
{
|
|
||||||
DistributedResultFragment *distributedResultFragment =
|
|
||||||
TupleToDistributedResultFragment(slot, targetRelation);
|
|
||||||
|
|
||||||
fragmentList = lappend(fragmentList, distributedResultFragment);
|
|
||||||
|
|
||||||
ExecClearTuple(slot);
|
|
||||||
}
|
|
||||||
|
|
||||||
return fragmentList;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -368,14 +434,15 @@ ExecutePartitionTaskList(List *taskList, CitusTableCacheEntry *targetRelation)
|
||||||
* WrapTasksForPartitioning() to a DistributedResultFragment.
|
* WrapTasksForPartitioning() to a DistributedResultFragment.
|
||||||
*/
|
*/
|
||||||
static DistributedResultFragment *
|
static DistributedResultFragment *
|
||||||
TupleToDistributedResultFragment(TupleTableSlot *tupleSlot,
|
TupleToDistributedResultFragment(HeapTuple tuple,
|
||||||
CitusTableCacheEntry *targetRelation)
|
TupleDesc tupleDesc,
|
||||||
|
CitusTableCacheEntry *targetRelation,
|
||||||
|
uint32 sourceNodeId)
|
||||||
{
|
{
|
||||||
bool isNull = false;
|
bool isNull = false;
|
||||||
uint32 sourceNodeId = DatumGetUInt32(slot_getattr(tupleSlot, 1, &isNull));
|
uint32 targetShardIndex = DatumGetUInt32(heap_getattr(tuple, 1, tupleDesc, &isNull));
|
||||||
uint32 targetShardIndex = DatumGetUInt32(slot_getattr(tupleSlot, 2, &isNull));
|
text *resultId = DatumGetTextP(heap_getattr(tuple, 2, tupleDesc, &isNull));
|
||||||
text *resultId = DatumGetTextP(slot_getattr(tupleSlot, 3, &isNull));
|
int64 rowCount = DatumGetInt64(heap_getattr(tuple, 3, tupleDesc, &isNull));
|
||||||
int64 rowCount = DatumGetInt64(slot_getattr(tupleSlot, 4, &isNull));
|
|
||||||
|
|
||||||
Assert(targetShardIndex < targetRelation->shardIntervalArrayLength);
|
Assert(targetShardIndex < targetRelation->shardIntervalArrayLength);
|
||||||
ShardInterval *shardInterval =
|
ShardInterval *shardInterval =
|
||||||
|
@ -395,26 +462,21 @@ TupleToDistributedResultFragment(TupleTableSlot *tupleSlot,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExecuteSelectTasksIntoTupleStore executes the given tasks and returns a tuple
|
* ExecuteSelectTasksIntoTupleDest executes the given tasks and forwards its result
|
||||||
* store containing its results.
|
* to the given destination.
|
||||||
*/
|
*/
|
||||||
static Tuplestorestate *
|
static void
|
||||||
ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
|
ExecuteSelectTasksIntoTupleDest(List *taskList, TupleDestination *tupleDestination,
|
||||||
bool errorOnAnyFailure)
|
bool errorOnAnyFailure)
|
||||||
{
|
{
|
||||||
bool expectResults = true;
|
bool expectResults = true;
|
||||||
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
|
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
|
||||||
bool randomAccess = true;
|
|
||||||
bool interTransactions = false;
|
|
||||||
TransactionProperties xactProperties = {
|
TransactionProperties xactProperties = {
|
||||||
.errorOnAnyFailure = errorOnAnyFailure,
|
.errorOnAnyFailure = errorOnAnyFailure,
|
||||||
.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED,
|
.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED,
|
||||||
.requires2PC = false
|
.requires2PC = false
|
||||||
};
|
};
|
||||||
|
|
||||||
Tuplestorestate *resultStore = tuplestore_begin_heap(randomAccess, interTransactions,
|
|
||||||
work_mem);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Local execution is not supported because here we use perPlacementQueryStrings.
|
* Local execution is not supported because here we use perPlacementQueryStrings.
|
||||||
* Local execution does not know how to handle it. One solution is to extract and set
|
* Local execution does not know how to handle it. One solution is to extract and set
|
||||||
|
@ -425,14 +487,11 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
|
||||||
ExecutionParams *executionParams = CreateBasicExecutionParams(
|
ExecutionParams *executionParams = CreateBasicExecutionParams(
|
||||||
ROW_MODIFY_READONLY, taskList, targetPoolSize, localExecutionSupported
|
ROW_MODIFY_READONLY, taskList, targetPoolSize, localExecutionSupported
|
||||||
);
|
);
|
||||||
executionParams->tupleDescriptor = resultDescriptor;
|
executionParams->tupleDestination = tupleDestination;
|
||||||
executionParams->tupleStore = resultStore;
|
|
||||||
executionParams->xactProperties = xactProperties;
|
executionParams->xactProperties = xactProperties;
|
||||||
executionParams->expectResults = expectResults;
|
executionParams->expectResults = expectResults;
|
||||||
|
|
||||||
ExecuteTaskListExtended(executionParams);
|
ExecuteTaskListExtended(executionParams);
|
||||||
|
|
||||||
return resultStore;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -622,7 +681,6 @@ static void
|
||||||
ExecuteFetchTaskList(List *taskList)
|
ExecuteFetchTaskList(List *taskList)
|
||||||
{
|
{
|
||||||
TupleDesc resultDescriptor = NULL;
|
TupleDesc resultDescriptor = NULL;
|
||||||
Tuplestorestate *resultStore = NULL;
|
|
||||||
int resultColumnCount = 1;
|
int resultColumnCount = 1;
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_12
|
#if PG_VERSION_NUM >= PG_VERSION_12
|
||||||
|
@ -633,15 +691,8 @@ ExecuteFetchTaskList(List *taskList)
|
||||||
|
|
||||||
TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "byte_count", INT8OID, -1, 0);
|
TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "byte_count", INT8OID, -1, 0);
|
||||||
|
|
||||||
|
TupleDestination *tupleDestination = CreateTupleDestNone();
|
||||||
|
|
||||||
bool errorOnAnyFailure = true;
|
bool errorOnAnyFailure = true;
|
||||||
resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor,
|
ExecuteSelectTasksIntoTupleDest(taskList, tupleDestination, errorOnAnyFailure);
|
||||||
errorOnAnyFailure);
|
|
||||||
|
|
||||||
TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor,
|
|
||||||
&TTSOpsMinimalTuple);
|
|
||||||
|
|
||||||
while (tuplestore_gettupleslot(resultStore, true, false, slot))
|
|
||||||
{
|
|
||||||
ExecClearTuple(slot);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -200,9 +200,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
|
||||||
scanState->tuplestorestate =
|
scanState->tuplestorestate =
|
||||||
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
|
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
|
||||||
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
|
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
|
||||||
uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE,
|
TupleDestination *tupleDest = CreateTupleStoreTupleDest(
|
||||||
taskList, tupleDescriptor,
|
scanState->tuplestorestate, tupleDescriptor);
|
||||||
scanState->tuplestorestate,
|
uint64 rowsInserted = ExecuteTaskListIntoTupleDest(ROW_MODIFY_COMMUTATIVE,
|
||||||
|
taskList, tupleDest,
|
||||||
hasReturning);
|
hasReturning);
|
||||||
|
|
||||||
executorState->es_processed = rowsInserted;
|
executorState->es_processed = rowsInserted;
|
||||||
|
@ -262,9 +263,11 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
|
||||||
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
|
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
|
||||||
|
|
||||||
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
|
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
|
||||||
ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
|
TupleDestination *tupleDest = CreateTupleStoreTupleDest(
|
||||||
tupleDescriptor, scanState->tuplestorestate,
|
scanState->tuplestorestate, tupleDescriptor);
|
||||||
hasReturning);
|
|
||||||
|
ExecuteTaskListIntoTupleDest(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
|
||||||
|
tupleDest, hasReturning);
|
||||||
|
|
||||||
if (SortReturning && hasReturning)
|
if (SortReturning && hasReturning)
|
||||||
{
|
{
|
||||||
|
@ -832,7 +835,7 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
|
||||||
StringInfo wrappedQuery = makeStringInfo();
|
StringInfo wrappedQuery = makeStringInfo();
|
||||||
appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery",
|
appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery",
|
||||||
projectedColumnsString->data,
|
projectedColumnsString->data,
|
||||||
TaskQueryStringForAllPlacements(task));
|
TaskQueryString(task));
|
||||||
SetTaskQueryString(task, wrappedQuery->data);
|
SetTaskQueryString(task, wrappedQuery->data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -228,7 +228,7 @@ ExecuteLocalTaskListExtended(List *taskList,
|
||||||
|
|
||||||
if (isUtilityCommand)
|
if (isUtilityCommand)
|
||||||
{
|
{
|
||||||
LocallyExecuteUtilityTask(TaskQueryStringForAllPlacements(task));
|
LocallyExecuteUtilityTask(TaskQueryString(task));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -280,7 +280,7 @@ ExecuteLocalTaskListExtended(List *taskList,
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
Query *shardQuery = ParseQueryString(TaskQueryStringForAllPlacements(task),
|
Query *shardQuery = ParseQueryString(TaskQueryString(task),
|
||||||
taskParameterTypes,
|
taskParameterTypes,
|
||||||
taskNumParams);
|
taskNumParams);
|
||||||
|
|
||||||
|
@ -301,7 +301,7 @@ ExecuteLocalTaskListExtended(List *taskList,
|
||||||
char *shardQueryString = NULL;
|
char *shardQueryString = NULL;
|
||||||
if (GetTaskQueryType(task) == TASK_QUERY_TEXT)
|
if (GetTaskQueryType(task) == TASK_QUERY_TEXT)
|
||||||
{
|
{
|
||||||
shardQueryString = TaskQueryStringForAllPlacements(task);
|
shardQueryString = TaskQueryString(task);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -431,7 +431,7 @@ LogLocalCommand(Task *task)
|
||||||
}
|
}
|
||||||
|
|
||||||
ereport(NOTICE, (errmsg("executing the command locally: %s",
|
ereport(NOTICE, (errmsg("executing the command locally: %s",
|
||||||
ApplyLogRedaction(TaskQueryStringForAllPlacements(task)))));
|
ApplyLogRedaction(TaskQueryString(task)))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1579,8 +1579,7 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
StringInfo sqlTaskQueryString = makeStringInfo();
|
StringInfo sqlTaskQueryString = makeStringInfo();
|
||||||
char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements(
|
char *escapedTaskQueryString = quote_literal_cstr(TaskQueryString(task));
|
||||||
task));
|
|
||||||
|
|
||||||
if (BinaryMasterCopyFormat)
|
if (BinaryMasterCopyFormat)
|
||||||
{
|
{
|
||||||
|
@ -1616,7 +1615,7 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task)
|
||||||
|
|
||||||
/* wrap a task assignment query outside the original query */
|
/* wrap a task assignment query outside the original query */
|
||||||
StringInfo taskAssignmentQuery =
|
StringInfo taskAssignmentQuery =
|
||||||
TaskAssignmentQuery(task, TaskQueryStringForAllPlacements(task));
|
TaskAssignmentQuery(task, TaskQueryString(task));
|
||||||
|
|
||||||
TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId,
|
TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId,
|
||||||
task->taskId);
|
task->taskId);
|
||||||
|
@ -2753,7 +2752,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
||||||
{
|
{
|
||||||
/* assign through task tracker to manage resource utilization */
|
/* assign through task tracker to manage resource utilization */
|
||||||
StringInfo jobCleanupQuery = TaskAssignmentQuery(
|
StringInfo jobCleanupQuery = TaskAssignmentQuery(
|
||||||
jobCleanupTask, TaskQueryStringForAllPlacements(jobCleanupTask));
|
jobCleanupTask, TaskQueryString(jobCleanupTask));
|
||||||
|
|
||||||
jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId,
|
jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId,
|
||||||
jobCleanupQuery->data);
|
jobCleanupQuery->data);
|
||||||
|
@ -2832,7 +2831,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
||||||
nodeName, nodePort, (int) queryStatus),
|
nodeName, nodePort, (int) queryStatus),
|
||||||
errhint("Manually clean job resources on node "
|
errhint("Manually clean job resources on node "
|
||||||
"\"%s:%u\" by running \"%s\" ", nodeName,
|
"\"%s:%u\" by running \"%s\" ", nodeName,
|
||||||
nodePort, TaskQueryStringForAllPlacements(
|
nodePort, TaskQueryString(
|
||||||
jobCleanupTask))));
|
jobCleanupTask))));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -2851,7 +2850,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
|
||||||
nodePort, (int) resultStatus),
|
nodePort, (int) resultStatus),
|
||||||
errhint("Manually clean job resources on node "
|
errhint("Manually clean job resources on node "
|
||||||
"\"%s:%u\" by running \"%s\" ", nodeName,
|
"\"%s:%u\" by running \"%s\" ", nodeName,
|
||||||
nodePort, TaskQueryStringForAllPlacements(
|
nodePort, TaskQueryString(
|
||||||
jobCleanupTask))));
|
jobCleanupTask))));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
@ -450,8 +450,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
|
||||||
* connect to that node to drop the shard placement over that
|
* connect to that node to drop the shard placement over that
|
||||||
* remote connection.
|
* remote connection.
|
||||||
*/
|
*/
|
||||||
const char *dropShardPlacementCommand = TaskQueryStringForAllPlacements(
|
const char *dropShardPlacementCommand = TaskQueryString(task);
|
||||||
task);
|
|
||||||
ExecuteDropShardPlacementCommandRemotely(shardPlacement,
|
ExecuteDropShardPlacementCommandRemotely(shardPlacement,
|
||||||
relationName,
|
relationName,
|
||||||
dropShardPlacementCommand);
|
dropShardPlacementCommand);
|
||||||
|
|
|
@ -41,7 +41,6 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
|
||||||
static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
|
static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
|
||||||
static bool ShouldLazyDeparseQuery(Task *task);
|
static bool ShouldLazyDeparseQuery(Task *task);
|
||||||
static char * DeparseTaskQuery(Task *task, Query *query);
|
static char * DeparseTaskQuery(Task *task, Query *query);
|
||||||
static bool IsEachPlacementQueryStringDifferent(Task *task);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -117,8 +116,7 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
ereport(DEBUG4, (errmsg("query before rebuilding: %s",
|
ereport(DEBUG4, (errmsg("query before rebuilding: %s",
|
||||||
!isQueryObjectOrText
|
!isQueryObjectOrText
|
||||||
? "(null)"
|
? "(null)"
|
||||||
: ApplyLogRedaction(TaskQueryStringForAllPlacements(
|
: ApplyLogRedaction(TaskQueryString(task)))));
|
||||||
task)))));
|
|
||||||
|
|
||||||
UpdateTaskQueryString(query, relationId, valuesRTE, task);
|
UpdateTaskQueryString(query, relationId, valuesRTE, task);
|
||||||
|
|
||||||
|
@ -129,8 +127,7 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
|
task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
|
||||||
|
|
||||||
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
|
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
|
||||||
ApplyLogRedaction(TaskQueryStringForAllPlacements(
|
ApplyLogRedaction(TaskQueryString(task)))));
|
||||||
task)))));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -464,19 +461,6 @@ SetTaskQueryString(Task *task, char *queryString)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* SetTaskPerPlacementQueryStrings set the perPlacementQueryString for the given task.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList)
|
|
||||||
{
|
|
||||||
Assert(perPlacementQueryStringList != NIL);
|
|
||||||
task->taskQuery.queryType = TASK_QUERY_TEXT_PER_PLACEMENT;
|
|
||||||
task->taskQuery.data.perPlacementQueryStrings = perPlacementQueryStringList;
|
|
||||||
task->queryCount = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SetTaskQueryStringList sets the queryStringList of the given task.
|
* SetTaskQueryStringList sets the queryStringList of the given task.
|
||||||
*/
|
*/
|
||||||
|
@ -530,14 +514,14 @@ GetTaskQueryType(Task *task)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TaskQueryStringForAllPlacements generates task query string text if missing.
|
* TaskQueryString generates task query string text if missing.
|
||||||
*
|
*
|
||||||
* For performance reasons, the queryString is generated lazily. For example
|
* For performance reasons, the queryString is generated lazily. For example
|
||||||
* for local queries it is usually not needed to generate it, so this way we
|
* for local queries it is usually not needed to generate it, so this way we
|
||||||
* can skip the expensive deparsing+parsing.
|
* can skip the expensive deparsing+parsing.
|
||||||
*/
|
*/
|
||||||
char *
|
char *
|
||||||
TaskQueryStringForAllPlacements(Task *task)
|
TaskQueryString(Task *task)
|
||||||
{
|
{
|
||||||
int taskQueryType = GetTaskQueryType(task);
|
int taskQueryType = GetTaskQueryType(task);
|
||||||
if (taskQueryType == TASK_QUERY_NULL)
|
if (taskQueryType == TASK_QUERY_NULL)
|
||||||
|
@ -562,8 +546,6 @@ TaskQueryStringForAllPlacements(Task *task)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* At this point task query type should be TASK_QUERY_OBJECT.
|
* At this point task query type should be TASK_QUERY_OBJECT.
|
||||||
* if someone calls this method inappropriately with TASK_QUERY_TEXT_PER_PLACEMENT case
|
|
||||||
* (instead of TaskQueryStringForPlacement), they will hit this assert.
|
|
||||||
*/
|
*/
|
||||||
Assert(task->taskQuery.queryType == TASK_QUERY_OBJECT &&
|
Assert(task->taskQuery.queryType == TASK_QUERY_OBJECT &&
|
||||||
jobQueryReferenceForLazyDeparsing != NULL);
|
jobQueryReferenceForLazyDeparsing != NULL);
|
||||||
|
@ -584,32 +566,3 @@ TaskQueryStringForAllPlacements(Task *task)
|
||||||
SetTaskQueryString(task, queryString);
|
SetTaskQueryString(task, queryString);
|
||||||
return task->taskQuery.data.queryStringLazy;
|
return task->taskQuery.data.queryStringLazy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* TaskQueryStringForPlacement returns the query string that should be executed
|
|
||||||
* on the placement with the given placementIndex.
|
|
||||||
*/
|
|
||||||
char *
|
|
||||||
TaskQueryStringForPlacement(Task *task, int placementIndex)
|
|
||||||
{
|
|
||||||
if (IsEachPlacementQueryStringDifferent(task))
|
|
||||||
{
|
|
||||||
List *perPlacementQueryStringList =
|
|
||||||
task->taskQuery.data.perPlacementQueryStrings;
|
|
||||||
Assert(list_length(perPlacementQueryStringList) > placementIndex);
|
|
||||||
return list_nth(perPlacementQueryStringList, placementIndex);
|
|
||||||
}
|
|
||||||
return TaskQueryStringForAllPlacements(task);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* IsEachPlacementQueryStringDifferent returns true if each placement
|
|
||||||
* has a different query string.
|
|
||||||
*/
|
|
||||||
static bool
|
|
||||||
IsEachPlacementQueryStringDifferent(Task *task)
|
|
||||||
{
|
|
||||||
return GetTaskQueryType(task) == TASK_QUERY_TEXT_PER_PLACEMENT;
|
|
||||||
}
|
|
||||||
|
|
|
@ -606,9 +606,7 @@ FetchRemoteExplainFromWorkers(Task *task, ExplainState *es)
|
||||||
RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0(
|
RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0(
|
||||||
sizeof(RemoteExplainPlan));
|
sizeof(RemoteExplainPlan));
|
||||||
|
|
||||||
StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements(
|
StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryString(task), es);
|
||||||
task),
|
|
||||||
es);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Use a coordinated transaction to ensure that we open a transaction block
|
* Use a coordinated transaction to ensure that we open a transaction block
|
||||||
|
@ -694,7 +692,7 @@ ExplainTask(CitusScanState *scanState, Task *task, int placementIndex,
|
||||||
|
|
||||||
if (es->verbose)
|
if (es->verbose)
|
||||||
{
|
{
|
||||||
const char *queryText = TaskQueryStringForAllPlacements(task);
|
const char *queryText = TaskQueryString(task);
|
||||||
ExplainPropertyText("Query", queryText, es);
|
ExplainPropertyText("Query", queryText, es);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1312,7 +1310,7 @@ ExplainAnalyzeTaskList(List *originalTaskList,
|
||||||
}
|
}
|
||||||
|
|
||||||
Task *explainAnalyzeTask = copyObject(originalTask);
|
Task *explainAnalyzeTask = copyObject(originalTask);
|
||||||
const char *queryString = TaskQueryStringForAllPlacements(explainAnalyzeTask);
|
const char *queryString = TaskQueryString(explainAnalyzeTask);
|
||||||
char *wrappedQuery = WrapQueryForExplainAnalyze(queryString, tupleDesc);
|
char *wrappedQuery = WrapQueryForExplainAnalyze(queryString, tupleDesc);
|
||||||
char *fetchQuery =
|
char *fetchQuery =
|
||||||
"SELECT explain_analyze_output FROM worker_last_saved_explain_analyze()";
|
"SELECT explain_analyze_output FROM worker_last_saved_explain_analyze()";
|
||||||
|
|
|
@ -4534,7 +4534,7 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask,
|
||||||
|
|
||||||
/* wrap repartition query string around filter query string */
|
/* wrap repartition query string around filter query string */
|
||||||
StringInfo mapQueryString = makeStringInfo();
|
StringInfo mapQueryString = makeStringInfo();
|
||||||
char *filterQueryString = TaskQueryStringForAllPlacements(filterTask);
|
char *filterQueryString = TaskQueryString(filterTask);
|
||||||
char *filterQueryEscapedText = quote_literal_cstr(filterQueryString);
|
char *filterQueryEscapedText = quote_literal_cstr(filterQueryString);
|
||||||
PartitionType partitionType = mapMergeJob->partitionType;
|
PartitionType partitionType = mapMergeJob->partitionType;
|
||||||
|
|
||||||
|
|
|
@ -284,12 +284,6 @@ CopyTaskQuery(Task *newnode, Task *from)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case TASK_QUERY_TEXT_PER_PLACEMENT:
|
|
||||||
{
|
|
||||||
COPY_STRING_LIST(taskQuery.data.perPlacementQueryStrings);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case TASK_QUERY_TEXT_LIST:
|
case TASK_QUERY_TEXT_LIST:
|
||||||
{
|
{
|
||||||
COPY_STRING_LIST(taskQuery.data.queryStringList);
|
COPY_STRING_LIST(taskQuery.data.queryStringList);
|
||||||
|
|
|
@ -490,12 +490,6 @@ static void WriteTaskQuery(OUTFUNC_ARGS) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case TASK_QUERY_TEXT_PER_PLACEMENT:
|
|
||||||
{
|
|
||||||
WRITE_NODE_FIELD(taskQuery.data.perPlacementQueryStrings);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case TASK_QUERY_TEXT_LIST:
|
case TASK_QUERY_TEXT_LIST:
|
||||||
{
|
{
|
||||||
WRITE_NODE_FIELD(taskQuery.data.queryStringList);
|
WRITE_NODE_FIELD(taskQuery.data.queryStringList);
|
||||||
|
|
|
@ -26,10 +26,7 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
|
||||||
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
|
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
|
||||||
extern void SetTaskQueryString(Task *task, char *queryString);
|
extern void SetTaskQueryString(Task *task, char *queryString);
|
||||||
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
|
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
|
||||||
extern void SetTaskPerPlacementQueryStrings(Task *task,
|
extern char * TaskQueryString(Task *task);
|
||||||
List *perPlacementQueryStringList);
|
|
||||||
extern char * TaskQueryStringForAllPlacements(Task *task);
|
|
||||||
extern char * TaskQueryStringForPlacement(Task *task, int placementIndex);
|
|
||||||
extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList);
|
extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList);
|
||||||
extern int GetTaskQueryType(Task *task);
|
extern int GetTaskQueryType(Task *task);
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "distributed/citus_custom_scan.h"
|
#include "distributed/citus_custom_scan.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
|
#include "distributed/tuple_destination.h"
|
||||||
|
|
||||||
|
|
||||||
/* managed via guc.c */
|
/* managed via guc.c */
|
||||||
|
@ -87,11 +88,8 @@ typedef struct ExecutionParams
|
||||||
/* taskList contains the tasks for the execution.*/
|
/* taskList contains the tasks for the execution.*/
|
||||||
List *taskList;
|
List *taskList;
|
||||||
|
|
||||||
/* tupleDescriptor contains the description for the result tuples.*/
|
/* where to forward each tuple received */
|
||||||
TupleDesc tupleDescriptor;
|
TupleDestination *tupleDestination;
|
||||||
|
|
||||||
/* tupleStore is where the results will be stored for this execution */
|
|
||||||
Tuplestorestate *tupleStore;
|
|
||||||
|
|
||||||
/* expectResults is true if this execution will return some result. */
|
/* expectResults is true if this execution will return some result. */
|
||||||
bool expectResults;
|
bool expectResults;
|
||||||
|
@ -120,9 +118,8 @@ ExecutionParams * CreateBasicExecutionParams(RowModifyLevel modLevel,
|
||||||
bool localExecutionSupported);
|
bool localExecutionSupported);
|
||||||
|
|
||||||
extern uint64 ExecuteTaskListExtended(ExecutionParams *executionParams);
|
extern uint64 ExecuteTaskListExtended(ExecutionParams *executionParams);
|
||||||
extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
|
extern uint64 ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList,
|
||||||
TupleDesc tupleDescriptor,
|
TupleDestination *tupleDest,
|
||||||
Tuplestorestate *tupleStore,
|
|
||||||
bool expectResults);
|
bool expectResults);
|
||||||
extern bool IsCitusCustomState(PlanState *planState);
|
extern bool IsCitusCustomState(PlanState *planState);
|
||||||
extern TupleTableSlot * CitusExecScan(CustomScanState *node);
|
extern TupleTableSlot * CitusExecScan(CustomScanState *node);
|
||||||
|
|
|
@ -208,7 +208,6 @@ typedef enum TaskQueryType
|
||||||
TASK_QUERY_NULL,
|
TASK_QUERY_NULL,
|
||||||
TASK_QUERY_TEXT,
|
TASK_QUERY_TEXT,
|
||||||
TASK_QUERY_OBJECT,
|
TASK_QUERY_OBJECT,
|
||||||
TASK_QUERY_TEXT_PER_PLACEMENT,
|
|
||||||
TASK_QUERY_TEXT_LIST
|
TASK_QUERY_TEXT_LIST
|
||||||
} TaskQueryType;
|
} TaskQueryType;
|
||||||
|
|
||||||
|
@ -238,20 +237,15 @@ typedef struct TaskQuery
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* In almost all cases queryStringLazy should be read only indirectly by
|
* In almost all cases queryStringLazy should be read only indirectly by
|
||||||
* using TaskQueryStringForAllPlacements(). This will populate the field if only the
|
* using TaskQueryString(). This will populate the field if only the
|
||||||
* jobQueryReferenceForLazyDeparsing field is not NULL.
|
* jobQueryReferenceForLazyDeparsing field is not NULL.
|
||||||
*
|
*
|
||||||
* This field should only be set by using SetTaskQueryString() (or as a
|
* This field should only be set by using SetTaskQueryString() (or as a
|
||||||
* side effect from TaskQueryStringForAllPlacements()). Otherwise it might not be in sync
|
* side effect from TaskQueryString()). Otherwise it might not be in sync
|
||||||
* with jobQueryReferenceForLazyDeparsing.
|
* with jobQueryReferenceForLazyDeparsing.
|
||||||
*/
|
*/
|
||||||
char *queryStringLazy;
|
char *queryStringLazy;
|
||||||
|
|
||||||
/*
|
|
||||||
* perPlacementQueryStrings is used when we have different query strings for each placement.
|
|
||||||
*/
|
|
||||||
List *perPlacementQueryStrings;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* queryStringList contains query strings. They should be
|
* queryStringList contains query strings. They should be
|
||||||
* run sequentially. The concatenated version of this list
|
* run sequentially. The concatenated version of this list
|
||||||
|
|
Loading…
Reference in New Issue