diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 7a9c5c9ad..cf68b90f6 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -249,7 +249,15 @@ CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement) return namespaceId; } -List* ExecuteFunctionOnEachTableIndex(Oid relationId, IndexProcesor indexProcessor) { + +/* + * ExecuteFunctionOnEachTableIndex executes the given indexProcessor function on each + * index of the given relation. + * It returns a list that is filled by the indexProcessor. + */ +List * +ExecuteFunctionOnEachTableIndex(Oid relationId, IndexProcesor indexProcessor) +{ List *result = NIL; ScanKeyData scanKey[1]; int scanKeyCount = 1; @@ -285,6 +293,7 @@ List* ExecuteFunctionOnEachTableIndex(Oid relationId, IndexProcesor indexProcess return result; } + /* * ExecuteFunctionOnEachTableIndex executes the given pgIndexProcessor function on each * index of the given relation. diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 1e5ebac6a..83fbad1bc 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -293,6 +293,21 @@ EnsureModificationsCanRun(void) } +/* + * IsLocalOrCitusLocalTable returns true if the given relation + * is either a local or citus local table. + */ +bool +IsLocalOrCitusLocalTable(Oid relationId) +{ + if (!IsCitusTable(relationId)) + { + return true; + } + return IsCitusTableType(relationId, CITUS_LOCAL_TABLE); +} + + /* * IsCitusTableType returns true if the given table with relationId * belongs to a citus table that matches the given table type. 
If cache diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index 0eb23f6a5..4a23a5052 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -729,6 +729,7 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE } } + /* * IndexImpliedByAConstraint is a helper function to be used while scanning * pg_index. It returns true if the index identified by the given indexForm is diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index 72499a93c..680fb7b3a 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -54,212 +54,398 @@ #include "utils/guc.h" #include "utils/lsyscache.h" -typedef struct RTEToSubqueryConverterReference { - RangeTblEntry* rangeTableEntry; +typedef struct RTEToSubqueryConverterReference +{ + RangeTblEntry *rangeTableEntry; Index rteIndex; - List* restrictionList; - List* requiredAttributeNumbers; + List *restrictionList; + List *requiredAttributeNumbers; } RTEToSubqueryConverterReference; -typedef struct RTEToSubqueryConverterContext{ - List* distributedTableList; /* reference or distributed table */ - List* localTableList; - List* citusLocalTableList; +typedef struct RTEToSubqueryConverterContext +{ + List *distributedTableList; /* reference or distributed table */ + List *localTableList; /* local or citus local table */ bool hasSubqueryRTE; }RTEToSubqueryConverterContext; -static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList, Oid resultRelationId); -static bool HasUniqueFilter(RangeTblEntry* distRTE, List* distRTERestrictionList, List* requiredAttrNumbersForDistRTE); -static bool AutoConvertLocalTableJoinToSubquery(FromExpr* joinTree, - RTEToSubqueryConverterReference* 
distRTEContext); +static Oid GetResultRelationId(Query *query); +static Oid GetRTEToSubqueryConverterReferenceRelId( + RTEToSubqueryConverterReference *rteToSubqueryConverterReference); +static bool ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid + resultRelationId); +static bool HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList, + List *requiredAttrNumbersForDistRTE); +static bool ShouldConvertDistributedTable(FromExpr *joinTree, + RTEToSubqueryConverterReference *distRTEContext); static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, RecursivePlanningContext *planningContext); -static RTEToSubqueryConverterContext * CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context, - List *rangeTableList); -static void GetAllUniqueIndexes(Form_pg_index indexForm, List** uniqueIndexes); -static RTEToSubqueryConverterReference* - GetNextRTEToConvertToSubquery(FromExpr* joinTree, RTEToSubqueryConverterContext* rteToSubqueryConverterContext, - PlannerRestrictionContext *plannerRestrictionContext, RangeTblEntry* resultRelation); -static void PopFromRTEToSubqueryConverterContext(RTEToSubqueryConverterContext* rteToSubqueryConverterContext, - bool isCitusLocalTable); +static RTEToSubqueryConverterContext * CreateRTEToSubqueryConverterContext( + RecursivePlanningContext *context, + List * + rangeTableList); +static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes); +static RTEToSubqueryConverterReference * GetNextRTEToConvertToSubquery(FromExpr *joinTree, + RTEToSubqueryConverterContext + * + rteToSubqueryConverterContext, + PlannerRestrictionContext + * + plannerRestrictionContext, + Oid + resultRelationId); +static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList, + List **joinRangeTableEntries); +static void RemoveFromRTEToSubqueryConverterContext( + RTEToSubqueryConverterContext *rteToSubqueryConverterContext, Oid relationId); +static bool 
FillLocalAndDistributedRTECandidates( + RTEToSubqueryConverterContext *rteToSubqueryConverterContext, + RTEToSubqueryConverterReference ** + localRTECandidate, + RTEToSubqueryConverterReference ** + distributedRTECandidate); + /* - * ConvertLocalTableJoinsToSubqueries gets a query and the planner - * restrictions. As long as there is a join between a local table - * and distributed table, the function wraps one table in a - * subquery (by also pushing the filters on the table down - * to the subquery). - * - * Once this function returns, there are no direct joins between - * local and distributed tables. + * ConvertUnplannableTableJoinsToSubqueries gets a query and the planner + * restrictions. As long as the query is not plannable by router planner, + * it converts either a local or distributed table to a subquery. */ void -ConvertLocalTableJoinsToSubqueries(Query *query, - RecursivePlanningContext *context) +ConvertUnplannableTableJoinsToSubqueries(Query *query, + RecursivePlanningContext *context) { - List *rangeTableList = query->rtable; - RangeTblEntry *resultRelation = ExtractResultRelationRTE(query); - Oid resultRelationId = InvalidOid; - if (resultRelation) { - resultRelationId = resultRelation->relid; - } - if (!ShouldConvertLocalTableJoinsToSubqueries(rangeTableList, resultRelationId)) { + List *rangeTableList = NIL; + GetRangeTableEntriesFromJoinTree((Node *) query->jointree, query->rtable, + &rangeTableList); + Oid resultRelationId = GetResultRelationId(query); + if (!ShouldConvertLocalTableJoinsToSubqueries(rangeTableList, resultRelationId)) + { return; } - RTEToSubqueryConverterContext* rteToSubqueryConverterContext = CreateRTEToSubqueryConverterContext( - context, rangeTableList); + RTEToSubqueryConverterContext *rteToSubqueryConverterContext = + CreateRTEToSubqueryConverterContext( + context, rangeTableList); - RTEToSubqueryConverterReference* rteToSubqueryConverterReference = - GetNextRTEToConvertToSubquery(query->jointree, 
rteToSubqueryConverterContext, - context->plannerRestrictionContext, resultRelation); - while (rteToSubqueryConverterReference) + RTEToSubqueryConverterReference *rteToSubqueryConverterReference = + GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext, + context->plannerRestrictionContext, + resultRelationId); + + PlannerRestrictionContext *plannerRestrictionContext = + FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); + while (rteToSubqueryConverterReference && !IsRouterPlannable(query, + plannerRestrictionContext)) { - ReplaceRTERelationWithRteSubquery(rteToSubqueryConverterReference->rangeTableEntry, - rteToSubqueryConverterReference->restrictionList, - rteToSubqueryConverterReference->requiredAttributeNumbers); - rteToSubqueryConverterReference = + ReplaceRTERelationWithRteSubquery( + rteToSubqueryConverterReference->rangeTableEntry, + rteToSubqueryConverterReference->restrictionList, + rteToSubqueryConverterReference-> + requiredAttributeNumbers, + context); + RemoveFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext, + rteToSubqueryConverterReference-> + rangeTableEntry->relid); + rteToSubqueryConverterReference = GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext, - context->plannerRestrictionContext, resultRelation); + context->plannerRestrictionContext, + resultRelationId); } } -static RTEToSubqueryConverterReference* - GetNextRTEToConvertToSubquery(FromExpr* joinTree, RTEToSubqueryConverterContext* rteToSubqueryConverterContext, - PlannerRestrictionContext *plannerRestrictionContext, RangeTblEntry* resultRelation) { - RTEToSubqueryConverterReference* localRTECandidate = NULL; - RTEToSubqueryConverterReference* nonLocalRTECandidate = NULL; - bool citusLocalTableChosen = false; +/* + * GetResultRelationId gets the result relation id from query + * if it exists. 
+ */ +static Oid +GetResultRelationId(Query *query) +{ + RangeTblEntry *resultRelation = ExtractResultRelationRTE(query); + Oid resultRelationId = InvalidOid; + if (resultRelation) + { + resultRelationId = resultRelation->relid; + } + return resultRelationId; +} - if (list_length(rteToSubqueryConverterContext->localTableList) > 0) { - localRTECandidate = linitial(rteToSubqueryConverterContext->localTableList); - }else if (list_length(rteToSubqueryConverterContext->citusLocalTableList) > 0) { - localRTECandidate = linitial(rteToSubqueryConverterContext->citusLocalTableList); - citusLocalTableChosen = true; - } - if (localRTECandidate == NULL) { - return NULL; - } - if (list_length(rteToSubqueryConverterContext->distributedTableList) > 0) { - nonLocalRTECandidate = linitial(rteToSubqueryConverterContext->distributedTableList); - } - if (nonLocalRTECandidate == NULL && !rteToSubqueryConverterContext->hasSubqueryRTE) { - return NULL; +/* + * GetRangeTableEntriesFromJoinTree gets the range table entries that are + * on the given join tree. 
+ */ +static void +GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList, + List **joinRangeTableEntries) +{ + if (joinNode == NULL) + { + return; } + else if (IsA(joinNode, FromExpr)) + { + FromExpr *fromExpr = (FromExpr *) joinNode; + Node *fromElement; - if (resultRelation) { - if(resultRelation == localRTECandidate->rangeTableEntry) { - rteToSubqueryConverterContext->distributedTableList = list_delete_first( - rteToSubqueryConverterContext->distributedTableList - ); - return nonLocalRTECandidate; + foreach_ptr(fromElement, fromExpr->fromlist) + { + GetRangeTableEntriesFromJoinTree(fromElement, rangeTableList, + joinRangeTableEntries); } - if (resultRelation == nonLocalRTECandidate->rangeTableEntry) { - PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen); + } + else if (IsA(joinNode, JoinExpr)) + { + JoinExpr *joinExpr = (JoinExpr *) joinNode; + GetRangeTableEntriesFromJoinTree(joinExpr->larg, rangeTableList, + joinRangeTableEntries); + GetRangeTableEntriesFromJoinTree(joinExpr->rarg, rangeTableList, + joinRangeTableEntries); + } + else if (IsA(joinNode, RangeTblRef)) + { + int rangeTableIndex = ((RangeTblRef *) joinNode)->rtindex; + RangeTblEntry *rte = rt_fetch(rangeTableIndex, rangeTableList); + *joinRangeTableEntries = lappend(*joinRangeTableEntries, rte); + } + else + { + pg_unreachable(); + } +} + + +/* + * GetNextRTEToConvertToSubquery returns the range table entry + * which should be converted to a subquery. It considers the local join policy + * and result relation. 
+ */ +static RTEToSubqueryConverterReference * +GetNextRTEToConvertToSubquery(FromExpr *joinTree, + RTEToSubqueryConverterContext *rteToSubqueryConverterContext, + PlannerRestrictionContext *plannerRestrictionContext, Oid + resultRelationId) +{ + RTEToSubqueryConverterReference *localRTECandidate = NULL; + RTEToSubqueryConverterReference *distributedRTECandidate = NULL; + if (!FillLocalAndDistributedRTECandidates(rteToSubqueryConverterContext, + &localRTECandidate, + &distributedRTECandidate)) + { + return NULL; + } + + if (OidIsValid(resultRelationId)) + { + if (resultRelationId == GetRTEToSubqueryConverterReferenceRelId( + localRTECandidate)) + { + return distributedRTECandidate; + } + if (resultRelationId == GetRTEToSubqueryConverterReferenceRelId( + distributedRTECandidate)) + { return localRTECandidate; } } - if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) { - PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen); + if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) + { return localRTECandidate; - }else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) { - if (nonLocalRTECandidate) { - rteToSubqueryConverterContext->distributedTableList = list_delete_first( - rteToSubqueryConverterContext->distributedTableList - ); - return nonLocalRTECandidate; - }else { - PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen); - return localRTECandidate; - } - - }else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO) { - bool shouldConvertNonLocalTable = AutoConvertLocalTableJoinToSubquery(joinTree, nonLocalRTECandidate); - if (shouldConvertNonLocalTable) { - rteToSubqueryConverterContext->distributedTableList = list_delete_first( - rteToSubqueryConverterContext->distributedTableList - ); - return nonLocalRTECandidate; - }else { - PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen); - return localRTECandidate; - } - }else { - 
elog(ERROR, "unexpected local table join policy: %d", LocalTableJoinPolicy); } - return NULL; + else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) + { + if (distributedRTECandidate) + { + return distributedRTECandidate; + } + else + { + return localRTECandidate; + } + } + else + { + if (ShouldConvertDistributedTable(joinTree, distributedRTECandidate)) + { + return distributedRTECandidate; + } + else + { + return localRTECandidate; + } + } } -static void PopFromRTEToSubqueryConverterContext(RTEToSubqueryConverterContext* rteToSubqueryConverterContext, - bool isCitusLocalTable) { - if (isCitusLocalTable) { - rteToSubqueryConverterContext->citusLocalTableList = - list_delete_first(rteToSubqueryConverterContext->citusLocalTableList); - }else { - rteToSubqueryConverterContext->localTableList = + +/* + * FillLocalAndDistributedRTECandidates fills the local and distributed RTE candidates. + * It returns true if we should continue converting tables to subqueries. + */ +static bool +FillLocalAndDistributedRTECandidates( + RTEToSubqueryConverterContext *rteToSubqueryConverterContext, + RTEToSubqueryConverterReference **localRTECandidate, + RTEToSubqueryConverterReference ** + distributedRTECandidate) +{ + if (list_length(rteToSubqueryConverterContext->localTableList) > 0) + { + *localRTECandidate = linitial(rteToSubqueryConverterContext->localTableList); + } + if (*localRTECandidate == NULL) + { + return false; + } + + if (list_length(rteToSubqueryConverterContext->distributedTableList) > 0) + { + *distributedRTECandidate = linitial( + rteToSubqueryConverterContext->distributedTableList); + } + return *distributedRTECandidate != NULL || + rteToSubqueryConverterContext->hasSubqueryRTE; +} + + +/* + * GetRTEToSubqueryConverterReferenceRelId returns the underlying relation id + * if it is a valid one. 
+ */ +static Oid +GetRTEToSubqueryConverterReferenceRelId( + RTEToSubqueryConverterReference *rteToSubqueryConverterReference) +{ + if (rteToSubqueryConverterReference && + rteToSubqueryConverterReference->rangeTableEntry) + { + return rteToSubqueryConverterReference->rangeTableEntry->relid; + } + return InvalidOid; +} + + +/* + * RemoveFromRTEToSubqueryConverterContext removes an element from + * the relevant list based on the relation id. + */ +static void +RemoveFromRTEToSubqueryConverterContext( + RTEToSubqueryConverterContext *rteToSubqueryConverterContext, Oid relationId) +{ + if (IsLocalOrCitusLocalTable(relationId)) + { + rteToSubqueryConverterContext->localTableList = list_delete_first(rteToSubqueryConverterContext->localTableList); } + else + { + rteToSubqueryConverterContext->distributedTableList = + list_delete_first(rteToSubqueryConverterContext->distributedTableList); + } } + /* * ShouldConvertLocalTableJoinsToSubqueries returns true if we should * convert local-dist table joins to subqueries. */ -static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList, Oid resultRelationId) { +static bool +ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid resultRelationId) +{ if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER) { /* user doesn't want Citus to enable local table joins */ return false; } - if (!ContainsTableToBeConvertedToSubquery(rangeTableList, resultRelationId)) { + if (!ContainsTableToBeConvertedToSubquery(rangeTableList, resultRelationId)) + { return false; } return true; } -static bool AutoConvertLocalTableJoinToSubquery(FromExpr* joinTree, - RTEToSubqueryConverterReference* rteToSubqueryConverterReference) { - if (rteToSubqueryConverterReference == NULL) { + +/* + * ShouldConvertDistributedTable returns true if we should convert the + * distributed table rte to a subquery. This will be the case if the distributed + * table has a unique index on a column that appears in filter. 
+ */ +static bool +ShouldConvertDistributedTable(FromExpr *joinTree, + RTEToSubqueryConverterReference * + rteToSubqueryConverterReference) +{ + if (rteToSubqueryConverterReference == NULL) + { return false; } - List* distRTEEqualityQuals = - FetchAttributeNumsForRTEFromQuals(joinTree->quals, rteToSubqueryConverterReference->rteIndex); + List *distRTEEqualityQuals = + FetchEqualityAttrNumsForRTEFromQuals(joinTree->quals, + rteToSubqueryConverterReference->rteIndex); - Node* join = NULL; - foreach_ptr(join, joinTree->fromlist) { - if (IsA(join, JoinExpr)) { - JoinExpr* joinExpr = (JoinExpr*) join; - distRTEEqualityQuals = list_concat(distRTEEqualityQuals, - FetchAttributeNumsForRTEFromQuals(joinExpr->quals, rteToSubqueryConverterReference->rteIndex) - ); + Node *join = NULL; + foreach_ptr(join, joinTree->fromlist) + { + if (IsA(join, JoinExpr)) + { + JoinExpr *joinExpr = (JoinExpr *) join; + distRTEEqualityQuals = list_concat(distRTEEqualityQuals, + FetchEqualityAttrNumsForRTEFromQuals( + joinExpr->quals, + rteToSubqueryConverterReference-> + rteIndex) + ); } } - bool hasUniqueFilter = HasUniqueFilter(rteToSubqueryConverterReference->rangeTableEntry, - rteToSubqueryConverterReference->restrictionList, distRTEEqualityQuals); - return hasUniqueFilter; - + bool hasUniqueFilter = HasUniqueFilter( + rteToSubqueryConverterReference->rangeTableEntry, + rteToSubqueryConverterReference-> + restrictionList, distRTEEqualityQuals); + return hasUniqueFilter; } -static bool HasUniqueFilter(RangeTblEntry* distRTE, List* distRTERestrictionList, List* requiredAttrNumbersForDistRTE) { - List* uniqueIndexes = ExecuteFunctionOnEachTableIndex(distRTE->relid, GetAllUniqueIndexes); - int columnNumber = 0; - foreach_int(columnNumber, uniqueIndexes) { - if (list_member_int(requiredAttrNumbersForDistRTE, columnNumber)) { - return true; - } - } - return false; + +/* + * HasUniqueFilter returns true if the given RTE has a unique filter + * on a column, which is a member of the given 
requiredAttrNumbersForDistRTE. + */ +static bool +HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList, + List *requiredAttrNumbersForDistRTE) +{ + List *uniqueIndexes = ExecuteFunctionOnEachTableIndex(distRTE->relid, + GetAllUniqueIndexes); + int columnNumber = 0; + foreach_int(columnNumber, uniqueIndexes) + { + if (list_member_int(requiredAttrNumbersForDistRTE, columnNumber)) + { + return true; + } + } + return false; } -static void GetAllUniqueIndexes(Form_pg_index indexForm, List** uniqueIndexes) { - if (indexForm->indisunique || indexForm->indisprimary) { - for(int i = 0; i < indexForm->indkey.dim1; i++) { - *uniqueIndexes = list_append_unique_int(*uniqueIndexes, indexForm->indkey.values[i]); - } - } + +/* + * GetAllUniqueIndexes adds the given index's column numbers if it is a + * unique index. + * TODO:: if there is a unique index on a multiple column, then we should + * probably return true only if all the columns in the index exist in the filter. + */ +static void +GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes) +{ + if (indexForm->indisunique || indexForm->indisprimary) + { + for (int i = 0; i < indexForm->indkey.dim1; i++) + { + *uniqueIndexes = list_append_unique_int(*uniqueIndexes, + indexForm->indkey.values[i]); + } + } } @@ -287,7 +473,8 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, List *filteredRelationRestrictionList = relationRestrictionContext->relationRestrictionList; - if (list_length(filteredRelationRestrictionList) == 0) { + if (list_length(filteredRelationRestrictionList) == 0) + { return NIL; } RelationRestriction *relationRestriction = @@ -324,46 +511,56 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, * The function also gets a boolean localTable parameter, so the caller * can choose to run the function for only local tables or distributed tables. 
*/ -static RTEToSubqueryConverterContext* +static RTEToSubqueryConverterContext * CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context, - List *rangeTableList) + List *rangeTableList) { - - RTEToSubqueryConverterContext* rteToSubqueryConverterContext = palloc0(sizeof(RTEToSubqueryConverterContext)); + RTEToSubqueryConverterContext *rteToSubqueryConverterContext = palloc0( + sizeof(RTEToSubqueryConverterContext)); int rteIndex = 0; - RangeTblEntry* rangeTableEntry = NULL; + RangeTblEntry *rangeTableEntry = NULL; foreach_ptr(rangeTableEntry, rangeTableList) { rteIndex++; - if (rangeTableEntry->rtekind == RTE_SUBQUERY) { + if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { rteToSubqueryConverterContext->hasSubqueryRTE = true; } + /* we're only interested in tables */ if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) { continue; } - RTEToSubqueryConverterReference* rteToSubqueryConverter = palloc(sizeof(RTEToSubqueryConverterReference)); + RTEToSubqueryConverterReference *rteToSubqueryConverter = palloc( + sizeof(RTEToSubqueryConverterReference)); rteToSubqueryConverter->rangeTableEntry = rangeTableEntry; rteToSubqueryConverter->rteIndex = rteIndex; - rteToSubqueryConverter->restrictionList = GetRestrictInfoListForRelation(rangeTableEntry, - context->plannerRestrictionContext, 1); - rteToSubqueryConverter->requiredAttributeNumbers = RequiredAttrNumbersForRelation(rangeTableEntry, context); + rteToSubqueryConverter->restrictionList = GetRestrictInfoListForRelation( + rangeTableEntry, + context-> + plannerRestrictionContext, 1); + rteToSubqueryConverter->requiredAttributeNumbers = RequiredAttrNumbersForRelation( + rangeTableEntry, context); - bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE) || - IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE); - if (referenceOrDistributedTable) { - rteToSubqueryConverterContext->distributedTableList = - 
lappend(rteToSubqueryConverterContext->distributedTableList, rteToSubqueryConverter); - }else if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE)) { - rteToSubqueryConverterContext->citusLocalTableList = - lappend(rteToSubqueryConverterContext->citusLocalTableList, rteToSubqueryConverter); - }else { - rteToSubqueryConverterContext->localTableList = - lappend(rteToSubqueryConverterContext->localTableList, rteToSubqueryConverter); + bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid, + REFERENCE_TABLE) || + IsCitusTableType(rangeTableEntry->relid, + DISTRIBUTED_TABLE); + if (referenceOrDistributedTable) + { + rteToSubqueryConverterContext->distributedTableList = + lappend(rteToSubqueryConverterContext->distributedTableList, + rteToSubqueryConverter); + } + else + { + rteToSubqueryConverterContext->localTableList = + lappend(rteToSubqueryConverterContext->localTableList, + rteToSubqueryConverter); } } return rteToSubqueryConverterContext; -} \ No newline at end of file +} diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 6e05268f9..0bc31a9f5 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -233,6 +233,9 @@ static StringInfo ColumnTypeArrayString(List *targetEntryList); static bool CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval); +static List * FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr, Index rteIndex); +static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr, Index rteIndex); + #if PG_VERSION_NUM >= PG_VERSION_13 static List * GetColumnOriginalIndexes(Oid relationId); #endif @@ -3588,8 +3591,16 @@ NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList) return IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE); } -List* FetchAttributeNumsForRTEFromQuals(Node* quals, Index rteIndex) { - List* 
attributeNums = NIL; + +/* + * FetchEqualityAttrNumsForRTEFromQuals fetches the attribute numbers from quals + * which: + * - has equality operator + * - belongs to rangeTableEntry with rteIndex + */ +List * +FetchEqualityAttrNumsForRTEFromQuals(Node *quals, Index rteIndex) +{ if (quals == NULL) { return NIL; @@ -3597,48 +3608,79 @@ List* FetchAttributeNumsForRTEFromQuals(Node* quals, Index rteIndex) { if (IsA(quals, OpExpr)) { - if (!NodeIsEqualsOpExpr(quals)) - { - return NIL; - } - OpExpr *nextJoinClauseOpExpr = castNode(OpExpr, quals); - - Var* var = NULL; - if (VarConstOpExprClause(nextJoinClauseOpExpr, &var, NULL)) { - attributeNums = lappend_int(attributeNums, var->varattno); - return attributeNums; - } - + return FetchEqualityAttrNumsForRTEOpExpr((OpExpr *) quals, rteIndex); } else if (IsA(quals, BoolExpr)) { - BoolExpr *boolExpr = (BoolExpr *) quals; - - if (boolExpr->boolop != AND_EXPR && boolExpr->boolop != OR_EXPR) { - return attributeNums; - } - - bool hasEquality = true; - Node* arg = NULL; - foreach_ptr(arg, boolExpr->args) - { - List* attributeNumsInSubExpression = FetchAttributeNumsForRTEFromQuals(arg, rteIndex); - if (boolExpr->boolop == AND_EXPR) - { - hasEquality |= list_length(attributeNumsInSubExpression) > 0; - }else if (boolExpr->boolop == OR_EXPR){ - hasEquality &= list_length(attributeNumsInSubExpression) > 0; - } - attributeNums = list_concat(attributeNums, attributeNumsInSubExpression); - - } - if (hasEquality) { - return attributeNums; - } + return FetchEqualityAttrNumsForRTEBoolExpr((BoolExpr *) quals, rteIndex); } return NIL; } + +/* + * FetchEqualityAttrNumsForRTEOpExpr fetches the attribute numbers from opExpr + * which: + * - has equality operator + * - belongs to rangeTableEntry with rteIndex + */ +static List * +FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr, Index rteIndex) +{ + if (!OperatorImplementsEquality(opExpr->opno)) + { + return NIL; + } + + List *attributeNums = NIL; + Var *var = NULL; + if 
(VarConstOpExprClause(opExpr, &var, NULL) && var->varno == rteIndex) + { + attributeNums = lappend_int(attributeNums, var->varattno); + } + return attributeNums; +} + + +/* + * FetchEqualityAttrNumsForRTEBoolExpr fetches the attribute numbers from boolExpr + * which: + * - has equality operator + * - belongs to rangeTableEntry with rteIndex + */ +static List * +FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr, Index rteIndex) +{ + if (boolExpr->boolop != AND_EXPR && boolExpr->boolop != OR_EXPR) + { + return NIL; + } + + List *attributeNums = NIL; + bool hasEquality = true; + Node *arg = NULL; + foreach_ptr(arg, boolExpr->args) + { + List *attributeNumsInSubExpression = FetchEqualityAttrNumsForRTEFromQuals(arg, + rteIndex); + if (boolExpr->boolop == AND_EXPR) + { + hasEquality |= list_length(attributeNumsInSubExpression) > 0; + } + else if (boolExpr->boolop == OR_EXPR) + { + hasEquality &= list_length(attributeNumsInSubExpression) > 0; + } + attributeNums = list_concat(attributeNums, attributeNumsInSubExpression); + } + if (hasEquality) + { + return attributeNums; + } + return NIL; +} + + /* * JoinSequenceArray walks over the join nodes in the job query and constructs a join * sequence containing an entry for each joined table. 
The function then returns an diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 6b9858e23..4869ef7cf 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -49,6 +49,7 @@ #include "distributed/reference_table_utils.h" #include "distributed/relation_restriction_equivalence.h" #include "distributed/relay_utility.h" +#include "distributed/recursive_planning.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/shard_pruning.h" @@ -181,7 +182,6 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType taskAssignmentPolicy, List *placementList); -static bool IsLocalOrCitusLocalTable(Oid relationId); /* * CreateRouterPlan attempts to create a router executor plan for the given @@ -295,6 +295,32 @@ CreateSingleTaskRouterSelectPlan(DistributedPlan *distributedPlan, Query *origin } +/* + * IsRouterPlannable returns true if the given query can be planned by + * router planner. + */ +bool +IsRouterPlannable(Query *query, PlannerRestrictionContext *plannerRestrictionContext) +{ + /* copy the query as the following methods can change the underlying query */ + Query *copyQuery = copyObject(query); + DeferredErrorMessage *deferredErrorMessage = NULL; + if (copyQuery->commandType == CMD_SELECT) + { + deferredErrorMessage = MultiRouterPlannableQuery(copyQuery); + } + if (deferredErrorMessage) + { + return false; + } + + /* TODO:: we might not need this copy*/ + copyQuery = copyObject(query); + RouterJob(copyQuery, plannerRestrictionContext, &deferredErrorMessage); + return deferredErrorMessage == NULL; +} + + /* * ShardIntervalOpExpressions returns a list of OpExprs with exactly two * items in it. 
The list consists of shard interval ranges with partition columns @@ -511,8 +537,6 @@ IsTidColumn(Node *node) } -#include "distributed/recursive_planning.h" - /* * ModifyPartialQuerySupported implements a subset of what ModifyQuerySupported checks, * that subset being what's necessary to check modifying CTEs for. @@ -522,24 +546,25 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, Oid *distributedTableIdOutput) { DeferredErrorMessage *deferredError = DeferErrorIfModifyView(queryTree); - if (deferredError != NULL) { + if (deferredError != NULL) + { return deferredError; } uint32 rangeTableId = 1; CmdType commandType = queryTree->commandType; - Oid distributedTableId = ModifyQueryResultRelationId(queryTree); - *distributedTableIdOutput = distributedTableId; - if (ContainsTableToBeConvertedToSubquery(queryTree->rtable, distributedTableId)) + Oid resultRelationId = ModifyQueryResultRelationId(queryTree); + *distributedTableIdOutput = resultRelationId; + if (ContainsTableToBeConvertedToSubquery(queryTree->rtable, resultRelationId)) { return deferredError; } Var *partitionColumn = NULL; - if (IsCitusTable(distributedTableId)) + if (IsCitusTable(resultRelationId)) { - partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + partitionColumn = PartitionColumn(resultRelationId, rangeTableId); } deferredError = DeferErrorIfModifyView(queryTree); @@ -633,12 +658,12 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, } } - distributedTableId = ModifyQueryResultRelationId(queryTree); + resultRelationId = ModifyQueryResultRelationId(queryTree); rangeTableId = 1; - if (IsCitusTable(distributedTableId)) + if (IsCitusTable(resultRelationId)) { - partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + partitionColumn = PartitionColumn(resultRelationId, rangeTableId); } commandType = queryTree->commandType; if (commandType == CMD_INSERT || commandType == CMD_UPDATE || @@ -768,18 +793,11 @@ 
ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, /* set it for caller to use when we don't return any errors */ - *distributedTableIdOutput = distributedTableId; + *distributedTableIdOutput = resultRelationId; return NULL; } -static bool IsLocalOrCitusLocalTable(Oid relationId) { - if (!IsCitusTable(relationId)) { - return true; - } - return IsCitusTableType(relationId, CITUS_LOCAL_TABLE); -} - /* * NodeIsFieldStore returns true if given Node is a FieldStore object. @@ -903,7 +921,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer List *rangeTableList = NIL; uint32 queryTableCount = 0; CmdType commandType = queryTree->commandType; -bool fastPathRouterQuery = + bool fastPathRouterQuery = plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; /* @@ -941,6 +959,14 @@ bool fastPathRouterQuery = { ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList); } + RangeTblEntry *resultRte = ExtractResultRelationRTE(queryTree); + Oid resultRelationId = InvalidOid; + if (resultRte) + { + resultRelationId = resultRte->relid; + } + bool containsTableToBeConvertedToSubquery = + ContainsTableToBeConvertedToSubquery(queryTree->rtable, resultRelationId); RangeTblEntry *rangeTableEntry = NULL; foreach_ptr(rangeTableEntry, rangeTableList) @@ -965,25 +991,22 @@ bool fastPathRouterQuery = /* for other kinds of relations, check if its distributed */ else { - RangeTblEntry *resultRte = ExtractResultRelationRTE(queryTree); - Oid resultRelationId = InvalidOid; - if (resultRte) { - resultRelationId = resultRte->relid; - } if (IsLocalOrCitusLocalTable(rangeTableEntry->relid) && - ContainsTableToBeConvertedToSubquery(queryTree->rtable, resultRelationId) - ) - { + containsTableToBeConvertedToSubquery) + { StringInfo errorMessage = makeStringInfo(); char *relationName = get_rel_name(rangeTableEntry->relid); - if (IsCitusTable(rangeTableEntry->relid)) { - appendStringInfo(errorMessage, "citus local table %s 
cannot be used in this join", - relationName); - }else { - appendStringInfo(errorMessage, "relation %s is not distributed", - relationName); + if (IsCitusTable(rangeTableEntry->relid)) + { + appendStringInfo(errorMessage, + "citus local table %s cannot be joined with these distributed tables", + relationName); + } + else + { + appendStringInfo(errorMessage, "relation %s is not distributed", + relationName); } - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, NULL, NULL); } diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 390209fc0..f1c821690 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -163,14 +163,14 @@ static bool ShouldRecursivelyPlanSubquery(Query *subquery, static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery, PlannerRestrictionContext * restrictionContext); -static bool AllDataLocallyAccessible(List *rangeTableList); +static bool AllDataLocallyAccessible(List *rangeTableList); static bool ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *context); +static void RecursivelyPlanSubquery(Query *subquery, + RecursivePlanningContext *planningContext); static void RecursivelyPlanSetOperations(Query *query, Node *node, RecursivePlanningContext *context); static bool IsLocalTableRteOrMatView(Node *node); -static void RecursivelyPlanSubquery(Query *subquery, - RecursivePlanningContext *planningContext); static DistributedSubPlan * CreateDistributedSubPlan(uint32 subPlanId, Query *subPlanQuery); static bool CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context); @@ -290,12 +290,6 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context /* make sure function calls in joins are executed in the coordinator */ WrapFunctionsInSubqueries(query); - /* - * Logical planner cannot handle "local_table" [OUTER] JOIN 
"dist_table", so we - * recursively plan one side of the join so that the logical planner can plan. - */ - ConvertLocalTableJoinsToSubqueries(query, context); - /* descend into subqueries */ query_tree_walker(query, RecursivelyPlanSubqueryWalker, context, 0); @@ -346,6 +340,13 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context RecursivelyPlanNonColocatedSubqueries(query, context); } + /* + * Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", or + * a query with local table/citus local table and subquery. We convert local/citus local + * tables to a subquery until they can be planned + */ + ConvertUnplannableTableJoinsToSubqueries(query, context); + return NULL; } @@ -1346,13 +1347,15 @@ NodeContainsSubqueryReferencingOuterQuery(Node *node) return false; } + /* * ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry * with a subquery. The function also pushes down the filters to the subquery. */ void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList, - List *requiredAttrNumbers) + List *requiredAttrNumbers, + RecursivePlanningContext *context) { Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers); Expr *andedBoundExpressions = make_ands_explicit(restrictionList); @@ -1365,7 +1368,6 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict rangeTableEntry->rtekind = RTE_SUBQUERY; rangeTableEntry->subquery = subquery; - /* * If the relation is inherited, it'll still be inherited as * we've copied it earlier. 
This is to prevent the newly created @@ -1383,16 +1385,26 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict get_rel_name(rangeTableEntry->relid), ApplyLogRedaction(subqueryString->data)))); } + RecursivelyPlanSubquery(rangeTableEntry->subquery, context); } -bool ContainsTableToBeConvertedToSubquery(List* rangeTableList, Oid resultRelationId) { - if (AllDataLocallyAccessible(rangeTableList)) { + +/* + * ContainsTableToBeConvertedToSubquery checks if the given range table list contains + * any table that should be converted to a subquery, which otherwise is not plannable. + */ +bool +ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid resultRelationId) +{ + if (AllDataLocallyAccessible(rangeTableList)) + { return false; } - return ContainsLocalTableDistributedTableJoin(rangeTableList) || - ContainsLocalTableSubqueryJoin(rangeTableList, resultRelationId); + return ContainsLocalTableDistributedTableJoin(rangeTableList) || + ContainsLocalTableSubqueryJoin(rangeTableList, resultRelationId); } + /* * AllDataLocallyAccessible returns true if all data for the relations in the * rangeTableList is locally accessible. @@ -1400,11 +1412,12 @@ bool ContainsTableToBeConvertedToSubquery(List* rangeTableList, Oid resultRelati static bool AllDataLocallyAccessible(List *rangeTableList) { - RangeTblEntry* rangeTableEntry = NULL; + RangeTblEntry *rangeTableEntry = NULL; foreach_ptr(rangeTableEntry, rangeTableList) { - if (rangeTableEntry->rtekind == RTE_SUBQUERY) { - // TODO:: check if it has distributed table + if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { + /* TODO:: check if it has distributed table */ return false; } if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) @@ -1442,18 +1455,23 @@ AllDataLocallyAccessible(List *rangeTableList) return true; } + /* * SubqueryConvertableRelationForJoin returns true if the given range table entry * is a relation type that can be converted to a subquery.
*/ -bool SubqueryConvertableRelationForJoin(RangeTblEntry* rangeTableEntry) { - if (rangeTableEntry->rtekind != RTE_RELATION) { +bool +SubqueryConvertableRelationForJoin(RangeTblEntry *rangeTableEntry) +{ + if (rangeTableEntry->rtekind != RTE_RELATION) + { return false; } return rangeTableEntry->relkind == RELKIND_PARTITIONED_TABLE || - rangeTableEntry->relkind == RELKIND_RELATION; + rangeTableEntry->relkind == RELKIND_RELATION; } + /* * ContainsLocalTableDistributedTableJoin returns true if the input range table list * contains a direct join between local and distributed tables. @@ -1474,11 +1492,13 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList) continue; } - if (IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE) || IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE)) + if (IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE) || + IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE)) { containsDistributedTable = true; } - else if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE) || !IsCitusTable(rangeTableEntry->relid)) + else if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE) || + !IsCitusTable(rangeTableEntry->relid)) { /* we consider citus local tables as local table */ containsLocalTable = true; @@ -1504,7 +1524,8 @@ ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - if (rangeTableEntry->rtekind == RTE_SUBQUERY) { + if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { containsSubquery = true; } @@ -1513,7 +1534,8 @@ ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId) continue; } - if (!IsCitusTable(rangeTableEntry->relid) && rangeTableEntry->relid != resultRelationId) + if (!IsCitusTable(rangeTableEntry->relid) && rangeTableEntry->relid != + resultRelationId) { containsLocalTable = true; } @@ -1522,6 +1544,7 @@ ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid 
resultRelationId) return containsLocalTable && containsSubquery; } + /* * WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries * of a query and wraps the functions inside (SELECT * FROM fnc() f) @@ -1645,7 +1668,6 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry) subquery->targetList = lappend(subquery->targetList, targetEntry); } } - /* * If tupleDesc is NULL we have 2 different cases: * @@ -1695,7 +1717,6 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry) columnType = list_nth_oid(rangeTblFunction->funccoltypes, targetColumnIndex); } - /* use the types in the function definition otherwise */ else { diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index e138d9389..a744e80ff 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -1818,8 +1818,8 @@ FilterPlannerRestrictionForQuery(PlannerRestrictionContext *plannerRestrictionCo /* allocate the filtered planner restriction context and set all the fields */ PlannerRestrictionContext *filteredPlannerRestrictionContext = palloc0( sizeof(PlannerRestrictionContext)); - filteredPlannerRestrictionContext->fastPathRestrictionContext = - palloc0(sizeof(FastPathRestrictionContext)); + filteredPlannerRestrictionContext->fastPathRestrictionContext = palloc0( + sizeof(FastPathRestrictionContext)); filteredPlannerRestrictionContext->memoryContext = plannerRestrictionContext->memoryContext; @@ -1882,10 +1882,9 @@ GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry, } List *restrictExprList = NIL; - ListCell *restrictCell = NULL; - foreach(restrictCell, baseRestrictInfo) + RestrictInfo *restrictInfo = NULL; + foreach_ptr(restrictInfo, baseRestrictInfo) { - RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(restrictCell); Expr *restrictionClause = restrictInfo->clause; /* we cannot process 
Params because they are not known at this point */ @@ -1912,10 +1911,9 @@ GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry, Expr *copyOfRestrictClause = (Expr *) copyObject((Node *) restrictionClause); List *varClauses = pull_var_clause_default((Node *) copyOfRestrictClause); ListCell *varClauseCell = NULL; - foreach(varClauseCell, varClauses) + Var *column = NULL; + foreach_ptr(column, varClauses) { - Var *column = (Var *) lfirst(varClauseCell); - column->varno = rteIndex; column->varnosyn = rteIndex; } diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 3461e2246..e24b47757 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -186,8 +186,13 @@ extern List * PostprocessIndexStmt(Node *node, const char *queryString); extern void ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement); extern void MarkIndexValid(IndexStmt *indexStmt); +<<<<<<< HEAD extern List * ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor pgIndexProcessor); +======= +extern List * ExecuteFunctionOnEachTableIndex(Oid relationId, IndexProcesor + indexProcessor); +>>>>>>> Increase readability of the current structure /* objectaddress.c - forward declarations */ extern ObjectAddress CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok); diff --git a/src/include/distributed/local_distributed_join_planner.h b/src/include/distributed/local_distributed_join_planner.h index 85d15b4ea..1fc7269a6 100644 --- a/src/include/distributed/local_distributed_join_planner.h +++ b/src/include/distributed/local_distributed_join_planner.h @@ -17,8 +17,7 @@ #include "distributed/recursive_planning.h" -extern void -ConvertLocalTableJoinsToSubqueries(Query *query, - RecursivePlanningContext *context); +extern void ConvertUnplannableTableJoinsToSubqueries(Query *query, + RecursivePlanningContext *context); #endif /* LOCAL_DISTRIBUTED_JOIN_PLANNER_H */ diff --git
a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 7303e55fe..50d9bf251 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -134,6 +134,8 @@ typedef enum ANY_CITUS_TABLE_TYPE } CitusTableType; + +extern bool IsLocalOrCitusLocalTable(Oid relationId); extern bool IsCitusTableType(Oid relationId, CitusTableType tableType); extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry, CitusTableType tableType); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 2f0abb26b..24e60facd 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -589,6 +589,6 @@ extern RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *column List *funcColumnTypeMods, List *funcCollations); -extern List* FetchAttributeNumsForRTEFromQuals(Node* quals, Index rteIndex); +extern List * FetchEqualityAttrNumsForRTEFromQuals(Node *quals, Index rteIndex); #endif /* MULTI_PHYSICAL_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 18278bda5..7dff1015c 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -85,6 +85,8 @@ extern List * TargetShardIntervalForFastPathQuery(Query *query, extern void GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, List *placementList, uint64 shardId); +extern bool IsRouterPlannable(Query *query, + PlannerRestrictionContext *plannerRestrictionContext); /* * FastPathPlanner is a subset of router planner, that's why we prefer to diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index e4b46745f..6dd146482 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -62,10 
+62,11 @@ extern bool GeneratingSubplans(void); extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList); extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList, - List *requiredAttrNumbers); -extern bool -ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId); -extern bool ContainsTableToBeConvertedToSubquery(List* rangeTableList, Oid resultRelationId); -extern bool SubqueryConvertableRelationForJoin(RangeTblEntry* rangeTableEntry); + List *requiredAttrNumbers, + RecursivePlanningContext *context); +extern bool ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId); +extern bool ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid + resultRelationId); +extern bool SubqueryConvertableRelationForJoin(RangeTblEntry *rangeTableEntry); #endif /* RECURSIVE_PLANNING_H */ diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index 95af74534..2dacb8db8 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -271,8 +271,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = ( SELECT count(*) FROM distributed_table_pkey ); -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM local_table_join.distributed_table_pkey +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, 
NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR (distributed_table_pkey.key OPERATOR(pg_catalog.=) (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)))))) count @@ -316,6 +316,34 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = 10; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON ((postgres_table.key OPERATOR(pg_catalog.=) 10))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN (SELECT * FROM distributed_table) d1 USING(key); +DEBUG: Wrapping local relation "postgres_table" to 
a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN (SELECT distributed_table.key, distributed_table.value, distributed_table.value_2 FROM local_table_join.distributed_table) d1 USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- since this is already router plannable, we don't recursively plan the postgres table +SELECT count(*) FROM postgres_table JOIN (SELECT * FROM distributed_table LIMIT 1) d1 USING(key); +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table LIMIT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1 USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- a unique index on key so dist table should be recursively planned SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(key); DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 @@ -395,8 +423,8 @@ 
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c -- only local tables are recursively planned SELECT count(*) FROM distributed_table d1 JOIN postgres_table p1 USING(key) JOIN distributed_table d2 USING(key) JOIN postgres_table p2 USING(key); DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) count @@ -411,8 +439,8 @@ FROM WHERE d1.value = '1'; DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM 
local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) WHERE (d1.value OPERATOR(pg_catalog.=) '1'::text) count @@ -429,8 +457,8 @@ FROM WHERE d1.key = 1; DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, 
NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) WHERE (d1.key OPERATOR(pg_catalog.=) 1) count @@ -586,10 +614,30 @@ FROM WHERE distributed_table.key = p1.key AND p1.key = p2.key; DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true 
OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1, (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 WHERE ((distributed_table.key OPERATOR(pg_catalog.=) p1.key) AND (p1.key OPERATOR(pg_catalog.=) p2.key)) +UPDATE + postgres_table +SET + value = 'test' +FROM + (SELECT * FROM distributed_table) d1 +WHERE + d1.key = postgres_table.key; +ERROR: relation postgres_table is not distributed +UPDATE + postgres_table +SET + value = 'test' +FROM + (SELECT * FROM distributed_table LIMIT 1) d1 +WHERE + d1.key = postgres_table.key; +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table LIMIT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1 WHERE (d1.key OPERATOR(pg_catalog.=) postgres_table.key) UPDATE distributed_table SET @@ -612,8 +660,8 @@ FROM WHERE postgres_table.key = d1.key AND d1.key = d2.key; DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM 
local_table_join.distributed_table d1 WHERE true OFFSET 0 -DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d2 WHERE true OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1, (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d2 WHERE ((postgres_table.key OPERATOR(pg_catalog.=) d1.key) AND (d1.key OPERATOR(pg_catalog.=) d2.key)) -- currently can't plan subquery-local table join @@ -723,8 +771,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c SELECT count(*) FROM citus_local JOIN distributed_table USING(key) JOIN postgres_table USING (key) JOIN reference_table USING(key); DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 -DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS 
value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key)) JOIN local_table_join.postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key)) count @@ -735,8 +783,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key) JOIN citus_local USING(key) WHERE distributed_partitioned_table.key > 10 and distributed_partitioned_table.key = 10; DEBUG: Wrapping local relation "distributed_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 -DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM 
local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 +DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) distributed_partitioned_table JOIN local_table_join.postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key)) JOIN local_table_join.citus_local USING (key)) WHERE ((distributed_partitioned_table.key OPERATOR(pg_catalog.>) 10) AND (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10)) count diff --git a/src/test/regress/sql/local_table_join.sql b/src/test/regress/sql/local_table_join.sql index 517a529cf..0fb65d824 100644 --- a/src/test/regress/sql/local_table_join.sql +++ b/src/test/regress/sql/local_table_join.sql @@ -45,7 +45,7 @@ SELECT count(*) FROM postgres_table JOIN reference_table USING(key); -- switch back to the default policy, which is auto SET citus.local_table_join_policy to 'auto'; --- on the auto mode, the local tables should be recursively planned +-- on the auto mode, the local 
tables should be recursively planned -- unless a unique index exists in a column for distributed table SELECT count(*) FROM distributed_table JOIN postgres_table USING(key); SELECT count(*) FROM reference_table JOIN postgres_table USING(key); @@ -80,6 +80,12 @@ SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_t SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key > 10 and distributed_table_pkey.value = 'notext'); SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key = 10 and distributed_table_pkey.value = 'notext'); +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = 10; + + +SELECT count(*) FROM postgres_table JOIN (SELECT * FROM distributed_table) d1 USING(key); +-- since this is already router plannable, we don't recursively plan the postgres table +SELECT count(*) FROM postgres_table JOIN (SELECT * FROM distributed_table LIMIT 1) d1 USING(key); -- a unique index on key so dist table should be recursively planned SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(key); @@ -93,7 +99,7 @@ SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE dist SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.key = 1; --- if both local and distributed tables have a filter, we prefer local unless distributed table has unique indexes on any equality filter +-- if both local and distributed tables have a filter, we prefer local unless distributed table has unique indexes on any equality filter SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test' AND postgres_table.value = 'test'; SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test' OR postgres_table.value = 'test'; @@ 
-158,9 +164,9 @@ SET FROM postgres_table WHERE - distributed_table_windex.key = postgres_table.key; + distributed_table_windex.key = postgres_table.key; --- in case of update/delete we always recursively plan +-- in case of update/delete we always recursively plan -- the tables other than target table no matter what the policy is SET citus.local_table_join_policy TO 'prefer-local'; @@ -200,7 +206,7 @@ SET FROM postgres_table WHERE - distributed_table_windex.key = postgres_table.key; + distributed_table_windex.key = postgres_table.key; SET citus.local_table_join_policy TO 'prefer-distributed'; @@ -240,7 +246,7 @@ SET FROM postgres_table WHERE - distributed_table_windex.key = postgres_table.key; + distributed_table_windex.key = postgres_table.key; -- modifications with multiple tables UPDATE @@ -252,6 +258,23 @@ FROM WHERE distributed_table.key = p1.key AND p1.key = p2.key; +UPDATE + postgres_table +SET + value = 'test' +FROM + (SELECT * FROM distributed_table) d1 +WHERE + d1.key = postgres_table.key; + +UPDATE + postgres_table +SET + value = 'test' +FROM + (SELECT * FROM distributed_table LIMIT 1) d1 +WHERE + d1.key = postgres_table.key; UPDATE distributed_table @@ -274,8 +297,8 @@ WHERE postgres_table.key = d1.key AND d1.key = d2.key; -- currently can't plan subquery-local table join -SELECT count(*) -FROM +SELECT count(*) +FROM (SELECT * FROM (SELECT * FROM distributed_table) d1) d2 JOIN postgres_table USING(key); @@ -318,7 +341,7 @@ SET FROM citus_local WHERE - distributed_table_windex.key = citus_local.key; + distributed_table_windex.key = citus_local.key; UPDATE citus_local @@ -327,7 +350,7 @@ SET FROM distributed_table_windex WHERE - distributed_table_windex.key = citus_local.key; + distributed_table_windex.key = citus_local.key; DROP TABLE citus_local; RESET client_min_messages;