mirror of https://github.com/citusdata/citus.git
Failure & cancellation tests for MX metadata sync
Failure and cancellation tests for the initial start_metadata_sync_to_node() call to a worker, and for DDL queries that send metadata-syncing messages to an MX node. Also adds message type definitions for the messages exchanged during metadata syncing. pull/2535/head
parent 967b369f10
commit 4dd1f5784b
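
The tests below drive failures through the citus.mitmproxy() helper from the Citus failure-testing harness: each scenario installs a trigger such as conn.onQuery(query="^TRUNCATE pg_dist_node CASCADE").cancel(pid) or .kill(), and the proxy then cancels the coordinator backend (whose pid is captured with \gset) or drops the worker connection as soon as a matching statement passes through, before the test resets it with conn.allow(). The query argument appears to be a start-anchored regular expression matched against the intercepted statement text; the following is only an illustrative Python sketch of that matching idea, with made-up statement strings, not the harness's actual trigger code:

import re

# Illustration only: the trigger's query pattern is assumed to act like a regex
# applied to the statement text the proxy extracts from each intercepted message.
trigger = re.compile('^UPDATE pg_dist_local_group SET groupid')

statements = [
    'UPDATE pg_dist_local_group SET groupid = 2',   # matched -> failure injected
    'SELECT groupid FROM pg_dist_local_group',      # no match -> passes through
]
for statement in statements:
    print(statement, '->', bool(trigger.search(statement)))
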
@@ -0,0 +1,170 @@
--
-- failure_mx_metadata_sync.sql
--
CREATE SCHEMA IF NOT EXISTS mx_metadata_sync;
SET SEARCH_PATH = mx_metadata_sync;
SET citus.shard_count TO 2;
SET citus.next_shard_id TO 16000000;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
SELECT pg_backend_pid() as pid \gset
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
-----------

(1 row)

CREATE TABLE t1 (id int PRIMARY KEY);
SELECT create_distributed_table('t1', 'id');
 create_distributed_table
--------------------------

(1 row)

INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x);
-- Initial metadata status
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
 hasmetadata
-------------
 f
(1 row)

-- Failure to set groupid in the worker
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").cancel(' || :pid || ')');
 mitmproxy
-----------

(1 row)

SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").kill()');
 mitmproxy
-----------

(1 row)

SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
ERROR: server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.
CONTEXT: while executing command on localhost:9060
-- Failure to drop all tables in pg_dist_partition
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_drop_distributed_table").cancel(' || :pid || ')');
 mitmproxy
-----------

(1 row)

SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_drop_distributed_table").kill()');
 mitmproxy
-----------

(1 row)

SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
ERROR: server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.
CONTEXT: while executing command on localhost:9060
-- Failure to truncate pg_dist_node in the worker
SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE pg_dist_node CASCADE").cancel(' || :pid || ')');
 mitmproxy
-----------

(1 row)

SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE pg_dist_node CASCADE").kill()');
 mitmproxy
-----------

(1 row)

SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
ERROR: server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.
CONTEXT: while executing command on localhost:9060
-- Failure to populate pg_dist_node in the worker
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' || :pid || ')');
 mitmproxy
-----------

(1 row)

SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").kill()');
 mitmproxy
-----------

(1 row)

SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
ERROR: server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.
CONTEXT: while executing command on localhost:9060
-- Verify that coordinator knows worker does not have valid metadata
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
 hasmetadata
-------------
 f
(1 row)

-- Verify we can sync metadata after unsuccessful attempts
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
-----------

(1 row)

SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
 start_metadata_sync_to_node
-----------------------------

(1 row)

SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
 hasmetadata
-------------
 t
(1 row)

-- Check failures on DDL command propagation
CREATE TABLE t2 (id int PRIMARY KEY);
SELECT citus.mitmproxy('conn.onParse(query="^INSERT INTO pg_dist_placement").kill()');
 mitmproxy
-----------

(1 row)

SELECT create_distributed_table('t2', 'id');
ERROR: server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.
CONTEXT: while executing command on localhost:9060
SELECT citus.mitmproxy('conn.onParse(query="^INSERT INTO pg_dist_shard").cancel(' || :pid || ')');
 mitmproxy
-----------

(1 row)

SELECT create_distributed_table('t2', 'id');
ERROR: canceling statement due to user request
-- Verify that the table was not distributed
SELECT count(*) > 0 AS is_table_distributed
FROM pg_dist_partition
WHERE logicalrelid='t2'::regclass;
 is_table_distributed
----------------------
 f
(1 row)

DROP TABLE t1;
DROP TABLE t2;
DROP SCHEMA mx_metadata_sync CASCADE;
@@ -29,3 +29,4 @@ test: failure_insert_select_pushdown
test: failure_single_mod
test: failure_savepoints
test: failure_multi_row_insert
test: failure_mx_metadata_sync
@@ -8,6 +8,9 @@ import construct.lib as cl
import re

# For all possible message formats see:
# https://www.postgresql.org/docs/current/protocol-message-formats.html

class MessageMeta(type):
    def __init__(cls, name, bases, namespace):
        '''
@@ -293,6 +296,121 @@ class BackendKeyData(BackendMessage):
        # Both of these should be censored, for reproducible regression test output
        return "BackendKeyData(XXX)"

class NoticeResponse(BackendMessage):
    key = 'N'
    struct = Struct(
        "notices" / GreedyRange(
            Struct(
                "key" / Enum(Byte,
                             severity=ord('S'),
                             _severity_not_localized=ord('V'),
                             _sql_state=ord('C'),
                             message=ord('M'),
                             detail=ord('D'),
                             hint=ord('H'),
                             _position=ord('P'),
                             _internal_position=ord('p'),
                             _internal_query=ord('q'),
                             _where=ord('W'),
                             schema_name=ord('s'),
                             table_name=ord('t'),
                             column_name=ord('c'),
                             data_type_name=ord('d'),
                             constraint_name=ord('n'),
                             _file_name=ord('F'),
                             _line_no=ord('L'),
                             _routine_name=ord('R')
                ),
                "value" / CString("ASCII")
            )
        )
    )

    def print(message):
        return "NoticeResponse({})".format(", ".join(
            "{}={}".format(response.key, response.value)
            for response in message.notices
            if not response.key.startswith('_')
        ))

class Parse(FrontendMessage):
    key = 'P'
    struct = Struct(
        "name" / CString("ASCII"),
        "query" / CString("ASCII"),
        "_parametercount" / Int16ub,
        "parameters" / Array(
            this._parametercount,
            Int32ub
        )
    )

class ParseComplete(BackendMessage):
    key = '1'
    struct = Struct()

class Bind(FrontendMessage):
    key = 'B'
    struct = Struct(
        "destination_portal" / CString("ASCII"),
        "prepared_statement" / CString("ASCII"),
        "_parameter_format_code_count" / Int16ub,
        "parameter_format_codes" / Array(this._parameter_format_code_count,
                                         Int16ub),
        "_parameter_value_count" / Int16ub,
        "parameter_values" / Array(
            this._parameter_value_count,
            Struct(
                "length" / Int32ub,
                "value" / Bytes(this.length)
            )
        ),
        "result_column_format_count" / Int16ub,
        "result_column_format_codes" / Array(this.result_column_format_count,
                                             Int16ub)
    )

class BindComplete(BackendMessage):
    key = '2'
    struct = Struct()

class NoData(BackendMessage):
    key = 'n'
    struct = Struct()

class Describe(FrontendMessage):
    key = 'D'
    struct = Struct(
        "type" / Enum(Byte,
                      prepared_statement=ord('S'),
                      portal=ord('P')
        ),
        "name" / CString("ASCII")
    )

    def print(message):
        return "Describe({}={})".format(
            message.type,
            message.name or "<unnamed>"
        )

class Execute(FrontendMessage):
    key = 'E'
    struct = Struct(
        "name" / CString("ASCII"),
        "max_rows_to_return" / Int32ub
    )

    def print(message):
        return "Execute({}, max_rows_to_return={})".format(
            message.name or "<unnamed>",
            message.max_rows_to_return
        )

class Sync(FrontendMessage):
    key = 'S'
    struct = Struct()

frontend_switch = Switch(
    this.type,
    { **FrontendMessage.name_to_struct(), **SharedMessage.name_to_struct() },
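
The classes added above describe enough of PostgreSQL's extended query protocol (Parse, Bind, Describe, Execute, Sync, plus NoticeResponse) for the proxy to dissect statements that are not sent as plain Query messages; presumably this is what lets the conn.onParse(...) triggers in the DDL-propagation test match the pg_dist_shard and pg_dist_placement inserts. Below is a minimal sketch of how such a definition parses a message body with the construct library; the struct mirrors the Parse ('P') body layout from the diff rather than importing the harness, and the byte string is hand-built for illustration:

from construct import Array, CString, Int16ub, Int32ub, Struct, this

# Mirrors the frontend Parse ('P') message body defined in the diff above.
parse_body = Struct(
    "name" / CString("ASCII"),
    "query" / CString("ASCII"),
    "_parametercount" / Int16ub,
    "parameters" / Array(this._parametercount, Int32ub),
)

# Hand-built body: unnamed prepared statement, one parameter of type oid 23 (int4).
raw = (b"\x00"
       + b"INSERT INTO pg_dist_shard VALUES ($1)\x00"
       + b"\x00\x01"
       + b"\x00\x00\x00\x17")
message = parse_body.parse(raw)
print(message.query)             # INSERT INTO pg_dist_shard VALUES ($1)
print(list(message.parameters))  # [23]
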
@@ -0,0 +1,69 @@
--
-- failure_mx_metadata_sync.sql
--
CREATE SCHEMA IF NOT EXISTS mx_metadata_sync;
SET SEARCH_PATH = mx_metadata_sync;
SET citus.shard_count TO 2;
SET citus.next_shard_id TO 16000000;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';

SELECT pg_backend_pid() as pid \gset
SELECT citus.mitmproxy('conn.allow()');

CREATE TABLE t1 (id int PRIMARY KEY);
SELECT create_distributed_table('t1', 'id');
INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x);

-- Initial metadata status
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;

-- Failure to set groupid in the worker
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").cancel(' || :pid || ')');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").kill()');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);

-- Failure to drop all tables in pg_dist_partition
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_drop_distributed_table").cancel(' || :pid || ')');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_drop_distributed_table").kill()');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);

-- Failure to truncate pg_dist_node in the worker
SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE pg_dist_node CASCADE").cancel(' || :pid || ')');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE pg_dist_node CASCADE").kill()');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);

-- Failure to populate pg_dist_node in the worker
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' || :pid || ')');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").kill()');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);

-- Verify that coordinator knows worker does not have valid metadata
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;

-- Verify we can sync metadata after unsuccessful attempts
SELECT citus.mitmproxy('conn.allow()');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;

-- Check failures on DDL command propagation
CREATE TABLE t2 (id int PRIMARY KEY);

SELECT citus.mitmproxy('conn.onParse(query="^INSERT INTO pg_dist_placement").kill()');
SELECT create_distributed_table('t2', 'id');

SELECT citus.mitmproxy('conn.onParse(query="^INSERT INTO pg_dist_shard").cancel(' || :pid || ')');
SELECT create_distributed_table('t2', 'id');

-- Verify that the table was not distributed
SELECT count(*) > 0 AS is_table_distributed
FROM pg_dist_partition
WHERE logicalrelid='t2'::regclass;

DROP TABLE t1;
DROP TABLE t2;
DROP SCHEMA mx_metadata_sync CASCADE;