From ee42af7ad2231eaf415a7db4b6d7b888ced7654d Mon Sep 17 00:00:00 2001 From: ahmet gedemenli Date: Tue, 30 May 2023 12:35:28 +0300 Subject: [PATCH] Add test for rebalancer with single shard tables --- .../expected/single_shard_table_udfs.out | 237 ++++++++++++++++++ .../regress/sql/single_shard_table_udfs.sql | 75 ++++++ 2 files changed, 312 insertions(+) diff --git a/src/test/regress/expected/single_shard_table_udfs.out b/src/test/regress/expected/single_shard_table_udfs.out index 1ae4d19cd..15919a137 100644 --- a/src/test/regress/expected/single_shard_table_udfs.out +++ b/src/test/regress/expected/single_shard_table_udfs.out @@ -233,6 +233,243 @@ SELECT COUNT(*) = 0 FROM pg_dist_shard WHERE logicalrelid::text LIKE '%null_dist t (1 row) +-- create 7 single shard tables, 3 of them are colocated, for testing shard moves / rebalance on them +CREATE TABLE single_shard_table_col1_1 (a INT PRIMARY KEY); +CREATE TABLE single_shard_table_col1_2 (a TEXT PRIMARY KEY); +CREATE TABLE single_shard_table_col1_3 (a TIMESTAMP PRIMARY KEY); +CREATE TABLE single_shard_table_col2_1 (a INT PRIMARY KEY); +CREATE TABLE single_shard_table_col3_1 (a INT PRIMARY KEY); +CREATE TABLE single_shard_table_col4_1 (a INT PRIMARY KEY); +CREATE TABLE single_shard_table_col5_1 (a INT PRIMARY KEY); +SELECT create_distributed_table('single_shard_table_col1_1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('single_shard_table_col1_2', null, colocate_with=>'single_shard_table_col1_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('single_shard_table_col1_3', null, colocate_with=>'single_shard_table_col1_2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('single_shard_table_col2_1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('single_shard_table_col3_1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('single_shard_table_col4_1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('single_shard_table_col5_1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- initial status +SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid; + shardid | nodeport +--------------------------------------------------------------------- + 1820002 | 57638 + 1820003 | 57638 + 1820004 | 57638 + 1820005 | 57637 + 1820006 | 57638 + 1820007 | 57637 + 1820008 | 57638 +(7 rows) + +-- errors out because streaming replicated +SELECT citus_copy_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: Table 'single_shard_table_col2_1' is streaming replicated. 
Shards of streaming replicated tables cannot be copied
+SELECT master_copy_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+WARNING: do_repair argument is deprecated
+ERROR: Table 'single_shard_table_col2_1' is streaming replicated. Shards of streaming replicated tables cannot be copied
+SELECT citus_copy_shard_placement(1820005, :worker_1_node, :worker_2_node);
+ERROR: Table 'single_shard_table_col2_1' is streaming replicated. Shards of streaming replicated tables cannot be copied
+-- no changes because it's already balanced
+SELECT rebalance_table_shards();
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+-- same placements
+SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
+ shardid | nodeport
+---------------------------------------------------------------------
+ 1820002 | 57638
+ 1820003 | 57638
+ 1820004 | 57638
+ 1820005 | 57637
+ 1820006 | 57638
+ 1820007 | 57637
+ 1820008 | 57638
+(7 rows)
+
+-- manually move 2 shards from 2 colocation groups to make the cluster unbalanced
+SELECT citus_move_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+ citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT citus_move_shard_placement(1820007, :worker_1_node, :worker_2_node);
+ citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+-- all placements are located on worker 2
+SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
+ shardid | nodeport
+---------------------------------------------------------------------
+ 1820002 | 57638
+ 1820003 | 57638
+ 1820004 | 57638
+ 1820005 | 57638
+ 1820006 | 57638
+ 1820007 | 57638
+ 1820008 | 57638
+(7 rows)
+
+-- move some of them to worker 1 to balance the cluster
+SELECT rebalance_table_shards();
+NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
+NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+-- the final status, balanced
+SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
+ shardid | nodeport
+---------------------------------------------------------------------
+ 1820002 | 57637
+ 1820003 | 57637
+ 1820004 | 57637
+ 1820005 | 57637
+ 1820006 | 57638
+ 1820007 | 57638
+ 1820008 | 57638
+(7 rows)
+
+-- verify we didn't break any colocations
+SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::text LIKE '%single_shard_table_col%' ORDER BY colocationid;
+ logicalrelid | colocationid
+---------------------------------------------------------------------
+ single_shard_table_col1_1 | 198001
+ single_shard_table_col1_2 | 198001
+ single_shard_table_col1_3 | 198001
+ single_shard_table_col2_1 | 198002
+ single_shard_table_col3_1 | 198003
+ single_shard_table_col4_1 | 198004
+ single_shard_table_col5_1 | 198005
+(7 rows)
+
+-- again, manually move 2 shards from 2 colocation groups to make the cluster unbalanced
+-- consider using citus_drain_node when the issue is fixed: https://github.com/citusdata/citus/issues/6948
+SELECT citus_move_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+ citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT citus_move_shard_placement(1820003, :worker_1_node, :worker_2_node);
+ citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+-- see the plan for moving 4 shards, 3 of which are in the same colocation group
+SELECT * FROM get_rebalance_table_shards_plan();
+ table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
+---------------------------------------------------------------------
+ single_shard_table_col1_1 | 1820002 | 0 | localhost | 57638 | localhost | 57637
+ single_shard_table_col1_2 | 1820003 | 0 | localhost | 57638 | localhost | 57637
+ single_shard_table_col1_3 | 1820004 | 0 | localhost | 57638 | localhost | 57637
+ single_shard_table_col2_1 | 1820005 | 0 | localhost | 57638 | localhost | 57637
+(4 rows)
+
+-- move some of them to worker 1 to balance the cluster
+SELECT 1 FROM citus_rebalance_start();
+NOTICE: Scheduled 2 moves as job xxx
+DETAIL: Rebalance scheduled as background job
+HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
+ ?column?
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+-- stop it
+SELECT * FROM citus_rebalance_stop();
+ citus_rebalance_stop
+---------------------------------------------------------------------
+
+(1 row)
+
+-- show rebalance status, see the cancelled job for two moves
+SELECT state, details FROM citus_rebalance_status();
+ state | details
+---------------------------------------------------------------------
+ cancelled | {"tasks": [], "task_state_counts": {"cancelled": 2}}
+(1 row)
+
+-- start again
+SELECT 1 FROM citus_rebalance_start();
+NOTICE: Scheduled 2 moves as job xxx
+DETAIL: Rebalance scheduled as background job
+HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
+ ?column?
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+-- show rebalance status, see the scheduled job for two moves
+SELECT state, details FROM citus_rebalance_status();
+ state | details
+---------------------------------------------------------------------
+ scheduled | {"tasks": [], "task_state_counts": {"runnable": 2}}
+(1 row)
+
+-- wait for rebalance to be completed
+SELECT * FROM citus_rebalance_wait();
+ citus_rebalance_wait
+---------------------------------------------------------------------
+
+(1 row)
+
+-- the final status, balanced
+SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
+ shardid | nodeport
+---------------------------------------------------------------------
+ 1820002 | 57637
+ 1820003 | 57637
+ 1820004 | 57637
+ 1820005 | 57637
+ 1820006 | 57638
+ 1820007 | 57638
+ 1820008 | 57638
+(7 rows)
+
 -- test update_distributed_table_colocation
 CREATE TABLE update_col_1 (a INT);
 CREATE TABLE update_col_2 (a INT);
diff --git a/src/test/regress/sql/single_shard_table_udfs.sql b/src/test/regress/sql/single_shard_table_udfs.sql
index 7aeacdf0c..a06658701 100644
--- a/src/test/regress/sql/single_shard_table_udfs.sql
+++ b/src/test/regress/sql/single_shard_table_udfs.sql
@@ -98,6 +98,81 @@ SELECT COUNT(*) = 0 FROM pg_dist_partition WHERE logicalrelid::text LIKE '%null_
 SELECT COUNT(*) = 0 FROM pg_dist_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid::text LIKE '%null_dist_key_table%');
 SELECT COUNT(*) = 0 FROM pg_dist_shard WHERE logicalrelid::text LIKE '%null_dist_key_table%';
+-- create 7 single shard tables, 3 of them are colocated, for testing shard moves / rebalance on them
+CREATE TABLE single_shard_table_col1_1 (a INT PRIMARY KEY);
+CREATE TABLE single_shard_table_col1_2 (a TEXT PRIMARY KEY);
+CREATE TABLE single_shard_table_col1_3 (a TIMESTAMP PRIMARY KEY);
+CREATE TABLE single_shard_table_col2_1 (a INT PRIMARY KEY);
+CREATE TABLE single_shard_table_col3_1 (a INT PRIMARY KEY);
+CREATE TABLE single_shard_table_col4_1 (a INT PRIMARY KEY);
+CREATE TABLE single_shard_table_col5_1 (a INT PRIMARY KEY);
+SELECT create_distributed_table('single_shard_table_col1_1', null, colocate_with=>'none');
+SELECT create_distributed_table('single_shard_table_col1_2', null, colocate_with=>'single_shard_table_col1_1');
+SELECT create_distributed_table('single_shard_table_col1_3', null, colocate_with=>'single_shard_table_col1_2');
+SELECT create_distributed_table('single_shard_table_col2_1', null, colocate_with=>'none');
+SELECT create_distributed_table('single_shard_table_col3_1', null, colocate_with=>'none');
+SELECT create_distributed_table('single_shard_table_col4_1', null, colocate_with=>'none');
+SELECT create_distributed_table('single_shard_table_col5_1', null, colocate_with=>'none');
+
+-- initial status
+SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
+
+-- errors out because streaming replicated
+SELECT citus_copy_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+SELECT master_copy_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+SELECT citus_copy_shard_placement(1820005, :worker_1_node, :worker_2_node);
+
+-- no changes because it's already balanced
+SELECT rebalance_table_shards();
+
+-- same placements
+SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
+
+-- manually move 2 shards from 2 colocation groups to make the cluster unbalanced
+SELECT citus_move_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+SELECT citus_move_shard_placement(1820007, :worker_1_node, :worker_2_node);
+
+-- all placements are located on worker 2
+SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
+
+-- move some of them to worker 1 to balance the cluster
+SELECT rebalance_table_shards();
+
+-- the final status, balanced
+SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
+
+-- verify we didn't break any colocations
+SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::text LIKE '%single_shard_table_col%' ORDER BY colocationid;
+
+-- again, manually move 2 shards from 2 colocation groups to make the cluster unbalanced
+-- consider using citus_drain_node when the issue is fixed: https://github.com/citusdata/citus/issues/6948
+SELECT citus_move_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+SELECT citus_move_shard_placement(1820003, :worker_1_node, :worker_2_node);
+
+-- see the plan for moving 4 shards, 3 of which are in the same colocation group
+SELECT * FROM get_rebalance_table_shards_plan();
+
+-- move some of them to worker 1 to balance the cluster
+SELECT 1 FROM citus_rebalance_start();
+
+-- stop it
+SELECT * FROM citus_rebalance_stop();
+
+-- show rebalance status, see the cancelled job for two moves
+SELECT state, details FROM citus_rebalance_status();
+
+-- start again
+SELECT 1 FROM citus_rebalance_start();
+
+-- show rebalance status, see the scheduled job for two moves
+SELECT state, details FROM citus_rebalance_status();
+
+-- wait for rebalance to be completed
+SELECT * FROM citus_rebalance_wait();
+
+-- the final status, balanced
+SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
+
 -- test update_distributed_table_colocation
 CREATE TABLE update_col_1 (a INT);
 CREATE TABLE update_col_2 (a INT);