mirror of https://github.com/citusdata/citus.git
Merge pull request #2278 from citusdata/failure-copy-reference
Adds failure tests for COPY to reference tablepull/2319/merge
commit
6ac0434cf3
|
@ -0,0 +1,461 @@
|
||||||
|
--
|
||||||
|
-- Failure tests for COPY to reference tables
|
||||||
|
--
|
||||||
|
CREATE SCHEMA copy_reference_failure;
|
||||||
|
SET search_path TO 'copy_reference_failure';
|
||||||
|
SET citus.next_shard_id TO 130000;
|
||||||
|
-- we don't want to see the prepared transaction numbers in the warnings
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE test_table(id int, value_1 int);
|
||||||
|
SELECT create_reference_table('test_table');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE VIEW unhealthy_shard_count AS
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement pdsp
|
||||||
|
JOIN
|
||||||
|
pg_dist_shard pds
|
||||||
|
ON pdsp.shardid=pds.shardid
|
||||||
|
WHERE logicalrelid='copy_reference_failure.test_table'::regclass AND shardstate != 1;
|
||||||
|
-- in the first test, kill just in the first
|
||||||
|
-- response we get from the worker
|
||||||
|
SELECT citus.mitmproxy('conn.kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
ERROR: connection error: localhost:57640
|
||||||
|
DETAIL: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: COPY test_table, line 1: "1,2"
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- kill as soon as the coordinator sends begin
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
ERROR: failure on connection marked as essential: localhost:57640
|
||||||
|
CONTEXT: COPY test_table, line 1: "1,2"
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cancel as soon as the coordinator sends begin
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
CONTEXT: COPY test_table, line 1: "1,2"
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- kill as soon as the coordinator sends COPY command
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:57640
|
||||||
|
COPY test_table, line 1: "1,2"
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cancel as soon as the coordinator sends COPY command
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
CONTEXT: COPY test_table, line 1: "1,2"
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- kill as soon as the worker sends CopyComplete
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COPY 3").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
ERROR: failed to COPY to shard 130000 on localhost:57640
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cancel as soon as the coordinator sends CopyData
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COPY 3").cancel(' || pg_backend_pid() || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- kill the connection when we try to start the COPY
|
||||||
|
-- the query should abort
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:57640
|
||||||
|
COPY test_table, line 1: "1,2"
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- killing on PREPARE should be fine, everything should be rollbacked
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
ERROR: connection not open
|
||||||
|
CONTEXT: while executing command on localhost:57640
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cancelling on PREPARE should be fine, everything should be rollbacked
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").cancel(' || pg_backend_pid() || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- killing on command complete of COMMIT PREPARE, we should see that the command succeeds
|
||||||
|
-- and all the workers committed
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COMMIT PREPARED").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- we shouldn't have any prepared transactions in the workers
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
recover_prepared_transactions
|
||||||
|
-------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
TRUNCATE test_table;
|
||||||
|
-- kill as soon as the coordinator sends COMMIT
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Since we kill connections to one worker after commit arrives but the
|
||||||
|
-- other worker connections are healthy, we cannot commit on 1 worker
|
||||||
|
-- which has 1 active shard placements, but the other does. That's why
|
||||||
|
-- we expect to see 1 recovered prepared transactions.
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
recover_prepared_transactions
|
||||||
|
-------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
TRUNCATE test_table;
|
||||||
|
-- finally, test failing on ROLLBACK just after the coordinator
|
||||||
|
-- sends the ROLLBACK so the command can be rollbacked
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL client_min_messages TO WARNING;
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
ROLLBACK;
|
||||||
|
WARNING: connection not open
|
||||||
|
CONTEXT: while executing command on localhost:57640
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- but now kill just after the worker sends response to
|
||||||
|
-- ROLLBACK command, command should have been rollbacked
|
||||||
|
-- both on the distributed table and the placements
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^ROLLBACK").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL client_min_messages TO WARNING;
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
ROLLBACK;
|
||||||
|
WARNING: connection not open
|
||||||
|
CONTEXT: while executing command on localhost:57640
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
recover_prepared_transactions
|
||||||
|
-------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP SCHEMA copy_reference_failure CASCADE;
|
||||||
|
SET search_path TO default;
|
|
@ -9,3 +9,4 @@ test: failure_ddl
|
||||||
test: failure_truncate
|
test: failure_truncate
|
||||||
test: failure_create_index_concurrently
|
test: failure_create_index_concurrently
|
||||||
test: failure_add_disable_node
|
test: failure_add_disable_node
|
||||||
|
test: failure_copy_to_reference
|
||||||
|
|
|
@ -0,0 +1,204 @@
|
||||||
|
--
|
||||||
|
-- Failure tests for COPY to reference tables
|
||||||
|
--
|
||||||
|
CREATE SCHEMA copy_reference_failure;
|
||||||
|
SET search_path TO 'copy_reference_failure';
|
||||||
|
SET citus.next_shard_id TO 130000;
|
||||||
|
|
||||||
|
-- we don't want to see the prepared transaction numbers in the warnings
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
|
||||||
|
CREATE TABLE test_table(id int, value_1 int);
|
||||||
|
SELECT create_reference_table('test_table');
|
||||||
|
|
||||||
|
CREATE VIEW unhealthy_shard_count AS
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement pdsp
|
||||||
|
JOIN
|
||||||
|
pg_dist_shard pds
|
||||||
|
ON pdsp.shardid=pds.shardid
|
||||||
|
WHERE logicalrelid='copy_reference_failure.test_table'::regclass AND shardstate != 1;
|
||||||
|
|
||||||
|
-- in the first test, kill just in the first
|
||||||
|
-- response we get from the worker
|
||||||
|
SELECT citus.mitmproxy('conn.kill()');
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- kill as soon as the coordinator sends begin
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- cancel as soon as the coordinator sends begin
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- kill as soon as the coordinator sends COPY command
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- cancel as soon as the coordinator sends COPY command
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").cancel(' || pg_backend_pid() || ')');
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- kill as soon as the worker sends CopyComplete
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COPY 3").kill()');
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- cancel as soon as the coordinator sends CopyData
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COPY 3").cancel(' || pg_backend_pid() || ')');
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- kill the connection when we try to start the COPY
|
||||||
|
-- the query should abort
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- killing on PREPARE should be fine, everything should be rollbacked
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()');
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- cancelling on PREPARE should be fine, everything should be rollbacked
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").cancel(' || pg_backend_pid() || ')');
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- killing on command complete of COMMIT PREPARE, we should see that the command succeeds
|
||||||
|
-- and all the workers committed
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COMMIT PREPARED").kill()');
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
|
||||||
|
-- we shouldn't have any prepared transactions in the workers
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
TRUNCATE test_table;
|
||||||
|
|
||||||
|
-- kill as soon as the coordinator sends COMMIT
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
-- Since we kill connections to one worker after commit arrives but the
|
||||||
|
-- other worker connections are healthy, we cannot commit on 1 worker
|
||||||
|
-- which has 1 active shard placements, but the other does. That's why
|
||||||
|
-- we expect to see 1 recovered prepared transactions.
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
TRUNCATE test_table;
|
||||||
|
|
||||||
|
-- finally, test failing on ROLLBACK just after the coordinator
|
||||||
|
-- sends the ROLLBACK so the command can be rollbacked
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()');
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL client_min_messages TO WARNING;
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- but now kill just after the worker sends response to
|
||||||
|
-- ROLLBACK command, command should have been rollbacked
|
||||||
|
-- both on the distributed table and the placements
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^ROLLBACK").kill()');
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL client_min_messages TO WARNING;
|
||||||
|
\copy test_table FROM STDIN DELIMITER ','
|
||||||
|
1,2
|
||||||
|
2,3
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
DROP SCHEMA copy_reference_failure CASCADE;
|
||||||
|
SET search_path TO default;
|
||||||
|
|
Loading…
Reference in New Issue