From abd50a0bb891065efa7ba4eaf786acf5a1172bc0 Mon Sep 17 00:00:00 2001
From: naisila
Date: Tue, 14 Oct 2025 12:45:17 +0300
Subject: [PATCH 1/5] Revert "PG18 - Adapt columnar stripe metadata updates (#8030)"

This reverts commit 5d805eb10bfe996cafaa51f274486b4410e00497.

heap_inplace_update was incorrectly replaced by CatalogTupleUpdate in 5d805eb.
In Citus, we assume that a stripe entry with some columns set to NULL means a
write is in progress, because otherwise we wouldn't see such a row. This
assumption breaks when we use CatalogTupleUpdate, because it inserts a new
version of the row and leaves the in-progress version behind. Among other
things, this was causing various failures in the PG18 check-columnar test.
---
 src/backend/columnar/columnar_metadata.c | 47 ++++++++++--------------
 1 file changed, 19 insertions(+), 28 deletions(-)

diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c
index f699553b6..5eb042bfd 100644
--- a/src/backend/columnar/columnar_metadata.c
+++ b/src/backend/columnar/columnar_metadata.c
@@ -1405,17 +1405,7 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
 	Oid columnarStripesOid = ColumnarStripeRelationId();

-#if PG_VERSION_NUM >= 180000
-
-	/* CatalogTupleUpdate performs a normal heap UPDATE → RowExclusiveLock */
-	const LOCKMODE openLockMode = RowExclusiveLock;
-#else
-
-	/* In-place update never changed tuple length → AccessShareLock was enough */
-	const LOCKMODE openLockMode = AccessShareLock;
-#endif
-
-	Relation columnarStripes = table_open(columnarStripesOid, openLockMode);
+	Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock);

 	TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);

 	Oid indexId = ColumnarStripePKeyIndexRelationId();
@@ -1439,6 +1429,17 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
 	                    storageId, stripeId)));
 	}

+
+/*
+ * heap_modify_tuple + heap_inplace_update only exist on PG < 18;
+ * on PG18 the in-place helper was removed upstream, so we skip the whole block.
+ */
+#if PG_VERSION_NUM < PG_VERSION_18
+
+	/*
+	 * heap_inplace_update already doesn't allow changing size of the original
+	 * tuple, so we don't allow setting any Datum's to NULL values.
+	 */
 	Datum *newValues = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
 	bool *newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
 	bool *update = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
@@ -1459,37 +1460,27 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
 	                                           newNulls,
 	                                           update);

-#if PG_VERSION_NUM < PG_VERSION_18
-
-	/*
-	 * heap_inplace_update already doesn't allow changing size of the original
-	 * tuple, so we don't allow setting any Datum's to NULL values.
-	 */
 	heap_inplace_update(columnarStripes, modifiedTuple);
+#endif
+

 	/*
 	 * Existing tuple now contains modifications, because we used
 	 * heap_inplace_update().
 	 */
 	HeapTuple newTuple = oldTuple;

-#else
-
-	/* Regular catalog UPDATE keeps indexes in sync */
-	CatalogTupleUpdate(columnarStripes, &oldTuple->t_self, modifiedTuple);
-	HeapTuple newTuple = modifiedTuple;
-#endif
-
-	CommandCounterIncrement();

 	/*
 	 * Must not pass modifiedTuple, because BuildStripeMetadata expects a real
 	 * heap tuple with MVCC fields.
 	 */
-	StripeMetadata *modifiedStripeMetadata =
-		BuildStripeMetadata(columnarStripes, newTuple);
+	StripeMetadata *modifiedStripeMetadata = BuildStripeMetadata(columnarStripes,
+	                                                             newTuple);
+
+	CommandCounterIncrement();

 	systable_endscan(scanDescriptor);
-	table_close(columnarStripes, openLockMode);
+	table_close(columnarStripes, AccessShareLock);

 	pfree(newValues);
 	pfree(newNulls);

From 76f18624e5cb1059d0d7b3187eb958befc0ccaff Mon Sep 17 00:00:00 2001
From: naisila
Date: Tue, 14 Oct 2025 12:51:44 +0300
Subject: [PATCH 2/5] PG18 - use systable_inplace_update_* in UpdateStripeMetadataRow

PG18 has removed heap_inplace_update(), which is crucial for the
citus_columnar extension because we always want to update stripe entries for
columnar in place. Relevant PG18 commit:
https://github.com/postgres/postgres/commit/a07e03f

heap_inplace_update() has been replaced by heap_inplace_update_and_unlock(),
which is called from systable_inplace_update_finish(), which in turn is used
together with systable_inplace_update_begin(). This change has been
back-patched down to v12, which is enough for us since the oldest version
Citus supports is v15. In PG < 18 a deprecated heap_inplace_update() is
retained, but we switch to the new functions everywhere because they are
better and because doing so avoids wrapping these changes in PG18 version
conditionals.

Basically, in this commit we replace the following:

    SysScanDesc scanDescriptor = systable_beginscan(columnarStripes, indexId, indexOk,
                                                    &dirtySnapshot, 2, scanKey);
    heap_inplace_update(columnarStripes, modifiedTuple);

with the following:

    systable_inplace_update_begin(columnarStripes, indexId, indexOk, NULL,
                                  2, scanKey, &tuple, &state);
    systable_inplace_update_finish(state, tuple);

For an example of how systable_inplace_update_begin and
systable_inplace_update_finish are used in PG18, see
REL_18_0/src/backend/catalog/toasting.c#L349-L371; it mirrors what citus
columnar needs.
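
As a rough illustration, the resulting flow in UpdateStripeMetadataRow looks
like the sketch below (error details and the stripe column assignments are
elided, and the placeholder error message is not the real one; see the diff
for the exact code):

    void *state;
    HeapTuple tuple;

    /* look up the stripe row and lock it for in-place update (NULL snapshot, as in toasting.c) */
    systable_inplace_update_begin(columnarStripes, indexId, indexOk, NULL,
                                  2, scanKey, &tuple, &state);
    if (!HeapTupleIsValid(tuple))
    {
        ereport(ERROR, (errmsg("could not find the stripe row to modify")));
    }

    /* an in-place update must not change the tuple length, so no values are set to NULL */
    tuple = heap_modify_tuple(tuple, tupleDescriptor, newValues, newNulls, update);

    /* write the modified tuple in place; this also ends the scan, so no systable_endscan() call */
    systable_inplace_update_finish(state, tuple);
    heap_freetuple(tuple);
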
Fixes #8207
---
 src/backend/columnar/columnar_metadata.c | 51 ++++++++----------------
 1 file changed, 16 insertions(+), 35 deletions(-)

diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c
index 5eb042bfd..0b4f2400c 100644
--- a/src/backend/columnar/columnar_metadata.c
+++ b/src/backend/columnar/columnar_metadata.c
@@ -1394,9 +1394,6 @@ static StripeMetadata *
 UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
                         uint64 dataLength, uint64 rowCount, uint64 chunkCount)
 {
-	SnapshotData dirtySnapshot;
-	InitDirtySnapshot(dirtySnapshot);
-
 	ScanKeyData scanKey[2];
 	ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
 	            BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(storageId));
@@ -1410,8 +1407,11 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
 	Oid indexId = ColumnarStripePKeyIndexRelationId();
 	bool indexOk = OidIsValid(indexId);
-	SysScanDesc scanDescriptor = systable_beginscan(columnarStripes, indexId, indexOk,
-	                                                &dirtySnapshot, 2, scanKey);
+
+	void *state;
+	HeapTuple tuple;
+	systable_inplace_update_begin(columnarStripes, indexId, indexOk, NULL,
+	                              2, scanKey, &tuple, &state);

 	static bool loggedSlowMetadataAccessWarning = false;
 	if (!indexOk && !loggedSlowMetadataAccessWarning)
@@ -1420,8 +1420,7 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
 		loggedSlowMetadataAccessWarning = true;
 	}

-	HeapTuple oldTuple = systable_getnext(scanDescriptor);
-	if (!HeapTupleIsValid(oldTuple))
+	if (!HeapTupleIsValid(tuple))
 	{
 		ereport(ERROR, (errmsg("attempted to modify an unexpected stripe, "
 		                       "columnar storage with id=" UINT64_FORMAT
@@ -1429,17 +1428,11 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
 		                       storageId, stripeId)));
 	}

-
-/*
- * heap_modify_tuple + heap_inplace_update only exist on PG < 18;
- * on PG18 the in-place helper was removed upstream, so we skip the whole block.
- */
-#if PG_VERSION_NUM < PG_VERSION_18
-
 	/*
-	 * heap_inplace_update already doesn't allow changing size of the original
+	 * systable_inplace_update_finish already doesn't allow changing size of the original
 	 * tuple, so we don't allow setting any Datum's to NULL values.
 	 */
+
 	Datum *newValues = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
 	bool *newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
 	bool *update = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
@@ -1454,32 +1447,20 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
 	newValues[Anum_columnar_stripe_row_count - 1] = UInt64GetDatum(rowCount);
 	newValues[Anum_columnar_stripe_chunk_count - 1] = Int32GetDatum(chunkCount);

-	HeapTuple modifiedTuple = heap_modify_tuple(oldTuple,
-	                                            tupleDescriptor,
-	                                            newValues,
-	                                            newNulls,
-	                                            update);
+	tuple = heap_modify_tuple(tuple,
+	                          tupleDescriptor,
+	                          newValues,
+	                          newNulls,
+	                          update);

-	heap_inplace_update(columnarStripes, modifiedTuple);
-#endif
+	systable_inplace_update_finish(state, tuple);

-
-	/*
-	 * Existing tuple now contains modifications, because we used
-	 * heap_inplace_update().
-	 */
-	HeapTuple newTuple = oldTuple;
-
-	/*
-	 * Must not pass modifiedTuple, because BuildStripeMetadata expects a real
-	 * heap tuple with MVCC fields.
-	 */
 	StripeMetadata *modifiedStripeMetadata = BuildStripeMetadata(columnarStripes,
-	                                                             newTuple);
+	                                                             tuple);

 	CommandCounterIncrement();

-	systable_endscan(scanDescriptor);
+	heap_freetuple(tuple);
 	table_close(columnarStripes, AccessShareLock);

 	pfree(newValues);

From 5d71fca3b468ffb654075c7f91ddf83b1e5e947a Mon Sep 17 00:00:00 2001
From: Colm
Date: Fri, 17 Oct 2025 10:25:33 +0100
Subject: [PATCH 3/5] PG18 regress sanity: disable `enable_self_join_elimination`
 on queries (#8242)

.. involving Citus tables. Interim fix for #8217 to achieve regress sanity
with PG18. A complete fix will follow with PG18 feature integration.
---
 .../distributed/planner/distributed_planner.c |  42 +++
 src/test/regress/expected/pg18.out            | 289 +++++++++++++++++-
 src/test/regress/sql/pg18.sql                 | 116 ++++++-
 3 files changed, 440 insertions(+), 7 deletions(-)

diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c
index a15c3eede..be046bf9b 100644
--- a/src/backend/distributed/planner/distributed_planner.c
+++ b/src/backend/distributed/planner/distributed_planner.c
@@ -13,6 +13,7 @@
 #include "postgres.h"

 #include "funcapi.h"
+#include "miscadmin.h"

 #include "access/htup_details.h"
 #include "access/xact.h"
@@ -144,6 +145,9 @@ static void ConcatenateRTablesAndPerminfos(PlannedStmt *mainPlan,
 static bool CheckPostPlanDistribution(DistributedPlanningContext *planContext,
                                       bool isDistributedQuery,
                                       List *rangeTableList);
+#if PG_VERSION_NUM >= PG_VERSION_18
+static int DisableSelfJoinElimination(void);
+#endif

 /* Distributed planner hook */
 PlannedStmt *
@@ -155,6 +159,9 @@ distributed_planner(Query *parse,
 	bool needsDistributedPlanning = false;
 	bool fastPathRouterQuery = false;
 	FastPathRestrictionContext fastPathContext = { 0 };
+#if PG_VERSION_NUM >= PG_VERSION_18
+	int saveNestLevel = -1;
+#endif

 	List *rangeTableList = ExtractRangeTableEntryList(parse);

@@ -218,6 +225,10 @@ distributed_planner(Query *parse,
 			bool setPartitionedTablesInherited = false;
 			AdjustPartitioningForDistributedPlanning(rangeTableList,
 			                                         setPartitionedTablesInherited);
+
+#if PG_VERSION_NUM >= PG_VERSION_18
+			saveNestLevel = DisableSelfJoinElimination();
+#endif
 		}
 	}

@@ -264,6 +275,13 @@ distributed_planner(Query *parse,
 		planContext.plan = standard_planner(planContext.query, NULL,
 		                                    planContext.cursorOptions,
 		                                    planContext.boundParams);
+#if PG_VERSION_NUM >= PG_VERSION_18
+		if (needsDistributedPlanning)
+		{
+			Assert(saveNestLevel > 0);
+			AtEOXact_GUC(true, saveNestLevel);
+		}
+#endif
 		needsDistributedPlanning = CheckPostPlanDistribution(&planContext,
 		                                                     needsDistributedPlanning,
 		                                                     rangeTableList);
@@ -2791,3 +2809,27 @@ CheckPostPlanDistribution(DistributedPlanningContext *planContext, bool

 	return isDistributedQuery;
 }
+
+
+#if PG_VERSION_NUM >= PG_VERSION_18
+
+/*
+ * DisableSelfJoinElimination is used to prevent self join elimination
+ * during distributed query planning to ensure shard queries are correctly
+ * generated. PG18's self join elimination (fc069a3a6) changes the Query
+ * in a way that can cause problems for queries with a mix of Citus and
+ * Postgres tables. Self join elimination is allowed on Postgres tables
+ * only so queries involving shards get the benefit of it.
+ */
+static int
+DisableSelfJoinElimination(void)
+{
+	int NestLevel = NewGUCNestLevel();
+	set_config_option("enable_self_join_elimination", "off",
+	                  (superuser() ?
PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + return NestLevel; +} + + +#endif diff --git a/src/test/regress/expected/pg18.out b/src/test/regress/expected/pg18.out index 622540e7e..d07e6e3f2 100644 --- a/src/test/regress/expected/pg18.out +++ b/src/test/regress/expected/pg18.out @@ -165,10 +165,289 @@ ORDER BY contype; dist_n_after_drop | n | 1 (2 rows) --- cleanup -RESET client_min_messages; +-- Purpose: test self join elimination for distributed, citus local and local tables. +-- +CREATE TABLE sje_d1 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now()); +CREATE TABLE sje_d2 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now()); +CREATE TABLE sje_local (id bigserial PRIMARY KEY, title text); +SELECT create_distributed_table('sje_d1', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('sje_d2', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO sje_d1 SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO sje_d2 SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO sje_local SELECT i, i::text FROM generate_series(0,100)i; +-- Self-join elimination is applied when distributed tables are involved +-- The query plan has only one join +EXPLAIN (costs off) +select count(1) from sje_d1 INNER +JOIN sje_d2 u1 USING (id) INNER +JOIN sje_d2 u2 USING (id) INNER +JOIN sje_d2 u3 USING (id) INNER +JOIN sje_d2 u4 USING (id) INNER +JOIN sje_d2 u5 USING (id) INNER +JOIN sje_d2 u6 USING (id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (sje_d1.id = u6.id) + -> Seq Scan on sje_d1_102012 sje_d1 + -> Hash + -> Seq Scan on sje_d2_102016 u6 +(12 rows) + +select count(1) from sje_d1 INNER +JOIN sje_d2 u1 USING (id) INNER +JOIN sje_d2 u2 USING (id) INNER +JOIN sje_d2 u3 USING (id) INNER +JOIN sje_d2 u4 USING (id) INNER +JOIN sje_d2 u5 USING (id) INNER +JOIN sje_d2 u6 USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- Self-join elimination applied to from list join +EXPLAIN (costs off) +SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3 +WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id; + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (d1.id = u3.id) + -> Seq Scan on sje_d1_102012 d1 + -> Hash + -> Seq Scan on sje_d2_102016 u3 +(12 rows) + +SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3 +WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id; + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- Self-join elimination is not applied when a local table is involved +-- This is a limitation that will be resolved in citus 14 +EXPLAIN (costs off) +select count(1) from sje_d1 INNER +JOIN sje_local u1 USING (id) INNER +JOIN sje_local u2 USING (id) INNER +JOIN sje_local u3 USING (id) INNER +JOIN sje_local u4 USING (id) INNER +JOIN sje_local u5 USING 
(id) INNER +JOIN sje_local u6 USING (id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Seq Scan on sje_local u1 + -> Distributed Subplan XXX_2 + -> Seq Scan on sje_local u2 + -> Distributed Subplan XXX_3 + -> Seq Scan on sje_local u3 + -> Distributed Subplan XXX_4 + -> Seq Scan on sje_local u4 + -> Distributed Subplan XXX_5 + -> Seq Scan on sje_local u5 + -> Distributed Subplan XXX_6 + -> Seq Scan on sje_local u6 + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (intermediate_result_5.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_5 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result_4.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_4 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result_3.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_3 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result_2.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_2 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result_1.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result_1 + -> Hash + -> Hash Join + Hash Cond: (intermediate_result.id = sje_d1.id) + -> Function Scan on read_intermediate_result intermediate_result + -> Hash + -> Seq Scan on sje_d1_102012 sje_d1 +(44 rows) + +select count(1) from sje_d1 INNER +JOIN sje_local u1 USING (id) INNER +JOIN sje_local u2 USING (id) INNER +JOIN sje_local u3 USING (id) INNER +JOIN sje_local u4 USING (id) INNER +JOIN sje_local u5 USING (id) INNER +JOIN sje_local u6 USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- to test USING vs ON equivalence +EXPLAIN (costs off) +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 ON (d.id = u1.id) +JOIN sje_d2 u2 ON (u1.id = u2.id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (d.id = u2.id) + -> Seq Scan on sje_d1_102012 d + -> Hash + -> Seq Scan on sje_d2_102016 u2 +(12 rows) + +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 ON (d.id = u1.id) +JOIN sje_d2 u2 ON (u1.id = u2.id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- Null-introducing join can have SJE +EXPLAIN (costs off) +SELECT count(*) +FROM sje_d1 d +LEFT JOIN sje_d2 u1 USING (id) +LEFT JOIN sje_d2 u2 USING (id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Seq Scan on sje_d1_102012 d +(8 rows) + +SELECT count(*) +FROM sje_d1 d +LEFT JOIN sje_d2 u1 USING (id) +LEFT JOIN sje_d2 u2 USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- prepared statement +PREPARE sje_p(int,int) AS +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 USING (id) +JOIN sje_d2 u2 USING (id) +WHERE d.id BETWEEN $1 AND $2; +EXPLAIN (costs off) +EXECUTE sje_p(10,20); + QUERY PLAN 
+--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (u2.id = d.id) + -> Seq Scan on sje_d2_102016 u2 + -> Hash + -> Bitmap Heap Scan on sje_d1_102012 d + Recheck Cond: ((id >= 10) AND (id <= 20)) + -> Bitmap Index Scan on sje_d1_pkey_102012 + Index Cond: ((id >= 10) AND (id <= 20)) +(15 rows) + +EXECUTE sje_p(10,20); + count +--------------------------------------------------------------------- + 11 +(1 row) + +-- cte +EXPLAIN (costs off) +WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0) +SELECT count(1) +FROM sje_d1 d +JOIN z USING (id) +JOIN sje_d2 u2 USING (id); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: (d.id = u2.id) + -> Seq Scan on sje_d1_102012 d + -> Hash + -> Seq Scan on sje_d2_102016 u2 + Filter: ((id % '2'::bigint) = 0) +(13 rows) + +WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0) +SELECT count(1) +FROM sje_d1 d +JOIN z USING (id) +JOIN sje_d2 u2 USING (id); + count +--------------------------------------------------------------------- + 51 +(1 row) + +-- cleanup with minimum verbosity +SET client_min_messages TO ERROR; RESET search_path; DROP SCHEMA pg18_nn CASCADE; -NOTICE: drop cascades to 2 other objects -DETAIL: drop cascades to table pg18_nn.nn_local -drop cascades to table pg18_nn.nn_dist +RESET client_min_messages; diff --git a/src/test/regress/sql/pg18.sql b/src/test/regress/sql/pg18.sql index 8281ab239..6d7685b94 100644 --- a/src/test/regress/sql/pg18.sql +++ b/src/test/regress/sql/pg18.sql @@ -134,7 +134,119 @@ WHERE conrelid = 'pg18_nn.nn_dist'::regclass GROUP BY contype ORDER BY contype; --- cleanup -RESET client_min_messages; +-- Purpose: test self join elimination for distributed, citus local and local tables. 
+-- +CREATE TABLE sje_d1 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now()); +CREATE TABLE sje_d2 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now()); +CREATE TABLE sje_local (id bigserial PRIMARY KEY, title text); + +SELECT create_distributed_table('sje_d1', 'id'); +SELECT create_distributed_table('sje_d2', 'id'); + +INSERT INTO sje_d1 SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO sje_d2 SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO sje_local SELECT i, i::text FROM generate_series(0,100)i; + +-- Self-join elimination is applied when distributed tables are involved +-- The query plan has only one join +EXPLAIN (costs off) +select count(1) from sje_d1 INNER +JOIN sje_d2 u1 USING (id) INNER +JOIN sje_d2 u2 USING (id) INNER +JOIN sje_d2 u3 USING (id) INNER +JOIN sje_d2 u4 USING (id) INNER +JOIN sje_d2 u5 USING (id) INNER +JOIN sje_d2 u6 USING (id); + +select count(1) from sje_d1 INNER +JOIN sje_d2 u1 USING (id) INNER +JOIN sje_d2 u2 USING (id) INNER +JOIN sje_d2 u3 USING (id) INNER +JOIN sje_d2 u4 USING (id) INNER +JOIN sje_d2 u5 USING (id) INNER +JOIN sje_d2 u6 USING (id); + +-- Self-join elimination applied to from list join +EXPLAIN (costs off) +SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3 +WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id; + +SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3 +WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id; + +-- Self-join elimination is not applied when a local table is involved +-- This is a limitation that will be resolved in citus 14 +EXPLAIN (costs off) +select count(1) from sje_d1 INNER +JOIN sje_local u1 USING (id) INNER +JOIN sje_local u2 USING (id) INNER +JOIN sje_local u3 USING (id) INNER +JOIN sje_local u4 USING (id) INNER +JOIN sje_local u5 USING (id) INNER +JOIN sje_local u6 USING (id); + +select count(1) from sje_d1 INNER +JOIN sje_local u1 USING (id) INNER +JOIN sje_local u2 USING (id) INNER +JOIN sje_local u3 USING (id) INNER +JOIN sje_local u4 USING (id) INNER +JOIN sje_local u5 USING (id) INNER +JOIN sje_local u6 USING (id); + + +-- to test USING vs ON equivalence +EXPLAIN (costs off) +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 ON (d.id = u1.id) +JOIN sje_d2 u2 ON (u1.id = u2.id); + +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 ON (d.id = u1.id) +JOIN sje_d2 u2 ON (u1.id = u2.id); + +-- Null-introducing join can have SJE +EXPLAIN (costs off) +SELECT count(*) +FROM sje_d1 d +LEFT JOIN sje_d2 u1 USING (id) +LEFT JOIN sje_d2 u2 USING (id); + +SELECT count(*) +FROM sje_d1 d +LEFT JOIN sje_d2 u1 USING (id) +LEFT JOIN sje_d2 u2 USING (id); + +-- prepared statement +PREPARE sje_p(int,int) AS +SELECT count(1) +FROM sje_d1 d +JOIN sje_d2 u1 USING (id) +JOIN sje_d2 u2 USING (id) +WHERE d.id BETWEEN $1 AND $2; + +EXPLAIN (costs off) +EXECUTE sje_p(10,20); + +EXECUTE sje_p(10,20); + +-- cte +EXPLAIN (costs off) +WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0) +SELECT count(1) +FROM sje_d1 d +JOIN z USING (id) +JOIN sje_d2 u2 USING (id); + +WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0) +SELECT count(1) +FROM sje_d1 d +JOIN z USING (id) +JOIN sje_d2 u2 USING (id); + +-- cleanup with minimum verbosity +SET client_min_messages TO ERROR; RESET search_path; DROP SCHEMA pg18_nn CASCADE; +RESET client_min_messages; From 3ca66e1fcca8b05ea7ee04591f496f66e9a6015c Mon Sep 17 00:00:00 2001 From: Colm Date: Fri, 17 Oct 2025 11:21:25 +0100 Subject: [PATCH 4/5] PG18: Fix "Unrecognized range table id" in INSERT .. 
SELECT planning (#8256) The error `Unrecognized range table id` seen in regress test `insert_select_into_local_tables` is a consequence of the INSERT .. SELECT planner getting confused by a SELECT query with a GROUP BY and hence a Group RTE, introduced in PG18 (commit 247dea89f). The solution is to flatten the relevant parts of the SELECT query before preparing the INSERT .. SELECT query tree for use by Citus. --- .../planner/insert_select_planner.c | 30 ++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 1cf996b77..1cd4161b6 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -428,11 +428,10 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery, ParamListInfo boundParams, bool hasUnresolvedParams, PlannerRestrictionContext *plannerRestrictionContext) { - RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); - PrepareInsertSelectForCitusPlanner(insertSelectQuery); /* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */ + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); Query *selectQuery = selectRte->subquery; bool allowRecursivePlanning = true; @@ -513,6 +512,24 @@ PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery) bool isWrapped = false; +#if PG_VERSION_NUM >= PG_VERSION_18 + +/* + * PG18 is stricter about GroupRTE/GroupVar. For INSERT … SELECT with a GROUP BY, + * flatten the SELECT’s targetList and havingQual so Vars point to base RTEs and + * avoid Unrecognized range table id. + */ + if (selectRte->subquery->hasGroupRTE) + { + Query *selectQuery = selectRte->subquery; + selectQuery->targetList = (List *) + flatten_group_exprs(NULL, selectQuery, + (Node *) selectQuery->targetList); + selectQuery->havingQual = + flatten_group_exprs(NULL, selectQuery, selectQuery->havingQual); + } +#endif + if (selectRte->subquery->setOperations != NULL) { /* @@ -1431,11 +1448,6 @@ static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams) { Query *insertSelectQuery = copyObject(parse); - - RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); - RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery); - Oid targetRelationId = insertRte->relid; - DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery); @@ -1450,6 +1462,7 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou PrepareInsertSelectForCitusPlanner(insertSelectQuery); /* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */ + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); Query *selectQuery = selectRte->subquery; /* @@ -1472,6 +1485,9 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, NULL, cursorOptions, boundParams); + /* decide whether we can repartition the results */ + RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery); + Oid targetRelationId = insertRte->relid; bool repartitioned = IsRedistributablePlan(selectPlan->planTree) && IsSupportedRedistributionTarget(targetRelationId); From 
90f2ab6648a03764b9f7127cd2df638148367aee Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Fri, 17 Oct 2025 14:57:36 +0300 Subject: [PATCH 5/5] Actually deprecate mark_tables_colocated() --- .../distributed/utils/colocation_utils.c | 32 ++----------------- .../expected/multi_reference_table.out | 2 +- .../regress/sql/multi_reference_table.sql | 2 +- 3 files changed, 5 insertions(+), 31 deletions(-) diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index af507d5b9..816e3ce2a 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -73,34 +73,8 @@ PG_FUNCTION_INFO_V1(update_distributed_table_colocation); Datum mark_tables_colocated(PG_FUNCTION_ARGS) { - CheckCitusVersion(ERROR); - EnsureCoordinator(); - - Oid sourceRelationId = PG_GETARG_OID(0); - ArrayType *relationIdArrayObject = PG_GETARG_ARRAYTYPE_P(1); - - int relationCount = ArrayObjectCount(relationIdArrayObject); - if (relationCount < 1) - { - ereport(ERROR, (errmsg("at least one target table is required for this " - "operation"))); - } - - EnsureTableOwner(sourceRelationId); - - Datum *relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject); - - for (int relationIndex = 0; relationIndex < relationCount; relationIndex++) - { - Oid nextRelationOid = DatumGetObjectId(relationIdDatumArray[relationIndex]); - - /* we require that the user either owns all tables or is superuser */ - EnsureTableOwner(nextRelationOid); - - MarkTablesColocated(sourceRelationId, nextRelationOid); - } - - PG_RETURN_VOID(); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("this function is deprecated and no longer is used"))); } @@ -1306,7 +1280,7 @@ ColocatedShardIdInRelation(Oid relationId, int shardIndex) /* * DeleteColocationGroupIfNoTablesBelong function deletes given co-location group if there * is no relation in that co-location group. A co-location group may become empty after - * mark_tables_colocated or upgrade_reference_table UDF calls. In that case we need to + * update_distributed_table_colocation UDF calls. In that case we need to * remove empty co-location group to prevent orphaned co-location groups. */ void diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index 68835be40..b86606f7c 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -1200,7 +1200,7 @@ DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: performing repartitioned INSERT ... 
SELECT
 RESET client_min_messages;
--- some tests for mark_tables_colocated
+-- some tests for update_distributed_table_colocation
 -- should error out
 SELECT update_distributed_table_colocation('colocated_table_test_2', colocate_with => 'reference_table_test');
 ERROR: relation reference_table_test should be a hash or single shard distributed table
diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql
index d538effe6..924a135b0 100644
--- a/src/test/regress/sql/multi_reference_table.sql
+++ b/src/test/regress/sql/multi_reference_table.sql
@@ -768,7 +768,7 @@ WHERE

 RESET client_min_messages;

--- some tests for mark_tables_colocated
+-- some tests for update_distributed_table_colocation
 -- should error out
 SELECT update_distributed_table_colocation('colocated_table_test_2', colocate_with => 'reference_table_test');