Merge branch 'main' into fix-collation-mismatch-create-dist-table

fix-collation-mismatch-create-dist-table
Onur Tirtir 2025-10-17 12:55:26 +03:00 committed by GitHub
commit e0522bbc2b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 462 additions and 57 deletions


@@ -1394,9 +1394,6 @@ static StripeMetadata *
UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
uint64 dataLength, uint64 rowCount, uint64 chunkCount)
{
SnapshotData dirtySnapshot;
InitDirtySnapshot(dirtySnapshot);
ScanKeyData scanKey[2];
ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(storageId));
@@ -1405,23 +1402,16 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
Oid columnarStripesOid = ColumnarStripeRelationId();
#if PG_VERSION_NUM >= 180000
/* CatalogTupleUpdate performs a normal heap UPDATE → RowExclusiveLock */
const LOCKMODE openLockMode = RowExclusiveLock;
#else
/* Inplace update never changed tuple length → AccessShareLock was enough */
const LOCKMODE openLockMode = AccessShareLock;
#endif
Relation columnarStripes = table_open(columnarStripesOid, openLockMode);
Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
Oid indexId = ColumnarStripePKeyIndexRelationId();
bool indexOk = OidIsValid(indexId);
SysScanDesc scanDescriptor = systable_beginscan(columnarStripes, indexId, indexOk,
&dirtySnapshot, 2, scanKey);
void *state;
HeapTuple tuple;
systable_inplace_update_begin(columnarStripes, indexId, indexOk, NULL,
2, scanKey, &tuple, &state);
static bool loggedSlowMetadataAccessWarning = false;
if (!indexOk && !loggedSlowMetadataAccessWarning)
@@ -1430,8 +1420,7 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
loggedSlowMetadataAccessWarning = true;
}
HeapTuple oldTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(oldTuple))
if (!HeapTupleIsValid(tuple))
{
ereport(ERROR, (errmsg("attempted to modify an unexpected stripe, "
"columnar storage with id=" UINT64_FORMAT
@@ -1439,6 +1428,11 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
storageId, stripeId)));
}
/*
* systable_inplace_update_finish already doesn't allow changing the size of the
* original tuple, so we don't allow setting any Datums to NULL values.
*/
Datum *newValues = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
bool *update = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
@@ -1453,43 +1447,21 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
newValues[Anum_columnar_stripe_row_count - 1] = UInt64GetDatum(rowCount);
newValues[Anum_columnar_stripe_chunk_count - 1] = Int32GetDatum(chunkCount);
HeapTuple modifiedTuple = heap_modify_tuple(oldTuple,
tupleDescriptor,
newValues,
newNulls,
update);
tuple = heap_modify_tuple(tuple,
tupleDescriptor,
newValues,
newNulls,
update);
#if PG_VERSION_NUM < PG_VERSION_18
systable_inplace_update_finish(state, tuple);
/*
* heap_inplace_update already doesn't allow changing the size of the original
* tuple, so we don't allow setting any Datums to NULL values.
*/
heap_inplace_update(columnarStripes, modifiedTuple);
/*
* Existing tuple now contains modifications, because we used
* heap_inplace_update().
*/
HeapTuple newTuple = oldTuple;
#else
/* Regular catalog UPDATE keeps indexes in sync */
CatalogTupleUpdate(columnarStripes, &oldTuple->t_self, modifiedTuple);
HeapTuple newTuple = modifiedTuple;
#endif
StripeMetadata *modifiedStripeMetadata = BuildStripeMetadata(columnarStripes,
tuple);
CommandCounterIncrement();
/*
* Must not pass modifiedTuple, because BuildStripeMetadata expects a real
* heap tuple with MVCC fields.
*/
StripeMetadata *modifiedStripeMetadata =
BuildStripeMetadata(columnarStripes, newTuple);
systable_endscan(scanDescriptor);
table_close(columnarStripes, openLockMode);
heap_freetuple(tuple);
table_close(columnarStripes, AccessShareLock);
pfree(newValues);
pfree(newNulls);
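Read end-to-end, the hunks above mean that after this merge UpdateStripeMetadataRow uses an in-place catalog update on every supported PostgreSQL version instead of branching into CatalogTupleUpdate on PG18. Below is a condensed sketch of the resulting flow, not the exact Citus function: the Anum_* constants and the ColumnarStripe* relation-id helpers are assumed to come from the Citus columnar headers, the second scan-key attribute name is assumed, and error handling is trimmed.

#include "postgres.h"

#include "access/genam.h"        /* systable_inplace_update_begin/finish */
#include "access/htup_details.h" /* heap_modify_tuple, heap_freetuple */
#include "access/skey.h"
#include "access/stratnum.h"
#include "access/table.h"
#include "access/xact.h"         /* CommandCounterIncrement */
#include "utils/fmgroids.h"      /* F_INT8EQ */
#include "utils/rel.h"

/* Sketch: find the stripe row through the primary-key index and rewrite it in place. */
static void
UpdateStripeRowSketch(uint64 storageId, uint64 stripeId,
                      Datum *newValues, bool *newNulls, bool *update)
{
	ScanKeyData scanKey[2];
	ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
	            BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(storageId));
	/* second key filters on the stripe id column; attribute name assumed */
	ScanKeyInit(&scanKey[1], Anum_columnar_stripe_stripe,
	            BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(stripeId));

	/* an inplace update never changes the tuple length, so AccessShareLock suffices */
	Relation columnarStripes = table_open(ColumnarStripeRelationId(), AccessShareLock);
	Oid indexId = ColumnarStripePKeyIndexRelationId();

	void *state;
	HeapTuple tuple;
	systable_inplace_update_begin(columnarStripes, indexId, OidIsValid(indexId),
	                              NULL, 2, scanKey, &tuple, &state);
	if (!HeapTupleIsValid(tuple))
	{
		ereport(ERROR, (errmsg("stripe " UINT64_FORMAT " not found", stripeId)));
	}

	/* the replacement tuple must keep the same length; no column may become NULL */
	tuple = heap_modify_tuple(tuple, RelationGetDescr(columnarStripes),
	                          newValues, newNulls, update);
	systable_inplace_update_finish(state, tuple);

	CommandCounterIncrement();
	heap_freetuple(tuple);
	table_close(columnarStripes, AccessShareLock);
}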


@@ -13,6 +13,7 @@
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "access/htup_details.h"
#include "access/xact.h"
@@ -144,6 +145,9 @@ static void ConcatenateRTablesAndPerminfos(PlannedStmt *mainPlan,
static bool CheckPostPlanDistribution(DistributedPlanningContext *planContext,
bool isDistributedQuery,
List *rangeTableList);
#if PG_VERSION_NUM >= PG_VERSION_18
static int DisableSelfJoinElimination(void);
#endif
/* Distributed planner hook */
PlannedStmt *
@@ -155,6 +159,9 @@ distributed_planner(Query *parse,
bool needsDistributedPlanning = false;
bool fastPathRouterQuery = false;
FastPathRestrictionContext fastPathContext = { 0 };
#if PG_VERSION_NUM >= PG_VERSION_18
int saveNestLevel = -1;
#endif
List *rangeTableList = ExtractRangeTableEntryList(parse);
@@ -218,6 +225,10 @@ distributed_planner(Query *parse,
bool setPartitionedTablesInherited = false;
AdjustPartitioningForDistributedPlanning(rangeTableList,
setPartitionedTablesInherited);
#if PG_VERSION_NUM >= PG_VERSION_18
saveNestLevel = DisableSelfJoinElimination();
#endif
}
}
@@ -264,6 +275,13 @@ distributed_planner(Query *parse,
planContext.plan = standard_planner(planContext.query, NULL,
planContext.cursorOptions,
planContext.boundParams);
#if PG_VERSION_NUM >= PG_VERSION_18
if (needsDistributedPlanning)
{
Assert(saveNestLevel > 0);
AtEOXact_GUC(true, saveNestLevel);
}
#endif
needsDistributedPlanning = CheckPostPlanDistribution(&planContext,
needsDistributedPlanning,
rangeTableList);
@@ -2791,3 +2809,27 @@ CheckPostPlanDistribution(DistributedPlanningContext *planContext, bool
return isDistributedQuery;
}
#if PG_VERSION_NUM >= PG_VERSION_18
/*
* DisableSelfJoinElimination is used to prevent self join elimination
* during distributed query planning to ensure shard queries are correctly
* generated. PG18's self join elimination (fc069a3a6) changes the Query
* in a way that can cause problems for queries with a mix of Citus and
* Postgres tables. Self join elimination is still allowed on plain Postgres
* tables, so queries involving shards get the benefit of it.
*/
static int
DisableSelfJoinElimination(void)
{
int NestLevel = NewGUCNestLevel();
set_config_option("enable_self_join_elimination", "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
return NestLevel;
}
#endif
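Pulled out of the scattered planner hunks, the PG18-only path amounts to saving a GUC nest level, disabling enable_self_join_elimination locally, running the standard planner, and rolling the GUC back. A condensed sketch follows; the wrapper name PlanWithSelfJoinEliminationDisabled is illustrative only, as the real logic stays inline in distributed_planner().

#include "postgres.h"

#include "miscadmin.h"           /* superuser() */
#include "nodes/params.h"
#include "optimizer/planner.h"   /* standard_planner() */
#include "utils/guc.h"

/*
 * Sketch of the PG18 flow: open a GUC nest level, flip
 * enable_self_join_elimination off for that level, plan, then restore.
 */
static PlannedStmt *
PlanWithSelfJoinEliminationDisabled(Query *parse, int cursorOptions,
                                    ParamListInfo boundParams)
{
	int saveNestLevel = NewGUCNestLevel();

	/* GUC_ACTION_LOCAL scopes the change to this nest level only */
	set_config_option("enable_self_join_elimination", "off",
	                  (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
	                  GUC_ACTION_LOCAL, true, 0, false);

	PlannedStmt *plan = standard_planner(parse, NULL, cursorOptions, boundParams);

	/* pops the nest level; the local setting is discarded and the old value returns */
	AtEOXact_GUC(true, saveNestLevel);

	return plan;
}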


@@ -165,10 +165,289 @@ ORDER BY contype;
dist_n_after_drop | n | 1
(2 rows)
-- cleanup
RESET client_min_messages;
-- Purpose: test self join elimination for distributed, citus local and local tables.
--
CREATE TABLE sje_d1 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now());
CREATE TABLE sje_d2 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now());
CREATE TABLE sje_local (id bigserial PRIMARY KEY, title text);
SELECT create_distributed_table('sje_d1', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('sje_d2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO sje_d1 SELECT i, i::text, now() FROM generate_series(0,100)i;
INSERT INTO sje_d2 SELECT i, i::text, now() FROM generate_series(0,100)i;
INSERT INTO sje_local SELECT i, i::text FROM generate_series(0,100)i;
-- Self-join elimination is applied when distributed tables are involved
-- The query plan has only one join
EXPLAIN (costs off)
select count(1) from sje_d1 INNER
JOIN sje_d2 u1 USING (id) INNER
JOIN sje_d2 u2 USING (id) INNER
JOIN sje_d2 u3 USING (id) INNER
JOIN sje_d2 u4 USING (id) INNER
JOIN sje_d2 u5 USING (id) INNER
JOIN sje_d2 u6 USING (id);
QUERY PLAN
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Hash Join
Hash Cond: (sje_d1.id = u6.id)
-> Seq Scan on sje_d1_102012 sje_d1
-> Hash
-> Seq Scan on sje_d2_102016 u6
(12 rows)
select count(1) from sje_d1 INNER
JOIN sje_d2 u1 USING (id) INNER
JOIN sje_d2 u2 USING (id) INNER
JOIN sje_d2 u3 USING (id) INNER
JOIN sje_d2 u4 USING (id) INNER
JOIN sje_d2 u5 USING (id) INNER
JOIN sje_d2 u6 USING (id);
count
---------------------------------------------------------------------
101
(1 row)
-- Self-join elimination applied to from list join
EXPLAIN (costs off)
SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3
WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id;
QUERY PLAN
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Hash Join
Hash Cond: (d1.id = u3.id)
-> Seq Scan on sje_d1_102012 d1
-> Hash
-> Seq Scan on sje_d2_102016 u3
(12 rows)
SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3
WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id;
count
---------------------------------------------------------------------
101
(1 row)
-- Self-join elimination is not applied when a local table is involved
-- This is a limitation that will be resolved in citus 14
EXPLAIN (costs off)
select count(1) from sje_d1 INNER
JOIN sje_local u1 USING (id) INNER
JOIN sje_local u2 USING (id) INNER
JOIN sje_local u3 USING (id) INNER
JOIN sje_local u4 USING (id) INNER
JOIN sje_local u5 USING (id) INNER
JOIN sje_local u6 USING (id);
QUERY PLAN
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Seq Scan on sje_local u1
-> Distributed Subplan XXX_2
-> Seq Scan on sje_local u2
-> Distributed Subplan XXX_3
-> Seq Scan on sje_local u3
-> Distributed Subplan XXX_4
-> Seq Scan on sje_local u4
-> Distributed Subplan XXX_5
-> Seq Scan on sje_local u5
-> Distributed Subplan XXX_6
-> Seq Scan on sje_local u6
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Hash Join
Hash Cond: (intermediate_result_5.id = sje_d1.id)
-> Function Scan on read_intermediate_result intermediate_result_5
-> Hash
-> Hash Join
Hash Cond: (intermediate_result_4.id = sje_d1.id)
-> Function Scan on read_intermediate_result intermediate_result_4
-> Hash
-> Hash Join
Hash Cond: (intermediate_result_3.id = sje_d1.id)
-> Function Scan on read_intermediate_result intermediate_result_3
-> Hash
-> Hash Join
Hash Cond: (intermediate_result_2.id = sje_d1.id)
-> Function Scan on read_intermediate_result intermediate_result_2
-> Hash
-> Hash Join
Hash Cond: (intermediate_result_1.id = sje_d1.id)
-> Function Scan on read_intermediate_result intermediate_result_1
-> Hash
-> Hash Join
Hash Cond: (intermediate_result.id = sje_d1.id)
-> Function Scan on read_intermediate_result intermediate_result
-> Hash
-> Seq Scan on sje_d1_102012 sje_d1
(44 rows)
select count(1) from sje_d1 INNER
JOIN sje_local u1 USING (id) INNER
JOIN sje_local u2 USING (id) INNER
JOIN sje_local u3 USING (id) INNER
JOIN sje_local u4 USING (id) INNER
JOIN sje_local u5 USING (id) INNER
JOIN sje_local u6 USING (id);
count
---------------------------------------------------------------------
101
(1 row)
-- to test USING vs ON equivalence
EXPLAIN (costs off)
SELECT count(1)
FROM sje_d1 d
JOIN sje_d2 u1 ON (d.id = u1.id)
JOIN sje_d2 u2 ON (u1.id = u2.id);
QUERY PLAN
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Hash Join
Hash Cond: (d.id = u2.id)
-> Seq Scan on sje_d1_102012 d
-> Hash
-> Seq Scan on sje_d2_102016 u2
(12 rows)
SELECT count(1)
FROM sje_d1 d
JOIN sje_d2 u1 ON (d.id = u1.id)
JOIN sje_d2 u2 ON (u1.id = u2.id);
count
---------------------------------------------------------------------
101
(1 row)
-- Null-introducing join can have SJE
EXPLAIN (costs off)
SELECT count(*)
FROM sje_d1 d
LEFT JOIN sje_d2 u1 USING (id)
LEFT JOIN sje_d2 u2 USING (id);
QUERY PLAN
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Seq Scan on sje_d1_102012 d
(8 rows)
SELECT count(*)
FROM sje_d1 d
LEFT JOIN sje_d2 u1 USING (id)
LEFT JOIN sje_d2 u2 USING (id);
count
---------------------------------------------------------------------
101
(1 row)
-- prepared statement
PREPARE sje_p(int,int) AS
SELECT count(1)
FROM sje_d1 d
JOIN sje_d2 u1 USING (id)
JOIN sje_d2 u2 USING (id)
WHERE d.id BETWEEN $1 AND $2;
EXPLAIN (costs off)
EXECUTE sje_p(10,20);
QUERY PLAN
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Hash Join
Hash Cond: (u2.id = d.id)
-> Seq Scan on sje_d2_102016 u2
-> Hash
-> Bitmap Heap Scan on sje_d1_102012 d
Recheck Cond: ((id >= 10) AND (id <= 20))
-> Bitmap Index Scan on sje_d1_pkey_102012
Index Cond: ((id >= 10) AND (id <= 20))
(15 rows)
EXECUTE sje_p(10,20);
count
---------------------------------------------------------------------
11
(1 row)
-- cte
EXPLAIN (costs off)
WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0)
SELECT count(1)
FROM sje_d1 d
JOIN z USING (id)
JOIN sje_d2 u2 USING (id);
QUERY PLAN
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Hash Join
Hash Cond: (d.id = u2.id)
-> Seq Scan on sje_d1_102012 d
-> Hash
-> Seq Scan on sje_d2_102016 u2
Filter: ((id % '2'::bigint) = 0)
(13 rows)
WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0)
SELECT count(1)
FROM sje_d1 d
JOIN z USING (id)
JOIN sje_d2 u2 USING (id);
count
---------------------------------------------------------------------
51
(1 row)
-- cleanup with minimum verbosity
SET client_min_messages TO ERROR;
RESET search_path;
DROP SCHEMA pg18_nn CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table pg18_nn.nn_local
drop cascades to table pg18_nn.nn_dist
RESET client_min_messages;


@@ -134,7 +134,119 @@ WHERE conrelid = 'pg18_nn.nn_dist'::regclass
GROUP BY contype
ORDER BY contype;
-- cleanup
RESET client_min_messages;
-- Purpose: test self join elimination for distributed, citus local and local tables.
--
CREATE TABLE sje_d1 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now());
CREATE TABLE sje_d2 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now());
CREATE TABLE sje_local (id bigserial PRIMARY KEY, title text);
SELECT create_distributed_table('sje_d1', 'id');
SELECT create_distributed_table('sje_d2', 'id');
INSERT INTO sje_d1 SELECT i, i::text, now() FROM generate_series(0,100)i;
INSERT INTO sje_d2 SELECT i, i::text, now() FROM generate_series(0,100)i;
INSERT INTO sje_local SELECT i, i::text FROM generate_series(0,100)i;
-- Self-join elimination is applied when distributed tables are involved
-- The query plan has only one join
EXPLAIN (costs off)
select count(1) from sje_d1 INNER
JOIN sje_d2 u1 USING (id) INNER
JOIN sje_d2 u2 USING (id) INNER
JOIN sje_d2 u3 USING (id) INNER
JOIN sje_d2 u4 USING (id) INNER
JOIN sje_d2 u5 USING (id) INNER
JOIN sje_d2 u6 USING (id);
select count(1) from sje_d1 INNER
JOIN sje_d2 u1 USING (id) INNER
JOIN sje_d2 u2 USING (id) INNER
JOIN sje_d2 u3 USING (id) INNER
JOIN sje_d2 u4 USING (id) INNER
JOIN sje_d2 u5 USING (id) INNER
JOIN sje_d2 u6 USING (id);
-- Self-join elimination applied to from list join
EXPLAIN (costs off)
SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3
WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id;
SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3
WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id;
-- Self-join elimination is not applied when a local table is involved
-- This is a limitation that will be resolved in citus 14
EXPLAIN (costs off)
select count(1) from sje_d1 INNER
JOIN sje_local u1 USING (id) INNER
JOIN sje_local u2 USING (id) INNER
JOIN sje_local u3 USING (id) INNER
JOIN sje_local u4 USING (id) INNER
JOIN sje_local u5 USING (id) INNER
JOIN sje_local u6 USING (id);
select count(1) from sje_d1 INNER
JOIN sje_local u1 USING (id) INNER
JOIN sje_local u2 USING (id) INNER
JOIN sje_local u3 USING (id) INNER
JOIN sje_local u4 USING (id) INNER
JOIN sje_local u5 USING (id) INNER
JOIN sje_local u6 USING (id);
-- to test USING vs ON equivalence
EXPLAIN (costs off)
SELECT count(1)
FROM sje_d1 d
JOIN sje_d2 u1 ON (d.id = u1.id)
JOIN sje_d2 u2 ON (u1.id = u2.id);
SELECT count(1)
FROM sje_d1 d
JOIN sje_d2 u1 ON (d.id = u1.id)
JOIN sje_d2 u2 ON (u1.id = u2.id);
-- Null-introducing join can have SJE
EXPLAIN (costs off)
SELECT count(*)
FROM sje_d1 d
LEFT JOIN sje_d2 u1 USING (id)
LEFT JOIN sje_d2 u2 USING (id);
SELECT count(*)
FROM sje_d1 d
LEFT JOIN sje_d2 u1 USING (id)
LEFT JOIN sje_d2 u2 USING (id);
-- prepared statement
PREPARE sje_p(int,int) AS
SELECT count(1)
FROM sje_d1 d
JOIN sje_d2 u1 USING (id)
JOIN sje_d2 u2 USING (id)
WHERE d.id BETWEEN $1 AND $2;
EXPLAIN (costs off)
EXECUTE sje_p(10,20);
EXECUTE sje_p(10,20);
-- cte
EXPLAIN (costs off)
WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0)
SELECT count(1)
FROM sje_d1 d
JOIN z USING (id)
JOIN sje_d2 u2 USING (id);
WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0)
SELECT count(1)
FROM sje_d1 d
JOIN z USING (id)
JOIN sje_d2 u2 USING (id);
-- cleanup with minimum verbosity
SET client_min_messages TO ERROR;
RESET search_path;
DROP SCHEMA pg18_nn CASCADE;
RESET client_min_messages;