diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index 4a31aa543..2ff2257ec 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -54,20 +54,34 @@ #include "utils/guc.h" #include "utils/lsyscache.h" -static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList); +typedef struct RTEToSubqueryConverterReference { + RangeTblEntry* rangeTableEntry; + Index rteIndex; + List* restrictionList; + List* requiredAttributeNumbers; +} RTEToSubqueryConverterReference; + +typedef struct RTEToSubqueryConverterContext{ + List* distributedTableList; /* reference or distributed table */ + List* localTableList; + List* citusLocalTableList; + bool hasSubqueryRTE; +}RTEToSubqueryConverterContext; + +static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList, Oid resultRelationId); static bool HasUniqueFilter(RangeTblEntry* distRTE, List* distRTERestrictionList, List* requiredAttrNumbersForDistRTE); -static void AutoConvertLocalTableJoinToSubquery(RangeTblEntry* localRTE, RangeTblEntry* distRTE, - List* localRTERestrictionList, List* distRTERestrictionList, - List *requiredAttrNumbersForLocalRTE, List *requiredAttrNumbersForDistRTE); +static bool AutoConvertLocalTableJoinToSubquery(FromExpr* joinTree, + RTEToSubqueryConverterReference* distRTEContext); static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, RecursivePlanningContext *planningContext); -static RangeTblEntry * FindNextRTECandidate(PlannerRestrictionContext * - plannerRestrictionContext, - List *rangeTableList, List **restrictionList, - bool localTable); -static bool AllDataLocallyAccessible(List *rangeTableList); +static RTEToSubqueryConverterContext * CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context, + List *rangeTableList); static void GetAllUniqueIndexes(Form_pg_index indexForm, List** uniqueIndexes); - +static RTEToSubqueryConverterReference* + GetNextRTEToConvertToSubquery(FromExpr* joinTree, RTEToSubqueryConverterContext* rteToSubqueryConverterContext, + PlannerRestrictionContext *plannerRestrictionContext, RangeTblEntry* resultRelation); +static void PopFromRTEToSubqueryConverterContext(RTEToSubqueryConverterContext* rteToSubqueryConverterContext, + bool isCitusLocalTable); /* * ConvertLocalTableJoinsToSubqueries gets a query and the planner * restrictions. As long as there is a join between a local table @@ -83,70 +97,109 @@ ConvertLocalTableJoinsToSubqueries(Query *query, RecursivePlanningContext *context) { List *rangeTableList = query->rtable; - if(!ShouldConvertLocalTableJoinsToSubqueries(rangeTableList)) { + RangeTblEntry *resultRelation = ExtractResultRelationRTE(query); + Oid resultRelationId = InvalidOid; + if (resultRelation) { + resultRelationId = resultRelation->relid; + } + if (!ShouldConvertLocalTableJoinsToSubqueries(rangeTableList, resultRelationId)) { return; } + RTEToSubqueryConverterContext* rteToSubqueryConverterContext = CreateRTEToSubqueryConverterContext( + context, rangeTableList); - RangeTblEntry *resultRelation = ExtractResultRelationRTE(query); - while (ContainsLocalTableDistributedTableJoin(rangeTableList)) + RTEToSubqueryConverterReference* rteToSubqueryConverterReference = + GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext, + context->plannerRestrictionContext, resultRelation); + while (rteToSubqueryConverterReference) { - List *localTableRestrictList = NIL; - List *distributedTableRestrictList = NIL; + ReplaceRTERelationWithRteSubquery(rteToSubqueryConverterReference->rangeTableEntry, + rteToSubqueryConverterReference->restrictionList, + rteToSubqueryConverterReference->requiredAttributeNumbers); + rteToSubqueryConverterReference = + GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext, + context->plannerRestrictionContext, resultRelation); + } +} - bool localTable = true; +static RTEToSubqueryConverterReference* + GetNextRTEToConvertToSubquery(FromExpr* joinTree, RTEToSubqueryConverterContext* rteToSubqueryConverterContext, + PlannerRestrictionContext *plannerRestrictionContext, RangeTblEntry* resultRelation) { - PlannerRestrictionContext *plannerRestrictionContext = - context->plannerRestrictionContext; - RangeTblEntry *localRTECandidate = - FindNextRTECandidate(plannerRestrictionContext, rangeTableList, - &localTableRestrictList, localTable); - RangeTblEntry *distributedRTECandidate = - FindNextRTECandidate(plannerRestrictionContext, rangeTableList, - &distributedTableRestrictList, !localTable); + RTEToSubqueryConverterReference* localRTECandidate = NULL; + RTEToSubqueryConverterReference* nonLocalRTECandidate = NULL; + bool citusLocalTableChosen = false; - List *requiredAttrNumbersForLocalRte = - RequiredAttrNumbersForRelation(localRTECandidate, context); - List *requiredAttrNumbersForDistributedRte = - RequiredAttrNumbersForRelation(distributedRTECandidate, context); + if (list_length(rteToSubqueryConverterContext->localTableList) > 0) { + localRTECandidate = linitial(rteToSubqueryConverterContext->localTableList); + }else if (list_length(rteToSubqueryConverterContext->citusLocalTableList) > 0) { + localRTECandidate = linitial(rteToSubqueryConverterContext->citusLocalTableList); + citusLocalTableChosen = true; + } + if (localRTECandidate == NULL) { + return NULL; + } - if (resultRelation) { + if (list_length(rteToSubqueryConverterContext->distributedTableList) > 0) { + nonLocalRTECandidate = linitial(rteToSubqueryConverterContext->distributedTableList); + } + if (nonLocalRTECandidate == NULL && !rteToSubqueryConverterContext->hasSubqueryRTE) { + return NULL; + } - if (resultRelation->relid == localRTECandidate->relid) { - ReplaceRTERelationWithRteSubquery(distributedRTECandidate, - distributedTableRestrictList, - requiredAttrNumbersForDistributedRte); - continue; - }else if (resultRelation->relid == distributedRTECandidate->relid) { - ReplaceRTERelationWithRteSubquery(localRTECandidate, - localTableRestrictList, - requiredAttrNumbersForLocalRte); - continue; - } + if (resultRelation) { + if(resultRelation == localRTECandidate->rangeTableEntry) { + rteToSubqueryConverterContext->distributedTableList = list_delete_first( + rteToSubqueryConverterContext->distributedTableList + ); + return nonLocalRTECandidate; + } + if (resultRelation == nonLocalRTECandidate->rangeTableEntry) { + PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen); + return localRTECandidate; + } + } + + if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) { + PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen); + return localRTECandidate; + }else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) { + if (nonLocalRTECandidate) { + rteToSubqueryConverterContext->distributedTableList = list_delete_first( + rteToSubqueryConverterContext->distributedTableList + ); + return nonLocalRTECandidate; + }else { + PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen); + return localRTECandidate; } - if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) - { - ReplaceRTERelationWithRteSubquery(localRTECandidate, - localTableRestrictList, - requiredAttrNumbersForLocalRte); - } - else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) - { - ReplaceRTERelationWithRteSubquery(distributedRTECandidate, - distributedTableRestrictList, - requiredAttrNumbersForDistributedRte); - } - else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO) - { - AutoConvertLocalTableJoinToSubquery(localRTECandidate, distributedRTECandidate, - localTableRestrictList, distributedTableRestrictList, - requiredAttrNumbersForLocalRte, requiredAttrNumbersForDistributedRte); - } - else - { - elog(ERROR, "unexpected local table join policy: %d", LocalTableJoinPolicy); + }else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO) { + bool shouldConvertNonLocalTable = AutoConvertLocalTableJoinToSubquery(joinTree, nonLocalRTECandidate); + if (shouldConvertNonLocalTable) { + rteToSubqueryConverterContext->distributedTableList = list_delete_first( + rteToSubqueryConverterContext->distributedTableList + ); + return nonLocalRTECandidate; + }else { + PopFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext,citusLocalTableChosen); + return localRTECandidate; } + }else { + elog(ERROR, "unexpected local table join policy: %d", LocalTableJoinPolicy); + } + return NULL; +} + +static void PopFromRTEToSubqueryConverterContext(RTEToSubqueryConverterContext* rteToSubqueryConverterContext, + bool isCitusLocalTable) { + if (isCitusLocalTable) { + rteToSubqueryConverterContext->citusLocalTableList = + list_delete_first(rteToSubqueryConverterContext->citusLocalTableList); + }else { + rteToSubqueryConverterContext->localTableList = + list_delete_first(rteToSubqueryConverterContext->localTableList); } } @@ -154,46 +207,42 @@ ConvertLocalTableJoinsToSubqueries(Query *query, * ShouldConvertLocalTableJoinsToSubqueries returns true if we should * convert local-dist table joins to subqueries. */ -static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList) { +static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList, Oid resultRelationId) { if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER) { /* user doesn't want Citus to enable local table joins */ return false; } - - if (!ContainsLocalTableDistributedTableJoin(rangeTableList)) - { - /* nothing to do as there are no relevant joins */ - return false; - } - - if (AllDataLocallyAccessible(rangeTableList)) - { - /* recursively planning is overkill, router planner can already handle this */ + if (!ContainsTableToBeConvertedToSubquery(rangeTableList, resultRelationId)) { return false; } return true; } -static void AutoConvertLocalTableJoinToSubquery(RangeTblEntry* localRTE, RangeTblEntry* distRTE, - List* localRTERestrictionList, List* distRTERestrictionList, - List *requiredAttrNumbersForLocalRTE, List *requiredAttrNumbersForDistRTE) { - - bool hasUniqueFilter = HasUniqueFilter(distRTE, distRTERestrictionList, requiredAttrNumbersForDistRTE); - if (hasUniqueFilter) { - ReplaceRTERelationWithRteSubquery(distRTE, - distRTERestrictionList, - requiredAttrNumbersForDistRTE); - }else { - ReplaceRTERelationWithRteSubquery(localRTE, - localRTERestrictionList, - requiredAttrNumbersForLocalRTE); +static bool AutoConvertLocalTableJoinToSubquery(FromExpr* joinTree, + RTEToSubqueryConverterReference* rteToSubqueryConverterReference) { + if (rteToSubqueryConverterReference == NULL) { + return false; } + List* distRTEEqualityQuals = + FetchAttributeNumsForRTEFromQuals(joinTree->quals, rteToSubqueryConverterReference->rteIndex); + + Node* join = NULL; + foreach_ptr(join, joinTree->fromlist) { + if (IsA(join, JoinExpr)) { + JoinExpr* joinExpr = (JoinExpr*) join; + distRTEEqualityQuals = list_concat(distRTEEqualityQuals, + FetchAttributeNumsForRTEFromQuals(joinExpr->quals, rteToSubqueryConverterReference->rteIndex) + ); + } + } + + bool hasUniqueFilter = HasUniqueFilter(rteToSubqueryConverterReference->rangeTableEntry, + rteToSubqueryConverterReference->restrictionList, distRTEEqualityQuals); + return hasUniqueFilter; } -// TODO:: This function should only consider equality, -// currently it will return true for dist.a > 5. We should check this from join->quals. static bool HasUniqueFilter(RangeTblEntry* distRTE, List* distRTERestrictionList, List* requiredAttrNumbersForDistRTE) { List* uniqueIndexes = ExecuteFunctionOnEachTableIndex(distRTE->relid, GetAllUniqueIndexes); int columnNumber = 0; @@ -237,6 +286,10 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, filteredPlannerRestrictionContext->relationRestrictionContext; List *filteredRelationRestrictionList = relationRestrictionContext->relationRestrictionList; + + if (list_length(filteredRelationRestrictionList) == 0) { + return NIL; + } RelationRestriction *relationRestriction = (RelationRestriction *) linitial(filteredRelationRestrictionList); @@ -265,23 +318,27 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, /* - * FindNextRTECandidate returns a range table entry which has the most filters + * CreateRTEToSubqueryConverterContext returns a range table entry which has the most filters * on it along with the restrictions (e.g., fills **restrictionList). * * The function also gets a boolean localTable parameter, so the caller * can choose to run the function for only local tables or distributed tables. */ -static RangeTblEntry * -FindNextRTECandidate(PlannerRestrictionContext *plannerRestrictionContext, - List *rangeTableList, List **restrictionList, - bool localTable) +static RTEToSubqueryConverterContext* +CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context, + List *rangeTableList) { - ListCell *rangeTableCell = NULL; + + RTEToSubqueryConverterContext* rteToSubqueryConverterContext = palloc0(sizeof(RTEToSubqueryConverterContext)); - foreach(rangeTableCell, rangeTableList) + int rteIndex = 0; + RangeTblEntry* rangeTableEntry = NULL; + foreach_ptr(rangeTableEntry, rangeTableList) { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - + rteIndex++; + if (rangeTableEntry->rtekind == RTE_SUBQUERY) { + rteToSubqueryConverterContext->hasSubqueryRTE = true; + } /* we're only interested in tables */ if (!(rangeTableEntry->rtekind == RTE_RELATION && rangeTableEntry->relkind == RELKIND_RELATION)) @@ -289,73 +346,25 @@ FindNextRTECandidate(PlannerRestrictionContext *plannerRestrictionContext, continue; } - if (localTable && IsCitusTable(rangeTableEntry->relid)) - { - continue; - } + RTEToSubqueryConverterReference* rteToSubqueryConverter = palloc(sizeof(RTEToSubqueryConverterReference)); + rteToSubqueryConverter->rangeTableEntry = rangeTableEntry; + rteToSubqueryConverter->rteIndex = rteIndex; + rteToSubqueryConverter->restrictionList = GetRestrictInfoListForRelation(rangeTableEntry, + context->plannerRestrictionContext, 1); + rteToSubqueryConverter->requiredAttributeNumbers = RequiredAttrNumbersForRelation(rangeTableEntry, context); - if (!localTable && !IsCitusTable(rangeTableEntry->relid)) - { - continue; - } - - List *currentRestrictionList = - GetRestrictInfoListForRelation(rangeTableEntry, - plannerRestrictionContext, 1); - - *restrictionList = currentRestrictionList; - return rangeTableEntry; - } - // TODO:: Put Illegal state error code - ereport(ERROR, (errmsg("unexpected state: could not find any RTE to convert to subquery in range table list"))); - return NULL; -} - -/* - * AllDataLocallyAccessible return true if all data for the relations in the - * rangeTableList is locally accessible. - */ -static bool -AllDataLocallyAccessible(List *rangeTableList) -{ - ListCell *rangeTableCell = NULL; - foreach(rangeTableCell, rangeTableList) - { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - - /* we're only interested in tables */ - if (!(rangeTableEntry->rtekind == RTE_RELATION && - rangeTableEntry->relkind == RELKIND_RELATION)) - { - continue; - } - - - Oid relationId = rangeTableEntry->relid; - - if (!IsCitusTable(relationId)) - { - /* local tables are locally accessible */ - continue; - } - - List *shardIntervalList = LoadShardIntervalList(relationId); - if (list_length(shardIntervalList) > 1) - { - /* we currently only consider single placement tables */ - return false; - } - - ShardInterval *shardInterval = linitial(shardIntervalList); - uint64 shardId = shardInterval->shardId; - ShardPlacement *localShardPlacement = - ShardPlacementOnGroup(shardId, GetLocalGroupId()); - if (localShardPlacement == NULL) - { - /* the table doesn't have a placement on this node */ - return false; + bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE) || + IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE); + if (referenceOrDistributedTable) { + rteToSubqueryConverterContext->distributedTableList = + lappend(rteToSubqueryConverterContext->distributedTableList, rteToSubqueryConverter); + }else if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE)) { + rteToSubqueryConverterContext->citusLocalTableList = + lappend(rteToSubqueryConverterContext->citusLocalTableList, rteToSubqueryConverter); + }else { + rteToSubqueryConverterContext->localTableList = + lappend(rteToSubqueryConverterContext->localTableList, rteToSubqueryConverter); } } - - return true; + return rteToSubqueryConverterContext; } \ No newline at end of file diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 1dd5c661a..6e05268f9 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -3588,6 +3588,56 @@ NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList) return IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE); } +List* FetchAttributeNumsForRTEFromQuals(Node* quals, Index rteIndex) { + List* attributeNums = NIL; + if (quals == NULL) + { + return NIL; + } + + if (IsA(quals, OpExpr)) + { + if (!NodeIsEqualsOpExpr(quals)) + { + return NIL; + } + OpExpr *nextJoinClauseOpExpr = castNode(OpExpr, quals); + + Var* var = NULL; + if (VarConstOpExprClause(nextJoinClauseOpExpr, &var, NULL)) { + attributeNums = lappend_int(attributeNums, var->varattno); + return attributeNums; + } + + } + else if (IsA(quals, BoolExpr)) + { + BoolExpr *boolExpr = (BoolExpr *) quals; + + if (boolExpr->boolop != AND_EXPR && boolExpr->boolop != OR_EXPR) { + return attributeNums; + } + + bool hasEquality = true; + Node* arg = NULL; + foreach_ptr(arg, boolExpr->args) + { + List* attributeNumsInSubExpression = FetchAttributeNumsForRTEFromQuals(arg, rteIndex); + if (boolExpr->boolop == AND_EXPR) + { + hasEquality |= list_length(attributeNumsInSubExpression) > 0; + }else if (boolExpr->boolop == OR_EXPR){ + hasEquality &= list_length(attributeNumsInSubExpression) > 0; + } + attributeNums = list_concat(attributeNums, attributeNumsInSubExpression); + + } + if (hasEquality) { + return attributeNums; + } + } + return NIL; +} /* * JoinSequenceArray walks over the join nodes in the job query and constructs a join diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 24414fa90..6b9858e23 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -181,7 +181,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType taskAssignmentPolicy, List *placementList); - +static bool IsLocalOrCitusLocalTable(Oid relationId); /* * CreateRouterPlan attempts to create a router executor plan for the given @@ -530,11 +530,11 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, Oid distributedTableId = ModifyQueryResultRelationId(queryTree); *distributedTableIdOutput = distributedTableId; - if (ContainsLocalTableDistributedTableJoin(queryTree->rtable)) + if (ContainsTableToBeConvertedToSubquery(queryTree->rtable, distributedTableId)) { return deferredError; } - + Var *partitionColumn = NULL; if (IsCitusTable(distributedTableId)) @@ -773,6 +773,13 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, return NULL; } +static bool IsLocalOrCitusLocalTable(Oid relationId) { + if (!IsCitusTable(relationId)) { + return true; + } + return IsCitusTableType(relationId, CITUS_LOCAL_TABLE); +} + /* * NodeIsFieldStore returns true if given Node is a FieldStore object. @@ -896,7 +903,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer List *rangeTableList = NIL; uint32 queryTableCount = 0; CmdType commandType = queryTree->commandType; - bool fastPathRouterQuery = +bool fastPathRouterQuery = plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; /* @@ -958,13 +965,24 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer /* for other kinds of relations, check if its distributed */ else { - if (ContainsLocalTableDistributedTableJoin(queryTree->rtable)) - { + RangeTblEntry *resultRte = ExtractResultRelationRTE(queryTree); + Oid resultRelationId = InvalidOid; + if (resultRte) { + resultRelationId = resultRte->relid; + } + if (IsLocalOrCitusLocalTable(rangeTableEntry->relid) && + ContainsTableToBeConvertedToSubquery(queryTree->rtable, resultRelationId) + ) + { StringInfo errorMessage = makeStringInfo(); char *relationName = get_rel_name(rangeTableEntry->relid); - - appendStringInfo(errorMessage, "relation %s is not distributed", + if (IsCitusTable(rangeTableEntry->relid)) { + appendStringInfo(errorMessage, "citus local table %s cannot be used in this join", relationName); + }else { + appendStringInfo(errorMessage, "relation %s is not distributed", + relationName); + } return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, NULL, NULL); diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 96cd55845..2213f92d5 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -163,6 +163,7 @@ static bool ShouldRecursivelyPlanSubquery(Query *subquery, static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery, PlannerRestrictionContext * restrictionContext); +static bool AllDataLocallyAccessible(List *rangeTableList); static bool ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *context); static void RecursivelyPlanSetOperations(Query *query, Node *node, @@ -184,6 +185,7 @@ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList, List *columnAliasList, Const *resultIdConst, Oid functionOid, bool useBinaryCopyFormat); + /* * GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs. * The function returns the subplans if necessary. For the details of when/how subplans are @@ -1383,6 +1385,64 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict } } +bool ContainsTableToBeConvertedToSubquery(List* rangeTableList, Oid resultRelationId) { + if (AllDataLocallyAccessible(rangeTableList)) { + return false; + } + return ContainsLocalTableDistributedTableJoin(rangeTableList) || + ContainsLocalTableSubqueryJoin(rangeTableList, resultRelationId); +} + +/* + * AllDataLocallyAccessible return true if all data for the relations in the + * rangeTableList is locally accessible. + */ +static bool +AllDataLocallyAccessible(List *rangeTableList) +{ + RangeTblEntry* rangeTableEntry = NULL; + foreach_ptr(rangeTableEntry, rangeTableList) + { + if (rangeTableEntry->rtekind == RTE_SUBQUERY) { + // TODO:: check if it has distributed table + return false; + } + /* we're only interested in tables */ + if (!(rangeTableEntry->rtekind == RTE_RELATION && + rangeTableEntry->relkind == RELKIND_RELATION)) + { + continue; + } + + + Oid relationId = rangeTableEntry->relid; + + if (!IsCitusTable(relationId)) + { + /* local tables are locally accessible */ + continue; + } + + List *shardIntervalList = LoadShardIntervalList(relationId); + if (list_length(shardIntervalList) != 1) + { + /* we currently only consider single placement tables */ + return false; + } + + ShardInterval *shardInterval = linitial(shardIntervalList); + uint64 shardId = shardInterval->shardId; + ShardPlacement *localShardPlacement = + ShardPlacementOnGroup(shardId, GetLocalGroupId()); + if (localShardPlacement == NULL) + { + /* the table doesn't have a placement on this node */ + return false; + } + } + + return true; +} /* * ContainsLocalTableDistributedTableJoin returns true if the input range table list @@ -1400,19 +1460,20 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList) RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); /* we're only interested in tables */ - if (rangeTableEntry->rtekind != RTE_RELATION || - rangeTableEntry->relkind != RELKIND_RELATION) + /* TODO:: What about partitioned tables? */ + if (!(rangeTableEntry->rtekind == RTE_RELATION && + rangeTableEntry->relkind == RELKIND_RELATION)) { continue; } - /* TODO: do NOT forget Citus local tables */ - if (IsCitusTable(rangeTableEntry->relid)) + if (IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE) || IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE)) { containsDistributedTable = true; } - else + else if (IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE) || !IsCitusTable(rangeTableEntry->relid)) { + /* we consider citus local tables as local table */ containsLocalTable = true; } } @@ -1421,6 +1482,41 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList) } +/* + * ContainsLocalTableSubqueryJoin returns true if the input range table list + * contains a direct join between local table/citus local table and subquery. + */ +bool +ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId) +{ + bool containsLocalTable = false; + bool containsSubquery = false; + + ListCell *rangeTableCell = NULL; + foreach(rangeTableCell, rangeTableList) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); + + if (rangeTableEntry->rtekind == RTE_SUBQUERY) { + containsSubquery = true; + } + /* we're only interested in tables */ + /* TODO:: What about partitioned tables? */ + if (!(rangeTableEntry->rtekind == RTE_RELATION && + rangeTableEntry->relkind == RELKIND_RELATION)) + { + continue; + } + + if (!IsCitusTable(rangeTableEntry->relid) && rangeTableEntry->relid != resultRelationId) + { + containsLocalTable = true; + } + } + + return containsLocalTable && containsSubquery; +} + /* * WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries * of a query and wraps the functions inside (SELECT * FROM fnc() f) diff --git a/src/backend/distributed/planner/shard_pruning.c b/src/backend/distributed/planner/shard_pruning.c index b7133561d..26b84d9ee 100644 --- a/src/backend/distributed/planner/shard_pruning.c +++ b/src/backend/distributed/planner/shard_pruning.c @@ -254,8 +254,6 @@ static bool IsValidPartitionKeyRestriction(OpExpr *opClause); static void AddPartitionKeyRestrictionToInstance(ClauseWalkerContext *context, OpExpr *opClause, Var *varClause, Const *constantClause); -static bool VarConstOpExprClause(OpExpr *opClause, Var **varClause, - Const **constantClause); static void AddSAOPartitionKeyRestrictionToInstance(ClauseWalkerContext *context, ScalarArrayOpExpr * arrayOperatorExpression); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 9f90798da..2f0abb26b 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -589,5 +589,6 @@ extern RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *column List *funcColumnTypeMods, List *funcCollations); +extern List* FetchAttributeNumsForRTEFromQuals(Node* quals, Index rteIndex); #endif /* MULTI_PHYSICAL_PLANNER_H */ diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index d6fed178e..d48a20f13 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -63,6 +63,8 @@ extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList); extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList, List *requiredAttrNumbers); - +extern bool +ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId); +extern bool ContainsTableToBeConvertedToSubquery(List* rangeTableList, Oid resultRelationId); #endif /* RECURSIVE_PLANNING_H */ diff --git a/src/include/distributed/shard_pruning.h b/src/include/distributed/shard_pruning.h index 8f8ca69e7..04176314e 100644 --- a/src/include/distributed/shard_pruning.h +++ b/src/include/distributed/shard_pruning.h @@ -24,4 +24,6 @@ extern List * get_all_actual_clauses(List *restrictinfo_list); extern Const * TransformPartitionRestrictionValue(Var *partitionColumn, Const *restrictionValue, bool missingOk); +bool VarConstOpExprClause(OpExpr *opClause, Var **varClause, Const **constantClause); + #endif /* SHARD_PRUNING_H_ */ diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index be290b200..eac5a5087 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -114,9 +114,9 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c -- a unique index on key so dist table should be recursively planned SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key); -DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0 -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey USING (key)) +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey USING (key)) count --------------------------------------------------------------------- 0 @@ -132,9 +132,9 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = distributed_table_pkey.key; -DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0 -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON ((postgres_table.key OPERATOR(pg_catalog.=) distributed_table_pkey.key))) +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON ((postgres_table.key OPERATOR(pg_catalog.=) distributed_table_pkey.key))) count --------------------------------------------------------------------- 0 @@ -149,11 +149,141 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) +-- it should favor distributed table only if it has equality on the unique column +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key > 10; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON ((distributed_table_pkey.key OPERATOR(pg_catalog.>) 10))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key < 10; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON ((distributed_table_pkey.key OPERATOR(pg_catalog.<) 10))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10; +DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON ((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 AND distributed_table_pkey.key > 10 ; +DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_pkey.key OPERATOR(pg_catalog.>) 10)))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 AND distributed_table_pkey.key > 10 ; +DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_pkey.key OPERATOR(pg_catalog.>) 10)))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 AND distributed_table_pkey.key > 10 AND postgres_table.key = 5; +DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_pkey.key OPERATOR(pg_catalog.>) 10) AND (postgres_table.key OPERATOR(pg_catalog.=) 5)))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key > 10; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR (distributed_table_pkey.key OPERATOR(pg_catalog.>) 10)))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = 20; +DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR (key OPERATOR(pg_catalog.=) 20)) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR (key OPERATOR(pg_catalog.=) 20)) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20)))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = 20 OR distributed_table_pkey.key = 30; +DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR (key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 30)) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR (key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 30)) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_pkey.key OPERATOR(pg_catalog.=) 30)))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = ( + SELECT count(*) FROM distributed_table_pkey +); +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM local_table_join.distributed_table_pkey +DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR (distributed_table_pkey.key OPERATOR(pg_catalog.=) (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)))))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key = 5 and distributed_table_pkey.key > 15); +DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR ((key OPERATOR(pg_catalog.=) 5) AND (key OPERATOR(pg_catalog.>) 15))) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR ((key OPERATOR(pg_catalog.=) 5) AND (key OPERATOR(pg_catalog.>) 15))) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR ((distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) AND (distributed_table_pkey.key OPERATOR(pg_catalog.>) 15))))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key > 10 and distributed_table_pkey.key > 15); +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR ((distributed_table_pkey.key OPERATOR(pg_catalog.>) 10) AND (distributed_table_pkey.key OPERATOR(pg_catalog.>) 15))))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key > 10 and distributed_table_pkey.value = 'notext'); +DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR ((key OPERATOR(pg_catalog.>) 10) AND (value OPERATOR(pg_catalog.=) 'notext'::text))) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE ((key OPERATOR(pg_catalog.=) 10) OR ((key OPERATOR(pg_catalog.>) 10) AND (value OPERATOR(pg_catalog.=) 'notext'::text))) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR ((distributed_table_pkey.key OPERATOR(pg_catalog.>) 10) AND (distributed_table_pkey.value OPERATOR(pg_catalog.=) 'notext'::text))))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key = 10 and distributed_table_pkey.value = 'notext'); +DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON (((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) OR ((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_pkey.value OPERATOR(pg_catalog.=) 'notext'::text))))) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- a unique index on key so dist table should be recursively planned SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(key); -DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex USING (key)) +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_windex USING (key)) count --------------------------------------------------------------------- 0 @@ -169,9 +299,9 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) SELECT count(*) FROM postgres_table JOIN distributed_table_windex ON postgres_table.key = distributed_table_windex.key; -DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex ON ((postgres_table.key OPERATOR(pg_catalog.=) distributed_table_windex.key))) +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_windex ON ((postgres_table.key OPERATOR(pg_catalog.=) distributed_table_windex.key))) count --------------------------------------------------------------------- 0 @@ -449,7 +579,157 @@ DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, N DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d1 WHERE true OFFSET 0 DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1, (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d2 WHERE ((postgres_table.key OPERATOR(pg_catalog.=) d1.key) AND (d1.key OPERATOR(pg_catalog.=) d2.key)) -\set VERBOSITY terse +-- currently can't plan subquery-local table join +SELECT count(*) +FROM + (SELECT * FROM (SELECT * FROM distributed_table) d1) d2 +JOIN postgres_table +USING(key); +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT d1.key, d1.value, d1.value_2 FROM (SELECT distributed_table.key, distributed_table.value, distributed_table.value_2 FROM local_table_join.distributed_table) d1) d2 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +--------------------------------------------------------------------- +SET client_min_messages to ERROR; +SELECT master_add_node('localhost', :master_port, groupId => 0); + master_add_node +--------------------------------------------------------------------- + 30 +(1 row) + +CREATE TABLE citus_local(key int, value text); +SELECT create_citus_local_table('citus_local'); + create_citus_local_table +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; +-- same for citus local table - distributed table joins +-- a unique index on key so dist table should be recursively planned +SELECT count(*) FROM citus_local JOIN distributed_table_windex USING(key); +DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM citus_local JOIN distributed_table_windex USING(value); +DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex USING (value)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM citus_local JOIN distributed_table_windex ON citus_local.key = distributed_table_windex.key; +DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex ON ((citus_local.key OPERATOR(pg_catalog.=) distributed_table_windex.key))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM citus_local JOIN distributed_table_windex ON distributed_table_windex.key = 10; +DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex ON ((distributed_table_windex.key OPERATOR(pg_catalog.=) 10))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- no unique index, citus local table should be recursively planned +SELECT count(*) FROM citus_local JOIN distributed_table USING(key); +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM citus_local JOIN distributed_table USING(value); +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (value)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM citus_local JOIN distributed_table ON citus_local.key = distributed_table.key; +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table ON ((citus_local.key OPERATOR(pg_catalog.=) distributed_table.key))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM citus_local JOIN distributed_table ON distributed_table.key = 10; +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table ON ((distributed_table.key OPERATOR(pg_catalog.=) 10))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM citus_local JOIN distributed_table USING(key) JOIN postgres_table USING (key) JOIN reference_table USING(key); +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.citus_local JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key)) JOIN local_table_join.postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- update +UPDATE + distributed_table_windex +SET + value = 'test' +FROM + citus_local +WHERE + distributed_table_windex.key = citus_local.key; +DEBUG: Wrapping local relation "citus_local" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.citus_local WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.citus_local WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_windex SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) citus_local WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) citus_local.key) +UPDATE + citus_local +SET + value = 'test' +FROM + distributed_table_windex +WHERE + distributed_table_windex.key = citus_local.key; +DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.citus_local SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) citus_local.key) +DROP TABLE citus_local; +CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 15 at PERFORM +CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 15 at PERFORM RESET client_min_messages; +SELECT master_remove_node('localhost', :master_port); + master_remove_node +--------------------------------------------------------------------- + +(1 row) + +\set VERBOSITY terse DROP SCHEMA local_table_join CASCADE; -NOTICE: drop cascades to 5 other objects +NOTICE: drop cascades to 6 other objects diff --git a/src/test/regress/sql/local_table_join.sql b/src/test/regress/sql/local_table_join.sql index be1946533..7805e9dda 100644 --- a/src/test/regress/sql/local_table_join.sql +++ b/src/test/regress/sql/local_table_join.sql @@ -1,7 +1,6 @@ CREATE SCHEMA local_table_join; SET search_path TO local_table_join; - CREATE TABLE postgres_table (key int, value text, value_2 jsonb); CREATE TABLE reference_table (key int, value text, value_2 jsonb); SELECT create_reference_table('reference_table'); @@ -52,6 +51,24 @@ SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key); SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(value); SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = distributed_table_pkey.key; SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10; +-- it should favor distributed table only if it has equality on the unique column +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key > 10; +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key < 10; +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10; +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 AND distributed_table_pkey.key > 10 ; +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 AND distributed_table_pkey.key > 10 ; +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 AND distributed_table_pkey.key > 10 AND postgres_table.key = 5; + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key > 10; +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = 20; +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = 20 OR distributed_table_pkey.key = 30; +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR distributed_table_pkey.key = ( + SELECT count(*) FROM distributed_table_pkey +); +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key = 5 and distributed_table_pkey.key > 15); +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key > 10 and distributed_table_pkey.key > 15); +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key > 10 and distributed_table_pkey.value = 'notext'); +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10 OR (distributed_table_pkey.key = 10 and distributed_table_pkey.value = 'notext'); -- a unique index on key so dist table should be recursively planned @@ -246,7 +263,61 @@ FROM WHERE postgres_table.key = d1.key AND d1.key = d2.key; +-- currently can't plan subquery-local table join +SELECT count(*) +FROM + (SELECT * FROM (SELECT * FROM distributed_table) d1) d2 +JOIN postgres_table +USING(key); -\set VERBOSITY terse + + +--------------------------------------------------------- + +SET client_min_messages to ERROR; +SELECT master_add_node('localhost', :master_port, groupId => 0); + + +CREATE TABLE citus_local(key int, value text); +SELECT create_citus_local_table('citus_local'); +SET client_min_messages TO DEBUG1; + +-- same for citus local table - distributed table joins +-- a unique index on key so dist table should be recursively planned +SELECT count(*) FROM citus_local JOIN distributed_table_windex USING(key); +SELECT count(*) FROM citus_local JOIN distributed_table_windex USING(value); +SELECT count(*) FROM citus_local JOIN distributed_table_windex ON citus_local.key = distributed_table_windex.key; +SELECT count(*) FROM citus_local JOIN distributed_table_windex ON distributed_table_windex.key = 10; + +-- no unique index, citus local table should be recursively planned +SELECT count(*) FROM citus_local JOIN distributed_table USING(key); +SELECT count(*) FROM citus_local JOIN distributed_table USING(value); +SELECT count(*) FROM citus_local JOIN distributed_table ON citus_local.key = distributed_table.key; +SELECT count(*) FROM citus_local JOIN distributed_table ON distributed_table.key = 10; + +SELECT count(*) FROM citus_local JOIN distributed_table USING(key) JOIN postgres_table USING (key) JOIN reference_table USING(key); + +-- update +UPDATE + distributed_table_windex +SET + value = 'test' +FROM + citus_local +WHERE + distributed_table_windex.key = citus_local.key; + +UPDATE + citus_local +SET + value = 'test' +FROM + distributed_table_windex +WHERE + distributed_table_windex.key = citus_local.key; + +DROP TABLE citus_local; RESET client_min_messages; +SELECT master_remove_node('localhost', :master_port); +\set VERBOSITY terse DROP SCHEMA local_table_join CASCADE; diff --git a/src/test/regress/sql/subqueries_not_supported.sql b/src/test/regress/sql/subqueries_not_supported.sql index a91c9a097..1ece06c34 100644 --- a/src/test/regress/sql/subqueries_not_supported.sql +++ b/src/test/regress/sql/subqueries_not_supported.sql @@ -9,9 +9,9 @@ SET client_min_messages TO DEBUG1; CREATE TABLE users_table_local AS SELECT * FROM users_table; --- we don't support subqueries with local tables when they are not leaf queries +-- TODO:: Move this out of this file SELECT - * + COUNT(*) FROM ( SELECT @@ -23,7 +23,7 @@ FROM RESET client_min_messages; -- we don't support subqueries with local tables when they are not leaf queries -SELECT user_id FROM users_table WHERE user_id IN +SELECT COUNT(user_id) FROM users_table WHERE user_id IN (SELECT user_id FROM