From 847b79078fd47b3ab26955c19e5cee5fd0aa3ce8 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 11 Feb 2021 15:04:07 +0300 Subject: [PATCH] Not consider subplans in restriction list (#4679) * Not consider subplans in restriction list * Not consider sublink, alternative subplan in restrictions --- .../relation_restriction_equivalence.c | 19 ++- .../regress/expected/local_table_join.out | 144 +++++++++++++++++- src/test/regress/sql/local_table_join.sql | 77 ++++++++++ 3 files changed, 232 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index a251f9d30..0633f709f 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -148,7 +148,7 @@ static Index RelationRestrictionPartitionKeyIndex(RelationRestriction * relationRestriction); static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * restrictionContext); -static bool IsParam(Node *node); +static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); static JoinRestrictionContext * FilterJoinRestrictionContext( JoinRestrictionContext *joinRestrictionContext, Relids queryRteIdentities); @@ -1879,8 +1879,12 @@ GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry, { Expr *restrictionClause = restrictInfo->clause; - /* we cannot process Params beacuse they are not known at this point */ - if (FindNodeMatchingCheckFunction((Node *) restrictionClause, IsParam)) + /* + * we cannot process some restriction clauses because they are not + * safe to recursively plan. + */ + if (FindNodeMatchingCheckFunction((Node *) restrictionClause, + IsNotSafeRestrictionToRecursivelyPlan)) { continue; } @@ -1945,16 +1949,17 @@ RelationRestrictionForRelation(RangeTblEntry *rangeTableEntry, /* - * IsParam determines whether the given node is a param. + * IsNotSafeRestrictionToRecursivelyPlan returns true if the given node + * is not a safe restriction to be recursivelly planned. */ static bool -IsParam(Node *node) +IsNotSafeRestrictionToRecursivelyPlan(Node *node) { - if (IsA(node, Param)) + if (IsA(node, Param) || IsA(node, SubLink) || IsA(node, SubPlan) || IsA(node, + AlternativeSubPlan)) { return true; } - return false; } diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index 3e7874870..65d0ddba2 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -1324,7 +1324,149 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) +--Issue 4678 +create table custom_pg_type(typdefault text); +insert into custom_pg_type VALUES ('b'); +create table tbl (a int); +insert into tbl VALUES (1); +-- check result with local tables +select typdefault from ( + select typdefault from ( + select typdefault from + custom_pg_type, + lateral ( + select a from tbl + where typdefault > 'a' + limit 1) as subq_0 + where ( + select true from pg_catalog.pg_am limit 1 + ) + ) as subq_1 +) as subq_2; + typdefault +--------------------------------------------------------------------- + b +(1 row) + +select create_distributed_table('tbl', 'a'); +NOTICE: Copying data from local table... +DEBUG: Copied 1 rows +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$local_table_join.tbl$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- subplans work but we might skip the restrictions in them +select typdefault from ( + select typdefault from ( + select typdefault from + custom_pg_type, + lateral ( + select a from tbl + where typdefault > 'a' + limit 1) as subq_0 + where ( + select true from pg_catalog.pg_am limit 1 + ) + ) as subq_1 +) as subq_2; +DEBUG: generating subplan XXX_1 for subquery SELECT true AS bool FROM pg_am LIMIT 1 +DEBUG: Wrapping relation "custom_pg_type" to a subquery +DEBUG: generating subplan XXX_2 for subquery SELECT typdefault FROM local_table_join.custom_pg_type WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT typdefault FROM (SELECT subq_1.typdefault FROM (SELECT custom_pg_type.typdefault FROM (SELECT custom_pg_type_1.typdefault FROM (SELECT intermediate_result.typdefault FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(typdefault text)) custom_pg_type_1) custom_pg_type, LATERAL (SELECT tbl.a FROM local_table_join.tbl WHERE (custom_pg_type.typdefault OPERATOR(pg_catalog.>) 'a'::text) LIMIT 1) subq_0 WHERE (SELECT intermediate_result.bool FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(bool boolean))) subq_1) subq_2 + typdefault +--------------------------------------------------------------------- + b +(1 row) + +-- Not supported because of 4470 +select typdefault from ( + select typdefault from ( + select typdefault from + custom_pg_type, + lateral ( + select a from tbl + where typdefault > 'a' + limit 1) as subq_0 + where ( + select true from pg_catalog.pg_am + where typdefault = 'a' LIMIT 1 + ) + ) as subq_1 +) as subq_2; +DEBUG: Wrapping relation "custom_pg_type" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT typdefault FROM local_table_join.custom_pg_type WHERE true +ERROR: direct joins between distributed and local tables are not supported +HINT: Use CTE's or subqueries to select from local tables and use them in joins +-- correlated sublinks are not yet supported because of #4470, unless we convert not-correlated table +SELECT COUNT(*) FROM distributed_table d1 JOIN postgres_table using(key) +WHERE d1.key IN (SELECT key FROM distributed_table WHERE d1.key = key and key = 5); +DEBUG: Wrapping relation "postgres_table" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.postgres_table WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table d1 JOIN (SELECT postgres_table_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) postgres_table_1) postgres_table USING (key)) WHERE (d1.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table.key FROM local_table_join.distributed_table WHERE ((d1.key OPERATOR(pg_catalog.=) distributed_table.key) AND (distributed_table.key OPERATOR(pg_catalog.=) 5)))) + count +--------------------------------------------------------------------- + 100 +(1 row) + +set citus.local_table_join_policy to 'prefer-distributed'; +SELECT COUNT(*) FROM distributed_table d1 JOIN postgres_table using(key) +WHERE d1.key IN (SELECT key FROM distributed_table WHERE d1.key = key and key = 5); +DEBUG: Wrapping relation "distributed_table" "d1" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.distributed_table d1 WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT d1_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) d1_1) d1 JOIN local_table_join.postgres_table USING (key)) WHERE (d1.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table.key FROM local_table_join.distributed_table WHERE ((d1.key OPERATOR(pg_catalog.=) distributed_table.key) AND (distributed_table.key OPERATOR(pg_catalog.=) 5)))) +ERROR: direct joins between distributed and local tables are not supported +HINT: Use CTE's or subqueries to select from local tables and use them in joins +set citus.local_table_join_policy to 'auto'; +-- Some more subqueries +SELECT COUNT(*) FROM distributed_table JOIN postgres_table using(key) +WHERE distributed_table.key IN (SELECT key FROM distributed_table WHERE key = 5); +DEBUG: Wrapping relation "postgres_table" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN (SELECT postgres_table_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) postgres_table_1) postgres_table USING (key)) WHERE (distributed_table.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table_1.key FROM local_table_join.distributed_table distributed_table_1 WHERE (distributed_table_1.key OPERATOR(pg_catalog.=) 5))) + count +--------------------------------------------------------------------- + 100 +(1 row) + +SELECT COUNT(*) FROM distributed_table JOIN postgres_table using(key) +WHERE distributed_table.key IN (SELECT key FROM distributed_table WHERE key = 5) AND distributed_table.key = 5; +DEBUG: Wrapping relation "postgres_table" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN (SELECT postgres_table_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) postgres_table_1) postgres_table USING (key)) WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table_1.key FROM local_table_join.distributed_table distributed_table_1 WHERE (distributed_table_1.key OPERATOR(pg_catalog.=) 5))) AND (distributed_table.key OPERATOR(pg_catalog.=) 5)) + count +--------------------------------------------------------------------- + 100 +(1 row) + +SELECT COUNT(*) FROM distributed_table_pkey JOIN postgres_table using(key) +WHERE distributed_table_pkey.key IN (SELECT key FROM distributed_table_pkey WHERE key = 5); +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_table_pkey_1) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table_pkey_1.key FROM local_table_join.distributed_table_pkey distributed_table_pkey_1 WHERE (distributed_table_pkey_1.key OPERATOR(pg_catalog.=) 5))) +DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_table_pkey_1) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) + count +--------------------------------------------------------------------- + 100 +(1 row) + +SELECT COUNT(*) FROM distributed_table_pkey JOIN postgres_table using(key) +WHERE distributed_table_pkey.key IN (SELECT key FROM distributed_table_pkey WHERE key = 5) AND distributed_table_pkey.key = 5; +DEBUG: Wrapping relation "distributed_table_pkey" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_table_pkey_1) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE ((distributed_table_pkey.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table_pkey_1.key FROM local_table_join.distributed_table_pkey distributed_table_pkey_1 WHERE (distributed_table_pkey_1.key OPERATOR(pg_catalog.=) 5))) AND (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)) +DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_table_pkey_1) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE ((distributed_table_pkey.key OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) AND (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)) + count +--------------------------------------------------------------------- + 100 +(1 row) + RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA local_table_join CASCADE; -NOTICE: drop cascades to 16 other objects +NOTICE: drop cascades to 18 other objects diff --git a/src/test/regress/sql/local_table_join.sql b/src/test/regress/sql/local_table_join.sql index c05ec05e6..0c8c3317c 100644 --- a/src/test/regress/sql/local_table_join.sql +++ b/src/test/regress/sql/local_table_join.sql @@ -335,6 +335,83 @@ ALTER TABLE local DROP column key3; ALTER TABLE local DROP column key1; SELECT COUNT(*) FROM distributed_table JOIN local ON distributed_table.value = 'text'; +--Issue 4678 + +create table custom_pg_type(typdefault text); +insert into custom_pg_type VALUES ('b'); + +create table tbl (a int); +insert into tbl VALUES (1); + +-- check result with local tables +select typdefault from ( + select typdefault from ( + select typdefault from + custom_pg_type, + lateral ( + select a from tbl + where typdefault > 'a' + limit 1) as subq_0 + where ( + select true from pg_catalog.pg_am limit 1 + ) + ) as subq_1 +) as subq_2; + +select create_distributed_table('tbl', 'a'); + +-- subplans work but we might skip the restrictions in them +select typdefault from ( + select typdefault from ( + select typdefault from + custom_pg_type, + lateral ( + select a from tbl + where typdefault > 'a' + limit 1) as subq_0 + where ( + select true from pg_catalog.pg_am limit 1 + ) + ) as subq_1 +) as subq_2; + +-- Not supported because of 4470 +select typdefault from ( + select typdefault from ( + select typdefault from + custom_pg_type, + lateral ( + select a from tbl + where typdefault > 'a' + limit 1) as subq_0 + where ( + select true from pg_catalog.pg_am + where typdefault = 'a' LIMIT 1 + ) + ) as subq_1 +) as subq_2; + +-- correlated sublinks are not yet supported because of #4470, unless we convert not-correlated table +SELECT COUNT(*) FROM distributed_table d1 JOIN postgres_table using(key) +WHERE d1.key IN (SELECT key FROM distributed_table WHERE d1.key = key and key = 5); + +set citus.local_table_join_policy to 'prefer-distributed'; +SELECT COUNT(*) FROM distributed_table d1 JOIN postgres_table using(key) +WHERE d1.key IN (SELECT key FROM distributed_table WHERE d1.key = key and key = 5); +set citus.local_table_join_policy to 'auto'; + +-- Some more subqueries +SELECT COUNT(*) FROM distributed_table JOIN postgres_table using(key) +WHERE distributed_table.key IN (SELECT key FROM distributed_table WHERE key = 5); + +SELECT COUNT(*) FROM distributed_table JOIN postgres_table using(key) +WHERE distributed_table.key IN (SELECT key FROM distributed_table WHERE key = 5) AND distributed_table.key = 5; + +SELECT COUNT(*) FROM distributed_table_pkey JOIN postgres_table using(key) +WHERE distributed_table_pkey.key IN (SELECT key FROM distributed_table_pkey WHERE key = 5); + +SELECT COUNT(*) FROM distributed_table_pkey JOIN postgres_table using(key) +WHERE distributed_table_pkey.key IN (SELECT key FROM distributed_table_pkey WHERE key = 5) AND distributed_table_pkey.key = 5; RESET client_min_messages; \set VERBOSITY terse