diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index eaecbb8e7..8e359239f 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -29,6 +29,7 @@ #include "catalog/pg_type.h" #include "commands/defrem.h" #include "commands/sequence.h" +#include "distributed/backend_data.h" #include "distributed/listutils.h" #include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodes.h" @@ -5076,14 +5077,24 @@ RoundRobinAssignTaskList(List *taskList) /* * RoundRobinReorder implements the core of the round-robin assignment policy. * It takes a task and placement list and rotates a copy of the placement list - * based on the task's jobId. The rotated copy is returned. + * based on the latest stable transaction id provided by PostgreSQL. + * + * We prefer to use transactionId as the seed for the rotation to use the replicas + * in the same worker node within the same transaction. This becomes more important + * when we're reading from (the same or multiple) reference tables within a + * transaction. With this approach, we can prevent reads from expanding the worker nodes + * that participate in a distributed transaction. + * + * Note that we prefer PostgreSQL's transactionId over distributed transactionId that + * Citus generates since the distributed transactionId is generated during the execution + * whereas task-assignment happens during the planning. 
*/ static List * RoundRobinReorder(Task *task, List *placementList) { - uint64 jobId = task->jobId; + TransactionId transactionId = GetMyProcLocalTransactionId(); uint32 activePlacementCount = list_length(placementList); - uint32 roundRobinIndex = (jobId % activePlacementCount); + uint32 roundRobinIndex = (transactionId % activePlacementCount); placementList = LeftRotateList(placementList, roundRobinIndex); diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 0ffb6d175..e790bd80b 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -993,3 +993,14 @@ ActiveDistributedTransactionNumbers(void) return activeTransactionNumberList; } + + +/* + * GetMyProcLocalTransactionId() is a wrapper for + * getting lxid of MyProc. + */ +LocalTransactionId +GetMyProcLocalTransactionId(void) +{ + return MyProc->lxid; +} diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index cc0f04ca8..17e896f61 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -67,5 +67,6 @@ extern void GetBackendDataForProc(PGPROC *proc, BackendData *result); extern void CancelTransactionDueToDeadlock(PGPROC *proc); extern bool MyBackendGotCancelledDueToDeadlock(void); extern List * ActiveDistributedTransactionNumbers(void); +LocalTransactionId GetMyProcLocalTransactionId(void); #endif /* BACKEND_DATA_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index c4d662d5f..68831294a 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -351,6 +351,7 @@ extern List * TaskListDifference(const List *list1, const List *list2); extern List * AssignAnchorShardTaskList(List *taskList); extern List * FirstReplicaAssignTaskList(List *taskList); extern List * 
RoundRobinAssignTaskList(List *taskList); +extern List * RoundRobinPerTransactionAssignTaskList(List *taskList); extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement); /* function declaration for creating Task */ diff --git a/src/test/regress/expected/multi_task_assignment_policy.out b/src/test/regress/expected/multi_task_assignment_policy.out index 9b63c2ad4..686b1f9cc 100644 --- a/src/test/regress/expected/multi_task_assignment_policy.out +++ b/src/test/regress/expected/multi_task_assignment_policy.out @@ -10,6 +10,27 @@ SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine; t (1 row) +-- the function simply parses the results and returns 'shardId@worker' +-- for all the explain task outputs +CREATE OR REPLACE FUNCTION parse_explain_output(in qry text, in table_name text, out r text) +RETURNS SETOF TEXT AS $$ +DECLARE + portOfTheTask text; + shardOfTheTask text; +begin + for r in execute qry loop + IF r LIKE '%port%' THEN + portOfTheTask = substring(r, '([0-9]{1,10})'); + END IF; + + IF r LIKE '%' || table_name || '%' THEN + shardOfTheTask = substring(r, '([0-9]{1,10})'); + return QUERY SELECT shardOfTheTask || '@' || portOfTheTask ; + END IF; + + end loop; + return; +end; $$ language plpgsql; SET citus.explain_distributed_queries TO off; -- Check that our policies for assigning tasks to worker nodes run as expected. 
-- To test this, we first create a shell table, and then manually insert shard @@ -102,48 +123,7 @@ DEBUG: assigned task 1 to node localhost:57638 explain statements for distributed queries are not enabled (3 rows) --- Finally test the round-robin task assignment policy -SET citus.task_assignment_policy TO 'round-robin'; -EXPLAIN SELECT count(*) FROM task_assignment_test_table; -DEBUG: assigned task 3 to node localhost:57638 -DEBUG: assigned task 2 to node localhost:57638 -DEBUG: assigned task 1 to node localhost:57637 - QUERY PLAN ------------------------------------------------------------------------ - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) - explain statements for distributed queries are not enabled -(3 rows) - -EXPLAIN SELECT count(*) FROM task_assignment_test_table; -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 1 to node localhost:57638 - QUERY PLAN ------------------------------------------------------------------------ - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) - explain statements for distributed queries are not enabled -(3 rows) - -EXPLAIN SELECT count(*) FROM task_assignment_test_table; -DEBUG: assigned task 3 to node localhost:57638 -DEBUG: assigned task 2 to node localhost:57638 -DEBUG: assigned task 1 to node localhost:57637 - QUERY PLAN ------------------------------------------------------------------------ - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) - explain statements for distributed queries are not enabled -(3 rows) - -RESET citus.task_assignment_policy; -RESET client_min_messages; COMMIT; -BEGIN; -SET LOCAL client_min_messages TO DEBUG3; -SET LOCAL citus.explain_distributed_queries TO off; --- Check how task_assignment_policy impact planning decisions for reference 
tables CREATE TABLE task_assignment_reference_table (test_id integer); SELECT create_reference_table('task_assignment_reference_table'); create_reference_table @@ -151,6 +131,10 @@ SELECT create_reference_table('task_assignment_reference_table'); (1 row) +BEGIN; +SET LOCAL client_min_messages TO DEBUG3; +SET LOCAL citus.explain_distributed_queries TO off; +-- Check how task_assignment_policy impact planning decisions for reference tables SET LOCAL citus.task_assignment_policy TO 'greedy'; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; DEBUG: Distributed planning for a fast-path router query @@ -193,31 +177,99 @@ DEBUG: Plan is router executable explain statements for distributed queries are not enabled (2 rows) --- here we expect debug output showing two different hosts for subsequent queries -SET LOCAL citus.task_assignment_policy TO 'round-robin'; -EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; -DEBUG: Distributed planning for a fast-path router query -DEBUG: assigned task 0 to node localhost:57637 -DEBUG: Creating router plan -DEBUG: Plan is router executable - QUERY PLAN --------------------------------------------------------------- - Custom Scan (Citus Router) - explain statements for distributed queries are not enabled -(2 rows) - -EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; -DEBUG: Distributed planning for a fast-path router query -DEBUG: assigned task 0 to node localhost:57638 -DEBUG: Creating router plan -DEBUG: Plan is router executable - QUERY PLAN --------------------------------------------------------------- - Custom Scan (Citus Router) - explain statements for distributed queries are not enabled -(2 rows) - ROLLBACK; +RESET client_min_messages; +-- Now, lets test round-robin policy +-- round-robin policy relies on PostgreSQL's local transactionId, +-- which might change and we don't have any control over it. 
+-- the important thing that we look for is that round-robin policy +-- should give the same output for executions in the same transaction +-- and different output for executions that are not insdie the +-- same transaction. To ensure that, we define a helper function +BEGIN; +SET LOCAL citus.explain_distributed_queries TO on; +CREATE TEMPORARY TABLE explain_outputs (value text); +SET LOCAL citus.task_assignment_policy TO 'round-robin'; +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table'); +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table'); +-- given that we're in the same transaction, the count should be 1 +SELECT count(DISTINCT value) FROM explain_outputs; + count +------- + 1 +(1 row) + +DROP TABLE explain_outputs; +COMMIT; +-- now test round-robin policy outside +-- a transaction, we should see the assignements +-- change on every execution +CREATE TEMPORARY TABLE explain_outputs (value text); +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.explain_distributed_queries TO ON; +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table'); +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table'); +-- given that we're in the same transaction, the count should be 2 +-- since there are two different worker nodes +SELECT count(DISTINCT value) FROM explain_outputs; + count +------- + 2 +(1 row) + +TRUNCATE explain_outputs; +-- same test with a distributed table +-- we keep this test because as of this commit, the code +-- paths for reference tables and distributed tables are +-- not the same +SET citus.shard_replication_factor TO 2; +CREATE TABLE 
task_assignment_replicated_hash (test_id integer); +SELECT create_distributed_table('task_assignment_replicated_hash', 'test_id'); + create_distributed_table +-------------------------- + +(1 row) + +BEGIN; +SET LOCAL citus.explain_distributed_queries TO on; +SET LOCAL citus.task_assignment_policy TO 'round-robin'; +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash'); +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash'); +-- given that we're in the same transaction, the count should be 1 +SELECT count(DISTINCT value) FROM explain_outputs; + count +------- + 1 +(1 row) + +DROP TABLE explain_outputs; +COMMIT; +-- now test round-robin policy outside +-- a transaction, we should see the assignements +-- change on every execution +CREATE TEMPORARY TABLE explain_outputs (value text); +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.explain_distributed_queries TO ON; +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash'); +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash'); +-- given that we're in the same transaction, the count should be 2 +-- since there are two different worker nodes +SELECT count(DISTINCT value) FROM explain_outputs; + count +------- + 2 +(1 row) + +RESET citus.task_assignment_policy; +RESET client_min_messages; -- we should be able to use round-robin with router queries that -- only contains intermediate results BEGIN; @@ -240,3 +292,4 @@ WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1; (0 rows) ROLLBACK; +DROP TABLE task_assignment_replicated_hash, task_assignment_reference_table; diff --git 
a/src/test/regress/sql/multi_task_assignment_policy.sql b/src/test/regress/sql/multi_task_assignment_policy.sql index 6b2da3948..ccbc72018 100644 --- a/src/test/regress/sql/multi_task_assignment_policy.sql +++ b/src/test/regress/sql/multi_task_assignment_policy.sql @@ -9,6 +9,28 @@ SET citus.next_shard_id TO 880000; SHOW server_version \gset SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine; +-- the function simply parses the results and returns 'shardId@worker' +-- for all the explain task outputs +CREATE OR REPLACE FUNCTION parse_explain_output(in qry text, in table_name text, out r text) +RETURNS SETOF TEXT AS $$ +DECLARE + portOfTheTask text; + shardOfTheTask text; +begin + for r in execute qry loop + IF r LIKE '%port%' THEN + portOfTheTask = substring(r, '([0-9]{1,10})'); + END IF; + + IF r LIKE '%' || table_name || '%' THEN + shardOfTheTask = substring(r, '([0-9]{1,10})'); + return QUERY SELECT shardOfTheTask || '@' || portOfTheTask ; + END IF; + + end loop; + return; +end; $$ language plpgsql; + SET citus.explain_distributed_queries TO off; @@ -80,31 +102,19 @@ EXPLAIN SELECT count(*) FROM task_assignment_test_table; EXPLAIN SELECT count(*) FROM task_assignment_test_table; --- Finally test the round-robin task assignment policy - -SET citus.task_assignment_policy TO 'round-robin'; - -EXPLAIN SELECT count(*) FROM task_assignment_test_table; - -EXPLAIN SELECT count(*) FROM task_assignment_test_table; - -EXPLAIN SELECT count(*) FROM task_assignment_test_table; - -RESET citus.task_assignment_policy; -RESET client_min_messages; - COMMIT; + + +CREATE TABLE task_assignment_reference_table (test_id integer); +SELECT create_reference_table('task_assignment_reference_table'); + BEGIN; SET LOCAL client_min_messages TO DEBUG3; SET LOCAL citus.explain_distributed_queries TO off; -- Check how task_assignment_policy impact planning decisions for reference tables - -CREATE TABLE task_assignment_reference_table (test_id integer); -SELECT 
create_reference_table('task_assignment_reference_table'); - SET LOCAL citus.task_assignment_policy TO 'greedy'; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; @@ -113,15 +123,100 @@ SET LOCAL citus.task_assignment_policy TO 'first-replica'; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; --- here we expect debug output showing two different hosts for subsequent queries -SET LOCAL citus.task_assignment_policy TO 'round-robin'; -EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; -EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; - ROLLBACK; +RESET client_min_messages; +-- Now, lets test round-robin policy +-- round-robin policy relies on PostgreSQL's local transactionId, +-- which might change and we don't have any control over it. +-- the important thing that we look for is that round-robin policy +-- should give the same output for executions in the same transaction +-- and different output for executions that are not insdie the +-- same transaction. 
To ensure that, we define a helper function +BEGIN; + +SET LOCAL citus.explain_distributed_queries TO on; + +CREATE TEMPORARY TABLE explain_outputs (value text); +SET LOCAL citus.task_assignment_policy TO 'round-robin'; + +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table'); +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table'); + +-- given that we're in the same transaction, the count should be 1 +SELECT count(DISTINCT value) FROM explain_outputs; + +DROP TABLE explain_outputs; +COMMIT; + +-- now test round-robin policy outside +-- a transaction, we should see the assignements +-- change on every execution +CREATE TEMPORARY TABLE explain_outputs (value text); + +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.explain_distributed_queries TO ON; + +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table'); +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table'); + +-- given that we're in the same transaction, the count should be 2 +-- since there are two different worker nodes +SELECT count(DISTINCT value) FROM explain_outputs; +TRUNCATE explain_outputs; + +-- same test with a distributed table +-- we keep this test because as of this commit, the code +-- paths for reference tables and distributed tables are +-- not the same +SET citus.shard_replication_factor TO 2; + +CREATE TABLE task_assignment_replicated_hash (test_id integer); +SELECT create_distributed_table('task_assignment_replicated_hash', 'test_id'); + +BEGIN; + +SET LOCAL citus.explain_distributed_queries TO on; +SET LOCAL citus.task_assignment_policy TO 'round-robin'; + +INSERT INTO 
explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash'); +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash'); + +-- given that we're in the same transaction, the count should be 1 +SELECT count(DISTINCT value) FROM explain_outputs; + +DROP TABLE explain_outputs; +COMMIT; + +-- now test round-robin policy outside +-- a transaction, we should see the assignements +-- change on every execution +CREATE TEMPORARY TABLE explain_outputs (value text); + +SET citus.task_assignment_policy TO 'round-robin'; +SET citus.explain_distributed_queries TO ON; + +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash'); +INSERT INTO explain_outputs + SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash'); + +-- given that we're in the same transaction, the count should be 2 +-- since there are two different worker nodes +SELECT count(DISTINCT value) FROM explain_outputs; + + +RESET citus.task_assignment_policy; +RESET client_min_messages; + -- we should be able to use round-robin with router queries that -- only contains intermediate results BEGIN; @@ -133,4 +228,4 @@ SET LOCAL citus.task_assignment_policy TO 'round-robin'; WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1; ROLLBACK; - +DROP TABLE task_assignment_replicated_hash, task_assignment_reference_table;