mirror of https://github.com/citusdata/citus.git
Fix flakyness in failure_connection_establishment (#6226)
In CI our failure_connection_establishment test sometimes failed randomly with the following error:

```diff
 -- verify a connection attempt was made to the intercepted node, this would have cause the
 -- connection to have been delayed and thus caused a timeout
 SELECT * FROM citus.dump_network_traffic() WHERE conn=0;
  conn | source      | message
 ------+--------+---------
- 0    | coordinator | [initial message]
-(1 row)
+(0 rows)

 SELECT citus.mitmproxy('conn.allow()');
```

Source: https://app.circleci.com/pipelines/github/citusdata/citus/26318/workflows/d3354024-9a67-4b01-9416-5cf79aec6bd8/jobs/745558

The way I fixed this was by removing the dump_network_traffic call. This might sound simple, but doing so while letting the test keep serving its intended purpose required quite a few more changes. The dump_network_traffic call was there because we didn't want to show warnings for the queries above, since the exact warnings were not reliable. The main reason the warnings were not reliable was that we were using round-robin task assignment: we ran the same query twice, so that one of the two executions would hit the node with the intercepted connection. Instead of doing that, I'm now using the "first-replica" policy and running each query only once. This works because the first placements by placementid for each of the used tables are on the second node, so first-replica causes the first connection to go there.
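The difference between the two policies can be sketched as follows. This is illustrative Python, not Citus internals; the placement data mirrors the ordering shown by the pg_dist_placement query in this test, and the rotation model for round-robin is simplified:

```python
# Illustrative sketch (not Citus code): how 'first-replica' vs 'round-robin'
# pick which placement of a shard to connect to.
placements = [  # (placementid, groupid) for one shard, ordered by placementid
    (1450000, 2),  # first placement lives on the intercepted node (group 2)
    (1450001, 1),
]

def first_replica(placements, _attempt):
    # always start with the lowest placementid
    return placements[0]

def round_robin(placements, attempt):
    # rotates between replicas, so which node is hit depends on prior queries
    return placements[attempt % len(placements)]

# first-replica deterministically hits group 2 (the delayed node) every time
assert first_replica(placements, 0)[1] == 2
assert first_replica(placements, 1)[1] == 2
# round-robin alternates, so a single query may or may not hit group 2
assert [round_robin(placements, a)[1] for a in (0, 1)] == [2, 1]
```

That determinism is why the test no longer needs to run each query twice.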
This solved most of the flakiness, but when confirming that the flakiness was fixed I found some additional errors:

```diff
 -- show that INSERT failed
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 -----------

 (1 row)

 SELECT count(*) FROM single_replicatated WHERE key = 100;
- count
----------------------------------------------------------------------
-     0
-(1 row)
-
+ERROR: could not establish any connections to the node localhost:9060 after 400 ms
 RESET client_min_messages;
```

Source: https://app.circleci.com/pipelines/github/citusdata/citus/26321/workflows/fd5f4622-400c-465e-8d82-83f5f55a87ec/jobs/745666

I addressed this with a combination of two things:

1. Only change citus.node_connection_timeout for the queries whose timeout behaviour we want to test. When those queries are done, the value is reset to the default again.
2. Change our mitm framework to only delay the initial connection packet instead of all packets. I think sometimes a follow-on packet of a previous connection attempt was causing the next connection attempt to be delayed, even if `conn.allow()` had already been called. For our tests we only care about connection timeouts, so there's no reason to delay any packets other than the initial connection packet.

Then there was some last flakiness in the exact error that was given:

```diff
 -- tests for connectivity checks
 SELECT name FROM r1 WHERE id = 2;
 WARNING: could not establish any connections to the node localhost:9060 after 900 ms
+WARNING: connection to the remote node localhost:9060 failed with the following error:
  name
 ------
  bar
 (1 row)
```

Source: https://app.circleci.com/pipelines/github/citusdata/citus/26338/workflows/9610941c-4d01-4f62-84dc-b91abc56c252/jobs/746467

I don't have a good explanation for this slight change in error message, but given that it is missing the actual error message I expected it to be related to some small difference in timing, e.g. the server responding to the connection attempt right after the coordinator determined that the connection timed out. To solve this last flakiness I increased the connection timeouts and made the difference between the timeout and the delay a bit bigger. With these tweaks I wasn't able to reproduce this error on CI anymore.

Finally, I made most of the same changes to failure_failover_to_local_execution, since it was using the `conn.delay()` mitm method too. The only change I left out was the timing increase, since it might not be strictly necessary and it increases the time it takes to run the test. If this test ever becomes flaky, the first thing we should try is increasing its timeout.

pull/6224/head
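The mitm-framework change described in point 2 boils down to gating the sleep on whether the packet is a connection's initial packet. A standalone sketch of that idea (the real handler in the diff below integrates with the proxy's Handler chain and its `flow`/`message` objects; `Msg` here is a hypothetical stand-in):

```python
import time

class ConnectDelayHandler:
    """Sleep only when handling a connection's initial packet.

    Standalone sketch of the fluent.py change; follow-on packets of
    earlier connections pass through without any delay.
    """
    def __init__(self, time_ms):
        self.time_ms = time_ms

    def handle(self, message):
        # Only the first packet of a new connection is delayed.
        if message.is_initial:
            time.sleep(self.time_ms / 1000.0)
        return 'done'

class Msg:  # hypothetical stand-in for the proxy's message object
    def __init__(self, is_initial):
        self.is_initial = is_initial

h = ConnectDelayHandler(50)

start = time.monotonic()
h.handle(Msg(is_initial=False))   # follow-on packet: not delayed
fast = time.monotonic() - start

start = time.monotonic()
h.handle(Msg(is_initial=True))    # initial packet: delayed ~50 ms
slow = time.monotonic() - start

assert fast < 0.02 < slow
```

With the old behaviour (sleep on every packet), the `is_initial=False` call would also have taken ~50 ms, which is exactly how a stale connection could push a later connection attempt past its timeout.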
parent 506c16efdf
commit cc7e93a56a
--- a/src/test/regress/expected/failure_connection_establishment.out
+++ b/src/test/regress/expected/failure_connection_establishment.out
@@ -34,24 +34,6 @@ SELECT create_distributed_table('products', 'product_no');
 ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(name);
 ERROR: cannot create constraint on "products"
 DETAIL: Distributed relations cannot have UNIQUE, EXCLUDE, or PRIMARY KEY constraints that do not include the partition column (with an equality operator if EXCLUDE).
--- we will insert a connection delay here as this query was the cause for an investigation
--- into connection establishment problems
-SET citus.node_connection_timeout TO 400;
-SELECT citus.mitmproxy('conn.delay(500)');
- mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
-ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
-WARNING: could not establish connection after 400 ms
-ERROR: connection to the remote node localhost:xxxxx failed
-SELECT citus.mitmproxy('conn.allow()');
- mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
 CREATE TABLE r1 (
     id int PRIMARY KEY,
     name text
@@ -70,77 +52,88 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut
 
 (1 row)
 
-SELECT citus.clear_network_traffic();
- clear_network_traffic
----------------------------------------------------------------------
-
-(1 row)
-
-SELECT citus.mitmproxy('conn.delay(500)');
+-- Confirm that the first placement for both tables is on the second worker
+-- node. This is necessary so we can use the first-replica task assignment
+-- policy to first hit the node that we generate timeouts for.
+SELECT placementid, p.shardid, logicalrelid, LEAST(2, groupid) groupid
+FROM pg_dist_placement p JOIN pg_dist_shard s ON p.shardid = s.shardid
+ORDER BY placementid;
+ placementid | shardid | logicalrelid | groupid
+---------------------------------------------------------------------
+ 1450000 | 1450000 | products | 2
+ 1450001 | 1450000 | products | 1
+ 1450002 | 1450001 | products | 1
+ 1450003 | 1450001 | products | 2
+ 1450004 | 1450002 | products | 2
+ 1450005 | 1450002 | products | 1
+ 1450006 | 1450003 | products | 1
+ 1450007 | 1450003 | products | 2
+ 1450008 | 1450004 | r1 | 2
+ 1450009 | 1450004 | r1 | 1
+(10 rows)
+
+SET citus.task_assignment_policy TO 'first-replica';
+-- we will insert a connection delay here as this query was the cause for an
+-- investigation into connection establishment problems
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
--- we cannot control which replica of the reference table will be queried and there is
--- only one specific client we can control the connection for.
--- by using round-robin task_assignment_policy we can force to hit both machines.
--- and in the end, dumping the network traffic shows that the connection establishment
--- is initiated to the node behind the proxy
-SET client_min_messages TO ERROR;
-SET citus.task_assignment_policy TO 'round-robin';
--- suppress the warning since we can't control which shard is chose first. Failure of this
--- test would be if one of the queries does not return the result but an error.
-SELECT name FROM r1 WHERE id = 2;
- name
----------------------------------------------------------------------
- bar
-(1 row)
-
-SELECT name FROM r1 WHERE id = 2;
- name
----------------------------------------------------------------------
- bar
-(1 row)
-
--- verify a connection attempt was made to the intercepted node, this would have cause the
--- connection to have been delayed and thus caused a timeout
-SELECT * FROM citus.dump_network_traffic() WHERE conn=0;
- conn | source | message
----------------------------------------------------------------------
- 0 | coordinator | [initial message]
-(1 row)
-
+ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
+WARNING: could not establish connection after 900 ms
+ERROR: connection to the remote node localhost:xxxxx failed
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
--- similar test with the above but this time on a
--- distributed table instead of a reference table
--- and with citus.force_max_query_parallelization is set
-SET citus.force_max_query_parallelization TO ON;
-SELECT citus.mitmproxy('conn.delay(500)');
+-- Make sure that we fall back to a working node for reads, even if it's not
+-- the first choice in our task assignment policy.
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+-- tests for connectivity checks
+SELECT name FROM r1 WHERE id = 2;
+WARNING: could not establish any connections to the node localhost:xxxxx after 900 ms
+ name
+---------------------------------------------------------------------
+ bar
+(1 row)
+
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+-- similar test with the above but this time on a distributed table instead of
+-- a reference table and with citus.force_max_query_parallelization is set
+SET citus.force_max_query_parallelization TO ON;
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
--- suppress the warning since we can't control which shard is chose first. Failure of this
--- test would be if one of the queries does not return the result but an error.
-SELECT count(*) FROM products;
- count
----------------------------------------------------------------------
- 0
-(1 row)
-
 SELECT count(*) FROM products;
+WARNING: could not establish any connections to the node localhost:xxxxx after 900 ms
  count
 ---------------------------------------------------------------------
  0
 (1 row)
 
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
@@ -158,24 +151,26 @@ SELECT create_distributed_table('single_replicatated', 'key');
 -- this time the table is single replicated and we're still using the
 -- the max parallelization flag, so the query should fail
 SET citus.force_max_query_parallelization TO ON;
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
 SELECT count(*) FROM single_replicatated;
-ERROR: could not establish any connections to the node localhost:xxxxx after 400 ms
-SET citus.force_max_query_parallelization TO OFF;
--- one similar test, and this time on modification queries
--- to see that connection establishement failures could
--- fail the transaction (but not mark any placements as INVALID)
+ERROR: could not establish any connections to the node localhost:xxxxx after 900 ms
+RESET citus.force_max_query_parallelization;
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
+-- one similar test, and this time on modification queries
+-- to see that connection establishement failures could
+-- fail the transaction (but not mark any placements as INVALID)
 BEGIN;
 SELECT
 count(*) as invalid_placement_count
@@ -189,15 +184,23 @@ WHERE
 0
 (1 row)
 
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
 INSERT INTO single_replicatated VALUES (100);
-ERROR: could not establish any connections to the node localhost:xxxxx after 400 ms
+ERROR: could not establish any connections to the node localhost:xxxxx after 900 ms
 COMMIT;
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
 SELECT
 count(*) as invalid_placement_count
 FROM
@@ -210,13 +213,6 @@ WHERE
 0
 (1 row)
 
--- show that INSERT failed
-SELECT citus.mitmproxy('conn.allow()');
- mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
 SELECT count(*) FROM single_replicatated WHERE key = 100;
  count
 ---------------------------------------------------------------------
@@ -299,7 +295,8 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="SELECT 1").cancel(' || p
 SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
 ERROR: canceling statement due to user request
 -- verify that the checks are not successful when timeouts happen on a connection
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
  mitmproxy
 ---------------------------------------------------------------------
@@ -311,6 +308,13 @@ SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
 f
 (1 row)
 
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
 -- tests for citus_check_cluster_node_health
 -- kill all connectivity checks that originate from this node
 SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").kill()');
@@ -427,13 +431,13 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_
 SELECT * FROM citus_check_cluster_node_health();
 ERROR: canceling statement due to user request
 RESET client_min_messages;
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
 
 (1 row)
 
-SET citus.node_connection_timeout TO DEFAULT;
 DROP SCHEMA fail_connect CASCADE;
 NOTICE: drop cascades to 3 other objects
 DETAIL: drop cascades to table products
--- a/src/test/regress/expected/failure_failover_to_local_execution.out
+++ b/src/test/regress/expected/failure_failover_to_local_execution.out
@@ -36,7 +36,7 @@ INSERT INTO failover_to_local SELECT i, i::text FROM generate_series(0,20)i;
 -- even if the connection establishment fails, Citus can
 -- failover to local exection
 SET citus.node_connection_timeout TO 400;
-SELECT citus.mitmproxy('conn.delay(500)');
+SELECT citus.mitmproxy('conn.connect_delay(500)');
  mitmproxy
 ---------------------------------------------------------------------
@@ -54,6 +54,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_fa
 (1 row)
 
 RESET client_min_messages;
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
@@ -68,7 +69,8 @@ CONTEXT: while executing command on localhost:xxxxx
 -- if the local execution is disabled, Citus does
 -- not try to fallback to local execution
 SET citus.enable_local_execution TO false;
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 400;
+SELECT citus.mitmproxy('conn.connect_delay(500)');
  mitmproxy
 ---------------------------------------------------------------------
@@ -77,6 +79,7 @@ SELECT citus.mitmproxy('conn.delay(500)');
 SET citus.log_local_commands TO ON;
 SELECT count(*) FROM failover_to_local;
 ERROR: could not establish any connections to the node localhost:xxxxx after 400 ms
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
  mitmproxy
 ---------------------------------------------------------------------
--- a/src/test/regress/mitmscripts/fluent.py
+++ b/src/test/regress/mitmscripts/fluent.py
@@ -114,8 +114,8 @@ class ActionsMixin:
         self.next = CancelHandler(self.root, pid)
         return self.next
 
-    def delay(self, timeMs):
-        self.next = DelayHandler(self.root, timeMs)
+    def connect_delay(self, timeMs):
+        self.next = ConnectDelayHandler(self.root, timeMs)
         return self.next
 
 class AcceptHandler(Handler):
@@ -174,12 +174,13 @@ class CancelHandler(Handler):
         time.sleep(0.1)
         return 'done'
 
-class DelayHandler(Handler):
-    'Delay a packet by sleeping before deciding what to do'
+class ConnectDelayHandler(Handler):
+    'Delay the initial packet by sleeping before deciding what to do'
     def __init__(self, root, timeMs):
         super().__init__(root)
         self.timeMs = timeMs
 
     def _handle(self, flow, message):
-        time.sleep(self.timeMs/1000.0)
+        if message.is_initial:
+            time.sleep(self.timeMs/1000.0)
         return 'done'
--- a/src/test/regress/sql/failure_connection_establishment.sql
+++ b/src/test/regress/sql/failure_connection_establishment.sql
@@ -28,16 +28,6 @@ SELECT create_distributed_table('products', 'product_no');
 -- Command below should error out since 'name' is not a distribution column
 ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(name);
 
--- we will insert a connection delay here as this query was the cause for an investigation
--- into connection establishment problems
-SET citus.node_connection_timeout TO 400;
-SELECT citus.mitmproxy('conn.delay(500)');
-
-ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
-
-SELECT citus.mitmproxy('conn.allow()');
-
 CREATE TABLE r1 (
     id int PRIMARY KEY,
     name text
@@ -49,38 +39,40 @@ INSERT INTO r1 (id, name) VALUES
 
 SELECT create_reference_table('r1');
 
-SELECT citus.clear_network_traffic();
-SELECT citus.mitmproxy('conn.delay(500)');
-
--- we cannot control which replica of the reference table will be queried and there is
--- only one specific client we can control the connection for.
--- by using round-robin task_assignment_policy we can force to hit both machines.
--- and in the end, dumping the network traffic shows that the connection establishment
--- is initiated to the node behind the proxy
-SET client_min_messages TO ERROR;
-SET citus.task_assignment_policy TO 'round-robin';
--- suppress the warning since we can't control which shard is chose first. Failure of this
--- test would be if one of the queries does not return the result but an error.
-SELECT name FROM r1 WHERE id = 2;
-SELECT name FROM r1 WHERE id = 2;
-
--- verify a connection attempt was made to the intercepted node, this would have cause the
--- connection to have been delayed and thus caused a timeout
-SELECT * FROM citus.dump_network_traffic() WHERE conn=0;
-
+-- Confirm that the first placement for both tables is on the second worker
+-- node. This is necessary so we can use the first-replica task assignment
+-- policy to first hit the node that we generate timeouts for.
+SELECT placementid, p.shardid, logicalrelid, LEAST(2, groupid) groupid
+FROM pg_dist_placement p JOIN pg_dist_shard s ON p.shardid = s.shardid
+ORDER BY placementid;
+SET citus.task_assignment_policy TO 'first-replica';
+
+-- we will insert a connection delay here as this query was the cause for an
+-- investigation into connection establishment problems
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
 
--- similar test with the above but this time on a
--- distributed table instead of a reference table
--- and with citus.force_max_query_parallelization is set
+-- Make sure that we fall back to a working node for reads, even if it's not
+-- the first choice in our task assignment policy.
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+-- tests for connectivity checks
+SELECT name FROM r1 WHERE id = 2;
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+
+-- similar test with the above but this time on a distributed table instead of
+-- a reference table and with citus.force_max_query_parallelization is set
 SET citus.force_max_query_parallelization TO ON;
-SELECT citus.mitmproxy('conn.delay(500)');
--- suppress the warning since we can't control which shard is chose first. Failure of this
--- test would be if one of the queries does not return the result but an error.
-SELECT count(*) FROM products;
-SELECT count(*) FROM products;
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+SELECT count(*) FROM products;
+RESET citus.node_connection_timeout;
 
 SELECT citus.mitmproxy('conn.allow()');
 
 SET citus.shard_replication_factor TO 1;
 CREATE TABLE single_replicatated(key int);
 SELECT create_distributed_table('single_replicatated', 'key');
@@ -88,15 +80,17 @@ SELECT create_distributed_table('single_replicatated', 'key');
 -- this time the table is single replicated and we're still using the
 -- the max parallelization flag, so the query should fail
 SET citus.force_max_query_parallelization TO ON;
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
 SELECT count(*) FROM single_replicatated;
-SET citus.force_max_query_parallelization TO OFF;
+RESET citus.force_max_query_parallelization;
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
 
 -- one similar test, and this time on modification queries
 -- to see that connection establishement failures could
 -- fail the transaction (but not mark any placements as INVALID)
-SELECT citus.mitmproxy('conn.allow()');
 BEGIN;
 SELECT
 count(*) as invalid_placement_count
@@ -105,9 +99,12 @@ FROM
 WHERE
 shardstate = 3 AND
 shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'single_replicatated'::regclass);
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
 INSERT INTO single_replicatated VALUES (100);
 COMMIT;
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
 SELECT
 count(*) as invalid_placement_count
 FROM
@@ -116,8 +113,6 @@ WHERE
 shardstate = 3 AND
 shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'single_replicatated'::regclass);
 
--- show that INSERT failed
-SELECT citus.mitmproxy('conn.allow()');
 SELECT count(*) FROM single_replicatated WHERE key = 100;
 
 
@@ -154,8 +149,11 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="SELECT 1").cancel(' || p
 SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
 
 -- verify that the checks are not successful when timeouts happen on a connection
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
 SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
 
 -- tests for citus_check_cluster_node_health
 
@@ -197,7 +195,7 @@ SELECT * FROM citus_check_cluster_node_health();
 
 
 RESET client_min_messages;
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
-SET citus.node_connection_timeout TO DEFAULT;
 DROP SCHEMA fail_connect CASCADE;
 SET search_path TO default;
--- a/src/test/regress/sql/failure_failover_to_local_execution.sql
+++ b/src/test/regress/sql/failure_failover_to_local_execution.sql
@@ -23,11 +23,12 @@ INSERT INTO failover_to_local SELECT i, i::text FROM generate_series(0,20)i;
 -- even if the connection establishment fails, Citus can
 -- failover to local exection
 SET citus.node_connection_timeout TO 400;
-SELECT citus.mitmproxy('conn.delay(500)');
+SELECT citus.mitmproxy('conn.connect_delay(500)');
 SET citus.log_local_commands TO ON;
 SET client_min_messages TO DEBUG1;
 SELECT count(*) FROM failover_to_local;
 RESET client_min_messages;
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
 
 -- if the remote query execution fails, Citus
@@ -37,9 +38,11 @@ SELECT key / 0 FROM failover_to_local;
 -- if the local execution is disabled, Citus does
 -- not try to fallback to local execution
 SET citus.enable_local_execution TO false;
-SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.node_connection_timeout TO 400;
+SELECT citus.mitmproxy('conn.connect_delay(500)');
 SET citus.log_local_commands TO ON;
 SELECT count(*) FROM failover_to_local;
+RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
 RESET citus.enable_local_execution;