diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index 650136834..1d8dfd9f2 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -74,25 +74,29 @@ typedef struct ConversionCandidates List *localTableList; /* local or citus local table */ }ConversionCandidates; +typedef struct IndexColumns +{ + List *indexColumnNos; +}IndexColumns; + static bool HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry, RelationRestriction *relationRestriction); static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, PlannerRestrictionContext * plannerRestrictionContext); -static ConversionCandidates * CreateConversionCandidates(FromExpr *joinTree, - PlannerRestrictionContext * +static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionContext * plannerRestrictionContext, List *rangeTableList, Oid resultRelationId); static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes); -static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree, - ConversionCandidates * +static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(ConversionCandidates * conversionCandidates, PlannerRestrictionContext * plannerRestrictionContext); static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, int rteIdentity); static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList); +static bool FirstIntListContainsSecondIntList(List *firstIntList, List *secondIntList); /* * RecursivelyPlanLocalTableJoins gets a query and the planner @@ -112,15 +116,14 @@ RecursivelyPlanLocalTableJoins(Query *query, resultRelationId = ModifyQueryResultRelationId(query); } ConversionCandidates *conversionCandidates = - CreateConversionCandidates(query->jointree, plannerRestrictionContext, + CreateConversionCandidates(plannerRestrictionContext, rangeTableList, resultRelationId); while (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList, plannerRestrictionContext)) { - FromExpr *joinTree = query->jointree; RangeTableEntryDetails *rangeTableEntryDetails = - GetNextRTEToConvertToSubquery(joinTree, conversionCandidates, + GetNextRTEToConvertToSubquery(conversionCandidates, plannerRestrictionContext); if (rangeTableEntryDetails == NULL) { @@ -140,11 +143,10 @@ RecursivelyPlanLocalTableJoins(Query *query, /* * GetNextRTEToConvertToSubquery returns the range table entry * which should be converted to a subquery. It considers the local join policy - * and result relation. + * for conversion priorities. */ static RangeTableEntryDetails * -GetNextRTEToConvertToSubquery(FromExpr *joinTree, - ConversionCandidates *conversionCandidates, +GetNextRTEToConvertToSubquery(ConversionCandidates *conversionCandidates, PlannerRestrictionContext *plannerRestrictionContext) { RangeTableEntryDetails *localRTECandidate = NULL; @@ -169,6 +171,11 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree, } else { + /* + * We want to convert distributed tables only if all the distributed tables + * have a constant filter on a unique index, otherwise we would be redundantly + * converting a distributed table as we will convert all the other local tables. + */ bool allRangeTableEntriesHaveUniqueIndex = AllRangeTableEntriesHaveUniqueIndex( conversionCandidates->distributedTableList); @@ -284,15 +291,22 @@ HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry, } List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo; List *restrictClauseList = get_all_actual_clauses(baseRestrictionList); - List *rteEqualityQuals = + if (ContainsFalseClause(restrictClauseList)) + { + /* If there is a WHERE FALSE, we consider it as a constant filter. */ + return true; + } + List *rteEqualityColumnsNos = FetchEqualityAttrNumsForRTE((Node *) restrictClauseList); - List *uniqueIndexAttrNumbers = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid, + List *uniqueIndexColumnsList = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid, GetAllUniqueIndexes); - int columnNumber = 0; - foreach_int(columnNumber, uniqueIndexAttrNumbers) + IndexColumns *indexColumns = NULL; + foreach_ptr(indexColumns, uniqueIndexColumnsList) { - if (list_member_int(rteEqualityQuals, columnNumber)) + List *uniqueIndexColumnNos = indexColumns->indexColumnNos; + if (FirstIntListContainsSecondIntList(rteEqualityColumnsNos, + uniqueIndexColumnNos)) { return true; } @@ -301,6 +315,25 @@ HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry, } +/* + * FirstIntListContainsSecondIntList returns true if the first int List + * contains every element of the second int List. + */ +static bool +FirstIntListContainsSecondIntList(List *firstIntList, List *secondIntList) +{ + int curInt = 0; + foreach_int(curInt, secondIntList) + { + if (!list_member_int(firstIntList, curInt)) + { + return false; + } + } + return true; +} + + /* * GetAllUniqueIndexes adds the given index's column numbers if it is a * unique index. @@ -308,15 +341,23 @@ HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry, * probably return true only if all the columns in the index exist in the filter. */ static void -GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes) +GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexGroups) { if (indexForm->indisunique || indexForm->indisprimary) { + IndexColumns *indexColumns = palloc0(sizeof(IndexColumns)); + List *uniqueIndexes = NIL; for (int i = 0; i < indexForm->indkey.dim1; i++) { - *uniqueIndexes = list_append_unique_int(*uniqueIndexes, - indexForm->indkey.values[i]); + uniqueIndexes = list_append_unique_int(uniqueIndexes, + indexForm->indkey.values[i]); } + if (list_length(uniqueIndexes) == 0) + { + return; + } + indexColumns->indexColumnNos = uniqueIndexes; + *uniqueIndexGroups = lappend(*uniqueIndexGroups, indexColumns); } } @@ -367,8 +408,7 @@ RequiredAttrNumbersForRelation(RangeTblEntry *rangeTableEntry, * be converted to a subquery so that citus planners can work. */ static ConversionCandidates * -CreateConversionCandidates(FromExpr *joinTree, - PlannerRestrictionContext *plannerRestrictionContext, +CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext, List *rangeTableList, Oid resultRelationId) { ConversionCandidates *conversionCandidates = diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index ccfff1215..2bf343e1c 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -3665,11 +3665,13 @@ FetchEqualityAttrNumsForRTE(Node *node) return NIL; } + /* * FetchEqualityAttrNumsForList fetches the constant equality numbers * from the given node list. */ -static List *FetchEqualityAttrNumsForList(List *nodeList) +static List * +FetchEqualityAttrNumsForList(List *nodeList) { List *attributeNums = NIL; Node *node = NULL; @@ -3682,7 +3684,7 @@ static List *FetchEqualityAttrNumsForList(List *nodeList) attributeNums = list_concat(attributeNums, fetchedEqualityAttrNums); } - /* + /* * the given list is in the form of AND'ed expressions * hence if we have one equality then it is enough. * E.g: dist.a = 5 AND dist.a > 10 diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index fa7dbea9e..e0f69340a 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -1769,6 +1769,7 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry) subquery->targetList = lappend(subquery->targetList, targetEntry); } } + /* * If tupleDesc is NULL we have 2 different cases: * @@ -1818,6 +1819,7 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry) columnType = list_nth_oid(rangeTblFunction->funccoltypes, targetColumnIndex); } + /* use the types in the function definition otherwise */ else { diff --git a/src/test/regress/expected/citus_local_dist_joins.out b/src/test/regress/expected/citus_local_dist_joins.out index dbbf37bfe..950cb7464 100644 --- a/src/test/regress/expected/citus_local_dist_joins.out +++ b/src/test/regress/expected/citus_local_dist_joins.out @@ -1,12 +1,7 @@ CREATE SCHEMA citus_local_dist_joins; SET search_path TO citus_local_dist_joins; SET client_min_messages to ERROR; -SELECT master_add_node('localhost', :master_port, groupId => 0); - master_add_node ---------------------------------------------------------------------- - 5 -(1 row) - +SELECT master_add_node('localhost', :master_port, groupId => 0) AS coordinator_nodeid \gset CREATE TABLE citus_local(key int, value text); SELECT create_citus_local_table('citus_local'); create_citus_local_table diff --git a/src/test/regress/expected/citus_local_tables_queries.out b/src/test/regress/expected/citus_local_tables_queries.out index 6ccc25bf9..2cf429b7f 100644 --- a/src/test/regress/expected/citus_local_tables_queries.out +++ b/src/test/regress/expected/citus_local_tables_queries.out @@ -756,14 +756,14 @@ SET b = 6 FROM distributed_table_cte WHERE citus_local_table.a = distributed_table_cte.a; NOTICE: executing the command locally: UPDATE citus_local_table_queries.citus_local_table_1509000 citus_local_table SET b = 6 FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) distributed_table_cte WHERE (citus_local_table.a OPERATOR(pg_catalog.=) distributed_table_cte.a) +SET citus.log_local_commands to off; -- just works WITH reference_table_cte AS (SELECT * FROM reference_table) UPDATE citus_local_table SET b = 6 FROM reference_table_cte WHERE citus_local_table.a = reference_table_cte.a; -NOTICE: executing the command locally: SELECT a, b FROM citus_local_table_queries.reference_table_1509002 reference_table -NOTICE: executing the command locally: UPDATE citus_local_table_queries.citus_local_table_1509000 citus_local_table SET b = 6 FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table_cte WHERE (citus_local_table.a OPERATOR(pg_catalog.=) reference_table_cte.a) +set citus.log_local_commands to on; --------------------------------------------------------------------- ----- VIEW QUERIES ----- --------------------------------------------------------------------- diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index 647c1609e..3ccd32346 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -397,11 +397,82 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) --- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key. +-- We will plan postgres table as the index is on key,value not just key SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10; -DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (key OPERATOR(pg_catalog.=) 10) -DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (key OPERATOR(pg_catalog.=) 10) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_composite.key OPERATOR(pg_catalog.=) 10) +DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table_composite JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (distributed_table_composite.key OPERATOR(pg_catalog.=) 10) + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10 OR distributed_table_composite.key = 20; +DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table_composite JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20)) + count +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key > 10 AND distributed_table_composite.value = 'text'; +DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table_composite JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.>) 10) AND (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10 AND distributed_table_composite.value = 'text'; +DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 10) AND (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 10) AND (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite JOIN local_table_join.postgres_table USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) + WHERE (distributed_table_composite.key > 10 OR distributed_table_composite.key = 20) + AND (distributed_table_composite.value = 'text' OR distributed_table_composite.value = 'text'); +DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table_composite JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (((distributed_table_composite.key OPERATOR(pg_catalog.>) 10) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20)) AND ((distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) + WHERE (distributed_table_composite.key > 10 OR distributed_table_composite.value = 'text') + AND (distributed_table_composite.value = 'text' OR distributed_table_composite.key = 30); +DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (((key OPERATOR(pg_catalog.>) 10) OR (value OPERATOR(pg_catalog.=) 'text'::text)) AND ((value OPERATOR(pg_catalog.=) 'text'::text) OR (key OPERATOR(pg_catalog.=) 30))) +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (((key OPERATOR(pg_catalog.>) 10) OR (value OPERATOR(pg_catalog.=) 'text'::text)) AND ((value OPERATOR(pg_catalog.=) 'text'::text) OR (key OPERATOR(pg_catalog.=) 30))) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite JOIN local_table_join.postgres_table USING (key)) WHERE (((distributed_table_composite.key OPERATOR(pg_catalog.>) 10) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)) AND ((distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 30))) + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) + WHERE (distributed_table_composite.key > 10 AND distributed_table_composite.value = 'text') + OR (distributed_table_composite.value = 'text' AND distributed_table_composite.key = 30); +DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table_composite JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (((distributed_table_composite.key OPERATOR(pg_catalog.>) 10) AND (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)) OR ((distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text) AND (distributed_table_composite.key OPERATOR(pg_catalog.=) 30))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) + WHERE (distributed_table_composite.key > 10 AND distributed_table_composite.key = 20) + OR (distributed_table_composite.value = 'text' AND distributed_table_composite.value = 'text'); +DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 20)) OR ((value OPERATOR(pg_catalog.=) 'text'::text) AND (value OPERATOR(pg_catalog.=) 'text'::text))) +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 20)) OR ((value OPERATOR(pg_catalog.=) 'text'::text) AND (value OPERATOR(pg_catalog.=) 'text'::text))) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite JOIN local_table_join.postgres_table USING (key)) WHERE (((distributed_table_composite.key OPERATOR(pg_catalog.>) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.=) 20)) OR ((distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text) AND (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text))) count --------------------------------------------------------------------- 1 @@ -764,8 +835,6 @@ ERROR: relation postgres_table is not distributed UPDATE reference_table SET key = 1 FROM (SELECT * FROM postgres_table) l WHERE l.key = 10; DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.reference_table SET key = 1 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) l WHERE (l.key OPERATOR(pg_catalog.=) 10) --- TODO:: we should probably not wrap postgres_table here as there is a WHERE FALSE? --- though then the planner could give an error SELECT count(*) FROM postgres_table JOIN distributed_table USING(key) WHERE FALSE; DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE false DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE false @@ -775,6 +844,244 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) +SELECT count(*) FROM (SELECT * FROM distributed_table JOIN postgres_table USING(key) WHERE false) foo JOIN local_partitioned_table USING(key); +DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE false +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE false +DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE false +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE false +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table.key, distributed_table.value, distributed_table.value_2, postgres_table.value, postgres_table.value_2 FROM (local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE false) foo(key, value, value_2, value_1, value_2_1) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +WITH dist_cte AS (SELECT * FROM distributed_table_pkey WHERE key = 5) +SELECT COUNT(*) FROM dist_cte JOIN postgres_table USING(key) WHERE dist_cte.key = 5; +DEBUG: CTE dist_cte is going to be inlined via distributed planning +DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2 FROM local_table_join.distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)) dist_cte JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (dist_cte.key OPERATOR(pg_catalog.=) 5) + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT COUNT(*) FROM postgres_table JOIN distributed_table_pkey USING(key) + WHERE (distributed_table_pkey.key IN (SELECT COUNT(*) AS count FROM postgres_table JOIN distributed_table USING(key)) ); +DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true +DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table USING (key)) +DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true +DEBUG: generating subplan XXX_3 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.count FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint))) + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- PREPARED statements +PREPARE local_dist_table_join_select(int) AS SELECT COUNT(*) FROM distributed_table_pkey JOIN postgres_table USING(key) WHERE distributed_table_pkey.key = $1; +EXECUTE local_dist_table_join_select(10); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) + count +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE local_dist_table_join_select(10); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) + count +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE local_dist_table_join_select(10); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) + count +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE local_dist_table_join_select(10); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) + count +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE local_dist_table_join_select(10); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) + count +--------------------------------------------------------------------- + 1 +(1 row) + +EXECUTE local_dist_table_join_select(10); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 10) + count +--------------------------------------------------------------------- + 1 +(1 row) + +PREPARE local_dist_table_join_update(int) AS UPDATE postgres_table SET key = 5 FROM distributed_table_pkey WHERE distributed_table_pkey.key = $1; +EXECUTE local_dist_table_join_update(20); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20) +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = 5 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20) +EXECUTE local_dist_table_join_update(20); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20) +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = 5 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20) +EXECUTE local_dist_table_join_update(20); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20) +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = 5 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20) +EXECUTE local_dist_table_join_update(20); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20) +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = 5 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20) +EXECUTE local_dist_table_join_update(20); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20) +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = 5 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20) +EXECUTE local_dist_table_join_update(20); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20) +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = 5 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20) +PREPARE local_dist_table_join_subquery(int) AS SELECT COUNT(*) FROM postgres_table JOIN (SELECT * FROM distributed_table_pkey JOIN local_partitioned_table USING(key) WHERE distributed_table_pkey.key = $1) foo USING(key); +EXECUTE local_dist_table_join_subquery(5); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_2 for subquery SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, local_partitioned_table.value FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2, intermediate_result.value_1 AS value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb, value_1 text)) foo(key, value, value_2, value_1) USING (key)) + count +--------------------------------------------------------------------- + 100 +(1 row) + +EXECUTE local_dist_table_join_subquery(5); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_2 for subquery SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, local_partitioned_table.value FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2, intermediate_result.value_1 AS value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb, value_1 text)) foo(key, value, value_2, value_1) USING (key)) + count +--------------------------------------------------------------------- + 100 +(1 row) + +EXECUTE local_dist_table_join_subquery(5); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_2 for subquery SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, local_partitioned_table.value FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2, intermediate_result.value_1 AS value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb, value_1 text)) foo(key, value, value_2, value_1) USING (key)) + count +--------------------------------------------------------------------- + 100 +(1 row) + +EXECUTE local_dist_table_join_subquery(5); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_2 for subquery SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, local_partitioned_table.value FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2, intermediate_result.value_1 AS value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb, value_1 text)) foo(key, value, value_2, value_1) USING (key)) + count +--------------------------------------------------------------------- + 100 +(1 row) + +EXECUTE local_dist_table_join_subquery(5); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_2 for subquery SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, local_partitioned_table.value FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2, intermediate_result.value_1 AS value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb, value_1 text)) foo(key, value, value_2, value_1) USING (key)) + count +--------------------------------------------------------------------- + 100 +(1 row) + +EXECUTE local_dist_table_join_subquery(5); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: generating subplan XXX_2 for subquery SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, local_partitioned_table.value FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2, intermediate_result.value_1 AS value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb, value_1 text)) foo(key, value, value_2, value_1) USING (key)) + count +--------------------------------------------------------------------- + 100 +(1 row) + +PREPARE local_dist_table_join_filters(int) AS SELECT COUNT(*) FROM local_partitioned_table JOIN distributed_table_composite USING(key) + WHERE( + distributed_table_composite.key = $1 OR + distributed_table_composite.key = 20 OR + (distributed_table_composite.key = 10 AND distributed_table_composite.key > 0) OR + distributed_table_composite.value = 'text' + ); +EXECUTE local_dist_table_join_filters(20); +DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.local_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.>) 0)) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)) + count +--------------------------------------------------------------------- + 2 +(1 row) + +EXECUTE local_dist_table_join_filters(20); +DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.local_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.>) 0)) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)) + count +--------------------------------------------------------------------- + 2 +(1 row) + +EXECUTE local_dist_table_join_filters(20); +DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.local_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.>) 0)) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)) + count +--------------------------------------------------------------------- + 2 +(1 row) + +EXECUTE local_dist_table_join_filters(20); +DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.local_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.>) 0)) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)) + count +--------------------------------------------------------------------- + 2 +(1 row) + +EXECUTE local_dist_table_join_filters(20); +DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.local_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.>) 0)) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)) + count +--------------------------------------------------------------------- + 2 +(1 row) + +EXECUTE local_dist_table_join_filters(20); +DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.local_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.>) 0)) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)) + count +--------------------------------------------------------------------- + 2 +(1 row) + RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA local_table_join CASCADE; diff --git a/src/test/regress/sql/citus_local_dist_joins.sql b/src/test/regress/sql/citus_local_dist_joins.sql index d9806ddd0..5b6950c48 100644 --- a/src/test/regress/sql/citus_local_dist_joins.sql +++ b/src/test/regress/sql/citus_local_dist_joins.sql @@ -2,7 +2,7 @@ CREATE SCHEMA citus_local_dist_joins; SET search_path TO citus_local_dist_joins; SET client_min_messages to ERROR; -SELECT master_add_node('localhost', :master_port, groupId => 0); +SELECT master_add_node('localhost', :master_port, groupId => 0) AS coordinator_nodeid \gset CREATE TABLE citus_local(key int, value text); diff --git a/src/test/regress/sql/citus_local_tables_queries.sql b/src/test/regress/sql/citus_local_tables_queries.sql index 564b7c6c8..f98ead754 100644 --- a/src/test/regress/sql/citus_local_tables_queries.sql +++ b/src/test/regress/sql/citus_local_tables_queries.sql @@ -413,13 +413,14 @@ SET b = 6 FROM distributed_table_cte WHERE citus_local_table.a = distributed_table_cte.a; +SET citus.log_local_commands to off; -- just works WITH reference_table_cte AS (SELECT * FROM reference_table) UPDATE citus_local_table SET b = 6 FROM reference_table_cte WHERE citus_local_table.a = reference_table_cte.a; - +set citus.log_local_commands to on; ------------------------ ----- VIEW QUERIES ----- ------------------------ diff --git a/src/test/regress/sql/local_table_join.sql b/src/test/regress/sql/local_table_join.sql index dad8459f1..40dd2aff6 100644 --- a/src/test/regress/sql/local_table_join.sql +++ b/src/test/regress/sql/local_table_join.sql @@ -119,8 +119,23 @@ SELECT count(*) FROM (SELECT *, random() FROM distributed_table_pkey) as d1 JOI SELECT count(*) FROM (SELECT *, random() FROM distributed_partitioned_table) as d1 JOIN postgres_table ON (postgres_table.key = d1.key AND d1.key < postgres_table.key) WHERE d1.key = 1 AND false; SELECT count(*) FROM (SELECT *, random() FROM distributed_partitioned_table) as d1 JOIN postgres_table ON (postgres_table.key::int = d1.key::int AND d1.key < postgres_table.key) WHERE d1.key::int = 1 AND false; --- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key. +-- We will plan postgres table as the index is on key,value not just key SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10; +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10 OR distributed_table_composite.key = 20; +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key > 10 AND distributed_table_composite.value = 'text'; +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10 AND distributed_table_composite.value = 'text'; +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) + WHERE (distributed_table_composite.key > 10 OR distributed_table_composite.key = 20) + AND (distributed_table_composite.value = 'text' OR distributed_table_composite.value = 'text'); +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) + WHERE (distributed_table_composite.key > 10 OR distributed_table_composite.value = 'text') + AND (distributed_table_composite.value = 'text' OR distributed_table_composite.key = 30); +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) + WHERE (distributed_table_composite.key > 10 AND distributed_table_composite.value = 'text') + OR (distributed_table_composite.value = 'text' AND distributed_table_composite.key = 30); +SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) + WHERE (distributed_table_composite.key > 10 AND distributed_table_composite.key = 20) + OR (distributed_table_composite.value = 'text' AND distributed_table_composite.value = 'text'); -- a unique index on key so dist table should be recursively planned SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key); @@ -207,10 +222,59 @@ UPDATE reference_table SET key = 1 FROM postgres_table WHERE postgres_table.key UPDATE reference_table SET key = 1 FROM (SELECT * FROM postgres_table) l WHERE l.key = 10; --- TODO:: we should probably not wrap postgres_table here as there is a WHERE FALSE? --- though then the planner could give an error SELECT count(*) FROM postgres_table JOIN distributed_table USING(key) WHERE FALSE; +SELECT count(*) FROM (SELECT * FROM distributed_table JOIN postgres_table USING(key) WHERE false) foo JOIN local_partitioned_table USING(key); + +WITH dist_cte AS (SELECT * FROM distributed_table_pkey WHERE key = 5) +SELECT COUNT(*) FROM dist_cte JOIN postgres_table USING(key) WHERE dist_cte.key = 5; + +SELECT COUNT(*) FROM postgres_table JOIN distributed_table_pkey USING(key) + WHERE (distributed_table_pkey.key IN (SELECT COUNT(*) AS count FROM postgres_table JOIN distributed_table USING(key)) ); + +-- PREPARED statements +PREPARE local_dist_table_join_select(int) AS SELECT COUNT(*) FROM distributed_table_pkey JOIN postgres_table USING(key) WHERE distributed_table_pkey.key = $1; + +EXECUTE local_dist_table_join_select(10); +EXECUTE local_dist_table_join_select(10); +EXECUTE local_dist_table_join_select(10); +EXECUTE local_dist_table_join_select(10); +EXECUTE local_dist_table_join_select(10); +EXECUTE local_dist_table_join_select(10); + +PREPARE local_dist_table_join_update(int) AS UPDATE postgres_table SET key = 5 FROM distributed_table_pkey WHERE distributed_table_pkey.key = $1; + +EXECUTE local_dist_table_join_update(20); +EXECUTE local_dist_table_join_update(20); +EXECUTE local_dist_table_join_update(20); +EXECUTE local_dist_table_join_update(20); +EXECUTE local_dist_table_join_update(20); +EXECUTE local_dist_table_join_update(20); + +PREPARE local_dist_table_join_subquery(int) AS SELECT COUNT(*) FROM postgres_table JOIN (SELECT * FROM distributed_table_pkey JOIN local_partitioned_table USING(key) WHERE distributed_table_pkey.key = $1) foo USING(key); + +EXECUTE local_dist_table_join_subquery(5); +EXECUTE local_dist_table_join_subquery(5); +EXECUTE local_dist_table_join_subquery(5); +EXECUTE local_dist_table_join_subquery(5); +EXECUTE local_dist_table_join_subquery(5); +EXECUTE local_dist_table_join_subquery(5); + +PREPARE local_dist_table_join_filters(int) AS SELECT COUNT(*) FROM local_partitioned_table JOIN distributed_table_composite USING(key) + WHERE( + distributed_table_composite.key = $1 OR + distributed_table_composite.key = 20 OR + (distributed_table_composite.key = 10 AND distributed_table_composite.key > 0) OR + distributed_table_composite.value = 'text' + ); + +EXECUTE local_dist_table_join_filters(20); +EXECUTE local_dist_table_join_filters(20); +EXECUTE local_dist_table_join_filters(20); +EXECUTE local_dist_table_join_filters(20); +EXECUTE local_dist_table_join_filters(20); +EXECUTE local_dist_table_join_filters(20); + RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA local_table_join CASCADE;