From bf61fe3565c9c1abef734f5dc69dc48408be3993 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Thu, 25 Aug 2022 09:29:19 -0700 Subject: [PATCH] Cleaner Improvement --- .../distributed/operations/shard_cleaner.c | 43 +++++++++++++++---- src/include/distributed/shard_cleaner.h | 1 - 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index 89fb58992..52b85409c 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -66,6 +66,7 @@ typedef struct CleanupRecord PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_shards); PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards); +static int DropOrphanedShardsForMove(bool waitForLocks); static bool TryDropShardOutsideTransaction(char *qualifiedTableName, char *nodeName, int nodePort); static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode); @@ -108,7 +109,7 @@ citus_cleanup_orphaned_shards(PG_FUNCTION_ARGS) PreventInTransactionBlock(true, "citus_cleanup_orphaned_shards"); bool waitForLocks = true; - int droppedShardCount = DropOrphanedShards(waitForLocks); + int droppedShardCount = DropOrphanedShardsForMove(waitForLocks); if (droppedShardCount > 0) { ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount))); @@ -130,7 +131,7 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS) EnsureCoordinator(); bool waitForLocks = true; - int droppedShardCount = DropOrphanedShards(waitForLocks); + int droppedShardCount = DropOrphanedShardsForMove(waitForLocks); if (droppedShardCount > 0) { ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount))); @@ -143,7 +144,7 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS) /* * DropOrphanedShardsInSeparateTransaction cleans up orphaned shards by * connecting to localhost. This is done, so that the locks that - * DropOrphanedShards takes are only held for a short time. + * DropOrphanedShardsForMove takes are only held for a short time. */ void DropOrphanedShardsInSeparateTransaction(void) @@ -153,7 +154,7 @@ DropOrphanedShardsInSeparateTransaction(void) /* - * TryDropOrphanedShards is a wrapper around DropOrphanedShards that catches + * TryDropOrphanedShards is a wrapper around DropOrphanedShardsForMove that catches * any errors to make it safe to use in the maintenance daemon. * * If dropping any of the shards failed this function returns -1, otherwise it @@ -166,7 +167,7 @@ TryDropOrphanedShards(bool waitForLocks) MemoryContext savedContext = CurrentMemoryContext; PG_TRY(); { - droppedShardCount = DropOrphanedShards(waitForLocks); + droppedShardCount = DropOrphanedShardsForMove(waitForLocks); droppedShardCount += DropOrphanedShardsForCleanup(); } PG_CATCH(); @@ -245,7 +246,7 @@ DropOrphanedShardsForCleanup() /* - * DropOrphanedShards removes shards that were marked SHARD_STATE_TO_DELETE before. + * DropOrphanedShardsForMove removes shards that were marked SHARD_STATE_TO_DELETE before. * * It does so by trying to take an exclusive lock on the shard and its * colocated placements before removing. If the lock cannot be obtained it @@ -263,8 +264,8 @@ DropOrphanedShardsForCleanup() * wait for this lock or not. * */ -int -DropOrphanedShards(bool waitForLocks) +static int +DropOrphanedShardsForMove(bool waitForLocks) { int removedShardCount = 0; @@ -649,7 +650,31 @@ ListCleanupRecords(void) static List * ListCleanupRecordsForCurrentOperation(void) { - return NULL; + Relation pgDistCleanup = table_open(DistCleanupRelationId(), AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistCleanup); + + ScanKeyData scanKey[1]; + ScanKeyInit(&scanKey[0], Anum_pg_dist_cleanup_operation_id, BTEqualStrategyNumber, + F_OIDEQ, CurrentOperationId); + + int scanKeyCount = 1; + Oid scanIndexId = InvalidOid; + bool useIndex = false; + SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanup, scanIndexId, useIndex, NULL, + scanKeyCount, scanKey); + + HeapTuple heapTuple = NULL; + List *recordList = NIL; + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + CleanupRecord *record = TupleToCleanupRecord(heapTuple, tupleDescriptor); + recordList = lappend(recordList, record); + } + + systable_endscan(scanDescriptor); + table_close(pgDistCleanup, NoLock); + + return recordList; } /* diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index 7f0f1992c..4432b7132 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -22,7 +22,6 @@ extern int NextOperationId; extern int NextCleanupRecordId; extern int TryDropOrphanedShards(bool waitForLocks); -extern int DropOrphanedShards(bool waitForLocks); extern void DropOrphanedShardsInSeparateTransaction(void); /* Members for cleanup infrastructure */