Cleaner Improvement

niupre/TestDeferredDropAndCleanup
Nitish Upreti 2022-08-25 09:29:19 -07:00
parent 895fe14040
commit bf61fe3565
2 changed files with 34 additions and 10 deletions

View File

@ -66,6 +66,7 @@ typedef struct CleanupRecord
PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_shards); PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_shards);
PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards); PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards);
static int DropOrphanedShardsForMove(bool waitForLocks);
static bool TryDropShardOutsideTransaction(char *qualifiedTableName, char *nodeName, int nodePort); static bool TryDropShardOutsideTransaction(char *qualifiedTableName, char *nodeName, int nodePort);
static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode); static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode);
@ -108,7 +109,7 @@ citus_cleanup_orphaned_shards(PG_FUNCTION_ARGS)
PreventInTransactionBlock(true, "citus_cleanup_orphaned_shards"); PreventInTransactionBlock(true, "citus_cleanup_orphaned_shards");
bool waitForLocks = true; bool waitForLocks = true;
int droppedShardCount = DropOrphanedShards(waitForLocks); int droppedShardCount = DropOrphanedShardsForMove(waitForLocks);
if (droppedShardCount > 0) if (droppedShardCount > 0)
{ {
ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount))); ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount)));
@ -130,7 +131,7 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS)
EnsureCoordinator(); EnsureCoordinator();
bool waitForLocks = true; bool waitForLocks = true;
int droppedShardCount = DropOrphanedShards(waitForLocks); int droppedShardCount = DropOrphanedShardsForMove(waitForLocks);
if (droppedShardCount > 0) if (droppedShardCount > 0)
{ {
ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount))); ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount)));
@ -143,7 +144,7 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS)
/* /*
* DropOrphanedShardsInSeparateTransaction cleans up orphaned shards by * DropOrphanedShardsInSeparateTransaction cleans up orphaned shards by
* connecting to localhost. This is done, so that the locks that * connecting to localhost. This is done, so that the locks that
* DropOrphanedShards takes are only held for a short time. * DropOrphanedShardsForMove takes are only held for a short time.
*/ */
void void
DropOrphanedShardsInSeparateTransaction(void) DropOrphanedShardsInSeparateTransaction(void)
@ -153,7 +154,7 @@ DropOrphanedShardsInSeparateTransaction(void)
/* /*
* TryDropOrphanedShards is a wrapper around DropOrphanedShards that catches * TryDropOrphanedShards is a wrapper around DropOrphanedShardsForMove that catches
* any errors to make it safe to use in the maintenance daemon. * any errors to make it safe to use in the maintenance daemon.
* *
* If dropping any of the shards failed this function returns -1, otherwise it * If dropping any of the shards failed this function returns -1, otherwise it
@ -166,7 +167,7 @@ TryDropOrphanedShards(bool waitForLocks)
MemoryContext savedContext = CurrentMemoryContext; MemoryContext savedContext = CurrentMemoryContext;
PG_TRY(); PG_TRY();
{ {
droppedShardCount = DropOrphanedShards(waitForLocks); droppedShardCount = DropOrphanedShardsForMove(waitForLocks);
droppedShardCount += DropOrphanedShardsForCleanup(); droppedShardCount += DropOrphanedShardsForCleanup();
} }
PG_CATCH(); PG_CATCH();
@ -245,7 +246,7 @@ DropOrphanedShardsForCleanup()
/* /*
* DropOrphanedShards removes shards that were marked SHARD_STATE_TO_DELETE before. * DropOrphanedShardsForMove removes shards that were marked SHARD_STATE_TO_DELETE before.
* *
* It does so by trying to take an exclusive lock on the shard and its * It does so by trying to take an exclusive lock on the shard and its
* colocated placements before removing. If the lock cannot be obtained it * colocated placements before removing. If the lock cannot be obtained it
@ -263,8 +264,8 @@ DropOrphanedShardsForCleanup()
* wait for this lock or not. * wait for this lock or not.
* *
*/ */
int static int
DropOrphanedShards(bool waitForLocks) DropOrphanedShardsForMove(bool waitForLocks)
{ {
int removedShardCount = 0; int removedShardCount = 0;
@ -649,7 +650,31 @@ ListCleanupRecords(void)
static List * static List *
ListCleanupRecordsForCurrentOperation(void) ListCleanupRecordsForCurrentOperation(void)
{ {
return NULL; Relation pgDistCleanup = table_open(DistCleanupRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistCleanup);
ScanKeyData scanKey[1];
ScanKeyInit(&scanKey[0], Anum_pg_dist_cleanup_operation_id, BTEqualStrategyNumber,
F_OIDEQ, CurrentOperationId);
int scanKeyCount = 1;
Oid scanIndexId = InvalidOid;
bool useIndex = false;
SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanup, scanIndexId, useIndex, NULL,
scanKeyCount, scanKey);
HeapTuple heapTuple = NULL;
List *recordList = NIL;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
CleanupRecord *record = TupleToCleanupRecord(heapTuple, tupleDescriptor);
recordList = lappend(recordList, record);
}
systable_endscan(scanDescriptor);
table_close(pgDistCleanup, NoLock);
return recordList;
} }
/* /*

View File

@ -22,7 +22,6 @@ extern int NextOperationId;
extern int NextCleanupRecordId; extern int NextCleanupRecordId;
extern int TryDropOrphanedShards(bool waitForLocks); extern int TryDropOrphanedShards(bool waitForLocks);
extern int DropOrphanedShards(bool waitForLocks);
extern void DropOrphanedShardsInSeparateTransaction(void); extern void DropOrphanedShardsInSeparateTransaction(void);
/* Members for cleanup infrastructure */ /* Members for cleanup infrastructure */