--
-- Tests sequential and parallel DDL command execution
-- in combination with 1PC and 2PC
-- Note: this test should not be executed in parallel with
-- other tests since we're relying on disabling 2PC recovery
--
-- We're also setting force_max_query_parallelization throughout
-- the tests because the test file has the assumption that
-- each placement is accessed with a separate connection
SET citus.force_max_query_parallelization TO on;
CREATE SCHEMA test_seq_ddl;
SET search_path TO 'test_seq_ddl';
SET citus.next_shard_id TO 16000;
SET citus.next_placement_id TO 16000;
-- Returns true when the number of pg_dist_transaction records whose gid
-- matches this backend's pid equals the number of primary worker nodes
-- (pg_dist_node rows with noderole = 'primary' and groupid <> 0).
-- The function is useful to ensure that a single connection is opened per worker
-- in a distributed transaction
-- The function only reads catalog tables and session state, hence STABLE
-- (IMMUTABLE functions must not perform database lookups).
-- NOTE(review): "_" in the LIKE pattern is a single-character wildcard, so the
-- match is looser than a literal underscore -- confirm against the gid format
CREATE OR REPLACE FUNCTION distributed_2PCs_are_equal_to_worker_count()
    RETURNS bool AS
$$
BEGIN
    RETURN
        (SELECT count(*)
         FROM pg_dist_transaction
         WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%')
        =
        (SELECT count(*)
         FROM pg_dist_node
         WHERE noderole = 'primary' AND groupid <> 0);
END;
$$
LANGUAGE plpgsql STABLE;
-- Returns true when the number of pg_dist_transaction records whose gid
-- matches this backend's pid equals
-- citus.shard_count * citus.shard_replication_factor (both read from the
-- current session settings), i.e. the number of shard placements of a
-- distributed table created with those settings
-- The function is useful to ensure that a single connection is opened per
-- shard placement in a distributed transaction
CREATE OR REPLACE FUNCTION distributed_2PCs_are_equal_to_placement_count()
    RETURNS bool AS
$$
BEGIN
    RETURN
        (SELECT count(*)
         FROM pg_dist_transaction
         WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%')
        =
        (current_setting('citus.shard_count')::bigint
         * current_setting('citus.shard_replication_factor')::bigint);
END;
$$
LANGUAGE plpgsql STABLE;
-- Returns true when pg_dist_transaction contains NO record whose gid matches
-- this backend's pid, and false as soon as at least one such record exists
CREATE OR REPLACE FUNCTION no_distributed_2PCs()
    RETURNS bool AS
$$
BEGIN
    RETURN NOT EXISTS (
        SELECT 1
        FROM pg_dist_transaction
        WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%'
    );
END;
$$
LANGUAGE plpgsql STABLE;
-- C function exported by the 'citus' library. Judging by its name and by how
-- the tests in this file use it, it switches the multi-shard modify mode to
-- sequential for the current transaction (TODO confirm against the C source).
-- It is called for that side effect, so it is declared VOLATILE rather than
-- STABLE to keep the call from ever being optimized away.
CREATE OR REPLACE FUNCTION set_local_multi_shard_modify_mode_to_sequential()
    RETURNS void
    LANGUAGE C VOLATILE STRICT
    AS 'citus', $$set_local_multi_shard_modify_mode_to_sequential$$;
-- disable 2PC recovery since our tests will check the records
-- that are left behind in pg_dist_transaction
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
SELECT pg_reload_conf();
 pg_reload_conf
---------------------------------------------------------------------
 t
(1 row)

CREATE TABLE test_table(a int, b int);
SELECT create_distributed_table('test_table', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

-- not useful if not in transaction
SELECT set_local_multi_shard_modify_mode_to_sequential();
 set_local_multi_shard_modify_mode_to_sequential
---------------------------------------------------------------------

(1 row)

-- we should see #worker transactions
-- when sequential mode is used
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
 recover_prepared_transactions
---------------------------------------------------------------------
                             0
(1 row)

ALTER TABLE test_table ADD CONSTRAINT a_check CHECK(a > 0);
SELECT distributed_2PCs_are_equal_to_worker_count();
 distributed_2pcs_are_equal_to_worker_count
---------------------------------------------------------------------
 t
(1 row)

-- we should see placement count # transactions
-- when parallel mode is used
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
 recover_prepared_transactions
---------------------------------------------------------------------
                             0
(1 row)

ALTER TABLE test_table ADD CONSTRAINT
b_check CHECK(b > 0); SELECT distributed_2PCs_are_equal_to_placement_count(); distributed_2pcs_are_equal_to_placement_count --------------------------------------------------------------------- t (1 row) -- even if 1PC is used, we use 2PC as we modify replicated tables -- see distributed TXs in the pg_dist_transaction SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) ALTER TABLE test_table ADD CONSTRAINT c_check CHECK(a > 0); SELECT no_distributed_2PCs(); no_distributed_2pcs --------------------------------------------------------------------- f (1 row) SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) ALTER TABLE test_table ADD CONSTRAINT d_check CHECK(a > 0); SELECT no_distributed_2PCs(); no_distributed_2pcs --------------------------------------------------------------------- f (1 row) CREATE TABLE ref_test(a int); SELECT create_reference_table('ref_test'); create_reference_table --------------------------------------------------------------------- (1 row) -- reference tables should always use 2PC SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) CREATE INDEX ref_test_seq_index ON ref_test(a); SELECT distributed_2PCs_are_equal_to_worker_count(); distributed_2pcs_are_equal_to_worker_count --------------------------------------------------------------------- t (1 row) -- reference tables should always use 2PC SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) CREATE INDEX 
ref_test_seq_index_2 ON ref_test(a); SELECT distributed_2PCs_are_equal_to_worker_count(); distributed_2pcs_are_equal_to_worker_count --------------------------------------------------------------------- t (1 row) -- tables with replication factor > 1 should also obey -- multi_shard_modify_mode SET citus.shard_replication_factor TO 2; CREATE TABLE test_table_rep_2 (a int); SELECT create_distributed_table('test_table_rep_2', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) -- even if 1PC is used, we use 2PC with rep > 1 SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) CREATE INDEX test_table_rep_2_i_1 ON test_table_rep_2(a); SELECT no_distributed_2PCs(); no_distributed_2pcs --------------------------------------------------------------------- f (1 row) SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) CREATE INDEX test_table_rep_2_i_2 ON test_table_rep_2(a); SELECT no_distributed_2PCs(); no_distributed_2pcs --------------------------------------------------------------------- f (1 row) -- tables with rep > 1 should always use 2PC SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) CREATE INDEX test_table_rep_2_i_3 ON test_table_rep_2(a); SELECT distributed_2PCs_are_equal_to_worker_count(); distributed_2pcs_are_equal_to_worker_count --------------------------------------------------------------------- t (1 row) SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); recover_prepared_transactions 
--------------------------------------------------------------------- 0 (1 row) CREATE INDEX test_table_rep_2_i_4 ON test_table_rep_2(a); SELECT distributed_2PCs_are_equal_to_placement_count(); distributed_2pcs_are_equal_to_placement_count --------------------------------------------------------------------- t (1 row) -- CREATE INDEX CONCURRENTLY should work fine with rep > 1 -- with both 2PC and different parallel modes SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) CREATE INDEX CONCURRENTLY test_table_rep_2_i_5 ON test_table_rep_2(a); -- we shouldn't see any distributed transactions SELECT no_distributed_2PCs(); no_distributed_2pcs --------------------------------------------------------------------- t (1 row) SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) CREATE INDEX CONCURRENTLY test_table_rep_2_i_6 ON test_table_rep_2(a); -- we shouldn't see any distributed transactions SELECT no_distributed_2PCs(); no_distributed_2pcs --------------------------------------------------------------------- t (1 row) -- test TRUNCATE on sequential and parallel modes CREATE TABLE test_seq_truncate (a int); INSERT INTO test_seq_truncate SELECT i FROM generate_series(0, 100) i; SELECT create_distributed_table('test_seq_truncate', 'a'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. 
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_seq_ddl.test_seq_truncate$$) create_distributed_table --------------------------------------------------------------------- (1 row) -- with parallel modification mode, we should see #shard placement records SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) TRUNCATE test_seq_truncate; SELECT distributed_2PCs_are_equal_to_placement_count(); distributed_2pcs_are_equal_to_placement_count --------------------------------------------------------------------- t (1 row) -- with sequential modification mode, we should see #primary worker records SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) TRUNCATE test_seq_truncate; SELECT distributed_2PCs_are_equal_to_worker_count(); distributed_2pcs_are_equal_to_worker_count --------------------------------------------------------------------- t (1 row) -- truncate with rep > 1 should work both in parallel and seq. 
modes CREATE TABLE test_seq_truncate_rep_2 (a int); SET citus.shard_replication_factor TO 2; SELECT create_distributed_table('test_seq_truncate_rep_2', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i; SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) TRUNCATE test_seq_truncate_rep_2; SELECT distributed_2PCs_are_equal_to_worker_count(); distributed_2pcs_are_equal_to_worker_count --------------------------------------------------------------------- t (1 row) SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) TRUNCATE test_seq_truncate_rep_2; SELECT distributed_2PCs_are_equal_to_placement_count(); distributed_2pcs_are_equal_to_placement_count --------------------------------------------------------------------- t (1 row) CREATE TABLE multi_shard_modify_test ( t_key integer not null, t_name varchar(25) not null, t_value integer not null); SELECT create_distributed_table('multi_shard_modify_test', 't_key'); create_distributed_table --------------------------------------------------------------------- (1 row) -- with parallel modification mode, we should see #shard placement records SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) DELETE FROM multi_shard_modify_test; SELECT distributed_2PCs_are_equal_to_placement_count(); distributed_2pcs_are_equal_to_placement_count --------------------------------------------------------------------- t (1 row) -- with sequential modification mode, we should see #primary 
worker records SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) DELETE FROM multi_shard_modify_test; SELECT distributed_2PCs_are_equal_to_worker_count(); distributed_2pcs_are_equal_to_worker_count --------------------------------------------------------------------- t (1 row) -- one more realistic test with sequential inserts and truncate in the same tx INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i; BEGIN; INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4); -- now switch to sequential mode to enable a successful TRUNCATE SELECT set_local_multi_shard_modify_mode_to_sequential(); set_local_multi_shard_modify_mode_to_sequential --------------------------------------------------------------------- (1 row) TRUNCATE multi_shard_modify_test; COMMIT; -- see that all the data is successfully removed SELECT count(*) FROM multi_shard_modify_test; count --------------------------------------------------------------------- 0 (1 row) -- test INSERT ... 
SELECT queries -- with sequential modification mode, we should see #primary worker records SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; SELECT distributed_2PCs_are_equal_to_worker_count(); distributed_2pcs_are_equal_to_worker_count --------------------------------------------------------------------- t (1 row) SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; SELECT distributed_2PCs_are_equal_to_placement_count(); distributed_2pcs_are_equal_to_placement_count --------------------------------------------------------------------- t (1 row) -- one more realistic test with sequential inserts and INSERT .. SELECT in the same tx INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i; BEGIN; INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4); -- now switch to sequential mode to enable a successful INSERT .. 
SELECT SELECT set_local_multi_shard_modify_mode_to_sequential(); set_local_multi_shard_modify_mode_to_sequential --------------------------------------------------------------------- (1 row) INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; COMMIT; -- see that all the data is successfully inserted SELECT count(*) FROM multi_shard_modify_test; count --------------------------------------------------------------------- 210 (1 row) ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT; SET citus.shard_replication_factor TO DEFAULT; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) -- The following tests are added to test if create_distributed_table honors sequential mode SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) -- Check if multi_shard_update works properly after create_distributed_table in sequential mode CREATE TABLE test_seq_multi_shard_update(a int, b int); BEGIN; SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; SELECT create_distributed_table('test_seq_multi_shard_update', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO test_seq_multi_shard_update VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0); DELETE FROM test_seq_multi_shard_update WHERE b < 2; COMMIT; SELECT distributed_2PCs_are_equal_to_worker_count(); distributed_2pcs_are_equal_to_worker_count --------------------------------------------------------------------- t (1 row) DROP TABLE test_seq_multi_shard_update; -- Check if truncate works properly after create_distributed_table in sequential mode SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) CREATE TABLE test_seq_truncate_after_create(a int, b int); BEGIN; SET LOCAL 
citus.multi_shard_modify_mode TO 'sequential'; SELECT create_distributed_table('test_seq_truncate_after_create', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO test_seq_truncate_after_create VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0); TRUNCATE test_seq_truncate_after_create; COMMIT; SELECT distributed_2PCs_are_equal_to_worker_count(); distributed_2pcs_are_equal_to_worker_count --------------------------------------------------------------------- t (1 row) DROP TABLE test_seq_truncate_after_create; -- Check if drop table works properly after create_distributed_table in sequential mode SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) CREATE TABLE test_seq_drop_table(a int, b int); BEGIN; SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; SELECT create_distributed_table('test_seq_drop_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) DROP TABLE test_seq_drop_table; COMMIT; SELECT distributed_2PCs_are_equal_to_worker_count(); distributed_2pcs_are_equal_to_worker_count --------------------------------------------------------------------- t (1 row) -- Check if copy errors out properly after create_distributed_table in sequential mode SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) CREATE TABLE test_seq_copy(a int, b int); BEGIN; SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; SELECT create_distributed_table('test_seq_copy', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) \COPY test_seq_copy FROM STDIN DELIMITER AS ','; ROLLBACK; SELECT distributed_2PCs_are_equal_to_worker_count(); distributed_2pcs_are_equal_to_worker_count 
--------------------------------------------------------------------- f (1 row) DROP TABLE test_seq_copy; -- Check if DDL + CREATE INDEX works properly after create_distributed_table in sequential mode SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 0 (1 row) CREATE TABLE test_seq_ddl_index(a int, b int); BEGIN; SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; SELECT create_distributed_table('test_seq_ddl_index', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO test_seq_ddl_index VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0); ALTER TABLE test_seq_ddl_index ADD COLUMN c int; CREATE INDEX idx ON test_seq_ddl_index(c); COMMIT; SELECT distributed_2PCs_are_equal_to_worker_count(); distributed_2pcs_are_equal_to_worker_count --------------------------------------------------------------------- t (1 row) DROP TABLE test_seq_ddl_index; -- create_distributed_table should work on relations with data in sequential mode, both inside and outside a transaction block CREATE TABLE test_create_seq_table (a int); INSERT INTO test_create_seq_table VALUES (1); SET citus.multi_shard_modify_mode TO 'sequential'; SELECT create_distributed_table('test_create_seq_table' ,'a'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. 
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_seq_ddl.test_create_seq_table$$) create_distributed_table --------------------------------------------------------------------- (1 row) SELECT undistribute_table('test_create_seq_table'); NOTICE: creating a new table for test_seq_ddl.test_create_seq_table NOTICE: moving the data of test_seq_ddl.test_create_seq_table NOTICE: dropping the old test_seq_ddl.test_create_seq_table NOTICE: renaming the new table to test_seq_ddl.test_create_seq_table undistribute_table --------------------------------------------------------------------- (1 row) RESET citus.multi_shard_modify_mode; BEGIN; SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; select create_distributed_table('test_create_seq_table' ,'a'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_seq_ddl.test_create_seq_table$$) create_distributed_table --------------------------------------------------------------------- (1 row) ROLLBACK; -- trigger switch-over when using single connection per worker BEGIN; SET citus.next_shard_id TO 16900; SET LOCAL citus.shard_count TO 4; SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; CREATE UNLOGGED TABLE trigger_switchover(a int, b int, c int, d int, e int, f int, g int, h int); INSERT INTO trigger_switchover SELECT s AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,250000) s; SELECT create_distributed_table('trigger_switchover','a'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. 
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_seq_ddl.trigger_switchover$$) create_distributed_table --------------------------------------------------------------------- (1 row) ABORT; SET search_path TO 'public'; DROP SCHEMA test_seq_ddl CASCADE; NOTICE: drop cascades to 12 other objects DETAIL: drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_worker_count() drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_placement_count() drop cascades to function test_seq_ddl.no_distributed_2pcs() drop cascades to function test_seq_ddl.set_local_multi_shard_modify_mode_to_sequential() drop cascades to table test_seq_ddl.test_table drop cascades to table test_seq_ddl.ref_test drop cascades to table test_seq_ddl.ref_test_16004 drop cascades to table test_seq_ddl.test_table_rep_2 drop cascades to table test_seq_ddl.test_seq_truncate drop cascades to table test_seq_ddl.test_seq_truncate_rep_2 drop cascades to table test_seq_ddl.multi_shard_modify_test drop cascades to table test_seq_ddl.test_create_seq_table