diff --git a/src/backend/distributed/planner/combine_query_planner.c b/src/backend/distributed/planner/combine_query_planner.c index 1d6442ef9..5dc533a7f 100644 --- a/src/backend/distributed/planner/combine_query_planner.c +++ b/src/backend/distributed/planner/combine_query_planner.c @@ -317,8 +317,8 @@ BuildSelectStatementViaStdPlanner(Query *combineQuery, List *remoteScanTargetLis /* - * FindCitusExtradataContainerRTE is a helper function that finds the - * citus_extradata_container in range table entry. + * ExtractCitusExtradataContainerRTE is a helper function that stores rangeTblEntry + * to result if it has citus extra data container. * * The function returns true if it finds the RTE, and false otherwise. */ diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index fc031a27a..1fbead454 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -34,11 +34,13 @@ #include "distributed/combine_query_planner.h" #include "distributed/deparse_shard_query.h" #include "distributed/insert_select_planner.h" +#include "distributed/query_utils.h" #include "distributed/listutils.h" #include "distributed/local_executor.h" #include "distributed/metadata_cache.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" +#include "distributed/recursive_planning.h" #include "distributed/shard_utils.h" #include "distributed/utils/citus_stat_tenants.h" #include "distributed/version_compat.h" @@ -205,58 +207,6 @@ UpdateTaskQueryString(Query *query, Task *task) } -/* - * ExtractRangeTableIds walks over the given node, and finds all range - * table entries. - */ -bool -ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context) -{ - List **rangeTableList = context->result; - List *rtable = context->rtable; - - if (node == NULL) - { - return false; - } - - if (IsA(node, RangeTblRef)) - { - int rangeTableIndex = ((RangeTblRef *) node)->rtindex; - - RangeTblEntry *rte = rt_fetch(rangeTableIndex, rtable); - if (rte->rtekind == RTE_SUBQUERY) - { - Query *subquery = rte->subquery; - context->rtable = subquery->rtable; - ExtractRangeTableIds((Node *) subquery, context); - context->rtable = rtable; /* restore original rtable */ - } - else if (rte->rtekind == RTE_RELATION || rte->rtekind == RTE_FUNCTION) - { - (*rangeTableList) = lappend(*rangeTableList, rte); - ereport(DEBUG4, (errmsg("ExtractRangeTableIds: found range table id %d", rte->relid))); - } - else - { - ereport(DEBUG4, (errmsg("Unsupported RTE kind in ExtractRangeTableIds %d", rte->rtekind))); - } - return false; - } - else if (IsA(node, Query)) - { - context->rtable = ((Query *) node)->rtable; - query_tree_walker((Query *) node, ExtractRangeTableIds, context, 0); - context->rtable = rtable; /* restore original rtable */ - return false; - } - else - { - return expression_tree_walker(node, ExtractRangeTableIds, context); - } -} - - /* * Iterates through the FROM clause of the query and checks if there is a join * clause with a reference and distributed table. @@ -291,54 +241,16 @@ GetRepresentativeTablesFromJoinClause(List *fromlist, List *rtable, RangeTblEntr int outerRtIndex = ((RangeTblRef *)joinExpr->larg)->rtindex; RangeTblEntry *outerRte = rt_fetch(outerRtIndex, rtable); - /* the outer table is a reference table */ - if (outerRte->rtekind == RTE_FUNCTION) + if(!IsPushdownSafeForRTEInLeftJoin(outerRte)) { - RangeTblEntry *newRte = NULL; - if (!ExtractCitusExtradataContainerRTE(outerRte, &newRte)) - { - /* RTE does not contain citus_extradata_container */ - return -1; - } - } - else if (outerRte->rtekind == RTE_RELATION) - { - /* OK */ - ereport(DEBUG5, (errmsg("\t\t outerRte: is RTE_RELATION"))); - } - else - { - ereport(DEBUG5, (errmsg("\t\t not supported RTE kind %d", outerRte->rtekind))); + ereport(DEBUG5, (errmsg("GetRepresentativeTablesFromJoinClause: RTE is not pushdown safe"))); return -1; } ereport(DEBUG5, (errmsg("\t OK outerRte: %s", outerRte->eref->aliasname))); - - int innerRelid = InvalidOid; - ExtractRangeTableIdsContext context; - List *innerRteList = NIL; - context.result = &innerRteList; - context.rtable = rtable; - /* TODO: what if we call this also for LHS? */ - ExtractRangeTableIds((Node *)joinExpr->rarg, &context); - List *citusRelids = NIL; - RangeTblEntry *rte = NIL; - ListCell *lc; - - foreach(lc, innerRteList) + if (!CheckIfAllCitusRTEsAreColocated((Node *)joinExpr->rarg, rtable, innerRte)) { - rte = (RangeTblEntry *) lfirst(lc); - if (IsCitusTable(rte->relid)) - { - citusRelids = lappend_int(citusRelids, rte->relid); - *innerRte = rte; // set the value of innerRte - } - } - - if (!AllDistributedRelationsInListColocated(citusRelids)) - { - ereport(DEBUG5, (errmsg("The distributed tables are not colocated"))); return -1; } diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 64a6f88ad..0a0b87592 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -73,6 +73,7 @@ #include "distributed/citus_nodes.h" #include "distributed/citus_ruleutils.h" +#include "distributed/combine_query_planner.h" #include "distributed/commands/multi_copy.h" #include "distributed/distributed_planner.h" #include "distributed/errormessage.h" @@ -709,17 +710,27 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, { /* left join */ - /* TODO: For now, just disable the recursive planning here. - * However, we should add further checks, i.e., left node is a subquery - * that can not be pushed down with additional constrains. + /* Recursively plan the right side of the left left join when the following + * conditions are met: + * 1. The left side is recurring + * 2. The right side is not recurring + * 3. The left side is not a RangeTblRef (i.e., it is not a reference/local table) + * 4. The tables in the rigt side are not colocated. + * 5. The left side does not have the distribution column */ - if (leftNodeRecurs && !rightNodeRecurs && false) + if (leftNodeRecurs && !rightNodeRecurs) { - ereport(DEBUG1, (errmsg("recursively planning right side of " - "the left join since the outer side " - "is a recurring rel"))); - RecursivelyPlanDistributedJoinNode(rightNode, query, - recursivePlanningContext); + int outerRtIndex = ((RangeTblRef *) leftNode)->rtindex; + RangeTblEntry *rte = rt_fetch(outerRtIndex, query->rtable); + + if(!IsPushdownSafeForRTEInLeftJoin(rte)) + { + ereport(DEBUG1, (errmsg("recursively planning right side of " + "the left join since the outer side " + "is a recurring rel that is not an RTE"))); + RecursivelyPlanDistributedJoinNode(rightNode, query, + recursivePlanningContext); + } } /* @@ -2588,3 +2599,35 @@ GeneratingSubplans(void) { return recursivePlanningDepth > 0; } + + +/* + * IsPushdownSafeForRTEInLeftJoin returns true if the given range table entry + * is safe for pushdown. Currently, we only allow RTE_RELATION and RTE_FUNCTION. + */ +bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte) +{ + if (rte->rtekind == RTE_RELATION) + { + return true; + } + /* check if it is a citus table, e.g., ref table */ + else if (rte->rtekind == RTE_FUNCTION) + { + RangeTblEntry *newRte = NULL; + if(!ExtractCitusExtradataContainerRTE(rte, &newRte)) + { + ereport(DEBUG5, (errmsg("RTE type %d is not safe for pushdown, function but it does not contain citus extradata", + rte->rtekind))); + return false; + } + return true; + } + else + { + ereport(DEBUG5, (errmsg("RTE type %d is not safe for pushdown", + rte->rtekind))); + return false; + } + +} diff --git a/src/backend/distributed/utils/query_utils.c b/src/backend/distributed/utils/query_utils.c index ac33bdd52..39efcbcb5 100644 --- a/src/backend/distributed/utils/query_utils.c +++ b/src/backend/distributed/utils/query_utils.c @@ -15,9 +15,11 @@ #include "catalog/pg_class.h" #include "nodes/nodeFuncs.h" #include "nodes/primnodes.h" +#include "parser/parsetree.h" #include "distributed/listutils.h" #include "distributed/query_utils.h" +#include "distributed/relation_restriction_equivalence.h" #include "distributed/version_compat.h" @@ -178,3 +180,93 @@ ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList) return walkerResult; } + + +/* + * ExtractRangeTableIds walks over the given node, and finds all range + * table entries. + */ +bool +ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context) +{ + List **rangeTableList = context->idList; + List *rtable = context->rtable; + + if (node == NULL) + { + return false; + } + + if (IsA(node, RangeTblRef)) + { + int rangeTableIndex = ((RangeTblRef *) node)->rtindex; + + RangeTblEntry *rte = rt_fetch(rangeTableIndex, rtable); + if (rte->rtekind == RTE_SUBQUERY) + { + Query *subquery = rte->subquery; + context->rtable = subquery->rtable; + ExtractRangeTableIds((Node *) subquery, context); + context->rtable = rtable; /* restore original rtable */ + } + else if (rte->rtekind == RTE_RELATION || rte->rtekind == RTE_FUNCTION) + { + (*rangeTableList) = lappend(*rangeTableList, rte); + ereport(DEBUG4, (errmsg("ExtractRangeTableIds: found range table id %d", rte->relid))); + } + else + { + ereport(DEBUG4, (errmsg("Unsupported RTE kind in ExtractRangeTableIds %d", rte->rtekind))); + } + return false; + } + else if (IsA(node, Query)) + { + context->rtable = ((Query *) node)->rtable; + query_tree_walker((Query *) node, ExtractRangeTableIds, context, 0); + context->rtable = rtable; /* restore original rtable */ + return false; + } + else + { + return expression_tree_walker(node, ExtractRangeTableIds, context); + } +} + + +/* + * CheckIfAllCitusRTEsAreColocated checks if all distributed tables in the + * given node are colocated. If they are, it sets the value of rte to a + * representative table. + */ +bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte) +{ + ExtractRangeTableIdsContext context; + List *idList = NIL; + context.idList = &idList; + context.rtable = rtable; + ExtractRangeTableIds(node, &context); + + RangeTblEntry *rteTmp; + List *citusRelids = NIL; + ListCell *lc = NULL; + + foreach(lc, idList) + { + rteTmp = (RangeTblEntry *) lfirst(lc); + if (IsCitusTable(rteTmp->relid)) + { + citusRelids = lappend_int(citusRelids, rteTmp->relid); + *rte = rteTmp; // set the value of rte, a representative table + } + } + + if (!AllDistributedRelationsInListColocated(citusRelids)) + { + ereport(DEBUG5, (errmsg("The distributed tables are not colocated"))); + return false; + } + + return true; + +} \ No newline at end of file diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index 7817cbd3d..e68bb067e 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -23,15 +23,8 @@ #include "distributed/query_utils.h" -/* Struct to pass rtable list and the result list to walker */ -typedef struct ExtractRangeTableIdsContext -{ - List **result; - List *rtable; -} ExtractRangeTableIdsContext; int GetRepresentativeTablesFromJoinClause(List *fromlist, List *rtable, RangeTblEntry **innerRte); -bool ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context); extern void RebuildQueryStrings(Job *workerJob); extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); extern bool UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList); diff --git a/src/include/distributed/query_utils.h b/src/include/distributed/query_utils.h index 0b216d158..51b00f551 100644 --- a/src/include/distributed/query_utils.h +++ b/src/include/distributed/query_utils.h @@ -30,6 +30,13 @@ typedef struct ExtractRangeTableWalkerContext ExtractRangeTableMode walkerMode; } ExtractRangeTableWalkerContext; +/* Struct to pass rtable list and the result list to walker */ +typedef struct ExtractRangeTableIdsContext +{ + List **idList; + List *rtable; +} ExtractRangeTableIdsContext; + /* Function declarations for query-walker utility functions */ extern bool ExtractRangeTableList(Node *node, ExtractRangeTableWalkerContext *context); @@ -38,5 +45,6 @@ extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList); extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList); extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList); - +extern bool ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context); +extern bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte); #endif /* QUERY_UTILS_H */ diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index b4aaa4785..b7fad123d 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -51,6 +51,7 @@ extern bool IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry); extern bool IsRelationLocalTableOrMatView(Oid relationId); extern bool ContainsReferencesToOuterQuery(Query *query); extern void UpdateVarNosInNode(Node *node, Index newVarNo); +extern bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte); #endif /* RECURSIVE_PLANNING_H */