diff --git a/src/test/regress/expected/failure_1pc_copy_append.out b/src/test/regress/expected/failure_1pc_copy_append.out index 7ccbbb4f1..eacce5ee5 100644 --- a/src/test/regress/expected/failure_1pc_copy_append.out +++ b/src/test/regress/expected/failure_1pc_copy_append.out @@ -35,7 +35,7 @@ SELECT citus.dump_network_traffic(); dump_network_traffic --------------------------------------------------------------------- (0,coordinator,"[initial message]") - (0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") + (0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(default_transaction_read_only=off)', 'ParameterStatus(in_hot_standby=off)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") (0,coordinator,"[""Query(query=SELECT worker_apply_shard_ddl_command (100400, 'CREATE TABLE public.copy_test (key integer, value integer) '))""]") (0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=worker_apply_shard_ddl_command,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']") (0,coordinator,"[""Query(query=SELECT worker_apply_shard_ddl_command (100400, 'ALTER TABLE public.copy_test OWNER TO postgres'))""]") diff --git a/src/test/regress/expected/failure_1pc_copy_append_0.out b/src/test/regress/expected/failure_1pc_copy_append_0.out new file mode 100644 index 000000000..7ccbbb4f1 --- /dev/null +++ b/src/test/regress/expected/failure_1pc_copy_append_0.out @@ -0,0 +1,312 @@ +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +-- do not cache any connections +SET citus.max_cached_conns_per_worker TO 1; +SET citus.shard_count = 1; +SET citus.shard_replication_factor = 2; -- one shard per worker +SET citus.multi_shard_commit_protocol TO '1pc'; +SET citus.next_shard_id TO 100400; +ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; +CREATE TABLE copy_test (key int, value int); +SELECT create_distributed_table('copy_test', 'key', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT citus.clear_network_traffic(); + clear_network_traffic +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT count(1) FROM copy_test; + count +--------------------------------------------------------------------- + 4 +(1 row) + +SELECT citus.dump_network_traffic(); + dump_network_traffic +--------------------------------------------------------------------- + (0,coordinator,"[initial message]") + (0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") + (0,coordinator,"[""Query(query=SELECT worker_apply_shard_ddl_command (100400, 'CREATE TABLE public.copy_test (key integer, value integer) '))""]") + (0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=worker_apply_shard_ddl_command,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']") + (0,coordinator,"[""Query(query=SELECT worker_apply_shard_ddl_command (100400, 'ALTER TABLE public.copy_test OWNER TO postgres'))""]") + (0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=worker_apply_shard_ddl_command,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']") + (0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]") + (0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") + (0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX FROM STDIN WITH (format 'binary'))""]") + (0,worker,"[""Backend(type=G,body=b'\\\\x01\\\\x00\\\\x02\\\\x00\\\\x01\\\\x00\\\\x01')""]") + (0,coordinator,"[""CopyData(data=b'PGCOPY\\\\n\\\\xff\\\\r\\\\n\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x04')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x03\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\t')"", ""CopyData(data=b'\\\\xff\\\\xff')"", 'CopyDone()']") + (0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']") + (0,coordinator,"[""Query(query=SELECT pg_table_size('public.copy_test_XXXXXX'))""]") + (0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=pg_table_size,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") + (0,coordinator,"['Query(query=SELECT min(key), max(key) FROM public.copy_test_XXXXXX)']") + (0,worker,"[""RowDescription(fieldcount=2,fields=['F(name=min,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)', 'F(name=max,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=2,columns=[""C(length=0,value=b\\'\\')"", ""C(length=1,value=b\\'0\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") + (0,coordinator,"['Query(query=COMMIT)']") + (0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']") + (0,coordinator,"['Query(query=SELECT count(1) AS count FROM public.copy_test_XXXXXX copy_test WHERE true)']") + (0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=count,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']") +(20 rows) + +---- all of the following tests test behavior with 2 shard placements ---- +SHOW citus.shard_replication_factor; + citus.shard_replication_factor +--------------------------------------------------------------------- + 2 +(1 row) + +---- kill the connection when we try to create the shard ---- +SELECT citus.mitmproxy('conn.onQuery(query="worker_apply_shard_ddl_command").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:xxxxx +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------------------------------------------------------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 +(2 rows) + +SELECT count(1) FROM copy_test; + count +--------------------------------------------------------------------- + 4 +(1 row) + +---- kill the connection when we try to start a transaction ---- +SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------------------------------------------------------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 +(2 rows) + +SELECT count(1) FROM copy_test; + count +--------------------------------------------------------------------- + 4 +(1 row) + +---- kill the connection when we start the COPY ---- +SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:xxxxx +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------------------------------------------------------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 +(2 rows) + +SELECT count(1) FROM copy_test; + count +--------------------------------------------------------------------- + 4 +(1 row) + +---- kill the connection when we send the data ---- +SELECT citus.mitmproxy('conn.onCopyData().kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: failed to COPY to shard xxxxx on localhost:xxxxx +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------------------------------------------------------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 +(2 rows) + +SELECT citus.mitmproxy('conn.onQuery(query="SELECT|COPY").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT count(1) FROM copy_test; +WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. + count +--------------------------------------------------------------------- + 4 +(1 row) + +---- cancel the connection when we send the data ---- +SELECT citus.mitmproxy('conn.onQuery(query="SELECT|COPY").cancel(' || pg_backend_pid() || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: canceling statement due to user request +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------------------------------------------------------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 +(2 rows) + +SELECT count(1) FROM copy_test; +ERROR: canceling statement due to user request +---- kill the connection when we try to get the size of the table ---- +SELECT citus.mitmproxy('conn.onQuery(query="pg_table_size").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:xxxxx +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------------------------------------------------------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 +(2 rows) + +SELECT count(1) FROM copy_test; + count +--------------------------------------------------------------------- + 4 +(1 row) + +---- kill the connection when we try to get the min, max of the table ---- +SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:xxxxx +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------------------------------------------------------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 +(2 rows) + +SELECT count(1) FROM copy_test; + count +--------------------------------------------------------------------- + 4 +(1 row) + +---- kill the connection when we try to COMMIT ---- +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: failed to commit transaction on localhost:xxxxx +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +connection not open +CONTEXT: while executing command on localhost:xxxxx +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------------------------------------------------------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 + copy_test | 100408 | t | 0 | 3 | 100408 | 1 | 8192 | localhost | 57637 | 112 + copy_test | 100408 | t | 0 | 3 | 100408 | 3 | 8192 | localhost | 9060 | 113 +(4 rows) + +SELECT count(1) FROM copy_test; + count +--------------------------------------------------------------------- + 8 +(1 row) + +-- ==== Clean up, we're done here ==== +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +DROP TABLE copy_test; diff --git a/src/test/regress/expected/failure_1pc_copy_hash.out b/src/test/regress/expected/failure_1pc_copy_hash.out index fe521c729..e99c6f560 100644 --- a/src/test/regress/expected/failure_1pc_copy_hash.out +++ b/src/test/regress/expected/failure_1pc_copy_hash.out @@ -36,7 +36,7 @@ SELECT citus.dump_network_traffic(); dump_network_traffic --------------------------------------------------------------------- (0,coordinator,"[initial message]") - (0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") + (0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(default_transaction_read_only=off)', 'ParameterStatus(in_hot_standby=off)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") (0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]") (0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") (0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (format 'binary'))""]") @@ -46,7 +46,7 @@ SELECT citus.dump_network_traffic(); (0,coordinator,"['Query(query=COMMIT)']") (0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']") (1,coordinator,"[initial message]") - (1,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") + (1,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(default_transaction_read_only=off)', 'ParameterStatus(in_hot_standby=off)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") (1,coordinator,"['Query(query=SELECT count(1) AS count FROM public.copy_test_XXXXXX copy_test)']") (1,worker,"[""RowDescription(fieldcount=1,fields=['F(name=count,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']") (14 rows) diff --git a/src/test/regress/expected/failure_1pc_copy_hash_0.out b/src/test/regress/expected/failure_1pc_copy_hash_0.out new file mode 100644 index 000000000..fe521c729 --- /dev/null +++ b/src/test/regress/expected/failure_1pc_copy_hash_0.out @@ -0,0 +1,440 @@ +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +-- do not cache any connections +SET citus.max_cached_conns_per_worker TO 0; +SET citus.shard_count = 1; +SET citus.shard_replication_factor = 2; -- one shard per worker +SET citus.multi_shard_commit_protocol TO '1pc'; +SET citus.next_shard_id TO 100400; +SET citus.max_cached_conns_per_worker TO 0; +ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; +CREATE TABLE copy_test (key int, value int); +SELECT create_distributed_table('copy_test', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT citus.clear_network_traffic(); + clear_network_traffic +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT count(1) FROM copy_test; + count +--------------------------------------------------------------------- + 4 +(1 row) + +SELECT citus.dump_network_traffic(); + dump_network_traffic +--------------------------------------------------------------------- + (0,coordinator,"[initial message]") + (0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") + (0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]") + (0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") + (0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (format 'binary'))""]") + (0,worker,"[""Backend(type=G,body=b'\\\\x01\\\\x00\\\\x02\\\\x00\\\\x01\\\\x00\\\\x01')""]") + (0,coordinator,"[""CopyData(data=b'PGCOPY\\\\n\\\\xff\\\\r\\\\n\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x04')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x03\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\t')"", ""CopyData(data=b'\\\\xff\\\\xff')"", 'CopyDone()']") + (0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']") + (0,coordinator,"['Query(query=COMMIT)']") + (0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']") + (1,coordinator,"[initial message]") + (1,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") + (1,coordinator,"['Query(query=SELECT count(1) AS count FROM public.copy_test_XXXXXX copy_test)']") + (1,worker,"[""RowDescription(fieldcount=1,fields=['F(name=count,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']") +(14 rows) + +-- ==== kill the connection when we try to start a transaction ==== +-- the query should abort +SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +CONTEXT: while executing command on localhost:xxxxx +COPY copy_test, line 1: "0, 0" +ERROR: failure on connection marked as essential: localhost:xxxxx +CONTEXT: COPY copy_test, line 1: "0, 0" +-- ==== kill the connection when we try to start the COPY ==== +-- the query should abort +SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:xxxxx +COPY copy_test, line 1: "0, 0" +-- ==== kill the connection when we first start sending data ==== +-- the query should abort +SELECT citus.mitmproxy('conn.onCopyData().killall()'); -- raw rows from the client + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: failed to COPY to shard xxxxx on localhost:xxxxx +-- ==== kill the connection when the worker confirms it's received the data ==== +-- the query should abort +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: failed to COPY to shard xxxxx on localhost:xxxxx +-- ==== kill the connection when we try to send COMMIT ==== +-- the query should succeed, and the placement should be marked inactive +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) AND shardstate = 3; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(1) FROM copy_test; + count +--------------------------------------------------------------------- + 4 +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: failed to commit transaction on localhost:xxxxx +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +connection not open +CONTEXT: while executing command on localhost:xxxxx +-- the shard is marked invalid +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) AND shardstate = 3; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT count(1) FROM copy_test; + count +--------------------------------------------------------------------- + 8 +(1 row) + +-- ==== clean up a little bit before running the next test ==== +UPDATE pg_dist_shard_placement SET shardstate = 1 +WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +); +TRUNCATE copy_test; +-- ==== try to COPY invalid data ==== +-- here the coordinator actually sends the data, but then unexpectedly closes the +-- connection when it notices the data stream is broken. Crucially, it closes the +-- connection before sending COMMIT, so no data makes it into the worker. +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV; +ERROR: missing data for column "value" +CONTEXT: COPY copy_test, line 5: "10" +-- kill the connection if the coordinator sends COMMIT. It doesn't, so nothing changes +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV; +ERROR: missing data for column "value" +CONTEXT: COPY copy_test, line 5: "10" +SELECT * FROM copy_test ORDER BY key, value; + key | value +--------------------------------------------------------------------- +(0 rows) + +-- ==== clean up some more to prepare for tests with only one replica ==== +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +TRUNCATE copy_test; +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port; +SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) ORDER BY nodeport, placementid; + shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------------------------------------------------------------- + 100400 | 1 | 0 | localhost | 9060 | 100 + 100400 | 3 | 0 | localhost | 57637 | 101 +(2 rows) + +-- ==== okay, run some tests where there's only one active shard ==== +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM copy_test; + key | value +--------------------------------------------------------------------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(4 rows) + +-- the worker is unreachable +SELECT citus.mitmproxy('conn.killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: COPY copy_test, line 1: "0, 0" +ERROR: could not connect to any active placements +CONTEXT: COPY copy_test, line 1: "0, 0" +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM copy_test; + key | value +--------------------------------------------------------------------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(4 rows) + +-- the first message fails +SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +CONTEXT: while executing command on localhost:xxxxx +COPY copy_test, line 1: "0, 0" +ERROR: failure on connection marked as essential: localhost:xxxxx +CONTEXT: COPY copy_test, line 1: "0, 0" +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM copy_test; + key | value +--------------------------------------------------------------------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(4 rows) + +-- the COPY message fails +SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:xxxxx +COPY copy_test, line 1: "0, 0" +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM copy_test; + key | value +--------------------------------------------------------------------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(4 rows) + +-- the COPY data fails +SELECT citus.mitmproxy('conn.onCopyData().killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: failed to COPY to shard xxxxx on localhost:xxxxx +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM copy_test; + key | value +--------------------------------------------------------------------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(4 rows) + +-- the COMMIT fails +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: failed to commit transaction on localhost:xxxxx +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: could not commit transaction for shard xxxxx on any active node +ERROR: could not commit transaction on any active node +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM copy_test; + key | value +--------------------------------------------------------------------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(4 rows) + +-- the placement is not marked invalid +SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) ORDER BY nodeport, placementid; + shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------------------------------------------------------------- + 100400 | 1 | 0 | localhost | 9060 | 100 + 100400 | 3 | 0 | localhost | 57637 | 101 +(2 rows) + +-- the COMMIT makes it through but the connection dies before we get a response +SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: failed to commit transaction on localhost:xxxxx +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: could not commit transaction for shard xxxxx on any active node +ERROR: could not commit transaction on any active node +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) ORDER BY nodeport, placementid; + shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------------------------------------------------------------- + 100400 | 1 | 0 | localhost | 9060 | 100 + 100400 | 3 | 0 | localhost | 57637 | 101 +(2 rows) + +SELECT * FROM copy_test; + key | value +--------------------------------------------------------------------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(8 rows) + +-- ==== Clean up, we're done here ==== +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +DROP TABLE copy_test;