From eebcd995b3dc75470d55c086eaba2f09528c834e Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Fri, 27 Nov 2020 19:02:51 +0300 Subject: [PATCH] Add some more tests --- .../planner/local_distributed_join_planner.c | 182 +++++++++--------- .../planner/multi_router_planner.c | 96 +-------- .../distributed/planner/recursive_planning.c | 22 ++- .../relation_restriction_equivalence.c | 1 - .../distributed/planner/shard_pruning.c | 2 +- src/include/distributed/recursive_planning.h | 2 +- .../regress/expected/local_table_join.out | 127 +++++++++++- src/test/regress/sql/local_table_join.sql | 30 +++ 8 files changed, 256 insertions(+), 206 deletions(-) diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index 680fb7b3a..d9535d1c6 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -54,55 +54,55 @@ #include "utils/guc.h" #include "utils/lsyscache.h" -typedef struct RTEToSubqueryConverterReference +typedef struct RangeTableEntryDetails { RangeTblEntry *rangeTableEntry; Index rteIndex; List *restrictionList; List *requiredAttributeNumbers; -} RTEToSubqueryConverterReference; +} RangeTableEntryDetails; -typedef struct RTEToSubqueryConverterContext +typedef struct ConversionCandidates { List *distributedTableList; /* reference or distributed table */ List *localTableList; /* local or citus local table */ bool hasSubqueryRTE; -}RTEToSubqueryConverterContext; +}ConversionCandidates; static Oid GetResultRelationId(Query *query); static Oid GetRTEToSubqueryConverterReferenceRelId( - RTEToSubqueryConverterReference *rteToSubqueryConverterReference); + RangeTableEntryDetails *rangeTableEntryDetails); static bool ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid resultRelationId); static bool HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList, List *requiredAttrNumbersForDistRTE); static bool ShouldConvertDistributedTable(FromExpr *joinTree, - RTEToSubqueryConverterReference *distRTEContext); + RangeTableEntryDetails *distRTEContext); static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, RecursivePlanningContext *planningContext); -static RTEToSubqueryConverterContext * CreateRTEToSubqueryConverterContext( +static ConversionCandidates * CreateConversionCandidates( RecursivePlanningContext *context, List * rangeTableList); static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes); -static RTEToSubqueryConverterReference * GetNextRTEToConvertToSubquery(FromExpr *joinTree, - RTEToSubqueryConverterContext - * - rteToSubqueryConverterContext, - PlannerRestrictionContext - * - plannerRestrictionContext, - Oid - resultRelationId); +static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree, + ConversionCandidates + * + conversionCandidates, + PlannerRestrictionContext + * + plannerRestrictionContext, + Oid + resultRelationId); static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList, List **joinRangeTableEntries); -static void RemoveFromRTEToSubqueryConverterContext( - RTEToSubqueryConverterContext *rteToSubqueryConverterContext, Oid relationId); +static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid + relationId); static bool FillLocalAndDistributedRTECandidates( - RTEToSubqueryConverterContext *rteToSubqueryConverterContext, - RTEToSubqueryConverterReference ** + ConversionCandidates *conversionCandidates, + RangeTableEntryDetails ** localRTECandidate, - RTEToSubqueryConverterReference ** + RangeTableEntryDetails ** distributedRTECandidate); /* @@ -123,31 +123,31 @@ ConvertUnplannableTableJoinsToSubqueries(Query *query, return; } - RTEToSubqueryConverterContext *rteToSubqueryConverterContext = - CreateRTEToSubqueryConverterContext( + ConversionCandidates *conversionCandidates = + CreateConversionCandidates( context, rangeTableList); - RTEToSubqueryConverterReference *rteToSubqueryConverterReference = - GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext, + RangeTableEntryDetails *rangeTableEntryDetails = + GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates, context->plannerRestrictionContext, resultRelationId); PlannerRestrictionContext *plannerRestrictionContext = FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); - while (rteToSubqueryConverterReference && !IsRouterPlannable(query, - plannerRestrictionContext)) + while (rangeTableEntryDetails && !IsRouterPlannable(query, + plannerRestrictionContext)) { ReplaceRTERelationWithRteSubquery( - rteToSubqueryConverterReference->rangeTableEntry, - rteToSubqueryConverterReference->restrictionList, - rteToSubqueryConverterReference-> + rangeTableEntryDetails->rangeTableEntry, + rangeTableEntryDetails->restrictionList, + rangeTableEntryDetails-> requiredAttributeNumbers, context); - RemoveFromRTEToSubqueryConverterContext(rteToSubqueryConverterContext, - rteToSubqueryConverterReference-> - rangeTableEntry->relid); - rteToSubqueryConverterReference = - GetNextRTEToConvertToSubquery(query->jointree, rteToSubqueryConverterContext, + RemoveFromConversionCandidates(conversionCandidates, + rangeTableEntryDetails-> + rangeTableEntry->relid); + rangeTableEntryDetails = + GetNextRTEToConvertToSubquery(query->jointree, conversionCandidates, context->plannerRestrictionContext, resultRelationId); } @@ -220,15 +220,15 @@ GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList, * which should be converted to a subquery. It considers the local join policy * and result relation. */ -static RTEToSubqueryConverterReference * +static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree, - RTEToSubqueryConverterContext *rteToSubqueryConverterContext, + ConversionCandidates *conversionCandidates, PlannerRestrictionContext *plannerRestrictionContext, Oid resultRelationId) { - RTEToSubqueryConverterReference *localRTECandidate = NULL; - RTEToSubqueryConverterReference *distributedRTECandidate = NULL; - if (!FillLocalAndDistributedRTECandidates(rteToSubqueryConverterContext, + RangeTableEntryDetails *localRTECandidate = NULL; + RangeTableEntryDetails *distributedRTECandidate = NULL; + if (!FillLocalAndDistributedRTECandidates(conversionCandidates, &localRTECandidate, &distributedRTECandidate)) { @@ -283,28 +283,27 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree, * It returns true if we should continue converting tables to subqueries. */ static bool -FillLocalAndDistributedRTECandidates( - RTEToSubqueryConverterContext *rteToSubqueryConverterContext, - RTEToSubqueryConverterReference **localRTECandidate, - RTEToSubqueryConverterReference ** - distributedRTECandidate) +FillLocalAndDistributedRTECandidates(ConversionCandidates *conversionCandidates, + RangeTableEntryDetails **localRTECandidate, + RangeTableEntryDetails ** + distributedRTECandidate) { - if (list_length(rteToSubqueryConverterContext->localTableList) > 0) + if (list_length(conversionCandidates->localTableList) > 0) { - *localRTECandidate = linitial(rteToSubqueryConverterContext->localTableList); + *localRTECandidate = linitial(conversionCandidates->localTableList); } if (*localRTECandidate == NULL) { return false; } - if (list_length(rteToSubqueryConverterContext->distributedTableList) > 0) + if (list_length(conversionCandidates->distributedTableList) > 0) { *distributedRTECandidate = linitial( - rteToSubqueryConverterContext->distributedTableList); + conversionCandidates->distributedTableList); } return *distributedRTECandidate != NULL || - rteToSubqueryConverterContext->hasSubqueryRTE; + conversionCandidates->hasSubqueryRTE; } @@ -313,35 +312,33 @@ FillLocalAndDistributedRTECandidates( * if it is a valid one. */ static Oid -GetRTEToSubqueryConverterReferenceRelId( - RTEToSubqueryConverterReference *rteToSubqueryConverterReference) +GetRTEToSubqueryConverterReferenceRelId(RangeTableEntryDetails *rangeTableEntryDetails) { - if (rteToSubqueryConverterReference && - rteToSubqueryConverterReference->rangeTableEntry) + if (rangeTableEntryDetails && + rangeTableEntryDetails->rangeTableEntry) { - return rteToSubqueryConverterReference->rangeTableEntry->relid; + return rangeTableEntryDetails->rangeTableEntry->relid; } return InvalidOid; } /* - * RemoveFromRTEToSubqueryConverterContext removes an element from + * RemoveFromConversionCandidates removes an element from * the relevant list based on the relation id. */ static void -RemoveFromRTEToSubqueryConverterContext( - RTEToSubqueryConverterContext *rteToSubqueryConverterContext, Oid relationId) +RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid relationId) { if (IsLocalOrCitusLocalTable(relationId)) { - rteToSubqueryConverterContext->localTableList = - list_delete_first(rteToSubqueryConverterContext->localTableList); + conversionCandidates->localTableList = + list_delete_first(conversionCandidates->localTableList); } else { - rteToSubqueryConverterContext->distributedTableList = - list_delete_first(rteToSubqueryConverterContext->distributedTableList); + conversionCandidates->distributedTableList = + list_delete_first(conversionCandidates->distributedTableList); } } @@ -373,16 +370,16 @@ ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList, Oid resultRelatio */ static bool ShouldConvertDistributedTable(FromExpr *joinTree, - RTEToSubqueryConverterReference * - rteToSubqueryConverterReference) + RangeTableEntryDetails * + rangeTableEntryDetails) { - if (rteToSubqueryConverterReference == NULL) + if (rangeTableEntryDetails == NULL) { return false; } List *distRTEEqualityQuals = FetchEqualityAttrNumsForRTEFromQuals(joinTree->quals, - rteToSubqueryConverterReference->rteIndex); + rangeTableEntryDetails->rteIndex); Node *join = NULL; foreach_ptr(join, joinTree->fromlist) @@ -393,15 +390,15 @@ ShouldConvertDistributedTable(FromExpr *joinTree, distRTEEqualityQuals = list_concat(distRTEEqualityQuals, FetchEqualityAttrNumsForRTEFromQuals( joinExpr->quals, - rteToSubqueryConverterReference-> + rangeTableEntryDetails-> rteIndex) ); } } bool hasUniqueFilter = HasUniqueFilter( - rteToSubqueryConverterReference->rangeTableEntry, - rteToSubqueryConverterReference-> + rangeTableEntryDetails->rangeTableEntry, + rangeTableEntryDetails-> restrictionList, distRTEEqualityQuals); return hasUniqueFilter; } @@ -505,18 +502,15 @@ RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, /* - * CreateRTEToSubqueryConverterContext returns a range table entry which has the most filters - * on it along with the restrictions (e.g., fills **restrictionList). - * - * The function also gets a boolean localTable parameter, so the caller - * can choose to run the function for only local tables or distributed tables. + * CreateConversionCandidates creates the conversion candidates that might + * be converted to a subquery so that citus planners can work. */ -static RTEToSubqueryConverterContext * -CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context, - List *rangeTableList) +static ConversionCandidates * +CreateConversionCandidates(RecursivePlanningContext *context, + List *rangeTableList) { - RTEToSubqueryConverterContext *rteToSubqueryConverterContext = palloc0( - sizeof(RTEToSubqueryConverterContext)); + ConversionCandidates *conversionCandidates = palloc0( + sizeof(ConversionCandidates)); int rteIndex = 0; RangeTblEntry *rangeTableEntry = NULL; @@ -525,24 +519,24 @@ CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context, rteIndex++; if (rangeTableEntry->rtekind == RTE_SUBQUERY) { - rteToSubqueryConverterContext->hasSubqueryRTE = true; + conversionCandidates->hasSubqueryRTE = true; } /* we're only interested in tables */ - if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) + if (!IsRecursivelyPlannableRelation(rangeTableEntry)) { continue; } - RTEToSubqueryConverterReference *rteToSubqueryConverter = palloc( - sizeof(RTEToSubqueryConverterReference)); - rteToSubqueryConverter->rangeTableEntry = rangeTableEntry; - rteToSubqueryConverter->rteIndex = rteIndex; - rteToSubqueryConverter->restrictionList = GetRestrictInfoListForRelation( + RangeTableEntryDetails *rangeTableEntryDetails = palloc0( + sizeof(RangeTableEntryDetails)); + rangeTableEntryDetails->rangeTableEntry = rangeTableEntry; + rangeTableEntryDetails->rteIndex = rteIndex; + rangeTableEntryDetails->restrictionList = GetRestrictInfoListForRelation( rangeTableEntry, context-> plannerRestrictionContext, 1); - rteToSubqueryConverter->requiredAttributeNumbers = RequiredAttrNumbersForRelation( + rangeTableEntryDetails->requiredAttributeNumbers = RequiredAttrNumbersForRelation( rangeTableEntry, context); bool referenceOrDistributedTable = IsCitusTableType(rangeTableEntry->relid, @@ -551,16 +545,16 @@ CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context, DISTRIBUTED_TABLE); if (referenceOrDistributedTable) { - rteToSubqueryConverterContext->distributedTableList = - lappend(rteToSubqueryConverterContext->distributedTableList, - rteToSubqueryConverter); + conversionCandidates->distributedTableList = + lappend(conversionCandidates->distributedTableList, + rangeTableEntryDetails); } else { - rteToSubqueryConverterContext->localTableList = - lappend(rteToSubqueryConverterContext->localTableList, - rteToSubqueryConverter); + conversionCandidates->localTableList = + lappend(conversionCandidates->localTableList, + rangeTableEntryDetails); } } - return rteToSubqueryConverterContext; + return conversionCandidates; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 4869ef7cf..c8d71619e 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -132,12 +132,6 @@ static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, Oid *distributedTableId); static bool NodeIsFieldStore(Node *node); -static DeferredErrorMessage * DeferErrorIfUnsupportedModifyQueryWithLocalTable( - Query *query); -static DeferredErrorMessage * DeferErrorIfUnsupportedModifyQueryWithCitusLocalTable( - RTEListProperties *rteListProperties, Oid targetRelationId); -static DeferredErrorMessage * DeferErrorIfUnsupportedModifyQueryWithPostgresLocalTable( - RTEListProperties *rteListProperties, Oid targetRelationId); static DeferredErrorMessage * MultiShardUpdateDeleteSupported(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -307,7 +301,8 @@ IsRouterPlannable(Query *query, PlannerRestrictionContext *plannerRestrictionCon DeferredErrorMessage *deferredErrorMessage = NULL; if (copyQuery->commandType == CMD_SELECT) { - deferredErrorMessage = MultiRouterPlannableQuery(copyQuery); + deferredErrorMessage = DeferErrorIfUnsupportedRouterPlannableSelectQuery( + copyQuery); } if (deferredErrorMessage) { @@ -809,93 +804,6 @@ NodeIsFieldStore(Node *node) } -/* - * DeferErrorIfUnsupportedModifyQueryWithLocalTable returns DeferredErrorMessage - * for unsupported modify queries that cannot be planned by router planner due to - * unsupported usage of postgres local or citus local tables. - */ -static DeferredErrorMessage * -DeferErrorIfUnsupportedModifyQueryWithLocalTable(Query *query) -{ - RTEListProperties *rteListProperties = GetRTEListPropertiesForQuery(query); - Oid targetRelationId = ModifyQueryResultRelationId(query); - - DeferredErrorMessage *deferredErrorMessage = - DeferErrorIfUnsupportedModifyQueryWithCitusLocalTable(rteListProperties, - targetRelationId); - if (deferredErrorMessage) - { - return deferredErrorMessage; - } - - deferredErrorMessage = DeferErrorIfUnsupportedModifyQueryWithPostgresLocalTable( - rteListProperties, - targetRelationId); - return deferredErrorMessage; -} - - -/* - * DeferErrorIfUnsupportedModifyQueryWithCitusLocalTable is a helper function - * that takes RTEListProperties & targetRelationId and returns deferred error - * if query is not supported due to unsupported usage of citus local tables. - */ -static DeferredErrorMessage * -DeferErrorIfUnsupportedModifyQueryWithCitusLocalTable( - RTEListProperties *rteListProperties, Oid targetRelationId) -{ - if (rteListProperties->hasDistributedTable && rteListProperties->hasCitusLocalTable) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot plan modifications with citus local tables and " - "distributed tables", NULL, - LOCAL_TABLE_SUBQUERY_CTE_HINT); - } - - if (IsCitusTableType(targetRelationId, REFERENCE_TABLE) && - rteListProperties->hasCitusLocalTable) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot plan modifications of reference tables with citus " - "local tables", NULL, - LOCAL_TABLE_SUBQUERY_CTE_HINT); - } - - return NULL; -} - - -/* - * DeferErrorIfUnsupportedModifyQueryWithPostgresLocalTable is a helper - * function that takes RTEListProperties & targetRelationId and returns - * deferred error if query is not supported due to unsupported usage of - * postgres local tables. - */ -static DeferredErrorMessage * -DeferErrorIfUnsupportedModifyQueryWithPostgresLocalTable( - RTEListProperties *rteListProperties, Oid targetRelationId) -{ - if (rteListProperties->hasPostgresLocalTable && - rteListProperties->hasCitusTable) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot plan modifications with local tables involving " - "citus tables", NULL, - LOCAL_TABLE_SUBQUERY_CTE_HINT); - } - - if (!IsCitusTable(targetRelationId)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot plan modifications of local tables involving " - "distributed tables", - NULL, NULL); - } - - return NULL; -} - - /* * ModifyQuerySupported returns NULL if the query only contains supported * features, otherwise it returns an error description. diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index f1c821690..63c9d85e0 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -1400,8 +1400,16 @@ ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid resultRelationId) { return false; } - return ContainsLocalTableDistributedTableJoin(rangeTableList) || - ContainsLocalTableSubqueryJoin(rangeTableList, resultRelationId); + if (ContainsLocalTableDistributedTableJoin(rangeTableList)) + { + return true; + } + if (ContainsLocalTableSubqueryJoin(rangeTableList, resultRelationId)) + { + return true; + } + + return false; } @@ -1420,7 +1428,7 @@ AllDataLocallyAccessible(List *rangeTableList) /* TODO:: check if it has distributed table */ return false; } - if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) + if (!IsRecursivelyPlannableRelation(rangeTableEntry)) { continue; } @@ -1457,11 +1465,11 @@ AllDataLocallyAccessible(List *rangeTableList) /* - * SubqueryConvertableRelationForJoin returns true if the given range table entry + * IsRecursivelyPlannableRelation returns true if the given range table entry * is a relation type that can be converted to a subquery. */ bool -SubqueryConvertableRelationForJoin(RangeTblEntry *rangeTableEntry) +IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry) { if (rangeTableEntry->rtekind != RTE_RELATION) { @@ -1487,7 +1495,7 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) + if (!IsRecursivelyPlannableRelation(rangeTableEntry)) { continue; } @@ -1529,7 +1537,7 @@ ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId) containsSubquery = true; } - if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) + if (!IsRecursivelyPlannableRelation(rangeTableEntry)) { continue; } diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index a744e80ff..64620ef4f 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -1910,7 +1910,6 @@ GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry, */ Expr *copyOfRestrictClause = (Expr *) copyObject((Node *) restrictionClause); List *varClauses = pull_var_clause_default((Node *) copyOfRestrictClause); - ListCell *varClauseCell = NULL; Var *column = NULL; foreach_ptr(column, varClauses) { diff --git a/src/backend/distributed/planner/shard_pruning.c b/src/backend/distributed/planner/shard_pruning.c index 26b84d9ee..8d606cc14 100644 --- a/src/backend/distributed/planner/shard_pruning.c +++ b/src/backend/distributed/planner/shard_pruning.c @@ -894,7 +894,7 @@ PrunableExpressionsWalker(PruningTreeNode *node, ClauseWalkerContext *context) * VarConstOpExprClause check whether an expression is a valid comparison of a Var to a Const. * Also obtaining the var with constant when valid. */ -static bool +bool VarConstOpExprClause(OpExpr *opClause, Var **varClause, Const **constantClause) { Var *foundVarClause = NULL; diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index 6dd146482..a88e14bc2 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -67,6 +67,6 @@ extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, extern bool ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId); extern bool ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid resultRelationId); -extern bool SubqueryConvertableRelationForJoin(RangeTblEntry *rangeTableEntry); +extern bool IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry); #endif /* RECURSIVE_PLANNING_H */ diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index 2dacb8db8..9d2ebb12a 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -39,6 +39,16 @@ SELECT create_distributed_table('distributed_partitioned_table', 'key'); (1 row) +CREATE TABLE local_partitioned_table(key int, value text) PARTITION BY RANGE (key); +CREATE TABLE local_partitioned_table_1 PARTITION OF local_partitioned_table FOR VALUES FROM (0) TO (10); +CREATE TABLE local_partitioned_table_2 PARTITION OF local_partitioned_table FOR VALUES FROM (10) TO (20); +CREATE TABLE distributed_table_composite (key int, value text, value_2 jsonb, primary key (key, value)); +SELECT create_distributed_table('distributed_table_composite', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + SET client_min_messages TO DEBUG1; -- the user doesn't allow local / distributed table joinn SET citus.local_table_join_policy TO 'never'; @@ -121,6 +131,34 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) +-- partititoned local tables should work as well +SELECT count(*) FROM distributed_table JOIN local_partitioned_table USING(key); +DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM reference_table JOIN local_partitioned_table USING(key); +DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.reference_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM distributed_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key); +DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.reference_table USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- partitioned tables should work as well SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key); DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 @@ -149,6 +187,43 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) +SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key); +DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) WHERE distributed_partitioned_table.key = 10; +DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) WHERE (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key); +DEBUG: Wrapping local relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.reference_table USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key. +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10; +DEBUG: Wrapping local relation "distributed_table_composite" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_composite.key OPERATOR(pg_catalog.=) 10) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- a unique index on key so dist table should be recursively planned SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key); DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 @@ -508,7 +583,7 @@ SET FROM postgres_table WHERE - distributed_table_windex.key = postgres_table.key; + distributed_table_windex.key = postgres_table.key; DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_windex SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) postgres_table.key) @@ -555,7 +630,7 @@ SET FROM postgres_table WHERE - distributed_table_windex.key = postgres_table.key; + distributed_table_windex.key = postgres_table.key; DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_windex SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) postgres_table.key) @@ -600,7 +675,7 @@ SET FROM postgres_table WHERE - distributed_table_windex.key = postgres_table.key; + distributed_table_windex.key = postgres_table.key; DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_windex SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) postgres_table.key) @@ -800,7 +875,7 @@ SET FROM citus_local WHERE - distributed_table_windex.key = citus_local.key; + distributed_table_windex.key = citus_local.key; DEBUG: Wrapping local relation "citus_local" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.citus_local WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.citus_local WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_windex SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) citus_local WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) citus_local.key) @@ -811,15 +886,51 @@ SET FROM distributed_table_windex WHERE - distributed_table_windex.key = citus_local.key; + distributed_table_windex.key = citus_local.key; DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.citus_local SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) citus_local.key) +-- complex queries +SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10; +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table LIMIT 1 +DEBUG: generating subplan XXX_2 for subquery SELECT key, value FROM local_table_join.citus_local +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((local_table_join.postgres_table JOIN (SELECT d1.key, d1.value, d1.value_2 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1) d2 USING (key)) JOIN local_table_join.reference_table USING (key)) JOIN local_table_join.citus_local USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) c1 USING (key)) WHERE ((d2.key OPERATOR(pg_catalog.>) 10) AND (d2.key OPERATOR(pg_catalog.=) 10)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10; +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table LIMIT 1 +DEBUG: generating subplan XXX_2 for subquery SELECT key, value FROM local_table_join.citus_local +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((local_table_join.postgres_table JOIN (SELECT d1.key, d1.value, d1.value_2 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1) d2 USING (key)) JOIN local_table_join.reference_table USING (key)) JOIN local_table_join.citus_local USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) c1 USING (key)) WHERE ((d2.key OPERATOR(pg_catalog.>) 10) AND (d2.key OPERATOR(pg_catalog.=) 10)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- TODO:: we should support this? +UPDATE reference_table SET key = 1 FROM postgres_table WHERE postgres_table.key = 10; +ERROR: relation postgres_table is not distributed +UPDATE reference_table SET key = 1 FROM (SELECT * FROM postgres_table) l WHERE l.key = 10; +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.reference_table SET key = 1 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) l WHERE (l.key OPERATOR(pg_catalog.=) 10) +-- TODO:: we should probably not wrap postgres_table here as there is a WHERE FALSE? +-- though then the planner could give an error +SELECT count(*) FROM postgres_table JOIN distributed_table USING(key) WHERE FALSE; +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE false OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE false OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key)) WHERE false + count +--------------------------------------------------------------------- + 0 +(1 row) + DROP TABLE citus_local; CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" PL/pgSQL function citus_drop_trigger() line 15 at PERFORM -CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" -PL/pgSQL function citus_drop_trigger() line 15 at PERFORM RESET client_min_messages; SELECT master_remove_node('localhost', :master_port); master_remove_node @@ -829,4 +940,4 @@ SELECT master_remove_node('localhost', :master_port); \set VERBOSITY terse DROP SCHEMA local_table_join CASCADE; -NOTICE: drop cascades to 7 other objects +NOTICE: drop cascades to 9 other objects diff --git a/src/test/regress/sql/local_table_join.sql b/src/test/regress/sql/local_table_join.sql index 0fb65d824..59b4b543a 100644 --- a/src/test/regress/sql/local_table_join.sql +++ b/src/test/regress/sql/local_table_join.sql @@ -19,6 +19,12 @@ CREATE TABLE distributed_partitioned_table_1 PARTITION OF distributed_partitione CREATE TABLE distributed_partitioned_table_2 PARTITION OF distributed_partitioned_table FOR VALUES FROM (10) TO (20); SELECT create_distributed_table('distributed_partitioned_table', 'key'); +CREATE TABLE local_partitioned_table(key int, value text) PARTITION BY RANGE (key); +CREATE TABLE local_partitioned_table_1 PARTITION OF local_partitioned_table FOR VALUES FROM (0) TO (10); +CREATE TABLE local_partitioned_table_2 PARTITION OF local_partitioned_table FOR VALUES FROM (10) TO (20); + +CREATE TABLE distributed_table_composite (key int, value text, value_2 jsonb, primary key (key, value)); +SELECT create_distributed_table('distributed_table_composite', 'key'); SET client_min_messages TO DEBUG1; @@ -51,11 +57,23 @@ SELECT count(*) FROM distributed_table JOIN postgres_table USING(key); SELECT count(*) FROM reference_table JOIN postgres_table USING(key); SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) JOIN reference_table USING (key); +-- partititoned local tables should work as well +SELECT count(*) FROM distributed_table JOIN local_partitioned_table USING(key); +SELECT count(*) FROM reference_table JOIN local_partitioned_table USING(key); +SELECT count(*) FROM distributed_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key); + -- partitioned tables should work as well SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key); SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) WHERE distributed_partitioned_table.key = 10; SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key); +SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key); +SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) WHERE distributed_partitioned_table.key = 10; +SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key); + +-- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key. +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10; + -- a unique index on key so dist table should be recursively planned SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key); SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(value); @@ -352,6 +370,18 @@ FROM WHERE distributed_table_windex.key = citus_local.key; +-- complex queries +SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10; +SELECT count(*) FROM postgres_table JOIN (SELECT * FROM (SELECT * FROM distributed_table LIMIT 1) d1) d2 using (key) JOIN reference_table USING(key) JOIN citus_local USING (key) JOIN (SELECT * FROM citus_local) c1 USING (key) WHERE d2.key > 10 AND d2.key = 10; + +-- TODO:: we should support this? +UPDATE reference_table SET key = 1 FROM postgres_table WHERE postgres_table.key = 10; +UPDATE reference_table SET key = 1 FROM (SELECT * FROM postgres_table) l WHERE l.key = 10; + +-- TODO:: we should probably not wrap postgres_table here as there is a WHERE FALSE? +-- though then the planner could give an error +SELECT count(*) FROM postgres_table JOIN distributed_table USING(key) WHERE FALSE; + DROP TABLE citus_local; RESET client_min_messages; SELECT master_remove_node('localhost', :master_port);