Round-robin task assignment policy relies on local transaction id

Before this commit, the round-robin task assignment policy relied
on the task's jobId. Thus, even inside a single transaction, tasks
were assigned to different nodes. This was especially problematic
when reading from reference tables within transaction blocks,
because we had to expand the distributed transaction to many nodes
that were not necessarily already part of the distributed transaction.
pull/2610/head
Onder Kalaci 2019-02-21 18:02:58 +03:00
parent acc2b0a387
commit f706772b2f
6 changed files with 263 additions and 91 deletions

View File

@ -29,6 +29,7 @@
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "commands/sequence.h" #include "commands/sequence.h"
#include "distributed/backend_data.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
@ -5076,14 +5077,24 @@ RoundRobinAssignTaskList(List *taskList)
/* /*
* RoundRobinReorder implements the core of the round-robin assignment policy. * RoundRobinReorder implements the core of the round-robin assignment policy.
* It takes a task and placement list and rotates a copy of the placement list * It takes a task and placement list and rotates a copy of the placement list
* based on the task's jobId. The rotated copy is returned. * based on the latest stable transaction id provided by PostgreSQL.
*
* We prefer to use the transaction id as the seed for the rotation so that the
* replicas on the same worker node are used within the same transaction. This
* becomes more important when we're reading from (the same or multiple) reference
* tables within a transaction. With this approach, we can prevent reads from
* expanding the set of worker nodes that participate in a distributed transaction.
*
* Note that we prefer PostgreSQL's local transaction id (lxid) over the distributed
* transaction id that Citus generates, since the distributed transaction id is
* generated during execution whereas task assignment happens during planning.
*/ */
static List * static List *
RoundRobinReorder(Task *task, List *placementList) RoundRobinReorder(Task *task, List *placementList)
{ {
uint64 jobId = task->jobId; TransactionId transactionId = GetMyProcLocalTransactionId();
uint32 activePlacementCount = list_length(placementList); uint32 activePlacementCount = list_length(placementList);
uint32 roundRobinIndex = (jobId % activePlacementCount); uint32 roundRobinIndex = (transactionId % activePlacementCount);
placementList = LeftRotateList(placementList, roundRobinIndex); placementList = LeftRotateList(placementList, roundRobinIndex);

View File

@ -993,3 +993,14 @@ ActiveDistributedTransactionNumbers(void)
return activeTransactionNumberList; return activeTransactionNumberList;
} }
/*
 * GetMyProcLocalTransactionId returns the lxid (local transaction id)
 * field of MyProc, so that callers do not have to access MyProc directly.
 */
LocalTransactionId
GetMyProcLocalTransactionId(void)
{
	LocalTransactionId localTransactionId = MyProc->lxid;

	return localTransactionId;
}

View File

@ -67,5 +67,6 @@ extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
extern void CancelTransactionDueToDeadlock(PGPROC *proc); extern void CancelTransactionDueToDeadlock(PGPROC *proc);
extern bool MyBackendGotCancelledDueToDeadlock(void); extern bool MyBackendGotCancelledDueToDeadlock(void);
extern List * ActiveDistributedTransactionNumbers(void); extern List * ActiveDistributedTransactionNumbers(void);
extern LocalTransactionId GetMyProcLocalTransactionId(void);
#endif /* BACKEND_DATA_H */ #endif /* BACKEND_DATA_H */

View File

@ -351,6 +351,7 @@ extern List * TaskListDifference(const List *list1, const List *list2);
extern List * AssignAnchorShardTaskList(List *taskList); extern List * AssignAnchorShardTaskList(List *taskList);
extern List * FirstReplicaAssignTaskList(List *taskList); extern List * FirstReplicaAssignTaskList(List *taskList);
extern List * RoundRobinAssignTaskList(List *taskList); extern List * RoundRobinAssignTaskList(List *taskList);
extern List * RoundRobinPerTransactionAssignTaskList(List *taskList);
extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement); extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement);
/* function declaration for creating Task */ /* function declaration for creating Task */

View File

@ -10,6 +10,27 @@ SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
t t
(1 row) (1 row)
-- Runs the given EXPLAIN query and, for every line of its output that
-- mentions table_name, emits a 'shardId@workerPort' string.
CREATE OR REPLACE FUNCTION parse_explain_output(in qry text, in table_name text, out r text)
RETURNS SETOF TEXT AS $$
DECLARE
    task_port text;
    task_shard text;
BEGIN
    FOR r IN EXECUTE qry LOOP
        -- remember the first number on the most recent line that mentions a port
        IF r LIKE '%port%' THEN
            task_port := substring(r, '([0-9]{1,10})');
        END IF;

        -- on a line that mentions the table, the first number is taken as the shard id
        IF r LIKE '%' || table_name || '%' THEN
            task_shard := substring(r, '([0-9]{1,10})');
            RETURN QUERY SELECT task_shard || '@' || task_port;
        END IF;
    END LOOP;

    RETURN;
END; $$ LANGUAGE plpgsql;
SET citus.explain_distributed_queries TO off; SET citus.explain_distributed_queries TO off;
-- Check that our policies for assigning tasks to worker nodes run as expected. -- Check that our policies for assigning tasks to worker nodes run as expected.
-- To test this, we first create a shell table, and then manually insert shard -- To test this, we first create a shell table, and then manually insert shard
@ -102,48 +123,7 @@ DEBUG: assigned task 1 to node localhost:57638
explain statements for distributed queries are not enabled explain statements for distributed queries are not enabled
(3 rows) (3 rows)
-- Finally test the round-robin task assignment policy
SET citus.task_assignment_policy TO 'round-robin';
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 1 to node localhost:57637
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57637
DEBUG: assigned task 1 to node localhost:57638
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 1 to node localhost:57637
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
RESET citus.task_assignment_policy;
RESET client_min_messages;
COMMIT; COMMIT;
BEGIN;
SET LOCAL client_min_messages TO DEBUG3;
SET LOCAL citus.explain_distributed_queries TO off;
-- Check how task_assignment_policy impact planning decisions for reference tables
CREATE TABLE task_assignment_reference_table (test_id integer); CREATE TABLE task_assignment_reference_table (test_id integer);
SELECT create_reference_table('task_assignment_reference_table'); SELECT create_reference_table('task_assignment_reference_table');
create_reference_table create_reference_table
@ -151,6 +131,10 @@ SELECT create_reference_table('task_assignment_reference_table');
(1 row) (1 row)
BEGIN;
SET LOCAL client_min_messages TO DEBUG3;
SET LOCAL citus.explain_distributed_queries TO off;
-- Check how task_assignment_policy impact planning decisions for reference tables
SET LOCAL citus.task_assignment_policy TO 'greedy'; SET LOCAL citus.task_assignment_policy TO 'greedy';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Distributed planning for a fast-path router query DEBUG: Distributed planning for a fast-path router query
@ -193,31 +177,99 @@ DEBUG: Plan is router executable
explain statements for distributed queries are not enabled explain statements for distributed queries are not enabled
(2 rows) (2 rows)
-- here we expect debug output showing two different hosts for subsequent queries
SET LOCAL citus.task_assignment_policy TO 'round-robin';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Distributed planning for a fast-path router query
DEBUG: assigned task 0 to node localhost:57637
DEBUG: Creating router plan
DEBUG: Plan is router executable
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Distributed planning for a fast-path router query
DEBUG: assigned task 0 to node localhost:57638
DEBUG: Creating router plan
DEBUG: Plan is router executable
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
ROLLBACK; ROLLBACK;
RESET client_min_messages;
-- Now, let's test the round-robin policy.
-- The round-robin policy relies on PostgreSQL's local transaction id,
-- which might change and over which we don't have any control.
-- The important thing that we look for is that the round-robin policy
-- should give the same output for executions in the same transaction
-- and different output for executions that are not inside the
-- same transaction. To check that, we use the parse_explain_output
-- helper function defined above.
BEGIN;
SET LOCAL citus.explain_distributed_queries TO on;
CREATE TEMPORARY TABLE explain_outputs (value text);
SET LOCAL citus.task_assignment_policy TO 'round-robin';
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
-- given that we're in the same transaction, the count should be 1
SELECT count(DISTINCT value) FROM explain_outputs;
count
-------
1
(1 row)
DROP TABLE explain_outputs;
COMMIT;
-- now test round-robin policy outside
-- a transaction, we should see the assignments
-- change on every execution
CREATE TEMPORARY TABLE explain_outputs (value text);
SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON;
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
-- given that the two executions are not in the same transaction,
-- the count should be 2 since there are two different worker nodes
SELECT count(DISTINCT value) FROM explain_outputs;
count
-------
2
(1 row)
TRUNCATE explain_outputs;
-- same test with a distributed table
-- we keep this test because as of this commit, the code
-- paths for reference tables and distributed tables are
-- not the same
SET citus.shard_replication_factor TO 2;
CREATE TABLE task_assignment_replicated_hash (test_id integer);
SELECT create_distributed_table('task_assignment_replicated_hash', 'test_id');
create_distributed_table
--------------------------
(1 row)
BEGIN;
SET LOCAL citus.explain_distributed_queries TO on;
SET LOCAL citus.task_assignment_policy TO 'round-robin';
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
-- given that we're in the same transaction, the count should be 1
SELECT count(DISTINCT value) FROM explain_outputs;
count
-------
1
(1 row)
DROP TABLE explain_outputs;
COMMIT;
-- now test round-robin policy outside
-- a transaction, we should see the assignments
-- change on every execution
CREATE TEMPORARY TABLE explain_outputs (value text);
SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON;
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
-- given that the two executions are not in the same transaction,
-- the count should be 2 since there are two different worker nodes
SELECT count(DISTINCT value) FROM explain_outputs;
count
-------
2
(1 row)
RESET citus.task_assignment_policy;
RESET client_min_messages;
-- we should be able to use round-robin with router queries that -- we should be able to use round-robin with router queries that
-- only contains intermediate results -- only contains intermediate results
BEGIN; BEGIN;
@ -240,3 +292,4 @@ WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
(0 rows) (0 rows)
ROLLBACK; ROLLBACK;
DROP TABLE task_assignment_replicated_hash, task_assignment_reference_table;

View File

@ -9,6 +9,28 @@ SET citus.next_shard_id TO 880000;
SHOW server_version \gset SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine; SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
-- Runs the given EXPLAIN query and, for every line of its output that
-- mentions table_name, emits a 'shardId@workerPort' string.
CREATE OR REPLACE FUNCTION parse_explain_output(in qry text, in table_name text, out r text)
RETURNS SETOF TEXT AS $$
DECLARE
    task_port text;
    task_shard text;
BEGIN
    FOR r IN EXECUTE qry LOOP
        -- remember the first number on the most recent line that mentions a port
        IF r LIKE '%port%' THEN
            task_port := substring(r, '([0-9]{1,10})');
        END IF;

        -- on a line that mentions the table, the first number is taken as the shard id
        IF r LIKE '%' || table_name || '%' THEN
            task_shard := substring(r, '([0-9]{1,10})');
            RETURN QUERY SELECT task_shard || '@' || task_port;
        END IF;
    END LOOP;

    RETURN;
END; $$ LANGUAGE plpgsql;
SET citus.explain_distributed_queries TO off; SET citus.explain_distributed_queries TO off;
@ -80,31 +102,19 @@ EXPLAIN SELECT count(*) FROM task_assignment_test_table;
EXPLAIN SELECT count(*) FROM task_assignment_test_table; EXPLAIN SELECT count(*) FROM task_assignment_test_table;
-- Finally test the round-robin task assignment policy
SET citus.task_assignment_policy TO 'round-robin';
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
RESET citus.task_assignment_policy;
RESET client_min_messages;
COMMIT; COMMIT;
CREATE TABLE task_assignment_reference_table (test_id integer);
SELECT create_reference_table('task_assignment_reference_table');
BEGIN; BEGIN;
SET LOCAL client_min_messages TO DEBUG3; SET LOCAL client_min_messages TO DEBUG3;
SET LOCAL citus.explain_distributed_queries TO off; SET LOCAL citus.explain_distributed_queries TO off;
-- Check how task_assignment_policy impact planning decisions for reference tables -- Check how task_assignment_policy impact planning decisions for reference tables
CREATE TABLE task_assignment_reference_table (test_id integer);
SELECT create_reference_table('task_assignment_reference_table');
SET LOCAL citus.task_assignment_policy TO 'greedy'; SET LOCAL citus.task_assignment_policy TO 'greedy';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
@ -113,15 +123,100 @@ SET LOCAL citus.task_assignment_policy TO 'first-replica';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
-- here we expect debug output showing two different hosts for subsequent queries
SET LOCAL citus.task_assignment_policy TO 'round-robin';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
ROLLBACK; ROLLBACK;
RESET client_min_messages;
-- Now, let's test the round-robin policy.
-- The round-robin policy relies on PostgreSQL's local transaction id,
-- which might change and over which we don't have any control.
-- The important thing that we look for is that the round-robin policy
-- should give the same output for executions in the same transaction
-- and different output for executions that are not inside the
-- same transaction. To check that, we use the parse_explain_output
-- helper function defined above.
BEGIN;
SET LOCAL citus.explain_distributed_queries TO on;
CREATE TEMPORARY TABLE explain_outputs (value text);
SET LOCAL citus.task_assignment_policy TO 'round-robin';
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
-- given that we're in the same transaction, the count should be 1
SELECT count(DISTINCT value) FROM explain_outputs;
DROP TABLE explain_outputs;
COMMIT;
-- now test round-robin policy outside
-- a transaction, we should see the assignments
-- change on every execution
CREATE TEMPORARY TABLE explain_outputs (value text);
SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON;
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
-- given that the two executions are not in the same transaction,
-- the count should be 2 since there are two different worker nodes
SELECT count(DISTINCT value) FROM explain_outputs;
TRUNCATE explain_outputs;
-- same test with a distributed table
-- we keep this test because as of this commit, the code
-- paths for reference tables and distributed tables are
-- not the same
SET citus.shard_replication_factor TO 2;
CREATE TABLE task_assignment_replicated_hash (test_id integer);
SELECT create_distributed_table('task_assignment_replicated_hash', 'test_id');
BEGIN;
SET LOCAL citus.explain_distributed_queries TO on;
SET LOCAL citus.task_assignment_policy TO 'round-robin';
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
-- given that we're in the same transaction, the count should be 1
SELECT count(DISTINCT value) FROM explain_outputs;
DROP TABLE explain_outputs;
COMMIT;
-- now test round-robin policy outside
-- a transaction, we should see the assignments
-- change on every execution
CREATE TEMPORARY TABLE explain_outputs (value text);
SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON;
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
-- given that the two executions are not in the same transaction,
-- the count should be 2 since there are two different worker nodes
SELECT count(DISTINCT value) FROM explain_outputs;
RESET citus.task_assignment_policy;
RESET client_min_messages;
-- we should be able to use round-robin with router queries that -- we should be able to use round-robin with router queries that
-- only contains intermediate results -- only contains intermediate results
BEGIN; BEGIN;
@ -133,4 +228,4 @@ SET LOCAL citus.task_assignment_policy TO 'round-robin';
WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1; WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
ROLLBACK; ROLLBACK;
DROP TABLE task_assignment_replicated_hash, task_assignment_reference_table;