-- -- BACKGROUND_REBALANCE_PARALLEL_REFERENCE_TABLES -- -- Test to check if the background tasks scheduled for moving reference tables -- shards in parallel by the background rebalancer have the correct dependencies -- CREATE SCHEMA background_rebalance_parallel; SET search_path TO background_rebalance_parallel; SET citus.next_shard_id TO 85674000; SET citus.shard_replication_factor TO 1; SET client_min_messages TO ERROR; ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50; SELECT 1 FROM master_remove_node('localhost', :worker_1_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT 1 FROM master_remove_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT 1 FROM master_add_node('localhost', :worker_1_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT 1 FROM master_add_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; ALTER SYSTEM SET citus.max_background_task_executors_per_node = 5; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) -- Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group CREATE TABLE table1_colg1 (a int PRIMARY KEY); SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE table2_colg1 (b int PRIMARY KEY); SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); create_distributed_table --------------------------------------------------------------------- (1 row) -- Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group CREATE TABLE table1_colg2 (a int PRIMARY KEY); SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE table2_colg2 (b int primary key); SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); create_distributed_table --------------------------------------------------------------------- (1 row) -- Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group CREATE TABLE table1_colg3 (a int PRIMARY KEY); SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE table2_colg3 (b int primary key); SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); create_distributed_table --------------------------------------------------------------------- (1 row) -- Create reference tables with primary-foreign key relationships CREATE TABLE customers ( id SERIAL PRIMARY KEY, name TEXT NOT NULL, email TEXT UNIQUE NOT NULL ); CREATE TABLE orders ( id SERIAL PRIMARY KEY, customer_id INTEGER NOT NULL REFERENCES customers(id), order_date DATE NOT NULL DEFAULT CURRENT_DATE ); CREATE TABLE order_items ( id SERIAL PRIMARY KEY, order_id INTEGER NOT NULL REFERENCES orders(id), product_name TEXT NOT NULL, quantity INTEGER NOT NULL, price NUMERIC(10, 2) NOT NULL ); SELECT create_reference_table('customers'); create_reference_table --------------------------------------------------------------------- (1 row) SELECT create_reference_table('orders'); create_reference_table --------------------------------------------------------------------- (1 row) SELECT create_reference_table('order_items'); create_reference_table --------------------------------------------------------------------- (1 row) -- INSERT SOME DATA -- Insert 10 customers INSERT INTO customers (name, email) SELECT 'Customer ' || i, 'customer' || i || '@example.com' FROM generate_series(1, 10) AS i; -- Insert 30 orders: each customer gets 3 orders INSERT INTO orders (customer_id, order_date) SELECT (i % 10) + 1, -- customer_id between 1 and 10 CURRENT_DATE - (i % 7) FROM generate_series(1, 30) AS i; -- Insert 90 order_items: each order has 3 items INSERT INTO order_items (order_id, product_name, quantity, price) SELECT (i % 30) + 1, -- order_id between 1 and 30 'Product ' || (i % 5 + 1), (i % 10) + 1, round((random() * 100 + 10)::numeric, 2) FROM generate_series(1, 90) AS i; SELECT c.id AS customer_id, c.name AS customer_name, c.email AS customer_email, COUNT(oi.id) AS total_order_items FROM customers c JOIN orders o ON c.id = o.customer_id JOIN order_items oi ON o.id = oi.order_id GROUP BY c.id, c.name, c.email ORDER BY c.id; customer_id | customer_name | customer_email | total_order_items --------------------------------------------------------------------- 1 | Customer 1 | customer1@example.com | 9 2 | Customer 2 | customer2@example.com | 9 3 | Customer 3 | customer3@example.com | 9 4 | Customer 4 | customer4@example.com | 9 5 | Customer 5 | customer5@example.com | 9 6 | Customer 6 | customer6@example.com | 9 7 | Customer 7 | customer7@example.com | 9 8 | Customer 8 | customer8@example.com | 9 9 | Customer 9 | customer9@example.com | 9 10 | Customer 10 | customer10@example.com | 9 (10 rows) -- Add two new nodes so that we can rebalance SELECT 1 FROM citus_add_node('localhost', :worker_3_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT 1 FROM citus_add_node('localhost', :worker_4_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- table1_colg1 | 85674000 | 0 | localhost | 57637 | localhost | 57640 table1_colg1 | 85674001 | 0 | localhost | 57638 | localhost | 57639 table2_colg1 | 85674004 | 0 | localhost | 57637 | localhost | 57640 table2_colg1 | 85674005 | 0 | localhost | 57638 | localhost | 57639 table1_colg2 | 85674008 | 0 | localhost | 57637 | localhost | 57640 table1_colg2 | 85674009 | 0 | localhost | 57638 | localhost | 57639 table2_colg2 | 85674012 | 0 | localhost | 57637 | localhost | 57640 table2_colg2 | 85674013 | 0 | localhost | 57638 | localhost | 57639 table1_colg3 | 85674016 | 0 | localhost | 57637 | localhost | 57640 table1_colg3 | 85674017 | 0 | localhost | 57638 | localhost | 57639 table2_colg3 | 85674020 | 0 | localhost | 57637 | localhost | 57640 table2_colg3 | 85674021 | 0 | localhost | 57638 | localhost | 57639 (12 rows) SET client_min_messages TO DEBUG1; SELECT citus_rebalance_start AS job_id from citus_rebalance_start( shard_transfer_mode := 'force_logical', parallel_transfer_colocated_shards := true, parallel_transfer_reference_tables := true) \gset DEBUG: skipping child tables for relation named: table1_colg1 DEBUG: skipping child tables for relation named: table1_colg1 DEBUG: skipping child tables for relation named: table1_colg1 DEBUG: skipping child tables for relation named: table1_colg1 DEBUG: skipping child tables for relation named: table1_colg2 DEBUG: skipping child tables for relation named: table1_colg2 DEBUG: skipping child tables for relation named: table1_colg2 DEBUG: skipping child tables for relation named: table1_colg2 DEBUG: skipping child tables for relation named: table1_colg3 DEBUG: skipping child tables for relation named: table1_colg3 DEBUG: skipping child tables for relation named: table1_colg3 DEBUG: skipping child tables for relation named: table1_colg3 NOTICE: Scheduled 6 moves as job xxx DETAIL: Rebalance scheduled as background job HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status(); SET client_min_messages TO ERROR; SELECT citus_rebalance_wait(); citus_rebalance_wait --------------------------------------------------------------------- (1 row) SELECT citus_rebalance_wait(); citus_rebalance_wait --------------------------------------------------------------------- (1 row) -- see the dependencies of the tasks scheduled by the background rebalancer SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; job_id | task_id | depends_on --------------------------------------------------------------------- 17777 | 1001 | 1000 17777 | 1002 | 1000 17777 | 1002 | 1001 17777 | 1003 | 1000 17777 | 1003 | 1001 17777 | 1003 | 1002 17777 | 1005 | 1004 17777 | 1006 | 1004 17777 | 1006 | 1005 17777 | 1007 | 1004 17777 | 1007 | 1005 17777 | 1007 | 1006 17777 | 1008 | 1003 17777 | 1008 | 1007 17777 | 1009 | 1003 17777 | 1009 | 1007 17777 | 1010 | 1003 17777 | 1010 | 1007 17777 | 1011 | 1003 17777 | 1011 | 1007 17777 | 1012 | 1003 17777 | 1012 | 1007 17777 | 1013 | 1003 17777 | 1013 | 1007 (24 rows) -- Temporary hack to eliminate SET application name from command until we get the -- background job enhancement done. SELECT D.task_id, (SELECT CASE WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') ELSE T.command END FROM pg_dist_background_task T WHERE T.task_id = D.task_id), D.depends_on, (SELECT CASE WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') ELSE T.command END FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; task_id | command | depends_on | command --------------------------------------------------------------------- 1001 | 85674025 | 1000 | 85674024 1002 | 85674026 | 1000 | 85674024 1002 | 85674026 | 1001 | 85674025 1003 | 85674026 | 1000 | 85674024 1003 | 85674026 | 1001 | 85674025 1003 | 85674026 | 1002 | 85674026 1005 | 85674025 | 1004 | 85674024 1006 | 85674026 | 1004 | 85674024 1006 | 85674026 | 1005 | 85674025 1007 | 85674026 | 1004 | 85674024 1007 | 85674026 | 1005 | 85674025 1007 | 85674026 | 1006 | 85674026 1008 | 85674001 | 1003 | 85674026 1008 | 85674001 | 1007 | 85674026 1009 | 85674000 | 1003 | 85674026 1009 | 85674000 | 1007 | 85674026 1010 | 85674009 | 1003 | 85674026 1010 | 85674009 | 1007 | 85674026 1011 | 85674008 | 1003 | 85674026 1011 | 85674008 | 1007 | 85674026 1012 | 85674017 | 1003 | 85674026 1012 | 85674017 | 1007 | 85674026 1013 | 85674016 | 1003 | 85674026 1013 | 85674016 | 1007 | 85674026 (24 rows) TRUNCATE pg_dist_background_job CASCADE; TRUNCATE pg_dist_background_task CASCADE; TRUNCATE pg_dist_background_task_depend; -- Drain worker_3 so that we can move only one colocation group to worker_3 -- to create an unbalance that would cause parallel rebalancing. SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); citus_set_node_property --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); -- Move all the shards of Colocation group 3 to worker_3. SELECT master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port ORDER BY shardid; master_move_shard_placement --------------------------------------------------------------------- (4 rows) CALL citus_cleanup_orphaned_resources(); SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); citus_set_node_property --------------------------------------------------------------------- (1 row) SELECT 1 FROM citus_add_node('localhost', :worker_5_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT 1 FROM citus_add_node('localhost', :worker_6_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- table1_colg1 | 85674001 | 0 | localhost | 57637 | localhost | 57642 table1_colg1 | 85674003 | 0 | localhost | 57638 | localhost | 57641 table2_colg1 | 85674005 | 0 | localhost | 57637 | localhost | 57642 table2_colg1 | 85674007 | 0 | localhost | 57638 | localhost | 57641 table1_colg3 | 85674016 | 0 | localhost | 57639 | localhost | 57641 table1_colg3 | 85674017 | 0 | localhost | 57639 | localhost | 57642 table2_colg3 | 85674020 | 0 | localhost | 57639 | localhost | 57641 table2_colg3 | 85674021 | 0 | localhost | 57639 | localhost | 57642 (8 rows) SET client_min_messages TO DEBUG1; SELECT citus_rebalance_start AS job_id from citus_rebalance_start( shard_transfer_mode := 'block_writes', parallel_transfer_colocated_shards := true, parallel_transfer_reference_tables := true) \gset DEBUG: skipping child tables for relation named: table1_colg3 DEBUG: skipping child tables for relation named: table1_colg1 DEBUG: skipping child tables for relation named: table1_colg1 DEBUG: skipping child tables for relation named: table1_colg1 DEBUG: skipping child tables for relation named: table1_colg2 DEBUG: skipping child tables for relation named: table1_colg3 DEBUG: skipping child tables for relation named: table1_colg2 DEBUG: skipping child tables for relation named: table1_colg2 DEBUG: skipping child tables for relation named: table1_colg3 DEBUG: skipping child tables for relation named: table1_colg3 DEBUG: skipping child tables for relation named: table1_colg1 DEBUG: skipping child tables for relation named: table1_colg2 NOTICE: Scheduled 4 moves as job xxx DETAIL: Rebalance scheduled as background job HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status(); SET client_min_messages TO ERROR; SELECT citus_rebalance_wait(); citus_rebalance_wait --------------------------------------------------------------------- (1 row) -- see the dependencies of the tasks scheduled by the background rebalancer SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; job_id | task_id | depends_on --------------------------------------------------------------------- 17778 | 1015 | 1014 17778 | 1016 | 1014 17778 | 1016 | 1015 17778 | 1017 | 1014 17778 | 1017 | 1015 17778 | 1017 | 1016 17778 | 1019 | 1018 17778 | 1020 | 1018 17778 | 1020 | 1019 17778 | 1021 | 1018 17778 | 1021 | 1019 17778 | 1021 | 1020 17778 | 1022 | 1017 17778 | 1022 | 1021 17778 | 1023 | 1017 17778 | 1023 | 1021 17778 | 1024 | 1017 17778 | 1024 | 1021 17778 | 1025 | 1017 17778 | 1025 | 1021 (20 rows) -- Temporary hack to eliminate SET application name from command until we get the -- background job enhancement done. SELECT D.task_id, (SELECT CASE WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') ELSE T.command END FROM pg_dist_background_task T WHERE T.task_id = D.task_id), D.depends_on, (SELECT CASE WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') ELSE T.command END FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; task_id | command | depends_on | command --------------------------------------------------------------------- 1015 | 85674025 | 1014 | 85674024 1016 | 85674026 | 1014 | 85674024 1016 | 85674026 | 1015 | 85674025 1017 | 85674026 | 1014 | 85674024 1017 | 85674026 | 1015 | 85674025 1017 | 85674026 | 1016 | 85674026 1019 | 85674025 | 1018 | 85674024 1020 | 85674026 | 1018 | 85674024 1020 | 85674026 | 1019 | 85674025 1021 | 85674026 | 1018 | 85674024 1021 | 85674026 | 1019 | 85674025 1021 | 85674026 | 1020 | 85674026 1022 | 85674016 | 1017 | 85674026 1022 | 85674016 | 1021 | 85674026 1023 | 85674017 | 1017 | 85674026 1023 | 85674017 | 1021 | 85674026 1024 | 85674003 | 1017 | 85674026 1024 | 85674003 | 1021 | 85674026 1025 | 85674001 | 1017 | 85674026 1025 | 85674001 | 1021 | 85674026 (20 rows) SELECT c.id AS customer_id, c.name AS customer_name, c.email AS customer_email, COUNT(oi.id) AS total_order_items FROM customers c JOIN orders o ON c.id = o.customer_id JOIN order_items oi ON o.id = oi.order_id GROUP BY c.id, c.name, c.email ORDER BY c.id; customer_id | customer_name | customer_email | total_order_items --------------------------------------------------------------------- 1 | Customer 1 | customer1@example.com | 9 2 | Customer 2 | customer2@example.com | 9 3 | Customer 3 | customer3@example.com | 9 4 | Customer 4 | customer4@example.com | 9 5 | Customer 5 | customer5@example.com | 9 6 | Customer 6 | customer6@example.com | 9 7 | Customer 7 | customer7@example.com | 9 8 | Customer 8 | customer8@example.com | 9 9 | Customer 9 | customer9@example.com | 9 10 | Customer 10 | customer10@example.com | 9 (10 rows) DROP SCHEMA background_rebalance_parallel CASCADE; TRUNCATE pg_dist_background_job CASCADE; TRUNCATE pg_dist_background_task CASCADE; TRUNCATE pg_dist_background_task_depend; SELECT public.wait_for_resource_cleanup(); wait_for_resource_cleanup --------------------------------------------------------------------- (1 row) select citus_remove_node('localhost', :worker_3_port); citus_remove_node --------------------------------------------------------------------- (1 row) select citus_remove_node('localhost', :worker_4_port); citus_remove_node --------------------------------------------------------------------- (1 row) select citus_remove_node('localhost', :worker_5_port); citus_remove_node --------------------------------------------------------------------- (1 row) select citus_remove_node('localhost', :worker_6_port); citus_remove_node --------------------------------------------------------------------- (1 row) ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;