diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index baed8b0d5..c5282202e 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -1818,10 +1818,10 @@ static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) { char transferMode = LookupShardTransferMode(shardReplicationModeOid); - EnsureReferenceTablesExistOnAllNodesExtended(transferMode); if (list_length(options->relationIdList) == 0) { + EnsureReferenceTablesExistOnAllNodesExtended(transferMode); return; } @@ -1836,6 +1836,25 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) List *placementUpdateList = GetRebalanceSteps(options); + if (transferMode == TRANSFER_MODE_AUTOMATIC) + { + /* + * If the shard transfer mode is set to auto, we should check beforehand + * if we are able to use logical replication to transfer shards or not. + * We throw an error if any of the tables do not have a replica identity, which + * is required for logical replication to replicate UPDATE and DELETE commands. + */ + PlacementUpdateEvent *placementUpdate = NULL; + foreach_ptr(placementUpdate, placementUpdateList) + { + Oid relationId = RelationIdForShard(placementUpdate->shardId); + List *colocatedTableList = ColocatedTableList(relationId); + VerifyTablesHaveReplicaIdentity(colocatedTableList); + } + } + + EnsureReferenceTablesExistOnAllNodesExtended(transferMode); + if (list_length(placementUpdateList) == 0) { return; @@ -1916,12 +1935,6 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo EnsureTableOwner(colocatedTableId); } - if (shardTransferMode == TRANSFER_MODE_AUTOMATIC) - { - /* make sure that all tables included in the rebalance have a replica identity*/ - VerifyTablesHaveReplicaIdentity(colocatedTableList); - } - List *placementUpdateList = GetRebalanceSteps(options); if (list_length(placementUpdateList) == 0) @@ -1930,6 +1943,23 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo return 0; } + if (shardTransferMode == TRANSFER_MODE_AUTOMATIC) + { + /* + * If the shard transfer mode is set to auto, we should check beforehand + * if we are able to use logical replication to transfer shards or not. + * We throw an error if any of the tables do not have a replica identity, which + * is required for logical replication to replicate UPDATE and DELETE commands. + */ + PlacementUpdateEvent *placementUpdate = NULL; + foreach_ptr(placementUpdate, placementUpdateList) + { + relationId = RelationIdForShard(placementUpdate->shardId); + List *colocatedTables = ColocatedTableList(relationId); + VerifyTablesHaveReplicaIdentity(colocatedTables); + } + } + DropOrphanedResourcesInSeparateTransaction(); /* find the name of the shard transfer mode to interpolate in the scheduled command */ diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 2146d67f1..1dea3b442 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -1482,7 +1482,6 @@ SELECT * from master_drain_node('localhost', :worker_2_port); ERROR: cannot use logical replication to transfer shards of the relation colocated_rebalance_test since it doesn't have a REPLICA IDENTITY or PRIMARY KEY DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY. HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'. -CONTEXT: while executing command on localhost:xxxxx -- Make sure shouldhaveshards is false select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port; shouldhaveshards @@ -2714,6 +2713,39 @@ SELECT sh.logicalrelid, pl.nodeport (5 rows) DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE; +-- verify we detect if one of the tables do not have a replica identity or primary key +-- and error out in case of shard transfer mode = auto +SELECT 1 FROM citus_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +create table table_with_primary_key (a int primary key); +select create_distributed_table('table_with_primary_key','a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +create table table_without_primary_key (a bigint); +select create_distributed_table('table_without_primary_key','a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- add the second node back, then rebalance +ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16; +select 1 from citus_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +select rebalance_table_shards(); +ERROR: cannot use logical replication to transfer shards of the relation table_without_primary_key since it doesn't have a REPLICA IDENTITY or PRIMARY KEY +DROP TABLE table_with_primary_key, table_without_primary_key; \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole; diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index dbbc94732..da4259f5b 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -1497,6 +1497,21 @@ SELECT sh.logicalrelid, pl.nodeport DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE; +-- verify we detect if one of the tables do not have a replica identity or primary key +-- and error out in case of shard transfer mode = auto +SELECT 1 FROM citus_remove_node('localhost', :worker_2_port); + +create table table_with_primary_key (a int primary key); +select create_distributed_table('table_with_primary_key','a'); +create table table_without_primary_key (a bigint); +select create_distributed_table('table_without_primary_key','a'); + +-- add the second node back, then rebalance +ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16; +select 1 from citus_add_node('localhost', :worker_2_port); +select rebalance_table_shards(); + +DROP TABLE table_with_primary_key, table_without_primary_key; \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole;