diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 7938a954f..121b0afdc 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -1067,7 +1067,7 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo * Trigger deletion of orphaned shards and hope that this removes * the shard. */ - DropMarkedShardsInDifferentTransaction(); + DropMarkedShardsInSeparateTransaction(); shardPlacementList = ShardPlacementList(shardId); targetPlacement = SearchShardPlacementInList(shardPlacementList, targetNodeName, diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index d01fd6eb3..bbd6a666c 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -18,6 +18,7 @@ #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/shard_cleaner.h" +#include "distributed/shard_rebalancer.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/worker_transaction.h" @@ -88,18 +89,14 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS) /* - * DropMarkedShardsInDifferentTransaction cleans up orphaned shards by + * DropMarkedShardsInSeparateTransaction cleans up orphaned shards by * connecting to localhost. This is done, so that the locks that * DropMarkedShards takes are only held for a short time. */ void -DropMarkedShardsInDifferentTransaction(void) +DropMarkedShardsInSeparateTransaction(void) { - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, - PostPortNumber); - ExecuteCriticalRemoteCommand(connection, "CALL citus_cleanup_orphaned_shards();"); - CloseConnection(connection); + ExecuteCriticalCommandInSeparateTransaction("CALL citus_cleanup_orphaned_shards()"); } diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index d9832f306..93716e624 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -701,7 +701,7 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, "unsupported"))); } - DropMarkedShardsInDifferentTransaction(); + DropMarkedShardsInSeparateTransaction(); foreach(placementUpdateCell, placementUpdateList) { @@ -913,17 +913,15 @@ citus_drain_node(PG_FUNCTION_ARGS) }; char *nodeName = text_to_cstring(nodeNameText); - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, - PostPortNumber); /* * This is done in a separate session. This way it's not undone if the * draining fails midway through. */ - ExecuteCriticalRemoteCommand(connection, psprintf( - "SELECT master_set_node_property(%s, %i, 'shouldhaveshards', false)", - quote_literal_cstr(nodeName), nodePort)); + ExecuteCriticalCommandInSeparateTransaction(psprintf( + "SELECT master_set_node_property(%s, %i, 'shouldhaveshards', false)", + quote_literal_cstr(nodeName), + nodePort)); RebalanceTableShards(&options, shardTransferModeOid); @@ -1695,20 +1693,32 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, REBALANCE_PROGRESS_MOVING); ConflictShardPlacementUpdateOnlyWithIsolationTesting(shardId); - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, - PostPortNumber); /* * In case of failure, we throw an error such that rebalance_table_shards * fails early. */ - ExecuteCriticalRemoteCommand(connection, placementUpdateCommand->data); + ExecuteCriticalCommandInSeparateTransaction(placementUpdateCommand->data); UpdateColocatedShardPlacementProgress(shardId, sourceNode->workerName, sourceNode->workerPort, REBALANCE_PROGRESS_MOVED); +} + + +/* + * ExecuteCriticalCommandInSeparateTransaction runs a command in a separate + * transaction that is commited right away. This is useful for things that you + * don't want to rollback when the current transaction is rolled back. + */ +void +ExecuteCriticalCommandInSeparateTransaction(char *command) +{ + int connectionFlag = FORCE_NEW_CONNECTION; + MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, + PostPortNumber); + ExecuteCriticalRemoteCommand(connection, command); CloseConnection(connection); } diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index 103097397..83daf5cff 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -19,6 +19,6 @@ extern bool CheckAvailableSpaceBeforeMove; extern int TryDropMarkedShards(bool waitForLocks); extern int DropMarkedShards(bool waitForLocks); -extern void DropMarkedShardsInDifferentTransaction(void); +extern void DropMarkedShardsInSeparateTransaction(void); #endif /*CITUS_SHARD_CLEANER_H */ diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 096af4a58..de0684d68 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -190,6 +190,7 @@ extern List * RebalancePlacementUpdates(List *workerNodeList, List *shardPlaceme RebalancePlanFunctions *rebalancePlanFunctions); extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList, int shardReplicationFactor); +extern void ExecuteCriticalCommandInSeparateTransaction(char *command); #endif /* SHARD_REBALANCER_H */