mirror of https://github.com/citusdata/citus.git
431 lines
17 KiB
PL/PgSQL
431 lines
17 KiB
PL/PgSQL
--
|
|
-- Test TRUNCATE command failures
|
|
--
|
|
CREATE SCHEMA truncate_failure;
|
|
SET search_path TO 'truncate_failure';
|
|
SET citus.next_shard_id TO 120000;
|
|
-- we don't want to see the prepared transaction numbers in the warnings
|
|
SET client_min_messages TO ERROR;
|
|
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
|
|
-- we'll start with replication factor 1, 1PC and parallel mode
|
|
SET citus.multi_shard_commit_protocol TO '1pc';
|
|
SET citus.shard_count = 4;
|
|
SET citus.shard_replication_factor = 1;
|
|
|
|
CREATE TABLE test_table (key int, value int);
|
|
SELECT create_distributed_table('test_table', 'key');
|
|
|
|
INSERT INTO test_table SELECT x,x FROM generate_series(1,20) as f(x);
|
|
|
|
CREATE VIEW unhealthy_shard_count AS
|
|
SELECT count(*)
|
|
FROM pg_dist_shard_placement pdsp
|
|
JOIN
|
|
pg_dist_shard pds
|
|
ON pdsp.shardid=pds.shardid
|
|
WHERE logicalrelid='truncate_failure.test_table'::regclass AND shardstate != 1;
|
|
|
|
-- in the first test, kill just in the first
|
|
-- response we get from the worker
|
|
SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- cancel just in the first
|
|
-- response we get from the worker
|
|
SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- kill as soon as the coordinator sends begin
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- cancel as soon as the coordinator sends begin
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- kill as soon as the coordinator sends TRUNCATE TABLE command
|
|
SELECT citus.mitmproxy('conn.onQuery(query="TRUNCATE TABLE truncate_failure.test_table").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- cancel as soon as the coordinator sends TRUNCATE TABLE command
|
|
SELECT citus.mitmproxy('conn.onQuery(query="TRUNCATE TABLE truncate_failure.test_table").cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- kill as soon as the coordinator sends COMMIT
|
|
-- One shard should not get truncated but the other should
|
|
-- since it is sent from another connection.
|
|
-- Thus, we should see a partially successful truncate
|
|
-- Note: This is the result of using 1pc and there is no way to recover from it
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- refill the table
|
|
TRUNCATE test_table;
|
|
INSERT INTO test_table SELECT x,x FROM generate_series(1,20) as f(x);
|
|
|
|
-- cancel as soon as the coordinator sends COMMIT
|
|
-- interrupts are held during COMMIT/ROLLBACK, so the command
|
|
-- should have been applied without any issues since cancel is ignored
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- refill the table
|
|
TRUNCATE test_table;
|
|
INSERT INTO test_table SELECT x,x FROM generate_series(1,20) as f(x);
|
|
|
|
SET client_min_messages TO WARNING;
|
|
-- now kill just after the worker sends response to
|
|
-- COMMIT command, so we'll have lots of warnings but the command
|
|
-- should have been committed both on the distributed table and the placements
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COMMIT").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
SET client_min_messages TO ERROR;
|
|
|
|
INSERT INTO test_table SELECT x,x FROM generate_series(1,20) as f(x);
|
|
|
|
-- now cancel just after the worker sends response to
|
|
-- but Postgres doesn't accept interrupts during COMMIT and ROLLBACK
|
|
-- so should not cancel at all, so not an effective test but adding in
|
|
-- case Citus messes up this behaviour
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COMMIT").cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
INSERT INTO test_table SELECT x,x FROM generate_series(1,20) as f(x);
|
|
|
|
-- Let's test Truncate on reference tables with a FK from a hash distributed table
|
|
CREATE TABLE reference_table(i int UNIQUE);
|
|
INSERT INTO reference_table SELECT x FROM generate_series(1,20) as f(x);
|
|
SELECT create_reference_table('reference_table');
|
|
|
|
ALTER TABLE test_table ADD CONSTRAINT foreign_key FOREIGN KEY (value) REFERENCES reference_table(i);
|
|
-- immediately kill when we see prepare transaction to see if the command
|
|
-- still cascaded to referencing table or failed successfuly
|
|
SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").kill()');
|
|
TRUNCATE reference_table CASCADE;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
SELECT count(*) FROM reference_table;
|
|
|
|
-- immediately cancel when we see prepare transaction to see if the command
|
|
-- still cascaded to referencing table or failed successfuly
|
|
SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE reference_table CASCADE;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
SELECT count(*) FROM reference_table;
|
|
|
|
-- immediately kill when we see cascading TRUNCATE on the hash table to see
|
|
-- rollbacked properly
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE TABLE").after(2).kill()');
|
|
TRUNCATE reference_table CASCADE;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
SELECT count(*) FROM reference_table;
|
|
|
|
-- immediately cancel when we see cascading TRUNCATE on the hash table to see
|
|
-- if the command still cascaded to referencing table or failed successfuly
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE TABLE").after(2).cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE reference_table CASCADE;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
SELECT count(*) FROM reference_table;
|
|
|
|
-- immediately kill after we get prepare transaction complete
|
|
-- to see if the command still cascaded to referencing table or
|
|
-- failed successfuly
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="PREPARE TRANSACTION").kill()');
|
|
TRUNCATE reference_table CASCADE;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT recover_prepared_transactions();
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- immediately cancel after we get prepare transaction complete
|
|
-- to see if the command still cascaded to referencing table or
|
|
-- failed successfuly
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="PREPARE TRANSACTION").cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE reference_table CASCADE;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT recover_prepared_transactions();
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- now, lets test with 2PC
|
|
SET citus.multi_shard_commit_protocol TO '2pc';
|
|
|
|
-- in the first test, kill just in the first
|
|
-- response we get from the worker
|
|
SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- cancel just in the first
|
|
-- response we get from the worker
|
|
SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- kill as soon as the coordinator sends begin
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- cancel as soon as the coordinator sends begin
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- kill as soon as the coordinator sends TRUNCATE TABLE command
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE TABLE truncate_failure.test_table").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- cancel as soon as the coordinator sends TRUNCATE TABLE command
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE TABLE truncate_failure.test_table").cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- killing on PREPARE should be fine, everything should be rollbacked
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^PREPARE TRANSACTION").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
|
|
-- we should be able to revocer the transaction and
|
|
-- see that the command is rollbacked
|
|
SELECT recover_prepared_transactions();
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- cancelling on PREPARE should be fine, everything should be rollbacked
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^PREPARE TRANSACTION").cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
|
|
-- we should be able to revocer the transaction and
|
|
-- see that the command is rollbacked
|
|
SELECT recover_prepared_transactions();
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- killing on command complete of COMMIT PREPARE, we should see that the command succeeds
|
|
-- and all the workers committed
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COMMIT PREPARED").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
|
|
-- we shouldn't have any prepared transactions in the workers
|
|
SELECT recover_prepared_transactions();
|
|
SELECT count(*) FROM test_table;
|
|
|
|
INSERT INTO test_table SELECT x,x FROM generate_series(1,20) as f(x);
|
|
|
|
-- kill as soon as the coordinator sends COMMIT
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT PREPARED").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
-- Since we kill connections to one worker after commit arrives but the
|
|
-- other worker connections are healthy, we cannot commit on 1 worker
|
|
-- which has 2 active shard placements, but the other does. That's why
|
|
-- we expect to see 2 recovered prepared transactions.
|
|
SELECT recover_prepared_transactions();
|
|
SELECT count(*) FROM test_table;
|
|
|
|
INSERT INTO test_table SELECT x,x FROM generate_series(1,20) as f(x);
|
|
|
|
-- finally, test failing on ROLLBACK with 2CPC
|
|
-- fail just after the coordinator sends the ROLLBACK
|
|
-- so the command can be rollbacked
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()');
|
|
BEGIN;
|
|
TRUNCATE test_table;
|
|
ROLLBACK;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- but now kill just after the worker sends response to
|
|
-- ROLLBACK command, so we'll have lots of warnings but the command
|
|
-- should have been rollbacked both on the distributed table and the placements
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^ROLLBACK").kill()');
|
|
BEGIN;
|
|
TRUNCATE test_table;
|
|
ROLLBACK;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT recover_prepared_transactions();
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- final set of tests with 2PC and replication factor = 2
|
|
SET citus.multi_shard_commit_protocol TO '2pc';
|
|
SET citus.shard_count = 4;
|
|
SET citus.shard_replication_factor = 2;
|
|
|
|
-- re-create the table with replication factor 2
|
|
DROP TABLE test_table CASCADE;
|
|
CREATE TABLE test_table (key int, value int);
|
|
SELECT create_distributed_table('test_table', 'key');
|
|
INSERT INTO test_table SELECT x,x FROM generate_series(1,20) as f(x);
|
|
|
|
CREATE VIEW unhealthy_shard_count AS
|
|
SELECT count(*)
|
|
FROM pg_dist_shard_placement pdsp
|
|
JOIN
|
|
pg_dist_shard pds
|
|
ON pdsp.shardid=pds.shardid
|
|
WHERE logicalrelid='truncate_failure.test_table'::regclass AND shardstate != 1;
|
|
|
|
-- in the first test, kill just in the first
|
|
-- response we get from the worker
|
|
SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- cancel just in the first
|
|
-- response we get from the worker
|
|
SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- kill as soon as the coordinator sends begin
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- cancel as soon as the coordinator sends begin
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- kill as soon as the coordinator sends TRUNCATE TABLE command
|
|
SELECT citus.mitmproxy('conn.onQuery(query="TRUNCATE TABLE truncate_failure.test_table").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- cancel as soon as the coordinator sends TRUNCATE TABLE command
|
|
SELECT citus.mitmproxy('conn.onQuery(query="TRUNCATE TABLE truncate_failure.test_table").cancel(' || pg_backend_pid() || ')');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- killing on PREPARE should be fine, everything should be rollbacked
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="PREPARE TRANSACTION").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
|
|
-- we should be able to revocer the transaction and
|
|
-- see that the command is rollbacked
|
|
SELECT recover_prepared_transactions();
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- killing on command complete of COMMIT PREPARE, we should see that the command succeeds
|
|
-- and all the workers committed
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^COMMIT PREPARED").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
|
|
-- we shouldn't have any prepared transactions in the workers
|
|
SELECT recover_prepared_transactions();
|
|
SELECT count(*) FROM test_table;
|
|
|
|
INSERT INTO test_table SELECT x,x FROM generate_series(1,20) as f(x);
|
|
|
|
-- kill as soon as the coordinator sends COMMIT
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT PREPARED").kill()');
|
|
TRUNCATE test_table;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
-- Since we kill connections to one worker after commit arrives but the
|
|
-- other worker connections are healthy, we cannot commit on 1 worker
|
|
-- which has 4 active shard placements (2 shards, replication factor=2),
|
|
-- but the other does. That's why we expect to see 4 recovered prepared
|
|
-- transactions.
|
|
SELECT recover_prepared_transactions();
|
|
SELECT count(*) FROM test_table;
|
|
|
|
INSERT INTO test_table SELECT x,x FROM generate_series(1,20) as f(x);
|
|
|
|
-- finally, test failing on ROLLBACK with 2CPC
|
|
-- fail just after the coordinator sends the ROLLBACK
|
|
-- so the command can be rollbacked
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^ROLLBACK").kill()');
|
|
BEGIN;
|
|
TRUNCATE test_table;
|
|
ROLLBACK;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT count(*) FROM test_table;
|
|
|
|
-- but now kill just after the worker sends response to
|
|
-- ROLLBACK command, so we'll have lots of warnings but the command
|
|
-- should have been rollbacked both on the distributed table and the placements
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="^ROLLBACK").kill()');
|
|
BEGIN;
|
|
TRUNCATE test_table;
|
|
ROLLBACK;
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
SELECT * FROM unhealthy_shard_count;
|
|
SELECT recover_prepared_transactions();
|
|
SELECT count(*) FROM test_table;
|
|
|
|
DROP SCHEMA truncate_failure CASCADE;
|
|
SET search_path TO default;
|