diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index cd81e1a5b..b203b663a 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -375,13 +375,13 @@ CompleteNewOperationNeedingCleanup(bool isSuccess) */ Assert(CurrentOperationId != INVALID_OPERATION_ID); - List *recordList = ListCleanupRecordsForCurrentOperation(); + List *currentOperationRecordList = ListCleanupRecordsForCurrentOperation(); int removedShardCountOnComplete = 0; int failedShardCountOnComplete = 0; CleanupRecord *record = NULL; - foreach_ptr(record, recordList) + foreach_ptr(record, currentOperationRecordList) { /* We only supporting cleaning shards right now */ if (record->objectType != CLEANUP_SHARD_PLACEMENT) @@ -413,6 +413,18 @@ CompleteNewOperationNeedingCleanup(bool isSuccess) DeleteCleanupRecordByRecordId(record->recordId); } } + + if (list_length(currentOperationRecordList) > 0) + { + ereport(LOG, (errmsg("Removed %d orphaned shards out of %d", + removedShardCountOnComplete, list_length(currentOperationRecordList)))); + + if (failedShardCountOnComplete > 0) + { + ereport(WARNING, (errmsg("Failed to drop %d cleanup shards out of %d", + failedShardCountOnComplete, list_length(currentOperationRecordList)))); + } + } } diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index 6fa8b7318..cb277b1f2 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -89,10 +89,13 @@ CREATE TABLE citus.pg_dist_cleanup ( policy_type int not null ); ALTER TABLE citus.pg_dist_cleanup SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.pg_dist_cleanup TO public; -- Sequence used to generate an operation ID for use in pg_dist_cleanup_record. 
-CREATE SEQUENCE citus.pg_dist_operationid_seq; +-- Right now, move is hardcoded to 1 (this will change with parallel moves), so +-- start with a higher number. +CREATE SEQUENCE citus.pg_dist_operationid_seq MINVALUE 101; ALTER SEQUENCE citus.pg_dist_operationid_seq SET SCHEMA pg_catalog; CREATE SEQUENCE citus.pg_dist_cleanup_recordid_seq; diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index 4432b7132..60470726c 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -37,6 +37,9 @@ typedef enum CleanupObject CLEANUP_SHARD_PLACEMENT = 1 } CleanupObject; +/* + * CleanupPolicy represents the policy type for cleanup records. + */ typedef enum CleanupPolicy { /* @@ -52,25 +55,56 @@ typedef enum CleanupPolicy CLEANUP_ON_FAILURE = 1, /* - * Resources that are to be deferred cleanup only on success. + * Resources that need 'deferred' cleanup only on success. * (Example: Parent child being split for Blocking/Non-Blocking splits) */ CLEANUP_DEFERRED_ON_SUCCESS = 2, } CleanupPolicy; +/* Global Constants */ #define INVALID_OPERATION_ID 0 #define INVALID_CLEANUP_RECORD_ID 0 /* APIs for cleanup infrastructure */ + +/* + * StartNewOperationNeedingCleanup is to be called by an operation to register + * for cleanup. + */ extern OperationId StartNewOperationNeedingCleanup(void); + +/* + * InsertCleanupRecordInCurrentTransaction inserts a new pg_dist_cleanup entry + * as part of the current transaction. + * + * This is primarily useful for deferred cleanup (CLEANUP_DEFERRED_ON_SUCCESS) + * scenarios, since the records would roll back in case of failure. + */ extern void InsertCleanupRecordInCurrentTransaction(CleanupObject objectType, char *objectName, int nodeGroupId, CleanupPolicy policy); + +/* + * InsertCleanupRecordInSubtransaction inserts a new pg_dist_cleanup entry + * in a separate transaction to ensure the record persists after rollback. 
+ * + * This is used in scenarios where we need to clean up resources on operation + * completion (CLEANUP_ALWAYS) or on failure (CLEANUP_ON_FAILURE). + */ extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType, char *objectName, int nodeGroupId, CleanupPolicy policy); + +/* + * CompleteNewOperationNeedingCleanup is to be called by an operation to signal + * completion. This will trigger cleanup of resources that were registered for: + * + * 1) CLEANUP_ALWAYS: resources that are transient and always need clean up. + * 2) CLEANUP_ON_FAILURE: resources that are cleaned up only on failure, if + * isSuccess is false. + */ extern void CompleteNewOperationNeedingCleanup(bool isSuccess); #endif /*CITUS_SHARD_CLEANER_H */