Hopefully fix all issues

fix-flaky-failure_connection_establishment5
Jelte Fennema 2022-08-23 09:49:25 +02:00
parent a6ae5756ef
commit f9476898d7
5 changed files with 35 additions and 90 deletions

View File

@@ -86,7 +86,6 @@ int RepartitionJoinBucketCountPerNode = 8;
/* Policy to use when assigning tasks to worker nodes */
int TaskAssignmentPolicy = TASK_ASSIGNMENT_GREEDY;
int TaskAssignmentRoundRobinIndex = -1;
bool EnableUniqueJobIds = true;
@@ -5221,12 +5220,6 @@ RoundRobinReorder(List *placementList)
uint32 activePlacementCount = list_length(placementList);
uint32 roundRobinIndex = (transactionId % activePlacementCount);
if (TaskAssignmentRoundRobinIndex != -1 &&
TaskAssignmentRoundRobinIndex < activePlacementCount)
{
roundRobinIndex = TaskAssignmentRoundRobinIndex;
}
placementList = LeftRotateList(placementList, roundRobinIndex);
return placementList;
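What survives this change is the unconditional rotation above: RoundRobinReorder now always shifts the placement list left by transactionId % activePlacementCount, so successive transactions start their placement attempts at different replicas. A minimal standalone C sketch of that rotation, with illustrative names (RoundRobinRotate and the placement IDs are hypothetical; this is not the Citus LeftRotateList implementation):

#include <stdio.h>

/*
 * Rotate an array of placement IDs left by (transactionId % count),
 * mirroring the modulo-based reorder kept in RoundRobinReorder above.
 * Assumes 1 <= count <= 16 for the sake of the sketch.
 */
static void
RoundRobinRotate(int *placements, int count, unsigned int transactionId)
{
	int rotated[16];
	int rotateBy = transactionId % count;

	for (int i = 0; i < count; i++)
	{
		rotated[i] = placements[(i + rotateBy) % count];
	}
	for (int i = 0; i < count; i++)
	{
		placements[i] = rotated[i];
	}
}

int
main(void)
{
	int placements[] = { 101, 102, 103 };

	/* transaction ID 5 over 3 placements rotates left by 5 % 3 = 2 */
	RoundRobinRotate(placements, 3, 5);

	/* prints "103 101 102": the third placement is now tried first */
	printf("%d %d %d\n", placements[0], placements[1], placements[2]);
	return 0;
}

With the test-only override gone, the rotation depends only on the transaction ID, which is why the tests below drop the SET citus.task_assignment_round_robin_index call and get deterministic ordering from the first-replica policy instead.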

View File

@@ -2135,18 +2135,6 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.task_assignment_round_robin_index",
gettext_noop("Sets the shifting which should be used when using round robin"),
gettext_noop("This is only meant to be used during tests to get consistent "
"output and consistently trigger certain failure scenarios."),
&TaskAssignmentRoundRobinIndex,
-1, -1, INT_MAX,
PGC_SUSET,
GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.task_executor_type",
gettext_noop("Sets the executor type to be used for distributed queries."),

View File

@@ -527,7 +527,6 @@ typedef List *(*ReorderFunction)(List *);
/* Config variable managed via guc.c */
extern int TaskAssignmentPolicy;
extern int TaskAssignmentRoundRobinIndex;
extern bool EnableUniqueJobIds;

View File

@@ -34,25 +34,6 @@ SELECT create_distributed_table('products', 'product_no');
ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(name);
ERROR: cannot create constraint on "products"
DETAIL: Distributed relations cannot have UNIQUE, EXCLUDE, or PRIMARY KEY constraints that do not include the partition column (with an equality operator if EXCLUDE).
-- we will insert a connection delay here as this query was the cause for an investigation
-- into connection establishment problems
SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');
mitmproxy
---------------------------------------------------------------------
(1 row)
ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
WARNING: could not establish connection after 400 ms
ERROR: connection to the remote node localhost:xxxxx failed
RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CREATE TABLE r1 (
id int PRIMARY KEY,
name text
@@ -71,6 +52,9 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut
(1 row)
-- Confirm that the first placement for both tables is on the second worker
-- node. This is necessary so we can use the first-replica task assignment
-- policy to first hit the node that we generate timeouts for.
SELECT placementid, p.shardid, logicalrelid, LEAST(2, groupid) groupid
FROM pg_dist_placement p JOIN pg_dist_shard s ON p.shardid = s.shardid
ORDER BY placementid;
@@ -88,12 +72,21 @@ ORDER BY placementid;
1450009 | 1450004 | r1 | 1
(10 rows)
SELECT citus.clear_network_traffic();
clear_network_traffic
---------------------------------------------------------------------
(1 row)
SET citus.task_assignment_policy TO 'first-replica';
-- we will insert a connection delay here as this query was the cause for an
-- investigation into connection establishment problems
SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');
mitmproxy
---------------------------------------------------------------------
(1 row)
ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
WARNING: could not establish connection after 400 ms
ERROR: connection to the remote node localhost:xxxxx failed
-- Make sure that we fall back to a working node for reads, even if it's not
-- the first choice in our task assignment policy.
SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');
mitmproxy
@@ -101,8 +94,6 @@ SELECT citus.mitmproxy('conn.delay(500)');
(1 row)
SET citus.task_assignment_policy TO 'round-robin';
SET citus.task_assignment_round_robin_index TO 0;
SELECT name FROM r1 WHERE id = 2;
WARNING: could not establish any connections to the node localhost:xxxxx after 400 ms
name
@@ -110,24 +101,8 @@ WARNING: could not establish any connections to the node localhost:xxxxx after
bar
(1 row)
-- verify a connection attempt was made to the intercepted node; this would have caused the
-- connection to be delayed and thus caused a timeout
SELECT * FROM citus.dump_network_traffic() WHERE conn=0 AND source = 'coordinator';
conn | source | message
---------------------------------------------------------------------
0 | coordinator | [initial message]
(1 row)
RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
-- similar test with the above but this time on a
-- distributed table instead of a reference table
-- and with citus.force_max_query_parallelization is set
-- similar test with the above but this time on a distributed table instead of
-- a reference table and with citus.force_max_query_parallelization is set
SET citus.force_max_query_parallelization TO ON;
SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');

View File

@@ -28,17 +28,6 @@ SELECT create_distributed_table('products', 'product_no');
-- Command below should error out since 'name' is not a distribution column
ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(name);
-- we will insert a connection delay here as this query was the cause for an investigation
-- into connection establishment problems
SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');
ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
CREATE TABLE r1 (
id int PRIMARY KEY,
name text
@@ -50,29 +39,30 @@ INSERT INTO r1 (id, name) VALUES
SELECT create_reference_table('r1');
-- Confirm that the first placement for both tables is on the second worker
-- node. This is necessary so we can use the first-replica task assignment
-- policy to first hit the node that we generate timeouts for.
SELECT placementid, p.shardid, logicalrelid, LEAST(2, groupid) groupid
FROM pg_dist_placement p JOIN pg_dist_shard s ON p.shardid = s.shardid
ORDER BY placementid;
SET citus.task_assignment_policy TO 'first-replica';
SELECT citus.clear_network_traffic();
-- we will insert a connection delay here as this query was the cause for an
-- investigation into connection establishment problems
SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');
ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
-- Make sure that we fall back to a working node for reads, even if it's not
-- the first choice in our task assignment policy.
SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');
SET citus.task_assignment_policy TO 'round-robin';
SET citus.task_assignment_round_robin_index TO 0;
SELECT name FROM r1 WHERE id = 2;
-- verify a connection attempt was made to the intercepted node; this would have caused the
-- connection to be delayed and thus caused a timeout
SELECT * FROM citus.dump_network_traffic() WHERE conn=0 AND source = 'coordinator';
RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
-- similar test with the above but this time on a
-- distributed table instead of a reference table
-- and with citus.force_max_query_parallelization is set
-- similar test with the above but this time on a distributed table instead of
-- a reference table and with citus.force_max_query_parallelization is set
SET citus.force_max_query_parallelization TO ON;
SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');