From 3f4952cc2b5f931f2cf2aa603a72f9178de03073 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 27 Aug 2020 08:42:47 +0200 Subject: [PATCH] Pushdown projections when relations are recursively planned This is important to limit the data transfer size. --- .../planner/query_colocation_checker.c | 12 ++- .../distributed/planner/recursive_planning.c | 96 ++++++++++++++--- .../distributed/query_colocation_checker.h | 3 +- .../regress/expected/local_table_join.out | 100 +++++++++--------- 4 files changed, 146 insertions(+), 65 deletions(-) diff --git a/src/backend/distributed/planner/query_colocation_checker.c b/src/backend/distributed/planner/query_colocation_checker.c index f0c11cbc0..62905396d 100644 --- a/src/backend/distributed/planner/query_colocation_checker.c +++ b/src/backend/distributed/planner/query_colocation_checker.c @@ -76,7 +76,7 @@ CreateColocatedJoinChecker(Query *subquery, PlannerRestrictionContext *restricti * functions (i.e., FilterPlannerRestrictionForQuery()) rely on queries * not relations. */ - anchorSubquery = WrapRteRelationIntoSubquery(anchorRangeTblEntry); + anchorSubquery = WrapRteRelationIntoSubquery(anchorRangeTblEntry, NIL); } else if (anchorRangeTblEntry->rtekind == RTE_SUBQUERY) { @@ -260,7 +260,7 @@ SubqueryColocated(Query *subquery, ColocatedJoinChecker *checker) * designed for generating a stub query. 
*/ Query * -WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation) +WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation, List *requiredAttributes) { Query *subquery = makeNode(Query); RangeTblRef *newRangeTableRef = makeNode(RangeTblRef); @@ -291,6 +291,14 @@ WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation) makeTargetEntry((Expr *) targetColumn, attributeNumber, strdup(attributeTuple->attname.data), false); + if (!list_member_int(requiredAttributes, attributeNumber)) + { + targetEntry->expr = + (Expr *) makeNullConst(attributeTuple->atttypid, + attributeTuple->atttypmod, + attributeTuple->attcollation); + } + subquery->targetList = lappend(subquery->targetList, targetEntry); } diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index c9744e575..d71e0dab8 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -76,6 +76,11 @@ #include "distributed/version_compat.h" #include "lib/stringinfo.h" #include "optimizer/clauses.h" +#if PG_VERSION_NUM >= PG_VERSION_12 +#include "optimizer/optimizer.h" +#else +#include "optimizer/var.h" +#endif #include "optimizer/planner.h" #include "optimizer/prep.h" #include "parser/parsetree.h" @@ -187,14 +192,16 @@ static bool ContainsReferencesToOuterQueryWalker(Node *node, VarLevelsUpWalkerContext *context); static bool NodeContainsSubqueryReferencingOuterQuery(Node *node); static void ConvertLocalTableJoinsToSubqueries(Query *query, - PlannerRestrictionContext * - plannerRestrictionContext); + RecursivePlanningContext *planningContext); +static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, + RecursivePlanningContext *planningContext); static RangeTblEntry * MostFilteredRte(PlannerRestrictionContext * plannerRestrictionContext, List *rangeTableList, List **restrictionList, bool localTable); static void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, 
- List *restrictionList); + List *restrictionList, + List *requiredAttrNumbers); static bool AllDataLocallyAccessible(List *rangeTableList); static void WrapFunctionsInSubqueries(Query *query); static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry); @@ -312,7 +319,7 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context * Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", so we * recursively plan one side of the join so that the logical planner can plan. */ - ConvertLocalTableJoinsToSubqueries(query, context->plannerRestrictionContext); + ConvertLocalTableJoinsToSubqueries(query, context); /* descend into subqueries */ query_tree_walker(query, RecursivelyPlanSubqueryWalker, context, 0); @@ -1377,7 +1384,7 @@ NodeContainsSubqueryReferencingOuterQuery(Node *node) */ static void ConvertLocalTableJoinsToSubqueries(Query *query, - PlannerRestrictionContext *plannerRestrictionContext) + RecursivePlanningContext *context) { List *rangeTableList = query->rtable; @@ -1408,6 +1415,8 @@ ConvertLocalTableJoinsToSubqueries(Query *query, bool localTable = true; + PlannerRestrictionContext *plannerRestrictionContext = + context->plannerRestrictionContext; RangeTblEntry *mostFilteredLocalRte = MostFilteredRte(plannerRestrictionContext, rangeTableList, &localTableRestrictList, localTable); @@ -1415,6 +1424,12 @@ ConvertLocalTableJoinsToSubqueries(Query *query, MostFilteredRte(plannerRestrictionContext, rangeTableList, &distributedTableRestrictList, !localTable); + List *requiredAttrNumbersForLocalRte = + RequiredAttrNumbersForRelation(mostFilteredLocalRte, context); + List *requiredAttrNumbersForDistriutedRte = + RequiredAttrNumbersForRelation(mostFilteredDistributedRte, context); + + elog(DEBUG4, "Local relation with the most number of filters " "on it: \"%s\"", get_rel_name(mostFilteredLocalRte->relid)); elog(DEBUG4, "Distributed relation with the most number of filters " @@ -1423,12 +1438,14 @@ 
ConvertLocalTableJoinsToSubqueries(Query *query, if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_LOCAL) { ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte, - localTableRestrictList); + localTableRestrictList, + requiredAttrNumbersForLocalRte); } else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_DISTRIBUTED) { ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte, - distributedTableRestrictList); + distributedTableRestrictList, + requiredAttrNumbersForDistriutedRte); } else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO) { @@ -1447,7 +1464,8 @@ ConvertLocalTableJoinsToSubqueries(Query *query, * local relation, if exists. */ ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte, - distributedTableRestrictList); + distributedTableRestrictList, + requiredAttrNumbersForDistriutedRte); } else if (localTableHasFilter || !distributedTableHasFilter) { @@ -1468,12 +1486,14 @@ ConvertLocalTableJoinsToSubqueries(Query *query, * tuples. Today, we do not have such an infrastructure. */ ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte, - localTableRestrictList); + localTableRestrictList, + requiredAttrNumbersForLocalRte); } else { ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte, - distributedTableRestrictList); + distributedTableRestrictList, + requiredAttrNumbersForDistriutedRte); } } else @@ -1484,6 +1504,56 @@ ConvertLocalTableJoinsToSubqueries(Query *query, } +/* + * RequiredAttrNumbersForRelation returns the required attribute numbers for + * the input RTE relation in order for the planning to succeed. + * + * The function could be optimized by not adding the columns that only appear + * in the WHERE clause as a filter (e.g., not in a join clause). 
+ */ +static List * +RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, + RecursivePlanningContext *planningContext) +{ + PlannerRestrictionContext *plannerRestrictionContext = + planningContext->plannerRestrictionContext; + + /* TODO: Get rid of this hack, find relation restriction information directly */ + PlannerRestrictionContext *filteredPlannerRestrictionContext = + FilterPlannerRestrictionForQuery(plannerRestrictionContext, + WrapRteRelationIntoSubquery(relationRte, NIL)); + + RelationRestrictionContext *relationRestrictionContext = + filteredPlannerRestrictionContext->relationRestrictionContext; + List *filteredRelationRestrictionList = + relationRestrictionContext->relationRestrictionList; + RelationRestriction *relationRestriction = + (RelationRestriction *) linitial(filteredRelationRestrictionList); + + PlannerInfo *plannerInfo = relationRestriction->plannerInfo; + Query *queryToProcess = plannerInfo->parse; + int rteIndex = relationRestriction->index; + + List *allVarsInQuery = pull_vars_of_level((Node *) queryToProcess, 0); + ListCell *varCell = NULL; + + List *requiredAttrNumbers = NIL; + + foreach(varCell, allVarsInQuery) + { + Var *var = (Var *) lfirst(varCell); + + if (var->varno == rteIndex) + { + requiredAttrNumbers = list_append_unique_int(requiredAttrNumbers, + var->varattno); + } + } + + return requiredAttrNumbers; +} + + /* * MostFilteredRte returns a range table entry which has the most filters * on it along with the restrictions (e.g., fills **restrictionList). @@ -1543,9 +1613,10 @@ MostFilteredRte(PlannerRestrictionContext *plannerRestrictionContext, * with a subquery. The function also pushes down the filters to the subquery. 
*/ static void -ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList) +ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList, + List *requiredAttrNumbers) { - Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry); + Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers); Expr *andedBoundExpressions = make_ands_explicit(restrictionList); subquery->jointree->quals = (Node *) andedBoundExpressions; @@ -1556,6 +1627,7 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict rangeTableEntry->rtekind = RTE_SUBQUERY; rangeTableEntry->subquery = subquery; + /* * If the relation is inherited, it'll still be inherited as * we've copied it earlier. This is to prevent the newly created diff --git a/src/include/distributed/query_colocation_checker.h b/src/include/distributed/query_colocation_checker.h index 2a27fa9f1..fc63522b2 100644 --- a/src/include/distributed/query_colocation_checker.h +++ b/src/include/distributed/query_colocation_checker.h @@ -34,7 +34,8 @@ extern ColocatedJoinChecker CreateColocatedJoinChecker(Query *subquery, PlannerRestrictionContext * restrictionContext); extern bool SubqueryColocated(Query *subquery, ColocatedJoinChecker *context); -extern Query * WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation); +extern Query * WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation, + List *requiredAttributes); #endif /* QUERY_COLOCATION_CHECKER_H */ diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index 38f6f9cb2..489d4be0f 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -25,8 +25,8 @@ ERROR: relation postgres_table is not distributed -- the user prefers local table recursively planned SET citus.local_table_join_policy TO 'pull-local'; SELECT count(*) FROM postgres_table JOIN distributed_table 
USING(key); -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table USING (key)) count --------------------------------------------------------------------- @@ -34,8 +34,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) SELECT count(*) FROM postgres_table JOIN reference_table USING(key); -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, 
intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.reference_table USING (key)) count --------------------------------------------------------------------- @@ -45,8 +45,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c -- the user prefers distributed table recursively planned SET citus.local_table_join_policy TO 'pull-distributed'; SELECT count(*) FROM postgres_table JOIN distributed_table USING(key); -DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table USING (key)) count --------------------------------------------------------------------- @@ -54,8 +54,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) SELECT count(*) FROM postgres_table JOIN reference_table USING(key); -DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, value, value_2 FROM 
local_table_join.reference_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key)) count --------------------------------------------------------------------- @@ -68,8 +68,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c RESET citus.local_table_join_policy; -- on the default mode, the local tables should be recursively planned SELECT count(*) FROM distributed_table JOIN postgres_table USING(key); -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN 
(SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) count --------------------------------------------------------------------- @@ -77,8 +77,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) SELECT count(*) FROM reference_table JOIN postgres_table USING(key); -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.reference_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) count --------------------------------------------------------------------- @@ -86,8 +86,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) JOIN reference_table USING (key); -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, 
value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) JOIN local_table_join.reference_table USING (key)) count --------------------------------------------------------------------- @@ -98,8 +98,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c -- if the distributed table has at least one filter, we prefer -- recursively planning of the distributed table SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test'; -DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.distributed_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0 +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0 DEBUG: 
Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table.value OPERATOR(pg_catalog.=) 'test'::text) count --------------------------------------------------------------------- @@ -109,8 +109,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c -- but if the filters can be pushed downn to the local table via the join -- we are smart about recursively planning the local table SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.key = 1; -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (distributed_table.key 
OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- @@ -119,8 +119,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c -- if both local and distributed tables have a filter, we prefer local SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test' AND postgres_table.value = 'test'; -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE ((distributed_table.value OPERATOR(pg_catalog.=) 'test'::text) AND (postgres_table.value OPERATOR(pg_catalog.=) 'test'::text)) count --------------------------------------------------------------------- @@ -128,8 +128,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test' OR postgres_table.value = 'test'; 
-DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE ((distributed_table.value OPERATOR(pg_catalog.=) 'test'::text) OR (postgres_table.value OPERATOR(pg_catalog.=) 'test'::text)) count --------------------------------------------------------------------- @@ -139,10 +139,10 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c -- multiple local/distributed tables -- only local tables are recursively planned SELECT count(*) FROM distributed_table d1 JOIN postgres_table p1 USING(key) JOIN distributed_table d2 USING(key) JOIN postgres_table p2 USING(key); -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: generating subplan XXX_2 for subquery SELECT key, 
value, value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) count --------------------------------------------------------------------- @@ -158,12 +158,12 @@ FROM distributed_table d1 JOIN postgres_table p1 USING(key) JOIN distributed_table d2 USING(key) JOIN postgres_table p2 USING(key) WHERE d1.value = '1'; -DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.distributed_table d1 WHERE (value OPERATOR(pg_catalog.=) '1'::text) OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM 
local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table d1 WHERE (value OPERATOR(pg_catalog.=) '1'::text) OFFSET 0 -DEBUG: generating subplan XXX_2 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: generating subplan XXX_3 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d1 WHERE (value OPERATOR(pg_catalog.=) '1'::text) OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d1 WHERE (value OPERATOR(pg_catalog.=) '1'::text) OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: generating subplan XXX_3 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) 
intermediate_result(key integer, value text, value_2 jsonb)) d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) WHERE (d1.value OPERATOR(pg_catalog.=) '1'::text) count --------------------------------------------------------------------- @@ -178,10 +178,10 @@ FROM distributed_table d1 JOIN postgres_table p1 USING(key) JOIN distributed_table d2 USING(key) JOIN postgres_table p2 USING(key) WHERE d1.key = 1; -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table p1 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table p2 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table p1 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 -DEBUG: generating subplan XXX_2 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table p2 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +DEBUG: 
generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) WHERE (d1.key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- @@ -197,8 +197,8 @@ FROM distributed_table WHERE distributed_table.key = postgres_table.key; -DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET value = 
'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table WHERE (distributed_table.key OPERATOR(pg_catalog.=) postgres_table.key) UPDATE distributed_table @@ -208,8 +208,8 @@ FROM postgres_table WHERE distributed_table.key = postgres_table.key; -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table.key OPERATOR(pg_catalog.=) postgres_table.key) -- modifications with multiple tables UPDATE @@ -220,10 +220,10 @@ FROM postgres_table p1, postgres_table p2 WHERE distributed_table.key = p1.key AND p1.key = p2.key; -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 -DEBUG: generating subplan 
XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: generating subplan XXX_2 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1, (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 WHERE ((distributed_table.key OPERATOR(pg_catalog.=) p1.key) AND (p1.key OPERATOR(pg_catalog.=) p2.key)) UPDATE distributed_table @@ -233,8 +233,8 @@ FROM postgres_table p1, distributed_table d2 WHERE distributed_table.key = p1.key AND p1.key = d2.key; -DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM 
local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1, local_table_join.distributed_table d2 WHERE ((distributed_table.key OPERATOR(pg_catalog.=) p1.key) AND (p1.key OPERATOR(pg_catalog.=) d2.key)) -- pretty inefficient plan as it requires -- recursive planninng of 2 distributed tables @@ -246,10 +246,10 @@ FROM distributed_table d1, distributed_table d2 WHERE postgres_table.key = d1.key AND d1.key = d2.key; -DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.distributed_table d1 WHERE true OFFSET 0 -DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, value, value_2 FROM local_table_join.distributed_table d2 WHERE true OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table d1 WHERE true OFFSET 0 -DEBUG: generating subplan XXX_2 for subquery SELECT key, value, value_2 FROM local_table_join.distributed_table d2 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d1 WHERE true OFFSET 0 +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS 
value_2 FROM local_table_join.distributed_table d2 WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d1 WHERE true OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d2 WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1, (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d2 WHERE ((postgres_table.key OPERATOR(pg_catalog.=) d1.key) AND (d1.key OPERATOR(pg_catalog.=) d2.key)) \set VERBOSITY terse RESET client_min_messages;