diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index ee025dc40..321fdacb1 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -42,6 +42,7 @@ #include "distributed/query_utils.h" #include "distributed/recursive_planning.h" #include "distributed/shardinterval_utils.h" +#include "distributed/shard_utils.h" #include "distributed/version_compat.h" #include "distributed/worker_shard_visibility.h" #include "executor/executor.h" @@ -127,7 +128,7 @@ static void ResetPlannerRestrictionContext( static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams); static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList); static bool QueryIsNotSimpleSelect(Node *node); -static bool UpdateReferenceTablesWithShard(Node *node, void *context); +static void UpdateReferenceTablesWithShard(List *rangeTableList); static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, Node *distributionKeyValue); static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, @@ -160,8 +161,9 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * reference table names with shard tables names in the query, so * we can use the standard_planner for planning it locally. */ + UpdateReferenceTablesWithShard(rangeTableList); + needsDistributedPlanning = false; - UpdateReferenceTablesWithShard((Node *) parse, NULL); } else { @@ -318,6 +320,37 @@ ExtractRangeTableEntryList(Query *query) } +/* + * ExtractClassifiedRangeTableEntryList extracts reference table rte's from + * the given rte list. + * Callers of this function are responsible for passing referenceTableRTEList + * to be non-null and initially pointing to an empty list. + */ +List * +ExtractReferenceTableRTEList(List *rteList) +{ + List *referenceTableRTEList = NIL; + + RangeTblEntry *rte = NULL; + foreach_ptr(rte, rteList) + { + if (rte->rtekind != RTE_RELATION || rte->relkind != RELKIND_RELATION) + { + continue; + } + + Oid relationOid = rte->relid; + if (IsCitusTable(relationOid) && PartitionMethod(relationOid) == + DISTRIBUTE_BY_NONE) + { + referenceTableRTEList = lappend(referenceTableRTEList, rte); + } + } + + return referenceTableRTEList; +} + + /* * NeedsDistributedPlanning returns true if the Citus extension is loaded and * the query contains a distributed table. @@ -2463,62 +2496,25 @@ QueryIsNotSimpleSelect(Node *node) /* * UpdateReferenceTablesWithShard recursively replaces the reference table names - * in the given query with the shard table names. + * in the given range table list with the local shard table names. */ -static bool -UpdateReferenceTablesWithShard(Node *node, void *context) +static void +UpdateReferenceTablesWithShard(List *rangeTableList) { - if (node == NULL) + List *referenceTableRTEList = ExtractReferenceTableRTEList(rangeTableList); + + RangeTblEntry *rangeTableEntry = NULL; + foreach_ptr(rangeTableEntry, referenceTableRTEList) { - return false; + Oid referenceTableLocalShardOid = GetReferenceTableLocalShardOid( + rangeTableEntry->relid); + + rangeTableEntry->relid = referenceTableLocalShardOid; + + /* + * Parser locks relations in addRangeTableEntry(). So we should lock the + * modified ones too. + */ + LockRelationOid(referenceTableLocalShardOid, AccessShareLock); } - - /* want to look at all RTEs, even in subqueries, CTEs and such */ - if (IsA(node, Query)) - { - return query_tree_walker((Query *) node, UpdateReferenceTablesWithShard, - NULL, QTW_EXAMINE_RTES_BEFORE); - } - - if (!IsA(node, RangeTblEntry)) - { - return expression_tree_walker(node, UpdateReferenceTablesWithShard, - NULL); - } - - RangeTblEntry *newRte = (RangeTblEntry *) node; - - if (newRte->rtekind != RTE_RELATION) - { - return false; - } - - Oid relationId = newRte->relid; - if (!IsCitusTable(relationId)) - { - return false; - } - - CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE) - { - return false; - } - - ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[0]; - uint64 shardId = shardInterval->shardId; - - char *relationName = get_rel_name(relationId); - AppendShardIdToName(&relationName, shardId); - - Oid schemaId = get_rel_namespace(relationId); - newRte->relid = get_relname_relid(relationName, schemaId); - - /* - * Parser locks relations in addRangeTableEntry(). So we should lock the - * modified ones too. - */ - LockRelationOid(newRte->relid, AccessShareLock); - - return false; } diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 4197c67bf..b0ecbc2ce 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -186,6 +186,7 @@ typedef struct CitusCustomScanPath extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); extern List * ExtractRangeTableEntryList(Query *query); +extern List * ExtractReferenceTableRTEList(List *rteList); extern bool NeedsDistributedPlanning(Query *query); extern struct DistributedPlan * GetDistributedPlan(CustomScan *node); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,