Fail early when shard can't be safely moved to a new node (#7467)

DESCRIPTION: citus_move_shard_placement now fails early when a shard
cannot be safely moved

The implementation is quite simplistic:
`citus_move_shard_placement(...)` will fail with an error if there is any
new node in the cluster that doesn't have the reference tables yet.

It could have been finer-grained, i.e. erroring only when trying to move
a shard to an uninitialized node. But looking at the related functions,
`replicate_reference_tables()` and `citus_rebalance_start()`, I think
this behaviour is acceptable: those functions also treat *any*
uninitialized node as a temporary anomaly.
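For illustration, here is one way to spot the problem state by hand. This catalog query is only a sketch, not the check the patch performs (`HasNodesWithMissingReferenceTables()` is the authoritative test); it assumes reference tables are the rows of `pg_dist_partition` with `partmethod = 'n'` and `repmodel = 't'`:

```sql
-- Sketch: list active primary nodes that hold fewer reference table
-- placements than there are reference tables in the cluster.
SELECT n.nodename, n.nodeport, count(p.placementid) AS ref_placements
FROM pg_dist_node n
LEFT JOIN pg_dist_shard_placement p
  ON p.nodename = n.nodename
 AND p.nodeport = n.nodeport
 AND p.shardid IN (SELECT s.shardid
                   FROM pg_dist_shard s
                   JOIN pg_dist_partition t USING (logicalrelid)
                   WHERE t.partmethod = 'n' AND t.repmodel = 't')
WHERE n.isactive AND n.noderole = 'primary'
GROUP BY n.nodename, n.nodeport
HAVING count(p.placementid) <
       (SELECT count(*) FROM pg_dist_partition
        WHERE partmethod = 'n' AND repmodel = 't');
```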

Fixes #7426

---------

Co-authored-by: Jelte Fennema-Nio <jelte.fennema@microsoft.com>
Filip Sedlák 2024-02-07 13:04:52 +01:00 committed by GitHub
parent 9ff8436f14
commit 6869b3ad10
3 changed files with 116 additions and 0 deletions


@@ -294,6 +294,17 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
	CheckCitusVersion(ERROR);
	EnsureCoordinator();

	List *referenceTableIdList = NIL;

	if (HasNodesWithMissingReferenceTables(&referenceTableIdList))
	{
		ereport(ERROR, (errmsg("there are missing reference tables on some nodes"),
						errhint("Copy reference tables first with "
								"replicate_reference_tables() or use "
								"citus_rebalance_start() that will do it automatically."
								)));
	}

	int64 shardId = PG_GETARG_INT64(0);
	char *sourceNodeName = text_to_cstring(PG_GETARG_TEXT_P(1));
	int32 sourceNodePort = PG_GETARG_INT32(2);
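
When the new error fires, the hint's remediation applies as-is; a sketch of the intended operator workflow:

```sql
-- Option 1: copy reference tables to the uninitialized node explicitly,
-- then retry the shard move.
SELECT replicate_reference_tables();

-- Option 2: start a rebalance; it replicates missing reference tables
-- itself before moving any distributed table shards.
SELECT citus_rebalance_start();
```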


@@ -2395,6 +2395,74 @@ SELECT count(*) FROM pg_dist_partition;
0
(1 row)
-- verify a system with a new node won't copy distributed table shards without reference tables
SELECT 1 from master_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT public.wait_until_metadata_sync(30000);
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
CREATE TABLE r1 (a int PRIMARY KEY, b int);
SELECT create_reference_table('r1');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE d1 (a int PRIMARY KEY, b int);
SELECT create_distributed_table('d1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 15;
SELECT 1 from master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- count the number of placements for the reference table to verify it is not available on
-- all nodes
SELECT count(*)
FROM pg_dist_shard
JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'r1'::regclass;
count
---------------------------------------------------------------------
1
(1 row)
-- #7426 We can't move shards to the fresh node before we copy reference tables there.
-- rebalance_table_shards() will do the copy, but the low-level
-- citus_move_shard_placement() should raise an error
SELECT citus_move_shard_placement(pg_dist_shard.shardid, nodename, nodeport, 'localhost', :worker_2_port)
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'd1'::regclass AND nodename = 'localhost' AND nodeport = :worker_1_port LIMIT 1;
ERROR: there are missing reference tables on some nodes
SELECT replicate_reference_tables();
replicate_reference_tables
---------------------------------------------------------------------
(1 row)
-- After replication, the move should succeed.
SELECT citus_move_shard_placement(pg_dist_shard.shardid, nodename, nodeport, 'localhost', :worker_2_port)
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'd1'::regclass AND nodename = 'localhost' AND nodeport = :worker_1_port LIMIT 1;
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
DROP TABLE d1, r1;
-- verify a system having only reference tables will copy the reference tables when
-- executing the rebalancer
SELECT 1 from master_remove_node('localhost', :worker_2_port);


@@ -1340,6 +1340,43 @@ DROP TABLE t1, r1, r2;
-- test suites should clean up their distributed tables.
SELECT count(*) FROM pg_dist_partition;
-- verify a system with a new node won't copy distributed table shards without reference tables
SELECT 1 from master_remove_node('localhost', :worker_2_port);
SELECT public.wait_until_metadata_sync(30000);
CREATE TABLE r1 (a int PRIMARY KEY, b int);
SELECT create_reference_table('r1');
CREATE TABLE d1 (a int PRIMARY KEY, b int);
SELECT create_distributed_table('d1', 'a');
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 15;
SELECT 1 from master_add_node('localhost', :worker_2_port);
-- count the number of placements for the reference table to verify it is not available on
-- all nodes
SELECT count(*)
FROM pg_dist_shard
JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'r1'::regclass;
-- #7426 We can't move shards to the fresh node before we copy reference tables there.
-- rebalance_table_shards() will do the copy, but the low-level
-- citus_move_shard_placement() should raise an error
SELECT citus_move_shard_placement(pg_dist_shard.shardid, nodename, nodeport, 'localhost', :worker_2_port)
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'd1'::regclass AND nodename = 'localhost' AND nodeport = :worker_1_port LIMIT 1;
SELECT replicate_reference_tables();
-- After replication, the move should succeed.
SELECT citus_move_shard_placement(pg_dist_shard.shardid, nodename, nodeport, 'localhost', :worker_2_port)
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'd1'::regclass AND nodename = 'localhost' AND nodeport = :worker_1_port LIMIT 1;
DROP TABLE d1, r1;
-- verify a system having only reference tables will copy the reference tables when
-- executing the rebalancer