mirror of https://github.com/citusdata/citus.git
Add alternative output for pg14 in check-failure
parent
409132a374
commit
42e41900b3
|
@ -35,7 +35,7 @@ SELECT citus.dump_network_traffic();
|
||||||
dump_network_traffic
|
dump_network_traffic
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0,coordinator,"[initial message]")
|
(0,coordinator,"[initial message]")
|
||||||
(0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
|
(0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(default_transaction_read_only=off)', 'ParameterStatus(in_hot_standby=off)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
|
||||||
(0,coordinator,"[""Query(query=SELECT worker_apply_shard_ddl_command (100400, 'CREATE TABLE public.copy_test (key integer, value integer) '))""]")
|
(0,coordinator,"[""Query(query=SELECT worker_apply_shard_ddl_command (100400, 'CREATE TABLE public.copy_test (key integer, value integer) '))""]")
|
||||||
(0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=worker_apply_shard_ddl_command,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']")
|
(0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=worker_apply_shard_ddl_command,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']")
|
||||||
(0,coordinator,"[""Query(query=SELECT worker_apply_shard_ddl_command (100400, 'ALTER TABLE public.copy_test OWNER TO postgres'))""]")
|
(0,coordinator,"[""Query(query=SELECT worker_apply_shard_ddl_command (100400, 'ALTER TABLE public.copy_test OWNER TO postgres'))""]")
|
||||||
|
|
|
@ -0,0 +1,312 @@
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- do not cache any connections
|
||||||
|
SET citus.max_cached_conns_per_worker TO 1;
|
||||||
|
SET citus.shard_count = 1;
|
||||||
|
SET citus.shard_replication_factor = 2; -- one shard per worker
|
||||||
|
SET citus.multi_shard_commit_protocol TO '1pc';
|
||||||
|
SET citus.next_shard_id TO 100400;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
|
||||||
|
CREATE TABLE copy_test (key int, value int);
|
||||||
|
SELECT create_distributed_table('copy_test', 'key', 'append');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus.clear_network_traffic();
|
||||||
|
clear_network_traffic
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
SELECT count(1) FROM copy_test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus.dump_network_traffic();
|
||||||
|
dump_network_traffic
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0,coordinator,"[initial message]")
|
||||||
|
(0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
|
||||||
|
(0,coordinator,"[""Query(query=SELECT worker_apply_shard_ddl_command (100400, 'CREATE TABLE public.copy_test (key integer, value integer) '))""]")
|
||||||
|
(0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=worker_apply_shard_ddl_command,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']")
|
||||||
|
(0,coordinator,"[""Query(query=SELECT worker_apply_shard_ddl_command (100400, 'ALTER TABLE public.copy_test OWNER TO postgres'))""]")
|
||||||
|
(0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=worker_apply_shard_ddl_command,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']")
|
||||||
|
(0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]")
|
||||||
|
(0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
|
||||||
|
(0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX FROM STDIN WITH (format 'binary'))""]")
|
||||||
|
(0,worker,"[""Backend(type=G,body=b'\\\\x01\\\\x00\\\\x02\\\\x00\\\\x01\\\\x00\\\\x01')""]")
|
||||||
|
(0,coordinator,"[""CopyData(data=b'PGCOPY\\\\n\\\\xff\\\\r\\\\n\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x04')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x03\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\t')"", ""CopyData(data=b'\\\\xff\\\\xff')"", 'CopyDone()']")
|
||||||
|
(0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']")
|
||||||
|
(0,coordinator,"[""Query(query=SELECT pg_table_size('public.copy_test_XXXXXX'))""]")
|
||||||
|
(0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=pg_table_size,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
|
||||||
|
(0,coordinator,"['Query(query=SELECT min(key), max(key) FROM public.copy_test_XXXXXX)']")
|
||||||
|
(0,worker,"[""RowDescription(fieldcount=2,fields=['F(name=min,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)', 'F(name=max,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=2,columns=[""C(length=0,value=b\\'\\')"", ""C(length=1,value=b\\'0\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
|
||||||
|
(0,coordinator,"['Query(query=COMMIT)']")
|
||||||
|
(0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']")
|
||||||
|
(0,coordinator,"['Query(query=SELECT count(1) AS count FROM public.copy_test_XXXXXX copy_test WHERE true)']")
|
||||||
|
(0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=count,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']")
|
||||||
|
(20 rows)
|
||||||
|
|
||||||
|
---- all of the following tests test behavior with 2 shard placements ----
|
||||||
|
SHOW citus.shard_replication_factor;
|
||||||
|
citus.shard_replication_factor
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
---- kill the connection when we try to create the shard ----
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="worker_apply_shard_ddl_command").kill()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
|
||||||
|
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
|
||||||
|
ORDER BY placementid;
|
||||||
|
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT count(1) FROM copy_test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
---- kill the connection when we try to start a transaction ----
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").kill()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
ERROR: failure on connection marked as essential: localhost:xxxxx
|
||||||
|
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
|
||||||
|
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
|
||||||
|
ORDER BY placementid;
|
||||||
|
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT count(1) FROM copy_test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
---- kill the connection when we start the COPY ----
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").kill()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
|
||||||
|
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
|
||||||
|
ORDER BY placementid;
|
||||||
|
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT count(1) FROM copy_test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
---- kill the connection when we send the data ----
|
||||||
|
SELECT citus.mitmproxy('conn.onCopyData().kill()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
ERROR: failed to COPY to shard xxxxx on localhost:xxxxx
|
||||||
|
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
|
||||||
|
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
|
||||||
|
ORDER BY placementid;
|
||||||
|
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="SELECT|COPY").kill()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(1) FROM copy_test;
|
||||||
|
WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
---- cancel the connection when we send the data ----
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="SELECT|COPY").cancel(' || pg_backend_pid() || ')');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
|
||||||
|
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
|
||||||
|
ORDER BY placementid;
|
||||||
|
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT count(1) FROM copy_test;
|
||||||
|
ERROR: canceling statement due to user request
|
||||||
|
---- kill the connection when we try to get the size of the table ----
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="pg_table_size").kill()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
ERROR: failure on connection marked as essential: localhost:xxxxx
|
||||||
|
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
|
||||||
|
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
|
||||||
|
ORDER BY placementid;
|
||||||
|
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT count(1) FROM copy_test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
---- kill the connection when we try to get the min, max of the table ----
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
ERROR: failure on connection marked as essential: localhost:xxxxx
|
||||||
|
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
|
||||||
|
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
|
||||||
|
ORDER BY placementid;
|
||||||
|
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT count(1) FROM copy_test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
---- kill the connection when we try to COMMIT ----
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
WARNING: failed to commit transaction on localhost:xxxxx
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
|
||||||
|
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
|
||||||
|
ORDER BY placementid;
|
||||||
|
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
|
||||||
|
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
|
||||||
|
copy_test | 100408 | t | 0 | 3 | 100408 | 1 | 8192 | localhost | 57637 | 112
|
||||||
|
copy_test | 100408 | t | 0 | 3 | 100408 | 3 | 8192 | localhost | 9060 | 113
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
SELECT count(1) FROM copy_test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
8
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- ==== Clean up, we're done here ====
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE copy_test;
|
|
@ -36,7 +36,7 @@ SELECT citus.dump_network_traffic();
|
||||||
dump_network_traffic
|
dump_network_traffic
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0,coordinator,"[initial message]")
|
(0,coordinator,"[initial message]")
|
||||||
(0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
|
(0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(default_transaction_read_only=off)', 'ParameterStatus(in_hot_standby=off)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
|
||||||
(0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]")
|
(0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]")
|
||||||
(0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
|
(0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
|
||||||
(0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (format 'binary'))""]")
|
(0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (format 'binary'))""]")
|
||||||
|
@ -46,7 +46,7 @@ SELECT citus.dump_network_traffic();
|
||||||
(0,coordinator,"['Query(query=COMMIT)']")
|
(0,coordinator,"['Query(query=COMMIT)']")
|
||||||
(0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']")
|
(0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']")
|
||||||
(1,coordinator,"[initial message]")
|
(1,coordinator,"[initial message]")
|
||||||
(1,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
|
(1,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(default_transaction_read_only=off)', 'ParameterStatus(in_hot_standby=off)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
|
||||||
(1,coordinator,"['Query(query=SELECT count(1) AS count FROM public.copy_test_XXXXXX copy_test)']")
|
(1,coordinator,"['Query(query=SELECT count(1) AS count FROM public.copy_test_XXXXXX copy_test)']")
|
||||||
(1,worker,"[""RowDescription(fieldcount=1,fields=['F(name=count,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']")
|
(1,worker,"[""RowDescription(fieldcount=1,fields=['F(name=count,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']")
|
||||||
(14 rows)
|
(14 rows)
|
||||||
|
|
|
@ -0,0 +1,440 @@
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- do not cache any connections
|
||||||
|
SET citus.max_cached_conns_per_worker TO 0;
|
||||||
|
SET citus.shard_count = 1;
|
||||||
|
SET citus.shard_replication_factor = 2; -- one shard per worker
|
||||||
|
SET citus.multi_shard_commit_protocol TO '1pc';
|
||||||
|
SET citus.next_shard_id TO 100400;
|
||||||
|
SET citus.max_cached_conns_per_worker TO 0;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
|
||||||
|
CREATE TABLE copy_test (key int, value int);
|
||||||
|
SELECT create_distributed_table('copy_test', 'key');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus.clear_network_traffic();
|
||||||
|
clear_network_traffic
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
SELECT count(1) FROM copy_test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus.dump_network_traffic();
|
||||||
|
dump_network_traffic
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0,coordinator,"[initial message]")
|
||||||
|
(0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
|
||||||
|
(0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]")
|
||||||
|
(0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
|
||||||
|
(0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (format 'binary'))""]")
|
||||||
|
(0,worker,"[""Backend(type=G,body=b'\\\\x01\\\\x00\\\\x02\\\\x00\\\\x01\\\\x00\\\\x01')""]")
|
||||||
|
(0,coordinator,"[""CopyData(data=b'PGCOPY\\\\n\\\\xff\\\\r\\\\n\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x04')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x03\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\t')"", ""CopyData(data=b'\\\\xff\\\\xff')"", 'CopyDone()']")
|
||||||
|
(0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']")
|
||||||
|
(0,coordinator,"['Query(query=COMMIT)']")
|
||||||
|
(0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']")
|
||||||
|
(1,coordinator,"[initial message]")
|
||||||
|
(1,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
|
||||||
|
(1,coordinator,"['Query(query=SELECT count(1) AS count FROM public.copy_test_XXXXXX copy_test)']")
|
||||||
|
(1,worker,"[""RowDescription(fieldcount=1,fields=['F(name=count,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']")
|
||||||
|
(14 rows)
|
||||||
|
|
||||||
|
-- ==== kill the connection when we try to start a transaction ====
|
||||||
|
-- the query should abort
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").killall()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
COPY copy_test, line 1: "0, 0"
|
||||||
|
ERROR: failure on connection marked as essential: localhost:xxxxx
|
||||||
|
CONTEXT: COPY copy_test, line 1: "0, 0"
|
||||||
|
-- ==== kill the connection when we try to start the COPY ====
|
||||||
|
-- the query should abort
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
COPY copy_test, line 1: "0, 0"
|
||||||
|
-- ==== kill the connection when we first start sending data ====
|
||||||
|
-- the query should abort
|
||||||
|
SELECT citus.mitmproxy('conn.onCopyData().killall()'); -- raw rows from the client
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
ERROR: failed to COPY to shard xxxxx on localhost:xxxxx
|
||||||
|
-- ==== kill the connection when the worker confirms it's received the data ====
|
||||||
|
-- the query should abort
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
ERROR: failed to COPY to shard xxxxx on localhost:xxxxx
|
||||||
|
-- ==== kill the connection when we try to send COMMIT ====
|
||||||
|
-- the query should succeed, and the placement should be marked inactive
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN (
|
||||||
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
|
||||||
|
) AND shardstate = 3;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(1) FROM copy_test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
WARNING: failed to commit transaction on localhost:xxxxx
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
-- the shard is marked invalid
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN (
|
||||||
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
|
||||||
|
) AND shardstate = 3;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(1) FROM copy_test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
8
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- ==== clean up a little bit before running the next test ====
|
||||||
|
UPDATE pg_dist_shard_placement SET shardstate = 1
|
||||||
|
WHERE shardid IN (
|
||||||
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
|
||||||
|
);
|
||||||
|
TRUNCATE copy_test;
|
||||||
|
-- ==== try to COPY invalid data ====
|
||||||
|
-- here the coordinator actually sends the data, but then unexpectedly closes the
|
||||||
|
-- connection when it notices the data stream is broken. Crucially, it closes the
|
||||||
|
-- connection before sending COMMIT, so no data makes it into the worker.
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV;
|
||||||
|
ERROR: missing data for column "value"
|
||||||
|
CONTEXT: COPY copy_test, line 5: "10"
|
||||||
|
-- kill the connection if the coordinator sends COMMIT. It doesn't, so nothing changes
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").kill()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV;
|
||||||
|
ERROR: missing data for column "value"
|
||||||
|
CONTEXT: COPY copy_test, line 5: "10"
|
||||||
|
SELECT * FROM copy_test ORDER BY key, value;
|
||||||
|
key | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- ==== clean up some more to prepare for tests with only one replica ====
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
TRUNCATE copy_test;
|
||||||
|
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port;
|
||||||
|
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
|
||||||
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
|
||||||
|
) ORDER BY nodeport, placementid;
|
||||||
|
shardid | shardstate | shardlength | nodename | nodeport | placementid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
100400 | 1 | 0 | localhost | 9060 | 100
|
||||||
|
100400 | 3 | 0 | localhost | 57637 | 101
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- ==== okay, run some tests where there's only one active shard ====
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
SELECT * FROM copy_test;
|
||||||
|
key | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0
|
||||||
|
1 | 1
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- the worker is unreachable
|
||||||
|
SELECT citus.mitmproxy('conn.killall()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: COPY copy_test, line 1: "0, 0"
|
||||||
|
ERROR: could not connect to any active placements
|
||||||
|
CONTEXT: COPY copy_test, line 1: "0, 0"
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM copy_test;
|
||||||
|
key | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0
|
||||||
|
1 | 1
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- the first message fails
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").killall()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
COPY copy_test, line 1: "0, 0"
|
||||||
|
ERROR: failure on connection marked as essential: localhost:xxxxx
|
||||||
|
CONTEXT: COPY copy_test, line 1: "0, 0"
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM copy_test;
|
||||||
|
key | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0
|
||||||
|
1 | 1
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- the COPY message fails
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
ERROR: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
COPY copy_test, line 1: "0, 0"
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM copy_test;
|
||||||
|
key | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0
|
||||||
|
1 | 1
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- the COPY data fails
|
||||||
|
SELECT citus.mitmproxy('conn.onCopyData().killall()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
ERROR: failed to COPY to shard xxxxx on localhost:xxxxx
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM copy_test;
|
||||||
|
key | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0
|
||||||
|
1 | 1
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- the COMMIT fails
|
||||||
|
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
WARNING: failed to commit transaction on localhost:xxxxx
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
WARNING: could not commit transaction for shard xxxxx on any active node
|
||||||
|
ERROR: could not commit transaction on any active node
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM copy_test;
|
||||||
|
key | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0
|
||||||
|
1 | 1
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- the placement is not marked invalid
|
||||||
|
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
|
||||||
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
|
||||||
|
) ORDER BY nodeport, placementid;
|
||||||
|
shardid | shardstate | shardlength | nodename | nodeport | placementid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
100400 | 1 | 0 | localhost | 9060 | 100
|
||||||
|
100400 | 3 | 0 | localhost | 57637 | 101
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- the COMMIT makes it through but the connection dies before we get a response
|
||||||
|
SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT").killall()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
WARNING: failed to commit transaction on localhost:xxxxx
|
||||||
|
WARNING: server closed the connection unexpectedly
|
||||||
|
This probably means the server terminated abnormally
|
||||||
|
before or while processing the request.
|
||||||
|
connection not open
|
||||||
|
connection not open
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
WARNING: could not commit transaction for shard xxxxx on any active node
|
||||||
|
ERROR: could not commit transaction on any active node
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
|
||||||
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
|
||||||
|
) ORDER BY nodeport, placementid;
|
||||||
|
shardid | shardstate | shardlength | nodename | nodeport | placementid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
100400 | 1 | 0 | localhost | 9060 | 100
|
||||||
|
100400 | 3 | 0 | localhost | 57637 | 101
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT * FROM copy_test;
|
||||||
|
key | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0
|
||||||
|
1 | 1
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
0 | 0
|
||||||
|
1 | 1
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
-- ==== Clean up, we're done here ====
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE copy_test;
|
Loading…
Reference in New Issue