diff --git a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql index be4e0e62f..09cb5c2e3 100644 --- a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql +++ b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql @@ -3,6 +3,5 @@ -- bump version to 13.2-1 #include "udfs/worker_last_saved_explain_analyze/13.2-1.sql" -DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode); #include "udfs/citus_rebalance_start/13.2-1.sql" #include "udfs/citus_internal_copy_single_shard_placement/13.2-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql index cfdf1eb60..22a56810d 100644 --- a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql @@ -1,8 +1,7 @@ -- citus--13.2-1--13.1-1 -DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean); -DROP FUNCTION IF EXISTS citus_internal.citus_internal_copy_single_shard_placement(bigint, integer, integer, integer, citus.shard_transfer_mode); -#include "udfs/citus_rebalance_start/11.1-1.sql" -- downgrade version to 13.1-1 +DROP FUNCTION IF EXISTS citus_internal.citus_internal_copy_single_shard_placement(bigint, integer, integer, integer, citus.shard_transfer_mode); +#include "../udfs/citus_rebalance_start/11.1-1.sql" DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze(); #include "../udfs/worker_last_saved_explain_analyze/9.4-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/11.1-1.sql index cc84d3142..f9545a6bf 100644 --- a/src/backend/distributed/sql/udfs/citus_rebalance_start/11.1-1.sql +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/11.1-1.sql @@ -1,3 +1,5 @@ +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean); + CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( rebalance_strategy name DEFAULT NULL, drain_only boolean DEFAULT false, diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql index 658e78dd6..b3886f7a2 100644 --- a/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql @@ -1,3 +1,5 @@ +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode); + CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( rebalance_strategy name DEFAULT NULL, drain_only boolean DEFAULT false, diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql index 658e78dd6..b3886f7a2 100644 --- a/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql @@ -1,3 +1,5 @@ +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode); + CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( rebalance_strategy name DEFAULT NULL, drain_only boolean DEFAULT false, diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 5a98c9cdd..cc872b17b 100644 --- 
a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -139,7 +139,6 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) * DROP TABLE and create_reference_table calls so that the list of reference tables we * operate on are stable. * - * * Since the changes to the reference table placements are made via loopback * connections we release the locks held at the end of this function. Due to Citus * only running transactions in READ COMMITTED mode we can be sure that other @@ -289,9 +288,9 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) } /* - * Since reference tables have been copied via a loopback connection we do not have to - * retain our locks. Since Citus only runs well in READ COMMITTED mode we can be sure - * that other transactions will find the reference tables copied. + * Since reference tables have been copied via a loopback connection we do not have + * to retain our locks. Since Citus only runs well in READ COMMITTED mode we can be + * sure that other transactions will find the reference tables copied. * We have obtained and held multiple locks, here we unlock them all in the reverse * order we have obtained them in. */ @@ -306,8 +305,8 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) /* * ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes is essentially a * twin of EnsureReferenceTablesExistOnAllNodesExtended. The difference is instead of - * copying the missing tables on to the worker nodes this function creates the background tasks - * for each required copy operation and schedule it in the background job. + * copying the missing tables onto the worker nodes this function creates the background + * tasks for each required copy operation and schedules them in the background job. * Another difference is that instead of moving all the colocated shards sequentially * this function creates a separate background task for each shard, even when the shards * are part of the same colocated shard group. @@ -502,8 +501,8 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr GetGlobalPID()); /* - * In first step just create and load data in the shards but defer the creation - * of the shard relationships to the next step. + * In the first step just create and load data in the shards but defer the + * creation of the shard relationships to the next step. * The reason we want to defer the creation of the shard relationships is that * we want to make sure that all the parallel shard copy tasks are finished * before we create the relationships. Otherwise we might end up with * @@ -511,7 +510,9 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr * create the shard relationships will result in ERROR.
*/ appendStringInfo(&buf, - "SELECT citus_internal.citus_internal_copy_single_shard_placement(%ld,%u,%u,%u,%s)", + "SELECT " + "citus_internal.citus_internal_copy_single_shard_placement" + "(%ld,%u,%u,%u,%s)", shardId, sourceShardPlacement->nodeId, newWorkerNode->nodeId, @@ -582,7 +583,8 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr taskEntry->taskId = task->taskid; ereport(DEBUG2, (errmsg( - "Added hash entry in scheduled task hash with task %ld for shard %ld", + "Added hash entry in scheduled task hash " + "with task %ld for shard %ld", task->taskid, shardId))); } else @@ -618,7 +620,9 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr CITUS_REBALANCER_APPLICATION_NAME_PREFIX, GetGlobalPID()); appendStringInfo(&buf, - "SELECT citus_internal.citus_internal_copy_single_shard_placement(%ld,%u,%u,%u,%s)", + "SELECT " + "citus_internal.citus_internal_copy_single_shard_placement" + "(%ld,%u,%u,%u,%s)", shardId, sourceShardPlacement->nodeId, newWorkerNode->nodeId, @@ -627,7 +631,8 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr ereport(DEBUG2, (errmsg( - "creating relations for reference table '%s' on %s:%d ... QUERY= %s", + "creating relations for reference table '%s' on %s:%d ... " + "QUERY= %s", referenceTableName, newWorkerNode->workerName, newWorkerNode->workerPort, buf.data))); diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 193bdf09f..8772d6944 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -140,6 +140,10 @@ DEPS = { "background_rebalance_parallel": TestDeps( None, ["multi_test_helpers", "multi_cluster_management"], worker_count=6 ), + "background_rebalance_parallel_reference_tables": TestDeps( + None, ["multi_test_helpers", "multi_cluster_management"], + repeatable=False, worker_count=6 + ), "function_propagation": TestDeps("minimal_schedule"), "citus_shards": TestDeps("minimal_schedule"), "grant_on_foreign_server_propagation": TestDeps("minimal_schedule"), diff --git a/src/test/regress/expected/background_rebalance_parallel.out b/src/test/regress/expected/background_rebalance_parallel.out index cc3470de9..90f13d701 100644 --- a/src/test/regress/expected/background_rebalance_parallel.out +++ b/src/test/regress/expected/background_rebalance_parallel.out @@ -534,7 +534,7 @@ FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; job_id | task_id | status | nodes_involved --------------------------------------------------------------------- 17779 | 1013 | done | {50,56} - 17779 | 1014 | running | {50,57} + 17779 | 1014 | done | {50,57} 17779 | 1015 | running | {50,56} 17779 | 1016 | blocked | {50,57} 17779 | 1017 | runnable | {50,56} diff --git a/src/test/regress/expected/background_rebalance_parallel_reference_tables.out b/src/test/regress/expected/background_rebalance_parallel_reference_tables.out new file mode 100644 index 000000000..fe248945b --- /dev/null +++ b/src/test/regress/expected/background_rebalance_parallel_reference_tables.out @@ -0,0 +1,457 @@ +-- +-- BACKGROUND_REBALANCE_PARALLEL_REFERENCE_TABLES +-- +-- Test to check if the background tasks scheduled for moving reference tables +-- shards in parallel by the background rebalancer have the correct dependencies +-- +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; +SET citus.next_shard_id TO 85674000; +SET citus.shard_replication_factor TO 1; +SET 
client_min_messages TO ERROR; +ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; +ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; +SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50; +SELECT 1 FROM master_remove_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +ALTER SYSTEM SET citus.max_background_task_executors_per_node = 5; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +-- Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg1 (b int PRIMARY KEY); +SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group +CREATE TABLE table1_colg2 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg2 (b int primary key); +SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group +CREATE TABLE table1_colg3 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg3 (b int primary key); +SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Create reference tables with primary-foreign key relationships +CREATE TABLE customers ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + email TEXT UNIQUE NOT NULL +); +CREATE TABLE orders ( + id SERIAL PRIMARY KEY, + customer_id INTEGER NOT NULL REFERENCES customers(id), + order_date DATE NOT NULL DEFAULT CURRENT_DATE +); +CREATE TABLE 
order_items ( + id SERIAL PRIMARY KEY, + order_id INTEGER NOT NULL REFERENCES orders(id), + product_name TEXT NOT NULL, + quantity INTEGER NOT NULL, + price NUMERIC(10, 2) NOT NULL +); +SELECT create_reference_table('customers'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('orders'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('order_items'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- INSERT SOME DATA +-- Insert 10 customers +INSERT INTO customers (name, email) +SELECT + 'Customer ' || i, + 'customer' || i || '@example.com' +FROM generate_series(1, 10) AS i; +-- Insert 30 orders: each customer gets 3 orders +INSERT INTO orders (customer_id, order_date) +SELECT + (i % 10) + 1, -- customer_id between 1 and 10 + CURRENT_DATE - (i % 7) +FROM generate_series(1, 30) AS i; +-- Insert 90 order_items: each order has 3 items +INSERT INTO order_items (order_id, product_name, quantity, price) +SELECT + (i % 30) + 1, -- order_id between 1 and 30 + 'Product ' || (i % 5 + 1), + (i % 10) + 1, + round((random() * 100 + 10)::numeric, 2) +FROM generate_series(1, 90) AS i; +-- Add two new nodes so that we can rebalance +SELECT 1 FROM citus_add_node('localhost', :worker_3_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_4_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; + table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport +--------------------------------------------------------------------- + table1_colg1 | 85674000 | 0 | localhost | 57637 | localhost | 57640 + table1_colg1 | 85674001 | 0 | localhost | 57638 | localhost | 57639 + table2_colg1 | 85674004 | 0 | localhost | 57637 | localhost | 57640 + table2_colg1 | 85674005 | 0 | localhost | 57638 | localhost | 57639 + table1_colg2 | 85674008 | 0 | localhost | 57637 | localhost | 57640 + table1_colg2 | 85674009 | 0 | localhost | 57638 | localhost | 57639 + table2_colg2 | 85674012 | 0 | localhost | 57637 | localhost | 57640 + table2_colg2 | 85674013 | 0 | localhost | 57638 | localhost | 57639 + table1_colg3 | 85674016 | 0 | localhost | 57637 | localhost | 57640 + table1_colg3 | 85674017 | 0 | localhost | 57638 | localhost | 57639 + table2_colg3 | 85674020 | 0 | localhost | 57637 | localhost | 57640 + table2_colg3 | 85674021 | 0 | localhost | 57638 | localhost | 57639 +(12 rows) + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start( + shard_transfer_mode := 'force_logical', + parallel_transfer_colocated_shards := true, + parallel_transfer_reference_tables := true) \gset +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +-- see the dependencies of the tasks scheduled by the background rebalancer +SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; + job_id | task_id | depends_on +--------------------------------------------------------------------- + 17777 | 1001 | 1000 + 17777 | 1002 | 1000 + 17777 | 1002 | 1001 + 17777 | 1003 | 1000 + 17777 | 1003 | 1001 + 17777 | 1003 | 1002 + 17777 | 1005 | 1004 + 17777 | 1006 | 
1004 + 17777 | 1006 | 1005 + 17777 | 1007 | 1004 + 17777 | 1007 | 1005 + 17777 | 1007 | 1006 + 17777 | 1008 | 1003 + 17777 | 1008 | 1007 + 17777 | 1009 | 1003 + 17777 | 1009 | 1007 + 17777 | 1010 | 1003 + 17777 | 1010 | 1007 + 17777 | 1011 | 1003 + 17777 | 1011 | 1007 + 17777 | 1012 | 1003 + 17777 | 1012 | 1007 + 17777 | 1013 | 1003 + 17777 | 1013 | 1007 +(24 rows) + +-- Temporary hack to eliminate SET application name from command until we get the +-- background job enhancement done. +SELECT D.task_id, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + task_id | command | depends_on | command +--------------------------------------------------------------------- + 1001 | 85674025 | 1000 | 85674024 + 1002 | 85674026 | 1000 | 85674024 + 1002 | 85674026 | 1001 | 85674025 + 1003 | 85674026 | 1000 | 85674024 + 1003 | 85674026 | 1001 | 85674025 + 1003 | 85674026 | 1002 | 85674026 + 1005 | 85674025 | 1004 | 85674024 + 1006 | 85674026 | 1004 | 85674024 + 1006 | 85674026 | 1005 | 85674025 + 1007 | 85674026 | 1004 | 85674024 + 1007 | 85674026 | 1005 | 85674025 + 1007 | 85674026 | 1006 | 85674026 + 1008 | 85674001 | 1003 | 85674026 + 1008 | 85674001 | 1007 | 85674026 + 1009 | 85674000 | 1003 | 85674026 + 1009 | 85674000 | 1007 | 85674026 + 1010 | 85674009 | 1003 | 85674026 + 1010 | 85674009 | 1007 | 85674026 + 1011 | 85674008 | 1003 | 85674026 + 1011 | 85674008 | 1007 | 85674026 + 1012 | 85674017 | 1003 | 85674026 + 1012 | 85674017 | 1007 | 85674026 + 1013 | 85674016 | 1003 | 85674026 + 1013 | 85674016 | 1007 | 85674026 +(24 rows) + +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +-- Drain worker_3 so that we can move only one colocation group to worker_3 +-- to create an unbalance that would cause parallel rebalancing. +SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); +-- Move all the shards of Colocation group 3 to worker_3. 
+SELECT +master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port +ORDER BY + shardid; + master_move_shard_placement +--------------------------------------------------------------------- + + + + +(4 rows) + +CALL citus_cleanup_orphaned_resources(); +-- Activate and new nodes so that we can rebalance. +SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_5_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_6_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start( + shard_transfer_mode := 'block_writes', + parallel_transfer_colocated_shards := true, + parallel_transfer_reference_tables := true) \gset +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +-- see the dependencies of the tasks scheduled by the background rebalancer +SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; + job_id | task_id | depends_on +--------------------------------------------------------------------- + 17778 | 1015 | 1014 + 17778 | 1016 | 1014 + 17778 | 1016 | 1015 + 17778 | 1017 | 1014 + 17778 | 1017 | 1015 + 17778 | 1017 | 1016 + 17778 | 1019 | 1018 + 17778 | 1020 | 1018 + 17778 | 1020 | 1019 + 17778 | 1021 | 1018 + 17778 | 1021 | 1019 + 17778 | 1021 | 1020 + 17778 | 1022 | 1017 + 17778 | 1022 | 1021 + 17778 | 1023 | 1017 + 17778 | 1023 | 1021 + 17778 | 1024 | 1017 + 17778 | 1024 | 1021 + 17778 | 1025 | 1017 + 17778 | 1025 | 1021 +(20 rows) + +-- Temporary hack to eliminate SET application name from command until we get the +-- background job enhancement done. 
+SELECT D.task_id, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + task_id | command | depends_on | command +--------------------------------------------------------------------- + 1015 | 85674025 | 1014 | 85674024 + 1016 | 85674026 | 1014 | 85674024 + 1016 | 85674026 | 1015 | 85674025 + 1017 | 85674026 | 1014 | 85674024 + 1017 | 85674026 | 1015 | 85674025 + 1017 | 85674026 | 1016 | 85674026 + 1019 | 85674025 | 1018 | 85674024 + 1020 | 85674026 | 1018 | 85674024 + 1020 | 85674026 | 1019 | 85674025 + 1021 | 85674026 | 1018 | 85674024 + 1021 | 85674026 | 1019 | 85674025 + 1021 | 85674026 | 1020 | 85674026 + 1022 | 85674016 | 1017 | 85674026 + 1022 | 85674016 | 1021 | 85674026 + 1023 | 85674017 | 1017 | 85674026 + 1023 | 85674017 | 1021 | 85674026 + 1024 | 85674003 | 1017 | 85674026 + 1024 | 85674003 | 1021 | 85674026 + 1025 | 85674001 | 1017 | 85674026 + 1025 | 85674001 | 1021 | 85674026 +(20 rows) + +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_3_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_4_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_5_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_6_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; diff --git a/src/test/regress/expected/background_task_queue_monitor.out b/src/test/regress/expected/background_task_queue_monitor.out index 1d4006377..74570a4d2 100644 --- a/src/test/regress/expected/background_task_queue_monitor.out +++ b/src/test/regress/expected/background_task_queue_monitor.out @@ -252,9 +252,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable) job_id | task_id | status --------------------------------------------------------------------- - 
1450005 | 1450009 | running - 1450005 | 1450010 | running - 1450005 | 1450011 | running + 1450005 | 1450009 | done + 1450005 | 1450010 | done + 1450005 | 1450011 | done 1450006 | 1450012 | running 1450007 | 1450013 | runnable (5 rows) @@ -282,9 +282,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that last task is running job_id | task_id | status --------------------------------------------------------------------- - 1450005 | 1450009 | running - 1450005 | 1450010 | running - 1450005 | 1450011 | running + 1450005 | 1450009 | done + 1450005 | 1450010 | done + 1450005 | 1450011 | done 1450006 | 1450012 | cancelled 1450007 | 1450013 | running (5 rows) @@ -318,9 +318,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that multiple cancels worked job_id | task_id | status --------------------------------------------------------------------- - 1450005 | 1450009 | cancelled - 1450005 | 1450010 | cancelled - 1450005 | 1450011 | cancelled + 1450005 | 1450009 | done + 1450005 | 1450010 | done + 1450005 | 1450011 | done 1450006 | 1450012 | cancelled 1450007 | 1450013 | cancelled (5 rows) @@ -372,9 +372,9 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that last task is not running but ready to run(runnable) task_id | status --------------------------------------------------------------------- - 1450014 | running - 1450015 | running - 1450016 | running + 1450014 | done + 1450015 | done + 1450016 | done 1450017 | running 1450018 | runnable (5 rows) @@ -397,9 +397,9 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that last task is running task_id | status --------------------------------------------------------------------- - 1450014 | running - 1450015 | running - 1450016 | running + 1450014 | done + 1450015 | done + 1450016 | done 1450017 | running 1450018 | running (5 rows) @@ -445,9 +445,9 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that all tasks are cancelled task_id | status --------------------------------------------------------------------- - 1450014 | cancelled - 1450015 | cancelled - 1450016 | cancelled + 1450014 | done + 1450015 | done + 1450016 | done 1450017 | cancelled 1450018 | cancelled (5 rows) @@ -693,9 +693,9 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task --------------------------------------------------------------------- 1450017 | 1450025 | running | {1,2} 1450017 | 1450026 | running | {3,4} - 1450017 | 1450027 | runnable | {1,2} - 1450017 | 1450028 | runnable | {1,3} - 1450017 | 1450029 | runnable | {2,4} + 1450017 | 1450027 | running | {1,2} + 1450017 | 1450028 | running | {1,3} + 1450017 | 1450029 | running | {2,4} 1450017 | 1450030 | runnable | {1,2} 1450017 | 1450031 | runnable | {1,3} 1450017 | 1450032 | runnable | {1,4} @@ -750,7 +750,7 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task 1450017 | 1450027 | running | {1,2} 1450017 | 1450028 | running | {1,3} 1450017 | 1450029 | running | {2,4} - 1450017 | 1450030 | runnable | {1,2} + 1450017 | 1450030 | running | {1,2} 1450017 | 1450031 | runnable | {1,3} 1450017 | 1450032 | runnable | {1,4} (8 rows) @@ -810,7 +810,7 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task 1450017 | 1450027 | done | {1,2} 1450017 | 1450028 | done | {1,3} 1450017 | 1450029 | done | {2,4} - 1450017 | 1450030 | running | {1,2} + 1450017 | 1450030 | done | {1,2} 
1450017 | 1450031 | running | {1,3} 1450017 | 1450032 | running | {1,4} (8 rows) @@ -825,15 +825,15 @@ SELECT pg_reload_conf(); -- if pg_cancel_backend is called on one of the running task PIDs -- task doesn't restart because it's not allowed anymore by the limit. -- node with id 1 can be used only once, unless there are previously running tasks -SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset -SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process +SELECT pid AS task_id7_pid FROM pg_dist_background_task WHERE task_id IN (:task_id7) \gset +SELECT pg_cancel_backend(:task_id7_pid); -- cancel task_id7 process pg_cancel_backend --------------------------------------------------------------------- t (1 row) -- task goes to only runnable state, not running anymore. -SELECT citus_task_wait(:task_id6, desired_status => 'runnable'); +SELECT citus_task_wait(:task_id7, desired_status => 'runnable'); citus_task_wait --------------------------------------------------------------------- @@ -851,8 +851,8 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task 1450017 | 1450027 | done | {1,2} 1450017 | 1450028 | done | {1,3} 1450017 | 1450029 | done | {2,4} - 1450017 | 1450030 | runnable | {1,2} - 1450017 | 1450031 | running | {1,3} + 1450017 | 1450030 | done | {1,2} + 1450017 | 1450031 | runnable | {1,3} 1450017 | 1450032 | running | {1,4} (8 rows) @@ -868,7 +868,7 @@ SELECT citus_task_wait(:task_id8, desired_status => 'done'); (1 row) -SELECT citus_task_wait(:task_id6, desired_status => 'running'); +SELECT citus_task_wait(:task_id6, desired_status => 'done'); citus_task_wait --------------------------------------------------------------------- @@ -880,16 +880,16 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5, :task_id6, :task_id7, :task_id8) ORDER BY job_id, task_id; - job_id | task_id | status | nodes_involved + job_id | task_id | status | nodes_involved --------------------------------------------------------------------- - 1450017 | 1450025 | done | {1,2} - 1450017 | 1450026 | done | {3,4} - 1450017 | 1450027 | done | {1,2} - 1450017 | 1450028 | done | {1,3} - 1450017 | 1450029 | done | {2,4} - 1450017 | 1450030 | running | {1,2} - 1450017 | 1450031 | done | {1,3} - 1450017 | 1450032 | done | {1,4} + 1450017 | 1450025 | done | {1,2} + 1450017 | 1450026 | done | {3,4} + 1450017 | 1450027 | done | {1,2} + 1450017 | 1450028 | done | {1,3} + 1450017 | 1450029 | done | {2,4} + 1450017 | 1450030 | done | {1,2} + 1450017 | 1450031 | done | {1,3} + 1450017 | 1450032 | done | {1,4} (8 rows) SELECT citus_job_cancel(:job_id1); diff --git a/src/test/regress/expected/isolation_logical_replication_single_shard_commands.out b/src/test/regress/expected/isolation_logical_replication_single_shard_commands.out index cbb5e3d8d..979d10c14 100644 --- a/src/test/regress/expected/isolation_logical_replication_single_shard_commands.out +++ b/src/test/regress/expected/isolation_logical_replication_single_shard_commands.out @@ -594,10 +594,15 @@ step s2-move-placement: SELECT master_move_shard_placement( get_shard_id_for_distribution_column('logical_replicate_placement', 4), 'localhost', 57637, 'localhost', 57638); + +step s1-end: + COMMIT; -ERROR: could not acquire the lock required to move public.logical_replicate_placement -step s1-end: - COMMIT; +step s2-move-placement: <... 
completed> +master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) step s2-end: COMMIT; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index defe41f0d..27b006bf8 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1503,9 +1503,12 @@ ALTER EXTENSION citus UPDATE TO '13.2-1'; SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- + function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) bigint | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision) | - | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision) -(2 rows) + | function citus_internal.citus_internal_copy_single_shard_placement(bigint,integer,integer,integer,citus.shard_transfer_mode) void + | function citus_rebalance_start(name,boolean,citus.shard_transfer_mode,boolean,boolean) bigint + | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision) +(5 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 030228fe3..c47c312bb 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -29,7 +29,7 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value(anyelement)' AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value_agg(anyelement,anyelement)' ORDER BY 1; - description + description --------------------------------------------------------------------- event trigger citus_cascade_to_partition function alter_distributed_table(regclass,text,integer,text,boolean) @@ -85,6 +85,7 @@ ORDER BY 1; function citus_internal.add_shard_metadata(regclass,bigint,"char",text,text) function citus_internal.add_tenant_schema(oid,integer) function citus_internal.adjust_local_clock_to_remote(cluster_clock) + function citus_internal.citus_internal_copy_single_shard_placement(bigint,integer,integer,integer,citus.shard_transfer_mode) function citus_internal.database_command(text) function citus_internal.delete_colocation_metadata(integer) function citus_internal.delete_partition_metadata(regclass) @@ -155,7 +156,7 @@ ORDER BY 1; function citus_pid_for_gpid(bigint) function citus_prepare_pg_upgrade() function citus_query_stats() - function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) + function citus_rebalance_start(name,boolean,citus.shard_transfer_mode,boolean,boolean) function citus_rebalance_status(boolean) function citus_rebalance_stop() function citus_rebalance_wait() @@ -393,6 +394,6 @@ ORDER BY 1; view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(362 rows) +(363 rows) DROP TABLE extension_basic_types; diff --git a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule index 6dbc303c2..5639ea4d6 100644 --- 
a/src/test/regress/operations_schedule +++ b/src/test/regress/operations_schedule @@ -13,3 +13,4 @@ test: multi_colocated_shard_rebalance test: cpu_priority test: check_mx test: citus_drain_node +test: background_rebalance_parallel_reference_tables diff --git a/src/test/regress/sql/background_rebalance_parallel_reference_tables.sql b/src/test/regress/sql/background_rebalance_parallel_reference_tables.sql new file mode 100644 index 000000000..e384b78cf --- /dev/null +++ b/src/test/regress/sql/background_rebalance_parallel_reference_tables.sql @@ -0,0 +1,228 @@ +-- +-- BACKGROUND_REBALANCE_PARALLEL_REFERENCE_TABLES +-- +-- Test to check if the background tasks scheduled for moving reference tables +-- shards in parallel by the background rebalancer have the correct dependencies +-- +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; +SET citus.next_shard_id TO 85674000; +SET citus.shard_replication_factor TO 1; +SET client_min_messages TO ERROR; + +ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; +ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; + +SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50; + +SELECT 1 FROM master_remove_node('localhost', :worker_1_port); +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + +SELECT 1 FROM master_add_node('localhost', :worker_1_port); +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +ALTER SYSTEM SET citus.max_background_task_executors_per_node = 5; +SELECT pg_reload_conf(); + +-- Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none'); + +CREATE TABLE table2_colg1 (b int PRIMARY KEY); + +SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); + +-- Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group +CREATE TABLE table1_colg2 (a int PRIMARY KEY); + +SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none'); + +CREATE TABLE table2_colg2 (b int primary key); + +SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); + +-- Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group +CREATE TABLE table1_colg3 (a int PRIMARY KEY); + +SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none'); + +CREATE TABLE table2_colg3 (b int primary key); + +SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); + +-- Create reference tables with primary-foreign key relationships + +CREATE TABLE customers ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + email TEXT UNIQUE NOT NULL +); + +CREATE TABLE orders ( + id SERIAL PRIMARY KEY, + customer_id INTEGER NOT NULL REFERENCES customers(id), + order_date DATE NOT NULL DEFAULT CURRENT_DATE +); + +CREATE TABLE order_items ( + id SERIAL PRIMARY KEY, + order_id INTEGER NOT NULL REFERENCES orders(id), + product_name TEXT NOT NULL, + quantity INTEGER NOT NULL, + 
price NUMERIC(10, 2) NOT NULL +); + +SELECT create_reference_table('customers'); +SELECT create_reference_table('orders'); +SELECT create_reference_table('order_items'); + +-- INSERT SOME DATA +-- Insert 10 customers +INSERT INTO customers (name, email) +SELECT + 'Customer ' || i, + 'customer' || i || '@example.com' +FROM generate_series(1, 10) AS i; + +-- Insert 30 orders: each customer gets 3 orders +INSERT INTO orders (customer_id, order_date) +SELECT + (i % 10) + 1, -- customer_id between 1 and 10 + CURRENT_DATE - (i % 7) +FROM generate_series(1, 30) AS i; + +-- Insert 90 order_items: each order has 3 items +INSERT INTO order_items (order_id, product_name, quantity, price) +SELECT + (i % 30) + 1, -- order_id between 1 and 30 + 'Product ' || (i % 5 + 1), + (i % 10) + 1, + round((random() * 100 + 10)::numeric, 2) +FROM generate_series(1, 90) AS i; + + +-- Add two new nodes so that we can rebalance +SELECT 1 FROM citus_add_node('localhost', :worker_3_port); +SELECT 1 FROM citus_add_node('localhost', :worker_4_port); + +SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start( + shard_transfer_mode := 'force_logical', + parallel_transfer_colocated_shards := true, + parallel_transfer_reference_tables := true) \gset + +SELECT citus_rebalance_wait(); + +-- see the dependencies of the tasks scheduled by the background rebalancer +SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; + +-- Temporary hack to eliminate SET application name from command until we get the +-- background job enhancement done. +SELECT D.task_id, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + + +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; + +-- Drain worker_3 so that we can move only one colocation group to worker_3 +-- to create an unbalance that would cause parallel rebalancing. +SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); +SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); + +CALL citus_cleanup_orphaned_resources(); + +-- Move all the shards of Colocation group 3 to worker_3. +SELECT +master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port +ORDER BY + shardid; + +CALL citus_cleanup_orphaned_resources(); + +-- Activate and new nodes so that we can rebalance. 
+SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); +SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); + +SELECT 1 FROM citus_add_node('localhost', :worker_5_port); +SELECT 1 FROM citus_add_node('localhost', :worker_6_port); + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start( + shard_transfer_mode := 'block_writes', + parallel_transfer_colocated_shards := true, + parallel_transfer_reference_tables := true) \gset + +SELECT citus_rebalance_wait(); + +-- see the dependencies of the tasks scheduled by the background rebalancer +SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; +-- Temporary hack to eliminate SET application name from command until we get the +-- background job enhancement done. +SELECT D.task_id, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +SELECT public.wait_for_resource_cleanup(); +select citus_remove_node('localhost', :worker_3_port); +select citus_remove_node('localhost', :worker_4_port); +select citus_remove_node('localhost', :worker_5_port); +select citus_remove_node('localhost', :worker_6_port); + +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; diff --git a/src/test/regress/sql/background_task_queue_monitor.sql b/src/test/regress/sql/background_task_queue_monitor.sql index 9f6abb73a..6c6a3b078 100644 --- a/src/test/regress/sql/background_task_queue_monitor.sql +++ b/src/test/regress/sql/background_task_queue_monitor.sql @@ -345,11 +345,11 @@ SELECT pg_reload_conf(); -- if pg_cancel_backend is called on one of the running task PIDs -- task doesn't restart because it's not allowed anymore by the limit. -- node with id 1 can be used only once, unless there are previously running tasks -SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset -SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process +SELECT pid AS task_id7_pid FROM pg_dist_background_task WHERE task_id IN (:task_id7) \gset +SELECT pg_cancel_backend(:task_id7_pid); -- cancel task_id7 process -- task goes to only runnable state, not running anymore. 
-SELECT citus_task_wait(:task_id6, desired_status => 'runnable'); +SELECT citus_task_wait(:task_id7, desired_status => 'runnable'); -- show that cancelled task hasn't restarted because limit doesn't allow it SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task @@ -359,7 +359,7 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task SELECT citus_task_wait(:task_id7, desired_status => 'done'); SELECT citus_task_wait(:task_id8, desired_status => 'done'); -SELECT citus_task_wait(:task_id6, desired_status => 'running'); +SELECT citus_task_wait(:task_id6, desired_status => 'done'); -- show that the 6th task has restarted only after both 6 and 7 are done -- since we have a limit of 1 background task executor per node with id 1
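For reviewers who want to exercise the new interface end to end, the sketch below mirrors what the regression test above does: start a rebalance with the two new boolean parameters, wait for the background job, then inspect the scheduled tasks and their dependency edges. It is illustrative only and not part of the patch; the function, parameter, and catalog names are taken from the test and SQL changes above, the :job_id variable handling assumes a psql session (\gset), and the expected shape of the task graph (per-shard reference table copies first, relationship-creation tasks afterwards) is the behaviour described in the reference_table_utils.c comments.

-- start a background rebalance that also copies reference tables in parallel
SELECT citus_rebalance_start(
           shard_transfer_mode := 'force_logical',
           parallel_transfer_colocated_shards := true,
           parallel_transfer_reference_tables := true) AS job_id \gset

-- block until the background job finishes
SELECT citus_rebalance_wait();

-- tasks scheduled for this job; reference table shards are copied via
-- citus_internal.citus_internal_copy_single_shard_placement(), one task per shard
SELECT task_id, status, nodes_involved
FROM pg_dist_background_task
WHERE job_id = :job_id
ORDER BY task_id;

-- dependency edges between the scheduled tasks, as shown in the expected output above
SELECT task_id, depends_on
FROM pg_dist_background_task_depend
WHERE job_id = :job_id
ORDER BY task_id, depends_on;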