From 5693cabc41b81d342b440eac50554eedc1b18001 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Thu, 26 Nov 2020 09:39:54 +0300 Subject: [PATCH] Not convert an already routable plannable query We should not recursively plan an already routable plannable query. An example of this is (SELECT * FROM local JOIN (SELECT * FROM dist) d1 USING(a)); So we let the recursive planner do all of its work and at the end we convert the final query to handle unsupported joins. While doing each conversion, we check if it is router plannable, if so we stop. Only consider range table entries that are in jointree If a range table is not in jointree then there is no point in considering that because we are trying to convert range table entries to subqueries for join use case. --- src/backend/distributed/commands/index.c | 11 +- .../distributed/metadata/metadata_cache.c | 15 + .../distributed/operations/node_protocol.c | 1 + .../planner/local_distributed_join_planner.c | 529 ++++++++++++------ .../planner/multi_physical_planner.c | 116 ++-- .../planner/multi_router_planner.c | 95 ++-- .../distributed/planner/recursive_planning.c | 75 ++- .../relation_restriction_equivalence.c | 14 +- src/include/distributed/commands.h | 5 + .../local_distributed_join_planner.h | 5 +- src/include/distributed/metadata_cache.h | 2 + .../distributed/multi_physical_planner.h | 2 +- .../distributed/multi_router_planner.h | 2 + src/include/distributed/recursive_planning.h | 11 +- .../regress/expected/local_table_join.out | 64 ++- src/test/regress/sql/local_table_join.sql | 43 +- 16 files changed, 688 insertions(+), 302 deletions(-) diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 7a9c5c9ad..cf68b90f6 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -249,7 +249,15 @@ CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement) return namespaceId; } -List* ExecuteFunctionOnEachTableIndex(Oid 
relationId, IndexProcesor indexProcessor) { + +/* + * ExecuteFunctionOnEachTableIndex executes the given indexProcessor function on each + * index of the given relation. + * It returns a list that is filled by the indexProcessor. + */ +List * +ExecuteFunctionOnEachTableIndex(Oid relationId, IndexProcesor indexProcessor) +{ List *result = NIL; ScanKeyData scanKey[1]; int scanKeyCount = 1; @@ -285,6 +293,7 @@ List* ExecuteFunctionOnEachTableIndex(Oid relationId, IndexProcesor indexProcess return result; } + /* * ExecuteFunctionOnEachTableIndex executes the given pgIndexProcessor function on each * index of the given relation. diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 1e5ebac6a..83fbad1bc 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -293,6 +293,21 @@ EnsureModificationsCanRun(void) } +/* + * IsLocalOrCitusLocalTable returns true if the given relation + * is either a local or citus local table. + */ +bool +IsLocalOrCitusLocalTable(Oid relationId) +{ + if (!IsCitusTable(relationId)) + { + return true; + } + return IsCitusTableType(relationId, CITUS_LOCAL_TABLE); +} + + /* * IsCitusTableType returns true if the given table with relationId * belongs to a citus table that matches the given table type. If cache diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index 0eb23f6a5..4a23a5052 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -729,6 +729,7 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE } } + /* * IndexImpliedByAConstraint is a helper function to be used while scanning * pg_index. 
It returns true if the index identified by the given indexForm is diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index 72499a93c..680fb7b3a 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -54,212 +54,398 @@ #include "utils/guc.h" #include "utils/lsyscache.h" -typedef struct RTEToSubqueryConverterReference { - RangeTblEntry* rangeTableEntry; +typedef struct RTEToSubqueryConverterReference +{ + RangeTblEntry *rangeTableEntry; Index rteIndex; - List* restrictionList; - List* requiredAttributeNumbers; + List *restrictionList; + List *requiredAttributeNumbers; } RTEToSubqueryConverterReference; -typedef struct RTEToSubqueryConverterContext{ - List* distributedTableList; /* reference or distributed table */ - List* localTableList; - List* citusLocalTableList; +typedef struct RTEToSubqueryConverterContext +{ + List *distributedTableList; /* reference or distributed table */ + List *localTableList; /* local or citus local table */ bool hasSubqueryRTE; }RTEToSubqueryConverterContext; -static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList, Oid resultRelationId); -static bool HasUniqueFilter(RangeTblEntry* distRTE, List* distRTERestrictionList, List* requiredAttrNumbersForDistRTE); -static bool AutoConvertLocalTableJoinToSubquery(FromExpr* joinTree, - RTEToSubqueryConverterReference* distRTEContext); +static Oid GetResultRelationId(Query *query); +static Oid GetRTEToSubqueryConverterReferenceRelId( + RTEToSubqueryConverterReference *rteToSubqueryConverterReference); +static bool ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid + resultRelationId); +static bool HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList, + List *requiredAttrNumbersForDistRTE); +static bool ShouldConvertDistributedTable(FromExpr *joinTree, + 
RTEToSubqueryConverterReference *distRTEContext); static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, RecursivePlanningContext *planningContext); -static RTEToSubqueryConverterContext * CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context, - List *rangeTableList); -static void GetAllUniqueIndexes(Form_pg_index indexForm, List** uniqueIndexes); -static RTEToSubqueryConverterReference* - GetNextRTEToConvertToSubquery(FromExpr* joinTree, RTEToSubqueryConverterContext* rteToSubqueryConverterContext, - PlannerRestrictionContext *plannerRestrictionContext, RangeTblEntry* resultRelation); -static void PopFromRTEToSubqueryConverterContext(RTEToSubqueryConverterContext* rteToSubqueryConverterContext, - bool isCitusLocalTable); +static RTEToSubqueryConverterContext * CreateRTEToSubqueryConverterContext( + RecursivePlanningContext *context, + List * + rangeTableList); +static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes); +static RTEToSubqueryConverterReference * GetNextRTEToConvertToSubquery(FromExpr *joinTree, + RTEToSubqueryConverterContext + * + rteToSubqueryConverterContext, + PlannerRestrictionContext + * + plannerRestrictionContext, + Oid + resultRelationId); +static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList, + List **joinRangeTableEntries); +static void RemoveFromRTEToSubqueryConverterContext( + RTEToSubqueryConverterContext *rteToSubqueryConverterContext, Oid relationId); +static bool FillLocalAndDistributedRTECandidates( + RTEToSubqueryConverterContext *rteToSubqueryConverterContext, + RTEToSubqueryConverterReference ** + localRTECandidate, + RTEToSubqueryConverterReference ** + distributedRTECandidate); + /* - * ConvertLocalTableJoinsToSubqueries gets a query and the planner - * restrictions. 
As long as there is a join between a local table - * and distributed table, the function wraps one table in a - * subquery (by also pushing the filters on the table down - * to the subquery). - * - * Once this function returns, there are no direct joins between - * local and distributed tables. + * ConvertUnplannableTableJoinsToSubqueries gets a query and the planner + * restrictions. As long as the query is not plannable by router planner, + * it converts either a local or distributed table to a subquery. */ void -ConvertLocalTableJoinsToSubqueries(Query *query, - RecursivePlanningContext *context) +ConvertUnplannableTableJoinsToSubqueries(Query *query, + RecursivePlanningContext *context) { - List *rangeTableList = query->rtable; - RangeTblEntry *resultRelation = ExtractResultRelationRTE(query); - Oid resultRelationId = InvalidOid; - if (resultRelation) { - resultRelationId = resultRelation->relid; - } - if (!ShouldConvertLocalTableJoinsToSubqueries(rangeTableList, resultRelationId)) { + List *rangeTableList = NIL; + GetRangeTableEntriesFromJoinTree((Node *) query->jointree, query->rtable, + &rangeTableList); + Oid resultRelationId = GetResultRelationId(query); + if (!ShouldConvertLocalTableJoinsToSubqueries(rangeTableList, resultRelationId)) + { return; } - RTEToSubqueryConverterContext* rteToSubqueryConverterContext = CreateRTEToSubqueryConverterContext( - context, rangeTableList); + RTEToSubqueryConverterContext *rteToSubqueryConverterContext = + CreateRTEToSubqueryConverterContext( + context, rangeTableList); - RTEToSubqueryConverterReference* rteToSubqueryConverterReference = - GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext, - context->plannerRestrictionContext, resultRelation); - while (rteToSubqueryConverterReference) + RTEToSubqueryConverterReference *rteToSubqueryConverterReference = + GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext, + context->plannerRestrictionContext, + resultRelationId); + + 
PlannerRestrictionContext *plannerRestrictionContext = + FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); + while (rteToSubqueryConverterReference && !IsRouterPlannable(query, + plannerRestrictionContext)) { - ReplaceRTERelationWithRteSubquery(rteToSubqueryConverterReference->rangeTableEntry, - rteToSubqueryConverterReference->restrictionList, - rteToSubqueryConverterReference->requiredAttributeNumbers); - rteToSubqueryConverterReference = + ReplaceRTERelationWithRteSubquery( + rteToSubqueryConverterReference->rangeTableEntry, + rteToSubqueryConverterReference->restrictionList, + rteToSubqueryConverterReference-> + requiredAttributeNumbers, + context); + RemoveFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext, + rteToSubqueryConverterReference-> + rangeTableEntry->relid); + rteToSubqueryConverterReference = GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext, - context->plannerRestrictionContext, resultRelation); + context->plannerRestrictionContext, + resultRelationId); } } -static RTEToSubqueryConverterReference* - GetNextRTEToConvertToSubquery(FromExpr* joinTree, RTEToSubqueryConverterContext* rteToSubqueryConverterContext, - PlannerRestrictionContext *plannerRestrictionContext, RangeTblEntry* resultRelation) { - RTEToSubqueryConverterReference* localRTECandidate = NULL; - RTEToSubqueryConverterReference* nonLocalRTECandidate = NULL; - bool citusLocalTableChosen = false; +/* + * GetResultRelationId gets the result relation id from query + * if it exists. 
+ */ +static Oid +GetResultRelationId(Query *query) +{ + RangeTblEntry *resultRelation = ExtractResultRelationRTE(query); + Oid resultRelationId = InvalidOid; + if (resultRelation) + { + resultRelationId = resultRelation->relid; + } + return resultRelationId; +} - if (list_length(rteToSubqueryConverterContext->localTableList) > 0) { - localRTECandidate = linitial(rteToSubqueryConverterContext->localTableList); - }else if (list_length(rteToSubqueryConverterContext->citusLocalTableList) > 0) { - localRTECandidate = linitial(rteToSubqueryConverterContext->citusLocalTableList); - citusLocalTableChosen = true; - } - if (localRTECandidate == NULL) { - return NULL; - } - if (list_length(rteToSubqueryConverterContext->distributedTableList) > 0) { - nonLocalRTECandidate = linitial(rteToSubqueryConverterContext->distributedTableList); - } - if (nonLocalRTECandidate == NULL && !rteToSubqueryConverterContext->hasSubqueryRTE) { - return NULL; +/* + * GetRangeTableEntriesFromJoinTree gets the range table entries that are + * on the given join tree. 
+ */ +static void +GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList, + List **joinRangeTableEntries) +{ + if (joinNode == NULL) + { + return; } + else if (IsA(joinNode, FromExpr)) + { + FromExpr *fromExpr = (FromExpr *) joinNode; + Node *fromElement; - if (resultRelation) { - if(resultRelation == localRTECandidate->rangeTableEntry) { - rteToSubqueryConverterContext->distributedTableList = list_delete_first( - rteToSubqueryConverterContext->distributedTableList - ); - return nonLocalRTECandidate; + foreach_ptr(fromElement, fromExpr->fromlist) + { + GetRangeTableEntriesFromJoinTree(fromElement, rangeTableList, + joinRangeTableEntries); } - if (resultRelation == nonLocalRTECandidate->rangeTableEntry) { - PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen); + } + else if (IsA(joinNode, JoinExpr)) + { + JoinExpr *joinExpr = (JoinExpr *) joinNode; + GetRangeTableEntriesFromJoinTree(joinExpr->larg, rangeTableList, + joinRangeTableEntries); + GetRangeTableEntriesFromJoinTree(joinExpr->rarg, rangeTableList, + joinRangeTableEntries); + } + else if (IsA(joinNode, RangeTblRef)) + { + int rangeTableIndex = ((RangeTblRef *) joinNode)->rtindex; + RangeTblEntry *rte = rt_fetch(rangeTableIndex, rangeTableList); + *joinRangeTableEntries = lappend(*joinRangeTableEntries, rte); + } + else + { + pg_unreachable(); + } +} + + +/* + * GetNextRTEToConvertToSubquery returns the range table entry + * which should be converted to a subquery. It considers the local join policy + * and result relation. 
+ */ +static RTEToSubqueryConverterReference * +GetNextRTEToConvertToSubquery(FromExpr *joinTree, + RTEToSubqueryConverterContext *rteToSubqueryConverterContext, + PlannerRestrictionContext *plannerRestrictionContext, Oid + resultRelationId) +{ + RTEToSubqueryConverterReference *localRTECandidate = NULL; + RTEToSubqueryConverterReference *distributedRTECandidate = NULL; + if (!FillLocalAndDistributedRTECandidates(rteToSubqueryConverterContext, + &localRTECandidate, + &distributedRTECandidate)) + { + return NULL; + } + + if (OidIsValid(resultRelationId)) + { + if (resultRelationId == GetRTEToSubqueryConverterReferenceRelId( + localRTECandidate)) + { + return distributedRTECandidate; + } + if (resultRelationId == GetRTEToSubqueryConverterReferenceRelId( + distributedRTECandidate)) + { return localRTECandidate; } } - if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) { - PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen); + if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) + { return localRTECandidate; - }else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) { - if (nonLocalRTECandidate) { - rteToSubqueryConverterContext->distributedTableList = list_delete_first( - rteToSubqueryConverterContext->distributedTableList - ); - return nonLocalRTECandidate; - }else { - PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen); - return localRTECandidate; - } - - }else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO) { - bool shouldConvertNonLocalTable = AutoConvertLocalTableJoinToSubquery(joinTree, nonLocalRTECandidate); - if (shouldConvertNonLocalTable) { - rteToSubqueryConverterContext->distributedTableList = list_delete_first( - rteToSubqueryConverterContext->distributedTableList - ); - return nonLocalRTECandidate; - }else { - PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen); - return localRTECandidate; - } - }else { - 
elog(ERROR, "unexpected local table join policy: %d", LocalTableJoinPolicy); } - return NULL; + else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) + { + if (distributedRTECandidate) + { + return distributedRTECandidate; + } + else + { + return localRTECandidate; + } + } + else + { + if (ShouldConvertDistributedTable(joinTree, distributedRTECandidate)) + { + return distributedRTECandidate; + } + else + { + return localRTECandidate; + } + } } -static void PopFromRTEToSubqueryConverterContext(RTEToSubqueryConverterContext* rteToSubqueryConverterContext, - bool isCitusLocalTable) { - if (isCitusLocalTable) { - rteToSubqueryConverterContext->citusLocalTableList = - list_delete_first(rteToSubqueryConverterContext->citusLocalTableList); - }else { - rteToSubqueryConverterContext->localTableList = + +/* + * FillLocalAndDistributedRTECandidates fills the local and distributed RTE candidates. + * It returns true if we should continue converting tables to subqueries. + */ +static bool +FillLocalAndDistributedRTECandidates( + RTEToSubqueryConverterContext *rteToSubqueryConverterContext, + RTEToSubqueryConverterReference **localRTECandidate, + RTEToSubqueryConverterReference ** + distributedRTECandidate) +{ + if (list_length(rteToSubqueryConverterContext->localTableList) > 0) + { + *localRTECandidate = linitial(rteToSubqueryConverterContext->localTableList); + } + if (*localRTECandidate == NULL) + { + return false; + } + + if (list_length(rteToSubqueryConverterContext->distributedTableList) > 0) + { + *distributedRTECandidate = linitial( + rteToSubqueryConverterContext->distributedTableList); + } + return *distributedRTECandidate != NULL || + rteToSubqueryConverterContext->hasSubqueryRTE; +} + + +/* + * GetRTEToSubqueryConverterReferenceRelId returns the underlying relation id + * if it is a valid one. 
+ */ +static Oid +GetRTEToSubqueryConverterReferenceRelId( + RTEToSubqueryConverterReference *rteToSubqueryConverterReference) +{ + if (rteToSubqueryConverterReference && + rteToSubqueryConverterReference->rangeTableEntry) + { + return rteToSubqueryConverterReference->rangeTableEntry->relid; + } + return InvalidOid; +} + + +/* + * RemoveFromRTEToSubqueryConverterContext removes an element from + * the relevant list based on the relation id. + */ +static void +RemoveFromRTEToSubqueryConverterContext( + RTEToSubqueryConverterContext *rteToSubqueryConverterContext, Oid relationId) +{ + if (IsLocalOrCitusLocalTable(relationId)) + { + rteToSubqueryConverterContext->localTableList = list_delete_first(rteToSubqueryConverterContext->localTableList); } + else + { + rteToSubqueryConverterContext->distributedTableList = + list_delete_first(rteToSubqueryConverterContext->distributedTableList); + } } + /* * ShouldConvertLocalTableJoinsToSubqueries returns true if we should * convert local-dist table joins to subqueries. */ -static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList, Oid resultRelationId) { +static bool +ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid resultRelationId) +{ if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER) { /* user doesn't want Citus to enable local table joins */ return false; } - if (!ContainsTableToBeConvertedToSubquery(rangeTableList, resultRelationId)) { + if (!ContainsTableToBeConvertedToSubquery(rangeTableList, resultRelationId)) + { return false; } return true; } -static bool AutoConvertLocalTableJoinToSubquery(FromExpr* joinTree, - RTEToSubqueryConverterReference* rteToSubqueryConverterReference) { - if (rteToSubqueryConverterReference == NULL) { + +/* + * ShouldConvertDistributedTable returns true if we should convert the + * distributed table rte to a subquery. This will be the case if the distributed + * table has a unique index on a column that appears in filter. 
+ */ +static bool +ShouldConvertDistributedTable(FromExpr *joinTree, + RTEToSubqueryConverterReference * + rteToSubqueryConverterReference) +{ + if (rteToSubqueryConverterReference == NULL) + { return false; } - List* distRTEEqualityQuals = - FetchAttributeNumsForRTEFromQuals(joinTree->quals, rteToSubqueryConverterReference->rteIndex); + List *distRTEEqualityQuals = + FetchEqualityAttrNumsForRTEFromQuals(joinTree->quals, + rteToSubqueryConverterReference->rteIndex); - Node* join = NULL; - foreach_ptr(join, joinTree->fromlist) { - if (IsA(join, JoinExpr)) { - JoinExpr* joinExpr = (JoinExpr*) join; - distRTEEqualityQuals = list_concat(distRTEEqualityQuals, - FetchAttributeNumsForRTEFromQuals(joinExpr->quals, rteToSubqueryConverterReference->rteIndex) - ); + Node *join = NULL; + foreach_ptr(join, joinTree->fromlist) + { + if (IsA(join, JoinExpr)) + { + JoinExpr *joinExpr = (JoinExpr *) join; + distRTEEqualityQuals = list_concat(distRTEEqualityQuals, + FetchEqualityAttrNumsForRTEFromQuals( + joinExpr->quals, + rteToSubqueryConverterReference-> + rteIndex) + ); } } - bool hasUniqueFilter = HasUniqueFilter(rteToSubqueryConverterReference->rangeTableEntry, - rteToSubqueryConverterReference->restrictionList, distRTEEqualityQuals); - return hasUniqueFilter; - + bool hasUniqueFilter = HasUniqueFilter( + rteToSubqueryConverterReference->rangeTableEntry, + rteToSubqueryConverterReference-> + restrictionList, distRTEEqualityQuals); + return hasUniqueFilter; } -static bool HasUniqueFilter(RangeTblEntry* distRTE, List* distRTERestrictionList, List* requiredAttrNumbersForDistRTE) { - List* uniqueIndexes = ExecuteFunctionOnEachTableIndex(distRTE->relid, GetAllUniqueIndexes); - int columnNumber = 0; - foreach_int(columnNumber, uniqueIndexes) { - if (list_member_int(requiredAttrNumbersForDistRTE, columnNumber)) { - return true; - } - } - return false; + +/* + * HasUniqueFilter returns true if the given RTE has a unique filter + * on a column, which is a member of the given 
requiredAttrNumbersForDistRTE. + */ +static bool +HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList, + List *requiredAttrNumbersForDistRTE) +{ + List *uniqueIndexes = ExecuteFunctionOnEachTableIndex(distRTE->relid, + GetAllUniqueIndexes); + int columnNumber = 0; + foreach_int(columnNumber, uniqueIndexes) + { + if (list_member_int(requiredAttrNumbersForDistRTE, columnNumber)) + { + return true; + } + } + return false; } -static void GetAllUniqueIndexes(Form_pg_index indexForm, List** uniqueIndexes) { - if (indexForm->indisunique || indexForm->indisprimary) { - for(int i = 0; i < indexForm->indkey.dim1; i++) { - *uniqueIndexes = list_append_unique_int(*uniqueIndexes, indexForm->indkey.values[i]); - } - } + +/* + * GetAllUniqueIndexes adds the given index's column numbers if it is a + * unique index. + * TODO:: if there is a unique index on a multiple column, then we should + * probably return true only if all the columns in the index exist in the filter. + */ +static void +GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes) +{ + if (indexForm->indisunique || indexForm->indisprimary) + { + for (int i = 0; i < indexForm->indkey.dim1; i++) + { + *uniqueIndexes = list_append_unique_int(*uniqueIndexes, + indexForm->indkey.values[i]); + } + } } @@ -287,7 +473,8 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, List *filteredRelationRestrictionList = relationRestrictionContext->relationRestrictionList; - if (list_length(filteredRelationRestrictionList) == 0) { + if (list_length(filteredRelationRestrictionList) == 0) + { return NIL; } RelationRestriction *relationRestriction = @@ -324,46 +511,56 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, * The function also gets a boolean localTable parameter, so the caller * can choose to run the function for only local tables or distributed tables. 
*/ -static RTEToSubqueryConverterContext* +static RTEToSubqueryConverterContext * CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context, - List *rangeTableList) + List *rangeTableList) { - - RTEToSubqueryConverterContext* rteToSubqueryConverterContext = palloc0(sizeof(RTEToSubqueryConverterContext)); + RTEToSubqueryConverterContext *rteToSubqueryConverterContext = palloc0( + sizeof(RTEToSubqueryConverterContext)); int rteIndex = 0; - RangeTblEntry* rangeTableEntry = NULL; + RangeTblEntry *rangeTableEntry = NULL; foreach_ptr(rangeTableEntry, rangeTableList) { rteIndex++; - if (rangeTableEntry->rtekind == RTE_SUBQUERY) { + if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { rteToSubqueryConverterContext->hasSubqueryRTE = true; } + /* we're only interested in tables */ if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) { continue; } - RTEToSubqueryConverterReference* rteToSubqueryConverter = palloc(sizeof(RTEToSubqueryConverterReference)); + RTEToSubqueryConverterReference *rteToSubqueryConverter = palloc( + sizeof(RTEToSubqueryConverterReference)); rteToSubqueryConverter->rangeTableEntry = rangeTableEntry; rteToSubqueryConverter->rteIndex = rteIndex; - rteToSubqueryConverter->restrictionList = GetRestrictInfoListForRelation(rangeTableEntry, - context->plannerRestrictionContext, 1); - rteToSubqueryConverter->requiredAttributeNumbers = RequiredAttrNumbersForRelation(rangeTableEntry, context); + rteToSubqueryConverter->restrictionList = GetRestrictInfoListForRelation( + rangeTableEntry, + context-> + plannerRestrictionContext, 1); + rteToSubqueryConverter->requiredAttributeNumbers = RequiredAttrNumbersForRelation( + rangeTableEntry, context); - bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE) || - IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE); - if (referenceOrDistributedTable) { - rteToSubqueryConverterContext->distributedTableList = - 
lappend(rteToSubqueryConverterContext->distributedTableList, rteToSubqueryConverter); - }else if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE)) { - rteToSubqueryConverterContext->citusLocalTableList = - lappend(rteToSubqueryConverterContext->citusLocalTableList, rteToSubqueryConverter); - }else { - rteToSubqueryConverterContext->localTableList = - lappend(rteToSubqueryConverterContext->localTableList, rteToSubqueryConverter); + bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid, + REFERENCE_TABLE) || + IsCitusTableType(rangeTableEntry->relid, + DISTRIBUTED_TABLE); + if (referenceOrDistributedTable) + { + rteToSubqueryConverterContext->distributedTableList = + lappend(rteToSubqueryConverterContext->distributedTableList, + rteToSubqueryConverter); + } + else + { + rteToSubqueryConverterContext->localTableList = + lappend(rteToSubqueryConverterContext->localTableList, + rteToSubqueryConverter); } } return rteToSubqueryConverterContext; -} \ No newline at end of file +} diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 6e05268f9..0bc31a9f5 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -233,6 +233,9 @@ static StringInfo ColumnTypeArrayString(List *targetEntryList); static bool CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval); +static List * FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr, Index rteIndex); +static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr, Index rteIndex); + #if PG_VERSION_NUM >= PG_VERSION_13 static List * GetColumnOriginalIndexes(Oid relationId); #endif @@ -3588,8 +3591,16 @@ NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList) return IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE); } -List* FetchAttributeNumsForRTEFromQuals(Node* quals, Index rteIndex) { - List* 
attributeNums = NIL; + +/* + * FetchEqualityAttrNumsForRTEFromQuals fetches the attribute numbers from quals + * which: + * - has equality operator + * - belongs to rangeTableEntry with rteIndex + */ +List * +FetchEqualityAttrNumsForRTEFromQuals(Node *quals, Index rteIndex) +{ if (quals == NULL) { return NIL; @@ -3597,48 +3608,79 @@ List* FetchAttributeNumsForRTEFromQuals(Node* quals, Index rteIndex) { if (IsA(quals, OpExpr)) { - if (!NodeIsEqualsOpExpr(quals)) - { - return NIL; - } - OpExpr *nextJoinClauseOpExpr = castNode(OpExpr, quals); - - Var* var = NULL; - if (VarConstOpExprClause(nextJoinClauseOpExpr, &var, NULL)) { - attributeNums = lappend_int(attributeNums, var->varattno); - return attributeNums; - } - + return FetchEqualityAttrNumsForRTEOpExpr((OpExpr *) quals, rteIndex); } else if (IsA(quals, BoolExpr)) { - BoolExpr *boolExpr = (BoolExpr *) quals; - - if (boolExpr->boolop != AND_EXPR && boolExpr->boolop != OR_EXPR) { - return attributeNums; - } - - bool hasEquality = true; - Node* arg = NULL; - foreach_ptr(arg, boolExpr->args) - { - List* attributeNumsInSubExpression = FetchAttributeNumsForRTEFromQuals(arg, rteIndex); - if (boolExpr->boolop == AND_EXPR) - { - hasEquality |= list_length(attributeNumsInSubExpression) > 0; - }else if (boolExpr->boolop == OR_EXPR){ - hasEquality &= list_length(attributeNumsInSubExpression) > 0; - } - attributeNums = list_concat(attributeNums, attributeNumsInSubExpression); - - } - if (hasEquality) { - return attributeNums; - } + return FetchEqualityAttrNumsForRTEBoolExpr((BoolExpr *) quals, rteIndex); } return NIL; } + +/* + * FetchEqualityAttrNumsForRTEOpExpr fetches the attribute numbers from opExpr + * which: + * - has equality operator + * - belongs to rangeTableEntry with rteIndex + */ +static List * +FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr, Index rteIndex) +{ + if (!OperatorImplementsEquality(opExpr->opno)) + { + return NIL; + } + + List *attributeNums = NIL; + Var *var = NULL; + if 
(VarConstOpExprClause(opExpr, &var, NULL) && var->varno == rteIndex) + { + attributeNums = lappend_int(attributeNums, var->varattno); + } + return attributeNums; +} + + +/* + * FetchEqualityAttrNumsForRTEBoolExpr fetches the attribute numbers from boolExpr + * which: + * - has equality operator + * - belongs to rangeTableEntry with rteIndex + */ +static List * +FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr, Index rteIndex) +{ + if (boolExpr->boolop != AND_EXPR && boolExpr->boolop != OR_EXPR) + { + return NIL; + } + + List *attributeNums = NIL; + bool hasEquality = true; + Node *arg = NULL; + foreach_ptr(arg, boolExpr->args) + { + List *attributeNumsInSubExpression = FetchEqualityAttrNumsForRTEFromQuals(arg, + rteIndex); + if (boolExpr->boolop == AND_EXPR) + { + hasEquality |= list_length(attributeNumsInSubExpression) > 0; + } + else if (boolExpr->boolop == OR_EXPR) + { + hasEquality &= list_length(attributeNumsInSubExpression) > 0; + } + attributeNums = list_concat(attributeNums, attributeNumsInSubExpression); + } + if (hasEquality) + { + return attributeNums; + } + return NIL; +} + + /* * JoinSequenceArray walks over the join nodes in the job query and constructs a join * sequence containing an entry for each joined table. 
The function then returns an diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 6b9858e23..4869ef7cf 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -49,6 +49,7 @@ #include "distributed/reference_table_utils.h" #include "distributed/relation_restriction_equivalence.h" #include "distributed/relay_utility.h" +#include "distributed/recursive_planning.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/shard_pruning.h" @@ -181,7 +182,6 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType taskAssignmentPolicy, List *placementList); -static bool IsLocalOrCitusLocalTable(Oid relationId); /* * CreateRouterPlan attempts to create a router executor plan for the given @@ -295,6 +295,32 @@ CreateSingleTaskRouterSelectPlan(DistributedPlan *distributedPlan, Query *origin } +/* + * IsRouterPlannable returns true if the given query can be planned by + * router planner. + */ +bool +IsRouterPlannable(Query *query, PlannerRestrictionContext *plannerRestrictionContext) +{ + /* copy the query as the following methods can change the underlying query */ + Query *copyQuery = copyObject(query); + DeferredErrorMessage *deferredErrorMessage = NULL; + if (copyQuery->commandType == CMD_SELECT) + { + deferredErrorMessage = MultiRouterPlannableQuery(copyQuery); + } + if (deferredErrorMessage) + { + return false; + } + + /* TODO:: we might not need this copy*/ + copyQuery = copyObject(query); + RouterJob(copyQuery, plannerRestrictionContext, &deferredErrorMessage); + return deferredErrorMessage == NULL; +} + + /* * ShardIntervalOpExpressions returns a list of OpExprs with exactly two * items in it. 
The list consists of shard interval ranges with partition columns @@ -511,8 +537,6 @@ IsTidColumn(Node *node) } -#include "distributed/recursive_planning.h" - /* * ModifyPartialQuerySupported implements a subset of what ModifyQuerySupported checks, * that subset being what's necessary to check modifying CTEs for. @@ -522,24 +546,25 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, Oid *distributedTableIdOutput) { DeferredErrorMessage *deferredError = DeferErrorIfModifyView(queryTree); - if (deferredError != NULL) { + if (deferredError != NULL) + { return deferredError; } uint32 rangeTableId = 1; CmdType commandType = queryTree->commandType; - Oid distributedTableId = ModifyQueryResultRelationId(queryTree); - *distributedTableIdOutput = distributedTableId; - if (ContainsTableToBeConvertedToSubquery(queryTree->rtable, distributedTableId)) + Oid resultRelationId = ModifyQueryResultRelationId(queryTree); + *distributedTableIdOutput = resultRelationId; + if (ContainsTableToBeConvertedToSubquery(queryTree->rtable, resultRelationId)) { return deferredError; } Var *partitionColumn = NULL; - if (IsCitusTable(distributedTableId)) + if (IsCitusTable(resultRelationId)) { - partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + partitionColumn = PartitionColumn(resultRelationId, rangeTableId); } deferredError = DeferErrorIfModifyView(queryTree); @@ -633,12 +658,12 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, } } - distributedTableId = ModifyQueryResultRelationId(queryTree); + resultRelationId = ModifyQueryResultRelationId(queryTree); rangeTableId = 1; - if (IsCitusTable(distributedTableId)) + if (IsCitusTable(resultRelationId)) { - partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + partitionColumn = PartitionColumn(resultRelationId, rangeTableId); } commandType = queryTree->commandType; if (commandType == CMD_INSERT || commandType == CMD_UPDATE || @@ -768,18 +793,11 @@ 
ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, /* set it for caller to use when we don't return any errors */ - *distributedTableIdOutput = distributedTableId; + *distributedTableIdOutput = resultRelationId; return NULL; } -static bool IsLocalOrCitusLocalTable(Oid relationId) { - if (!IsCitusTable(relationId)) { - return true; - } - return IsCitusTableType(relationId, CITUS_LOCAL_TABLE); -} - /* * NodeIsFieldStore returns true if given Node is a FieldStore object. @@ -903,7 +921,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer List *rangeTableList = NIL; uint32 queryTableCount = 0; CmdType commandType = queryTree->commandType; -bool fastPathRouterQuery = + bool fastPathRouterQuery = plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; /* @@ -941,6 +959,14 @@ bool fastPathRouterQuery = { ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList); } + RangeTblEntry *resultRte = ExtractResultRelationRTE(queryTree); + Oid resultRelationId = InvalidOid; + if (resultRte) + { + resultRelationId = resultRte->relid; + } + bool containsTableToBeConvertedToSubquery = + ContainsTableToBeConvertedToSubquery(queryTree->rtable, resultRelationId); RangeTblEntry *rangeTableEntry = NULL; foreach_ptr(rangeTableEntry, rangeTableList) @@ -965,25 +991,22 @@ bool fastPathRouterQuery = /* for other kinds of relations, check if its distributed */ else { - RangeTblEntry *resultRte = ExtractResultRelationRTE(queryTree); - Oid resultRelationId = InvalidOid; - if (resultRte) { - resultRelationId = resultRte->relid; - } if (IsLocalOrCitusLocalTable(rangeTableEntry->relid) && - ContainsTableToBeConvertedToSubquery(queryTree->rtable, resultRelationId) - ) - { + containsTableToBeConvertedToSubquery) + { StringInfo errorMessage = makeStringInfo(); char *relationName = get_rel_name(rangeTableEntry->relid); - if (IsCitusTable(rangeTableEntry->relid)) { - appendStringInfo(errorMessage, "citus local table %s 
cannot be used in this join", - relationName); - }else { - appendStringInfo(errorMessage, "relation %s is not distributed", - relationName); + if (IsCitusTable(rangeTableEntry->relid)) + { + appendStringInfo(errorMessage, + "citus local table %s cannot be joined with these distributed tables", + relationName); + } + else + { + appendStringInfo(errorMessage, "relation %s is not distributed", + relationName); } - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, NULL, NULL); } diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 390209fc0..f1c821690 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -163,14 +163,14 @@ static bool ShouldRecursivelyPlanSubquery(Query *subquery, static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery, PlannerRestrictionContext * restrictionContext); -static bool AllDataLocallyAccessible(List *rangeTableList); +static bool AllDataLocallyAccessible(List *rangeTableList); static bool ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *context); +static void RecursivelyPlanSubquery(Query *subquery, + RecursivePlanningContext *planningContext); static void RecursivelyPlanSetOperations(Query *query, Node *node, RecursivePlanningContext *context); static bool IsLocalTableRteOrMatView(Node *node); -static void RecursivelyPlanSubquery(Query *subquery, - RecursivePlanningContext *planningContext); static DistributedSubPlan * CreateDistributedSubPlan(uint32 subPlanId, Query *subPlanQuery); static bool CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context); @@ -290,12 +290,6 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context /* make sure function calls in joins are executed in the coordinator */ WrapFunctionsInSubqueries(query); - /* - * Logical planner cannot handle "local_table" [OUTER] JOIN 
"dist_table", so we - * recursively plan one side of the join so that the logical planner can plan. - */ - ConvertLocalTableJoinsToSubqueries(query, context); - /* descend into subqueries */ query_tree_walker(query, RecursivelyPlanSubqueryWalker, context, 0); @@ -346,6 +340,13 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context RecursivelyPlanNonColocatedSubqueries(query, context); } + /* + * Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", or + * a query with local table/citus local table and subquery. We convert local/citus + * local tables to subqueries one by one until the query becomes plannable. + */ + ConvertUnplannableTableJoinsToSubqueries(query, context); + return NULL; } @@ -1346,13 +1347,15 @@ NodeContainsSubqueryReferencingOuterQuery(Node *node) return false; } + /* * ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry * with a subquery. The function also pushes down the filters to the subquery. */ void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList, - List *requiredAttrNumbers) + List *requiredAttrNumbers, + RecursivePlanningContext *context) { Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers); Expr *andedBoundExpressions = make_ands_explicit(restrictionList); @@ -1365,7 +1368,6 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict rangeTableEntry->rtekind = RTE_SUBQUERY; rangeTableEntry->subquery = subquery; - /* * If the relation is inherited, it'll still be inherited as * we've copied it earlier. 
This is to prevent the newly created @@ -1383,16 +1385,26 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict get_rel_name(rangeTableEntry->relid), ApplyLogRedaction(subqueryString->data)))); } + RecursivelyPlanSubquery(rangeTableEntry->subquery, context); } -bool ContainsTableToBeConvertedToSubquery(List* rangeTableList, Oid resultRelationId) { - if (AllDataLocallyAccessible(rangeTableList)) { + +/* + * ContainsTableToBeConvertedToSubquery checks if the given range table list contains + * any table that must be converted to a subquery for the query to be plannable. + */ +bool +ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid resultRelationId) +{ + if (AllDataLocallyAccessible(rangeTableList)) + { return false; } - return ContainsLocalTableDistributedTableJoin(rangeTableList) || - ContainsLocalTableSubqueryJoin(rangeTableList, resultRelationId); + return ContainsLocalTableDistributedTableJoin(rangeTableList) || + ContainsLocalTableSubqueryJoin(rangeTableList, resultRelationId); } + /* * AllDataLocallyAccessible return true if all data for the relations in the * rangeTableList is locally accessible. @@ -1400,11 +1412,12 @@ bool ContainsTableToBeConvertedToSubquery(List* rangeTableList, Oid resultRelati static bool AllDataLocallyAccessible(List *rangeTableList) { - RangeTblEntry* rangeTableEntry = NULL; + RangeTblEntry *rangeTableEntry = NULL; foreach_ptr(rangeTableEntry, rangeTableList) { - if (rangeTableEntry->rtekind == RTE_SUBQUERY) { - // TODO:: check if it has distributed table + if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { + /* TODO:: check if it has distributed table */ return false; } if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) @@ -1442,18 +1455,23 @@ AllDataLocallyAccessible(List *rangeTableList) return true; } + /* * SubqueryConvertableRelationForJoin returns true if the given range table entry * is a relation type that can be converted to a subquery. 
*/ -bool SubqueryConvertableRelationForJoin(RangeTblEntry* rangeTableEntry) { - if (rangeTableEntry->rtekind != RTE_RELATION) { +bool +SubqueryConvertableRelationForJoin(RangeTblEntry *rangeTableEntry) +{ + if (rangeTableEntry->rtekind != RTE_RELATION) + { return false; } return rangeTableEntry->relkind == RELKIND_PARTITIONED_TABLE || - rangeTableEntry->relkind == RELKIND_RELATION; + rangeTableEntry->relkind == RELKIND_RELATION; } + /* * ContainsLocalTableDistributedTableJoin returns true if the input range table list * contains a direct join between local and distributed tables. @@ -1474,11 +1492,13 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList) continue; } - if (IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE) || IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE)) + if (IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE) || + IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE)) { containsDistributedTable = true; } - else if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE) || !IsCitusTable(rangeTableEntry->relid)) + else if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE) || + !IsCitusTable(rangeTableEntry->relid)) { /* we consider citus local tables as local table */ containsLocalTable = true; @@ -1504,7 +1524,8 @@ ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - if (rangeTableEntry->rtekind == RTE_SUBQUERY) { + if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { containsSubquery = true; } @@ -1513,7 +1534,8 @@ ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId) continue; } - if (!IsCitusTable(rangeTableEntry->relid) && rangeTableEntry->relid != resultRelationId) + if (!IsCitusTable(rangeTableEntry->relid) && rangeTableEntry->relid != + resultRelationId) { containsLocalTable = true; } @@ -1522,6 +1544,7 @@ ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid 
resultRelationId) return containsLocalTable && containsSubquery; } + /* * WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries * of a query and wraps the functions inside (SELECT * FROM fnc() f) @@ -1645,7 +1668,6 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry) subquery->targetList = lappend(subquery->targetList, targetEntry); } } - /* * If tupleDesc is NULL we have 2 different cases: * @@ -1695,7 +1717,6 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry) columnType = list_nth_oid(rangeTblFunction->funccoltypes, targetColumnIndex); } - /* use the types in the function definition otherwise */ else { diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index e138d9389..a744e80ff 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -1818,8 +1818,8 @@ FilterPlannerRestrictionForQuery(PlannerRestrictionContext *plannerRestrictionCo /* allocate the filtered planner restriction context and set all the fields */ PlannerRestrictionContext *filteredPlannerRestrictionContext = palloc0( sizeof(PlannerRestrictionContext)); - filteredPlannerRestrictionContext->fastPathRestrictionContext = - palloc0(sizeof(FastPathRestrictionContext)); + filteredPlannerRestrictionContext->fastPathRestrictionContext = palloc0( + sizeof(FastPathRestrictionContext)); filteredPlannerRestrictionContext->memoryContext = plannerRestrictionContext->memoryContext; @@ -1882,10 +1882,9 @@ GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry, } List *restrictExprList = NIL; - ListCell *restrictCell = NULL; - foreach(restrictCell, baseRestrictInfo) + RestrictInfo *restrictInfo = NULL; + foreach_ptr(restrictInfo, baseRestrictInfo) { - RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(restrictCell); Expr *restrictionClause = restrictInfo->clause; /* we cannot process 
Params beacuse they are not known at this point */ @@ -1912,10 +1911,9 @@ GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry, Expr *copyOfRestrictClause = (Expr *) copyObject((Node *) restrictionClause); List *varClauses = pull_var_clause_default((Node *) copyOfRestrictClause); ListCell *varClauseCell = NULL; - foreach(varClauseCell, varClauses) + Var *column = NULL; + foreach_ptr(column, varClauses) { - Var *column = (Var *) lfirst(varClauseCell); - column->varno = rteIndex; column->varnosyn = rteIndex; } diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 3461e2246..e24b47757 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -186,8 +186,13 @@ extern List * PostprocessIndexStmt(Node *node, const char *queryString); extern void ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement); extern void MarkIndexValid(IndexStmt *indexStmt); +<<<<<<< HEAD extern List * ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor pgIndexProcessor); +======= +extern List * ExecuteFunctionOnEachTableIndex(Oid relationId, IndexProcesor + indexProcessor); +>>>>>>> Increase readability of the current structure /* objectaddress.c - forward declarations */ extern ObjectAddress CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok); diff --git a/src/include/distributed/local_distributed_join_planner.h b/src/include/distributed/local_distributed_join_planner.h index 85d15b4ea..1fc7269a6 100644 --- a/src/include/distributed/local_distributed_join_planner.h +++ b/src/include/distributed/local_distributed_join_planner.h @@ -17,8 +17,7 @@ #include "distributed/recursive_planning.h" -extern void -ConvertLocalTableJoinsToSubqueries(Query *query, - RecursivePlanningContext *context); +extern void ConvertUnplannableTableJoinsToSubqueries(Query *query, + RecursivePlanningContext *context); #endif /* LOCAL_DISTRIBUTED_JOIN_PLANNER_H */ diff --git 
a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 7303e55fe..50d9bf251 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -134,6 +134,8 @@ typedef enum ANY_CITUS_TABLE_TYPE } CitusTableType; + +extern bool IsLocalOrCitusLocalTable(Oid relationId); extern bool IsCitusTableType(Oid relationId, CitusTableType tableType); extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry, CitusTableType tableType); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 2f0abb26b..24e60facd 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -589,6 +589,6 @@ extern RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *column List *funcColumnTypeMods, List *funcCollations); -extern List* FetchAttributeNumsForRTEFromQuals(Node* quals, Index rteIndex); +extern List * FetchEqualityAttrNumsForRTEFromQuals(Node *quals, Index rteIndex); #endif /* MULTI_PHYSICAL_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 18278bda5..7dff1015c 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -85,6 +85,8 @@ extern List * TargetShardIntervalForFastPathQuery(Query *query, extern void GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, List *placementList, uint64 shardId); +extern bool IsRouterPlannable(Query *query, + PlannerRestrictionContext *plannerRestrictionContext); /* * FastPathPlanner is a subset of router planner, that's why we prefer to diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index e4b46745f..6dd146482 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -62,10 
+62,11 @@ extern bool GeneratingSubplans(void); extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList); extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList, - List *requiredAttrNumbers); -extern bool -ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId); -extern bool ContainsTableToBeConvertedToSubquery(List* rangeTableList, Oid resultRelationId); -extern bool SubqueryConvertableRelationForJoin(RangeTblEntry* rangeTableEntry); + List *requiredAttrNumbers, + RecursivePlanningContext *context); +extern bool ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId); +extern bool ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid + resultRelationId); +extern bool SubqueryConvertableRelationForJoin(RangeTblEntry *rangeTableEntry); #endif /* RECURSIVE_PLANNING_H */ diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index 95af74534..2dacb8db8 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -271,8 +271,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = ( SELECT count(*) FROM distributed_table_pkey ); -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM local_table_join.distributed_table_pkey +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, 
NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR (distributed_table_pkey.key OPERATOR(pg_catalog.=) (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)))))) count @@ -316,6 +316,34 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = 10; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON ((postgres_table.key OPERATOR(pg_catalog.=) 10))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN (SELECT * FROM distributed_table) d1 USING(key); +DEBUG: Wrapping local relation "postgres_table" to 
a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN (SELECT distributed_table.key, distributed_table.value, distributed_table.value_2 FROM local_table_join.distributed_table) d1 USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- since this is already router plannable, we don't recursively plan the postgres table +SELECT count(*) FROM postgres_table JOIN (SELECT * FROM distributed_table LIMIT 1) d1 USING(key); +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table LIMIT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1 USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- a unique index on key so dist table should be recursively planned SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(key); DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 @@ -395,8 +423,8 @@ 
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c -- only local tables are recursively planned SELECT count(*) FROM distributed_table d1 JOIN postgres_table p1 USING(key) JOIN distributed_table d2 USING(key) JOIN postgres_table p2 USING(key); DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) count @@ -411,8 +439,8 @@ FROM WHERE d1.value = '1'; DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM 
local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) WHERE (d1.value OPERATOR(pg_catalog.=) '1'::text) count @@ -429,8 +457,8 @@ FROM WHERE d1.key = 1; DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, 
NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) WHERE (d1.key OPERATOR(pg_catalog.=) 1) count @@ -586,10 +614,30 @@ FROM WHERE distributed_table.key = p1.key AND p1.key = p2.key; DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true 
OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1, (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 WHERE ((distributed_table.key OPERATOR(pg_catalog.=) p1.key) AND (p1.key OPERATOR(pg_catalog.=) p2.key)) +UPDATE + postgres_table +SET + value = 'test' +FROM + (SELECT * FROM distributed_table) d1 +WHERE + d1.key = postgres_table.key; +ERROR: relation postgres_table is not distributed +UPDATE + postgres_table +SET + value = 'test' +FROM + (SELECT * FROM distributed_table LIMIT 1) d1 +WHERE + d1.key = postgres_table.key; +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table LIMIT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1 WHERE (d1.key OPERATOR(pg_catalog.=) postgres_table.key) UPDATE distributed_table SET @@ -612,8 +660,8 @@ FROM WHERE postgres_table.key = d1.key AND d1.key = d2.key; DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM 
local_table_join.distributed_table d1 WHERE true OFFSET 0 -DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1, (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d2 WHERE ((postgres_table.key OPERATOR(pg_catalog.=) d1.key) AND (d1.key OPERATOR(pg_catalog.=) d2.key)) -- currently can't plan subquery-local table join @@ -723,8 +771,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c SELECT count(*) FROM citus_local JOIN distributed_table USING(key) JOIN postgres_table USING (key) JOIN reference_table USING(key); DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 -DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS 
value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key)) JOIN local_table_join.postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key)) count @@ -735,8 +783,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key) JOIN citus_local USING(key) WHERE distributed_partitioned_table.key > 10 and distributed_partitioned_table.key = 10; DEBUG: Wrapping local relation "distributed_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 -DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM 
local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 +DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) distributed_partitioned_table JOIN local_table_join.postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key)) JOIN local_table_join.citus_local USING (key)) WHERE ((distributed_partitioned_table.key OPERATOR(pg_catalog.>) 10) AND (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10)) count diff --git a/src/test/regress/sql/local_table_join.sql b/src/test/regress/sql/local_table_join.sql index 517a529cf..0fb65d824 100644 --- a/src/test/regress/sql/local_table_join.sql +++ b/src/test/regress/sql/local_table_join.sql @@ -45,7 +45,7 @@ SELECT count(*) FROM postgres_table JOIN reference_table USING(key); -- switch back to the default policy, which is auto SET citus.local_table_join_policy to 'auto'; --- on the auto mode, the local tables should be recursively planned +-- on the auto mode, the local 
tables should be recursively planned -- unless a unique index exists in a column for distributed table SELECT count(*) FROM distributed_table JOIN postgres_table USING(key); SELECT count(*) FROM reference_table JOIN postgres_table USING(key); @@ -80,6 +80,12 @@ SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_t SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key > 10 and distributed_table_pkey.value = 'notext'); SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key = 10 and distributed_table_pkey.value = 'notext'); +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = 10; + + +SELECT count(*) FROM postgres_table JOIN (SELECT * FROM distributed_table) d1 USING(key); +-- since this is already router plannable, we don't recursively plan the postgres table +SELECT count(*) FROM postgres_table JOIN (SELECT * FROM distributed_table LIMIT 1) d1 USING(key); -- a unique index on key so dist table should be recursively planned SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(key); @@ -93,7 +99,7 @@ SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE dist SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.key = 1; --- if both local and distributed tables have a filter, we prefer local unless distributed table has unique indexes on any equality filter +-- if both local and distributed tables have a filter, we prefer local unless distributed table has unique indexes on any equality filter SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test' AND postgres_table.value = 'test'; SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test' OR postgres_table.value = 'test'; @@ 
-158,9 +164,9 @@ SET FROM postgres_table WHERE - distributed_table_windex.key = postgres_table.key; + distributed_table_windex.key = postgres_table.key; --- in case of update/delete we always recursively plan +-- in case of update/delete we always recursively plan -- the tables other than target table no matter what the policy is SET citus.local_table_join_policy TO 'prefer-local'; @@ -200,7 +206,7 @@ SET FROM postgres_table WHERE - distributed_table_windex.key = postgres_table.key; + distributed_table_windex.key = postgres_table.key; SET citus.local_table_join_policy TO 'prefer-distributed'; @@ -240,7 +246,7 @@ SET FROM postgres_table WHERE - distributed_table_windex.key = postgres_table.key; + distributed_table_windex.key = postgres_table.key; -- modifications with multiple tables UPDATE @@ -252,6 +258,23 @@ FROM WHERE distributed_table.key = p1.key AND p1.key = p2.key; +UPDATE + postgres_table +SET + value = 'test' +FROM + (SELECT * FROM distributed_table) d1 +WHERE + d1.key = postgres_table.key; + +UPDATE + postgres_table +SET + value = 'test' +FROM + (SELECT * FROM distributed_table LIMIT 1) d1 +WHERE + d1.key = postgres_table.key; UPDATE distributed_table @@ -274,8 +297,8 @@ WHERE postgres_table.key = d1.key AND d1.key = d2.key; -- currently can't plan subquery-local table join -SELECT count(*) -FROM +SELECT count(*) +FROM (SELECT * FROM (SELECT * FROM distributed_table) d1) d2 JOIN postgres_table USING(key); @@ -318,7 +341,7 @@ SET FROM citus_local WHERE - distributed_table_windex.key = citus_local.key; + distributed_table_windex.key = citus_local.key; UPDATE citus_local @@ -327,7 +350,7 @@ SET FROM distributed_table_windex WHERE - distributed_table_windex.key = citus_local.key; + distributed_table_windex.key = citus_local.key; DROP TABLE citus_local; RESET client_min_messages;