Merge pull request #2610 from citusdata/improve_round_robin

Add transactionId based round robin policy
pull/2618/head
Önder Kalacı 2019-02-25 13:12:24 +01:00 committed by GitHub
commit 25b5fc9d14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 263 additions and 91 deletions

View File

@ -29,6 +29,7 @@
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/sequence.h"
#include "distributed/backend_data.h"
#include "distributed/listutils.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h"
@ -5076,14 +5077,24 @@ RoundRobinAssignTaskList(List *taskList)
/*
* RoundRobinReorder implements the core of the round-robin assignment policy.
* It takes a task and placement list and rotates a copy of the placement list
* based on the task's jobId. The rotated copy is returned.
* based on the latest stable transaction id provided by PostgreSQL.
*
* We prefer to use transactionId as the seed for the rotation to use the replicas
* in the same worker node within the same transaction. This becomes more important
* when we're reading from (the same or multiple) reference tables within a
* transaction. With this approach, we can prevent reads from expanding the set of
* worker nodes that participate in a distributed transaction.
*
* Note that we prefer PostgreSQL's transactionId over distributed transactionId that
* Citus generates since the distributed transactionId is generated during the execution
* whereas task-assignment happens during the planning.
*/
static List *
RoundRobinReorder(Task *task, List *placementList)
{
uint64 jobId = task->jobId;
TransactionId transactionId = GetMyProcLocalTransactionId();
uint32 activePlacementCount = list_length(placementList);
uint32 roundRobinIndex = (jobId % activePlacementCount);
uint32 roundRobinIndex = (transactionId % activePlacementCount);
placementList = LeftRotateList(placementList, roundRobinIndex);

View File

@ -993,3 +993,14 @@ ActiveDistributedTransactionNumbers(void)
return activeTransactionNumberList;
}
/*
* GetMyProcLocalTransactionId() is a wrapper for getting lxid of MyProc,
* i.e. the local transaction id kept in the PGPROC entry that MyProc
* points to.
*
* The field is read directly, without taking any lock and without checking
* MyProc for NULL.
* NOTE(review): presumably this is only called from a regular backend where
* MyProc is already initialized -- confirm against the callers.
*/
LocalTransactionId
GetMyProcLocalTransactionId(void)
{
return MyProc->lxid;
}

View File

@ -67,5 +67,6 @@ extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
extern void CancelTransactionDueToDeadlock(PGPROC *proc);
extern bool MyBackendGotCancelledDueToDeadlock(void);
extern List * ActiveDistributedTransactionNumbers(void);

/* returns MyProc->lxid; declared extern like the other prototypes in this header */
extern LocalTransactionId GetMyProcLocalTransactionId(void);
#endif /* BACKEND_DATA_H */

View File

@ -351,6 +351,7 @@ extern List * TaskListDifference(const List *list1, const List *list2);
extern List * AssignAnchorShardTaskList(List *taskList);
extern List * FirstReplicaAssignTaskList(List *taskList);
extern List * RoundRobinAssignTaskList(List *taskList);
extern List * RoundRobinPerTransactionAssignTaskList(List *taskList);
extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement);
/* function declaration for creating Task */

View File

@ -10,6 +10,27 @@ SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
t
(1 row)
-- the function simply parses the results and returns 'shardId@worker'
-- for all the explain task outputs
--
-- qry is executed as dynamic SQL and its result rows are scanned in order:
--  * a row containing 'port' updates the remembered port (first run of
--    digits found in that row)
--  * a row matching table_name emits one result row, built from the first run
--    of digits in that row and the most recently remembered port
-- so the 'worker' part of each result is a port number
CREATE OR REPLACE FUNCTION parse_explain_output(in qry text, in table_name text, out r text)
RETURNS SETOF TEXT AS $$
DECLARE
-- digits taken from the most recent row that contains 'port'
portOfTheTask text;
-- digits taken from the current row that matches table_name
shardOfTheTask text;
begin
for r in execute qry loop
IF r LIKE '%port%' THEN
-- with a parenthesized pattern, substring() returns the text matched by the
-- group, here the first run of 1 to 10 digits in the row
portOfTheTask = substring(r, '([0-9]{1,10})');
END IF;
-- NOTE(review): table_name is spliced into the LIKE pattern unescaped, so any
-- '_' or '%' in it acts as a wildcard; fine for the names used in this test
IF r LIKE '%' || table_name || '%' THEN
-- presumably these digits are the shard id suffix of the shard name --
-- confirm against the EXPLAIN output format
shardOfTheTask = substring(r, '([0-9]{1,10})');
-- the concatenation is NULL if no 'port' row has been seen before this row
return QUERY SELECT shardOfTheTask || '@' || portOfTheTask ;
END IF;
end loop;
return;
end; $$ language plpgsql;
SET citus.explain_distributed_queries TO off;
-- Check that our policies for assigning tasks to worker nodes run as expected.
-- To test this, we first create a shell table, and then manually insert shard
@ -102,48 +123,7 @@ DEBUG: assigned task 1 to node localhost:57638
explain statements for distributed queries are not enabled
(3 rows)
-- Finally test the round-robin task assignment policy
SET citus.task_assignment_policy TO 'round-robin';
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 1 to node localhost:57637
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57637
DEBUG: assigned task 1 to node localhost:57638
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 1 to node localhost:57637
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
RESET citus.task_assignment_policy;
RESET client_min_messages;
COMMIT;
BEGIN;
SET LOCAL client_min_messages TO DEBUG3;
SET LOCAL citus.explain_distributed_queries TO off;
-- Check how task_assignment_policy impact planning decisions for reference tables
CREATE TABLE task_assignment_reference_table (test_id integer);
SELECT create_reference_table('task_assignment_reference_table');
create_reference_table
@ -151,6 +131,10 @@ SELECT create_reference_table('task_assignment_reference_table');
(1 row)
BEGIN;
SET LOCAL client_min_messages TO DEBUG3;
SET LOCAL citus.explain_distributed_queries TO off;
-- Check how task_assignment_policy impact planning decisions for reference tables
SET LOCAL citus.task_assignment_policy TO 'greedy';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Distributed planning for a fast-path router query
@ -193,31 +177,99 @@ DEBUG: Plan is router executable
explain statements for distributed queries are not enabled
(2 rows)
-- here we expect debug output showing two different hosts for subsequent queries
SET LOCAL citus.task_assignment_policy TO 'round-robin';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Distributed planning for a fast-path router query
DEBUG: assigned task 0 to node localhost:57637
DEBUG: Creating router plan
DEBUG: Plan is router executable
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Distributed planning for a fast-path router query
DEBUG: assigned task 0 to node localhost:57638
DEBUG: Creating router plan
DEBUG: Plan is router executable
QUERY PLAN
--------------------------------------------------------------
Custom Scan (Citus Router)
explain statements for distributed queries are not enabled
(2 rows)
ROLLBACK;
RESET client_min_messages;
-- Now, let's test round-robin policy
-- round-robin policy relies on PostgreSQL's local transactionId,
-- which might change and we don't have any control over it.
-- the important thing that we look for is that round-robin policy
-- should give the same output for executions in the same transaction
-- and different output for executions that are not inside the
-- same transaction. To ensure that, we define a helper function
BEGIN;
SET LOCAL citus.explain_distributed_queries TO on;
CREATE TEMPORARY TABLE explain_outputs (value text);
SET LOCAL citus.task_assignment_policy TO 'round-robin';
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
-- given that we're in the same transaction, the count should be 1
SELECT count(DISTINCT value) FROM explain_outputs;
count
-------
1
(1 row)
DROP TABLE explain_outputs;
COMMIT;
-- now test round-robin policy outside
-- a transaction, we should see the assignments
-- change on every execution
CREATE TEMPORARY TABLE explain_outputs (value text);
SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON;
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
-- given that we're not inside a transaction block, each statement runs in its
-- own transaction, so the count should be 2 since there are two different worker nodes
SELECT count(DISTINCT value) FROM explain_outputs;
count
-------
2
(1 row)
TRUNCATE explain_outputs;
-- same test with a distributed table
-- we keep this test because as of this commit, the code
-- paths for reference tables and distributed tables are
-- not the same
SET citus.shard_replication_factor TO 2;
CREATE TABLE task_assignment_replicated_hash (test_id integer);
SELECT create_distributed_table('task_assignment_replicated_hash', 'test_id');
create_distributed_table
--------------------------
(1 row)
BEGIN;
SET LOCAL citus.explain_distributed_queries TO on;
SET LOCAL citus.task_assignment_policy TO 'round-robin';
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
-- given that we're in the same transaction, the count should be 1
SELECT count(DISTINCT value) FROM explain_outputs;
count
-------
1
(1 row)
DROP TABLE explain_outputs;
COMMIT;
-- now test round-robin policy outside
-- a transaction, we should see the assignments
-- change on every execution
CREATE TEMPORARY TABLE explain_outputs (value text);
SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON;
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
-- given that we're not inside a transaction block, each statement runs in its
-- own transaction, so the count should be 2 since there are two different worker nodes
SELECT count(DISTINCT value) FROM explain_outputs;
count
-------
2
(1 row)
RESET citus.task_assignment_policy;
RESET client_min_messages;
-- we should be able to use round-robin with router queries that
-- only contains intermediate results
BEGIN;
@ -240,3 +292,4 @@ WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
(0 rows)
ROLLBACK;
DROP TABLE task_assignment_replicated_hash, task_assignment_reference_table;

View File

@ -9,6 +9,28 @@ SET citus.next_shard_id TO 880000;
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
-- the function simply parses the results and returns 'shardId@worker'
-- for all the explain task outputs
--
-- qry is executed as dynamic SQL and its result rows are scanned in order:
--  * a row containing 'port' updates the remembered port (first run of
--    digits found in that row)
--  * a row matching table_name emits one result row, built from the first run
--    of digits in that row and the most recently remembered port
-- so the 'worker' part of each result is a port number
CREATE OR REPLACE FUNCTION parse_explain_output(in qry text, in table_name text, out r text)
RETURNS SETOF TEXT AS $$
DECLARE
-- digits taken from the most recent row that contains 'port'
portOfTheTask text;
-- digits taken from the current row that matches table_name
shardOfTheTask text;
begin
for r in execute qry loop
IF r LIKE '%port%' THEN
-- with a parenthesized pattern, substring() returns the text matched by the
-- group, here the first run of 1 to 10 digits in the row
portOfTheTask = substring(r, '([0-9]{1,10})');
END IF;
-- NOTE(review): table_name is spliced into the LIKE pattern unescaped, so any
-- '_' or '%' in it acts as a wildcard; fine for the names used in this test
IF r LIKE '%' || table_name || '%' THEN
-- presumably these digits are the shard id suffix of the shard name --
-- confirm against the EXPLAIN output format
shardOfTheTask = substring(r, '([0-9]{1,10})');
-- the concatenation is NULL if no 'port' row has been seen before this row
return QUERY SELECT shardOfTheTask || '@' || portOfTheTask ;
END IF;
end loop;
return;
end; $$ language plpgsql;
SET citus.explain_distributed_queries TO off;
@ -80,31 +102,19 @@ EXPLAIN SELECT count(*) FROM task_assignment_test_table;
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
-- Finally test the round-robin task assignment policy
SET citus.task_assignment_policy TO 'round-robin';
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
RESET citus.task_assignment_policy;
RESET client_min_messages;
COMMIT;
CREATE TABLE task_assignment_reference_table (test_id integer);
SELECT create_reference_table('task_assignment_reference_table');
BEGIN;
SET LOCAL client_min_messages TO DEBUG3;
SET LOCAL citus.explain_distributed_queries TO off;
-- Check how task_assignment_policy impact planning decisions for reference tables
CREATE TABLE task_assignment_reference_table (test_id integer);
SELECT create_reference_table('task_assignment_reference_table');
SET LOCAL citus.task_assignment_policy TO 'greedy';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
@ -113,15 +123,100 @@ SET LOCAL citus.task_assignment_policy TO 'first-replica';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
-- here we expect debug output showing two different hosts for subsequent queries
SET LOCAL citus.task_assignment_policy TO 'round-robin';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
ROLLBACK;
RESET client_min_messages;
-- Now, let's test round-robin policy
-- round-robin policy relies on PostgreSQL's local transactionId,
-- which might change and we don't have any control over it.
-- the important thing that we look for is that round-robin policy
-- should give the same output for executions in the same transaction
-- and different output for executions that are not inside the
-- same transaction. To ensure that, we define a helper function
BEGIN;
SET LOCAL citus.explain_distributed_queries TO on;
CREATE TEMPORARY TABLE explain_outputs (value text);
SET LOCAL citus.task_assignment_policy TO 'round-robin';
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
-- given that we're in the same transaction, the count should be 1
SELECT count(DISTINCT value) FROM explain_outputs;
DROP TABLE explain_outputs;
COMMIT;
-- now test round-robin policy outside
-- a transaction, we should see the assignments
-- change on every execution
CREATE TEMPORARY TABLE explain_outputs (value text);
SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON;
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
-- given that we're not inside a transaction block, each statement runs in its
-- own transaction, so the count should be 2 since there are two different worker nodes
SELECT count(DISTINCT value) FROM explain_outputs;
TRUNCATE explain_outputs;
-- same test with a distributed table
-- we keep this test because as of this commit, the code
-- paths for reference tables and distributed tables are
-- not the same
SET citus.shard_replication_factor TO 2;
CREATE TABLE task_assignment_replicated_hash (test_id integer);
SELECT create_distributed_table('task_assignment_replicated_hash', 'test_id');
BEGIN;
SET LOCAL citus.explain_distributed_queries TO on;
SET LOCAL citus.task_assignment_policy TO 'round-robin';
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
-- given that we're in the same transaction, the count should be 1
SELECT count(DISTINCT value) FROM explain_outputs;
DROP TABLE explain_outputs;
COMMIT;
-- now test round-robin policy outside
-- a transaction, we should see the assignments
-- change on every execution
CREATE TEMPORARY TABLE explain_outputs (value text);
SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON;
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
-- given that we're not inside a transaction block, each statement runs in its
-- own transaction, so the count should be 2 since there are two different worker nodes
SELECT count(DISTINCT value) FROM explain_outputs;
RESET citus.task_assignment_policy;
RESET client_min_messages;
-- we should be able to use round-robin with router queries that
-- only contains intermediate results
BEGIN;
@ -133,4 +228,4 @@ SET LOCAL citus.task_assignment_policy TO 'round-robin';
WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
ROLLBACK;
DROP TABLE task_assignment_replicated_hash, task_assignment_reference_table;