diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index f699553b6..0b4f2400c 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -1394,9 +1394,6 @@ static StripeMetadata * UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset, uint64 dataLength, uint64 rowCount, uint64 chunkCount) { - SnapshotData dirtySnapshot; - InitDirtySnapshot(dirtySnapshot); - ScanKeyData scanKey[2]; ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(storageId)); @@ -1405,23 +1402,16 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset, Oid columnarStripesOid = ColumnarStripeRelationId(); -#if PG_VERSION_NUM >= 180000 - - /* CatalogTupleUpdate performs a normal heap UPDATE → RowExclusiveLock */ - const LOCKMODE openLockMode = RowExclusiveLock; -#else - - /* In‑place update never changed tuple length → AccessShareLock was enough */ - const LOCKMODE openLockMode = AccessShareLock; -#endif - - Relation columnarStripes = table_open(columnarStripesOid, openLockMode); + Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes); Oid indexId = ColumnarStripePKeyIndexRelationId(); bool indexOk = OidIsValid(indexId); - SysScanDesc scanDescriptor = systable_beginscan(columnarStripes, indexId, indexOk, - &dirtySnapshot, 2, scanKey); + + void *state; + HeapTuple tuple; + systable_inplace_update_begin(columnarStripes, indexId, indexOk, NULL, + 2, scanKey, &tuple, &state); static bool loggedSlowMetadataAccessWarning = false; if (!indexOk && !loggedSlowMetadataAccessWarning) @@ -1430,8 +1420,7 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset, loggedSlowMetadataAccessWarning = true; } - HeapTuple oldTuple = systable_getnext(scanDescriptor); - if (!HeapTupleIsValid(oldTuple)) + if (!HeapTupleIsValid(tuple)) { ereport(ERROR, (errmsg("attempted to modify an unexpected stripe, " "columnar storage with id=" UINT64_FORMAT @@ -1439,6 +1428,11 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset, storageId, stripeId))); } + /* + * systable_inplace_update_finish already doesn't allow changing size of the original + * tuple, so we don't allow setting any Datum's to NULL values. + */ + Datum *newValues = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); bool *newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); bool *update = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); @@ -1453,43 +1447,21 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset, newValues[Anum_columnar_stripe_row_count - 1] = UInt64GetDatum(rowCount); newValues[Anum_columnar_stripe_chunk_count - 1] = Int32GetDatum(chunkCount); - HeapTuple modifiedTuple = heap_modify_tuple(oldTuple, - tupleDescriptor, - newValues, - newNulls, - update); + tuple = heap_modify_tuple(tuple, + tupleDescriptor, + newValues, + newNulls, + update); -#if PG_VERSION_NUM < PG_VERSION_18 + systable_inplace_update_finish(state, tuple); - /* - * heap_inplace_update already doesn't allow changing size of the original - * tuple, so we don't allow setting any Datum's to NULL values. - */ - heap_inplace_update(columnarStripes, modifiedTuple); - - /* - * Existing tuple now contains modifications, because we used - * heap_inplace_update(). - */ - HeapTuple newTuple = oldTuple; -#else - - /* Regular catalog UPDATE keeps indexes in sync */ - CatalogTupleUpdate(columnarStripes, &oldTuple->t_self, modifiedTuple); - HeapTuple newTuple = modifiedTuple; -#endif + StripeMetadata *modifiedStripeMetadata = BuildStripeMetadata(columnarStripes, + tuple); CommandCounterIncrement(); - /* - * Must not pass modifiedTuple, because BuildStripeMetadata expects a real - * heap tuple with MVCC fields. - */ - StripeMetadata *modifiedStripeMetadata = - BuildStripeMetadata(columnarStripes, newTuple); - - systable_endscan(scanDescriptor); - table_close(columnarStripes, openLockMode); + heap_freetuple(tuple); + table_close(columnarStripes, AccessShareLock); pfree(newValues); pfree(newNulls); diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index a15c3eede..be046bf9b 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "funcapi.h" +#include "miscadmin.h" #include "access/htup_details.h" #include "access/xact.h" @@ -144,6 +145,9 @@ static void ConcatenateRTablesAndPerminfos(PlannedStmt *mainPlan, static bool CheckPostPlanDistribution(DistributedPlanningContext *planContext, bool isDistributedQuery, List *rangeTableList); +#if PG_VERSION_NUM >= PG_VERSION_18 +static int DisableSelfJoinElimination(void); +#endif /* Distributed planner hook */ PlannedStmt * @@ -155,6 +159,9 @@ distributed_planner(Query *parse, bool needsDistributedPlanning = false; bool fastPathRouterQuery = false; FastPathRestrictionContext fastPathContext = { 0 }; +#if PG_VERSION_NUM >= PG_VERSION_18 + int saveNestLevel = -1; +#endif List *rangeTableList = ExtractRangeTableEntryList(parse); @@ -218,6 +225,10 @@ distributed_planner(Query *parse, bool setPartitionedTablesInherited = false; AdjustPartitioningForDistributedPlanning(rangeTableList, setPartitionedTablesInherited); + +#if PG_VERSION_NUM >= PG_VERSION_18 + saveNestLevel = DisableSelfJoinElimination(); +#endif } } @@ -264,6 +275,13 @@ distributed_planner(Query *parse, planContext.plan = standard_planner(planContext.query, NULL, planContext.cursorOptions, planContext.boundParams); +#if PG_VERSION_NUM >= PG_VERSION_18 + if (needsDistributedPlanning) + { + Assert(saveNestLevel > 0); + AtEOXact_GUC(true, saveNestLevel); + } +#endif needsDistributedPlanning = CheckPostPlanDistribution(&planContext, needsDistributedPlanning, rangeTableList); @@ -2791,3 +2809,27 @@ CheckPostPlanDistribution(DistributedPlanningContext *planContext, bool return isDistributedQuery; } + + +#if PG_VERSION_NUM >= PG_VERSION_18 + +/* + * DisableSelfJoinElimination is used to prevent self join elimination + * during distributed query planning to ensure shard queries are correctly + * generated. PG18's self join elimination (fc069a3a6) changes the Query + * in a way that can cause problems for queries with a mix of Citus and + * Postgres tables. Self join elimination is allowed on Postgres tables + * only so queries involving shards get the benefit of it. + */ +static int +DisableSelfJoinElimination(void) +{ + int NestLevel = NewGUCNestLevel(); + set_config_option("enable_self_join_elimination", "off", + (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + return NestLevel; +} + + +#endif diff --git a/src/test/regress/expected/pg18.out b/src/test/regress/expected/pg18.out index 622540e7e..d07e6e3f2 100644 --- a/src/test/regress/expected/pg18.out +++ b/src/test/regress/expected/pg18.out @@ -165,10 +165,289 @@ ORDER BY contype; dist_n_after_drop | n | 1 (2 rows) --- cleanup -RESET client_min_messages; +-- Purpose: test self join elimination for distributed, citus local and local tables. +-- +CREATE TABLE sje_d1 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now()); +CREATE TABLE sje_d2 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now()); +CREATE TABLE sje_local (id bigserial PRIMARY KEY, title text); +SELECT create_distributed_table('sje_d1', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('sje_d2', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO sje_d1 SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO sje_d2 SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO sje_local SELECT i, i::text FROM generate_series(0,100)i; +-- Self-join elimination is applied when distributed tables are involved +-- The query plan has only one join +EXPLAIN (costs off) +select count(1) from sje_d1 INNER +JOIN sje_d2 u1 USING (id) INNER +JOIN sje_d2 u2 USING (id) INNER +JOIN sje_d2 u3 USING (id) INNER +JOIN sje_d2 u4 USING (id) INNER +JOIN sje_d2 u5 USING (id) INNER +JOIN sje_d2 u6 USING (id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (sje_d1.id = u6.id) + -> Seq Scan on sje_d1_102012 sje_d1 + -> Hash + -> Seq Scan on sje_d2_102016 u6 +(12 rows) + +select count(1) from sje_d1 INNER +JOIN sje_d2 u1 USING (id) INNER +JOIN sje_d2 u2 USING (id) INNER +JOIN sje_d2 u3 USING (id) INNER +JOIN sje_d2 u4 USING (id) INNER +JOIN sje_d2 u5 USING (id) INNER +JOIN sje_d2 u6 USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- Self-join elimination applied to from list join +EXPLAIN (costs off) +SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3 +WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id; + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (d1.id = u3.id) + -> Seq Scan on sje_d1_102012 d1 + -> Hash + -> Seq Scan on sje_d2_102016 u3 +(12 rows) + +SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3 +WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id; + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- Self-join elimination is not applied when a local table is involved +-- This is a limitation that will be resolved in citus 14 +EXPLAIN (costs off) +select count(1) from sje_d1 INNER +JOIN sje_local u1 USING (id) INNER +JOIN sje_local u2 USING (id) INNER +JOIN sje_local u3 USING (id) INNER +JOIN sje_local u4 USING (id) INNER +JOIN sje_local u5 USING (id) INNER +JOIN sje_local u6 USING (id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Seq Scan on sje_local u1 + -> Distributed Subplan XXX_2 + -> Seq Scan on sje_local u2 + -> Distributed Subplan XXX_3 + -> Seq Scan on sje_local u3 + -> Distributed Subplan XXX_4 + -> Seq Scan on sje_local u4 + -> Distributed Subplan XXX_5 + -> Seq Scan on sje_local u5 + -> Distributed Subplan XXX_6 + -> Seq Scan on sje_local u6 + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (intermediate_result_5.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_5 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result_4.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_4 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result_3.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_3 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result_2.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_2 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result_1.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_1 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result + -> Hash + -> Seq Scan on sje_d1_102012 sje_d1 +(44 rows) + +select count(1) from sje_d1 INNER +JOIN sje_local u1 USING (id) INNER +JOIN sje_local u2 USING (id) INNER +JOIN sje_local u3 USING (id) INNER +JOIN sje_local u4 USING (id) INNER +JOIN sje_local u5 USING (id) INNER +JOIN sje_local u6 USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- to test USING vs ON equivalence +EXPLAIN (costs off) +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 ON (d.id = u1.id) +JOIN sje_d2 u2 ON (u1.id = u2.id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (d.id = u2.id) + -> Seq Scan on sje_d1_102012 d + -> Hash + -> Seq Scan on sje_d2_102016 u2 +(12 rows) + +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 ON (d.id = u1.id) +JOIN sje_d2 u2 ON (u1.id = u2.id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- Null-introducing join can have SJE +EXPLAIN (costs off) +SELECT count(*) +FROM sje_d1 d +LEFT JOIN sje_d2 u1 USING (id) +LEFT JOIN sje_d2 u2 USING (id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Seq Scan on sje_d1_102012 d +(8 rows) + +SELECT count(*) +FROM sje_d1 d +LEFT JOIN sje_d2 u1 USING (id) +LEFT JOIN sje_d2 u2 USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- prepared statement +PREPARE sje_p(int,int) AS +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 USING (id) +JOIN sje_d2 u2 USING (id) +WHERE d.id BETWEEN $1 AND $2; +EXPLAIN (costs off) +EXECUTE sje_p(10,20); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (u2.id = d.id) + -> Seq Scan on sje_d2_102016 u2 + -> Hash + -> Bitmap Heap Scan on sje_d1_102012 d + Recheck Cond: ((id >= 10) AND (id <= 20)) + -> Bitmap Index Scan on sje_d1_pkey_102012 + Index Cond: ((id >= 10) AND (id <= 20)) +(15 rows) + +EXECUTE sje_p(10,20); + count +--------------------------------------------------------------------- + 11 +(1 row) + +-- cte +EXPLAIN (costs off) +WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0) +SELECT count(1) +FROM sje_d1 d +JOIN z USING (id) +JOIN sje_d2 u2 USING (id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (d.id = u2.id) + -> Seq Scan on sje_d1_102012 d + -> Hash + -> Seq Scan on sje_d2_102016 u2 + Filter: ((id % '2'::bigint) = 0) +(13 rows) + +WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0) +SELECT count(1) +FROM sje_d1 d +JOIN z USING (id) +JOIN sje_d2 u2 USING (id); + count +--------------------------------------------------------------------- + 51 +(1 row) + +-- cleanup with minimum verbosity +SET client_min_messages TO ERROR; RESET search_path; DROP SCHEMA pg18_nn CASCADE; -NOTICE: drop cascades to 2 other objects -DETAIL: drop cascades to table pg18_nn.nn_local -drop cascades to table pg18_nn.nn_dist +RESET client_min_messages; diff --git a/src/test/regress/sql/pg18.sql b/src/test/regress/sql/pg18.sql index 8281ab239..6d7685b94 100644 --- a/src/test/regress/sql/pg18.sql +++ b/src/test/regress/sql/pg18.sql @@ -134,7 +134,119 @@ WHERE conrelid = 'pg18_nn.nn_dist'::regclass GROUP BY contype ORDER BY contype; --- cleanup -RESET client_min_messages; +-- Purpose: test self join elimination for distributed, citus local and local tables. +-- +CREATE TABLE sje_d1 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now()); +CREATE TABLE sje_d2 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now()); +CREATE TABLE sje_local (id bigserial PRIMARY KEY, title text); + +SELECT create_distributed_table('sje_d1', 'id'); +SELECT create_distributed_table('sje_d2', 'id'); + +INSERT INTO sje_d1 SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO sje_d2 SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO sje_local SELECT i, i::text FROM generate_series(0,100)i; + +-- Self-join elimination is applied when distributed tables are involved +-- The query plan has only one join +EXPLAIN (costs off) +select count(1) from sje_d1 INNER +JOIN sje_d2 u1 USING (id) INNER +JOIN sje_d2 u2 USING (id) INNER +JOIN sje_d2 u3 USING (id) INNER +JOIN sje_d2 u4 USING (id) INNER +JOIN sje_d2 u5 USING (id) INNER +JOIN sje_d2 u6 USING (id); + +select count(1) from sje_d1 INNER +JOIN sje_d2 u1 USING (id) INNER +JOIN sje_d2 u2 USING (id) INNER +JOIN sje_d2 u3 USING (id) INNER +JOIN sje_d2 u4 USING (id) INNER +JOIN sje_d2 u5 USING (id) INNER +JOIN sje_d2 u6 USING (id); + +-- Self-join elimination applied to from list join +EXPLAIN (costs off) +SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3 +WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id; + +SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3 +WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id; + +-- Self-join elimination is not applied when a local table is involved +-- This is a limitation that will be resolved in citus 14 +EXPLAIN (costs off) +select count(1) from sje_d1 INNER +JOIN sje_local u1 USING (id) INNER +JOIN sje_local u2 USING (id) INNER +JOIN sje_local u3 USING (id) INNER +JOIN sje_local u4 USING (id) INNER +JOIN sje_local u5 USING (id) INNER +JOIN sje_local u6 USING (id); + +select count(1) from sje_d1 INNER +JOIN sje_local u1 USING (id) INNER +JOIN sje_local u2 USING (id) INNER +JOIN sje_local u3 USING (id) INNER +JOIN sje_local u4 USING (id) INNER +JOIN sje_local u5 USING (id) INNER +JOIN sje_local u6 USING (id); + + +-- to test USING vs ON equivalence +EXPLAIN (costs off) +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 ON (d.id = u1.id) +JOIN sje_d2 u2 ON (u1.id = u2.id); + +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 ON (d.id = u1.id) +JOIN sje_d2 u2 ON (u1.id = u2.id); + +-- Null-introducing join can have SJE +EXPLAIN (costs off) +SELECT count(*) +FROM sje_d1 d +LEFT JOIN sje_d2 u1 USING (id) +LEFT JOIN sje_d2 u2 USING (id); + +SELECT count(*) +FROM sje_d1 d +LEFT JOIN sje_d2 u1 USING (id) +LEFT JOIN sje_d2 u2 USING (id); + +-- prepared statement +PREPARE sje_p(int,int) AS +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 USING (id) +JOIN sje_d2 u2 USING (id) +WHERE d.id BETWEEN $1 AND $2; + +EXPLAIN (costs off) +EXECUTE sje_p(10,20); + +EXECUTE sje_p(10,20); + +-- cte +EXPLAIN (costs off) +WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0) +SELECT count(1) +FROM sje_d1 d +JOIN z USING (id) +JOIN sje_d2 u2 USING (id); + +WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0) +SELECT count(1) +FROM sje_d1 d +JOIN z USING (id) +JOIN sje_d2 u2 USING (id); + +-- cleanup with minimum verbosity +SET client_min_messages TO ERROR; RESET search_path; DROP SCHEMA pg18_nn CASCADE; +RESET client_min_messages;