diff --git a/src/test/regress/expected/pg17_1.out b/src/test/regress/expected/pg17_1.out new file mode 100644 index 000000000..31d36b907 --- /dev/null +++ b/src/test/regress/expected/pg17_1.out @@ -0,0 +1,3191 @@ +-- +-- PG17 +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 17 AS server_version_ge_17 +\gset +-- PG17 has the capability to pull up a correlated ANY subquery to a join if +-- the subquery only refers to its immediate parent query. Previously, the +-- subquery needed to be implemented as a SubPlan node, typically as a +-- filter on a scan or join node. This PG17 capability enables Citus to +-- run queries with correlated subqueries in certain cases, as shown here. +-- Relevant PG commit: +-- https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=9f1337639 +-- This feature is tested for all PG versions, not just PG17; each test query with +-- a correlated subquery should fail with PG version < 17.0, but the test query +-- rewritten to reflect how PG17 optimizes it should succeed with PG < 17.0 +CREATE SCHEMA pg17_corr_subq_folding; +SET search_path TO pg17_corr_subq_folding; +SET citus.next_shard_id TO 20240017; +SET citus.shard_count TO 2; +SET citus.shard_replication_factor TO 1; +CREATE TABLE test (x int, y int); +SELECT create_distributed_table('test', 'x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO test VALUES (1,1), (2,2); +-- Query 1: WHERE clause has a correlated subquery with a UNION. PG17 can plan +-- this as a nested loop join with the subquery as the inner. The correlation +-- is on the distribution column so the join can be pushed down by Citus. +explain (costs off) +SELECT * +FROM test a +WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) +ORDER BY 1,2; + QUERY PLAN +--------------------------------------------------------------------- + Sort + Sort Key: remote_scan.x, remote_scan.y + -> Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Nested Loop + -> Seq Scan on test_20240017 a + -> Subquery Scan on "ANY_subquery" + Filter: (a.x = "ANY_subquery".x) + -> HashAggregate + Group Key: b.x + -> Append + -> Seq Scan on test_20240017 b + -> Seq Scan on test_20240017 c + Filter: (a.x = x) +(17 rows) + +SET client_min_messages TO DEBUG2; +SELECT * +FROM test a +WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) +ORDER BY 1,2; +DEBUG: Router planner cannot handle multi-shard select queries + x | y +--------------------------------------------------------------------- + 1 | 1 + 2 | 2 +(2 rows) + +RESET client_min_messages; +-- Query 1 rewritten with subquery pulled up to a join, as done by PG17 planner; +-- this query can be run without issues by Citus with older (pre PG17) PGs. 
+explain (costs off) +SELECT a.* +FROM test a JOIN LATERAL (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) dt1 ON a.x = dt1.x +ORDER BY 1,2; + QUERY PLAN +--------------------------------------------------------------------- + Sort + Sort Key: remote_scan.x, remote_scan.y + -> Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Nested Loop + -> Seq Scan on test_20240017 a + -> Subquery Scan on dt1 + Filter: (a.x = dt1.x) + -> HashAggregate + Group Key: b.x + -> Append + -> Seq Scan on test_20240017 b + -> Seq Scan on test_20240017 c + Filter: (a.x = x) +(17 rows) + +SET client_min_messages TO DEBUG2; +SELECT a.* +FROM test a JOIN LATERAL (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) dt1 ON a.x = dt1.x +ORDER BY 1,2; +DEBUG: Router planner cannot handle multi-shard select queries + x | y +--------------------------------------------------------------------- + 1 | 1 + 2 | 2 +(2 rows) + +RESET client_min_messages; +CREATE TABLE users (user_id int, time int, dept int, info bigint); +CREATE TABLE events (user_id int, time int, event_type int, payload text); +select create_distributed_table('users', 'user_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_table('events', 'user_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +insert into users +select i, 2021 + (i % 3), i % 5, 99999 * i from generate_series(1, 10) i; +insert into events +select i % 10 + 1, 2021 + (i % 3), i %11, md5((i*i)::text) from generate_series(1, 100) i; +-- Query 2. In Citus correlated subqueries cannot be used in the WHERE +-- clause but if the subquery can be pulled up to a join it becomes possible +-- for Citus to run the query, per this example. Pre-PG17 the subquery +-- was implemented as a SubPlan filter on the events table scan. +EXPLAIN (costs off) +WITH event_id + AS(SELECT user_id AS events_user_id, + time AS events_time, + event_type + FROM events) +SELECT Count(*) +FROM event_id +WHERE (events_user_id) IN (SELECT user_id + FROM users + WHERE users.time = events_time); + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: ((events."time" = users."time") AND (events.user_id = users.user_id)) + -> Seq Scan on events_20240021 events + -> Hash + -> HashAggregate + Group Key: users."time", users.user_id + -> Seq Scan on users_20240019 users +(14 rows) + +SET client_min_messages TO DEBUG2; +WITH event_id + AS(SELECT user_id AS events_user_id, + time AS events_time, + event_type + FROM events) +SELECT Count(*) +FROM event_id +WHERE (events_user_id) IN (SELECT user_id + FROM users + WHERE users.time = events_time); +DEBUG: CTE event_id is going to be inlined via distributed planning +DEBUG: Router planner cannot handle multi-shard select queries + count +--------------------------------------------------------------------- + 31 +(1 row) + +RESET client_min_messages; +-- Query 2 rewritten with subquery pulled up to a join, as done by the PG17 planner. +-- Citus is able to run this query with previous PG versions. Note that the CTE can be +-- disregarded because it is inlined, being only referenced once. 
+EXPLAIN (COSTS OFF) +SELECT Count(*) +FROM (SELECT user_id AS events_user_id, + time AS events_time, + event_type FROM events) dt1 +INNER JOIN (SELECT distinct user_id, time FROM users) dt + ON events_user_id = dt.user_id and events_time = dt.time; + QUERY PLAN +--------------------------------------------------------------------- + Aggregate + -> Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate + -> Hash Join + Hash Cond: ((events.user_id = users.user_id) AND (events."time" = users."time")) + -> Seq Scan on events_20240021 events + -> Hash + -> HashAggregate + Group Key: users.user_id, users."time" + -> Seq Scan on users_20240019 users +(14 rows) + +SET client_min_messages TO DEBUG2; +SELECT Count(*) +FROM (SELECT user_id AS events_user_id, + time AS events_time, + event_type FROM events) dt1 +INNER JOIN (SELECT distinct user_id, time FROM users) dt + ON events_user_id = dt.user_id and events_time = dt.time; +DEBUG: Router planner cannot handle multi-shard select queries + count +--------------------------------------------------------------------- + 31 +(1 row) + +RESET client_min_messages; +-- Query 3: another example where recursive planning was prevented due to +-- correlated subqueries, but with PG17 folding the subquery to a join it is +-- possible for Citus to plan and run the query. +EXPLAIN (costs off) +SELECT dept, sum(user_id) FROM +(SELECT users.dept, users.user_id +FROM users, events as d1 +WHERE d1.user_id = users.user_id + AND users.dept IN (3,4) + AND users.user_id IN + (SELECT s2.user_id FROM users as s2 + GROUP BY d1.user_id, s2.user_id)) dt +GROUP BY dept; + QUERY PLAN +--------------------------------------------------------------------- + HashAggregate + Group Key: remote_scan.dept + -> Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> GroupAggregate + Group Key: users.dept + -> Sort + Sort Key: users.dept + -> Nested Loop Semi Join + -> Hash Join + Hash Cond: (d1.user_id = users.user_id) + -> Seq Scan on events_20240021 d1 + -> Hash + -> Seq Scan on users_20240019 users + Filter: (dept = ANY ('{3,4}'::integer[])) + -> Subquery Scan on "ANY_subquery" + Filter: (d1.user_id = "ANY_subquery".user_id) + -> HashAggregate + Group Key: s2.user_id + -> Seq Scan on users_20240019 s2 +(23 rows) + +SET client_min_messages TO DEBUG2; +SELECT dept, sum(user_id) FROM +(SELECT users.dept, users.user_id +FROM users, events as d1 +WHERE d1.user_id = users.user_id + AND users.dept IN (3,4) + AND users.user_id IN + (SELECT s2.user_id FROM users as s2 + GROUP BY d1.user_id, s2.user_id)) dt +GROUP BY dept; +DEBUG: Router planner cannot handle multi-shard select queries + dept | sum +--------------------------------------------------------------------- + 3 | 110 + 4 | 130 +(2 rows) + +RESET client_min_messages; +-- Query 3 rewritten in a similar way to how the PG17 planner pulls up the subquery; +-- the join is on the distribution key so Citus can push down. 
+EXPLAIN (costs off) +SELECT dept, sum(user_id) FROM +(SELECT users.dept, users.user_id +FROM users, events as d1 + JOIN LATERAL (SELECT s2.user_id FROM users as s2 + GROUP BY s2.user_id HAVING d1.user_id IS NOT NULL) as d2 ON 1=1 +WHERE d1.user_id = users.user_id + AND users.dept IN (3,4) + AND users.user_id = d2.user_id) dt +GROUP BY dept; + QUERY PLAN +--------------------------------------------------------------------- + HashAggregate + Group Key: remote_scan.dept + -> Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> GroupAggregate + Group Key: users.dept + -> Sort + Sort Key: users.dept + -> Nested Loop + -> Hash Join + Hash Cond: (d1.user_id = users.user_id) + -> Seq Scan on events_20240021 d1 + -> Hash + -> Seq Scan on users_20240019 users + Filter: (dept = ANY ('{3,4}'::integer[])) + -> Subquery Scan on d2 + Filter: (d1.user_id = d2.user_id) + -> HashAggregate + Group Key: s2.user_id + -> Result + One-Time Filter: (d1.user_id IS NOT NULL) + -> Seq Scan on users_20240019 s2 +(25 rows) + +SET client_min_messages TO DEBUG2; +SELECT dept, sum(user_id) FROM +(SELECT users.dept, users.user_id +FROM users, events as d1 + JOIN LATERAL (SELECT s2.user_id FROM users as s2 + GROUP BY s2.user_id HAVING d1.user_id IS NOT NULL) as d2 ON 1=1 +WHERE d1.user_id = users.user_id + AND users.dept IN (3,4) + AND users.user_id = d2.user_id) dt +GROUP BY dept; +DEBUG: Router planner cannot handle multi-shard select queries + dept | sum +--------------------------------------------------------------------- + 3 | 110 + 4 | 130 +(2 rows) + +RESET client_min_messages; +RESET search_path; +DROP SCHEMA pg17_corr_subq_folding CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table pg17_corr_subq_folding.test +drop cascades to table pg17_corr_subq_folding.users +drop cascades to table pg17_corr_subq_folding.events +-- Queries with outer joins with pseudoconstant quals work only in PG17 +-- Relevant PG17 commit: +-- https://github.com/postgres/postgres/commit/9e9931d2b +CREATE SCHEMA pg17_outerjoin; +SET search_path to pg17_outerjoin, public; +SET citus.next_shard_id TO 20250321; +-- issue https://github.com/citusdata/citus/issues/7697 +create table t0 (vkey int4 , c3 timestamp); +create table t3 ( vkey int4 ,c26 timestamp); +create table t4 ( vkey int4 ); +insert into t0 (vkey, c3) values (13,make_timestamp(2019, 10, 23, 15, 34, 50)); +insert into t3 (vkey,c26) values (1, make_timestamp(2024, 3, 26, 17, 36, 53)); +insert into t4 (vkey) values (1); +select * from + (t0 full outer join t3 + on (t0.c3 = t3.c26 )) +where (exists (select * from t4)) order by 1, 2, 3; + vkey | c3 | vkey | c26 +--------------------------------------------------------------------- + 13 | Wed Oct 23 15:34:50 2019 | | + | | 1 | Tue Mar 26 17:36:53 2024 +(2 rows) + +SELECT create_distributed_table('t0', 'vkey'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17_outerjoin.t0$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select * from + (t0 full outer join t3 + on (t0.c3 = t3.c26 )) +where (exists (select * from t4)) order by 1, 2, 3; + vkey | c3 | vkey | c26 +--------------------------------------------------------------------- + 13 | Wed Oct 23 15:34:50 2019 | | + | | 1 | Tue Mar 26 17:36:53 2024 +(2 rows) + +-- issue https://github.com/citusdata/citus/issues/7696 +create table t1 ( vkey int4 ); +create table t2 ( vkey int4 ); +insert into t2 (vkey) values (5); +select * from (t2 full outer join t1 on(t2.vkey = t1.vkey )) +where not((85) in (select 1 from t2)); + vkey | vkey +--------------------------------------------------------------------- + 5 | +(1 row) + +SELECT create_distributed_table('t1', 'vkey'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('t2'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17_outerjoin.t2$$) + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +select * from (t2 full outer join t1 on(t2.vkey = t1.vkey )) +where not((85) in (select 1 from t2)); + vkey | vkey +--------------------------------------------------------------------- + 5 | +(1 row) + +-- issue https://github.com/citusdata/citus/issues/7698 +create table t5 ( vkey int4, c10 int4 ); +create table t6 ( vkey int4 ); +insert into t5 (vkey,c10) values (4, -70); +insert into t6 (vkey) values (1); +select t6.vkey +from (t5 right outer join t6 + on (t5.c10 = t6.vkey)) +where exists (select * from t6); + vkey +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT create_distributed_table('t5', 'vkey'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17_outerjoin.t5$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select t6.vkey +from (t5 right outer join t6 + on (t5.c10 = t6.vkey)) +where exists (select * from t6); + vkey +--------------------------------------------------------------------- + 1 +(1 row) + +-- issue https://github.com/citusdata/citus/issues/7119 +-- this test was removed in +-- https://github.com/citusdata/citus/commit/a5ce601c0 +-- Citus doesn't support it in PG15 and PG16, but supports it in PG17 +CREATE TABLE users_table_local AS SELECT * FROM users_table; +CREATE TABLE events_table_local AS SELECT * FROM events_table; +SET client_min_messages TO DEBUG1; +-- subquery in FROM -> FROM -> WHERE -> WHERE should be replaced if +-- it contains only local tables +-- Later the upper level query is also recursively planned due to LIMIT +SELECT user_id, array_length(events_table, 1) +FROM ( + SELECT user_id, array_agg(event ORDER BY time) AS events_table + FROM ( + SELECT + u.user_id, e.event_type::text AS event, e.time + FROM + users_table AS u, + events_table AS e + WHERE u.user_id = e.user_id AND + u.user_id IN + ( + SELECT + user_id + FROM + users_table + WHERE value_2 >= 5 + AND EXISTS (SELECT user_id FROM events_table_local WHERE event_type > 1 AND event_type <= 3 AND value_3 > 1) + AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type > 3 AND event_type <= 4 AND value_3 > 1 AND user_id = users_table.user_id) + LIMIT 5 + ) + ) t + GROUP BY user_id +) q +ORDER BY 2 DESC, 1; +DEBUG: generating subplan XXX_1 for subquery SELECT user_id FROM pg17_outerjoin.events_table_local WHERE ((event_type OPERATOR(pg_catalog.>) 1) AND (event_type OPERATOR(pg_catalog.<=) 3) AND (value_3 OPERATOR(pg_catalog.>) (1)::double precision)) +DEBUG: push down of limit count: 5 +DEBUG: generating subplan XXX_2 for subquery SELECT user_id FROM public.users_table WHERE ((value_2 OPERATOR(pg_catalog.>=) 5) AND (EXISTS (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer))) AND (NOT (EXISTS (SELECT events_table.user_id FROM public.events_table WHERE ((events_table.event_type OPERATOR(pg_catalog.>) 3) AND (events_table.event_type OPERATOR(pg_catalog.<=) 4) AND (events_table.value_3 OPERATOR(pg_catalog.>) (1)::double precision) AND (events_table.user_id OPERATOR(pg_catalog.=) users_table.user_id)))))) LIMIT 5 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, array_length(events_table, 1) AS array_length FROM (SELECT t.user_id, array_agg(t.event ORDER BY t."time") AS events_table FROM (SELECT u.user_id, (e.event_type)::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer))))) t GROUP BY t.user_id) q ORDER BY (array_length(events_table, 1)) DESC, user_id + user_id | array_length +--------------------------------------------------------------------- + 5 | 364 +(1 row) + +RESET search_path; +SET citus.next_shard_id TO 20240023; +SET client_min_messages TO ERROR; +DROP SCHEMA pg17_outerjoin CASCADE; +RESET client_min_messages; +\if :server_version_ge_17 +\else +\q +\endif +-- PG17-specific tests go here. 
+-- +CREATE SCHEMA pg17; +SET search_path to pg17; +-- Test specifying access method on partitioned tables. PG17 feature, added by: +-- https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=374c7a229 +-- The following tests were failing tests in tableam but will pass on PG >= 17. +-- There is some set-up duplication of tableam, and this test can be returned +-- to tableam when 17 is the minimum supported PG version. +SELECT public.run_command_on_coordinator_and_workers($Q$ + SET citus.enable_ddl_propagation TO off; + CREATE FUNCTION fake_am_handler(internal) + RETURNS table_am_handler + AS 'citus' + LANGUAGE C; + CREATE ACCESS METHOD fake_am TYPE TABLE HANDLER fake_am_handler; +$Q$); + run_command_on_coordinator_and_workers +--------------------------------------------------------------------- + +(1 row) + +-- Since Citus assumes access methods are part of the extension, make fake_am +-- owned manually to be able to pass checks on Citus while distributing tables. +ALTER EXTENSION citus ADD ACCESS METHOD fake_am; +CREATE TABLE test_partitioned(id int, p int, val int) +PARTITION BY RANGE (p) USING fake_am; +-- Test that children inherit access method from parent +CREATE TABLE test_partitioned_p1 PARTITION OF test_partitioned + FOR VALUES FROM (1) TO (10); +CREATE TABLE test_partitioned_p2 PARTITION OF test_partitioned + FOR VALUES FROM (11) TO (20); +INSERT INTO test_partitioned VALUES (1, 5, -1), (2, 15, -2); +WARNING: fake_tuple_insert +WARNING: fake_tuple_insert +INSERT INTO test_partitioned VALUES (3, 6, -6), (4, 16, -4); +WARNING: fake_tuple_insert +WARNING: fake_tuple_insert +SELECT count(1) FROM test_partitioned_p1; +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot + count +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT count(1) FROM test_partitioned_p2; +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot + count +--------------------------------------------------------------------- + 2 +(1 row) + +-- Both child table partitions inherit fake_am +SELECT c.relname, am.amname FROM pg_class c, pg_am am +WHERE c.relam = am.oid AND c.oid IN ('test_partitioned_p1'::regclass, 'test_partitioned_p2'::regclass) +ORDER BY c.relname; + relname | amname +--------------------------------------------------------------------- + test_partitioned_p1 | fake_am + test_partitioned_p2 | fake_am +(2 rows) + +-- Clean up +DROP TABLE test_partitioned; +ALTER EXTENSION citus DROP ACCESS METHOD fake_am; +SELECT public.run_command_on_coordinator_and_workers($Q$ + RESET citus.enable_ddl_propagation; +$Q$); + run_command_on_coordinator_and_workers +--------------------------------------------------------------------- + +(1 row) + +-- End of testing specifying access method on partitioned tables. 
+-- MAINTAIN privilege tests +CREATE ROLE regress_maintain; +CREATE ROLE regress_no_maintain; +ALTER ROLE regress_maintain WITH login; +GRANT USAGE ON SCHEMA pg17 TO regress_maintain; +ALTER ROLE regress_no_maintain WITH login; +GRANT USAGE ON SCHEMA pg17 TO regress_no_maintain; +SET citus.shard_count TO 1; -- For consistent remote command logging +CREATE TABLE dist_test(a int, b int); +SELECT create_distributed_table('dist_test', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_test SELECT i % 10, i FROM generate_series(1, 100) t(i); +SET citus.log_remote_commands TO on; +SET citus.grep_remote_commands = '%maintain%'; +GRANT MAINTAIN ON dist_test TO regress_maintain; +NOTICE: issuing GRANT maintain ON dist_test TO regress_maintain +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing GRANT maintain ON dist_test TO regress_maintain +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (20240023, 'pg17', 'GRANT maintain ON dist_test TO regress_maintain') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +RESET citus.grep_remote_commands; +SET ROLE regress_no_maintain; +-- Current role does not have MAINTAIN privileges on dist_test +ANALYZE dist_test; +WARNING: permission denied to analyze "dist_test", skipping it +NOTICE: issuing ANALYZE pg17.dist_test_20240023 +DETAIL: on server regress_no_maintain@localhost:xxxxx connectionId: xxxxxxx +VACUUM dist_test; +WARNING: permission denied to vacuum "dist_test", skipping it +NOTICE: issuing VACUUM pg17.dist_test_20240023 +DETAIL: on server regress_no_maintain@localhost:xxxxx connectionId: xxxxxxx +SET ROLE regress_maintain; +-- Current role has MAINTAIN privileges on dist_test +ANALYZE dist_test; +NOTICE: issuing ANALYZE pg17.dist_test_20240023 +DETAIL: on server regress_maintain@localhost:xxxxx connectionId: xxxxxxx +VACUUM dist_test; +NOTICE: issuing VACUUM pg17.dist_test_20240023 +DETAIL: on server regress_maintain@localhost:xxxxx connectionId: xxxxxxx +-- Take away regress_maintain's MAINTAIN privileges on dist_test +RESET ROLE; +SET citus.grep_remote_commands = '%maintain%'; +REVOKE MAINTAIN ON dist_test FROM regress_maintain; +NOTICE: issuing REVOKE maintain ON dist_test FROM regress_maintain RESTRICT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing REVOKE maintain ON dist_test FROM regress_maintain RESTRICT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (20240023, 'pg17', 'REVOKE maintain ON dist_test FROM regress_maintain RESTRICT') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +RESET citus.grep_remote_commands; +SET ROLE regress_maintain; +-- Current role does not have MAINTAIN privileges on dist_test +ANALYZE dist_test; +WARNING: permission denied to analyze "dist_test", skipping it +NOTICE: issuing ANALYZE pg17.dist_test_20240023 +DETAIL: on server regress_maintain@localhost:xxxxx connectionId: xxxxxxx +VACUUM dist_test; +WARNING: permission denied to vacuum "dist_test", skipping it +NOTICE: issuing VACUUM pg17.dist_test_20240023 +DETAIL: on server regress_maintain@localhost:xxxxx connectionId: xxxxxxx +RESET ROLE; +-- End of MAINTAIN privilege tests +-- Partitions inherit identity column +RESET citus.log_remote_commands; +-- PG17 added support for identity columns in partitioned tables: +-- 
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=699586315 +-- In particular, partitions with their own identity columns are not allowed. +-- Citus does not need to propagate identity columns in partitions; the identity +-- is inherited by PG17 behavior, as shown in this test. +CREATE TABLE partitioned_table ( + a bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10), + c int +) +PARTITION BY RANGE (c); +CREATE TABLE pt_1 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (50); +SELECT create_distributed_table('partitioned_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE pt_2 PARTITION OF partitioned_table FOR VALUES FROM (50) TO (1000); +-- (1) The partitioned table has pt_1 and pt_2 as its partitions +\d+ partitioned_table; + Partitioned table "pg17.partitioned_table" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Not-null constraints: + "partitioned_table_a_not_null" NOT NULL "a" +Partitions: pt_1 FOR VALUES FROM (1) TO (50), + pt_2 FOR VALUES FROM (50) TO (1000) + +-- (2) The partitions have the same identity column as the parent table; +-- This is PG17 behavior for support for identity in partitioned tables. +\d pt_1; + Table "pg17.pt_1" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + c | integer | | | +Partition of: partitioned_table FOR VALUES FROM (1) TO (50) + +\d pt_2; + Table "pg17.pt_2" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + c | integer | | | +Partition of: partitioned_table FOR VALUES FROM (50) TO (1000) + +-- Attaching a partition inherits the identity column from the parent table +CREATE TABLE pt_3 (a bigint not null, c int); +ALTER TABLE partitioned_table ATTACH PARTITION pt_3 FOR VALUES FROM (1000) TO (2000); +\d+ partitioned_table; + Partitioned table "pg17.partitioned_table" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Not-null constraints: + "partitioned_table_a_not_null" NOT NULL "a" +Partitions: pt_1 FOR VALUES FROM (1) TO (50), + pt_2 FOR VALUES FROM (50) TO (1000), + pt_3 FOR VALUES FROM (1000) TO (2000) + +\d pt_3; + Table "pg17.pt_3" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + c | integer | | | +Partition of: partitioned_table FOR VALUES FROM (1000) TO (2000) + +-- Partition pt_4 has its own identity column, which is not allowed in PG17 +-- and will produce an error on attempting to attach it to the partitioned table +CREATE TABLE pt_4 (a bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10), c int); +ALTER TABLE partitioned_table ATTACH PARTITION pt_4 FOR VALUES FROM (2000) TO (3000); +ERROR: table "pt_4" being attached contains 
an identity column "a" +DETAIL: The new partition may not contain an identity column. +\c - - - :worker_1_port +SET search_path TO pg17; +-- Show that DDL for partitioned_table has correctly propagated to the worker node; +-- (1) The partitioned table has pt_1, pt_2 and pt_3 as its partitions +\d+ partitioned_table; + Partitioned table "pg17.partitioned_table" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Not-null constraints: + "partitioned_table_a_not_null" NOT NULL "a" +Partitions: pt_1 FOR VALUES FROM (1) TO (50), + pt_2 FOR VALUES FROM (50) TO (1000), + pt_3 FOR VALUES FROM (1000) TO (2000) + +-- (2) The partitions have the same identity column as the parent table +\d pt_1; + Table "pg17.pt_1" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + c | integer | | | +Partition of: partitioned_table FOR VALUES FROM (1) TO (50) + +\d pt_2; + Table "pg17.pt_2" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + c | integer | | | +Partition of: partitioned_table FOR VALUES FROM (50) TO (1000) + +\d pt_3; + Table "pg17.pt_3" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + c | integer | | | +Partition of: partitioned_table FOR VALUES FROM (1000) TO (2000) + +\c - - - :master_port +SET search_path TO pg17; +-- Test detaching a partition with an identity column +ALTER TABLE partitioned_table DETACH PARTITION pt_3; +-- partitioned_table has pt_1, pt_2 as its partitions +-- and pt_3 does not have an identity column +\d+ partitioned_table; + Partitioned table "pg17.partitioned_table" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Not-null constraints: + "partitioned_table_a_not_null" NOT NULL "a" +Partitions: pt_1 FOR VALUES FROM (1) TO (50), + pt_2 FOR VALUES FROM (50) TO (1000) + +\d pt_3; + Table "pg17.pt_3" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | + c | integer | | | + +-- Verify that the detach has propagated to the worker node +\c - - - :worker_1_port +SET search_path TO pg17; +\d+ partitioned_table; + Partitioned table "pg17.partitioned_table" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Not-null constraints: + "partitioned_table_a_not_null" NOT NULL "a" +Partitions: pt_1 FOR VALUES FROM (1) TO (50), + pt_2 FOR VALUES FROM (50) TO (1000) + +\d pt_3; + Table "pg17.pt_3" + Column | Type | Collation | Nullable | Default 
+--------------------------------------------------------------------- + a | bigint | | not null | + c | integer | | | + +\c - - - :master_port +SET search_path TO pg17; +CREATE TABLE alt_test (a int, b date, c int) PARTITION BY RANGE(c); +SELECT create_distributed_table('alt_test', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE alt_test_pt_1 PARTITION OF alt_test FOR VALUES FROM (1) TO (50); +CREATE TABLE alt_test_pt_2 PARTITION OF alt_test FOR VALUES FROM (50) TO (100); +-- Citus does not support adding an identity column for a distributed table (#6738) +-- Attempting to add a column with identity produces an error +ALTER TABLE alt_test ADD COLUMN d bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10); +ERROR: cannot execute ADD COLUMN commands involving identity columns when metadata is synchronized to workers +-- alter table set identity is currently not supported, so adding identity to +-- an existing column generates an error +ALTER TABLE alt_test ALTER COLUMN a SET GENERATED BY DEFAULT SET INCREMENT BY 2 SET START WITH 75 RESTART; +ERROR: alter table command is currently unsupported +DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP|VALIDATE CONSTRAINT, SET (), RESET (), ENABLE|DISABLE|NO FORCE|FORCE ROW LEVEL SECURITY, ATTACH|DETACH PARTITION and TYPE subcommands are supported. +-- Verify that the identity column was not added, on coordinator and worker nodes +\d+ alt_test; + Partitioned table "pg17.alt_test" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | integer | | | | plain | | + b | date | | | | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Partitions: alt_test_pt_1 FOR VALUES FROM (1) TO (50), + alt_test_pt_2 FOR VALUES FROM (50) TO (100) + +\d alt_test_pt_1; + Table "pg17.alt_test_pt_1" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | integer | | | + b | date | | | + c | integer | | | +Partition of: alt_test FOR VALUES FROM (1) TO (50) + +\d alt_test_pt_2; + Table "pg17.alt_test_pt_2" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | integer | | | + b | date | | | + c | integer | | | +Partition of: alt_test FOR VALUES FROM (50) TO (100) + +\c - - - :worker_1_port +SET search_path TO pg17; +\d+ alt_test; + Partitioned table "pg17.alt_test" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | integer | | | | plain | | + b | date | | | | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Partitions: alt_test_pt_1 FOR VALUES FROM (1) TO (50), + alt_test_pt_2 FOR VALUES FROM (50) TO (100) + +\d alt_test_pt_1; + Table "pg17.alt_test_pt_1" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | integer | | | + b | date | | | + c | integer | | | +Partition of: alt_test FOR VALUES FROM (1) TO (50) + +\d alt_test_pt_2; + Table "pg17.alt_test_pt_2" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | integer | | | + b | date | | | + c | integer | | | +Partition of: alt_test FOR VALUES FROM (50) TO (100) 
+ +\c - - - :master_port +SET search_path TO pg17; +DROP TABLE alt_test; +CREATE TABLE alt_test (a bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10), + b int, + c int) +PARTITION BY RANGE(c); +SELECT create_distributed_table('alt_test', 'b'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE alt_test_pt_1 PARTITION OF alt_test FOR VALUES FROM (1) TO (50); +CREATE TABLE alt_test_pt_2 PARTITION OF alt_test FOR VALUES FROM (50) TO (100); +-- Dropping of the identity property from a column is currently not supported; +-- Attempting to drop identity produces an error +ALTER TABLE alt_test ALTER COLUMN a DROP IDENTITY; +ERROR: alter table command is currently unsupported +DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP|VALIDATE CONSTRAINT, SET (), RESET (), ENABLE|DISABLE|NO FORCE|FORCE ROW LEVEL SECURITY, ATTACH|DETACH PARTITION and TYPE subcommands are supported. +-- Verify that alt_test still has identity on column a +\d+ alt_test; + Partitioned table "pg17.alt_test" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity | plain | | + b | integer | | | | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Not-null constraints: + "alt_test_a_not_null" NOT NULL "a" +Partitions: alt_test_pt_1 FOR VALUES FROM (1) TO (50), + alt_test_pt_2 FOR VALUES FROM (50) TO (100) + +\d alt_test_pt_1; + Table "pg17.alt_test_pt_1" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + b | integer | | | + c | integer | | | +Partition of: alt_test FOR VALUES FROM (1) TO (50) + +\d alt_test_pt_2; + Table "pg17.alt_test_pt_2" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + b | integer | | | + c | integer | | | +Partition of: alt_test FOR VALUES FROM (50) TO (100) + +\c - - - :worker_1_port +SET search_path TO pg17; +\d+ alt_test; + Partitioned table "pg17.alt_test" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity | plain | | + b | integer | | | | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Not-null constraints: + "alt_test_a_not_null" NOT NULL "a" +Partitions: alt_test_pt_1 FOR VALUES FROM (1) TO (50), + alt_test_pt_2 FOR VALUES FROM (50) TO (100) + +\d alt_test_pt_1; + Table "pg17.alt_test_pt_1" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + b | integer | | | + c | integer | | | +Partition of: alt_test FOR VALUES FROM (1) TO (50) + +\d alt_test_pt_2 + Table "pg17.alt_test_pt_2" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + b | integer | | | + c | integer | | | +Partition of: alt_test FOR VALUES FROM (50) TO (100) + +\c - - - :master_port +SET search_path TO pg17; +-- Repeat testing of partitions 
with identity column on a citus local table +CREATE TABLE local_partitioned_table ( + a bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10), + c int +) +PARTITION BY RANGE (c); +CREATE TABLE lpt_1 PARTITION OF local_partitioned_table FOR VALUES FROM (1) TO (50); +SELECT citus_add_local_table_to_metadata('local_partitioned_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +-- Can create tables as partitions and attach tables as partitions to a citus local table: +CREATE TABLE lpt_2 PARTITION OF local_partitioned_table FOR VALUES FROM (50) TO (1000); +CREATE TABLE lpt_3 (a bigint not null, c int); +ALTER TABLE local_partitioned_table ATTACH PARTITION lpt_3 FOR VALUES FROM (1000) TO (2000); +-- The partitions have the same identity column as the parent table, on coordinator and worker nodes +\d+ local_partitioned_table; + Partitioned table "pg17.local_partitioned_table" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Not-null constraints: + "local_partitioned_table_a_not_null" NOT NULL "a" +Partitions: lpt_1 FOR VALUES FROM (1) TO (50), + lpt_2 FOR VALUES FROM (50) TO (1000), + lpt_3 FOR VALUES FROM (1000) TO (2000) + +\d lpt_1; + Table "pg17.lpt_1" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + c | integer | | | +Partition of: local_partitioned_table FOR VALUES FROM (1) TO (50) + +\d lpt_2; + Table "pg17.lpt_2" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + c | integer | | | +Partition of: local_partitioned_table FOR VALUES FROM (50) TO (1000) + +\d lpt_3; + Table "pg17.lpt_3" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + c | integer | | | +Partition of: local_partitioned_table FOR VALUES FROM (1000) TO (2000) + +\c - - - :worker_1_port +SET search_path TO pg17; +\d+ local_partitioned_table; + Partitioned table "pg17.local_partitioned_table" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Not-null constraints: + "local_partitioned_table_a_not_null" NOT NULL "a" +Partitions: lpt_1 FOR VALUES FROM (1) TO (50), + lpt_2 FOR VALUES FROM (50) TO (1000), + lpt_3 FOR VALUES FROM (1000) TO (2000) + +\d lpt_1; + Table "pg17.lpt_1" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + c | integer | | | +Partition of: local_partitioned_table FOR VALUES FROM (1) TO (50) + +\d lpt_2; + Table "pg17.lpt_2" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + c | integer | | | 
+Partition of: local_partitioned_table FOR VALUES FROM (50) TO (1000) + +\d lpt_3; + Table "pg17.lpt_3" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity + c | integer | | | +Partition of: local_partitioned_table FOR VALUES FROM (1000) TO (2000) + +\c - - - :master_port +SET search_path TO pg17; +-- Test detaching a partition with an identity column from a citus local table +ALTER TABLE local_partitioned_table DETACH PARTITION lpt_3; +\d+ local_partitioned_table; + Partitioned table "pg17.local_partitioned_table" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Not-null constraints: + "local_partitioned_table_a_not_null" NOT NULL "a" +Partitions: lpt_1 FOR VALUES FROM (1) TO (50), + lpt_2 FOR VALUES FROM (50) TO (1000) + +\d lpt_3; + Table "pg17.lpt_3" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | + c | integer | | | + +\c - - - :worker_1_port +SET search_path TO pg17; +\d+ local_partitioned_table; + Partitioned table "pg17.local_partitioned_table" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------------------------------------------------------------------- + a | bigint | | not null | generated by default as identity | plain | | + c | integer | | | | plain | | +Partition key: RANGE (c) +Not-null constraints: + "local_partitioned_table_a_not_null" NOT NULL "a" +Partitions: lpt_1 FOR VALUES FROM (1) TO (50), + lpt_2 FOR VALUES FROM (50) TO (1000) + +\d lpt_3; + Table "pg17.lpt_3" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | bigint | | not null | + c | integer | | | + +\c - - - :master_port +SET search_path TO pg17; +DROP TABLE partitioned_table; +DROP TABLE local_partitioned_table; +DROP TABLE lpt_3; +DROP TABLE pt_3; +DROP TABLE pt_4; +DROP TABLE alt_test; +-- End of partition with identity columns testing +-- Correlated sublinks are now supported as of PostgreSQL 17, resolving issue #4470. 
+-- Enable DEBUG-level logging to capture detailed execution plans +-- Create the tables +CREATE TABLE postgres_table (key int, value text, value_2 jsonb); +CREATE TABLE reference_table (key int, value text, value_2 jsonb); +SELECT create_reference_table('reference_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE distributed_table (key int, value text, value_2 jsonb); +SELECT create_distributed_table('distributed_table', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Insert test data +INSERT INTO postgres_table SELECT i, i::varchar(256), '{}'::jsonb FROM generate_series(1, 10) i; +INSERT INTO reference_table SELECT i, i::varchar(256), '{}'::jsonb FROM generate_series(1, 10) i; +INSERT INTO distributed_table SELECT i, i::varchar(256), '{}'::jsonb FROM generate_series(1, 10) i; +-- Set local table join policy to auto before running the tests +SET citus.local_table_join_policy TO 'auto'; +SET client_min_messages TO DEBUG1; +-- Correlated sublinks are supported in PostgreSQL 17 +SELECT COUNT(*) FROM distributed_table d1 JOIN postgres_table USING (key) +WHERE d1.key IN (SELECT key FROM distributed_table WHERE d1.key = key AND key = 5); +DEBUG: Wrapping relation "postgres_table" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT key FROM pg17.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (pg17.distributed_table d1 JOIN (SELECT postgres_table_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) postgres_table_1) postgres_table USING (key)) WHERE (d1.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table.key FROM pg17.distributed_table WHERE ((d1.key OPERATOR(pg_catalog.=) distributed_table.key) AND (distributed_table.key OPERATOR(pg_catalog.=) 5)))) + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT COUNT(*) FROM distributed_table d1 JOIN postgres_table USING (key) +WHERE d1.key IN (SELECT key FROM distributed_table WHERE d1.key = key AND key = 5); +DEBUG: Wrapping relation "postgres_table" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT key FROM pg17.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (pg17.distributed_table d1 JOIN (SELECT postgres_table_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) postgres_table_1) postgres_table USING (key)) WHERE (d1.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table.key FROM pg17.distributed_table WHERE ((d1.key OPERATOR(pg_catalog.=) distributed_table.key) AND (distributed_table.key OPERATOR(pg_catalog.=) 5)))) + count +--------------------------------------------------------------------- + 1 +(1 row) + +SET citus.local_table_join_policy TO 'prefer-distributed'; +SELECT COUNT(*) FROM distributed_table d1 JOIN postgres_table USING (key) +WHERE d1.key IN (SELECT key FROM distributed_table WHERE d1.key = key AND key = 5); +DEBUG: Wrapping relation "distributed_table" "d1" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT 
key FROM pg17.distributed_table d1 WHERE (key OPERATOR(pg_catalog.=) 5) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT d1_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) d1_1) d1 JOIN pg17.postgres_table USING (key)) WHERE (d1.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table.key FROM pg17.distributed_table WHERE ((d1.key OPERATOR(pg_catalog.=) distributed_table.key) AND (distributed_table.key OPERATOR(pg_catalog.=) 5)))) +ERROR: direct joins between distributed and local tables are not supported +HINT: Use CTE's or subqueries to select from local tables and use them in joins +RESET citus.local_table_join_policy; +RESET client_min_messages; +DROP TABLE reference_table; +-- End for Correlated sublinks are now supported as of PostgreSQL 17, resolving issue #4470. +-- Test for exclusion constraints on partitioned and distributed partitioned tables in Citus environment +-- Step 1: Create a distributed partitioned table +\c - - :master_host :master_port +SET search_path TO pg17; +CREATE TABLE distributed_partitioned_table ( + id serial NOT NULL, + partition_col int NOT NULL, + PRIMARY KEY (id, partition_col) +) PARTITION BY RANGE (partition_col); +-- Add partitions to the distributed partitioned table +CREATE TABLE distributed_partitioned_table_p1 PARTITION OF distributed_partitioned_table +FOR VALUES FROM (1) TO (100); +CREATE TABLE distributed_partitioned_table_p2 PARTITION OF distributed_partitioned_table +FOR VALUES FROM (100) TO (200); +-- Distribute the table +SELECT create_distributed_table('distributed_partitioned_table', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Step 2: Create a partitioned Citus local table +CREATE TABLE local_partitioned_table ( + id serial NOT NULL, + partition_col int NOT NULL, + PRIMARY KEY (id, partition_col) +) PARTITION BY RANGE (partition_col); +-- Add partitions to the local partitioned table +CREATE TABLE local_partitioned_table_p1 PARTITION OF local_partitioned_table +FOR VALUES FROM (1) TO (100); +CREATE TABLE local_partitioned_table_p2 PARTITION OF local_partitioned_table +FOR VALUES FROM (100) TO (200); +SELECT citus_add_local_table_to_metadata('local_partitioned_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +-- Verify the Citus tables +SELECT table_name, citus_table_type FROM pg_catalog.citus_tables +WHERE table_name::regclass::text LIKE '%_partitioned_table' ORDER BY 1; +ERROR: relation "pg_catalog.citus_tables" does not exist +-- Step 3: Add an exclusion constraint with a name to the distributed partitioned table +ALTER TABLE distributed_partitioned_table ADD CONSTRAINT dist_exclude_named EXCLUDE USING btree (id WITH =, partition_col WITH =); +-- Step 4: Verify propagation of exclusion constraint to worker nodes +\c - - :public_worker_1_host :worker_1_port +SET search_path TO pg17; +SELECT conname FROM pg_constraint WHERE conrelid = 'pg17.distributed_partitioned_table'::regclass AND conname = 'dist_exclude_named'; + conname +--------------------------------------------------------------------- + dist_exclude_named +(1 row) + +-- Step 5: Add an exclusion constraint with a name to the Citus local partitioned table +\c - - :master_host :master_port +SET search_path TO pg17; +ALTER TABLE 
local_partitioned_table ADD CONSTRAINT local_exclude_named EXCLUDE USING btree (partition_col WITH =); +-- Step 6: Verify the exclusion constraint on the local partitioned table +SELECT conname, contype FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x'; + conname | contype +--------------------------------------------------------------------- + local_exclude_named | x +(1 row) + +-- Step 7: Add exclusion constraints without names to both tables +ALTER TABLE distributed_partitioned_table ADD EXCLUDE USING btree (id WITH =, partition_col WITH =); +ALTER TABLE local_partitioned_table ADD EXCLUDE USING btree (partition_col WITH =); +-- Step 8: Verify the unnamed exclusion constraints were added +SELECT conname, contype FROM pg_constraint WHERE conrelid = 'local_partitioned_table'::regclass AND contype = 'x'; + conname | contype +--------------------------------------------------------------------- + local_exclude_named | x + local_partitioned_table_partition_col_excl | x +(2 rows) + +\c - - :public_worker_1_host :worker_1_port +SET search_path TO pg17; +SELECT conname, contype FROM pg_constraint WHERE conrelid = 'pg17.distributed_partitioned_table'::regclass AND contype = 'x'; + conname | contype +--------------------------------------------------------------------- + dist_exclude_named | x + distributed_partitioned_table_id_partition_col_excl | x +(2 rows) + +-- Step 9: Drop the exclusion constraints from both tables +\c - - :master_host :master_port +SET search_path TO pg17; +ALTER TABLE distributed_partitioned_table DROP CONSTRAINT dist_exclude_named; +ALTER TABLE local_partitioned_table DROP CONSTRAINT local_exclude_named; +-- Step 10: Verify the constraints were dropped +SELECT * FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x'; + oid | conname | connamespace | contype | condeferrable | condeferred | convalidated | conrelid | contypid | conindid | conparentid | confrelid | confupdtype | confdeltype | confmatchtype | conislocal | coninhcount | connoinherit | conkey | confkey | conpfeqop | conppeqop | conffeqop | confdelsetcols | conexclop | conbin +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x'; + oid | conname | connamespace | contype | condeferrable | condeferred | convalidated | conrelid | contypid | conindid | conparentid | confrelid | confupdtype | confdeltype | confmatchtype | conislocal | coninhcount | connoinherit | conkey | confkey | conpfeqop | conppeqop | conffeqop | confdelsetcols | conexclop | conbin +--------------------------------------------------------------------- +(0 rows) + +-- Step 11: Clean up - Drop the tables +DROP TABLE distributed_partitioned_table CASCADE; +DROP TABLE local_partitioned_table CASCADE; +-- End of Test for exclusion constraints on partitioned and distributed partitioned tables in Citus environment +-- Propagate SET STATISTICS DEFAULT +-- Relevant PG commit: +-- https://github.com/postgres/postgres/commit/4f622503d +SET citus.next_shard_id TO 25122024; +CREATE TABLE tbl (c1 int, c2 int); +SELECT citus_add_local_table_to_metadata('tbl'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +CREATE INDEX tbl_idx ON tbl (c1, (c1+0)) INCLUDE (c2); +-- Citus currently doesn't support ALTER TABLE ALTER COLUMN SET STATISTICS anyway +ALTER TABLE tbl ALTER COLUMN 1 SET STATISTICS 100; +ERROR: alter table command is currently 
unsupported +DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP|VALIDATE CONSTRAINT, SET (), RESET (), ENABLE|DISABLE|NO FORCE|FORCE ROW LEVEL SECURITY, ATTACH|DETACH PARTITION and TYPE subcommands are supported. +ALTER TABLE tbl ALTER COLUMN 1 SET STATISTICS DEFAULT; +ERROR: alter table command is currently unsupported +DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP|VALIDATE CONSTRAINT, SET (), RESET (), ENABLE|DISABLE|NO FORCE|FORCE ROW LEVEL SECURITY, ATTACH|DETACH PARTITION and TYPE subcommands are supported. +ALTER TABLE tbl ALTER COLUMN 1 SET STATISTICS -1; +ERROR: alter table command is currently unsupported +DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP|VALIDATE CONSTRAINT, SET (), RESET (), ENABLE|DISABLE|NO FORCE|FORCE ROW LEVEL SECURITY, ATTACH|DETACH PARTITION and TYPE subcommands are supported. +-- Citus propagates ALTER INDEX ALTER COLUMN SET STATISTICS DEFAULT to the nodes and shards +SET citus.log_remote_commands TO true; +SET citus.grep_remote_commands = '%STATISTICS%'; +ALTER INDEX tbl_idx ALTER COLUMN 2 SET STATISTICS 1000; +NOTICE: issuing ALTER INDEX tbl_idx ALTER COLUMN 2 SET STATISTICS 1000; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER INDEX tbl_idx ALTER COLUMN 2 SET STATISTICS 1000; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (25122024, 'pg17', 'ALTER INDEX tbl_idx ALTER COLUMN 2 SET STATISTICS 1000;') +\d+ tbl_idx + Index "pg17.tbl_idx" + Column | Type | Key? | Definition | Storage | Stats target +--------------------------------------------------------------------- + c1 | integer | yes | c1 | plain | + expr | integer | yes | (c1 + 0) | plain | 1000 + c2 | integer | no | c2 | plain | +btree, for table "pg17.tbl" + +\d+ tbl_idx_25122024 + Index "pg17.tbl_idx_25122024" + Column | Type | Key? | Definition | Storage | Stats target +--------------------------------------------------------------------- + c1 | integer | yes | c1 | plain | + expr | integer | yes | (c1 + 0) | plain | 1000 + c2 | integer | no | c2 | plain | +btree, for table "pg17.tbl_25122024" + +ALTER INDEX tbl_idx ALTER COLUMN 2 SET STATISTICS DEFAULT; +NOTICE: issuing ALTER INDEX tbl_idx ALTER COLUMN 2 SET STATISTICS DEFAULT; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER INDEX tbl_idx ALTER COLUMN 2 SET STATISTICS DEFAULT; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (25122024, 'pg17', 'ALTER INDEX tbl_idx ALTER COLUMN 2 SET STATISTICS DEFAULT;') +\d+ tbl_idx + Index "pg17.tbl_idx" + Column | Type | Key? | Definition | Storage | Stats target +--------------------------------------------------------------------- + c1 | integer | yes | c1 | plain | + expr | integer | yes | (c1 + 0) | plain | + c2 | integer | no | c2 | plain | +btree, for table "pg17.tbl" + +\d+ tbl_idx_25122024 + Index "pg17.tbl_idx_25122024" + Column | Type | Key? 
| Definition | Storage | Stats target +--------------------------------------------------------------------- + c1 | integer | yes | c1 | plain | + expr | integer | yes | (c1 + 0) | plain | + c2 | integer | no | c2 | plain | +btree, for table "pg17.tbl_25122024" + +ALTER INDEX tbl_idx ALTER COLUMN 2 SET STATISTICS -1; +NOTICE: issuing ALTER INDEX tbl_idx ALTER COLUMN 2 SET STATISTICS -1; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER INDEX tbl_idx ALTER COLUMN 2 SET STATISTICS -1; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (25122024, 'pg17', 'ALTER INDEX tbl_idx ALTER COLUMN 2 SET STATISTICS -1;') +\d+ tbl_idx + Index "pg17.tbl_idx" + Column | Type | Key? | Definition | Storage | Stats target +--------------------------------------------------------------------- + c1 | integer | yes | c1 | plain | + expr | integer | yes | (c1 + 0) | plain | + c2 | integer | no | c2 | plain | +btree, for table "pg17.tbl" + +\d+ tbl_idx_25122024 + Index "pg17.tbl_idx_25122024" + Column | Type | Key? | Definition | Storage | Stats target +--------------------------------------------------------------------- + c1 | integer | yes | c1 | plain | + expr | integer | yes | (c1 + 0) | plain | + c2 | integer | no | c2 | plain | +btree, for table "pg17.tbl_25122024" + +-- End of testing SET STATISTICS DEFAULT +-- COPY ON_ERROR option +-- Error out for Citus tables because we don't support it yet +-- Relevant PG17 commits: +-- https://github.com/postgres/postgres/commit/9e2d87011 +-- https://github.com/postgres/postgres/commit/b725b7eec +CREATE TABLE check_ign_err (n int, m int[], k int); +SELECT create_distributed_table('check_ign_err', 'n'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COPY check_ign_err FROM STDIN WITH (on_error stop); +ERROR: Citus does not support COPY FROM with ON_ERROR option. +COPY check_ign_err FROM STDIN WITH (ON_ERROR ignore); +ERROR: Citus does not support COPY FROM with ON_ERROR option. +COPY check_ign_err FROM STDIN WITH (on_error ignore, log_verbosity verbose); +ERROR: Citus does not support COPY FROM with ON_ERROR option. +COPY check_ign_err FROM STDIN WITH (log_verbosity verbose, on_error ignore); +ERROR: Citus does not support COPY FROM with ON_ERROR option. +COPY check_ign_err FROM STDIN WITH (log_verbosity verbose); +ERROR: Citus does not support COPY FROM with LOG_VERBOSITY option. 
+-- End of Test for COPY ON_ERROR option +-- Test FORCE_NOT_NULL and FORCE_NULL options +-- FORCE_NULL * and FORCE_NOT_NULL * options for COPY FROM were added in PG17 +-- Same tests as in PG copy2.sql, we just distribute the table first +-- Relevant PG17 commit: https://github.com/postgres/postgres/commit/f6d4c9cf1 +CREATE TABLE forcetest ( + a INT NOT NULL, + b TEXT NOT NULL, + c TEXT, + d TEXT, + e TEXT +); +\pset null NULL +SELECT create_distributed_table('forcetest', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- should succeed with no effect ("b" remains an empty string, "c" remains NULL) +-- expected output for inserted row in test: +-- b | c +--------------------------------------------------------------------- +-- | NULL +--(1 row) +BEGIN; +COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c)); +COMMIT; +SELECT b, c FROM forcetest WHERE a = 1; + b | c +--------------------------------------------------------------------- + | NULL +(1 row) + +-- should succeed, FORCE_NULL and FORCE_NOT_NULL can be both specified +-- expected output for inserted row in test: +-- c | d +--------------------------------------------------------------------- +-- | NULL +--(1 row) +BEGIN; +COPY forcetest (a, b, c, d) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL(c,d), FORCE_NULL(c,d)); +COMMIT; +SELECT c, d FROM forcetest WHERE a = 2; + c | d +--------------------------------------------------------------------- + | NULL +(1 row) + +-- should succeed with no effect ("b" remains an empty string, "c" remains NULL) +-- expected output for inserted row in test: +-- b | c +--------------------------------------------------------------------- +-- | NULL +--(1 row) +BEGIN; +COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL *, FORCE_NULL *); +COMMIT; +SELECT b, c FROM forcetest WHERE a = 4; + b | c +--------------------------------------------------------------------- + | NULL +(1 row) + +-- should succeed with effect ("b" remains an empty string) +-- expected output for inserted row in test: +-- b | c +--------------------------------------------------------------------- +-- | +--(1 row) +BEGIN; +COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL *); +COMMIT; +SELECT b, c FROM forcetest WHERE a = 5; + b | c +--------------------------------------------------------------------- + | +(1 row) + +-- should succeed with effect ("c" remains NULL) +-- expected output for inserted row in test: +-- b | c +--------------------------------------------------------------------- +-- b | NULL +--(1 row) +BEGIN; +COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NULL *); +COMMIT; +SELECT b, c FROM forcetest WHERE a = 6; + b | c +--------------------------------------------------------------------- + b | NULL +(1 row) + +\pset null '' +-- End of Testing FORCE_NOT_NULL and FORCE_NULL options +-- Test for ALTER TABLE SET ACCESS METHOD DEFAULT +-- Step 1: Local table setup (non-distributed) +CREATE TABLE test_local_table (id int); +SELECT citus_add_local_table_to_metadata('test_local_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +-- Step 2: Attempt to set access method to DEFAULT on a Citus local table (should fail) +ALTER TABLE test_local_table SET ACCESS METHOD DEFAULT; +ERROR: DEFAULT option in ALTER TABLE ... SET ACCESS METHOD is currently unsupported. 
+HINT: You can rerun the command by explicitly writing the access method name. +-- Step 3: Setup: create and distribute a table +CREATE TABLE test_alter_access_method (id int); +SELECT create_distributed_table('test_alter_access_method', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Step 4: Attempt to set access method to DEFAULT on a distributed table (should fail with your custom error) +ALTER TABLE test_alter_access_method SET ACCESS METHOD DEFAULT; +ERROR: DEFAULT option in ALTER TABLE ... SET ACCESS METHOD is currently unsupported. +HINT: You can rerun the command by explicitly writing the access method name. +-- Step 5: Create and distribute a partitioned table +CREATE TABLE test_partitioned_alter (id int, val text) PARTITION BY RANGE (id); +CREATE TABLE test_partitioned_alter_part1 PARTITION OF test_partitioned_alter FOR VALUES FROM (1) TO (100); +SELECT create_distributed_table('test_partitioned_alter', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Step 6: Attempt to set access method to DEFAULT on a partitioned, distributed table (should fail) +ALTER TABLE test_partitioned_alter SET ACCESS METHOD DEFAULT; +ERROR: DEFAULT option in ALTER TABLE ... SET ACCESS METHOD is currently unsupported. +HINT: You can rerun the command by explicitly writing the access method name. +-- Cleanup +DROP TABLE test_local_table CASCADE; +DROP TABLE test_alter_access_method CASCADE; +DROP TABLE test_partitioned_alter CASCADE; +-- End of Test for ALTER TABLE SET ACCESS METHOD DEFAULT +-- Test for ALTER TABLE ... ALTER COLUMN ... SET EXPRESSION +-- Step 1: Local table setup (non-distributed) +CREATE TABLE test_local_table_expr (id int, col int); +SELECT citus_add_local_table_to_metadata('test_local_table_expr'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +-- Step 2: Attempt to set expression on a Citus local table (should fail) +ALTER TABLE test_local_table_expr ALTER COLUMN col SET EXPRESSION AS (id * 4); +ERROR: ALTER TABLE ... ALTER COLUMN ... SET EXPRESSION commands are currently unsupported. +-- Step 3: Create and distribute a table +CREATE TABLE test_distributed_table_expr (id int, col int); +SELECT create_distributed_table('test_distributed_table_expr', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Step 4: Attempt to set expression on a distributed table (should fail) +ALTER TABLE test_distributed_table_expr ALTER COLUMN col SET EXPRESSION AS (id * 4); +ERROR: ALTER TABLE ... ALTER COLUMN ... SET EXPRESSION commands are currently unsupported. +-- Step 5: Create and distribute a partitioned table +CREATE TABLE test_partitioned_expr (id int, val text) PARTITION BY RANGE (id); +CREATE TABLE test_partitioned_expr_part1 PARTITION OF test_partitioned_expr + FOR VALUES FROM (1) TO (100); +SELECT create_distributed_table('test_partitioned_expr', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Step 6: Attempt to set expression on a partitioned, distributed table (should fail) +ALTER TABLE test_partitioned_expr ALTER COLUMN val SET EXPRESSION AS (id * 4); +ERROR: ALTER TABLE ... ALTER COLUMN ... SET EXPRESSION commands are currently unsupported. 
+-- Cleanup +DROP TABLE test_local_table_expr CASCADE; +DROP TABLE test_distributed_table_expr CASCADE; +DROP TABLE test_partitioned_expr CASCADE; +-- End of Test for ALTER TABLE ... ALTER COLUMN ... SET EXPRESSION +RESET citus.grep_remote_commands; +RESET citus.log_remote_commands; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 27122024; +-- PG17 has added support for AT LOCAL operator +-- it converts the given time type to +-- time stamp with the session's TimeZone value as time zone. +-- Here we add tests that validate that we can use AT LOCAL at INSERT commands +-- Relevant PG commit: +-- https://github.com/postgres/postgres/commit/97957fdba +CREATE TABLE test_at_local (id int, time_example timestamp with time zone); +SELECT create_distributed_table('test_at_local', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +BEGIN; +SET LOCAL TimeZone TO 'Europe/Tirane'; +SELECT timestamp '2001-02-16 20:38:40' AT LOCAL; + timezone +--------------------------------------------------------------------- + Fri Feb 16 20:38:40 2001 CET +(1 row) + +-- verify that we evaluate AT LOCAL at the coordinator and then perform the insert remotely +SET citus.log_remote_commands TO on; +INSERT INTO test_at_local VALUES (1, timestamp '2001-02-16 20:38:40' AT LOCAL); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing INSERT INTO pg17.test_at_local_27122024 (id, time_example) VALUES (1, 'Fri Feb 16 20:38:40 2001 CET'::timestamp with time zone) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +ROLLBACK; +NOTICE: issuing ROLLBACK +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +-- End of Testing AT LOCAL option +-- interval can have infinite values +-- Relevant PG17 commit: https://github.com/postgres/postgres/commit/519fc1bd9 +-- disallow those in create_time_partitions +-- test create_time_partitions with infinity values +CREATE TABLE date_partitioned_table( + measureid integer, + eventdate date, + measure_data jsonb) PARTITION BY RANGE(eventdate); +SELECT create_time_partitions('date_partitioned_table', INTERVAL 'infinity', '2022-01-01', '2021-01-01'); +ERROR: Partition interval must be a finite value +CONTEXT: PL/pgSQL function create_time_partitions(regclass,interval,timestamp with time zone,timestamp with time zone) line XX at RAISE +SELECT create_time_partitions('date_partitioned_table', INTERVAL '-infinity', '2022-01-01', '2021-01-01'); +ERROR: Partition interval must be a finite value +CONTEXT: PL/pgSQL function create_time_partitions(regclass,interval,timestamp with time zone,timestamp with time zone) line XX at RAISE +-- end of testing interval with infinite values +-- various jsonpath methods were added in PG17 +-- relevant PG commit: https://github.com/postgres/postgres/commit/66ea94e8e +-- here we add the same test as in pg15_jsonpath.sql for the new additions +CREATE TABLE jsonpath_test (id serial, sample text); +SELECT create_distributed_table('jsonpath_test', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +\COPY jsonpath_test(sample) FROM STDIN +-- Cast the text into jsonpath on the worker nodes. 
+SELECT sample, sample::jsonpath FROM jsonpath_test ORDER BY id; + sample | sample +--------------------------------------------------------------------- + $.bigint().integer().number().decimal() | $.bigint().integer().number().decimal() + $.boolean() | $.boolean() + $.date() | $.date() + $.decimal(4,2) | $.decimal(4,2) + $.string() | $.string() + $.time() | $.time() + $.time(6) | $.time(6) + $.time_tz() | $.time_tz() + $.time_tz(4) | $.time_tz(4) + $.timestamp() | $.timestamp() + $.timestamp(2) | $.timestamp(2) + $.timestamp_tz() | $.timestamp_tz() + $.timestamp_tz(0) | $.timestamp_tz(0) +(13 rows) + +-- Pull the data, and cast on the coordinator node +WITH samples as (SELECT id, sample FROM jsonpath_test OFFSET 0) +SELECT sample, sample::jsonpath FROM samples ORDER BY id; + sample | sample +--------------------------------------------------------------------- + $.bigint().integer().number().decimal() | $.bigint().integer().number().decimal() + $.boolean() | $.boolean() + $.date() | $.date() + $.decimal(4,2) | $.decimal(4,2) + $.string() | $.string() + $.time() | $.time() + $.time(6) | $.time(6) + $.time_tz() | $.time_tz() + $.time_tz(4) | $.time_tz(4) + $.timestamp() | $.timestamp() + $.timestamp(2) | $.timestamp(2) + $.timestamp_tz() | $.timestamp_tz() + $.timestamp_tz(0) | $.timestamp_tz(0) +(13 rows) + +-- End of testing jsonpath methods +-- xmltext() function added in PG17, test with columnar and distributed table +-- Relevant PG17 commit: https://github.com/postgres/postgres/commit/526fe0d79 +CREATE TABLE test_xml (id int, a xml) USING columnar; +-- expected to insert x<P>73</P>0.42truej +INSERT INTO test_xml VALUES (1, xmltext('x'|| '
<P>73</P>
'::xml || .42 || true || 'j'::char)); +SELECT * FROM test_xml ORDER BY 1; + id | a +--------------------------------------------------------------------- + 1 | x<P>73</P>0.42truej +(1 row) + +SELECT create_distributed_table('test_xml', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17.test_xml$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- expected to insert foo & <"bar"> +INSERT INTO test_xml VALUES (2, xmltext('foo & <"bar">')); +SELECT * FROM test_xml ORDER BY 1; + id | a +--------------------------------------------------------------------- + 1 | x<P>73</P>0.42truej + 2 | foo & <"bar"> +(2 rows) + +-- end of xmltest() testing with Citus +-- +-- random(min, max) to generate random numbers in a specified range +-- adding here the same tests as the ones with random() in aggregate_support.sql +-- Relevant PG commit: https://github.com/postgres/postgres/commit/e6341323a +-- +CREATE TABLE dist_table (dist_col int, agg_col numeric); +SELECT create_distributed_table('dist_table', 'dist_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE ref_table (int_col int); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- Test the cases where the worker agg exec. returns no tuples. +SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col) +FROM (SELECT *, random(0, 1) FROM dist_table) a; + percentile_disc +--------------------------------------------------------------------- + +(1 row) + +SELECT PERCENTILE_DISC((2 > random(0, 1))::int::numeric / 10) + WITHIN GROUP (ORDER BY agg_col) +FROM dist_table +LEFT JOIN ref_table ON TRUE; + percentile_disc +--------------------------------------------------------------------- + +(1 row) + +-- run the same queries after loading some data +INSERT INTO dist_table VALUES (2, 11.2), (3, NULL), (6, 3.22), (3, 4.23), (5, 5.25), + (4, 63.4), (75, NULL), (80, NULL), (96, NULL), (8, 1078), (0, 1.19); +SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col) +FROM (SELECT *, random(0, 1) FROM dist_table) a; + percentile_disc +--------------------------------------------------------------------- + 3.22 +(1 row) + +SELECT PERCENTILE_DISC((2 > random_normal(0, 1))::int::numeric / 10) + WITHIN GROUP (ORDER BY agg_col) +FROM dist_table +LEFT JOIN ref_table ON TRUE; + percentile_disc +--------------------------------------------------------------------- + 1.19 +(1 row) + +-- End of random(min, max) testing with Citus +-- Test: Access Method Behavior for Partitioned Tables +-- This test verifies the ability to specify and modify table access methods for partitioned tables +-- using CREATE TABLE ... USING and ALTER TABLE ... SET ACCESS METHOD, including distributed tables. 
+-- Step 1: Create a partitioned table with a specified access method +CREATE TABLE test_partitioned_alter (id INT PRIMARY KEY, value TEXT) +PARTITION BY RANGE (id) +USING heap; +-- Step 2: Create partitions for the partitioned table +CREATE TABLE test_partition_1 PARTITION OF test_partitioned_alter + FOR VALUES FROM (0) TO (100); +CREATE TABLE test_partition_2 PARTITION OF test_partitioned_alter + FOR VALUES FROM (100) TO (200); +-- Step 3: Distribute the partitioned table +SELECT create_distributed_table('test_partitioned_alter', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Step 4: Verify that the table and partitions are created and distributed correctly on the coordinator +SELECT relname, relam +FROM pg_class +WHERE relname = 'test_partitioned_alter'; + relname | relam +--------------------------------------------------------------------- + test_partitioned_alter | 2 +(1 row) + +SELECT relname, relam +FROM pg_class +WHERE relname IN ('test_partition_1', 'test_partition_2') +ORDER BY relname; + relname | relam +--------------------------------------------------------------------- + test_partition_1 | 2 + test_partition_2 | 2 +(2 rows) + +-- Step 4 (Repeat on a Worker Node): Verify that the table and partitions are created correctly +\c - - - :worker_1_port +SET search_path TO pg17; +-- Verify the table's access method on the worker node +SELECT relname, relam +FROM pg_class +WHERE relname = 'test_partitioned_alter'; + relname | relam +--------------------------------------------------------------------- + test_partitioned_alter | 2 +(1 row) + +-- Verify the partitions' access methods on the worker node +SELECT relname, relam +FROM pg_class +WHERE relname IN ('test_partition_1', 'test_partition_2') +ORDER BY relname; + relname | relam +--------------------------------------------------------------------- + test_partition_1 | 2 + test_partition_2 | 2 +(2 rows) + +\c - - - :master_port +SET search_path TO pg17; +-- Step 5: Test ALTER TABLE ... SET ACCESS METHOD to a different method +ALTER TABLE test_partitioned_alter SET ACCESS METHOD columnar; +-- Verify the access method in the distributed parent and existing partitions +-- Note: Specifying an access method for a partitioned table lets the value be used for all +-- future partitions created under it, closely mirroring the behavior of the TABLESPACE +-- option for partitioned tables. Existing partitions are not modified. 
+-- Reference: https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=374c7a2290429eac3217b0c7b0b485db9c2bcc72 +-- Verify the parent table's access method +SELECT relname, relam +FROM pg_class +WHERE relname = 'test_partitioned_alter'; + relname | relam +--------------------------------------------------------------------- + test_partitioned_alter | 16413 +(1 row) + +-- Verify the partitions' access methods +SELECT relname, relam +FROM pg_class +WHERE relname IN ('test_partition_1', 'test_partition_2') +ORDER BY relname; + relname | relam +--------------------------------------------------------------------- + test_partition_1 | 2 + test_partition_2 | 2 +(2 rows) + +-- Step 6: Verify the change is applied to future partitions +CREATE TABLE test_partition_3 PARTITION OF test_partitioned_alter + FOR VALUES FROM (200) TO (300); +SELECT relname, relam +FROM pg_class +WHERE relname = 'test_partition_3'; + relname | relam +--------------------------------------------------------------------- + test_partition_3 | 16413 +(1 row) + +-- Step 6 (Repeat on a Worker Node): Verify that the new partition is created correctly +\c - - - :worker_1_port +SET search_path TO pg17; +-- Verify the new partition's access method on the worker node +SELECT relname, relam +FROM pg_class +WHERE relname = 'test_partition_3'; + relname | relam +--------------------------------------------------------------------- + test_partition_3 | 16413 +(1 row) + +\c - - - :master_port +SET search_path TO pg17; +-- Clean up +DROP TABLE test_partitioned_alter CASCADE; +-- End of Test: Access Method Behavior for Partitioned Tables +-- Test for REINDEX support in event triggers for Citus-related objects +-- Create a test table with a distributed setup +CREATE TABLE reindex_test (id SERIAL PRIMARY KEY, data TEXT); +SELECT create_distributed_table('reindex_test', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Create an index to test REINDEX functionality +CREATE INDEX reindex_test_data_idx ON reindex_test (data); +-- Create event triggers to capture REINDEX events (start and end) +CREATE OR REPLACE FUNCTION log_reindex_events() RETURNS event_trigger LANGUAGE plpgsql AS $$ +DECLARE + command_tag TEXT; + command_object JSONB; +BEGIN + command_tag := tg_tag; + command_object := jsonb_build_object( + 'object_type', tg_event, + 'command_tag', command_tag, + 'query', current_query() + ); + RAISE NOTICE 'Event Trigger Log: %', command_object::TEXT; +END; +$$; +CREATE EVENT TRIGGER reindex_event_trigger + ON ddl_command_start + WHEN TAG IN ('REINDEX') +EXECUTE FUNCTION log_reindex_events(); +CREATE EVENT TRIGGER reindex_event_trigger_end + ON ddl_command_end + WHEN TAG IN ('REINDEX') +EXECUTE FUNCTION log_reindex_events(); +-- Insert some data to create index bloat +INSERT INTO reindex_test (data) +SELECT 'value_' || g.i +FROM generate_series(1, 10000) g(i); +-- Perform REINDEX TABLE ... CONCURRENTLY and verify event trigger logs +REINDEX TABLE CONCURRENTLY reindex_test; +NOTICE: Event Trigger Log: {"query": "REINDEX TABLE CONCURRENTLY reindex_test;", "command_tag": "REINDEX", "object_type": "ddl_command_start"} +CONTEXT: PL/pgSQL function log_reindex_events() line XX at RAISE +NOTICE: Event Trigger Log: {"query": "REINDEX TABLE CONCURRENTLY reindex_test;", "command_tag": "REINDEX", "object_type": "ddl_command_end"} +CONTEXT: PL/pgSQL function log_reindex_events() line XX at RAISE +-- Perform REINDEX INDEX ... 
CONCURRENTLY and verify event trigger logs +REINDEX INDEX CONCURRENTLY reindex_test_data_idx; +NOTICE: Event Trigger Log: {"query": "REINDEX INDEX CONCURRENTLY reindex_test_data_idx;", "command_tag": "REINDEX", "object_type": "ddl_command_start"} +CONTEXT: PL/pgSQL function log_reindex_events() line XX at RAISE +NOTICE: Event Trigger Log: {"query": "REINDEX INDEX CONCURRENTLY reindex_test_data_idx;", "command_tag": "REINDEX", "object_type": "ddl_command_end"} +CONTEXT: PL/pgSQL function log_reindex_events() line XX at RAISE +-- Cleanup +DROP EVENT TRIGGER reindex_event_trigger; +DROP EVENT TRIGGER reindex_event_trigger_end; +DROP TABLE reindex_test CASCADE; +-- End of test for REINDEX support in event triggers for Citus-related objects +-- Propagate EXPLAIN MEMORY +-- Relevant PG commit: https://github.com/postgres/postgres/commit/5de890e36 +-- Propagate EXPLAIN SERIALIZE +-- Relevant PG commit: https://github.com/postgres/postgres/commit/06286709e +SET citus.next_shard_id TO 12242024; +CREATE TABLE int8_tbl(q1 int8, q2 int8); +SELECT create_distributed_table('int8_tbl', 'q1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO int8_tbl VALUES + (' 123 ',' 456'), + ('123 ','4567890123456789'), + ('4567890123456789','123'), + (+4567890123456789,'4567890123456789'), + ('+4567890123456789','-4567890123456789'); +-- memory tests, same as postgres tests, we just distributed the table +-- we can see the memory used separately per each task in worker nodes +SET citus.log_remote_commands TO true; +-- for explain analyze, we run worker_save_query_explain_analyze query +-- for regular explain, we run EXPLAIN query +-- therefore let's grep the commands based on the shard id +SET citus.grep_remote_commands TO '%12242024%'; +select public.explain_filter('explain (memory) select * from int8_tbl i8'); +NOTICE: issuing EXPLAIN (ANALYZE FALSE, VERBOSE FALSE, COSTS TRUE, BUFFERS FALSE, WAL FALSE, TIMING FALSE, SUMMARY FALSE, MEMORY TRUE, SERIALIZE none, FORMAT TEXT) SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) + Task Count: N + Tasks Shown: One of N + -> Task + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) + Planning: + Memory: used=NkB allocated=NkB + Memory: used=NkB allocated=NkB +(9 rows) + +select public.explain_filter('explain (memory, analyze) select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": false, "wal": false, "memory": true, "serialize": "none", "timing": true, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: 
N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Planning: + Memory: used=NkB allocated=NkB + Planning Time: N.N ms + Execution Time: N.N ms + Memory: used=NkB allocated=NkB + Planning Time: N.N ms + Execution Time: N.N ms +(15 rows) + +select public.explain_filter('explain (memory, summary, format yaml) select * from int8_tbl i8'); +NOTICE: issuing EXPLAIN (ANALYZE FALSE, VERBOSE FALSE, COSTS TRUE, BUFFERS FALSE, WAL FALSE, TIMING FALSE, SUMMARY TRUE, MEMORY TRUE, SERIALIZE none, FORMAT YAML) SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + - Plan: + + Node Type: "Custom Scan" + + Custom Plan Provider: "Citus Adaptive" + + Parallel Aware: false + + Async Capable: false + + Startup Cost: N.N + + Total Cost: N.N + + Plan Rows: N + + Plan Width: N + + Distributed Query: + + Job: + + Task Count: N + + Tasks Shown: "One of N" + + Tasks: + + - Node: "host=localhost port=N dbname=regression"+ + Remote Plan: + + - Plan: + + Node Type: "Seq Scan" + + Parallel Aware: false + + Async Capable: false + + Relation Name: "int8_tbl_12242024" + + Alias: "i8" + + Startup Cost: N.N + + Total Cost: N.N + + Plan Rows: N + + Plan Width: N + + Planning: + + Memory Used: N + + Memory Allocated: N + + Planning Time: N.N + + + + Planning: + + Memory Used: N + + Memory Allocated: N + + Planning Time: N.N +(1 row) + +select public.explain_filter('explain (memory, analyze, format json) select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": false, "wal": false, "memory": true, "serialize": "none", "timing": true, "summary": true, "format": "JSON"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + [ + + { + + "Plan": { + + "Node Type": "Custom Scan", + + "Custom Plan Provider": "Citus Adaptive", + + "Parallel Aware": false, + + "Async Capable": false, + + "Startup Cost": N.N, + + "Total Cost": N.N, + + "Plan Rows": N, + + "Plan Width": N, + + "Actual Startup Time": N.N, + + "Actual Total Time": N.N, + + "Actual Rows": N, + + "Actual Loops": N, + + "Distributed Query": { + + "Job": { + + "Task Count": N, + + "Tuple data received from nodes": "N bytes", + + "Tasks Shown": "One of N", + + "Tasks": [ + + { + + "Tuple data received from node": "N bytes", + + "Node": "host=localhost port=N dbname=regression",+ + "Remote Plan": [ + + [ + + { + + "Plan": { + + "Node Type": "Seq Scan", + + "Parallel Aware": false, + + "Async Capable": false, + + "Relation Name": "int8_tbl_12242024", + + "Alias": "i8", + + "Startup Cost": N.N, + + "Total Cost": N.N, + + "Plan Rows": N, + + "Plan Width": N, + + "Actual Startup Time": N.N, + + "Actual Total Time": N.N, + + "Actual Rows": N, + + "Actual Loops": N + + }, + + "Planning": { + + "Memory Used": N, + + "Memory Allocated": N + + }, + + "Planning Time": N.N, + + "Triggers": [ + + ], + + "Execution Time": N.N + + } + + ] + + + + ] + + } + + ] + + } + 
+ } + + }, + + "Planning": { + + "Memory Used": N, + + "Memory Allocated": N + + }, + + "Planning Time": N.N, + + "Triggers": [ + + ], + + "Execution Time": N.N + + } + + ] +(1 row) + +prepare int8_query as select * from int8_tbl i8; +select public.explain_filter('explain (memory) execute int8_query'); +NOTICE: issuing EXPLAIN (ANALYZE FALSE, VERBOSE FALSE, COSTS TRUE, BUFFERS FALSE, WAL FALSE, TIMING FALSE, SUMMARY FALSE, MEMORY TRUE, SERIALIZE none, FORMAT TEXT) SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) + Task Count: N + Tasks Shown: One of N + -> Task + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) + Planning: + Memory: used=NkB allocated=NkB + Memory: used=NkB allocated=NkB +(9 rows) + +-- serialize tests, same as postgres tests, we just distributed the table +select public.explain_filter('explain (analyze, serialize, buffers, format yaml) select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": true, "wal": false, "memory": false, "serialize": "text", "timing": true, "summary": true, "format": "YAML"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + - Plan: + + Node Type: "Custom Scan" + + Custom Plan Provider: "Citus Adaptive" + + Parallel Aware: false + + Async Capable: false + + Startup Cost: N.N + + Total Cost: N.N + + Plan Rows: N + + Plan Width: N + + Actual Startup Time: N.N + + Actual Total Time: N.N + + Actual Rows: N + + Actual Loops: N + + Distributed Query: + + Job: + + Task Count: N + + Tuple data received from nodes: "N bytes" + + Tasks Shown: "One of N" + + Tasks: + + - Tuple data received from node: "N bytes" + + Node: "host=localhost port=N dbname=regression"+ + Remote Plan: + + - Plan: + + Node Type: "Seq Scan" + + Parallel Aware: false + + Async Capable: false + + Relation Name: "int8_tbl_12242024" + + Alias: "i8" + + Startup Cost: N.N + + Total Cost: N.N + + Plan Rows: N + + Plan Width: N + + Actual Startup Time: N.N + + Actual Total Time: N.N + + Actual Rows: N + + Actual Loops: N + + Shared Hit Blocks: N + + Shared Read Blocks: N + + Shared Dirtied Blocks: N + + Shared Written Blocks: N + + Local Hit Blocks: N + + Local Read Blocks: N + + Local Dirtied Blocks: N + + Local Written Blocks: N + + Temp Read Blocks: N + + Temp Written Blocks: N + + Planning: + + Shared Hit Blocks: N + + Shared Read Blocks: N + + Shared Dirtied Blocks: N + + Shared Written Blocks: N + + Local Hit Blocks: N + + Local Read Blocks: N + + Local Dirtied Blocks: N + + Local Written Blocks: N + + Temp Read Blocks: N + + Temp Written Blocks: N + + Planning Time: N.N + + Triggers: + + Serialization: + + Time: N.N + + Output Volume: N + + Format: "text" + + Shared Hit Blocks: N + + Shared Read Blocks: N + + Shared Dirtied Blocks: N + + Shared Written Blocks: N + + Local Hit Blocks: N + + Local Read Blocks: N + + Local Dirtied Blocks: N 
+ + Local Written Blocks: N + + Temp Read Blocks: N + + Temp Written Blocks: N + + Execution Time: N.N + + + + Shared Hit Blocks: N + + Shared Read Blocks: N + + Shared Dirtied Blocks: N + + Shared Written Blocks: N + + Local Hit Blocks: N + + Local Read Blocks: N + + Local Dirtied Blocks: N + + Local Written Blocks: N + + Temp Read Blocks: N + + Temp Written Blocks: N + + Planning: + + Shared Hit Blocks: N + + Shared Read Blocks: N + + Shared Dirtied Blocks: N + + Shared Written Blocks: N + + Local Hit Blocks: N + + Local Read Blocks: N + + Local Dirtied Blocks: N + + Local Written Blocks: N + + Temp Read Blocks: N + + Temp Written Blocks: N + + Planning Time: N.N + + Triggers: + + Serialization: + + Time: N.N + + Output Volume: N + + Format: "text" + + Shared Hit Blocks: N + + Shared Read Blocks: N + + Shared Dirtied Blocks: N + + Shared Written Blocks: N + + Local Hit Blocks: N + + Local Read Blocks: N + + Local Dirtied Blocks: N + + Local Written Blocks: N + + Temp Read Blocks: N + + Temp Written Blocks: N + + Execution Time: N.N +(1 row) + +select public.explain_filter('explain (analyze,serialize) select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": false, "wal": false, "memory": false, "serialize": "text", "timing": true, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Planning Time: N.N ms + Serialization: time=N.N ms output=NkB format=text + Execution Time: N.N ms + Planning Time: N.N ms + Serialization: time=N.N ms output=NkB format=text + Execution Time: N.N ms +(14 rows) + +select public.explain_filter('explain (analyze,serialize text,buffers,timing off) select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": true, "wal": false, "memory": false, "serialize": "text", "timing": false, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual rows=N loops=N) + Planning Time: N.N ms + Serialization: output=NkB format=text + Execution Time: N.N ms + Planning Time: N.N ms + Serialization: output=NkB format=text + Execution Time: N.N ms +(14 rows) + +select 
public.explain_filter('explain (analyze,serialize binary,buffers,timing) select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": true, "wal": false, "memory": false, "serialize": "binary", "timing": true, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Planning Time: N.N ms + Serialization: time=N.N ms output=NkB format=binary + Execution Time: N.N ms + Planning Time: N.N ms + Serialization: time=N.N ms output=NkB format=binary + Execution Time: N.N ms +(14 rows) + +-- this tests an edge case where we have no data to return +select public.explain_filter('explain (analyze,serialize) create temp table explain_temp as select * from int8_tbl i8'); +NOTICE: issuing SELECT * FROM worker_save_query_explain_analyze('SELECT q1, q2 FROM pg17.int8_tbl_12242024 i8 WHERE true', '{"verbose": false, "costs": true, "buffers": false, "wal": false, "memory": false, "serialize": "text", "timing": true, "summary": true, "format": "TEXT"}') AS (field_0 bigint, field_1 bigint) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXECUTE statement + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on int8_tbl_12242024 i8 (cost=N.N..N.N rows=N width=N) (actual time=N.N..N.N rows=N loops=N) + Planning Time: N.N ms + Serialization: time=N.N ms output=NkB format=text + Execution Time: N.N ms + Planning Time: N.N ms + Serialization: time=N.N ms output=NkB format=text + Execution Time: N.N ms +(14 rows) + +RESET citus.log_remote_commands; +-- End of EXPLAIN MEMORY SERIALIZE tests +-- Add support for MERGE ... WHEN NOT MATCHED BY SOURCE. 
+-- Relevant PG commit: +-- https://github.com/postgres/postgres/commit/0294df2f1 +SET citus.next_shard_id TO 1072025; +-- Regular Postgres tables +CREATE TABLE postgres_target_1 (tid integer, balance float, val text); +CREATE TABLE postgres_target_2 (tid integer, balance float, val text); +CREATE TABLE postgres_source (sid integer, delta float); +INSERT INTO postgres_target_1 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO postgres_target_2 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO postgres_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- Citus local tables +CREATE TABLE citus_local_target (tid integer, balance float, val text); +CREATE TABLE citus_local_source (sid integer, delta float); +SELECT citus_add_local_table_to_metadata('citus_local_target'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('citus_local_source'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO citus_local_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- Citus distributed tables +CREATE TABLE citus_distributed_target (tid integer, balance float, val text); +CREATE TABLE citus_distributed_source (sid integer, delta float); +SELECT create_distributed_table('citus_distributed_target', 'tid'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('citus_distributed_source', 'sid'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO citus_distributed_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO citus_distributed_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- Citus reference tables +CREATE TABLE citus_reference_target (tid integer, balance float, val text); +CREATE TABLE citus_reference_source (sid integer, delta float); +SELECT create_reference_table('citus_reference_target'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('citus_reference_source'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO citus_reference_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,14) AS id; +-- Try all combinations of tables with two queries: +-- 1: Simple Merge +-- 2: Merge with a constant qual +-- Run the merge queries with the postgres tables +-- to save the expected output +-- try simple MERGE +MERGE INTO postgres_target_1 t + USING postgres_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT * FROM postgres_target_1 ORDER BY tid, val; + tid | balance | val +--------------------------------------------------------------------- + 1 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 3 | 330 | initial updated by merge + 
4 | 40 | inserted by merge + 5 | 550 | initial updated by merge + 6 | 60 | inserted by merge + 7 | 770 | initial updated by merge + 8 | 80 | inserted by merge + 9 | 990 | initial updated by merge + 10 | 100 | inserted by merge + 11 | 1210 | initial updated by merge + 12 | 120 | inserted by merge + 13 | 1430 | initial updated by merge + 14 | 140 | inserted by merge + 15 | 1500 | initial not matched by source +(15 rows) + +-- same with a constant qual +MERGE INTO postgres_target_2 t + USING postgres_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT * FROM postgres_target_2 ORDER BY tid, val; + tid | balance | val +--------------------------------------------------------------------- + 1 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 3 | 300 | initial not matched by source + 3 | 30 | inserted by merge + 4 | 40 | inserted by merge + 5 | 500 | initial not matched by source + 5 | 50 | inserted by merge + 6 | 60 | inserted by merge + 7 | 700 | initial not matched by source + 7 | 70 | inserted by merge + 8 | 80 | inserted by merge + 9 | 900 | initial not matched by source + 9 | 90 | inserted by merge + 10 | 100 | inserted by merge + 11 | 1100 | initial not matched by source + 11 | 110 | inserted by merge + 12 | 120 | inserted by merge + 13 | 1300 | initial not matched by source + 13 | 130 | inserted by merge + 14 | 140 | inserted by merge + 15 | 1500 | initial not matched by source +(21 rows) + +-- function to compare the output from Citus tables +-- with the expected output from Postgres tables +CREATE OR REPLACE FUNCTION compare_tables(table1 TEXT, table2 TEXT) RETURNS BOOLEAN AS $$ +DECLARE ret BOOL; +BEGIN +EXECUTE 'select count(*) = 0 from (( + SELECT * FROM ' || table1 || + ' EXCEPT + SELECT * FROM ' || table2 || ' ) + UNION ALL ( + SELECT * FROM ' || table2 || + ' EXCEPT + SELECT * FROM ' || table1 || ' ))' INTO ret; +RETURN ret; +END +$$ LANGUAGE PLPGSQL; +-- Local-Local +-- Let's also print the command here +-- try simple MERGE +BEGIN; +SET citus.log_local_commands TO on; +MERGE INTO citus_local_target t + USING citus_local_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_local_target', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_local_target t + USING citus_local_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED BY TARGET THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_local_target', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Local-Reference +-- try simple MERGE +BEGIN; +MERGE INTO citus_local_target t + USING citus_reference_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + 
delta, val = val || ' updated by merge' + WHEN NOT MATCHED BY TARGET THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_local_target', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_local_target t + USING citus_reference_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_local_target', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Local-Distributed - Merge currently not supported, Feature in development. +-- try simple MERGE +MERGE INTO citus_local_target t + USING citus_distributed_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +ERROR: MERGE involving repartition of rows is supported only if the target is distributed +-- Distributed-Local +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_local_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_local_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED BY TARGET THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Distributed-Distributed +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_distributed_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_distributed_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by 
merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Distributed-Reference +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_reference_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target t + USING citus_reference_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Reference-N/A - Reference table as target is not allowed in Merge +-- try simple MERGE +MERGE INTO citus_reference_target t + USING citus_distributed_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +ERROR: Reference table as target is not allowed in MERGE command +-- Test Distributed-reference and distributed-local when the source table has fewer rows +-- than distributed target; this tests that MERGE with NOT MATCHED BY SOURCE needs to run +-- on all shards of the distributed target, regardless of whether or not the reshuffled +-- source table has data in the corresponding shard. 
+-- Re-populate the Postgres tables; +DELETE FROM postgres_source; +DELETE FROM postgres_target_1; +DELETE FROM postgres_target_2; +-- This time, the source table has fewer rows +INSERT INTO postgres_target_1 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO postgres_target_2 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO postgres_source SELECT id, id * 10 FROM generate_series(1,4) AS id; +-- try simple MERGE +MERGE INTO postgres_target_1 t + USING postgres_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT * FROM postgres_target_1 ORDER BY tid, val; + tid | balance | val +--------------------------------------------------------------------- + 1 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 3 | 330 | initial updated by merge + 4 | 40 | inserted by merge + 5 | 500 | initial not matched by source + 7 | 700 | initial not matched by source + 9 | 900 | initial not matched by source + 11 | 1100 | initial not matched by source + 13 | 1300 | initial not matched by source + 15 | 1500 | initial not matched by source +(10 rows) + +-- same with a constant qual +MERGE INTO postgres_target_2 t + USING postgres_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT * FROM postgres_target_2 ORDER BY tid, val; + tid | balance | val +--------------------------------------------------------------------- + 1 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 3 | 300 | initial not matched by source + 3 | 30 | inserted by merge + 4 | 40 | inserted by merge + 5 | 500 | initial not matched by source + 7 | 700 | initial not matched by source + 9 | 900 | initial not matched by source + 11 | 1100 | initial not matched by source + 13 | 1300 | initial not matched by source + 15 | 1500 | initial not matched by source +(11 rows) + +-- Re-populate the Citus tables; this time, the source table has fewer rows +DELETE FROM citus_local_source; +DELETE FROM citus_reference_source; +INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,4) AS id; +INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,4) AS id; +SET citus.shard_count to 32; +CREATE TABLE citus_distributed_target32 (tid integer, balance float, val text); +SELECT create_distributed_table('citus_distributed_target32', 'tid'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO citus_distributed_target32 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +-- Distributed-Local +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target32 t + USING citus_local_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target32', 'postgres_target_1'); + compare_tables 
+--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target32 t + USING citus_local_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED BY TARGET THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target32', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Distributed-Reference +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target32 t + USING citus_reference_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target32', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target32 t + USING citus_reference_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target32', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Test that MERGE with NOT MATCHED BY SOURCE runs on all shards of +-- a distributed table when the source is a repartition query with +-- rows that do not match the distributed target +set citus.shard_count = 32; +CREATE TABLE dist_target (tid integer, balance float); +CREATE TABLE dist_src1(sid integer, tid integer, val float); +CREATE TABLE dist_src2(sid integer); +CREATE TABLE dist_ref(sid integer); +INSERT INTO dist_target SELECT id, 0 FROM generate_series(1,9,2) AS id; +INSERT INTO dist_src1 SELECT id, id%3 + 1, id*10 FROM generate_series(1,15) AS id; +INSERT INTO dist_src2 SELECT id FROM generate_series(1,100) AS id; +INSERT INTO dist_ref SELECT id FROM generate_series(1,10) AS id; +-- Run a MERGE command with dist_target as target and an aggregating query +-- as source; note that at this point all tables are vanilla Postgres tables +BEGIN; +SELECT * FROM dist_target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 0 + 3 | 0 + 5 | 0 + 7 | 0 + 9 | 0 +(5 rows) + +MERGE INTO dist_target t +USING (SELECT dt.tid, avg(dt.val) as av, min(dt.val) as m, max(dt.val) as x + FROM dist_src1 dt INNER JOIN dist_src2 dt2 on dt.sid=dt2.sid + INNER JOIN dist_ref dr ON dt.sid=dr.sid + GROUP BY dt.tid) dv ON (t.tid=dv.tid) +WHEN MATCHED THEN + UPDATE SET balance = dv.av +WHEN NOT MATCHED THEN + INSERT (tid, balance) VALUES (dv.tid, dv.m) +WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET balance = 99.95; +SELECT * FROM dist_target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 60 + 2 | 10 + 3 | 50 + 5 | 99.95 + 7 | 99.95 + 9 | 99.95 +(6 rows) + +ROLLBACK; +-- 
Distribute the tables +SELECT create_distributed_table('dist_target', 'tid'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17.dist_target$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_src1', 'sid'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17.dist_src1$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_src2', 'sid'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17.dist_src2$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('dist_ref'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17.dist_ref$$) + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- Re-run the merge; the target is now distributed and the source is a +-- distributed query that is repartitioned. 
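+-- The source below is an aggregating join grouped on dt.tid, while dist_src1 and
+-- dist_src2 are distributed on sid, so the source result is not colocated with
+-- dist_target (distributed on tid); Citus repartitions the source rows on the MERGE
+-- join column before running the per-shard MERGE, as the DEBUG messages of the
+-- sensor_readings example further below show. A commented-out way to inspect the
+-- distribution columns involved (illustration only; assumes the
+-- column_to_column_name helper that ships with Citus):
+--   SELECT logicalrelid::regclass AS tbl,
+--          column_to_column_name(logicalrelid, partkey) AS dist_column
+--   FROM pg_dist_partition
+--   WHERE logicalrelid IN ('dist_target'::regclass, 'dist_src1'::regclass,
+--                          'dist_src2'::regclass);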
+BEGIN; +SELECT * FROM dist_target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 0 + 3 | 0 + 5 | 0 + 7 | 0 + 9 | 0 +(5 rows) + +MERGE INTO dist_target t +USING (SELECT dt.tid, avg(dt.val) as av, min(dt.val) as m, max(dt.val) as x + FROM dist_src1 dt INNER JOIN dist_src2 dt2 on dt.sid=dt2.sid + INNER JOIN dist_ref dr ON dt.sid=dr.sid + GROUP BY dt.tid) dv ON (t.tid=dv.tid) +WHEN MATCHED THEN + UPDATE SET balance = dv.av +WHEN NOT MATCHED THEN + INSERT (tid, balance) VALUES (dv.tid, dv.m) +WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET balance = 99.95; +-- Data in dist_target is as it was with vanilla Postgres tables: +SELECT * FROM dist_target ORDER BY tid; + tid | balance +--------------------------------------------------------------------- + 1 | 60 + 2 | 10 + 3 | 50 + 5 | 99.95 + 7 | 99.95 + 9 | 99.95 +(6 rows) + +ROLLBACK; +-- Reset shard_count for the DEBUG output in the following test +SET citus.shard_count to 4; +-- Complex repartition query example with a mix of tables +-- Example from blog post +-- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge +-- Contains information about the machines in the manufacturing facility +CREATE TABLE machines ( + machine_id NUMERIC PRIMARY KEY, + machine_name VARCHAR(100), + location VARCHAR(50), + status VARCHAR(20) +); +SELECT create_reference_table('machines'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- Holds data on the various sensors installed on each machine +CREATE TABLE sensors ( + sensor_id NUMERIC PRIMARY KEY, + sensor_name VARCHAR(100), + machine_id NUMERIC, + sensor_type VARCHAR(50) +); +SELECT create_distributed_table('sensors', 'sensor_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Stores real-time readings from the sensors +CREATE TABLE sensor_readings ( + reading_id NUMERIC , + sensor_id NUMERIC, + reading_value NUMERIC, + reading_timestamp TIMESTAMP +); +SELECT create_distributed_table('sensor_readings', 'sensor_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Holds real-time sensor readings for machines on 'Production Floor 1' +CREATE TABLE real_sensor_readings ( + real_reading_id NUMERIC , + sensor_id NUMERIC, + reading_value NUMERIC, + reading_timestamp TIMESTAMP +); +SELECT create_distributed_table('real_sensor_readings', 'sensor_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Insert data into the machines table +INSERT INTO machines (machine_id, machine_name, location, status) +VALUES + (1, 'Machine A', 'Production Floor 1', 'Active'), + (2, 'Machine B', 'Production Floor 2', 'Active'), + (3, 'Machine C', 'Production Floor 1', 'Inactive'); +-- Insert data into the sensors table +INSERT INTO sensors (sensor_id, sensor_name, machine_id, sensor_type) +VALUES + (1, 'Temperature Sensor 1', 1, 'Temperature'), + (2, 'Pressure Sensor 1', 1, 'Pressure'), + (3, 'Temperature Sensor 2', 2, 'Temperature'), + (4, 'Vibration Sensor 1', 3, 'Vibration'); +-- Insert data into the real_sensor_readings table +INSERT INTO real_sensor_readings (real_reading_id, sensor_id, reading_value, reading_timestamp) +VALUES + (1, 1, 35.6, TIMESTAMP '2023-07-20 10:15:00'), + (2, 1, 36.8, TIMESTAMP '2023-07-20 10:30:00'), + (3, 2, 100.5, TIMESTAMP '2023-07-20 10:15:00'), + (4, 2, 101.2, 
TIMESTAMP '2023-07-20 10:30:00'), + (5, 3, 36.2, TIMESTAMP '2023-07-20 10:15:00'), + (6, 3, 36.5, TIMESTAMP '2023-07-20 10:30:00'), + (7, 4, 0.02, TIMESTAMP '2023-07-20 10:15:00'), + (8, 4, 0.03, TIMESTAMP '2023-07-20 10:30:00'); +-- Insert DUMMY data to use for WHEN NOT MATCHED BY SOURCE +INSERT INTO sensor_readings VALUES (0, 0, 0, TIMESTAMP '2023-07-20 10:15:00'); +SET client_min_messages TO DEBUG1; +-- Complex merge query which needs repartitioning +MERGE INTO sensor_readings SR +USING (SELECT +rsr.sensor_id, +AVG(rsr.reading_value) AS average_reading, +MAX(rsr.reading_timestamp) AS last_reading_timestamp, +MAX(rsr.real_reading_id) AS rid +FROM sensors s +INNER JOIN machines m ON s.machine_id = m.machine_id +INNER JOIN real_sensor_readings rsr ON s.sensor_id = rsr.sensor_id +WHERE m.location = 'Production Floor 1' +GROUP BY rsr.sensor_id +) NEW_READINGS +ON (SR.sensor_id = NEW_READINGS.sensor_id) +-- Existing reading, update it +WHEN MATCHED THEN +UPDATE SET reading_value = NEW_READINGS.average_reading, reading_timestamp = NEW_READINGS.last_reading_timestamp +-- New reading, record it +WHEN NOT MATCHED BY TARGET THEN +INSERT (reading_id, sensor_id, reading_value, reading_timestamp) +VALUES (NEW_READINGS.rid, NEW_READINGS.sensor_id, +NEW_READINGS.average_reading, NEW_READINGS.last_reading_timestamp) +-- Target has a dummy entry not matched by source; +-- set its reading_value to 100 so the change is easy to notice +WHEN NOT MATCHED BY SOURCE THEN +UPDATE SET reading_value = 100; +DEBUG: A mix of distributed and reference table, try repartitioning +DEBUG: A mix of distributed and reference table, routable query is not possible +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:0 from the source list to redistribute +DEBUG: Executing subplans of the source query and storing the results at the respective node(s) +DEBUG: Redistributing source result rows across nodes +DEBUG: Executing final MERGE on workers using intermediate results +DEBUG: +DEBUG: +DEBUG: +DEBUG: +RESET client_min_messages; +-- Expected output is: +-- reading_id | sensor_id | reading_value | reading_timestamp +-- ------------+-----------+------------------------+--------------------- +-- 0 | 0 | 100 | 2023-07-20 10:15:00 +-- 2 | 1 | 36.2000000000000000 | 2023-07-20 10:30:00 +-- 4 | 2 | 100.8500000000000000 | 2023-07-20 10:30:00 +-- 8 | 4 | 0.02500000000000000000 | 2023-07-20 10:30:00 +SELECT * FROM sensor_readings ORDER BY 1; + reading_id | sensor_id | reading_value | reading_timestamp +--------------------------------------------------------------------- + 0 | 0 | 100 | Thu Jul 20 10:15:00 2023 + 2 | 1 | 36.2000000000000000 | Thu Jul 20 10:30:00 2023 + 4 | 2 | 100.8500000000000000 | Thu Jul 20 10:30:00 2023 + 8 | 4 | 0.02500000000000000000 | Thu Jul 20 10:30:00 2023 +(4 rows) + +-- End of MERGE ... 
WHEN NOT MATCHED BY SOURCE tests +-- Issue #7846: Test crash scenarios with MERGE on non-distributed and distributed tables +-- Step 1: Connect to a worker node to verify shard visibility +\c postgresql://postgres@localhost::worker_1_port/regression?application_name=psql +SET search_path TO pg17; +-- Step 2: Create and test a non-distributed table +CREATE TABLE non_dist_table_12345 (id INTEGER); +-- Test MERGE on the non-distributed table +MERGE INTO non_dist_table_12345 AS target_0 +USING pg_catalog.pg_class AS ref_0 +ON target_0.id = ref_0.relpages +WHEN NOT MATCHED THEN DO NOTHING; +-- Step 3: Switch back to the coordinator for distributed table operations +\c postgresql://postgres@localhost::master_port/regression?application_name=psql +SET search_path TO pg17; +-- Step 4: Create and test a distributed table +CREATE TABLE dist_table_67890 (id INTEGER); +SELECT create_distributed_table('dist_table_67890', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Test MERGE on the distributed table +MERGE INTO dist_table_67890 AS target_0 +USING pg_catalog.pg_class AS ref_0 +ON target_0.id = ref_0.relpages +WHEN NOT MATCHED THEN DO NOTHING; +ERROR: MERGE INTO an distributed table from Postgres table is not yet supported +\c - - - :master_port +-- End of Issue #7846 +\set VERBOSITY terse +SET client_min_messages TO WARNING; +DROP SCHEMA pg17 CASCADE; +\set VERBOSITY default +RESET client_min_messages; +DROP ROLE regress_maintain; +DROP ROLE regress_no_maintain;
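+-- Closing note on the Issue #7846 scenario above (illustration only; the pg17
+-- schema has already been dropped, so none of this is executed): MERGE into a
+-- distributed table cannot read directly from a plain Postgres relation such as
+-- pg_catalog.pg_class. A hypothetical workaround might be to stage the source rows
+-- in a reference table first, since a distributed target with a reference-table
+-- source is supported (see the compare_tables tests earlier in this file):
+--   CREATE TABLE pg_class_pages (relpages integer);
+--   SELECT create_reference_table('pg_class_pages');
+--   INSERT INTO pg_class_pages SELECT relpages FROM pg_catalog.pg_class;
+--   MERGE INTO dist_table_67890 AS target_0
+--     USING pg_class_pages AS ref_0
+--     ON target_0.id = ref_0.relpages
+--     WHEN NOT MATCHED THEN DO NOTHING;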