diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c
index 699a556a0..54d4d8e91 100644
--- a/src/backend/distributed/executor/distributed_intermediate_results.c
+++ b/src/backend/distributed/executor/distributed_intermediate_results.c
@@ -21,7 +21,9 @@
 #include "access/htup_details.h"
 #include "access/tupdesc.h"
 #include "catalog/pg_type.h"
+#include "distributed/coordinator_protocol.h"
 #include "distributed/deparse_shard_query.h"
+#include "distributed/insert_select_executor.h"
 #include "distributed/intermediate_results.h"
 #include "distributed/listutils.h"
 #include "distributed/metadata_utility.h"
@@ -78,7 +80,29 @@ typedef struct NodeToNodeFragmentsTransfer
 } NodeToNodeFragmentsTransfer;
 
 
+/*
+ * NamedDistributedResult represents a distributed intermediate result with a
+ * name that can be queried via read_distributed_intermediate_result.
+ */
+typedef struct NamedDistributedResult
+{
+	/* name of the distributed intermediate result */
+	char resultId[NAMEDATALEN];
+
+	/* pointer to the distributed intermediate result */
+	DistributedResult *distributedResult;
+
+} NamedDistributedResult;
+
+
+/* hash of distributed intermediate results in the current transaction */
+static HTAB *NamedDistributedResults = NULL;
+
+
 /* forward declarations of local functions */
+static void CreateDistributedResult(char *resultIdPrefix, Query *query,
+									int partitionColumnIndex, Oid targetRelationId);
+static void InitializeNamedDistributedResultsHash(void);
 static List * WrapTasksForPartitioning(const char *resultIdPrefix,
 									   List *selectTaskList,
 									   int partitionColumnIndex,
@@ -107,8 +131,9 @@ static DistributedResultFragment * TupleToDistributedResultFragment(HeapTuple he
 static void ExecuteSelectTasksIntoTupleDest(List *taskList,
 											TupleDestination *tupleDestination,
 											bool errorOnAnyFailure);
-static List ** ColocateFragmentsWithRelation(List *fragmentList,
-											 CitusTableCacheEntry *targetRelation);
+static void ColocateFragmentsWithRelation(List *fragmentList,
+										  CitusTableCacheEntry *targetRelation,
+										  DistributedResultShard *resultShards);
 static List * ColocationTransfers(List *fragmentList,
 								  CitusTableCacheEntry *targetRelation);
 static List * FragmentTransferTaskList(List *fragmentListTransfers);
@@ -117,21 +142,238 @@ static char * QueryStringForFragmentsTransfer(
 static void ExecuteFetchTaskList(List *fetchTaskList);
 
 
+PG_FUNCTION_INFO_V1(read_distributed_intermediate_result);
+PG_FUNCTION_INFO_V1(create_distributed_intermediate_result);
+
+
+/*
+ * read_distributed_intermediate_result is a placeholder for reading from
+ * temporary distributed results.
+ */
+Datum
+read_distributed_intermediate_result(PG_FUNCTION_ARGS)
+{
+	ereport(ERROR, (errmsg("read_distributed_intermediate_result is a placeholder for "
+						   "reading from temporary distributed results")));
+}
+
+
+/*
+ * create_distributed_intermediate_result executes a query and writes
+ * the result into a distributed intermediate result.
+ */
+Datum
+create_distributed_intermediate_result(PG_FUNCTION_ARGS)
+{
+	text *resultIdText = PG_GETARG_TEXT_P(0);
+	char *resultIdPrefix = text_to_cstring(resultIdText);
+	text *queryText = PG_GETARG_TEXT_P(1);
+	char *queryString = text_to_cstring(queryText);
+	int columnIndex = PG_GETARG_INT32(2);
+	text *colocateWithText = PG_GETARG_TEXT_P(3);
+	Oid targetRelationId = ResolveRelationId(colocateWithText, false);
+
+	CheckCitusVersion(ERROR);
+
+	if (!IsCitusTableType(targetRelationId, DISTRIBUTED_TABLE))
+	{
+		ereport(ERROR, (errmsg("result can only be co-located with a distributed "
+							   "table")));
+	}
+
+	Query *query = ParseQueryString(queryString, NULL, 0);
+
+	CreateDistributedResult(resultIdPrefix, query, columnIndex, targetRelationId);
+
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * RegisterDistributedResult registers a named distributed intermediate result
+ * for use in the planner.
+ */
+DistributedResult *
+RegisterDistributedResult(char *resultIdPrefix, Query *query, int partitionColumnIndex,
+						  Oid targetRelationId)
+{
+	InitializeNamedDistributedResultsHash();
+
+	/*
+	 * Make sure that this transaction has a distributed transaction ID.
+	 *
+	 * Intermediate results will be stored in a directory that is derived
+	 * from the distributed transaction ID.
+	 */
+	UseCoordinatedTransaction();
+
+	/* look in the hash first to do error checking early */
+	bool found = false;
+	NamedDistributedResult *namedDistributedResult =
+		hash_search(NamedDistributedResults, resultIdPrefix, HASH_ENTER, &found);
+	if (found)
+	{
+		ereport(ERROR, (errmsg("a distributed intermediate result named \"%s\" already "
+							   "exists in the current transaction", resultIdPrefix)));
+	}
+
+	CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId);
+	int shardCount = targetRelation->shardIntervalArrayLength;
+	bool binaryFormat = CanUseBinaryCopyFormatForTargetList(query->targetList);
+
+	MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
+
+	DistributedResult *distributedResult = palloc0(sizeof(DistributedResult));
+	distributedResult->state = DISTRIBUTED_RESULT_PLANNED;
+	distributedResult->shardCount = shardCount;
+	distributedResult->colocationId = targetRelation->colocationId;
+	distributedResult->binaryFormat = binaryFormat;
+	distributedResult->partitionColumnIndex = partitionColumnIndex;
+	distributedResult->resultShards =
+		palloc0(sizeof(DistributedResultShard) * shardCount);
+
+	ShardInterval **shardIntervalArray = targetRelation->sortedShardIntervalArray;
+
+	for (int targetShardIndex = 0; targetShardIndex < shardCount; targetShardIndex++)
+	{
+		DistributedResultShard *resultShard =
+			&(distributedResult->resultShards[targetShardIndex]);
+		ShardInterval *targetShardInterval = shardIntervalArray[targetShardIndex];
+
+		resultShard->targetShardId = targetShardInterval->shardId;
+		resultShard->targetShardIndex = targetShardIndex;
+		resultShard->fragmentList = NIL;
+		resultShard->rowCount = -1;
+	}
+
+	namedDistributedResult->distributedResult = distributedResult;
+
+	MemoryContextSwitchTo(oldContext);
+
+	return distributedResult;
+}
+
+
+/*
+ * CreateDistributedResult creates a distributed intermediate result by
+ * registering it in the global hash, executing the query, and then
+ * reshuffling the results to be co-located with the target relation.
+ */
+static void
+CreateDistributedResult(char *resultIdPrefix, Query *query, int partitionColumnIndex,
+						Oid targetRelationId)
+{
+	DistributedResult *distResult = RegisterDistributedResult(resultIdPrefix, query,
+															  partitionColumnIndex,
+															  targetRelationId);
+
+	int cursorOptions = CURSOR_OPT_PARALLEL_OK;
+	ParamListInfo params = NULL;
+	PlannedStmt *selectPlan = pg_plan_query_compat(query, NULL, cursorOptions,
+												   params);
+
+	if (!IsRedistributablePlan(selectPlan->planTree) ||
+		!IsSupportedRedistributionTarget(targetRelationId))
+	{
+		ereport(ERROR, (errmsg("query cannot be re-partitioned"),
+						errhint("Use create_intermediate_result() instead.")));
+	}
+
+	DistributedPlan *distSelectPlan =
+		GetDistributedPlan((CustomScan *) selectPlan->planTree);
+	Job *distSelectJob = distSelectPlan->workerJob;
+	List *taskList = distSelectJob->taskList;
+
+	CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId);
+	bool binaryFormat = CanUseBinaryCopyFormatForTargetList(query->targetList);
+
+	List *fragmentList = PartitionTasklistResults(resultIdPrefix, taskList,
+												  partitionColumnIndex,
+												  targetRelation, binaryFormat);
+
+	ColocateFragmentsWithRelation(fragmentList, targetRelation,
+								  distResult->resultShards);
+
+	distResult->state = DISTRIBUTED_RESULT_AVAILABLE;
+}
+
+
+/*
+ * GetNamedDistributedResult returns a named distributed result, allocated in the
+ * top transaction context.
+ */
+DistributedResult *
+GetNamedDistributedResult(char *resultId)
+{
+	if (NamedDistributedResults == NULL)
+	{
+		ereport(ERROR, (errmsg("distributed intermediate result \"%s\" does not exist",
+							   resultId)));
+	}
+
+	bool found = false;
+
+	NamedDistributedResult *namedDistributedResult =
+		hash_search(NamedDistributedResults, resultId, HASH_FIND, &found);
+	if (!found)
+	{
+		ereport(ERROR, (errmsg("distributed intermediate result \"%s\" does not exist",
+							   resultId)));
+	}
+
+	return namedDistributedResult->distributedResult;
+}
+
+
+/*
+ * InitializeNamedDistributedResultsHash creates the NamedDistributedResult
+ * hash if not already created. The hash is kept until the end of the transaction.
+ */
+static void
+InitializeNamedDistributedResultsHash(void)
+{
+	if (NamedDistributedResults != NULL)
+	{
+		return;
+	}
+
+	HASHCTL info;
+	memset(&info, 0, sizeof(info));
+	info.keysize = NAMEDATALEN;
+	info.entrysize = sizeof(NamedDistributedResult);
+	info.hash = string_hash;
+	info.hcxt = TopTransactionContext;
+	uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	NamedDistributedResults = hash_create("named distributed results hash", 8, &info,
+										  hashFlags);
+}
+
+
+/*
+ * ClearNamedDistributedResultsHash resets the distributed intermediate results
+ * hash. The memory is allocated in the top transaction context and will automatically
+ * be freed.
+ */
+void
+ClearNamedDistributedResultsHash(void)
+{
+	NamedDistributedResults = NULL;
+}
+
+
 /*
  * RedistributeTaskListResults partitions the results of given task list using
  * shard ranges and partition method of given targetRelation, and then colocates
  * the result files with shards.
  *
- * If a shard has a replication factor > 1, corresponding result files are copied
- * to all nodes containing that shard.
- *
- * returnValue[shardIndex] is list of cstrings each of which is a resultId which
- * correspond to targetRelation->sortedShardIntervalArray[shardIndex].
- *
  * partitionColumnIndex determines the column in the selectTaskList to use for
  * partitioning.
+ *
+ * If a shard has a replication factor > 1, corresponding result files are copied
+ * to all nodes containing that shard.
  */
-List **
+DistributedResult *
 RedistributeTaskListResults(const char *resultIdPrefix, List *selectTaskList,
 							int partitionColumnIndex,
 							CitusTableCacheEntry *targetRelation,
@@ -148,7 +390,21 @@ RedistributeTaskListResults(const char *resultIdPrefix, List *selectTaskList,
 	List *fragmentList = PartitionTasklistResults(resultIdPrefix, selectTaskList,
 												  partitionColumnIndex,
 												  targetRelation, binaryFormat);
-	return ColocateFragmentsWithRelation(fragmentList, targetRelation);
+
+	DistributedResult *distributedResult = palloc0(sizeof(DistributedResult));
+	distributedResult->state = DISTRIBUTED_RESULT_AVAILABLE;
+	distributedResult->shardCount = targetRelation->shardIntervalArrayLength;
+	distributedResult->colocationId = targetRelation->colocationId;
+	distributedResult->partitionColumnIndex = partitionColumnIndex;
+	distributedResult->binaryFormat = binaryFormat;
+
+	distributedResult->resultShards =
+		palloc0(sizeof(DistributedResultShard) * distributedResult->shardCount);
+
+	ColocateFragmentsWithRelation(fragmentList, targetRelation,
+								  distributedResult->resultShards);
+
+	return distributedResult;
 }
 
 
@@ -504,28 +760,44 @@ ExecuteSelectTasksIntoTupleDest(List *taskList, TupleDestination *tupleDestinati
  * targetRelation->sortedShardIntervalArray[shardIndex] after fetch tasks are
  * done.
  */
-static List **
-ColocateFragmentsWithRelation(List *fragmentList, CitusTableCacheEntry *targetRelation)
+static void
+ColocateFragmentsWithRelation(List *fragmentList, CitusTableCacheEntry *targetRelation,
+							  DistributedResultShard *resultShards)
 {
 	List *fragmentListTransfers = ColocationTransfers(fragmentList, targetRelation);
 	List *fragmentTransferTaskList = FragmentTransferTaskList(fragmentListTransfers);
 
 	ExecuteFetchTaskList(fragmentTransferTaskList);
 
-	int shardCount = targetRelation->shardIntervalArrayLength;
-	List **shardResultIdList = palloc0(shardCount * sizeof(List *));
+	MemoryContext shardsContext = GetMemoryChunkContext(resultShards);
 
 	DistributedResultFragment *sourceFragment = NULL;
 	foreach_ptr(sourceFragment, fragmentList)
 	{
 		int shardIndex = sourceFragment->targetShardIndex;
-		Assert(shardIndex < shardCount);
-		shardResultIdList[shardIndex] = lappend(shardResultIdList[shardIndex],
-												sourceFragment->resultId);
-	}
+		Assert(shardIndex < targetRelation->shardIntervalArrayLength);
 
-	return shardResultIdList;
+		DistributedResultShard *resultShard = &(resultShards[shardIndex]);
+		resultShard->targetShardIndex = shardIndex;
+		resultShard->targetShardId = sourceFragment->targetShardId;
+
+		/* rowCount starts out as -1 (unknown) until fragments are assigned */
+		if (resultShard->rowCount < 0)
+		{
+			resultShard->rowCount = 0;
+		}
+
+		resultShard->rowCount += sourceFragment->rowCount;
+
+		/* use the named distributed intermediate result context where needed */
+		MemoryContext oldContext = MemoryContextSwitchTo(shardsContext);
+
+		/* assign fragment to distributed result shard */
+		resultShard->fragmentList = lappend(resultShard->fragmentList,
+											pstrdup(sourceFragment->resultId));
+
+		MemoryContextSwitchTo(oldContext);
+	}
 }
diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c
index c6fd32ea4..3ba78918f 100644
--- a/src/backend/distributed/executor/insert_select_executor.c
+++ b/src/backend/distributed/executor/insert_select_executor.c
@@ -70,7 +70,7 @@ static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
 static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
 static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery,
 												CitusTableCacheEntry *targetRelation,
-												List **redistributedResults,
+												DistributedResult *redistributedResult,
 												bool useBinaryFormat);
 static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
 static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);
@@ -181,11 +181,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
 			WrapTaskListForProjection(distSelectTaskList, projectedTargetEntries);
 		}
 
-		List **redistributedResults = RedistributeTaskListResults(distResultPrefix,
-																   distSelectTaskList,
-																   partitionColumnIndex,
-																   targetRelation,
-																   binaryFormat);
+		DistributedResult *redistributedResult =
+			RedistributeTaskListResults(distResultPrefix, distSelectTaskList,
+										partitionColumnIndex, targetRelation,
+										binaryFormat);
 
 		/*
 		 * At this point select query has been executed on workers and results
@@ -195,7 +194,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
 		 */
 		List *taskList = RedistributedInsertSelectTaskList(insertSelectQuery,
 														   targetRelation,
-														   redistributedResults,
+														   redistributedResult,
 														   binaryFormat);
 
 		scanState->tuplestorestate =
@@ -638,7 +637,7 @@ IsSupportedRedistributionTarget(Oid targetRelationId)
 static List *
 RedistributedInsertSelectTaskList(Query *insertSelectQuery,
 								  CitusTableCacheEntry *targetRelation,
-								  List **redistributedResults,
+								  DistributedResult *redistributedResult,
 								  bool useBinaryFormat)
 {
 	List *taskList = NIL;
@@ -663,7 +662,9 @@ RedistributedInsertSelectTaskList(Query *insertSelectQuery,
 	{
 		ShardInterval *targetShardInterval =
 			targetRelation->sortedShardIntervalArray[shardOffset];
-		List *resultIdList = redistributedResults[targetShardInterval->shardIndex];
+		DistributedResultShard *resultShard =
+			&(redistributedResult->resultShards[shardOffset]);
+		List *resultIdList = resultShard->fragmentList;
 		uint64 shardId = targetShardInterval->shardId;
 		StringInfo queryString = makeStringInfo();
 
diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c
index 1e5ebac6a..574fe17c4 100644
--- a/src/backend/distributed/metadata/metadata_cache.c
+++ b/src/backend/distributed/metadata/metadata_cache.c
@@ -153,6 +153,7 @@ typedef struct MetadataCacheData
 	Oid copyFormatTypeId;
 	Oid readIntermediateResultFuncId;
 	Oid readIntermediateResultArrayFuncId;
+	Oid readDistributedIntermediateResultFuncId;
 	Oid extraDataContainerFuncId;
 	Oid workerHashFunctionId;
 	Oid anyValueFunctionId;
@@ -2259,6 +2260,26 @@ CitusReadIntermediateResultArrayFuncId(void)
 }
 
 
+/* return oid of the read_distributed_intermediate_result(text) function */
+Oid
+CitusReadDistributedIntermediateResultFuncId(void)
+{
+	if (MetadataCache.readDistributedIntermediateResultFuncId == InvalidOid)
+	{
+		char *functionName = "read_distributed_intermediate_result";
+		List *functionNameList = list_make2(makeString("pg_catalog"),
+											makeString(functionName));
+		Oid paramOids[1] = { TEXTOID };
+		bool missingOK = true;
+
+		MetadataCache.readDistributedIntermediateResultFuncId =
+			LookupFuncName(functionNameList, 1, paramOids, missingOK);
+	}
+
+	return MetadataCache.readDistributedIntermediateResultFuncId;
+}
+
+
 /* return oid of the citus.copy_format enum type */
 Oid
 CitusCopyFormatTypeId(void)
diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c
index 052a89ba6..ebe8aa7ef 100644
--- a/src/backend/distributed/planner/deparse_shard_query.c
+++ b/src/backend/distributed/planner/deparse_shard_query.c
@@ -15,10 +15,12 @@
 #include "access/heapam.h"
 #include "access/htup_details.h"
 #include "catalog/pg_constraint.h"
+#include "catalog/pg_type.h"
 #include "distributed/citus_nodefuncs.h"
 #include "distributed/citus_ruleutils.h"
 #include "distributed/deparse_shard_query.h"
 #include "distributed/insert_select_planner.h"
+#include "distributed/intermediate_results.h"
 #include "distributed/listutils.h"
 #include "distributed/local_executor.h"
 #include "distributed/metadata_cache.h"
@@ -43,6 +45,10 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
 static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList,
 													   OnConflictExpr *onConflict);
 static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList);
+static void ConvertReadDistributedResultForShard(RangeTblEntry *rte,
+												 List *relationShardList);
+static RelationShard * FindDistributedResultRelationShard(List *relationShardList,
+														  char *resultId);
 static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
 static bool ShouldLazyDeparseQuery(Task *task);
 static char * DeparseTaskQuery(Task *task, Query *query);
@@ -229,6 +235,12 @@ UpdateRelationToShardNames(Node *node, List *relationShardList)
 
 	RangeTblEntry *newRte = (RangeTblEntry *) node;
 
+	if (IsDistributedIntermediateResultRTE(newRte))
+	{
+		ConvertReadDistributedResultForShard(newRte, relationShardList);
+		return false;
+	}
+
 	if (newRte->rtekind != RTE_RELATION)
 	{
 		return false;
@@ -384,6 +396,89 @@ ReplaceRelationConstraintByShardConstraint(List *relationShardList,
 }
 
 
+/*
+ * ConvertReadDistributedResultForShard converts a
+ * read_distributed_intermediate_result('<result id>') call to a
+ * read_intermediate_result(ARRAY[..fragments..]) call for the fragments belonging to
+ * a particular shard.
+ *
+ * The shard is obtained from the relationShardList.
+ */
+static void
+ConvertReadDistributedResultForShard(RangeTblEntry *rte,
+									 List *relationShardList)
+{
+	char *resultId = FindDistributedResultId(rte);
+	DistributedResult *distributedResult = GetNamedDistributedResult(resultId);
+
+	List *sortedResultIds = NIL;
+
+	RelationShard *relationShard = FindDistributedResultRelationShard(relationShardList,
+																	  resultId);
+	if (relationShard != NULL)
+	{
+		int shardIndex = relationShard->shardIndex;
+
+		DistributedResultShard *resultShard =
+			&(distributedResult->resultShards[shardIndex]);
+		List *resultIdList = resultShard->fragmentList;
+
+		/* sort result ids for consistent test output */
+		sortedResultIds = SortList(resultIdList, pg_qsort_strcmp);
+	}
+	else
+	{
+		/* no matching relation shard, use empty array */
+	}
+
+	bool useBinaryFormat = distributedResult->binaryFormat;
+
+	/* generate the query on the intermediate result */
+	RangeTblFunction *rangeTableFunction =
+		(RangeTblFunction *) linitial(rte->functions);
+
+	/* build read_intermediate_result call */
+	Const *resultIdConst = makeNode(Const);
+	resultIdConst->consttype = TEXTARRAYOID;
+	resultIdConst->consttypmod = -1;
+	resultIdConst->constlen = -1;
+	resultIdConst->constvalue = PointerGetDatum(strlist_to_textarray(sortedResultIds));
+	resultIdConst->constbyval = false;
+	resultIdConst->constisnull = false;
+	resultIdConst->location = -1;
+
+	Oid copyFormatId = BinaryCopyFormatId();
+
+	if (!useBinaryFormat)
+	{
+		copyFormatId = TextCopyFormatId();
+	}
+
+	Const *resultFormatConst = makeNode(Const);
+	resultFormatConst->consttype = CitusCopyFormatTypeId();
+	resultFormatConst->consttypmod = -1;
+	resultFormatConst->constlen = 4;
+	resultFormatConst->constvalue = ObjectIdGetDatum(copyFormatId);
+	resultFormatConst->constbyval = true;
+	resultFormatConst->constisnull = false;
+	resultFormatConst->location = -1;
+
+	/* build the call to read_intermediate_result */
+	FuncExpr *funcExpr = makeNode(FuncExpr);
+	funcExpr->funcid = CitusReadIntermediateResultArrayFuncId();
+	funcExpr->funcretset = true;
+	funcExpr->funcvariadic = false;
+	funcExpr->funcformat = 0;
+	funcExpr->funccollid = 0;
+	funcExpr->inputcollid = 0;
+	funcExpr->location = -1;
+	funcExpr->args = list_make2(resultIdConst, resultFormatConst);
+
+	/* replace function expression in RTE */
+	rangeTableFunction->funcexpr = (Node *) funcExpr;
+}
+
+
 /*
  * FindRelationShard finds the RelationShard for shard relation with
  * given Oid if exists in given relationShardList. Otherwise, returns NULL.
@@ -410,6 +505,28 @@ FindRelationShard(Oid inputRelationId, List *relationShardList)
 }
 
 
+/*
+ * FindDistributedResultRelationShard finds a relation shard for a distributed
+ * result with the name <resultId>, or NULL if it's not in the list.
+ */
+static RelationShard *
+FindDistributedResultRelationShard(List *relationShardList, char *resultId)
+{
+	RelationShard *relationShard = NULL;
+
+	foreach_ptr(relationShard, relationShardList)
+	{
+		if (relationShard->shardedRelationType == SHARDED_RESULT &&
+			strncmp(relationShard->resultId, resultId, NAMEDATALEN) == 0)
+		{
+			return relationShard;
+		}
+	}
+
+	return NULL;
+}
+
+
 /*
  * ConvertRteToSubqueryWithEmptyResult converts given relation RTE into
  * subquery RTE that returns no results.
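For illustration, the rewrite that ConvertReadDistributedResultForShard performs looks roughly as follows; the result name, fragment IDs, and shard index below are hypothetical, the actual fragment IDs are whatever PartitionTasklistResults produced:

    -- query as written by the user
    SELECT key, value
    FROM read_distributed_intermediate_result('r1') AS res (key int, value text);

    -- query fragment deparsed for the shard at shard index 3
    SELECT key, value
    FROM read_intermediate_result(ARRAY['r1_from_102008_to_3', 'r1_from_102012_to_3']::text[],
                                  'binary'::pg_catalog.citus_copy_format) AS res (key int, value text);

When the result was written in text format, 'text'::pg_catalog.citus_copy_format is used instead, and when no RelationShard matches the result ID the fragment array is simply empty.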
diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c
index 21de545e2..2624d9841 100644
--- a/src/backend/distributed/planner/distributed_planner.c
+++ b/src/backend/distributed/planner/distributed_planner.c
@@ -331,6 +331,11 @@ ListContainsDistributedTableRTE(List *rangeTableList)
 	{
 		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
 
+		if (IsDistributedIntermediateResultRTE(rangeTableEntry))
+		{
+			return true;
+		}
+
 		if (rangeTableEntry->rtekind != RTE_RELATION)
 		{
 			continue;
@@ -375,7 +380,8 @@ AssignRTEIdentities(List *rangeTableList, int rteIdCounter)
-		 * Note that we're only interested in RTE_RELATIONs and thus assigning
-		 * identifiers to those RTEs only.
+		 * Note that we're only interested in RTE_RELATIONs and distributed
+		 * result RTEs, and thus assign identifiers to those RTEs only.
 		 */
-		if (rangeTableEntry->rtekind == RTE_RELATION &&
+		if ((rangeTableEntry->rtekind == RTE_RELATION ||
+			 IsDistributedIntermediateResultRTE(rangeTableEntry)) &&
 			rangeTableEntry->values_lists == NIL)
 		{
 			AssignRTEIdentity(rangeTableEntry, rteIdCounter++);
@@ -446,8 +452,6 @@ AdjustPartitioningForDistributedPlanning(List *rangeTableList,
 static void
 AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier)
 {
-	Assert(rangeTableEntry->rtekind == RTE_RELATION);
-
 	rangeTableEntry->values_lists = list_make1_int(rteIdentifier);
 }
 
@@ -456,7 +460,6 @@ AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier)
 int
 GetRTEIdentity(RangeTblEntry *rte)
 {
-	Assert(rte->rtekind == RTE_RELATION);
 	Assert(rte->values_lists != NIL);
 	Assert(IsA(rte->values_lists, IntList));
 	Assert(list_length(rte->values_lists) == 1);
@@ -1767,6 +1770,11 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
 {
 	CitusTableCacheEntry *cacheEntry = NULL;
 
+	if (!CitusHasBeenLoaded())
+	{
+		return;
+	}
+
 	if (ReplaceCitusExtraDataContainer && IsCitusExtraDataContainerRelation(rte))
 	{
 		/*
@@ -1789,7 +1797,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
 	AdjustReadIntermediateResultCost(rte, relOptInfo);
 	AdjustReadIntermediateResultArrayCost(rte, relOptInfo);
 
-	if (rte->rtekind != RTE_RELATION)
+	if (rte->rtekind != RTE_RELATION && !IsDistributedIntermediateResultRTE(rte))
 	{
 		return;
 	}
diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c
index 1f028ff0e..24cca50de 100644
--- a/src/backend/distributed/planner/multi_logical_planner.c
+++ b/src/backend/distributed/planner/multi_logical_planner.c
@@ -346,6 +346,24 @@ IsDistributedTableRTE(Node *node)
 }
 
 
+/*
+ * IsDistributedRelationRTE returns true if the given range table entry
+ * describes a distributed table or a distributed intermediate result.
+ */
+bool
+IsDistributedRelationRTE(Node *node)
+{
+	RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
+	if (!IsA(rangeTableEntry, RangeTblEntry))
+	{
+		return false;
+	}
+
+	return IsDistributedTableRTE((Node *) rangeTableEntry) ||
+		   IsDistributedIntermediateResultRTE(rangeTableEntry);
+}
+
+
 /*
  * IsReferenceTableRTE gets a node and returns true if the node
  * is a range table relation entry that points to a reference table.
@@ -678,6 +696,31 @@ MultiNodeTree(Query *queryTree)
 }
 
 
+/*
+ * IsDistributedIntermediateResultRTE returns whether an RTE describes a
+ * FROM read_distributed_intermediate_result() call.
+ */
+bool
+IsDistributedIntermediateResultRTE(RangeTblEntry *rangeTableEntry)
+{
+	if (rangeTableEntry->rtekind != RTE_FUNCTION ||
+		list_length(rangeTableEntry->functions) != 1)
+	{
+		return false;
+	}
+
+	RangeTblFunction *rangeTableFunction = (RangeTblFunction *) linitial(
+		rangeTableEntry->functions);
+	if (!IsA(rangeTableFunction->funcexpr, FuncExpr))
+	{
+		return false;
+	}
+
+	FuncExpr *funcExpression = (FuncExpr *) rangeTableFunction->funcexpr;
+
+	return funcExpression->funcid == CitusReadDistributedIntermediateResultFuncId();
+}
+
+
 /*
  * ContainsReadIntermediateResultFunction determines whether an expresion tree contains
  * a call to the read_intermediate_result function.
@@ -817,6 +860,38 @@ FindIntermediateResultIdIfExists(RangeTblEntry *rte)
 }
 
 
+/*
+ * FindDistributedResultId extracts the result ID from a
+ * read_distributed_intermediate_result call.
+ */
+char *
+FindDistributedResultId(RangeTblEntry *rte)
+{
+	if (rte->rtekind != RTE_FUNCTION)
+	{
+		ereport(ERROR, (errmsg("not a read_distributed_intermediate_result RTE")));
+	}
+
+	List *functionList = rte->functions;
+	RangeTblFunction *rangeTblfunction = (RangeTblFunction *) linitial(functionList);
+	FuncExpr *funcExpr = (FuncExpr *) rangeTblfunction->funcexpr;
+
+	if (funcExpr->funcid != CitusReadDistributedIntermediateResultFuncId())
+	{
+		ereport(ERROR, (errmsg("not a read_distributed_intermediate_result RTE")));
+	}
+
+	Const *resultIdConst = castNode(Const, linitial(funcExpr->args));
+
+	if (resultIdConst->constisnull)
+	{
+		ereport(ERROR, (errmsg("distributed intermediate result ID cannot be NULL")));
+	}
+
+	return TextDatumGetCString(resultIdConst->constvalue);
+}
+
+
 /*
  * ErrorIfQueryNotSupported checks that we can perform distributed planning for
  * the given query. The checks in this function will be removed as we support
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index 1dd5c661a..f4cafc4ed 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -40,6 +40,7 @@
 #include "distributed/colocation_utils.h"
 #include "distributed/deparse_shard_query.h"
 #include "distributed/coordinator_protocol.h"
+#include "distributed/intermediate_results.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/multi_join_order.h"
@@ -2211,18 +2212,32 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
 	{
 		RelationRestriction *relationRestriction =
 			(RelationRestriction *) lfirst(restrictionCell);
-		Oid relationId = relationRestriction->relationId;
 		List *prunedShardList = (List *) lfirst(prunedRelationShardCell);
 		ListCell *shardIntervalCell = NULL;
+		int currentShardCount = 0;
+		DistributedResult *distributedResult = NULL;
 
-		CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
-		if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
+		if (IsDistributedIntermediateResultRTE(relationRestriction->rte))
 		{
-			continue;
+			char *resultId = FindDistributedResultId(relationRestriction->rte);
+			distributedResult = GetNamedDistributedResult(resultId);
+
+			currentShardCount = distributedResult->shardCount;
+		}
+		else
+		{
+			Oid relationId = relationRestriction->relationId;
+			CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
+			if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
+			{
+				continue;
+			}
+
+			currentShardCount = cacheEntry->shardIntervalArrayLength;
 		}
 
 		/* we expect distributed tables to have the same shard count */
-		if (shardCount > 0 && shardCount != cacheEntry->shardIntervalArrayLength)
+		if (shardCount > 0 && shardCount != currentShardCount)
 		{
 			ereport(ERROR, (errmsg("shard counts of co-located tables do not "
 								   "match")));
@@ -2230,7 +2245,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
 
 		if (taskRequiredForShardIndex == NULL)
 		{
-			shardCount = cacheEntry->shardIntervalArrayLength;
+			shardCount = currentShardCount;
 			taskRequiredForShardIndex = (bool *) palloc0(shardCount);
 
 			/* there is a distributed table, find the shard range */
@@ -2258,6 +2273,18 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
 			ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
 			int shardIndex = shardInterval->shardIndex;
 
+			if (distributedResult != NULL)
+			{
+				DistributedResultShard *resultShard =
+					&(distributedResult->resultShards[shardIndex]);
+
+				if (resultShard->fragmentList == NIL)
+				{
+					/* can skip empty result shard */
+					continue;
+				}
+			}
+
 			taskRequiredForShardIndex[shardIndex] = true;
 
 			minShardOffset = Min(minShardOffset, shardIndex);
@@ -2499,6 +2526,37 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
 	Oid relationId = relationRestriction->relationId;
 	ShardInterval *shardInterval = NULL;
 
+	if (IsDistributedIntermediateResultRTE(relationRestriction->rte))
+	{
+		char *resultId = FindDistributedResultId(relationRestriction->rte);
+		DistributedResult *distributedResult = GetNamedDistributedResult(resultId);
+		DistributedResultShard *resultShard =
+			&(distributedResult->resultShards[shardIndex]);
+
+		if (resultShard->fragmentList == NIL)
+		{
+			/* skip generating a relation shard */
+			continue;
+		}
+
+		RelationShard *relationShard = CitusMakeNode(RelationShard);
+		relationShard->shardedRelationType = SHARDED_RESULT;
+		relationShard->resultId = resultId;
+		relationShard->shardId = resultShard->targetShardId;
+		relationShard->shardIndex = shardIndex;
+
+		if (anchorShardId == INVALID_SHARD_ID)
+		{
+			anchorShardId = resultShard->targetShardId;
+		}
+
+		relationShardList = lappend(relationShardList, relationShard);
+
+		shardInterval = LoadShardInterval(resultShard->targetShardId);
+		taskShardList = lappend(taskShardList, list_make1(shardInterval));
+		continue;
+	}
+
 	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
 	if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
 	{
@@ -2534,6 +2592,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
 	RelationShard *relationShard = CitusMakeNode(RelationShard);
 	relationShard->relationId = copiedShardInterval->relationId;
 	relationShard->shardId = copiedShardInterval->shardId;
+	relationShard->shardIndex = shardIndex;
 
 	relationShardList = lappend(relationShardList, relationShard);
 }
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index eaba0ff02..ef279cc26 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -30,6 +30,7 @@
 #include "distributed/errormessage.h"
 #include "distributed/log_utils.h"
 #include "distributed/insert_select_planner.h"
+#include "distributed/intermediate_results.h"
 #include "distributed/intermediate_result_pruning.h"
 #include "distributed/metadata_utility.h"
 #include "distributed/coordinator_protocol.h"
@@ -2463,6 +2464,7 @@ RelationShardListForShardIntervalList(List *shardIntervalList, bool *shardsPrese
 
 		relationShard->relationId = shardInterval->relationId;
 		relationShard->shardId = shardInterval->shardId;
+		relationShard->shardIndex = shardInterval->shardIndex;
 
 		relationShardList = lappend(relationShardList, relationShard);
 	}
 
@@ -2638,16 +2640,29 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
 		RelationRestriction *relationRestriction =
 			(RelationRestriction *) lfirst(restrictionCell);
 		Oid relationId = relationRestriction->relationId;
+		int shardCount = 0;
 
-		if (!IsCitusTable(relationId))
+		if (IsCitusTable(relationId))
+		{
+			CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
+			shardCount = cacheEntry->shardIntervalArrayLength;
+		}
+		else if (IsDistributedIntermediateResultRTE(relationRestriction->rte))
+		{
+			char *resultId = FindDistributedResultId(relationRestriction->rte);
+			DistributedResult *distributedResult = GetNamedDistributedResult(resultId);
+			int colocationId = distributedResult->colocationId;
+
+			relationId = ColocatedTableId(colocationId);
+			shardCount = distributedResult->shardCount;
+		}
+		else
 		{
 			/* ignore local tables for shard pruning purposes */
 			continue;
 		}
 
 		Index tableId = relationRestriction->index;
-		CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
-		int shardCount = cacheEntry->shardIntervalArrayLength;
 		List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
 		List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
 		List *prunedShardIntervalList = NIL;
diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c
index e2e8be6cd..47f367854 100644
--- a/src/backend/distributed/planner/query_pushdown_planning.c
+++ b/src/backend/distributed/planner/query_pushdown_planning.c
@@ -725,7 +725,7 @@ FromClauseRecurringTupleType(Query *queryTree)
 	}
 
 	if (FindNodeMatchingCheckFunctionInRangeTableList(queryTree->rtable,
-													  IsDistributedTableRTE))
+													  IsDistributedRelationRTE))
 	{
 		/*
 		 * There is a distributed table somewhere in the FROM clause.
@@ -1310,7 +1310,7 @@ RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo, Relids relids)
 		RangeTblEntry *rangeTableEntry = plannerInfo->simple_rte_array[relationId];
 
 		if (FindNodeMatchingCheckFunctionInRangeTableList(list_make1(rangeTableEntry),
-														  IsDistributedTableRTE))
+														  IsDistributedRelationRTE))
 		{
 			/* we already found a distributed table, no need to check further */
 			return false;
@@ -1418,6 +1418,11 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType)
 	{
 		List *functionList = rangeTableEntry->functions;
 
+		if (IsDistributedIntermediateResultRTE(rangeTableEntry))
+		{
+			return false;
+		}
+
 		if (list_length(functionList) == 1 &&
 			ContainsReadIntermediateResultFunction((Node *) functionList))
 		{
diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c
index c6dce836e..3e3976c83 100644
--- a/src/backend/distributed/planner/recursive_planning.c
+++ b/src/backend/distributed/planner/recursive_planning.c
@@ -1560,7 +1560,8 @@ ShouldTransformRTE(RangeTblEntry *rangeTableEntry)
 	 */
 	if (rangeTableEntry->rtekind != RTE_FUNCTION ||
 		rangeTableEntry->lateral ||
-		rangeTableEntry->funcordinality)
+		rangeTableEntry->funcordinality ||
+		IsDistributedIntermediateResultRTE(rangeTableEntry))
 	{
 		return false;
 	}
diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c
index 565e17c11..0f74a480e 100644
--- a/src/backend/distributed/planner/relation_restriction_equivalence.c
+++ b/src/backend/distributed/planner/relation_restriction_equivalence.c
@@ -14,6 +14,7 @@
 
 #include "distributed/colocation_utils.h"
 #include "distributed/distributed_planner.h"
+#include "distributed/intermediate_results.h"
 #include "distributed/listutils.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_logical_planner.h"
@@ -65,7 +66,15 @@ typedef struct AttributeEquivalenceClass
  */
 typedef struct AttributeEquivalenceClassMember
 {
+	/* whether the class member is a table or a result */
+	ShardedRelationType shardedRelationType;
+
+	/* relation ID (in case of a table) */
 	Oid relationId;
+
+	/* result ID (in case of a distributed result) */
+	char *resultId;
+
 	int rteIdentity;
 	Index varno;
 	AttrNumber varattno;
@@ -90,6 +99,10 @@ static void AddRteSubqueryToAttributeEquivalenceClass(AttributeEquivalenceClass
 													  rangeTableEntry,
 													  PlannerInfo *root,
 													  Var *varToBeAdded);
+static void AddDistResultRteToAttributeEquivalenceClass(AttributeEquivalenceClass **
+														attrEquivalenceClass,
+														RangeTblEntry *rangeTableEntry,
+														Var *varToBeAdded);
 static Query * GetTargetSubquery(PlannerInfo *root, RangeTblEntry *rangeTableEntry,
 								 Var *varToBeAdded);
 static void AddUnionAllSetOperationsToAttributeEquivalenceClass(
@@ -604,6 +617,17 @@ UniqueRelationCount(RelationRestrictionContext *restrictionContext, CitusTableTy
 	{
 		RelationRestriction *relationRestriction =
 			(RelationRestriction *) lfirst(relationRestrictionCell);
+
+		if (IsDistributedIntermediateResultRTE(relationRestriction->rte))
+		{
+			if (tableType == DISTRIBUTED_TABLE || tableType == ANY_CITUS_TABLE_TYPE)
+			{
+				int rteIdentity = GetRTEIdentity(relationRestriction->rte);
+				rteIdentityList = list_append_unique_int(rteIdentityList, rteIdentity);
+			}
+			continue;
+		}
+
 		Oid relationId = relationRestriction->relationId;
 		CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(relationId);
 
@@ -665,10 +689,11 @@ EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
 		RelationRestriction *relationRestriction =
			(RelationRestriction *) lfirst(relationRestrictionCell);
 		int rteIdentity = GetRTEIdentity(relationRestriction->rte);
 
-		/* we shouldn't check for the equality of non-distributed tables */
-		if (IsCitusTableType(relationRestriction->relationId,
+		if (relationRestriction->rte->rtekind == RTE_RELATION &&
+			IsCitusTableType(relationRestriction->relationId,
 							 CITUS_TABLE_WITH_NO_DIST_KEY))
 		{
+			/* we shouldn't check for the equality of non-distributed tables */
 			continue;
 		}
 
@@ -1016,22 +1041,39 @@ GenerateEquivalenceClassForRelationRestriction(
 	{
 		RelationRestriction *relationRestriction =
 			(RelationRestriction *) lfirst(relationRestrictionCell);
-		Var *relationPartitionKey = DistPartitionKey(relationRestriction->relationId);
+		int partitionAttributeNumber = 0;
 
-		if (relationPartitionKey)
+		if (IsDistributedIntermediateResultRTE(relationRestriction->rte))
 		{
-			eqClassForRelation = palloc0(sizeof(AttributeEquivalenceClass));
-			eqMember = palloc0(sizeof(AttributeEquivalenceClassMember));
-			eqMember->relationId = relationRestriction->relationId;
-			eqMember->rteIdentity = GetRTEIdentity(relationRestriction->rte);
-			eqMember->varno = relationRestriction->index;
-			eqMember->varattno = relationPartitionKey->varattno;
+			char *resultId = FindDistributedResultId(relationRestriction->rte);
+			DistributedResult *distResult = GetNamedDistributedResult(resultId);
 
-			eqClassForRelation->equivalentAttributes =
-				lappend(eqClassForRelation->equivalentAttributes, eqMember);
-
-			break;
+			partitionAttributeNumber = distResult->partitionColumnIndex + 1;
 		}
+		else
+		{
+			Var *relationPartitionKey = DistPartitionKey(relationRestriction->relationId);
+
+			if (relationPartitionKey == NULL)
+			{
+				/* skip reference tables */
+				continue;
+			}
+
+			partitionAttributeNumber = relationPartitionKey->varattno;
+		}
+
+		eqClassForRelation = palloc0(sizeof(AttributeEquivalenceClass));
+		eqMember = palloc0(sizeof(AttributeEquivalenceClassMember));
+		eqMember->relationId = relationRestriction->relationId;
+		eqMember->rteIdentity = GetRTEIdentity(relationRestriction->rte);
+		eqMember->varno = relationRestriction->index;
+		eqMember->varattno = partitionAttributeNumber;
+
+		eqClassForRelation->equivalentAttributes =
+			lappend(eqClassForRelation->equivalentAttributes, eqMember);
+
+		break;
 	}
 
 	return eqClassForRelation;
@@ -1212,6 +1254,12 @@ AddToAttributeEquivalenceClass(AttributeEquivalenceClass **attributeEquivalenceC
 												rangeTableEntry, root,
 												varToBeAdded);
 	}
+	else if (IsDistributedIntermediateResultRTE(rangeTableEntry))
+	{
+		AddDistResultRteToAttributeEquivalenceClass(attributeEquivalenceClass,
+													rangeTableEntry,
+													varToBeAdded);
+	}
 }
 
 
@@ -1483,6 +1531,43 @@ AddRteRelationToAttributeEquivalenceClass(AttributeEquivalenceClass **
 }
 
 
+/*
+ * AddDistResultRteToAttributeEquivalenceClass adds the given var of a
+ * read_distributed_intermediate_result() RTE to the given equivalence class,
+ * using the rteIdentity of the range table entry. Only vars that refer to
+ * the partition column of the distributed result are added.
+ */
+static void
+AddDistResultRteToAttributeEquivalenceClass(AttributeEquivalenceClass **
+											attrEquivalenceClass,
+											RangeTblEntry *rangeTableEntry,
+											Var *varToBeAdded)
+{
+	char *resultId = FindDistributedResultId(rangeTableEntry);
+	DistributedResult *distributedResult = GetNamedDistributedResult(resultId);
+
+	/* we're only interested in partition columns */
+	if (distributedResult->partitionColumnIndex + 1 != varToBeAdded->varattno)
+	{
+		return;
+	}
+
+	AttributeEquivalenceClassMember *attributeEqMember = palloc0(
+		sizeof(AttributeEquivalenceClassMember));
+
+	attributeEqMember->varattno = varToBeAdded->varattno;
+	attributeEqMember->varno = varToBeAdded->varno;
+	attributeEqMember->rteIdentity = GetRTEIdentity(rangeTableEntry);
+	attributeEqMember->resultId = pstrdup(resultId);
+	attributeEqMember->shardedRelationType = SHARDED_RESULT;
+
+	(*attrEquivalenceClass)->equivalentAttributes =
+		lappend((*attrEquivalenceClass)->equivalentAttributes,
+				attributeEqMember);
+}
+
+
+
 /*
  * AttributeClassContainsAttributeClassMember returns true if it the input class member
  * is already exists in the attributeEquivalenceClass. An equality is identified by the
@@ -1728,14 +1813,26 @@ AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictio
 	/* check whether all relations exists in the main restriction list */
 	foreach_ptr(relationRestriction, restrictionContext->relationRestrictionList)
 	{
-		Oid relationId = relationRestriction->relationId;
+		int colocationId = INVALID_COLOCATION_ID;
 
-		if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
+		if (IsDistributedIntermediateResultRTE(relationRestriction->rte))
 		{
-			continue;
-		}
+			char *resultId = FindDistributedResultId(relationRestriction->rte);
+			DistributedResult *distributedResult = GetNamedDistributedResult(resultId);
 
-		int colocationId = TableColocationId(relationId);
+			colocationId = distributedResult->colocationId;
+		}
+		else
+		{
+			Oid relationId = relationRestriction->relationId;
+
+			if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
+			{
+				continue;
+			}
+
+			colocationId = TableColocationId(relationId);
+		}
 
 		if (initialColocationId == INVALID_COLOCATION_ID)
 		{
@@ -1937,7 +2034,8 @@ RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
 			ExtractRangeTableRelationWalker((Node *) rangeTableEntry->subquery,
 											&rangeTableRelationList);
 		}
-		else if (rangeTableEntry->rtekind == RTE_RELATION)
+		else if (rangeTableEntry->rtekind == RTE_RELATION ||
+				 IsDistributedIntermediateResultRTE(rangeTableEntry))
 		{
 			ExtractRangeTableRelationWalker((Node *) rangeTableEntry,
 											&rangeTableRelationList);
@@ -1952,8 +2050,6 @@ RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
 		{
 			RangeTblEntry *rteRelation = (RangeTblEntry *) lfirst(rteRelationCell);
 
-			Assert(rteRelation->rtekind == RTE_RELATION);
-
 			int rteIdentity = GetRTEIdentity(rteRelation);
 			if (bms_is_member(rteIdentity, queryRteIdentities))
 			{
diff --git a/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql b/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql
index 43895d047..f514f044f 100644
--- a/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql
+++ b/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql
@@ -5,3 +5,17 @@
 #include "udfs/citus_finish_pg_upgrade/10.0-1.sql"
 
 #include "../../columnar/sql/columnar--9.5-1--10.0-1.sql"
+
+CREATE OR REPLACE FUNCTION pg_catalog.read_distributed_intermediate_result(result_id text)
+    RETURNS record
+    LANGUAGE C STRICT VOLATILE PARALLEL SAFE
+    AS 'MODULE_PATHNAME', $$read_distributed_intermediate_result$$;
+COMMENT ON FUNCTION pg_catalog.read_distributed_intermediate_result(text)
+    IS 'read a previously stored distributed intermediate result';
+
+CREATE OR REPLACE FUNCTION pg_catalog.create_distributed_intermediate_result(result_id text, query text, distribution_column int, colocate_with text)
+    RETURNS void
+    LANGUAGE C STRICT VOLATILE
+    AS 'MODULE_PATHNAME', $$create_distributed_intermediate_result$$;
+COMMENT ON FUNCTION pg_catalog.create_distributed_intermediate_result(text,text,int,text)
+    IS 'execute a query and write its results to a co-located distributed intermediate result';
diff --git a/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql b/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql
index a86527b1e..32339b6a4 100644
--- a/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql
+++ b/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql
@@ -4,3 +4,6 @@
 #include "../udfs/citus_finish_pg_upgrade/9.5-1.sql"
 
 #include "../../../columnar/sql/downgrades/columnar--10.0-1--9.5-1.sql"
+
+DROP FUNCTION pg_catalog.read_distributed_intermediate_result(text);
+DROP FUNCTION pg_catalog.create_distributed_intermediate_result(text, text, int, text);
diff --git a/src/backend/distributed/test/distributed_intermediate_results.c b/src/backend/distributed/test/distributed_intermediate_results.c
index 5c450d88b..d5185dabe 100644
--- a/src/backend/distributed/test/distributed_intermediate_results.c
+++ b/src/backend/distributed/test/distributed_intermediate_results.c
@@ -150,9 +150,9 @@ redistribute_task_list_results(PG_FUNCTION_ARGS)
 									  DISTRIBUTED_TABLE) ?
 						targetRelation->partitionColumn->varattno - 1 : 0;
 
-	List **shardResultIds = RedistributeTaskListResults(resultIdPrefix, taskList,
-														partitionColumnIndex,
-														targetRelation, binaryFormat);
+	DistributedResult *distResult =
+		RedistributeTaskListResults(resultIdPrefix, taskList, partitionColumnIndex,
+									targetRelation, binaryFormat);
 
 	TupleDesc tupleDescriptor = NULL;
 	Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
@@ -163,10 +163,12 @@ redistribute_task_list_results(PG_FUNCTION_ARGS)
 		ShardInterval *shardInterval =
 			targetRelation->sortedShardIntervalArray[shardIndex];
 		uint64 shardId = shardInterval->shardId;
+		DistributedResultShard *resultShard = &(distResult->resultShards[shardIndex]);
+		List *fragmentList = resultShard->fragmentList;
 
-		int fragmentCount = list_length(shardResultIds[shardIndex]);
+		int fragmentCount = list_length(fragmentList);
 		Datum *resultIdValues = palloc0(fragmentCount * sizeof(Datum));
-		List *sortedResultIds = SortList(shardResultIds[shardIndex], pg_qsort_strcmp);
+		List *sortedResultIds = SortList(fragmentList, pg_qsort_strcmp);
 
 		const char *resultId = NULL;
 		int resultIdIndex = 0;
diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c
index 1999c836c..7da2540a3 100644
--- a/src/backend/distributed/transaction/transaction_management.c
+++ b/src/backend/distributed/transaction/transaction_management.c
@@ -271,6 +271,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 			 */
 			DeallocateReservedConnections();
 
+			ClearNamedDistributedResultsHash();
+
 			UnSetDistributedTransactionId();
 
 			/* empty the CommitContext to ensure we're not leaking memory */
@@ -330,6 +332,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 			 */
 			DeallocateReservedConnections();
 
+			ClearNamedDistributedResultsHash();
+
 			/*
			 * We reset these mainly for posterity. The only way we would normally
 			 * get here with ExecutorLevel or PlannerLevel > 0 is during a fatal
diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c
index 24caf6a8d..0e0038b6b 100644
--- a/src/backend/distributed/utils/citus_copyfuncs.c
+++ b/src/backend/distributed/utils/citus_copyfuncs.c
@@ -250,8 +250,11 @@ CopyNodeRelationShard(COPYFUNC_ARGS)
 {
 	DECLARE_FROM_AND_NEW_NODE(RelationShard);
 
+	COPY_SCALAR_FIELD(shardedRelationType);
 	COPY_SCALAR_FIELD(relationId);
 	COPY_SCALAR_FIELD(shardId);
+	COPY_STRING_FIELD(resultId);
+	COPY_SCALAR_FIELD(shardIndex);
 }
 
 
diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c
index b02c7daf7..335d27f94 100644
--- a/src/backend/distributed/utils/citus_outfuncs.c
+++ b/src/backend/distributed/utils/citus_outfuncs.c
@@ -459,8 +459,11 @@ OutRelationShard(OUTFUNC_ARGS)
 	WRITE_LOCALS(RelationShard);
 	WRITE_NODE_TYPE("RELATIONSHARD");
 
+	WRITE_ENUM_FIELD(shardedRelationType, ShardedRelationType);
 	WRITE_OID_FIELD(relationId);
 	WRITE_UINT64_FIELD(shardId);
+	WRITE_STRING_FIELD(resultId);
+	WRITE_INT_FIELD(shardIndex);
 }
 
 
diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h
index 2a0433e07..8019e1522 100644
--- a/src/include/distributed/distributed_planner.h
+++ b/src/include/distributed/distributed_planner.h
@@ -117,11 +117,32 @@ typedef struct PlannerRestrictionContext
 	MemoryContext memoryContext;
 } PlannerRestrictionContext;
 
+/* type of relation in a relation shard */
+typedef enum ShardedRelationType
+{
+	SHARDED_TABLE,
+	SHARDED_RESULT
+} ShardedRelationType;
+
+/* mapping from relation to shard */
 typedef struct RelationShard
 {
 	CitusNode type;
+
+	/* type of relation (table or result) */
+	ShardedRelationType shardedRelationType;
+
+	/* OID of the relation (InvalidOid in case of a result) */
 	Oid relationId;
+
+	/* shard ID in the distributed table */
 	uint64 shardId;
+
+	/* name of the result (NULL in case of a table) */
+	char *resultId;
+
+	/* index of the shard in a sorted shard interval array */
+	int shardIndex;
 } RelationShard;
 
 typedef struct RelationRowLock
diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h
index d7ebf15bf..ebbfe9e5d 100644
--- a/src/include/distributed/intermediate_results.h
+++ b/src/include/distributed/intermediate_results.h
@@ -23,7 +23,8 @@
 
 
 /*
- * DistributedResultFragment represents a fragment of a distributed result.
+ * DistributedResultFragment represents a fragment of a distributed result
+ * shard.
 */
 typedef struct DistributedResultFragment
 {
@@ -34,7 +35,7 @@ typedef struct DistributedResultFragment
 	uint32 nodeId;
 
 	/* number of rows in the result file */
-	int rowCount;
+	uint64 rowCount;
 
 	/*
 	 * The fragment contains the rows which match the partitioning method
@@ -47,6 +48,72 @@ typedef struct DistributedResultFragment
 	int targetShardIndex;
 } DistributedResultFragment;
 
+/*
+ * DistributedResultState describes whether a distributed result is planned
+ * or already executed.
+ */
+typedef enum DistributedResultState
+{
+	DISTRIBUTED_RESULT_PLANNED,
+	DISTRIBUTED_RESULT_AVAILABLE
+} DistributedResultState;
+
+
+/*
+ * DistributedResultShard represents a shard of a distributed result. A shard
+ * can consist of multiple fragments, which are intermediate results that are
+ * on the same node.
+ */
+typedef struct DistributedResultShard
+{
+	/* what is the index of targetShardId in its relation's sorted shard list? */
+	int targetShardIndex;
+
+	/*
+	 * The result shard contains the rows which match the partitioning method
+	 * and partitioning ranges of targetShardId. The shape of each row matches
+	 * the schema of the relation to which targetShardId belongs.
+	 */
+	uint64 targetShardId;
+
+	/* result ids of fragments that make up the shard (if result is available) */
+	List *fragmentList;
+
+	/* sum of the number of rows in the fragments (if result is available) */
+	int64 rowCount;
+
+} DistributedResultShard;
+
+
+/*
+ * DistributedResult describes a distributed intermediate result which can be
+ * queried like a distributed table.
+ *
+ * A distributed intermediate result has a set of distributed result fragments
+ * for each shard.
+ */
+typedef struct DistributedResult
+{
+	/* state of this distributed result (planned or executed) */
+	DistributedResultState state;
+
+	/* co-location ID with which the result is co-located */
+	int colocationId;
+
+	/* number of shards in the co-location group */
+	int shardCount;
+
+	/* index of the partition column */
+	int partitionColumnIndex;
+
+	/* whether the file format is binary */
+	bool binaryFormat;
+
+	/* array containing a list of result shards */
+	DistributedResultShard *resultShards;
+
+} DistributedResult;
+
 
 /* intermediate_results.c */
 extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId,
@@ -62,11 +129,17 @@ extern char * QueryResultFileName(const char *resultId);
 extern char * CreateIntermediateResultsDirectory(void);
 
 /* distributed_intermediate_results.c */
-extern List ** RedistributeTaskListResults(const char *resultIdPrefix,
-										   List *selectTaskList,
-										   int partitionColumnIndex,
-										   CitusTableCacheEntry *targetRelation,
-										   bool binaryFormat);
+extern DistributedResult * RegisterDistributedResult(char *resultIdPrefix, Query *query,
+													 int partitionColumnIndex,
+													 Oid targetRelationId);
+extern DistributedResult * GetNamedDistributedResult(char *resultId);
+extern void ClearNamedDistributedResultsHash(void);
+extern DistributedResult * RedistributeTaskListResults(const char *resultIdPrefix,
+													   List *selectTaskList,
+													   int partitionColumnIndex,
+													   CitusTableCacheEntry *
+													   targetRelation,
+													   bool binaryFormat);
 extern List * PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList,
 									   int partitionColumnIndex,
 									   CitusTableCacheEntry *distributionScheme,
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index 7303e55fe..b35f96bea 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -230,6 +230,7 @@ extern Oid CitusCopyFormatTypeId(void);
 /* function oids */
 extern Oid CitusReadIntermediateResultFuncId(void);
 Oid CitusReadIntermediateResultArrayFuncId(void);
+extern Oid CitusReadDistributedIntermediateResultFuncId(void);
 extern Oid CitusExtraDataContainerFuncId(void);
 extern Oid CitusAnyValueFunctionId(void);
 extern Oid PgTableVisibleFuncId(void);
diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h
index 993e8b819..dbc2107f4 100644
--- a/src/include/distributed/multi_logical_planner.h
+++ b/src/include/distributed/multi_logical_planner.h
@@ -196,12 +196,15 @@ extern bool FindNodeMatchingCheckFunctionInRangeTableList(List *rtable, bool (*c
 															Node *));
 extern bool IsCitusTableRTE(Node *node);
 extern bool IsDistributedTableRTE(Node *node);
+extern bool IsDistributedRelationRTE(Node *node);
 extern bool IsReferenceTableRTE(Node *node);
 extern bool QueryContainsDistributedTableRTE(Query *query);
 extern bool IsCitusExtraDataContainerRelation(RangeTblEntry *rte);
 extern bool ContainsReadIntermediateResultFunction(Node *node);
 extern bool ContainsReadIntermediateResultArrayFunction(Node *node);
+extern bool IsDistributedIntermediateResultRTE(RangeTblEntry *rangeTableEntry);
 extern char * FindIntermediateResultIdIfExists(RangeTblEntry *rte);
+extern char * FindDistributedResultId(RangeTblEntry *rte);
 extern MultiNode * ParentNode(MultiNode *multiNode);
 extern MultiNode * ChildNode(MultiUnaryNode *multiNode);
 extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode);
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out
index 7c72761d0..6393c366e 100644
--- a/src/test/regress/expected/multi_extension.out
+++ b/src/test/regress/expected/multi_extension.out
@@ -475,19 +475,21 @@ SELECT * FROM print_extension_changes();
 -- Snapshot of state at 10.0-1
 ALTER EXTENSION citus UPDATE TO '10.0-1';
 SELECT * FROM print_extension_changes();
- previous_object | current_object
+ previous_object |                            current_object
 ---------------------------------------------------------------------
                  | access method columnar
                  | function alter_columnar_table_reset(regclass,boolean,boolean,boolean)
                  | function alter_columnar_table_set(regclass,integer,integer,name)
                  | function citus_internal.cstore_ensure_objects_exist()
+                 | function create_distributed_intermediate_result(text,text,integer,text)
                  | function cstore.columnar_handler(internal)
+                 | function read_distributed_intermediate_result(text)
                  | schema cstore
                  | table cstore.cstore_data_files
                  | table cstore.cstore_skipnodes
                  | table cstore.cstore_stripes
                  | table cstore.options
-(10 rows)
+(12 rows)
 
 DROP TABLE prev_objects, extension_diff;
 -- show running version
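To tie the pieces together, a minimal usage sketch of the two UDFs added by this patch; the table, result, and column names are hypothetical, and per the transaction callback changes above the named result only exists for the duration of the transaction:

    BEGIN;
    -- run a query and repartition its result by column index 0,
    -- co-located with the distributed table dist_table
    SELECT create_distributed_intermediate_result(
        'squares', 'SELECT s, s * s FROM generate_series(1, 100) s', 0, 'dist_table');

    -- query the named result; a column definition list is required
    -- because the placeholder function returns record
    SELECT count(*)
    FROM read_distributed_intermediate_result('squares') AS res (x int, x2 int);
    COMMIT;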