SET citus.next_shard_id TO 1200000; SET citus.next_placement_id TO 1200000; -- =================================================================== -- test end-to-end modification functionality -- =================================================================== CREATE TABLE researchers ( id bigint NOT NULL, lab_id int NOT NULL, name text NOT NULL ); CREATE TABLE labs ( id bigint NOT NULL, name text NOT NULL ); SELECT master_create_distributed_table('researchers', 'lab_id', 'hash'); master_create_distributed_table --------------------------------- (1 row) SELECT master_create_worker_shards('researchers', 2, 2); master_create_worker_shards ----------------------------- (1 row) SELECT master_create_distributed_table('labs', 'id', 'hash'); master_create_distributed_table --------------------------------- (1 row) SELECT master_create_worker_shards('labs', 1, 1); master_create_worker_shards ----------------------------- (1 row) -- might be confusing to have two people in the same lab with the same name CREATE UNIQUE INDEX avoid_name_confusion_idx ON researchers (lab_id, name); -- add some data INSERT INTO researchers VALUES (1, 1, 'Donald Knuth'); INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth'); INSERT INTO researchers VALUES (3, 2, 'Tony Hoare'); INSERT INTO researchers VALUES (4, 2, 'Kenneth Iverson'); -- replace a researcher, reusing their id in a multi-row INSERT BEGIN; DELETE FROM researchers WHERE lab_id = 1 AND id = 2; INSERT INTO researchers VALUES (2, 1, 'John Backus'), (12, 1, 'Frances E. Allen'); COMMIT; SELECT name FROM researchers WHERE lab_id = 1 AND id % 10 = 2; name ------------------ John Backus Frances E. 
Allen (2 rows) -- and the other way around BEGIN; INSERT INTO researchers VALUES (14, 2, 'Alan Kay'), (15, 2, 'Barbara Liskov'); DELETE FROM researchers WHERE id = 14 AND lab_id = 2; ROLLBACK; -- should have rolled everything back SELECT * FROM researchers WHERE id = 15 AND lab_id = 2; id | lab_id | name ----+--------+------ (0 rows) -- abort a modification BEGIN; DELETE FROM researchers WHERE lab_id = 1 AND id = 1; ABORT; SELECT name FROM researchers WHERE lab_id = 1 AND id = 1; name -------------- Donald Knuth (1 row) -- trigger a unique constraint violation BEGIN; UPDATE researchers SET name = 'John Backus' WHERE id = 1 AND lab_id = 1; ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200000" DETAIL: Key (lab_id, name)=(1, John Backus) already exists. CONTEXT: while executing command on localhost:57637 ABORT; -- creating savepoints should work... BEGIN; INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie'); SAVEPOINT hire_thompson; INSERT INTO researchers VALUES (6, 3, 'Ken Thompson'); COMMIT; SELECT name FROM researchers WHERE lab_id = 3 AND id = 6; name -------------- Ken Thompson (1 row) -- even if created by PL/pgSQL... 
\set VERBOSITY terse BEGIN; DO $$ BEGIN INSERT INTO researchers VALUES (10, 10, 'Edsger Dijkstra'); EXCEPTION WHEN not_null_violation THEN RAISE NOTICE 'caught not_null_violation'; END $$; COMMIT; -- rollback should also work BEGIN; INSERT INTO researchers VALUES (7, 4, 'Jim Gray'); SAVEPOINT hire_engelbart; INSERT INTO researchers VALUES (8, 4, 'Douglas Engelbart'); ROLLBACK TO hire_engelbart; COMMIT; SELECT name FROM researchers WHERE lab_id = 4; name ---------- Jim Gray (1 row) BEGIN; DO $$ BEGIN INSERT INTO researchers VALUES (11, 11, 'Whitfield Diffie'); INSERT INTO researchers VALUES (NULL, 10, 'Edsger Dijkstra'); EXCEPTION WHEN not_null_violation THEN RAISE NOTICE 'caught not_null_violation'; END $$; NOTICE: caught not_null_violation COMMIT; \set VERBOSITY default -- should be valid to edit labs after researchers... BEGIN; INSERT INTO researchers VALUES (8, 5, 'Douglas Engelbart'); INSERT INTO labs VALUES (5, 'Los Alamos'); COMMIT; SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id AND researchers.lab_id = 5; id | lab_id | name | id | name ----+--------+-------------------+----+------------ 8 | 5 | Douglas Engelbart | 5 | Los Alamos (1 row) -- and the other way around is also allowed BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); COMMIT; -- we should be able to expand the transaction participants BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001" DETAIL: Key (lab_id, name)=(6, Leslie Lamport) already exists. CONTEXT: while executing command on localhost:57638 ABORT; -- SELECTs may occur after a modification: First check that selecting -- from the modified node works. 
BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; count ------- 1 (1 row) ABORT; -- then check that SELECT going to new node still is fine BEGIN; UPDATE pg_dist_shard_placement AS sp SET shardstate = 3 FROM pg_dist_shard AS s WHERE sp.shardid = s.shardid AND sp.nodename = 'localhost' AND sp.nodeport = :worker_1_port AND s.logicalrelid = 'researchers'::regclass; INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; count ------- 1 (1 row) ABORT; -- we can mix DDL and INSERT BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); ALTER TABLE labs ADD COLUMN motto text; ABORT; -- whether it occurs first or second BEGIN; ALTER TABLE labs ADD COLUMN motto text; INSERT INTO labs VALUES (6, 'Bell Labs'); ABORT; -- but the DDL should correctly roll back SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass; Column | Type | Modifiers --------+--------+----------- id | bigint | not null name | text | not null (2 rows) SELECT * FROM labs WHERE id = 6; id | name ----+----------- 6 | Bell Labs (1 row) -- COPY can happen after single row INSERT BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); \copy labs from stdin delimiter ',' COMMIT; -- COPY cannot be performed if multiple shards were modified over the same connection BEGIN; INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'); INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie'); \copy researchers from stdin delimiter ',' ERROR: cannot establish a new connection for placement 1200003, since DML has been executed on a connection that is in use CONTEXT: COPY researchers, line 2: "10,6,Lesport Lampie" ROLLBACK; -- COPY cannot be performed after a multi-row INSERT that uses one connection BEGIN; INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'), (10, 6, 'Lamport Leslie'); \copy researchers from stdin delimiter ',' ERROR: cannot establish a new connection for placement 1200003, since DML has been executed 
on a connection that is in use CONTEXT: COPY researchers, line 2: "10,6,Lesport Lampie" ROLLBACK; -- after a COPY you can modify multiple shards, since they'll use different connections BEGIN; \copy researchers from stdin delimiter ',' INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'); INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie'); ROLLBACK; -- after a COPY you can perform a multi-row INSERT BEGIN; \copy researchers from stdin delimiter ',' INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'), (10, 6, 'Lamport Leslie'); ROLLBACK; -- COPY can happen before single row INSERT BEGIN; \copy labs from stdin delimiter ',' SELECT name FROM labs WHERE id = 10; name ---------------- Weyland-Yutani Weyland-Yutani (2 rows) INSERT INTO labs VALUES (6, 'Bell Labs'); COMMIT; -- two consecutive COPYs in a transaction are allowed BEGIN; \copy labs from stdin delimiter ',' \copy labs from stdin delimiter ',' COMMIT; SELECT name FROM labs WHERE id = 11 OR id = 12 ORDER BY id; name ---------------- Planet Express fsociety (2 rows) -- 1pc failure test SELECT recover_prepared_transactions(); recover_prepared_transactions ------------------------------- 0 (1 row) -- copy with unique index violation BEGIN; \copy researchers FROM STDIN delimiter ',' \copy researchers FROM STDIN delimiter ',' ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001" DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists. 
COMMIT; -- verify rollback SELECT * FROM researchers WHERE lab_id = 6; id | lab_id | name ----+--------+---------------- 9 | 6 | Leslie Lamport (1 row) SELECT count(*) FROM pg_dist_transaction; count ------- 0 (1 row) -- 2pc failure and success tests SET citus.multi_shard_commit_protocol TO '2pc'; SELECT recover_prepared_transactions(); recover_prepared_transactions ------------------------------- 0 (1 row) -- copy with unique index violation BEGIN; \copy researchers FROM STDIN delimiter ',' \copy researchers FROM STDIN delimiter ',' ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001" DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists. COMMIT; -- verify rollback SELECT * FROM researchers WHERE lab_id = 6; id | lab_id | name ----+--------+---------------- 9 | 6 | Leslie Lamport (1 row) SELECT count(*) FROM pg_dist_transaction; count ------- 0 (1 row) BEGIN; \copy researchers FROM STDIN delimiter ',' \copy researchers FROM STDIN delimiter ',' COMMIT; -- verify success SELECT * FROM researchers WHERE lab_id = 6; id | lab_id | name ----+--------+---------------------- 9 | 6 | Leslie Lamport 17 | 6 | 'Bjarne Stroustrup' 18 | 6 | 'Dennis Ritchie' (3 rows) -- verify 2pc SELECT count(*) FROM pg_dist_transaction; count ------- 2 (1 row) RESET citus.multi_shard_commit_protocol; -- create a check function SELECT * from run_command_on_workers('CREATE FUNCTION reject_large_id() RETURNS trigger AS $rli$ BEGIN IF (NEW.id > 30) THEN RAISE ''illegal value''; END IF; RETURN NEW; END; $rli$ LANGUAGE plpgsql;') ORDER BY nodeport; nodename | nodeport | success | result -----------+----------+---------+----------------- localhost | 57637 | t | CREATE FUNCTION localhost | 57638 | t | CREATE FUNCTION (2 rows) -- register after insert trigger SELECT * FROM run_command_on_placements('researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE 
reject_large_id()') ORDER BY nodeport, shardid; nodename | nodeport | shardid | success | result -----------+----------+---------+---------+---------------- localhost | 57637 | 1200000 | t | CREATE TRIGGER localhost | 57637 | 1200001 | t | CREATE TRIGGER localhost | 57638 | 1200000 | t | CREATE TRIGGER localhost | 57638 | 1200001 | t | CREATE TRIGGER (4 rows) -- hide postgresql version dependend messages for next test only \set VERBOSITY terse -- deferred check should abort the transaction BEGIN; SET LOCAL citus.multi_shard_commit_protocol TO '1pc'; DELETE FROM researchers WHERE lab_id = 6; \copy researchers FROM STDIN delimiter ',' \copy researchers FROM STDIN delimiter ',' COMMIT; WARNING: illegal value WARNING: failed to commit critical transaction on localhost:57638, metadata is likely out of sync WARNING: illegal value WARNING: failed to commit critical transaction on localhost:57637, metadata is likely out of sync WARNING: could not commit transaction for shard 1200001 on any active node ERROR: could not commit transaction on any active node \unset VERBOSITY -- verify everyhing including delete is rolled back SELECT * FROM researchers WHERE lab_id = 6; id | lab_id | name ----+--------+---------------------- 9 | 6 | Leslie Lamport 17 | 6 | 'Bjarne Stroustrup' 18 | 6 | 'Dennis Ritchie' (3 rows) -- cleanup triggers and the function SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s') ORDER BY nodeport, shardid; nodename | nodeport | shardid | success | result -----------+----------+---------+---------+-------------- localhost | 57637 | 1200000 | t | DROP TRIGGER localhost | 57637 | 1200001 | t | DROP TRIGGER localhost | 57638 | 1200000 | t | DROP TRIGGER localhost | 57638 | 1200001 | t | DROP TRIGGER (4 rows) SELECT * FROM run_command_on_workers('drop function reject_large_id()') ORDER BY nodeport; nodename | nodeport | success | result -----------+----------+---------+--------------- localhost | 57637 | t | 
DROP FUNCTION localhost | 57638 | t | DROP FUNCTION (2 rows) -- ALTER and copy are compatible BEGIN; ALTER TABLE labs ADD COLUMN motto text; \copy labs from stdin delimiter ',' ROLLBACK; BEGIN; \copy labs from stdin delimiter ',' ALTER TABLE labs ADD COLUMN motto text; ABORT; -- cannot perform DDL once a connection is used for multiple shards BEGIN; SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0; lab_id -------- (0 rows) SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0; lab_id -------- (0 rows) ALTER TABLE researchers ADD COLUMN motto text; ERROR: cannot perform a parallel DDL command because multiple placements have been accessed over the same connection ROLLBACK; -- multi-shard operations can co-exist with DDL in a transactional way BEGIN; ALTER TABLE labs ADD COLUMN motto text; SELECT master_modify_multiple_shards('DELETE FROM labs'); master_modify_multiple_shards ------------------------------- 8 (1 row) ALTER TABLE labs ADD COLUMN score float; ROLLBACK; -- should have rolled everything back SELECT * FROM labs WHERE id = 12; id | name ----+---------- 12 | fsociety (1 row) -- now, for some special failures... CREATE TABLE objects ( id bigint PRIMARY KEY, name text NOT NULL ); SELECT master_create_distributed_table('objects', 'id', 'hash'); master_create_distributed_table --------------------------------- (1 row) SELECT master_create_worker_shards('objects', 1, 2); master_create_worker_shards ----------------------------- (1 row) -- test primary key violations BEGIN; INSERT INTO objects VALUES (1, 'apple'); INSERT INTO objects VALUES (1, 'orange'); ERROR: duplicate key value violates unique constraint "objects_pkey_1200003" DETAIL: Key (id)=(1) already exists. CONTEXT: while executing command on localhost:57637 COMMIT; -- data shouldn't have persisted... SELECT * FROM objects WHERE id = 1; id | name ----+------ (0 rows) -- and placements should still be healthy... 
SELECT count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND sp.shardstate = 1 AND s.logicalrelid = 'objects'::regclass; count ------- 2 (1 row) -- create trigger on one worker to reject certain values \c - - - :worker_2_port CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$ BEGIN IF (NEW.name = 'BAD') THEN RAISE 'illegal value'; END IF; RETURN NEW; END; $rb$ LANGUAGE plpgsql; CREATE CONSTRAINT TRIGGER reject_bad AFTER INSERT ON objects_1200003 DEFERRABLE INITIALLY IMMEDIATE FOR EACH ROW EXECUTE PROCEDURE reject_bad(); \c - - - :master_port -- test partial failure; worker_1 succeeds, 2 fails \set VERBOSITY terse BEGIN; INSERT INTO objects VALUES (1, 'apple'); INSERT INTO objects VALUES (2, 'BAD'); WARNING: illegal value INSERT INTO labs VALUES (7, 'E Corp'); COMMIT; -- data should be persisted SELECT * FROM objects WHERE id = 2; id | name ----+------ 2 | BAD (1 row) SELECT * FROM labs WHERE id = 7; id | name ----+-------- 7 | E Corp (1 row) -- but one placement should be bad SELECT count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND sp.nodename = 'localhost' AND sp.nodeport = :worker_2_port AND sp.shardstate = 3 AND s.logicalrelid = 'objects'::regclass; count ------- 1 (1 row) DELETE FROM objects; -- mark shards as healthy again; delete all data UPDATE pg_dist_shard_placement AS sp SET shardstate = 1 FROM pg_dist_shard AS s WHERE sp.shardid = s.shardid AND s.logicalrelid = 'objects'::regclass; -- what if there are errors on different shards at different times? 
\c - - - :worker_1_port CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$ BEGIN IF (NEW.name = 'BAD') THEN RAISE 'illegal value'; END IF; RETURN NEW; END; $rb$ LANGUAGE plpgsql; CREATE CONSTRAINT TRIGGER reject_bad AFTER INSERT ON labs_1200002 DEFERRABLE INITIALLY IMMEDIATE FOR EACH ROW EXECUTE PROCEDURE reject_bad(); \c - - - :master_port BEGIN; INSERT INTO objects VALUES (1, 'apple'); INSERT INTO objects VALUES (2, 'BAD'); WARNING: illegal value INSERT INTO labs VALUES (8, 'Aperture Science'); INSERT INTO labs VALUES (9, 'BAD'); ERROR: illegal value COMMIT; -- data should NOT be persisted SELECT * FROM objects WHERE id = 1; id | name ----+------ (0 rows) SELECT * FROM labs WHERE id = 8; id | name ----+------ (0 rows) -- all placements should remain healthy SELECT count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND sp.shardstate = 1 AND (s.logicalrelid = 'objects'::regclass OR s.logicalrelid = 'labs'::regclass); count ------- 3 (1 row) -- what if the failures happen at COMMIT time? 
\c - - - :worker_2_port DROP TRIGGER reject_bad ON objects_1200003; CREATE CONSTRAINT TRIGGER reject_bad AFTER INSERT ON objects_1200003 DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_bad(); \c - - - :master_port -- should be the same story as before, just at COMMIT time BEGIN; INSERT INTO objects VALUES (1, 'apple'); INSERT INTO objects VALUES (2, 'BAD'); INSERT INTO labs VALUES (9, 'Umbrella Corporation'); COMMIT; WARNING: illegal value WARNING: failed to commit transaction on localhost:57638 -- data should be persisted SELECT * FROM objects WHERE id = 2; id | name ----+------ 2 | BAD (1 row) SELECT * FROM labs WHERE id = 7; id | name ----+-------- 7 | E Corp (1 row) -- but one placement should be bad SELECT count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND sp.nodename = 'localhost' AND sp.nodeport = :worker_2_port AND sp.shardstate = 3 AND s.logicalrelid = 'objects'::regclass; count ------- 1 (1 row) DELETE FROM objects; -- mark shards as healthy again; delete all data UPDATE pg_dist_shard_placement AS sp SET shardstate = 1 FROM pg_dist_shard AS s WHERE sp.shardid = s.shardid AND s.logicalrelid = 'objects'::regclass; -- what if all nodes have failures at COMMIT time? 
\c - - - :worker_1_port DROP TRIGGER reject_bad ON labs_1200002; CREATE CONSTRAINT TRIGGER reject_bad AFTER INSERT ON labs_1200002 DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_bad(); \c - - - :master_port BEGIN; INSERT INTO objects VALUES (1, 'apple'); INSERT INTO objects VALUES (2, 'BAD'); INSERT INTO labs VALUES (8, 'Aperture Science'); INSERT INTO labs VALUES (9, 'BAD'); COMMIT; WARNING: illegal value WARNING: failed to commit transaction on localhost:57637 WARNING: illegal value WARNING: failed to commit transaction on localhost:57638 WARNING: could not commit transaction for shard 1200002 on any active node WARNING: could not commit transaction for shard 1200003 on any active node ERROR: could not commit transaction on any active node -- data should NOT be persisted SELECT * FROM objects WHERE id = 1; id | name ----+------ (0 rows) SELECT * FROM labs WHERE id = 8; id | name ----+------ (0 rows) -- all placements should remain healthy SELECT count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND sp.shardstate = 1 AND (s.logicalrelid = 'objects'::regclass OR s.logicalrelid = 'labs'::regclass); count ------- 3 (1 row) -- what if one shard (objects) succeeds but another (labs) completely fails? \c - - - :worker_2_port DROP TRIGGER reject_bad ON objects_1200003; \c - - - :master_port SET citus.next_shard_id TO 1200004; BEGIN; INSERT INTO objects VALUES (1, 'apple'); INSERT INTO labs VALUES (8, 'Aperture Science'); INSERT INTO labs VALUES (9, 'BAD'); COMMIT; WARNING: illegal value WARNING: failed to commit transaction on localhost:57637 WARNING: could not commit transaction for shard 1200002 on any active node \set VERBOSITY default -- data to objects should be persisted, but labs should not... 
SELECT * FROM objects WHERE id = 1; id | name ----+------- 1 | apple (1 row) SELECT * FROM labs WHERE id = 8; id | name ----+------ (0 rows) -- labs should be healthy, but one object placement shouldn't be SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND (s.logicalrelid = 'objects'::regclass OR s.logicalrelid = 'labs'::regclass) GROUP BY s.logicalrelid, sp.shardstate ORDER BY s.logicalrelid, sp.shardstate; logicalrelid | shardstate | count --------------+------------+------- labs | 1 | 1 objects | 1 | 1 objects | 3 | 1 (3 rows) -- some append-partitioned tests for good measure CREATE TABLE append_researchers ( LIKE researchers ); SELECT master_create_distributed_table('append_researchers', 'id', 'append'); master_create_distributed_table --------------------------------- (1 row) SET citus.shard_replication_factor TO 1; SELECT master_create_empty_shard('append_researchers') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000 WHERE shardid = :new_shard_id; SELECT master_create_empty_shard('append_researchers') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000 WHERE shardid = :new_shard_id; SET citus.shard_replication_factor TO DEFAULT; -- try single-shard INSERT BEGIN; INSERT INTO append_researchers VALUES (0, 0, 'John Backus'); COMMIT; SELECT * FROM append_researchers WHERE id = 0; id | lab_id | name ----+--------+------------- 0 | 0 | John Backus (1 row) -- try rollback BEGIN; DELETE FROM append_researchers WHERE id = 0; ROLLBACK; SELECT * FROM append_researchers WHERE id = 0; id | lab_id | name ----+--------+------------- 0 | 0 | John Backus (1 row) -- try hitting shard on other node BEGIN; INSERT INTO append_researchers VALUES (1, 1, 'John McCarthy'); INSERT INTO append_researchers VALUES (500000, 500000, 'Tony Hoare'); ERROR: cannot run INSERT command which targets multiple shards 
HINT: Make sure the value for partition column "id" falls into a single shard. ROLLBACK; SELECT * FROM append_researchers; id | lab_id | name ----+--------+------------- 0 | 0 | John Backus (1 row) -- we use 2PC for reference tables by default -- let's add some tests for them CREATE TABLE reference_modifying_xacts (key int, value int); SELECT create_reference_table('reference_modifying_xacts'); create_reference_table ------------------------ (1 row) -- very basic test, ensure that INSERTs work INSERT INTO reference_modifying_xacts VALUES (1, 1); SELECT * FROM reference_modifying_xacts; key | value -----+------- 1 | 1 (1 row) -- now ensure that it works in a transaction as well BEGIN; INSERT INTO reference_modifying_xacts VALUES (2, 2); SELECT * FROM reference_modifying_xacts; key | value -----+------- 1 | 1 2 | 2 (2 rows) COMMIT; -- we should be able to see the insert outside of the transaction as well SELECT * FROM reference_modifying_xacts; key | value -----+------- 1 | 1 2 | 2 (2 rows) -- rollback should also work BEGIN; INSERT INTO reference_modifying_xacts VALUES (3, 3); SELECT * FROM reference_modifying_xacts; key | value -----+------- 1 | 1 2 | 2 3 | 3 (3 rows) ROLLBACK; -- see that we've not inserted SELECT * FROM reference_modifying_xacts; key | value -----+------- 1 | 1 2 | 2 (2 rows) -- lets fail on of the workers at before the commit time \c - - - :worker_1_port CREATE FUNCTION reject_bad_reference() RETURNS trigger AS $rb$ BEGIN IF (NEW.key = 999) THEN RAISE 'illegal value'; END IF; RETURN NEW; END; $rb$ LANGUAGE plpgsql; CREATE CONSTRAINT TRIGGER reject_bad_reference AFTER INSERT ON reference_modifying_xacts_1200006 DEFERRABLE INITIALLY IMMEDIATE FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference(); \c - - - :master_port \set VERBOSITY terse -- try without wrapping inside a transaction INSERT INTO reference_modifying_xacts VALUES (999, 3); ERROR: illegal value -- same test within a transaction BEGIN; INSERT INTO reference_modifying_xacts VALUES (999, 
3); ERROR: illegal value COMMIT; -- lets fail one of the workers at COMMIT time \c - - - :worker_1_port DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006; CREATE CONSTRAINT TRIGGER reject_bad_reference AFTER INSERT ON reference_modifying_xacts_1200006 DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference(); \c - - - :master_port \set VERBOSITY terse -- try without wrapping inside a transaction INSERT INTO reference_modifying_xacts VALUES (999, 3); WARNING: illegal value ERROR: failure on connection marked as essential: localhost:57637 -- same test within a transaction BEGIN; INSERT INTO reference_modifying_xacts VALUES (999, 3); COMMIT; WARNING: illegal value ERROR: failure on connection marked as essential: localhost:57637 -- all placements should be healthy SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND s.logicalrelid = 'reference_modifying_xacts'::regclass GROUP BY s.logicalrelid, sp.shardstate ORDER BY s.logicalrelid, sp.shardstate; logicalrelid | shardstate | count ---------------------------+------------+------- reference_modifying_xacts | 1 | 2 (1 row) -- for the time-being drop the constraint \c - - - :worker_1_port DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006; \c - - - :master_port -- now create a hash distributed table and run tests -- including both the reference table and the hash -- distributed table SET citus.next_shard_id TO 1200007; SET citus.shard_count = 4; SET citus.shard_replication_factor = 1; CREATE TABLE hash_modifying_xacts (key int, value int); SELECT create_distributed_table('hash_modifying_xacts', 'key'); create_distributed_table -------------------------- (1 row) -- let's try to expand the xact participants BEGIN; INSERT INTO hash_modifying_xacts VALUES (1, 1); INSERT INTO reference_modifying_xacts VALUES (10, 10); COMMIT; -- it is allowed when turning off 
deadlock prevention BEGIN; INSERT INTO hash_modifying_xacts VALUES (1, 1); INSERT INTO reference_modifying_xacts VALUES (10, 10); ABORT; BEGIN; INSERT INTO hash_modifying_xacts VALUES (1, 1); INSERT INTO hash_modifying_xacts VALUES (2, 2); ABORT; -- lets fail one of the workers before COMMIT time for the hash table \c - - - :worker_1_port CREATE FUNCTION reject_bad_hash() RETURNS trigger AS $rb$ BEGIN IF (NEW.key = 997) THEN RAISE 'illegal value'; END IF; RETURN NEW; END; $rb$ LANGUAGE plpgsql; CREATE CONSTRAINT TRIGGER reject_bad_hash AFTER INSERT ON hash_modifying_xacts_1200007 DEFERRABLE INITIALLY IMMEDIATE FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash(); \c - - - :master_port \set VERBOSITY terse -- the transaction as a whole should fail BEGIN; INSERT INTO reference_modifying_xacts VALUES (55, 10); INSERT INTO hash_modifying_xacts VALUES (997, 1); ERROR: illegal value COMMIT; -- ensure that the value didn't go into the reference table SELECT * FROM reference_modifying_xacts WHERE key = 55; key | value -----+------- (0 rows) -- now lets fail on of the workers for the hash distributed table table -- when there is a reference table involved \c - - - :worker_1_port DROP TRIGGER reject_bad_hash ON hash_modifying_xacts_1200007; -- the trigger is on execution time CREATE CONSTRAINT TRIGGER reject_bad_hash AFTER INSERT ON hash_modifying_xacts_1200007 DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash(); \c - - - :master_port \set VERBOSITY terse -- the transaction as a whole should fail BEGIN; INSERT INTO reference_modifying_xacts VALUES (12, 12); INSERT INTO hash_modifying_xacts VALUES (997, 1); COMMIT; WARNING: illegal value ERROR: failure on connection marked as essential: localhost:57637 -- ensure that the values didn't go into the reference table SELECT * FROM reference_modifying_xacts WHERE key = 12; key | value -----+------- (0 rows) -- all placements should be healthy SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) FROM 
pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR s.logicalrelid = 'hash_modifying_xacts'::regclass) GROUP BY s.logicalrelid, sp.shardstate ORDER BY s.logicalrelid, sp.shardstate; logicalrelid | shardstate | count ---------------------------+------------+------- reference_modifying_xacts | 1 | 2 hash_modifying_xacts | 1 | 4 (2 rows) -- now, fail the insert on reference table -- and ensure that hash distributed table's -- change is rollbacked as well \c - - - :worker_1_port CREATE CONSTRAINT TRIGGER reject_bad_reference AFTER INSERT ON reference_modifying_xacts_1200006 DEFERRABLE INITIALLY IMMEDIATE FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference(); \c - - - :master_port \set VERBOSITY terse BEGIN; -- to expand participant to include all worker nodes INSERT INTO reference_modifying_xacts VALUES (66, 3); INSERT INTO hash_modifying_xacts VALUES (80, 1); INSERT INTO reference_modifying_xacts VALUES (999, 3); ERROR: illegal value COMMIT; SELECT * FROM hash_modifying_xacts WHERE key = 80; key | value -----+------- (0 rows) SELECT * FROM reference_modifying_xacts WHERE key = 66; key | value -----+------- (0 rows) SELECT * FROM reference_modifying_xacts WHERE key = 999; key | value -----+------- (0 rows) -- all placements should be healthy SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR s.logicalrelid = 'hash_modifying_xacts'::regclass) GROUP BY s.logicalrelid, sp.shardstate ORDER BY s.logicalrelid, sp.shardstate; logicalrelid | shardstate | count ---------------------------+------------+------- reference_modifying_xacts | 1 | 2 hash_modifying_xacts | 1 | 4 (2 rows) -- now show that all modifications to reference -- tables are done in 2PC SELECT recover_prepared_transactions(); recover_prepared_transactions 
------------------------------- 0 (1 row) INSERT INTO reference_modifying_xacts VALUES (70, 70); SELECT count(*) FROM pg_dist_transaction; count ------- 2 (1 row) -- reset the transactions table SELECT recover_prepared_transactions(); recover_prepared_transactions ------------------------------- 0 (1 row) BEGIN; INSERT INTO reference_modifying_xacts VALUES (71, 71); COMMIT; SELECT count(*) FROM pg_dist_transaction; count ------- 2 (1 row) -- create a hash distributed tablw which spans all nodes SET citus.shard_count = 4; SET citus.shard_replication_factor = 2; CREATE TABLE hash_modifying_xacts_second (key int, value int); SELECT create_distributed_table('hash_modifying_xacts_second', 'key'); create_distributed_table -------------------------- (1 row) -- reset the transactions table SELECT recover_prepared_transactions(); recover_prepared_transactions ------------------------------- 0 (1 row) BEGIN; INSERT INTO hash_modifying_xacts_second VALUES (72, 1); INSERT INTO reference_modifying_xacts VALUES (72, 3); COMMIT; SELECT count(*) FROM pg_dist_transaction; count ------- 2 (1 row) -- reset the transactions table SELECT recover_prepared_transactions(); recover_prepared_transactions ------------------------------- 0 (1 row) DELETE FROM reference_modifying_xacts; SELECT count(*) FROM pg_dist_transaction; count ------- 2 (1 row) -- reset the transactions table SELECT recover_prepared_transactions(); recover_prepared_transactions ------------------------------- 0 (1 row) UPDATE reference_modifying_xacts SET key = 10; SELECT count(*) FROM pg_dist_transaction; count ------- 2 (1 row) -- now to one more type of failure testing -- in which we'll make the remote host unavailable -- first create the new user on all nodes CREATE USER test_user; NOTICE: not propagating CREATE ROLE/USER commands to worker nodes \c - - - :worker_1_port CREATE USER test_user; NOTICE: not propagating CREATE ROLE/USER commands to worker nodes \c - - - :worker_2_port CREATE USER test_user; NOTICE: not 
propagating CREATE ROLE/USER commands to worker nodes -- now connect back to the master with the new user \c - test_user - :master_port SET citus.next_shard_id TO 1200015; CREATE TABLE reference_failure_test (key int, value int); SELECT create_reference_table('reference_failure_test'); create_reference_table ------------------------ (1 row) -- create a hash distributed table SET citus.shard_count TO 4; CREATE TABLE numbers_hash_failure_test(key int, value int); SELECT create_distributed_table('numbers_hash_failure_test', 'key'); create_distributed_table -------------------------- (1 row) -- ensure that the shard is created for this user \c - test_user - :worker_1_port \dt reference_failure_test_1200015 List of relations Schema | Name | Type | Owner --------+--------------------------------+-------+----------- public | reference_failure_test_1200015 | table | test_user (1 row) -- now connect with the default user, -- and rename the existing user \c - :default_user - :worker_1_port ALTER USER test_user RENAME TO test_user_new; -- connect back to master and query the reference table \c - test_user - :master_port -- should fail since the worker doesn't have test_user anymore INSERT INTO reference_failure_test VALUES (1, '1'); WARNING: connection error: localhost:57637 ERROR: failure on connection marked as essential: localhost:57637 -- the same as the above, but wrapped within a transaction BEGIN; INSERT INTO reference_failure_test VALUES (1, '1'); WARNING: connection error: localhost:57637 ERROR: failure on connection marked as essential: localhost:57637 COMMIT; BEGIN; COPY reference_failure_test FROM STDIN WITH (FORMAT 'csv'); ERROR: connection error: localhost:57637 COMMIT; -- show that no data go through the table and shard states are good SELECT * FROM reference_failure_test; key | value -----+------- (0 rows) -- all placements should be healthy SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s 
WHERE sp.shardid = s.shardid AND s.logicalrelid = 'reference_failure_test'::regclass GROUP BY s.logicalrelid, sp.shardstate ORDER BY s.logicalrelid, sp.shardstate; logicalrelid | shardstate | count ------------------------+------------+------- reference_failure_test | 1 | 2 (1 row) BEGIN; COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv'); WARNING: connection error: localhost:57637 WARNING: connection error: localhost:57637 -- some placements are invalid before abort SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid) WHERE logicalrelid = 'numbers_hash_failure_test'::regclass ORDER BY shardid, nodeport; shardid | shardstate | nodename | nodeport ---------+------------+-----------+---------- 1200016 | 3 | localhost | 57637 1200016 | 1 | localhost | 57638 1200017 | 1 | localhost | 57637 1200017 | 1 | localhost | 57638 1200018 | 1 | localhost | 57637 1200018 | 1 | localhost | 57638 1200019 | 3 | localhost | 57637 1200019 | 1 | localhost | 57638 (8 rows) ABORT; -- verify nothing is inserted SELECT count(*) FROM numbers_hash_failure_test; WARNING: connection error: localhost:57637 WARNING: connection error: localhost:57637 count ------- 0 (1 row) -- all placements to be marked valid SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid) WHERE logicalrelid = 'numbers_hash_failure_test'::regclass ORDER BY shardid, nodeport; shardid | shardstate | nodename | nodeport ---------+------------+-----------+---------- 1200016 | 1 | localhost | 57637 1200016 | 1 | localhost | 57638 1200017 | 1 | localhost | 57637 1200017 | 1 | localhost | 57638 1200018 | 1 | localhost | 57637 1200018 | 1 | localhost | 57638 1200019 | 1 | localhost | 57637 1200019 | 1 | localhost | 57638 (8 rows) BEGIN; COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv'); WARNING: connection error: localhost:57637 WARNING: connection error: localhost:57637 -- check shard states 
before commit SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid) WHERE logicalrelid = 'numbers_hash_failure_test'::regclass ORDER BY shardid, nodeport; shardid | shardstate | nodename | nodeport ---------+------------+-----------+---------- 1200016 | 3 | localhost | 57637 1200016 | 1 | localhost | 57638 1200017 | 1 | localhost | 57637 1200017 | 1 | localhost | 57638 1200018 | 1 | localhost | 57637 1200018 | 1 | localhost | 57638 1200019 | 3 | localhost | 57637 1200019 | 1 | localhost | 57638 (8 rows) COMMIT; -- expect some placements to be marked invalid after commit SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid) WHERE logicalrelid = 'numbers_hash_failure_test'::regclass ORDER BY shardid, nodeport; shardid | shardstate | nodename | nodeport ---------+------------+-----------+---------- 1200016 | 3 | localhost | 57637 1200016 | 1 | localhost | 57638 1200017 | 1 | localhost | 57637 1200017 | 1 | localhost | 57638 1200018 | 1 | localhost | 57637 1200018 | 1 | localhost | 57638 1200019 | 3 | localhost | 57637 1200019 | 1 | localhost | 57638 (8 rows) -- verify data is inserted SELECT count(*) FROM numbers_hash_failure_test; WARNING: connection error: localhost:57637 WARNING: connection error: localhost:57637 count ------- 2 (1 row) -- break the other node as well \c - :default_user - :worker_2_port ALTER USER test_user RENAME TO test_user_new; \c - test_user - :master_port -- fails on all shard placements INSERT INTO numbers_hash_failure_test VALUES (2,2); WARNING: connection error: localhost:57638 ERROR: could not modify any active placements -- connect back to the master with the proper user to continue the tests \c - :default_user - :master_port SET citus.next_shard_id TO 1200020; SET citus.next_placement_id TO 1200033; -- unbreak both nodes by renaming the user back to the original name SELECT * FROM run_command_on_workers('ALTER USER 
test_user_new RENAME TO test_user'); nodename | nodeport | success | result -----------+----------+---------+------------ localhost | 57637 | t | ALTER ROLE localhost | 57638 | t | ALTER ROLE (2 rows) DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second, reference_failure_test, numbers_hash_failure_test; SELECT * FROM run_command_on_workers('DROP USER test_user'); nodename | nodeport | success | result -----------+----------+---------+----------- localhost | 57637 | t | DROP ROLE localhost | 57638 | t | DROP ROLE (2 rows) DROP USER test_user; -- set up foreign keys to test transactions with co-located and reference tables BEGIN; SET LOCAL citus.shard_replication_factor TO 1; SET LOCAL citus.shard_count TO 4; CREATE TABLE usergroups ( gid int PRIMARY KEY, name text ); SELECT create_reference_table('usergroups'); create_reference_table ------------------------ (1 row) CREATE TABLE itemgroups ( gid int PRIMARY KEY, name text ); SELECT create_reference_table('itemgroups'); create_reference_table ------------------------ (1 row) CREATE TABLE users ( id int PRIMARY KEY, name text, user_group int ); SELECT create_distributed_table('users', 'id'); create_distributed_table -------------------------- (1 row) CREATE TABLE items ( user_id int REFERENCES users (id) ON DELETE CASCADE, item_name text, item_group int ); SELECT create_distributed_table('items', 'user_id'); create_distributed_table -------------------------- (1 row) -- Table to find values that live in different shards on the same node SELECT id, shard_name('users', shardid), nodename, nodeport FROM pg_dist_shard_placement JOIN ( SELECT id, get_shard_id_for_distribution_column('users', id) shardid FROM generate_series(1,10) id ) ids USING (shardid) ORDER BY id; id | shard_name | nodename | nodeport ----+---------------+-----------+---------- 1 | users_1200022 | localhost | 57637 2 | users_1200025 | localhost | 57638 3 | users_1200023 | localhost | 57638 4 | users_1200023 | 
localhost | 57638 5 | users_1200022 | localhost | 57637 6 | users_1200024 | localhost | 57637 7 | users_1200023 | localhost | 57638 8 | users_1200022 | localhost | 57637 9 | users_1200025 | localhost | 57638 10 | users_1200022 | localhost | 57637 (10 rows) END; -- the INSERTs into items should see the users BEGIN; \COPY users FROM STDIN WITH CSV INSERT INTO items VALUES (1, 'item-1'); INSERT INTO items VALUES (6, 'item-6'); END; SELECT user_id FROM items ORDER BY user_id; user_id --------- 1 6 (2 rows) -- should not be able to open multiple connections per node after INSERTing over one connection BEGIN; INSERT INTO users VALUES (2, 'burak'); INSERT INTO users VALUES (3, 'burak'); \COPY items FROM STDIN WITH CSV ERROR: cannot establish a new connection for placement 1200042, since DML has been executed on a connection that is in use END; -- cannot perform DDL after a co-located table has been read over 1 connection BEGIN; SELECT id FROM users WHERE id = 1; id ---- 1 (1 row) SELECT id FROM users WHERE id = 6; id ---- 6 (1 row) ALTER TABLE items ADD COLUMN last_update timestamptz; ERROR: cannot perform a parallel DDL command because multiple placements have been accessed over the same connection END; -- but the other way around is fine BEGIN; ALTER TABLE items ADD COLUMN last_update timestamptz; SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 1; id ---- 1 (1 row) SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 6; id ---- 6 (1 row) END; BEGIN; -- establish multiple connections to a node \COPY users FROM STDIN WITH CSV -- now read from the reference table over each connection SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2; user_id --------- (0 rows) SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3; user_id --------- (0 rows) -- perform a DDL command on the reference table ALTER TABLE itemgroups ADD COLUMN last_update timestamptz; ERROR: cannot perform DDL on placement 1200036, 
which has been read over multiple connections END; BEGIN; -- establish multiple connections to a node \COPY users FROM STDIN WITH CSV -- read from the reference table over each connection SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2; user_id --------- (0 rows) SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3; user_id --------- (0 rows) -- perform a DDL command on a co-located reference table ALTER TABLE usergroups ADD COLUMN last_update timestamptz; ERROR: cannot perform DDL on placement 1200034 since a co-located placement has been read over multiple connections END; BEGIN; -- make a modification over connection 1 INSERT INTO usergroups VALUES (0,'istanbul'); -- copy over connections 1 and 2 \COPY users FROM STDIN WITH CSV -- cannot read modifications made over different connections SELECT id FROM users JOIN usergroups ON (gid = user_group) WHERE id = 3; ERROR: cannot perform query with placements that were modified over multiple connections END; -- make sure we can see cascading deletes BEGIN; SELECT master_modify_multiple_shards('DELETE FROM users'); master_modify_multiple_shards ------------------------------- 2 (1 row) SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 1; user_id --------- (0 rows) SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 6; user_id --------- (0 rows) END; -- test visibility after COPY INSERT INTO usergroups VALUES (2,'group'); BEGIN; -- opens two separate connections to node \COPY users FROM STDIN WITH CSV -- Uses first connection, which wrote the row with id = 2 SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 2; id | name | user_group | gid | name ----+-------+------------+-----+------- 2 | onder | 2 | 2 | group (1 row) -- Should use second connection, which wrote the row with id = 4 SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4; id | name | user_group | gid | 
name ----+-------+------------+-----+------- 4 | murat | 2 | 2 | group (1 row) END; DROP TABLE items, users, itemgroups, usergroups, researchers, labs;