From 2a44029aaf78cd6c518b5d2b7c09253c738620b7 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Fri, 4 Dec 2020 12:24:37 +0300 Subject: [PATCH] Simplify ContainsTableToBeConvertedToSubquery AllDataLocallyAccessible and ContainsLocalTableSubqueryJoin are removed. We can possibly remove ModifiesLocalTableWithRemoteCitusLocalTable as well. Though this removal has a side effect that now when all the data is locally available, we could still wrap a relation into a subquery, I guess that should be resolved in the router planner itself. Add more tests --- .../distributed/planner/distributed_planner.c | 1 - .../planner/local_distributed_join_planner.c | 24 +- .../planner/multi_logical_planner.c | 11 + .../planner/multi_router_planner.c | 12 +- .../distributed/planner/recursive_planning.c | 129 +----- .../relation_restriction_equivalence.c | 6 +- .../distributed/multi_logical_planner.h | 1 + src/include/distributed/recursive_planning.h | 1 - .../relation_restriction_equivalence.h | 4 +- .../expected/citus_local_tables_queries.out | 18 +- .../expected/local_distributed_table_join.out | 0 .../regress/expected/local_table_join.out | 45 ++- .../sql/local_distributed_table_join.sql | 371 ++++++++++++++++++ src/test/regress/sql/local_table_join.sql | 3 + 14 files changed, 451 insertions(+), 175 deletions(-) create mode 100644 src/test/regress/expected/local_distributed_table_join.out create mode 100644 src/test/regress/sql/local_distributed_table_join.sql diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 21de545e2..6a11bb352 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -414,7 +414,6 @@ AdjustPartitioningForDistributedPlanning(List *rangeTableList, * value before and after dropping to the standart_planner. */ if (rangeTableEntry->rtekind == RTE_RELATION && - IsCitusTable(rangeTableEntry->relid) && PartitionedTable(rangeTableEntry->relid)) { rangeTableEntry->inh = setPartitionedTablesInherited; diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index 8e03fd6d8..877616fa0 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -141,6 +141,10 @@ RecursivelyPlanLocalTableJoins(Query *query, RangeTableEntryDetails *rangeTableEntryDetails = GetNextRTEToConvertToSubquery(joinTree, conversionCandidates, plannerRestrictionContext); + if (rangeTableEntryDetails == NULL) + { + break; + } RangeTblEntry *rangeTableEntry = rangeTableEntryDetails->rangeTableEntry; Oid relId = rangeTableEntryDetails->rangeTableEntry->relid; @@ -298,7 +302,8 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList, /* user doesn't want Citus to enable local table joins */ return false; } - if (!ContainsTableToBeConvertedToSubquery(rangeTableList, resultRelationId)) + + if (!ContainsTableToBeConvertedToSubquery(rangeTableList, resultRelationId)) { return false; } @@ -385,23 +390,22 @@ static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, PlannerRestrictionContext *plannerRestrictionContext) { - /* TODO: Get rid of this hack, find relation restriction information directly */ - PlannerRestrictionContext *filteredPlannerRestrictionContext = - FilterPlannerRestrictionForQuery(plannerRestrictionContext, - WrapRteRelationIntoSubquery(relationRte, NIL)); - + int rteIdentity = GetRTEIdentity(relationRte); RelationRestrictionContext *relationRestrictionContext = - filteredPlannerRestrictionContext->relationRestrictionContext; + plannerRestrictionContext->relationRestrictionContext; + Relids queryRteIdentities = bms_make_singleton(rteIdentity); + RelationRestrictionContext *filteredRelationRestrictionContext = + FilterRelationRestrictionContext(relationRestrictionContext, queryRteIdentities); List *filteredRelationRestrictionList = - relationRestrictionContext->relationRestrictionList; + filteredRelationRestrictionContext->relationRestrictionList; - if (list_length(filteredRelationRestrictionList) == 0) + if (list_length(filteredRelationRestrictionList) != 1) { return NIL; } + RelationRestriction *relationRestriction = (RelationRestriction *) linitial(filteredRelationRestrictionList); - PlannerInfo *plannerInfo = relationRestriction->plannerInfo; Query *queryToProcess = plannerInfo->parse; int rteIndex = relationRestriction->index; diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index b74b7a352..02e780aa1 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -333,6 +333,17 @@ IsCitusTableRTE(Node *node) } +/* + * IsDistributedOrReferenceTableRTE returns true if the given node + * is eeither a distributed(hash/range/append) or reference table. + */ +bool +IsDistributedOrReferenceTableRTE(Node *node) +{ + return IsDistributedTableRTE(node) || IsReferenceTableRTE(node); +} + + /* * IsDistributedTableRTE gets a node and returns true if the node * is a range table relation entry that points to a distributed relation, diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 8f0ae7fb3..2b97da267 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -874,14 +874,8 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer { ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList); } - RangeTblEntry *resultRte = ExtractResultRelationRTE(queryTree); - Oid resultRelationId = InvalidOid; - if (resultRte) - { - resultRelationId = resultRte->relid; - } - bool containsTableToBeConvertedToSubquery = - ContainsTableToBeConvertedToSubquery(queryTree->rtable, resultRelationId); + bool containsLocalTableDistributedTableJoin = + ContainsLocalTableDistributedTableJoin(queryTree->rtable); RangeTblEntry *rangeTableEntry = NULL; foreach_ptr(rangeTableEntry, rangeTableList) @@ -907,7 +901,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer else { if (IsRelationLocalTableOrMatView(rangeTableEntry->relid) && - containsTableToBeConvertedToSubquery) + containsLocalTableDistributedTableJoin) { StringInfo errorMessage = makeStringInfo(); char *relationName = get_rel_name(rangeTableEntry->relid); diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 1ef1371a3..7fe70a4f3 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -156,7 +156,6 @@ static bool ShouldRecursivelyPlanSubquery(Query *subquery, static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery, PlannerRestrictionContext * restrictionContext); -static bool AllDataLocallyAccessible(List *rangeTableList); static bool IsTableLocallyAccessible(Oid relationId); static bool ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *context); @@ -182,7 +181,6 @@ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList, static void UpdateVarNosInQualForSubquery(Query *query); static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList, Oid resultRelationId); -static bool ContainsOnlyReferenceAndCitusLocalRelation(List *rangeTableList); /* * GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs. @@ -1427,22 +1425,10 @@ UpdateVarNosInQualForSubquery(Query *query) bool ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid resultRelationId) { - if (AllDataLocallyAccessible(rangeTableList)) - { - return false; - } - if (ContainsOnlyReferenceAndCitusLocalRelation(rangeTableList)) - { - return false; - } if (ContainsLocalTableDistributedTableJoin(rangeTableList)) { return true; } - if (ContainsLocalTableSubqueryJoin(rangeTableList, resultRelationId)) - { - return true; - } if (ModifiesLocalTableWithRemoteCitusLocalTable(rangeTableList, resultRelationId)) { return true; @@ -1485,37 +1471,6 @@ ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList, Oid resultRela } -/* - * AllDataLocallyAccessible return true if all data for the relations in the - * rangeTableList is locally accessible. - */ -static bool -AllDataLocallyAccessible(List *rangeTableList) -{ - RangeTblEntry *rangeTableEntry = NULL; - foreach_ptr(rangeTableEntry, rangeTableList) - { - if (rangeTableEntry->rtekind == RTE_SUBQUERY) - { - /* TODO:: check if it has distributed table */ - return false; - } - if (!IsRecursivelyPlannableRelation(rangeTableEntry)) - { - continue; - } - - Oid relationId = rangeTableEntry->relid; - if (!IsTableLocallyAccessible(relationId)) - { - return false; - } - } - - return true; -} - - /* * IsTableLocallyAccessible returns true if the given table * can be accessed in local. @@ -1568,7 +1523,8 @@ IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry) /* * ContainsLocalTableDistributedTableJoin returns true if the input range table list - * contains a direct join between local and distributed tables. + * contains a direct join between local RTE and an RTE that contains a distributed + * or reference table. */ bool ContainsLocalTableDistributedTableJoin(List *rangeTableList) @@ -1576,22 +1532,16 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList) bool containsLocalTable = false; bool containsDistributedTable = false; - ListCell *rangeTableCell = NULL; - foreach(rangeTableCell, rangeTableList) + RangeTblEntry *rangeTableEntry = NULL; + foreach_ptr(rangeTableEntry, rangeTableList) { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - - if (!IsRecursivelyPlannableRelation(rangeTableEntry)) - { - continue; - } - - if (IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE) || - IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE)) + if (FindNodeMatchingCheckFunctionInRangeTableList(list_make1(rangeTableEntry), + IsDistributedOrReferenceTableRTE)) { containsDistributedTable = true; } - else if (IsLocalTableRteOrMatView((Node *) rangeTableEntry)) + else if (IsRecursivelyPlannableRelation(rangeTableEntry) && + IsLocalTableRteOrMatView((Node *) rangeTableEntry)) { /* we consider citus local tables as local table */ containsLocalTable = true; @@ -1601,69 +1551,6 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList) return containsLocalTable && containsDistributedTable; } - -/* - * ContainsOnlyReferenceAndCitusLocalRelation returns true if there is no - * relation other than citus local and reference tables - */ -static bool -ContainsOnlyReferenceAndCitusLocalRelation(List *rangeTableList) -{ - RangeTblEntry *rangeTableEntry = NULL; - foreach_ptr(rangeTableEntry, rangeTableList) - { - if (!IsRecursivelyPlannableRelation(rangeTableEntry)) - { - continue; - } - - if (!IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE) && - !IsCitusTableType(rangeTableEntry->relid, CITUS_LOCAL_TABLE)) - { - return false; - } - } - - return true; -} - - -/* - * ContainsLocalTableSubqueryJoin returns true if the input range table list - * contains a direct join between local table/citus local table and subquery. - */ -bool -ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId) -{ - bool containsLocalTable = false; - bool containsSubquery = false; - - ListCell *rangeTableCell = NULL; - foreach(rangeTableCell, rangeTableList) - { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - - if (rangeTableEntry->rtekind == RTE_SUBQUERY) - { - containsSubquery = true; - } - - if (!IsRecursivelyPlannableRelation(rangeTableEntry)) - { - continue; - } - - if (IsLocalTableRteOrMatView((Node *) rangeTableEntry) && - rangeTableEntry->relid != resultRelationId) - { - containsLocalTable = true; - } - } - - return containsLocalTable && containsSubquery; -} - - /* * WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries * of a query and wraps the functions inside (SELECT * FROM fnc() f) diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index c9b86163f..ea4fe336d 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -145,10 +145,6 @@ static Index RelationRestrictionPartitionKeyIndex(RelationRestriction * static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * restrictionContext); static bool IsParam(Node *node); -static RelationRestrictionContext * FilterRelationRestrictionContext( - RelationRestrictionContext *relationRestrictionContext, - Relids - queryRteIdentities); static JoinRestrictionContext * FilterJoinRestrictionContext( JoinRestrictionContext *joinRestrictionContext, Relids queryRteIdentities); @@ -1944,7 +1940,7 @@ IsParam(Node *node) * in the queryRteIdentities and returns a newly allocated * RelationRestrictionContext. */ -static RelationRestrictionContext * +RelationRestrictionContext * FilterRelationRestrictionContext(RelationRestrictionContext *relationRestrictionContext, Relids queryRteIdentities) { diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 993e8b819..f8d1811a2 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -195,6 +195,7 @@ extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); extern bool FindNodeMatchingCheckFunctionInRangeTableList(List *rtable, bool (*check)( Node *)); extern bool IsCitusTableRTE(Node *node); +extern bool IsDistributedOrReferenceTableRTE(Node *node); extern bool IsDistributedTableRTE(Node *node); extern bool IsReferenceTableRTE(Node *node); extern bool QueryContainsDistributedTableRTE(Query *query); diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index f58fa48f5..5a2484665 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -53,7 +53,6 @@ extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList, List *requiredAttrNumbers, RecursivePlanningContext *context); -extern bool ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId); extern bool ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid resultRelationId); extern bool IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry); diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index 4aed46a2c..e9bdb9adc 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -47,5 +47,7 @@ extern JoinRestrictionContext * RemoveDuplicateJoinRestrictions(JoinRestrictionC extern bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList, RelationRestrictionContext * restrictionContext); - +extern RelationRestrictionContext * +FilterRelationRestrictionContext(RelationRestrictionContext *relationRestrictionContext, + Relids queryRteIdentities); #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/expected/citus_local_tables_queries.out b/src/test/regress/expected/citus_local_tables_queries.out index 973237f85..0529d6f02 100644 --- a/src/test/regress/expected/citus_local_tables_queries.out +++ b/src/test/regress/expected/citus_local_tables_queries.out @@ -704,7 +704,8 @@ UPDATE reference_table SET b = 6 FROM citus_local_table WHERE citus_local_table.a = reference_table.a; -NOTICE: executing the command locally: UPDATE citus_local_table_queries.reference_table_1509002 reference_table SET b = 6 FROM citus_local_table_queries.citus_local_table_1509000 citus_local_table WHERE (citus_local_table.a OPERATOR(pg_catalog.=) reference_table.a) +NOTICE: executing the command locally: SELECT a, NULL::integer AS b FROM citus_local_table_queries.citus_local_table_1509000 citus_local_table WHERE true OFFSET 0 +NOTICE: executing the command locally: UPDATE citus_local_table_queries.reference_table_1509002 reference_table SET b = 6 FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_local_table WHERE (citus_local_table.a OPERATOR(pg_catalog.=) reference_table.a) -- should not work, add HINT use CTEs UPDATE citus_local_table SET b = 6 @@ -716,7 +717,8 @@ UPDATE citus_local_table SET b = 6 FROM reference_table WHERE citus_local_table.a = reference_table.a; -NOTICE: executing the command locally: UPDATE citus_local_table_queries.citus_local_table_1509000 citus_local_table SET b = 6 FROM citus_local_table_queries.reference_table_1509002 reference_table WHERE (citus_local_table.a OPERATOR(pg_catalog.=) reference_table.a) +NOTICE: executing the command locally: SELECT a, NULL::integer AS b FROM citus_local_table_queries.reference_table_1509002 reference_table WHERE true OFFSET 0 +NOTICE: executing the command locally: UPDATE citus_local_table_queries.citus_local_table_1509000 citus_local_table SET b = 6 FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table WHERE (citus_local_table.a OPERATOR(pg_catalog.=) reference_table.a) -- should not work, add HINT use CTEs DELETE FROM distributed_table USING citus_local_table @@ -730,12 +732,14 @@ NOTICE: executing the command locally: DELETE FROM citus_local_table_queries.ci DELETE FROM reference_table USING citus_local_table WHERE citus_local_table.a = reference_table.a; -NOTICE: executing the command locally: DELETE FROM citus_local_table_queries.reference_table_1509002 reference_table USING citus_local_table_queries.citus_local_table_1509000 citus_local_table WHERE (citus_local_table.a OPERATOR(pg_catalog.=) reference_table.a) +NOTICE: executing the command locally: SELECT a, NULL::integer AS b FROM citus_local_table_queries.citus_local_table_1509000 citus_local_table WHERE true OFFSET 0 +NOTICE: executing the command locally: DELETE FROM citus_local_table_queries.reference_table_1509002 reference_table USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_local_table WHERE (citus_local_table.a OPERATOR(pg_catalog.=) reference_table.a) -- should work, add HINT use CTEs DELETE FROM citus_local_table USING reference_table WHERE citus_local_table.a = reference_table.a; -NOTICE: executing the command locally: DELETE FROM citus_local_table_queries.citus_local_table_1509000 citus_local_table USING citus_local_table_queries.reference_table_1509002 reference_table WHERE (citus_local_table.a OPERATOR(pg_catalog.=) reference_table.a) +NOTICE: executing the command locally: SELECT a, NULL::integer AS b FROM citus_local_table_queries.reference_table_1509002 reference_table WHERE true OFFSET 0 +NOTICE: executing the command locally: DELETE FROM citus_local_table_queries.citus_local_table_1509000 citus_local_table USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table WHERE (citus_local_table.a OPERATOR(pg_catalog.=) reference_table.a) -- just works DELETE FROM citus_local_table WHERE citus_local_table.a IN (SELECT a FROM distributed_table); @@ -743,7 +747,8 @@ NOTICE: executing the command locally: DELETE FROM citus_local_table_queries.ci -- just works DELETE FROM citus_local_table WHERE citus_local_table.a IN (SELECT a FROM reference_table); -NOTICE: executing the command locally: DELETE FROM citus_local_table_queries.citus_local_table_1509000 citus_local_table WHERE (a OPERATOR(pg_catalog.=) ANY (SELECT reference_table.a FROM citus_local_table_queries.reference_table_1509002 reference_table)) +NOTICE: executing the command locally: SELECT a FROM citus_local_table_queries.reference_table_1509002 reference_table +NOTICE: executing the command locally: DELETE FROM citus_local_table_queries.citus_local_table_1509000 citus_local_table WHERE (a OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer))) -- just works WITH distributed_table_cte AS (SELECT * FROM distributed_table) UPDATE citus_local_table @@ -757,7 +762,8 @@ UPDATE citus_local_table SET b = 6 FROM reference_table_cte WHERE citus_local_table.a = reference_table_cte.a; -NOTICE: executing the command locally: WITH reference_table_cte AS (SELECT reference_table.a, reference_table.b FROM citus_local_table_queries.reference_table_1509002 reference_table) UPDATE citus_local_table_queries.citus_local_table_1509000 citus_local_table SET b = 6 FROM reference_table_cte WHERE (citus_local_table.a OPERATOR(pg_catalog.=) reference_table_cte.a) +NOTICE: executing the command locally: SELECT a, b FROM citus_local_table_queries.reference_table_1509002 reference_table +NOTICE: executing the command locally: UPDATE citus_local_table_queries.citus_local_table_1509000 citus_local_table SET b = 6 FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table_cte WHERE (citus_local_table.a OPERATOR(pg_catalog.=) reference_table_cte.a) --------------------------------------------------------------------- ----- VIEW QUERIES ----- --------------------------------------------------------------------- diff --git a/src/test/regress/expected/local_distributed_table_join.out b/src/test/regress/expected/local_distributed_table_join.out new file mode 100644 index 000000000..e69de29bb diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index a52c174f2..7710453ed 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -145,8 +145,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c -- partititoned local tables should work as well SELECT count(*) FROM distributed_table JOIN local_partitioned_table USING(key); -DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) count --------------------------------------------------------------------- @@ -154,8 +154,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) SELECT count(*) FROM reference_table JOIN local_partitioned_table USING(key); -DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.reference_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) count --------------------------------------------------------------------- @@ -163,8 +163,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) SELECT count(*) FROM distributed_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key); -DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.reference_table USING (key)) count --------------------------------------------------------------------- @@ -283,8 +283,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key); -DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) count --------------------------------------------------------------------- @@ -292,8 +292,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) WHERE distributed_partitioned_table.key = 10; -DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) WHERE (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10) count --------------------------------------------------------------------- @@ -301,8 +301,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) SELECT count(*) FROM distributed_partitioned_table JOIN local_partitioned_table USING(key) JOIN reference_table USING (key); -DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.reference_table USING (key)) count --------------------------------------------------------------------- @@ -313,8 +313,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c SELECT COUNT(*) FROM postgres_table join distributed_table_pkey using(key) join local_partitioned_table using(key) join distributed_table using(key) where distributed_table_pkey.key = 5; DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 -DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.distributed_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) count --------------------------------------------------------------------- @@ -324,8 +324,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c SELECT COUNT(*) FROM postgres_table join local_partitioned_table using(key) join distributed_table_pkey using(key) join distributed_table using(key) where distributed_table_pkey.key = 5; DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 -DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.distributed_table_pkey USING (key)) JOIN local_table_join.distributed_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) count --------------------------------------------------------------------- @@ -335,8 +335,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c SELECT COUNT(*) FROM postgres_table join distributed_table using(key) join local_partitioned_table using(key) join distributed_table_pkey using(key) where distributed_table_pkey.key = 5; DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 -DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) JOIN local_table_join.distributed_table_pkey USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) count --------------------------------------------------------------------- @@ -346,8 +346,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c SELECT COUNT(*) FROM distributed_table_pkey join distributed_table using(key) join postgres_table using(key) join local_partitioned_table using(key) where distributed_table_pkey.key = 5; DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 -DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_2 for subquery SELECT NULL::integer AS key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE true OFFSET 0 +DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE (key OPERATOR(pg_catalog.=) 5) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table_pkey JOIN local_table_join.distributed_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) count --------------------------------------------------------------------- @@ -935,7 +935,7 @@ SET client_min_messages to ERROR; SELECT master_add_node('localhost', :master_port, groupId => 0); master_add_node --------------------------------------------------------------------- - 5 + 5 (1 row) CREATE TABLE citus_local(key int, value text); @@ -1090,6 +1090,9 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c -- TODO:: we should support this? UPDATE reference_table SET key = 1 FROM postgres_table WHERE postgres_table.key = 10; +DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.reference_table SET key = 1 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (postgres_table.key OPERATOR(pg_catalog.=) 10) ERROR: relation postgres_table is not distributed UPDATE reference_table SET key = 1 FROM (SELECT * FROM postgres_table) l WHERE l.key = 10; DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table diff --git a/src/test/regress/sql/local_distributed_table_join.sql b/src/test/regress/sql/local_distributed_table_join.sql new file mode 100644 index 000000000..78ff90614 --- /dev/null +++ b/src/test/regress/sql/local_distributed_table_join.sql @@ -0,0 +1,371 @@ +CREATE SCHEMA local_distributed_table_join; +SET search_path TO local_distributed_table_join; + + + +CREATE TABLE distributed (id bigserial PRIMARY KEY, + name text, + created_at timestamptz DEFAULT now()); +CREATE TABLE reference (id bigserial PRIMARY KEY, + title text); + +CREATE TABLE local (id bigserial PRIMARY KEY, + title text); + +-- these above restrictions brought us to the following schema +SELECT create_reference_table('reference'); +SELECT create_distributed_table('distributed', 'id'); + +INSERT INTO distributed SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO reference SELECT i, i::text FROM generate_series(0,100)i; +INSERT INTO local SELECT i, i::text FROM generate_series(0,100)i; + + +-- very simple 1-1 Joins +SELECT count(*) FROM distributed JOIN local USING (id); +SELECT count(*) FROM distributed JOIN local ON (name = title); +SELECT count(*) FROM distributed d1 JOIN local ON (name = d1.id::text); +SELECT count(*) FROM distributed d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int); +SELECT count(*) FROM distributed d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; +SELECT count(*) FROM distributed JOIN local USING (id) WHERE false; +SELECT count(*) FROM distributed d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 OR True; +SELECT count(*) FROM distributed d1 JOIN local ON (name::int + local.id > d1.id AND d1.id < local.title::int) WHERE d1.id = 1; +SELECT count(*) FROM distributed JOIN local ON (hashtext(name) = hashtext(title)); +SELECT hashtext(local.id::text) FROM distributed JOIN local ON (hashtext(name) = hashtext(title)) ORDER BY 1 LIMIT 4; +SELECT '' as "xxx", local.*, 'xxx' as "test" FROM distributed JOIN local ON (hashtext(name) = hashtext(title)) ORDER BY 1,2,3 LIMIT 4; +SELECT local.title, count(*) FROM distributed JOIN local USING (id) GROUP BY 1 ORDER BY 1, 2 DESC LIMIT 5; +SELECT distributed.id as id1, local.id as id2 FROM distributed JOIN local USING(id) ORDER BY distributed.id + local.id LIMIT 5; +SELECT distributed.id as id1, local.id as id2, count(*) FROM distributed JOIN local USING(id) GROUP BY distributed.id, local.id ORDER BY 1,2 LIMIT 5; + + +-- basic subqueries that cannot be pulled up +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local USING (id); +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = title); +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text); +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int); +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 AND false; +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 OR true; + +-- pull up subqueries as they are pretty simple, local table should be recursively planned +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local USING (id); +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = title); +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text); +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int); +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 AND false; +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 OR true; +SELECT count(*) FROM (SELECT * FROM distributed WHERE id = 2) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; +SELECT count(*) FROM (SELECT * FROM distributed WHERE false) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; + +-- TEMPORARY table +CREATE TEMPORARY TABLE temp_local AS SELECT * FROM local; +SELECT count(*) FROM distributed JOIN temp_local USING (id); + +-- UNLOGGED table +CREATE UNLOGGED TABLE unlogged_local AS SELECT * FROM local; +SELECT count(*) FROM distributed JOIN unlogged_local USING (id); + +-- mat view +CREATE MATERIALIZED VIEW mat_view AS SELECT * FROM local; +SELECT count(*) FROM distributed JOIN mat_view USING (id); + +CREATE VIEW local_regular_view AS SELECT * FROM local; +CREATE VIEW dist_regular_view AS SELECT * FROM distributed; + +SELECT count(*) FROM distributed JOIN local_regular_view USING (id); +SELECT count(*) FROM local JOIN dist_regular_view USING (id); +SELECT count(*) FROM dist_regular_view JOIN local_regular_view USING (id); + + +-- join alias/table alias +SELECT COUNT(*) FROM (distributed JOIN local USING (id)) AS t(a,b,c,d) ORDER BY d,c,a,b LIMIT 3; +SELECT COUNT(*) FROM (distributed d1(x,y,y1) JOIN local l1(x,t) USING (x)) AS t(a,b,c,d) ORDER BY d,c,a,b LIMIT 3; + +-- final queries are pushdown queries +SELECT sum(d1.id + local.id) FROM distributed d1 JOIN local USING (id); +SELECT sum(d1.id + local.id) OVER (PARTITION BY d1.id) FROM distributed d1 JOIN local USING (id) ORDER BY 1 DESC LIMIT 4; +SELECT count(*) FROM distributed d1 JOIN local USING (id) LEFT JOIN distributed d2 USING (id) ORDER BY 1 DESC LIMIT 4; +SELECT count(DISTINCT d1.name::int * local.id) FROM distributed d1 JOIN local USING (id); + +-- final queries are router queries +SELECT sum(d1.id + local.id) FROM distributed d1 JOIN local USING (id) WHERE d1.id = 1; +SELECT sum(d1.id + local.id) OVER (PARTITION BY d1.id) FROM distributed d1 JOIN local USING (id) WHERE d1.id = 1 ORDER BY 1 DESC LIMIT 4; +SELECT count(*) FROM distributed d1 JOIN local USING (id) LEFT JOIN distributed d2 USING (id) WHERE d2.id = 1 ORDER BY 1 DESC LIMIT 4; + +-- final queries are pull to coordinator queries +SELECT sum(d1.id + local.id) OVER (PARTITION BY d1.id + local.id) FROM distributed d1 JOIN local USING (id) ORDER BY 1 DESC LIMIT 4; + + + +-- nested subqueries +SELECT + count(*) +FROM + (SELECT * FROM (SELECT * FROM distributed) as foo) as bar + JOIN + local + USING(id); + + +SELECT + count(*) +FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar + JOIN + local + USING(id); + +SELECT + count(*) +FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar + JOIN + local + USING(id); +SELECT + count(*) +FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 + USING(id); + +-- TODO: Unnecessary recursive planning for local +SELECT + count(*) +FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed LIMIT 1) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 + USING(id); + +-- subqueries in WHERE clause +-- is not colocated, and the JOIN inside as well. +-- so should be recursively planned twice +SELECT + count(*) +FROM + distributed +WHERE + id > (SELECT + count(*) + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 + USING(id) + ); + +-- two distributed tables are co-located and JOINed on distribution +-- key, so should be fine to pushdown +SELECT + count(*) +FROM + distributed d_upper +WHERE + (SELECT + bar.id + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 + USING(id) + ) IS NOT NULL; + +SELECT + count(*) +FROM + distributed d_upper +WHERE + (SELECT + bar.id + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar + JOIN + local as foo + USING(id) + ) IS NOT NULL; + +SELECT + count(*) +FROM + distributed d_upper +WHERE d_upper.id > + (SELECT + bar.id + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar + JOIN + local as foo + USING(id) + ); + +SELECT + count(*) +FROM + distributed d_upper +WHERE + (SELECT + bar.id + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local WHERE d_upper.id = id) as foo2) as bar2 + USING(id) + ) IS NOT NULL; + + + + +-- subqueries in the target list + +-- router, should work +select (SELECT local.id) FROM local, distributed WHERE distributed.id = 1 LIMIT 1; + +-- should fail +select (SELECT local.id) FROM local, distributed WHERE distributed.id != 1 LIMIT 1; + +-- currently not supported, but should work with https://github.com/citusdata/citus/pull/4360/files +SELECT + name, (SELECT id FROM local WHERE id = e.id) +FROM + distributed e +ORDER BY 1,2 LIMIT 1; + + +-- set operations + +SELECT local.* FROM distributed JOIN local USING (id) + EXCEPT +SELECT local.* FROM distributed JOIN local USING (id); + +SELECT distributed.* FROM distributed JOIN local USING (id) + EXCEPT +SELECT distributed.* FROM distributed JOIN local USING (id); + + +SELECT count(*) FROM +( + (SELECT * FROM (SELECT * FROM local) as f JOIN distributed USING (id)) + UNION ALL + (SELECT * FROM (SELECT * FROM local) as f2 JOIN distributed USING (id)) +) bar; + +SELECT count(*) FROM +( + (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as fo) + UNION ALL + (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as ba) +) bar; + +select count(DISTINCT id) +FROM +( + (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as fo) + UNION ALL + (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as ba) +) bar; + +-- 25 Joins +select ' select count(*) from distributed ' || string_Agg('INNER +JOIN local u'|| x::text || ' USING (id)',' ') from +generate_Series(1,25)x; +\gexec + +select ' select count(*) from distributed ' || string_Agg('INNER +JOIN local u'|| x::text || ' ON (false)',' ') from +generate_Series(1,25)x; +\gexec + +select ' select count(*) from local ' || string_Agg('INNER +JOIN distributed u'|| x::text || ' USING (id)',' ') from +generate_Series(1,25)x; +\gexec + +select ' select count(*) from local ' || string_Agg('INNER +JOIN distributed u'|| x::text || ' ON (false)',' ') from +generate_Series(1,25)x; +\gexec + +-- lateral joins + +SELECT COUNT(*) FROM (VALUES (1), (2), (3)) as f(x) LATERAL JOIN (SELECT * FROM local WHERE id = x) as bar; + +SELECT COUNT(*) FROM local JOIN LATERAL (SELECT * FROM distributed WHERE local.id = distributed.id) as foo ON (true); +SELECT COUNT(*) FROM local JOIN LATERAL (SELECT * FROM distributed WHERE local.id > distributed.id) as foo ON (true); + +SELECT COUNT(*) FROM distributed JOIN LATERAL (SELECT * FROM local WHERE local.id = distributed.id) as foo ON (true); +SELECT COUNT(*) FROM distributed JOIN LATERAL (SELECT * FROM local WHERE local.id > distributed.id) as foo ON (true); + + + + +SELECT count(*) FROM distributed CROSS JOIN local; +SELECT count(*) FROM distributed CROSS JOIN local WHERE distributed.id = 1; + +-- w count(*) it works fine as PG ignores the inner tables +SELECT count(*) FROM distributed LEFT JOIN local USING (id); +SELECT count(*) FROM local LEFT JOIN distributed USING (id); + +SELECT * FROM distributed LEFT JOIN local USING (id) LIMIT 1; +SELECT * FROM local LEFT JOIN distributed USING (id) LIMIT 1; + + SELECT + foo1.id, random() + FROM + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo9, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo8, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo7, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo6, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo5, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo4, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo3, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo2, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo10, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo1 + WHERE + foo1.id = foo9.id AND + foo1.id = foo8.id AND + foo1.id = foo7.id AND + foo1.id = foo6.id AND + foo1.id = foo5.id AND + foo1.id = foo4.id AND + foo1.id = foo3.id AND + foo1.id = foo2.id AND + foo1.id = foo10.id AND + foo1.id = foo1.id ; + + SELECT + foo1.id + FROM + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo1, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo2, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo3, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo4, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo5 + WHERE + foo1.id = foo4.id AND + foo1.id = foo2.id AND + foo1.id = foo3.id AND + foo1.id = foo4.id AND + foo1.id = foo5.id; + + SELECT + foo1.id + FROM + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 1) as foo1, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 2) as foo2, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 3) as foo3, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 4) as foo4, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 5) as foo5 + WHERE + foo1.id = foo4.id AND + foo1.id = foo2.id AND + foo1.id = foo3.id AND + foo1.id = foo4.id AND + foo1.id = foo5.id; + + + +DROP SCHEMA local_distributed_table_join CASCADE; \ No newline at end of file diff --git a/src/test/regress/sql/local_table_join.sql b/src/test/regress/sql/local_table_join.sql index 5ae9b6f59..c3786cc4f 100644 --- a/src/test/regress/sql/local_table_join.sql +++ b/src/test/regress/sql/local_table_join.sql @@ -141,6 +141,9 @@ SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_t SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = 10; +select count(*) FROM postgres_table JOIN (SELECT a.key,random() FROM distributed_table a JOIN distributed_table b USING(key)) as foo USING(key); +select count(*) FROM (SELECT a.key, random() FROM distributed_table a JOIN distributed_table b USING(key)) as foo JOIN postgres_table USING(key); + SELECT count(*) FROM postgres_table JOIN (SELECT * FROM distributed_table) d1 USING(key); -- since this is already router plannable, we don't recursively plan the postgres table SELECT count(*) FROM postgres_table JOIN (SELECT * FROM distributed_table LIMIT 1) d1 USING(key);