From 84fc6801bacc167fb05ff472cfe292c1c2d387f6 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Wed, 17 Dec 2025 14:58:20 +0300 Subject: [PATCH] Add PG18 logical replication tests for generated columns via publication column lists (#8382) DESCRIPTION: Add PG18 tests verifying generated-column replication via publication lists https://github.com/postgres/postgres/commit/745217a05 Adds a new regression-test section that validates **end-to-end logical replication behavior** for PG18 generated columns when using publication **column lists**, using worker1 as publisher and worker2 as subscriber. ### Case A: column list includes generated column `b` * Publisher table has `b GENERATED ALWAYS AS (a * 10) STORED` * Publication uses `FOR TABLE ... (id, a, b)` with `publish_generated_columns = none` * Subscriber defines `b` as a **plain column** (not generated) to ensure replicated values are applied * Verifies: * Initial sync copies `b` values (20, 70) * Streaming INSERT/UPDATE replicates `b` values (e.g., `(1,5,50)` and `(3,9,90)`) ### Case B: column list excludes `b` (precedence) * Publisher table still has generated `b` * Publication uses `FOR TABLE ... (id, a)` with `publish_generated_columns = stored` * Subscriber defines `b` as a plain column * Verifies: * `b` is **not** replicated and remains `NULL` for initial sync and streaming changes, demonstrating column-list precedence via data. --- src/test/regress/expected/pg18.out | 218 ++++++++++++++++++++++++++++- src/test/regress/sql/pg18.sql | 188 +++++++++++++++++++++++++ 2 files changed, 405 insertions(+), 1 deletion(-) diff --git a/src/test/regress/expected/pg18.out b/src/test/regress/expected/pg18.out index 8434dcb3c..9ac592479 100644 --- a/src/test/regress/expected/pg18.out +++ b/src/test/regress/expected/pg18.out @@ -1249,8 +1249,224 @@ DROP PUBLICATION pub_gen_cols_stored; DROP PUBLICATION pub_gen_cols_none; DROP PUBLICATION pub_gen_cols_list_includes_b; DROP PUBLICATION pub_gen_cols_list_excludes_b; +-- ============================================================ +-- PG18: replicate stored generated columns when included in publication column list +-- Upstream: 745217a05 +-- Publisher: worker1 | Subscriber: worker2 +-- We validate initial COPY + streaming INSERT/UPDATE semantics. +-- ============================================================ +CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer) +RETURNS void AS $$ +DECLARE + actualCount integer; + i int; +BEGIN + FOR i IN 1..600 LOOP + EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; + EXIT WHEN actualCount = expectedCount; + PERFORM pg_sleep(0.05); + END LOOP; + + IF actualCount IS DISTINCT FROM expectedCount THEN + RAISE EXCEPTION 'timeout waiting for % rows in %, got %', + expectedCount, tableName, actualCount; + END IF; +END$$ LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_query(query text, expectedCount integer) +RETURNS void AS $$ +DECLARE + actualCount integer; + i int; +BEGIN + FOR i IN 1..600 LOOP + EXECUTE query INTO actualCount; + EXIT WHEN actualCount = expectedCount; + PERFORM pg_sleep(0.05); + END LOOP; + + IF actualCount IS DISTINCT FROM expectedCount THEN + RAISE EXCEPTION 'timeout waiting for query [%] to return %, got %', + query, expectedCount, actualCount; + END IF; +END$$ LANGUAGE plpgsql; +-- Build conninfo safely (psql vars do NOT expand inside single quotes) +\set conninfo_w1 '\'user=postgres host=localhost port=' :worker_1_port ' dbname=regression\'' +-- -------------------------- +-- Case A: column list includes generated column b +-- Expectation: subscriber receives b values (publisher b = a*10) because b is published in the column list. +-- -------------------------- +\c - - - :worker_1_port +SET search_path TO pg18_publication; +DROP PUBLICATION IF EXISTS pub_gen_repl_a; +NOTICE: publication "pub_gen_repl_a" does not exist, skipping +DROP TABLE IF EXISTS gen_pub_repl_a; +NOTICE: table "gen_pub_repl_a" does not exist, skipping +CREATE TABLE gen_pub_repl_a ( + id int PRIMARY KEY, + a int, + b int GENERATED ALWAYS AS (a * 10) STORED +); +INSERT INTO gen_pub_repl_a (id, a) VALUES (1, 2), (2, 7); +SET citus.enable_ddl_propagation TO off; +CREATE PUBLICATION pub_gen_repl_a + FOR TABLE gen_pub_repl_a (id, a, b) + WITH (publish = 'insert, update', publish_generated_columns = none); +RESET citus.enable_ddl_propagation; +\c - - - :worker_2_port +SET search_path TO pg18_publication; +DROP SUBSCRIPTION IF EXISTS sub_gen_repl_a; +NOTICE: subscription "sub_gen_repl_a" does not exist, skipping +DROP TABLE IF EXISTS gen_pub_repl_a; +NOTICE: table "gen_pub_repl_a" does not exist, skipping +-- IMPORTANT: subscriber b is NOT generated. This is the PG18 “replicate generated values” use-case. +CREATE TABLE gen_pub_repl_a ( + id int PRIMARY KEY, + a int, + b int +); +CREATE SUBSCRIPTION sub_gen_repl_a + CONNECTION :conninfo_w1 + PUBLICATION pub_gen_repl_a + WITH (copy_data = true); +NOTICE: created replication slot "sub_gen_repl_a" on publisher +SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_a', 2); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM gen_pub_repl_a ORDER BY id; -- expect b=20,70 + id | a | b +--------------------------------------------------------------------- + 1 | 2 | 20 + 2 | 7 | 70 +(2 rows) + +\c - - - :worker_1_port +SET search_path TO pg18_publication; +INSERT INTO gen_pub_repl_a (id, a) VALUES (3, 9); +UPDATE gen_pub_repl_a SET a = 5 WHERE id = 1; +\c - - - :worker_2_port +SET search_path TO pg18_publication; +SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_a', 3); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT wait_for_expected_rowcount_at_query( + $$SELECT COUNT(*) FROM gen_pub_repl_a WHERE id=1 AND a=5 AND b=50$$, 1); + wait_for_expected_rowcount_at_query +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM gen_pub_repl_a ORDER BY id; -- expect (1,5,50) (2,7,70) (3,9,90) + id | a | b +--------------------------------------------------------------------- + 1 | 5 | 50 + 2 | 7 | 70 + 3 | 9 | 90 +(3 rows) + +-- Cleanup Case A +DROP SUBSCRIPTION sub_gen_repl_a; +NOTICE: dropped replication slot "sub_gen_repl_a" on publisher +DROP TABLE gen_pub_repl_a; +\c - - - :worker_1_port +SET search_path TO pg18_publication; +DROP PUBLICATION pub_gen_repl_a; +DROP TABLE gen_pub_repl_a; +-- -------------------------- +-- Case B: column list excludes generated column b but publish_generated_columns=stored +-- Expectation (precedence): b is NOT replicated because column list excludes it, so subscriber b stays NULL. +-- -------------------------- +\c - - - :worker_1_port +SET search_path TO pg18_publication; +DROP PUBLICATION IF EXISTS pub_gen_repl_b; +NOTICE: publication "pub_gen_repl_b" does not exist, skipping +DROP TABLE IF EXISTS gen_pub_repl_b; +NOTICE: table "gen_pub_repl_b" does not exist, skipping +CREATE TABLE gen_pub_repl_b ( + id int PRIMARY KEY, + a int, + b int GENERATED ALWAYS AS (a * 10) STORED +); +INSERT INTO gen_pub_repl_b (id, a) VALUES (1, 2), (2, 7); +SET citus.enable_ddl_propagation TO off; +CREATE PUBLICATION pub_gen_repl_b + FOR TABLE gen_pub_repl_b (id, a) + WITH (publish = 'insert, update', publish_generated_columns = stored); +RESET citus.enable_ddl_propagation; +\c - - - :worker_2_port +SET search_path TO pg18_publication; +DROP SUBSCRIPTION IF EXISTS sub_gen_repl_b; +NOTICE: subscription "sub_gen_repl_b" does not exist, skipping +DROP TABLE IF EXISTS gen_pub_repl_b; +NOTICE: table "gen_pub_repl_b" does not exist, skipping +CREATE TABLE gen_pub_repl_b ( + id int PRIMARY KEY, + a int, + b int +); +CREATE SUBSCRIPTION sub_gen_repl_b + CONNECTION :conninfo_w1 + PUBLICATION pub_gen_repl_b + WITH (copy_data = true); +NOTICE: created replication slot "sub_gen_repl_b" on publisher +SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_b', 2); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM gen_pub_repl_b ORDER BY id; -- expect b is NULL + id | a | b +--------------------------------------------------------------------- + 1 | 2 | + 2 | 7 | +(2 rows) + +\c - - - :worker_1_port +SET search_path TO pg18_publication; +INSERT INTO gen_pub_repl_b (id, a) VALUES (3, 9); +UPDATE gen_pub_repl_b SET a = 5 WHERE id = 1; +\c - - - :worker_2_port +SET search_path TO pg18_publication; +SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_b', 3); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT wait_for_expected_rowcount_at_query( + $$SELECT COUNT(*) FROM gen_pub_repl_b WHERE id=1 AND a=5 AND b IS NULL$$, 1); + wait_for_expected_rowcount_at_query +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM gen_pub_repl_b ORDER BY id; + id | a | b +--------------------------------------------------------------------- + 1 | 5 | + 2 | 7 | + 3 | 9 | +(3 rows) + +-- Cleanup B +DROP SUBSCRIPTION sub_gen_repl_b; +NOTICE: dropped replication slot "sub_gen_repl_b" on publisher +DROP TABLE gen_pub_repl_b; +\c - - - :worker_1_port +SET search_path TO pg18_publication; +DROP PUBLICATION pub_gen_repl_b; +DROP TABLE gen_pub_repl_b; +\c - - - :master_port +SET search_path TO pg18_publication; +SET client_min_messages TO ERROR; DROP SCHEMA pg18_publication CASCADE; -NOTICE: drop cascades to table gen_pub_tab +RESET client_min_messages; SET search_path TO pg18_nn; -- END: PG18: verify publish_generated_columns is preserved for distributed tables -- PG18 Feature: FOREIGN KEY constraints can be specified as NOT ENFORCED diff --git a/src/test/regress/sql/pg18.sql b/src/test/regress/sql/pg18.sql index c88034d24..a3954f628 100644 --- a/src/test/regress/sql/pg18.sql +++ b/src/test/regress/sql/pg18.sql @@ -779,7 +779,195 @@ DROP PUBLICATION pub_gen_cols_stored; DROP PUBLICATION pub_gen_cols_none; DROP PUBLICATION pub_gen_cols_list_includes_b; DROP PUBLICATION pub_gen_cols_list_excludes_b; + + +-- ============================================================ +-- PG18: replicate stored generated columns when included in publication column list +-- Upstream: 745217a05 +-- Publisher: worker1 | Subscriber: worker2 +-- We validate initial COPY + streaming INSERT/UPDATE semantics. +-- ============================================================ +CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer) +RETURNS void AS $$ +DECLARE + actualCount integer; + i int; +BEGIN + FOR i IN 1..600 LOOP + EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; + EXIT WHEN actualCount = expectedCount; + PERFORM pg_sleep(0.05); + END LOOP; + + IF actualCount IS DISTINCT FROM expectedCount THEN + RAISE EXCEPTION 'timeout waiting for % rows in %, got %', + expectedCount, tableName, actualCount; + END IF; +END$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_query(query text, expectedCount integer) +RETURNS void AS $$ +DECLARE + actualCount integer; + i int; +BEGIN + FOR i IN 1..600 LOOP + EXECUTE query INTO actualCount; + EXIT WHEN actualCount = expectedCount; + PERFORM pg_sleep(0.05); + END LOOP; + + IF actualCount IS DISTINCT FROM expectedCount THEN + RAISE EXCEPTION 'timeout waiting for query [%] to return %, got %', + query, expectedCount, actualCount; + END IF; +END$$ LANGUAGE plpgsql; + +-- Build conninfo safely (psql vars do NOT expand inside single quotes) +\set conninfo_w1 '\'user=postgres host=localhost port=' :worker_1_port ' dbname=regression\'' + +-- -------------------------- +-- Case A: column list includes generated column b +-- Expectation: subscriber receives b values (publisher b = a*10) because b is published in the column list. +-- -------------------------- + +\c - - - :worker_1_port +SET search_path TO pg18_publication; + +DROP PUBLICATION IF EXISTS pub_gen_repl_a; +DROP TABLE IF EXISTS gen_pub_repl_a; + + +CREATE TABLE gen_pub_repl_a ( + id int PRIMARY KEY, + a int, + b int GENERATED ALWAYS AS (a * 10) STORED +); + +INSERT INTO gen_pub_repl_a (id, a) VALUES (1, 2), (2, 7); + +SET citus.enable_ddl_propagation TO off; +CREATE PUBLICATION pub_gen_repl_a + FOR TABLE gen_pub_repl_a (id, a, b) + WITH (publish = 'insert, update', publish_generated_columns = none); +RESET citus.enable_ddl_propagation; + +\c - - - :worker_2_port +SET search_path TO pg18_publication; + +DROP SUBSCRIPTION IF EXISTS sub_gen_repl_a; +DROP TABLE IF EXISTS gen_pub_repl_a; + +-- IMPORTANT: subscriber b is NOT generated. This is the PG18 “replicate generated values” use-case. +CREATE TABLE gen_pub_repl_a ( + id int PRIMARY KEY, + a int, + b int +); +CREATE SUBSCRIPTION sub_gen_repl_a + CONNECTION :conninfo_w1 + PUBLICATION pub_gen_repl_a + WITH (copy_data = true); + +SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_a', 2); +SELECT * FROM gen_pub_repl_a ORDER BY id; -- expect b=20,70 + +\c - - - :worker_1_port +SET search_path TO pg18_publication; + +INSERT INTO gen_pub_repl_a (id, a) VALUES (3, 9); +UPDATE gen_pub_repl_a SET a = 5 WHERE id = 1; + +\c - - - :worker_2_port +SET search_path TO pg18_publication; + +SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_a', 3); +SELECT wait_for_expected_rowcount_at_query( + $$SELECT COUNT(*) FROM gen_pub_repl_a WHERE id=1 AND a=5 AND b=50$$, 1); +SELECT * FROM gen_pub_repl_a ORDER BY id; -- expect (1,5,50) (2,7,70) (3,9,90) + +-- Cleanup Case A +DROP SUBSCRIPTION sub_gen_repl_a; +DROP TABLE gen_pub_repl_a; + +\c - - - :worker_1_port +SET search_path TO pg18_publication; + +DROP PUBLICATION pub_gen_repl_a; +DROP TABLE gen_pub_repl_a; + +-- -------------------------- +-- Case B: column list excludes generated column b but publish_generated_columns=stored +-- Expectation (precedence): b is NOT replicated because column list excludes it, so subscriber b stays NULL. +-- -------------------------- + +\c - - - :worker_1_port +SET search_path TO pg18_publication; + +DROP PUBLICATION IF EXISTS pub_gen_repl_b; +DROP TABLE IF EXISTS gen_pub_repl_b; + +CREATE TABLE gen_pub_repl_b ( + id int PRIMARY KEY, + a int, + b int GENERATED ALWAYS AS (a * 10) STORED +); + +INSERT INTO gen_pub_repl_b (id, a) VALUES (1, 2), (2, 7); + +SET citus.enable_ddl_propagation TO off; +CREATE PUBLICATION pub_gen_repl_b + FOR TABLE gen_pub_repl_b (id, a) + WITH (publish = 'insert, update', publish_generated_columns = stored); +RESET citus.enable_ddl_propagation; + +\c - - - :worker_2_port +SET search_path TO pg18_publication; + +DROP SUBSCRIPTION IF EXISTS sub_gen_repl_b; +DROP TABLE IF EXISTS gen_pub_repl_b; + +CREATE TABLE gen_pub_repl_b ( + id int PRIMARY KEY, + a int, + b int +); +CREATE SUBSCRIPTION sub_gen_repl_b + CONNECTION :conninfo_w1 + PUBLICATION pub_gen_repl_b + WITH (copy_data = true); + +SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_b', 2); +SELECT * FROM gen_pub_repl_b ORDER BY id; -- expect b is NULL + +\c - - - :worker_1_port +SET search_path TO pg18_publication; + +INSERT INTO gen_pub_repl_b (id, a) VALUES (3, 9); +UPDATE gen_pub_repl_b SET a = 5 WHERE id = 1; + +\c - - - :worker_2_port +SET search_path TO pg18_publication; + +SELECT wait_for_expected_rowcount_at_table('gen_pub_repl_b', 3); +SELECT wait_for_expected_rowcount_at_query( + $$SELECT COUNT(*) FROM gen_pub_repl_b WHERE id=1 AND a=5 AND b IS NULL$$, 1); +SELECT * FROM gen_pub_repl_b ORDER BY id; + +-- Cleanup B +DROP SUBSCRIPTION sub_gen_repl_b; +DROP TABLE gen_pub_repl_b; + +\c - - - :worker_1_port +SET search_path TO pg18_publication; +DROP PUBLICATION pub_gen_repl_b; +DROP TABLE gen_pub_repl_b; + +\c - - - :master_port +SET search_path TO pg18_publication; +SET client_min_messages TO ERROR; DROP SCHEMA pg18_publication CASCADE; +RESET client_min_messages; SET search_path TO pg18_nn; -- END: PG18: verify publish_generated_columns is preserved for distributed tables