mirror of https://github.com/citusdata/citus.git
Try a fix
parent
e578f81900
commit
c588a42114
|
@ -86,6 +86,7 @@ int RepartitionJoinBucketCountPerNode = 8;
|
||||||
|
|
||||||
/* Policy to use when assigning tasks to worker nodes */
|
/* Policy to use when assigning tasks to worker nodes */
|
||||||
int TaskAssignmentPolicy = TASK_ASSIGNMENT_GREEDY;
|
int TaskAssignmentPolicy = TASK_ASSIGNMENT_GREEDY;
|
||||||
|
int TaskAssignmentRoundRobinIndex = -1;
|
||||||
bool EnableUniqueJobIds = true;
|
bool EnableUniqueJobIds = true;
|
||||||
|
|
||||||
|
|
||||||
|
@ -5220,6 +5221,12 @@ RoundRobinReorder(List *placementList)
|
||||||
uint32 activePlacementCount = list_length(placementList);
|
uint32 activePlacementCount = list_length(placementList);
|
||||||
uint32 roundRobinIndex = (transactionId % activePlacementCount);
|
uint32 roundRobinIndex = (transactionId % activePlacementCount);
|
||||||
|
|
||||||
|
if (TaskAssignmentRoundRobinIndex != -1 &&
|
||||||
|
TaskAssignmentRoundRobinIndex < activePlacementCount)
|
||||||
|
{
|
||||||
|
roundRobinIndex = TaskAssignmentRoundRobinIndex;
|
||||||
|
}
|
||||||
|
|
||||||
placementList = LeftRotateList(placementList, roundRobinIndex);
|
placementList = LeftRotateList(placementList, roundRobinIndex);
|
||||||
|
|
||||||
return placementList;
|
return placementList;
|
||||||
|
|
|
@ -2135,6 +2135,18 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_STANDARD,
|
GUC_STANDARD,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomIntVariable(
|
||||||
|
"citus.task_assignment_round_robin_index",
|
||||||
|
gettext_noop("Sets the shifting which should be used when using round robin"),
|
||||||
|
gettext_noop("This is only meant to be used during tests to get consistent "
|
||||||
|
"output and consistently trigger certain failure scenarios."),
|
||||||
|
&TaskAssignmentRoundRobinIndex,
|
||||||
|
-1, -1, INT_MAX,
|
||||||
|
PGC_SUSET,
|
||||||
|
GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
|
||||||
DefineCustomEnumVariable(
|
DefineCustomEnumVariable(
|
||||||
"citus.task_executor_type",
|
"citus.task_executor_type",
|
||||||
gettext_noop("Sets the executor type to be used for distributed queries."),
|
gettext_noop("Sets the executor type to be used for distributed queries."),
|
||||||
|
|
|
@ -527,6 +527,7 @@ typedef List *(*ReorderFunction)(List *);
|
||||||
|
|
||||||
/* Config variable managed via guc.c */
|
/* Config variable managed via guc.c */
|
||||||
extern int TaskAssignmentPolicy;
|
extern int TaskAssignmentPolicy;
|
||||||
|
extern int TaskAssignmentRoundRobinIndex;
|
||||||
extern bool EnableUniqueJobIds;
|
extern bool EnableUniqueJobIds;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -70,6 +70,21 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SELECT placementid, p.shardid, logicalrelid, groupid FROM pg_dist_placement p JOIN pg_dist_shard s ON p.shardid = s.shardid;
|
||||||
|
placementid | shardid | logicalrelid | groupid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1450000 | 1450000 | products | 2
|
||||||
|
1450001 | 1450000 | products | 1
|
||||||
|
1450002 | 1450001 | products | 1
|
||||||
|
1450003 | 1450001 | products | 2
|
||||||
|
1450004 | 1450002 | products | 2
|
||||||
|
1450005 | 1450002 | products | 1
|
||||||
|
1450006 | 1450003 | products | 1
|
||||||
|
1450007 | 1450003 | products | 2
|
||||||
|
1450008 | 1450004 | r1 | 2
|
||||||
|
1450009 | 1450004 | r1 | 1
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
SELECT citus.clear_network_traffic();
|
SELECT citus.clear_network_traffic();
|
||||||
clear_network_traffic
|
clear_network_traffic
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -82,22 +97,10 @@ SELECT citus.mitmproxy('conn.delay(500)');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- we cannot control which replica of the reference table will be queried and there is
|
|
||||||
-- only one specific client we can control the connection for.
|
|
||||||
-- by using round-robin task_assignment_policy we can force to hit both machines.
|
|
||||||
-- and in the end, dumping the network traffic shows that the connection establishment
|
|
||||||
-- is initiated to the node behind the proxy
|
|
||||||
SET client_min_messages TO ERROR;
|
|
||||||
SET citus.task_assignment_policy TO 'round-robin';
|
SET citus.task_assignment_policy TO 'round-robin';
|
||||||
-- suppress the warning since we can't control which shard is chose first. Failure of this
|
SET citus.task_assignment_round_robin_index TO 0;
|
||||||
-- test would be if one of the queries does not return the result but an error.
|
|
||||||
SELECT name FROM r1 WHERE id = 2;
|
|
||||||
name
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
bar
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SELECT name FROM r1 WHERE id = 2;
|
SELECT name FROM r1 WHERE id = 2;
|
||||||
|
WARNING: could not establish any connections to the node localhost:xxxxx after 400 ms
|
||||||
name
|
name
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
bar
|
bar
|
||||||
|
@ -105,7 +108,7 @@ SELECT name FROM r1 WHERE id = 2;
|
||||||
|
|
||||||
-- verify a connection attempt was made to the intercepted node, this would have caused the
|
-- verify a connection attempt was made to the intercepted node, this would have caused the
|
||||||
-- connection to have been delayed and thus caused a timeout
|
-- connection to have been delayed and thus caused a timeout
|
||||||
SELECT * FROM citus.dump_network_traffic() WHERE conn=0;
|
SELECT * FROM citus.dump_network_traffic() WHERE conn=0 AND source = 'coordinator';
|
||||||
conn | source | message
|
conn | source | message
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
0 | coordinator | [initial message]
|
0 | coordinator | [initial message]
|
||||||
|
@ -127,15 +130,8 @@ SELECT citus.mitmproxy('conn.delay(500)');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- suppress the warning since we can't control which shard is chose first. Failure of this
|
|
||||||
-- test would be if one of the queries does not return the result but an error.
|
|
||||||
SELECT count(*) FROM products;
|
|
||||||
count
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
0
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SELECT count(*) FROM products;
|
SELECT count(*) FROM products;
|
||||||
|
WARNING: could not establish any connections to the node localhost:xxxxx after 400 ms
|
||||||
count
|
count
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
0
|
0
|
||||||
|
|
|
@ -49,24 +49,19 @@ INSERT INTO r1 (id, name) VALUES
|
||||||
|
|
||||||
SELECT create_reference_table('r1');
|
SELECT create_reference_table('r1');
|
||||||
|
|
||||||
|
SELECT placementid, p.shardid, logicalrelid, groupid FROM pg_dist_placement p JOIN pg_dist_shard s ON p.shardid = s.shardid;
|
||||||
|
|
||||||
|
|
||||||
SELECT citus.clear_network_traffic();
|
SELECT citus.clear_network_traffic();
|
||||||
SELECT citus.mitmproxy('conn.delay(500)');
|
SELECT citus.mitmproxy('conn.delay(500)');
|
||||||
|
|
||||||
-- we cannot control which replica of the reference table will be queried and there is
|
|
||||||
-- only one specific client we can control the connection for.
|
|
||||||
-- by using round-robin task_assignment_policy we can force to hit both machines.
|
|
||||||
-- and in the end, dumping the network traffic shows that the connection establishment
|
|
||||||
-- is initiated to the node behind the proxy
|
|
||||||
SET client_min_messages TO ERROR;
|
|
||||||
SET citus.task_assignment_policy TO 'round-robin';
|
SET citus.task_assignment_policy TO 'round-robin';
|
||||||
-- suppress the warning since we can't control which shard is chose first. Failure of this
|
SET citus.task_assignment_round_robin_index TO 0;
|
||||||
-- test would be if one of the queries does not return the result but an error.
|
|
||||||
SELECT name FROM r1 WHERE id = 2;
|
|
||||||
SELECT name FROM r1 WHERE id = 2;
|
SELECT name FROM r1 WHERE id = 2;
|
||||||
|
|
||||||
-- verify a connection attempt was made to the intercepted node, this would have caused the
|
-- verify a connection attempt was made to the intercepted node, this would have caused the
|
||||||
-- connection to have been delayed and thus caused a timeout
|
-- connection to have been delayed and thus caused a timeout
|
||||||
SELECT * FROM citus.dump_network_traffic() WHERE conn=0;
|
SELECT * FROM citus.dump_network_traffic() WHERE conn=0 AND source = 'coordinator';
|
||||||
|
|
||||||
SELECT citus.mitmproxy('conn.allow()');
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
|
||||||
|
@ -75,9 +70,6 @@ SELECT citus.mitmproxy('conn.allow()');
|
||||||
-- and with citus.force_max_query_parallelization is set
|
-- and with citus.force_max_query_parallelization is set
|
||||||
SET citus.force_max_query_parallelization TO ON;
|
SET citus.force_max_query_parallelization TO ON;
|
||||||
SELECT citus.mitmproxy('conn.delay(500)');
|
SELECT citus.mitmproxy('conn.delay(500)');
|
||||||
-- suppress the warning since we can't control which shard is chose first. Failure of this
|
|
||||||
-- test would be if one of the queries does not return the result but an error.
|
|
||||||
SELECT count(*) FROM products;
|
|
||||||
SELECT count(*) FROM products;
|
SELECT count(*) FROM products;
|
||||||
|
|
||||||
SELECT citus.mitmproxy('conn.allow()');
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
|
Loading…
Reference in New Issue