From f706772b2f465959a5b95bd8056efd78f2019410 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Thu, 21 Feb 2019 18:02:58 +0300
Subject: [PATCH] Round-robin task assignment policy relies on local transaction id

Before this commit, the round-robin task assignment policy relied on the
jobId. Thus, even inside a transaction, tasks were assigned to different
nodes. This was especially problematic while reading from reference
tables within transaction blocks, because we had to expand the
distributed transaction to many nodes that were not necessarily already
part of the distributed transaction.
---
 .../planner/multi_physical_planner.c          | 17 +-
 .../distributed/transaction/backend_data.c    | 11 ++
 src/include/distributed/backend_data.h        |  1 +
 .../distributed/multi_physical_planner.h      |  1 +
 .../expected/multi_task_assignment_policy.out | 183 +++++++++++-------
 .../sql/multi_task_assignment_policy.sql      | 141 +++++++++++---
 6 files changed, 263 insertions(+), 91 deletions(-)

diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index eaecbb8e7..8e359239f 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -29,6 +29,7 @@
 #include "catalog/pg_type.h"
 #include "commands/defrem.h"
 #include "commands/sequence.h"
+#include "distributed/backend_data.h"
 #include "distributed/listutils.h"
 #include "distributed/citus_nodefuncs.h"
 #include "distributed/citus_nodes.h"
@@ -5076,14 +5077,24 @@
 /*
  * RoundRobinReorder implements the core of the round-robin assignment policy.
  * It takes a task and placement list and rotates a copy of the placement list
- * based on the task's jobId. The rotated copy is returned.
+ * based on the local transaction id provided by PostgreSQL.
+ *
+ * We prefer the local transactionId as the seed for the rotation so that tasks
+ * within the same transaction keep using the replicas on the same worker node.
+ * This becomes more important when we're reading from (the same or multiple)
+ * reference tables within a transaction. With this approach, we prevent reads from
+ * expanding the set of worker nodes that participate in a distributed transaction.
+ *
+ * Note that we prefer PostgreSQL's transactionId over the distributed transactionId
+ * that Citus generates, since the distributed transactionId is generated during
+ * execution whereas task assignment happens during planning.
  */
 static List *
 RoundRobinReorder(Task *task, List *placementList)
 {
-	uint64 jobId = task->jobId;
+	TransactionId transactionId = GetMyProcLocalTransactionId();
 	uint32 activePlacementCount = list_length(placementList);
-	uint32 roundRobinIndex = (jobId % activePlacementCount);
+	uint32 roundRobinIndex = (transactionId % activePlacementCount);
 
 	placementList = LeftRotateList(placementList, roundRobinIndex);
 
diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c
index 0ffb6d175..e790bd80b 100644
--- a/src/backend/distributed/transaction/backend_data.c
+++ b/src/backend/distributed/transaction/backend_data.c
@@ -993,3 +993,14 @@ ActiveDistributedTransactionNumbers(void)
 
 	return activeTransactionNumberList;
 }
+
+
+/*
+ * GetMyProcLocalTransactionId is a wrapper that returns the local
+ * transaction id (lxid) of MyProc.
+ */
+LocalTransactionId
+GetMyProcLocalTransactionId(void)
+{
+	return MyProc->lxid;
+}
diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h
index cc0f04ca8..17e896f61 100644
--- a/src/include/distributed/backend_data.h
+++ b/src/include/distributed/backend_data.h
@@ -67,5 +67,6 @@ extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
 extern void CancelTransactionDueToDeadlock(PGPROC *proc);
 extern bool MyBackendGotCancelledDueToDeadlock(void);
 extern List * ActiveDistributedTransactionNumbers(void);
+extern LocalTransactionId GetMyProcLocalTransactionId(void);
 
 #endif /* BACKEND_DATA_H */
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index c4d662d5f..68831294a 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -351,6 +351,7 @@ extern List * TaskListDifference(const List *list1, const List *list2);
 extern List * AssignAnchorShardTaskList(List *taskList);
 extern List * FirstReplicaAssignTaskList(List *taskList);
 extern List * RoundRobinAssignTaskList(List *taskList);
+extern List * RoundRobinPerTransactionAssignTaskList(List *taskList);
 extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement);
 
 /* function declaration for creating Task */
diff --git a/src/test/regress/expected/multi_task_assignment_policy.out b/src/test/regress/expected/multi_task_assignment_policy.out
index 9b63c2ad4..686b1f9cc 100644
--- a/src/test/regress/expected/multi_task_assignment_policy.out
+++ b/src/test/regress/expected/multi_task_assignment_policy.out
@@ -10,6 +10,27 @@ SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
  t
 (1 row)
 
+-- the function parses the EXPLAIN output and returns 'shardId@port'
+-- for each task in the plan
+CREATE OR REPLACE FUNCTION parse_explain_output(in qry text, in table_name text, out r text)
+RETURNS SETOF TEXT AS $$
+DECLARE
+    portOfTheTask text;
+    shardOfTheTask text;
+begin
+  for r in execute qry loop
+    IF r LIKE '%port%' THEN
+      portOfTheTask = substring(r, '([0-9]{1,10})');
+    END IF;
+
+    IF r LIKE '%' || table_name || '%' THEN
+      shardOfTheTask = substring(r, '([0-9]{1,10})');
+      return QUERY SELECT shardOfTheTask || '@' || portOfTheTask;
+    END IF;
+
+  end loop;
+  return;
+end; $$ language plpgsql;
 SET citus.explain_distributed_queries TO off;
 -- Check that our policies for assigning tasks to worker nodes run as expected.
-- To test this, we first create a shell table, and then manually insert shard @@ -102,48 +123,7 @@ DEBUG: assigned task 1 to node localhost:57638 explain statements for distributed queries are not enabled (3 rows) --- Finally test the round-robin task assignment policy -SET citus.task_assignment_policy TO 'round-robin'; -EXPLAIN SELECT count(*) FROM task_assignment_test_table; -DEBUG: assigned task 3 to node localhost:57638 -DEBUG: assigned task 2 to node localhost:57638 -DEBUG: assigned task 1 to node localhost:57637 - QUERY PLAN ------------------------------------------------------------------------ - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) - explain statements for distributed queries are not enabled -(3 rows) - -EXPLAIN SELECT count(*) FROM task_assignment_test_table; -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 1 to node localhost:57638 - QUERY PLAN ------------------------------------------------------------------------ - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) - explain statements for distributed queries are not enabled -(3 rows) - -EXPLAIN SELECT count(*) FROM task_assignment_test_table; -DEBUG: assigned task 3 to node localhost:57638 -DEBUG: assigned task 2 to node localhost:57638 -DEBUG: assigned task 1 to node localhost:57637 - QUERY PLAN ------------------------------------------------------------------------ - Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) - explain statements for distributed queries are not enabled -(3 rows) - -RESET citus.task_assignment_policy; -RESET client_min_messages; COMMIT; -BEGIN; -SET LOCAL client_min_messages TO DEBUG3; -SET LOCAL citus.explain_distributed_queries TO off; --- Check how task_assignment_policy impact planning decisions for reference tables CREATE TABLE task_assignment_reference_table (test_id integer); SELECT create_reference_table('task_assignment_reference_table'); create_reference_table @@ -151,6 +131,10 @@ SELECT create_reference_table('task_assignment_reference_table'); (1 row) +BEGIN; +SET LOCAL client_min_messages TO DEBUG3; +SET LOCAL citus.explain_distributed_queries TO off; +-- Check how task_assignment_policy impact planning decisions for reference tables SET LOCAL citus.task_assignment_policy TO 'greedy'; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; DEBUG: Distributed planning for a fast-path router query @@ -193,31 +177,99 @@ DEBUG: Plan is router executable explain statements for distributed queries are not enabled (2 rows) --- here we expect debug output showing two different hosts for subsequent queries -SET LOCAL citus.task_assignment_policy TO 'round-robin'; -EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; -DEBUG: Distributed planning for a fast-path router query -DEBUG: assigned task 0 to node localhost:57637 -DEBUG: Creating router plan -DEBUG: Plan is router executable - QUERY PLAN --------------------------------------------------------------- - Custom Scan (Citus Router) - explain statements for distributed queries are not enabled -(2 rows) - -EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; -DEBUG: Distributed planning for a fast-path router query -DEBUG: assigned task 0 to node localhost:57638 -DEBUG: Creating router plan -DEBUG: Plan is router executable - QUERY PLAN 
---------------------------------------------------------------
- Custom Scan (Citus Router)
-   explain statements for distributed queries are not enabled
-(2 rows)
-
 ROLLBACK;
+RESET client_min_messages;
+-- Now, let's test the round-robin policy
+-- round-robin policy relies on PostgreSQL's local transactionId,
+-- which might change and which we have no control over.
+-- the important thing we look for is that the round-robin policy
+-- should give the same output for executions within the same transaction
+-- and different outputs for executions that are not inside the
+-- same transaction. To check that, we use the helper function defined above
+BEGIN;
+SET LOCAL citus.explain_distributed_queries TO on;
+CREATE TEMPORARY TABLE explain_outputs (value text);
+SET LOCAL citus.task_assignment_policy TO 'round-robin';
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
+-- given that we're in the same transaction, the count should be 1
+SELECT count(DISTINCT value) FROM explain_outputs;
+ count
+-------
+     1
+(1 row)
+
+DROP TABLE explain_outputs;
+COMMIT;
+-- now test the round-robin policy outside
+-- a transaction; we should see the assignments
+-- change on every execution
+CREATE TEMPORARY TABLE explain_outputs (value text);
+SET citus.task_assignment_policy TO 'round-robin';
+SET citus.explain_distributed_queries TO ON;
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
+-- given that we're not in the same transaction, the count should be 2
+-- since there are two different worker nodes
+SELECT count(DISTINCT value) FROM explain_outputs;
+ count
+-------
+     2
+(1 row)
+
+TRUNCATE explain_outputs;
+-- same test with a distributed table
+-- we keep this test because as of this commit, the code
+-- paths for reference tables and distributed tables are
+-- not the same
+SET citus.shard_replication_factor TO 2;
+CREATE TABLE task_assignment_replicated_hash (test_id integer);
+SELECT create_distributed_table('task_assignment_replicated_hash', 'test_id');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+BEGIN;
+SET LOCAL citus.explain_distributed_queries TO on;
+SET LOCAL citus.task_assignment_policy TO 'round-robin';
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
+-- given that we're in the same transaction, the count should be 1
+SELECT count(DISTINCT value) FROM explain_outputs;
+ count
+-------
+     1
+(1 row)
+
+DROP TABLE explain_outputs;
+COMMIT;
+-- now test the round-robin policy outside
+-- a transaction; we should see the assignments
+-- change on every execution
+CREATE TEMPORARY TABLE explain_outputs (value text);
+SET citus.task_assignment_policy TO 'round-robin';
+SET citus.explain_distributed_queries TO ON;
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
+-- given that we're not in the same transaction, the count should be 2
+-- since there are two different worker nodes
+SELECT count(DISTINCT value) FROM explain_outputs;
+ count
+-------
+     2
+(1 row)
+
+RESET citus.task_assignment_policy;
+RESET client_min_messages;
 -- we should be able to use round-robin with router queries that
 -- only contains intermediate results
 BEGIN;
@@ -240,3 +292,4 @@ WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
 (0 rows)
 
 ROLLBACK;
+DROP TABLE task_assignment_replicated_hash, task_assignment_reference_table;
diff --git a/src/test/regress/sql/multi_task_assignment_policy.sql b/src/test/regress/sql/multi_task_assignment_policy.sql
index 6b2da3948..ccbc72018 100644
--- a/src/test/regress/sql/multi_task_assignment_policy.sql
+++ b/src/test/regress/sql/multi_task_assignment_policy.sql
@@ -9,6 +9,28 @@ SET citus.next_shard_id TO 880000;
 SHOW server_version \gset
 SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
+-- the function parses the EXPLAIN output and returns 'shardId@port'
+-- for each task in the plan
+CREATE OR REPLACE FUNCTION parse_explain_output(in qry text, in table_name text, out r text)
+RETURNS SETOF TEXT AS $$
+DECLARE
+    portOfTheTask text;
+    shardOfTheTask text;
+begin
+  for r in execute qry loop
+    IF r LIKE '%port%' THEN
+      portOfTheTask = substring(r, '([0-9]{1,10})');
+    END IF;
+
+    IF r LIKE '%' || table_name || '%' THEN
+      shardOfTheTask = substring(r, '([0-9]{1,10})');
+      return QUERY SELECT shardOfTheTask || '@' || portOfTheTask;
+    END IF;
+
+  end loop;
+  return;
+end; $$ language plpgsql;
+
 
 
 SET citus.explain_distributed_queries TO off;
 
@@ -80,31 +102,19 @@ EXPLAIN SELECT count(*) FROM task_assignment_test_table;
 
 EXPLAIN SELECT count(*) FROM task_assignment_test_table;
 
--- Finally test the round-robin task assignment policy
-
-SET citus.task_assignment_policy TO 'round-robin';
-
-EXPLAIN SELECT count(*) FROM task_assignment_test_table;
-
-EXPLAIN SELECT count(*) FROM task_assignment_test_table;
-
-EXPLAIN SELECT count(*) FROM task_assignment_test_table;
-
-RESET citus.task_assignment_policy;
-RESET client_min_messages;
-
 COMMIT;
 
+
+CREATE TABLE task_assignment_reference_table (test_id integer);
+SELECT create_reference_table('task_assignment_reference_table');
+
 BEGIN;
 
 SET LOCAL client_min_messages TO DEBUG3;
 SET LOCAL citus.explain_distributed_queries TO off;
 
 -- Check how task_assignment_policy impact planning decisions for reference tables
-
-CREATE TABLE task_assignment_reference_table (test_id integer);
-SELECT create_reference_table('task_assignment_reference_table');
-
 SET LOCAL citus.task_assignment_policy TO 'greedy';
 EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
 EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
@@ -113,15 +123,100 @@ SET LOCAL citus.task_assignment_policy TO 'first-replica';
 EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
 EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
 
--- here we expect debug output showing two different hosts for subsequent queries
-SET LOCAL citus.task_assignment_policy TO 'round-robin';
-EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
-EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
-
 ROLLBACK;
+RESET client_min_messages;
+-- Now, let's test the round-robin policy
+-- round-robin policy relies on PostgreSQL's local transactionId,
+-- which might change and which we have no control over.
+-- the important thing we look for is that the round-robin policy
+-- should give the same output for executions within the same transaction
+-- and different outputs for executions that are not inside the
+-- same transaction. To check that, we use the helper function defined above
+BEGIN;
+
+SET LOCAL citus.explain_distributed_queries TO on;
+
+CREATE TEMPORARY TABLE explain_outputs (value text);
+SET LOCAL citus.task_assignment_policy TO 'round-robin';
+
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
+
+-- given that we're in the same transaction, the count should be 1
+SELECT count(DISTINCT value) FROM explain_outputs;
+
+DROP TABLE explain_outputs;
+COMMIT;
+
+-- now test the round-robin policy outside
+-- a transaction; we should see the assignments
+-- change on every execution
+CREATE TEMPORARY TABLE explain_outputs (value text);
+
+SET citus.task_assignment_policy TO 'round-robin';
+SET citus.explain_distributed_queries TO ON;
+
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
+
+-- given that we're not in the same transaction, the count should be 2
+-- since there are two different worker nodes
+SELECT count(DISTINCT value) FROM explain_outputs;
+TRUNCATE explain_outputs;
+
+-- same test with a distributed table
+-- we keep this test because as of this commit, the code
+-- paths for reference tables and distributed tables are
+-- not the same
+SET citus.shard_replication_factor TO 2;
+
+CREATE TABLE task_assignment_replicated_hash (test_id integer);
+SELECT create_distributed_table('task_assignment_replicated_hash', 'test_id');
+
+BEGIN;
+
+SET LOCAL citus.explain_distributed_queries TO on;
+SET LOCAL citus.task_assignment_policy TO 'round-robin';
+
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
+
+-- given that we're in the same transaction, the count should be 1
+SELECT count(DISTINCT value) FROM explain_outputs;
+
+DROP TABLE explain_outputs;
+COMMIT;
+
+-- now test the round-robin policy outside
+-- a transaction; we should see the assignments
+-- change on every execution
+CREATE TEMPORARY TABLE explain_outputs (value text);
+
+SET citus.task_assignment_policy TO 'round-robin';
+SET citus.explain_distributed_queries TO ON;
+
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
+INSERT INTO explain_outputs
+  SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
+
+-- given that we're not in the same transaction, the count should be 2
+-- since there are two different worker nodes
+SELECT count(DISTINCT value) FROM explain_outputs;
+
+
+RESET citus.task_assignment_policy;
+RESET client_min_messages;
+
 
 -- we should be able to use round-robin with router queries that
 -- only contains intermediate results
 BEGIN;
 
@@ -133,4 +228,4 @@ SET LOCAL citus.task_assignment_policy TO 'round-robin';
 WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
 
 ROLLBACK;
-
+DROP TABLE task_assignment_replicated_hash, task_assignment_reference_table;
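
Reviewer note (not part of the patch): a minimal sketch of how the new behavior can be observed interactively, assuming a two-worker cluster like the regression setup above. The table and GUC names come from the tests; the ports 57637/57638 are only illustrative.

    SET citus.task_assignment_policy TO 'round-robin';
    SET citus.explain_distributed_queries TO on;

    BEGIN;
    -- Both EXPLAINs below run in the same local transaction, so
    -- RoundRobinReorder seeds the rotation with the same MyProc->lxid and
    -- both are expected to show the task on the same worker, e.g. port 57637.
    EXPLAIN SELECT count(*) FROM task_assignment_reference_table;
    EXPLAIN SELECT count(*) FROM task_assignment_reference_table;
    COMMIT;

    -- Outside a transaction block each statement gets a fresh local
    -- transaction id, so the reported worker is expected to alternate
    -- between the two nodes, e.g. port 57637 on one run and 57638 on the next.
    EXPLAIN SELECT count(*) FROM task_assignment_reference_table;
    EXPLAIN SELECT count(*) FROM task_assignment_reference_table;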