--
-- MULTI_TASK_ASSIGNMENT
--
SET citus.next_shard_id TO 880000;
-- the function parses EXPLAIN output and returns 'shardId@worker'
-- for all of the explain task outputs
CREATE OR REPLACE FUNCTION parse_explain_output(in qry text, in table_name text, out r text)
RETURNS SETOF TEXT AS $$
DECLARE
    portOfTheTask text;
    shardOfTheTask text;
begin
    for r in execute qry loop
        -- task lines mention the worker port the task was assigned to
        IF r LIKE '%port%' THEN
            portOfTheTask = substring(r, '([0-9]{1,10})');
        END IF;

        -- shard lines mention the table name with the shard id appended
        IF r LIKE '%' || table_name || '%' THEN
            shardOfTheTask = substring(r, '([0-9]{5,10})');
        END IF;

    end loop;
    return QUERY SELECT shardOfTheTask || '@' || portOfTheTask;
end; $$ language plpgsql;
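-- For illustration, a call such as the following (the output value is
-- hypothetical, since shard ids and worker ports vary between runs) distills
-- a full EXPLAIN into a single 'shardId@port' value such as '880000@57637':
--   SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_test_table;',
--                               'task_assignment_test_table');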
SET citus.explain_distributed_queries TO off;
-- Check that our policies for assigning tasks to worker nodes run as expected.
-- To test this, we first create a shell table, and then manually insert shard
-- and shard placement data into system catalogs. We next run an EXPLAIN command,
-- and check that tasks are assigned to worker nodes as expected.
CREATE TABLE task_assignment_test_table (test_id integer);
SELECT create_distributed_table('task_assignment_test_table', 'test_id', 'append');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

-- Create logical shards with shardids 200, 201, and 202
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue)
    SELECT pg_class.oid, series.index, 'r', 1, 1000
    FROM pg_class, generate_series(200, 202) AS series(index)
    WHERE pg_class.relname = 'task_assignment_test_table';
-- Create shard placements for shards 200 and 201
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
    SELECT 200, 1, 1, nodename, nodeport
    FROM pg_dist_shard_placement
    GROUP BY nodename, nodeport
    ORDER BY nodename, nodeport ASC
    LIMIT 2;
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
    SELECT 201, 1, 1, nodename, nodeport
    FROM pg_dist_shard_placement
    GROUP BY nodename, nodeport
    ORDER BY nodename, nodeport ASC
    LIMIT 2;
-- Create shard placements for shard 202
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
    SELECT 202, 1, 1, nodename, nodeport
    FROM pg_dist_shard_placement
    GROUP BY nodename, nodeport
    ORDER BY nodename, nodeport DESC
    LIMIT 2;
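-- (Note the DESC ordering above: shard 202's placements are inserted in the
-- opposite worker order from shards 200 and 201, so its first replica lives
-- on a different worker. This is what lets the first-replica tests below
-- show placement-order-dependent assignments.)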
-- Start transaction block to avoid auto commits. This avoids additional debug
-- messages from getting printed at real transaction starts and commits.
BEGIN;
-- Increase log level to see which worker nodes tasks are assigned to. Note that
-- the following log messages print node name and port numbers; and port numbers
-- in regression tests depend upon PG_VERSION_NUM.
SET client_min_messages TO DEBUG3;
-- First test the default greedy task assignment policy
SET citus.task_assignment_policy TO 'greedy';
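-- (The greedy policy, Citus' default, aims to spread tasks evenly across the
-- workers that hold a placement of each shard; see the Citus docs on
-- citus.task_assignment_policy.)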
EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table;
DEBUG:  Router planner does not support append-partitioned tables.
DEBUG:  no valid constraints found
DEBUG:  shard count: 3
DEBUG:  assigned task to node localhost:xxxxx
DEBUG:  assigned task to node localhost:xxxxx
DEBUG:  assigned task to node localhost:xxxxx
                          QUERY PLAN
---------------------------------------------------------------------
 Aggregate
   ->  Custom Scan (Citus Adaptive)
         explain statements for distributed queries are not enabled
(3 rows)

EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table;
DEBUG:  Router planner does not support append-partitioned tables.
DEBUG:  no valid constraints found
DEBUG:  shard count: 3
DEBUG:  assigned task to node localhost:xxxxx
DEBUG:  assigned task to node localhost:xxxxx
DEBUG:  assigned task to node localhost:xxxxx
                          QUERY PLAN
---------------------------------------------------------------------
 Aggregate
   ->  Custom Scan (Citus Adaptive)
         explain statements for distributed queries are not enabled
(3 rows)

-- Next test the first-replica task assignment policy
SET citus.task_assignment_policy TO 'first-replica';
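-- (With first-replica, each task is assigned to the worker that holds the
-- shard's first placement in insertion order; see the Citus docs on
-- citus.task_assignment_policy.)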
EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table;
DEBUG:  Router planner does not support append-partitioned tables.
DEBUG:  no valid constraints found
DEBUG:  shard count: 3
DEBUG:  assigned task to node localhost:xxxxx
DEBUG:  assigned task to node localhost:xxxxx
DEBUG:  assigned task to node localhost:xxxxx
                          QUERY PLAN
---------------------------------------------------------------------
 Aggregate
   ->  Custom Scan (Citus Adaptive)
         explain statements for distributed queries are not enabled
(3 rows)

EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table;
DEBUG:  Router planner does not support append-partitioned tables.
DEBUG:  no valid constraints found
DEBUG:  shard count: 3
DEBUG:  assigned task to node localhost:xxxxx
DEBUG:  assigned task to node localhost:xxxxx
DEBUG:  assigned task to node localhost:xxxxx
                          QUERY PLAN
---------------------------------------------------------------------
 Aggregate
   ->  Custom Scan (Citus Adaptive)
         explain statements for distributed queries are not enabled
(3 rows)

COMMIT;
CREATE TABLE task_assignment_reference_table (test_id integer);
SELECT create_reference_table('task_assignment_reference_table');
 create_reference_table
---------------------------------------------------------------------

(1 row)

BEGIN;
SET LOCAL client_min_messages TO DEBUG3;
SET LOCAL citus.explain_distributed_queries TO off;
-- Check how task_assignment_policy impacts planning decisions for reference tables
SET LOCAL citus.task_assignment_policy TO 'greedy';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG:  Distributed planning for a fast-path router query
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
                          QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   explain statements for distributed queries are not enabled
(2 rows)

EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG:  Distributed planning for a fast-path router query
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
                          QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   explain statements for distributed queries are not enabled
(2 rows)

SET LOCAL citus.task_assignment_policy TO 'first-replica';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG:  Distributed planning for a fast-path router query
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
                          QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   explain statements for distributed queries are not enabled
(2 rows)

EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG:  Distributed planning for a fast-path router query
DEBUG:  Creating router plan
DEBUG:  Plan is router executable
                          QUERY PLAN
---------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   explain statements for distributed queries are not enabled
(2 rows)

ROLLBACK;
RESET client_min_messages;
-- Now, let's test the round-robin policy.
-- The round-robin policy relies on PostgreSQL's local transaction id,
-- which might change and which we have no control over. The important
-- thing we look for is that round-robin gives the same output for
-- executions inside the same transaction and different outputs for
-- executions that are not inside the same transaction. To check that,
-- we use the parse_explain_output helper function defined above.
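-- (As the comments above note, the rotation is keyed off the local
-- transaction id: every statement in one transaction picks the same
-- placement, while each new transaction advances to the next placement
-- in the rotation.)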
BEGIN;
SET LOCAL citus.explain_distributed_queries TO on;
CREATE TEMPORARY TABLE explain_outputs (value text);
SET LOCAL citus.task_assignment_policy TO 'round-robin';
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
-- given that we're in the same transaction, the count should be 1
SELECT count(DISTINCT value) FROM explain_outputs;
 count
---------------------------------------------------------------------
     1
(1 row)

TRUNCATE explain_outputs;
COMMIT;
-- now test the round-robin policy outside a transaction;
-- we should see the assignments change on every execution
SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON;
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_reference_table;', 'task_assignment_reference_table');
-- given that we're not inside a transaction block, the count should be 2
-- since there are two different worker nodes
SELECT count(DISTINCT value) FROM explain_outputs;
 count
---------------------------------------------------------------------
     2
(1 row)

TRUNCATE explain_outputs;
-- same test with a distributed table
-- we keep this test because, as of this commit, the code
-- paths for reference tables and distributed tables are
-- not the same
SET citus.shard_replication_factor TO 2;
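-- (Replication factor 2 gives every shard two placements, so round-robin has
-- two candidate workers to alternate between across transactions.)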
CREATE TABLE task_assignment_replicated_hash (test_id integer);
SELECT create_distributed_table('task_assignment_replicated_hash', 'test_id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

BEGIN;
SET LOCAL citus.explain_distributed_queries TO on;
SET LOCAL citus.task_assignment_policy TO 'round-robin';
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
-- given that we're in the same transaction, the count should be 1
SELECT count(DISTINCT value) FROM explain_outputs;
 count
---------------------------------------------------------------------
     1
(1 row)

TRUNCATE explain_outputs;
COMMIT;
-- now test the round-robin policy outside a transaction;
-- we should see the assignments change on every execution
SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON;
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output('EXPLAIN SELECT count(*) FROM task_assignment_replicated_hash;', 'task_assignment_replicated_hash');
-- given that we're not inside a transaction block, the count should be 2
-- since there are two different worker nodes
SELECT count(DISTINCT value) FROM explain_outputs;
 count
---------------------------------------------------------------------
     2
(1 row)

TRUNCATE explain_outputs;
-- test that the round-robin policy detects the anchor shard correctly:
-- we should not pick a reference table shard as the anchor shard when joining with a distributed table
SET citus.shard_replication_factor TO 1;
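-- (The anchor shard is the shard whose placements determine where a task may
-- run. A reference table shard is placed on every worker, so anchoring on it
-- would let round-robin bounce the task between nodes; anchoring on the
-- distributed table's single-replica shard pins the task to one worker.)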
CREATE TABLE task_assignment_nonreplicated_hash (test_id integer, ref_id integer);
SELECT create_distributed_table('task_assignment_nonreplicated_hash', 'test_id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

-- run the query two times to make sure that it hits the correct worker every time
INSERT INTO explain_outputs
SELECT parse_explain_output($cmd$
    EXPLAIN SELECT *
    FROM (SELECT * FROM task_assignment_nonreplicated_hash WHERE test_id = 3) AS dist
    LEFT JOIN task_assignment_reference_table ref
    ON dist.ref_id = ref.test_id
    $cmd$, 'task_assignment_nonreplicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output($cmd$
    EXPLAIN SELECT *
    FROM (SELECT * FROM task_assignment_nonreplicated_hash WHERE test_id = 3) AS dist
    LEFT JOIN task_assignment_reference_table ref
    ON dist.ref_id = ref.test_id
    $cmd$, 'task_assignment_nonreplicated_hash');
-- The count should be 1 since the shard exists on only one worker node
SELECT count(DISTINCT value) FROM explain_outputs;
 count
---------------------------------------------------------------------
     1
(1 row)

TRUNCATE explain_outputs;
-- We should be able to use round-robin with router queries that
-- only contain intermediate results
CREATE TABLE task_assignment_test_table_2 (test_id integer);
SELECT create_distributed_table('task_assignment_test_table_2', 'test_id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SET citus.task_assignment_policy TO 'round-robin';
-- Run the query two times to make sure that it hits two different workers
-- on consecutive runs
-- prevent the PG 11 and PG 12 outputs from diverging
SET citus.enable_cte_inlining TO false;
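-- (PG 12 inlines CTEs by default, which would let the planner push the whole
-- query down instead of materializing q1 as an intermediate result; disabling
-- inlining keeps both versions on the intermediate-result code path that this
-- test targets.)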
INSERT INTO explain_outputs
SELECT parse_explain_output($cmd$
    EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
    $cmd$, 'task_assignment_test_table_2');
INSERT INTO explain_outputs
SELECT parse_explain_output($cmd$
    EXPLAIN WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1
    $cmd$, 'task_assignment_test_table_2');
-- The count should be 2 since the intermediate results are processed on
-- different workers
SELECT count(DISTINCT value) FROM explain_outputs;
 count
---------------------------------------------------------------------
     2
(1 row)

RESET citus.task_assignment_policy;
RESET client_min_messages;
DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash,
           task_assignment_reference_table, task_assignment_test_table_2, explain_outputs;