mirror of https://github.com/citusdata/citus.git
PartitionTasklistResults: Use different queries per placement
We need to know which placement succeeded in executing the worker_partition_query_result() call; otherwise, we would not know which node to fetch the result files from. This change makes that possible by introducing Task::perPlacementQueryStrings, which lets each placement run its own query. (ref: pull/3363/head)
parent
08b5145765
commit
c7c460e843
|
@ -3127,9 +3127,21 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
|
||||||
Task *task = shardCommandExecution->task;
|
Task *task = shardCommandExecution->task;
|
||||||
ShardPlacement *taskPlacement = placementExecution->shardPlacement;
|
ShardPlacement *taskPlacement = placementExecution->shardPlacement;
|
||||||
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
|
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
|
||||||
char *queryString = task->queryString;
|
|
||||||
int querySent = 0;
|
int querySent = 0;
|
||||||
|
|
||||||
|
char *queryString = NULL;
|
||||||
|
if (task->queryString != NULL)
|
||||||
|
{
|
||||||
|
queryString = task->queryString;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Assert(list_length(task->taskPlacementList) == list_length(
|
||||||
|
task->perPlacementQueryStrings));
|
||||||
|
queryString = list_nth(task->perPlacementQueryStrings,
|
||||||
|
placementExecution->placementExecutionIndex);
|
||||||
|
}
|
||||||
|
|
||||||
if (execution->transactionProperties->useRemoteTransactionBlocks !=
|
if (execution->transactionProperties->useRemoteTransactionBlocks !=
|
||||||
TRANSACTION_BLOCKS_DISALLOWED)
|
TRANSACTION_BLOCKS_DISALLOWED)
|
||||||
{
|
{
|
||||||
|
|
|
@ -43,8 +43,9 @@ static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shard
|
||||||
static char * SourceShardPrefix(char *resultPrefix, uint64 shardId);
|
static char * SourceShardPrefix(char *resultPrefix, uint64 shardId);
|
||||||
static DistributedResultFragment * TupleToDistributedResultFragment(
|
static DistributedResultFragment * TupleToDistributedResultFragment(
|
||||||
TupleTableSlot *tupleSlot, DistTableCacheEntry *targetRelation);
|
TupleTableSlot *tupleSlot, DistTableCacheEntry *targetRelation);
|
||||||
static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc
|
static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList,
|
||||||
resultDescriptor);
|
TupleDesc resultDescriptor,
|
||||||
|
bool errorOnAnyFailure);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -111,31 +112,42 @@ WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList,
|
||||||
foreach(taskCell, selectTaskList)
|
foreach(taskCell, selectTaskList)
|
||||||
{
|
{
|
||||||
Task *selectTask = (Task *) lfirst(taskCell);
|
Task *selectTask = (Task *) lfirst(taskCell);
|
||||||
StringInfo wrappedQuery = makeStringInfo();
|
|
||||||
List *shardPlacementList = selectTask->taskPlacementList;
|
List *shardPlacementList = selectTask->taskPlacementList;
|
||||||
|
|
||||||
ShardPlacement *shardPlacement = linitial(shardPlacementList);
|
|
||||||
char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId);
|
char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId);
|
||||||
char *partitionMethodString = targetRelation->partitionMethod == 'h' ?
|
char *partitionMethodString = targetRelation->partitionMethod == 'h' ?
|
||||||
"hash" : "range";
|
"hash" : "range";
|
||||||
const char *binaryFormatString = binaryFormat ? "true" : "false";
|
const char *binaryFormatString = binaryFormat ? "true" : "false";
|
||||||
|
List *perPlacementQueries = NIL;
|
||||||
|
|
||||||
appendStringInfo(wrappedQuery,
|
/*
|
||||||
"SELECT %d, partition_index"
|
* We need to know which placement could successfully execute the query,
|
||||||
", %s || '_' || partition_index::text "
|
* so we form a different query per placement, each of which returning
|
||||||
", rows_written "
|
* the node id of the placement.
|
||||||
"FROM worker_partition_query_result"
|
*/
|
||||||
"(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0",
|
ListCell *placementCell = NULL;
|
||||||
shardPlacement->nodeId,
|
foreach(placementCell, shardPlacementList)
|
||||||
quote_literal_cstr(taskPrefix),
|
{
|
||||||
quote_literal_cstr(taskPrefix),
|
ShardPlacement *shardPlacement = lfirst(placementCell);
|
||||||
quote_literal_cstr(selectTask->queryString),
|
StringInfo wrappedQuery = makeStringInfo();
|
||||||
partitionColumnIndex,
|
appendStringInfo(wrappedQuery,
|
||||||
quote_literal_cstr(partitionMethodString),
|
"SELECT %d, partition_index"
|
||||||
minValuesString->data, maxValuesString->data,
|
", %s || '_' || partition_index::text "
|
||||||
binaryFormatString);
|
", rows_written "
|
||||||
|
"FROM worker_partition_query_result"
|
||||||
|
"(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0",
|
||||||
|
shardPlacement->nodeId,
|
||||||
|
quote_literal_cstr(taskPrefix),
|
||||||
|
quote_literal_cstr(taskPrefix),
|
||||||
|
quote_literal_cstr(selectTask->queryString),
|
||||||
|
partitionColumnIndex,
|
||||||
|
quote_literal_cstr(partitionMethodString),
|
||||||
|
minValuesString->data, maxValuesString->data,
|
||||||
|
binaryFormatString);
|
||||||
|
perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data);
|
||||||
|
}
|
||||||
|
|
||||||
selectTask->queryString = wrappedQuery->data;
|
selectTask->queryString = NULL;
|
||||||
|
selectTask->perPlacementQueryStrings = perPlacementQueries;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -244,7 +256,9 @@ ExecutePartitionTaskList(List *taskList, DistTableCacheEntry *targetRelation)
|
||||||
TupleDescInitEntry(resultDescriptor, (AttrNumber) 4, "rows_written",
|
TupleDescInitEntry(resultDescriptor, (AttrNumber) 4, "rows_written",
|
||||||
INT8OID, -1, 0);
|
INT8OID, -1, 0);
|
||||||
|
|
||||||
resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor);
|
bool errorOnAnyFailure = false;
|
||||||
|
resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor,
|
||||||
|
errorOnAnyFailure);
|
||||||
|
|
||||||
List *fragmentList = NIL;
|
List *fragmentList = NIL;
|
||||||
TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor,
|
TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor,
|
||||||
|
@ -298,14 +312,15 @@ TupleToDistributedResultFragment(TupleTableSlot *tupleSlot,
|
||||||
* store containing its results.
|
* store containing its results.
|
||||||
*/
|
*/
|
||||||
static Tuplestorestate *
|
static Tuplestorestate *
|
||||||
ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor)
|
ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
|
||||||
|
bool errorOnAnyFailure)
|
||||||
{
|
{
|
||||||
bool hasReturning = true;
|
bool hasReturning = true;
|
||||||
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
|
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
|
||||||
bool randomAccess = true;
|
bool randomAccess = true;
|
||||||
bool interTransactions = false;
|
bool interTransactions = false;
|
||||||
TransactionProperties xactProperties = {
|
TransactionProperties xactProperties = {
|
||||||
.errorOnAnyFailure = true,
|
.errorOnAnyFailure = errorOnAnyFailure,
|
||||||
.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED,
|
.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED,
|
||||||
.requires2PC = false
|
.requires2PC = false
|
||||||
};
|
};
|
||||||
|
|
|
@ -181,7 +181,17 @@ typedef struct Task
|
||||||
TaskType taskType;
|
TaskType taskType;
|
||||||
uint64 jobId;
|
uint64 jobId;
|
||||||
uint32 taskId;
|
uint32 taskId;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If queryString != NULL, then we have a single query for all placements.
|
||||||
|
* Otherwise, length of perPlacementQueryStrings is equal to length of
|
||||||
|
* taskPlacementList and can assign a different query for each placement.
|
||||||
|
* We need this flexibility when a query should return node specific values.
|
||||||
|
* For example, on which node did we succeed storing some result files?
|
||||||
|
*/
|
||||||
char *queryString;
|
char *queryString;
|
||||||
|
List *perPlacementQueryStrings;
|
||||||
|
|
||||||
uint64 anchorShardId; /* only applies to compute tasks */
|
uint64 anchorShardId; /* only applies to compute tasks */
|
||||||
List *taskPlacementList; /* only applies to compute tasks */
|
List *taskPlacementList; /* only applies to compute tasks */
|
||||||
List *dependentTaskList; /* only applies to compute tasks */
|
List *dependentTaskList; /* only applies to compute tasks */
|
||||||
|
|
Loading…
Reference in New Issue