mirror of https://github.com/citusdata/citus.git
Check before logicalrep for rebalancer, error if needed (#6754)
DESCRIPTION: Check before logicalrep for rebalancer, error if needed Check if we can use logical replication or not, in case of shard transfer mode = auto, before executing the shard moves. If we can't, error out. Before this PR, we used to error out in the middle of shard moves: ```sql set citus.shard_count = 4; -- just to get the error sooner select citus_remove_node('localhost',9702); create table t1 (a int primary key); select create_distributed_table('t1','a'); create table t2 (a bigint); select create_distributed_table('t2','a'); select citus_add_node('localhost',9702); select rebalance_table_shards(); NOTICE: Moving shard 102008 from localhost:9701 to localhost:9702 ... NOTICE: Moving shard 102009 from localhost:9701 to localhost:9702 ... NOTICE: Moving shard 102012 from localhost:9701 to localhost:9702 ... ERROR: cannot use logical replication to transfer shards of the relation t2 since it doesn't have a REPLICA IDENTITY or PRIMARY KEY ``` Now we check and error out in the beginning, without moving the shards. fixes: #6727pull/6782/head^2
parent
aa465b6de1
commit
2713e015d6
|
@ -1818,10 +1818,10 @@ static void
|
|||
RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
|
||||
{
|
||||
char transferMode = LookupShardTransferMode(shardReplicationModeOid);
|
||||
EnsureReferenceTablesExistOnAllNodesExtended(transferMode);
|
||||
|
||||
if (list_length(options->relationIdList) == 0)
|
||||
{
|
||||
EnsureReferenceTablesExistOnAllNodesExtended(transferMode);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1836,6 +1836,25 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
|
|||
|
||||
List *placementUpdateList = GetRebalanceSteps(options);
|
||||
|
||||
if (transferMode == TRANSFER_MODE_AUTOMATIC)
|
||||
{
|
||||
/*
|
||||
* If the shard transfer mode is set to auto, we should check beforehand
|
||||
* if we are able to use logical replication to transfer shards or not.
|
||||
* We throw an error if any of the tables do not have a replica identity, which
|
||||
* is required for logical replication to replicate UPDATE and DELETE commands.
|
||||
*/
|
||||
PlacementUpdateEvent *placementUpdate = NULL;
|
||||
foreach_ptr(placementUpdate, placementUpdateList)
|
||||
{
|
||||
Oid relationId = RelationIdForShard(placementUpdate->shardId);
|
||||
List *colocatedTableList = ColocatedTableList(relationId);
|
||||
VerifyTablesHaveReplicaIdentity(colocatedTableList);
|
||||
}
|
||||
}
|
||||
|
||||
EnsureReferenceTablesExistOnAllNodesExtended(transferMode);
|
||||
|
||||
if (list_length(placementUpdateList) == 0)
|
||||
{
|
||||
return;
|
||||
|
@ -1916,12 +1935,6 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
|
|||
EnsureTableOwner(colocatedTableId);
|
||||
}
|
||||
|
||||
if (shardTransferMode == TRANSFER_MODE_AUTOMATIC)
|
||||
{
|
||||
/* make sure that all tables included in the rebalance have a replica identity*/
|
||||
VerifyTablesHaveReplicaIdentity(colocatedTableList);
|
||||
}
|
||||
|
||||
List *placementUpdateList = GetRebalanceSteps(options);
|
||||
|
||||
if (list_length(placementUpdateList) == 0)
|
||||
|
@ -1930,6 +1943,23 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (shardTransferMode == TRANSFER_MODE_AUTOMATIC)
|
||||
{
|
||||
/*
|
||||
* If the shard transfer mode is set to auto, we should check beforehand
|
||||
* if we are able to use logical replication to transfer shards or not.
|
||||
* We throw an error if any of the tables do not have a replica identity, which
|
||||
* is required for logical replication to replicate UPDATE and DELETE commands.
|
||||
*/
|
||||
PlacementUpdateEvent *placementUpdate = NULL;
|
||||
foreach_ptr(placementUpdate, placementUpdateList)
|
||||
{
|
||||
relationId = RelationIdForShard(placementUpdate->shardId);
|
||||
List *colocatedTables = ColocatedTableList(relationId);
|
||||
VerifyTablesHaveReplicaIdentity(colocatedTables);
|
||||
}
|
||||
}
|
||||
|
||||
DropOrphanedResourcesInSeparateTransaction();
|
||||
|
||||
/* find the name of the shard transfer mode to interpolate in the scheduled command */
|
||||
|
|
|
@ -1482,7 +1482,6 @@ SELECT * from master_drain_node('localhost', :worker_2_port);
|
|||
ERROR: cannot use logical replication to transfer shards of the relation colocated_rebalance_test since it doesn't have a REPLICA IDENTITY or PRIMARY KEY
|
||||
DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY.
|
||||
HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'.
|
||||
CONTEXT: while executing command on localhost:xxxxx
|
||||
-- Make sure shouldhaveshards is false
|
||||
select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port;
|
||||
shouldhaveshards
|
||||
|
@ -2714,6 +2713,39 @@ SELECT sh.logicalrelid, pl.nodeport
|
|||
(5 rows)
|
||||
|
||||
DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
|
||||
-- verify we detect if one of the tables do not have a replica identity or primary key
|
||||
-- and error out in case of shard transfer mode = auto
|
||||
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
create table table_with_primary_key (a int primary key);
|
||||
select create_distributed_table('table_with_primary_key','a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
create table table_without_primary_key (a bigint);
|
||||
select create_distributed_table('table_without_primary_key','a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- add the second node back, then rebalance
|
||||
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16;
|
||||
select 1 from citus_add_node('localhost', :worker_2_port);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
select rebalance_table_shards();
|
||||
ERROR: cannot use logical replication to transfer shards of the relation table_without_primary_key since it doesn't have a REPLICA IDENTITY or PRIMARY KEY
|
||||
DROP TABLE table_with_primary_key, table_without_primary_key;
|
||||
\c - - - :worker_1_port
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
REVOKE ALL ON SCHEMA public FROM testrole;
|
||||
|
|
|
@ -1497,6 +1497,21 @@ SELECT sh.logicalrelid, pl.nodeport
|
|||
|
||||
DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
|
||||
|
||||
-- verify we detect if one of the tables do not have a replica identity or primary key
|
||||
-- and error out in case of shard transfer mode = auto
|
||||
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
|
||||
|
||||
create table table_with_primary_key (a int primary key);
|
||||
select create_distributed_table('table_with_primary_key','a');
|
||||
create table table_without_primary_key (a bigint);
|
||||
select create_distributed_table('table_without_primary_key','a');
|
||||
|
||||
-- add the second node back, then rebalance
|
||||
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16;
|
||||
select 1 from citus_add_node('localhost', :worker_2_port);
|
||||
select rebalance_table_shards();
|
||||
|
||||
DROP TABLE table_with_primary_key, table_without_primary_key;
|
||||
\c - - - :worker_1_port
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
REVOKE ALL ON SCHEMA public FROM testrole;
|
||||
|
|
Loading…
Reference in New Issue