Add ExecuteCriticalCommandInDifferentTransaction function

We use this pattern multiple times throughout the codebase now. Seems
like a good moment to abstract it away.
pull/5024/head
Jelte Fennema 2021-06-04 11:16:12 +02:00
parent 503c70b619
commit 3f60e4f394
5 changed files with 28 additions and 20 deletions

View File

@ -1067,7 +1067,7 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo
* Trigger deletion of orphaned shards and hope that this removes * Trigger deletion of orphaned shards and hope that this removes
* the shard. * the shard.
*/ */
DropMarkedShardsInDifferentTransaction(); DropMarkedShardsInSeparateTransaction();
shardPlacementList = ShardPlacementList(shardId); shardPlacementList = ShardPlacementList(shardId);
targetPlacement = SearchShardPlacementInList(shardPlacementList, targetPlacement = SearchShardPlacementInList(shardPlacementList,
targetNodeName, targetNodeName,

View File

@ -18,6 +18,7 @@
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/shard_cleaner.h" #include "distributed/shard_cleaner.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
@ -88,18 +89,14 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS)
/* /*
* DropMarkedShardsInDifferentTransaction cleans up orphaned shards by * DropMarkedShardsInSeparateTransaction cleans up orphaned shards by
* connecting to localhost. This is done, so that the locks that * connecting to localhost. This is done, so that the locks that
* DropMarkedShards takes are only held for a short time. * DropMarkedShards takes are only held for a short time.
*/ */
void void
DropMarkedShardsInDifferentTransaction(void) DropMarkedShardsInSeparateTransaction(void)
{ {
int connectionFlag = FORCE_NEW_CONNECTION; ExecuteCriticalCommandInSeparateTransaction("CALL citus_cleanup_orphaned_shards()");
MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
PostPortNumber);
ExecuteCriticalRemoteCommand(connection, "CALL citus_cleanup_orphaned_shards();");
CloseConnection(connection);
} }

View File

@ -701,7 +701,7 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid,
"unsupported"))); "unsupported")));
} }
DropMarkedShardsInDifferentTransaction(); DropMarkedShardsInSeparateTransaction();
foreach(placementUpdateCell, placementUpdateList) foreach(placementUpdateCell, placementUpdateList)
{ {
@ -913,17 +913,15 @@ citus_drain_node(PG_FUNCTION_ARGS)
}; };
char *nodeName = text_to_cstring(nodeNameText); char *nodeName = text_to_cstring(nodeNameText);
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
PostPortNumber);
/* /*
* This is done in a separate session. This way it's not undone if the * This is done in a separate session. This way it's not undone if the
* draining fails midway through. * draining fails midway through.
*/ */
ExecuteCriticalRemoteCommand(connection, psprintf( ExecuteCriticalCommandInSeparateTransaction(psprintf(
"SELECT master_set_node_property(%s, %i, 'shouldhaveshards', false)", "SELECT master_set_node_property(%s, %i, 'shouldhaveshards', false)",
quote_literal_cstr(nodeName), nodePort)); quote_literal_cstr(nodeName),
nodePort));
RebalanceTableShards(&options, shardTransferModeOid); RebalanceTableShards(&options, shardTransferModeOid);
@ -1695,20 +1693,32 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
REBALANCE_PROGRESS_MOVING); REBALANCE_PROGRESS_MOVING);
ConflictShardPlacementUpdateOnlyWithIsolationTesting(shardId); ConflictShardPlacementUpdateOnlyWithIsolationTesting(shardId);
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
PostPortNumber);
/* /*
* In case of failure, we throw an error such that rebalance_table_shards * In case of failure, we throw an error such that rebalance_table_shards
* fails early. * fails early.
*/ */
ExecuteCriticalRemoteCommand(connection, placementUpdateCommand->data); ExecuteCriticalCommandInSeparateTransaction(placementUpdateCommand->data);
UpdateColocatedShardPlacementProgress(shardId, UpdateColocatedShardPlacementProgress(shardId,
sourceNode->workerName, sourceNode->workerName,
sourceNode->workerPort, sourceNode->workerPort,
REBALANCE_PROGRESS_MOVED); REBALANCE_PROGRESS_MOVED);
}
/*
* ExecuteCriticalCommandInSeparateTransaction runs a command in a separate
* transaction that is commited right away. This is useful for things that you
* don't want to rollback when the current transaction is rolled back.
*/
void
ExecuteCriticalCommandInSeparateTransaction(char *command)
{
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
PostPortNumber);
ExecuteCriticalRemoteCommand(connection, command);
CloseConnection(connection); CloseConnection(connection);
} }

View File

@ -19,6 +19,6 @@ extern bool CheckAvailableSpaceBeforeMove;
extern int TryDropMarkedShards(bool waitForLocks); extern int TryDropMarkedShards(bool waitForLocks);
extern int DropMarkedShards(bool waitForLocks); extern int DropMarkedShards(bool waitForLocks);
extern void DropMarkedShardsInDifferentTransaction(void); extern void DropMarkedShardsInSeparateTransaction(void);
#endif /*CITUS_SHARD_CLEANER_H */ #endif /*CITUS_SHARD_CLEANER_H */

View File

@ -190,6 +190,7 @@ extern List * RebalancePlacementUpdates(List *workerNodeList, List *shardPlaceme
RebalancePlanFunctions *rebalancePlanFunctions); RebalancePlanFunctions *rebalancePlanFunctions);
extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList, extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
int shardReplicationFactor); int shardReplicationFactor);
extern void ExecuteCriticalCommandInSeparateTransaction(char *command);
#endif /* SHARD_REBALANCER_H */ #endif /* SHARD_REBALANCER_H */