mirror of https://github.com/citusdata/citus.git
failure tests on create_distributed_table nonempty
parent
dc55498f80
commit
c1f7631f98
|
@ -0,0 +1,977 @@
|
|||
--
|
||||
-- Failure tests for COPY to reference tables
|
||||
--
|
||||
CREATE SCHEMA create_distributed_table_non_empty_failure;
|
||||
SET search_path TO 'create_distributed_table_non_empty_failure';
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- we'll start with replication factor 1 and 2pc
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count to 4;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
-- in the first test, kill the first connection we sent from the coordinator
|
||||
SELECT citus.mitmproxy('conn.kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: connection error: localhost:9060
|
||||
DETAIL: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- in the first test, cancel the first connection we sent from the coordinator
|
||||
SELECT citus.mitmproxy('conn.cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- kill as soon as the coordinator sends CREATE SCHEMA
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^CREATE SCHEMA").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
CONTEXT: while executing command on localhost:9060
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$);
|
||||
run_command_on_workers
|
||||
------------------------
|
||||
(localhost,9060,t,0)
|
||||
(localhost,57637,t,1)
|
||||
(2 rows)
|
||||
|
||||
-- cancel as soon as the coordinator sends CREATE SCHEMA
|
||||
-- Note: Schema should be created in workers because Citus
|
||||
-- does not check for interrupts until GetRemoteCommandResult is called.
|
||||
-- Since we already sent the command at this stage, the schemas get created in workers
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^CREATE SCHEMA").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$);
|
||||
run_command_on_workers
|
||||
------------------------
|
||||
(localhost,9060,t,1)
|
||||
(localhost,57637,t,1)
|
||||
(2 rows)
|
||||
|
||||
SELECT run_command_on_workers($$DROP SCHEMA IF EXISTS create_distributed_table_non_empty_failure$$);
|
||||
run_command_on_workers
|
||||
-----------------------------------
|
||||
(localhost,9060,t,"DROP SCHEMA")
|
||||
(localhost,57637,t,"DROP SCHEMA")
|
||||
(2 rows)
|
||||
|
||||
-- kill as soon as the coordinator sends begin
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
WARNING: connection not open
|
||||
CONTEXT: while executing command on localhost:9060
|
||||
ERROR: connection error: localhost:9060
|
||||
DETAIL: connection not open
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$);
|
||||
run_command_on_workers
|
||||
------------------------
|
||||
(localhost,9060,t,1)
|
||||
(localhost,57637,t,1)
|
||||
(2 rows)
|
||||
|
||||
-- cancel as soon as the coordinator sends begin
|
||||
-- shards will be created because we ignore cancel requests during the shard creation
|
||||
-- Interrupts are hold in CreateShardsWithRoundRobinPolicy
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
WARNING: cancel requests are ignored during shard creation
|
||||
NOTICE: Copying data from local table...
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
4
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$);
|
||||
run_command_on_workers
|
||||
------------------------
|
||||
(localhost,9060,t,1)
|
||||
(localhost,57637,t,1)
|
||||
(2 rows)
|
||||
|
||||
DROP TABLE test_table ;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
-- kill as soon as the coordinator sends CREATE TABLE
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
CONTEXT: while executing command on localhost:9060
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- kill as soon as the coordinator sends COPY
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
CONTEXT: while executing command on localhost:9060
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- kill when the COPY is completed, it should be rollbacked properly
|
||||
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
NOTICE: Copying data from local table...
|
||||
ERROR: failed to COPY to shard 102052 on localhost:9060
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- cancel as soon as the coordinator sends COPY, table
|
||||
-- should not be created and rollbacked properly
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- cancel when the COPY is completed, it should be rollbacked properly
|
||||
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
NOTICE: Copying data from local table...
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- immediately kill when we see prepare transaction to see if the command
|
||||
-- successfully rollbacked the created shards
|
||||
-- we don't want to see the prepared transaction numbers in the warnings
|
||||
SET client_min_messages TO ERROR;
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: connection not open
|
||||
CONTEXT: while executing command on localhost:9060
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- immediately cancel when we see prepare transaction to see if the command
|
||||
-- successfully rollbacked the created shards
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT recover_prepared_transactions();
|
||||
recover_prepared_transactions
|
||||
-------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- kill as soon as the coordinator sends COMMIT
|
||||
-- shards should be created and kill should not affect
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT PREPARED").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
4
|
||||
(1 row)
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT recover_prepared_transactions();
|
||||
recover_prepared_transactions
|
||||
-------------------------------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
DROP TABLE test_table ;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
-- cancel as soon as the coordinator sends COMMIT
|
||||
-- shards should be created and kill should not affect
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT PREPARED").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
4
|
||||
(1 row)
|
||||
|
||||
DROP TABLE test_table ;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
-- kill as soon as the coordinator sends ROLLBACK
|
||||
-- the command can be rollbacked
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- cancel as soon as the coordinator sends ROLLBACK
|
||||
-- should be rollbacked
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- We are done with pure create_distributed_table testing and now
|
||||
-- testing for co-located tables.
|
||||
CREATE TABLE colocated_table(id int, value_1 int);
|
||||
SELECT create_distributed_table('colocated_table', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Now, cancel the connection just after transaction is opened on
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Now, kill the connection just after transaction is opened on
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
ERROR: connection error: localhost:9060
|
||||
DETAIL: connection not open
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$);
|
||||
run_command_on_workers
|
||||
------------------------
|
||||
(localhost,9060,t,0)
|
||||
(localhost,57637,t,0)
|
||||
(2 rows)
|
||||
|
||||
-- Now, cancel the connection just after the COPY started to
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Now, kill the connection just after the COPY started to
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
ERROR: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
CONTEXT: while executing command on localhost:9060
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$);
|
||||
run_command_on_workers
|
||||
------------------------
|
||||
(localhost,9060,t,0)
|
||||
(localhost,57637,t,0)
|
||||
(2 rows)
|
||||
|
||||
-- Now, cancel the connection when we issue CREATE TABLE on
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_apply_shard_ddl_command").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Now, kill the connection when we issue CREATE TABLE on
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_apply_shard_ddl_command").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
ERROR: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
CONTEXT: while executing command on localhost:9060
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$);
|
||||
run_command_on_workers
|
||||
------------------------
|
||||
(localhost,9060,t,0)
|
||||
(localhost,57637,t,0)
|
||||
(2 rows)
|
||||
|
||||
-- Now run the same tests with 1pc
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
DROP TABLE colocated_table;
|
||||
DROP TABLE test_table;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
SET citus.multi_shard_commit_protocol TO '1pc';
|
||||
SELECT citus.mitmproxy('conn.kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: connection error: localhost:9060
|
||||
DETAIL: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$);
|
||||
run_command_on_workers
|
||||
------------------------
|
||||
(localhost,9060,t,0)
|
||||
(localhost,57637,t,0)
|
||||
(2 rows)
|
||||
|
||||
-- in the first test, cancel the first connection we sent from the coordinator
|
||||
SELECT citus.mitmproxy('conn.cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$);
|
||||
run_command_on_workers
|
||||
------------------------
|
||||
(localhost,9060,t,0)
|
||||
(localhost,57637,t,0)
|
||||
(2 rows)
|
||||
|
||||
-- kill as soon as the coordinator sends begin
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: connection error: localhost:9060
|
||||
DETAIL: connection not open
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$);
|
||||
run_command_on_workers
|
||||
------------------------
|
||||
(localhost,9060,t,1)
|
||||
(localhost,57637,t,1)
|
||||
(2 rows)
|
||||
|
||||
-- cancel as soon as the coordinator sends begin
|
||||
-- shards will be created because we ignore cancel requests during the shard creation
|
||||
-- Interrupts are hold in CreateShardsWithRoundRobinPolicy
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
4
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$);
|
||||
run_command_on_workers
|
||||
------------------------
|
||||
(localhost,9060,t,1)
|
||||
(localhost,57637,t,1)
|
||||
(2 rows)
|
||||
|
||||
DROP TABLE test_table ;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
-- kill as soon as the coordinator sends CREATE TABLE
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
CONTEXT: while executing command on localhost:9060
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- kill as soon as the coordinator sends COPY
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
CONTEXT: while executing command on localhost:9060
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- kill when the COPY is completed, it should be rollbacked properly
|
||||
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: failed to COPY to shard 102132 on localhost:9060
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- cancel as soon as the coordinator sends COPY, table
|
||||
-- should not be created and rollbacked properly
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- cancel when the COPY is completed, it should be rollbacked properly
|
||||
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- kill as soon as the coordinator sends ROLLBACK
|
||||
-- the command can be rollbacked
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- cancel as soon as the coordinator sends ROLLBACK
|
||||
-- should be rollbacked
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- kill as soon as the coordinator sends COMMIT
|
||||
-- the command can be COMMITed
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
4
|
||||
(1 row)
|
||||
|
||||
DROP TABLE test_table;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
-- cancel as soon as the coordinator sends COMMIT
|
||||
-- should be COMMITed
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
4
|
||||
(1 row)
|
||||
|
||||
DROP TABLE test_table;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
CREATE TABLE colocated_table(id int, value_1 int);
|
||||
SELECT create_distributed_table('colocated_table', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Now, cancel the connection just after transaction is opened on
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Now, kill the connection just after transaction is opened on
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
ERROR: connection error: localhost:9060
|
||||
DETAIL: connection not open
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Now, cancel the connection just after the COPY started to
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
ERROR: canceling statement due to user request
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Now, kill the connection just after the COPY started to
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
ERROR: server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
CONTEXT: while executing command on localhost:9060
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$);
|
||||
run_command_on_workers
|
||||
------------------------
|
||||
(localhost,9060,t,0)
|
||||
(localhost,57637,t,0)
|
||||
(2 rows)
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
||||
(1 row)
|
||||
|
||||
DROP SCHEMA create_distributed_table_non_empty_failure CASCADE;
|
|
@ -4,6 +4,7 @@ test: failure_test_helpers
|
|||
# this should only be run by pg_regress_multi, you don't need it
|
||||
test: failure_setup
|
||||
test: multi_test_helpers
|
||||
|
||||
test: failure_ddl
|
||||
test: failure_truncate
|
||||
test: failure_create_index_concurrently
|
||||
|
@ -11,6 +12,7 @@ test: failure_add_disable_node
|
|||
test: failure_copy_to_reference
|
||||
test: failure_copy_on_hash
|
||||
test: failure_create_reference_table
|
||||
test: failure_create_distributed_table_non_empty
|
||||
|
||||
test: failure_1pc_copy_hash
|
||||
test: failure_1pc_copy_append
|
||||
|
|
|
@ -0,0 +1,332 @@
|
|||
--
|
||||
-- Failure tests for COPY to reference tables
|
||||
--
|
||||
CREATE SCHEMA create_distributed_table_non_empty_failure;
|
||||
SET search_path TO 'create_distributed_table_non_empty_failure';
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
|
||||
-- we'll start with replication factor 1 and 2pc
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count to 4;
|
||||
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
|
||||
-- in the first test, kill the first connection we sent from the coordinator
|
||||
SELECT citus.mitmproxy('conn.kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- in the first test, cancel the first connection we sent from the coordinator
|
||||
SELECT citus.mitmproxy('conn.cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- kill as soon as the coordinator sends CREATE SCHEMA
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^CREATE SCHEMA").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$);
|
||||
|
||||
-- cancel as soon as the coordinator sends CREATE SCHEMA
|
||||
-- Note: Schema should be created in workers because Citus
|
||||
-- does not check for interrupts until GetRemoteCommandResult is called.
|
||||
-- Since we already sent the command at this stage, the schemas get created in workers
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^CREATE SCHEMA").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$);
|
||||
SELECT run_command_on_workers($$DROP SCHEMA IF EXISTS create_distributed_table_non_empty_failure$$);
|
||||
|
||||
-- kill as soon as the coordinator sends begin
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$);
|
||||
|
||||
-- cancel as soon as the coordinator sends begin
|
||||
-- shards will be created because we ignore cancel requests during the shard creation
|
||||
-- Interrupts are hold in CreateShardsWithRoundRobinPolicy
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$);
|
||||
DROP TABLE test_table ;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
|
||||
-- kill as soon as the coordinator sends CREATE TABLE
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- kill as soon as the coordinator sends COPY
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- kill when the COPY is completed, it should be rollbacked properly
|
||||
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- cancel as soon as the coordinator sends COPY, table
|
||||
-- should not be created and rollbacked properly
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- cancel when the COPY is completed, it should be rollbacked properly
|
||||
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- immediately kill when we see prepare transaction to see if the command
|
||||
-- successfully rollbacked the created shards
|
||||
-- we don't want to see the prepared transaction numbers in the warnings
|
||||
SET client_min_messages TO ERROR;
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- immediately cancel when we see prepare transaction to see if the command
|
||||
-- successfully rollbacked the created shards
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT recover_prepared_transactions();
|
||||
|
||||
-- kill as soon as the coordinator sends COMMIT
|
||||
-- shards should be created and kill should not affect
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT PREPARED").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
|
||||
SELECT recover_prepared_transactions();
|
||||
DROP TABLE test_table ;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
|
||||
-- cancel as soon as the coordinator sends COMMIT
|
||||
-- shards should be created and kill should not affect
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT PREPARED").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
DROP TABLE test_table ;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
|
||||
-- kill as soon as the coordinator sends ROLLBACK
|
||||
-- the command can be rollbacked
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()');
|
||||
BEGIN;
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ROLLBACK;
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- cancel as soon as the coordinator sends ROLLBACK
|
||||
-- should be rollbacked
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").cancel(' || pg_backend_pid() || ')');
|
||||
BEGIN;
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ROLLBACK;
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- We are done with pure create_distributed_table testing and now
|
||||
-- testing for co-located tables.
|
||||
CREATE TABLE colocated_table(id int, value_1 int);
|
||||
SELECT create_distributed_table('colocated_table', 'id');
|
||||
|
||||
-- Now, cancel the connection just after transaction is opened on
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- Now, kill the connection just after transaction is opened on
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$);
|
||||
|
||||
-- Now, cancel the connection just after the COPY started to
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- Now, kill the connection just after the COPY started to
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$);
|
||||
|
||||
-- Now, cancel the connection when we issue CREATE TABLE on
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_apply_shard_ddl_command").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- Now, kill the connection when we issue CREATE TABLE on
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_apply_shard_ddl_command").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$);
|
||||
|
||||
-- Now run the same tests with 1pc
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
DROP TABLE colocated_table;
|
||||
DROP TABLE test_table;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
SET citus.multi_shard_commit_protocol TO '1pc';
|
||||
|
||||
SELECT citus.mitmproxy('conn.kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$);
|
||||
|
||||
-- in the first test, cancel the first connection we sent from the coordinator
|
||||
SELECT citus.mitmproxy('conn.cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$);
|
||||
|
||||
-- kill as soon as the coordinator sends begin
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$);
|
||||
|
||||
-- cancel as soon as the coordinator sends begin
|
||||
-- shards will be created because we ignore cancel requests during the shard creation
|
||||
-- Interrupts are hold in CreateShardsWithRoundRobinPolicy
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.schemata WHERE schema_name = 'create_distributed_table_non_empty_failure'$$);
|
||||
DROP TABLE test_table ;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
|
||||
-- kill as soon as the coordinator sends CREATE TABLE
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- kill as soon as the coordinator sends COPY
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- kill when the COPY is completed, it should be rollbacked properly
|
||||
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- cancel as soon as the coordinator sends COPY, table
|
||||
-- should not be created and rollbacked properly
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- cancel when the COPY is completed, it should be rollbacked properly
|
||||
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- kill as soon as the coordinator sends ROLLBACK
|
||||
-- the command can be rollbacked
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()');
|
||||
BEGIN;
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ROLLBACK;
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- cancel as soon as the coordinator sends ROLLBACK
|
||||
-- should be rollbacked
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").cancel(' || pg_backend_pid() || ')');
|
||||
BEGIN;
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
ROLLBACK;
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- kill as soon as the coordinator sends COMMIT
|
||||
-- the command can be COMMITed
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');
|
||||
BEGIN;
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
COMMIT;
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
DROP TABLE test_table;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
|
||||
-- cancel as soon as the coordinator sends COMMIT
|
||||
-- should be COMMITed
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").cancel(' || pg_backend_pid() || ')');
|
||||
BEGIN;
|
||||
SELECT create_distributed_table('test_table', 'id');
|
||||
COMMIT;
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
DROP TABLE test_table;
|
||||
CREATE TABLE test_table(id int, value_1 int);
|
||||
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
|
||||
|
||||
CREATE TABLE colocated_table(id int, value_1 int);
|
||||
SELECT create_distributed_table('colocated_table', 'id');
|
||||
|
||||
-- Now, cancel the connection just after transaction is opened on
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- Now, kill the connection just after transaction is opened on
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- Now, cancel the connection just after the COPY started to
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')');
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
|
||||
-- Now, kill the connection just after the COPY started to
|
||||
-- workers. Note that, when there is a colocated table, interrupts
|
||||
-- are not held and we can cancel in the middle of the execution
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||
SELECT create_distributed_table('test_table', 'id', colocate_with => 'colocated_table');
|
||||
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'create_distributed_table_non_empty_failure' and table_name LIKE 'test_table%'$$);
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
DROP SCHEMA create_distributed_table_non_empty_failure CASCADE;
|
Loading…
Reference in New Issue