Improvements and comments

niupre/TestDeferredDropAndCleanup
Nitish Upreti 2022-08-25 18:42:46 -07:00
parent 92b1cdf6c0
commit 919e44eab6
3 changed files with 53 additions and 4 deletions

View File

@ -375,13 +375,13 @@ CompleteNewOperationNeedingCleanup(bool isSuccess)
*/ */
Assert(CurrentOperationId != INVALID_OPERATION_ID); Assert(CurrentOperationId != INVALID_OPERATION_ID);
List *recordList = ListCleanupRecordsForCurrentOperation(); List *currentOperationRecordList = ListCleanupRecordsForCurrentOperation();
int removedShardCountOnComplete = 0; int removedShardCountOnComplete = 0;
int failedShardCountOnComplete = 0; int failedShardCountOnComplete = 0;
CleanupRecord *record = NULL; CleanupRecord *record = NULL;
foreach_ptr(record, recordList) foreach_ptr(record, currentOperationRecordList)
{ {
/* We only supporting cleaning shards right now */ /* We only supporting cleaning shards right now */
if (record->objectType != CLEANUP_SHARD_PLACEMENT) if (record->objectType != CLEANUP_SHARD_PLACEMENT)
@ -413,6 +413,18 @@ CompleteNewOperationNeedingCleanup(bool isSuccess)
DeleteCleanupRecordByRecordId(record->recordId); DeleteCleanupRecordByRecordId(record->recordId);
} }
} }
if (list_length(currentOperationRecordList) > 0)
{
ereport(LOG, (errmsg("Removed %d orphaned shards out of %d",
removedShardCountOnComplete, list_length(currentOperationRecordList))));
if (failedShardCountOnComplete > 0)
{
ereport(WARNING, (errmsg("Failed to drop %d cleanup shards out of %d",
failedShardCountOnComplete, list_length(currentOperationRecordList))));
}
}
} }

View File

@ -89,10 +89,13 @@ CREATE TABLE citus.pg_dist_cleanup (
policy_type int not null policy_type int not null
); );
ALTER TABLE citus.pg_dist_cleanup SET SCHEMA pg_catalog; ALTER TABLE citus.pg_dist_cleanup SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.pg_dist_cleanup TO public;
-- Sequence used to generate an operation ID for use in pg_dist_cleanup_record. -- Sequence used to generate an operation ID for use in pg_dist_cleanup_record.
CREATE SEQUENCE citus.pg_dist_operationid_seq; -- Right now, move is hardcoded to 1 (this will change with parallel moves), so
-- start with a higher number.
CREATE SEQUENCE citus.pg_dist_operationid_seq MINVALUE 101;
ALTER SEQUENCE citus.pg_dist_operationid_seq SET SCHEMA pg_catalog; ALTER SEQUENCE citus.pg_dist_operationid_seq SET SCHEMA pg_catalog;
CREATE SEQUENCE citus.pg_dist_cleanup_recordid_seq; CREATE SEQUENCE citus.pg_dist_cleanup_recordid_seq;

View File

@ -37,6 +37,9 @@ typedef enum CleanupObject
CLEANUP_SHARD_PLACEMENT = 1 CLEANUP_SHARD_PLACEMENT = 1
} CleanupObject; } CleanupObject;
/*
* CleanupPolicy represents the policy type for cleanup records.
*/
typedef enum CleanupPolicy typedef enum CleanupPolicy
{ {
/* /*
@ -52,25 +55,56 @@ typedef enum CleanupPolicy
CLEANUP_ON_FAILURE = 1, CLEANUP_ON_FAILURE = 1,
/* /*
* Resources that are to be deferred cleanup only on success. * Resources that need 'deferred' clean up only on success .
* (Example: Parent child being split for Blocking/Non-Blocking splits) * (Example: Parent child being split for Blocking/Non-Blocking splits)
*/ */
CLEANUP_DEFERRED_ON_SUCCESS = 2, CLEANUP_DEFERRED_ON_SUCCESS = 2,
} CleanupPolicy; } CleanupPolicy;
/* Global Constants */
#define INVALID_OPERATION_ID 0 #define INVALID_OPERATION_ID 0
#define INVALID_CLEANUP_RECORD_ID 0 #define INVALID_CLEANUP_RECORD_ID 0
/* APIs for cleanup infrastructure */ /* APIs for cleanup infrastructure */
/*
* StartNewOperationNeedingCleanup is be called by an operation to register
* for cleanup.
*/
extern OperationId StartNewOperationNeedingCleanup(void); extern OperationId StartNewOperationNeedingCleanup(void);
/*
* InsertCleanupRecordInCurrentTransaction inserts a new pg_dist_cleanup entry
* as part of the current transaction.
*
* This is primarily useful for deferred cleanup (CLEANUP_DEFERRED_ON_SUCCESS)
* scenarios, since the records would roll back in case of failure.
*/
extern void InsertCleanupRecordInCurrentTransaction(CleanupObject objectType, extern void InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
char *objectName, char *objectName,
int nodeGroupId, int nodeGroupId,
CleanupPolicy policy); CleanupPolicy policy);
/*
* InsertCleanupRecordInSeparateTransaction inserts a new pg_dist_cleanup entry
* in a separate transaction to ensure the record persists after rollback.
*
* This is used in scenarios where we need to cleanup resources on operation
* completion (CLEANUP_ALWAYS) or on failure (CLEANUP_ON_FAILURE).
*/
extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType, extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType,
char *objectName, char *objectName,
int nodeGroupId, int nodeGroupId,
CleanupPolicy policy); CleanupPolicy policy);
/*
* CompleteNewOperationNeedingCleanup is be called by an operation to signal
* completion. This will trigger cleanup of resources that were registered for:
*
* 1) CLEANUP_ALWAYS: resources that are transient and always need clean up.
* 2) CLEANUP_ON_FAILURE: resources that are cleanup only on failure, if
* isSuccess is false.
*/
extern void CompleteNewOperationNeedingCleanup(bool isSuccess); extern void CompleteNewOperationNeedingCleanup(bool isSuccess);
#endif /*CITUS_SHARD_CLEANER_H */ #endif /*CITUS_SHARD_CLEANER_H */