pull/7973/merge
eaydingol 2025-05-06 17:20:37 +01:00 committed by GitHub
commit 7d20d7a07b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 506 additions and 36 deletions

View File

@ -520,6 +520,30 @@ IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tabl
tableEntry->colocationId, tableType);
}
/*
* IsFirstShard returns true if the given shardId is the first shard.
*/
bool
IsFirstShard(CitusTableCacheEntry *tableEntry, uint64 shardId)
{
if (tableEntry == NULL || tableEntry->sortedShardIntervalArray == NULL)
{
return false;
}
if (tableEntry->sortedShardIntervalArray[0]->shardId == INVALID_SHARD_ID)
{
return false;
}
if (shardId == tableEntry->sortedShardIntervalArray[0]->shardId)
{
return true;
}
else
{
return false;
}
}
/*
* HasDistributionKey returns true if given Citus table has a distribution key.

View File

@ -316,6 +316,36 @@ BuildSelectStatementViaStdPlanner(Query *combineQuery, List *remoteScanTargetLis
}
/*
* ExtractCitusExtradataContainerRTE is a helper function that stores rangeTblEntry
* to result if it has citus extra data container.
*
* The function returns true if it finds the RTE, and false otherwise.
*/
bool
ExtractCitusExtradataContainerRTE(RangeTblEntry *rangeTblEntry, RangeTblEntry **result)
{
if (rangeTblEntry->rtekind == RTE_FUNCTION &&
list_length(rangeTblEntry->functions) == 1)
{
RangeTblFunction *rangeTblFunction = (RangeTblFunction *) linitial(
rangeTblEntry->functions);
if (!IsA(rangeTblFunction->funcexpr, FuncExpr))
{
return false;
}
FuncExpr *funcExpr = castNode(FuncExpr, rangeTblFunction->funcexpr);
if (funcExpr->funcid == CitusExtraDataContainerFuncId())
{
*result = rangeTblEntry;
return true;
}
}
return false;
}
/*
* Finds the rangetable entry in the query that refers to the citus_extradata_container
* and stores the pointer in result.
@ -331,25 +361,7 @@ FindCitusExtradataContainerRTE(Node *node, RangeTblEntry **result)
if (IsA(node, RangeTblEntry))
{
RangeTblEntry *rangeTblEntry = castNode(RangeTblEntry, node);
if (rangeTblEntry->rtekind == RTE_FUNCTION &&
list_length(rangeTblEntry->functions) == 1)
{
RangeTblFunction *rangeTblFunction = (RangeTblFunction *) linitial(
rangeTblEntry->functions);
if (!IsA(rangeTblFunction->funcexpr, FuncExpr))
{
return false;
}
FuncExpr *funcExpr = castNode(FuncExpr, rangeTblFunction->funcexpr);
if (funcExpr->funcid == CitusExtraDataContainerFuncId())
{
*result = rangeTblEntry;
return true;
}
}
/* query_tree_walker descends into RTEs */
return false;
return ExtractCitusExtradataContainerRTE(rangeTblEntry, result);
}
else if (IsA(node, Query))
{

View File

@ -16,6 +16,7 @@
#include "access/heapam.h"
#include "access/htup_details.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_operator.h"
#include "lib/stringinfo.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
@ -33,11 +34,13 @@
#include "distributed/combine_query_planner.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/insert_select_planner.h"
#include "distributed/query_utils.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/recursive_planning.h"
#include "distributed/shard_utils.h"
#include "distributed/stats/stat_tenants.h"
#include "distributed/version_compat.h"
@ -203,6 +206,202 @@ UpdateTaskQueryString(Query *query, Task *task)
SetTaskQueryIfShouldLazyDeparse(task, query);
}
/*
* Iterates through the FROM clause of the query and checks if there is a join
* expr with a reference and distributed table.
* If there is, it adds the index of the range table entry of the outer
* table in the join clause to the constraintIndexes list. It also sets the
* innerRte to point to the range table entry inner table.
*/
bool ExtractIndexesForConstaints(List *fromList, List *rtable,
int *outerRtIndex, RangeTblEntry **distRte)
{
ereport(DEBUG5, (errmsg("******")));
ListCell *fromExprCell;
// Check the first element of the from clause, the rest is already handled
foreach(fromExprCell, fromList)
{
Node *fromElement = (Node *) lfirst(fromExprCell);
if (IsA(fromElement, JoinExpr))
{
JoinExpr *joinExpr = (JoinExpr *) fromElement;
if(!IS_OUTER_JOIN(joinExpr->jointype))
{
continue;
}
*outerRtIndex = (((RangeTblRef *)joinExpr->larg)->rtindex);
RangeTblEntry *outerRte = rt_fetch(*outerRtIndex, rtable);
if(!IsPushdownSafeForRTEInLeftJoin(outerRte))
{
return false;
}
if (!CheckIfAllCitusRTEsAreColocated((Node *)joinExpr->rarg, rtable, distRte))
{
return false;
}
return true;
}
}
return false;
}
/*
* UpdateWhereClauseForOuterJoin walks over the query tree and appends quals
* to the WHERE clause to filter w.r.to the distribution column of the corresponding shard.
* TODO:
* - Not supported cases should not call this function.
* - Remove the excessive debug messages.
*/
bool
UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
{
if (node == NULL)
{
return false;
}
if (!IsA(node, Query))
{
return expression_tree_walker(node, UpdateWhereClauseForOuterJoin, relationShardList);
}
Query *query = (Query *) node;
if (query->jointree == NULL)
{
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
}
FromExpr *fromExpr = query->jointree;
if(fromExpr == NULL)
{
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
}
/*
* We need to find the outer table in the join clause to add the constraints w.r.to the shard
* intervals of the inner table.
* A representative inner table is sufficient as long as it is colocated with all other
* distributed tables in the join clause.
*/
RangeTblEntry *innerRte = NULL;
int outerRtIndex = -1;
bool result = ExtractIndexesForConstaints(fromExpr->fromlist, query->rtable, &outerRtIndex, &innerRte);
if (!result)
{
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: failed to extract indexes")));
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
}
ereport(DEBUG5, (errmsg("\t Distributed table from the inner part: %s", innerRte->eref->aliasname)));
RelationShard *relationShard = FindRelationShard(innerRte->relid, relationShardList);
uint64 shardId = relationShard->shardId;
Oid relationId = relationShard->relationId;
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
Var *partitionColumnVar = cacheEntry->partitionColumn;
/*
* we will add constraints for the outer table, so we need to set the varno
* TODO: this only works when the outer table has the distribution column,
* we shoul not end up here if this is not the case.
*/
partitionColumnVar->varno = outerRtIndex;
bool isFirstShard = IsFirstShard(cacheEntry, shardId);
/* load the interval for the shard and create constant nodes for the upper/lower bounds */
ShardInterval *shardInterval = LoadShardInterval(shardId);
Const *constNodeLowerBound = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), shardInterval->minValue, false, true);
Const *constNodeUpperBound = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), shardInterval->maxValue, false, true);
Const *constNodeZero = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), Int32GetDatum(0), false, true);
// TOOD: the following is only for hash partitioned tables
/* create a function expression node for the hash partition column */
FuncExpr *hashFunction = makeNode(FuncExpr);
hashFunction->funcid = cacheEntry->hashFunction->fn_oid;
hashFunction->args = list_make1(partitionColumnVar);
hashFunction->funcresulttype = get_func_rettype(cacheEntry->hashFunction->fn_oid);
hashFunction->funcretset = false;
/* create a function expression for the lower bound of the shard interval */
Oid resultTypeOid = get_func_rettype(cacheEntry->shardIntervalCompareFunction->fn_oid);
FuncExpr* lowerBoundFuncExpr = makeNode(FuncExpr);
lowerBoundFuncExpr->funcid = cacheEntry->shardIntervalCompareFunction->fn_oid;
lowerBoundFuncExpr->args = list_make2((Node *) constNodeLowerBound, (Node *) hashFunction);
lowerBoundFuncExpr->funcresulttype = resultTypeOid;
lowerBoundFuncExpr->funcretset = false;
Oid lessThan = GetSysCacheOid(OPERNAMENSP, Anum_pg_operator_oid, CStringGetDatum("<"),
resultTypeOid, resultTypeOid, ObjectIdGetDatum(11));
/*
* Finally, check if the comparison result is less than 0, i.e.,
* shardInterval->minValue < hash(partitionColumn)
* See SearchCachedShardInterval for the behavior at the boundaries.
*/
Expr *lowerBoundExpr = make_opclause(lessThan, BOOLOID, false, (Expr *) lowerBoundFuncExpr,
(Expr *) constNodeZero, InvalidOid, InvalidOid);
/* create a function expression for the upper bound of the shard interval */
FuncExpr* upperBoundFuncExpr = makeNode(FuncExpr);
upperBoundFuncExpr->funcid = cacheEntry->shardIntervalCompareFunction->fn_oid;
upperBoundFuncExpr->args = list_make2((Node *) hashFunction, (Expr *) constNodeUpperBound);
upperBoundFuncExpr->funcresulttype = resultTypeOid;
upperBoundFuncExpr->funcretset = false;
Oid lessThanOrEqualTo = GetSysCacheOid(OPERNAMENSP, Anum_pg_operator_oid, CStringGetDatum("<="),
resultTypeOid, resultTypeOid, ObjectIdGetDatum(11));
/*
* Finally, check if the comparison result is less than or equal to 0, i.e.,
* hash(partitionColumn) <= shardInterval->maxValue
* See SearchCachedShardInterval for the behavior at the boundaries.
*/
Expr *upperBoundExpr = make_opclause(lessThanOrEqualTo, BOOLOID, false, (Expr *) upperBoundFuncExpr,
(Expr *) constNodeZero, InvalidOid, InvalidOid);
/* create a node for both upper and lower bound */
Node *shardIntervalBoundQuals = make_and_qual((Node *) lowerBoundExpr, (Node *) upperBoundExpr);
/*
* Add a null test for the partition column for the first shard.
* This is because we need to include the null values in exactly one of the shard queries.
* The null test is added as an OR clause to the existing AND clause.
*/
if (isFirstShard)
{
/* null test for the first shard */
NullTest *nullTest = makeNode(NullTest);
nullTest->nulltesttype = IS_NULL; /* Check for IS NULL */
nullTest->arg = (Expr *) partitionColumnVar; /* The variable to check */
nullTest->argisrow = false;
shardIntervalBoundQuals = (Node *) make_orclause(list_make2(nullTest, shardIntervalBoundQuals));
}
if (fromExpr->quals == NULL)
{
fromExpr->quals = (Node *) shardIntervalBoundQuals;
}
else
{
fromExpr->quals = make_and_qual(fromExpr->quals, shardIntervalBoundQuals);
}
// TODO: verify this, do we need the recursive call for all nodes?
/* We need to continue the recursive walk for the nested join statements.*/
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
}
/*
* UpdateRelationToShardNames walks over the query tree and appends shard ids to

View File

@ -174,6 +174,7 @@ static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
uint32 taskId,
TaskType taskType,
bool modifyRequiresCoordinatorEvaluation,
bool innerTableOfOuterJoin,
DeferredErrorMessage **planningError);
static List * SqlTaskList(Job *job);
static bool DependsOnHashPartitionJob(Job *job);
@ -2183,6 +2184,8 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
int minShardOffset = INT_MAX;
int prevShardCount = 0;
Bitmapset *taskRequiredForShardIndex = NULL;
Bitmapset *innerTableOfOuterJoinSet = NULL;
bool innerTableOfOuterJoin = false;
/* error if shards are not co-partitioned */
ErrorIfUnsupportedShardDistribution(query);
@ -2220,20 +2223,17 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
return NIL;
}
prevShardCount = cacheEntry->shardIntervalArrayLength;
innerTableOfOuterJoin = false;
/*
* For left joins we don't care about the shards pruned for the right hand side.
* If the right hand side would prune to a smaller set we should still send it to
* all tables of the left hand side. However if the right hand side is bigger than
* the left hand side we don't have to send the query to any shard that is not
* matching anything on the left hand side.
*
* Instead we will simply skip any RelationRestriction if it is an OUTER join and
* the table is part of the non-outer side of the join.
* For left outer joins, we need to check if the table is in the inner
* part of the join. If it is, we need to mark this shard and add interval
* constraints to the join.
*/
if (IsInnerTableOfOuterJoin(relationRestriction))
{
continue;
ereport(DEBUG1, errmsg("Inner Table of Outer Join %d",
relationRestriction->relationId));
innerTableOfOuterJoin = true;
}
ShardInterval *shardInterval = NULL;
@ -2244,6 +2244,15 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
taskRequiredForShardIndex =
bms_add_member(taskRequiredForShardIndex, shardIndex);
minShardOffset = Min(minShardOffset, shardIndex);
if (innerTableOfOuterJoin)
{
/*
* We need to keep track of the inner table of outer join
* shards so that we can process them later.
*/
innerTableOfOuterJoinSet =
bms_add_member(innerTableOfOuterJoinSet, shardIndex);
}
}
}
@ -2261,11 +2270,13 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
int shardOffset = minShardOffset - 1;
while ((shardOffset = bms_next_member(taskRequiredForShardIndex, shardOffset)) >= 0)
{
innerTableOfOuterJoin = bms_is_member(shardOffset, innerTableOfOuterJoinSet);
Task *subqueryTask = QueryPushdownTaskCreate(query, shardOffset,
relationRestrictionContext,
taskIdIndex,
taskType,
modifyRequiresCoordinatorEvaluation,
innerTableOfOuterJoin,
planningError);
if (*planningError != NULL)
{
@ -2439,6 +2450,7 @@ static Task *
QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext, uint32 taskId,
TaskType taskType, bool modifyRequiresCoordinatorEvaluation,
bool innerTableOfOuterJoin,
DeferredErrorMessage **planningError)
{
Query *taskQuery = copyObject(originalQuery);
@ -2531,11 +2543,21 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
*/
UpdateRelationToShardNames((Node *) taskQuery, relationShardList);
/*
* Augment the where clause with the shard intervals for inner table of outer
* joins.
*/
if (innerTableOfOuterJoin)
{
UpdateWhereClauseForOuterJoin((Node *) taskQuery, relationShardList);
}
/*
* Ands are made implicit during shard pruning, as predicate comparison and
* refutation depend on it being so. We need to make them explicit again so
* that the query string is generated as (...) AND (...) as opposed to
* (...), (...).
* TODO: do we need to run this before adding quals?
*/
if (taskQuery->jointree->quals != NULL && IsA(taskQuery->jointree->quals, List))
{

View File

@ -834,6 +834,11 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin(
* contains recurring rels, must be an unsupported lateral outer
* join.
*/
/*
* For now only stop returning an error here.
* TODO: later add all required checks to push down the query here
*/
continue;
recurType = FetchFirstRecurType(plannerInfo, outerrelRelids);
break;

View File

@ -73,6 +73,7 @@
#include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/combine_query_planner.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/distributed_planner.h"
#include "distributed/errormessage.h"
@ -87,6 +88,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/query_colocation_checker.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/query_utils.h"
#include "distributed/recursive_planning.h"
#include "distributed/relation_restriction_equivalence.h"
#include "distributed/shard_pruning.h"
@ -104,6 +106,7 @@ struct RecursivePlanningContextInternal
bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */
List *subPlanList;
PlannerRestrictionContext *plannerRestrictionContext;
bool restrictionEquivalenceCheck;
};
/* track depth of current recursive planner query */
@ -640,6 +643,35 @@ RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
}
/*
* Returns true if the given node is recurring, or the node is a
* JoinExpr that contains a recurring node.
*/
static bool
JoinExprHasNonRecurringTable(Node *node, Query *query)
{
if (node == NULL)
{
return false;
}
else if (IsA(node, RangeTblRef))
{
return IsRTERefRecurring((RangeTblRef *) node, query);
}
else if (IsA(node, JoinExpr))
{
JoinExpr *joinExpr = (JoinExpr *) node;
return JoinExprHasNonRecurringTable(joinExpr->larg, query) ||
JoinExprHasNonRecurringTable(joinExpr->rarg, query);
}
else
{
return false;
}
}
/*
* RecursivelyPlanRecurringTupleOuterJoinWalker descends into a join tree and
* recursively plans all non-recurring (i.e., distributed) rels that that
@ -708,13 +740,51 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
case JOIN_LEFT:
{
/* <recurring> left join <distributed> */
if (leftNodeRecurs && !rightNodeRecurs)
/* Recursively plan the right side of the left left join when the following
* conditions are met:
* 1. The left side is recurring
* 2. The right side is not recurring
* 3. Either of the following:
* a. The left side is not a RangeTblRef (i.e., it is not a reference/local table)
* b. The tables in the rigt side are not colocated.
* 5. The left side does not have the distribution column (TODO: CHECK THIS)
*/
if (leftNodeRecurs && !rightNodeRecurs)
{
int outerRtIndex = ((RangeTblRef *) leftNode)->rtindex;
RangeTblEntry *rte = rt_fetch(outerRtIndex, query->rtable);
RangeTblEntry *innerRte = NULL;
if (!IsPushdownSafeForRTEInLeftJoin(rte))
{
ereport(DEBUG1, (errmsg("recursively planning right side of "
"the left join since the outer side "
"is a recurring rel that is not an RTE")));
RecursivelyPlanDistributedJoinNode(rightNode, query,
recursivePlanningContext);
}
else if (!CheckIfAllCitusRTEsAreColocated(rightNode, query->rtable, &innerRte))
{
ereport(DEBUG1, (errmsg("recursively planning right side of the left join "
"since tables in the inner side of the left "
"join are not colocated")));
RecursivelyPlanDistributedJoinNode(rightNode, query,
recursivePlanningContext);
}
}
/*
* rightNodeRecurs if there is a recurring table in the right side. However, if the right side
* is a join expression, we need to check if it contains a recurring table. If it does, we need to
* recursively plan the right side of the left join. Push-down path does not handle the nested joins
* yet, once we have that, we can remove this check.
*/
else if (leftNodeRecurs && rightNodeRecurs && JoinExprHasNonRecurringTable(rightNode, query))
{
ereport(DEBUG1, (errmsg("recursively planning right side of "
"the left join since the outer side "
"is a recurring rel")));
RecursivelyPlanDistributedJoinNode(rightNode, query,
recursivePlanningContext);
ereport(DEBUG1, (errmsg("recursively planning right side of the left join "
"since right side is a joinexpr with non-recurring tables")));
RecursivelyPlanDistributedJoinNode(rightNode, query,
recursivePlanningContext);
}
/*
@ -2583,3 +2653,35 @@ GeneratingSubplans(void)
{
return recursivePlanningDepth > 0;
}
/*
* IsPushdownSafeForRTEInLeftJoin returns true if the given range table entry
* is safe for pushdown. Currently, we only allow RTE_RELATION and RTE_FUNCTION.
*/
bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte)
{
if (rte->rtekind == RTE_RELATION)
{
return true;
}
/* check if it is a citus table, e.g., ref table */
else if (rte->rtekind == RTE_FUNCTION)
{
RangeTblEntry *newRte = NULL;
if(!ExtractCitusExtradataContainerRTE(rte, &newRte))
{
ereport(DEBUG5, (errmsg("RTE type %d is not safe for pushdown, function but it does not contain citus extradata",
rte->rtekind)));
return false;
}
return true;
}
else
{
ereport(DEBUG5, (errmsg("RTE type %d is not safe for pushdown",
rte->rtekind)));
return false;
}
}

View File

@ -15,9 +15,11 @@
#include "catalog/pg_class.h"
#include "nodes/nodeFuncs.h"
#include "nodes/primnodes.h"
#include "parser/parsetree.h"
#include "distributed/listutils.h"
#include "distributed/query_utils.h"
#include "distributed/relation_restriction_equivalence.h"
#include "distributed/version_compat.h"
@ -178,3 +180,93 @@ ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList)
return walkerResult;
}
/*
* ExtractRangeTableIds walks over the given node, and finds all range
* table entries.
*/
bool
ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context)
{
List **rangeTableList = context->idList;
List *rtable = context->rtable;
if (node == NULL)
{
return false;
}
if (IsA(node, RangeTblRef))
{
int rangeTableIndex = ((RangeTblRef *) node)->rtindex;
RangeTblEntry *rte = rt_fetch(rangeTableIndex, rtable);
if (rte->rtekind == RTE_SUBQUERY)
{
Query *subquery = rte->subquery;
context->rtable = subquery->rtable;
ExtractRangeTableIds((Node *) subquery, context);
context->rtable = rtable; /* restore original rtable */
}
else if (rte->rtekind == RTE_RELATION || rte->rtekind == RTE_FUNCTION)
{
(*rangeTableList) = lappend(*rangeTableList, rte);
ereport(DEBUG4, (errmsg("ExtractRangeTableIds: found range table id %d", rte->relid)));
}
else
{
ereport(DEBUG4, (errmsg("Unsupported RTE kind in ExtractRangeTableIds %d", rte->rtekind)));
}
return false;
}
else if (IsA(node, Query))
{
context->rtable = ((Query *) node)->rtable;
query_tree_walker((Query *) node, ExtractRangeTableIds, context, 0);
context->rtable = rtable; /* restore original rtable */
return false;
}
else
{
return expression_tree_walker(node, ExtractRangeTableIds, context);
}
}
/*
* CheckIfAllCitusRTEsAreColocated checks if all distributed tables in the
* given node are colocated. If they are, it sets the value of rte to a
* representative table.
*/
bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte)
{
ExtractRangeTableIdsContext context;
List *idList = NIL;
context.idList = &idList;
context.rtable = rtable;
ExtractRangeTableIds(node, &context);
RangeTblEntry *rteTmp;
List *citusRelids = NIL;
ListCell *lc = NULL;
foreach(lc, idList)
{
rteTmp = (RangeTblEntry *) lfirst(lc);
if (IsCitusTable(rteTmp->relid))
{
citusRelids = lappend_int(citusRelids, rteTmp->relid);
*rte = rteTmp; // set the value of rte, a representative table
}
}
if (!AllDistributedRelationsInListColocated(citusRelids))
{
ereport(DEBUG5, (errmsg("The distributed tables are not colocated")));
return false;
}
return true;
}

View File

@ -26,6 +26,7 @@ extern Path * CreateCitusCustomScanPath(PlannerInfo *root, RelOptInfo *relOptInf
CustomScan *remoteScan);
extern PlannedStmt * PlanCombineQuery(struct DistributedPlan *distributedPlan,
struct CustomScan *dataScan);
extern bool ExtractCitusExtradataContainerRTE(RangeTblEntry *rangeTblEntry, RangeTblEntry **result);
extern bool FindCitusExtradataContainerRTE(Node *node, RangeTblEntry **result);
extern bool ReplaceCitusExtraDataContainer;
extern CustomScan *ReplaceCitusExtraDataContainerWithCustomScan;

View File

@ -20,10 +20,13 @@
#include "nodes/pg_list.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/query_utils.h"
bool ExtractIndexesForConstaints(List *fromList, List *rtable, int *outerRtIndex, RangeTblEntry **distRte);
extern void RebuildQueryStrings(Job *workerJob);
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern bool UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList);
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString);
extern void SetTaskQueryStringList(Task *task, List *queryStringList);

View File

@ -146,6 +146,7 @@ extern bool IsCitusTableType(Oid relationId, CitusTableType tableType);
extern CitusTableType GetCitusTableType(CitusTableCacheEntry *tableEntry);
extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry,
CitusTableType tableType);
extern bool IsFirstShard(CitusTableCacheEntry *tableEntry, uint64 shardId);
bool HasDistributionKey(Oid relationId);
bool HasDistributionKeyCacheEntry(CitusTableCacheEntry *tableEntry);
extern char * GetTableTypeName(Oid tableId);

View File

@ -30,6 +30,13 @@ typedef struct ExtractRangeTableWalkerContext
ExtractRangeTableMode walkerMode;
} ExtractRangeTableWalkerContext;
/* Struct to pass rtable list and the result list to walker */
typedef struct ExtractRangeTableIdsContext
{
List **idList;
List *rtable;
} ExtractRangeTableIdsContext;
/* Function declarations for query-walker utility functions */
extern bool ExtractRangeTableList(Node *node, ExtractRangeTableWalkerContext *context);
@ -38,5 +45,6 @@ extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList);
extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList);
extern bool ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context);
extern bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte);
#endif /* QUERY_UTILS_H */

View File

@ -51,6 +51,7 @@ extern bool IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry);
extern bool IsRelationLocalTableOrMatView(Oid relationId);
extern bool ContainsReferencesToOuterQuery(Query *query);
extern void UpdateVarNosInNode(Node *node, Index newVarNo);
extern bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte);
#endif /* RECURSIVE_PLANNING_H */