From 5618f3a3fcc6f1dc84e7d24bd382b9921bcdfd81 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Mon, 7 Dec 2020 16:55:15 +0300 Subject: [PATCH] Use BaseRestrictInfo for finding equality columns Baseinfo also has pushed down filters etc, so it makes more sense to use BaseRestrictInfo to determine what columns have constant equality filters. Also RteIdentity is used for removing conversion candidates instead of rteIndex. --- src/backend/distributed/commands/index.c | 1 + .../distributed/executor/citus_custom_scan.c | 32 +-- .../distributed/planner/distributed_planner.c | 2 +- .../planner/insert_select_planner.c | 3 +- .../planner/local_distributed_join_planner.c | 147 ++++++------- .../planner/multi_logical_planner.c | 8 +- .../planner/multi_physical_planner.c | 117 +++++++--- .../planner/multi_router_planner.c | 48 +++-- .../distributed/planner/recursive_planning.c | 76 +++++-- .../relation_restriction_equivalence.c | 53 +++-- src/backend/distributed/shared_library_init.c | 2 +- .../distributed/utils/citus_copyfuncs.c | 2 +- .../distributed/utils/citus_outfuncs.c | 1 + .../local_distributed_join_planner.h | 11 +- .../distributed/multi_physical_planner.h | 10 +- .../distributed/multi_router_planner.h | 10 +- src/include/distributed/recursive_planning.h | 22 +- .../relation_restriction_equivalence.h | 11 +- .../expected/local_dist_join_mixed.out | 101 +++++++-- .../regress/expected/multi_partitioning.out | 32 ++- src/test/regress/multi_schedule | 4 +- .../regress/sql/citus_local_dist_joins.sql | 16 +- .../regress/sql/local_dist_join_mixed.sql | 199 ++++++++++-------- .../sql/local_dist_join_modifications.sql | 14 +- 24 files changed, 589 insertions(+), 333 deletions(-) diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 81a037abc..a44cde0cc 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -249,6 +249,7 @@ CreateIndexStmtGetSchemaId(IndexStmt 
*createIndexStatement) return namespaceId; } + /* * ExecuteFunctionOnEachTableIndex executes the given pgIndexProcessor function on each * index of the given relation. diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index b5617d1c6..eb59b87a3 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -368,22 +368,22 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags) RebuildQueryStrings(workerJob); } - if (workerJob->onDummyPlacement) { - /* if this job is on a dummy placement, then it doesn't operate on - an actual shard placement */ - return; + + /* We skip shard related things if the job contains only local tables */ + if (!OnlyLocalTableJob(workerJob)) + { + /* + * Now that we know the shard ID(s) we can acquire the necessary shard metadata + * locks. Once we have the locks it's safe to load the placement metadata. + */ + + /* prevent concurrent placement changes */ + AcquireMetadataLocks(workerJob->taskList); + + /* modify tasks are always assigned using first-replica policy */ + workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList); } - /* - * Now that we know the shard ID(s) we can acquire the necessary shard metadata - * locks. Once we have the locks it's safe to load the placement metadata. 
- */ - /* prevent concurrent placement changes */ - AcquireMetadataLocks(workerJob->taskList); - - - /* modify tasks are always assigned using first-replica policy */ - workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList); /* * Now that we have populated the task placements we can determine whether @@ -544,10 +544,12 @@ RegenerateTaskForFasthPathQuery(Job *workerJob) shardId = GetAnchorShardId(shardIntervalList); } + bool containsOnlyLocalTable = false; GenerateSingleShardRouterTaskList(workerJob, relationShardList, placementList, - shardId); + shardId, + containsOnlyLocalTable); } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 6a11bb352..a0c27fc66 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -410,7 +410,7 @@ AdjustPartitioningForDistributedPlanning(List *rangeTableList, /* * We want Postgres to behave partitioned tables as regular relations * (i.e. we do not want to expand them to their partitions). To do this - * we set each distributed partitioned table's inh flag to appropriate + * we set each partitioned table's inh flag to appropriate * value before and after dropping to the standart_planner. 
*/ if (rangeTableEntry->rtekind == RTE_RELATION && diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 5949f1b18..f2da9ef90 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -789,7 +789,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, &relationShardList, &prunedShardIntervalListList, replacePrunedQueryWithDummy, - &multiShardModifyQuery, NULL); + &multiShardModifyQuery, NULL, + false); Assert(!multiShardModifyQuery); diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index 7fdf97af6..650136834 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -63,8 +63,7 @@ int LocalTableJoinPolicy = LOCAL_JOIN_POLICY_AUTO; typedef struct RangeTableEntryDetails { RangeTblEntry *rangeTableEntry; - Index rteIndex; - List *restrictionList; + int rteIdentity; List *requiredAttributeNumbers; bool hasConstantFilterOnUniqueColumn; } RangeTableEntryDetails; @@ -75,9 +74,8 @@ typedef struct ConversionCandidates List *localTableList; /* local or citus local table */ }ConversionCandidates; -static bool HasConstantFilterOnUniqueColumn(FromExpr *joinTree, - RangeTblEntry *rangeTableEntry, Index - rteIndex); +static bool HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry, + RelationRestriction *relationRestriction); static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, PlannerRestrictionContext * plannerRestrictionContext); @@ -92,8 +90,8 @@ static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree conversionCandidates, PlannerRestrictionContext * plannerRestrictionContext); -static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid - 
relationId); +static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, + int rteIdentity); static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList); /* @@ -103,17 +101,16 @@ static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList */ void RecursivelyPlanLocalTableJoins(Query *query, - RecursivePlanningContext *context, List* rangeTableList) + RecursivePlanningContext *context, List *rangeTableList) { - PlannerRestrictionContext *plannerRestrictionContext = - context->plannerRestrictionContext; + GetPlannerRestrictionContext(context); Oid resultRelationId = InvalidOid; if (IsModifyCommand(query)) { resultRelationId = ModifyQueryResultRelationId(query); - } + } ConversionCandidates *conversionCandidates = CreateConversionCandidates(query->jointree, plannerRestrictionContext, rangeTableList, resultRelationId); @@ -131,15 +128,15 @@ RecursivelyPlanLocalTableJoins(Query *query, } RangeTblEntry *rangeTableEntry = rangeTableEntryDetails->rangeTableEntry; - Oid relId = rangeTableEntryDetails->rangeTableEntry->relid; - List *restrictionList = rangeTableEntryDetails->restrictionList; List *requiredAttributeNumbers = rangeTableEntryDetails->requiredAttributeNumbers; - ReplaceRTERelationWithRteSubquery(rangeTableEntry, restrictionList, + ReplaceRTERelationWithRteSubquery(rangeTableEntry, requiredAttributeNumbers, context); - RemoveFromConversionCandidates(conversionCandidates, relId); + int rteIdentity = rangeTableEntryDetails->rteIdentity; + RemoveFromConversionCandidates(conversionCandidates, rteIdentity); } } + /* * GetNextRTEToConvertToSubquery returns the range table entry * which should be converted to a subquery. It considers the local join policy @@ -211,18 +208,33 @@ AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList) * the relevant list based on the relation id. 
*/ static void -RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid relationId) +RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, int + rteIdentity) { - if (IsRelationLocalTableOrMatView(relationId)) + RangeTableEntryDetails *rangeTableEntryDetails = NULL; + foreach_ptr(rangeTableEntryDetails, conversionCandidates->localTableList) { - conversionCandidates->localTableList = - list_delete_first(conversionCandidates->localTableList); + if (rangeTableEntryDetails->rteIdentity == rteIdentity) + { + conversionCandidates->localTableList = + list_delete_ptr(conversionCandidates->localTableList, + rangeTableEntryDetails); + return; + } } - else + + foreach_ptr(rangeTableEntryDetails, conversionCandidates->distributedTableList) { - conversionCandidates->distributedTableList = - list_delete_first(conversionCandidates->distributedTableList); + if (rangeTableEntryDetails->rteIdentity == rteIdentity) + { + conversionCandidates->distributedTableList = + list_delete_ptr(conversionCandidates->distributedTableList, + rangeTableEntryDetails); + return; + } } + + ereport(ERROR, (errmsg("invalid rte index is given :%d", rteIdentity))); } @@ -241,7 +253,7 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList, return false; } - if (!ContainsTableToBeConvertedToSubquery(rangeTableList)) + if (!ContainsTableToBeConvertedToSubquery(rangeTableList)) { return false; } @@ -251,7 +263,7 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList, if (IsRouterPlannable(query, plannerRestrictionContext)) { ereport(DEBUG1, (errmsg("local-distributed table joins will not be converted, " - "as the query is router plannable"))); + "as the query is router plannable"))); return false; } return true; @@ -263,32 +275,22 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList, * filter on a unique column. 
*/ static bool -HasConstantFilterOnUniqueColumn(FromExpr *joinTree, RangeTblEntry *rangeTableEntry, Index - rteIndex) +HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry, + RelationRestriction *relationRestriction) { if (rangeTableEntry == NULL) { return false; } + List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo; + List *restrictClauseList = get_all_actual_clauses(baseRestrictionList); List *rteEqualityQuals = - FetchEqualityAttrNumsForRTEFromQuals(joinTree->quals, rteIndex); + FetchEqualityAttrNumsForRTE((Node *) restrictClauseList); - Node *join = NULL; - foreach_ptr(join, joinTree->fromlist) - { - if (IsA(join, JoinExpr)) - { - JoinExpr *joinExpr = (JoinExpr *) join; - List *joinExprEqualityQuals = - FetchEqualityAttrNumsForRTEFromQuals(joinExpr->quals, rteIndex); - rteEqualityQuals = list_concat(rteEqualityQuals, joinExprEqualityQuals); - } - } - - List *uniqueIndexes = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid, - GetAllUniqueIndexes); + List *uniqueIndexAttrNumbers = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid, + GetAllUniqueIndexes); int columnNumber = 0; - foreach_int(columnNumber, uniqueIndexes) + foreach_int(columnNumber, uniqueIndexAttrNumbers) { if (list_member_int(rteEqualityQuals, columnNumber)) { @@ -327,38 +329,28 @@ GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes) * WHERE clause as a filter (e.g., not a join clause). 
*/ static List * -RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, +RequiredAttrNumbersForRelation(RangeTblEntry *rangeTableEntry, PlannerRestrictionContext *plannerRestrictionContext) { - int rteIdentity = GetRTEIdentity(relationRte); - RelationRestrictionContext *relationRestrictionContext = - plannerRestrictionContext->relationRestrictionContext; - Relids queryRteIdentities = bms_make_singleton(rteIdentity); - RelationRestrictionContext *filteredRelationRestrictionContext = - FilterRelationRestrictionContext(relationRestrictionContext, queryRteIdentities); - List *filteredRelationRestrictionList = - filteredRelationRestrictionContext->relationRestrictionList; + RelationRestriction *relationRestriction = + RelationRestrictionForRelation(rangeTableEntry, plannerRestrictionContext); - if (list_length(filteredRelationRestrictionList) != 1) + if (relationRestriction == NULL) { return NIL; } - RelationRestriction *relationRestriction = - (RelationRestriction *) linitial(filteredRelationRestrictionList); PlannerInfo *plannerInfo = relationRestriction->plannerInfo; Query *queryToProcess = plannerInfo->parse; int rteIndex = relationRestriction->index; List *allVarsInQuery = pull_vars_of_level((Node *) queryToProcess, 0); - ListCell *varCell = NULL; List *requiredAttrNumbers = NIL; - foreach(varCell, allVarsInQuery) + Var *var = NULL; + foreach_ptr(var, allVarsInQuery) { - Var *var = (Var *) lfirst(varCell); - if (var->varno == rteIndex) { requiredAttrNumbers = list_append_unique_int(requiredAttrNumbers, @@ -379,15 +371,12 @@ CreateConversionCandidates(FromExpr *joinTree, PlannerRestrictionContext *plannerRestrictionContext, List *rangeTableList, Oid resultRelationId) { - ConversionCandidates *conversionCandidates = palloc0( - sizeof(ConversionCandidates)); + ConversionCandidates *conversionCandidates = + palloc0(sizeof(ConversionCandidates)); - int rteIndex = 0; RangeTblEntry *rangeTableEntry = NULL; foreach_ptr(rangeTableEntry, rangeTableList) { - rteIndex++; 
- /* we're only interested in tables */ if (!IsRecursivelyPlannableRelation(rangeTableEntry)) { @@ -400,23 +389,27 @@ CreateConversionCandidates(FromExpr *joinTree, continue; } + RelationRestriction *relationRestriction = + RelationRestrictionForRelation(rangeTableEntry, plannerRestrictionContext); + if (relationRestriction == NULL) + { + continue; + } + int rteIdentity = GetRTEIdentity(rangeTableEntry); + RangeTableEntryDetails *rangeTableEntryDetails = palloc0(sizeof(RangeTableEntryDetails)); - rangeTableEntryDetails->rangeTableEntry = rangeTableEntry; - rangeTableEntryDetails->rteIndex = rteIndex; - rangeTableEntryDetails->restrictionList = GetRestrictInfoListForRelation( - rangeTableEntry, plannerRestrictionContext); - rangeTableEntryDetails->requiredAttributeNumbers = RequiredAttrNumbersForRelation( - rangeTableEntry, plannerRestrictionContext); - rangeTableEntryDetails->hasConstantFilterOnUniqueColumn = - HasConstantFilterOnUniqueColumn(joinTree, - rangeTableEntry, - rteIndex); - bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid, - REFERENCE_TABLE) || - IsCitusTableType(rangeTableEntry->relid, - DISTRIBUTED_TABLE); + rangeTableEntryDetails->rangeTableEntry = rangeTableEntry; + rangeTableEntryDetails->rteIdentity = rteIdentity; + rangeTableEntryDetails->requiredAttributeNumbers = + RequiredAttrNumbersForRelation(rangeTableEntry, plannerRestrictionContext); + rangeTableEntryDetails->hasConstantFilterOnUniqueColumn = + HasConstantFilterOnUniqueColumn(rangeTableEntry, relationRestriction); + + bool referenceOrDistributedTable = + IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE) || + IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE); if (referenceOrDistributedTable) { conversionCandidates->distributedTableList = diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 02e780aa1..8d0289d7c 100644 --- 
a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -340,7 +340,13 @@ IsCitusTableRTE(Node *node) bool IsDistributedOrReferenceTableRTE(Node *node) { - return IsDistributedTableRTE(node) || IsReferenceTableRTE(node); + Oid relationId = NodeTryGetRteRelid(node); + if (!OidIsValid(relationId)) + { + return false; + } + return IsCitusTableType(relationId, DISTRIBUTED_TABLE) || + IsCitusTableType(relationId, REFERENCE_TABLE); } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 01802d8ec..ccfff1215 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -234,9 +234,9 @@ static StringInfo ColumnTypeArrayString(List *targetEntryList); static bool CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval); -static List * FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr, Index rteIndex); -static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr, Index rteIndex); - +static List * FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr); +static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr); +static List * FetchEqualityAttrNumsForList(List *nodeList); #if PG_VERSION_NUM >= PG_VERSION_13 static List * GetColumnOriginalIndexes(Oid relationId); #endif @@ -271,6 +271,27 @@ CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree, } +/* + * OnlyLocalTableJob true if the given task contains + * only postgres tables + */ +bool +OnlyLocalTableJob(Job *job) +{ + if (job == NULL) + { + return false; + } + List *taskList = job->taskList; + if (list_length(taskList) != 1) + { + return false; + } + Task *singleTask = (Task *) linitial(taskList); + return singleTask->containsOnlyLocalTable; +} + + /* * BuildJobTree builds the physical job tree from the given logical plan tree. 
* The function walks over the logical plan from the bottom up, finds boundaries @@ -2113,9 +2134,10 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction prunedRelationShardList, READ_TASK, false, &deferredErrorMessage); - if (deferredErrorMessage != NULL) { + if (deferredErrorMessage != NULL) + { RaiseDeferredErrorInternal(deferredErrorMessage, ERROR); - } + } } else { @@ -2209,10 +2231,10 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, if (list_length(relationRestrictionContext->relationRestrictionList) == 0) { *planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot handle complex subqueries when the " - "router executor is disabled", - NULL, NULL); - return NIL; + "cannot handle complex subqueries when the " + "router executor is disabled", + NULL, NULL); + return NIL; } /* defaults to be used if this is a reference table-only query */ @@ -2238,9 +2260,9 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, if (shardCount > 0 && shardCount != cacheEntry->shardIntervalArrayLength) { *planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "shard counts of co-located tables do not " - "match", - NULL, NULL); + "shard counts of co-located tables do not " + "match", + NULL, NULL); return NIL; } @@ -2306,9 +2328,10 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, taskType, modifyRequiresCoordinatorEvaluation, planningError); - if (*planningError != NULL) { + if (*planningError != NULL) + { return NIL; - } + } subqueryTask->jobId = jobId; sqlTaskList = lappend(sqlTaskList, subqueryTask); @@ -2565,11 +2588,11 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, if (list_length(taskPlacementList) == 0) { *planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot find a worker that has active placements for all " - "shards in the query", - NULL, NULL); + "cannot find a worker that has active placements for all " + "shards in the query", + NULL, NULL); - return NULL; + return 
NULL; } /* @@ -3615,26 +3638,58 @@ NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList) /* - * FetchEqualityAttrNumsForRTEFromQuals fetches the attribute numbers from quals + * FetchEqualityAttrNumsForRTE fetches the attribute numbers from quals * which: * - has equality operator * - belongs to rangeTableEntry with rteIndex */ List * -FetchEqualityAttrNumsForRTEFromQuals(Node *quals, Index rteIndex) +FetchEqualityAttrNumsForRTE(Node *node) { - if (quals == NULL) + if (node == NULL) { return NIL; } - - if (IsA(quals, OpExpr)) + if (IsA(node, List)) { - return FetchEqualityAttrNumsForRTEOpExpr((OpExpr *) quals, rteIndex); + return FetchEqualityAttrNumsForList((List *) node); } - else if (IsA(quals, BoolExpr)) + else if (IsA(node, OpExpr)) { - return FetchEqualityAttrNumsForRTEBoolExpr((BoolExpr *) quals, rteIndex); + return FetchEqualityAttrNumsForRTEOpExpr((OpExpr *) node); + } + else if (IsA(node, BoolExpr)) + { + return FetchEqualityAttrNumsForRTEBoolExpr((BoolExpr *) node); + } + return NIL; +} + +/* + * FetchEqualityAttrNumsForList fetches the constant equality numbers + * from the given node list. + */ +static List *FetchEqualityAttrNumsForList(List *nodeList) +{ + List *attributeNums = NIL; + Node *node = NULL; + bool hasAtLeastOneEquality = false; + foreach_ptr(node, nodeList) + { + List *fetchedEqualityAttrNums = + FetchEqualityAttrNumsForRTE(node); + hasAtLeastOneEquality |= list_length(fetchedEqualityAttrNums) > 0; + attributeNums = list_concat(attributeNums, fetchedEqualityAttrNums); + } + + /* + * the given list is in the form of AND'ed expressions + * hence if we have one equality then it is enough. 
+ * E.g: dist.a = 5 AND dist.a > 10 + */ + if (hasAtLeastOneEquality) + { + return attributeNums; } return NIL; } @@ -3647,7 +3702,7 @@ FetchEqualityAttrNumsForRTEFromQuals(Node *quals, Index rteIndex) * - belongs to rangeTableEntry with rteIndex */ static List * -FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr, Index rteIndex) +FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr) { if (!OperatorImplementsEquality(opExpr->opno)) { @@ -3656,7 +3711,7 @@ FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr, Index rteIndex) List *attributeNums = NIL; Var *var = NULL; - if (VarConstOpExprClause(opExpr, &var, NULL) && var->varno == rteIndex) + if (VarConstOpExprClause(opExpr, &var, NULL)) { attributeNums = lappend_int(attributeNums, var->varattno); } @@ -3671,7 +3726,7 @@ FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr, Index rteIndex) * - belongs to rangeTableEntry with rteIndex */ static List * -FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr, Index rteIndex) +FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr) { if (boolExpr->boolop != AND_EXPR && boolExpr->boolop != OR_EXPR) { @@ -3683,8 +3738,7 @@ FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr, Index rteIndex) Node *arg = NULL; foreach_ptr(arg, boolExpr->args) { - List *attributeNumsInSubExpression = FetchEqualityAttrNumsForRTEFromQuals(arg, - rteIndex); + List *attributeNumsInSubExpression = FetchEqualityAttrNumsForRTE(arg); if (boolExpr->boolop == AND_EXPR) { hasEquality |= list_length(attributeNumsInSubExpression) > 0; @@ -5466,7 +5520,6 @@ ActiveShardPlacementLists(List *taskList) { Task *task = (Task *) lfirst(taskCell); uint64 anchorShardId = task->anchorShardId; - List *shardPlacementList = ActiveShardPlacementList(anchorShardId); /* filter out shard placements that reside in inactive nodes */ diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 01ad53a6e..000061d9c 100644 --- 
a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -170,7 +170,8 @@ static int CompareInsertValuesByShardId(const void *leftElement, const void *rightElement); static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, - uint64 shardId, bool parametersInQueryResolved); + uint64 shardId, bool parametersInQueryResolved, + bool containsOnlyLocalTable); static bool RowLocksOnRelations(Node *node, List **rtiLockList); static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType @@ -1717,6 +1718,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon /* router planner should create task even if it doesn't hit a shard at all */ bool replacePrunedQueryWithDummy = true; + bool containsOnlyLocalTable = false; + /* check if this query requires coordinator evaluation */ bool requiresCoordinatorEvaluation = RequiresCoordinatorEvaluation(originalQuery); FastPathRestrictionContext *fastPathRestrictionContext = @@ -1744,7 +1747,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon &prunedShardIntervalListList, replacePrunedQueryWithDummy, &isMultiShardModifyQuery, - &partitionKeyValue); + &partitionKeyValue, + &containsOnlyLocalTable); } if (*planningError) @@ -1754,7 +1758,6 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon Job *job = CreateJob(originalQuery); job->partitionKeyValue = partitionKeyValue; - job->onDummyPlacement = replacePrunedQueryWithDummy; if (originalQuery->resultRelation > 0) { @@ -1783,14 +1786,15 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon MODIFY_TASK, requiresCoordinatorEvaluation, planningError); - if (*planningError) { + if (*planningError) + { return NULL; - } + } } else { GenerateSingleShardRouterTaskList(job, relationShardList, - placementList, shardId); + placementList, 
shardId, containsOnlyLocalTable); } job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation; @@ -1806,7 +1810,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon */ void GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, - List *placementList, uint64 shardId) + List *placementList, uint64 shardId, bool + containsOnlyLocalTable) { Query *originalQuery = job->jobQuery; @@ -1815,7 +1820,9 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, job->taskList = SingleShardTaskList(originalQuery, job->jobId, relationShardList, placementList, shardId, - job->parametersInJobQueryResolved); + job->parametersInJobQueryResolved, + containsOnlyLocalTable); + /* * Queries to reference tables, or distributed tables with multiple replica's have * their task placements reordered according to the configured @@ -1831,7 +1838,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, placementList); } } - else if (shardId == INVALID_SHARD_ID && !job->onDummyPlacement) + else if (shardId == INVALID_SHARD_ID && !containsOnlyLocalTable) { /* modification that prunes to 0 shards */ job->taskList = NIL; @@ -1841,7 +1848,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, job->taskList = SingleShardTaskList(originalQuery, job->jobId, relationShardList, placementList, shardId, - job->parametersInJobQueryResolved); + job->parametersInJobQueryResolved, + containsOnlyLocalTable); } } @@ -1934,7 +1942,8 @@ RemoveCoordinatorPlacementIfNotSingleNode(List *placementList) static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, - bool parametersInQueryResolved) + bool parametersInQueryResolved, + bool containsOnlyLocalTable) { TaskType taskType = READ_TASK; char replicationModel = 0; @@ -1993,6 +2002,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, } Task *task = CreateTask(taskType); + 
task->containsOnlyLocalTable = containsOnlyLocalTable; List *relationRowLockList = NIL; RowLocksOnRelations((Node *) query, &relationRowLockList); @@ -2104,6 +2114,8 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query) } +static bool ContainsOnlyLocalTables(RTEListProperties *rteProperties); + /* * RouterQuery runs router pruning logic for SELECT, UPDATE and DELETE queries. * If there are shards present and query is routable, all RTEs have been updated @@ -2131,7 +2143,8 @@ PlanRouterQuery(Query *originalQuery, List **placementList, uint64 *anchorShardId, List **relationShardList, List **prunedShardIntervalListList, bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery, - Const **partitionValueConst) + Const **partitionValueConst, + bool *containsOnlyLocalTable) { bool isMultiShardQuery = false; DeferredErrorMessage *planningError = NULL; @@ -2247,6 +2260,10 @@ PlanRouterQuery(Query *originalQuery, /* both Postgres tables and materialized tables are locally avaliable */ RTEListProperties *rteProperties = GetRTEListPropertiesForQuery(originalQuery); + if (shardId == INVALID_SHARD_ID && ContainsOnlyLocalTables(rteProperties)) + { + *containsOnlyLocalTable = true; + } bool hasPostgresLocalRelation = rteProperties->hasPostgresLocalTable || rteProperties->hasMaterializedView; List *taskPlacementList = @@ -2280,6 +2297,13 @@ PlanRouterQuery(Query *originalQuery, } +static bool +ContainsOnlyLocalTables(RTEListProperties *rteProperties) +{ + return !rteProperties->hasDistributedTable && !rteProperties->hasReferenceTable; +} + + /* * CreateTaskPlacementListForShardIntervals returns a list of shard placements * on which it can access all shards in shardIntervalListList, which contains diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index a0c0a7b1d..fa7dbea9e 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c 
@@ -100,6 +100,20 @@ #include "utils/guc.h" #include "utils/lsyscache.h" +/* + * RecursivePlanningContext is used to recursively plan subqueries + * and CTEs, pull results to the coordinator, and push it back into + * the workers. + */ +struct RecursivePlanningContextInternal +{ + int level; + uint64 planId; + bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */ + List *subPlanList; + PlannerRestrictionContext *plannerRestrictionContext; +}; + /* track depth of current recursive planner query */ static int recursivePlanningDepth = 0; @@ -159,7 +173,7 @@ static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery, static bool IsTableLocallyAccessible(Oid relationId); static bool ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *context); -static void RecursivelyPlanSubquery(Query *subquery, +static bool RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningContext); static void RecursivelyPlanSetOperations(Query *query, Node *node, RecursivePlanningContext *context); @@ -178,11 +192,10 @@ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList, List *columnAliasList, Const *resultIdConst, Oid functionOid, bool useBinaryCopyFormat); -static void -UpdateVarNosInNode(Query *query, Index newVarNo); +static void UpdateVarNosInNode(Query *query, Index newVarNo); static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList); static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList, - List **joinRangeTableEntries); + List **joinRangeTableEntries); /* * GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs. 
@@ -347,22 +360,34 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context &rangeTableList); if (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList, - plannerRestrictionContext)) { + plannerRestrictionContext)) + { /* - * Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", or - * a query with local table/citus local table and subquery. We convert local/citus local - * tables to a subquery until they can be planned. - * This is the last call in this function since we want the other calls to be finished - * so that we can check if the current plan is router plannable at any step within this function. - */ + * Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", or + * a query with local table/citus local table and subquery. We convert local/citus local + * tables to a subquery until they can be planned. + * This is the last call in this function since we want the other calls to be finished + * so that we can check if the current plan is router plannable at any step within this function. + */ RecursivelyPlanLocalTableJoins(query, context, rangeTableList); - } return NULL; } + +/* + * GetPlannerRestrictionContext returns the planner restriction context + * from the given context. + */ +PlannerRestrictionContext * +GetPlannerRestrictionContext(RecursivePlanningContext *recursivePlanningContext) +{ + return recursivePlanningContext->plannerRestrictionContext; +} + + /* * GetRangeTableEntriesFromJoinTree gets the range table entries that are * on the given join tree. @@ -407,7 +432,6 @@ GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList, } - /* * ShouldRecursivelyPlanNonColocatedSubqueries returns true if the input query contains joins * that are not on the distribution key. @@ -1180,7 +1204,7 @@ IsRelationLocalTableOrMatView(Oid relationId) * and immediately returns. Later, the planner decides on what to do * with the query. 
*/ -static void +static bool RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningContext) { uint64 planId = planningContext->planId; @@ -1191,7 +1215,7 @@ RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningConte elog(DEBUG2, "skipping recursive planning for the subquery since it " "contains references to outer queries"); - return; + return false; } /* @@ -1234,6 +1258,7 @@ RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningConte /* finally update the input subquery to point the result query */ *subquery = *resultQuery; + return true; } @@ -1419,16 +1444,20 @@ NodeContainsSubqueryReferencingOuterQuery(Node *node) /* * ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry * with a subquery. The function also pushes down the filters to the subquery. - * + * * It then recursively plans the subquery. */ void -ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList, +ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *requiredAttrNumbers, RecursivePlanningContext *context) { Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers); - Expr *andedBoundExpressions = make_ands_explicit(restrictionList); + List *restrictionList = + GetRestrictInfoListForRelation(rangeTableEntry, + context->plannerRestrictionContext); + List *copyRestrictionList = copyObject(restrictionList); + Expr *andedBoundExpressions = make_ands_explicit(copyRestrictionList); subquery->jointree->quals = (Node *) andedBoundExpressions; UpdateVarNosInNode(subquery, SINGLE_RTE_INDEX); @@ -1456,7 +1485,13 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict } /* as we created the subquery, now forcefully recursively plan it */ - RecursivelyPlanSubquery(rangeTableEntry->subquery, context); + bool recursivelyPlanned = RecursivelyPlanSubquery(rangeTableEntry->subquery, + context); + if (!recursivelyPlanned) 
+ { + ereport(ERROR, (errmsg( + "unexpected state: query should have been recursively planned"))); + } } @@ -1610,6 +1645,7 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList) return containsLocalTable && containsDistributedTable; } + /* * WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries * of a query and wraps the functions inside (SELECT * FROM fnc() f) @@ -1733,7 +1769,6 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry) subquery->targetList = lappend(subquery->targetList, targetEntry); } } - /* * If tupleDesc is NULL we have 2 different cases: * @@ -1783,7 +1818,6 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry) columnType = list_nth_oid(rangeTblFunction->funccoltypes, targetColumnIndex); } - /* use the types in the function definition otherwise */ else { diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index ea4fe336d..cf1c5a59d 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -1848,34 +1848,25 @@ List * GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry, PlannerRestrictionContext *plannerRestrictionContext) { - int rteIdentity = GetRTEIdentity(rangeTblEntry); - RelationRestrictionContext *relationRestrictionContext = - plannerRestrictionContext->relationRestrictionContext; - Relids queryRteIdentities = bms_make_singleton(rteIdentity); - RelationRestrictionContext *filteredRelationRestrictionContext = - FilterRelationRestrictionContext(relationRestrictionContext, queryRteIdentities); - List *filteredRelationRestrictionList = - filteredRelationRestrictionContext->relationRestrictionList; - - if (list_length(filteredRelationRestrictionList) != 1) + RelationRestriction *relationRestriction = + RelationRestrictionForRelation(rangeTblEntry, plannerRestrictionContext); + if (relationRestriction == NULL) { 
return NIL; } - RelationRestriction *relationRestriction = - (RelationRestriction *) linitial(filteredRelationRestrictionList); - RelOptInfo *relOptInfo = relationRestriction->relOptInfo; - List *baseRestrictInfo = relOptInfo->baserestrictinfo; List *joinRestrictInfo = relOptInfo->joininfo; + List *baseRestrictInfo = relOptInfo->baserestrictinfo; - List *joinRrestrictClauseList = get_all_actual_clauses(joinRestrictInfo); - if (ContainsFalseClause(joinRrestrictClauseList)) + List *joinRestrictClauseList = get_all_actual_clauses(joinRestrictInfo); + if (ContainsFalseClause(joinRestrictClauseList)) { /* found WHERE false, no need to continue */ - return copyObject((List *) joinRrestrictClauseList); + return copyObject((List *) joinRestrictClauseList); } + List *restrictExprList = NIL; RestrictInfo *restrictInfo = NULL; foreach_ptr(restrictInfo, baseRestrictInfo) @@ -1919,6 +1910,34 @@ GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry, } +/* + * RelationRestrictionForRelation gets the relation restriction for the given + * range table entry. + */ +RelationRestriction * +RelationRestrictionForRelation(RangeTblEntry *rangeTableEntry, + PlannerRestrictionContext *plannerRestrictionContext) +{ + int rteIdentity = GetRTEIdentity(rangeTableEntry); + RelationRestrictionContext *relationRestrictionContext = + plannerRestrictionContext->relationRestrictionContext; + Relids queryRteIdentities = bms_make_singleton(rteIdentity); + RelationRestrictionContext *filteredRelationRestrictionContext = + FilterRelationRestrictionContext(relationRestrictionContext, queryRteIdentities); + List *filteredRelationRestrictionList = + filteredRelationRestrictionContext->relationRestrictionList; + + if (list_length(filteredRelationRestrictionList) != 1) + { + return NULL; + } + + RelationRestriction *relationRestriction = + (RelationRestriction *) linitial(filteredRelationRestrictionList); + return relationRestriction; +} + + /* * IsParam determines whether the given node is a param. 
*/ diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index c1b4a7b0b..15a2ca416 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -720,7 +720,7 @@ RegisterCitusConfigVariables(void) PGC_SIGHUP, GUC_SUPERUSER_ONLY, NULL, NULL, LocalPoolSizeGucShowHook); - + DefineCustomEnumVariable( "citus.local_table_join_policy", gettext_noop("defines the behaviour when a distributed table " diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 698e96449..763f5e910 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -101,7 +101,6 @@ copyJobInfo(Job *newnode, Job *from) COPY_NODE_FIELD(partitionKeyValue); COPY_NODE_FIELD(localPlannedStatements); COPY_SCALAR_FIELD(parametersInJobQueryResolved); - COPY_SCALAR_FIELD(onDummyPlacement); } @@ -328,6 +327,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(fetchedExplainAnalyzePlacementIndex); COPY_STRING_FIELD(fetchedExplainAnalyzePlan); COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration); + COPY_SCALAR_FIELD(containsOnlyLocalTable); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 9adbca31a..e359fe7c3 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -540,6 +540,7 @@ OutTask(OUTFUNC_ARGS) WRITE_INT_FIELD(fetchedExplainAnalyzePlacementIndex); WRITE_STRING_FIELD(fetchedExplainAnalyzePlan); WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f"); + WRITE_BOOL_FIELD(containsOnlyLocalTable); } diff --git a/src/include/distributed/local_distributed_join_planner.h b/src/include/distributed/local_distributed_join_planner.h index 88fa5e1be..041cb63fd 100644 --- a/src/include/distributed/local_distributed_join_planner.h +++ 
b/src/include/distributed/local_distributed_join_planner.h @@ -27,11 +27,12 @@ typedef enum extern int LocalTableJoinPolicy; -extern bool -ShouldConvertLocalTableJoinsToSubqueries(Query *query, - List *rangeTableList, - PlannerRestrictionContext *plannerRestrictionContext); +extern bool ShouldConvertLocalTableJoinsToSubqueries(Query *query, + List *rangeTableList, + PlannerRestrictionContext * + plannerRestrictionContext); extern void RecursivelyPlanLocalTableJoins(Query *query, - RecursivePlanningContext *context, List *rangeTableList); + RecursivePlanningContext *context, + List *rangeTableList); #endif /* LOCAL_DISTRIBUTED_JOIN_PLANNER_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index bdd38cb54..125e7406f 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -163,7 +163,6 @@ typedef struct Job * query. */ bool parametersInJobQueryResolved; - bool onDummyPlacement; } Job; @@ -329,6 +328,11 @@ typedef struct Task * ExplainTaskList(). */ double fetchedExplainAnalyzeExecutionDuration; + + /* + * containsOnlyLocalTable is true if the task contains only postgres table/MV. 
+ */ + bool containsOnlyLocalTable; } Task; @@ -579,6 +583,8 @@ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, bool modifyRequiresCoordinatorEvaluation, DeferredErrorMessage **planningError); +extern bool OnlyLocalTableJob(Job *job); + /* function declarations for managing jobs */ extern uint64 UniqueJobId(void); @@ -591,6 +597,6 @@ extern RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *column List *funcColumnTypeMods, List *funcCollations); -extern List * FetchEqualityAttrNumsForRTEFromQuals(Node *quals, Index rteIndex); +extern List * FetchEqualityAttrNumsForRTE(Node *quals); #endif /* MULTI_PHYSICAL_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index a6b6b6152..d16ad2d3e 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -42,7 +42,8 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, List **prunedShardIntervalListList, bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery, - Const **partitionValueConst); + Const **partitionValueConst, + bool *containsOnlyLocalTable); extern List * RelationShardListForShardIntervalList(List *shardIntervalList, bool *shardsPresent); extern List * CreateTaskPlacementListForShardIntervals(List *shardIntervalList, @@ -83,9 +84,10 @@ extern List * TargetShardIntervalForFastPathQuery(Query *query, Const *inputDistributionKeyValue, Const **outGoingPartitionValueConst); extern void GenerateSingleShardRouterTaskList(Job *job, - List *relationShardList, - List *placementList, - uint64 shardId); + List *relationShardList, + List *placementList, + uint64 shardId, + bool containsOnlyLocalTable); extern bool IsRouterPlannable(Query *query, PlannerRestrictionContext *plannerRestrictionContext); diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index ed85f88ed..24aa3d08b 100644 ---
a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -22,21 +22,16 @@ #include "nodes/relation.h" #endif -/* - * RecursivePlanningContext is used to recursively plan subqueries - * and CTEs, pull results to the coordinator, and push it back into - * the workers. - */ -typedef struct RecursivePlanningContext +typedef struct RecursivePlanningContextInternal RecursivePlanningContext; + +typedef struct RangeTblEntryIndex { - int level; - uint64 planId; - bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */ - List *subPlanList; - PlannerRestrictionContext *plannerRestrictionContext; -} RecursivePlanningContext; - + RangeTblEntry *rangeTableEntry; + Index rteIndex; +}RangeTblEntryIndex; +extern PlannerRestrictionContext * GetPlannerRestrictionContext( + RecursivePlanningContext *recursivePlanningContext); extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -50,7 +45,6 @@ extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList, extern bool GeneratingSubplans(void); extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList); extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, - List *restrictionList, List *requiredAttrNumbers, RecursivePlanningContext *context); extern bool ContainsTableToBeConvertedToSubquery(List *rangeTableList); diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index e9bdb9adc..bfa650c0e 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -41,13 +41,18 @@ extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery( extern List * GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry, PlannerRestrictionContext * plannerRestrictionContext); +extern 
RelationRestriction * RelationRestrictionForRelation( + RangeTblEntry *rangeTableEntry, + PlannerRestrictionContext * + plannerRestrictionContext); extern JoinRestrictionContext * RemoveDuplicateJoinRestrictions(JoinRestrictionContext * joinRestrictionContext); extern bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList, RelationRestrictionContext * restrictionContext); -extern RelationRestrictionContext * -FilterRelationRestrictionContext(RelationRestrictionContext *relationRestrictionContext, - Relids queryRteIdentities); +extern RelationRestrictionContext * FilterRelationRestrictionContext( + RelationRestrictionContext *relationRestrictionContext, + Relids + queryRteIdentities); #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/expected/local_dist_join_mixed.out b/src/test/regress/expected/local_dist_join_mixed.out index 95cf94ff9..b7d40fd7c 100644 --- a/src/test/regress/expected/local_dist_join_mixed.out +++ b/src/test/regress/expected/local_dist_join_mixed.out @@ -446,9 +446,11 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum((d1.id OP (1 row) SELECT count(*) FROM distributed d1 JOIN local USING (id) LEFT JOIN distributed d2 USING (id) WHERE d2.id = 1 ORDER BY 1 DESC LIMIT 4; -DEBUG: Wrapping relation "local" to a subquery: SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE (id OPERATOR(pg_catalog.=) 1) -DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE (id OPERATOR(pg_catalog.=) 1) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_dist_join_mixed.distributed d1 JOIN (SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local USING (id)) LEFT JOIN local_dist_join_mixed.distributed d2 USING (id)) WHERE (d2.id OPERATOR(pg_catalog.=) 1) 
ORDER BY (count(*)) DESC LIMIT 4 +DEBUG: Wrapping relation "distributed" to a subquery: SELECT id, NULL::text AS name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed d1 WHERE (id OPERATOR(pg_catalog.=) 1) +DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed d1 WHERE (id OPERATOR(pg_catalog.=) 1) +DEBUG: Wrapping relation "distributed" to a subquery: SELECT id, NULL::text AS name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed d2 WHERE (id OPERATOR(pg_catalog.=) 1) +DEBUG: generating subplan XXX_2 for subquery SELECT id, NULL::text AS name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed d2 WHERE (id OPERATOR(pg_catalog.=) 1) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) d1 JOIN local_dist_join_mixed.local USING (id)) LEFT JOIN (SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) d2 USING (id)) WHERE (d2.id OPERATOR(pg_catalog.=) 1) ORDER BY (count(*)) DESC LIMIT 4 count --------------------------------------------------------------------- 1 @@ -1171,9 +1173,9 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c -- w count(*) it works fine as PG ignores the inner tables SELECT count(*) FROM distributed LEFT JOIN local USING (id); -DEBUG: Wrapping relation "local" to a subquery: SELECT NULL::bigint AS id, NULL::text AS title FROM 
local_dist_join_mixed.local WHERE true -DEBUG: generating subplan XXX_1 for subquery SELECT NULL::bigint AS id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_dist_join_mixed.distributed LEFT JOIN (SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local USING (id)) +DEBUG: Wrapping relation "distributed" to a subquery: SELECT id, NULL::text AS name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) distributed LEFT JOIN local_dist_join_mixed.local USING (id)) count --------------------------------------------------------------------- 101 @@ -1188,20 +1190,19 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 101 (1 row) -SELECT id, name FROM distributed LEFT JOIN local USING (id) LIMIT 1; -DEBUG: Wrapping relation "local" to a subquery: SELECT NULL::bigint AS id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true -DEBUG: generating subplan XXX_1 for subquery SELECT NULL::bigint AS id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT distributed.id, distributed.name FROM (local_dist_join_mixed.distributed LEFT JOIN (SELECT intermediate_result.id, 
intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local USING (id)) LIMIT 1 -DEBUG: push down of limit count: 1 +SELECT id, name FROM distributed LEFT JOIN local USING (id) ORDER BY 1 LIMIT 1; +DEBUG: Wrapping relation "distributed" to a subquery: SELECT id, name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT id, name, NULL::timestamp with time zone AS created_at FROM local_dist_join_mixed.distributed WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT distributed.id, distributed.name FROM ((SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) distributed LEFT JOIN local_dist_join_mixed.local USING (id)) ORDER BY distributed.id LIMIT 1 id | name --------------------------------------------------------------------- - 1 | 1 + 0 | 0 (1 row) -SELECT id, name FROM local LEFT JOIN distributed USING (id) LIMIT 1; +SELECT id, name FROM local LEFT JOIN distributed USING (id) ORDER BY 1 LIMIT 1; DEBUG: Wrapping relation "local" to a subquery: SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT local.id, distributed.name FROM ((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local LEFT JOIN local_dist_join_mixed.distributed USING (id)) LIMIT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT local.id, distributed.name FROM 
((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local LEFT JOIN local_dist_join_mixed.distributed USING (id)) ORDER BY local.id LIMIT 1 ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join SELECT @@ -1521,6 +1522,78 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo1.id FROM --------------------------------------------------------------------- (0 rows) +SELECT + count(*) +FROM + distributed +JOIN LATERAL + (SELECT + * + FROM + local + JOIN + distributed d2 + ON(true) + WHERE local.id = distributed.id AND d2.id = local.id) as foo +ON (true); +DEBUG: Wrapping relation "local" to a subquery: SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_dist_join_mixed.distributed JOIN LATERAL (SELECT local.id, local.title, d2.id, d2.name, d2.created_at FROM ((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local JOIN local_dist_join_mixed.distributed d2 ON (true)) WHERE ((local.id OPERATOR(pg_catalog.=) distributed.id) AND (d2.id OPERATOR(pg_catalog.=) local.id))) foo(id, title, id_1, name, created_at) ON (true)) + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2 LIMIt 1; +DEBUG: Wrapping relation "local" to a subquery: SELECT id, title FROM local_dist_join_mixed.local WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT id, title FROM local_dist_join_mixed.local 
WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT local.title, local.title FROM ((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local JOIN local_dist_join_mixed.distributed USING (id)) ORDER BY local.title, local.title LIMIT 1 +DEBUG: push down of limit count: 1 + title | title +--------------------------------------------------------------------- + 0 | 0 +(1 row) + +SELECT NULL FROM local JOIN distributed USING(id) ORDER BY 1 LIMIt 1; +DEBUG: Wrapping relation "local" to a subquery: SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT NULL::text FROM ((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local JOIN local_dist_join_mixed.distributed USING (id)) ORDER BY NULL::text LIMIT 1 +DEBUG: push down of limit count: 1 + ?column? 
+--------------------------------------------------------------------- + +(1 row) + +SELECT distributed.name, distributed.name, local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2,3,4 LIMIT 1; +DEBUG: Wrapping relation "local" to a subquery: SELECT id, title FROM local_dist_join_mixed.local WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT id, title FROM local_dist_join_mixed.local WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT distributed.name, distributed.name, local.title, local.title FROM ((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local JOIN local_dist_join_mixed.distributed USING (id)) ORDER BY distributed.name, distributed.name, local.title, local.title LIMIT 1 +DEBUG: push down of limit count: 1 + name | name | title | title +--------------------------------------------------------------------- + 0 | 0 | 0 | 0 +(1 row) + +SELECT + COUNT(*) +FROM + local +JOIN + distributed +USING + (id) +JOIN + (SELECT id, NULL, NULL FROM distributed) foo +USING + (id); +DEBUG: Wrapping relation "local" to a subquery: SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT id, NULL::text AS title FROM local_dist_join_mixed.local WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title text)) local JOIN local_dist_join_mixed.distributed USING (id)) JOIN (SELECT distributed_1.id, NULL::text, NULL::text FROM local_dist_join_mixed.distributed distributed_1) foo(id, "?column?", "?column?_1") USING (id)) + count +--------------------------------------------------------------------- + 101 +(1 row) + 
DROP SCHEMA local_dist_join_mixed CASCADE; NOTICE: drop cascades to 7 other objects DETAIL: drop cascades to table distributed diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out index a31711fe7..c11d800b2 100644 --- a/src/test/regress/expected/multi_partitioning.out +++ b/src/test/regress/expected/multi_partitioning.out @@ -1471,17 +1471,29 @@ BEGIN; UPDATE partitioning_locks_2009 SET time = '2009-03-01'; -- see the locks on parent table SELECT * FROM lockinfo; - logicalrelid | locktype | mode + logicalrelid | locktype | mode --------------------------------------------------------------------- - partitioning_locks | shard | ShareUpdateExclusiveLock - partitioning_locks | shard | ShareUpdateExclusiveLock - partitioning_locks | shard | ShareUpdateExclusiveLock - partitioning_locks | shard | ShareUpdateExclusiveLock - partitioning_locks_2009 | shard | ShareUpdateExclusiveLock - partitioning_locks_2009 | shard | ShareUpdateExclusiveLock - partitioning_locks_2009 | shard | ShareUpdateExclusiveLock - partitioning_locks_2009 | shard | ShareUpdateExclusiveLock -(8 rows) + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | shard | ShareUpdateExclusiveLock + partitioning_locks | shard | ShareUpdateExclusiveLock + partitioning_locks | shard | ShareUpdateExclusiveLock + partitioning_locks | shard | ShareUpdateExclusiveLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | shard | ShareUpdateExclusiveLock + partitioning_locks_2009 | shard | ShareUpdateExclusiveLock + 
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock + partitioning_locks_2009 | shard | ShareUpdateExclusiveLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock +(20 rows) COMMIT; -- test shard resource locks with TRUNCATE diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 80cf392a5..e80569d40 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -222,10 +222,10 @@ test: multi_modifying_xacts test: multi_repartition_udt multi_repartitioned_subquery_udf multi_subtransactions test: multi_transaction_recovery -test: local_dist_join_modifications +test: local_dist_join_modifications test: local_table_join test: local_dist_join_mixed -test: citus_local_dist_joins +test: citus_local_dist_joins # --------- # multi_copy creates hash and range-partitioned tables and performs COPY diff --git a/src/test/regress/sql/citus_local_dist_joins.sql b/src/test/regress/sql/citus_local_dist_joins.sql index 34523eec2..d9806ddd0 100644 --- a/src/test/regress/sql/citus_local_dist_joins.sql +++ b/src/test/regress/sql/citus_local_dist_joins.sql @@ -128,7 +128,7 @@ SET FROM citus_local WHERE - mv1.key = citus_local.key; + mv1.key = citus_local.key; ROLLBACK; BEGIN; @@ -139,7 +139,7 @@ SET FROM mv1 WHERE - mv1.key = citus_local.key; + mv1.key = citus_local.key; ROLLBACK; BEGIN; @@ -150,9 +150,9 @@ SET FROM mv2 WHERE - mv2.key = citus_local.key; + mv2.key = citus_local.key; ROLLBACK; - + -- DELETE operations BEGIN; @@ -204,21 +204,21 @@ DELETE FROM USING citus_local WHERE - mv1.key = citus_local.key; + mv1.key = citus_local.key; DELETE FROM citus_local USING mv1 WHERE - mv1.key = citus_local.key; + mv1.key = citus_local.key; DELETE FROM citus_local USING mv2 WHERE - mv2.key = citus_local.key; + mv2.key = 
citus_local.key; SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10; SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10; @@ -229,4 +229,4 @@ SET client_min_messages to ERROR; DROP TABLE citus_local; SELECT master_remove_node('localhost', :master_port); \set VERBOSITY terse -DROP SCHEMA citus_local_dist_joins CASCADE; \ No newline at end of file +DROP SCHEMA citus_local_dist_joins CASCADE; diff --git a/src/test/regress/sql/local_dist_join_mixed.sql b/src/test/regress/sql/local_dist_join_mixed.sql index 1a00c4f22..65ab89941 100644 --- a/src/test/regress/sql/local_dist_join_mixed.sql +++ b/src/test/regress/sql/local_dist_join_mixed.sql @@ -3,13 +3,13 @@ SET search_path TO local_dist_join_mixed; -CREATE TABLE distributed (id bigserial PRIMARY KEY, - name text, +CREATE TABLE distributed (id bigserial PRIMARY KEY, + name text, created_at timestamptz DEFAULT now()); -CREATE TABLE reference (id bigserial PRIMARY KEY, +CREATE TABLE reference (id bigserial PRIMARY KEY, title text); -CREATE TABLE local (id bigserial PRIMARY KEY, +CREATE TABLE local (id bigserial PRIMARY KEY, title text); -- these above restrictions brought us to the following schema @@ -100,118 +100,118 @@ SELECT sum(d1.id + local.id) OVER (PARTITION BY d1.id + local.id) FROM distribut -- nested subqueries -SELECT - count(*) -FROM +SELECT + count(*) +FROM (SELECT * FROM (SELECT * FROM distributed) as foo) as bar - JOIN + JOIN local USING(id); -SELECT - count(*) -FROM +SELECT + count(*) +FROM (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar - JOIN + JOIN local USING(id); 
-SELECT - count(*) -FROM +SELECT + count(*) +FROM (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar - JOIN + JOIN local USING(id); -SELECT - count(*) -FROM +SELECT + count(*) +FROM (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar - JOIN + JOIN (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 USING(id); --- TODO: Unnecessary recursive planning for local -SELECT - count(*) -FROM +-- TODO: Unnecessary recursive planning for local +SELECT + count(*) +FROM (SELECT *, random() FROM (SELECT *, random() FROM distributed LIMIT 1) as foo) as bar - JOIN + JOIN (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 USING(id); -- subqueries in WHERE clause -- is not colocated, and the JOIN inside as well. -- so should be recursively planned twice -SELECT - count(*) -FROM - distributed -WHERE - id > (SELECT - count(*) - FROM +SELECT + count(*) +FROM + distributed +WHERE + id > (SELECT + count(*) + FROM (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar - JOIN + JOIN (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 USING(id) ); -- two distributed tables are co-located and JOINed on distribution --- key, so should be fine to pushdown -SELECT - count(*) -FROM +-- key, so should be fine to pushdown +SELECT + count(*) +FROM distributed d_upper -WHERE - (SELECT +WHERE + (SELECT bar.id - FROM + FROM (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar - JOIN + JOIN (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 USING(id) ) IS NOT NULL; -SELECT - count(*) -FROM +SELECT + count(*) +FROM distributed d_upper -WHERE - (SELECT +WHERE + (SELECT bar.id - FROM + FROM (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar - JOIN + JOIN local as foo USING(id) ) IS NOT NULL; -SELECT - count(*) -FROM +SELECT + 
count(*) +FROM distributed d_upper -WHERE d_upper.id > - (SELECT +WHERE d_upper.id > + (SELECT bar.id - FROM + FROM (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar - JOIN + JOIN local as foo USING(id) ); -SELECT - count(*) -FROM +SELECT + count(*) +FROM distributed d_upper -WHERE - (SELECT +WHERE + (SELECT bar.id - FROM + FROM (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar - JOIN + JOIN (SELECT *, random() FROM (SELECT *,random() FROM local WHERE d_upper.id = id) as foo2) as bar2 USING(id) ) IS NOT NULL; @@ -222,15 +222,15 @@ WHERE -- subqueries in the target list -- router, should work -select (SELECT local.id) FROM local, distributed WHERE distributed.id = 1 LIMIT 1; +select (SELECT local.id) FROM local, distributed WHERE distributed.id = 1 LIMIT 1; -- should fail -select (SELECT local.id) FROM local, distributed WHERE distributed.id != 1 LIMIT 1; +select (SELECT local.id) FROM local, distributed WHERE distributed.id != 1 LIMIT 1; -- currently not supported, but should work with https://github.com/citusdata/citus/pull/4360/files -SELECT +SELECT name, (SELECT id FROM local WHERE id = e.id) -FROM +FROM distributed e ORDER BY 1,2 LIMIT 1; @@ -260,7 +260,7 @@ SELECT count(*) FROM (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as ba) ) bar; -select count(DISTINCT id) +select count(DISTINCT id) FROM ( (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as fo) @@ -303,14 +303,14 @@ SELECT COUNT(*) FROM distributed JOIN LATERAL (SELECT * FROM local WHERE local.i SELECT count(*) FROM distributed CROSS JOIN local; -SELECT count(*) FROM distributed CROSS JOIN local WHERE distributed.id = 1; +SELECT count(*) FROM distributed CROSS JOIN local WHERE distributed.id = 1; -- w count(*) it works fine as PG ignores the inner tables SELECT count(*) FROM distributed LEFT JOIN local USING (id); SELECT 
count(*) FROM local LEFT JOIN distributed USING (id); -SELECT id, name FROM distributed LEFT JOIN local USING (id) LIMIT 1; -SELECT id, name FROM local LEFT JOIN distributed USING (id) LIMIT 1; +SELECT id, name FROM distributed LEFT JOIN local USING (id) ORDER BY 1 LIMIT 1; +SELECT id, name FROM local LEFT JOIN distributed USING (id) ORDER BY 1 LIMIT 1; SELECT foo1.id @@ -326,18 +326,18 @@ SELECT id, name FROM local LEFT JOIN distributed USING (id) LIMIT 1; (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo10, (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo1 WHERE - foo1.id = foo9.id AND - foo1.id = foo8.id AND - foo1.id = foo7.id AND - foo1.id = foo6.id AND - foo1.id = foo5.id AND - foo1.id = foo4.id AND - foo1.id = foo3.id AND - foo1.id = foo2.id AND - foo1.id = foo10.id AND + foo1.id = foo9.id AND + foo1.id = foo8.id AND + foo1.id = foo7.id AND + foo1.id = foo6.id AND + foo1.id = foo5.id AND + foo1.id = foo4.id AND + foo1.id = foo3.id AND + foo1.id = foo2.id AND + foo1.id = foo10.id AND foo1.id = foo1.id -ORDER BY 1; - +ORDER BY 1; + SELECT foo1.id FROM @@ -352,7 +352,7 @@ WHERE foo1.id = foo3.id AND foo1.id = foo4.id AND foo1.id = foo5.id -ORDER BY 1; +ORDER BY 1; SELECT foo1.id @@ -368,8 +368,37 @@ WHERE foo1.id = foo3.id AND foo1.id = foo4.id AND foo1.id = foo5.id -ORDER BY 1; +ORDER BY 1; +SELECT + count(*) +FROM + distributed +JOIN LATERAL + (SELECT + * + FROM + local + JOIN + distributed d2 + ON(true) + WHERE local.id = distributed.id AND d2.id = local.id) as foo +ON (true); +SELECT local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2 LIMIt 1; +SELECT NULL FROM local JOIN distributed USING(id) ORDER BY 1 LIMIt 1; +SELECT distributed.name, distributed.name, local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2,3,4 LIMIT 1; +SELECT + COUNT(*) +FROM + local +JOIN + distributed +USING + (id) +JOIN + (SELECT id, NULL, NULL FROM 
distributed) foo +USING + (id); -DROP SCHEMA local_dist_join_mixed CASCADE; \ No newline at end of file +DROP SCHEMA local_dist_join_mixed CASCADE; diff --git a/src/test/regress/sql/local_dist_join_modifications.sql b/src/test/regress/sql/local_dist_join_modifications.sql index f8c8ee886..843f04dbe 100644 --- a/src/test/regress/sql/local_dist_join_modifications.sql +++ b/src/test/regress/sql/local_dist_join_modifications.sql @@ -104,7 +104,7 @@ SET FROM postgres_table WHERE - mv1.key = postgres_table.key; + mv1.key = postgres_table.key; ROLLBACK; BEGIN; @@ -115,7 +115,7 @@ SET FROM mv1 WHERE - mv1.key = postgres_table.key; + mv1.key = postgres_table.key; ROLLBACK; BEGIN; @@ -126,7 +126,7 @@ SET FROM mv2 WHERE - mv2.key = postgres_table.key; + mv2.key = postgres_table.key; ROLLBACK; -- in case of update/delete we always recursively plan @@ -351,22 +351,22 @@ DELETE FROM USING postgres_table WHERE - mv1.key = postgres_table.key; + mv1.key = postgres_table.key; DELETE FROM postgres_table USING mv1 WHERE - mv1.key = postgres_table.key; + mv1.key = postgres_table.key; DELETE FROM postgres_table USING mv2 WHERE - mv2.key = postgres_table.key; + mv2.key = postgres_table.key; SET client_min_messages to ERROR; -DROP SCHEMA local_dist_join_modifications CASCADE; \ No newline at end of file +DROP SCHEMA local_dist_join_modifications CASCADE;