diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c
index cd1f80707..480f85b82 100644
--- a/src/backend/distributed/executor/distributed_intermediate_results.c
+++ b/src/backend/distributed/executor/distributed_intermediate_results.c
@@ -18,6 +18,7 @@
 #include "access/tupdesc.h"
 #include "catalog/pg_type.h"
 #include "distributed/intermediate_results.h"
+#include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_executor.h"
 #include "distributed/transaction_management.h"
@@ -29,6 +30,28 @@
 #include "utils/lsyscache.h"


+/*
+ * NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer.
+ * It is a separate struct so it can be used as a key in a hash table.
+ */
+typedef struct NodePair
+{
+	int sourceNodeId;
+	int targetNodeId;
+} NodePair;
+
+
+/*
+ * NodeToNodeFragmentsTransfer contains all fragments that need to be fetched from
+ * the source node to the destination node in the NodePair.
+ */
+typedef struct NodeToNodeFragmentsTransfer
+{
+	NodePair nodes;
+	List *fragmentList;
+} NodeToNodeFragmentsTransfer;
+
+
 /* forward declarations of local functions */
 static void WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList,
 									 DistTableCacheEntry *targetRelation,
@@ -46,6 +69,44 @@ static DistributedResultFragment * TupleToDistributedResultFragment(
 static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList,
 														  TupleDesc resultDescriptor,
 														  bool errorOnAnyFailure);
+static List ** ColocateFragmentsWithRelation(List *fragmentList,
+											 DistTableCacheEntry *targetRelation);
+static List * ColocationTransfers(List *fragmentList,
+								  DistTableCacheEntry *targetRelation);
+static List * FragmentTransferTaskList(List *fragmentListTransfers);
+static char * QueryStringForFragmentsTransfer(
+	NodeToNodeFragmentsTransfer *fragmentsTransfer);
+static void ExecuteFetchTaskList(List *fetchTaskList);
+
+
+/*
+ * RedistributeTaskListResults partitions the results of the given task list using
+ * the shard ranges and partition method of the given targetRelation, and then
+ * colocates the result files with shards.
+ *
+ * If a shard has a replication factor > 1, corresponding result files are copied
+ * to all nodes containing that shard.
+ *
+ * returnValue[shardIndex] is a list of cstrings, each of which is a resultId that
+ * corresponds to targetRelation->sortedShardIntervalArray[shardIndex].
+ */
+List **
+RedistributeTaskListResults(char *resultIdPrefix, List *selectTaskList,
+							DistTableCacheEntry *targetRelation,
+							bool binaryFormat)
+{
+	/*
+	 * Make sure that this transaction has a distributed transaction ID.
+	 *
+	 * Intermediate results will be stored in a directory that is derived
+	 * from the distributed transaction ID.
+	 */
+	UseCoordinatedTransaction();
+
+	List *fragmentList = PartitionTasklistResults(resultIdPrefix, selectTaskList,
+												  targetRelation, binaryFormat);
+	return ColocateFragmentsWithRelation(fragmentList, targetRelation);
+}
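
For context, a minimal sketch of how this new entry point might be driven — not part of this patch. The prefix, task list, and debug logging are assumptions; only RedistributeTaskListResults(), the DistributedTableCacheEntry() lookup from metadata_cache, and the per-shard result id array come from existing code.

/*
 * Illustrative sketch only -- not part of this patch. Drives
 * RedistributeTaskListResults() for a target relation and logs which
 * fragments ended up colocated with which shard index.
 */
static void
RedistributeAndLogFragments(Oid targetRelationId, List *selectTaskList)
{
	DistTableCacheEntry *targetRelation = DistributedTableCacheEntry(targetRelationId);
	bool binaryFormat = true;

	/* one entry per shard, indexed like sortedShardIntervalArray */
	List **shardResultIds = RedistributeTaskListResults("example_prefix",
														selectTaskList,
														targetRelation,
														binaryFormat);

	for (int shardIndex = 0; shardIndex < targetRelation->shardIntervalArrayLength;
		 shardIndex++)
	{
		ListCell *resultIdCell = NULL;

		foreach(resultIdCell, shardResultIds[shardIndex])
		{
			char *resultId = lfirst(resultIdCell);

			elog(DEBUG3, "shard index %d reads fragment %s", shardIndex, resultId);
		}
	}
}
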


 /*
@@ -64,6 +125,14 @@ PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList,
 						 DistTableCacheEntry *targetRelation,
 						 bool binaryFormat)
 {
+	if (targetRelation->partitionMethod != DISTRIBUTE_BY_HASH &&
+		targetRelation->partitionMethod != DISTRIBUTE_BY_RANGE)
+	{
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("repartitioning results of a tasklist is only supported "
+							   "when target relation is hash or range partitioned.")));
+	}
+
 	/*
 	 * Make sure that this transaction has a distributed transaction ID.
 	 *
@@ -333,3 +402,222 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,

 	return resultStore;
 }
+
+
+/*
+ * ColocateFragmentsWithRelation moves the fragments in the cluster so they are
+ * colocated with the shards of the target relation. These transfers are done by
+ * calls to fetch_intermediate_results() between nodes.
+ *
+ * returnValue[shardIndex] is the list of result Ids that are colocated with
+ * targetRelation->sortedShardIntervalArray[shardIndex] after the fetch tasks
+ * are done.
+ */
+static List **
+ColocateFragmentsWithRelation(List *fragmentList, DistTableCacheEntry *targetRelation)
+{
+	List *fragmentListTransfers = ColocationTransfers(fragmentList, targetRelation);
+	List *fragmentTransferTaskList = FragmentTransferTaskList(fragmentListTransfers);
+
+	ExecuteFetchTaskList(fragmentTransferTaskList);
+
+	int shardCount = targetRelation->shardIntervalArrayLength;
+	List **shardResultIdList = palloc0(shardCount * sizeof(List *));
+
+	ListCell *fragmentCell = NULL;
+	foreach(fragmentCell, fragmentList)
+	{
+		DistributedResultFragment *sourceFragment = lfirst(fragmentCell);
+		int shardIndex = sourceFragment->targetShardIndex;
+
+		shardResultIdList[shardIndex] = lappend(shardResultIdList[shardIndex],
+												sourceFragment->resultId);
+	}
+
+	return shardResultIdList;
+}
+
+
+/*
+ * ColocationTransfers returns a list of transfers to colocate the given fragments
+ * with the shards of the target relation. These transfers also take replicated
+ * target relations into account. Transfers with the same source and target node are pruned.
+ */
+static List *
+ColocationTransfers(List *fragmentList, DistTableCacheEntry *targetRelation)
+{
+	HASHCTL transferHashInfo;
+	MemSet(&transferHashInfo, 0, sizeof(HASHCTL));
+	transferHashInfo.keysize = sizeof(NodePair);
+	transferHashInfo.entrysize = sizeof(NodeToNodeFragmentsTransfer);
+	transferHashInfo.hcxt = CurrentMemoryContext;
+	HTAB *transferHash = hash_create("Fragment Transfer Hash", 32, &transferHashInfo,
+									 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
+
+	ListCell *fragmentCell = NULL;
+	foreach(fragmentCell, fragmentList)
+	{
+		DistributedResultFragment *fragment = lfirst(fragmentCell);
+		List *placementList = FinalizedShardPlacementList(fragment->targetShardId);
+
+		ListCell *placementCell = NULL;
+		foreach(placementCell, placementList)
+		{
+			ShardPlacement *placement = lfirst(placementCell);
+			NodePair transferKey = {
+				.sourceNodeId = fragment->nodeId,
+				.targetNodeId = placement->nodeId
+			};
+
+			if (transferKey.sourceNodeId == transferKey.targetNodeId)
+			{
+				continue;
+			}
+
+			bool foundInCache = false;
+			NodeToNodeFragmentsTransfer *fragmentListTransfer =
+				hash_search(transferHash, &transferKey, HASH_ENTER, &foundInCache);
+			if (!foundInCache)
+			{
+				fragmentListTransfer->nodes = transferKey;
+				fragmentListTransfer->fragmentList = NIL;
+			}
+
+			fragmentListTransfer->fragmentList =
+				lappend(fragmentListTransfer->fragmentList, fragment);
+		}
+	}
+
+	List *fragmentListTransfers = NIL;
+	NodeToNodeFragmentsTransfer *transfer = NULL;
+	HASH_SEQ_STATUS hashSeqStatus;
+
+	hash_seq_init(&hashSeqStatus, transferHash);
+
+	while ((transfer = hash_seq_search(&hashSeqStatus)) != NULL)
+	{
+		fragmentListTransfers = lappend(fragmentListTransfers, transfer);
+	}
+
+	return fragmentListTransfers;
+}
+
+
+/*
+ * FragmentTransferTaskList returns a list of tasks that perform the given list of
+ * transfers. Each transfer is done by a SQL call to fetch_intermediate_results.
+ * See QueryStringForFragmentsTransfer for how the query is constructed.
+ */
+static List *
+FragmentTransferTaskList(List *fragmentListTransfers)
+{
+	List *fetchTaskList = NIL;
+	ListCell *transferCell = NULL;
+
+	foreach(transferCell, fragmentListTransfers)
+	{
+		NodeToNodeFragmentsTransfer *fragmentsTransfer = lfirst(transferCell);
+
+		int targetNodeId = fragmentsTransfer->nodes.targetNodeId;
+
+		/* these should have already been pruned away in ColocationTransfers */
+		Assert(targetNodeId != fragmentsTransfer->nodes.sourceNodeId);
+
+		WorkerNode *workerNode = LookupNodeByNodeId(targetNodeId);
+
+		ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
+		targetPlacement->nodeName = workerNode->workerName;
+		targetPlacement->nodePort = workerNode->workerPort;
+		targetPlacement->groupId = workerNode->groupId;
+
+		Task *task = CitusMakeNode(Task);
+		task->taskType = SELECT_TASK;
+		task->queryString = QueryStringForFragmentsTransfer(fragmentsTransfer);
+		task->taskPlacementList = list_make1(targetPlacement);
+
+		fetchTaskList = lappend(fetchTaskList, task);
+	}
+
+	return fetchTaskList;
+}
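
To make these transfer tasks concrete: for a transfer of two fragments whose source worker listens on 10.0.0.5:5432 (all values here are made up), the query built by QueryStringForFragmentsTransfer below would look roughly like

    SELECT bytes FROM fetch_intermediate_results(ARRAY['repartitioned_results_1_10','repartitioned_results_2_10']::text[],'10.0.0.5',5432) bytes

which runs on the target node and pulls the named result files from the source node.
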
+
+
+/*
+ * QueryStringForFragmentsTransfer returns a query which fetches distributed
+ * result fragments from source node to target node. See the structure of
+ * NodeToNodeFragmentsTransfer for details of how these are decided.
+ */
+static char *
+QueryStringForFragmentsTransfer(NodeToNodeFragmentsTransfer *fragmentsTransfer)
+{
+	ListCell *fragmentCell = NULL;
+	StringInfo queryString = makeStringInfo();
+	StringInfo fragmentNamesArrayString = makeStringInfo();
+	int fragmentCount = 0;
+	NodePair *nodePair = &fragmentsTransfer->nodes;
+	WorkerNode *sourceNode = LookupNodeByNodeId(nodePair->sourceNodeId);
+
+	appendStringInfoString(fragmentNamesArrayString, "ARRAY[");
+
+	foreach(fragmentCell, fragmentsTransfer->fragmentList)
+	{
+		DistributedResultFragment *fragment = lfirst(fragmentCell);
+		char *fragmentName = fragment->resultId;
+
+		if (fragmentCount > 0)
+		{
+			appendStringInfoString(fragmentNamesArrayString, ",");
+		}
+
+		appendStringInfoString(fragmentNamesArrayString,
+							   quote_literal_cstr(fragmentName));
+
+		fragmentCount++;
+	}
+
+	appendStringInfoString(fragmentNamesArrayString, "]::text[]");
+
+	appendStringInfo(queryString,
+					 "SELECT bytes FROM fetch_intermediate_results(%s,%s,%d) bytes",
+					 fragmentNamesArrayString->data,
+					 quote_literal_cstr(sourceNode->workerName),
+					 sourceNode->workerPort);
+
+	ereport(DEBUG3, (errmsg("fetch task on %s:%d: %s", sourceNode->workerName,
+							sourceNode->workerPort, queryString->data)));
+
+	return queryString->data;
+}
+
+
+/*
+ * ExecuteFetchTaskList executes a list of fetch_intermediate_results() tasks.
+ * It ignores the byte_count result of the fetch_intermediate_results() calls.
+ */
+static void
+ExecuteFetchTaskList(List *taskList)
+{
+	TupleDesc resultDescriptor = NULL;
+	Tuplestorestate *resultStore = NULL;
+	int resultColumnCount = 1;
+
+#if PG_VERSION_NUM >= 120000
+	resultDescriptor = CreateTemplateTupleDesc(resultColumnCount);
+#else
+	resultDescriptor = CreateTemplateTupleDesc(resultColumnCount, false);
+#endif
+
+	TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "byte_count", INT8OID, -1, 0);
+
+	bool errorOnAnyFailure = true;
+	resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor,
+												   errorOnAnyFailure);
+
+	TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor,
+														  &TTSOpsMinimalTuple);
+
+	while (tuplestore_gettupleslot(resultStore, true, false, slot))
+	{
+		ExecClearTuple(slot);
+	}
+}
diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c
index 3b1b0a967..9366e9dfc 100644
--- a/src/backend/distributed/executor/intermediate_results.c
+++ b/src/backend/distributed/executor/intermediate_results.c
@@ -847,6 +847,8 @@ fetch_intermediate_results(PG_FUNCTION_ARGS)
 		totalBytesWritten += FetchRemoteIntermediateResult(connection, resultId);
 	}

+	UnclaimConnection(connection);
+
 	PG_RETURN_INT64(totalBytesWritten);
 }

diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c
index d358ac48d..2f18be9c7 100644
--- a/src/backend/distributed/utils/citus_clauses.c
+++ b/src/backend/distributed/utils/citus_clauses.c
@@ -234,7 +234,8 @@ citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod,
 static bool
 CitusIsVolatileFunctionIdChecker(Oid func_id, void *context)
 {
-	if (func_id == CitusReadIntermediateResultFuncId())
+	if (func_id == CitusReadIntermediateResultFuncId() ||
+		func_id == CitusReadIntermediateResultArrayFuncId())
 	{
 		return false;
 	}
@@ -273,7 +274,8 @@ CitusIsVolatileFunction(Node *node)
 static bool
 CitusIsMutableFunctionIdChecker(Oid func_id, void *context)
 {
-	if (func_id == CitusReadIntermediateResultFuncId())
+	if (func_id == CitusReadIntermediateResultFuncId() ||
+		func_id == CitusReadIntermediateResultArrayFuncId())
 	{
 		return false;
 	}
diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h
index b637a37c6..61e670fbc 100644
--- a/src/include/distributed/intermediate_results.h
+++ b/src/include/distributed/intermediate_results.h
@@ -60,6 +60,10 @@ extern char * QueryResultFileName(const char *resultId);
 extern char * CreateIntermediateResultsDirectory(void);

 /* distributed_intermediate_results.c */
+extern List ** RedistributeTaskListResults(char *resultIdPrefix,
+										   List *selectTaskList,
+										   DistTableCacheEntry *targetRelation,
+										   bool binaryFormat);
 extern List * PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList,
 									   DistTableCacheEntry *distributionScheme,
 									   bool binaryFormat);
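
The citus_clauses.c changes above keep the array-reading intermediate result function from being classified as volatile or mutable, so queries built on top of the redistributed fragments are not evaluated on the coordinator. As a hedged illustration of how the per-shard result id lists returned by RedistributeTaskListResults might later be consumed (not part of this patch: the helper name, the column list, the 'binary' format literal, and the plural read_intermediate_results() UDF signature are assumptions; only the StringInfo/quoting helpers mirror code in this patch), a consumer could build a per-shard read query along these lines:

/*
 * Illustrative sketch only -- not part of this patch. Builds, for one target
 * shard, a query that reads all colocated fragments through the array variant
 * of the intermediate result reader that the CitusReadIntermediateResultArrayFuncId()
 * checks above refer to. Column list and format are hypothetical.
 */
static char *
ReadFragmentsQueryForShard(List *resultIdList)
{
	StringInfo query = makeStringInfo();
	ListCell *resultIdCell = NULL;
	int resultIdIndex = 0;

	appendStringInfoString(query, "SELECT * FROM read_intermediate_results(ARRAY[");

	foreach(resultIdCell, resultIdList)
	{
		char *resultId = lfirst(resultIdCell);

		if (resultIdIndex > 0)
		{
			appendStringInfoString(query, ",");
		}

		appendStringInfoString(query, quote_literal_cstr(resultId));
		resultIdIndex++;
	}

	/* 'binary' mirrors the binaryFormat flag used elsewhere in this patch */
	appendStringInfoString(query, "]::text[], 'binary'::citus_copy_format) "
								  "AS fragments (dummy_column int)");

	return query->data;
}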