mirror of https://github.com/citusdata/citus.git
Add PG18 logical replication tests for generated columns via publication column lists (#8382)
DESCRIPTION: Add PG18 tests verifying generated-column replication via publication lists https://github.com/postgres/postgres/commit/745217a05 Adds a new regression-test section that validates **end-to-end logical replication behavior** for PG18 generated columns when using publication **column lists**, using worker1 as publisher and worker2 as subscriber. ### Case A: column list includes generated column `b` * Publisher table has `b GENERATED ALWAYS AS (a * 10) STORED` * Publication uses `FOR TABLE ... (id, a, b)` with `publish_generated_columns = none` * Subscriber defines `b` as a **plain column** (not generated) to ensure replicated values are applied * Verifies: * Initial sync copies `b` values (20, 70) * Streaming INSERT/UPDATE replicates `b` values (e.g., `(1,5,50)` and `(3,9,90)`) ### Case B: column list excludes `b` (precedence) * Publisher table still has generated `b` * Publication uses `FOR TABLE ... (id, a)` with `publish_generated_columns = stored` * Subscriber defines `b` as a plain column * Verifies: * `b` is **not** replicated and remains `NULL` for initial sync and streaming changes, demonstrating column-list precedence via data.pull/8356/head^2
parent
f8d36f79d7
commit
84fc6801ba
|
|
@ -1249,8 +1249,224 @@ DROP PUBLICATION pub_gen_cols_stored;
|
|||
DROP PUBLICATION pub_gen_cols_none;
|
||||
DROP PUBLICATION pub_gen_cols_list_includes_b;
|
||||
DROP PUBLICATION pub_gen_cols_list_excludes_b;
|
||||
-- ============================================================
|
||||
-- PG18: replicate stored generated columns when included in publication column list
|
||||
-- Upstream: 745217a05
|
||||
-- Publisher: worker1 | Subscriber: worker2
|
||||
-- We validate initial COPY + streaming INSERT/UPDATE semantics.
|
||||
-- ============================================================
|
||||
CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer)
|
||||
RETURNS void AS $$
|
||||
DECLARE
|
||||
actualCount integer;
|
||||
i int;
|
||||
BEGIN
|
||||
FOR i IN 1..600 LOOP
|
||||
EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount;
|
||||
EXIT WHEN actualCount = expectedCount;
|
||||
PERFORM pg_sleep(0.05);
|
||||
END LOOP;
|
||||
|
||||
IF actualCount IS DISTINCT FROM expectedCount THEN
|
||||
RAISE EXCEPTION 'timeout waiting for % rows in %, got %',
|
||||
expectedCount, tableName, actualCount;
|
||||
END IF;
|
||||
END$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_query(query text, expectedCount integer)
|
||||
RETURNS void AS $$
|
||||
DECLARE
|
||||
actualCount integer;
|
||||
i int;
|
||||
BEGIN
|
||||
FOR i IN 1..600 LOOP
|
||||
EXECUTE query INTO actualCount;
|
||||
EXIT WHEN actualCount = expectedCount;
|
||||
PERFORM pg_sleep(0.05);
|
||||
END LOOP;
|
||||
|
||||
IF actualCount IS DISTINCT FROM expectedCount THEN
|
||||
RAISE EXCEPTION 'timeout waiting for query [%] to return %, got %',
|
||||
query, expectedCount, actualCount;
|
||||
END IF;
|
||||
END$$ LANGUAGE plpgsql;
|
||||
-- Build conninfo safely (psql vars do NOT expand inside single quotes)
|
||||
\set conninfo_w1 '\'user=postgres host=localhost port=' :worker_1_port ' dbname=regression\''
|
||||
-- --------------------------
|
||||
-- Case A: column list includes generated column b
|
||||
-- Expectation: subscriber receives b values (publisher b = a*10) because b is published in the column list.
|
||||
-- --------------------------
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_publication;
|
||||
DROP PUBLICATION IF EXISTS pub_gen_repl_a;
|
||||
NOTICE: publication "pub_gen_repl_a" does not exist, skipping
|
||||
DROP TABLE IF EXISTS gen_pub_repl_a;
|
||||
NOTICE: table "gen_pub_repl_a" does not exist, skipping
|
||||
CREATE TABLE gen_pub_repl_a (
|
||||
id int PRIMARY KEY,
|
||||
a int,
|
||||
b int GENERATED ALWAYS AS (a * 10) STORED
|
||||
);
|
||||
INSERT INTO gen_pub_repl_a (id, a) VALUES (1, 2), (2, 7);
|
||||
SET citus.enable_ddl_propagation TO off;
|
||||
CREATE PUBLICATION pub_gen_repl_a
|
||||
FOR TABLE gen_pub_repl_a (id, a, b)
|
||||
WITH (publish = 'insert, update', publish_generated_columns = none);
|
||||
RESET citus.enable_ddl_propagation;
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO pg18_publication;
|
||||
DROP SUBSCRIPTION IF EXISTS sub_gen_repl_a;
|
||||
NOTICE: subscription "sub_gen_repl_a" does not exist, skipping
|
||||
DROP TABLE IF EXISTS gen_pub_repl_a;
|
||||
NOTICE: table "gen_pub_repl_a" does not exist, skipping
|
||||
-- IMPORTANT: subscriber b is NOT generated. This is the PG18 “replicate generated values” use-case.
|
||||
CREATE TABLE gen_pub_repl_a (
|
||||
id int PRIMARY KEY,
|
||||
a int,
|
||||
b int
|
||||
);
|
||||
CREATE SUBSCRIPTION sub_gen_repl_a
|
||||
CONNECTION :conninfo_w1
|
||||
PUBLICATION pub_gen_repl_a
|
||||
WITH (copy_data = true);
|
||||
NOTICE: created replication slot "sub_gen_repl_a" on publisher
|
||||
SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_a', 2);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM gen_pub_repl_a ORDER BY id; -- expect b=20,70
|
||||
id | a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 2 | 20
|
||||
2 | 7 | 70
|
||||
(2 rows)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_publication;
|
||||
INSERT INTO gen_pub_repl_a (id, a) VALUES (3, 9);
|
||||
UPDATE gen_pub_repl_a SET a = 5 WHERE id = 1;
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO pg18_publication;
|
||||
SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_a', 3);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_query(
|
||||
$$SELECT COUNT(*) FROM gen_pub_repl_a WHERE id=1 AND a=5 AND b=50$$, 1);
|
||||
wait_for_expected_rowcount_at_query
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM gen_pub_repl_a ORDER BY id; -- expect (1,5,50) (2,7,70) (3,9,90)
|
||||
id | a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 5 | 50
|
||||
2 | 7 | 70
|
||||
3 | 9 | 90
|
||||
(3 rows)
|
||||
|
||||
-- Cleanup Case A
|
||||
DROP SUBSCRIPTION sub_gen_repl_a;
|
||||
NOTICE: dropped replication slot "sub_gen_repl_a" on publisher
|
||||
DROP TABLE gen_pub_repl_a;
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_publication;
|
||||
DROP PUBLICATION pub_gen_repl_a;
|
||||
DROP TABLE gen_pub_repl_a;
|
||||
-- --------------------------
|
||||
-- Case B: column list excludes generated column b but publish_generated_columns=stored
|
||||
-- Expectation (precedence): b is NOT replicated because column list excludes it, so subscriber b stays NULL.
|
||||
-- --------------------------
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_publication;
|
||||
DROP PUBLICATION IF EXISTS pub_gen_repl_b;
|
||||
NOTICE: publication "pub_gen_repl_b" does not exist, skipping
|
||||
DROP TABLE IF EXISTS gen_pub_repl_b;
|
||||
NOTICE: table "gen_pub_repl_b" does not exist, skipping
|
||||
CREATE TABLE gen_pub_repl_b (
|
||||
id int PRIMARY KEY,
|
||||
a int,
|
||||
b int GENERATED ALWAYS AS (a * 10) STORED
|
||||
);
|
||||
INSERT INTO gen_pub_repl_b (id, a) VALUES (1, 2), (2, 7);
|
||||
SET citus.enable_ddl_propagation TO off;
|
||||
CREATE PUBLICATION pub_gen_repl_b
|
||||
FOR TABLE gen_pub_repl_b (id, a)
|
||||
WITH (publish = 'insert, update', publish_generated_columns = stored);
|
||||
RESET citus.enable_ddl_propagation;
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO pg18_publication;
|
||||
DROP SUBSCRIPTION IF EXISTS sub_gen_repl_b;
|
||||
NOTICE: subscription "sub_gen_repl_b" does not exist, skipping
|
||||
DROP TABLE IF EXISTS gen_pub_repl_b;
|
||||
NOTICE: table "gen_pub_repl_b" does not exist, skipping
|
||||
CREATE TABLE gen_pub_repl_b (
|
||||
id int PRIMARY KEY,
|
||||
a int,
|
||||
b int
|
||||
);
|
||||
CREATE SUBSCRIPTION sub_gen_repl_b
|
||||
CONNECTION :conninfo_w1
|
||||
PUBLICATION pub_gen_repl_b
|
||||
WITH (copy_data = true);
|
||||
NOTICE: created replication slot "sub_gen_repl_b" on publisher
|
||||
SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_b', 2);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM gen_pub_repl_b ORDER BY id; -- expect b is NULL
|
||||
id | a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 2 |
|
||||
2 | 7 |
|
||||
(2 rows)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_publication;
|
||||
INSERT INTO gen_pub_repl_b (id, a) VALUES (3, 9);
|
||||
UPDATE gen_pub_repl_b SET a = 5 WHERE id = 1;
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO pg18_publication;
|
||||
SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_b', 3);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_query(
|
||||
$$SELECT COUNT(*) FROM gen_pub_repl_b WHERE id=1 AND a=5 AND b IS NULL$$, 1);
|
||||
wait_for_expected_rowcount_at_query
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM gen_pub_repl_b ORDER BY id;
|
||||
id | a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 5 |
|
||||
2 | 7 |
|
||||
3 | 9 |
|
||||
(3 rows)
|
||||
|
||||
-- Cleanup B
|
||||
DROP SUBSCRIPTION sub_gen_repl_b;
|
||||
NOTICE: dropped replication slot "sub_gen_repl_b" on publisher
|
||||
DROP TABLE gen_pub_repl_b;
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_publication;
|
||||
DROP PUBLICATION pub_gen_repl_b;
|
||||
DROP TABLE gen_pub_repl_b;
|
||||
\c - - - :master_port
|
||||
SET search_path TO pg18_publication;
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA pg18_publication CASCADE;
|
||||
NOTICE: drop cascades to table gen_pub_tab
|
||||
RESET client_min_messages;
|
||||
SET search_path TO pg18_nn;
|
||||
-- END: PG18: verify publish_generated_columns is preserved for distributed tables
|
||||
-- PG18 Feature: FOREIGN KEY constraints can be specified as NOT ENFORCED
|
||||
|
|
|
|||
|
|
@ -779,7 +779,195 @@ DROP PUBLICATION pub_gen_cols_stored;
|
|||
DROP PUBLICATION pub_gen_cols_none;
|
||||
DROP PUBLICATION pub_gen_cols_list_includes_b;
|
||||
DROP PUBLICATION pub_gen_cols_list_excludes_b;
|
||||
|
||||
|
||||
-- ============================================================
|
||||
-- PG18: replicate stored generated columns when included in publication column list
|
||||
-- Upstream: 745217a05
|
||||
-- Publisher: worker1 | Subscriber: worker2
|
||||
-- We validate initial COPY + streaming INSERT/UPDATE semantics.
|
||||
-- ============================================================
|
||||
CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer)
|
||||
RETURNS void AS $$
|
||||
DECLARE
|
||||
actualCount integer;
|
||||
i int;
|
||||
BEGIN
|
||||
FOR i IN 1..600 LOOP
|
||||
EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount;
|
||||
EXIT WHEN actualCount = expectedCount;
|
||||
PERFORM pg_sleep(0.05);
|
||||
END LOOP;
|
||||
|
||||
IF actualCount IS DISTINCT FROM expectedCount THEN
|
||||
RAISE EXCEPTION 'timeout waiting for % rows in %, got %',
|
||||
expectedCount, tableName, actualCount;
|
||||
END IF;
|
||||
END$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_query(query text, expectedCount integer)
|
||||
RETURNS void AS $$
|
||||
DECLARE
|
||||
actualCount integer;
|
||||
i int;
|
||||
BEGIN
|
||||
FOR i IN 1..600 LOOP
|
||||
EXECUTE query INTO actualCount;
|
||||
EXIT WHEN actualCount = expectedCount;
|
||||
PERFORM pg_sleep(0.05);
|
||||
END LOOP;
|
||||
|
||||
IF actualCount IS DISTINCT FROM expectedCount THEN
|
||||
RAISE EXCEPTION 'timeout waiting for query [%] to return %, got %',
|
||||
query, expectedCount, actualCount;
|
||||
END IF;
|
||||
END$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Build conninfo safely (psql vars do NOT expand inside single quotes)
|
||||
\set conninfo_w1 '\'user=postgres host=localhost port=' :worker_1_port ' dbname=regression\''
|
||||
|
||||
-- --------------------------
|
||||
-- Case A: column list includes generated column b
|
||||
-- Expectation: subscriber receives b values (publisher b = a*10) because b is published in the column list.
|
||||
-- --------------------------
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_publication;
|
||||
|
||||
DROP PUBLICATION IF EXISTS pub_gen_repl_a;
|
||||
DROP TABLE IF EXISTS gen_pub_repl_a;
|
||||
|
||||
|
||||
CREATE TABLE gen_pub_repl_a (
|
||||
id int PRIMARY KEY,
|
||||
a int,
|
||||
b int GENERATED ALWAYS AS (a * 10) STORED
|
||||
);
|
||||
|
||||
INSERT INTO gen_pub_repl_a (id, a) VALUES (1, 2), (2, 7);
|
||||
|
||||
SET citus.enable_ddl_propagation TO off;
|
||||
CREATE PUBLICATION pub_gen_repl_a
|
||||
FOR TABLE gen_pub_repl_a (id, a, b)
|
||||
WITH (publish = 'insert, update', publish_generated_columns = none);
|
||||
RESET citus.enable_ddl_propagation;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO pg18_publication;
|
||||
|
||||
DROP SUBSCRIPTION IF EXISTS sub_gen_repl_a;
|
||||
DROP TABLE IF EXISTS gen_pub_repl_a;
|
||||
|
||||
-- IMPORTANT: subscriber b is NOT generated. This is the PG18 “replicate generated values” use-case.
|
||||
CREATE TABLE gen_pub_repl_a (
|
||||
id int PRIMARY KEY,
|
||||
a int,
|
||||
b int
|
||||
);
|
||||
CREATE SUBSCRIPTION sub_gen_repl_a
|
||||
CONNECTION :conninfo_w1
|
||||
PUBLICATION pub_gen_repl_a
|
||||
WITH (copy_data = true);
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_a', 2);
|
||||
SELECT * FROM gen_pub_repl_a ORDER BY id; -- expect b=20,70
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_publication;
|
||||
|
||||
INSERT INTO gen_pub_repl_a (id, a) VALUES (3, 9);
|
||||
UPDATE gen_pub_repl_a SET a = 5 WHERE id = 1;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO pg18_publication;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_a', 3);
|
||||
SELECT wait_for_expected_rowcount_at_query(
|
||||
$$SELECT COUNT(*) FROM gen_pub_repl_a WHERE id=1 AND a=5 AND b=50$$, 1);
|
||||
SELECT * FROM gen_pub_repl_a ORDER BY id; -- expect (1,5,50) (2,7,70) (3,9,90)
|
||||
|
||||
-- Cleanup Case A
|
||||
DROP SUBSCRIPTION sub_gen_repl_a;
|
||||
DROP TABLE gen_pub_repl_a;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_publication;
|
||||
|
||||
DROP PUBLICATION pub_gen_repl_a;
|
||||
DROP TABLE gen_pub_repl_a;
|
||||
|
||||
-- --------------------------
|
||||
-- Case B: column list excludes generated column b but publish_generated_columns=stored
|
||||
-- Expectation (precedence): b is NOT replicated because column list excludes it, so subscriber b stays NULL.
|
||||
-- --------------------------
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_publication;
|
||||
|
||||
DROP PUBLICATION IF EXISTS pub_gen_repl_b;
|
||||
DROP TABLE IF EXISTS gen_pub_repl_b;
|
||||
|
||||
CREATE TABLE gen_pub_repl_b (
|
||||
id int PRIMARY KEY,
|
||||
a int,
|
||||
b int GENERATED ALWAYS AS (a * 10) STORED
|
||||
);
|
||||
|
||||
INSERT INTO gen_pub_repl_b (id, a) VALUES (1, 2), (2, 7);
|
||||
|
||||
SET citus.enable_ddl_propagation TO off;
|
||||
CREATE PUBLICATION pub_gen_repl_b
|
||||
FOR TABLE gen_pub_repl_b (id, a)
|
||||
WITH (publish = 'insert, update', publish_generated_columns = stored);
|
||||
RESET citus.enable_ddl_propagation;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO pg18_publication;
|
||||
|
||||
DROP SUBSCRIPTION IF EXISTS sub_gen_repl_b;
|
||||
DROP TABLE IF EXISTS gen_pub_repl_b;
|
||||
|
||||
CREATE TABLE gen_pub_repl_b (
|
||||
id int PRIMARY KEY,
|
||||
a int,
|
||||
b int
|
||||
);
|
||||
CREATE SUBSCRIPTION sub_gen_repl_b
|
||||
CONNECTION :conninfo_w1
|
||||
PUBLICATION pub_gen_repl_b
|
||||
WITH (copy_data = true);
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_b', 2);
|
||||
SELECT * FROM gen_pub_repl_b ORDER BY id; -- expect b is NULL
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_publication;
|
||||
|
||||
INSERT INTO gen_pub_repl_b (id, a) VALUES (3, 9);
|
||||
UPDATE gen_pub_repl_b SET a = 5 WHERE id = 1;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO pg18_publication;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_b', 3);
|
||||
SELECT wait_for_expected_rowcount_at_query(
|
||||
$$SELECT COUNT(*) FROM gen_pub_repl_b WHERE id=1 AND a=5 AND b IS NULL$$, 1);
|
||||
SELECT * FROM gen_pub_repl_b ORDER BY id;
|
||||
|
||||
-- Cleanup B
|
||||
DROP SUBSCRIPTION sub_gen_repl_b;
|
||||
DROP TABLE gen_pub_repl_b;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO pg18_publication;
|
||||
DROP PUBLICATION pub_gen_repl_b;
|
||||
DROP TABLE gen_pub_repl_b;
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO pg18_publication;
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA pg18_publication CASCADE;
|
||||
RESET client_min_messages;
|
||||
SET search_path TO pg18_nn;
|
||||
-- END: PG18: verify publish_generated_columns is preserved for distributed tables
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue