diff --git a/src/test/regress/expected/failure_copy_on_hash.out b/src/test/regress/expected/failure_copy_on_hash.out new file mode 100644 index 000000000..d4c7d7059 --- /dev/null +++ b/src/test/regress/expected/failure_copy_on_hash.out @@ -0,0 +1,415 @@ +-- +-- Failure tests for COPY to hash distributed tables +-- +CREATE SCHEMA copy_distributed_table; +SET search_path TO 'copy_distributed_table'; +SET citus.next_shard_id TO 1710000; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +-- With one placement COPY should error out and placement should stay healthy. +SET citus.shard_replication_factor TO 1; +SET citus.shard_count to 4; +CREATE TABLE test_table(id int, value_1 int); +SELECT create_distributed_table('test_table','id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE VIEW unhealthy_shard_count AS + SELECT count(*) + FROM pg_dist_shard_placement pdsp + JOIN + pg_dist_shard pds + ON pdsp.shardid=pds.shardid + WHERE logicalrelid='copy_distributed_table.test_table'::regclass AND shardstate != 1; +-- Just kill the connection after sending the first query to the worker. +SELECT citus.mitmproxy('conn.kill()'); + mitmproxy +----------- + +(1 row) + +\COPY test_table FROM stdin delimiter ','; +WARNING: connection error: localhost:9060 +DETAIL: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: COPY test_table, line 1: "1,2" +ERROR: could not connect to any active placements +CONTEXT: COPY test_table, line 1: "1,2" +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM unhealthy_shard_count; + count +------- + 0 +(1 row) + +SELECT count(*) FROM test_table; + count +------- + 0 +(1 row) + +-- Now, kill the connection while copying the data +SELECT citus.mitmproxy('conn.onCopyData().kill()'); + mitmproxy +----------- + +(1 row) + +\COPY test_table FROM stdin delimiter ','; +ERROR: failed to COPY to shard 1710000 on localhost:9060 +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM unhealthy_shard_count; + count +------- + 0 +(1 row) + +SELECT count(*) FROM test_table; + count +------- + 0 +(1 row) + +-- Similar to the above one, but now cancel the connection +-- instead of killing it. +SELECT citus.mitmproxy('conn.onCopyData().cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +\COPY test_table FROM stdin delimiter ','; +ERROR: canceling statement due to user request +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM unhealthy_shard_count; + count +------- + 0 +(1 row) + +SELECT count(*) FROM test_table; + count +------- + 0 +(1 row) + +-- kill the connection after worker sends command complete message +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY 1").kill()'); + mitmproxy +----------- + +(1 row) + +\COPY test_table FROM stdin delimiter ','; +ERROR: failed to COPY to shard 1710002 on localhost:9060 +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM unhealthy_shard_count; + count +------- + 0 +(1 row) + +SELECT count(*) FROM test_table; + count +------- + 0 +(1 row) + +-- similar to above one, but cancel the connection on command complete +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY 1").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +\COPY test_table FROM stdin delimiter ','; +ERROR: canceling statement due to user request +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM unhealthy_shard_count; + count +------- + 0 +(1 row) + +SELECT count(*) FROM test_table; + count +------- + 0 +(1 row) + +-- kill the connection on PREPARE TRANSACTION +SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").kill()'); + mitmproxy +----------- + +(1 row) + +\COPY test_table FROM stdin delimiter ','; +ERROR: connection not open +CONTEXT: while executing command on localhost:9060 +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM unhealthy_shard_count; + count +------- + 0 +(1 row) + +SELECT count(*) FROM test_table; + count +------- + 0 +(1 row) + +-- we don't want to see the prepared transaction numbers in the warnings +SET client_min_messages TO ERROR; +-- kill on command complete on COMMIT PREPARE, command should succeed +SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT PREPARED").kill()'); + mitmproxy +----------- + +(1 row) + +\COPY test_table FROM stdin delimiter ','; +SET client_min_messages TO NOTICE; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM unhealthy_shard_count; + count +------- + 0 +(1 row) + +SELECT count(*) FROM test_table; + count +------- + 4 +(1 row) + +TRUNCATE TABLE test_table; +-- kill on ROLLBACK, command could be rollbacked +SELECT citus.mitmproxy('conn.onQuery(query="ROLLBACK").kill()'); + mitmproxy +----------- + +(1 row) + +BEGIN; +\COPY test_table FROM stdin delimiter ','; +ROLLBACK; +WARNING: connection not open +CONTEXT: while executing command on localhost:9060 +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM unhealthy_shard_count; + count +------- + 0 +(1 row) + +SELECT count(*) FROM test_table; + count +------- + 0 +(1 row) + +DROP TABLE test_table CASCADE; +NOTICE: drop cascades to view unhealthy_shard_count +-- With two placement, should we error out or mark untouched shard placements as inactive? +SET citus.shard_replication_factor TO 2; +CREATE TABLE test_table_2(id int, value_1 int); +SELECT create_distributed_table('test_table_2','id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT citus.mitmproxy('conn.kill()'); + mitmproxy +----------- + +(1 row) + +\COPY test_table_2 FROM stdin delimiter ','; +WARNING: connection error: localhost:9060 +DETAIL: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: COPY test_table_2, line 1: "1,2" +WARNING: connection error: localhost:9060 +DETAIL: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: COPY test_table_2, line 2: "3,4" +WARNING: connection error: localhost:9060 +DETAIL: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: COPY test_table_2, line 3: "6,7" +WARNING: connection error: localhost:9060 +DETAIL: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: COPY test_table_2, line 5: "9,10" +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate + FROM pg_dist_shard_placement as pdsd + INNER JOIN pg_dist_shard as pds + ON pdsd.shardid = pds.shardid + WHERE pds.logicalrelid = 'test_table_2'::regclass + ORDER BY shardid, nodeport; + logicalrelid | shardid | shardstate +--------------+---------+------------ + test_table_2 | 1710004 | 3 + test_table_2 | 1710004 | 1 + test_table_2 | 1710005 | 3 + test_table_2 | 1710005 | 1 + test_table_2 | 1710006 | 3 + test_table_2 | 1710006 | 1 + test_table_2 | 1710007 | 3 + test_table_2 | 1710007 | 1 +(8 rows) + +-- Create test_table_2 again to have healthy one +DROP TABLE test_table_2; +CREATE TABLE test_table_2(id int, value_1 int); +SELECT create_distributed_table('test_table_2','id'); + create_distributed_table +-------------------------- + +(1 row) + +-- Kill the connection when we try to start the COPY +-- The query should abort +SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()'); + mitmproxy +----------- + +(1 row) + +\COPY test_table_2 FROM stdin delimiter ','; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +COPY test_table_2, line 1: "1,2" +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate + FROM pg_dist_shard_placement as pdsd + INNER JOIN pg_dist_shard as pds + ON pdsd.shardid = pds.shardid + WHERE pds.logicalrelid = 'test_table_2'::regclass + ORDER BY shardid, nodeport; + logicalrelid | shardid | shardstate +--------------+---------+------------ + test_table_2 | 1710008 | 1 + test_table_2 | 1710008 | 1 + test_table_2 | 1710009 | 1 + test_table_2 | 1710009 | 1 + test_table_2 | 1710010 | 1 + test_table_2 | 1710010 | 1 + test_table_2 | 1710011 | 1 + test_table_2 | 1710011 | 1 +(8 rows) + +-- Create test_table_2 again to have healthy one +DROP TABLE test_table_2; +CREATE TABLE test_table_2(id int, value_1 int); +SELECT create_distributed_table('test_table_2','id'); + create_distributed_table +-------------------------- + +(1 row) + +-- When kill on copying data, it will be rollbacked and placements won't be labaled as invalid. +-- Note that now we sent data to shard 210007, yet it is not marked as invalid. +-- You can check the issue about this behaviour: https://github.com/citusdata/citus/issues/1933 +SELECT citus.mitmproxy('conn.onCopyData().kill()'); + mitmproxy +----------- + +(1 row) + +\COPY test_table_2 FROM stdin delimiter ','; +ERROR: failed to COPY to shard 1710012 on localhost:9060 +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate + FROM pg_dist_shard_placement as pdsd + INNER JOIN pg_dist_shard as pds + ON pdsd.shardid = pds.shardid + WHERE pds.logicalrelid = 'test_table_2'::regclass + ORDER BY shardid, nodeport; + logicalrelid | shardid | shardstate +--------------+---------+------------ + test_table_2 | 1710012 | 1 + test_table_2 | 1710012 | 1 + test_table_2 | 1710013 | 1 + test_table_2 | 1710013 | 1 + test_table_2 | 1710014 | 1 + test_table_2 | 1710014 | 1 + test_table_2 | 1710015 | 1 + test_table_2 | 1710015 | 1 +(8 rows) + +DROP SCHEMA copy_distributed_table CASCADE; +NOTICE: drop cascades to table test_table_2 +SET search_path TO default; diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index b84f3442b..df4440b39 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -4,12 +4,12 @@ test: failure_test_helpers # this should only be run by pg_regress_multi, you don't need it test: failure_setup test: multi_test_helpers - test: failure_ddl test: failure_truncate test: failure_create_index_concurrently test: failure_add_disable_node test: failure_copy_to_reference +test: failure_copy_on_hash test: failure_1pc_copy_hash test: failure_1pc_copy_append diff --git a/src/test/regress/sql/failure_copy_on_hash.sql b/src/test/regress/sql/failure_copy_on_hash.sql new file mode 100644 index 000000000..45b54d9b5 --- /dev/null +++ b/src/test/regress/sql/failure_copy_on_hash.sql @@ -0,0 +1,213 @@ +-- +-- Failure tests for COPY to hash distributed tables +-- + +CREATE SCHEMA copy_distributed_table; +SET search_path TO 'copy_distributed_table'; +SET citus.next_shard_id TO 1710000; + +SELECT citus.mitmproxy('conn.allow()'); + +-- With one placement COPY should error out and placement should stay healthy. +SET citus.shard_replication_factor TO 1; +SET citus.shard_count to 4; + +CREATE TABLE test_table(id int, value_1 int); +SELECT create_distributed_table('test_table','id'); + +CREATE VIEW unhealthy_shard_count AS + SELECT count(*) + FROM pg_dist_shard_placement pdsp + JOIN + pg_dist_shard pds + ON pdsp.shardid=pds.shardid + WHERE logicalrelid='copy_distributed_table.test_table'::regclass AND shardstate != 1; + +-- Just kill the connection after sending the first query to the worker. +SELECT citus.mitmproxy('conn.kill()'); +\COPY test_table FROM stdin delimiter ','; +1,2 +3,4 +6,7 +8,9 +\. + +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM unhealthy_shard_count; +SELECT count(*) FROM test_table; + +-- Now, kill the connection while copying the data +SELECT citus.mitmproxy('conn.onCopyData().kill()'); +\COPY test_table FROM stdin delimiter ','; +1,2 +3,4 +6,7 +8,9 +\. + +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM unhealthy_shard_count; +SELECT count(*) FROM test_table; + +-- Similar to the above one, but now cancel the connection +-- instead of killing it. +SELECT citus.mitmproxy('conn.onCopyData().cancel(' || pg_backend_pid() || ')'); +\COPY test_table FROM stdin delimiter ','; +1,2 +3,4 +\. + +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM unhealthy_shard_count; +SELECT count(*) FROM test_table; + +-- kill the connection after worker sends command complete message +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY 1").kill()'); +\COPY test_table FROM stdin delimiter ','; +1,2 +3,4 +6,7 +8,9 +\. + +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM unhealthy_shard_count; +SELECT count(*) FROM test_table; + +-- similar to above one, but cancel the connection on command complete +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY 1").cancel(' || pg_backend_pid() || ')'); +\COPY test_table FROM stdin delimiter ','; +1,2 +3,4 +6,7 +8,9 +\. + +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM unhealthy_shard_count; +SELECT count(*) FROM test_table; + +-- kill the connection on PREPARE TRANSACTION +SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").kill()'); +\COPY test_table FROM stdin delimiter ','; +1,2 +3,4 +\. + +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM unhealthy_shard_count; +SELECT count(*) FROM test_table; + +-- we don't want to see the prepared transaction numbers in the warnings +SET client_min_messages TO ERROR; + +-- kill on command complete on COMMIT PREPARE, command should succeed +SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT PREPARED").kill()'); +\COPY test_table FROM stdin delimiter ','; +1,2 +3,4 +6,7 +8,9 +\. + +SET client_min_messages TO NOTICE; + +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM unhealthy_shard_count; +SELECT count(*) FROM test_table; + +TRUNCATE TABLE test_table; + +-- kill on ROLLBACK, command could be rollbacked +SELECT citus.mitmproxy('conn.onQuery(query="ROLLBACK").kill()'); +BEGIN; +\COPY test_table FROM stdin delimiter ','; +1,2 +3,4 +\. +ROLLBACK; + +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM unhealthy_shard_count; +SELECT count(*) FROM test_table; +DROP TABLE test_table CASCADE; + +-- With two placement, should we error out or mark untouched shard placements as inactive? +SET citus.shard_replication_factor TO 2; + +CREATE TABLE test_table_2(id int, value_1 int); +SELECT create_distributed_table('test_table_2','id'); + +SELECT citus.mitmproxy('conn.kill()'); + +\COPY test_table_2 FROM stdin delimiter ','; +1,2 +3,4 +6,7 +8,9 +9,10 +11,12 +13,14 +\. + +SELECT citus.mitmproxy('conn.allow()'); +SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate + FROM pg_dist_shard_placement as pdsd + INNER JOIN pg_dist_shard as pds + ON pdsd.shardid = pds.shardid + WHERE pds.logicalrelid = 'test_table_2'::regclass + ORDER BY shardid, nodeport; + +-- Create test_table_2 again to have healthy one +DROP TABLE test_table_2; +CREATE TABLE test_table_2(id int, value_1 int); +SELECT create_distributed_table('test_table_2','id'); + +-- Kill the connection when we try to start the COPY +-- The query should abort +SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()'); + +\COPY test_table_2 FROM stdin delimiter ','; +1,2 +3,4 +6,7 +8,9 +\. + +SELECT citus.mitmproxy('conn.allow()'); +SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate + FROM pg_dist_shard_placement as pdsd + INNER JOIN pg_dist_shard as pds + ON pdsd.shardid = pds.shardid + WHERE pds.logicalrelid = 'test_table_2'::regclass + ORDER BY shardid, nodeport; + +-- Create test_table_2 again to have healthy one +DROP TABLE test_table_2; +CREATE TABLE test_table_2(id int, value_1 int); +SELECT create_distributed_table('test_table_2','id'); + +-- When kill on copying data, it will be rollbacked and placements won't be labaled as invalid. +-- Note that now we sent data to shard 210007, yet it is not marked as invalid. +-- You can check the issue about this behaviour: https://github.com/citusdata/citus/issues/1933 +SELECT citus.mitmproxy('conn.onCopyData().kill()'); +\COPY test_table_2 FROM stdin delimiter ','; +1,2 +3,4 +6,7 +8,9 +9,10 +11,12 +13,14 +\. + +SELECT citus.mitmproxy('conn.allow()'); +SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate + FROM pg_dist_shard_placement as pdsd + INNER JOIN pg_dist_shard as pds + ON pdsd.shardid = pds.shardid + WHERE pds.logicalrelid = 'test_table_2'::regclass + ORDER BY shardid, nodeport; + +DROP SCHEMA copy_distributed_table CASCADE; +SET search_path TO default;