From 895fe140402009fe821c041ddedb5d9e1a1e3267 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Wed, 24 Aug 2022 23:55:44 -0700 Subject: [PATCH] Initial Commit --- .../distributed/metadata/metadata_cache.c | 24 + .../distributed/operations/shard_cleaner.c | 577 +++++++++++++++++- .../distributed/operations/shard_split.c | 326 +++------- src/backend/distributed/shared_library_init.c | 49 ++ .../distributed/sql/citus--11.0-4--11.1-1.sql | 18 + .../transaction/transaction_management.c | 2 + src/backend/distributed/utils/resource_lock.c | 12 +- src/include/distributed/metadata_cache.h | 2 + src/include/distributed/pg_dist_cleanup.h | 34 ++ src/include/distributed/resource_lock.h | 14 +- src/include/distributed/shard_cleaner.h | 53 ++ 11 files changed, 840 insertions(+), 271 deletions(-) create mode 100644 src/include/distributed/pg_dist_cleanup.h diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 83fc33720..949fe91c1 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -147,6 +147,8 @@ typedef struct MetadataCacheData Oid distLocalGroupRelationId; Oid distObjectRelationId; Oid distObjectPrimaryKeyIndexId; + Oid distCleanupRelationId; + Oid distCleanupPrimaryKeyIndexId; Oid distColocationRelationId; Oid distColocationConfigurationIndexId; Oid distPartitionRelationId; @@ -2465,6 +2467,28 @@ DistObjectPrimaryKeyIndexId(void) } +/* return oid of pg_dist_cleanup relation */ +Oid +DistCleanupRelationId(void) +{ + CachedRelationLookup("pg_dist_cleanup", + &MetadataCache.distCleanupRelationId); + + return MetadataCache.distCleanupRelationId; +} + + +/* return oid of pg_dist_cleanup primary key index */ +Oid +DistCleanupPrimaryKeyIndexId(void) +{ + CachedRelationLookup("pg_dist_cleanup_pkey", + &MetadataCache.distCleanupPrimaryKeyIndexId); + + return MetadataCache.distCleanupPrimaryKeyIndexId; +} + + /* return oid of pg_dist_colocation relation */ Oid DistColocationRelationId(void) diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index 0c9e7903c..89fb58992 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -11,10 +11,17 @@ */ #include "postgres.h" - +#include "miscadmin.h" +#include "access/genam.h" #include "access/xact.h" +#include "catalog/namespace.h" +#include "commands/sequence.h" #include "postmaster/postmaster.h" +#include "nodes/makefuncs.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "distributed/listutils.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/shard_cleaner.h" @@ -22,15 +29,60 @@ #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/worker_transaction.h" +#include "distributed/pg_dist_cleanup.h" + +/* GUC configuration for shard cleaner */ +int NextOperationId = 0; +int NextCleanupRecordId = 0; + +/* Data structure for cleanup operation */ + +/* + * CleanupRecord represents a record from pg_dist_cleanup. + */ +typedef struct CleanupRecord +{ + /* unique identifier of the record (for deletes) */ + uint64 recordId; + + /* identifier of the operation that generated the record (must not be in progress) */ + OperationId operationId; + + /* type of the object (e.g. 
shard placement) */
+	CleanupObject objectType;
+
+	/* fully qualified name of the object */
+	char *objectName;
+
+	/* node group ID on which the object is located */
+	int nodeGroupId;
+
+	/* cleanup policy */
+	CleanupPolicy policy;
+} CleanupRecord;
 
 /* declarations for dynamic loading */
 PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_shards);
 PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards);
 
-static bool TryDropShard(GroupShardPlacement *placement);
+static bool TryDropShardOutsideTransaction(char *qualifiedTableName, char *nodeName, int nodePort);
 static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode);
 
+/* Functions for cleanup infrastructure */
+static CleanupRecord * TupleToCleanupRecord(HeapTuple heapTuple,
+											TupleDesc tupleDescriptor);
+static OperationId GetNextOperationId(void);
+static uint64 GetNextCleanupRecordId(void);
+static void LockOperationId(OperationId operationId);
+static bool TryLockOperationId(OperationId operationId);
+static void DeleteCleanupRecordByRecordId(uint64 recordId);
+static void DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId);
+static bool CleanupRecordExists(uint64 recordId);
+static List * ListCleanupRecords(void);
+static List * ListCleanupRecordsForCurrentOperation(void);
+static int DropOrphanedShardsForCleanup(void);
 
 /*
  * citus_cleanup_orphaned_shards implements a user-facing UDF to delete
@@ -115,6 +167,7 @@ TryDropOrphanedShards(bool waitForLocks)
 	PG_TRY();
 	{
 		droppedShardCount = DropOrphanedShards(waitForLocks);
+		droppedShardCount += DropOrphanedShardsForCleanup();
 	}
 	PG_CATCH();
 	{
@@ -132,6 +185,65 @@
 }
 
 
+/*
+ * DropOrphanedShardsForCleanup scans pg_dist_cleanup and drops the resources
+ * registered by operations that are no longer running. It returns the number
+ * of shard placements that were successfully removed.
+ */
+static int
+DropOrphanedShardsForCleanup(void)
+{
+	List *cleanupRecordList = ListCleanupRecords();
+
+	int removedShardCountForCleanup = 0;
+	int failedShardCountForCleanup = 0;
+	CleanupRecord *record = NULL;
+
+	foreach_ptr(record, cleanupRecordList)
+	{
+		/* we only support cleaning up shard placements right now */
+		if (record->objectType != CLEANUP_SHARD_PLACEMENT)
+		{
+			ereport(WARNING, (errmsg("invalid object type %d for cleanup record",
+									 record->objectType)));
+			continue;
+		}
+
+		if (!TryLockOperationId(record->operationId))
+		{
+			/* operation that the cleanup record is part of is still running */
+			continue;
+		}
+
+		char *qualifiedTableName = record->objectName;
+		WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId);
+		if (!CleanupRecordExists(record->recordId))
+		{
+			/*
+			 * The operation completed successfully just after we called
+			 * ListCleanupRecords in which case the record is now gone.
+			 */
+			continue;
+		}
+
+		if (TryDropShardOutsideTransaction(qualifiedTableName, workerNode->workerName,
+										   workerNode->workerPort))
+		{
+			/* delete the cleanup record */
+			DeleteCleanupRecordByRecordId(record->recordId);
+			removedShardCountForCleanup++;
+		}
+		else
+		{
+			failedShardCountForCleanup++;
+		}
+	}
+
+	if (failedShardCountForCleanup > 0)
+	{
+		ereport(WARNING, (errmsg("failed to drop %d cleanup shards out of %d",
								 failedShardCountForCleanup,
								 list_length(cleanupRecordList))));
+	}
+
+	return removedShardCountForCleanup;
+}
+
+
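/*
 * For illustration only (values are hypothetical): a deferred drop of a split
 * parent placed on node group 2 would surface to the code above roughly as
 *
 *   record_id     = 4
 *   operation_id  = 777
 *   object_type   = CLEANUP_SHARD_PLACEMENT
 *   object_name   = 'public.sensors_102008'
 *   node_group_id = 2
 *   policy_type   = CLEANUP_DEFERRED_ON_SUCCESS
 *
 * i.e. one row of pg_dist_cleanup deserialized into a CleanupRecord.
 */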
 /*
  * DropOrphanedShards removes shards that were marked SHARD_STATE_TO_DELETE before.
  *
@@ -155,7 +267,6 @@ int
 DropOrphanedShards(bool waitForLocks)
 {
 	int removedShardCount = 0;
-	ListCell *shardPlacementCell = NULL;
 
 	/*
 	 * We should try to take the highest lock that we take
@@ -185,19 +296,26 @@ DropOrphanedShards(bool waitForLocks)
 	int failedShardDropCount = 0;
 	List *shardPlacementList = AllShardPlacementsWithShardPlacementState(
 		SHARD_STATE_TO_DELETE);
-	foreach(shardPlacementCell, shardPlacementList)
-	{
-		GroupShardPlacement *placement = (GroupShardPlacement *) lfirst(
-			shardPlacementCell);
 
+	GroupShardPlacement *placement = NULL;
+	foreach_ptr(placement, shardPlacementList)
+	{
 		if (!PrimaryNodeForGroup(placement->groupId, NULL) ||
 			!ShardExists(placement->shardId))
 		{
 			continue;
 		}
 
-		if (TryDropShard(placement))
+		ShardPlacement *shardPlacement = LoadShardPlacement(placement->shardId,
+															placement->placementId);
+		ShardInterval *shardInterval = LoadShardInterval(placement->shardId);
+		char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
+
+		if (TryDropShardOutsideTransaction(qualifiedTableName, shardPlacement->nodeName,
+										   shardPlacement->nodePort))
 		{
+			/* delete the actual placement */
+			DeleteShardPlacementRow(placement->placementId);
 			removedShardCount++;
 		}
 		else
@@ -216,6 +334,182 @@
 }
 
 
+/*
+ * StartNewOperationNeedingCleanup generates a new operation ID, takes an advisory
+ * lock on it for the lifetime of the transaction, and registers it as the current
+ * operation.
+ */
+OperationId
+StartNewOperationNeedingCleanup(void)
+{
+	CurrentOperationId = GetNextOperationId();
+
+	LockOperationId(CurrentOperationId);
+
+	return CurrentOperationId;
+}
+
+
+/*
+ * CompleteNewOperationNeedingCleanup drops or keeps the resources registered for
+ * CurrentOperationId depending on their cleanup policy and on whether the
+ * operation succeeded.
+ */
+void
+CompleteNewOperationNeedingCleanup(bool isSuccess)
+{
+	/*
+	 * As part of operation completion:
+	 * 1. Drop all resources of CurrentOperationId that are marked with the
+	 *    'CLEANUP_ALWAYS' policy, together with their cleanup records, in a
+	 *    separate transaction.
+	 *
+	 * 2. For all resources of CurrentOperationId that are marked with 'CLEANUP_ON_FAILURE':
+	 *    a) If isSuccess = true, drop only the cleanup records, since the operation
+	 *       is about to complete successfully. This is done as part of the current
+	 *       transaction, so the deletion itself rolls back (and the records survive)
+	 *       if the transaction fails later.
+	 *
+	 *    b) If isSuccess = false, drop the resources and their cleanup records in a
+	 *       separate transaction.
+	 */

+	/* We must have a valid OperationId. Any operation requiring cleanup
+	 * will call StartNewOperationNeedingCleanup.
+	 */
+	Assert(CurrentOperationId != INVALID_OPERATION_ID);
+
+	List *recordList = ListCleanupRecordsForCurrentOperation();
+
+	int removedShardCountOnComplete = 0;
+	int failedShardCountOnComplete = 0;
+
+	CleanupRecord *record = NULL;
+	foreach_ptr(record, recordList)
+	{
+		/* we only support cleaning up shard placements right now */
+		if (record->objectType != CLEANUP_SHARD_PLACEMENT)
+		{
+			ereport(WARNING, (errmsg("invalid object type %d for cleanup record",
+									 record->objectType)));
+			continue;
+		}
+
+		if (record->policy == CLEANUP_ALWAYS ||
+			(record->policy == CLEANUP_ON_FAILURE && !isSuccess))
+		{
+			char *qualifiedTableName = record->objectName;
+			WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId);
+
+			if (TryDropShardOutsideTransaction(qualifiedTableName, workerNode->workerName,
+											   workerNode->workerPort))
+			{
+				DeleteCleanupRecordByRecordIdOutsideTransaction(record->recordId);
+				removedShardCountOnComplete++;
+			}
+			else
+			{
+				failedShardCountOnComplete++;
+			}
+		}
+		else if (record->policy == CLEANUP_ON_FAILURE && isSuccess)
+		{
+			DeleteCleanupRecordByRecordId(record->recordId);
+		}
+	}
+
+	if (failedShardCountOnComplete > 0)
+	{
+		ereport(WARNING, (errmsg("failed to clean up %d shard placements out of %d",
+								 failedShardCountOnComplete, list_length(recordList))));
+	}
+}
+
+
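/*
 * A hypothetical caller, sketched for illustration only (shard split is the real
 * caller elsewhere in this patch); the object name is made up.
 */
static void
ExampleOperationNeedingCleanup(int nodeGroupId)
{
	StartNewOperationNeedingCleanup();

	PG_TRY();
	{
		/* transient resource: dropped whether or not the operation succeeds */
		InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
											"public.dummy_shard_102008",
											nodeGroupId,
											CLEANUP_ALWAYS);

		/* ... perform the actual work of the operation here ... */
	}
	PG_CATCH();
	{
		/* failure: also drops resources registered with CLEANUP_ON_FAILURE */
		CompleteNewOperationNeedingCleanup(false);
		PG_RE_THROW();
	}
	PG_END_TRY();

	/* success: drops CLEANUP_ALWAYS resources, keeps deferred-drop records */
	CompleteNewOperationNeedingCleanup(true);
}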
+/*
+ * InsertCleanupRecordInCurrentTransaction inserts a new pg_dist_cleanup entry
+ * as part of the current transaction. This is primarily useful for deferred drop
+ * scenarios, since these records roll back if the operation's transaction fails.
+ */
+void
+InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
+										char *objectName,
+										int nodeGroupId,
+										CleanupPolicy policy)
+{
+	/* We must have a valid OperationId. Any operation requiring cleanup
+	 * will call StartNewOperationNeedingCleanup.
+	 */
+	Assert(CurrentOperationId != INVALID_OPERATION_ID);
+
+	Datum values[Natts_pg_dist_cleanup];
+	bool isNulls[Natts_pg_dist_cleanup];
+
+	/* form new cleanup record tuple */
+	memset(values, 0, sizeof(values));
+	memset(isNulls, false, sizeof(isNulls));
+
+	uint64 recordId = GetNextCleanupRecordId();
+	OperationId operationId = CurrentOperationId;
+
+	values[Anum_pg_dist_cleanup_record_id - 1] = UInt64GetDatum(recordId);
+	values[Anum_pg_dist_cleanup_operation_id - 1] = UInt64GetDatum(operationId);
+	values[Anum_pg_dist_cleanup_object_type - 1] = Int32GetDatum(objectType);
+	values[Anum_pg_dist_cleanup_object_name - 1] = CStringGetTextDatum(objectName);
+	values[Anum_pg_dist_cleanup_node_group_id - 1] = Int32GetDatum(nodeGroupId);
+	values[Anum_pg_dist_cleanup_policy_type - 1] = Int32GetDatum(policy);
+
+	/* open cleanup relation and insert new tuple */
+	Oid relationId = DistCleanupRelationId();
+	Relation pgDistCleanup = table_open(relationId, RowExclusiveLock);
+
+	TupleDesc tupleDescriptor = RelationGetDescr(pgDistCleanup);
+	HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
+
+	CatalogTupleInsert(pgDistCleanup, heapTuple);
+
+	CommandCounterIncrement();
+	table_close(pgDistCleanup, NoLock);
+}
+
+
+/*
+ * InsertCleanupRecordInSubtransaction inserts a new pg_dist_cleanup entry in a
+ * separate transaction (a direct insert over a localhost connection) to ensure
+ * the record persists even if the current transaction rolls back. Such records
+ * should be deleted once the operation completes successfully; until then they
+ * drive cleanup on failure.
+ */
+void
+InsertCleanupRecordInSubtransaction(CleanupObject objectType,
+									char *objectName,
+									int nodeGroupId,
+									CleanupPolicy policy)
+{
+	/* We must have a valid OperationId. Any operation requiring cleanup
+	 * will call StartNewOperationNeedingCleanup.
+	 */
+	Assert(CurrentOperationId != INVALID_OPERATION_ID);
+
+	StringInfo command = makeStringInfo();
+	appendStringInfo(command,
+					 "INSERT INTO %s.%s "
+					 " (operation_id, object_type, object_name, node_group_id, policy_type) "
+					 " VALUES (" UINT64_FORMAT ", %d, %s, %d, %d)",
+					 PG_CATALOG,
+					 PG_DIST_CLEANUP,
+					 CurrentOperationId,
+					 objectType,
+					 quote_literal_cstr(objectName),
+					 nodeGroupId,
+					 policy);
+
+	SendCommandListToWorkerOutsideTransaction(LocalHostName,
+											  PostPortNumber,
+											  CitusExtensionOwnerName(),
+											  list_make1(command->data));
+}
+
+
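/*
 * For illustration, the command sent over the localhost connection above ends up
 * looking roughly like the following (values hypothetical; record_id is filled in
 * by the bigserial default on the receiving node):
 *
 *   INSERT INTO pg_catalog.pg_dist_cleanup
 *       (operation_id, object_type, object_name, node_group_id, policy_type)
 *   VALUES (777, 1, 'public.sensors_102010_dummy', 2, 0);
 */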
+/*
+ * DeleteCleanupRecordByRecordIdOutsideTransaction deletes a single pg_dist_cleanup
+ * entry in a separate transaction, over a connection to localhost.
+ */
+static void
+DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId)
+{
+	StringInfo command = makeStringInfo();
+	appendStringInfo(command,
+					 "DELETE FROM %s.%s "
+					 "WHERE record_id = " UINT64_FORMAT,
+					 PG_CATALOG,
+					 PG_DIST_CLEANUP,
+					 recordId);
+
+	SendCommandListToWorkerOutsideTransaction(LocalHostName,
+											  PostPortNumber,
+											  CitusExtensionOwnerName(),
+											  list_make1(command->data));
+}
+
+
 /*
  * TryLockRelationAndPlacementCleanup tries to lock the given relation
  * and the placement cleanup. If it cannot, it returns false.
@@ -245,20 +539,14 @@ TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode)
  * true on success.
  */
 static bool
-TryDropShard(GroupShardPlacement *placement)
+TryDropShardOutsideTransaction(char *qualifiedTableName, char *nodeName, int nodePort)
 {
-	ShardPlacement *shardPlacement = LoadShardPlacement(placement->shardId,
-														placement->placementId);
-	ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
-
-	ereport(LOG, (errmsg("dropping shard placement " INT64_FORMAT " of shard "
-						 INT64_FORMAT " on %s:%d after it was moved away",
-						 shardPlacement->placementId, shardPlacement->shardId,
-						 shardPlacement->nodeName, shardPlacement->nodePort)));
+	ereport(LOG, (errmsg("dropping shard placement %s "
+						 "on %s:%d after it was moved away",
+						 qualifiedTableName, nodeName, nodePort)));
 
 	/* prepare sql query to execute to drop the shard */
 	StringInfo dropQuery = makeStringInfo();
-	char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
 	appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);
 
 	/*
@@ -274,15 +562,250 @@
 		dropQuery->data);
 
 	/* remove the shard from the node */
-	bool success =
-		SendOptionalCommandListToWorkerOutsideTransaction(shardPlacement->nodeName,
-														  shardPlacement->nodePort,
-														  NULL, dropCommandList);
-	if (success)
-	{
-		/* delete the actual placement */
-		DeleteShardPlacementRow(placement->placementId);
-	}
+	bool success =
+		SendOptionalCommandListToWorkerOutsideTransaction(nodeName,
+														  nodePort,
+														  NULL,
+														  dropCommandList);
 
 	return success;
 }
+
+
+/*
+ * GetNextOperationId allocates and returns a unique operationId for an operation
+ * requiring potential cleanup. This allocation occurs both in shared memory and
+ * in write ahead logs; writing to logs avoids the risk of having operationId collisions.
+ */
+static OperationId
+GetNextOperationId(void)
+{
+	OperationId operationId = INVALID_OPERATION_ID;
+
+	/*
+	 * In regression tests, we would like to generate operation IDs consistently
+	 * even if the tests run in parallel. Instead of the sequence, we can use
+	 * the next_operation_id GUC to specify which operation ID the current session should
+	 * generate next. The GUC is automatically increased by 1 every time a new
+	 * operation ID is generated.
+	 */
+	if (NextOperationId > 0)
+	{
+		operationId = NextOperationId;
+		NextOperationId += 1;
+
+		return operationId;
+	}
+
+	/* token location, or -1 if unknown */
+	const int location = -1;
+	RangeVar *sequenceName = makeRangeVar(PG_CATALOG,
+										  OPERATIONID_SEQUENCE_NAME,
+										  location);
+
+	bool missingOK = false;
+	Oid sequenceId = RangeVarGetRelid(sequenceName, NoLock, missingOK);
+
+	bool checkPermissions = true;
+	operationId = nextval_internal(sequenceId, checkPermissions);
+
+	return operationId;
+}
+
+
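/*
 * For example (regression-test usage, names from this patch):
 *
 *   SET citus.next_operation_id TO 980000;
 *
 * makes the next call above return 980000 and bumps the GUC to 980001, bypassing
 * the operation ID sequence entirely.
 */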
+/*
+ * ListCleanupRecords lists all the current cleanup records.
+ */
+static List *
+ListCleanupRecords(void)
+{
+	Relation pgDistCleanup = table_open(DistCleanupRelationId(), AccessShareLock);
+	TupleDesc tupleDescriptor = RelationGetDescr(pgDistCleanup);
+
+	List *recordList = NIL;
+	int scanKeyCount = 0;
+	bool indexOK = false;
+
+	SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanup, InvalidOid,
+													indexOK, NULL, scanKeyCount, NULL);
+
+	HeapTuple heapTuple = NULL;
+	while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
+	{
+		CleanupRecord *record = TupleToCleanupRecord(heapTuple, tupleDescriptor);
+		recordList = lappend(recordList, record);
+	}
+
+	systable_endscan(scanDescriptor);
+	table_close(pgDistCleanup, NoLock);
+
+	return recordList;
+}
+
+
+/*
+ * ListCleanupRecordsForCurrentOperation lists the cleanup records that were
+ * created by CurrentOperationId.
+ */
+static List *
+ListCleanupRecordsForCurrentOperation(void)
+{
+	Assert(CurrentOperationId != INVALID_OPERATION_ID);
+
+	List *allRecordList = ListCleanupRecords();
+	List *operationRecordList = NIL;
+
+	CleanupRecord *record = NULL;
+	foreach_ptr(record, allRecordList)
+	{
+		if (record->operationId == CurrentOperationId)
+		{
+			operationRecordList = lappend(operationRecordList, record);
+		}
+	}
+
+	return operationRecordList;
+}
+
+
+/*
+ * TupleToCleanupRecord converts a pg_dist_cleanup record tuple into a CleanupRecord struct.
+ */
+static CleanupRecord *
+TupleToCleanupRecord(HeapTuple heapTuple, TupleDesc tupleDescriptor)
+{
+	Datum datumArray[Natts_pg_dist_cleanup];
+	bool isNullArray[Natts_pg_dist_cleanup];
+	heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
+
+	CleanupRecord *record = palloc0(sizeof(CleanupRecord));
+
+	record->recordId =
+		DatumGetUInt64(datumArray[Anum_pg_dist_cleanup_record_id - 1]);
+
+	record->operationId =
+		DatumGetUInt64(datumArray[Anum_pg_dist_cleanup_operation_id - 1]);
+
+	record->objectType =
+		DatumGetInt32(datumArray[Anum_pg_dist_cleanup_object_type - 1]);
+
+	record->objectName =
+		TextDatumGetCString(datumArray[Anum_pg_dist_cleanup_object_name - 1]);
+
+	record->nodeGroupId =
+		DatumGetInt32(datumArray[Anum_pg_dist_cleanup_node_group_id - 1]);
+
+	record->policy =
+		DatumGetInt32(datumArray[Anum_pg_dist_cleanup_policy_type - 1]);
+
+	return record;
+}
+
+
+/*
+ * CleanupRecordExists returns whether a cleanup record with the given
+ * record ID exists in pg_dist_cleanup.
+ */
+static bool
+CleanupRecordExists(uint64 recordId)
+{
+	Relation pgDistCleanup = table_open(DistCleanupRelationId(),
+										AccessShareLock);
+
+	const int scanKeyCount = 1;
+	ScanKeyData scanKey[1];
+	bool indexOK = true;
+
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_cleanup_record_id,
+				BTEqualStrategyNumber, F_INT8EQ, UInt64GetDatum(recordId));
+
+	SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanup,
+													DistCleanupPrimaryKeyIndexId(),
+													indexOK,
+													NULL, scanKeyCount, scanKey);
+
+	HeapTuple heapTuple = systable_getnext(scanDescriptor);
+	bool recordExists = HeapTupleIsValid(heapTuple);
+
+	systable_endscan(scanDescriptor);
+
+	CommandCounterIncrement();
+	table_close(pgDistCleanup, NoLock);
+
+	return recordExists;
+}
+
+
+/*
+ * DeleteCleanupRecordByRecordId deletes a single pg_dist_cleanup entry.
+ */
+static void
+DeleteCleanupRecordByRecordId(uint64 recordId)
+{
+	Relation pgDistCleanup = table_open(DistCleanupRelationId(),
+										RowExclusiveLock);
+
+	const int scanKeyCount = 1;
+	ScanKeyData scanKey[1];
+	bool indexOK = true;
+
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_cleanup_record_id,
+				BTEqualStrategyNumber, F_INT8EQ, UInt64GetDatum(recordId));
+
+	SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanup,
+													DistCleanupPrimaryKeyIndexId(),
+													indexOK,
+													NULL, scanKeyCount, scanKey);
+
+	HeapTuple heapTuple = systable_getnext(scanDescriptor);
+	if (heapTuple == NULL)
+	{
+		ereport(ERROR, (errmsg("could not find cleanup record " UINT64_FORMAT,
+							   recordId)));
+	}
+
+	simple_heap_delete(pgDistCleanup, &heapTuple->t_self);
+
+	systable_endscan(scanDescriptor);
+
+	CommandCounterIncrement();
+	table_close(pgDistCleanup, NoLock);
+}
+
+
+/*
+ * GetNextCleanupRecordId allocates and returns a unique record ID for a cleanup entry.
+ * This allocation occurs both in shared memory and in write ahead logs; writing to
+ * logs avoids the risk of having record ID collisions.
+ */
+static uint64
+GetNextCleanupRecordId(void)
+{
+	uint64 recordId = INVALID_CLEANUP_RECORD_ID;
+
+	/*
+	 * In regression tests, we would like to generate record IDs consistently
+	 * even if the tests run in parallel. Instead of the sequence, we can use
+	 * the next_cleanup_record_id GUC to specify which record ID the current session
+	 * should generate next. The GUC is automatically increased by 1 every time a new
+	 * record ID is generated.
+	 */
+	if (NextCleanupRecordId > 0)
+	{
+		recordId = NextCleanupRecordId;
+		NextCleanupRecordId += 1;
+
+		return recordId;
+	}
+
+	RangeVar *sequenceName = makeRangeVar(PG_CATALOG,
+										  CLEANUPRECORDID_SEQUENCE_NAME,
+										  -1);
+
+	bool missingOK = false;
+	Oid sequenceId = RangeVarGetRelid(sequenceName, NoLock, missingOK);
+	bool checkPermissions = true;
+	return nextval_internal(sequenceId, checkPermissions);
+}
+
+
+/*
+ * LockOperationId takes an exclusive advisory lock on the given operation ID and
+ * holds it until the end of the transaction.
+ */
+static void
+LockOperationId(OperationId operationId)
+{
+	LOCKTAG tag;
+	const bool sessionLock = false;
+	const bool dontWait = false;
+	SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId);
+	(void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
+}
+
+
+/*
+ * TryLockOperationId attempts to take the advisory lock on the given operation ID
+ * without waiting, and returns whether the lock was acquired.
+ */
+static bool
+TryLockOperationId(OperationId operationId)
+{
+	LOCKTAG tag;
+	const bool sessionLock = false;
+	const bool dontWait = true;
+	SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId);
+	bool lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
+	return lockAcquired;
+}
diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index a8fb01655..a77b30812 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -33,6 +33,7 @@
 #include "distributed/multi_partitioning_utils.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_transaction.h"
+#include "distributed/shard_cleaner.h"
 #include "distributed/shared_library_init.h"
 #include "distributed/pg_dist_shard.h"
 #include "distributed/metadata_sync.h"
@@ -43,6 +44,9 @@
 #include "distributed/shard_rebalancer.h"
 #include "postmaster/postmaster.h"
 
+/* GUC controlling whether the original shards are dropped lazily after a split */
+bool DeferShardDeleteOnSplit = true;
+
 /*
  * Entry for map that tracks ShardInterval -> Placement Node
  * created by split workflow.
@@ -72,15 +76,13 @@ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, List *shardSplitPointsList, List *nodeIdsForPlacementList); static void CreateAndCopySplitShardsForShardGroup( - HTAB *mapOfShardToPlacementCreatedByWorkflow, WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList); -static void CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow, - List *shardGroupSplitIntervalListList, +static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList); -static void CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement, +static void CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, WorkerNode *sourceWorkerNode, @@ -89,7 +91,7 @@ static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList); static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList, bool includeReplicaIdentity); -static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfDummyShardToPlacement); +static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfPlacementToDummyShardList); static void CreateObjectOnPlacement(List *objectCreationCommandList, WorkerNode *workerNode); static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList, @@ -121,8 +123,6 @@ static void CreatePartitioningHierarchy(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList); -static void TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow); -static HTAB * CreateEmptyMapForShardsCreatedByWorkflow(); static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode); static StringInfo CreateSplitShardReplicationSetupUDF( List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, @@ -133,10 +133,8 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList); -static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId, +static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 targetNodeId, ShardInterval *shardInterval); -static void DropDummyShards(HTAB *mapOfDummyShardToPlacement); -static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval); static uint64 GetNextShardIdForSplitChild(void); /* Customize error message strings based on operation type */ @@ -431,6 +429,9 @@ SplitShard(SplitMode splitMode, lappend(workersForPlacementList, (void *) workerNode); } + /* Start operation to prepare for generating cleanup records */ + StartNewOperationNeedingCleanup(); + if (splitMode == BLOCKING_SPLIT) { EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES); @@ -450,70 +451,9 @@ SplitShard(SplitMode splitMode, PlacementMovedUsingLogicalReplicationInTX = true; } -} - -/* - * ShardIntervalHashCode computes the hash code for a Shardinterval using - * shardId. 
- */ -static uint32 -ShardIntervalHashCode(const void *key, Size keySize) -{ - const ShardInterval *shardInterval = (const ShardInterval *) key; - const uint64 *shardId = &(shardInterval->shardId); - - /* standard hash function outlined in Effective Java, Item 8 */ - uint32 result = 17; - result = 37 * result + tag_hash(shardId, sizeof(uint64)); - - return result; -} - - -/* - * ShardIntervalHashCompare compares two shard intervals using shard id. - */ -static int -ShardIntervalHashCompare(const void *lhsKey, const void *rhsKey, Size keySize) -{ - const ShardInterval *intervalLhs = (const ShardInterval *) lhsKey; - const ShardInterval *intervalRhs = (const ShardInterval *) rhsKey; - - int shardIdCompare = 0; - - /* first, compare by shard id */ - if (intervalLhs->shardId < intervalRhs->shardId) - { - shardIdCompare = -1; - } - else if (intervalLhs->shardId > intervalRhs->shardId) - { - shardIdCompare = 1; - } - - return shardIdCompare; -} - - -/* Create an empty map that tracks ShardInterval -> Placement Node as created by workflow */ -static HTAB * -CreateEmptyMapForShardsCreatedByWorkflow() -{ - HASHCTL info = { 0 }; - info.keysize = sizeof(ShardInterval); - info.entrysize = sizeof(ShardCreatedByWorkflowEntry); - info.hash = ShardIntervalHashCode; - info.match = ShardIntervalHashCompare; - info.hcxt = CurrentMemoryContext; - - /* we don't have value field as it's a set */ - info.entrysize = info.keysize; - uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); - - HTAB *splitChildrenCreatedByWorkflow = hash_create("Shard id to Node Placement Map", - 32, &info, hashFlags); - return splitChildrenCreatedByWorkflow; + bool isSuccess = true; + CompleteNewOperationNeedingCleanup(isSuccess); } @@ -549,9 +489,6 @@ BlockingShardSplit(SplitOperation splitOperation, WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId, false /* missingOk */); - - HTAB *mapOfShardToPlacementCreatedByWorkflow = - CreateEmptyMapForShardsCreatedByWorkflow(); PG_TRY(); { /* @@ -560,7 +497,6 @@ BlockingShardSplit(SplitOperation splitOperation, * Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints). */ CreateAndCopySplitShardsForShardGroup( - mapOfShardToPlacementCreatedByWorkflow, sourceShardToCopyNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, @@ -601,7 +537,8 @@ BlockingShardSplit(SplitOperation splitOperation, ShutdownAllConnections(); /* Do a best effort cleanup of shards created on workers in the above block */ - TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow); + bool isSuccess = false; + CompleteNewOperationNeedingCleanup(isSuccess); PG_RE_THROW(); } @@ -613,8 +550,7 @@ BlockingShardSplit(SplitOperation splitOperation, /* Create ShardGroup split children on a list of corresponding workers. */ static void -CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow, - List *shardGroupSplitIntervalListList, +CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { /* @@ -642,16 +578,15 @@ CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow, splitShardCreationCommandList, shardInterval->shardId); + /* Log resource for cleanup in case of failure only. 
*/ + CleanupPolicy policy = CLEANUP_ON_FAILURE; + InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT, + ConstructQualifiedShardName(shardInterval), + workerPlacementNode->groupId, + policy); + /* Create new split child shard on the specified placement list */ CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode); - - ShardCreatedByWorkflowEntry entry; - entry.shardIntervalKey = shardInterval; - entry.workerNodeValue = workerPlacementNode; - bool found = false; - hash_search(mapOfShardToPlacementCreatedByWorkflow, &entry, HASH_ENTER, - &found); - Assert(!found); } } } @@ -735,14 +670,12 @@ CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, * on a list of corresponding workers. */ static void -CreateAndCopySplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow, - WorkerNode *sourceShardNode, +CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList) { - CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow, - shardGroupSplitIntervalListList, + CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList, workersForPlacementList); /* For Blocking split, copy isn't snapshotted */ @@ -1197,20 +1130,38 @@ DropShardList(List *shardIntervalList) /* get shard name */ char *qualifiedShardName = ConstructQualifiedShardName(shardInterval); - char storageType = shardInterval->storageType; - if (storageType == SHARD_STORAGE_TABLE) + if (DeferShardDeleteOnSplit) { - appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, - qualifiedShardName); - } - else if (storageType == SHARD_STORAGE_FOREIGN) - { - appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND, - qualifiedShardName); - } - /* drop old shard */ - SendCommandToWorker(workerName, workerPort, dropQuery->data); + /* Log shard in pg_dist_cleanup. + * Parent shards are to be dropped only on sucess after split workflow is complete, + * so mark the policy as 'CLEANUP_DEFERRED_ON_SUCCESS'. + * We also log cleanup record in the current transaction. If the current transaction rolls back, + * we do not generate a record at all. + */ + CleanupPolicy policy = CLEANUP_DEFERRED_ON_SUCCESS; + InsertCleanupRecordInCurrentTransaction(CLEANUP_SHARD_PLACEMENT, + qualifiedShardName, + placement->groupId, + policy); + } + else + { + char storageType = shardInterval->storageType; + if (storageType == SHARD_STORAGE_TABLE) + { + appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, + qualifiedShardName); + } + else if (storageType == SHARD_STORAGE_FOREIGN) + { + appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND, + qualifiedShardName); + } + + /* drop old shard */ + SendCommandToWorker(workerName, workerPort, dropQuery->data); + } } /* delete shard row */ @@ -1219,50 +1170,6 @@ DropShardList(List *shardIntervalList) } -/* - * In case of failure, TryDropSplitShardsOnFailure drops in-progress shard placements from both the - * coordinator and mx nodes. 
- */ -static void -TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow) -{ - HASH_SEQ_STATUS status; - ShardCreatedByWorkflowEntry *entry; - - hash_seq_init(&status, mapOfShardToPlacementCreatedByWorkflow); - while ((entry = (ShardCreatedByWorkflowEntry *) hash_seq_search(&status)) != 0) - { - ShardInterval *shardInterval = entry->shardIntervalKey; - WorkerNode *workerPlacementNode = entry->workerNodeValue; - - char *qualifiedShardName = ConstructQualifiedShardName(shardInterval); - StringInfo dropShardQuery = makeStringInfo(); - - /* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */ - appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND, - qualifiedShardName); - - int connectionFlags = FOR_DDL; - connectionFlags |= OUTSIDE_TRANSACTION; - MultiConnection *connnection = GetNodeUserDatabaseConnection( - connectionFlags, - workerPlacementNode->workerName, - workerPlacementNode->workerPort, - CurrentUserName(), - NULL /* databaseName */); - - /* - * Perform a drop in best effort manner. - * The shard may or may not exist and the connection could have died. - */ - ExecuteOptionalRemoteCommand( - connnection, - dropShardQuery->data, - NULL /* pgResult */); - } -} - - /* * SplitShard API to split a given shard (or shard group) in non-blocking fashion * based on specified split points to a set of destination nodes. @@ -1308,11 +1215,6 @@ NonBlockingShardSplit(SplitOperation splitOperation, databaseName); ClaimConnectionExclusively(sourceConnection); - HTAB *mapOfShardToPlacementCreatedByWorkflow = - CreateEmptyMapForShardsCreatedByWorkflow(); - - HTAB *mapOfDummyShardToPlacement = CreateSimpleHash(NodeAndOwner, - GroupedShardSplitInfos); MultiConnection *sourceReplicationConnection = GetReplicationConnection(sourceShardToCopyNode->workerName, sourceShardToCopyNode->workerPort); @@ -1321,8 +1223,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, PG_TRY(); { /* 1) Physically create split children. */ - CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow, - shardGroupSplitIntervalListList, + CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList, workersForPlacementList); /* @@ -1330,8 +1231,10 @@ NonBlockingShardSplit(SplitOperation splitOperation, * Refer to the comment section of 'CreateDummyShardsForShardGroup' for indepth * information. */ + HTAB *mapOfPlacementToDummyShardList = CreateSimpleHash(NodeAndOwner, + GroupedShardSplitInfos); CreateDummyShardsForShardGroup( - mapOfDummyShardToPlacement, + mapOfPlacementToDummyShardList, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, sourceShardToCopyNode, @@ -1346,7 +1249,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, * initial COPY phase, like we do for the replica identities on the * target shards. */ - CreateReplicaIdentitiesForDummyShards(mapOfDummyShardToPlacement); + CreateReplicaIdentitiesForDummyShards(mapOfPlacementToDummyShardList); /* 4) Create Publications. */ CreatePublications(sourceConnection, publicationInfoHash); @@ -1468,11 +1371,6 @@ NonBlockingShardSplit(SplitOperation splitOperation, CreateForeignKeyConstraints(shardGroupSplitIntervalListList, workersForPlacementList); - /* - * 23) Drop dummy shards. 
- */ - DropDummyShards(mapOfDummyShardToPlacement); - /* 24) Close source connection */ CloseConnection(sourceConnection); @@ -1487,12 +1385,14 @@ NonBlockingShardSplit(SplitOperation splitOperation, /* end ongoing transactions to enable us to clean up */ ShutdownAllConnections(); - /* Do a best effort cleanup of shards created on workers in the above block */ - TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow); - + /* Do a best effort cleanup of shards created on workers in the above block + * TODO(niupre): We don't need to do this once shard cleaner can clean replication + * artifacts. + */ DropAllLogicalReplicationLeftovers(SHARD_SPLIT); - DropDummyShards(mapOfDummyShardToPlacement); + bool isSuccess = false; + CompleteNewOperationNeedingCleanup(isSuccess); PG_RE_THROW(); } @@ -1523,7 +1423,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, * Note 2 : Given there is an overlap of source and destination in Worker0, Shard1_1 and Shard2_1 need not be created. */ static void -CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement, +CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, WorkerNode *sourceWorkerNode, @@ -1560,11 +1460,20 @@ CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement, splitShardCreationCommandList, shardInterval->shardId); + /* Log shard in pg_dist_cleanup. Given dummy shards are transient resources, + * we want to cleanup irrespective of operation success or failure. + */ + CleanupPolicy policy = CLEANUP_ALWAYS; + InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT, + ConstructQualifiedShardName(shardInterval), + workerPlacementNode->groupId, + policy); + /* Create dummy source shard on the specified placement list */ CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode); /* Add dummy source shard entry created for placement node in map */ - AddDummyShardEntryInMap(mapOfDummyShardToPlacement, + AddDummyShardEntryInMap(mapOfPlacementToDummyShardList, workerPlacementNode->nodeId, shardInterval); } @@ -1595,12 +1504,17 @@ CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement, splitShardCreationCommandList, shardInterval->shardId); + /* Log shard in pg_dist_cleanup. Given dummy shards are transient resources, + * we want to cleanup irrespective of operation success or failure. + */ + CleanupPolicy policy = CLEANUP_ALWAYS; + InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT, + ConstructQualifiedShardName(shardInterval), + workerPlacementNode->groupId, + policy); + /* Create dummy split child shard on source worker node */ CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode); - - /* Add dummy split child shard entry created on source node */ - AddDummyShardEntryInMap(mapOfDummyShardToPlacement, sourceWorkerNode->nodeId, - shardInterval); } } } @@ -1822,7 +1736,8 @@ ParseReplicationSlotInfoFromResult(PGresult *result) * of logical replication. We cautiously delete only the dummy shards added in the DummyShardHashMap. 
 */
 static void
-AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId,
+AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList,
+						uint32 targetNodeId,
 						ShardInterval *shardInterval)
 {
 	NodeAndOwner key;
@@ -1831,7 +1746,8 @@ AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId,
 
 	bool found = false;
 	GroupedDummyShards *nodeMappingEntry =
-		(GroupedDummyShards *) hash_search(mapOfDummyShardToPlacement, &key,
+		(GroupedDummyShards *) hash_search(mapOfPlacementToDummyShardList,
+										   &key,
 										   HASH_ENTER,
 										   &found);
 	if (!found)
@@ -1844,68 +1760,6 @@ AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId,
 }
 
 
-/*
- * DropDummyShards traverses the dummy shard map and drops shard at given node.
- * It fails if the shard cannot be dropped.
- */
-static void
-DropDummyShards(HTAB *mapOfDummyShardToPlacement)
-{
-	HASH_SEQ_STATUS status;
-	hash_seq_init(&status, mapOfDummyShardToPlacement);
-
-	GroupedDummyShards *entry = NULL;
-	while ((entry = (GroupedDummyShards *) hash_seq_search(&status)) != NULL)
-	{
-		uint32 nodeId = entry->key.nodeId;
-		WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId,
-															  false /* missingOk */);
-
-		int connectionFlags = FOR_DDL;
-		connectionFlags |= OUTSIDE_TRANSACTION;
-		MultiConnection *connection = GetNodeUserDatabaseConnection(
-			connectionFlags,
-			shardToBeDroppedNode->workerName,
-			shardToBeDroppedNode->workerPort,
-			CurrentUserName(),
-			NULL /* databaseName */);
-
-		List *dummyShardIntervalList = entry->shardIntervals;
-		ShardInterval *shardInterval = NULL;
-		foreach_ptr(shardInterval, dummyShardIntervalList)
-		{
-			DropDummyShard(connection, shardInterval);
-		}
-
-		CloseConnection(connection);
-	}
-}
-
-
-/*
- * DropDummyShard drops a given shard on the node connection.
- * It fails if the shard cannot be dropped.
- */
-static void
-DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval)
-{
-	char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
-	StringInfo dropShardQuery = makeStringInfo();
-
-	/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
-	appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
-					 qualifiedShardName);
-
-	/*
-	 * Since the dummy shard is expected to be present on the given node,
-	 * fail if it cannot be dropped during cleanup.
-	 */
-	ExecuteCriticalRemoteCommand(
-		connection,
-		dropShardQuery->data);
-}
-
-
 /*
  * CreateReplicaIdentitiesForDummyShards creates replica indentities for split
  * dummy shards.
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index dd79f8d1e..b5d0ca27f 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -980,6 +980,25 @@ RegisterCitusConfigVariables(void)
 		0,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.defer_drop_after_shard_split",
+		gettext_noop("When enabled, a shard split will mark the original shards "
+					 "for deletion after a successful split, instead of deleting "
+					 "them right away."),
+		gettext_noop("The deletion of a shard can sometimes run into a conflict with a "
+					 "long running transaction on the shard during the drop phase of "
+					 "the shard split. This causes some splits to be rolled back after "
+					 "resources have been spent on moving the shard. To prevent "
+					 "conflicts, this feature lets you skip the actual deletion until a "
+					 "later point in time. When enabling this feature, one should also "
+					 "set citus.defer_shard_delete_interval to make sure deferred "
+					 "deletions are eventually executed."),
+		&DeferShardDeleteOnSplit,
+		true,
+		PGC_USERSET,
+		0,
+		NULL, NULL, NULL);
+
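	/*
	 * Illustrative usage of the GUC above (not executed here): after
	 *
	 *   SET citus.defer_drop_after_shard_split TO on;
	 *
	 * a split records its source shards in pg_dist_cleanup instead of dropping
	 * them inline; they are removed later by the shard cleaner (see
	 * citus.defer_shard_delete_interval below) or by a manual run of
	 * citus_cleanup_orphaned_shards().
	 */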
 	DefineCustomIntVariable(
 		"citus.defer_shard_delete_interval",
 		gettext_noop("Sets the time to wait between background deletion for shards."),
@@ -1817,6 +1836,36 @@
 		GUC_NO_SHOW_ALL,
 		NULL, NULL, NULL);
 
+	DefineCustomIntVariable(
+		"citus.next_operation_id",
+		gettext_noop("Set the next operation ID to use in operation creation."),
+		gettext_noop("Operation IDs are normally generated using a sequence. If "
+					 "next_operation_id is set to a non-zero value, operation IDs will "
+					 "instead be generated by incrementing from the value of "
+					 "this GUC and this will be reflected in the GUC. This is "
+					 "mainly useful to ensure consistent operation IDs when running "
+					 "tests in parallel."),
+		&NextOperationId,
+		0, 0, INT_MAX,
+		PGC_USERSET,
+		GUC_NO_SHOW_ALL,
+		NULL, NULL, NULL);
+
+	DefineCustomIntVariable(
+		"citus.next_cleanup_record_id",
+		gettext_noop("Set the next cleanup record ID to use in record creation."),
+		gettext_noop("Cleanup record IDs are normally generated using a sequence. If "
+					 "next_cleanup_record_id is set to a non-zero value, cleanup record IDs will "
+					 "instead be generated by incrementing from the value of "
+					 "this GUC and this will be reflected in the GUC. This is "
+					 "mainly useful to ensure consistent cleanup record IDs when running "
+					 "tests in parallel."),
+		&NextCleanupRecordId,
+		0, 0, INT_MAX,
+		PGC_USERSET,
+		GUC_NO_SHOW_ALL,
+		NULL, NULL, NULL);
+
 	DefineCustomIntVariable(
 		"citus.node_connection_timeout",
 		gettext_noop("Sets the maximum duration to connect to worker nodes."),
diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
index 4e58bdd51..8107f6adc 100644
--- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
+++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
@@ -76,3 +76,21 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
 DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
 #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"
+
+-- Table of records used to:
+-- 1) Clean up leftover resources after a failure
+-- 2) Defer dropping of old shard placements after a split.
+CREATE TABLE citus.pg_dist_cleanup (
+    record_id bigserial primary key,
+    operation_id bigint not null,
+    object_type int not null,
+    object_name text not null,
+    node_group_id int not null,
+    policy_type int not null
+);
+ALTER TABLE citus.pg_dist_cleanup SET SCHEMA pg_catalog;
+
+
+-- Sequence used to generate an operation ID for use in pg_dist_cleanup.
+CREATE SEQUENCE citus.pg_dist_operation_id_seq; +ALTER SEQUENCE citus.pg_dist_operation_id_seq SET SCHEMA pg_catalog; diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index d3f41273d..82c3fb5f7 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -38,6 +38,7 @@ #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" #include "distributed/shared_connection_stats.h" +#include "distributed/shard_cleaner.h" #include "distributed/subplan_execution.h" #include "distributed/version_compat.h" #include "distributed/worker_log_messages.h" @@ -600,6 +601,7 @@ ResetGlobalVariables() NodeMetadataSyncOnCommit = false; InTopLevelDelegatedFunctionCall = false; InTableTypeConversionFunctionCall = false; + CurrentOperationId = INVALID_OPERATION_ID; ResetWorkerErrorIndication(); memset(&AllowedDistributionColumnValue, 0, sizeof(AllowedDistributionColumn)); diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 35991ade1..b1818285a 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -508,7 +508,11 @@ LockPlacementCleanup(void) LOCKTAG tag; const bool sessionLock = false; const bool dontWait = false; - SET_LOCKTAG_PLACEMENT_CLEANUP(tag); + + /* Moves acquire lock with a constant operation id CITUS_SHARD_MOVE. + * This will change as we add support for parallel moves. + */ + SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_SHARD_MOVE); (void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait); } @@ -523,7 +527,11 @@ TryLockPlacementCleanup(void) LOCKTAG tag; const bool sessionLock = false; const bool dontWait = true; - SET_LOCKTAG_PLACEMENT_CLEANUP(tag); + + /* Moves acquire lock with a constant operation id CITUS_SHARD_MOVE. + * This will change as we add support for parallel moves. + */ + SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_SHARD_MOVE); bool lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait); return lockAcquired; } diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 92f8a4514..e646bce30 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -222,6 +222,7 @@ extern WorkerNode * LookupNodeForGroup(int32 groupId); extern Oid CitusCatalogNamespaceId(void); /* relation oids */ +extern Oid DistCleanupRelationId(void); extern Oid DistColocationRelationId(void); extern Oid DistColocationConfigurationIndexId(void); extern Oid DistPartitionRelationId(void); @@ -246,6 +247,7 @@ extern Oid DistTransactionRelationId(void); extern Oid DistTransactionGroupIndexId(void); extern Oid DistPlacementGroupidIndexId(void); extern Oid DistObjectPrimaryKeyIndexId(void); +extern Oid DistCleanupPrimaryKeyIndexId(void); /* type oids */ extern Oid LookupTypeOid(char *schemaNameSting, char *typeNameString); diff --git a/src/include/distributed/pg_dist_cleanup.h b/src/include/distributed/pg_dist_cleanup.h new file mode 100644 index 000000000..d8aa791b4 --- /dev/null +++ b/src/include/distributed/pg_dist_cleanup.h @@ -0,0 +1,34 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_cleanup.h + * definition of the relation that holds the resources to be cleaned up + * in cluster (pg_dist_cleanup). + * + * Copyright (c) Citus Data, Inc. 
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PG_DIST_CLEANUP_H
+#define PG_DIST_CLEANUP_H
+
+/* ----------------
+ * compiler constants for pg_dist_cleanup
+ * ----------------
+ */
+
+#define Natts_pg_dist_cleanup 6
+#define Anum_pg_dist_cleanup_record_id 1
+#define Anum_pg_dist_cleanup_operation_id 2
+#define Anum_pg_dist_cleanup_object_type 3
+#define Anum_pg_dist_cleanup_object_name 4
+#define Anum_pg_dist_cleanup_node_group_id 5
+#define Anum_pg_dist_cleanup_policy_type 6
+
+#define PG_CATALOG "pg_catalog"
+#define PG_DIST_CLEANUP "pg_dist_cleanup"
+#define OPERATIONID_SEQUENCE_NAME "pg_dist_operation_id_seq"
+#define CLEANUPRECORDID_SEQUENCE_NAME "pg_dist_cleanup_record_id_seq"
+
+#endif /* PG_DIST_CLEANUP_H */
+
diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h
index c808e9157..628d4de55 100644
--- a/src/include/distributed/resource_lock.h
+++ b/src/include/distributed/resource_lock.h
@@ -40,7 +40,7 @@ typedef enum AdvisoryLocktagClass
 	ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7,
 	ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
 	ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
-	ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP = 10,
+	ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10,
 	ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
 	ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13
 } AdvisoryLocktagClass;
 
 /* CitusOperations has constants for citus operations */
 typedef enum CitusOperations
 {
-	CITUS_TRANSACTION_RECOVERY = 0
+	CITUS_TRANSACTION_RECOVERY = 0,
+
+	CITUS_SHARD_MOVE = 1
 } CitusOperations;
 
 /* reuse advisory lock, but with different, unused field 4 (4)*/
@@ -108,12 +110,12 @@ typedef enum CitusOperations
 /* reuse advisory lock, but with different, unused field 4 (10)
  * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
  * are local to each database */
-#define SET_LOCKTAG_PLACEMENT_CLEANUP(tag) \
+#define SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId) \
 	SET_LOCKTAG_ADVISORY(tag, \
 						 MyDatabaseId, \
-						 (uint32) 0, \
-						 (uint32) 0, \
-						 ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP)
+						 (uint32) ((operationId) >> 32), \
+						 (uint32) (operationId), \
+						 ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID)
 
 /* reuse advisory lock, but with different, unused field 4 (12)
  * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h
index 8a98254f9..7f0f1992c 100644
--- a/src/include/distributed/shard_cleaner.h
+++ b/src/include/distributed/shard_cleaner.h
@@ -17,8 +17,61 @@ extern bool DeferShardDeleteOnMove;
 extern double DesiredPercentFreeAfterMove;
 extern bool CheckAvailableSpaceBeforeMove;
 
+extern bool DeferShardDeleteOnSplit;
+extern int NextOperationId;
+extern int NextCleanupRecordId;
+
 extern int TryDropOrphanedShards(bool waitForLocks);
 extern int DropOrphanedShards(bool waitForLocks);
 extern void DropOrphanedShardsInSeparateTransaction(void);
 
+/* Members for cleanup infrastructure */
+typedef uint64 OperationId;
+extern OperationId CurrentOperationId;
+
+/*
+ * CleanupObject represents the type of object that a cleanup record refers to.
+ */
+typedef enum CleanupObject
+{
+	CLEANUP_INVALID = 0,
+	CLEANUP_SHARD_PLACEMENT = 1
+} CleanupObject;
+
+typedef enum CleanupPolicy
+{
+	/*
+	 * Resources that are transient and always need to be cleaned up once the
+	 * operation completes.
+	 * (Example: Dummy Shards for Non-Blocking splits)
+	 */
+	CLEANUP_ALWAYS = 0,
+
+	/*
+	 * Resources that are cleaned up only on failure.
+	 * (Example: Split Children for Blocking/Non-Blocking splits)
+	 */
+	CLEANUP_ON_FAILURE = 1,
+
+	/*
+	 * Resources whose cleanup is deferred and happens only on success.
+	 * (Example: Parent shards being split for Blocking/Non-Blocking splits)
+	 */
+	CLEANUP_DEFERRED_ON_SUCCESS = 2,
+} CleanupPolicy;
+
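/*
 * For reference, the shard split code in this patch maps its resources to the
 * policies above as follows:
 *
 *   dummy shards (non-blocking split)  -> CLEANUP_ALWAYS
 *   newly created split children       -> CLEANUP_ON_FAILURE
 *   original (parent) shards           -> CLEANUP_DEFERRED_ON_SUCCESS
 *     (only when citus.defer_drop_after_shard_split is enabled)
 */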
+#define INVALID_OPERATION_ID 0
+#define INVALID_CLEANUP_RECORD_ID 0
+
+/* APIs for cleanup infrastructure */
+extern OperationId StartNewOperationNeedingCleanup(void);
+extern void InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
+													char *objectName,
+													int nodeGroupId,
+													CleanupPolicy policy);
+extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType,
+												char *objectName,
+												int nodeGroupId,
+												CleanupPolicy policy);
+extern void CompleteNewOperationNeedingCleanup(bool isSuccess);
+
 #endif   /*CITUS_SHARD_CLEANER_H */
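/*
 * For reference, the pending work recorded by this infrastructure can be inspected
 * directly from the new catalog, e.g. (illustrative):
 *
 *   SELECT record_id, operation_id, object_type, object_name, node_group_id, policy_type
 *   FROM pg_catalog.pg_dist_cleanup ORDER BY record_id;
 *
 * and drained by invoking citus_cleanup_orphaned_shards(), in addition to the
 * periodic deferred deletion performed by the maintenance daemon.
 */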