--
-- MULTI_SHARD_REBALANCER
--
SET citus.next_shard_id TO 433000;
SET citus.propagate_session_settings_for_loopback_connection TO ON;
-- For historical reasons this test was written in a way that assumes that
-- by_shard_count is the default strategy.
SELECT citus_set_default_rebalance_strategy('by_shard_count');
 citus_set_default_rebalance_strategy
---------------------------------------------------------------------

(1 row)

-- Lower the minimum disk size that a shard group is assumed to have. Otherwise
-- we would need to create shards of more than 100MB.
ALTER SYSTEM SET citus.rebalancer_by_disk_size_base_cost = 0;
SELECT pg_reload_conf();
 pg_reload_conf
---------------------------------------------------------------------
 t
(1 row)

CREATE TABLE ref_table_test(a int primary key);
SELECT create_reference_table('ref_table_test');
 create_reference_table
---------------------------------------------------------------------

(1 row)

CREATE TABLE dist_table_test(a int primary key);
SELECT create_distributed_table('dist_table_test', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CREATE TABLE postgres_table_test(a int primary key);
-- make sure that all rebalance operations work fine when
-- reference tables are replicated to the coordinator
SET client_min_messages TO ERROR;
SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
 ?column?
---------------------------------------------------------------------
        1
(1 row)

RESET client_min_messages;
-- should just be no-ops even if we add the coordinator to pg_dist_node
SELECT rebalance_table_shards('dist_table_test');
 rebalance_table_shards
---------------------------------------------------------------------

(1 row)

CALL citus_cleanup_orphaned_resources();
SELECT rebalance_table_shards();
 rebalance_table_shards
---------------------------------------------------------------------

(1 row)

CALL citus_cleanup_orphaned_resources();
-- test that calling rebalance_table_shards without specifying a relation
-- doesn't move the shard of the citus local table
SET citus.next_shard_id TO 433100;
CREATE TABLE citus_local_table(a int, b int);
SELECT citus_add_local_table_to_metadata('citus_local_table');
 citus_add_local_table_to_metadata
---------------------------------------------------------------------

(1 row)

INSERT INTO citus_local_table VALUES (1, 2);
SELECT rebalance_table_shards();
 rebalance_table_shards
---------------------------------------------------------------------

(1 row)

CALL citus_cleanup_orphaned_resources();
-- Check that rebalance_table_shards and get_rebalance_table_shards_plan fail
-- for any type of table except distributed tables.
SELECT rebalance_table_shards('ref_table_test'); ERROR: table public.ref_table_test is a reference table, moving shard of a reference table is not supported SELECT rebalance_table_shards('postgres_table_test'); ERROR: table public.postgres_table_test is a regular postgres table, you can only move shards of a citus table SELECT rebalance_table_shards('citus_local_table'); ERROR: table public.citus_local_table is a local table, moving shard of a local table added to metadata is currently not supported SELECT get_rebalance_table_shards_plan('ref_table_test'); ERROR: table public.ref_table_test is a reference table, moving shard of a reference table is not supported SELECT get_rebalance_table_shards_plan('postgres_table_test'); ERROR: table public.postgres_table_test is a regular postgres table, you can only move shards of a citus table SELECT get_rebalance_table_shards_plan('citus_local_table'); ERROR: table public.citus_local_table is a local table, moving shard of a local table added to metadata is currently not supported -- Check that citus_move_shard_placement fails for shards belonging reference -- tables or citus local tables SELECT citus_move_shard_placement(433000, 'localhost', :worker_1_port, 'localhost', :worker_2_port); ERROR: table public.ref_table_test is a reference table, moving shard of a reference table is not supported SELECT citus_move_shard_placement(433100, 'localhost', :worker_1_port, 'localhost', :worker_2_port); ERROR: table public.citus_local_table is a local table, moving shard of a local table added to metadata is currently not supported -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; tablename --------------------------------------------------------------------- citus_local_table_433100 (1 row) -- also check that we still can access shard relation, not the shell table SELECT count(*) FROM citus_local_table; count --------------------------------------------------------------------- 1 (1 row) -- verify drain_node uses the localhostname guc by seeing it fail to connect to a non-existing name ALTER SYSTEM SET citus.local_hostname TO 'foobar'; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC pg_sleep --------------------------------------------------------------------- (1 row) SELECT master_drain_node('localhost', :master_port); ERROR: connection to the remote node postgres@foobar:57636 failed with the following error: could not translate host name "foobar" to address: CALL citus_cleanup_orphaned_resources(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC pg_sleep --------------------------------------------------------------------- (1 row) SELECT master_drain_node('localhost', :master_port); master_drain_node --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; tablename --------------------------------------------------------------------- citus_local_table_433100 (1 row) -- also check that we 
still can access shard relation, not the shell table SELECT count(*) FROM citus_local_table; count --------------------------------------------------------------------- 1 (1 row) -- show that we do not create a shard rebalancing plan for citus local table SELECT get_rebalance_table_shards_plan(); get_rebalance_table_shards_plan --------------------------------------------------------------------- (0 rows) DROP TABLE citus_local_table; CREATE TABLE dist_table_test_2(a int); SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('dist_table_test_2', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) -- Mark tables as coordinator replicated in order to be able to test replicate_table_shards UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN ('dist_table_test_2'::regclass); -- replicate_table_shards should fail when the hostname GUC is set to a non-reachable node ALTER SYSTEM SET citus.local_hostname TO 'foobar'; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC pg_sleep --------------------------------------------------------------------- (1 row) SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes'); ERROR: connection to the remote node postgres@foobar:57636 failed with the following error: could not translate host name "foobar" to address: ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC pg_sleep --------------------------------------------------------------------- (1 row) -- replicate reference table should ignore the coordinator SET citus.node_connection_timeout to '35s'; BEGIN; SET LOCAL citus.shard_replication_factor TO 2; SET citus.log_remote_commands TO ON; SET SESSION citus.max_adaptive_executor_pool_size TO 5; SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes'); NOTICE: issuing CALL citus_cleanup_orphaned_resources() DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=xxxxx' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.max_adaptive_executor_pool_size TO '5'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.next_shard_id TO '433105'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.node_connection_timeout TO '35000'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.propagate_session_settings_for_loopback_connection TO 'on'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.shard_count TO '4'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=xxxxx' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.max_adaptive_executor_pool_size TO '5'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.next_shard_id TO '433105'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.node_connection_timeout TO '35000'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.propagate_session_settings_for_loopback_connection TO 'on'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.shard_count TO '4'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=xxxxx' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.max_adaptive_executor_pool_size TO '5'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.next_shard_id TO '433105'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.node_connection_timeout TO '35000'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.propagate_session_settings_for_loopback_connection TO 'on'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.shard_count TO '4'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=xxxxx' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.max_adaptive_executor_pool_size TO '5'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.next_shard_id TO '433105'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.node_connection_timeout TO '35000'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.propagate_session_settings_for_loopback_connection TO 'on'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.shard_count TO '4'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT pg_catalog.citus_copy_shard_placement(43xxxx,xx,xx,'block_writes') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx replicate_table_shards --------------------------------------------------------------------- (1 row) COMMIT; RESET citus.node_connection_timeout; RESET citus.log_remote_commands; DROP TABLE dist_table_test, dist_table_test_2, ref_table_test, postgres_table_test; RESET citus.shard_count; RESET citus.shard_replication_factor; -- Create a user to test multiuser usage of rebalancer functions -- We explicitely don't create this user on worker nodes yet, so we can -- test some more 
error handling. We create them later there. SET citus.enable_create_role_propagation TO OFF; CREATE USER testrole; NOTICE: not propagating CREATE ROLE/USER commands to worker nodes HINT: Connect to worker nodes directly to manually create all necessary users and roles. GRANT ALL ON SCHEMA public TO testrole; ERROR: role "testrole" does not exist CONTEXT: while executing command on localhost:xxxxx CREATE OR REPLACE FUNCTION shard_placement_rebalance_array( worker_node_list json[], shard_placement_list json[], threshold float4 DEFAULT 0, max_shard_moves int DEFAULT 1000000, drain_only bool DEFAULT false, improvement_threshold float4 DEFAULT 0.5 ) RETURNS json[] AS 'citus' LANGUAGE C STRICT VOLATILE; CREATE OR REPLACE FUNCTION shard_placement_replication_array(worker_node_list json[], shard_placement_list json[], shard_replication_factor int) RETURNS json[] AS 'citus' LANGUAGE C STRICT VOLATILE; CREATE OR REPLACE FUNCTION worker_node_responsive(worker_node_name text, worker_node_port int) RETURNS boolean AS 'citus' LANGUAGE C STRICT VOLATILE; SET citus.next_shard_id TO 123000; SELECT worker_node_responsive(node_name, node_port::int) FROM master_get_active_worker_nodes() ORDER BY node_name, node_port ASC; worker_node_responsive --------------------------------------------------------------------- t t (2 rows) -- Check that worker_node_responsive returns false for dead nodes -- Note that PostgreSQL tries all possible resolutions of localhost on failing -- connections. This causes different error details to be printed on different -- environments. Therefore, we first set verbosity to terse. \set VERBOSITY terse SELECT worker_node_responsive('localhost', 1); worker_node_responsive --------------------------------------------------------------------- f (1 row) \set VERBOSITY default -- Check that with threshold=0.0 shard_placement_rebalance_array returns enough -- moves to make the cluster completely balanced. SELECT unnest(shard_placement_rebalance_array( ARRAY['{"node_name": "hostname1"}', '{"node_name": "hostname2"}']::json[], ARRAY['{"shardid":1, "nodename":"hostname1"}', '{"shardid":2, "nodename":"hostname1"}', '{"shardid":3, "nodename":"hostname1"}', '{"shardid":4, "nodename":"hostname1"}', '{"shardid":5, "nodename":"hostname1"}', '{"shardid":6, "nodename":"hostname2"}']::json[], 0.0 )); unnest --------------------------------------------------------------------- {"updatetype":1,"shardid":1,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432} {"updatetype":1,"shardid":2,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432} (2 rows) -- Check that with two nodes and threshold=1.0 shard_placement_rebalance_array -- doesn't return any moves, even if it is completely unbalanced. 
SELECT unnest(shard_placement_rebalance_array(
    ARRAY['{"node_name": "hostname1"}',
          '{"node_name": "hostname2"}']::json[],
    ARRAY['{"shardid":1, "nodename":"hostname1"}',
          '{"shardid":2, "nodename":"hostname1"}',
          '{"shardid":3, "nodename":"hostname1"}']::json[],
    1.0
));
 unnest
---------------------------------------------------------------------
(0 rows)

-- Check that with three nodes and threshold=1.0
-- shard_placement_rebalance_array returns moves when it is completely unbalanced
SELECT unnest(shard_placement_rebalance_array(
    ARRAY['{"node_name": "hostname1"}',
          '{"node_name": "hostname2"}',
          '{"node_name": "hostname3"}']::json[],
    ARRAY['{"shardid":1, "nodename":"hostname1"}',
          '{"shardid":2, "nodename":"hostname1"}',
          '{"shardid":3, "nodename":"hostname1"}']::json[],
    1.0
));
                                                       unnest
---------------------------------------------------------------------
 {"updatetype":1,"shardid":1,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432}
(1 row)

-- Check that with three nodes and threshold=2.0
-- shard_placement_rebalance_array doesn't return any moves, even if it is
-- completely unbalanced.
SELECT unnest(shard_placement_rebalance_array(
    ARRAY['{"node_name": "hostname1"}',
          '{"node_name": "hostname2"}',
          '{"node_name": "hostname3"}']::json[],
    ARRAY['{"shardid":1, "nodename":"hostname1"}',
          '{"shardid":2, "nodename":"hostname1"}',
          '{"shardid":3, "nodename":"hostname1"}']::json[],
    2.0
));
 unnest
---------------------------------------------------------------------
(0 rows)

-- Check that with threshold=0.0 shard_placement_rebalance_array doesn't return
-- any moves if the cluster is already balanced.
SELECT unnest(shard_placement_rebalance_array(
    ARRAY['{"node_name": "hostname1"}',
          '{"node_name": "hostname2"}']::json[],
    ARRAY['{"shardid":1, "nodename":"hostname1"}',
          '{"shardid":2, "nodename":"hostname1"}',
          '{"shardid":3, "nodename":"hostname1"}',
          '{"shardid":4, "nodename":"hostname2"}',
          '{"shardid":5, "nodename":"hostname2"}',
          '{"shardid":6, "nodename":"hostname2"}']::json[],
    0.0
));
 unnest
---------------------------------------------------------------------
(0 rows)

-- Check that shard_placement_replication_array returns a shard copy operation
-- for each of the shards on an inactive node.
SELECT unnest(shard_placement_replication_array(
    ARRAY['{"node_name": "hostname1"}',
          '{"node_name": "hostname2"}']::json[],
    ARRAY['{"shardid":1, "nodename":"hostname1"}',
          '{"shardid":2, "nodename":"hostname2"}',
          '{"shardid":1, "nodename":"hostname3"}',
          '{"shardid":2, "nodename":"hostname3"}']::json[],
    2
));
                                                       unnest
---------------------------------------------------------------------
 {"updatetype":2,"shardid":1,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432}
 {"updatetype":2,"shardid":2,"sourcename":"hostname2","sourceport":5432,"targetname":"hostname1","targetport":5432}
(2 rows)

-- Check that shard_placement_replication_array errors out if all placements of
-- a shard are placed on inactive nodes.
SELECT unnest(shard_placement_replication_array(
    ARRAY['{"node_name": "hostname1"}']::json[],
    ARRAY['{"shardid":1, "nodename":"hostname2"}',
          '{"shardid":1, "nodename":"hostname3"}']::json[],
    2
));
ERROR: could not find a source for shard xxxxx
-- Check that shard_placement_replication_array errors out if the replication
-- factor is more than the number of active nodes.
SELECT unnest(shard_placement_replication_array(
    ARRAY['{"node_name": "hostname1"}']::json[],
    ARRAY['{"shardid":1, "nodename":"hostname1"}']::json[],
    2
));
ERROR: could not find a target for shard xxxxx
SET client_min_messages TO WARNING;
set citus.shard_count = 4;
-- Create a distributed table with all shards on a single node, so that we can
-- use it as an under-replicated table
SET citus.shard_replication_factor TO 1;
SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
 master_set_node_property
---------------------------------------------------------------------

(1 row)

CREATE TABLE replication_test_table(int_column int);
SELECT create_distributed_table('replication_test_table', 'int_column');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'replication_test_table'::regclass;
INSERT INTO replication_test_table SELECT * FROM generate_series(1, 100);
-- Ensure that shard_replication_factor is 2 during the replicate_table_shards
-- and rebalance_table_shards tests
SET citus.shard_replication_factor TO 2;
SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
 master_set_node_property
---------------------------------------------------------------------

(1 row)

CREATE VIEW replication_test_table_placements_per_node AS
    SELECT count(*)
    FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
    WHERE logicalrelid = 'replication_test_table'::regclass
      AND shardstate != 4
    GROUP BY nodename, nodeport
    ORDER BY nodename, nodeport;
SELECT * FROM replication_test_table_placements_per_node;
 count
---------------------------------------------------------------------
     4
(1 row)

-- Test replicate_table_shards, which will in turn test update_shard_placement
-- in copy mode.
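-- For reference, the simplest invocation (an illustrative sketch only, it is not
-- executed at this point in the test) would bring every shard of the table up to
-- the session's citus.shard_replication_factor:
--
--   SELECT replicate_table_shards('replication_test_table',
--                                 shard_transfer_mode := 'block_writes');
--
-- The calls below exercise the optional arguments (excluded_shard_list,
-- shard_replication_factor and max_shard_copies) one at a time.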
-- Check excluded_shard_list by excluding three shards with smaller ids SELECT replicate_table_shards('replication_test_table', excluded_shard_list := excluded_shard_list, shard_transfer_mode:='block_writes') FROM ( SELECT (array_agg(DISTINCT shardid ORDER BY shardid))[1:3] AS excluded_shard_list FROM pg_dist_shard WHERE logicalrelid = 'replication_test_table'::regclass ) T; replicate_table_shards --------------------------------------------------------------------- (1 row) SELECT * FROM replication_test_table_placements_per_node; count --------------------------------------------------------------------- 1 4 (2 rows) -- Check that with shard_replication_factor=1 we don't do any copies SELECT replicate_table_shards('replication_test_table', shard_replication_factor := 1, shard_transfer_mode:='block_writes'); replicate_table_shards --------------------------------------------------------------------- (1 row) SELECT * FROM replication_test_table_placements_per_node; count --------------------------------------------------------------------- 1 4 (2 rows) -- Check that max_shard_copies limits number of copy operations SELECT replicate_table_shards('replication_test_table', max_shard_copies := 2, shard_transfer_mode:='block_writes'); replicate_table_shards --------------------------------------------------------------------- (1 row) SELECT * FROM replication_test_table_placements_per_node; count --------------------------------------------------------------------- 3 4 (2 rows) -- Replicate the remaining under-replicated shards SELECT replicate_table_shards('replication_test_table', shard_transfer_mode:='block_writes'); replicate_table_shards --------------------------------------------------------------------- (1 row) SELECT * FROM replication_test_table_placements_per_node; count --------------------------------------------------------------------- 4 4 (2 rows) -- Check that querying the table doesn't error out SELECT count(*) FROM replication_test_table; count --------------------------------------------------------------------- 100 (1 row) DROP TABLE public.replication_test_table CASCADE; -- Test rebalance_table_shards, which will in turn test update_shard_placement -- in move mode. SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 6; CREATE TABLE rebalance_test_table(int_column int); SELECT create_distributed_table('rebalance_test_table', 'int_column'); create_distributed_table --------------------------------------------------------------------- (1 row) UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'rebalance_test_table'::regclass; CREATE VIEW table_placements_per_node AS SELECT nodeport, logicalrelid::regclass, count(*) FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard WHERE shardstate != 4 GROUP BY logicalrelid::regclass, nodename, nodeport ORDER BY logicalrelid::regclass, nodename, nodeport; -- Create six shards with replication factor 1 and move them to the same -- node to create an unbalanced cluster. 
CREATE OR REPLACE PROCEDURE create_unbalanced_shards(rel text) LANGUAGE SQL AS $$ SET citus.shard_replication_factor TO 1; SELECT count(master_move_shard_placement(shardid, src.nodename, src.nodeport::int, dst.nodename, dst.nodeport::int, shard_transfer_mode:='block_writes')) FROM pg_dist_shard s JOIN pg_dist_shard_placement src USING (shardid), (SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass; CALL citus_cleanup_orphaned_resources(); $$; CALL create_unbalanced_shards('rebalance_test_table'); SET citus.shard_replication_factor TO 2; -- Upload the test data to the shards INSERT INTO rebalance_test_table SELECT * FROM generate_series(1, 100); -- Verify that there is one node with all placements SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57638 | rebalance_test_table | 6 (1 row) -- check rebalances use the localhost guc by seeing it fail when the GUC is set to a non-existing host ALTER SYSTEM SET citus.local_hostname TO 'foobar'; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC pg_sleep --------------------------------------------------------------------- (1 row) SELECT rebalance_table_shards('rebalance_test_table', excluded_shard_list := excluded_shard_list, threshold := 0, shard_transfer_mode:='block_writes') FROM ( SELECT (array_agg(DISTINCT shardid ORDER BY shardid))[1:4] AS excluded_shard_list FROM pg_dist_shard WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; ERROR: connection to the remote node postgres@foobar:57636 failed with the following error: could not translate host name "foobar" to address: CALL citus_cleanup_orphaned_resources(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC pg_sleep --------------------------------------------------------------------- (1 row) -- Check excluded_shard_list by excluding four shards with smaller ids SELECT rebalance_table_shards('rebalance_test_table', excluded_shard_list := excluded_shard_list, threshold := 0, shard_transfer_mode:='block_writes') FROM ( SELECT (array_agg(DISTINCT shardid ORDER BY shardid))[1:4] AS excluded_shard_list FROM pg_dist_shard WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | rebalance_test_table | 1 57638 | rebalance_test_table | 5 (2 rows) -- Check that max_shard_moves limits number of move operations -- First check that we error if not table owner -- Turn on NOTICE messages SET ROLE testrole; -- Make sure that rebalance is stopped if source or target nodes are -- unresponsive. 
SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='block_writes'); ERROR: target node localhost:xxxxx is not responsive \c - - - :worker_1_port SET citus.enable_create_role_propagation TO OFF; CREATE USER testrole; NOTICE: not propagating CREATE ROLE/USER commands to worker nodes HINT: Connect to worker nodes directly to manually create all necessary users and roles. GRANT ALL ON SCHEMA public TO testrole; ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. \c - - - :master_port SET client_min_messages TO WARNING; SET ROLE testrole; SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='block_writes'); ERROR: source node localhost:xxxxx is not responsive \c - - - :worker_2_port SET citus.enable_create_role_propagation TO OFF; CREATE USER testrole; NOTICE: not propagating CREATE ROLE/USER commands to worker nodes HINT: Connect to worker nodes directly to manually create all necessary users and roles. GRANT ALL ON SCHEMA public TO testrole; ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. \c - - - :master_port SET client_min_messages TO WARNING; SET citus.next_shard_id TO 123010; SET ROLE testrole; SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='block_writes'); ERROR: must be owner of table rebalance_test_table CONTEXT: while executing command on localhost:xxxxx RESET ROLE; CALL citus_cleanup_orphaned_resources(); -- Confirm no moves took place at all during these errors SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | rebalance_test_table | 1 57638 | rebalance_test_table | 5 (2 rows) CALL citus_cleanup_orphaned_resources(); SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | rebalance_test_table | 2 57638 | rebalance_test_table | 4 (2 rows) -- Check that threshold=1 doesn't move any shards SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_transfer_mode:='block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | rebalance_test_table | 2 57638 | rebalance_test_table | 4 (2 rows) -- Move the remaining shards using threshold=0 SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_transfer_mode:='block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | rebalance_test_table | 3 57638 | rebalance_test_table | 3 (2 rows) -- Check that shard is completely balanced and rebalancing again doesn't have -- any effects. 
SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_transfer_mode:='block_writes');
 rebalance_table_shards
---------------------------------------------------------------------

(1 row)

CALL citus_cleanup_orphaned_resources();
SELECT * FROM table_placements_per_node;
 nodeport | logicalrelid | count
---------------------------------------------------------------------
 57637 | rebalance_test_table | 3
 57638 | rebalance_test_table | 3
(2 rows)

-- Check that querying the table doesn't error out
SELECT count(*) FROM rebalance_test_table;
 count
---------------------------------------------------------------------
   100
(1 row)

DROP TABLE rebalance_test_table;
-- Test schema support
CREATE SCHEMA test_schema_support;
SELECT COUNT(*) FROM pg_dist_shard_placement;
 count
---------------------------------------------------------------------
     0
(1 row)

CREATE TABLE test_schema_support.nation_hash (
    n_nationkey integer not null,
    n_name char(25) not null,
    n_regionkey integer not null,
    n_comment varchar(152)
);
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test_schema_support.nation_hash', 'n_nationkey', 'hash');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CREATE TABLE test_schema_support.nation_hash2 (
    n_nationkey integer not null,
    n_name char(25) not null,
    n_regionkey integer not null,
    n_comment varchar(152)
);
SELECT create_distributed_table('test_schema_support.nation_hash2', 'n_nationkey', 'hash');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

-- Mark tables as coordinator replicated in order to be able to test replicate_table_shards
UPDATE pg_dist_partition SET repmodel='c'
WHERE logicalrelid IN ('test_schema_support.nation_hash2'::regclass, 'test_schema_support.nation_hash'::regclass);
-- Shard count before replication
SELECT COUNT(*) FROM pg_dist_shard_placement;
 count
---------------------------------------------------------------------
     8
(1 row)

SET search_path TO public;
SELECT replicate_table_shards('test_schema_support.nation_hash', shard_replication_factor:=2, max_shard_copies:=1, shard_transfer_mode:='block_writes');
 replicate_table_shards
---------------------------------------------------------------------

(1 row)

-- Confirm replication, both tables replicated due to colocation
SELECT COUNT(*) FROM pg_dist_shard_placement;
 count
---------------------------------------------------------------------
    10
(1 row)

-- Test with search_path set
SET search_path TO test_schema_support;
SELECT replicate_table_shards('nation_hash2', shard_replication_factor:=2, shard_transfer_mode:='block_writes');
 replicate_table_shards
---------------------------------------------------------------------

(1 row)

-- Confirm replication
SELECT COUNT(*) FROM pg_dist_shard_placement;
 count
---------------------------------------------------------------------
    16
(1 row)

DROP TABLE test_schema_support.nation_hash;
DROP TABLE test_schema_support.nation_hash2;
-- Test rebalancer with schema
-- The next few operations create an imbalanced distributed table
CREATE TABLE test_schema_support.imbalanced_table_local (
    id integer not null
);
INSERT INTO test_schema_support.imbalanced_table_local VALUES(1);
INSERT INTO test_schema_support.imbalanced_table_local VALUES(2);
INSERT INTO test_schema_support.imbalanced_table_local VALUES(3);
INSERT INTO test_schema_support.imbalanced_table_local VALUES(4);
CREATE TABLE test_schema_support.imbalanced_table (
    id integer not null
);
SET citus.shard_count = 3;
SET citus.shard_replication_factor TO 1;
SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
 master_set_node_property
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('test_schema_support.imbalanced_table', 'id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO test_schema_support.imbalanced_table SELECT * FROM generate_series(1, 100);
UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'test_schema_support.imbalanced_table'::regclass;
SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
 master_set_node_property
---------------------------------------------------------------------

(1 row)

SET citus.shard_count = 4;
-- copy one of the shards to the other node; this is to test that the
-- rebalancer takes into account all copies of a placement
SET citus.shard_replication_factor TO 2;
SELECT replicate_table_shards('test_schema_support.imbalanced_table', max_shard_copies := 1, shard_transfer_mode := 'block_writes');
 replicate_table_shards
---------------------------------------------------------------------

(1 row)

SET citus.shard_replication_factor TO 1;
-- imbalanced_table is now imbalanced
-- Shard counts in each node before rebalance
SELECT * FROM public.table_placements_per_node;
 nodeport | logicalrelid | count
---------------------------------------------------------------------
 57637 | imbalanced_table | 1
 57638 | imbalanced_table | 3
(2 rows)

-- Row count in imbalanced table before rebalance
SELECT COUNT(*) FROM imbalanced_table;
 count
---------------------------------------------------------------------
   100
(1 row)

-- Test rebalance operation
SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes');
 rebalance_table_shards
---------------------------------------------------------------------

(1 row)

CALL citus_cleanup_orphaned_resources();
-- Confirm rebalance
-- Shard counts in each node after rebalance
SELECT * FROM public.table_placements_per_node;
 nodeport | logicalrelid | count
---------------------------------------------------------------------
 57637 | imbalanced_table | 2
 57638 | imbalanced_table | 2
(2 rows)

-- Row count in imbalanced table after rebalance
SELECT COUNT(*) FROM imbalanced_table;
 count
---------------------------------------------------------------------
   100
(1 row)

DROP TABLE test_schema_support.imbalanced_table;
DROP TABLE test_schema_support.imbalanced_table_local;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count = 4;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 136;
CREATE TABLE colocated_rebalance_test(id integer);
CREATE TABLE colocated_rebalance_test2(id integer);
SELECT create_distributed_table('colocated_rebalance_test', 'id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

-- make sure that we do not allow moving shards to target nodes that are not
-- eligible to receive shards
-- Try to move shards to a non-existing node
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', 10000, 'block_writes')
FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port;
ERROR: Moving shards to a non-existing node is not supported
HINT: Add the target node via SELECT citus_add_node('localhost',
10000); CALL citus_cleanup_orphaned_resources(); -- Try to move shards to a node where shards are not allowed SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); master_set_node_property --------------------------------------------------------------------- (1 row) SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; ERROR: Moving shards to a node that shouldn't have a shard is not supported HINT: Allow shards on the target node via SELECT * FROM citus_set_node_property('localhost', 57637, 'shouldhaveshards', true); SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); master_set_node_property --------------------------------------------------------------------- (1 row) -- Try to move shards to a non-active node UPDATE pg_dist_node SET isactive = false WHERE nodeport = :worker_1_port; SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; ERROR: Moving shards to a non-active node is not supported HINT: Activate the target node via SELECT citus_activate_node('localhost', 57637); UPDATE pg_dist_node SET isactive = true WHERE nodeport = :worker_1_port; -- Try to move shards to a secondary node UPDATE pg_dist_node SET noderole = 'secondary' WHERE nodeport = :worker_1_port; SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; ERROR: Moving shards to a secondary (e.g., replica) node is not supported UPDATE pg_dist_node SET noderole = 'primary' WHERE nodeport = :worker_1_port; -- Move all shards to worker1 SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; master_move_shard_placement --------------------------------------------------------------------- (2 rows) CALL citus_cleanup_orphaned_resources(); SELECT create_distributed_table('colocated_rebalance_test2', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) -- Confirm all shards for both tables are on worker1 SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | colocated_rebalance_test | 4 57637 | colocated_rebalance_test2 | 4 (2 rows) -- Confirm that the plan for drain_only doesn't show any moves SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0, drain_only := true); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- (0 rows) -- Running with drain_only shouldn't do anything SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); -- Confirm that nothing changed SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | 
colocated_rebalance_test | 4 57637 | colocated_rebalance_test2 | 4 (2 rows) -- Confirm that the plan shows 2 shards of both tables moving back to worker2 SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- colocated_rebalance_test | 123021 | 0 | localhost | 57637 | localhost | 57638 colocated_rebalance_test2 | 123025 | 0 | localhost | 57637 | localhost | 57638 colocated_rebalance_test | 123022 | 0 | localhost | 57637 | localhost | 57638 colocated_rebalance_test2 | 123026 | 0 | localhost | 57637 | localhost | 57638 (4 rows) -- Confirm that this also happens when using rebalancing by disk size even if the tables are empty SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebalance_strategy := 'by_disk_size'); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- colocated_rebalance_test | 123021 | 0 | localhost | 57637 | localhost | 57638 colocated_rebalance_test2 | 123025 | 0 | localhost | 57637 | localhost | 57638 colocated_rebalance_test | 123022 | 0 | localhost | 57637 | localhost | 57638 colocated_rebalance_test2 | 123026 | 0 | localhost | 57637 | localhost | 57638 (4 rows) -- Check that we can call this function SELECT * FROM get_rebalance_progress(); sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type | source_lsn | target_lsn | status --------------------------------------------------------------------- (0 rows) -- Actually do the rebalance SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type | source_lsn | target_lsn | status --------------------------------------------------------------------- (0 rows) -- Confirm that the shards are now there SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | colocated_rebalance_test | 2 57638 | colocated_rebalance_test | 2 57637 | colocated_rebalance_test2 | 2 57638 | colocated_rebalance_test2 | 2 (4 rows) CALL citus_cleanup_orphaned_resources(); select * from pg_dist_placement ORDER BY placementid; placementid | shardid | shardstate | shardlength | groupid --------------------------------------------------------------------- 138 | 123023 | 1 | 0 | 14 141 | 123024 | 1 | 0 | 14 144 | 123027 | 1 | 0 | 14 145 | 123028 | 1 | 0 | 14 146 | 123021 | 1 | 0 | 16 147 | 123025 | 1 | 0 | 16 148 | 123022 | 1 | 0 | 16 149 | 123026 | 1 | 0 | 16 (8 rows) -- Move all shards to worker1 again SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node WHERE nodeport = :worker_2_port AND logicalrelid = 'colocated_rebalance_test'::regclass; 
 master_move_shard_placement
---------------------------------------------------------------------


(2 rows)

-- Confirm that the shards are now all on worker1
SELECT * FROM public.table_placements_per_node;
 nodeport | logicalrelid | count
---------------------------------------------------------------------
 57637 | colocated_rebalance_test | 4
 57637 | colocated_rebalance_test2 | 4
(2 rows)

-- Explicitly don't run citus_cleanup_orphaned_resources, rebalance_table_shards
-- should do that automatically.
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
 rebalance_table_shards
---------------------------------------------------------------------

(1 row)

-- Confirm that the shards are now moved
SELECT * FROM public.table_placements_per_node;
 nodeport | logicalrelid | count
---------------------------------------------------------------------
 57637 | colocated_rebalance_test | 2
 57638 | colocated_rebalance_test | 2
 57637 | colocated_rebalance_test2 | 2
 57638 | colocated_rebalance_test2 | 2
(4 rows)

CREATE TABLE non_colocated_rebalance_test(id integer);
SELECT create_distributed_table('non_colocated_rebalance_test', 'id', colocate_with := 'none');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

-- confirm that both colocation groups are balanced
SELECT * FROM public.table_placements_per_node;
 nodeport | logicalrelid | count
---------------------------------------------------------------------
 57637 | colocated_rebalance_test | 2
 57638 | colocated_rebalance_test | 2
 57637 | colocated_rebalance_test2 | 2
 57638 | colocated_rebalance_test2 | 2
 57637 | non_colocated_rebalance_test | 2
 57638 | non_colocated_rebalance_test | 2
(6 rows)

-- testing behaviour when marking a node for draining by setting shouldhaveshards to false
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
 master_set_node_property
---------------------------------------------------------------------

(1 row)

SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0);
 table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
 colocated_rebalance_test | 123021 | 0 | localhost | 57638 | localhost | 57637
 colocated_rebalance_test2 | 123025 | 0 | localhost | 57638 | localhost | 57637
 colocated_rebalance_test | 123022 | 0 | localhost | 57638 | localhost | 57637
 colocated_rebalance_test2 | 123026 | 0 | localhost | 57638 | localhost | 57637
(4 rows)

SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
 rebalance_table_shards
---------------------------------------------------------------------

(1 row)

CALL citus_cleanup_orphaned_resources();
SELECT * FROM public.table_placements_per_node;
 nodeport | logicalrelid | count
---------------------------------------------------------------------
 57637 | colocated_rebalance_test | 4
 57637 | colocated_rebalance_test2 | 4
 57637 | non_colocated_rebalance_test | 2
 57638 | non_colocated_rebalance_test | 2
(4 rows)

SELECT * FROM get_rebalance_table_shards_plan('non_colocated_rebalance_test', threshold := 0);
 table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
 non_colocated_rebalance_test | 123030 | 0 | localhost | 57638 | localhost | 57637
non_colocated_rebalance_test | 123032 | 0 | localhost | 57638 | localhost | 57637 (2 rows) SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | colocated_rebalance_test | 4 57637 | colocated_rebalance_test2 | 4 57637 | non_colocated_rebalance_test | 4 (3 rows) -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); master_set_node_property --------------------------------------------------------------------- (1 row) SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | colocated_rebalance_test | 2 57638 | colocated_rebalance_test | 2 57637 | colocated_rebalance_test2 | 2 57638 | colocated_rebalance_test2 | 2 57637 | non_colocated_rebalance_test | 4 (5 rows) SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | colocated_rebalance_test | 2 57638 | colocated_rebalance_test | 2 57637 | colocated_rebalance_test2 | 2 57638 | colocated_rebalance_test2 | 2 57637 | non_colocated_rebalance_test | 2 57638 | non_colocated_rebalance_test | 2 (6 rows) -- testing behaviour when setting shouldhaveshards to false and rebalancing all -- colocation groups with drain_only=true SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); master_set_node_property --------------------------------------------------------------------- (1 row) -- we actually shouldn't need the ORDER BY clause as the output will be in execution order -- but this one involves different colocation groups and which colocation group is first moved is not consistent SELECT * FROM get_rebalance_table_shards_plan(threshold := 0, drain_only := true) ORDER BY shardid; table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- colocated_rebalance_test | 123021 | 0 | localhost | 57638 | localhost | 57637 colocated_rebalance_test | 123022 | 0 | localhost | 57638 | localhost | 57637 colocated_rebalance_test2 | 123025 | 0 | localhost | 57638 | localhost | 57637 colocated_rebalance_test2 | 123026 | 0 | localhost | 57638 | localhost | 57637 non_colocated_rebalance_test | 123029 | 0 | localhost | 57638 | localhost | 57637 non_colocated_rebalance_test | 123030 | 0 | localhost | 57638 | localhost | 57637 (6 rows) SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true); rebalance_table_shards 
--------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | colocated_rebalance_test | 4 57637 | colocated_rebalance_test2 | 4 57637 | non_colocated_rebalance_test | 4 (3 rows) -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); master_set_node_property --------------------------------------------------------------------- (1 row) SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | colocated_rebalance_test | 2 57638 | colocated_rebalance_test | 2 57637 | colocated_rebalance_test2 | 2 57638 | colocated_rebalance_test2 | 2 57637 | non_colocated_rebalance_test | 2 57638 | non_colocated_rebalance_test | 2 (6 rows) -- testing behaviour when setting shouldhaveshards to false and rebalancing all -- colocation groups with drain_only=false SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); master_set_node_property --------------------------------------------------------------------- (1 row) -- we actually shouldn't need the ORDER BY clause as the output will be in execution order -- but this one involves different colocation groups and which colocation group is first moved is not consistent SELECT * FROM get_rebalance_table_shards_plan(threshold := 0) ORDER BY shardid; table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- colocated_rebalance_test | 123021 | 0 | localhost | 57638 | localhost | 57637 colocated_rebalance_test | 123022 | 0 | localhost | 57638 | localhost | 57637 colocated_rebalance_test2 | 123025 | 0 | localhost | 57638 | localhost | 57637 colocated_rebalance_test2 | 123026 | 0 | localhost | 57638 | localhost | 57637 non_colocated_rebalance_test | 123029 | 0 | localhost | 57638 | localhost | 57637 non_colocated_rebalance_test | 123030 | 0 | localhost | 57638 | localhost | 57637 (6 rows) SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | colocated_rebalance_test | 4 57637 | colocated_rebalance_test2 | 4 57637 | non_colocated_rebalance_test | 4 (3 rows) -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); master_set_node_property --------------------------------------------------------------------- (1 row) SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count 
--------------------------------------------------------------------- 57637 | colocated_rebalance_test | 2 57638 | colocated_rebalance_test | 2 57637 | colocated_rebalance_test2 | 2 57638 | colocated_rebalance_test2 | 2 57637 | non_colocated_rebalance_test | 2 57638 | non_colocated_rebalance_test | 2 (6 rows) -- Make it a data node again SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); master_set_node_property --------------------------------------------------------------------- (1 row) -- testing behaviour of master_drain_node SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode := 'block_writes'); master_drain_node --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port; shouldhaveshards --------------------------------------------------------------------- f (1 row) SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | colocated_rebalance_test | 4 57637 | colocated_rebalance_test2 | 4 57637 | non_colocated_rebalance_test | 4 (3 rows) -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); master_set_node_property --------------------------------------------------------------------- (1 row) SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | colocated_rebalance_test | 2 57638 | colocated_rebalance_test | 2 57637 | colocated_rebalance_test2 | 2 57638 | colocated_rebalance_test2 | 2 57637 | non_colocated_rebalance_test | 2 57638 | non_colocated_rebalance_test | 2 (6 rows) -- Drop some tables to get a clear, consistent error DROP TABLE test_schema_support.non_colocated_rebalance_test; DROP TABLE test_schema_support.colocated_rebalance_test2; -- testing behaviour when a transfer fails while using master_drain_node SELECT * from master_drain_node('localhost', :worker_2_port); ERROR: cannot use logical replication to transfer shards of the relation colocated_rebalance_test since it doesn't have a REPLICA IDENTITY or PRIMARY KEY DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY. HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'.
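-- Illustrative sketch (not executed as part of this test): following the HINT above,
-- the same drain would succeed if an explicit shard transfer mode that does not rely
-- on logical replication were passed, e.g.:
--   SELECT * FROM master_drain_node('localhost', :worker_2_port,
--                                   shard_transfer_mode := 'block_writes');
-- The test intentionally keeps the failing call to exercise the error path.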
-- Make sure shouldhaveshards is false select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port; shouldhaveshards --------------------------------------------------------------------- f (1 row) -- Make sure no actual nodes are moved SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | colocated_rebalance_test | 2 57638 | colocated_rebalance_test | 2 (2 rows) -- Make it a data node again SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); master_set_node_property --------------------------------------------------------------------- (1 row) -- Leave no trace on workers RESET search_path; \set VERBOSITY terse DROP SCHEMA test_schema_support CASCADE; \set VERBOSITY default REVOKE ALL ON SCHEMA public FROM testrole; DROP USER testrole; -- Test costs set citus.shard_count = 4; SET citus.next_shard_id TO 123040; CREATE TABLE tab (x int); SELECT create_distributed_table('tab','x'); create_distributed_table --------------------------------------------------------------------- (1 row) -- The following numbers are chosen such that they are placed on different -- shards. INSERT INTO tab SELECT 1 from generate_series(1, 30000); INSERT INTO tab SELECT 2 from generate_series(1, 10000); INSERT INTO tab SELECT 3 from generate_series(1, 10000); INSERT INTO tab SELECT 6 from generate_series(1, 10000); VACUUM FULL tab; ANALYZE tab; \c - - - :worker_1_port SELECT table_schema, table_name, row_estimate, CASE WHEN total_bytes BETWEEN 1900000 AND 2300000 THEN 2179072 WHEN total_bytes BETWEEN 900000 AND 1200000 THEN 1089536 WHEN total_bytes BETWEEN 300000 AND 440000 THEN 368640 ELSE total_bytes END FROM ( SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM ( SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME , c.reltuples AS row_estimate , pg_total_relation_size(c.oid) AS total_bytes , pg_indexes_size(c.oid) AS index_bytes , pg_total_relation_size(reltoastrelid) AS toast_bytes FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = c.relnamespace WHERE relkind = 'r' AND c.oid NOT IN (SELECT logicalrelid FROM pg_dist_partition) ) a WHERE table_schema = 'public' ) a ORDER BY table_name; table_schema | table_name | row_estimate | total_bytes --------------------------------------------------------------------- public | tab_123040 | 30000 | 1089536 public | tab_123042 | 10000 | 368640 (2 rows) \c - - - :worker_2_port SELECT table_schema, table_name, row_estimate, CASE WHEN total_bytes BETWEEN 1900000 AND 2300000 THEN 2179072 WHEN total_bytes BETWEEN 900000 AND 1200000 THEN 1089536 WHEN total_bytes BETWEEN 300000 AND 440000 THEN 368640 ELSE total_bytes END FROM ( SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM ( SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME , c.reltuples AS row_estimate , pg_total_relation_size(c.oid) AS total_bytes , pg_indexes_size(c.oid) AS index_bytes , pg_total_relation_size(reltoastrelid) AS toast_bytes FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = c.relnamespace WHERE relkind = 'r' AND c.oid NOT IN (SELECT logicalrelid FROM pg_dist_partition) ) a WHERE table_schema = 'public' ) a ORDER BY table_name; table_schema | table_name | row_estimate | total_bytes --------------------------------------------------------------------- public | tab_123041 | 10000 | 368640 public | tab_123043 | 10000 | 368640 (2 rows) \c - - - :master_port SELECT * FROM 
get_rebalance_table_shards_plan('tab'); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- (0 rows) SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size'); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- tab | 123042 | 0 | localhost | 57637 | localhost | 57638 (1 row) SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', threshold := 0); WARNING: the given threshold is lower than the minimum threshold allowed by the rebalance strategy, using the minimum allowed threshold instead DETAIL: Using threshold of 0.01 table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- tab | 123042 | 0 | localhost | 57637 | localhost | 57638 (1 row) SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | tab | 2 57638 | tab | 2 (2 rows) SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes'); NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); NOTICE: cleaned up 1 orphaned resources SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | tab | 1 57638 | tab | 3 (2 rows) SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes', threshold := 0); WARNING: the given threshold is lower than the minimum threshold allowed by the rebalance strategy, using the minimum allowed threshold instead DETAIL: Using threshold of 0.01 rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | tab | 1 57638 | tab | 3 (2 rows) -- Check that sizes of colocated tables are added together for rebalances set citus.shard_count = 4; SET citus.next_shard_id TO 123050; CREATE TABLE tab2 (x int); SELECT create_distributed_table('tab2','x', colocate_with := 'tab'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO tab2 SELECT 1 from generate_series(1, 0); INSERT INTO tab2 SELECT 2 from generate_series(1, 60000); INSERT INTO tab2 SELECT 3 from generate_series(1, 10000); INSERT INTO tab2 SELECT 6 from generate_series(1, 10000); VACUUM FULL tab, tab2; ANALYZE tab, tab2; \c - - - :worker_1_port SELECT table_schema, table_name, row_estimate, CASE WHEN total_bytes BETWEEN 1900000 AND 2300000 THEN 2179072 WHEN total_bytes BETWEEN 900000 AND 1200000 THEN 1089536 WHEN total_bytes BETWEEN 300000 AND 440000 THEN 368640 ELSE total_bytes END FROM ( SELECT *, 
total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM ( SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME , c.reltuples AS row_estimate , pg_total_relation_size(c.oid) AS total_bytes , pg_indexes_size(c.oid) AS index_bytes , pg_total_relation_size(reltoastrelid) AS toast_bytes FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = c.relnamespace WHERE relkind = 'r' AND c.oid NOT IN (SELECT logicalrelid FROM pg_dist_partition) ) a WHERE table_schema = 'public' ) a ORDER BY table_name; table_schema | table_name | row_estimate | total_bytes --------------------------------------------------------------------- public | tab2_123050 | 0 | 0 public | tab_123040 | 30000 | 1089536 (2 rows) \c - - - :worker_2_port SELECT table_schema, table_name, row_estimate, CASE WHEN total_bytes BETWEEN 1900000 AND 2300000 THEN 2179072 WHEN total_bytes BETWEEN 900000 AND 1200000 THEN 1089536 WHEN total_bytes BETWEEN 300000 AND 440000 THEN 368640 ELSE total_bytes END FROM ( SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM ( SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME , c.reltuples AS row_estimate , pg_total_relation_size(c.oid) AS total_bytes , pg_indexes_size(c.oid) AS index_bytes , pg_total_relation_size(reltoastrelid) AS toast_bytes FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = c.relnamespace WHERE relkind = 'r' AND c.oid NOT IN (SELECT logicalrelid FROM pg_dist_partition) ) a WHERE table_schema = 'public' ) a ORDER BY table_name; table_schema | table_name | row_estimate | total_bytes --------------------------------------------------------------------- public | tab2_123051 | 10000 | 368640 public | tab2_123052 | 10000 | 368640 public | tab2_123053 | 60000 | 2179072 public | tab_123041 | 10000 | 368640 public | tab_123042 | 10000 | 368640 public | tab_123043 | 10000 | 368640 (6 rows) \c - - - :master_port -- disk sizes can be slightly different, so ORDER BY shardid gives us a consistent output SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size') ORDER BY shardid; NOTICE: Ignoring move of shard xxxxx from localhost:xxxxx to localhost:xxxxx, because the move only brings a small improvement relative to the shard its size DETAIL: The balance improvement of 0.1xxxxx is lower than the improvement_threshold of 0.5 NOTICE: Ignored 1 moves, all of which are shown in notices above HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.5). 
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- tab | 123041 | 0 | localhost | 57638 | localhost | 57637 tab | 123042 | 0 | localhost | 57638 | localhost | 57637 tab2 | 123051 | 0 | localhost | 57638 | localhost | 57637 tab2 | 123052 | 0 | localhost | 57638 | localhost | 57637 (4 rows) -- supports improvement_threshold SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- tab | 123043 | 0 | localhost | 57638 | localhost | 57637 tab2 | 123053 | 0 | localhost | 57638 | localhost | 57637 tab | 123040 | 0 | localhost | 57637 | localhost | 57638 tab2 | 123050 | 0 | localhost | 57637 | localhost | 57638 (4 rows) SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes'); NOTICE: Ignoring move of shard xxxxx from localhost:xxxxx to localhost:xxxxx, because the move only brings a small improvement relative to the shard its size DETAIL: The balance improvement of 0.1xxxxx is lower than the improvement_threshold of 0.5 NOTICE: Ignored 1 moves, all of which are shown in notices above HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.5). NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); NOTICE: cleaned up 2 orphaned resources SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | tab | 3 57638 | tab | 1 57637 | tab2 | 3 57638 | tab2 | 1 (4 rows) VACUUM FULL tab, tab2; ANALYZE tab, tab2; \c - - - :worker_1_port SELECT table_schema, table_name, row_estimate, CASE WHEN total_bytes BETWEEN 1900000 AND 2300000 THEN 2179072 WHEN total_bytes BETWEEN 900000 AND 1200000 THEN 1089536 WHEN total_bytes BETWEEN 300000 AND 440000 THEN 368640 ELSE total_bytes END FROM ( SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM ( SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME , c.reltuples AS row_estimate , pg_total_relation_size(c.oid) AS total_bytes , pg_indexes_size(c.oid) AS index_bytes , pg_total_relation_size(reltoastrelid) AS toast_bytes FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = c.relnamespace WHERE relkind = 'r' AND c.oid NOT IN (SELECT logicalrelid FROM pg_dist_partition) ) a WHERE table_schema = 'public' ) a ORDER BY table_name; table_schema | table_name | row_estimate | total_bytes --------------------------------------------------------------------- public | tab2_123050 | 0 | 0 public | tab2_123051 | 10000 | 368640 public | tab2_123052 | 10000 | 368640 public | tab_123040 | 30000 | 1089536 public | tab_123041 | 10000 | 368640 public | tab_123042 | 10000 | 368640 (6 rows) \c - - - :worker_2_port SELECT table_schema, table_name, row_estimate, CASE WHEN total_bytes BETWEEN 1900000 AND 2300000 THEN 2179072 WHEN total_bytes BETWEEN 900000 AND 1200000 THEN 1089536 WHEN total_bytes BETWEEN 300000 AND 440000 THEN 368640 ELSE total_bytes END FROM ( SELECT *, 
total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM ( SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME , c.reltuples AS row_estimate , pg_total_relation_size(c.oid) AS total_bytes , pg_indexes_size(c.oid) AS index_bytes , pg_total_relation_size(reltoastrelid) AS toast_bytes FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = c.relnamespace WHERE relkind = 'r' AND c.oid NOT IN (SELECT logicalrelid FROM pg_dist_partition) ) a WHERE table_schema = 'public' ) a ORDER BY table_name; table_schema | table_name | row_estimate | total_bytes --------------------------------------------------------------------- public | tab2_123053 | 60000 | 2179072 public | tab_123043 | 10000 | 368640 (2 rows) \c - - - :master_port DROP TABLE tab2; CREATE OR REPLACE FUNCTION capacity_high_worker_2(nodeidarg int) RETURNS real AS $$ SELECT (CASE WHEN nodeport = 57638 THEN 1000 ELSE 1 END)::real FROM pg_dist_node where nodeid = nodeidarg $$ LANGUAGE sql; \set VERBOSITY terse SELECT citus_add_rebalance_strategy( 'capacity_high_worker_2', 'citus_shard_cost_1', 'capacity_high_worker_2', 'citus_shard_allowed_on_node_true', 0 ); citus_add_rebalance_strategy --------------------------------------------------------------------- (1 row) SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2'); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- tab | 123040 | 0 | localhost | 57637 | localhost | 57638 tab | 123041 | 0 | localhost | 57637 | localhost | 57638 tab | 123042 | 0 | localhost | 57637 | localhost | 57638 (3 rows) SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes'); NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); NOTICE: cleaned up 1 orphaned resources SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57638 | tab | 4 (1 row) SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2'); citus_set_default_rebalance_strategy --------------------------------------------------------------------- (1 row) SELECT * FROM get_rebalance_table_shards_plan('tab'); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- (0 rows) SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57638 | tab | 4 (1 row) CREATE OR REPLACE FUNCTION only_worker_1(shardid bigint, nodeidarg int) RETURNS boolean AS $$ SELECT (CASE WHEN nodeport = 57637 THEN TRUE ELSE FALSE END) FROM pg_dist_node where nodeid = nodeidarg $$ LANGUAGE sql; SELECT citus_add_rebalance_strategy( 'only_worker_1', 'citus_shard_cost_1', 'citus_node_capacity_1', 'only_worker_1', 0 ); citus_add_rebalance_strategy --------------------------------------------------------------------- (1 row) SELECT citus_set_default_rebalance_strategy('only_worker_1'); citus_set_default_rebalance_strategy --------------------------------------------------------------------- (1 row) SELECT * FROM get_rebalance_table_shards_plan('tab'); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- tab | 123040 | 0 | localhost | 57638 | localhost | 57637 tab | 123041 | 0 | localhost | 57638 | localhost | 57637 tab | 123042 | 0 | localhost | 57638 | localhost | 57637 tab | 123043 | 0 | localhost | 57638 | localhost | 57637 (4 rows) SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); NOTICE: cleaned up 1 orphaned resources SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- 57637 | tab | 4 (1 row) SELECT citus_set_default_rebalance_strategy('by_shard_count'); citus_set_default_rebalance_strategy --------------------------------------------------------------------- (1 row) SELECT * FROM get_rebalance_table_shards_plan('tab'); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- tab | 123040 | 0 | localhost | 57637 | localhost | 57638 tab | 123041 | 0 | localhost | 57637 | localhost | 57638 (2 rows) -- Check all the error handling cases SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_existing'); ERROR: could not find rebalance strategy with name non_existing SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing'); ERROR: could not find rebalance strategy with name non_existing CALL citus_cleanup_orphaned_resources(); SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing'); ERROR: could not find rebalance strategy with name non_existing CALL citus_cleanup_orphaned_resources(); SELECT citus_set_default_rebalance_strategy('non_existing'); ERROR: strategy with specified name does not exist UPDATE pg_dist_rebalance_strategy SET default_strategy=false; SELECT * FROM get_rebalance_table_shards_plan('tab'); ERROR: no rebalance_strategy was provided, but there is also no default strategy set SELECT * FROM rebalance_table_shards('tab'); ERROR: no rebalance_strategy was provided, but there is also no default strategy set CALL citus_cleanup_orphaned_resources(); SELECT * FROM master_drain_node('localhost', :worker_2_port); ERROR: no rebalance_strategy was provided, but there is also no default strategy set CALL citus_cleanup_orphaned_resources(); UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count'; CREATE OR REPLACE FUNCTION shard_cost_no_arguments() RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql; CREATE OR REPLACE FUNCTION shard_cost_bad_arg_type(text) RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql; CREATE OR REPLACE FUNCTION shard_cost_bad_return_type(bigint) RETURNS int AS $$ SELECT 1 $$ LANGUAGE sql; CREATE OR REPLACE FUNCTION node_capacity_no_arguments() RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql; CREATE OR REPLACE FUNCTION node_capacity_bad_arg_type(text) RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql; CREATE OR REPLACE FUNCTION node_capacity_bad_return_type(int) RETURNS int AS $$ SELECT 1 $$ LANGUAGE sql; CREATE OR REPLACE FUNCTION shard_allowed_on_node_no_arguments() RETURNS boolean AS $$ SELECT true $$ LANGUAGE sql; CREATE OR REPLACE FUNCTION shard_allowed_on_node_bad_arg1(text, int) RETURNS boolean AS $$ SELECT true $$ LANGUAGE sql; CREATE OR REPLACE FUNCTION shard_allowed_on_node_bad_arg2(bigint, text) RETURNS boolean AS $$ SELECT true $$ LANGUAGE sql; CREATE OR REPLACE FUNCTION shard_allowed_on_node_bad_return_type(bigint, int) RETURNS int AS $$ SELECT 1 $$ LANGUAGE sql; SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'shard_cost_no_arguments', 'citus_node_capacity_1', 'citus_shard_allowed_on_node_true', 0 ); ERROR: signature for shard_cost_function is incorrect 
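-- For reference (illustrative, not executed here; argument names are made up): based on
-- the helper functions accepted earlier in this file, the expected signatures appear to be
--   shard_cost_function(shardid bigint)                         RETURNS real
--   node_capacity_function(nodeid int)                          RETURNS real
--   shard_allowed_on_node_function(shardid bigint, nodeid int)  RETURNS boolean
-- The surrounding citus_add_rebalance_strategy calls deliberately violate these
-- signatures one at a time to exercise the validation errors.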
SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'shard_cost_bad_arg_type', 'citus_node_capacity_1', 'citus_shard_allowed_on_node_true', 0 ); ERROR: signature for shard_cost_function is incorrect SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'shard_cost_bad_return_type', 'citus_node_capacity_1', 'citus_shard_allowed_on_node_true', 0 ); ERROR: signature for shard_cost_function is incorrect SELECT citus_add_rebalance_strategy( 'insert_should_fail', 0, 'citus_node_capacity_1', 'citus_shard_allowed_on_node_true', 0 ); ERROR: cache lookup failed for shard_cost_function with oid 0 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', 'node_capacity_no_arguments', 'citus_shard_allowed_on_node_true', 0 ); ERROR: signature for node_capacity_function is incorrect SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', 'node_capacity_bad_arg_type', 'citus_shard_allowed_on_node_true', 0 ); ERROR: signature for node_capacity_function is incorrect SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', 'node_capacity_bad_return_type', 'citus_shard_allowed_on_node_true', 0 ); ERROR: signature for node_capacity_function is incorrect SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', 0, 'citus_shard_allowed_on_node_true', 0 ); ERROR: cache lookup failed for node_capacity_function with oid 0 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', 'citus_node_capacity_1', 'shard_allowed_on_node_no_arguments', 0 ); ERROR: signature for shard_allowed_on_node_function is incorrect SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', 'citus_node_capacity_1', 'shard_allowed_on_node_bad_arg1', 0 ); ERROR: signature for shard_allowed_on_node_function is incorrect SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', 'citus_node_capacity_1', 'shard_allowed_on_node_bad_arg2', 0 ); ERROR: signature for shard_allowed_on_node_function is incorrect SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', 'citus_node_capacity_1', 'shard_allowed_on_node_bad_return_type', 0 ); ERROR: signature for shard_allowed_on_node_function is incorrect SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', 'citus_node_capacity_1', 0, 0 ); ERROR: cache lookup failed for shard_allowed_on_node_function with oid 0 -- Confirm that manual insert/update has the same checks INSERT INTO pg_catalog.pg_dist_rebalance_strategy( name, shard_cost_function, node_capacity_function, shard_allowed_on_node_function, default_threshold ) VALUES ( 'shard_cost_no_arguments', 'shard_cost_no_arguments', 'citus_node_capacity_1', 'citus_shard_allowed_on_node_true', 0 ); ERROR: signature for shard_cost_function is incorrect UPDATE pg_dist_rebalance_strategy SET shard_cost_function='shard_cost_no_arguments' WHERE name='by_disk_size'; ERROR: signature for shard_cost_function is incorrect -- Confirm that only a single default strategy can exist INSERT INTO pg_catalog.pg_dist_rebalance_strategy( name, default_strategy, shard_cost_function, node_capacity_function, shard_allowed_on_node_function, default_threshold ) VALUES ( 'second_default', true, 'citus_shard_cost_1', 'citus_node_capacity_1', 'citus_shard_allowed_on_node_true', 0 ); ERROR: there cannot be two default strategies UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_disk_size'; ERROR: there cannot be two default strategies -- 
ensure the trigger allows updating the default strategy UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count'; -- Confirm that default strategy should be higher than minimum strategy SELECT citus_add_rebalance_strategy( 'default_threshold_too_low', 'citus_shard_cost_1', 'capacity_high_worker_2', 'citus_shard_allowed_on_node_true', 0, 0.1 ); ERROR: default_threshold cannot be smaller than minimum_threshold -- Make it a data node again SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); master_set_node_property --------------------------------------------------------------------- (1 row) DROP TABLE tab; -- we don't need the coordinator on pg_dist_node anymore SELECT 1 FROM master_remove_node('localhost', :master_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT public.wait_until_metadata_sync(60000); wait_until_metadata_sync --------------------------------------------------------------------- (1 row) -- -- Make sure that rebalance_table_shards() and replicate_table_shards() replicate -- reference tables to the coordinator -- SET client_min_messages TO WARNING; CREATE TABLE dist_table_test_3(a int); SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('dist_table_test_3', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE ref_table(a int); SELECT create_reference_table('ref_table'); create_reference_table --------------------------------------------------------------------- (1 row) SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; count --------------------------------------------------------------------- 2 (1 row) SET citus.shard_replication_factor TO 2; SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes'); ERROR: Table 'dist_table_test_3' is streaming replicated. Shards of streaming replicated tables cannot be copied -- Mark table as coordinator replicated in order to be able to test replicate_table_shards UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid = 'dist_table_test_3'::regclass; SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes'); replicate_table_shards --------------------------------------------------------------------- (1 row) SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; count --------------------------------------------------------------------- 3 (1 row) SELECT 1 FROM master_remove_node('localhost', :master_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT public.wait_until_metadata_sync(30000); wait_until_metadata_sync --------------------------------------------------------------------- (1 row) CREATE TABLE rebalance_test_table(int_column int); SELECT create_distributed_table('rebalance_test_table', 'int_column', 'append'); create_distributed_table --------------------------------------------------------------------- (1 row) CALL create_unbalanced_shards('rebalance_test_table'); SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0); ?column? 
--------------------------------------------------------------------- 1 (1 row) SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; count --------------------------------------------------------------------- 2 (1 row) SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='block_writes'); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; count --------------------------------------------------------------------- 3 (1 row) DROP TABLE dist_table_test_3, rebalance_test_table, ref_table; SELECT 1 FROM master_remove_node('localhost', :master_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT public.wait_until_metadata_sync(30000); wait_until_metadata_sync --------------------------------------------------------------------- (1 row) -- reference table 2 will not have a replica identity, causing the rebalancer not to work -- when run in the default mode. Instead we need to change the shard transfer mode to make -- it work. This verifies that the shard transfer mode passed to the rebalancer is also used -- when ensuring that reference tables exist. CREATE TABLE t1 (a int PRIMARY KEY, b int); CREATE TABLE r1 (a int PRIMARY KEY, b int); CREATE TABLE r2 (a int, b int); -- we remove worker 2 before creating the tables; this allows us to have an active -- node without the reference tables SELECT 1 from master_remove_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT public.wait_until_metadata_sync(30000); wait_until_metadata_sync --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('t1','a'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_reference_table('r1'); create_reference_table --------------------------------------------------------------------- (1 row) SELECT create_reference_table('r2'); create_reference_table --------------------------------------------------------------------- (1 row) -- add data so that data is actually copied when forcing logical replication for reference tables INSERT INTO r1 VALUES (1,2), (3,4); INSERT INTO r2 VALUES (1,2), (3,4); SELECT 1 from master_add_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) -- since r2 has no replica identity we expect an error here SELECT rebalance_table_shards(); ERROR: cannot use logical replication to transfer shards of the relation r2 since it doesn't have a REPLICA IDENTITY or PRIMARY KEY CALL citus_cleanup_orphaned_resources(); DROP TABLE t1, r1, r2; -- verify there are no distributed tables before we perform the following tests. Preceding -- test suites should clean up their distributed tables. SELECT count(*) FROM pg_dist_partition; count --------------------------------------------------------------------- 0 (1 row) -- verify that a system having only reference tables will copy the reference tables when -- executing the rebalancer SELECT 1 from master_remove_node('localhost', :worker_2_port); ?column?
--------------------------------------------------------------------- 1 (1 row) SELECT public.wait_until_metadata_sync(30000); wait_until_metadata_sync --------------------------------------------------------------------- (1 row) CREATE TABLE r1 (a int PRIMARY KEY, b int); SELECT create_reference_table('r1'); create_reference_table --------------------------------------------------------------------- (1 row) ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 15; SELECT 1 from master_add_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) -- count the number of placements for the reference table to verify it is not available on -- all nodes SELECT count(*) FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) WHERE logicalrelid = 'r1'::regclass; count --------------------------------------------------------------------- 1 (1 row) -- rebalance with _only_ a reference table, this should trigger the copy SELECT rebalance_table_shards(); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); -- verify the reference table is on all nodes after the rebalance SELECT count(*) FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) WHERE logicalrelid = 'r1'::regclass; count --------------------------------------------------------------------- 2 (1 row) -- cleanup tables DROP TABLE r1; -- lastly we need to verify that reference tables are copied before the replication factor -- of other tables is increased. Without the copy of reference tables the replication might -- fail. SELECT 1 from master_remove_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT public.wait_until_metadata_sync(30000); wait_until_metadata_sync --------------------------------------------------------------------- (1 row) CREATE TABLE t1 (a int PRIMARY KEY, b int); CREATE TABLE r1 (a int PRIMARY KEY, b int); SELECT create_distributed_table('t1', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_reference_table('r1'); create_reference_table --------------------------------------------------------------------- (1 row) SELECT 1 from master_add_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) -- count the number of placements for the reference table to verify it is not available on -- all nodes SELECT count(*) FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) WHERE logicalrelid = 'r1'::regclass; count --------------------------------------------------------------------- 1 (1 row) SELECT replicate_table_shards('t1', shard_replication_factor := 2); ERROR: Table 't1' is streaming replicated. 
Shards of streaming replicated tables cannot be copied -- Mark table as coordinator replicated in order to be able to test replicate_table_shards UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN ('t1'::regclass); SELECT replicate_table_shards('t1', shard_replication_factor := 2); replicate_table_shards --------------------------------------------------------------------- (1 row) -- verify the reference table is on all nodes after replicate_table_shards SELECT count(*) FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) WHERE logicalrelid = 'r1'::regclass; count --------------------------------------------------------------------- 2 (1 row) DROP TABLE t1, r1; -- Test rebalancer with index on a table DROP TABLE IF EXISTS test_rebalance_with_index; CREATE TABLE test_rebalance_with_index (measureid integer PRIMARY KEY); SELECT create_distributed_table('test_rebalance_with_index', 'measureid'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE INDEX rebalance_with_index ON test_rebalance_with_index(measureid); INSERT INTO test_rebalance_with_index VALUES(0); INSERT INTO test_rebalance_with_index VALUES(1); INSERT INTO test_rebalance_with_index VALUES(2); SELECT * FROM master_drain_node('localhost', :worker_2_port); master_drain_node --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); UPDATE pg_dist_node SET shouldhaveshards=true WHERE nodeport = :worker_2_port; SELECT rebalance_table_shards(); rebalance_table_shards --------------------------------------------------------------------- (1 row) CALL citus_cleanup_orphaned_resources(); DROP TABLE test_rebalance_with_index CASCADE; -- Test rebalancer with disabled worker SET citus.next_shard_id TO 433500; SET citus.shard_replication_factor TO 2; DROP TABLE IF EXISTS test_rebalance_with_disabled_worker; CREATE TABLE test_rebalance_with_disabled_worker (a int); SELECT create_distributed_table('test_rebalance_with_disabled_worker', 'a', colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT citus_disable_node('localhost', :worker_2_port); citus_disable_node --------------------------------------------------------------------- (1 row) SELECT public.wait_until_metadata_sync(30000); wait_until_metadata_sync --------------------------------------------------------------------- (1 row) -- errors out because shard replication factor > shard allowed node count SELECT rebalance_table_shards('test_rebalance_with_disabled_worker'); ERROR: Shard replication factor (2) cannot be greater than number of nodes with should_have_shards=true (1). -- set replication factor to one, and try again SET citus.shard_replication_factor TO 1; SELECT rebalance_table_shards('test_rebalance_with_disabled_worker'); rebalance_table_shards --------------------------------------------------------------------- (1 row) SET citus.shard_replication_factor TO 2; SELECT 1 FROM citus_activate_node('localhost', :worker_2_port); ?column? 
--------------------------------------------------------------------- 1 (1 row) DROP TABLE test_rebalance_with_disabled_worker; -- Test rebalance with all shards excluded DROP TABLE IF EXISTS test_with_all_shards_excluded; CREATE TABLE test_with_all_shards_excluded(a int PRIMARY KEY); SELECT create_distributed_table('test_with_all_shards_excluded', 'a', colocate_with:='none', shard_count:=4); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT shardid FROM pg_dist_shard ORDER BY shardid ASC; shardid --------------------------------------------------------------------- 433504 433505 433506 433507 (4 rows) SELECT rebalance_table_shards('test_with_all_shards_excluded', excluded_shard_list:='{102073, 102074, 102075, 102076}'); rebalance_table_shards --------------------------------------------------------------------- (1 row) DROP TABLE test_with_all_shards_excluded; SET citus.shard_count TO 2; CREATE TABLE "events.Energy Added" (user_id int, time timestamp with time zone, data jsonb, PRIMARY KEY (user_id, time )) PARTITION BY RANGE ("time"); CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location')); SELECT create_distributed_table('"events.Energy Added"', 'user_id'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE "Energy Added_17634" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-13 00:00:00+00') TO ('2018-04-14 00:00:00+00'); CREATE TABLE "Energy Added_17635" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-14 00:00:00+00') TO ('2018-04-15 00:00:00+00'); create table colocated_t1 (a int); select create_distributed_table('colocated_t1','a',colocate_with=>'"events.Energy Added"'); create_distributed_table --------------------------------------------------------------------- (1 row) create table colocated_t2 (a int); select create_distributed_table('colocated_t2','a',colocate_with=>'"events.Energy Added"'); create_distributed_table --------------------------------------------------------------------- (1 row) create table colocated_t3 (a int); select create_distributed_table('colocated_t3','a',colocate_with=>'"events.Energy Added"'); create_distributed_table --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEBUG4; SELECT * FROM get_rebalance_table_shards_plan('colocated_t1', rebalance_strategy := 'by_disk_size'); DEBUG: skipping child tables for relation named: colocated_t1 DEBUG: Size Query: SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid)) FROM (VALUES ('public."events.Energy Added_433508"')) as q(relid)) + (SELECT SUM(pg_total_relation_size(relid)) FROM (VALUES ('public.colocated_t1_433514'), ('public.colocated_t2_433516'), ('public.colocated_t3_433518')) as q(relid)); DEBUG: skipping child tables for relation named: colocated_t1 DEBUG: Size Query: SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid)) FROM (VALUES ('public."events.Energy Added_433508"')) as q(relid)) + (SELECT SUM(pg_total_relation_size(relid)) FROM (VALUES ('public.colocated_t1_433514'), ('public.colocated_t2_433516'), ('public.colocated_t3_433518')) as q(relid)); DEBUG: skipping child tables for relation named: colocated_t1 DEBUG: Size Query: SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid)) FROM (VALUES ('public."events.Energy Added_433509"')) as q(relid)) + (SELECT SUM(pg_total_relation_size(relid)) FROM (VALUES 
('public.colocated_t1_433515'), ('public.colocated_t2_433517'), ('public.colocated_t3_433519')) as q(relid)); DEBUG: skipping child tables for relation named: colocated_t1 DEBUG: Size Query: SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid)) FROM (VALUES ('public."events.Energy Added_433509"')) as q(relid)) + (SELECT SUM(pg_total_relation_size(relid)) FROM (VALUES ('public.colocated_t1_433515'), ('public.colocated_t2_433517'), ('public.colocated_t3_433519')) as q(relid)); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- (0 rows) RESET client_min_messages; DROP TABLE "events.Energy Added", colocated_t1, colocated_t2, colocated_t3; RESET citus.shard_count; DROP VIEW table_placements_per_node; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2'; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1'; -- add colocation groups with shard group count < worker count -- the rebalancer should balance those "unbalanced shards" evenly as much as possible SELECT 1 FROM citus_remove_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) create table single_shard_colocation_1a (a int primary key); create table single_shard_colocation_1b (a int primary key); create table single_shard_colocation_1c (a int primary key); SET citus.shard_replication_factor = 1; select create_distributed_table('single_shard_colocation_1a','a', colocate_with => 'none', shard_count => 1); create_distributed_table --------------------------------------------------------------------- (1 row) select create_distributed_table('single_shard_colocation_1b','a',colocate_with=>'single_shard_colocation_1a'); create_distributed_table --------------------------------------------------------------------- (1 row) select create_distributed_table('single_shard_colocation_1c','a',colocate_with=>'single_shard_colocation_1b'); create_distributed_table --------------------------------------------------------------------- (1 row) create table single_shard_colocation_2a (a bigint); create table single_shard_colocation_2b (a bigint); select create_distributed_table('single_shard_colocation_2a','a', colocate_with => 'none', shard_count => 1); create_distributed_table --------------------------------------------------------------------- (1 row) select create_distributed_table('single_shard_colocation_2b','a',colocate_with=>'single_shard_colocation_2a'); create_distributed_table --------------------------------------------------------------------- (1 row) -- all shards are placed on the first worker node SELECT sh.logicalrelid, pl.nodeport FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b') ORDER BY sh.logicalrelid; logicalrelid | nodeport --------------------------------------------------------------------- single_shard_colocation_1a | 57637 single_shard_colocation_1b | 57637 single_shard_colocation_1c | 57637 single_shard_colocation_2a | 57637 single_shard_colocation_2b | 57637 (5 rows) -- add the second node back, then rebalance ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16; select 1 from citus_add_node('localhost', :worker_2_port); ?column? 
--------------------------------------------------------------------- 1 (1 row) select rebalance_table_shards(); NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... rebalance_table_shards --------------------------------------------------------------------- (1 row) -- verify some shards are moved to the new node SELECT sh.logicalrelid, pl.nodeport FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b') ORDER BY sh.logicalrelid; logicalrelid | nodeport --------------------------------------------------------------------- single_shard_colocation_1a | 57638 single_shard_colocation_1b | 57638 single_shard_colocation_1c | 57638 single_shard_colocation_2a | 57637 single_shard_colocation_2b | 57637 (5 rows) DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE; -- test the same with coordinator shouldhaveshards = false and shard_count = 2 -- so that the shard allowed node count would be 2 when rebalancing -- for such cases, we only count the nodes that are allowed for shard placements UPDATE pg_dist_node SET shouldhaveshards=false WHERE nodeport = :master_port; create table two_shard_colocation_1a (a int primary key); create table two_shard_colocation_1b (a int primary key); SET citus.shard_replication_factor = 1; select create_distributed_table('two_shard_colocation_1a','a', colocate_with => 'none', shard_count => 2); create_distributed_table --------------------------------------------------------------------- (1 row) select create_distributed_table('two_shard_colocation_1b','a',colocate_with=>'two_shard_colocation_1a'); create_distributed_table --------------------------------------------------------------------- (1 row) create table two_shard_colocation_2a (a int primary key); create table two_shard_colocation_2b (a int primary key); select create_distributed_table('two_shard_colocation_2a','a', colocate_with => 'none', shard_count => 2); create_distributed_table --------------------------------------------------------------------- (1 row) select create_distributed_table('two_shard_colocation_2b','a',colocate_with=>'two_shard_colocation_2a'); create_distributed_table --------------------------------------------------------------------- (1 row) -- move shards of colocation group 1 to worker1 SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port) FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid WHERE sh.logicalrelid = 'two_shard_colocation_1a'::regclass AND pl.nodeport = :worker_2_port LIMIT 1; citus_move_shard_placement --------------------------------------------------------------------- (1 row) -- move shards of colocation group 2 to worker2 SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port) FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid WHERE sh.logicalrelid = 'two_shard_colocation_2a'::regclass AND pl.nodeport = :worker_1_port LIMIT 1; citus_move_shard_placement --------------------------------------------------------------------- (1 row) -- current state: -- coordinator: [] -- worker 1: [1_1, 1_2] -- worker 2: [2_1, 2_2] SELECT sh.logicalrelid, pl.nodeport FROM pg_dist_shard sh JOIN 
pg_dist_shard_placement pl ON sh.shardid = pl.shardid WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b') ORDER BY sh.logicalrelid, pl.nodeport; logicalrelid | nodeport --------------------------------------------------------------------- two_shard_colocation_1a | 57637 two_shard_colocation_1a | 57637 two_shard_colocation_1b | 57637 two_shard_colocation_1b | 57637 two_shard_colocation_2a | 57638 two_shard_colocation_2a | 57638 two_shard_colocation_2b | 57638 two_shard_colocation_2b | 57638 (8 rows) -- If we take the coordinator into account, the rebalancer considers this balanced and does nothing (shard_count < worker_count) -- but because the coordinator is not allowed to hold shards, the rebalancer will distribute each colocation group across both workers select rebalance_table_shards(shard_transfer_mode:='block_writes'); NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... rebalance_table_shards --------------------------------------------------------------------- (1 row) -- final state: -- coordinator: [] -- worker 1: [1_1, 2_1] -- worker 2: [1_2, 2_2] SELECT sh.logicalrelid, pl.nodeport FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b') ORDER BY sh.logicalrelid, pl.nodeport; logicalrelid | nodeport --------------------------------------------------------------------- two_shard_colocation_1a | 57637 two_shard_colocation_1a | 57638 two_shard_colocation_1b | 57637 two_shard_colocation_1b | 57638 two_shard_colocation_2a | 57637 two_shard_colocation_2a | 57638 two_shard_colocation_2b | 57637 two_shard_colocation_2b | 57638 (8 rows) -- cleanup DROP TABLE two_shard_colocation_1a, two_shard_colocation_1b, two_shard_colocation_2a, two_shard_colocation_2b CASCADE; -- verify we detect when one of the tables does not have a replica identity or primary key -- and error out when the shard transfer mode is 'auto' SELECT 1 FROM citus_remove_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) create table table_with_primary_key (a int primary key); select create_distributed_table('table_with_primary_key','a'); create_distributed_table --------------------------------------------------------------------- (1 row) create table table_without_primary_key (a bigint); select create_distributed_table('table_without_primary_key','a'); create_distributed_table --------------------------------------------------------------------- (1 row) -- add the second node back, then rebalance ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16; select 1 from citus_add_node('localhost', :worker_2_port); ?column?
--------------------------------------------------------------------- 1 (1 row) select rebalance_table_shards(); ERROR: cannot use logical replication to transfer shards of the relation table_without_primary_key since it doesn't have a REPLICA IDENTITY or PRIMARY KEY DROP TABLE table_with_primary_key, table_without_primary_key; SELECT citus_set_default_rebalance_strategy('by_disk_size'); citus_set_default_rebalance_strategy --------------------------------------------------------------------- (1 row) ALTER SYSTEM RESET citus.rebalancer_by_disk_size_base_cost; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole; DROP USER testrole; \c - - - :worker_2_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole; DROP USER testrole; DROP TABLE test_rebalance_with_disabled_worker_433500, test_rebalance_with_disabled_worker_433501, test_rebalance_with_disabled_worker_433502, test_rebalance_with_disabled_worker_433503;
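-- Illustrative sketch (not part of the recorded output): when cleaning up leftover
-- shard relations by hand as above, the exact shard names present on a worker can be
-- double-checked first with a catalog query such as:
--   SELECT tablename FROM pg_catalog.pg_tables
--   WHERE tablename LIKE 'test_rebalance_with_disabled_worker_%';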