From 2fae06056a236d6eaa0e04a2e97e882e29f91527 Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Wed, 1 Aug 2018 14:09:49 -0700 Subject: [PATCH] Attempt to stabilize packet dumps and add them back in --- .../expected/failure_1pc_copy_append.out | 31 ++++++++++++ .../expected/failure_1pc_copy_hash.out | 21 ++++++++ src/test/regress/mitmscripts/fluent.py | 49 +++++++++++++++---- src/test/regress/mitmscripts/structs.py | 4 +- .../regress/sql/failure_1pc_copy_append.sql | 4 ++ .../regress/sql/failure_1pc_copy_hash.sql | 4 ++ 6 files changed, 102 insertions(+), 11 deletions(-) diff --git a/src/test/regress/expected/failure_1pc_copy_append.out b/src/test/regress/expected/failure_1pc_copy_append.out index 0866e503d..88b1f0b8e 100644 --- a/src/test/regress/expected/failure_1pc_copy_append.out +++ b/src/test/regress/expected/failure_1pc_copy_append.out @@ -16,6 +16,12 @@ SELECT create_distributed_table('copy_test', 'key', 'append'); (1 row) +SELECT citus.clear_network_traffic(); + clear_network_traffic +----------------------- + +(1 row) + COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; SELECT count(1) FROM copy_test; count @@ -23,6 +29,31 @@ SELECT count(1) FROM copy_test; 4 (1 row) +SELECT citus.dump_network_traffic(); + dump_network_trafficcoordinator,"[initial message]") + (0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") + (0,coordinator,"[""Query(query=SELECT worker_apply_shard_ddl_command (100400, 'CREATE TABLE public.copy_test (key 
integer, value integer)'))""]") + (0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=worker_apply_shard_ddl_command,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']") + (0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]") + (0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") + (0,coordinator,"['Query(query=COPY public.copy_test_XXXXXX FROM STDIN WITH (FORMAT BINARY))']") + (0,worker,"[""Backend(type=G,body=b'\\\\x01\\\\x00\\\\x02\\\\x00\\\\x01\\\\x00\\\\x01')""]") + (0,coordinator,"[""CopyData(data=b'PGCOPY\\\\n\\\\xff\\\\r\\\\n\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x04')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x03\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\t')"", ""CopyData(data=b'\\\\xff\\\\xff')"", 'CopyDone()']") + (0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']") + (0,coordinator,"[""Query(query=SELECT 
pg_table_size('public.copy_test_XXXXXX'))""]") + (0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=pg_table_size,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") + (0,coordinator,"['Query(query=SELECT min(key), max(key) FROM public.copy_test_XXXXXX)']") + (0,worker,"[""RowDescription(fieldcount=2,fields=['F(name=min,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)', 'F(name=max,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=2,columns=[""C(length=0,value=b\\'\\')"", ""C(length=1,value=b\\'0\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") + (0,coordinator,"['Query(query=COMMIT)']") + (0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']") + (1,coordinator,"[initial message]") + (1,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") + (1,coordinator,"['Query(query=COPY (SELECT count(1) AS count FROM copy_test_100400 copy_test WHERE true) TO STDOUT)']") + (1,worker,"[""CopyOutResponse(format=0,columncount=1,columns=['Anonymous(format=0)'])"", ""CopyData(data=b'4\\\\n')"", 'CopyDone()', 'CommandComplete(command=COPY 1)', 'ReadyForQuery(state=idle)']") +(20 rows) + ---- all of the following tests test behavior with 2 shard placements ---- SHOW citus.shard_replication_factor; 
citus.shard_replication_factor diff --git a/src/test/regress/expected/failure_1pc_copy_hash.out b/src/test/regress/expected/failure_1pc_copy_hash.out index 5e54cafc1..8eaf3626f 100644 --- a/src/test/regress/expected/failure_1pc_copy_hash.out +++ b/src/test/regress/expected/failure_1pc_copy_hash.out @@ -16,6 +16,12 @@ SELECT create_distributed_table('copy_test', 'key'); (1 row) +SELECT citus.clear_network_traffic(); + clear_network_traffic +----------------------- + +(1 row) + COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; SELECT count(1) FROM copy_test; count @@ -23,6 +29,21 @@ SELECT count(1) FROM copy_test; 4 (1 row) +SELECT citus.dump_network_traffic(); + dump_network_trafficcoordinator,"[initial message]") + (0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") + (0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]") + (0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") + (0,coordinator,"['Query(query=COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (FORMAT BINARY))']") + 
(0,worker,"[""Backend(type=G,body=b'\\\\x01\\\\x00\\\\x02\\\\x00\\\\x01\\\\x00\\\\x01')""]") + (0,coordinator,"[""CopyData(data=b'PGCOPY\\\\n\\\\xff\\\\r\\\\n\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x04')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x03\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\t')"", ""CopyData(data=b'\\\\xff\\\\xff')"", 'CopyDone()']") + (0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']") + (0,coordinator,"['Query(query=COMMIT)']") + (0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']") +(10 rows) + -- ==== kill the connection when we try to start a transaction ==== -- the query should abort SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").killall()'); diff --git a/src/test/regress/mitmscripts/fluent.py b/src/test/regress/mitmscripts/fluent.py index 845f308aa..c441e3e7d 100644 --- a/src/test/regress/mitmscripts/fluent.py +++ b/src/test/regress/mitmscripts/fluent.py @@ -1,3 +1,4 @@ +from collections import defaultdict import logging import re import os @@ -10,6 +11,7 @@ import time import traceback import queue +from construct.lib import ListContainer from mitmproxy import ctx from mitmproxy.utils import strutils from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer @@ -286,6 +288,37 @@ def listen_for_commands(fifoname): pretty = structs.print(message.parsed) return emit_row(message.connection_id, message.from_client, pretty) + def 
all_items(queue_): + 'Pulls everything out of the queue without blocking' + try: + while True: + yield queue_.get(block=False) + except queue.Empty: + pass + + def drop_terminate_messages(messages): + ''' + Terminate() messages happen eventually, Citus doesn't feel any need to send them + immediately, so tests which embed them aren't reproducible and fail due to timing + issues. Here we simply drop those messages. + ''' + def isTerminate(msg, from_client): + kind = structs.message_type(msg, from_client) + return kind == 'Terminate' + + for message in messages: + if not message.parsed: + yield message + continue + message.parsed = ListContainer([ + msg for msg in message.parsed + if not isTerminate(msg, message.from_client) + ]) + message.parsed.from_frontend = message.from_client + if len(message.parsed) == 0: + continue + yield message + def handle_recorder(recorder): global connection_count result = '' @@ -297,15 +330,13 @@ def listen_for_commands(fifoname): # this should never happen raise Exception('Unrecognized command: {}'.format(recorder.command)) - try: - results = [] - while True: - message = captured_messages.get(block=False) - if recorder.command is 'reset': - continue - results.append(emit_message(message)) - except queue.Empty: - pass + results = [] + messages = all_items(captured_messages) + messages = drop_terminate_messages(messages) + for message in messages: + if recorder.command == 'reset': + continue + results.append(emit_message(message)) result = '\n'.join(results) logging.debug('about to write to fifo') diff --git a/src/test/regress/mitmscripts/structs.py b/src/test/regress/mitmscripts/structs.py index ebeed61a3..03c19c449 100644 --- a/src/test/regress/mitmscripts/structs.py +++ b/src/test/regress/mitmscripts/structs.py @@ -65,9 +65,9 @@ class Message: if not hasattr(cls, "_msgtypes"): raise Exception('Do not call this method on Message, call it on a subclass') if isinstance(msg, cl.ListContainer): - return ValueError("do not call this on a list 
of messages") + raise ValueError("do not call this on a list of messages") if not isinstance(msg, cl.Container): - return ValueError("must call this on a parsed message") + raise ValueError("must call this on a parsed message") if not hasattr(msg, "_type"): return "Anonymous" if msg._type and msg._type not in cls._classes: diff --git a/src/test/regress/sql/failure_1pc_copy_append.sql b/src/test/regress/sql/failure_1pc_copy_append.sql index 3fefe7d55..a9e74fec1 100644 --- a/src/test/regress/sql/failure_1pc_copy_append.sql +++ b/src/test/regress/sql/failure_1pc_copy_append.sql @@ -9,9 +9,13 @@ ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; CREATE TABLE copy_test (key int, value int); SELECT create_distributed_table('copy_test', 'key', 'append'); +SELECT citus.clear_network_traffic(); + COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; SELECT count(1) FROM copy_test; +SELECT citus.dump_network_traffic(); + ---- all of the following tests test behavior with 2 shard placements ---- SHOW citus.shard_replication_factor; diff --git a/src/test/regress/sql/failure_1pc_copy_hash.sql b/src/test/regress/sql/failure_1pc_copy_hash.sql index 6a9efb840..83cd1adbf 100644 --- a/src/test/regress/sql/failure_1pc_copy_hash.sql +++ b/src/test/regress/sql/failure_1pc_copy_hash.sql @@ -9,9 +9,13 @@ ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; CREATE TABLE copy_test (key int, value int); SELECT create_distributed_table('copy_test', 'key'); +SELECT citus.clear_network_traffic(); + COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; SELECT count(1) FROM copy_test; +SELECT citus.dump_network_traffic(); + -- ==== kill the connection when we try to start a transaction ==== -- the query should abort