mirror of https://github.com/citusdata/citus.git
Add failure test for copy on hash distributed table
parent
82fa85fa5b
commit
bc27651dd9
|
@ -0,0 +1,415 @@
|
||||||
|
--
|
||||||
|
-- Failure tests for COPY to hash distributed tables
|
||||||
|
--
|
||||||
|
CREATE SCHEMA copy_distributed_table;
|
||||||
|
SET search_path TO 'copy_distributed_table';
|
||||||
|
SET citus.next_shard_id TO 1710000;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- With one placement COPY should error out and placement should stay healthy.
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.shard_count to 4;
|
||||||
|
CREATE TABLE test_table(id int, value_1 int);
|
||||||
|
SELECT create_distributed_table('test_table','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE VIEW unhealthy_shard_count AS
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement pdsp
|
||||||
|
JOIN
|
||||||
|
pg_dist_shard pds
|
||||||
|
ON pdsp.shardid=pds.shardid
|
||||||
|
WHERE logicalrelid='copy_distributed_table.test_table'::regclass AND shardstate != 1;
|
||||||
|
-- Just kill the connection after sending the first query to the worker.
|
||||||
|
SELECT citus.mitmproxy('conn.kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
WARNING: connection error: localhost:9060
|
||||||
|
DETAIL: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: COPY test_table, line 1: "1,2"
|
||||||
|
ERROR: could not connect to any active placements
|
||||||
|
CONTEXT: COPY test_table, line 1: "1,2"
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Now, kill the connection while copying the data
|
||||||
|
SELECT citus.mitmproxy('conn.onCopyData().kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
ERROR: failed to COPY to shard 1710000 on localhost:9060
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Similar to the above one, but now cancel the connection
|
||||||
|
-- instead of killing it.
|
||||||
|
SELECT citus.mitmproxy('conn.onCopyData().cancel(' || pg_backend_pid() || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- kill the connection after worker sends command complete message
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY 1").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
ERROR: failed to COPY to shard 1710002 on localhost:9060
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- similar to above one, but cancel the connection on command complete
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY 1").cancel(' || pg_backend_pid() || ')');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- kill the connection on PREPARE TRANSACTION
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
ERROR: connection not open
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- we don't want to see the prepared transaction numbers in the warnings
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
-- kill on command complete on COMMIT PREPARE, command should succeed
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT PREPARED").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
SET client_min_messages TO NOTICE;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
TRUNCATE TABLE test_table;
|
||||||
|
-- kill on ROLLBACK, command could be rollbacked
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="ROLLBACK").kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
ROLLBACK;
|
||||||
|
WARNING: connection not open
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE test_table CASCADE;
|
||||||
|
NOTICE: drop cascades to view unhealthy_shard_count
|
||||||
|
-- With two placement, should we error out or mark untouched shard placements as inactive?
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
CREATE TABLE test_table_2(id int, value_1 int);
|
||||||
|
SELECT create_distributed_table('test_table_2','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table_2 FROM stdin delimiter ',';
|
||||||
|
WARNING: connection error: localhost:9060
|
||||||
|
DETAIL: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: COPY test_table_2, line 1: "1,2"
|
||||||
|
WARNING: connection error: localhost:9060
|
||||||
|
DETAIL: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: COPY test_table_2, line 2: "3,4"
|
||||||
|
WARNING: connection error: localhost:9060
|
||||||
|
DETAIL: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: COPY test_table_2, line 3: "6,7"
|
||||||
|
WARNING: connection error: localhost:9060
|
||||||
|
DETAIL: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: COPY test_table_2, line 5: "9,10"
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate
|
||||||
|
FROM pg_dist_shard_placement as pdsd
|
||||||
|
INNER JOIN pg_dist_shard as pds
|
||||||
|
ON pdsd.shardid = pds.shardid
|
||||||
|
WHERE pds.logicalrelid = 'test_table_2'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
logicalrelid | shardid | shardstate
|
||||||
|
--------------+---------+------------
|
||||||
|
test_table_2 | 1710004 | 3
|
||||||
|
test_table_2 | 1710004 | 1
|
||||||
|
test_table_2 | 1710005 | 3
|
||||||
|
test_table_2 | 1710005 | 1
|
||||||
|
test_table_2 | 1710006 | 3
|
||||||
|
test_table_2 | 1710006 | 1
|
||||||
|
test_table_2 | 1710007 | 3
|
||||||
|
test_table_2 | 1710007 | 1
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
-- Create test_table_2 again to have healthy one
|
||||||
|
DROP TABLE test_table_2;
|
||||||
|
CREATE TABLE test_table_2(id int, value_1 int);
|
||||||
|
SELECT create_distributed_table('test_table_2','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Kill the connection when we try to start the COPY
|
||||||
|
-- The query should abort
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table_2 FROM stdin delimiter ',';
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:9060
|
||||||
|
COPY test_table_2, line 1: "1,2"
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate
|
||||||
|
FROM pg_dist_shard_placement as pdsd
|
||||||
|
INNER JOIN pg_dist_shard as pds
|
||||||
|
ON pdsd.shardid = pds.shardid
|
||||||
|
WHERE pds.logicalrelid = 'test_table_2'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
logicalrelid | shardid | shardstate
|
||||||
|
--------------+---------+------------
|
||||||
|
test_table_2 | 1710008 | 1
|
||||||
|
test_table_2 | 1710008 | 1
|
||||||
|
test_table_2 | 1710009 | 1
|
||||||
|
test_table_2 | 1710009 | 1
|
||||||
|
test_table_2 | 1710010 | 1
|
||||||
|
test_table_2 | 1710010 | 1
|
||||||
|
test_table_2 | 1710011 | 1
|
||||||
|
test_table_2 | 1710011 | 1
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
-- Create test_table_2 again to have healthy one
|
||||||
|
DROP TABLE test_table_2;
|
||||||
|
CREATE TABLE test_table_2(id int, value_1 int);
|
||||||
|
SELECT create_distributed_table('test_table_2','id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- When kill on copying data, it will be rollbacked and placements won't be labaled as invalid.
|
||||||
|
-- Note that now we sent data to shard 210007, yet it is not marked as invalid.
|
||||||
|
-- You can check the issue about this behaviour: https://github.com/citusdata/citus/issues/1933
|
||||||
|
SELECT citus.mitmproxy('conn.onCopyData().kill()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\COPY test_table_2 FROM stdin delimiter ',';
|
||||||
|
ERROR: failed to COPY to shard 1710012 on localhost:9060
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
-----------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate
|
||||||
|
FROM pg_dist_shard_placement as pdsd
|
||||||
|
INNER JOIN pg_dist_shard as pds
|
||||||
|
ON pdsd.shardid = pds.shardid
|
||||||
|
WHERE pds.logicalrelid = 'test_table_2'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
logicalrelid | shardid | shardstate
|
||||||
|
--------------+---------+------------
|
||||||
|
test_table_2 | 1710012 | 1
|
||||||
|
test_table_2 | 1710012 | 1
|
||||||
|
test_table_2 | 1710013 | 1
|
||||||
|
test_table_2 | 1710013 | 1
|
||||||
|
test_table_2 | 1710014 | 1
|
||||||
|
test_table_2 | 1710014 | 1
|
||||||
|
test_table_2 | 1710015 | 1
|
||||||
|
test_table_2 | 1710015 | 1
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
DROP SCHEMA copy_distributed_table CASCADE;
|
||||||
|
NOTICE: drop cascades to table test_table_2
|
||||||
|
SET search_path TO default;
|
|
@ -4,12 +4,12 @@ test: failure_test_helpers
|
||||||
# this should only be run by pg_regress_multi, you don't need it
|
# this should only be run by pg_regress_multi, you don't need it
|
||||||
test: failure_setup
|
test: failure_setup
|
||||||
test: multi_test_helpers
|
test: multi_test_helpers
|
||||||
|
|
||||||
test: failure_ddl
|
test: failure_ddl
|
||||||
test: failure_truncate
|
test: failure_truncate
|
||||||
test: failure_create_index_concurrently
|
test: failure_create_index_concurrently
|
||||||
test: failure_add_disable_node
|
test: failure_add_disable_node
|
||||||
test: failure_copy_to_reference
|
test: failure_copy_to_reference
|
||||||
|
test: failure_copy_on_hash
|
||||||
|
|
||||||
test: failure_1pc_copy_hash
|
test: failure_1pc_copy_hash
|
||||||
test: failure_1pc_copy_append
|
test: failure_1pc_copy_append
|
||||||
|
|
|
@ -0,0 +1,213 @@
|
||||||
|
--
|
||||||
|
-- Failure tests for COPY to hash distributed tables
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE SCHEMA copy_distributed_table;
|
||||||
|
SET search_path TO 'copy_distributed_table';
|
||||||
|
SET citus.next_shard_id TO 1710000;
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
|
||||||
|
-- With one placement COPY should error out and placement should stay healthy.
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.shard_count to 4;
|
||||||
|
|
||||||
|
CREATE TABLE test_table(id int, value_1 int);
|
||||||
|
SELECT create_distributed_table('test_table','id');
|
||||||
|
|
||||||
|
CREATE VIEW unhealthy_shard_count AS
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement pdsp
|
||||||
|
JOIN
|
||||||
|
pg_dist_shard pds
|
||||||
|
ON pdsp.shardid=pds.shardid
|
||||||
|
WHERE logicalrelid='copy_distributed_table.test_table'::regclass AND shardstate != 1;
|
||||||
|
|
||||||
|
-- Just kill the connection after sending the first query to the worker.
|
||||||
|
SELECT citus.mitmproxy('conn.kill()');
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
1,2
|
||||||
|
3,4
|
||||||
|
6,7
|
||||||
|
8,9
|
||||||
|
\.
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- Now, kill the connection while copying the data
|
||||||
|
SELECT citus.mitmproxy('conn.onCopyData().kill()');
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
1,2
|
||||||
|
3,4
|
||||||
|
6,7
|
||||||
|
8,9
|
||||||
|
\.
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- Similar to the above one, but now cancel the connection
|
||||||
|
-- instead of killing it.
|
||||||
|
SELECT citus.mitmproxy('conn.onCopyData().cancel(' || pg_backend_pid() || ')');
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
1,2
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- kill the connection after worker sends command complete message
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY 1").kill()');
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
1,2
|
||||||
|
3,4
|
||||||
|
6,7
|
||||||
|
8,9
|
||||||
|
\.
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- similar to above one, but cancel the connection on command complete
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY 1").cancel(' || pg_backend_pid() || ')');
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
1,2
|
||||||
|
3,4
|
||||||
|
6,7
|
||||||
|
8,9
|
||||||
|
\.
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- kill the connection on PREPARE TRANSACTION
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").kill()');
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
1,2
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
-- we don't want to see the prepared transaction numbers in the warnings
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
|
||||||
|
-- kill on command complete on COMMIT PREPARE, command should succeed
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT PREPARED").kill()');
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
1,2
|
||||||
|
3,4
|
||||||
|
6,7
|
||||||
|
8,9
|
||||||
|
\.
|
||||||
|
|
||||||
|
SET client_min_messages TO NOTICE;
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
|
||||||
|
TRUNCATE TABLE test_table;
|
||||||
|
|
||||||
|
-- kill on ROLLBACK, command could be rollbacked
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="ROLLBACK").kill()');
|
||||||
|
BEGIN;
|
||||||
|
\COPY test_table FROM stdin delimiter ',';
|
||||||
|
1,2
|
||||||
|
3,4
|
||||||
|
\.
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT * FROM unhealthy_shard_count;
|
||||||
|
SELECT count(*) FROM test_table;
|
||||||
|
DROP TABLE test_table CASCADE;
|
||||||
|
|
||||||
|
-- With two placement, should we error out or mark untouched shard placements as inactive?
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
|
||||||
|
CREATE TABLE test_table_2(id int, value_1 int);
|
||||||
|
SELECT create_distributed_table('test_table_2','id');
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.kill()');
|
||||||
|
|
||||||
|
\COPY test_table_2 FROM stdin delimiter ',';
|
||||||
|
1,2
|
||||||
|
3,4
|
||||||
|
6,7
|
||||||
|
8,9
|
||||||
|
9,10
|
||||||
|
11,12
|
||||||
|
13,14
|
||||||
|
\.
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate
|
||||||
|
FROM pg_dist_shard_placement as pdsd
|
||||||
|
INNER JOIN pg_dist_shard as pds
|
||||||
|
ON pdsd.shardid = pds.shardid
|
||||||
|
WHERE pds.logicalrelid = 'test_table_2'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
|
||||||
|
-- Create test_table_2 again to have healthy one
|
||||||
|
DROP TABLE test_table_2;
|
||||||
|
CREATE TABLE test_table_2(id int, value_1 int);
|
||||||
|
SELECT create_distributed_table('test_table_2','id');
|
||||||
|
|
||||||
|
-- Kill the connection when we try to start the COPY
|
||||||
|
-- The query should abort
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
|
||||||
|
|
||||||
|
\COPY test_table_2 FROM stdin delimiter ',';
|
||||||
|
1,2
|
||||||
|
3,4
|
||||||
|
6,7
|
||||||
|
8,9
|
||||||
|
\.
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate
|
||||||
|
FROM pg_dist_shard_placement as pdsd
|
||||||
|
INNER JOIN pg_dist_shard as pds
|
||||||
|
ON pdsd.shardid = pds.shardid
|
||||||
|
WHERE pds.logicalrelid = 'test_table_2'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
|
||||||
|
-- Create test_table_2 again to have healthy one
|
||||||
|
DROP TABLE test_table_2;
|
||||||
|
CREATE TABLE test_table_2(id int, value_1 int);
|
||||||
|
SELECT create_distributed_table('test_table_2','id');
|
||||||
|
|
||||||
|
-- When kill on copying data, it will be rollbacked and placements won't be labaled as invalid.
|
||||||
|
-- Note that now we sent data to shard 210007, yet it is not marked as invalid.
|
||||||
|
-- You can check the issue about this behaviour: https://github.com/citusdata/citus/issues/1933
|
||||||
|
SELECT citus.mitmproxy('conn.onCopyData().kill()');
|
||||||
|
\COPY test_table_2 FROM stdin delimiter ',';
|
||||||
|
1,2
|
||||||
|
3,4
|
||||||
|
6,7
|
||||||
|
8,9
|
||||||
|
9,10
|
||||||
|
11,12
|
||||||
|
13,14
|
||||||
|
\.
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate
|
||||||
|
FROM pg_dist_shard_placement as pdsd
|
||||||
|
INNER JOIN pg_dist_shard as pds
|
||||||
|
ON pdsd.shardid = pds.shardid
|
||||||
|
WHERE pds.logicalrelid = 'test_table_2'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
|
||||||
|
DROP SCHEMA copy_distributed_table CASCADE;
|
||||||
|
SET search_path TO default;
|
Loading…
Reference in New Issue