diff --git a/src/test/regress/expected/failure_create_distributed_table_non_empty.out b/src/test/regress/expected/failure_create_distributed_table_non_empty.out new file mode 100644 index 000000000..a5135271c --- /dev/null +++ b/src/test/regress/expected/failure_create_distributed_table_non_empty.out @@ -0,0 +1,977 @@ +-- +-- Failure tests for COPY to reference tables +-- +CREATE SCHEMA create_distributed_table_non_empty_failure; +SET search_path TO 'create_distributed_table_non_empty_failure'; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +-- we'll start with replication factor 1 and 2pc +SET citus.shard_replication_factor TO 1; +SET citus.shard_count to 4; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); +-- in the first test, kill the first connection we sent from the coordinator +SELECT citus.mitmproxy('conn.kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: connection error: localhost:9060 +DETAIL: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- in the first test, cancel the first connection we sent from the coordinator +SELECT citus.mitmproxy('conn.cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: canceling statement due to user request +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- kill as soon as the coordinator sends CREATE SCHEMA +SELECT citus.mitmproxy('conn.onQuery(query="^CREATE SCHEMA").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$); + run_command_on_workers +------------------------ + (localhost,9060,t,0) + (localhost,57637,t,1) +(2 rows) + +-- cancel as soon as the coordinator sends CREATE SCHEMA +-- Note: Schema should be created in workers because Citus +-- does not check for interrupts until GetRemoteCommandResult is called. +-- Since we already sent the command at this stage, the schemas get created in workers +SELECT citus.mitmproxy('conn.onQuery(query="^CREATE SCHEMA").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: canceling statement due to user request +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$); + run_command_on_workers +------------------------ + (localhost,9060,t,1) + (localhost,57637,t,1) +(2 rows) + +SELECT run_command_on_workers($$DROP SCHEMA IF EXISTS create_distributed_table_non_empty_failure$$); + run_command_on_workers +----------------------------------- + (localhost,9060,t,"DROP SCHEMA") + (localhost,57637,t,"DROP SCHEMA") +(2 rows) + +-- kill as soon as the coordinator sends begin +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +WARNING: connection not open +CONTEXT: while executing command on localhost:9060 +ERROR: connection error: localhost:9060 +DETAIL: connection not open +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$); + run_command_on_workers +------------------------ + (localhost,9060,t,1) + (localhost,57637,t,1) +(2 rows) + +-- cancel as soon as the coordinator sends begin +-- shards will be created because we ignore cancel requests during the shard creation +-- Interrupts are hold in CreateShardsWithRoundRobinPolicy +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +WARNING: cancel requests are ignored during shard creation +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 4 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$); + run_command_on_workers +------------------------ + (localhost,9060,t,1) + (localhost,57637,t,1) +(2 rows) + +DROP TABLE test_table ; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); +-- kill as soon as the coordinator sends CREATE TABLE +SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- kill as soon as the coordinator sends COPY +SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- kill when the COPY is completed, it should be rollbacked properly +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +NOTICE: Copying data from local table... +ERROR: failed to COPY to shard 102052 on localhost:9060 +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- cancel as soon as the coordinator sends COPY, table +-- should not be created and rollbacked properly +SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: canceling statement due to user request +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- cancel when the COPY is completed, it should be rollbacked properly +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +NOTICE: Copying data from local table... +ERROR: canceling statement due to user request +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- immediately kill when we see prepare transaction to see if the command +-- successfully rollbacked the created shards +-- we don't want to see the prepared transaction numbers in the warnings +SET client_min_messages TO ERROR; +SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: connection not open +CONTEXT: while executing command on localhost:9060 +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- immediately cancel when we see prepare transaction to see if the command +-- successfully rollbacked the created shards +SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: canceling statement due to user request +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 1 +(1 row) + +-- kill as soon as the coordinator sends COMMIT +-- shards should be created and kill should not affect +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT PREPARED").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 4 +(1 row) + +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 2 +(1 row) + +DROP TABLE test_table ; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); +-- cancel as soon as the coordinator sends COMMIT +-- shards should be created and kill should not affect +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT PREPARED").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 4 +(1 row) + +DROP TABLE test_table ; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); +-- kill as soon as the coordinator sends ROLLBACK +-- the command can be rollbacked +SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT create_distributed_table('test_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +ROLLBACK; +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- cancel as soon as the coordinator sends ROLLBACK +-- should be rollbacked +SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT create_distributed_table('test_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +ROLLBACK; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- We are done with pure create_distributed_table testing and now +-- testing for co-located tables. +CREATE TABLE colocated_table(id int, value_1 int); +SELECT create_distributed_table('colocated_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- Now, cancel the connection just after transaction is opened on +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +ERROR: canceling statement due to user request +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- Now, kill the connection just after transaction is opened on +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +ERROR: connection error: localhost:9060 +DETAIL: connection not open +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$); + run_command_on_workers +------------------------ + (localhost,9060,t,0) + (localhost,57637,t,0) +(2 rows) + +-- Now, cancel the connection just after the COPY started to +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +ERROR: canceling statement due to user request +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- Now, kill the connection just after the COPY started to +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$); + run_command_on_workers +------------------------ + (localhost,9060,t,0) + (localhost,57637,t,0) +(2 rows) + +-- Now, cancel the connection when we issue CREATE TABLE on +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_apply_shard_ddl_command").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +ERROR: canceling statement due to user request +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- Now, kill the connection when we issue CREATE TABLE on +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_apply_shard_ddl_command").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$); + run_command_on_workers +------------------------ + (localhost,9060,t,0) + (localhost,57637,t,0) +(2 rows) + +-- Now run the same tests with 1pc +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +DROP TABLE colocated_table; +DROP TABLE test_table; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); +SET citus.multi_shard_commit_protocol TO '1pc'; +SELECT citus.mitmproxy('conn.kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: connection error: localhost:9060 +DETAIL: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$); + run_command_on_workers +------------------------ + (localhost,9060,t,0) + (localhost,57637,t,0) +(2 rows) + +-- in the first test, cancel the first connection we sent from the coordinator +SELECT citus.mitmproxy('conn.cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: canceling statement due to user request +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$); + run_command_on_workers +------------------------ + (localhost,9060,t,0) + (localhost,57637,t,0) +(2 rows) + +-- kill as soon as the coordinator sends begin +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: connection error: localhost:9060 +DETAIL: connection not open +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$); + run_command_on_workers +------------------------ + (localhost,9060,t,1) + (localhost,57637,t,1) +(2 rows) + +-- cancel as soon as the coordinator sends begin +-- shards will be created because we ignore cancel requests during the shard creation +-- Interrupts are hold in CreateShardsWithRoundRobinPolicy +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 4 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$); + run_command_on_workers +------------------------ + (localhost,9060,t,1) + (localhost,57637,t,1) +(2 rows) + +DROP TABLE test_table ; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); +-- kill as soon as the coordinator sends CREATE TABLE +SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- kill as soon as the coordinator sends COPY +SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- kill when the COPY is completed, it should be rollbacked properly +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: failed to COPY to shard 102132 on localhost:9060 +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- cancel as soon as the coordinator sends COPY, table +-- should not be created and rollbacked properly +SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: canceling statement due to user request +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- cancel when the COPY is completed, it should be rollbacked properly +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id'); +ERROR: canceling statement due to user request +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- kill as soon as the coordinator sends ROLLBACK +-- the command can be rollbacked +SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT create_distributed_table('test_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +ROLLBACK; +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- cancel as soon as the coordinator sends ROLLBACK +-- should be rollbacked +SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT create_distributed_table('test_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +ROLLBACK; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- kill as soon as the coordinator sends COMMIT +-- the command can be COMMITed +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT create_distributed_table('test_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 4 +(1 row) + +DROP TABLE test_table; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); +-- cancel as soon as the coordinator sends COMMIT +-- should be COMMITed +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +BEGIN; +SELECT create_distributed_table('test_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 4 +(1 row) + +DROP TABLE test_table; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); +CREATE TABLE colocated_table(id int, value_1 int); +SELECT create_distributed_table('colocated_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- Now, cancel the connection just after transaction is opened on +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +ERROR: canceling statement due to user request +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- Now, kill the connection just after transaction is opened on +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +ERROR: connection error: localhost:9060 +DETAIL: connection not open +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- Now, cancel the connection just after the COPY started to +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +ERROR: canceling statement due to user request +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +-- Now, kill the connection just after the COPY started to +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); + mitmproxy +----------- + +(1 row) + +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:9060 +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + count +------- + 0 +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$); + run_command_on_workers +------------------------ + (localhost,9060,t,0) + (localhost,57637,t,0) +(2 rows) + +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +DROP SCHEMA create_distributed_table_non_empty_failure CASCADE; diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index 0c31c3f93..180897a0c 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -4,6 +4,7 @@ test: failure_test_helpers # this should only be run by pg_regress_multi, you don't need it test: failure_setup test: multi_test_helpers + test: failure_ddl test: failure_truncate test: failure_create_index_concurrently @@ -11,6 +12,7 @@ test: failure_add_disable_node test: failure_copy_to_reference test: failure_copy_on_hash test: failure_create_reference_table +test: failure_create_distributed_table_non_empty test: failure_1pc_copy_hash test: failure_1pc_copy_append diff --git a/src/test/regress/sql/failure_create_distributed_table_non_empty.sql b/src/test/regress/sql/failure_create_distributed_table_non_empty.sql new file mode 100644 index 000000000..ca0acb74c --- /dev/null +++ b/src/test/regress/sql/failure_create_distributed_table_non_empty.sql @@ -0,0 +1,332 @@ +-- +-- Failure tests for COPY to reference tables +-- +CREATE SCHEMA create_distributed_table_non_empty_failure; +SET search_path TO 'create_distributed_table_non_empty_failure'; + +SELECT citus.mitmproxy('conn.allow()'); + +-- we'll start with replication factor 1 and 2pc +SET citus.shard_replication_factor TO 1; +SET citus.shard_count to 4; + +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); + +-- in the first test, kill the first connection we sent from the coordinator +SELECT citus.mitmproxy('conn.kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- in the first test, cancel the first connection we sent from the coordinator +SELECT citus.mitmproxy('conn.cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- kill as soon as the coordinator sends CREATE SCHEMA +SELECT citus.mitmproxy('conn.onQuery(query="^CREATE SCHEMA").kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$); + +-- cancel as soon as the coordinator sends CREATE SCHEMA +-- Note: Schema should be created in workers because Citus +-- does not check for interrupts until GetRemoteCommandResult is called. +-- Since we already sent the command at this stage, the schemas get created in workers +SELECT citus.mitmproxy('conn.onQuery(query="^CREATE SCHEMA").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$); +SELECT run_command_on_workers($$DROP SCHEMA IF EXISTS create_distributed_table_non_empty_failure$$); + +-- kill as soon as the coordinator sends begin +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$); + +-- cancel as soon as the coordinator sends begin +-- shards will be created because we ignore cancel requests during the shard creation +-- Interrupts are hold in CreateShardsWithRoundRobinPolicy +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id'); +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$); +DROP TABLE test_table ; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); + +-- kill as soon as the coordinator sends CREATE TABLE +SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- kill as soon as the coordinator sends COPY +SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- kill when the COPY is completed, it should be rollbacked properly +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- cancel as soon as the coordinator sends COPY, table +-- should not be created and rollbacked properly +SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- cancel when the COPY is completed, it should be rollbacked properly +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- immediately kill when we see prepare transaction to see if the command +-- successfully rollbacked the created shards +-- we don't want to see the prepared transaction numbers in the warnings +SET client_min_messages TO ERROR; +SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- immediately cancel when we see prepare transaction to see if the command +-- successfully rollbacked the created shards +SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT citus.mitmproxy('conn.allow()'); +SELECT recover_prepared_transactions(); + +-- kill as soon as the coordinator sends COMMIT +-- shards should be created and kill should not affect +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT PREPARED").kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT citus.mitmproxy('conn.allow()'); + +SELECT recover_prepared_transactions(); +DROP TABLE test_table ; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); + +-- cancel as soon as the coordinator sends COMMIT +-- shards should be created and kill should not affect +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT PREPARED").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id'); +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +DROP TABLE test_table ; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); + +-- kill as soon as the coordinator sends ROLLBACK +-- the command can be rollbacked +SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()'); +BEGIN; +SELECT create_distributed_table('test_table', 'id'); +ROLLBACK; +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- cancel as soon as the coordinator sends ROLLBACK +-- should be rollbacked +SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").cancel(' || pg_backend_pid() || ')'); +BEGIN; +SELECT create_distributed_table('test_table', 'id'); +ROLLBACK; +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- We are done with pure create_distributed_table testing and now +-- testing for co-located tables. +CREATE TABLE colocated_table(id int, value_1 int); +SELECT create_distributed_table('colocated_table', 'id'); + +-- Now, cancel the connection just after transaction is opened on +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- Now, kill the connection just after transaction is opened on +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()'); +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$); + +-- Now, cancel the connection just after the COPY started to +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- Now, kill the connection just after the COPY started to +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$); + +-- Now, cancel the connection when we issue CREATE TABLE on +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_apply_shard_ddl_command").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- Now, kill the connection when we issue CREATE TABLE on +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_apply_shard_ddl_command").kill()'); +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$); + +-- Now run the same tests with 1pc +SELECT citus.mitmproxy('conn.allow()'); +DROP TABLE colocated_table; +DROP TABLE test_table; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); +SET citus.multi_shard_commit_protocol TO '1pc'; + +SELECT citus.mitmproxy('conn.kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$); + +-- in the first test, cancel the first connection we sent from the coordinator +SELECT citus.mitmproxy('conn.cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id'); +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$); + +-- kill as soon as the coordinator sends begin +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$); + +-- cancel as soon as the coordinator sends begin +-- shards will be created because we ignore cancel requests during the shard creation +-- Interrupts are hold in CreateShardsWithRoundRobinPolicy +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id'); +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$); +DROP TABLE test_table ; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); + +-- kill as soon as the coordinator sends CREATE TABLE +SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- kill as soon as the coordinator sends COPY +SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- kill when the COPY is completed, it should be rollbacked properly +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").kill()'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- cancel as soon as the coordinator sends COPY, table +-- should not be created and rollbacked properly +SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- cancel when the COPY is completed, it should be rollbacked properly +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- kill as soon as the coordinator sends ROLLBACK +-- the command can be rollbacked +SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()'); +BEGIN; +SELECT create_distributed_table('test_table', 'id'); +ROLLBACK; +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- cancel as soon as the coordinator sends ROLLBACK +-- should be rollbacked +SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").cancel(' || pg_backend_pid() || ')'); +BEGIN; +SELECT create_distributed_table('test_table', 'id'); +ROLLBACK; +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- kill as soon as the coordinator sends COMMIT +-- the command can be COMMITed +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()'); +BEGIN; +SELECT create_distributed_table('test_table', 'id'); +COMMIT; +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +DROP TABLE test_table; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); + +-- cancel as soon as the coordinator sends COMMIT +-- should be COMMITed +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").cancel(' || pg_backend_pid() || ')'); +BEGIN; +SELECT create_distributed_table('test_table', 'id'); +COMMIT; +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +DROP TABLE test_table; +CREATE TABLE test_table(id int, value_1 int); +INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); + +CREATE TABLE colocated_table(id int, value_1 int); +SELECT create_distributed_table('colocated_table', 'id'); + +-- Now, cancel the connection just after transaction is opened on +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- Now, kill the connection just after transaction is opened on +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()'); +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- Now, cancel the connection just after the COPY started to +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')'); +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +-- Now, kill the connection just after the COPY started to +-- workers. Note that, when there is a colocated table, interrupts +-- are not held and we can cancel in the middle of the execution +SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()'); +SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table'); +SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; +SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$); + +SELECT citus.mitmproxy('conn.allow()'); +DROP SCHEMA create_distributed_table_non_empty_failure CASCADE; \ No newline at end of file