From 4dd1f5784bde7e3b210d2452941dd5d6e57b1a1c Mon Sep 17 00:00:00 2001
From: Hanefi Onaldi
Date: Mon, 10 Dec 2018 14:13:30 +0300
Subject: [PATCH] Failure and cancellation tests for MX metadata sync

Add failure and cancellation tests for the initial
start_metadata_sync_to_node() calls to the worker, and for DDL queries
that send metadata syncing messages to an MX node.

Also add message type definitions for the messages that are exchanged
during metadata syncing.
---
 .../expected/failure_mx_metadata_sync.out     | 170 ++++++++++++++++++
 src/test/regress/failure_schedule             |   1 +
 src/test/regress/mitmscripts/structs.py       | 118 ++++++++++++
 .../regress/sql/failure_mx_metadata_sync.sql  |  69 +++++++
 4 files changed, 358 insertions(+)
 create mode 100644 src/test/regress/expected/failure_mx_metadata_sync.out
 create mode 100644 src/test/regress/sql/failure_mx_metadata_sync.sql

diff --git a/src/test/regress/expected/failure_mx_metadata_sync.out b/src/test/regress/expected/failure_mx_metadata_sync.out
new file mode 100644
index 000000000..c47fbe278
--- /dev/null
+++ b/src/test/regress/expected/failure_mx_metadata_sync.out
@@ -0,0 +1,170 @@
+--
+-- failure_mx_metadata_sync.sql
+--
+CREATE SCHEMA IF NOT EXISTS mx_metadata_sync;
+SET SEARCH_PATH = mx_metadata_sync;
+SET citus.shard_count TO 2;
+SET citus.next_shard_id TO 16000000;
+SET citus.shard_replication_factor TO 1;
+SET citus.replication_model TO 'streaming';
+SELECT pg_backend_pid() as pid \gset
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+-----------
+ 
+(1 row)
+
+CREATE TABLE t1 (id int PRIMARY KEY);
+SELECT create_distributed_table('t1', 'id');
+ create_distributed_table
+--------------------------
+ 
+(1 row)
+
+INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x);
+-- Initial metadata status
+SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
+ hasmetadata
+-------------
+ f
+(1 row)
+
+-- Failure to set groupid in the worker
+SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").cancel(' || :pid || ')');
+ mitmproxy
+-----------
+ 
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+ERROR:  canceling statement due to user request
+SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").kill()');
+ mitmproxy
+-----------
+ 
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+ERROR:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+CONTEXT:  while executing command on localhost:9060
+-- Failure to drop all tables in pg_dist_partition
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_drop_distributed_table").cancel(' || :pid || ')');
+ mitmproxy
+-----------
+ 
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+ERROR:  canceling statement due to user request
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_drop_distributed_table").kill()');
+ mitmproxy
+-----------
+ 
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+ERROR:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+CONTEXT:  while executing command on localhost:9060
+-- Failure to truncate pg_dist_node in the worker
+SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE pg_dist_node CASCADE").cancel(' || :pid || ')');
+ mitmproxy
+-----------
+ 
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+ERROR:  canceling statement due to user request
+SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE pg_dist_node CASCADE").kill()');
+ mitmproxy
+-----------
+ 
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+ERROR:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+CONTEXT:  while executing command on localhost:9060
+-- Failure to populate pg_dist_node in the worker
+SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' || :pid || ')');
+ mitmproxy
+-----------
+ 
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+ERROR:  canceling statement due to user request
+SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").kill()');
+ mitmproxy
+-----------
+ 
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+ERROR:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+CONTEXT:  while executing command on localhost:9060
+-- Verify that coordinator knows worker does not have valid metadata
+SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
+ hasmetadata
+-------------
+ f
+(1 row)
+
+-- Verify we can sync metadata after unsuccessful attempts
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+-----------
+ 
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+ start_metadata_sync_to_node
+-----------------------------
+ 
+(1 row)
+
+SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
+ hasmetadata
+-------------
+ t
+(1 row)
+
+-- Check failures on DDL command propagation
+CREATE TABLE t2 (id int PRIMARY KEY);
+SELECT citus.mitmproxy('conn.onParse(query="^INSERT INTO pg_dist_placement").kill()');
+ mitmproxy
+-----------
+ 
+(1 row)
+
+SELECT create_distributed_table('t2', 'id');
+ERROR:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+CONTEXT:  while executing command on localhost:9060
+SELECT citus.mitmproxy('conn.onParse(query="^INSERT INTO pg_dist_shard").cancel(' || :pid || ')');
+ mitmproxy
+-----------
+ 
+(1 row)
+
+SELECT create_distributed_table('t2', 'id');
+ERROR:  canceling statement due to user request
+-- Verify that the table was not distributed
+SELECT count(*) > 0 AS is_table_distributed
+FROM pg_dist_partition
+WHERE logicalrelid='t2'::regclass;
+ is_table_distributed
+----------------------
+ f
+(1 row)
+
+DROP TABLE t1;
+DROP TABLE t2;
+DROP SCHEMA mx_metadata_sync CASCADE;
diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule
index dd7c007a6..7afdd3f26 100644
--- a/src/test/regress/failure_schedule
+++ b/src/test/regress/failure_schedule
@@ -29,3 +29,4 @@ test: failure_insert_select_pushdown
 test: failure_single_mod
 test: failure_savepoints
 test: failure_multi_row_insert
+test: failure_mx_metadata_sync
diff --git a/src/test/regress/mitmscripts/structs.py b/src/test/regress/mitmscripts/structs.py
index 03c19c449..59a288b81 100644
--- a/src/test/regress/mitmscripts/structs.py
+++ b/src/test/regress/mitmscripts/structs.py
@@ -8,6 +8,9 @@ import construct.lib as cl
 import re
 
+# For all possible message formats see:
+# https://www.postgresql.org/docs/current/protocol-message-formats.html
+
 class MessageMeta(type):
     def __init__(cls, name, bases, namespace):
         '''
@@ -293,6 +296,121 @@ class BackendKeyData(BackendMessage):
         # Both of these should be censored, for reproducible regression test output
         return "BackendKeyData(XXX)"
 
+class NoticeResponse(BackendMessage):
+    key = 'N'
+    struct = Struct(
+        "notices" / GreedyRange(
+            Struct(
+                "key" / Enum(Byte,
+                    severity=ord('S'),
+                    _severity_not_localized=ord('V'),
+                    _sql_state=ord('C'),
+                    message=ord('M'),
+                    detail=ord('D'),
+                    hint=ord('H'),
+                    _position=ord('P'),
+                    _internal_position=ord('p'),
+                    _internal_query=ord('q'),
+                    _where=ord('W'),
+                    schema_name=ord('s'),
+                    table_name=ord('t'),
+                    column_name=ord('c'),
+                    data_type_name=ord('d'),
+                    constraint_name=ord('n'),
+                    _file_name=ord('F'),
+                    _line_no=ord('L'),
+                    _routine_name=ord('R')
+                ),
+                "value" / CString("ASCII")
+            )
+        )
+    )
+
+    def print(message):
+        return "NoticeResponse({})".format(", ".join(
+            "{}={}".format(response.key, response.value)
+            for response in message.notices
+            if not response.key.startswith('_')
+        ))
+
+class Parse(FrontendMessage):
+    key = 'P'
+    struct = Struct(
+        "name" / CString("ASCII"),
+        "query" / CString("ASCII"),
+        "_parametercount" / Int16ub,
+        "parameters" / Array(
+            this._parametercount,
+            Int32ub
+        )
+    )
+
+class ParseComplete(BackendMessage):
+    key = '1'
+    struct = Struct()
+
+class Bind(FrontendMessage):
+    key = 'B'
+    struct = Struct(
+        "destination_portal" / CString("ASCII"),
+        "prepared_statement" / CString("ASCII"),
+        "_parameter_format_code_count" / Int16ub,
+        "parameter_format_codes" / Array(this._parameter_format_code_count,
+                                         Int16ub),
+        "_parameter_value_count" / Int16ub,
+        "parameter_values" / Array(
+            this._parameter_value_count,
+            Struct(
+                "length" / Int32ub,
+                "value" / Bytes(this.length)
+            )
+        ),
+        "result_column_format_count" / Int16ub,
+        "result_column_format_codes" / Array(this.result_column_format_count,
+                                             Int16ub)
+    )
+
+class BindComplete(BackendMessage):
+    key = '2'
+    struct = Struct()
+
+class NoData(BackendMessage):
+    key = 'n'
+    struct = Struct()
+
+class Describe(FrontendMessage):
+    key = 'D'
+    struct = Struct(
+        "type" / Enum(Byte,
+            prepared_statement=ord('S'),
+            portal=ord('P')
+        ),
+        "name" / CString("ASCII")
+    )
+
+    def print(message):
+        return "Describe({}={})".format(
+            message.type,
+            message.name or "<unnamed>"
+        )
+
+class Execute(FrontendMessage):
+    key = 'E'
+    struct = Struct(
+        "name" / CString("ASCII"),
+        "max_rows_to_return" / Int32ub
+    )
+
+    def print(message):
+        return "Execute({}, max_rows_to_return={})".format(
+            message.name or "<unnamed>",
+            message.max_rows_to_return
+        )
+
+class Sync(FrontendMessage):
+    key = 'S'
+    struct = Struct()
+
 frontend_switch = Switch(
     this.type,
     {
         **FrontendMessage.name_to_struct(),
         **SharedMessage.name_to_struct()
     },
diff --git a/src/test/regress/sql/failure_mx_metadata_sync.sql b/src/test/regress/sql/failure_mx_metadata_sync.sql
new file mode 100644
index 000000000..aea247787
--- /dev/null
+++ b/src/test/regress/sql/failure_mx_metadata_sync.sql
@@ -0,0 +1,69 @@
+--
+-- failure_mx_metadata_sync.sql
+--
+CREATE SCHEMA IF NOT EXISTS mx_metadata_sync;
+SET SEARCH_PATH = mx_metadata_sync;
+SET citus.shard_count TO 2;
+SET citus.next_shard_id TO 16000000;
+SET citus.shard_replication_factor TO 1;
+SET citus.replication_model TO 'streaming';
+
+SELECT pg_backend_pid() as pid \gset
+SELECT citus.mitmproxy('conn.allow()');
+
+CREATE TABLE t1 (id int PRIMARY KEY);
+SELECT create_distributed_table('t1', 'id');
+INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x);
+
+-- Initial metadata status
+SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
+
+-- Failure to set groupid in the worker
+SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").cancel(' || :pid || ')');
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").kill()');
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+
+-- Failure to drop all tables in pg_dist_partition
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_drop_distributed_table").cancel(' || :pid || ')');
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_drop_distributed_table").kill()');
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+
+-- Failure to truncate pg_dist_node in the worker
+SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE pg_dist_node CASCADE").cancel(' || :pid || ')');
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE pg_dist_node CASCADE").kill()');
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+
+-- Failure to populate pg_dist_node in the worker
+SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' || :pid || ')');
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").kill()');
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+
+-- Verify that coordinator knows worker does not have valid metadata
+SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
+
+-- Verify we can sync metadata after unsuccessful attempts
+SELECT citus.mitmproxy('conn.allow()');
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
+
+-- Check failures on DDL command propagation
+CREATE TABLE t2 (id int PRIMARY KEY);
+
+SELECT citus.mitmproxy('conn.onParse(query="^INSERT INTO pg_dist_placement").kill()');
+SELECT create_distributed_table('t2', 'id');
+
+SELECT citus.mitmproxy('conn.onParse(query="^INSERT INTO pg_dist_shard").cancel(' || :pid || ')');
+SELECT create_distributed_table('t2', 'id');
+
+-- Verify that the table was not distributed
+SELECT count(*) > 0 AS is_table_distributed
+FROM pg_dist_partition
+WHERE logicalrelid='t2'::regclass;
+
+DROP TABLE t1;
+DROP TABLE t2;
+DROP SCHEMA mx_metadata_sync CASCADE;
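
A quick way to sanity-check the new frontend message definitions outside the
test suite is to decode a hand-built message body with the same `construct`
declarations. The sketch below is illustrative only and not part of the patch:
it re-declares the Parse ('P') message body added to structs.py and parses a
hypothetical Parse message for "SELECT $1" with one int4 (OID 23) parameter.
In structs.py these bodies are wrapped by FrontendMessage, which also handles
the leading type byte and the Int32 message length; the standalone sketch
skips that framing.

    # sketch.py -- standalone; assumes the `construct` package is installed
    from construct import Array, CString, Int16ub, Int32ub, Struct, this

    # Same shape as the Parse body added in structs.py (type byte and
    # message length are handled by the FrontendMessage wrapper there).
    parse_body = Struct(
        "name" / CString("ASCII"),         # prepared statement name; "" = unnamed
        "query" / CString("ASCII"),        # SQL text to prepare
        "_parametercount" / Int16ub,       # number of parameter type OIDs
        "parameters" / Array(this._parametercount, Int32ub),
    )

    # Hypothetical wire body: unnamed statement, "SELECT $1", one OID (23 = int4)
    raw = b"\x00" + b"SELECT $1\x00" + b"\x00\x01" + b"\x00\x00\x00\x17"
    msg = parse_body.parse(raw)
    print(msg.query, list(msg.parameters))  # -> SELECT $1 [23]

The query field decoded here is what conn.onParse() matches against, which is
why the new tests can anchor regexes such as "^INSERT INTO pg_dist_shard" on
extended-protocol traffic instead of relying on onQuery() alone.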