mirror of https://github.com/citusdata/citus.git
Add copy failure tests inside transactions
parent
1107439ade
commit
0e635b69f0
|
@ -1127,6 +1127,15 @@ SELECT create_reference_table('reference_failure_test');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- create a hash distributed table
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
CREATE TABLE numbers_hash_failure_test(key int, value int);
|
||||||
|
SELECT create_distributed_table('numbers_hash_failure_test', 'key');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- ensure that the shard is created for this user
|
-- ensure that the shard is created for this user
|
||||||
\c - test_user - :worker_1_port
|
\c - test_user - :worker_1_port
|
||||||
\dt reference_failure_test_1200015
|
\dt reference_failure_test_1200015
|
||||||
|
@ -1152,6 +1161,10 @@ INSERT INTO reference_failure_test VALUES (1, '1');
|
||||||
WARNING: connection error: localhost:57637
|
WARNING: connection error: localhost:57637
|
||||||
ERROR: failure on connection marked as essential: localhost:57637
|
ERROR: failure on connection marked as essential: localhost:57637
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
BEGIN;
|
||||||
|
COPY reference_failure_test FROM STDIN WITH (FORMAT 'csv');
|
||||||
|
ERROR: connection error: localhost:57637
|
||||||
|
COMMIT;
|
||||||
-- show that no data go through the table and shard states are good
|
-- show that no data go through the table and shard states are good
|
||||||
SELECT * FROM reference_failure_test;
|
SELECT * FROM reference_failure_test;
|
||||||
key | value
|
key | value
|
||||||
|
@ -1171,12 +1184,109 @@ ORDER BY s.logicalrelid, sp.shardstate;
|
||||||
reference_failure_test | 1 | 2
|
reference_failure_test | 1 | 2
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
|
||||||
|
WARNING: connection error: localhost:57637
|
||||||
|
WARNING: connection error: localhost:57637
|
||||||
|
-- some placements are invalid before abort
|
||||||
|
SELECT shardid, shardstate, nodename, nodeport
|
||||||
|
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
|
||||||
|
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
shardid | shardstate | nodename | nodeport
|
||||||
|
---------+------------+-----------+----------
|
||||||
|
1200016 | 3 | localhost | 57637
|
||||||
|
1200016 | 1 | localhost | 57638
|
||||||
|
1200017 | 1 | localhost | 57637
|
||||||
|
1200017 | 1 | localhost | 57638
|
||||||
|
1200018 | 1 | localhost | 57637
|
||||||
|
1200018 | 1 | localhost | 57638
|
||||||
|
1200019 | 3 | localhost | 57637
|
||||||
|
1200019 | 1 | localhost | 57638
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
ABORT;
|
||||||
|
-- verify nothing is inserted
|
||||||
|
SELECT count(*) FROM numbers_hash_failure_test;
|
||||||
|
WARNING: connection error: localhost:57637
|
||||||
|
WARNING: connection error: localhost:57637
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- all placements to be market valid
|
||||||
|
SELECT shardid, shardstate, nodename, nodeport
|
||||||
|
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
|
||||||
|
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
shardid | shardstate | nodename | nodeport
|
||||||
|
---------+------------+-----------+----------
|
||||||
|
1200016 | 1 | localhost | 57637
|
||||||
|
1200016 | 1 | localhost | 57638
|
||||||
|
1200017 | 1 | localhost | 57637
|
||||||
|
1200017 | 1 | localhost | 57638
|
||||||
|
1200018 | 1 | localhost | 57637
|
||||||
|
1200018 | 1 | localhost | 57638
|
||||||
|
1200019 | 1 | localhost | 57637
|
||||||
|
1200019 | 1 | localhost | 57638
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
|
||||||
|
WARNING: connection error: localhost:57637
|
||||||
|
WARNING: connection error: localhost:57637
|
||||||
|
-- check shard states before commit
|
||||||
|
SELECT shardid, shardstate, nodename, nodeport
|
||||||
|
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
|
||||||
|
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
shardid | shardstate | nodename | nodeport
|
||||||
|
---------+------------+-----------+----------
|
||||||
|
1200016 | 3 | localhost | 57637
|
||||||
|
1200016 | 1 | localhost | 57638
|
||||||
|
1200017 | 1 | localhost | 57637
|
||||||
|
1200017 | 1 | localhost | 57638
|
||||||
|
1200018 | 1 | localhost | 57637
|
||||||
|
1200018 | 1 | localhost | 57638
|
||||||
|
1200019 | 3 | localhost | 57637
|
||||||
|
1200019 | 1 | localhost | 57638
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- expect some placements to be market invalid after commit
|
||||||
|
SELECT shardid, shardstate, nodename, nodeport
|
||||||
|
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
|
||||||
|
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
shardid | shardstate | nodename | nodeport
|
||||||
|
---------+------------+-----------+----------
|
||||||
|
1200016 | 3 | localhost | 57637
|
||||||
|
1200016 | 1 | localhost | 57638
|
||||||
|
1200017 | 1 | localhost | 57637
|
||||||
|
1200017 | 1 | localhost | 57638
|
||||||
|
1200018 | 1 | localhost | 57637
|
||||||
|
1200018 | 1 | localhost | 57638
|
||||||
|
1200019 | 3 | localhost | 57637
|
||||||
|
1200019 | 1 | localhost | 57638
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
-- verify data is inserted
|
||||||
|
SELECT count(*) FROM numbers_hash_failure_test;
|
||||||
|
WARNING: connection error: localhost:57637
|
||||||
|
WARNING: connection error: localhost:57637
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- connect back to the worker and set rename the test_user back
|
-- connect back to the worker and set rename the test_user back
|
||||||
\c - :default_user - :worker_1_port
|
\c - :default_user - :worker_1_port
|
||||||
ALTER USER test_user_new RENAME TO test_user;
|
ALTER USER test_user_new RENAME TO test_user;
|
||||||
-- connect back to the master with the proper user to continue the tests
|
-- connect back to the master with the proper user to continue the tests
|
||||||
\c - :default_user - :master_port
|
\c - :default_user - :master_port
|
||||||
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second, reference_failure_test;
|
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
|
||||||
|
reference_failure_test, numbers_hash_failure_test;
|
||||||
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
||||||
nodename | nodeport | success | result
|
nodename | nodeport | success | result
|
||||||
-----------+----------+---------+-----------
|
-----------+----------+---------+-----------
|
||||||
|
|
|
@ -832,6 +832,11 @@ CREATE USER test_user;
|
||||||
CREATE TABLE reference_failure_test (key int, value int);
|
CREATE TABLE reference_failure_test (key int, value int);
|
||||||
SELECT create_reference_table('reference_failure_test');
|
SELECT create_reference_table('reference_failure_test');
|
||||||
|
|
||||||
|
-- create a hash distributed table
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
CREATE TABLE numbers_hash_failure_test(key int, value int);
|
||||||
|
SELECT create_distributed_table('numbers_hash_failure_test', 'key');
|
||||||
|
|
||||||
-- ensure that the shard is created for this user
|
-- ensure that the shard is created for this user
|
||||||
\c - test_user - :worker_1_port
|
\c - test_user - :worker_1_port
|
||||||
\dt reference_failure_test_1200015
|
\dt reference_failure_test_1200015
|
||||||
|
@ -851,9 +856,16 @@ BEGIN;
|
||||||
INSERT INTO reference_failure_test VALUES (1, '1');
|
INSERT INTO reference_failure_test VALUES (1, '1');
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
COPY reference_failure_test FROM STDIN WITH (FORMAT 'csv');
|
||||||
|
2,2
|
||||||
|
\.
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
-- show that no data go through the table and shard states are good
|
-- show that no data go through the table and shard states are good
|
||||||
SELECT * FROM reference_failure_test;
|
SELECT * FROM reference_failure_test;
|
||||||
|
|
||||||
|
|
||||||
-- all placements should be healthy
|
-- all placements should be healthy
|
||||||
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
|
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
|
||||||
FROM pg_dist_shard_placement AS sp,
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
@ -863,13 +875,60 @@ AND s.logicalrelid = 'reference_failure_test'::regclass
|
||||||
GROUP BY s.logicalrelid, sp.shardstate
|
GROUP BY s.logicalrelid, sp.shardstate
|
||||||
ORDER BY s.logicalrelid, sp.shardstate;
|
ORDER BY s.logicalrelid, sp.shardstate;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
|
||||||
|
1,1
|
||||||
|
2,2
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- some placements are invalid before abort
|
||||||
|
SELECT shardid, shardstate, nodename, nodeport
|
||||||
|
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
|
||||||
|
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
|
||||||
|
ABORT;
|
||||||
|
|
||||||
|
-- verify nothing is inserted
|
||||||
|
SELECT count(*) FROM numbers_hash_failure_test;
|
||||||
|
|
||||||
|
-- all placements to be market valid
|
||||||
|
SELECT shardid, shardstate, nodename, nodeport
|
||||||
|
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
|
||||||
|
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
|
||||||
|
1,1
|
||||||
|
2,2
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- check shard states before commit
|
||||||
|
SELECT shardid, shardstate, nodename, nodeport
|
||||||
|
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
|
||||||
|
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- expect some placements to be market invalid after commit
|
||||||
|
SELECT shardid, shardstate, nodename, nodeport
|
||||||
|
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
|
||||||
|
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
|
||||||
|
ORDER BY shardid, nodeport;
|
||||||
|
|
||||||
|
-- verify data is inserted
|
||||||
|
SELECT count(*) FROM numbers_hash_failure_test;
|
||||||
|
|
||||||
-- connect back to the worker and set rename the test_user back
|
-- connect back to the worker and set rename the test_user back
|
||||||
\c - :default_user - :worker_1_port
|
\c - :default_user - :worker_1_port
|
||||||
ALTER USER test_user_new RENAME TO test_user;
|
ALTER USER test_user_new RENAME TO test_user;
|
||||||
|
|
||||||
-- connect back to the master with the proper user to continue the tests
|
-- connect back to the master with the proper user to continue the tests
|
||||||
\c - :default_user - :master_port
|
\c - :default_user - :master_port
|
||||||
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second, reference_failure_test;
|
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
|
||||||
|
reference_failure_test, numbers_hash_failure_test;
|
||||||
|
|
||||||
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
||||||
DROP USER test_user;
|
DROP USER test_user;
|
Loading…
Reference in New Issue