mirror of https://github.com/citusdata/citus.git
385 lines
11 KiB
Plaintext
385 lines
11 KiB
Plaintext
SELECT citus.mitmproxy('conn.allow()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
SET citus.shard_count = 1;
|
|
SET citus.shard_replication_factor = 2; -- one shard per worker
|
|
SET citus.multi_shard_commit_protocol TO '1pc';
|
|
SET citus.next_shard_id TO 100400;
|
|
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
|
|
CREATE TABLE copy_test (key int, value int);
|
|
SELECT create_distributed_table('copy_test', 'key');
|
|
create_distributed_table
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
SELECT count(1) FROM copy_test;
|
|
count
|
|
-------
|
|
4
|
|
(1 row)
|
|
|
|
-- ==== kill the connection when we try to start a transaction ====
|
|
-- the query should abort
|
|
SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").killall()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
WARNING: connection not open
|
|
CONTEXT: while executing command on localhost:9060
|
|
COPY copy_test, line 1: "0, 0"
|
|
ERROR: failure on connection marked as essential: localhost:9060
|
|
CONTEXT: COPY copy_test, line 1: "0, 0"
|
|
-- ==== kill the connection when we try to start the COPY ====
|
|
-- the query should abort
|
|
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
ERROR: server closed the connection unexpectedly
|
|
This probably means the server terminated abnormally
|
|
before or while processing the request.
|
|
CONTEXT: while executing command on localhost:9060
|
|
COPY copy_test, line 1: "0, 0"
|
|
-- ==== kill the connection when we first start sending data ====
|
|
-- the query should abort
|
|
SELECT citus.mitmproxy('conn.onCopyData().killall()'); -- raw rows from the client
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
ERROR: failed to COPY to shard 100400 on localhost:9060
|
|
-- ==== kill the connection when the worker confirms it's received the data ====
|
|
-- the query should abort
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
ERROR: failed to COPY to shard 100400 on localhost:9060
|
|
-- ==== kill the connection when we try to send COMMIT ====
|
|
-- the query should succeed, and the placement should be marked inactive
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN (
|
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
|
|
) AND shardstate = 3;
|
|
count
|
|
-------
|
|
0
|
|
(1 row)
|
|
|
|
SELECT count(1) FROM copy_test;
|
|
count
|
|
-------
|
|
4
|
|
(1 row)
|
|
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
WARNING: connection not open
|
|
CONTEXT: while executing command on localhost:9060
|
|
WARNING: failed to commit critical transaction on localhost:9060, metadata is likely out of sync
|
|
WARNING: connection not open
|
|
CONTEXT: while executing command on localhost:9060
|
|
-- the placement is marked invalid
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN (
|
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
|
|
) AND shardstate = 3;
|
|
count
|
|
-------
|
|
1
|
|
(1 row)
|
|
|
|
SELECT count(1) FROM copy_test;
|
|
count
|
|
-------
|
|
8
|
|
(1 row)
|
|
|
|
-- ==== clean up a little bit before running the next test ====
|
|
UPDATE pg_dist_shard_placement SET shardstate = 1
|
|
WHERE shardid IN (
|
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
|
|
);
|
|
TRUNCATE copy_test;
|
|
-- ==== try to COPY invalid data ====
|
|
-- here the coordinator actually sends the data, but then unexpectedly closes the
|
|
-- connection when it notices the data stream is broken. Crucially, it closes the
|
|
-- connection before sending COMMIT, so no data makes it into the worker.
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV;
|
|
ERROR: missing data for column "value"
|
|
CONTEXT: COPY copy_test, line 5: "10"
|
|
-- kill the connection if the coordinator sends COMMIT. It doesn't, so nothing changes
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").kill()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV;
|
|
ERROR: missing data for column "value"
|
|
CONTEXT: COPY copy_test, line 5: "10"
|
|
SELECT * FROM copy_test ORDER BY key, value;
|
|
key | value
|
|
-----+-------
|
|
(0 rows)
|
|
|
|
-- ==== clean up some more to prepare for tests with only one replica ====
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
TRUNCATE copy_test;
|
|
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port;
|
|
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
|
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
|
|
) ORDER BY nodeport, placementid;
|
|
shardid | shardstate | shardlength | nodename | nodeport | placementid
|
|
---------+------------+-------------+-----------+----------+-------------
|
|
100400 | 1 | 0 | localhost | 9060 | 100
|
|
100400 | 3 | 0 | localhost | 57637 | 101
|
|
(2 rows)
|
|
|
|
-- ==== okay, run some tests where there's only one active placement ====
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
SELECT * FROM copy_test;
|
|
key | value
|
|
-----+-------
|
|
0 | 0
|
|
1 | 1
|
|
2 | 4
|
|
3 | 9
|
|
(4 rows)
|
|
|
|
-- the worker is unreachable
|
|
SELECT citus.mitmproxy('conn.killall()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
WARNING: connection not open
|
|
CONTEXT: while executing command on localhost:9060
|
|
COPY copy_test, line 1: "0, 0"
|
|
ERROR: failure on connection marked as essential: localhost:9060
|
|
CONTEXT: COPY copy_test, line 1: "0, 0"
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
SELECT * FROM copy_test;
|
|
key | value
|
|
-----+-------
|
|
0 | 0
|
|
1 | 1
|
|
2 | 4
|
|
3 | 9
|
|
(4 rows)
|
|
|
|
-- the first message fails
|
|
SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").killall()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
WARNING: connection not open
|
|
CONTEXT: while executing command on localhost:9060
|
|
COPY copy_test, line 1: "0, 0"
|
|
ERROR: failure on connection marked as essential: localhost:9060
|
|
CONTEXT: COPY copy_test, line 1: "0, 0"
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
SELECT * FROM copy_test;
|
|
key | value
|
|
-----+-------
|
|
0 | 0
|
|
1 | 1
|
|
2 | 4
|
|
3 | 9
|
|
(4 rows)
|
|
|
|
-- the COPY message fails
|
|
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
ERROR: server closed the connection unexpectedly
|
|
This probably means the server terminated abnormally
|
|
before or while processing the request.
|
|
CONTEXT: while executing command on localhost:9060
|
|
COPY copy_test, line 1: "0, 0"
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
SELECT * FROM copy_test;
|
|
key | value
|
|
-----+-------
|
|
0 | 0
|
|
1 | 1
|
|
2 | 4
|
|
3 | 9
|
|
(4 rows)
|
|
|
|
-- the COPY data fails
|
|
SELECT citus.mitmproxy('conn.onCopyData().killall()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
ERROR: failed to COPY to shard 100400 on localhost:9060
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
SELECT * FROM copy_test;
|
|
key | value
|
|
-----+-------
|
|
0 | 0
|
|
1 | 1
|
|
2 | 4
|
|
3 | 9
|
|
(4 rows)
|
|
|
|
-- the COMMIT fails
|
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
WARNING: connection not open
|
|
CONTEXT: while executing command on localhost:9060
|
|
WARNING: failed to commit critical transaction on localhost:9060, metadata is likely out of sync
|
|
WARNING: connection not open
|
|
CONTEXT: while executing command on localhost:9060
|
|
WARNING: could not commit transaction for shard 100400 on any active node
|
|
ERROR: could not commit transaction on any active node
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
SELECT * FROM copy_test;
|
|
key | value
|
|
-----+-------
|
|
0 | 0
|
|
1 | 1
|
|
2 | 4
|
|
3 | 9
|
|
(4 rows)
|
|
|
|
-- the placement is not marked invalid
|
|
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
|
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
|
|
) ORDER BY nodeport, placementid;
|
|
shardid | shardstate | shardlength | nodename | nodeport | placementid
|
|
---------+------------+-------------+-----------+----------+-------------
|
|
100400 | 1 | 0 | localhost | 9060 | 100
|
|
100400 | 3 | 0 | localhost | 57637 | 101
|
|
(2 rows)
|
|
|
|
-- the COMMIT makes it through but the connection dies before we get a response
|
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT").killall()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
|
WARNING: connection not open
|
|
CONTEXT: while executing command on localhost:9060
|
|
WARNING: failed to commit critical transaction on localhost:9060, metadata is likely out of sync
|
|
WARNING: connection not open
|
|
CONTEXT: while executing command on localhost:9060
|
|
WARNING: could not commit transaction for shard 100400 on any active node
|
|
ERROR: could not commit transaction on any active node
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
|
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
|
|
) ORDER BY nodeport, placementid;
|
|
shardid | shardstate | shardlength | nodename | nodeport | placementid
|
|
---------+------------+-------------+-----------+----------+-------------
|
|
100400 | 1 | 0 | localhost | 9060 | 100
|
|
100400 | 3 | 0 | localhost | 57637 | 101
|
|
(2 rows)
|
|
|
|
SELECT * FROM copy_test;
|
|
key | value
|
|
-----+-------
|
|
0 | 0
|
|
1 | 1
|
|
2 | 4
|
|
3 | 9
|
|
0 | 0
|
|
1 | 1
|
|
2 | 4
|
|
3 | 9
|
|
(8 rows)
|
|
|
|
-- ==== Clean up, we're done here ====
|
|
SELECT citus.mitmproxy('conn.allow()');
|
|
mitmproxy
|
|
-----------
|
|
|
|
(1 row)
|
|
|
|
DROP TABLE copy_test;
|