diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index a15c3eede..be046bf9b 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "funcapi.h" +#include "miscadmin.h" #include "access/htup_details.h" #include "access/xact.h" @@ -144,6 +145,9 @@ static void ConcatenateRTablesAndPerminfos(PlannedStmt *mainPlan, static bool CheckPostPlanDistribution(DistributedPlanningContext *planContext, bool isDistributedQuery, List *rangeTableList); +#if PG_VERSION_NUM >= PG_VERSION_18 +static int DisableSelfJoinElimination(void); +#endif /* Distributed planner hook */ PlannedStmt * @@ -155,6 +159,9 @@ distributed_planner(Query *parse, bool needsDistributedPlanning = false; bool fastPathRouterQuery = false; FastPathRestrictionContext fastPathContext = { 0 }; +#if PG_VERSION_NUM >= PG_VERSION_18 + int saveNestLevel = -1; +#endif List *rangeTableList = ExtractRangeTableEntryList(parse); @@ -218,6 +225,10 @@ distributed_planner(Query *parse, bool setPartitionedTablesInherited = false; AdjustPartitioningForDistributedPlanning(rangeTableList, setPartitionedTablesInherited); + +#if PG_VERSION_NUM >= PG_VERSION_18 + saveNestLevel = DisableSelfJoinElimination(); +#endif } } @@ -264,6 +275,13 @@ distributed_planner(Query *parse, planContext.plan = standard_planner(planContext.query, NULL, planContext.cursorOptions, planContext.boundParams); +#if PG_VERSION_NUM >= PG_VERSION_18 + if (needsDistributedPlanning) + { + Assert(saveNestLevel > 0); + AtEOXact_GUC(true, saveNestLevel); + } +#endif needsDistributedPlanning = CheckPostPlanDistribution(&planContext, needsDistributedPlanning, rangeTableList); @@ -2791,3 +2809,32 @@ CheckPostPlanDistribution(DistributedPlanningContext *planContext, bool return isDistributedQuery; } + + +#if PG_VERSION_NUM >= PG_VERSION_18 + +/* + * DisableSelfJoinElimination 
opens a new GUC nest level and sets + * enable_self_join_elimination to off inside it, so that shard queries are + * generated correctly during distributed query planning. PG18's self join + * elimination (fc069a3a6) changes the Query in a way that can cause problems + * for queries with a mix of Citus and Postgres tables. + * + * The caller passes the returned nest level to AtEOXact_GUC() once planning + * is done. GUC_ACTION_SAVE makes that call restore the previous value right + * away; with GUC_ACTION_LOCAL the setting would be kept until the end of the + * surrounding (sub)transaction. Self join elimination thus stays allowed on + * Postgres tables, so queries involving shards get the benefit of it. + */ +static int +DisableSelfJoinElimination(void) +{ + int nestLevel = NewGUCNestLevel(); + set_config_option("enable_self_join_elimination", "off", + (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_SAVE, true, 0, false); + return nestLevel; +} + + +#endif diff --git a/src/test/regress/expected/pg18.out b/src/test/regress/expected/pg18.out index 622540e7e..d07e6e3f2 100644 --- a/src/test/regress/expected/pg18.out +++ b/src/test/regress/expected/pg18.out @@ -165,10 +165,289 @@ ORDER BY contype; dist_n_after_drop | n | 1 (2 rows) --- cleanup -RESET client_min_messages; +-- Purpose: test self join elimination for distributed, citus local and local tables. 
+-- +CREATE TABLE sje_d1 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now()); +CREATE TABLE sje_d2 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now()); +CREATE TABLE sje_local (id bigserial PRIMARY KEY, title text); +SELECT create_distributed_table('sje_d1', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('sje_d2', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO sje_d1 SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO sje_d2 SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO sje_local SELECT i, i::text FROM generate_series(0,100)i; +-- Self-join elimination is applied when distributed tables are involved +-- The query plan has only one join +EXPLAIN (costs off) +select count(1) from sje_d1 INNER +JOIN sje_d2 u1 USING (id) INNER +JOIN sje_d2 u2 USING (id) INNER +JOIN sje_d2 u3 USING (id) INNER +JOIN sje_d2 u4 USING (id) INNER +JOIN sje_d2 u5 USING (id) INNER +JOIN sje_d2 u6 USING (id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (sje_d1.id = u6.id) + -> Seq Scan on sje_d1_102012 sje_d1 + -> Hash + -> Seq Scan on sje_d2_102016 u6 +(12 rows) + +select count(1) from sje_d1 INNER +JOIN sje_d2 u1 USING (id) INNER +JOIN sje_d2 u2 USING (id) INNER +JOIN sje_d2 u3 USING (id) INNER +JOIN sje_d2 u4 USING (id) INNER +JOIN sje_d2 u5 USING (id) INNER +JOIN sje_d2 u6 USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- Self-join elimination applied to from list join +EXPLAIN (costs off) +SELECT count(1) from sje_d1 d1, sje_d2 u1, 
sje_d2 u2, sje_d2 u3 +WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id; + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (d1.id = u3.id) + -> Seq Scan on sje_d1_102012 d1 + -> Hash + -> Seq Scan on sje_d2_102016 u3 +(12 rows) + +SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3 +WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id; + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- Self-join elimination is not applied when a local table is involved +-- This is a limitation that will be resolved in citus 14 +EXPLAIN (costs off) +select count(1) from sje_d1 INNER +JOIN sje_local u1 USING (id) INNER +JOIN sje_local u2 USING (id) INNER +JOIN sje_local u3 USING (id) INNER +JOIN sje_local u4 USING (id) INNER +JOIN sje_local u5 USING (id) INNER +JOIN sje_local u6 USING (id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Seq Scan on sje_local u1 + -> Distributed Subplan XXX_2 + -> Seq Scan on sje_local u2 + -> Distributed Subplan XXX_3 + -> Seq Scan on sje_local u3 + -> Distributed Subplan XXX_4 + -> Seq Scan on sje_local u4 + -> Distributed Subplan XXX_5 + -> Seq Scan on sje_local u5 + -> Distributed Subplan XXX_6 + -> Seq Scan on sje_local u6 + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (intermediate_result_5.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_5 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result_4.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_4 + -> Hash + -> Hash Join + 
Hash Cond: (intermediate_result_3.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_3 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result_2.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_2 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result_1.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_1 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result + -> Hash + -> Seq Scan on sje_d1_102012 sje_d1 +(44 rows) + +select count(1) from sje_d1 INNER +JOIN sje_local u1 USING (id) INNER +JOIN sje_local u2 USING (id) INNER +JOIN sje_local u3 USING (id) INNER +JOIN sje_local u4 USING (id) INNER +JOIN sje_local u5 USING (id) INNER +JOIN sje_local u6 USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- to test USING vs ON equivalence +EXPLAIN (costs off) +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 ON (d.id = u1.id) +JOIN sje_d2 u2 ON (u1.id = u2.id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (d.id = u2.id) + -> Seq Scan on sje_d1_102012 d + -> Hash + -> Seq Scan on sje_d2_102016 u2 +(12 rows) + +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 ON (d.id = u1.id) +JOIN sje_d2 u2 ON (u1.id = u2.id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- Null-introducing join can have SJE +EXPLAIN (costs off) +SELECT count(*) +FROM sje_d1 d +LEFT JOIN sje_d2 u1 USING (id) +LEFT JOIN sje_d2 u2 USING (id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + 
Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Seq Scan on sje_d1_102012 d +(8 rows) + +SELECT count(*) +FROM sje_d1 d +LEFT JOIN sje_d2 u1 USING (id) +LEFT JOIN sje_d2 u2 USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- prepared statement +PREPARE sje_p(int,int) AS +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 USING (id) +JOIN sje_d2 u2 USING (id) +WHERE d.id BETWEEN $1 AND $2; +EXPLAIN (costs off) +EXECUTE sje_p(10,20); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (u2.id = d.id) + -> Seq Scan on sje_d2_102016 u2 + -> Hash + -> Bitmap Heap Scan on sje_d1_102012 d + Recheck Cond: ((id >= 10) AND (id <= 20)) + -> Bitmap Index Scan on sje_d1_pkey_102012 + Index Cond: ((id >= 10) AND (id <= 20)) +(15 rows) + +EXECUTE sje_p(10,20); + count +--------------------------------------------------------------------- + 11 +(1 row) + +-- cte +EXPLAIN (costs off) +WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0) +SELECT count(1) +FROM sje_d1 d +JOIN z USING (id) +JOIN sje_d2 u2 USING (id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (d.id = u2.id) + -> Seq Scan on sje_d1_102012 d + -> Hash + -> Seq Scan on sje_d2_102016 u2 + Filter: ((id % '2'::bigint) = 0) +(13 rows) + +WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0) +SELECT count(1) +FROM sje_d1 d +JOIN z USING (id) +JOIN sje_d2 u2 USING (id); + count +--------------------------------------------------------------------- + 51 +(1 row) + +-- cleanup 
with minimum verbosity +SET client_min_messages TO ERROR; RESET search_path; DROP SCHEMA pg18_nn CASCADE; -NOTICE: drop cascades to 2 other objects -DETAIL: drop cascades to table pg18_nn.nn_local -drop cascades to table pg18_nn.nn_dist +RESET client_min_messages; diff --git a/src/test/regress/sql/pg18.sql b/src/test/regress/sql/pg18.sql index 8281ab239..6d7685b94 100644 --- a/src/test/regress/sql/pg18.sql +++ b/src/test/regress/sql/pg18.sql @@ -134,7 +134,119 @@ WHERE conrelid = 'pg18_nn.nn_dist'::regclass GROUP BY contype ORDER BY contype; --- cleanup -RESET client_min_messages; +-- Purpose: test self join elimination for distributed, citus local and local tables. +-- +CREATE TABLE sje_d1 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now()); +CREATE TABLE sje_d2 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now()); +CREATE TABLE sje_local (id bigserial PRIMARY KEY, title text); + +SELECT create_distributed_table('sje_d1', 'id'); +SELECT create_distributed_table('sje_d2', 'id'); + +INSERT INTO sje_d1 SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO sje_d2 SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO sje_local SELECT i, i::text FROM generate_series(0,100)i; + +-- Self-join elimination is applied when distributed tables are involved +-- The query plan has only one join +EXPLAIN (costs off) +select count(1) from sje_d1 INNER +JOIN sje_d2 u1 USING (id) INNER +JOIN sje_d2 u2 USING (id) INNER +JOIN sje_d2 u3 USING (id) INNER +JOIN sje_d2 u4 USING (id) INNER +JOIN sje_d2 u5 USING (id) INNER +JOIN sje_d2 u6 USING (id); + +select count(1) from sje_d1 INNER +JOIN sje_d2 u1 USING (id) INNER +JOIN sje_d2 u2 USING (id) INNER +JOIN sje_d2 u3 USING (id) INNER +JOIN sje_d2 u4 USING (id) INNER +JOIN sje_d2 u5 USING (id) INNER +JOIN sje_d2 u6 USING (id); + +-- Self-join elimination applied to from list join +EXPLAIN (costs off) +SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3 
+WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id; + +SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3 +WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id; + +-- Self-join elimination is not applied when a local table is involved +-- This is a limitation that will be resolved in citus 14 +EXPLAIN (costs off) +select count(1) from sje_d1 INNER +JOIN sje_local u1 USING (id) INNER +JOIN sje_local u2 USING (id) INNER +JOIN sje_local u3 USING (id) INNER +JOIN sje_local u4 USING (id) INNER +JOIN sje_local u5 USING (id) INNER +JOIN sje_local u6 USING (id); + +select count(1) from sje_d1 INNER +JOIN sje_local u1 USING (id) INNER +JOIN sje_local u2 USING (id) INNER +JOIN sje_local u3 USING (id) INNER +JOIN sje_local u4 USING (id) INNER +JOIN sje_local u5 USING (id) INNER +JOIN sje_local u6 USING (id); + + +-- to test USING vs ON equivalence +EXPLAIN (costs off) +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 ON (d.id = u1.id) +JOIN sje_d2 u2 ON (u1.id = u2.id); + +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 ON (d.id = u1.id) +JOIN sje_d2 u2 ON (u1.id = u2.id); + +-- Null-introducing join can have SJE +EXPLAIN (costs off) +SELECT count(*) +FROM sje_d1 d +LEFT JOIN sje_d2 u1 USING (id) +LEFT JOIN sje_d2 u2 USING (id); + +SELECT count(*) +FROM sje_d1 d +LEFT JOIN sje_d2 u1 USING (id) +LEFT JOIN sje_d2 u2 USING (id); + +-- prepared statement +PREPARE sje_p(int,int) AS +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 USING (id) +JOIN sje_d2 u2 USING (id) +WHERE d.id BETWEEN $1 AND $2; + +EXPLAIN (costs off) +EXECUTE sje_p(10,20); + +EXECUTE sje_p(10,20); + +-- cte +EXPLAIN (costs off) +WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0) +SELECT count(1) +FROM sje_d1 d +JOIN z USING (id) +JOIN sje_d2 u2 USING (id); + +WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0) +SELECT count(1) +FROM sje_d1 d +JOIN z USING (id) +JOIN sje_d2 u2 USING (id); + +-- cleanup with minimum verbosity +SET client_min_messages TO ERROR; RESET search_path; DROP SCHEMA 
pg18_nn CASCADE; +RESET client_min_messages;