From 851386f94ab4a1fb50ac757cd34d61deea17b0ec Mon Sep 17 00:00:00 2001 From: eaydingol Date: Tue, 27 May 2025 18:49:54 +0300 Subject: [PATCH] Check if the outer table has distribution column, wip, still need to check the using clause or join constraint --- .../distributed/planner/deparse_shard_query.c | 25 ++++++++++++---- .../distributed/planner/recursive_planning.c | 23 +++++++++++++- src/backend/distributed/utils/query_utils.c | 30 +++++++++++++++++-- src/include/distributed/query_utils.h | 1 + 4 files changed, 71 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index cf285098f..5511c402c 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -291,6 +291,7 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) * distributed tables in the join clause. */ RangeTblEntry *innerRte = NULL; + RangeTblEntry *outerRte = NULL; int outerRtIndex = -1; bool result = ExtractIndexesForConstaints(fromExpr->fromlist, query->rtable, &outerRtIndex, &innerRte); if (!result) @@ -310,11 +311,25 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) Var *partitionColumnVar = cacheEntry->partitionColumn; /* - * we will add constraints for the outer table, so we need to set the varno - * TODO: this only works when the outer table has the distribution column, - * we shoul not end up here if this is not the case. + * we will add constraints for the outer table, we need to find the column in the outer + * table that is comparable to the partition column of the inner table. + * If the column does not exist, we return without modifying the query. + * If the column exists, we create a Var node for the outer table's partition column. */ - partitionColumnVar->varno = outerRtIndex; + outerRte = rt_fetch(outerRtIndex, query->rtable); + AttrNumber attnum = GetAttrNumForMatchingColumn(outerRte, relationId, partitionColumnVar); + // TODO: we also have to check that the tables are joined on the partition column. + if( attnum == InvalidAttrNumber) + { + return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); + } + + Var* outerTablePartitionColumnVar = makeVar( + outerRtIndex, attnum, partitionColumnVar->vartype, + partitionColumnVar->vartypmod, + partitionColumnVar->varcollid, + 0); + bool isFirstShard = IsFirstShard(cacheEntry, shardId); /* load the interval for the shard and create constant nodes for the upper/lower bounds */ @@ -327,7 +342,7 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) /* create a function expression node for the hash partition column */ FuncExpr *hashFunction = makeNode(FuncExpr); hashFunction->funcid = cacheEntry->hashFunction->fn_oid; - hashFunction->args = list_make1(partitionColumnVar); + hashFunction->args = list_make1(outerTablePartitionColumnVar); hashFunction->funcresulttype = get_func_rettype(cacheEntry->hashFunction->fn_oid); hashFunction->funcretset = false; diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 3c6690bde..b1268b073 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -756,6 +756,7 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, int outerRtIndex = ((RangeTblRef *) leftNode)->rtindex; RangeTblEntry *rte = rt_fetch(outerRtIndex, query->rtable); RangeTblEntry *innerRte = NULL; + bool planned = false; if (!IsPushdownSafeForRTEInLeftJoin(rte)) { ereport(DEBUG1, (errmsg("recursively planning right side of " @@ -763,6 +764,7 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, "is a recurring rel that is not an RTE"))); RecursivelyPlanDistributedJoinNode(rightNode, query, recursivePlanningContext); + planned = true; } else if (!CheckIfAllCitusRTEsAreColocated(rightNode, query->rtable, &innerRte)) { @@ -770,8 +772,27 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, "since tables in the inner side of the left " "join are not colocated"))); RecursivelyPlanDistributedJoinNode(rightNode, query, - recursivePlanningContext); + recursivePlanningContext); + planned = true; } + + if(!planned) + { + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(innerRte->relid) + if(GetAttrNumForMatchingColumn(rte, innerRte->relid, cacheEntry->partitionColumn) == InvalidAttrNumber) + { + ereport(DEBUG1, (errmsg("recursively planning right side of the left join " + "since the outer side does not have the distribution column"))); + RecursivelyPlanDistributedJoinNode(rightNode, query, recursivePlanningContext); + } + else + { + ereport(DEBUG1, (errmsg("not recursively planning right side of the left join " + "since the outer side is a RangeTblRef and " + "the inner side is colocated with it"))); + } + } + } /* * rightNodeRecurs if there is a recurring table in the right side. However, if the right side diff --git a/src/backend/distributed/utils/query_utils.c b/src/backend/distributed/utils/query_utils.c index 39efcbcb5..e4267aea1 100644 --- a/src/backend/distributed/utils/query_utils.c +++ b/src/backend/distributed/utils/query_utils.c @@ -21,6 +21,7 @@ #include "distributed/query_utils.h" #include "distributed/relation_restriction_equivalence.h" #include "distributed/version_compat.h" +#include "utils/lsyscache.h" static bool CitusQueryableRangeTableRelation(RangeTblEntry *rangeTableEntry); @@ -239,7 +240,8 @@ ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context) * given node are colocated. If they are, it sets the value of rte to a * representative table. */ -bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte) +bool +CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte) { ExtractRangeTableIdsContext context; List *idList = NIL; @@ -268,5 +270,29 @@ bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **r } return true; +} -} \ No newline at end of file +/* + * GetAttrNumForMatchingColumn returns the attribute number for the column + * in the target relation that matches the given Var. If the column does not + * exist or is not comparable, it returns InvalidAttrNumber. + */ +AttrNumber +GetAttrNumForMatchingColumn(RangeTblEntry *rteTarget, Oid relid, Var *var) +{ + char *targetColumnName = get_attname(relid, var->varattno, false); + AttrNumber attnum = get_attnum(rteTarget->relid, targetColumnName); + if (attnum == InvalidAttrNumber) + { + ereport(DEBUG5, (errmsg("Column %s does not exist in relation %s", + targetColumnName, rteTarget->eref->aliasname))); + return InvalidAttrNumber; + } + if(var->vartype != get_atttype(rteTarget->relid, attnum)) + { + ereport(DEBUG5, (errmsg("Column %s is not comparable for tables with relids %d and %d", + targetColumnName, rteTarget->relid, relid))); + return InvalidAttrNumber; + } + return attnum; +} diff --git a/src/include/distributed/query_utils.h b/src/include/distributed/query_utils.h index 51b00f551..680f2bfc2 100644 --- a/src/include/distributed/query_utils.h +++ b/src/include/distributed/query_utils.h @@ -47,4 +47,5 @@ extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList); extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList); extern bool ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context); extern bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte); +AttrNumber GetAttrNumForMatchingColumn(RangeTblEntry *rteTarget, Oid relid, Var *var); #endif /* QUERY_UTILS_H */