citus/src/test/regress/expected/shard_rebalancer.out

--
-- MULTI_SHARD_REBALANCER
--
SET citus.next_shard_id TO 433000;
CREATE TABLE ref_table_test(a int primary key);
SELECT create_reference_table('ref_table_test');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE dist_table_test(a int primary key);
SELECT create_distributed_table('dist_table_test', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE postgres_table_test(a int primary key);
-- make sure that all rebalance operations work fine when
-- reference tables are replicated to the coordinator
SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
NOTICE: Replicating reference table "ref_table_test" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
(1 row)
-- these should just be no-ops even though we added the coordinator to pg_dist_node
SELECT rebalance_table_shards('dist_table_test');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT rebalance_table_shards();
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
-- test that calling rebalance_table_shards without specifying a relation
-- doesn't move the shard of the citus local table.
SET citus.next_shard_id TO 433100;
CREATE TABLE citus_local_table(a int, b int);
SELECT citus_add_local_table_to_metadata('citus_local_table');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
INSERT INTO citus_local_table VALUES (1, 2);
SELECT rebalance_table_shards();
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
-- Check that rebalance_table_shards and get_rebalance_table_shards_plan fail
-- for any type of table other than distributed tables.
SELECT rebalance_table_shards('ref_table_test');
ERROR: table public.ref_table_test is a reference table, moving shard of a reference table is not supported
SELECT rebalance_table_shards('postgres_table_test');
ERROR: table public.postgres_table_test is a regular postgres table, you can only move shards of a citus table
SELECT rebalance_table_shards('citus_local_table');
ERROR: table public.citus_local_table is a local table, moving shard of a local table added to metadata is currently not supported
SELECT get_rebalance_table_shards_plan('ref_table_test');
ERROR: table public.ref_table_test is a reference table, moving shard of a reference table is not supported
SELECT get_rebalance_table_shards_plan('postgres_table_test');
ERROR: table public.postgres_table_test is a regular postgres table, you can only move shards of a citus table
SELECT get_rebalance_table_shards_plan('citus_local_table');
ERROR: table public.citus_local_table is a local table, moving shard of a local table added to metadata is currently not supported
-- Check that citus_move_shard_placement fails for shards belonging to reference
-- tables or citus local tables
SELECT citus_move_shard_placement(433000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: table public.ref_table_test is a reference table, moving shard of a reference table is not supported
SELECT citus_move_shard_placement(433100, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: table public.citus_local_table is a local table, moving shard of a local table added to metadata is currently not supported
-- show that citus local table shard is still on the coordinator
SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%';
tablename
---------------------------------------------------------------------
citus_local_table_433100
(1 row)
-- also check that we can still access the shard relation, not the shell table
SELECT count(*) FROM citus_local_table;
count
---------------------------------------------------------------------
1
(1 row)
-- verify that drain_node uses the citus.local_hostname GUC by seeing it fail to connect to a non-existent hostname
ALTER SYSTEM SET citus.local_hostname TO 'foobar';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(.1); -- wait to make sure the config has changed before the next command reads the GUC
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT master_drain_node('localhost', :master_port);
ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error>
CALL citus_cleanup_orphaned_shards();
ALTER SYSTEM RESET citus.local_hostname;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(.1); -- wait to make sure the config has changed before the next command reads the GUC
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT master_drain_node('localhost', :master_port);
master_drain_node
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
-- show that citus local table shard is still on the coordinator
SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%';
tablename
---------------------------------------------------------------------
citus_local_table_433100
(1 row)
-- also check that we can still access the shard relation, not the shell table
SELECT count(*) FROM citus_local_table;
count
---------------------------------------------------------------------
1
(1 row)
-- show that we do not create a shard rebalancing plan for citus local table
SELECT get_rebalance_table_shards_plan();
get_rebalance_table_shards_plan
---------------------------------------------------------------------
(0 rows)
DROP TABLE citus_local_table;
CREATE TABLE dist_table_test_2(a int);
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('dist_table_test_2', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Mark tables as coordinator replicated in order to be able to test replicate_table_shards
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN
('dist_table_test_2'::regclass);
-- replicate_table_shards should fail when the hostname GUC is set to a non-reachable node
ALTER SYSTEM SET citus.local_hostname TO 'foobar';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(.1); -- wait to make sure the config has changed before the next command reads the GUC
pg_sleep
---------------------------------------------------------------------
(1 row)
SET citus.shard_replication_factor TO 2;
SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes');
ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error>
ALTER SYSTEM RESET citus.local_hostname;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(.1); -- wait to make sure the config has changed before the next command reads the GUC
pg_sleep
---------------------------------------------------------------------
(1 row)
-- replicating table shards should ignore the coordinator
SET citus.shard_replication_factor TO 2;
SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes');
NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
replicate_table_shards
---------------------------------------------------------------------
(1 row)
DROP TABLE dist_table_test, dist_table_test_2, ref_table_test, postgres_table_test;
RESET citus.shard_count;
RESET citus.shard_replication_factor;
-- Create a user to test multiuser usage of rebalancer functions
-- We explicitly don't create this user on worker nodes yet, so we can
-- test some more error handling. We create it on the workers later.
SET citus.enable_create_role_propagation TO OFF;
CREATE USER testrole;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
GRANT ALL ON SCHEMA public TO testrole;
ERROR: role "testrole" does not exist
CONTEXT: while executing command on localhost:xxxxx
CREATE OR REPLACE FUNCTION shard_placement_rebalance_array(
worker_node_list json[],
shard_placement_list json[],
threshold float4 DEFAULT 0,
max_shard_moves int DEFAULT 1000000,
drain_only bool DEFAULT false,
improvement_threshold float4 DEFAULT 0.5
)
RETURNS json[]
AS 'citus'
LANGUAGE C STRICT VOLATILE;
CREATE FUNCTION shard_placement_replication_array(worker_node_list json[],
shard_placement_list json[],
shard_replication_factor int)
RETURNS json[]
AS 'citus'
LANGUAGE C STRICT VOLATILE;
CREATE FUNCTION worker_node_responsive(worker_node_name text, worker_node_port int)
RETURNS boolean
AS 'citus'
LANGUAGE C STRICT VOLATILE;
-- this function was dropped in Citus 10; it is added back here for tests
CREATE OR REPLACE FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
distribution_column text,
distribution_method citus.distribution_type)
RETURNS void
LANGUAGE C STRICT
AS 'citus', $$master_create_distributed_table$$;
COMMENT ON FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
distribution_column text,
distribution_method citus.distribution_type)
IS 'define the table distribution functions';
-- this function was dropped in Citus 10; it is added back here for tests
CREATE OR REPLACE FUNCTION pg_catalog.master_create_worker_shards(table_name text, shard_count integer,
replication_factor integer DEFAULT 2)
RETURNS void
AS 'citus', $$master_create_worker_shards$$
LANGUAGE C STRICT;
SET citus.next_shard_id TO 123000;
SELECT worker_node_responsive(node_name, node_port::int)
FROM master_get_active_worker_nodes()
ORDER BY node_name, node_port ASC;
worker_node_responsive
---------------------------------------------------------------------
t
t
(2 rows)
-- Check that worker_node_responsive returns false for dead nodes
-- Note that PostgreSQL tries all possible resolutions of localhost on failing
-- connections. This causes different error details to be printed on different
-- environments. Therefore, we first set verbosity to terse.
\set VERBOSITY terse
SELECT worker_node_responsive('localhost', 1);
worker_node_responsive
---------------------------------------------------------------------
f
(1 row)
\set VERBOSITY default
-- Check that with threshold=0.0 shard_placement_rebalance_array returns enough
-- moves to make the cluster completely balanced.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "hostname1"}',
'{"node_name": "hostname2"}']::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "nodename":"hostname1"}',
'{"shardid":3, "nodename":"hostname1"}',
'{"shardid":4, "nodename":"hostname1"}',
'{"shardid":5, "nodename":"hostname1"}',
'{"shardid":6, "nodename":"hostname2"}']::json[],
0.0
));
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":1,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432}
{"updatetype":1,"shardid":2,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432}
(2 rows)
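-- Aside (illustrative, not part of the original test): each array element above is a
-- placement update encoded as JSON, where updatetype 1 denotes a shard move (updatetype 2,
-- seen later with shard_placement_replication_array, denotes a copy). On a real cluster a
-- move like the first entry could be applied with citus_move_shard_placement; hostname1 and
-- hostname2 are synthetic test inputs here, so the call below is only a sketch:
-- SELECT citus_move_shard_placement(1, 'hostname1', 5432, 'hostname2', 5432,
--                                   shard_transfer_mode := 'block_writes');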
-- Check that with two nodes and threshold=1.0 shard_placement_rebalance_array
-- doesn't return any moves, even if it is completely unbalanced.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "hostname1"}',
'{"node_name": "hostname2"}']::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "nodename":"hostname1"}',
'{"shardid":3, "nodename":"hostname1"}']::json[],
1.0
));
unnest
---------------------------------------------------------------------
(0 rows)
-- Check that with three nodes and threshold=1.0
-- shard_placement_rebalance_array returns moves when it is completely unbalanced
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "hostname1"}',
'{"node_name": "hostname2"}',
'{"node_name": "hostname3"}'
]::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "nodename":"hostname1"}',
'{"shardid":3, "nodename":"hostname1"}']::json[],
1.0
));
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":1,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432}
(1 row)
-- Check that with three nodes and threshold=2.0
-- shard_placement_rebalance_array doesn't return any moves, even if the
-- cluster is completely unbalanced.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "hostname1"}',
'{"node_name": "hostname2"}',
'{"node_name": "hostname3"}'
]::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "nodename":"hostname1"}',
'{"shardid":3, "nodename":"hostname1"}']::json[],
2.0
));
unnest
---------------------------------------------------------------------
(0 rows)
-- Check that with threshold=0.0 shard_placement_rebalance_array doesn't return
-- any moves if the cluster is already balanced.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "hostname1"}',
'{"node_name": "hostname2"}']::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "nodename":"hostname1"}',
'{"shardid":3, "nodename":"hostname1"}',
'{"shardid":4, "nodename":"hostname2"}',
'{"shardid":5, "nodename":"hostname2"}',
'{"shardid":6, "nodename":"hostname2"}']::json[],
0.0
));
unnest
---------------------------------------------------------------------
(0 rows)
-- Check that shard_placement_replication_array returns a shard copy operation
-- for each of the shards on an inactive node.
SELECT unnest(shard_placement_replication_array(
ARRAY['{"node_name": "hostname1"}',
'{"node_name": "hostname2"}']::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "nodename":"hostname2"}',
'{"shardid":1, "nodename":"hostname3"}',
'{"shardid":2, "nodename":"hostname3"}']::json[],
2
));
unnest
---------------------------------------------------------------------
{"updatetype":2,"shardid":1,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432}
{"updatetype":2,"shardid":2,"sourcename":"hostname2","sourceport":5432,"targetname":"hostname1","targetport":5432}
(2 rows)
-- Check that shard_placement_replication_array returns a shard copy operation
-- for each of the inactive shards.
SELECT unnest(shard_placement_replication_array(
ARRAY['{"node_name": "hostname1"}',
'{"node_name": "hostname2"}']::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "shardstate":3, "nodename":"hostname1"}',
'{"shardid":1, "shardstate":3, "nodename":"hostname2"}',
'{"shardid":2, "nodename":"hostname2"}']::json[],
2
));
unnest
---------------------------------------------------------------------
{"updatetype":2,"shardid":1,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432}
{"updatetype":2,"shardid":2,"sourcename":"hostname2","sourceport":5432,"targetname":"hostname1","targetport":5432}
(2 rows)
-- Check that shard_placement_replication_array errors out if all placements of
-- a shard are placed on inactive nodes.
SELECT unnest(shard_placement_replication_array(
ARRAY['{"node_name": "hostname1"}']::json[],
ARRAY['{"shardid":1, "nodename":"hostname2"}',
'{"shardid":1, "nodename":"hostname3"}']::json[],
2
));
ERROR: could not find a source for shard xxxxx
-- Check that shard_placement_replication_array errors out if replication factor
-- is more than the number of active nodes.
SELECT unnest(shard_placement_replication_array(
ARRAY['{"node_name": "hostname1"}']::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}']::json[],
2
));
ERROR: could not find a target for shard xxxxx
-- Ensure that shard_replication_factor is 2 during replicate_table_shards
-- and rebalance_table_shards tests
SET citus.shard_replication_factor TO 2;
-- Turn off NOTICE messages
SET client_min_messages TO WARNING;
-- Create single-row test data for the shard rebalancer test shards
CREATE TABLE shard_rebalancer_test_data AS SELECT 1::int as int_column;
-- Test replicate_table_shards, which will in turn test update_shard_placement
-- in copy mode.
CREATE TABLE replication_test_table(int_column int);
SELECT master_create_distributed_table('replication_test_table', 'int_column', 'append');
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE VIEW replication_test_table_placements_per_node AS
SELECT count(*) FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
WHERE logicalrelid = 'replication_test_table'::regclass
AND shardstate != 4
GROUP BY nodename, nodeport
ORDER BY nodename, nodeport;
-- Create four shards with replication factor 2, and delete the placements
-- on the node with the smaller port number to simulate under-replicated shards.
SELECT count(master_create_empty_shard('replication_test_table'))
FROM generate_series(1, 4);
count
---------------------------------------------------------------------
4
(1 row)
DELETE FROM pg_dist_shard_placement WHERE placementid in (
SELECT pg_dist_shard_placement.placementid
FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
WHERE logicalrelid = 'replication_test_table'::regclass
AND (nodename, nodeport) = (SELECT nodename, nodeport FROM pg_dist_shard_placement
ORDER BY nodename, nodeport limit 1)
);
-- Upload the test data to the shards
\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
-- Verify that there is one node with all placements
SELECT * FROM replication_test_table_placements_per_node;
count
---------------------------------------------------------------------
4
(1 row)
-- Check excluded_shard_list by excluding the three shards with the smallest ids
SELECT replicate_table_shards('replication_test_table',
excluded_shard_list := excluded_shard_list,
shard_transfer_mode:='block_writes')
FROM (
SELECT (array_agg(DISTINCT shardid ORDER BY shardid))[1:3] AS excluded_shard_list
FROM pg_dist_shard
WHERE logicalrelid = 'replication_test_table'::regclass
) T;
replicate_table_shards
---------------------------------------------------------------------
(1 row)
SELECT * FROM replication_test_table_placements_per_node;
count
---------------------------------------------------------------------
1
4
(2 rows)
-- Check that with shard_replication_factor=1 we don't do any copies
SELECT replicate_table_shards('replication_test_table',
shard_replication_factor := 1,
shard_transfer_mode:='block_writes');
replicate_table_shards
---------------------------------------------------------------------
(1 row)
SELECT * FROM replication_test_table_placements_per_node;
count
---------------------------------------------------------------------
1
4
(2 rows)
-- Check that max_shard_copies limits the number of copy operations
SELECT replicate_table_shards('replication_test_table',
max_shard_copies := 2,
shard_transfer_mode:='block_writes');
replicate_table_shards
---------------------------------------------------------------------
(1 row)
SELECT * FROM replication_test_table_placements_per_node;
count
---------------------------------------------------------------------
3
4
(2 rows)
-- Replicate the remaining under-replicated shards
SELECT replicate_table_shards('replication_test_table');
replicate_table_shards
---------------------------------------------------------------------
(1 row)
SELECT * FROM replication_test_table_placements_per_node;
count
---------------------------------------------------------------------
4
4
(2 rows)
-- Check that querying the table doesn't error out
SELECT count(*) FROM replication_test_table;
count
---------------------------------------------------------------------
4
(1 row)
DROP TABLE public.replication_test_table CASCADE;
-- Test rebalance_table_shards, which will in turn test update_shard_placement
-- in move mode.
CREATE TABLE rebalance_test_table(int_column int);
SELECT master_create_distributed_table('rebalance_test_table', 'int_column', 'append');
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE VIEW table_placements_per_node AS
SELECT nodeport, logicalrelid::regclass, count(*)
FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
WHERE shardstate != 4
GROUP BY logicalrelid::regclass, nodename, nodeport
ORDER BY logicalrelid::regclass, nodename, nodeport;
-- Create six shards with replication factor 1 and move them to the same
-- node to create an unbalanced cluster.
CREATE PROCEDURE create_unbalanced_shards(rel text)
LANGUAGE SQL
AS $$
SET citus.shard_replication_factor TO 1;
SELECT count(master_create_empty_shard(rel))
FROM generate_series(1, 6);
SELECT count(master_move_shard_placement(shardid,
src.nodename, src.nodeport::int,
dst.nodename, dst.nodeport::int,
shard_transfer_mode:='block_writes'))
FROM pg_dist_shard s JOIN
pg_dist_shard_placement src USING (shardid),
(SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst
WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass;
CALL citus_cleanup_orphaned_shards();
$$;
CALL create_unbalanced_shards('rebalance_test_table');
SET citus.shard_replication_factor TO 2;
-- Upload the test data to the shards
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
-- Verify that there is one node with all placements
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57638 | rebalance_test_table | 6
(1 row)
-- check that rebalances use the citus.local_hostname GUC by seeing them fail when the GUC is set to a non-existent host
ALTER SYSTEM SET citus.local_hostname TO 'foobar';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(.1); -- wait to make sure the config has changed before the next command reads the GUC
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT rebalance_table_shards('rebalance_test_table',
excluded_shard_list := excluded_shard_list,
threshold := 0,
shard_transfer_mode:='block_writes')
FROM (
SELECT (array_agg(DISTINCT shardid ORDER BY shardid))[1:4] AS excluded_shard_list
FROM pg_dist_shard
WHERE logicalrelid = 'rebalance_test_table'::regclass
) T;
ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error>
CALL citus_cleanup_orphaned_shards();
ALTER SYSTEM RESET citus.local_hostname;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(.1); -- wait to make sure the config has changed before the next command reads the GUC
pg_sleep
---------------------------------------------------------------------
(1 row)
-- Check excluded_shard_list by excluding the four shards with the smallest ids
SELECT rebalance_table_shards('rebalance_test_table',
excluded_shard_list := excluded_shard_list,
threshold := 0,
shard_transfer_mode:='block_writes')
FROM (
SELECT (array_agg(DISTINCT shardid ORDER BY shardid))[1:4] AS excluded_shard_list
FROM pg_dist_shard
WHERE logicalrelid = 'rebalance_test_table'::regclass
) T;
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | rebalance_test_table | 1
57638 | rebalance_test_table | 5
(2 rows)
-- Check that max_shard_moves limits the number of move operations
-- First check that we error out if we are not the table owner
-- Turn on NOTICE messages
SET ROLE testrole;
-- Make sure that rebalance is stopped if source or target nodes are
-- unresponsive.
SELECT rebalance_table_shards('rebalance_test_table',
shard_transfer_mode:='block_writes');
ERROR: target node localhost:xxxxx is not responsive
\c - - - :worker_1_port
SET citus.enable_create_role_propagation TO OFF;
CREATE USER testrole;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
GRANT ALL ON SCHEMA public TO testrole;
\c - - - :master_port
SET client_min_messages TO WARNING;
SET ROLE testrole;
SELECT rebalance_table_shards('rebalance_test_table',
shard_transfer_mode:='block_writes');
ERROR: source node localhost:xxxxx is not responsive
\c - - - :worker_2_port
SET citus.enable_create_role_propagation TO OFF;
CREATE USER testrole;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
GRANT ALL ON SCHEMA public TO testrole;
\c - - - :master_port
SET client_min_messages TO WARNING;
SET citus.next_shard_id TO 123010;
SET ROLE testrole;
SELECT rebalance_table_shards('rebalance_test_table',
shard_transfer_mode:='block_writes');
ERROR: must be owner of table rebalance_test_table
CONTEXT: while executing command on localhost:xxxxx
RESET ROLE;
-- Confirm no moves took place at all during these errors
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | rebalance_test_table | 1
57638 | rebalance_test_table | 5
(2 rows)
CALL citus_cleanup_orphaned_shards();
SELECT rebalance_table_shards('rebalance_test_table',
threshold := 0, max_shard_moves := 1,
shard_transfer_mode:='block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | rebalance_test_table | 2
57638 | rebalance_test_table | 4
(2 rows)
-- Check that threshold=1 doesn't move any shards
SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_transfer_mode:='block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | rebalance_test_table | 2
57638 | rebalance_test_table | 4
(2 rows)
-- Move the remaining shards using threshold=0
SELECT rebalance_table_shards('rebalance_test_table', threshold := 0);
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | rebalance_test_table | 3
57638 | rebalance_test_table | 3
(2 rows)
-- Check that the shards are completely balanced and rebalancing again doesn't
-- have any effect.
SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_transfer_mode:='block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | rebalance_test_table | 3
57638 | rebalance_test_table | 3
(2 rows)
-- Check that querying the table doesn't error out
SELECT count(*) FROM rebalance_test_table;
count
---------------------------------------------------------------------
6
(1 row)
DROP TABLE rebalance_test_table;
-- Test schema support
CREATE SCHEMA test_schema_support;
SELECT COUNT(*) FROM pg_dist_shard_placement;
count
---------------------------------------------------------------------
0
(1 row)
CREATE TABLE test_schema_support.nation_hash (
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152)
);
SELECT master_create_distributed_table('test_schema_support.nation_hash', 'n_nationkey', 'hash');
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT master_create_worker_shards('test_schema_support.nation_hash', 4, 1);
master_create_worker_shards
---------------------------------------------------------------------
(1 row)
CREATE TABLE test_schema_support.nation_hash2 (
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152)
);
SELECT master_create_distributed_table('test_schema_support.nation_hash2', 'n_nationkey', 'hash');
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT master_create_worker_shards('test_schema_support.nation_hash2', 4, 1);
master_create_worker_shards
---------------------------------------------------------------------
(1 row)
-- Shard count before replication
SELECT COUNT(*) FROM pg_dist_shard_placement;
count
---------------------------------------------------------------------
8
(1 row)
SET search_path TO public;
SELECT replicate_table_shards('test_schema_support.nation_hash', shard_transfer_mode:='block_writes');
replicate_table_shards
---------------------------------------------------------------------
(1 row)
-- Confirm replication
SELECT COUNT(*) FROM pg_dist_shard_placement;
count
---------------------------------------------------------------------
12
(1 row)
-- Test with search_path set
SET search_path TO test_schema_support;
SELECT replicate_table_shards('nation_hash2', shard_transfer_mode:='block_writes');
replicate_table_shards
---------------------------------------------------------------------
(1 row)
-- Confirm replication
SELECT COUNT(*) FROM pg_dist_shard_placement;
count
---------------------------------------------------------------------
16
(1 row)
DROP TABLE test_schema_support.nation_hash;
DROP TABLE test_schema_support.nation_hash2;
-- Test rebalancer with schema
-- The next few operations create an imbalanced distributed table
CREATE TABLE test_schema_support.imbalanced_table_local (
id integer not null
);
INSERT INTO test_schema_support.imbalanced_table_local VALUES(1);
INSERT INTO test_schema_support.imbalanced_table_local VALUES(2);
INSERT INTO test_schema_support.imbalanced_table_local VALUES(3);
INSERT INTO test_schema_support.imbalanced_table_local VALUES(4);
CREATE TABLE test_schema_support.imbalanced_table (
id integer not null
);
SELECT master_create_distributed_table('test_schema_support.imbalanced_table', 'id', 'append');
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
SET citus.shard_replication_factor TO 1;
SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
SET citus.shard_replication_factor TO 2;
SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
SET citus.shard_replication_factor TO 1;
SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
-- imbalanced_table is now imbalanced
-- Shard counts in each node before rebalance
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | imbalanced_table | 1
57638 | imbalanced_table | 3
(2 rows)
-- Row count in imbalanced table before rebalance
SELECT COUNT(*) FROM imbalanced_table;
count
---------------------------------------------------------------------
12
(1 row)
-- Try force_logical
SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical');
ERROR: the force_logical transfer mode is currently unsupported
CALL citus_cleanup_orphaned_shards();
-- Test rebalance operation
SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
-- Confirm rebalance
-- Shard counts in each node after rebalance
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | imbalanced_table | 2
57638 | imbalanced_table | 2
(2 rows)
-- Row count in imbalanced table after rebalance
SELECT COUNT(*) FROM imbalanced_table;
count
---------------------------------------------------------------------
12
(1 row)
DROP TABLE public.shard_rebalancer_test_data;
DROP TABLE test_schema_support.imbalanced_table;
DROP TABLE test_schema_support.imbalanced_table_local;
SET citus.shard_replication_factor TO 1;
CREATE TABLE colocated_rebalance_test(id integer);
CREATE TABLE colocated_rebalance_test2(id integer);
SELECT create_distributed_table('colocated_rebalance_test', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- make sure that we do not allow placing shards on target nodes
-- that are not eligible to hold shards
-- Try to move shards to a non-existing node
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', 10000, 'block_writes')
FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port;
ERROR: Moving shards to a non-existing node is not supported
HINT: Add the target node via SELECT citus_add_node('localhost', 10000);
CALL citus_cleanup_orphaned_shards();
-- Try to move shards to a node where shards are not allowed
SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
master_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port;
ERROR: Moving shards to a node that shouldn't have a shard is not supported
HINT: Allow shards on the target node via SELECT * FROM citus_set_node_property('localhost', 57637, 'shouldhaveshards', true);
SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
master_set_node_property
---------------------------------------------------------------------
(1 row)
-- Try to move shards to a non-active node
UPDATE pg_dist_node SET isactive = false WHERE nodeport = :worker_1_port;
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port;
ERROR: Moving shards to a non-active node is not supported
HINT: Activate the target node via SELECT citus_activate_node('localhost', 57637);
UPDATE pg_dist_node SET isactive = true WHERE nodeport = :worker_1_port;
-- Try to move shards to a secondary node
UPDATE pg_dist_node SET noderole = 'secondary' WHERE nodeport = :worker_1_port;
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port;
ERROR: Moving shards to a secondary (e.g., replica) node is not supported
UPDATE pg_dist_node SET noderole = 'primary' WHERE nodeport = :worker_1_port;
-- Move all shards to worker1
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port;
master_move_shard_placement
---------------------------------------------------------------------
(2 rows)
CALL citus_cleanup_orphaned_shards();
SELECT create_distributed_table('colocated_rebalance_test2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Confirm all shards for both tables are on worker1
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 4
57637 | colocated_rebalance_test2 | 4
(2 rows)
-- Confirm that the plan for drain_only doesn't show any moves
SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0, drain_only := true);
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
(0 rows)
-- Running with drain_only shouldn't do anything
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true);
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
-- Confirm that nothing changed
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 4
57637 | colocated_rebalance_test2 | 4
(2 rows)
-- Confirm that the plan shows 2 shards of both tables moving back to worker2
SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0);
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
colocated_rebalance_test | 123021 | 0 | localhost | 57637 | localhost | 57638
colocated_rebalance_test2 | 123025 | 0 | localhost | 57637 | localhost | 57638
colocated_rebalance_test | 123022 | 0 | localhost | 57637 | localhost | 57638
colocated_rebalance_test2 | 123026 | 0 | localhost | 57637 | localhost | 57638
(4 rows)
-- Confirm that this also happens when rebalancing by disk size, even if the tables are empty
SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebalance_strategy := 'by_disk_size');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
colocated_rebalance_test | 123021 | 0 | localhost | 57637 | localhost | 57638
colocated_rebalance_test2 | 123025 | 0 | localhost | 57637 | localhost | 57638
colocated_rebalance_test | 123022 | 0 | localhost | 57637 | localhost | 57638
colocated_rebalance_test2 | 123026 | 0 | localhost | 57637 | localhost | 57638
(4 rows)
-- Check that we can call this function
SELECT * FROM get_rebalance_progress();
sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size
---------------------------------------------------------------------
(0 rows)
-- Actually do the rebalance
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
-- Check that we can call this function without a crash
SELECT * FROM get_rebalance_progress();
sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size
---------------------------------------------------------------------
(0 rows)
-- Confirm that the shards are now there
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 2
57638 | colocated_rebalance_test | 2
57637 | colocated_rebalance_test2 | 2
57638 | colocated_rebalance_test2 | 2
(4 rows)
CALL citus_cleanup_orphaned_shards();
select * from pg_dist_placement ORDER BY placementid;
placementid | shardid | shardstate | shardlength | groupid
---------------------------------------------------------------------
151 | 123023 | 1 | 0 | 14
154 | 123024 | 1 | 0 | 14
157 | 123027 | 1 | 0 | 14
158 | 123028 | 1 | 0 | 14
159 | 123021 | 1 | 0 | 16
160 | 123025 | 1 | 0 | 16
161 | 123022 | 1 | 0 | 16
162 | 123026 | 1 | 0 | 16
(8 rows)
-- Move all shards to worker1 again
SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node
WHERE nodeport = :worker_2_port AND logicalrelid = 'colocated_rebalance_test'::regclass;
master_move_shard_placement
---------------------------------------------------------------------
(2 rows)
-- Confirm that the shards are now all on worker1
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 4
57637 | colocated_rebalance_test2 | 4
(2 rows)
-- Explicitly don't run citus_cleanup_orphaned_shards; rebalance_table_shards
-- should do that automatically.
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
-- Confirm that the shards are now moved
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 2
57638 | colocated_rebalance_test | 2
57637 | colocated_rebalance_test2 | 2
57638 | colocated_rebalance_test2 | 2
(4 rows)
CREATE TABLE non_colocated_rebalance_test(id integer);
SELECT create_distributed_table('non_colocated_rebalance_test', 'id', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- confirm that both colocation groups are balanced
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 2
57638 | colocated_rebalance_test | 2
57637 | colocated_rebalance_test2 | 2
57638 | colocated_rebalance_test2 | 2
57637 | non_colocated_rebalance_test | 2
57638 | non_colocated_rebalance_test | 2
(6 rows)
-- testing behaviour when marking a node for draining by setting shouldhaveshards to false
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
master_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0);
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
colocated_rebalance_test | 123021 | 0 | localhost | 57638 | localhost | 57637
colocated_rebalance_test2 | 123025 | 0 | localhost | 57638 | localhost | 57637
colocated_rebalance_test | 123022 | 0 | localhost | 57638 | localhost | 57637
colocated_rebalance_test2 | 123026 | 0 | localhost | 57638 | localhost | 57637
(4 rows)
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 4
57637 | colocated_rebalance_test2 | 4
57637 | non_colocated_rebalance_test | 2
57638 | non_colocated_rebalance_test | 2
(4 rows)
SELECT * FROM get_rebalance_table_shards_plan('non_colocated_rebalance_test', threshold := 0);
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
non_colocated_rebalance_test | 123030 | 0 | localhost | 57638 | localhost | 57637
non_colocated_rebalance_test | 123032 | 0 | localhost | 57638 | localhost | 57637
(2 rows)
SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 4
57637 | colocated_rebalance_test2 | 4
57637 | non_colocated_rebalance_test | 4
(3 rows)
-- Put shards back
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
master_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 2
57638 | colocated_rebalance_test | 2
57637 | colocated_rebalance_test2 | 2
57638 | colocated_rebalance_test2 | 2
57637 | non_colocated_rebalance_test | 4
(5 rows)
SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 2
57638 | colocated_rebalance_test | 2
57637 | colocated_rebalance_test2 | 2
57638 | colocated_rebalance_test2 | 2
57637 | non_colocated_rebalance_test | 2
57638 | non_colocated_rebalance_test | 2
(6 rows)
-- testing behaviour when setting shouldhaveshards to false and rebalancing all
-- colocation groups with drain_only=true
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
master_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT * FROM get_rebalance_table_shards_plan(threshold := 0, drain_only := true);
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
colocated_rebalance_test | 123021 | 0 | localhost | 57638 | localhost | 57637
colocated_rebalance_test2 | 123025 | 0 | localhost | 57638 | localhost | 57637
colocated_rebalance_test | 123022 | 0 | localhost | 57638 | localhost | 57637
colocated_rebalance_test2 | 123026 | 0 | localhost | 57638 | localhost | 57637
non_colocated_rebalance_test | 123029 | 0 | localhost | 57638 | localhost | 57637
non_colocated_rebalance_test | 123030 | 0 | localhost | 57638 | localhost | 57637
(6 rows)
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true);
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 4
57637 | colocated_rebalance_test2 | 4
57637 | non_colocated_rebalance_test | 4
(3 rows)
-- Put shards back
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
master_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 2
57638 | colocated_rebalance_test | 2
57637 | colocated_rebalance_test2 | 2
57638 | colocated_rebalance_test2 | 2
57637 | non_colocated_rebalance_test | 2
57638 | non_colocated_rebalance_test | 2
(6 rows)
-- testing behaviour when setting shouldhaveshards to false and rebalancing all
-- colocation groups with drain_only=false
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
master_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT * FROM get_rebalance_table_shards_plan(threshold := 0);
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
colocated_rebalance_test | 123021 | 0 | localhost | 57638 | localhost | 57637
colocated_rebalance_test2 | 123025 | 0 | localhost | 57638 | localhost | 57637
colocated_rebalance_test | 123022 | 0 | localhost | 57638 | localhost | 57637
colocated_rebalance_test2 | 123026 | 0 | localhost | 57638 | localhost | 57637
non_colocated_rebalance_test | 123029 | 0 | localhost | 57638 | localhost | 57637
non_colocated_rebalance_test | 123030 | 0 | localhost | 57638 | localhost | 57637
(6 rows)
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 4
57637 | colocated_rebalance_test2 | 4
57637 | non_colocated_rebalance_test | 4
(3 rows)
-- Put shards back
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
master_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 2
57638 | colocated_rebalance_test | 2
57637 | colocated_rebalance_test2 | 2
57638 | colocated_rebalance_test2 | 2
57637 | non_colocated_rebalance_test | 2
57638 | non_colocated_rebalance_test | 2
(6 rows)
-- Make it a data node again
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
master_set_node_property
---------------------------------------------------------------------
(1 row)
-- testing behaviour of master_drain_node
SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode := 'block_writes');
master_drain_node
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port;
shouldhaveshards
---------------------------------------------------------------------
f
(1 row)
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 4
57637 | colocated_rebalance_test2 | 4
57637 | non_colocated_rebalance_test | 4
(3 rows)
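-- Aside (illustrative, not executed as part of this test): newer Citus releases expose the
-- same draining operation under the citus_* naming; assuming such a version is installed,
-- the equivalent call would be:
-- SELECT citus_drain_node('localhost', :worker_2_port, shard_transfer_mode := 'block_writes');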
-- Put shards back
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
master_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | colocated_rebalance_test | 2
57638 | colocated_rebalance_test | 2
57637 | colocated_rebalance_test2 | 2
57638 | colocated_rebalance_test2 | 2
57637 | non_colocated_rebalance_test | 2
57638 | non_colocated_rebalance_test | 2
(6 rows)
-- Drop some tables for a clear, consistent error
DROP TABLE test_schema_support.colocated_rebalance_test2;
-- Leave no trace on workers
RESET search_path;
\set VERBOSITY terse
DROP SCHEMA test_schema_support CASCADE;
\set VERBOSITY default
REVOKE ALL ON SCHEMA public FROM testrole;
DROP USER testrole;
-- Test costs
set citus.shard_count = 4;
SET citus.next_shard_id TO 123040;
CREATE TABLE tab (x int);
SELECT create_distributed_table('tab','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- The following numbers are chosen such that they are placed on different
-- shards.
INSERT INTO tab SELECT 1 from generate_series(1, 30000);
INSERT INTO tab SELECT 2 from generate_series(1, 10000);
INSERT INTO tab SELECT 3 from generate_series(1, 10000);
INSERT INTO tab SELECT 6 from generate_series(1, 10000);
VACUUM FULL tab;
ANALYZE tab;
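-- Aside (illustrative, not part of the original test): one way to verify that the values
-- used above really land on different shards is get_shard_id_for_distribution_column,
-- which returns the shard a given distribution value hashes to, e.g.:
-- SELECT get_shard_id_for_distribution_column('tab', 1);
-- SELECT get_shard_id_for_distribution_column('tab', 2);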
\c - - - :worker_1_port
SELECT table_schema, table_name, row_estimate, total_bytes
FROM (
SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM (
SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME
, c.reltuples AS row_estimate
, pg_total_relation_size(c.oid) AS total_bytes
, pg_indexes_size(c.oid) AS index_bytes
, pg_total_relation_size(reltoastrelid) AS toast_bytes
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relkind = 'r'
) a
WHERE table_schema = 'public'
) a ORDER BY table_name;
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab_123040 | 30000 | 1089536
public | tab_123042 | 10000 | 368640
(2 rows)
\c - - - :worker_2_port
SELECT table_schema, table_name, row_estimate, total_bytes
FROM (
SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM (
SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME
, c.reltuples AS row_estimate
, pg_total_relation_size(c.oid) AS total_bytes
, pg_indexes_size(c.oid) AS index_bytes
, pg_total_relation_size(reltoastrelid) AS toast_bytes
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relkind = 'r'
) a
WHERE table_schema = 'public'
) a ORDER BY table_name;
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab_123041 | 10000 | 368640
public | tab_123043 | 10000 | 368640
(2 rows)
\c - - - :master_port
SELECT * FROM get_rebalance_table_shards_plan('tab');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
(0 rows)
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123042 | 0 | localhost | 57637 | localhost | 57638
(1 row)
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', threshold := 0);
WARNING: the given threshold is lower than the minimum threshold allowed by the rebalance strategy, using the minimum allowed threshold instead
DETAIL: Using threshold of 0.01
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123042 | 0 | localhost | 57637 | localhost | 57638
(1 row)
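-- The 0.01 minimum used above comes from the strategy row itself; an illustrative way to
-- inspect it (assuming the default pg_dist_rebalance_strategy columns), not executed here:
--   SELECT name, default_threshold, minimum_threshold
--   FROM pg_dist_rebalance_strategy WHERE name = 'by_disk_size';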
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | tab | 2
57638 | tab | 2
(2 rows)
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes');
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 1 orphaned shards
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | tab | 1
57638 | tab | 3
(2 rows)
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes', threshold := 0);
WARNING: the given threshold is lower than the minimum threshold allowed by the rebalance strategy, using the minimum allowed threshold instead
DETAIL: Using threshold of 0.01
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | tab | 1
57638 | tab | 3
(2 rows)
-- Check that sizes of colocated tables are added together for rebalances
set citus.shard_count = 4;
SET citus.next_shard_id TO 123050;
CREATE TABLE tab2 (x int);
SELECT create_distributed_table('tab2','x', colocate_with := 'tab');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO tab2 SELECT 1 from generate_series(1, 0);
INSERT INTO tab2 SELECT 2 from generate_series(1, 60000);
INSERT INTO tab2 SELECT 3 from generate_series(1, 10000);
INSERT INTO tab2 SELECT 6 from generate_series(1, 10000);
VACUUM FULL tab, tab2;
ANALYZE tab, tab2;
\c - - - :worker_1_port
SELECT table_schema, table_name, row_estimate, total_bytes
FROM (
SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM (
SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME
, c.reltuples AS row_estimate
, pg_total_relation_size(c.oid) AS total_bytes
, pg_indexes_size(c.oid) AS index_bytes
, pg_total_relation_size(reltoastrelid) AS toast_bytes
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relkind = 'r'
) a
WHERE table_schema = 'public'
) a ORDER BY table_name;
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab2_123050 | 0 | 0
public | tab_123040 | 30000 | 1089536
(2 rows)
\c - - - :worker_2_port
SELECT table_schema, table_name, row_estimate, total_bytes
FROM (
SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM (
SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME
, c.reltuples AS row_estimate
, pg_total_relation_size(c.oid) AS total_bytes
, pg_indexes_size(c.oid) AS index_bytes
, pg_total_relation_size(reltoastrelid) AS toast_bytes
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relkind = 'r'
) a
WHERE table_schema = 'public'
) a ORDER BY table_name;
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab2_123051 | 10000 | 368640
public | tab2_123052 | 10000 | 368640
public | tab2_123053 | 60000 | 2179072
public | tab_123041 | 10000 | 368640
public | tab_123042 | 10000 | 368640
public | tab_123043 | 10000 | 368640
(6 rows)
\c - - - :master_port
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size');
NOTICE: Ignoring move of shard xxxxx from localhost:xxxxx to localhost:xxxxx, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.151125 is lower than the improvement_threshold of 0.5
NOTICE: Ignored 1 moves, all of which are shown in notices above
HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.5).
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123041 | 0 | localhost | 57638 | localhost | 57637
tab2 | 123051 | 0 | localhost | 57638 | localhost | 57637
tab | 123042 | 0 | localhost | 57638 | localhost | 57637
tab2 | 123052 | 0 | localhost | 57638 | localhost | 57637
(4 rows)
-- get_rebalance_table_shards_plan also supports overriding improvement_threshold
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0);
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123043 | 0 | localhost | 57638 | localhost | 57637
tab2 | 123053 | 0 | localhost | 57638 | localhost | 57637
tab | 123040 | 0 | localhost | 57637 | localhost | 57638
tab2 | 123050 | 0 | localhost | 57637 | localhost | 57638
(4 rows)
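-- The 0.5 cutoff mentioned in the notices above is the strategy's improvement_threshold;
-- an illustrative, not executed query to inspect it (assuming the column exists on
-- pg_dist_rebalance_strategy, Citus 10.1+):
--   SELECT name, improvement_threshold FROM pg_dist_rebalance_strategy
--   WHERE name = 'by_disk_size';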
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes');
NOTICE: Ignoring move of shard xxxxx from localhost:xxxxx to localhost:xxxxx, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.151125 is lower than the improvement_threshold of 0.5
NOTICE: Ignored 1 moves, all of which are shown in notices above
HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.5).
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 4 orphaned shards
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | tab | 3
57638 | tab | 1
57637 | tab2 | 3
57638 | tab2 | 1
(4 rows)
VACUUM FULL tab, tab2;
ANALYZE tab, tab2;
\c - - - :worker_1_port
SELECT table_schema, table_name, row_estimate, total_bytes
FROM (
SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM (
SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME
, c.reltuples AS row_estimate
, pg_total_relation_size(c.oid) AS total_bytes
, pg_indexes_size(c.oid) AS index_bytes
, pg_total_relation_size(reltoastrelid) AS toast_bytes
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relkind = 'r'
) a
WHERE table_schema = 'public'
) a ORDER BY table_name;
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab2_123050 | 0 | 0
public | tab2_123051 | 10000 | 368640
public | tab2_123052 | 10000 | 368640
public | tab_123040 | 30000 | 1089536
public | tab_123041 | 10000 | 368640
public | tab_123042 | 10000 | 368640
(6 rows)
\c - - - :worker_2_port
SELECT table_schema, table_name, row_estimate, total_bytes
FROM (
SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM (
SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME
, c.reltuples AS row_estimate
, pg_total_relation_size(c.oid) AS total_bytes
, pg_indexes_size(c.oid) AS index_bytes
, pg_total_relation_size(reltoastrelid) AS toast_bytes
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relkind = 'r'
) a
WHERE table_schema = 'public'
) a ORDER BY table_name;
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab2_123053 | 60000 | 2179072
public | tab_123043 | 10000 | 368640
(2 rows)
\c - - - :master_port
DROP TABLE tab2;
CREATE OR REPLACE FUNCTION capacity_high_worker_2(nodeidarg int)
RETURNS real AS $$
SELECT
(CASE WHEN nodeport = 57638 THEN 1000 ELSE 1 END)::real
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
\set VERBOSITY terse
SELECT citus_add_rebalance_strategy(
'capacity_high_worker_2',
'citus_shard_cost_1',
'capacity_high_worker_2',
'citus_shard_allowed_on_node_true',
0
);
citus_add_rebalance_strategy
---------------------------------------------------------------------
(1 row)
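-- To see the per-node capacities this strategy assigns, the capacity function defined
-- above can be evaluated against pg_dist_node (illustrative only, not executed here):
--   SELECT nodeport, capacity_high_worker_2(nodeid) AS capacity
--   FROM pg_dist_node ORDER BY nodeport;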
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123040 | 0 | localhost | 57637 | localhost | 57638
tab | 123041 | 0 | localhost | 57637 | localhost | 57638
tab | 123042 | 0 | localhost | 57637 | localhost | 57638
(3 rows)
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes');
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 3 orphaned shards
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57638 | tab | 4
(1 row)
SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2');
citus_set_default_rebalance_strategy
---------------------------------------------------------------------
(1 row)
SELECT * FROM get_rebalance_table_shards_plan('tab');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
(0 rows)
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57638 | tab | 4
(1 row)
CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int)
RETURNS boolean AS $$
SELECT
(CASE WHEN nodeport = 57637 THEN TRUE ELSE FALSE END)
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
SELECT citus_add_rebalance_strategy(
'only_worker_1',
'citus_shard_cost_1',
'citus_node_capacity_1',
'only_worker_1',
0
);
citus_add_rebalance_strategy
---------------------------------------------------------------------
(1 row)
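-- Similarly, the allowed-on-node function can be previewed per shard and node before
-- rebalancing (illustrative only, not executed here):
--   SELECT shardid, nodeport, only_worker_1(shardid, nodeid) AS allowed
--   FROM pg_dist_shard CROSS JOIN pg_dist_node
--   WHERE logicalrelid = 'tab'::regclass ORDER BY shardid, nodeport;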
SELECT citus_set_default_rebalance_strategy('only_worker_1');
citus_set_default_rebalance_strategy
---------------------------------------------------------------------
(1 row)
SELECT * FROM get_rebalance_table_shards_plan('tab');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123040 | 0 | localhost | 57638 | localhost | 57637
tab | 123041 | 0 | localhost | 57638 | localhost | 57637
tab | 123042 | 0 | localhost | 57638 | localhost | 57637
tab | 123043 | 0 | localhost | 57638 | localhost | 57637
(4 rows)
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 4 orphaned shards
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | tab | 4
(1 row)
SELECT citus_set_default_rebalance_strategy('by_shard_count');
citus_set_default_rebalance_strategy
---------------------------------------------------------------------
(1 row)
SELECT * FROM get_rebalance_table_shards_plan('tab');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123040 | 0 | localhost | 57637 | localhost | 57638
tab | 123041 | 0 | localhost | 57637 | localhost | 57638
(2 rows)
-- Check all the error handling cases
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_existing');
ERROR: could not find rebalance strategy with name non_existing
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing');
ERROR: could not find rebalance strategy with name non_existing
CALL citus_cleanup_orphaned_shards();
SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing');
ERROR: could not find rebalance strategy with name non_existing
CALL citus_cleanup_orphaned_shards();
SELECT citus_set_default_rebalance_strategy('non_existing');
ERROR: strategy with specified name does not exist
UPDATE pg_dist_rebalance_strategy SET default_strategy=false;
SELECT * FROM get_rebalance_table_shards_plan('tab');
ERROR: no rebalance_strategy was provided, but there is also no default strategy set
SELECT * FROM rebalance_table_shards('tab');
ERROR: no rebalance_strategy was provided, but there is also no default strategy set
CALL citus_cleanup_orphaned_shards();
SELECT * FROM master_drain_node('localhost', :worker_2_port);
ERROR: no rebalance_strategy was provided, but there is also no default strategy set
CALL citus_cleanup_orphaned_shards();
UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count';
CREATE OR REPLACE FUNCTION shard_cost_no_arguments()
RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION shard_cost_bad_arg_type(text)
RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION shard_cost_bad_return_type(bigint)
RETURNS int AS $$ SELECT 1 $$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION node_capacity_no_arguments()
RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION node_capacity_bad_arg_type(text)
RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION node_capacity_bad_return_type(int)
RETURNS int AS $$ SELECT 1 $$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION shard_allowed_on_node_no_arguments()
RETURNS boolean AS $$ SELECT true $$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION shard_allowed_on_node_bad_arg1(text, int)
RETURNS boolean AS $$ SELECT true $$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION shard_allowed_on_node_bad_arg2(bigint, text)
RETURNS boolean AS $$ SELECT true $$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION shard_allowed_on_node_bad_return_type(bigint, int)
RETURNS int AS $$ SELECT 1 $$ LANGUAGE sql;
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
'shard_cost_no_arguments',
'citus_node_capacity_1',
'citus_shard_allowed_on_node_true',
0
);
ERROR: signature for shard_cost_function is incorrect
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
'shard_cost_bad_arg_type',
'citus_node_capacity_1',
'citus_shard_allowed_on_node_true',
0
);
ERROR: signature for shard_cost_function is incorrect
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
'shard_cost_bad_return_type',
'citus_node_capacity_1',
'citus_shard_allowed_on_node_true',
0
);
ERROR: signature for shard_cost_function is incorrect
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
0,
'citus_node_capacity_1',
'citus_shard_allowed_on_node_true',
0
);
ERROR: cache lookup failed for shard_cost_function with oid 0
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
'citus_shard_cost_1',
'node_capacity_no_arguments',
'citus_shard_allowed_on_node_true',
0
);
ERROR: signature for node_capacity_function is incorrect
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
'citus_shard_cost_1',
'node_capacity_bad_arg_type',
'citus_shard_allowed_on_node_true',
0
);
ERROR: signature for node_capacity_function is incorrect
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
'citus_shard_cost_1',
'node_capacity_bad_return_type',
'citus_shard_allowed_on_node_true',
0
);
ERROR: signature for node_capacity_function is incorrect
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
'citus_shard_cost_1',
0,
'citus_shard_allowed_on_node_true',
0
);
ERROR: cache lookup failed for node_capacity_function with oid 0
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
'citus_shard_cost_1',
'citus_node_capacity_1',
'shard_allowed_on_node_no_arguments',
0
);
ERROR: signature for shard_allowed_on_node_function is incorrect
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
'citus_shard_cost_1',
'citus_node_capacity_1',
'shard_allowed_on_node_bad_arg1',
0
);
ERROR: signature for shard_allowed_on_node_function is incorrect
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
'citus_shard_cost_1',
'citus_node_capacity_1',
'shard_allowed_on_node_bad_arg2',
0
);
ERROR: signature for shard_allowed_on_node_function is incorrect
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
'citus_shard_cost_1',
'citus_node_capacity_1',
'shard_allowed_on_node_bad_return_type',
0
);
ERROR: signature for shard_allowed_on_node_function is incorrect
SELECT citus_add_rebalance_strategy(
'insert_should_fail',
'citus_shard_cost_1',
'citus_node_capacity_1',
0,
0
);
ERROR: cache lookup failed for shard_allowed_on_node_function with oid 0
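-- For reference, a sketch of a strategy whose function signatures do pass validation,
-- mirroring the built-in citus_* helpers used above (names are hypothetical, not executed):
--   CREATE FUNCTION valid_shard_cost(bigint) RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql;
--   CREATE FUNCTION valid_node_capacity(int) RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql;
--   CREATE FUNCTION valid_shard_allowed(bigint, int) RETURNS boolean AS $$ SELECT true $$ LANGUAGE sql;
--   SELECT citus_add_rebalance_strategy('valid_strategy', 'valid_shard_cost',
--          'valid_node_capacity', 'valid_shard_allowed', 0);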
-- Confirm that manual insert/update has the same checks
INSERT INTO
pg_catalog.pg_dist_rebalance_strategy(
name,
shard_cost_function,
node_capacity_function,
shard_allowed_on_node_function,
default_threshold
) VALUES (
'shard_cost_no_arguments',
'shard_cost_no_arguments',
'citus_node_capacity_1',
'citus_shard_allowed_on_node_true',
0
);
ERROR: signature for shard_cost_function is incorrect
UPDATE pg_dist_rebalance_strategy SET shard_cost_function='shard_cost_no_arguments' WHERE name='by_disk_size';
ERROR: signature for shard_cost_function is incorrect
-- Confirm that only a single default strategy can exist
INSERT INTO
pg_catalog.pg_dist_rebalance_strategy(
name,
default_strategy,
shard_cost_function,
node_capacity_function,
shard_allowed_on_node_function,
default_threshold
) VALUES (
'second_default',
true,
'citus_shard_cost_1',
'citus_node_capacity_1',
'citus_shard_allowed_on_node_true',
0
);
ERROR: there cannot be two default strategies
UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_disk_size';
ERROR: there cannot be two default strategies
-- ensure the trigger allows updating the default strategy
UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count';
-- Confirm that the default threshold cannot be lower than the strategy's minimum threshold
SELECT citus_add_rebalance_strategy(
'default_threshold_too_low',
'citus_shard_cost_1',
'capacity_high_worker_2',
'citus_shard_allowed_on_node_true',
0,
0.1
);
ERROR: default_threshold cannot be smaller than minimum_threshold
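-- A hedged counterexample that would satisfy the check (a default_threshold of 0.1 is not
-- below the minimum_threshold of 0.1); the strategy name is hypothetical and not executed:
--   SELECT citus_add_rebalance_strategy(
--       'default_threshold_ok',
--       'citus_shard_cost_1',
--       'capacity_high_worker_2',
--       'citus_shard_allowed_on_node_true',
--       0.1,
--       0.1
--   );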
-- Make it a data node again
SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
master_set_node_property
---------------------------------------------------------------------
(1 row)
DROP TABLE tab;
-- we don't need the coordinator in pg_dist_node anymore
SELECT 1 FROM master_remove_node('localhost', :master_port);
?column?
---------------------------------------------------------------------
1
(1 row)
--
-- Make sure that rebalance_table_shards() and replicate_table_shards() replicate
-- reference tables to the coordinator when replicate_reference_tables_on_activate
-- is off.
--
SET citus.replicate_reference_tables_on_activate TO off;
SET client_min_messages TO WARNING;
CREATE TABLE dist_table_test_3(a int);
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('dist_table_test_3', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE ref_table(a int);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
count
---------------------------------------------------------------------
2
(1 row)
SET citus.shard_replication_factor TO 2;
SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes');
ERROR: Table 'dist_table_test_3' is streaming replicated. Shards of streaming replicated tables cannot be copied
-- Mark table as coordinator replicated in order to be able to test replicate_table_shards
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN
('dist_table_test_3'::regclass);
SELECT replicate_table_shards('dist_table_test_3', max_shard_copies := 4, shard_transfer_mode:='block_writes');
replicate_table_shards
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
count
---------------------------------------------------------------------
3
(1 row)
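-- To see which nodes (including the coordinator's port) hold those placements, an
-- illustrative variant of the count query above (not executed here):
--   SELECT nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
--   WHERE logicalrelid = 'ref_table'::regclass ORDER BY nodeport;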
SELECT 1 FROM master_remove_node('localhost', :master_port);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE TABLE rebalance_test_table(int_column int);
SELECT master_create_distributed_table('rebalance_test_table', 'int_column', 'append');
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL create_unbalanced_shards('rebalance_test_table');
SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
count
---------------------------------------------------------------------
2
(1 row)
SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='block_writes');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
count
---------------------------------------------------------------------
3
(1 row)
DROP TABLE dist_table_test_3, rebalance_test_table, ref_table;
SELECT 1 FROM master_remove_node('localhost', :master_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- reference table r2 will not have a replica identity, causing the rebalancer to not work
-- when run in the default mode. Instead we need to change the shard transfer mode to make
-- it work. This verifies that the shard transfer mode used in the rebalancer is also used
-- when ensuring that reference tables exist on all nodes.
CREATE TABLE t1 (a int PRIMARY KEY, b int);
CREATE TABLE r1 (a int PRIMARY KEY, b int);
CREATE TABLE r2 (a int, b int);
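-- An illustrative way (not executed here) to see why r2 lacks a usable replica identity
-- while r1 has one through its primary key:
--   SELECT c.relname, c.relreplident, i.indexrelid IS NOT NULL AS has_pkey
--   FROM pg_class c
--   LEFT JOIN pg_index i ON i.indrelid = c.oid AND i.indisprimary
--   WHERE c.relname IN ('r1', 'r2') ORDER BY c.relname;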
-- we remove worker 2 before creating the tables; this allows us to have an active
-- node without the reference tables
SELECT 1 from master_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT create_distributed_table('t1','a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('r1');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('r2');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- add data so that data is actually copied when forcing logical replication for reference tables
INSERT INTO r1 VALUES (1,2), (3,4);
INSERT INTO r2 VALUES (1,2), (3,4);
SELECT 1 from master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT rebalance_table_shards();
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
DROP TABLE t1, r1, r2;
-- verify there are no distributed tables before we perform the following tests. Preceding
-- test suites should clean up their distributed tables.
SELECT count(*) FROM pg_dist_partition;
count
---------------------------------------------------------------------
0
(1 row)
-- verify a system having only reference tables will copy the reference tables when
-- executing the rebalancer
SELECT 1 from master_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE TABLE r1 (a int PRIMARY KEY, b int);
SELECT create_reference_table('r1');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT 1 from master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- count the number of placements for the reference table to verify it is not available on
-- all nodes
SELECT count(*)
FROM pg_dist_shard
JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'r1'::regclass;
count
---------------------------------------------------------------------
1
(1 row)
-- rebalance with _only_ a reference table; this should trigger the copy
SELECT rebalance_table_shards();
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
-- verify the reference table is on all nodes after the rebalance
SELECT count(*)
FROM pg_dist_shard
JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'r1'::regclass;
count
---------------------------------------------------------------------
2
(1 row)
-- cleanup tables
DROP TABLE r1;
-- Lastly, we need to verify that reference tables are copied before the replication factor
-- of other tables is increased. Without the reference table copy, the replication might
-- fail.
SELECT 1 from master_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE TABLE t1 (a int PRIMARY KEY, b int);
CREATE TABLE r1 (a int PRIMARY KEY, b int);
SELECT create_distributed_table('t1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('r1');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT 1 from master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- count the number of placements for the reference table to verify it is not available on
-- all nodes
SELECT count(*)
FROM pg_dist_shard
JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'r1'::regclass;
count
---------------------------------------------------------------------
1
(1 row)
SELECT replicate_table_shards('t1', shard_replication_factor := 2);
ERROR: Table 't1' is streaming replicated. Shards of streaming replicated tables cannot be copied
-- Mark table as coordinator replicated in order to be able to test replicate_table_shards
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid IN
('t1'::regclass);
SELECT replicate_table_shards('t1', shard_replication_factor := 2);
replicate_table_shards
---------------------------------------------------------------------
(1 row)
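-- To confirm the new replication factor, an illustrative per-shard placement count
-- (not executed here):
--   SELECT shardid, count(*) AS placements
--   FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
--   WHERE logicalrelid = 't1'::regclass GROUP BY shardid ORDER BY shardid;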
-- verify the reference table is on all nodes after replicate_table_shards
SELECT count(*)
FROM pg_dist_shard
JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'r1'::regclass;
count
---------------------------------------------------------------------
2
(1 row)
DROP TABLE t1, r1;