From c7c460e843de9cc3900d0bc1b57e23679ee9b1c7 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 9 Jan 2020 10:49:58 -0800 Subject: [PATCH] PartitionTasklistResults: Use different queries per placement We need to know which placement succeeded in executing the worker_partition_query_result() call. Otherwise we wouldn't know which node to fetch from. This change allows that by introducing Task::perPlacementQueryStrings. --- .../distributed/executor/adaptive_executor.c | 14 ++++- .../distributed_intermediate_results.c | 61 ++++++++++++------- .../distributed/multi_physical_planner.h | 10 +++ 3 files changed, 61 insertions(+), 24 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index dfd4069ea..c66c8c541 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -3127,9 +3127,21 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, Task *task = shardCommandExecution->task; ShardPlacement *taskPlacement = placementExecution->shardPlacement; List *placementAccessList = PlacementAccessListForTask(task, taskPlacement); - char *queryString = task->queryString; int querySent = 0; + char *queryString = NULL; + if (task->queryString != NULL) + { + queryString = task->queryString; + } + else + { + Assert(list_length(task->taskPlacementList) == list_length( + task->perPlacementQueryStrings)); + queryString = list_nth(task->perPlacementQueryStrings, + placementExecution->placementExecutionIndex); + } + if (execution->transactionProperties->useRemoteTransactionBlocks != TRANSACTION_BLOCKS_DISALLOWED) { diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index f68638ff7..cd1f80707 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ 
b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -43,8 +43,9 @@ static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shard static char * SourceShardPrefix(char *resultPrefix, uint64 shardId); static DistributedResultFragment * TupleToDistributedResultFragment( TupleTableSlot *tupleSlot, DistTableCacheEntry *targetRelation); -static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc - resultDescriptor); +static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList, + TupleDesc resultDescriptor, + bool errorOnAnyFailure); /* @@ -111,31 +112,42 @@ WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList, foreach(taskCell, selectTaskList) { Task *selectTask = (Task *) lfirst(taskCell); - StringInfo wrappedQuery = makeStringInfo(); List *shardPlacementList = selectTask->taskPlacementList; - - ShardPlacement *shardPlacement = linitial(shardPlacementList); char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId); char *partitionMethodString = targetRelation->partitionMethod == 'h' ? "hash" : "range"; const char *binaryFormatString = binaryFormat ? "true" : "false"; + List *perPlacementQueries = NIL; - appendStringInfo(wrappedQuery, - "SELECT %d, partition_index" - ", %s || '_' || partition_index::text " - ", rows_written " - "FROM worker_partition_query_result" - "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0", - shardPlacement->nodeId, - quote_literal_cstr(taskPrefix), - quote_literal_cstr(taskPrefix), - quote_literal_cstr(selectTask->queryString), - partitionColumnIndex, - quote_literal_cstr(partitionMethodString), - minValuesString->data, maxValuesString->data, - binaryFormatString); + /* + * We need to know which placement could successfully execute the query, + * so we form a different query per placement, each of which returns + * the node id of its placement. 
+ */ + ListCell *placementCell = NULL; + foreach(placementCell, shardPlacementList) + { + ShardPlacement *shardPlacement = lfirst(placementCell); + StringInfo wrappedQuery = makeStringInfo(); + appendStringInfo(wrappedQuery, + "SELECT %d, partition_index" + ", %s || '_' || partition_index::text " + ", rows_written " + "FROM worker_partition_query_result" + "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0", + shardPlacement->nodeId, + quote_literal_cstr(taskPrefix), + quote_literal_cstr(taskPrefix), + quote_literal_cstr(selectTask->queryString), + partitionColumnIndex, + quote_literal_cstr(partitionMethodString), + minValuesString->data, maxValuesString->data, + binaryFormatString); + perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data); + } - selectTask->queryString = wrappedQuery->data; + selectTask->queryString = NULL; + selectTask->perPlacementQueryStrings = perPlacementQueries; } } @@ -244,7 +256,9 @@ ExecutePartitionTaskList(List *taskList, DistTableCacheEntry *targetRelation) TupleDescInitEntry(resultDescriptor, (AttrNumber) 4, "rows_written", INT8OID, -1, 0); - resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor); + bool errorOnAnyFailure = false; + resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor, + errorOnAnyFailure); List *fragmentList = NIL; TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor, @@ -298,14 +312,15 @@ TupleToDistributedResultFragment(TupleTableSlot *tupleSlot, * store containing its results. 
*/ static Tuplestorestate * -ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor) +ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, + bool errorOnAnyFailure) { bool hasReturning = true; int targetPoolSize = MaxAdaptiveExecutorPoolSize; bool randomAccess = true; bool interTransactions = false; TransactionProperties xactProperties = { - .errorOnAnyFailure = true, + .errorOnAnyFailure = errorOnAnyFailure, .useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED, .requires2PC = false }; diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 4b193fabf..a9045bb50 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -181,7 +181,17 @@ typedef struct Task TaskType taskType; uint64 jobId; uint32 taskId; + + /* + * If queryString != NULL, then we have a single query for all placements. + * Otherwise, the length of perPlacementQueryStrings is equal to the length of + * taskPlacementList, and each placement can be assigned a different query. + * We need this flexibility when a query should return node-specific values. + * For example, on which node did we succeed in storing some result files? + */ char *queryString; + List *perPlacementQueryStrings; + uint64 anchorShardId; /* only applies to compute tasks */ List *taskPlacementList; /* only applies to compute tasks */ List *dependentTaskList; /* only applies to compute tasks */