diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index c664e782f..1fe519155 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -86,6 +86,7 @@ int RepartitionJoinBucketCountPerNode = 8;
 
 /* Policy to use when assigning tasks to worker nodes */
 int TaskAssignmentPolicy = TASK_ASSIGNMENT_GREEDY;
+int TaskAssignmentRoundRobinIndex = -1;
 bool EnableUniqueJobIds = true;
 
 
@@ -5220,6 +5221,12 @@ RoundRobinReorder(List *placementList)
 	uint32 activePlacementCount = list_length(placementList);
 	uint32 roundRobinIndex = (transactionId % activePlacementCount);
 
+	if (TaskAssignmentRoundRobinIndex != -1 &&
+		TaskAssignmentRoundRobinIndex < activePlacementCount)
+	{
+		roundRobinIndex = TaskAssignmentRoundRobinIndex;
+	}
+
 	placementList = LeftRotateList(placementList, roundRobinIndex);
 
 	return placementList;
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index dd79f8d1e..5066c5334 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -2135,6 +2135,18 @@ RegisterCitusConfigVariables(void)
 		GUC_STANDARD,
 		NULL, NULL, NULL);
 
+	DefineCustomIntVariable(
+		"citus.task_assignment_round_robin_index",
+		gettext_noop("Sets a fixed shift for the round-robin task assignment policy"),
+		gettext_noop("This is only meant to be used during tests to get consistent "
+					 "output and consistently trigger certain failure scenarios."),
+		&TaskAssignmentRoundRobinIndex,
+		-1, -1, INT_MAX,
+		PGC_SUSET,
+		GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
+		NULL, NULL, NULL);
+
+
 	DefineCustomEnumVariable(
 		"citus.task_executor_type",
 		gettext_noop("Sets the executor type to be used for distributed queries."),
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index 484c8f517..193b2d576 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -527,6 +527,7 @@ typedef List *(*ReorderFunction)(List *);
 
 /* Config variable managed via guc.c */
 extern int TaskAssignmentPolicy;
+extern int TaskAssignmentRoundRobinIndex;
 extern bool EnableUniqueJobIds;
 
 
diff --git a/src/test/regress/expected/failure_connection_establishment.out b/src/test/regress/expected/failure_connection_establishment.out
index 9c44269a3..54ee0093f 100644
--- a/src/test/regress/expected/failure_connection_establishment.out
+++ b/src/test/regress/expected/failure_connection_establishment.out
@@ -70,6 +70,21 @@ HINT:  To remove the local data, run: SELECT truncate_local_data_after_distribu
 (1 row)
 
+SELECT placementid, p.shardid, logicalrelid, groupid FROM pg_dist_placement p JOIN pg_dist_shard s ON p.shardid = s.shardid;
+ placementid | shardid | logicalrelid | groupid
+---------------------------------------------------------------------
+     1450000 | 1450000 | products     |       2
+     1450001 | 1450000 | products     |       1
+     1450002 | 1450001 | products     |       1
+     1450003 | 1450001 | products     |       2
+     1450004 | 1450002 | products     |       2
+     1450005 | 1450002 | products     |       1
+     1450006 | 1450003 | products     |       1
+     1450007 | 1450003 | products     |       2
+     1450008 | 1450004 | r1           |       2
+     1450009 | 1450004 | r1           |       1
+(10 rows)
+
 SELECT citus.clear_network_traffic();
  clear_network_traffic
---------------------------------------------------------------------
 (1 row)
@@ -82,22 +97,10 @@ SELECT citus.mitmproxy('conn.delay(500)');
 
 (1 row)
 
--- we cannot control which replica of the reference table will be queried and there is
--- only one specific client we can control the connection for.
--- by using round-robin task_assignment_policy we can force to hit both machines.
--- and in the end, dumping the network traffic shows that the connection establishment
--- is initiated to the node behind the proxy
-SET client_min_messages TO ERROR;
 SET citus.task_assignment_policy TO 'round-robin';
--- suppress the warning since we can't control which shard is chose first. Failure of this
--- test would be if one of the queries does not return the result but an error.
-SELECT name FROM r1 WHERE id = 2;
- name
----------------------------------------------------------------------
- bar
-(1 row)
-
+SET citus.task_assignment_round_robin_index TO 0;
 SELECT name FROM r1 WHERE id = 2;
+WARNING:  could not establish any connections to the node localhost:xxxxx after 400 ms
  name
---------------------------------------------------------------------
 bar
@@ -105,7 +108,7 @@ SELECT name FROM r1 WHERE id = 2;
 
 -- verify a connection attempt was made to the intercepted node, this would have caused the
 -- connection to have been delayed and thus caused a timeout
-SELECT * FROM citus.dump_network_traffic() WHERE conn=0;
+SELECT * FROM citus.dump_network_traffic() WHERE conn=0 AND source = 'coordinator';
  conn |   source    |      message
---------------------------------------------------------------------
  0    | coordinator | [initial message]
@@ -127,15 +130,8 @@ SELECT citus.mitmproxy('conn.delay(500)');
 
 (1 row)
 
--- suppress the warning since we can't control which shard is chose first. Failure of this
--- test would be if one of the queries does not return the result but an error.
-SELECT count(*) FROM products;
- count
----------------------------------------------------------------------
-     0
-(1 row)
-
 SELECT count(*) FROM products;
+WARNING:  could not establish any connections to the node localhost:xxxxx after 400 ms
  count
---------------------------------------------------------------------
     0
diff --git a/src/test/regress/sql/failure_connection_establishment.sql b/src/test/regress/sql/failure_connection_establishment.sql
index 5029d40b7..4d462fb61 100644
--- a/src/test/regress/sql/failure_connection_establishment.sql
+++ b/src/test/regress/sql/failure_connection_establishment.sql
@@ -49,24 +49,19 @@ INSERT INTO r1 (id, name) VALUES
 
 SELECT create_reference_table('r1');
 
+SELECT placementid, p.shardid, logicalrelid, groupid FROM pg_dist_placement p JOIN pg_dist_shard s ON p.shardid = s.shardid;
+
+
 SELECT citus.clear_network_traffic();
 SELECT citus.mitmproxy('conn.delay(500)');
 
--- we cannot control which replica of the reference table will be queried and there is
--- only one specific client we can control the connection for.
--- by using round-robin task_assignment_policy we can force to hit both machines.
--- and in the end, dumping the network traffic shows that the connection establishment
--- is initiated to the node behind the proxy
-SET client_min_messages TO ERROR;
 SET citus.task_assignment_policy TO 'round-robin';
--- suppress the warning since we can't control which shard is chose first. Failure of this
--- test would be if one of the queries does not return the result but an error.
-SELECT name FROM r1 WHERE id = 2;
+SET citus.task_assignment_round_robin_index TO 0;
 SELECT name FROM r1 WHERE id = 2;
 
 -- verify a connection attempt was made to the intercepted node, this would have caused the
 -- connection to have been delayed and thus caused a timeout
-SELECT * FROM citus.dump_network_traffic() WHERE conn=0;
+SELECT * FROM citus.dump_network_traffic() WHERE conn=0 AND source = 'coordinator';
 
 SELECT citus.mitmproxy('conn.allow()');
 
@@ -75,9 +70,6 @@ SELECT citus.mitmproxy('conn.allow()');
 -- and with citus.force_max_query_parallelization is set
 SET citus.force_max_query_parallelization TO ON;
 SELECT citus.mitmproxy('conn.delay(500)');
--- suppress the warning since we can't control which shard is chose first. Failure of this
--- test would be if one of the queries does not return the result but an error.
-SELECT count(*) FROM products;
 SELECT count(*) FROM products;
 
 SELECT citus.mitmproxy('conn.allow()');
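
Reviewer note, not part of the patch: a minimal psql sketch of how the new GUC is meant to be driven, using the r1 reference table from the test above. The GUC is registered as PGC_SUSET with GUC_SUPERUSER_ONLY, so only a superuser can set it.

-- Illustrative session only (assumes the r1 table created by this test):
SET citus.task_assignment_policy TO 'round-robin';
SET citus.task_assignment_round_robin_index TO 0;   -- rotate the placement list by 0, i.e. no shift
SELECT name FROM r1 WHERE id = 2;                   -- deterministically tries the first placement first
SET citus.task_assignment_round_robin_index TO -1;  -- default: back to transaction-based rotation

Since RoundRobinReorder() left-rotates the placement list by the chosen index, 0 pins the first placement, 1 the second, and so on; a value of activePlacementCount or larger fails the guard in the new if block, so the transaction-counter rotation applies as before.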