--
-- Failure tests for COPY to reference tables
--
-- NOTE(review): this appears to be a psql regression transcript (statements
-- followed by their expected results); comment text presumably has to stay in
-- sync with the matching input script, so confirm before changing either side.
CREATE SCHEMA copy_reference_failure;
SET search_path TO 'copy_reference_failure';
SET citus.next_shard_id TO 130000;
-- we don't want to see the prepared transaction numbers in the warnings
SET client_min_messages TO ERROR;
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

CREATE TABLE test_table(id int, value_1 int);
SELECT create_reference_table('test_table');
 create_reference_table
---------------------------------------------------------------------

(1 row)

-- counts the placements of test_table whose shardstate is not 1
-- (presumably 1 marks a healthy placement; TODO confirm); every scenario
-- below checks that this count is 0 after the injected failure
CREATE VIEW unhealthy_shard_count AS
  SELECT count(*)
  FROM pg_dist_shard_placement pdsp
  JOIN
  pg_dist_shard pds
  ON pdsp.shardid=pds.shardid
  WHERE logicalrelid='copy_reference_failure.test_table'::regclass AND shardstate != 1;
-- in the first test, kill just in the first
-- response we get from the worker
SELECT citus.mitmproxy('conn.kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

\copy test_table FROM STDIN DELIMITER ','
ERROR:  failure on connection marked as essential: localhost:xxxxx
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     0
(1 row)

-- kill as soon as the coordinator sends begin
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

\copy test_table FROM STDIN DELIMITER ','
ERROR:  failure on connection marked as essential: localhost:xxxxx
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     0
(1 row)

-- cancel as soon as the coordinator sends begin
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
 mitmproxy
---------------------------------------------------------------------

(1 row)

\copy test_table FROM STDIN DELIMITER ','
ERROR:  canceling statement due to user request
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     0
(1 row)

-- kill as soon as the coordinator sends COPY command
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

\copy test_table FROM STDIN DELIMITER ','
ERROR:  connection not open
CONTEXT:  while executing command on localhost:xxxxx
COPY test_table, line 1: "1,2"
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     0
(1 row)

-- cancel as soon as the coordinator sends COPY command
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')');
 mitmproxy
---------------------------------------------------------------------

(1 row)

\copy test_table FROM STDIN DELIMITER ','
ERROR:  canceling statement due to user request
CONTEXT:  COPY test_table, line 1: "1,2"
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     0
(1 row)

-- kill as soon as the worker sends CopyComplete
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COPY 3").kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

\copy test_table FROM STDIN DELIMITER ','
ERROR:  failed to COPY to shard xxxxx on localhost:xxxxx
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     0
(1 row)

-- cancel as soon as the worker sends CopyComplete
-- (same command-complete hook as the kill test above)
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COPY 3").cancel(' || pg_backend_pid() || ')');
 mitmproxy
---------------------------------------------------------------------

(1 row)

\copy test_table FROM STDIN DELIMITER ','
ERROR:  canceling statement due to user request
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     0
(1 row)

-- kill the connection when we try to start the COPY
-- the query should abort
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

\copy test_table FROM STDIN DELIMITER ','
ERROR:  connection not open
CONTEXT:  while executing command on localhost:xxxxx
COPY test_table, line 1: "1,2"
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     0
(1 row)

-- killing on PREPARE should be fine, everything should be rolled back
SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

\copy test_table FROM STDIN DELIMITER ','
ERROR:  connection not open
CONTEXT:  while executing command on localhost:xxxxx
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     0
(1 row)

-- cancelling on PREPARE should be fine, everything should be rolled back
SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").cancel(' || pg_backend_pid() || ')');
 mitmproxy
---------------------------------------------------------------------

(1 row)

\copy test_table FROM STDIN DELIMITER ','
ERROR:  canceling statement due to user request
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     0
(1 row)

-- killing on command complete of COMMIT PREPARED, we should see that the command succeeds
-- and all the workers committed
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COMMIT PREPARED").kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

\copy test_table FROM STDIN DELIMITER ','
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

-- we shouldn't have any prepared transactions in the workers
SELECT recover_prepared_transactions();
 recover_prepared_transactions
---------------------------------------------------------------------
                             0
(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     3
(1 row)

TRUNCATE test_table;
-- kill as soon as the coordinator sends COMMIT
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

\copy test_table FROM STDIN DELIMITER ','
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

-- Since we kill the connections to one worker after the commit arrives while
-- the connections to the other worker stay healthy, we cannot commit on the
-- one worker (which has 1 active shard placement), but we can on the other.
-- That's why we expect to see 1 recovered prepared transaction.
SELECT recover_prepared_transactions();
 recover_prepared_transactions
---------------------------------------------------------------------
                             1
(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     3
(1 row)

TRUNCATE test_table;
-- finally, test failing on ROLLBACK just after the coordinator
-- sends the ROLLBACK so the command can be rolled back
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

BEGIN;
-- raise the message level for this transaction only, so the WARNING
-- emitted during ROLLBACK shows up in the output
SET LOCAL client_min_messages TO WARNING;
\copy test_table FROM STDIN DELIMITER ','
ROLLBACK;
WARNING:  connection not open
CONTEXT:  while executing command on localhost:xxxxx
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     0
(1 row)

-- but now kill just after the worker sends response to
-- ROLLBACK command, command should have been rolled back
-- both on the distributed table and the placements
SELECT citus.mitmproxy('conn.onCommandComplete(command="^ROLLBACK").kill()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

BEGIN;
-- raise the message level for this transaction only, so the WARNING
-- emitted during ROLLBACK shows up in the output
SET LOCAL client_min_messages TO WARNING;
\copy test_table FROM STDIN DELIMITER ','
ROLLBACK;
WARNING:  connection not open
CONTEXT:  while executing command on localhost:xxxxx
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT recover_prepared_transactions();
 recover_prepared_transactions
---------------------------------------------------------------------
                             0
(1 row)

SELECT * FROM unhealthy_shard_count;
 count
---------------------------------------------------------------------
     0
(1 row)

SELECT count(*) FROM test_table;
 count
---------------------------------------------------------------------
     0
(1 row)

DROP SCHEMA copy_reference_failure CASCADE;
SET search_path TO default;