Add tests for 1PC COPY on append and hash-distributed tables

Add tests for 1PC COPY on append and hash-distributed tables
pull/2273/head
Brian Cloutier 2018-07-31 15:17:59 -07:00 committed by GitHub
parent f0f7a691a3
commit 82fa85fa5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 901 additions and 0 deletions

View File

@ -0,0 +1,279 @@
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
SET citus.shard_count = 1;
SET citus.shard_replication_factor = 2; -- one shard per worker
SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.next_shard_id TO 100400;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
CREATE TABLE copy_test (key int, value int);
SELECT create_distributed_table('copy_test', 'key', 'append');
create_distributed_table
--------------------------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT count(1) FROM copy_test;
count
-------
4
(1 row)
---- all of the following tests test behavior with 2 shard placements ----
SHOW citus.shard_replication_factor;
citus.shard_replication_factor
--------------------------------
2
(1 row)
---- kill the connection when we try to create the shard ----
SELECT citus.mitmproxy('conn.onQuery(query="worker_apply_shard_ddl_command").kill()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
ERROR: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:9060
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
(2 rows)
SELECT count(1) FROM copy_test;
count
-------
4
(1 row)
---- kill the connection when we try to start a transaction ----
SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").kill()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
ERROR: failure on connection marked as essential: localhost:9060
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
(2 rows)
SELECT count(1) FROM copy_test;
count
-------
4
(1 row)
---- kill the connection when we start the COPY ----
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").kill()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
ERROR: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:9060
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
(2 rows)
SELECT count(1) FROM copy_test;
count
-------
4
(1 row)
---- kill the connection when we send the data ----
SELECT citus.mitmproxy('conn.onCopyData().kill()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
ERROR: failed to COPY to shard 100404 on localhost:9060
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
(2 rows)
SELECT count(1) FROM copy_test;
WARNING: could not consume data from worker node
count
-------
4
(1 row)
---- cancel the connection when we send the data ----
SELECT citus.mitmproxy(format('conn.onCopyData().cancel(%s)', pg_backend_pid()));
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
ERROR: canceling statement due to user request
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
(2 rows)
SELECT count(1) FROM copy_test;
ERROR: canceling statement due to user request
---- kill the connection when we try to get the size of the table ----
SELECT citus.mitmproxy('conn.onQuery(query="pg_table_size").kill()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:9060
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
ERROR: failure on connection marked as essential: localhost:9060
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
(2 rows)
SELECT count(1) FROM copy_test;
count
-------
4
(1 row)
-- we round-robin when picking which node to run pg_table_size on, this COPY runs it on
-- the other node, so the next copy will try to run it on our node
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY p.nodeport, p.placementid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 9060 | 112
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 113
(4 rows)
SELECT count(1) FROM copy_test;
count
-------
8
(1 row)
---- kill the connection when we try to get the min, max of the table ----
SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:9060
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
ERROR: failure on connection marked as essential: localhost:9060
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 9060 | 112
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 113
(4 rows)
SELECT count(1) FROM copy_test;
count
-------
8
(1 row)
---- kill the connection when we try to COMMIT ----
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: failed to commit critical transaction on localhost:9060, metadata is likely out of sync
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 9060 | 112
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 113
copy_test | 100409 | t | 0 | 3 | 100409 | 3 | 8192 | localhost | 9060 | 116
copy_test | 100409 | t | 0 | 3 | 100409 | 1 | 8192 | localhost | 57637 | 117
(6 rows)
SELECT count(1) FROM copy_test;
count
-------
12
(1 row)
-- ==== Clean up, we're done here ====
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
DROP TABLE copy_test;

View File

@ -0,0 +1,384 @@
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
SET citus.shard_count = 1;
SET citus.shard_replication_factor = 2; -- one shard per worker
SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.next_shard_id TO 100400;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
CREATE TABLE copy_test (key int, value int);
SELECT create_distributed_table('copy_test', 'key');
create_distributed_table
--------------------------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT count(1) FROM copy_test;
count
-------
4
(1 row)
-- ==== kill the connection when we try to start a transaction ====
-- the query should abort
SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").killall()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
COPY copy_test, line 1: "0, 0"
ERROR: failure on connection marked as essential: localhost:9060
CONTEXT: COPY copy_test, line 1: "0, 0"
-- ==== kill the connection when we try to start the COPY ====
-- the query should abort
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
ERROR: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:9060
COPY copy_test, line 1: "0, 0"
-- ==== kill the connection when we first start sending data ====
-- the query should abort
SELECT citus.mitmproxy('conn.onCopyData().killall()'); -- raw rows from the client
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
ERROR: failed to COPY to shard 100400 on localhost:9060
-- ==== kill the connection when the worker confirms it's received the data ====
-- the query should abort
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
ERROR: failed to COPY to shard 100400 on localhost:9060
-- ==== kill the connection when we try to send COMMIT ====
-- the query should succeed, and the placement should be marked inactive
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
) AND shardstate = 3;
count
-------
0
(1 row)
SELECT count(1) FROM copy_test;
count
-------
4
(1 row)
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: failed to commit critical transaction on localhost:9060, metadata is likely out of sync
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
-- the shard is marked invalid
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
) AND shardstate = 3;
count
-------
1
(1 row)
SELECT count(1) FROM copy_test;
count
-------
8
(1 row)
-- ==== clean up a little bit before running the next test ====
UPDATE pg_dist_shard_placement SET shardstate = 1
WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
);
TRUNCATE copy_test;
-- ==== try to COPY invalid data ====
-- here the coordinator actually sends the data, but then unexpectedly closes the
-- connection when it notices the data stream is broken. Crucially, it closes the
-- connection before sending COMMIT, so no data makes it into the worker.
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV;
ERROR: missing data for column "value"
CONTEXT: COPY copy_test, line 5: "10"
-- kill the connection if the coordinator sends COMMIT. It doesn't, so nothing changes
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").kill()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV;
ERROR: missing data for column "value"
CONTEXT: COPY copy_test, line 5: "10"
SELECT * FROM copy_test ORDER BY key, value;
key | value
-----+-------
(0 rows)
-- ==== clean up some more to prepare for tests with only one replica ====
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
TRUNCATE copy_test;
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port;
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
) ORDER BY nodeport, placementid;
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
100400 | 1 | 0 | localhost | 9060 | 100
100400 | 3 | 0 | localhost | 57637 | 101
(2 rows)
-- ==== okay, run some tests where there's only one active shard ====
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM copy_test;
key | value
-----+-------
0 | 0
1 | 1
2 | 4
3 | 9
(4 rows)
-- the worker is unreachable
SELECT citus.mitmproxy('conn.killall()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
COPY copy_test, line 1: "0, 0"
ERROR: failure on connection marked as essential: localhost:9060
CONTEXT: COPY copy_test, line 1: "0, 0"
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
SELECT * FROM copy_test;
key | value
-----+-------
0 | 0
1 | 1
2 | 4
3 | 9
(4 rows)
-- the first message fails
SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").killall()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
COPY copy_test, line 1: "0, 0"
ERROR: failure on connection marked as essential: localhost:9060
CONTEXT: COPY copy_test, line 1: "0, 0"
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
SELECT * FROM copy_test;
key | value
-----+-------
0 | 0
1 | 1
2 | 4
3 | 9
(4 rows)
-- the COPY message fails
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
ERROR: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:9060
COPY copy_test, line 1: "0, 0"
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
SELECT * FROM copy_test;
key | value
-----+-------
0 | 0
1 | 1
2 | 4
3 | 9
(4 rows)
-- the COPY data fails
SELECT citus.mitmproxy('conn.onCopyData().killall()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
ERROR: failed to COPY to shard 100400 on localhost:9060
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
SELECT * FROM copy_test;
key | value
-----+-------
0 | 0
1 | 1
2 | 4
3 | 9
(4 rows)
-- the COMMIT fails
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: failed to commit critical transaction on localhost:9060, metadata is likely out of sync
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: could not commit transaction for shard 100400 on any active node
ERROR: could not commit transaction on any active node
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
SELECT * FROM copy_test;
key | value
-----+-------
0 | 0
1 | 1
2 | 4
3 | 9
(4 rows)
-- the placement is not marked invalid
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
) ORDER BY nodeport, placementid;
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
100400 | 1 | 0 | localhost | 9060 | 100
100400 | 3 | 0 | localhost | 57637 | 101
(2 rows)
-- the COMMIT makes it through but the connection dies before we get a response
SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT").killall()');
mitmproxy
-----------
(1 row)
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: failed to commit critical transaction on localhost:9060, metadata is likely out of sync
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: could not commit transaction for shard 100400 on any active node
ERROR: could not commit transaction on any active node
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
) ORDER BY nodeport, placementid;
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+-----------+----------+-------------
100400 | 1 | 0 | localhost | 9060 | 100
100400 | 3 | 0 | localhost | 57637 | 101
(2 rows)
SELECT * FROM copy_test;
key | value
-----+-------
0 | 0
1 | 1
2 | 4
3 | 9
0 | 0
1 | 1
2 | 4
3 | 9
(8 rows)
-- ==== Clean up, we're done here ====
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
DROP TABLE copy_test;

View File

@ -10,3 +10,6 @@ test: failure_truncate
test: failure_create_index_concurrently test: failure_create_index_concurrently
test: failure_add_disable_node test: failure_add_disable_node
test: failure_copy_to_reference test: failure_copy_to_reference
test: failure_1pc_copy_hash
test: failure_1pc_copy_append

View File

@ -0,0 +1,93 @@
SELECT citus.mitmproxy('conn.allow()');
SET citus.shard_count = 1;
SET citus.shard_replication_factor = 2; -- one shard per worker
SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.next_shard_id TO 100400;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
CREATE TABLE copy_test (key int, value int);
SELECT create_distributed_table('copy_test', 'key', 'append');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT count(1) FROM copy_test;
---- all of the following tests test behavior with 2 shard placements ----
SHOW citus.shard_replication_factor;
---- kill the connection when we try to create the shard ----
SELECT citus.mitmproxy('conn.onQuery(query="worker_apply_shard_ddl_command").kill()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
SELECT count(1) FROM copy_test;
---- kill the connection when we try to start a transaction ----
SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").kill()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
SELECT count(1) FROM copy_test;
---- kill the connection when we start the COPY ----
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").kill()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
SELECT count(1) FROM copy_test;
---- kill the connection when we send the data ----
SELECT citus.mitmproxy('conn.onCopyData().kill()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
SELECT count(1) FROM copy_test;
---- cancel the connection when we send the data ----
SELECT citus.mitmproxy(format('conn.onCopyData().cancel(%s)', pg_backend_pid()));
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
SELECT count(1) FROM copy_test;
---- kill the connection when we try to get the size of the table ----
SELECT citus.mitmproxy('conn.onQuery(query="pg_table_size").kill()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
SELECT count(1) FROM copy_test;
-- we round-robin when picking which node to run pg_table_size on, this COPY runs it on
-- the other node, so the next copy will try to run it on our node
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY p.nodeport, p.placementid;
SELECT count(1) FROM copy_test;
---- kill the connection when we try to get the min, max of the table ----
SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
SELECT count(1) FROM copy_test;
---- kill the connection when we try to COMMIT ----
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY placementid;
SELECT count(1) FROM copy_test;
-- ==== Clean up, we're done here ====
SELECT citus.mitmproxy('conn.allow()');
DROP TABLE copy_test;

View File

@ -0,0 +1,142 @@
SELECT citus.mitmproxy('conn.allow()');
SET citus.shard_count = 1;
SET citus.shard_replication_factor = 2; -- one shard per worker
SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.next_shard_id TO 100400;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
CREATE TABLE copy_test (key int, value int);
SELECT create_distributed_table('copy_test', 'key');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT count(1) FROM copy_test;
-- ==== kill the connection when we try to start a transaction ====
-- the query should abort
SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").killall()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
-- ==== kill the connection when we try to start the COPY ====
-- the query should abort
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
-- ==== kill the connection when we first start sending data ====
-- the query should abort
SELECT citus.mitmproxy('conn.onCopyData().killall()'); -- raw rows from the client
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
-- ==== kill the connection when the worker confirms it's received the data ====
-- the query should abort
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
-- ==== kill the connection when we try to send COMMIT ====
-- the query should succeed, and the placement should be marked inactive
SELECT citus.mitmproxy('conn.allow()');
SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
) AND shardstate = 3;
SELECT count(1) FROM copy_test;
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
-- the shard is marked invalid
SELECT citus.mitmproxy('conn.allow()');
SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
) AND shardstate = 3;
SELECT count(1) FROM copy_test;
-- ==== clean up a little bit before running the next test ====
UPDATE pg_dist_shard_placement SET shardstate = 1
WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
);
TRUNCATE copy_test;
-- ==== try to COPY invalid data ====
-- here the coordinator actually sends the data, but then unexpectedly closes the
-- connection when it notices the data stream is broken. Crucially, it closes the
-- connection before sending COMMIT, so no data makes it into the worker.
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV;
-- kill the connection if the coordinator sends COMMIT. It doesn't, so nothing changes
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").kill()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV;
SELECT * FROM copy_test ORDER BY key, value;
-- ==== clean up some more to prepare for tests with only one replica ====
SELECT citus.mitmproxy('conn.allow()');
TRUNCATE copy_test;
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port;
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
) ORDER BY nodeport, placementid;
-- ==== okay, run some tests where there's only one active shard ====
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM copy_test;
-- the worker is unreachable
SELECT citus.mitmproxy('conn.killall()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT citus.mitmproxy('conn.allow()');
SELECT * FROM copy_test;
-- the first message fails
SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").killall()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT citus.mitmproxy('conn.allow()');
SELECT * FROM copy_test;
-- the COPY message fails
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT citus.mitmproxy('conn.allow()');
SELECT * FROM copy_test;
-- the COPY data fails
SELECT citus.mitmproxy('conn.onCopyData().killall()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT citus.mitmproxy('conn.allow()');
SELECT * FROM copy_test;
-- the COMMIT fails
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT citus.mitmproxy('conn.allow()');
SELECT * FROM copy_test;
-- the placement is not marked invalid
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
) ORDER BY nodeport, placementid;
-- the COMMIT makes it through but the connection dies before we get a response
SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT").killall()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT citus.mitmproxy('conn.allow()');
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
) ORDER BY nodeport, placementid;
SELECT * FROM copy_test;
-- ==== Clean up, we're done here ====
SELECT citus.mitmproxy('conn.allow()');
DROP TABLE copy_test;