Add test for rebalancer with single shard tables (#6949)

Adds test for shard moves / rebalancer with single shard tables
pull/6959/head
Ahmet Gedemenli 2023-05-31 14:58:23 +03:00 committed by GitHub
commit 3cd81a7107
2 changed files with 328 additions and 0 deletions
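In short, the new test creates single shard tables (distributed tables with a null shard key), verifies that shard copy is rejected for them, unbalances the cluster with manual moves, and then rebalances it both synchronously and through the background rebalancer. A condensed sketch of that flow, using only statements that appear in the diff below (shard id and port variables are the ones used by the test):

-- a single shard table is a distributed table with a null shard key
SELECT create_distributed_table('single_shard_table_col1_1', null, colocate_with=>'none');
-- manual move, then a blocking rebalance
SELECT citus_move_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
SELECT rebalance_table_shards();
-- background rebalance: schedule, inspect, wait
SELECT 1 FROM citus_rebalance_start();
SELECT state, details FROM citus_rebalance_status();
SELECT * FROM citus_rebalance_wait();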


@@ -233,6 +233,251 @@ SELECT COUNT(*) = 0 FROM pg_dist_shard WHERE logicalrelid::text LIKE '%null_dist
t
(1 row)
-- create 7 single shard tables, 3 of which are colocated, for testing shard moves / rebalance on them
CREATE TABLE single_shard_table_col1_1 (a INT PRIMARY KEY);
CREATE TABLE single_shard_table_col1_2 (a TEXT PRIMARY KEY);
CREATE TABLE single_shard_table_col1_3 (a TIMESTAMP PRIMARY KEY);
CREATE TABLE single_shard_table_col2_1 (a INT PRIMARY KEY);
CREATE TABLE single_shard_table_col3_1 (a INT PRIMARY KEY);
CREATE TABLE single_shard_table_col4_1 (a INT PRIMARY KEY);
CREATE TABLE single_shard_table_col5_1 (a INT PRIMARY KEY);
SELECT create_distributed_table('single_shard_table_col1_1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('single_shard_table_col1_2', null, colocate_with=>'single_shard_table_col1_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('single_shard_table_col1_3', null, colocate_with=>'single_shard_table_col1_2');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('single_shard_table_col2_1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('single_shard_table_col3_1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('single_shard_table_col4_1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('single_shard_table_col5_1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- initial status
SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
shardid | nodeport
---------------------------------------------------------------------
1820002 | 57638
1820003 | 57638
1820004 | 57638
1820005 | 57637
1820006 | 57638
1820007 | 57637
1820008 | 57638
(7 rows)
-- errors out because the table is streaming replicated
SELECT citus_copy_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: Table 'single_shard_table_col2_1' is streaming replicated. Shards of streaming replicated tables cannot be copied
SELECT master_copy_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
WARNING: do_repair argument is deprecated
ERROR: Table 'single_shard_table_col2_1' is streaming replicated. Shards of streaming replicated tables cannot be copied
SELECT citus_copy_shard_placement(1820005, :worker_1_node, :worker_2_node);
ERROR: Table 'single_shard_table_col2_1' is streaming replicated. Shards of streaming replicated tables cannot be copied
-- no changes because it's already balanced
SELECT rebalance_table_shards();
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
-- same placements
SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
shardid | nodeport
---------------------------------------------------------------------
1820002 | 57638
1820003 | 57638
1820004 | 57638
1820005 | 57637
1820006 | 57638
1820007 | 57637
1820008 | 57638
(7 rows)
-- manually move 2 shards from 2 colocation groups to make the cluster unbalanced
SELECT citus_move_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT citus_move_shard_placement(1820007, :worker_1_node, :worker_2_node);
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- all placements are located on worker 2
SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
shardid | nodeport
---------------------------------------------------------------------
1820002 | 57638
1820003 | 57638
1820004 | 57638
1820005 | 57638
1820006 | 57638
1820007 | 57638
1820008 | 57638
(7 rows)
-- move some of them to worker 1 to balance the cluster
SELECT rebalance_table_shards();
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
-- the final status, balanced
SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
shardid | nodeport
---------------------------------------------------------------------
1820002 | 57637
1820003 | 57637
1820004 | 57637
1820005 | 57637
1820006 | 57638
1820007 | 57638
1820008 | 57638
(7 rows)
-- verify we didn't break any colocations
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::text LIKE '%single_shard_table_col%' ORDER BY colocationid;
logicalrelid | colocationid
---------------------------------------------------------------------
single_shard_table_col1_1 | 198001
single_shard_table_col1_2 | 198001
single_shard_table_col1_3 | 198001
single_shard_table_col2_1 | 198002
single_shard_table_col3_1 | 198003
single_shard_table_col4_1 | 198004
single_shard_table_col5_1 | 198005
(7 rows)
-- drop preexisting tables
-- we can remove the drop commands once the issue is fixed: https://github.com/citusdata/citus/issues/6948
SET client_min_messages TO ERROR;
DROP TABLE IF EXISTS public.lineitem, public.orders, public.customer_append, public.part_append, public.supplier_single_shard,
public.events, public.users, public.lineitem_hash_part, public.lineitem_subquery, public.orders_hash_part,
public.orders_subquery, public.unlogged_table CASCADE;
DROP SCHEMA IF EXISTS with_basics, subquery_and_ctes CASCADE;
DROP TABLE IF EXISTS public.users_table, public.events_table, public.agg_results, public.agg_results_second, public.agg_results_third, public.agg_results_fourth, public.agg_results_window CASCADE;
-- drain node
SELECT citus_drain_node('localhost', :worker_2_port, 'block_writes');
citus_drain_node
---------------------------------------------------------------------
(1 row)
SELECT citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
RESET client_min_messages;
-- see the plan for moving 4 shards, 3 of which are in the same colocation group
SELECT * FROM get_rebalance_table_shards_plan();
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
single_shard_table_col1_1 | 1820002 | 0 | localhost | 57637 | localhost | 57638
single_shard_table_col1_2 | 1820003 | 0 | localhost | 57637 | localhost | 57638
single_shard_table_col1_3 | 1820004 | 0 | localhost | 57637 | localhost | 57638
single_shard_table_col2_1 | 1820005 | 0 | localhost | 57637 | localhost | 57638
(4 rows)
-- move some of them to worker 2 to balance the cluster
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 2 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
?column?
---------------------------------------------------------------------
1
(1 row)
-- stop it
SELECT * FROM citus_rebalance_stop();
citus_rebalance_stop
---------------------------------------------------------------------
(1 row)
-- show rebalance status, see the cancelled job for two moves
SELECT state, details FROM citus_rebalance_status();
state | details
---------------------------------------------------------------------
cancelled | {"tasks": [], "task_state_counts": {"cancelled": 2}}
(1 row)
-- start again
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 2 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
?column?
---------------------------------------------------------------------
1
(1 row)
-- show rebalance status, see the scheduled job for two moves
SELECT state, details FROM citus_rebalance_status();
state | details
---------------------------------------------------------------------
scheduled | {"tasks": [], "task_state_counts": {"runnable": 2}}
(1 row)
-- wait for rebalance to be completed
SELECT * FROM citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
-- the final status, balanced
SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
shardid | nodeport
---------------------------------------------------------------------
1820002 | 57638
1820003 | 57638
1820004 | 57638
1820005 | 57638
1820006 | 57637
1820007 | 57637
1820008 | 57637
(7 rows)
-- test update_distributed_table_colocation
CREATE TABLE update_col_1 (a INT);
CREATE TABLE update_col_2 (a INT);


@@ -98,6 +98,89 @@ SELECT COUNT(*) = 0 FROM pg_dist_partition WHERE logicalrelid::text LIKE '%null_
SELECT COUNT(*) = 0 FROM pg_dist_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid::text LIKE '%null_dist_key_table%');
SELECT COUNT(*) = 0 FROM pg_dist_shard WHERE logicalrelid::text LIKE '%null_dist_key_table%';
-- create 7 single shard tables, 3 of which are colocated, for testing shard moves / rebalance on them
CREATE TABLE single_shard_table_col1_1 (a INT PRIMARY KEY);
CREATE TABLE single_shard_table_col1_2 (a TEXT PRIMARY KEY);
CREATE TABLE single_shard_table_col1_3 (a TIMESTAMP PRIMARY KEY);
CREATE TABLE single_shard_table_col2_1 (a INT PRIMARY KEY);
CREATE TABLE single_shard_table_col3_1 (a INT PRIMARY KEY);
CREATE TABLE single_shard_table_col4_1 (a INT PRIMARY KEY);
CREATE TABLE single_shard_table_col5_1 (a INT PRIMARY KEY);
SELECT create_distributed_table('single_shard_table_col1_1', null, colocate_with=>'none');
SELECT create_distributed_table('single_shard_table_col1_2', null, colocate_with=>'single_shard_table_col1_1');
SELECT create_distributed_table('single_shard_table_col1_3', null, colocate_with=>'single_shard_table_col1_2');
SELECT create_distributed_table('single_shard_table_col2_1', null, colocate_with=>'none');
SELECT create_distributed_table('single_shard_table_col3_1', null, colocate_with=>'none');
SELECT create_distributed_table('single_shard_table_col4_1', null, colocate_with=>'none');
SELECT create_distributed_table('single_shard_table_col5_1', null, colocate_with=>'none');
-- initial status
SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
-- errors out because the table is streaming replicated
SELECT citus_copy_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
SELECT master_copy_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
SELECT citus_copy_shard_placement(1820005, :worker_1_node, :worker_2_node);
-- no changes because it's already balanced
SELECT rebalance_table_shards();
-- same placements
SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
-- manually move 2 shards from 2 colocation groups to make the cluster unbalanced
SELECT citus_move_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
SELECT citus_move_shard_placement(1820007, :worker_1_node, :worker_2_node);
-- all placements are located on worker 2
SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
-- move some of them to worker 1 to balance the cluster
SELECT rebalance_table_shards();
-- the final status, balanced
SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
-- verify we didn't break any colocations
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::text LIKE '%single_shard_table_col%' ORDER BY colocationid;
-- drop preexisting tables
-- we can remove the drop commands once the issue is fixed: https://github.com/citusdata/citus/issues/6948
SET client_min_messages TO ERROR;
DROP TABLE IF EXISTS public.lineitem, public.orders, public.customer_append, public.part_append, public.supplier_single_shard,
public.events, public.users, public.lineitem_hash_part, public.lineitem_subquery, public.orders_hash_part,
public.orders_subquery, public.unlogged_table CASCADE;
DROP SCHEMA IF EXISTS with_basics, subquery_and_ctes CASCADE;
DROP TABLE IF EXISTS public.users_table, public.events_table, public.agg_results, public.agg_results_second, public.agg_results_third, public.agg_results_fourth, public.agg_results_window CASCADE;
-- drain node
SELECT citus_drain_node('localhost', :worker_2_port, 'block_writes');
SELECT citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
RESET client_min_messages;
-- see the plan for moving 4 shards, 3 of which are in the same colocation group
SELECT * FROM get_rebalance_table_shards_plan();
-- move some of them to worker 2 to balance the cluster
SELECT 1 FROM citus_rebalance_start();
-- stop it
SELECT * FROM citus_rebalance_stop();
-- show rebalance status, see the cancelled job for two moves
SELECT state, details FROM citus_rebalance_status();
-- start again
SELECT 1 FROM citus_rebalance_start();
-- show rebalance status, see the scheduled job for two moves
SELECT state, details FROM citus_rebalance_status();
-- wait for rebalance to be completed
SELECT * FROM citus_rebalance_wait();
-- the final status, balanced
SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid;
-- test update_distributed_table_colocation
CREATE TABLE update_col_1 (a INT);
CREATE TABLE update_col_2 (a INT);