Clarify resource-cleaner apis (#7518)

Rename InsertCleanupRecordInCurrentTransaction ->
InsertCleanupOnSuccessRecordInCurrentTransaction and hardcode policy
type as CLEANUP_DEFERRED_ON_SUCCESS.

Rename InsertCleanupRecordInSubtransaction ->
InsertCleanupRecordOutsideTransaction.
grant_database_2pc_onur_1
Onur Tirtir 2024-02-20 11:57:08 +03:00 committed by GitHub
parent 71ccbcf3e2
commit 56e014e64e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 79 additions and 74 deletions

View File

@ -426,10 +426,9 @@ ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
errdetail("Marking this shard placement for "
"deletion")));
InsertCleanupRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
shardRelationName,
shardPlacement->groupId,
CLEANUP_DEFERRED_ON_SUCCESS);
InsertCleanupOnSuccessRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
shardRelationName,
shardPlacement->groupId);
return;
}

View File

@ -452,15 +452,15 @@ CompareCleanupRecordsByObjectType(const void *leftElement, const void *rightElem
/*
* InsertCleanupRecordInCurrentTransaction inserts a new pg_dist_cleanup entry
* InsertCleanupOnSuccessRecordInCurrentTransaction inserts a new pg_dist_cleanup entry
* as part of the current transaction. This is primarily useful for deferred drop scenarios,
* since these records would roll back in case of operation failure.
* since these records would roll back in case of operation failure. And for the same reason,
* always sets the policy type to CLEANUP_DEFERRED_ON_SUCCESS.
*/
void
InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy)
InsertCleanupOnSuccessRecordInCurrentTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId)
{
/* We must have a valid OperationId. Any operation requring cleanup
* will call RegisterOperationNeedingCleanup.
@ -482,7 +482,8 @@ InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
values[Anum_pg_dist_cleanup_object_type - 1] = Int32GetDatum(objectType);
values[Anum_pg_dist_cleanup_object_name - 1] = CStringGetTextDatum(objectName);
values[Anum_pg_dist_cleanup_node_group_id - 1] = Int32GetDatum(nodeGroupId);
values[Anum_pg_dist_cleanup_policy_type - 1] = Int32GetDatum(policy);
values[Anum_pg_dist_cleanup_policy_type - 1] =
Int32GetDatum(CLEANUP_DEFERRED_ON_SUCCESS);
/* open cleanup relation and insert new tuple */
Oid relationId = DistCleanupRelationId();
@ -499,23 +500,27 @@ InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
/*
* InsertCleanupRecordInSubtransaction inserts a new pg_dist_cleanup entry in a
* InsertCleanupRecordOutsideTransaction inserts a new pg_dist_cleanup entry in a
* separate transaction to ensure the record persists after rollback. We should
* delete these records if the operation completes successfully.
*
* For failure scenarios, use a subtransaction (direct insert via localhost).
* This is used in scenarios where we need to cleanup resources on operation
* completion (CLEANUP_ALWAYS) or on failure (CLEANUP_ON_FAILURE).
*/
void
InsertCleanupRecordInSubtransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy)
InsertCleanupRecordOutsideTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy)
{
/* We must have a valid OperationId. Any operation requring cleanup
* will call RegisterOperationNeedingCleanup.
*/
Assert(CurrentOperationId != INVALID_OPERATION_ID);
/* assert the circumstance noted in function comment */
Assert(policy == CLEANUP_ALWAYS || policy == CLEANUP_ON_FAILURE);
StringInfo sequenceName = makeStringInfo();
appendStringInfo(sequenceName, "%s.%s",
PG_CATALOG,

View File

@ -733,11 +733,11 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
workerPlacementNode->workerPort)));
}
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
workerPlacementNode->groupId,
CLEANUP_ON_FAILURE);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
workerPlacementNode->groupId,
CLEANUP_ON_FAILURE);
/* Create new split child shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList,
@ -1717,11 +1717,11 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
* we want to cleanup irrespective of operation success or failure.
*/
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
workerPlacementNode->groupId,
CLEANUP_ALWAYS);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
workerPlacementNode->groupId,
CLEANUP_ALWAYS);
/* Create dummy source shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList,
@ -1780,11 +1780,11 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
* we want to cleanup irrespective of operation success or failure.
*/
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
sourceWorkerNode->groupId,
CLEANUP_ALWAYS);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
sourceWorkerNode->groupId,
CLEANUP_ALWAYS);
/* Create dummy split child shard on source worker node */
CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);

View File

@ -604,10 +604,10 @@ InsertDeferredDropCleanupRecordsForShards(List *shardIntervalList)
* We also log cleanup record in the current transaction. If the current transaction rolls back,
* we do not generate a record at all.
*/
InsertCleanupRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
qualifiedShardName,
placement->groupId,
CLEANUP_DEFERRED_ON_SUCCESS);
InsertCleanupOnSuccessRecordInCurrentTransaction(
CLEANUP_OBJECT_SHARD_PLACEMENT,
qualifiedShardName,
placement->groupId);
}
}
}
@ -634,10 +634,9 @@ InsertCleanupRecordsForShardPlacementsOnNode(List *shardIntervalList,
* We also log cleanup record in the current transaction. If the current transaction rolls back,
* we do not generate a record at all.
*/
InsertCleanupRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
qualifiedShardName,
groupId,
CLEANUP_DEFERRED_ON_SUCCESS);
InsertCleanupOnSuccessRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
qualifiedShardName,
groupId);
}
}
@ -1393,10 +1392,11 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa
char *tableOwner = TableOwner(shardInterval->relationId);
/* drop the shard we created on the target, in case of failure */
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
GroupForNode(targetNodeName, targetNodePort),
CLEANUP_ON_FAILURE);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
GroupForNode(targetNodeName,
targetNodePort),
CLEANUP_ON_FAILURE);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner,
@ -1466,10 +1466,11 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
char *tableOwner = TableOwner(shardInterval->relationId);
/* drop the shard we created on the target, in case of failure */
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
GroupForNode(targetNodeName, targetNodePort),
CLEANUP_ON_FAILURE);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
GroupForNode(targetNodeName,
targetNodePort),
CLEANUP_ON_FAILURE);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);

View File

@ -1335,10 +1335,10 @@ CreatePublications(MultiConnection *connection,
WorkerNode *worker = FindWorkerNode(connection->hostname,
connection->port);
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_PUBLICATION,
entry->name,
worker->groupId,
CLEANUP_ALWAYS);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_PUBLICATION,
entry->name,
worker->groupId,
CLEANUP_ALWAYS);
ExecuteCriticalRemoteCommand(connection, DISABLE_DDL_PROPAGATION);
ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data);
@ -1435,10 +1435,10 @@ CreateReplicationSlots(MultiConnection *sourceConnection,
WorkerNode *worker = FindWorkerNode(sourceConnection->hostname,
sourceConnection->port);
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_REPLICATION_SLOT,
replicationSlot->name,
worker->groupId,
CLEANUP_ALWAYS);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_REPLICATION_SLOT,
replicationSlot->name,
worker->groupId,
CLEANUP_ALWAYS);
if (!firstReplicationSlot)
{
@ -1506,10 +1506,10 @@ CreateSubscriptions(MultiConnection *sourceConnection,
quote_identifier(GetUserNameFromId(ownerId, false))
)));
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_USER,
target->subscriptionOwnerName,
worker->groupId,
CLEANUP_ALWAYS);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_USER,
target->subscriptionOwnerName,
worker->groupId,
CLEANUP_ALWAYS);
StringInfo conninfo = makeStringInfo();
appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' "
@ -1567,10 +1567,10 @@ CreateSubscriptions(MultiConnection *sourceConnection,
pfree(createSubscriptionCommand->data);
pfree(createSubscriptionCommand);
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SUBSCRIPTION,
target->subscriptionName,
worker->groupId,
CLEANUP_ALWAYS);
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SUBSCRIPTION,
target->subscriptionName,
worker->groupId,
CLEANUP_ALWAYS);
ExecuteCriticalRemoteCommand(target->superuserConnection, psprintf(
"ALTER SUBSCRIPTION %s OWNER TO %s",

View File

@ -81,16 +81,16 @@ typedef enum CleanupPolicy
extern OperationId RegisterOperationNeedingCleanup(void);
/*
* InsertCleanupRecordInCurrentTransaction inserts a new pg_dist_cleanup entry
* InsertCleanupOnSuccessRecordInCurrentTransaction inserts a new pg_dist_cleanup entry
* as part of the current transaction.
*
* This is primarily useful for deferred cleanup (CLEANUP_DEFERRED_ON_SUCCESS)
* scenarios, since the records would roll back in case of failure.
* scenarios, since the records would roll back in case of failure. And for the
* same reason, always sets the policy type to CLEANUP_DEFERRED_ON_SUCCESS.
*/
extern void InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy);
extern void InsertCleanupOnSuccessRecordInCurrentTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId);
/*
* InsertCleanupRecordInSeparateTransaction inserts a new pg_dist_cleanup entry
@ -99,10 +99,10 @@ extern void InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
* This is used in scenarios where we need to cleanup resources on operation
* completion (CLEANUP_ALWAYS) or on failure (CLEANUP_ON_FAILURE).
*/
extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy);
extern void InsertCleanupRecordOutsideTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy);
/*
* FinalizeOperationNeedingCleanupOnSuccess is be called by an operation to signal