SET citus.next_shard_id TO 1610000;
CREATE SCHEMA multi_real_time_transaction;
SET search_path = 'multi_real_time_transaction';
SET citus.shard_replication_factor to 1;
CREATE TABLE test_table(id int, col_1 int, col_2 text);
SELECT create_distributed_table('test_table','id');
 create_distributed_table 
--------------------------
 
(1 row)

\COPY test_table FROM stdin delimiter ',';
CREATE TABLE co_test_table(id int, col_1 int, col_2 text);
SELECT create_distributed_table('co_test_table','id');
 create_distributed_table 
--------------------------
 
(1 row)

\COPY co_test_table FROM stdin delimiter ',';
CREATE TABLE ref_test_table(id int, col_1 int, col_2 text);
SELECT create_reference_table('ref_test_table');
 create_reference_table 
------------------------
 
(1 row)

\COPY ref_test_table FROM stdin delimiter ',';
-- Test with select and router insert
BEGIN;
SELECT COUNT(*) FROM test_table;
 count 
-------
     6
(1 row)

INSERT INTO test_table VALUES(7,8,'gg');
SELECT COUNT(*) FROM test_table;
 count 
-------
     7
(1 row)

ROLLBACK;
-- Test with select and multi-row insert
BEGIN;
SELECT COUNT(*) FROM test_table;
 count 
-------
     6
(1 row)

INSERT INTO test_table VALUES (7,8,'gg'),(8,9,'hh'),(9,10,'ii');
SELECT COUNT(*) FROM test_table;
 count 
-------
     9
(1 row)

ROLLBACK;
-- Test with INSERT .. SELECT
BEGIN;
SELECT COUNT(*) FROM test_table;
 count 
-------
     6
(1 row)

INSERT INTO test_table SELECT * FROM co_test_table;
SELECT COUNT(*) FROM test_table;
 count 
-------
    12
(1 row)

ROLLBACK;
-- Test with COPY
BEGIN;
SELECT COUNT(*) FROM test_table;
 count 
-------
     6
(1 row)

\COPY test_table FROM stdin delimiter ',';
SELECT COUNT(*) FROM test_table;
 count 
-------
     9
(1 row)

ROLLBACK;
-- Test with router update
BEGIN;
SELECT SUM(col_1) FROM test_table;
 sum 
-----
  27
(1 row)

UPDATE test_table SET col_1 = 0 WHERE id = 2;
DELETE FROM test_table WHERE id = 3;
SELECT SUM(col_1) FROM test_table;
 sum 
-----
  20
(1 row)

ROLLBACK;
-- Test with multi-shard update
BEGIN;
SELECT SUM(col_1) FROM test_table;
 sum 
-----
  27
(1 row)

UPDATE test_table SET col_1 = 5;
SELECT SUM(col_1) FROM test_table;
 sum 
-----
  30
(1 row)

ROLLBACK;
-- Test with subqueries
BEGIN;
SELECT SUM(col_1) FROM test_table;
 sum 
-----
  27
(1 row)

UPDATE test_table SET col_1 = 4 WHERE test_table.col_1 IN (SELECT co_test_table.col_1 FROM co_test_table WHERE co_test_table.id = 1) AND test_table.id = 1;
SELECT SUM(col_1) FROM test_table;
 sum 
-----
  29
(1 row)

ROLLBACK;
-- Test with partitioned table
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
SET citus.shard_replication_factor TO 1;
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
SELECT create_distributed_table('partitioning_test', 'id');
NOTICE:  Copying data from local table...
NOTICE:  Copying data from local table...
 create_distributed_table 
--------------------------
 
(1 row)

BEGIN;
SELECT COUNT(*) FROM partitioning_test;
 count 
-------
     2
(1 row)

INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
SELECT COUNT(*) FROM partitioning_test;
 count 
-------
     4
(1 row)

COMMIT;
DROP TABLE partitioning_test;
-- Test with create-drop table
BEGIN;
CREATE TABLE test_table_inn(id int, num_1 int);
SELECT create_distributed_table('test_table_inn','id');
 create_distributed_table 
--------------------------
 
(1 row)

INSERT INTO test_table_inn VALUES(1,3),(4,5),(6,7);
SELECT COUNT(*) FROM test_table_inn;
 count 
-------
     3
(1 row)

DROP TABLE test_table_inn;
COMMIT;
-- Test with utility functions
BEGIN;
SELECT COUNT(*) FROM test_table;
 count 
-------
     6
(1 row)

CREATE INDEX tt_ind_1 ON test_table(col_1);
ALTER TABLE test_table ADD CONSTRAINT num_check CHECK (col_1 < 50);
SELECT COUNT(*) FROM test_table;
 count 
-------
     6
(1 row)

ROLLBACK;
-- We don't get a distributed transaction id outside a transaction block
SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1;
 ?column? 
----------
 f
(1 row)

-- We should get a distributed transaction id inside a transaction block
BEGIN;
SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1;
 ?column? 
----------
 t
(1 row)

END;
-- Add a function to insert a row into a table
SELECT public.run_command_on_master_and_workers($$
CREATE FUNCTION multi_real_time_transaction.insert_row_test(table_name name)
RETURNS bool
AS $BODY$
BEGIN
  EXECUTE format('INSERT INTO %s VALUES(100,100,''function'')', table_name);
  RETURN true;
END;
$BODY$ LANGUAGE plpgsql;
$$);
 run_command_on_master_and_workers 
-----------------------------------
 
(1 row)

-- SELECT should be rolled back because we send BEGIN
BEGIN;
SELECT count(*) FROM test_table;
 count 
-------
     6
(1 row)

-- Sneakily insert directly into shards
SELECT insert_row_test(pg_typeof(test_table)::name) FROM test_table;
 insert_row_test 
-----------------
 t
 t
 t
 t
 t
 t
(6 rows)

SELECT count(*) FROM test_table;
 count 
-------
    12
(1 row)

ABORT;
SELECT count(*) FROM test_table;
 count 
-------
     6
(1 row)

-- Test with foreign key
ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
BEGIN;
DELETE FROM test_table where id = 1 or id = 3;
SELECT * FROM co_test_table;
 id | col_1 | col_2  
----+-------+--------
  2 |    30 | 'bb10'
(1 row)

ROLLBACK;
-- Test cancelling behaviour. See https://github.com/citusdata/citus/pull/1905.
-- Repeating it multiple times to increase the chance of failure before PR #1905.
SET client_min_messages TO ERROR;
alter system set deadlock_timeout TO '50ms';
SELECT pg_reload_conf();
 pg_reload_conf 
----------------
 t
(1 row)

BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR:  canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR:  canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR:  canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR:  canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR:  canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR:  canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR:  canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR:  canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR:  canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR:  canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
SET client_min_messages TO DEFAULT;
alter system set deadlock_timeout TO DEFAULT;
SELECT pg_reload_conf();
 pg_reload_conf 
----------------
 t
(1 row)

DROP SCHEMA multi_real_time_transaction CASCADE;
NOTICE:  drop cascades to 4 other objects
DETAIL:  drop cascades to table test_table
drop cascades to table co_test_table
drop cascades to table ref_test_table
drop cascades to function insert_row_test(name)