diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c
index 83fc33720..126763f55 100644
--- a/src/backend/distributed/metadata/metadata_cache.c
+++ b/src/backend/distributed/metadata/metadata_cache.c
@@ -160,6 +160,8 @@ typedef struct MetadataCacheData
 	Oid distPlacementGroupidIndexId;
 	Oid distTransactionRelationId;
 	Oid distTransactionGroupIndexId;
+	Oid distCleanupRecordRelationId;
+	Oid distCleanupRecordPrimaryKeyIndexId;
 	Oid citusCatalogNamespaceId;
 	Oid copyFormatTypeId;
 	Oid readIntermediateResultFuncId;
@@ -2375,6 +2377,28 @@ DistRebalanceStrategyRelationId(void)
 }
 
 
+/* return oid of pg_dist_cleanup_record relation */
+Oid
+DistCleanupRecordRelationId(void)
+{
+	CachedRelationLookup("pg_dist_cleanup_record",
+						 &MetadataCache.distCleanupRecordRelationId);
+
+	return MetadataCache.distCleanupRecordRelationId;
+}
+
+
+/* return oid of pg_dist_cleanup_record primary key index */
+Oid
+DistCleanupRecordPrimaryKeyIndexId(void)
+{
+	CachedRelationLookup("pg_dist_cleanup_record_pkey",
+						 &MetadataCache.distCleanupRecordPrimaryKeyIndexId);
+
+	return MetadataCache.distCleanupRecordPrimaryKeyIndexId;
+}
+
+
 /* return the oid of citus namespace */
 Oid
 CitusCatalogNamespaceId(void)
diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c
index 0c9e7903c..8b852695a 100644
--- a/src/backend/distributed/operations/shard_cleaner.c
+++ b/src/backend/distributed/operations/shard_cleaner.c
@@ -11,11 +11,19 @@
  */
 #include "postgres.h"
+#include "miscadmin.h"
 
+#include "access/genam.h"
 #include "access/xact.h"
+#include "catalog/namespace.h"
+#include "commands/sequence.h"
 #include "postmaster/postmaster.h"
+#include "nodes/makefuncs.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
 
 #include "distributed/coordinator_protocol.h"
+#include "distributed/listutils.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/shard_cleaner.h"
 #include "distributed/shard_rebalancer.h"
@@ -24,12 +32,52 @@
 #include "distributed/worker_transaction.h"
 
 
+/*
+ * CleanupRecord represents a record from pg_dist_cleanup_record.
+ */
+typedef struct CleanupRecord
+{
+	/* unique identifier of the record (for deletes) */
+	CleanupRecordId recordId;
+
+	/* identifier of the operation that generated the record (must not be in progress) */
+	OperationId operationId;
+
+	/* type of the object (e.g. shard placement) */
+	CleanupObjectType objectType;
+
+	/* fully qualified name of the object */
+	char *objectName;
+
+	/* node group ID on which the object is located */
+	int nodeGroupId;
+
+	/* whether the record indicates cleanup after successful completion */
+	bool isSuccess;
+} CleanupRecord;
+
+
 /* declarations for dynamic loading */
 PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_shards);
 PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards);
 
-static bool TryDropShard(GroupShardPlacement *placement);
+/* cleanup functions */
+static bool TryDropShard(char *qualifiedTableName, char *nodeName, int nodePort);
 static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode);
+static List * ListCleanupRecords(void);
+static CleanupRecordId GetNextCleanupRecordId(void);
+static CleanupRecord * TupleToCleanupRecord(HeapTuple heapTuple, TupleDesc
+											tupleDescriptor);
+static bool CleanupRecordExists(CleanupRecordId recordId);
+static void DeleteCleanupRecordByRecordId(CleanupRecordId recordId);
+
+/* cleanup operation ID functions */
+static OperationId GetNextOperationId(void);
+static void LockOperationId(OperationId operationId);
+static bool IsOperationInProgress(OperationId operationId);
+
+/* operation ID set by StartOperationRequiringCleanup */
+OperationId CurrentOperationId = INVALID_OPERATION_ID;
 
 
 /*
@@ -155,7 +203,6 @@ int
 DropOrphanedShards(bool waitForLocks)
 {
 	int removedShardCount = 0;
-	ListCell *shardPlacementCell = NULL;
 
 	/*
 	 * We should try to take the highest lock that we take
@@ -183,21 +230,81 @@ DropOrphanedShards(bool waitForLocks)
 	}
 
 	int failedShardDropCount = 0;
+
+	/*
+	 * First handle the to-be-deleted placements, which are generated by
+	 * shard moves when deferred drop is enabled.
+	 */
 	List *shardPlacementList = AllShardPlacementsWithShardPlacementState(
 		SHARD_STATE_TO_DELETE);
-	foreach(shardPlacementCell, shardPlacementList)
-	{
-		GroupShardPlacement *placement = (GroupShardPlacement *) lfirst(
-			shardPlacementCell);
 
+	GroupShardPlacement *placement = NULL;
+	foreach_ptr(placement, shardPlacementList)
+	{
 		if (!PrimaryNodeForGroup(placement->groupId, NULL) ||
 			!ShardExists(placement->shardId))
 		{
 			continue;
 		}
 
-		if (TryDropShard(placement))
+		ShardPlacement *shardPlacement = LoadShardPlacement(placement->shardId,
+															placement->placementId);
+		ShardInterval *shardInterval = LoadShardInterval(placement->shardId);
+		char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
+
+		if (TryDropShard(qualifiedTableName, shardPlacement->nodeName,
+						 shardPlacement->nodePort))
 		{
+			/* delete the to-be-deleted placement metadata */
+			DeleteShardPlacementRow(placement->placementId);
+
 			removedShardCount++;
 		}
 		else
 		{
 			failedShardDropCount++;
 		}
 	}
 
+	/*
+	 * Clean up objects listed in pg_dist_cleanup_record.
+	 */
+	List *recordList = ListCleanupRecords();
+
+	CleanupRecord *record = NULL;
+	foreach_ptr(record, recordList)
+	{
+		if (record->objectType != CLEANUP_SHARD_PLACEMENT)
+		{
+			/* we currently only clean shard placements */
+			continue;
+		}
+
+		if (IsOperationInProgress(record->operationId))
+		{
+			/* the operation that generated the record is still running */
+			continue;
+		}
+
+		char *qualifiedTableName = record->objectName;
+		WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId);
+
+		if (!CleanupRecordExists(record->recordId))
+		{
+			/*
+			 * The operation completed successfully just after we called
+			 * ListCleanupRecords, in which case the record is now gone.
+			 */
+			continue;
+		}
+
+		if (TryDropShard(qualifiedTableName, workerNode->workerName,
+						 workerNode->workerPort))
+		{
+			/* delete the cleanup record */
+			DeleteCleanupRecordByRecordId(record->recordId);
+
+			removedShardCount++;
+		}
+		else
+		{
+			failedShardDropCount++;
+		}
+	}
 
@@ -245,20 +352,14 @@ TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode)
  * true on success.
  */
 static bool
-TryDropShard(GroupShardPlacement *placement)
+TryDropShard(char *qualifiedTableName, char *nodeName, int nodePort)
 {
-	ShardPlacement *shardPlacement = LoadShardPlacement(placement->shardId,
-														placement->placementId);
-	ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
-
-	ereport(LOG, (errmsg("dropping shard placement " INT64_FORMAT " of shard "
-						 INT64_FORMAT " on %s:%d after it was moved away",
-						 shardPlacement->placementId, shardPlacement->shardId,
-						 shardPlacement->nodeName, shardPlacement->nodePort)));
+	ereport(LOG, (errmsg("dropping shard placement %s "
+						 "on %s:%d after it was moved away",
+						 qualifiedTableName, nodeName, nodePort)));
 
 	/* prepare sql query to execute to drop the shard */
 	StringInfo dropQuery = makeStringInfo();
-	char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
 	appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);
 
 	/*
@@ -274,15 +375,337 @@
 					 dropQuery->data);
 
 	/* remove the shard from the node */
-	bool success =
-		SendOptionalCommandListToWorkerOutsideTransaction(shardPlacement->nodeName,
-														  shardPlacement->nodePort,
-														  NULL, dropCommandList);
-	if (success)
-	{
-		/* delete the actual placement */
-		DeleteShardPlacementRow(placement->placementId);
-	}
+	bool success =
+		SendOptionalCommandListToWorkerOutsideTransaction(nodeName, nodePort,
+														  NULL, dropCommandList);
 
 	return success;
 }
+
+
+/*
+ * InsertCleanupRecordInCurrentTransaction inserts a new pg_dist_cleanup_record
+ * entry as part of the current transaction. This is primarily useful for
+ * deferred drop scenarios, since the record rolls back together with the
+ * transaction that created it.
+ *
+ * For failure scenarios, use InsertCleanupRecordInSubtransaction instead,
+ * which performs a direct insert via localhost.
+ */
+void
+InsertCleanupRecordInCurrentTransaction(CleanupObjectType objectType, char *objectName,
+										int nodeGroupId)
+{
+	/* StartOperationRequiringCleanup must have been called at this point */
+	Assert(CurrentOperationId != INVALID_OPERATION_ID);
+
+	Datum values[Natts_pg_dist_cleanup_record];
+	bool isNulls[Natts_pg_dist_cleanup_record];
+
+	/* form a new cleanup record tuple */
+	memset(values, 0, sizeof(values));
+	memset(isNulls, false, sizeof(isNulls));
+
+	CleanupRecordId recordId = GetNextCleanupRecordId();
+	OperationId operationId = CurrentOperationId;
+
+	values[Anum_pg_dist_cleanup_record_record_id - 1] = UInt64GetDatum(recordId);
+	values[Anum_pg_dist_cleanup_record_operation_id - 1] = UInt64GetDatum(operationId);
+	values[Anum_pg_dist_cleanup_record_object_type - 1] = Int32GetDatum(objectType);
+	values[Anum_pg_dist_cleanup_record_object_name - 1] = CStringGetTextDatum(objectName);
+	values[Anum_pg_dist_cleanup_record_node_group_id - 1] = Int32GetDatum(nodeGroupId);
+	values[Anum_pg_dist_cleanup_record_is_success - 1] = BoolGetDatum(true);
+
+	/* open the cleanup record relation and insert the new tuple */
+	Oid relationId = DistCleanupRecordRelationId();
+	Relation pgDistCleanupRecord = table_open(relationId, RowExclusiveLock);
+
+	TupleDesc tupleDescriptor = RelationGetDescr(pgDistCleanupRecord);
+	HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
+
+	CatalogTupleInsert(pgDistCleanupRecord, heapTuple);
+
+	CommandCounterIncrement();
+	table_close(pgDistCleanupRecord, NoLock);
+}
+
+
+/*
+ * InsertCleanupRecordInSubtransaction inserts a new pg_dist_cleanup_record
+ * entry in a separate transaction (a direct insert via localhost) to ensure
+ * the record persists even if the current transaction rolls back. These
+ * records are therefore used for failure scenarios, and we delete them
+ * again if the operation completes successfully.
+ */
+void
+InsertCleanupRecordInSubtransaction(CleanupObjectType objectType, char *objectName,
+									int nodeGroupId)
+{
+	/* StartOperationRequiringCleanup must have been called at this point */
+	Assert(CurrentOperationId != INVALID_OPERATION_ID);
+
+	StringInfo command = makeStringInfo();
+	appendStringInfo(command,
+					 "INSERT INTO pg_catalog.pg_dist_cleanup_record "
+					 " (operation_id, object_type, object_name, node_group_id)"
+					 " VALUES (" UINT64_FORMAT ", %d, %s, %d)",
+					 CurrentOperationId,
+					 objectType,
+					 quote_literal_cstr(objectName),
+					 nodeGroupId);
+
+	ExecuteRebalancerCommandInSeparateTransaction(command->data);
+}
+
+
+/*
+ * GetNextCleanupRecordId generates a new record ID using the sequence.
+ */
+static CleanupRecordId
+GetNextCleanupRecordId(void)
+{
+	RangeVar *sequenceName = makeRangeVar("pg_catalog",
+										  "pg_dist_cleanup_record_record_id_seq", -1);
+
+	bool missingOK = false;
+	Oid sequenceId = RangeVarGetRelid(sequenceName, NoLock, missingOK);
+
+	bool checkPermissions = false;
+	return nextval_internal(sequenceId, checkPermissions);
+}
+
+
+/*
+ * ListCleanupRecords lists all the current cleanup records.
+ */
+static List *
+ListCleanupRecords(void)
+{
+	Relation pgDistCleanupRecord = table_open(DistCleanupRecordRelationId(),
+											  AccessShareLock);
+	TupleDesc tupleDescriptor = RelationGetDescr(pgDistCleanupRecord);
+
+	List *recordList = NIL;
+	int scanKeyCount = 0;
+	bool indexOK = false;
+
+	SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanupRecord, InvalidOid,
+													indexOK, NULL, scanKeyCount, NULL);
+
+	HeapTuple heapTuple = NULL;
+	while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
+	{
+		CleanupRecord *record = TupleToCleanupRecord(heapTuple, tupleDescriptor);
+		recordList = lappend(recordList, record);
+	}
+
+	systable_endscan(scanDescriptor);
+	table_close(pgDistCleanupRecord, NoLock);
+
+	return recordList;
+}
+
+
+/*
+ * TupleToCleanupRecord converts a pg_dist_cleanup_record tuple into a
+ * CleanupRecord struct.
+ */
+static CleanupRecord *
+TupleToCleanupRecord(HeapTuple heapTuple, TupleDesc tupleDescriptor)
+{
+	Datum datumArray[Natts_pg_dist_cleanup_record];
+	bool isNullArray[Natts_pg_dist_cleanup_record];
+	heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
+
+	CleanupRecord *record = palloc0(sizeof(CleanupRecord));
+
+	record->recordId =
+		DatumGetUInt64(datumArray[Anum_pg_dist_cleanup_record_record_id - 1]);
+
+	/* the operation ID is needed to check whether the operation still runs */
+	record->operationId =
+		DatumGetUInt64(datumArray[Anum_pg_dist_cleanup_record_operation_id - 1]);
+
+	record->objectType =
+		DatumGetInt32(datumArray[Anum_pg_dist_cleanup_record_object_type - 1]);
+
+	record->objectName =
+		TextDatumGetCString(datumArray[Anum_pg_dist_cleanup_record_object_name - 1]);
+
+	record->nodeGroupId =
+		DatumGetInt32(datumArray[Anum_pg_dist_cleanup_record_node_group_id - 1]);
+
+	record->isSuccess =
+		DatumGetBool(datumArray[Anum_pg_dist_cleanup_record_is_success - 1]);
+
+	return record;
+}
+
+
+/*
+ * CleanupRecordExists returns whether a cleanup record with the given
+ * record ID exists in pg_dist_cleanup_record.
+ */
+static bool
+CleanupRecordExists(CleanupRecordId recordId)
+{
+	Relation pgDistCleanupRecord = table_open(DistCleanupRecordRelationId(),
+											  AccessShareLock);
+
+	const int scanKeyCount = 1;
+	ScanKeyData scanKey[1];
+	bool indexOK = true;
+
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_cleanup_record_record_id,
+				BTEqualStrategyNumber, F_INT8EQ, UInt64GetDatum(recordId));
+
+	SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanupRecord,
+													DistCleanupRecordPrimaryKeyIndexId(),
+													indexOK,
+													NULL, scanKeyCount, scanKey);
+
+	HeapTuple heapTuple = systable_getnext(scanDescriptor);
+	bool recordExists = HeapTupleIsValid(heapTuple);
+
+	systable_endscan(scanDescriptor);
+
+	CommandCounterIncrement();
+	table_close(pgDistCleanupRecord, NoLock);
+
+	return recordExists;
+}
+
+
+/*
+ * DeleteCleanupRecordByRecordId deletes a single pg_dist_cleanup_record entry.
+ */
+static void
+DeleteCleanupRecordByRecordId(CleanupRecordId recordId)
+{
+	Relation pgDistCleanupRecord = table_open(DistCleanupRecordRelationId(),
+											  RowExclusiveLock);
+
+	const int scanKeyCount = 1;
+	ScanKeyData scanKey[1];
+	bool indexOK = true;
+
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_cleanup_record_record_id,
+				BTEqualStrategyNumber, F_INT8EQ, UInt64GetDatum(recordId));
+
+	SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanupRecord,
+													DistCleanupRecordPrimaryKeyIndexId(),
+													indexOK,
+													NULL, scanKeyCount, scanKey);
+
+	HeapTuple heapTuple = systable_getnext(scanDescriptor);
+	if (!HeapTupleIsValid(heapTuple))
+	{
+		ereport(ERROR, (errmsg("could not find cleanup record " UINT64_FORMAT,
+							   recordId)));
+	}
+
+	simple_heap_delete(pgDistCleanupRecord, &heapTuple->t_self);
+
+	systable_endscan(scanDescriptor);
+
+	CommandCounterIncrement();
+	table_close(pgDistCleanupRecord, NoLock);
+}
+
+
+/*
+ * DeleteMyCleanupOnFailureRecords deletes all on-failure cleanup records that
+ * belong to the current operation. It is called when the operation completes
+ * successfully, at which point the objects it created no longer need to be
+ * cleaned up.
+ */
+void
+DeleteMyCleanupOnFailureRecords(void)
+{
+	Relation pgDistCleanupRecord = table_open(DistCleanupRecordRelationId(),
+											  RowExclusiveLock);
+
+	const int scanKeyCount = 2;
+	ScanKeyData scanKey[2];
+	bool indexOK = false;
+
+	/* find on-failure records belonging to the current operation */
+	OperationId operationId = CurrentOperationId;
+	bool isSuccess = false;
+
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_cleanup_record_operation_id,
+				BTEqualStrategyNumber, F_INT8EQ, UInt64GetDatum(operationId));
+	ScanKeyInit(&scanKey[1], Anum_pg_dist_cleanup_record_is_success,
+				BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(isSuccess));
+
+	SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanupRecord,
+													InvalidOid, indexOK,
+													NULL, scanKeyCount, scanKey);
+
+	HeapTuple heapTuple = NULL;
+	while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
+	{
+		simple_heap_delete(pgDistCleanupRecord, &heapTuple->t_self);
+	}
+
+	systable_endscan(scanDescriptor);
+
+	CommandCounterIncrement();
+	table_close(pgDistCleanupRecord, NoLock);
+}
+
+
+/*
+ * StartOperationRequiringCleanup should be called by an operation that wishes
+ * to generate cleanup records.
+ */
+OperationId
+StartOperationRequiringCleanup(void)
+{
+	CurrentOperationId = GetNextOperationId();
+
+	LockOperationId(CurrentOperationId);
+
+	return CurrentOperationId;
+}
+
+
+/*
+ * GetNextOperationId generates a new operation ID using the sequence.
+ */
+static OperationId
+GetNextOperationId(void)
+{
+	RangeVar *sequenceName = makeRangeVar("pg_catalog", "pg_dist_operation_id_seq", -1);
+
+	bool missingOK = false;
+	Oid sequenceId = RangeVarGetRelid(sequenceName, NoLock, missingOK);
+
+	bool checkPermissions = false;
+	return nextval_internal(sequenceId, checkPermissions);
+}
+
+
+/*
+ * LockOperationId takes an exclusive lock on the operation ID to let other
+ * backends know that the operation is active.
+ */
+static void
+LockOperationId(OperationId operationId)
+{
+	LOCKTAG tag;
+	const bool sessionLock = false;
+	const bool dontWait = false;
+	SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId);
+	(void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
+}
+
+
+/*
+ * IsOperationInProgress checks whether an operation is still in progress by
+ * trying to acquire a share lock on the operation ID. The share lock conflicts
+ * with the exclusive lock taken by LockOperationId, so the acquisition fails
+ * for as long as the operation is running.
+ */
+static bool
+IsOperationInProgress(OperationId operationId)
+{
+	LOCKTAG tag;
+	const bool sessionLock = false;
+	const bool dontWait = true;
+	SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId);
+
+	LockAcquireResult lockResult = LockAcquire(&tag, ShareLock, sessionLock, dontWait);
+	if (lockResult == LOCKACQUIRE_NOT_AVAIL)
+	{
+		/* the operation's backend still holds the exclusive lock */
+		return true;
+	}
+
+	/* the operation is over; release the share lock we just acquired */
+	LockRelease(&tag, ShareLock, sessionLock);
+
+	return false;
+}
diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index a8fb01655..985a913c3 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -33,6 +33,7 @@
 #include "distributed/multi_partitioning_utils.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_transaction.h"
+#include "distributed/shard_cleaner.h"
 #include "distributed/shared_library_init.h"
 #include "distributed/pg_dist_shard.h"
 #include "distributed/metadata_sync.h"
@@ -531,6 +532,9 @@ BlockingShardSplit(SplitOperation splitOperation,
 				   List *shardSplitPointsList,
 				   List *workersForPlacementList)
 {
+	/* prepare to generate cleanup records */
+	StartOperationRequiringCleanup();
+
 	List *sourceColocatedShardIntervalList = ColocatedShardIntervalList(
 		shardIntervalToSplit);
@@ -607,6 +611,13 @@ BlockingShardSplit(SplitOperation splitOperation,
 	}
 	PG_END_TRY();
 
+	/*
+	 * The split completed successfully, so delete the on-failure cleanup
+	 * records we generated. If we abort after this point, the deletion
+	 * rolls back and the new shards still get cleaned up.
+	 */
+	DeleteMyCleanupOnFailureRecords();
+
 	CitusInvalidateRelcacheByRelid(DistShardRelationId());
 }
@@ -645,6 +656,12 @@ CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
 		/* Create new split child shard on the specified placement list */
 		CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
 
+		/* in case of rollback, clean up the child shard */
+		char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
+		InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
+											qualifiedShardName,
+											workerPlacementNode->groupId);
+
 		ShardCreatedByWorkflowEntry entry;
 		entry.shardIntervalKey = shardInterval;
 		entry.workerNodeValue = workerPlacementNode;
@@ -1197,6 +1214,20 @@ DropShardList(List *shardIntervalList)
 			/* get shard name */
 			char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
 
+			if (DeferShardDeleteOnMove)
+			{
+				/*
+				 * In case of deferred drop, we log a cleanup record in the
+				 * current transaction. If the current transaction rolls back,
+				 * we do not generate a record.
+				 */
+				InsertCleanupRecordInCurrentTransaction(CLEANUP_SHARD_PLACEMENT,
+														qualifiedShardName,
+														placement->groupId);
+				continue;
+			}
+
 			char storageType = shardInterval->storageType;
 			if (storageType == SHARD_STORAGE_TABLE)
 			{
@@ -1277,6 +1308,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 					  List *shardSplitPointsList,
 					  List *workersForPlacementList)
 {
+	/* prepare to generate cleanup records */
+	StartOperationRequiringCleanup();
+
 	char *superUser = CitusExtensionOwnerName();
 	char *databaseName = get_database_name(MyDatabaseId);
@@ -1497,6 +1531,13 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
+
+	/*
+	 * The split completed successfully, so delete the on-failure cleanup
+	 * records we generated. If we abort after this point, the deletion
+	 * rolls back and the new shards still get cleaned up.
+	 */
+	DeleteMyCleanupOnFailureRecords();
 }
@@ -1563,6 +1604,12 @@ CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement,
 			/* Create dummy source shard on the specified placement list */
 			CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
 
+			/* in case of rollback, clean up the dummy shard on the target node */
+			char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
+			InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
+												qualifiedShardName,
+												workerPlacementNode->groupId);
+
 			/* Add dummy source shard entry created for placement node in map */
 			AddDummyShardEntryInMap(mapOfDummyShardToPlacement,
 									workerPlacementNode->nodeId,
@@ -1598,6 +1645,13 @@ CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement,
 		/* Create dummy split child shard on source worker node */
 		CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
 
+		/* in case of rollback, clean up the dummy shard on the source node */
+		char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
+		InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
+											qualifiedShardName,
+											sourceWorkerNode->groupId);
+
 		/* Add dummy split child shard entry created on source node */
 		AddDummyShardEntryInMap(mapOfDummyShardToPlacement, sourceWorkerNode->nodeId,
 								shardInterval);
diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
index 4e58bdd51..0946debe9 100644
--- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
+++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
@@ -61,6 +61,8 @@ BEGIN
   END IF;
 END
 $check_citus$;
+
+#include "udfs/citus_prepare_pg_upgrade/11.1-1.sql"
 #include "udfs/citus_finish_pg_upgrade/11.1-1.sql"
 
 DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4,
@@ -76,3 +78,23 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
 
 DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
 #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"
+
+--
+-- Table of cleanup records for removing leftovers after a failure and
+-- for the deferred drop of old shard placements after a move/split.
+
+CREATE TABLE citus.pg_dist_cleanup_record (
+    record_id bigserial primary key,
+    operation_id bigint not null,
+    object_type int not null,
+    object_name text not null,
+    node_group_id int not null,
+    is_success bool not null default false
+);
+ALTER TABLE citus.pg_dist_cleanup_record SET SCHEMA pg_catalog;
+
+--
+-- Sequence used to generate an operation ID for use in pg_dist_cleanup_record.
+
+CREATE SEQUENCE citus.pg_dist_operation_id_seq;
+ALTER SEQUENCE citus.pg_dist_operation_id_seq SET SCHEMA pg_catalog;
diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql
index 677234c46..9f59da0bc 100644
--- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql
+++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql
@@ -95,3 +95,9 @@ DROP FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode);
 DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text, shard_transfer_mode citus.shard_transfer_mode);
 #include "../udfs/isolate_tenant_to_new_shard/8.0-1.sql"
+
+#include "../udfs/citus_prepare_pg_upgrade/11.0-1.sql"
+#include "../udfs/citus_finish_pg_upgrade/11.0-4.sql"
+
+DROP TABLE pg_catalog.pg_dist_cleanup_record;
+DROP SEQUENCE pg_catalog.pg_dist_operation_id_seq;
diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/11.1-1.sql
index ee24a37e9..9781b2674 100644
--- a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/11.1-1.sql
+++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/11.1-1.sql
@@ -62,6 +62,7 @@ BEGIN
     INSERT INTO pg_catalog.pg_dist_local_group SELECT * FROM public.pg_dist_local_group;
     INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction;
     INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation;
+    INSERT INTO pg_catalog.pg_dist_cleanup_record SELECT * FROM public.pg_dist_cleanup_record;
     -- enterprise catalog tables
     INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
     INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
@@ -91,6 +92,7 @@ BEGIN
     DROP TABLE public.pg_dist_shard;
     DROP TABLE public.pg_dist_transaction;
     DROP TABLE public.pg_dist_rebalance_strategy;
+    DROP TABLE public.pg_dist_cleanup_record;
 
     --
     -- reset sequences
diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql
index ee24a37e9..9781b2674 100644
--- a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql
+++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql
@@ -62,6 +62,7 @@ BEGIN
     INSERT INTO pg_catalog.pg_dist_local_group SELECT * FROM public.pg_dist_local_group;
     INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction;
     INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation;
+    INSERT INTO pg_catalog.pg_dist_cleanup_record SELECT * FROM public.pg_dist_cleanup_record;
     -- enterprise catalog tables
     INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
     INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
@@ -91,6 +92,7 @@ BEGIN
     DROP TABLE public.pg_dist_shard;
     DROP TABLE public.pg_dist_transaction;
    DROP TABLE public.pg_dist_rebalance_strategy;
+    DROP TABLE public.pg_dist_cleanup_record;
 
     --
     -- reset sequences
diff --git a/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/11.1-1.sql
new file mode 100644
index 000000000..0b615cf39
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/11.1-1.sql
@@ -0,0 +1,76 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade()
+    RETURNS void
+    LANGUAGE plpgsql
+    SET search_path = pg_catalog
+    AS $cppu$
+BEGIN
+
+    DELETE FROM pg_depend WHERE
+        objid IN (SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') AND
+        refobjid IN (select oid from pg_extension where extname = 'citus');
+    --
+    -- We are dropping the aggregates because postgres 14 changed
+    -- array_cat type from anyarray to anycompatiblearray. When
+    -- upgrading to pg14, specifically when running pg_restore on
+    -- array_cat_agg we would get an error. So we drop the aggregate
+    -- and create the right one on citus_finish_pg_upgrade.
+    --
+    DROP AGGREGATE IF EXISTS array_cat_agg(anyarray);
+    DROP AGGREGATE IF EXISTS array_cat_agg(anycompatiblearray);
+
+    --
+    -- Drop existing backup tables
+    --
+    DROP TABLE IF EXISTS public.pg_dist_partition;
+    DROP TABLE IF EXISTS public.pg_dist_shard;
+    DROP TABLE IF EXISTS public.pg_dist_placement;
+    DROP TABLE IF EXISTS public.pg_dist_node_metadata;
+    DROP TABLE IF EXISTS public.pg_dist_node;
+    DROP TABLE IF EXISTS public.pg_dist_local_group;
+    DROP TABLE IF EXISTS public.pg_dist_transaction;
+    DROP TABLE IF EXISTS public.pg_dist_colocation;
+    DROP TABLE IF EXISTS public.pg_dist_authinfo;
+    DROP TABLE IF EXISTS public.pg_dist_poolinfo;
+    DROP TABLE IF EXISTS public.pg_dist_rebalance_strategy;
+    DROP TABLE IF EXISTS public.pg_dist_object;
+    DROP TABLE IF EXISTS public.pg_dist_cleanup_record;
+
+    --
+    -- backup citus catalog tables
+    --
+    CREATE TABLE public.pg_dist_partition AS SELECT * FROM pg_catalog.pg_dist_partition;
+    CREATE TABLE public.pg_dist_shard AS SELECT * FROM pg_catalog.pg_dist_shard;
+    CREATE TABLE public.pg_dist_placement AS SELECT * FROM pg_catalog.pg_dist_placement;
+    CREATE TABLE public.pg_dist_node_metadata AS SELECT * FROM pg_catalog.pg_dist_node_metadata;
+    CREATE TABLE public.pg_dist_node AS SELECT * FROM pg_catalog.pg_dist_node;
+    CREATE TABLE public.pg_dist_local_group AS SELECT * FROM pg_catalog.pg_dist_local_group;
+    CREATE TABLE public.pg_dist_transaction AS SELECT * FROM pg_catalog.pg_dist_transaction;
+    CREATE TABLE public.pg_dist_colocation AS SELECT * FROM pg_catalog.pg_dist_colocation;
+    CREATE TABLE public.pg_dist_cleanup_record AS SELECT * FROM pg_catalog.pg_dist_cleanup_record;
+    -- enterprise catalog tables
+    CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo;
+    CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo;
+    CREATE TABLE public.pg_dist_rebalance_strategy AS SELECT
+        name,
+        default_strategy,
+        shard_cost_function::regprocedure::text,
+        node_capacity_function::regprocedure::text,
+        shard_allowed_on_node_function::regprocedure::text,
+        default_threshold,
+        minimum_threshold,
+        improvement_threshold
+    FROM pg_catalog.pg_dist_rebalance_strategy;
+
+    -- store upgrade stable identifiers on pg_dist_object catalog
+    CREATE TABLE public.pg_dist_object AS SELECT
+        address.type,
+        address.object_names,
+        address.object_args,
+        objects.distribution_argument_index,
+        objects.colocationid
+    FROM pg_catalog.pg_dist_object objects,
+         pg_catalog.pg_identify_object_as_address(objects.classid, objects.objid, objects.objsubid) address;
+END;
+$cppu$;
+
+COMMENT ON FUNCTION pg_catalog.citus_prepare_pg_upgrade()
+    IS 'perform tasks to copy citus settings to a location that could later be restored after pg_upgrade is done';
diff --git a/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql
index ff7e5d43e..0b615cf39 100644
--- a/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql
+++ b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql
@@ -32,6 +32,7 @@ BEGIN
     DROP TABLE IF EXISTS public.pg_dist_poolinfo;
     DROP TABLE IF EXISTS public.pg_dist_rebalance_strategy;
     DROP TABLE IF EXISTS public.pg_dist_object;
+    DROP TABLE IF EXISTS public.pg_dist_cleanup_record;
 
     --
     -- backup citus catalog tables
@@ -44,6 +45,7 @@ BEGIN
     CREATE TABLE public.pg_dist_local_group AS SELECT * FROM pg_catalog.pg_dist_local_group;
     CREATE TABLE public.pg_dist_transaction AS SELECT * FROM pg_catalog.pg_dist_transaction;
     CREATE TABLE public.pg_dist_colocation AS SELECT * FROM pg_catalog.pg_dist_colocation;
+    CREATE TABLE public.pg_dist_cleanup_record AS SELECT * FROM pg_catalog.pg_dist_cleanup_record;
     -- enterprise catalog tables
     CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo;
     CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo;
diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c
index d3f41273d..392da45f6 100644
--- a/src/backend/distributed/transaction/transaction_management.c
+++ b/src/backend/distributed/transaction/transaction_management.c
@@ -37,6 +37,7 @@
 #include "distributed/transaction_management.h"
 #include "distributed/placement_connection.h"
 #include "distributed/relation_access_tracking.h"
+#include "distributed/shard_cleaner.h"
 #include "distributed/shared_connection_stats.h"
 #include "distributed/subplan_execution.h"
 #include "distributed/version_compat.h"
@@ -600,6 +601,7 @@ ResetGlobalVariables()
 	NodeMetadataSyncOnCommit = false;
 	InTopLevelDelegatedFunctionCall = false;
 	InTableTypeConversionFunctionCall = false;
+	CurrentOperationId = INVALID_OPERATION_ID;
 	ResetWorkerErrorIndication();
 	memset(&AllowedDistributionColumnValue, 0,
 		   sizeof(AllowedDistributionColumn));
diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c
index 35991ade1..aedc51cca 100644
--- a/src/backend/distributed/utils/resource_lock.c
+++ b/src/backend/distributed/utils/resource_lock.c
@@ -508,7 +508,7 @@ LockPlacementCleanup(void)
 	LOCKTAG tag;
 	const bool sessionLock = false;
 	const bool dontWait = false;
-	SET_LOCKTAG_PLACEMENT_CLEANUP(tag);
+	SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_CLEANUP);
 	(void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
 }
 
@@ -523,7 +523,7 @@ TryLockPlacementCleanup(void)
 	LOCKTAG tag;
 	const bool sessionLock = false;
 	const bool dontWait = true;
-	SET_LOCKTAG_PLACEMENT_CLEANUP(tag);
+	SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_CLEANUP);
 	bool lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
 	return lockAcquired;
 }
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index 92f8a4514..65b06d83d 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -229,6 +229,7 @@ extern Oid DistShardRelationId(void);
 extern Oid DistPlacementRelationId(void);
 extern Oid DistNodeRelationId(void);
 extern Oid DistRebalanceStrategyRelationId(void);
+extern Oid DistCleanupRecordRelationId(void);
 extern Oid DistLocalGroupIdRelationId(void);
 extern Oid DistObjectRelationId(void);
 extern Oid DistEnabledCustomAggregatesId(void);
@@ -246,6 +247,7 @@ extern Oid DistTransactionRelationId(void);
 extern Oid DistTransactionGroupIndexId(void);
 extern Oid DistPlacementGroupidIndexId(void);
 extern Oid DistObjectPrimaryKeyIndexId(void);
+extern Oid DistCleanupRecordPrimaryKeyIndexId(void);
 
 /* type oids */
 extern Oid LookupTypeOid(char *schemaNameSting, char *typeNameString);
diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h
index c808e9157..fd2cdd48a 100644
--- a/src/include/distributed/resource_lock.h
+++ b/src/include/distributed/resource_lock.h
@@ -40,7 +40,7 @@ typedef enum AdvisoryLocktagClass
 	ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7,
 	ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
 	ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
-	ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP = 10,
+	ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10,
 	ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
 	ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13
 } AdvisoryLocktagClass;
@@ -48,7 +48,8 @@ typedef enum AdvisoryLocktagClass
 /* CitusOperations has constants for citus operations */
 typedef enum CitusOperations
 {
-	CITUS_TRANSACTION_RECOVERY = 0
+	CITUS_TRANSACTION_RECOVERY = 0,
+	CITUS_CLEANUP
 } CitusOperations;
 
 /* reuse advisory lock, but with different, unused field 4 (4)*/
@@ -108,12 +109,12 @@ typedef enum CitusOperations
 /* reuse advisory lock, but with different, unused field 4 (10)
  * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
  * are local to each database */
-#define SET_LOCKTAG_PLACEMENT_CLEANUP(tag) \
+#define SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId) \
 	SET_LOCKTAG_ADVISORY(tag, \
 						 MyDatabaseId, \
-						 (uint32) 0, \
-						 (uint32) 0, \
-						 ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP)
+						 (uint32) ((operationId) >> 32), \
+						 (uint32) (operationId), \
+						 ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID)
 
 /* reuse advisory lock, but with different, unused field 4 (12)
  * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h
index 8a98254f9..f19275f39 100644
--- a/src/include/distributed/shard_cleaner.h
+++ b/src/include/distributed/shard_cleaner.h
@@ -11,6 +11,35 @@
 #ifndef CITUS_SHARD_CLEANER_H
 #define CITUS_SHARD_CLEANER_H
 
+/* ----------------
+ * compiler constants for pg_dist_cleanup_record
+ * ----------------
+ */
+#define Natts_pg_dist_cleanup_record 6
+#define Anum_pg_dist_cleanup_record_record_id 1
+#define Anum_pg_dist_cleanup_record_operation_id 2
+#define Anum_pg_dist_cleanup_record_object_type 3
+#define Anum_pg_dist_cleanup_record_object_name 4
+#define Anum_pg_dist_cleanup_record_node_group_id 5
+#define Anum_pg_dist_cleanup_record_is_success 6
+
+#define INVALID_OPERATION_ID 0
+
+/*
+ * CleanupObjectType represents an object type that can be used
+ * in cleanup records.
+ */
+typedef enum CleanupObjectType
+{
+	CLEANUP_SHARD_PLACEMENT = 1
+} CleanupObjectType;
+
+typedef uint64 CleanupRecordId;
+typedef uint64 OperationId;
+
+extern OperationId CurrentOperationId;
+
+
 /* GUC to configure deferred shard deletion */
 extern int DeferShardDeleteInterval;
 extern bool DeferShardDeleteOnMove;
@@ -21,4 +50,11 @@ extern int TryDropOrphanedShards(bool waitForLocks);
 extern int DropOrphanedShards(bool waitForLocks);
 extern void DropOrphanedShardsInSeparateTransaction(void);
 
+extern OperationId StartOperationRequiringCleanup(void);
+extern void InsertCleanupRecordInCurrentTransaction(CleanupObjectType objectType,
+													char *objectName, int nodeGroupId);
+extern void InsertCleanupRecordInSubtransaction(CleanupObjectType objectType,
+												char *objectName, int nodeGroupId);
+extern void DeleteMyCleanupOnFailureRecords(void);
+
 #endif /*CITUS_SHARD_CLEANER_H */
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out
index a0791472b..e6178bba8 100644
--- a/src/test/regress/expected/multi_extension.out
+++ b/src/test/regress/expected/multi_extension.out
@@ -1135,11 +1135,14 @@ SELECT * FROM multi_extension.print_extension_changes();
  | function worker_copy_table_to_node(regclass,integer) void
  | function worker_split_copy(bigint,split_copy_info[]) void
  | function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info
+ | sequence pg_dist_cleanup_record_record_id_seq
+ | sequence pg_dist_operation_id_seq
+ | table pg_dist_cleanup_record
  | type replication_slot_info
  | type split_copy_info
  | type split_shard_info
  | view citus_locks
-(34 rows)
+(37 rows)
 
 DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
 -- show running version
diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out
index 303326b76..f274c9120 100644
--- a/src/test/regress/expected/upgrade_list_citus_objects.out
+++ b/src/test/regress/expected/upgrade_list_citus_objects.out
@@ -232,12 +232,15 @@ ORDER BY 1;
  function worker_split_shard_replication_setup(split_shard_info[])
  schema citus
  schema citus_internal
+ sequence pg_dist_cleanup_record_record_id_seq
  sequence pg_dist_colocationid_seq
  sequence pg_dist_groupid_seq
  sequence pg_dist_node_nodeid_seq
+ sequence pg_dist_operation_id_seq
 sequence pg_dist_placement_placementid_seq
 sequence pg_dist_shardid_seq
 table pg_dist_authinfo
+ table pg_dist_cleanup_record
 table pg_dist_colocation
 table pg_dist_local_group
 table pg_dist_node
@@ -267,5 +270,5 @@ ORDER BY 1;
 view citus_stat_statements
 view pg_dist_shard_placement
 view time_partitions
-(259 rows)
+(262 rows)
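
Reviewer note: below is a minimal sketch of the intended call pattern for the cleanup API introduced by this patch, assembled from the functions it adds. The shard name and the workerNode variable are illustrative placeholders, not part of the patch:

	/* begin an operation whose leftovers may need cleanup */
	StartOperationRequiringCleanup();

	/* before creating a shard on a worker, register an on-failure cleanup record */
	InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
										"public.sensors_102008", /* hypothetical shard */
										workerNode->groupId);

	/* ... create the shard and run the steps that might fail ... */

	/* on success, delete the on-failure records so the cleaner keeps the new shard */
	DeleteMyCleanupOnFailureRecords();

If the transaction aborts instead, the final delete never takes effect, while the cleanup record itself survives because it was inserted over a separate localhost connection. A subsequent DropOrphanedShards() run then drops the leftover shard once IsOperationInProgress() observes that the operation's advisory lock has been released.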