diff --git a/src/test/regress/expected/failure_connection_establishment.out b/src/test/regress/expected/failure_connection_establishment.out
index 9c44269a3..d30e390e3 100644
--- a/src/test/regress/expected/failure_connection_establishment.out
+++ b/src/test/regress/expected/failure_connection_establishment.out
@@ -34,24 +34,6 @@ SELECT create_distributed_table('products', 'product_no');
 ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(name);
 ERROR:  cannot create constraint on "products"
 DETAIL:  Distributed relations cannot have UNIQUE, EXCLUDE, or PRIMARY KEY constraints that do not include the partition column (with an equality operator if EXCLUDE).
--- we will insert a connection delay here as this query was the cause for an investigation
--- into connection establishment problems
-SET citus.node_connection_timeout TO 400;
-SELECT citus.mitmproxy('conn.delay(500)');
- mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
-ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
-WARNING:  could not establish connection after 400 ms
-ERROR:  connection to the remote node localhost:xxxxx failed
-SELECT citus.mitmproxy('conn.allow()');
- mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
 CREATE TABLE r1 (
     id int PRIMARY KEY,
     name text
@@ -70,77 +52,88 @@ HINT:  To remove the local data, run: SELECT truncate_local_data_after_distribut
 
 (1 row)
 
-SELECT citus.clear_network_traffic();
- clear_network_traffic
+-- Confirm that the first placement for both tables is on the second worker
+-- node. This is necessary so we can use the first-replica task assignment
+-- policy to first hit the node that we generate timeouts for.
+SELECT placementid, p.shardid, logicalrelid, LEAST(2, groupid) groupid
+FROM pg_dist_placement p JOIN pg_dist_shard s ON p.shardid = s.shardid
+ORDER BY placementid;
+ placementid | shardid | logicalrelid | groupid
 ---------------------------------------------------------------------
+     1450000 | 1450000 | products     |       2
+     1450001 | 1450000 | products     |       1
+     1450002 | 1450001 | products     |       1
+     1450003 | 1450001 | products     |       2
+     1450004 | 1450002 | products     |       2
+     1450005 | 1450002 | products     |       1
+     1450006 | 1450003 | products     |       1
+     1450007 | 1450003 | products     |       2
+     1450008 | 1450004 | r1           |       2
+     1450009 | 1450004 | r1           |       1
+(10 rows)
 
-(1 row)
-
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.task_assignment_policy TO 'first-replica';
+-- we will insert a connection delay here as this query was the cause for an
+-- investigation into connection establishment problems
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
--- we cannot control which replica of the reference table will be queried and there is
--- only one specific client we can control the connection for.
--- by using round-robin task_assignment_policy we can force to hit both machines.
--- and in the end, dumping the network traffic shows that the connection establishment
--- is initiated to the node behind the proxy
-SET client_min_messages TO ERROR;
-SET citus.task_assignment_policy TO 'round-robin';
--- suppress the warning since we can't control which shard is chose first. Failure of this
--- test would be if one of the queries does not return the result but an error.
-SELECT name FROM r1 WHERE id = 2;
- name
----------------------------------------------------------------------
- bar
-(1 row)
-
-SELECT name FROM r1 WHERE id = 2;
- name
----------------------------------------------------------------------
- bar
-(1 row)
-
--- verify a connection attempt was made to the intercepted node, this would have cause the
--- connection to have been delayed and thus caused a timeout
-SELECT * FROM citus.dump_network_traffic() WHERE conn=0;
- conn | source | message
----------------------------------------------------------------------
- 0 | coordinator | [initial message]
-(1 row)
-
+ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
+WARNING:  could not establish connection after 900 ms
+ERROR:  connection to the remote node localhost:xxxxx failed
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
--- similar test with the above but this time on a
--- distributed table instead of a reference table
--- and with citus.force_max_query_parallelization is set
-SET citus.force_max_query_parallelization TO ON;
-SELECT citus.mitmproxy('conn.delay(500)');
+-- Make sure that we fall back to a working node for reads, even if it's not
+-- the first choice in our task assignment policy.
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+-- the read query should fall back to the replica on the other worker
+SELECT name FROM r1 WHERE id = 2;
+WARNING:  could not establish any connections to the node localhost:xxxxx after 900 ms
+ name
+---------------------------------------------------------------------
+ bar
+(1 row)
+
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+-- similar test to the one above, but this time on a distributed table instead
+-- of a reference table and with citus.force_max_query_parallelization set
+SET citus.force_max_query_parallelization TO ON;
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
--- suppress the warning since we can't control which shard is chose first. Failure of this
--- test would be if one of the queries does not return the result but an error.
-SELECT count(*) FROM products;
- count
----------------------------------------------------------------------
-     0
-(1 row)
-
 SELECT count(*) FROM products;
+WARNING:  could not establish any connections to the node localhost:xxxxx after 900 ms
  count
 ---------------------------------------------------------------------
      0
 (1 row)
 
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
@@ -158,24 +151,26 @@ SELECT create_distributed_table('single_replicatated', 'key');
 -- this time the table is single replicated and we're still using the
 -- the max parallelization flag, so the query should fail
 SET citus.force_max_query_parallelization TO ON;
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
 SELECT count(*) FROM single_replicatated;
-ERROR:  could not establish any connections to the node localhost:xxxxx after 400 ms
-SET citus.force_max_query_parallelization TO OFF;
--- one similar test, and this time on modification queries
--- to see that connection establishement failures could
--- fail the transaction (but not mark any placements as INVALID)
+ERROR:  could not establish any connections to the node localhost:xxxxx after 900 ms
+RESET citus.force_max_query_parallelization;
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
+-- one similar test, and this time on modification queries
+-- to see that connection establishement failures could
+-- fail the transaction (but not mark any placements as INVALID)
 BEGIN;
 SELECT count(*) as invalid_placement_count
@@ -189,15 +184,23 @@ WHERE
      0
 (1 row)
 
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
 INSERT INTO single_replicatated VALUES (100);
-ERROR:  could not establish any connections to the node localhost:xxxxx after 400 ms
+ERROR:  could not establish any connections to the node localhost:xxxxx after 900 ms
 COMMIT;
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
 SELECT count(*) as invalid_placement_count
 FROM
@@ -210,13 +213,6 @@ WHERE
      0
 (1 row)
 
--- show that INSERT failed
-SELECT citus.mitmproxy('conn.allow()');
- mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
 SELECT count(*) FROM single_replicatated WHERE key = 100;
  count
 ---------------------------------------------------------------------
@@ -299,7 +295,8 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="SELECT 1").cancel(' || p
 SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
 ERROR:  canceling statement due to user request
 -- verify that the checks are not successful when timeouts happen on a connection
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
  mitmproxy
 ---------------------------------------------------------------------
 
@@ -311,6 +308,13 @@ SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
 f
 (1 row)
 
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
 -- tests for citus_check_cluster_node_health
 -- kill all connectivity checks that originate from this node
 SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").kill()');
@@ -427,13 +431,13 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_
 SELECT * FROM citus_check_cluster_node_health();
 ERROR:  canceling statement due to user request
 RESET client_min_messages;
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
-SET citus.node_connection_timeout TO DEFAULT;
 DROP SCHEMA fail_connect CASCADE;
 NOTICE:  drop cascades to 3 other objects
 DETAIL:  drop cascades to table products
diff --git a/src/test/regress/expected/failure_failover_to_local_execution.out b/src/test/regress/expected/failure_failover_to_local_execution.out
index 0ecc98111..56518141a 100644
--- a/src/test/regress/expected/failure_failover_to_local_execution.out
+++ b/src/test/regress/expected/failure_failover_to_local_execution.out
@@ -36,7 +36,7 @@ INSERT INTO failover_to_local SELECT i, i::text FROM generate_series(0,20)i;
 -- even if the connection establishment fails, Citus can
 -- failover to local exection
 SET citus.node_connection_timeout TO 400;
-SELECT citus.mitmproxy('conn.delay(500)');
+SELECT citus.mitmproxy('conn.connect_delay(500)');
  mitmproxy
 ---------------------------------------------------------------------
 
@@ -54,6 +54,7 @@ NOTICE:  executing the command locally: SELECT count(*) AS count FROM failure_fa
 (1 row)
 
 RESET client_min_messages;
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
 
@@ -68,7 +69,8 @@ CONTEXT:  while executing command on localhost:xxxxx
 -- if the local execution is disabled, Citus does
 -- not try to fallback to local execution
 SET citus.enable_local_execution TO false;
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 400;
+SELECT citus.mitmproxy('conn.connect_delay(500)');
  mitmproxy
 ---------------------------------------------------------------------
 
@@ -77,6 +79,7 @@ SELECT citus.mitmproxy('conn.delay(500)');
 SET citus.log_local_commands TO ON;
 SELECT count(*) FROM failover_to_local;
 ERROR:  could not establish any connections to the node localhost:xxxxx after 400 ms
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
diff --git a/src/test/regress/mitmscripts/fluent.py b/src/test/regress/mitmscripts/fluent.py
index 2fc408e03..2e6be9d34 100644
--- a/src/test/regress/mitmscripts/fluent.py
+++ b/src/test/regress/mitmscripts/fluent.py
@@ -114,8 +114,8 @@ class ActionsMixin:
         self.next = CancelHandler(self.root, pid)
         return self.next
 
-    def delay(self, timeMs):
-        self.next = DelayHandler(self.root, timeMs)
+    def connect_delay(self, timeMs):
+        self.next = ConnectDelayHandler(self.root, timeMs)
         return self.next
 
 class AcceptHandler(Handler):
@@ -174,13 +174,14 @@ class CancelHandler(Handler):
         time.sleep(0.1)
         return 'done'
 
-class DelayHandler(Handler):
-    'Delay a packet by sleeping before deciding what to do'
+class ConnectDelayHandler(Handler):
+    'Delay the initial packet by sleeping before deciding what to do'
     def __init__(self, root, timeMs):
         super().__init__(root)
         self.timeMs = timeMs
     def _handle(self, flow, message):
-        time.sleep(self.timeMs/1000.0)
+        if message.is_initial:
+            time.sleep(self.timeMs/1000.0)
         return 'done'
 
 class Contains(Handler, ActionsMixin, FilterableMixin):
diff --git a/src/test/regress/sql/failure_connection_establishment.sql b/src/test/regress/sql/failure_connection_establishment.sql
index 5029d40b7..1817e3199 100644
--- a/src/test/regress/sql/failure_connection_establishment.sql
+++ b/src/test/regress/sql/failure_connection_establishment.sql
@@ -28,16 +28,6 @@ SELECT create_distributed_table('products', 'product_no');
 
 -- Command below should error out since 'name' is not a distribution column
 ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(name);
-
--- we will insert a connection delay here as this query was the cause for an investigation
--- into connection establishment problems
-SET citus.node_connection_timeout TO 400;
-SELECT citus.mitmproxy('conn.delay(500)');
-
-ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
-
-SELECT citus.mitmproxy('conn.allow()');
-
 CREATE TABLE r1 (
     id int PRIMARY KEY,
     name text
@@ -49,38 +39,40 @@ INSERT INTO r1 (id, name) VALUES
 
 SELECT create_reference_table('r1');
 
-SELECT citus.clear_network_traffic();
-SELECT citus.mitmproxy('conn.delay(500)');
-
--- we cannot control which replica of the reference table will be queried and there is
--- only one specific client we can control the connection for.
--- by using round-robin task_assignment_policy we can force to hit both machines.
--- and in the end, dumping the network traffic shows that the connection establishment
--- is initiated to the node behind the proxy
-SET client_min_messages TO ERROR;
-SET citus.task_assignment_policy TO 'round-robin';
--- suppress the warning since we can't control which shard is chose first. Failure of this
--- test would be if one of the queries does not return the result but an error.
-SELECT name FROM r1 WHERE id = 2;
-SELECT name FROM r1 WHERE id = 2;
-
--- verify a connection attempt was made to the intercepted node, this would have cause the
--- connection to have been delayed and thus caused a timeout
-SELECT * FROM citus.dump_network_traffic() WHERE conn=0;
+-- Confirm that the first placement for both tables is on the second worker
+-- node. This is necessary so we can use the first-replica task assignment
+-- policy to first hit the node that we generate timeouts for.
+SELECT placementid, p.shardid, logicalrelid, LEAST(2, groupid) groupid
+FROM pg_dist_placement p JOIN pg_dist_shard s ON p.shardid = s.shardid
+ORDER BY placementid;
+SET citus.task_assignment_policy TO 'first-replica';
+-- we will insert a connection delay here as this query was the cause for an
+-- investigation into connection establishment problems
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
 
--- similar test with the above but this time on a
--- distributed table instead of a reference table
--- and with citus.force_max_query_parallelization is set
+-- Make sure that we fall back to a working node for reads, even if it's not
+-- the first choice in our task assignment policy.
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+-- the read query should fall back to the replica on the other worker
+SELECT name FROM r1 WHERE id = 2;
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+
+-- similar test to the one above, but this time on a distributed table instead
+-- of a reference table and with citus.force_max_query_parallelization set
 SET citus.force_max_query_parallelization TO ON;
-SELECT citus.mitmproxy('conn.delay(500)');
--- suppress the warning since we can't control which shard is chose first. Failure of this
--- test would be if one of the queries does not return the result but an error.
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
 SELECT count(*) FROM products;
-SELECT count(*) FROM products;
-
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
+
 SET citus.shard_replication_factor TO 1;
 CREATE TABLE single_replicatated(key int);
 SELECT create_distributed_table('single_replicatated', 'key');
@@ -88,15 +80,17 @@ SELECT create_distributed_table('single_replicatated', 'key');
 -- this time the table is single replicated and we're still using the
 -- the max parallelization flag, so the query should fail
 SET citus.force_max_query_parallelization TO ON;
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
 SELECT count(*) FROM single_replicatated;
+RESET citus.force_max_query_parallelization;
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
 
-SET citus.force_max_query_parallelization TO OFF;
 -- one similar test, and this time on modification queries
 -- to see that connection establishement failures could
 -- fail the transaction (but not mark any placements as INVALID)
-SELECT citus.mitmproxy('conn.allow()');
 
 BEGIN;
 SELECT count(*) as invalid_placement_count
@@ -105,9 +99,12 @@ FROM
 WHERE
     shardstate = 3 AND
     shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'single_replicatated'::regclass);
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
 INSERT INTO single_replicatated VALUES (100);
 COMMIT;
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
 
 SELECT count(*) as invalid_placement_count
 FROM
@@ -116,8 +113,6 @@ WHERE
     shardstate = 3 AND
     shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'single_replicatated'::regclass);
--- show that INSERT failed
-SELECT citus.mitmproxy('conn.allow()');
 
 SELECT count(*) FROM single_replicatated WHERE key = 100;
 
@@ -154,8 +149,11 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="SELECT 1").cancel(' || p
 SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
 
 -- verify that the checks are not successful when timeouts happen on a connection
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
 SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
 
 -- tests for citus_check_cluster_node_health
 
@@ -197,7 +195,7 @@ SELECT * FROM citus_check_cluster_node_health();
 
 RESET client_min_messages;
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
-SET citus.node_connection_timeout TO DEFAULT;
 
 DROP SCHEMA fail_connect CASCADE;
 SET search_path TO default;
diff --git a/src/test/regress/sql/failure_failover_to_local_execution.sql b/src/test/regress/sql/failure_failover_to_local_execution.sql
index 19722f6ba..12ae3afd9 100644
--- a/src/test/regress/sql/failure_failover_to_local_execution.sql
+++ b/src/test/regress/sql/failure_failover_to_local_execution.sql
@@ -23,11 +23,12 @@ INSERT INTO failover_to_local SELECT i, i::text FROM generate_series(0,20)i;
 -- even if the connection establishment fails, Citus can
 -- failover to local exection
 SET citus.node_connection_timeout TO 400;
-SELECT citus.mitmproxy('conn.delay(500)');
+SELECT citus.mitmproxy('conn.connect_delay(500)');
 SET citus.log_local_commands TO ON;
 SET client_min_messages TO DEBUG1;
 SELECT count(*) FROM failover_to_local;
 RESET client_min_messages;
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
 
 -- if the remote query execution fails, Citus
@@ -37,9 +38,11 @@ SELECT key / 0 FROM failover_to_local;
 -- if the local execution is disabled, Citus does
 -- not try to fallback to local execution
 SET citus.enable_local_execution TO false;
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 400;
+SELECT citus.mitmproxy('conn.connect_delay(500)');
 SET citus.log_local_commands TO ON;
 SELECT count(*) FROM failover_to_local;
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
 
 RESET citus.enable_local_execution;
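
Reviewer note, not part of the patch: the fluent.py hunk is the crux of the change. The old DelayHandler slept on every packet that passed through the proxy, so the delay also throttled traffic on connections that were already established; the renamed ConnectDelayHandler sleeps only on a connection's initial packet, delaying connection establishment alone. That is what lets the tests pair a 900 ms citus.node_connection_timeout with a 1400 ms connect_delay and fail deterministically: every connection attempt through the proxy exceeds the timeout, while established connections are unaffected. Below is a minimal self-contained sketch of that difference; the Message stub and the standalone handler classes are illustrative assumptions rather than the real fluent.py classes, and only the is_initial check mirrors the actual patch.

import time

class Message:
    # Stand-in for the packet object the mitmscripts handlers receive;
    # the real object marks a connection's first packet via is_initial.
    def __init__(self, is_initial):
        self.is_initial = is_initial

class OldDelayHandler:
    'Old behavior: sleep on every packet, throttling established connections too'
    def __init__(self, timeMs):
        self.timeMs = timeMs
    def _handle(self, flow, message):
        time.sleep(self.timeMs / 1000.0)
        return 'done'

class ConnectDelayHandler:
    'New behavior: sleep only on the initial packet, delaying establishment alone'
    def __init__(self, timeMs):
        self.timeMs = timeMs
    def _handle(self, flow, message):
        if message.is_initial:
            time.sleep(self.timeMs / 1000.0)
        return 'done'

if __name__ == '__main__':
    handler = ConnectDelayHandler(1400)
    start = time.time()
    handler._handle(None, Message(is_initial=True))    # startup packet: sleeps ~1.4 s
    handler._handle(None, Message(is_initial=False))   # later packet: passes through
    print('slept for %.1f s' % (time.time() - start))  # ~1.4 here; ~2.8 with the old handler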