Introduce distributed intermediate result infrastructure

marcocitus/dirt
Marco Slot 2020-11-29 15:07:48 +01:00
parent 7f43804dae
commit d11f6eb4dd
22 changed files with 870 additions and 81 deletions

View File

@ -21,7 +21,9 @@
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/tupdesc.h" #include "access/tupdesc.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/insert_select_executor.h"
#include "distributed/intermediate_results.h" #include "distributed/intermediate_results.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/metadata_utility.h" #include "distributed/metadata_utility.h"
@ -78,7 +80,29 @@ typedef struct NodeToNodeFragmentsTransfer
} NodeToNodeFragmentsTransfer; } NodeToNodeFragmentsTransfer;
/*
 * NamedDistributedResult represents a distributed intermediate result with a
 * name that can be queried via read_distributed_intermediate_result.
 *
 * Entries of this type are stored in the NamedDistributedResults hash, which
 * is created with a key size of NAMEDATALEN and an entry size of
 * sizeof(NamedDistributedResult).
 */
typedef struct NamedDistributedResult
{
/* name of the distributed intermediate result; this is the hash key */
char resultId[NAMEDATALEN];
/*
 * pointer to the distributed intermediate result, which is allocated
 * separately from the hash entry by RegisterDistributedResult
 */
DistributedResult *distributedResult;
} NamedDistributedResult;
/*
 * hash of distributed intermediate results in the current transaction;
 * NULL until the first result is registered and reset to NULL again by
 * ClearNamedDistributedResultsHash
 */
static HTAB *NamedDistributedResults = NULL;
/* forward declarations of local functions */ /* forward declarations of local functions */
static void CreateDistributedResult(char *resultIdPrefix, Query *query,
int partitionColumnIndex, Oid targetRelationId);
static void InitializeNamedDistributedResultsHash(void);
static List * WrapTasksForPartitioning(const char *resultIdPrefix, static List * WrapTasksForPartitioning(const char *resultIdPrefix,
List *selectTaskList, List *selectTaskList,
int partitionColumnIndex, int partitionColumnIndex,
@ -107,8 +131,9 @@ static DistributedResultFragment * TupleToDistributedResultFragment(HeapTuple he
static void ExecuteSelectTasksIntoTupleDest(List *taskList, static void ExecuteSelectTasksIntoTupleDest(List *taskList,
TupleDestination *tupleDestination, TupleDestination *tupleDestination,
bool errorOnAnyFailure); bool errorOnAnyFailure);
static List ** ColocateFragmentsWithRelation(List *fragmentList, static void ColocateFragmentsWithRelation(List *fragmentList,
CitusTableCacheEntry *targetRelation); CitusTableCacheEntry *targetRelation,
DistributedResultShard *resultShards);
static List * ColocationTransfers(List *fragmentList, static List * ColocationTransfers(List *fragmentList,
CitusTableCacheEntry *targetRelation); CitusTableCacheEntry *targetRelation);
static List * FragmentTransferTaskList(List *fragmentListTransfers); static List * FragmentTransferTaskList(List *fragmentListTransfers);
@ -117,21 +142,238 @@ static char * QueryStringForFragmentsTransfer(
static void ExecuteFetchTaskList(List *fetchTaskList); static void ExecuteFetchTaskList(List *fetchTaskList);
PG_FUNCTION_INFO_V1(read_distributed_intermediate_result);
PG_FUNCTION_INFO_V1(create_distributed_intermediate_result);
/*
 * read_distributed_intermediate_result is a placeholder for reading from
 * temporary distributed results.
 *
 * The function body unconditionally raises an ERROR, so it never returns a
 * value. ConvertReadDistributedResultForShard replaces the function
 * expression of such a range table entry with a call to the
 * read_intermediate_result array function; NOTE(review): presumably that
 * rewrite always happens before execution, so this body is only reached when
 * the call was not rewritten - confirm against the planner.
 */
Datum
read_distributed_intermediate_result(PG_FUNCTION_ARGS)
{
/* ereport(ERROR, ...) does not return, hence no return statement below */
ereport(ERROR, (errmsg("read_distributed_intermediate_result is a placeholder for "
"reading from temporary distributed results")));
}
/*
* create_distributed_intermediate_result executes a query and writes
* the result into a distributed intermediate result.
*/
Datum
create_distributed_intermediate_result(PG_FUNCTION_ARGS)
{
text *resultIdText = PG_GETARG_TEXT_P(0);
char *resultIdPrefix = text_to_cstring(resultIdText);
text *queryText = PG_GETARG_TEXT_P(1);
char *queryString = text_to_cstring(queryText);
int columnIndex = PG_GETARG_INT32(2);
text *colocateWithText = PG_GETARG_TEXT_P(3);
Oid targetRelationId = ResolveRelationId(colocateWithText, false);
CheckCitusVersion(ERROR);
if (!IsCitusTableType(targetRelationId, DISTRIBUTED_TABLE))
{
ereport(ERROR, (errmsg("result can only be co-located with a distributed "
"table")));
}
Query *query = ParseQueryString(queryString, NULL, 0);
CreateDistributedResult(resultIdPrefix, query, columnIndex, targetRelationId);
PG_RETURN_VOID();
}
/*
 * RegisterDistributedResult registers a named distributed intermediate result
 * for use in the planner.
 *
 * The returned DistributedResult is allocated in TopTransactionContext, is in
 * state DISTRIBUTED_RESULT_PLANNED and has one DistributedResultShard per
 * shard of the target relation, each with an empty fragment list and a row
 * count of -1.
 *
 * resultIdPrefix is the name under which the result is registered, query is
 * the query producing the result (only its target list is inspected here),
 * partitionColumnIndex is stored in the result as-is and targetRelationId is
 * the Citus table whose shards the result is aligned with.
 *
 * Raises an ERROR if a result with the same name was already registered in
 * the current transaction.
 */
DistributedResult *
RegisterDistributedResult(char *resultIdPrefix, Query *query, int partitionColumnIndex,
						  Oid targetRelationId)
{
	InitializeNamedDistributedResultsHash();

	/*
	 * Make sure that this transaction has a distributed transaction ID.
	 *
	 * Intermediate results will be stored in a directory that is derived
	 * from the distributed transaction ID.
	 */
	UseCoordinatedTransaction();

	/*
	 * Look in the hash first to do error checking early. We deliberately only
	 * probe with HASH_FIND here: the entry is created at the very end, once
	 * the result has been fully built. Creating it up front would leave an
	 * entry with an uninitialized distributedResult pointer in the
	 * transaction-lifetime hash if any of the steps below raised an error
	 * that is caught without aborting the top-level transaction.
	 */
	bool found = false;
	(void) hash_search(NamedDistributedResults, resultIdPrefix, HASH_FIND, &found);
	if (found)
	{
		ereport(ERROR, (errmsg("A distributed intermediate result named \"%s\" already "
							   "exists in the current transaction", resultIdPrefix)));
	}

	CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId);
	int shardCount = targetRelation->shardIntervalArrayLength;
	bool binaryFormat = CanUseBinaryCopyFormatForTargetList(query->targetList);

	/* the result has to outlive the current memory context */
	MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);

	DistributedResult *distributedResult = palloc0(sizeof(DistributedResult));
	distributedResult->state = DISTRIBUTED_RESULT_PLANNED;
	distributedResult->shardCount = shardCount;
	distributedResult->colocationId = targetRelation->colocationId;
	distributedResult->binaryFormat = binaryFormat;
	distributedResult->partitionColumnIndex = partitionColumnIndex;
	distributedResult->resultShards =
		palloc0(sizeof(DistributedResultShard) * shardCount);

	ShardInterval **shardIntervalArray = targetRelation->sortedShardIntervalArray;

	for (int targetShardIndex = 0; targetShardIndex < shardCount; targetShardIndex++)
	{
		DistributedResultShard *resultShard =
			&(distributedResult->resultShards[targetShardIndex]);
		ShardInterval *targetShardInterval = shardIntervalArray[targetShardIndex];

		resultShard->targetShardId = targetShardInterval->shardId;
		resultShard->targetShardIndex = targetShardIndex;
		resultShard->fragmentList = NIL;

		/* nothing has been executed yet, so no row count is available */
		resultShard->rowCount = -1;
	}

	MemoryContextSwitchTo(oldContext);

	/* publish the finished result under its name */
	NamedDistributedResult *namedDistributedResult =
		hash_search(NamedDistributedResults, resultIdPrefix, HASH_ENTER, &found);
	namedDistributedResult->distributedResult = distributedResult;

	return distributedResult;
}
/*
 * CreateDistributedResult creates a distributed intermediate
 * result by executing the query, reshuffling the results, and then storing
 * the results in the global hash map.
 *
 * The result is first registered under resultIdPrefix (which raises an ERROR
 * for duplicate names), then the query is planned, its task list is executed
 * with the output partitioned on partitionColumnIndex, and the resulting
 * fragments are co-located with the shards of targetRelationId. On success
 * the registered result is marked DISTRIBUTED_RESULT_AVAILABLE.
 *
 * Raises an ERROR if the plan is not redistributable or the target relation
 * is not a supported redistribution target.
 */
static void
CreateDistributedResult(char *resultIdPrefix, Query *query, int partitionColumnIndex,
						Oid targetRelationId)
{
	DistributedResult *distResult = RegisterDistributedResult(resultIdPrefix, query,
															  partitionColumnIndex,
															  targetRelationId);

	int cursorOptions = CURSOR_OPT_PARALLEL_OK;
	ParamListInfo params = NULL;
	PlannedStmt *selectPlan = pg_plan_query_compat(query, NULL, cursorOptions,
												   params);

	if (!IsRedistributablePlan(selectPlan->planTree) ||
		!IsSupportedRedistributionTarget(targetRelationId))
	{
		ereport(ERROR, (errmsg("query cannot be re-partitioned"),
						errhint("use create_intermediate_result instead")));
	}

	DistributedPlan *distSelectPlan =
		GetDistributedPlan((CustomScan *) selectPlan->planTree);
	Job *distSelectJob = distSelectPlan->workerJob;
	List *taskList = distSelectJob->taskList;

	CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId);

	/*
	 * Write the fragments in the format that was recorded at registration
	 * time. Readers of the result use distResult->binaryFormat, so the two
	 * must never diverge.
	 */
	bool binaryFormat = distResult->binaryFormat;

	List *fragmentList = PartitionTasklistResults(resultIdPrefix, taskList,
												  partitionColumnIndex,
												  targetRelation, binaryFormat);

	/*
	 * RegisterDistributedResult leaves every shard's row count at -1, while
	 * ColocateFragmentsWithRelation adds the fragment row counts onto the
	 * existing value. Start from zero so that the totals are exact and shards
	 * without fragments end up with a row count of 0.
	 */
	for (int shardIndex = 0; shardIndex < distResult->shardCount; shardIndex++)
	{
		distResult->resultShards[shardIndex].rowCount = 0;
	}

	ColocateFragmentsWithRelation(fragmentList, targetRelation,
								  distResult->resultShards);

	distResult->state = DISTRIBUTED_RESULT_AVAILABLE;
}
/*
 * GetNamedDistributedResult looks up the distributed intermediate result that
 * was registered under resultId in the current transaction and returns it.
 *
 * Raises an ERROR when no result with that name exists, which includes the
 * case where the hash of named results has not been created.
 */
DistributedResult *
GetNamedDistributedResult(char *resultId)
{
	NamedDistributedResult *matchingEntry = NULL;

	/* without a hash there cannot be a matching entry */
	if (NamedDistributedResults != NULL)
	{
		bool entryFound = false;

		/* HASH_FIND yields NULL when the name is not present */
		matchingEntry = hash_search(NamedDistributedResults, resultId, HASH_FIND,
									&entryFound);
	}

	if (matchingEntry == NULL)
	{
		ereport(ERROR, (errmsg("distributed intermediate result \"%s\" does not exist",
							   resultId)));
	}

	return matchingEntry->distributedResult;
}
/*
 * InitializeNamedDistributedResultsHash creates the NamedDistributedResult
 * hash if not already created. The hash is kept until the end of the transaction.
 *
 * The hash is keyed by a NUL-terminated result name of at most NAMEDATALEN
 * bytes (including the terminator) and lives in TopTransactionContext.
 */
static void
InitializeNamedDistributedResultsHash(void)
{
	if (NamedDistributedResults != NULL)
	{
		return;
	}

	HASHCTL info;
	memset(&info, 0, sizeof(info));
	info.keysize = NAMEDATALEN;
	info.entrysize = sizeof(NamedDistributedResult);
	info.hcxt = TopTransactionContext;

	/*
	 * Let dynahash select string hashing itself instead of passing
	 * string_hash through HASH_FUNCTION. Lookups are done with plain C strings
	 * that can be shorter than NAMEDATALEN, so the hash must use the string
	 * comparison and copy routines; dynahash only does that reliably when it
	 * chose string keys itself. String keys are the default when neither
	 * HASH_FUNCTION nor HASH_BLOBS is given on PostgreSQL versions that do not
	 * define HASH_STRINGS, and have to be requested via HASH_STRINGS on
	 * versions that do.
	 */
	uint32 hashFlags = (HASH_ELEM | HASH_CONTEXT);

#ifdef HASH_STRINGS
	hashFlags |= HASH_STRINGS;
#endif

	NamedDistributedResults = hash_create("named distributed results hash", 8, &info,
										  hashFlags);
}
/*
 * ClearNamedDistributedResultsHash resets the distributed intermediate results
 * hash. The memory is allocated in the top transaction context and will automatically
 * be freed.
 *
 * Only the static pointer is reset here; nothing is freed explicitly. After
 * this call InitializeNamedDistributedResultsHash creates a fresh hash on the
 * next registration and GetNamedDistributedResult reports every name as
 * non-existent.
 *
 * NOTE(review): presumably called from the transaction end handling; the
 * caller is not visible from this file section - confirm.
 */
void
ClearNamedDistributedResultsHash(void)
{
/* forget the hash; its memory belongs to TopTransactionContext */
NamedDistributedResults = NULL;
}
/* /*
* RedistributeTaskListResults partitions the results of given task list using * RedistributeTaskListResults partitions the results of given task list using
* shard ranges and partition method of given targetRelation, and then colocates * shard ranges and partition method of given targetRelation, and then colocates
* the result files with shards. * the result files with shards.
* *
* If a shard has a replication factor > 1, corresponding result files are copied
* to all nodes containing that shard.
*
* returnValue[shardIndex] is list of cstrings each of which is a resultId which
* correspond to targetRelation->sortedShardIntervalArray[shardIndex].
*
* partitionColumnIndex determines the column in the selectTaskList to use for * partitionColumnIndex determines the column in the selectTaskList to use for
* partitioning. * partitioning.
*
* If a shard has a replication factor > 1, corresponding result files are copied
* to all nodes containing that shard.
*/ */
List ** DistributedResult *
RedistributeTaskListResults(const char *resultIdPrefix, List *selectTaskList, RedistributeTaskListResults(const char *resultIdPrefix, List *selectTaskList,
int partitionColumnIndex, int partitionColumnIndex,
CitusTableCacheEntry *targetRelation, CitusTableCacheEntry *targetRelation,
@ -148,7 +390,21 @@ RedistributeTaskListResults(const char *resultIdPrefix, List *selectTaskList,
List *fragmentList = PartitionTasklistResults(resultIdPrefix, selectTaskList, List *fragmentList = PartitionTasklistResults(resultIdPrefix, selectTaskList,
partitionColumnIndex, partitionColumnIndex,
targetRelation, binaryFormat); targetRelation, binaryFormat);
return ColocateFragmentsWithRelation(fragmentList, targetRelation);
DistributedResult *distributedResult = palloc0(sizeof(DistributedResult));
distributedResult->state = DISTRIBUTED_RESULT_AVAILABLE;
distributedResult->shardCount = targetRelation->shardIntervalArrayLength;
distributedResult->colocationId = targetRelation->colocationId;
distributedResult->partitionColumnIndex = partitionColumnIndex;
distributedResult->binaryFormat = binaryFormat;
distributedResult->resultShards =
palloc0(sizeof(DistributedResultShard) * distributedResult->shardCount);
ColocateFragmentsWithRelation(fragmentList, targetRelation,
distributedResult->resultShards);
return distributedResult;
} }
@ -504,28 +760,38 @@ ExecuteSelectTasksIntoTupleDest(List *taskList, TupleDestination *tupleDestinati
* targetRelation->sortedShardIntervalArray[shardIndex] after fetch tasks are * targetRelation->sortedShardIntervalArray[shardIndex] after fetch tasks are
* done. * done.
*/ */
static List ** static void
ColocateFragmentsWithRelation(List *fragmentList, CitusTableCacheEntry *targetRelation) ColocateFragmentsWithRelation(List *fragmentList, CitusTableCacheEntry *targetRelation,
DistributedResultShard *resultShards)
{ {
List *fragmentListTransfers = ColocationTransfers(fragmentList, targetRelation); List *fragmentListTransfers = ColocationTransfers(fragmentList, targetRelation);
List *fragmentTransferTaskList = FragmentTransferTaskList(fragmentListTransfers); List *fragmentTransferTaskList = FragmentTransferTaskList(fragmentListTransfers);
ExecuteFetchTaskList(fragmentTransferTaskList); ExecuteFetchTaskList(fragmentTransferTaskList);
int shardCount = targetRelation->shardIntervalArrayLength; MemoryContext shardsContext = GetMemoryChunkContext(resultShards);
List **shardResultIdList = palloc0(shardCount * sizeof(List *));
DistributedResultFragment *sourceFragment = NULL; DistributedResultFragment *sourceFragment = NULL;
foreach_ptr(sourceFragment, fragmentList) foreach_ptr(sourceFragment, fragmentList)
{ {
int shardIndex = sourceFragment->targetShardIndex; int shardIndex = sourceFragment->targetShardIndex;
Assert(shardIndex < shardCount); Assert(shardIndex < targetRelation->shardIntervalArrayLength);
shardResultIdList[shardIndex] = lappend(shardResultIdList[shardIndex],
sourceFragment->resultId);
}
return shardResultIdList; DistributedResultShard *resultShard = &(resultShards[shardIndex]);
resultShard->targetShardIndex = shardIndex;
resultShard->targetShardId = sourceFragment->targetShardId;
resultShard->rowCount += sourceFragment->rowCount;
/* use the named distributed intermediate result context where needed */
MemoryContext oldContext = MemoryContextSwitchTo(shardsContext);
/* assign fragment to distributed result shard */
resultShard->fragmentList = lappend(resultShard->fragmentList,
pstrdup(sourceFragment->resultId));
MemoryContextSwitchTo(oldContext);
}
} }

View File

@ -70,7 +70,7 @@ static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList); static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery, static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery,
CitusTableCacheEntry *targetRelation, CitusTableCacheEntry *targetRelation,
List **redistributedResults, DistributedResult *redistributedResult,
bool useBinaryFormat); bool useBinaryFormat);
static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn); static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries); static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);
@ -181,11 +181,10 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
WrapTaskListForProjection(distSelectTaskList, projectedTargetEntries); WrapTaskListForProjection(distSelectTaskList, projectedTargetEntries);
} }
List **redistributedResults = RedistributeTaskListResults(distResultPrefix, DistributedResult *redistributedResult =
distSelectTaskList, RedistributeTaskListResults(distResultPrefix, distSelectTaskList,
partitionColumnIndex, partitionColumnIndex, targetRelation,
targetRelation, binaryFormat);
binaryFormat);
/* /*
* At this point select query has been executed on workers and results * At this point select query has been executed on workers and results
@ -195,7 +194,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
*/ */
List *taskList = RedistributedInsertSelectTaskList(insertSelectQuery, List *taskList = RedistributedInsertSelectTaskList(insertSelectQuery,
targetRelation, targetRelation,
redistributedResults, redistributedResult,
binaryFormat); binaryFormat);
scanState->tuplestorestate = scanState->tuplestorestate =
@ -638,7 +637,7 @@ IsSupportedRedistributionTarget(Oid targetRelationId)
static List * static List *
RedistributedInsertSelectTaskList(Query *insertSelectQuery, RedistributedInsertSelectTaskList(Query *insertSelectQuery,
CitusTableCacheEntry *targetRelation, CitusTableCacheEntry *targetRelation,
List **redistributedResults, DistributedResult *redistributedResult,
bool useBinaryFormat) bool useBinaryFormat)
{ {
List *taskList = NIL; List *taskList = NIL;
@ -663,7 +662,9 @@ RedistributedInsertSelectTaskList(Query *insertSelectQuery,
{ {
ShardInterval *targetShardInterval = ShardInterval *targetShardInterval =
targetRelation->sortedShardIntervalArray[shardOffset]; targetRelation->sortedShardIntervalArray[shardOffset];
List *resultIdList = redistributedResults[targetShardInterval->shardIndex]; DistributedResultShard *resultShard =
&(redistributedResult->resultShards[shardOffset]);
List *resultIdList = resultShard->fragmentList;
uint64 shardId = targetShardInterval->shardId; uint64 shardId = targetShardInterval->shardId;
StringInfo queryString = makeStringInfo(); StringInfo queryString = makeStringInfo();

View File

@ -153,6 +153,7 @@ typedef struct MetadataCacheData
Oid copyFormatTypeId; Oid copyFormatTypeId;
Oid readIntermediateResultFuncId; Oid readIntermediateResultFuncId;
Oid readIntermediateResultArrayFuncId; Oid readIntermediateResultArrayFuncId;
Oid readDistributedIntermediateResultFuncId;
Oid extraDataContainerFuncId; Oid extraDataContainerFuncId;
Oid workerHashFunctionId; Oid workerHashFunctionId;
Oid anyValueFunctionId; Oid anyValueFunctionId;
@ -2259,6 +2260,26 @@ CitusReadIntermediateResultArrayFuncId(void)
} }
/*
 * CitusReadDistributedIntermediateResultFuncId returns the oid of the
 * pg_catalog.read_distributed_intermediate_result(text) function.
 *
 * The oid is cached in MetadataCache after the first successful lookup. The
 * lookup tolerates a missing function, in which case InvalidOid is returned
 * and the lookup is retried on the next call.
 */
Oid
CitusReadDistributedIntermediateResultFuncId(void)
{
	/* fast path: already looked up */
	if (MetadataCache.readDistributedIntermediateResultFuncId != InvalidOid)
	{
		return MetadataCache.readDistributedIntermediateResultFuncId;
	}

	List *qualifiedFunctionName =
		list_make2(makeString("pg_catalog"),
				   makeString("read_distributed_intermediate_result"));
	Oid argumentTypes[1] = { TEXTOID };
	int argumentCount = 1;
	bool missingOK = true;

	MetadataCache.readDistributedIntermediateResultFuncId =
		LookupFuncName(qualifiedFunctionName, argumentCount, argumentTypes,
					   missingOK);

	return MetadataCache.readDistributedIntermediateResultFuncId;
}
/* return oid of the citus.copy_format enum type */ /* return oid of the citus.copy_format enum type */
Oid Oid
CitusCopyFormatTypeId(void) CitusCopyFormatTypeId(void)

View File

@ -15,10 +15,12 @@
#include "access/heapam.h" #include "access/heapam.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "catalog/pg_constraint.h" #include "catalog/pg_constraint.h"
#include "catalog/pg_type.h"
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/local_executor.h" #include "distributed/local_executor.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -43,6 +45,10 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList, static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList,
OnConflictExpr *onConflict); OnConflictExpr *onConflict);
static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList); static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList);
static void ConvertReadDistributedResultForShard(RangeTblEntry *rte,
List *relationShardList);
static RelationShard * FindDistributedResultRelationShard(List *relationShardList,
char *resultId);
static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
static bool ShouldLazyDeparseQuery(Task *task); static bool ShouldLazyDeparseQuery(Task *task);
static char * DeparseTaskQuery(Task *task, Query *query); static char * DeparseTaskQuery(Task *task, Query *query);
@ -229,6 +235,12 @@ UpdateRelationToShardNames(Node *node, List *relationShardList)
RangeTblEntry *newRte = (RangeTblEntry *) node; RangeTblEntry *newRte = (RangeTblEntry *) node;
if (IsDistributedIntermediateResultRTE(newRte))
{
ConvertReadDistributedResultForShard(newRte, relationShardList);
return false;
}
if (newRte->rtekind != RTE_RELATION) if (newRte->rtekind != RTE_RELATION)
{ {
return false; return false;
@ -384,6 +396,89 @@ ReplaceRelationConstraintByShardConstraint(List *relationShardList,
} }
/*
 * ConvertReadDistributedResultForShard rewrites a function RTE that calls
 * read_distributed_intermediate_result('<result id>') in place, such that it
 * calls read_intermediate_result(ARRAY[..fragments...], <copy format>) with
 * the fragments of one particular shard of the distributed result.
 *
 * The shard is taken from the entry in relationShardList that refers to the
 * same result ID. When there is no such entry, an empty fragment array is
 * used.
 */
static void
ConvertReadDistributedResultForShard(RangeTblEntry *rte,
									 List *relationShardList)
{
	char *resultId = FindDistributedResultId(rte);
	DistributedResult *distributedResult = GetNamedDistributedResult(resultId);

	RelationShard *matchingShard =
		FindDistributedResultRelationShard(relationShardList, resultId);

	/* stays empty when no relation shard matches the result */
	List *fragmentNames = NIL;
	if (matchingShard != NULL)
	{
		DistributedResultShard *resultShard =
			&(distributedResult->resultShards[matchingShard->shardIndex]);

		/* sort result ids for consistent test output */
		fragmentNames = SortList(resultShard->fragmentList, pg_qsort_strcmp);
	}

	/* the RTE's first function is the one being replaced */
	RangeTblFunction *rangeTableFunction =
		(RangeTblFunction *) linitial(rte->functions);

	/* first argument: text[] constant holding the fragment names */
	Const *fragmentArrayConst = makeNode(Const);
	fragmentArrayConst->location = -1;
	fragmentArrayConst->consttype = TEXTARRAYOID;
	fragmentArrayConst->consttypmod = -1;
	fragmentArrayConst->constlen = -1;
	fragmentArrayConst->constbyval = false;
	fragmentArrayConst->constisnull = false;
	fragmentArrayConst->constvalue =
		PointerGetDatum(strlist_to_textarray(fragmentNames));

	/* second argument: copy format the fragments were written in */
	Oid copyFormatId = BinaryCopyFormatId();
	if (!distributedResult->binaryFormat)
	{
		copyFormatId = TextCopyFormatId();
	}

	Const *copyFormatConst = makeNode(Const);
	copyFormatConst->location = -1;
	copyFormatConst->consttype = CitusCopyFormatTypeId();
	copyFormatConst->consttypmod = -1;
	copyFormatConst->constlen = 4;
	copyFormatConst->constbyval = true;
	copyFormatConst->constisnull = false;
	copyFormatConst->constvalue = ObjectIdGetDatum(copyFormatId);

	/* assemble the read_intermediate_result call */
	FuncExpr *readResultCall = makeNode(FuncExpr);
	readResultCall->location = -1;
	readResultCall->funcid = CitusReadIntermediateResultArrayFuncId();
	readResultCall->funcretset = true;
	readResultCall->funcvariadic = false;
	readResultCall->funcformat = 0;
	readResultCall->funccollid = 0;
	readResultCall->inputcollid = 0;
	readResultCall->args = list_make2(fragmentArrayConst, copyFormatConst);

	/* swap the new call into the RTE */
	rangeTableFunction->funcexpr = (Node *) readResultCall;
}
/* /*
* FindRelationShard finds the RelationShard for shard relation with * FindRelationShard finds the RelationShard for shard relation with
* given Oid if exists in given relationShardList. Otherwise, returns NULL. * given Oid if exists in given relationShardList. Otherwise, returns NULL.
@ -410,6 +505,29 @@ FindRelationShard(Oid inputRelationId, List *relationShardList)
} }
/*
 * FindDistributedResultRelationShard returns the first RelationShard in
 * relationShardList that describes a shard of the distributed result named
 * <resultId>, or NULL when the list contains no such entry.
 *
 * Names are compared over at most NAMEDATALEN characters.
 */
static RelationShard *
FindDistributedResultRelationShard(List *relationShardList, char *resultId)
{
	RelationShard *candidate = NULL;

	foreach_ptr(candidate, relationShardList)
	{
		/* only entries that describe a distributed result carry a result ID */
		if (candidate->shardedRelationType != SHARDED_RESULT)
		{
			continue;
		}

		if (strncmp(candidate->resultId, resultId, NAMEDATALEN) != 0)
		{
			continue;
		}

		return candidate;
	}

	return NULL;
}
/* /*
* ConvertRteToSubqueryWithEmptyResult converts given relation RTE into * ConvertRteToSubqueryWithEmptyResult converts given relation RTE into
* subquery RTE that returns no results. * subquery RTE that returns no results.

View File

@ -331,6 +331,11 @@ ListContainsDistributedTableRTE(List *rangeTableList)
{ {
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
if (IsDistributedIntermediateResultRTE(rangeTableEntry))
{
return true;
}
if (rangeTableEntry->rtekind != RTE_RELATION) if (rangeTableEntry->rtekind != RTE_RELATION)
{ {
continue; continue;
@ -375,7 +380,8 @@ AssignRTEIdentities(List *rangeTableList, int rteIdCounter)
* Note that we're only interested in RTE_RELATIONs and thus assigning * Note that we're only interested in RTE_RELATIONs and thus assigning
* identifiers to those RTEs only. * identifiers to those RTEs only.
*/ */
if (rangeTableEntry->rtekind == RTE_RELATION && if ((rangeTableEntry->rtekind == RTE_RELATION ||
IsDistributedIntermediateResultRTE(rangeTableEntry)) &&
rangeTableEntry->values_lists == NIL) rangeTableEntry->values_lists == NIL)
{ {
AssignRTEIdentity(rangeTableEntry, rteIdCounter++); AssignRTEIdentity(rangeTableEntry, rteIdCounter++);
@ -446,8 +452,6 @@ AdjustPartitioningForDistributedPlanning(List *rangeTableList,
static void static void
AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier) AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier)
{ {
Assert(rangeTableEntry->rtekind == RTE_RELATION);
rangeTableEntry->values_lists = list_make1_int(rteIdentifier); rangeTableEntry->values_lists = list_make1_int(rteIdentifier);
} }
@ -456,7 +460,6 @@ AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier)
int int
GetRTEIdentity(RangeTblEntry *rte) GetRTEIdentity(RangeTblEntry *rte)
{ {
Assert(rte->rtekind == RTE_RELATION);
Assert(rte->values_lists != NIL); Assert(rte->values_lists != NIL);
Assert(IsA(rte->values_lists, IntList)); Assert(IsA(rte->values_lists, IntList));
Assert(list_length(rte->values_lists) == 1); Assert(list_length(rte->values_lists) == 1);
@ -1767,6 +1770,11 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
{ {
CitusTableCacheEntry *cacheEntry = NULL; CitusTableCacheEntry *cacheEntry = NULL;
if (!CitusHasBeenLoaded())
{
return;
}
if (ReplaceCitusExtraDataContainer && IsCitusExtraDataContainerRelation(rte)) if (ReplaceCitusExtraDataContainer && IsCitusExtraDataContainerRelation(rte))
{ {
/* /*
@ -1789,7 +1797,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
AdjustReadIntermediateResultCost(rte, relOptInfo); AdjustReadIntermediateResultCost(rte, relOptInfo);
AdjustReadIntermediateResultArrayCost(rte, relOptInfo); AdjustReadIntermediateResultArrayCost(rte, relOptInfo);
if (rte->rtekind != RTE_RELATION) if (rte->rtekind != RTE_RELATION && !IsDistributedIntermediateResultRTE(rte))
{ {
return; return;
} }

View File

@ -346,6 +346,24 @@ IsDistributedTableRTE(Node *node)
} }
/*
 * IsDistributedRelationRTE returns true if the given range table entry
 * describes a distributed table or a distributed intermediate result.
 *
 * Any node that is not a RangeTblEntry, including NULL, yields false.
 */
bool
IsDistributedRelationRTE(Node *node)
{
	/* IsA() reads the node tag through the pointer, so NULL must be ruled out */
	if (node == NULL || !IsA(node, RangeTblEntry))
	{
		return false;
	}

	RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;

	return IsDistributedTableRTE((Node *) rangeTableEntry) ||
		   IsDistributedIntermediateResultRTE(rangeTableEntry);
}
/* /*
* IsReferenceTableRTE gets a node and returns true if the node * IsReferenceTableRTE gets a node and returns true if the node
* is a range table relation entry that points to a reference table. * is a range table relation entry that points to a reference table.
@ -678,6 +696,26 @@ MultiNodeTree(Query *queryTree)
} }
/*
 * IsDistributedIntermediateResultRTE returns whether an RTE describes a
 * FROM read_distributed_intermediate_result() call.
 *
 * Only the first function of a function RTE is inspected. Function RTEs
 * whose expression is not a plain function call never match, and neither
 * does any RTE when the read_distributed_intermediate_result function cannot
 * be found.
 */
bool
IsDistributedIntermediateResultRTE(RangeTblEntry *rangeTableEntry)
{
	if (rangeTableEntry->rtekind != RTE_FUNCTION ||
		rangeTableEntry->functions == NIL)
	{
		return false;
	}

	RangeTblFunction *rangeTableFunction = (RangeTblFunction *) linitial(
		rangeTableEntry->functions);

	/*
	 * The expression of a function RTE is not necessarily a FuncExpr. Reading
	 * funcid from any other node type would interpret unrelated bytes as an
	 * OID, so check the node type before casting.
	 */
	Node *functionNode = rangeTableFunction->funcexpr;
	if (functionNode == NULL || !IsA(functionNode, FuncExpr))
	{
		return false;
	}

	/* the lookup yields InvalidOid when the function does not exist */
	Oid readDistributedResultFuncId = CitusReadDistributedIntermediateResultFuncId();
	if (!OidIsValid(readDistributedResultFuncId))
	{
		return false;
	}

	FuncExpr *funcExpression = (FuncExpr *) functionNode;

	return funcExpression->funcid == readDistributedResultFuncId;
}
/* /*
* ContainsReadIntermediateResultFunction determines whether an expresion tree contains * ContainsReadIntermediateResultFunction determines whether an expresion tree contains
* a call to the read_intermediate_result function. * a call to the read_intermediate_result function.
@ -817,6 +855,38 @@ FindIntermediateResultIdIfExists(RangeTblEntry *rte)
} }
/*
 * FindDistributedResultId extracts the result ID from a
 * read_distributed_intermediate_result call.
 *
 * The RTE must be a function RTE whose first function is a call to
 * read_distributed_intermediate_result with a non-NULL constant as its first
 * argument; an ERROR is raised otherwise. The result ID is returned as a
 * newly allocated C string.
 */
char *
FindDistributedResultId(RangeTblEntry *rte)
{
	if (rte->rtekind != RTE_FUNCTION || rte->functions == NIL)
	{
		ereport(ERROR, (errmsg("not a read_distributed_intermediate_result RTE")));
	}

	List *functionList = rte->functions;
	RangeTblFunction *rangeTblfunction = (RangeTblFunction *) linitial(functionList);

	/*
	 * The expression of a function RTE can be a node other than FuncExpr, so
	 * verify the node type before reading FuncExpr fields from it.
	 */
	Node *functionNode = rangeTblfunction->funcexpr;
	if (functionNode == NULL || !IsA(functionNode, FuncExpr) ||
		((FuncExpr *) functionNode)->funcid !=
		CitusReadDistributedIntermediateResultFuncId())
	{
		ereport(ERROR, (errmsg("not a read_distributed_intermediate_result RTE")));
	}

	FuncExpr *funcExpr = (FuncExpr *) functionNode;

	/*
	 * The argument is an arbitrary expression tree. Only a constant carries
	 * its value inline, so anything else cannot be resolved to a result ID
	 * here and must not be read as a Const.
	 */
	if (funcExpr->args == NIL || !IsA(linitial(funcExpr->args), Const))
	{
		ereport(ERROR, (errmsg("distributed intermediate result ID must be a "
							   "constant")));
	}

	Const *resultIdConst = (Const *) linitial(funcExpr->args);
	if (resultIdConst->constisnull)
	{
		ereport(ERROR, (errmsg("distributed intermediate result ID cannot be NULL")));
	}

	return TextDatumGetCString(resultIdConst->constvalue);
}
/* /*
* ErrorIfQueryNotSupported checks that we can perform distributed planning for * ErrorIfQueryNotSupported checks that we can perform distributed planning for
* the given query. The checks in this function will be removed as we support * the given query. The checks in this function will be removed as we support

View File

@ -40,6 +40,7 @@
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "distributed/intermediate_results.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_router_planner.h" #include "distributed/multi_router_planner.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
@ -2211,18 +2212,32 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
{ {
RelationRestriction *relationRestriction = RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(restrictionCell); (RelationRestriction *) lfirst(restrictionCell);
Oid relationId = relationRestriction->relationId;
List *prunedShardList = (List *) lfirst(prunedRelationShardCell); List *prunedShardList = (List *) lfirst(prunedRelationShardCell);
ListCell *shardIntervalCell = NULL; ListCell *shardIntervalCell = NULL;
int currentShardCount = 0;
DistributedResult *distributedResult = NULL;
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); if (IsDistributedIntermediateResultRTE(relationRestriction->rte))
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
{ {
continue; char *resultId = FindDistributedResultId(relationRestriction->rte);
distributedResult = GetNamedDistributedResult(resultId);
currentShardCount = distributedResult->shardCount;
}
else
{
Oid relationId = relationRestriction->relationId;
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
{
continue;
}
currentShardCount = cacheEntry->shardIntervalArrayLength;
} }
/* we expect distributed tables to have the same shard count */ /* we expect distributed tables to have the same shard count */
if (shardCount > 0 && shardCount != cacheEntry->shardIntervalArrayLength) if (shardCount > 0 && shardCount != currentShardCount)
{ {
ereport(ERROR, (errmsg("shard counts of co-located tables do not " ereport(ERROR, (errmsg("shard counts of co-located tables do not "
"match"))); "match")));
@ -2230,7 +2245,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
if (taskRequiredForShardIndex == NULL) if (taskRequiredForShardIndex == NULL)
{ {
shardCount = cacheEntry->shardIntervalArrayLength; shardCount = currentShardCount;
taskRequiredForShardIndex = (bool *) palloc0(shardCount); taskRequiredForShardIndex = (bool *) palloc0(shardCount);
/* there is a distributed table, find the shard range */ /* there is a distributed table, find the shard range */
@ -2258,6 +2273,18 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
int shardIndex = shardInterval->shardIndex; int shardIndex = shardInterval->shardIndex;
if (distributedResult != NULL)
{
DistributedResultShard *resultShard =
&(distributedResult->resultShards[shardIndex]);
if (resultShard->fragmentList == NIL)
{
/* can skip empty result shard */
continue;
}
}
taskRequiredForShardIndex[shardIndex] = true; taskRequiredForShardIndex[shardIndex] = true;
minShardOffset = Min(minShardOffset, shardIndex); minShardOffset = Min(minShardOffset, shardIndex);
@ -2499,6 +2526,37 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
Oid relationId = relationRestriction->relationId; Oid relationId = relationRestriction->relationId;
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
if (IsDistributedIntermediateResultRTE(relationRestriction->rte))
{
char *resultId = FindDistributedResultId(relationRestriction->rte);
DistributedResult *distributedResult = GetNamedDistributedResult(resultId);
DistributedResultShard *resultShard =
&(distributedResult->resultShards[shardIndex]);
if (resultShard->fragmentList == NIL)
{
/* skip generating a relation shard */
continue;
}
RelationShard *relationShard = CitusMakeNode(RelationShard);
relationShard->shardedRelationType = SHARDED_RESULT;
relationShard->resultId = resultId;
relationShard->shardId = resultShard->targetShardId;
relationShard->shardIndex = shardIndex;
if (anchorShardId == INVALID_SHARD_ID)
{
anchorShardId = resultShard->targetShardId;
}
relationShardList = lappend(relationShardList, relationShard);
shardInterval = LoadShardInterval(resultShard->targetShardId);
taskShardList = lappend(taskShardList, list_make1(shardInterval));
continue;
}
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
{ {
@ -2534,6 +2592,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
RelationShard *relationShard = CitusMakeNode(RelationShard); RelationShard *relationShard = CitusMakeNode(RelationShard);
relationShard->relationId = copiedShardInterval->relationId; relationShard->relationId = copiedShardInterval->relationId;
relationShard->shardId = copiedShardInterval->shardId; relationShard->shardId = copiedShardInterval->shardId;
relationShard->shardIndex = shardIndex;
relationShardList = lappend(relationShardList, relationShard); relationShardList = lappend(relationShardList, relationShard);
} }

View File

@ -30,6 +30,7 @@
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
#include "distributed/log_utils.h" #include "distributed/log_utils.h"
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
#include "distributed/intermediate_results.h"
#include "distributed/intermediate_result_pruning.h" #include "distributed/intermediate_result_pruning.h"
#include "distributed/metadata_utility.h" #include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
@ -2463,6 +2464,7 @@ RelationShardListForShardIntervalList(List *shardIntervalList, bool *shardsPrese
relationShard->relationId = shardInterval->relationId; relationShard->relationId = shardInterval->relationId;
relationShard->shardId = shardInterval->shardId; relationShard->shardId = shardInterval->shardId;
relationShard->shardIndex = shardInterval->shardIndex;
relationShardList = lappend(relationShardList, relationShard); relationShardList = lappend(relationShardList, relationShard);
} }
@ -2638,16 +2640,29 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
RelationRestriction *relationRestriction = RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(restrictionCell); (RelationRestriction *) lfirst(restrictionCell);
Oid relationId = relationRestriction->relationId; Oid relationId = relationRestriction->relationId;
int shardCount = 0;
if (!IsCitusTable(relationId)) if (IsCitusTable(relationId))
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
shardCount = cacheEntry->shardIntervalArrayLength;
}
else if (IsDistributedIntermediateResultRTE(relationRestriction->rte))
{
char *resultId = FindDistributedResultId(relationRestriction->rte);
DistributedResult *distributedResult = GetNamedDistributedResult(resultId);
int colocationId = distributedResult->colocationId;
relationId = ColocatedTableId(colocationId);
shardCount = distributedResult->shardCount;
}
else
{ {
/* ignore local tables for shard pruning purposes */ /* ignore local tables for shard pruning purposes */
continue; continue;
} }
Index tableId = relationRestriction->index; Index tableId = relationRestriction->index;
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
int shardCount = cacheEntry->shardIntervalArrayLength;
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo; List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList); List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
List *prunedShardIntervalList = NIL; List *prunedShardIntervalList = NIL;

View File

@ -725,7 +725,7 @@ FromClauseRecurringTupleType(Query *queryTree)
} }
if (FindNodeMatchingCheckFunctionInRangeTableList(queryTree->rtable, if (FindNodeMatchingCheckFunctionInRangeTableList(queryTree->rtable,
IsDistributedTableRTE)) IsDistributedRelationRTE))
{ {
/* /*
* There is a distributed table somewhere in the FROM clause. * There is a distributed table somewhere in the FROM clause.
@ -1310,7 +1310,7 @@ RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo, Relids relids)
RangeTblEntry *rangeTableEntry = plannerInfo->simple_rte_array[relationId]; RangeTblEntry *rangeTableEntry = plannerInfo->simple_rte_array[relationId];
if (FindNodeMatchingCheckFunctionInRangeTableList(list_make1(rangeTableEntry), if (FindNodeMatchingCheckFunctionInRangeTableList(list_make1(rangeTableEntry),
IsDistributedTableRTE)) IsDistributedRelationRTE))
{ {
/* we already found a distributed table, no need to check further */ /* we already found a distributed table, no need to check further */
return false; return false;
@ -1418,6 +1418,11 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType)
{ {
List *functionList = rangeTableEntry->functions; List *functionList = rangeTableEntry->functions;
if (IsDistributedIntermediateResultRTE(rangeTableEntry))
{
return false;
}
if (list_length(functionList) == 1 && if (list_length(functionList) == 1 &&
ContainsReadIntermediateResultFunction((Node *) functionList)) ContainsReadIntermediateResultFunction((Node *) functionList))
{ {

View File

@ -1560,7 +1560,8 @@ ShouldTransformRTE(RangeTblEntry *rangeTableEntry)
*/ */
if (rangeTableEntry->rtekind != RTE_FUNCTION || if (rangeTableEntry->rtekind != RTE_FUNCTION ||
rangeTableEntry->lateral || rangeTableEntry->lateral ||
rangeTableEntry->funcordinality) rangeTableEntry->funcordinality ||
IsDistributedIntermediateResultRTE(rangeTableEntry))
{ {
return false; return false;
} }

View File

@ -14,6 +14,7 @@
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/distributed_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
@ -65,7 +66,15 @@ typedef struct AttributeEquivalenceClass
*/ */
typedef struct AttributeEquivalenceClassMember typedef struct AttributeEquivalenceClassMember
{ {
/* whether the class member is a table or a result */
ShardedRelationType shardedRelationType;
/* relation ID (in case of a table) */
Oid relationId; Oid relationId;
/* result ID (in case of a distributed result) */
char *resultId;
int rteIdentity; int rteIdentity;
Index varno; Index varno;
AttrNumber varattno; AttrNumber varattno;
@ -90,6 +99,10 @@ static void AddRteSubqueryToAttributeEquivalenceClass(AttributeEquivalenceClass
rangeTableEntry, rangeTableEntry,
PlannerInfo *root, PlannerInfo *root,
Var *varToBeAdded); Var *varToBeAdded);
static void AddDistResultRteToAttributeEquivalenceClass(AttributeEquivalenceClass **
attrEquivalenceClass,
RangeTblEntry *rangeTableEntry,
Var *varToBeAdded);
static Query * GetTargetSubquery(PlannerInfo *root, RangeTblEntry *rangeTableEntry, static Query * GetTargetSubquery(PlannerInfo *root, RangeTblEntry *rangeTableEntry,
Var *varToBeAdded); Var *varToBeAdded);
static void AddUnionAllSetOperationsToAttributeEquivalenceClass( static void AddUnionAllSetOperationsToAttributeEquivalenceClass(
@ -604,6 +617,17 @@ UniqueRelationCount(RelationRestrictionContext *restrictionContext, CitusTableTy
{ {
RelationRestriction *relationRestriction = RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(relationRestrictionCell); (RelationRestriction *) lfirst(relationRestrictionCell);
if (IsDistributedIntermediateResultRTE(relationRestriction->rte))
{
if (tableType == DISTRIBUTED_TABLE || tableType == ANY_CITUS_TABLE_TYPE)
{
int rteIdentity = GetRTEIdentity(relationRestriction->rte);
rteIdentityList = list_append_unique_int(rteIdentityList, rteIdentity);
}
continue;
}
Oid relationId = relationRestriction->relationId; Oid relationId = relationRestriction->relationId;
CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(relationId); CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(relationId);
@ -665,10 +689,11 @@ EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
(RelationRestriction *) lfirst(relationRestrictionCell); (RelationRestriction *) lfirst(relationRestrictionCell);
int rteIdentity = GetRTEIdentity(relationRestriction->rte); int rteIdentity = GetRTEIdentity(relationRestriction->rte);
/* we shouldn't check for the equality of non-distributed tables */ if (relationRestriction->rte->rtekind == RTE_RELATION &&
if (IsCitusTableType(relationRestriction->relationId, IsCitusTableType(relationRestriction->relationId,
CITUS_TABLE_WITH_NO_DIST_KEY)) CITUS_TABLE_WITH_NO_DIST_KEY))
{ {
/* we shouldn't check for the equality of non-distributed tables */
continue; continue;
} }
@ -1016,22 +1041,39 @@ GenerateEquivalenceClassForRelationRestriction(
{ {
RelationRestriction *relationRestriction = RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(relationRestrictionCell); (RelationRestriction *) lfirst(relationRestrictionCell);
Var *relationPartitionKey = DistPartitionKey(relationRestriction->relationId); int partitionAttributeNumber = 0;
if (relationPartitionKey) if (IsDistributedIntermediateResultRTE(relationRestriction->rte))
{ {
eqClassForRelation = palloc0(sizeof(AttributeEquivalenceClass)); char *resultId = FindDistributedResultId(relationRestriction->rte);
eqMember = palloc0(sizeof(AttributeEquivalenceClassMember)); DistributedResult *distResult = GetNamedDistributedResult(resultId);
eqMember->relationId = relationRestriction->relationId;
eqMember->rteIdentity = GetRTEIdentity(relationRestriction->rte);
eqMember->varno = relationRestriction->index;
eqMember->varattno = relationPartitionKey->varattno;
eqClassForRelation->equivalentAttributes = partitionAttributeNumber = distResult->partitionColumnIndex + 1;
lappend(eqClassForRelation->equivalentAttributes, eqMember);
break;
} }
else
{
Var *relationPartitionKey = DistPartitionKey(relationRestriction->relationId);
if (relationPartitionKey == NULL)
{
/* skip reference tables */
continue;
}
partitionAttributeNumber = relationPartitionKey->varattno;
}
eqClassForRelation = palloc0(sizeof(AttributeEquivalenceClass));
eqMember = palloc0(sizeof(AttributeEquivalenceClassMember));
eqMember->relationId = relationRestriction->relationId;
eqMember->rteIdentity = GetRTEIdentity(relationRestriction->rte);
eqMember->varno = relationRestriction->index;
eqMember->varattno = partitionAttributeNumber;
eqClassForRelation->equivalentAttributes =
lappend(eqClassForRelation->equivalentAttributes, eqMember);
break;
} }
return eqClassForRelation; return eqClassForRelation;
@ -1212,6 +1254,12 @@ AddToAttributeEquivalenceClass(AttributeEquivalenceClass **attributeEquivalenceC
rangeTableEntry, root, rangeTableEntry, root,
varToBeAdded); varToBeAdded);
} }
else if (IsDistributedIntermediateResultRTE(rangeTableEntry))
{
AddDistResultRteToAttributeEquivalenceClass(attributeEquivalenceClass,
rangeTableEntry,
varToBeAdded);
}
} }
@ -1483,6 +1531,43 @@ AddRteRelationToAttributeEquivalenceClass(AttributeEquivalenceClass **
} }
/*
 * AddDistResultRteToAttributeEquivalenceClass adds the given var to the given
 * equivalence class when the range table entry reads a distributed
 * intermediate result and the var refers to the partition column of that
 * result. Vars on any other column of the result are ignored.
 *
 * The new class member is marked as SHARDED_RESULT and carries a copy of the
 * result ID; its relationId is left zeroed by palloc0.
 */
static void
AddDistResultRteToAttributeEquivalenceClass(AttributeEquivalenceClass **
											attrEquivalenceClass,
											RangeTblEntry *rangeTableEntry,
											Var *varToBeAdded)
{
	char *distResultId = FindDistributedResultId(rangeTableEntry);
	DistributedResult *distResult = GetNamedDistributedResult(distResultId);

	/* the partition column index is one less than its attribute number */
	int partitionAttributeNumber = distResult->partitionColumnIndex + 1;

	if (varToBeAdded->varattno == partitionAttributeNumber)
	{
		AttributeEquivalenceClassMember *newMember =
			palloc0(sizeof(AttributeEquivalenceClassMember));

		newMember->varattno = varToBeAdded->varattno;
		newMember->varno = varToBeAdded->varno;
		newMember->rteIdentity = GetRTEIdentity(rangeTableEntry);
		newMember->resultId = pstrdup(distResultId);
		newMember->shardedRelationType = SHARDED_RESULT;

		AttributeEquivalenceClass *equivalenceClass = *attrEquivalenceClass;
		equivalenceClass->equivalentAttributes =
			lappend(equivalenceClass->equivalentAttributes, newMember);
	}
}
/* /*
* AttributeClassContainsAttributeClassMember returns true if it the input class member * AttributeClassContainsAttributeClassMember returns true if it the input class member
* is already exists in the attributeEquivalenceClass. An equality is identified by the * is already exists in the attributeEquivalenceClass. An equality is identified by the
@ -1728,14 +1813,26 @@ AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictio
/* check whether all relations exists in the main restriction list */ /* check whether all relations exists in the main restriction list */
foreach_ptr(relationRestriction, restrictionContext->relationRestrictionList) foreach_ptr(relationRestriction, restrictionContext->relationRestrictionList)
{ {
Oid relationId = relationRestriction->relationId; int colocationId = INVALID_COLOCATION_ID;
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) if (IsDistributedIntermediateResultRTE(relationRestriction->rte))
{ {
continue; char *resultId = FindDistributedResultId(relationRestriction->rte);
} DistributedResult *distributedResult = GetNamedDistributedResult(resultId);
int colocationId = TableColocationId(relationId); colocationId = distributedResult->colocationId;
}
else
{
Oid relationId = relationRestriction->relationId;
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
{
continue;
}
colocationId = TableColocationId(relationId);
}
if (initialColocationId == INVALID_COLOCATION_ID) if (initialColocationId == INVALID_COLOCATION_ID)
{ {
@ -1937,7 +2034,8 @@ RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
ExtractRangeTableRelationWalker((Node *) rangeTableEntry->subquery, ExtractRangeTableRelationWalker((Node *) rangeTableEntry->subquery,
&rangeTableRelationList); &rangeTableRelationList);
} }
else if (rangeTableEntry->rtekind == RTE_RELATION) else if (rangeTableEntry->rtekind == RTE_RELATION ||
IsDistributedIntermediateResultRTE(rangeTableEntry))
{ {
ExtractRangeTableRelationWalker((Node *) rangeTableEntry, ExtractRangeTableRelationWalker((Node *) rangeTableEntry,
&rangeTableRelationList); &rangeTableRelationList);
@ -1952,8 +2050,6 @@ RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
{ {
RangeTblEntry *rteRelation = (RangeTblEntry *) lfirst(rteRelationCell); RangeTblEntry *rteRelation = (RangeTblEntry *) lfirst(rteRelationCell);
Assert(rteRelation->rtekind == RTE_RELATION);
int rteIdentity = GetRTEIdentity(rteRelation); int rteIdentity = GetRTEIdentity(rteRelation);
if (bms_is_member(rteIdentity, queryRteIdentities)) if (bms_is_member(rteIdentity, queryRteIdentities))
{ {

View File

@ -5,3 +5,17 @@
#include "udfs/citus_finish_pg_upgrade/10.0-1.sql" #include "udfs/citus_finish_pg_upgrade/10.0-1.sql"
#include "../../columnar/sql/columnar--9.5-1--10.0-1.sql" #include "../../columnar/sql/columnar--9.5-1--10.0-1.sql"
-- read_distributed_intermediate_result is the SQL-level entry point for
-- reading a distributed intermediate result by its result_id. The body is the
-- C function of the same name in the extension's shared library.
-- NOTE(review): declared RETURNS record rather than SETOF record -- confirm
-- that a non-set return type is intended here.
CREATE OR REPLACE FUNCTION pg_catalog.read_distributed_intermediate_result(result_id text)
RETURNS record
LANGUAGE C STRICT VOLATILE PARALLEL SAFE
AS 'MODULE_PATHNAME', $$read_distributed_intermediate_result$$;
COMMENT ON FUNCTION pg_catalog.read_distributed_intermediate_result(text)
IS 'read a previously stored distributed intermediate result';
-- create_distributed_intermediate_result executes the given query and stores
-- its output as a distributed intermediate result named result_id. The body
-- is the C function of the same name in the extension's shared library.
-- distribution_column presumably selects the query output column to partition
-- by, and colocate_with the table whose shards the result is aligned with --
-- TODO confirm against the C implementation.
CREATE OR REPLACE FUNCTION pg_catalog.create_distributed_intermediate_result(result_id text, query text, distribution_column int, colocate_with text)
RETURNS void
LANGUAGE C STRICT VOLATILE
AS 'MODULE_PATHNAME', $$create_distributed_intermediate_result$$;
COMMENT ON FUNCTION pg_catalog.create_distributed_intermediate_result(text,text,int,text)
IS 'execute a query and write its results to a distributed intermediate result';

View File

@ -4,3 +4,6 @@
#include "../udfs/citus_finish_pg_upgrade/9.5-1.sql" #include "../udfs/citus_finish_pg_upgrade/9.5-1.sql"
#include "../../../columnar/sql/downgrades/columnar--10.0-1--9.5-1.sql" #include "../../../columnar/sql/downgrades/columnar--10.0-1--9.5-1.sql"
-- drop the distributed intermediate result UDFs that the 10.0-1 upgrade
-- script creates; the signatures must match the CREATE FUNCTION statements
DROP FUNCTION pg_catalog.read_distributed_intermediate_result(text);
DROP FUNCTION pg_catalog.create_distributed_intermediate_result(text, text, int, text);

View File

@ -150,9 +150,9 @@ redistribute_task_list_results(PG_FUNCTION_ARGS)
DISTRIBUTED_TABLE) ? DISTRIBUTED_TABLE) ?
targetRelation->partitionColumn->varattno - 1 : 0; targetRelation->partitionColumn->varattno - 1 : 0;
List **shardResultIds = RedistributeTaskListResults(resultIdPrefix, taskList, DistributedResult *distResult =
partitionColumnIndex, RedistributeTaskListResults(resultIdPrefix, taskList, partitionColumnIndex,
targetRelation, binaryFormat); targetRelation, binaryFormat);
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
@ -163,10 +163,12 @@ redistribute_task_list_results(PG_FUNCTION_ARGS)
ShardInterval *shardInterval = ShardInterval *shardInterval =
targetRelation->sortedShardIntervalArray[shardIndex]; targetRelation->sortedShardIntervalArray[shardIndex];
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
DistributedResultShard *resultShard = &(distResult->resultShards[shardIndex]);
List *fragmentList = resultShard->fragmentList;
int fragmentCount = list_length(shardResultIds[shardIndex]); int fragmentCount = list_length(fragmentList);
Datum *resultIdValues = palloc0(fragmentCount * sizeof(Datum)); Datum *resultIdValues = palloc0(fragmentCount * sizeof(Datum));
List *sortedResultIds = SortList(shardResultIds[shardIndex], pg_qsort_strcmp); List *sortedResultIds = SortList(fragmentList, pg_qsort_strcmp);
const char *resultId = NULL; const char *resultId = NULL;
int resultIdIndex = 0; int resultIdIndex = 0;

View File

@ -271,6 +271,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
*/ */
DeallocateReservedConnections(); DeallocateReservedConnections();
ClearNamedDistributedResultsHash();
UnSetDistributedTransactionId(); UnSetDistributedTransactionId();
/* empty the CommitContext to ensure we're not leaking memory */ /* empty the CommitContext to ensure we're not leaking memory */
@ -330,6 +332,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
*/ */
DeallocateReservedConnections(); DeallocateReservedConnections();
ClearNamedDistributedResultsHash();
/* /*
* We reset these mainly for posterity. The only way we would normally * We reset these mainly for posterity. The only way we would normally
* get here with ExecutorLevel or PlannerLevel > 0 is during a fatal * get here with ExecutorLevel or PlannerLevel > 0 is during a fatal

View File

@ -250,8 +250,11 @@ CopyNodeRelationShard(COPYFUNC_ARGS)
{ {
DECLARE_FROM_AND_NEW_NODE(RelationShard); DECLARE_FROM_AND_NEW_NODE(RelationShard);
COPY_SCALAR_FIELD(shardedRelationType);
COPY_SCALAR_FIELD(relationId); COPY_SCALAR_FIELD(relationId);
COPY_SCALAR_FIELD(shardId); COPY_SCALAR_FIELD(shardId);
COPY_STRING_FIELD(resultId);
COPY_SCALAR_FIELD(shardIndex);
} }

View File

@ -459,8 +459,11 @@ OutRelationShard(OUTFUNC_ARGS)
WRITE_LOCALS(RelationShard); WRITE_LOCALS(RelationShard);
WRITE_NODE_TYPE("RELATIONSHARD"); WRITE_NODE_TYPE("RELATIONSHARD");
WRITE_ENUM_FIELD(shardedRelationType, ShardedRelationType);
WRITE_OID_FIELD(relationId); WRITE_OID_FIELD(relationId);
WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardId);
WRITE_STRING_FIELD(resultId);
WRITE_INT_FIELD(shardIndex);
} }

View File

@ -117,11 +117,32 @@ typedef struct PlannerRestrictionContext
MemoryContext memoryContext; MemoryContext memoryContext;
} PlannerRestrictionContext; } PlannerRestrictionContext;
/*
 * ShardedRelationType is the type of relation that a RelationShard refers to:
 * either a table or a distributed intermediate result.
 */
typedef enum ShardedRelationType
{
/* the relation shard belongs to a table, identified by its relation OID */
SHARDED_TABLE,
/* the relation shard belongs to a result, identified by its result name */
SHARDED_RESULT
} ShardedRelationType;
/* mapping from relation to shard */
typedef struct RelationShard typedef struct RelationShard
{ {
CitusNode type; CitusNode type;
/* type of relation (table or result) */
ShardedRelationType shardedRelationType;
/* OID of the relation (InvalidOid in case of result) */
Oid relationId; Oid relationId;
/* shard ID in the distributed table */
uint64 shardId; uint64 shardId;
/* name of the result (NULL in case of table) */
char *resultId;
/* index of the shard in a sorted shard interval array */
int shardIndex;
} RelationShard; } RelationShard;
typedef struct RelationRowLock typedef struct RelationRowLock

View File

@ -23,7 +23,8 @@
/* /*
* DistributedResultFragment represents a fragment of a distributed result. * DistributedResultFragment represents a fragment of a distributed result
* shard.
*/ */
typedef struct DistributedResultFragment typedef struct DistributedResultFragment
{ {
@ -34,7 +35,7 @@ typedef struct DistributedResultFragment
uint32 nodeId; uint32 nodeId;
/* number of rows in the result file */ /* number of rows in the result file */
int rowCount; uint64 rowCount;
/* /*
* The fragment contains the rows which match the partitioning method * The fragment contains the rows which match the partitioning method
@ -47,6 +48,72 @@ typedef struct DistributedResultFragment
int targetShardIndex; int targetShardIndex;
} DistributedResultFragment; } DistributedResultFragment;
/*
 * DistributedResultState describes whether a distributed result is planned
 * or already executed.
 */
typedef enum DistributedResultState
{
/* the result is planned, but has not been executed yet */
DISTRIBUTED_RESULT_PLANNED,
/* the result has already been executed and its fragments are available */
DISTRIBUTED_RESULT_AVAILABLE
} DistributedResultState;
/*
 * DistributedResultShard represents a shard of a distributed result. A shard
 * can consist of multiple fragments, which are intermediate results that are
 * on the same node.
 */
typedef struct DistributedResultShard
{
/* what is the index of targetShardId in its relation's sorted shard list? */
int targetShardIndex;
/*
 * The result shard contains the rows which match the partitioning method
 * and partitioning ranges of targetShardId. The shape of each row matches
 * the schema of the relation to which targetShardId belongs.
 */
uint64 targetShardId;
/*
 * result ids of fragments that make up the shard (if result is available);
 * NIL means the result shard is empty and can be skipped
 */
List *fragmentList;
/* sum of the number of rows in the fragments (if result is available) */
int64 rowCount;
} DistributedResultShard;
/*
 * DistributedResult describes a distributed intermediate result which can be
 * queried like a distributed table.
 *
 * A distributed intermediate result has a set of distributed result fragments
 * for each shard.
 */
typedef struct DistributedResult
{
/* state of this distributed result (planned or executed) */
DistributedResultState state;
/* co-location ID with which the result is co-located */
int colocationId;
/* number of shards in the co-location group */
int shardCount;
/* index of the partition column (attribute number minus one) */
int partitionColumnIndex;
/* whether the file format is binary */
bool binaryFormat;
/* array of result shards, indexed by shard index */
DistributedResultShard *resultShards;
} DistributedResult;
/* intermediate_results.c */ /* intermediate_results.c */
extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId, extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId,
@ -62,11 +129,17 @@ extern char * QueryResultFileName(const char *resultId);
extern char * CreateIntermediateResultsDirectory(void); extern char * CreateIntermediateResultsDirectory(void);
/* distributed_intermediate_results.c */ /* distributed_intermediate_results.c */
extern List ** RedistributeTaskListResults(const char *resultIdPrefix, extern DistributedResult * RegisterDistributedResult(char *resultIdPrefix, Query *query,
List *selectTaskList, int partitionColumnIndex,
int partitionColumnIndex, Oid targetRelationId);
CitusTableCacheEntry *targetRelation, extern DistributedResult * GetNamedDistributedResult(char *resultId);
bool binaryFormat); extern void ClearNamedDistributedResultsHash(void);
extern DistributedResult * RedistributeTaskListResults(const char *resultIdPrefix,
List *selectTaskList,
int partitionColumnIndex,
CitusTableCacheEntry *
targetRelation,
bool binaryFormat);
extern List * PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList, extern List * PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList,
int partitionColumnIndex, int partitionColumnIndex,
CitusTableCacheEntry *distributionScheme, CitusTableCacheEntry *distributionScheme,

View File

@ -230,6 +230,7 @@ extern Oid CitusCopyFormatTypeId(void);
/* function oids */ /* function oids */
extern Oid CitusReadIntermediateResultFuncId(void); extern Oid CitusReadIntermediateResultFuncId(void);
Oid CitusReadIntermediateResultArrayFuncId(void); Oid CitusReadIntermediateResultArrayFuncId(void);
extern Oid CitusReadDistributedIntermediateResultFuncId(void);
extern Oid CitusExtraDataContainerFuncId(void); extern Oid CitusExtraDataContainerFuncId(void);
extern Oid CitusAnyValueFunctionId(void); extern Oid CitusAnyValueFunctionId(void);
extern Oid PgTableVisibleFuncId(void); extern Oid PgTableVisibleFuncId(void);

View File

@ -196,12 +196,15 @@ extern bool FindNodeMatchingCheckFunctionInRangeTableList(List *rtable, bool (*c
Node *)); Node *));
extern bool IsCitusTableRTE(Node *node); extern bool IsCitusTableRTE(Node *node);
extern bool IsDistributedTableRTE(Node *node); extern bool IsDistributedTableRTE(Node *node);
extern bool IsDistributedRelationRTE(Node *node);
extern bool IsReferenceTableRTE(Node *node); extern bool IsReferenceTableRTE(Node *node);
extern bool QueryContainsDistributedTableRTE(Query *query); extern bool QueryContainsDistributedTableRTE(Query *query);
extern bool IsCitusExtraDataContainerRelation(RangeTblEntry *rte); extern bool IsCitusExtraDataContainerRelation(RangeTblEntry *rte);
extern bool ContainsReadIntermediateResultFunction(Node *node); extern bool ContainsReadIntermediateResultFunction(Node *node);
extern bool ContainsReadIntermediateResultArrayFunction(Node *node); extern bool ContainsReadIntermediateResultArrayFunction(Node *node);
extern bool IsDistributedIntermediateResultRTE(RangeTblEntry *rangeTableEntry);
extern char * FindIntermediateResultIdIfExists(RangeTblEntry *rte); extern char * FindIntermediateResultIdIfExists(RangeTblEntry *rte);
extern char * FindDistributedResultId(RangeTblEntry *rte);
extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ParentNode(MultiNode *multiNode);
extern MultiNode * ChildNode(MultiUnaryNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode);
extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode); extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode);

View File

@ -475,19 +475,21 @@ SELECT * FROM print_extension_changes();
-- Snapshot of state at 10.0-1 -- Snapshot of state at 10.0-1
ALTER EXTENSION citus UPDATE TO '10.0-1'; ALTER EXTENSION citus UPDATE TO '10.0-1';
SELECT * FROM print_extension_changes(); SELECT * FROM print_extension_changes();
previous_object | current_object previous_object | current_object
--------------------------------------------------------------------- ---------------------------------------------------------------------
| access method columnar | access method columnar
| function alter_columnar_table_reset(regclass,boolean,boolean,boolean) | function alter_columnar_table_reset(regclass,boolean,boolean,boolean)
| function alter_columnar_table_set(regclass,integer,integer,name) | function alter_columnar_table_set(regclass,integer,integer,name)
| function citus_internal.cstore_ensure_objects_exist() | function citus_internal.cstore_ensure_objects_exist()
| function create_distributed_intermediate_result(text,text,integer,text)
| function cstore.columnar_handler(internal) | function cstore.columnar_handler(internal)
| function read_distributed_intermediate_result(text)
| schema cstore | schema cstore
| table cstore.cstore_data_files | table cstore.cstore_data_files
| table cstore.cstore_skipnodes | table cstore.cstore_skipnodes
| table cstore.cstore_stripes | table cstore.cstore_stripes
| table cstore.options | table cstore.options
(10 rows) (12 rows)
DROP TABLE prev_objects, extension_diff; DROP TABLE prev_objects, extension_diff;
-- show running version -- show running version