Initial Commit

niupre/TestDeferredDropAndCleanup
Nitish Upreti 2022-08-24 23:55:44 -07:00
parent 31faa88a4e
commit 895fe14040
11 changed files with 840 additions and 271 deletions

View File

@ -147,6 +147,8 @@ typedef struct MetadataCacheData
Oid distLocalGroupRelationId;
Oid distObjectRelationId;
Oid distObjectPrimaryKeyIndexId;
Oid distCleanupRelationId;
Oid distCleanupPrimaryKeyIndexId;
Oid distColocationRelationId;
Oid distColocationConfigurationIndexId;
Oid distPartitionRelationId;
@ -2465,6 +2467,28 @@ DistObjectPrimaryKeyIndexId(void)
}
/* return oid of pg_dist_cleanup relation */
Oid
DistCleanupRelationId(void)
{
CachedRelationLookup("pg_dist_cleanup",
&MetadataCache.distCleanupRelationId);
return MetadataCache.distCleanupRelationId;
}
/* return oid of pg_dist_cleanup primary key index */
Oid
DistCleanupPrimaryKeyIndexId(void)
{
CachedRelationLookup("pg_dist_cleanup_pkey",
&MetadataCache.distCleanupPrimaryKeyIndexId);
return MetadataCache.distCleanupPrimaryKeyIndexId;
}
/* return oid of pg_dist_colocation relation */
Oid
DistColocationRelationId(void)

View File

@ -11,10 +11,17 @@
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "commands/sequence.h"
#include "postmaster/postmaster.h"
#include "nodes/makefuncs.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/shard_cleaner.h"
@ -22,15 +29,60 @@
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_transaction.h"
#include "distributed/pg_dist_cleanup.h"
/* GUC configuration for shard cleaner */
int NextOperationId = 0;
int NextCleanupRecordId = 0;
/* Data structure for cleanup operation */
/*
* CleanupRecord represents a record from pg_dist_cleanup.
*/
typedef struct CleanupRecord
{
/* unique identifier of the record (for deletes) */
uint64 recordId;
/* identifier of the operation that generated the record (must not be in progress) */
OperationId operationId;
/* type of the object (e.g. shard placement) */
CleanupObject objectType;
/* fully qualified name of the object */
char *objectName;
/* node group ID on which the object is located */
int nodeGroupId;
/* cleanup policy */
CleanupPolicy policy;
} CleanupRecord;
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_shards);
PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards);
static bool TryDropShard(GroupShardPlacement *placement);
static bool TryDropShardOutsideTransaction(char *qualifiedTableName, char *nodeName, int nodePort);
static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode);
/* Functions for cleanup infrastructure */
static CleanupRecord * TupleToCleanupRecord(HeapTuple heapTuple,
TupleDesc tupleDescriptor);
static OperationId GetNextOperationId(void);
static uint64 GetNextCleanupRecordId(void);
static void LockOperationId(OperationId operationId);
static bool TryLockOperationId(OperationId operationId);
static void DeleteCleanupRecordByRecordId(uint64 recordId);
static void DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId);
static bool CleanupRecordExists(uint64 recordId);
static List * ListCleanupRecords(void);
static List * ListCleanupRecordsForCurrentOperation(void);
static int DropOrphanedShardsForCleanup(void);
/*
* citus_cleanup_orphaned_shards implements a user-facing UDF to delete
@ -115,6 +167,7 @@ TryDropOrphanedShards(bool waitForLocks)
PG_TRY();
{
droppedShardCount = DropOrphanedShards(waitForLocks);
droppedShardCount += DropOrphanedShardsForCleanup();
}
PG_CATCH();
{
@ -132,6 +185,65 @@ TryDropOrphanedShards(bool waitForLocks)
}
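/*
* DropOrphanedShardsForCleanup removes resources (currently only shard placements)
* that are registered in pg_dist_cleanup and whose generating operation is no longer
* running. It returns the number of placements that were successfully dropped.
*/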
static int
DropOrphanedShardsForCleanup()
{
List *cleanupRecordList = ListCleanupRecords();
int removedShardCountForCleanup = 0;
int failedShardCountForCleanup = 0;
CleanupRecord *record = NULL;
foreach_ptr(record, cleanupRecordList)
{
/* We only support cleaning shards right now */
if (record->objectType != CLEANUP_SHARD_PLACEMENT)
{
ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ",
record->objectType)));
continue;
}
if (!TryLockOperationId(record->operationId))
{
/* operation that the cleanup record is part of is still running */
continue;
}
char *qualifiedTableName = record->objectName;
WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId);
if (!CleanupRecordExists(record->recordId))
{
/*
* The operation completed successfully just after we called
* ListCleanupRecords in which case the record is now gone.
*/
continue;
}
if (TryDropShardOutsideTransaction(qualifiedTableName, workerNode->workerName,
workerNode->workerPort))
{
/* delete the cleanup record */
DeleteCleanupRecordByRecordId(record->recordId);
removedShardCountForCleanup++;
}
else
{
failedShardCountForCleanup++;
}
}
if (failedShardCountForCleanup > 0)
{
ereport(WARNING, (errmsg("Failed to drop %d cleanup shards out of %d",
failedShardCountForCleanup, list_length(cleanupRecordList))));
}
return removedShardCountForCleanup;
}
/*
* DropOrphanedShards removes shards that were marked SHARD_STATE_TO_DELETE before.
*
@ -155,7 +267,6 @@ int
DropOrphanedShards(bool waitForLocks)
{
int removedShardCount = 0;
ListCell *shardPlacementCell = NULL;
/*
* We should try to take the highest lock that we take
@ -185,19 +296,26 @@ DropOrphanedShards(bool waitForLocks)
int failedShardDropCount = 0;
List *shardPlacementList = AllShardPlacementsWithShardPlacementState(
SHARD_STATE_TO_DELETE);
foreach(shardPlacementCell, shardPlacementList)
{
GroupShardPlacement *placement = (GroupShardPlacement *) lfirst(
shardPlacementCell);
GroupShardPlacement *placement = NULL;
foreach_ptr(placement, shardPlacementList)
{
if (!PrimaryNodeForGroup(placement->groupId, NULL) ||
!ShardExists(placement->shardId))
{
continue;
}
if (TryDropShard(placement))
ShardPlacement *shardPlacement = LoadShardPlacement(placement->shardId,
placement->placementId);
ShardInterval *shardInterval = LoadShardInterval(placement->shardId);
char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
if (TryDropShardOutsideTransaction(qualifiedTableName, shardPlacement->nodeName,
shardPlacement->nodePort))
{
/* delete the actual placement */
DeleteShardPlacementRow(placement->placementId);
removedShardCount++;
}
else
@ -216,6 +334,182 @@ DropOrphanedShards(bool waitForLocks)
}
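/*
* StartNewOperationNeedingCleanup generates a new operation ID, sets it as
* CurrentOperationId, and takes an advisory lock on it so that the shard cleaner
* skips this operation's cleanup records while the operation is still running.
*/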
OperationId
StartNewOperationNeedingCleanup(void)
{
CurrentOperationId = GetNextOperationId();
LockOperationId(CurrentOperationId);
return CurrentOperationId;
}
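/*
* CompleteNewOperationNeedingCleanup processes the cleanup records of
* CurrentOperationId according to their cleanup policy, as described below.
*/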
void
CompleteNewOperationNeedingCleanup(bool isSuccess)
{
/*
* As part of operation completion:
* 1. Drop all resources of CurrentOperationId that are marked with the 'CLEANUP_ALWAYS'
* policy, along with their cleanup records, in a separate transaction.
*
* 2. For all resources of CurrentOperationId that are marked with 'CLEANUP_ON_FAILURE':
* a) If isSuccess = true, drop only the cleanup records, since the operation is nearing
* successful completion. This is done as part of the same transaction, so it rolls back
* if a failure happens later.
*
* b) If isSuccess = false, drop the resources and their cleanup records in a separate transaction.
*/
/* We must have a valid OperationId. Any operation requiring cleanup
* will call StartNewOperationNeedingCleanup.
*/
Assert(CurrentOperationId != INVALID_OPERATION_ID);
List *recordList = ListCleanupRecordsForCurrentOperation();
int removedShardCountOnComplete = 0;
int failedShardCountOnComplete = 0;
CleanupRecord *record = NULL;
foreach_ptr(record, recordList)
{
/* We only support cleaning shards right now */
if (record->objectType != CLEANUP_SHARD_PLACEMENT)
{
ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ",
record->objectType)));
continue;
}
if (record->policy == CLEANUP_ALWAYS ||
(record->policy == CLEANUP_ON_FAILURE && !isSuccess))
{
char *qualifiedTableName = record->objectName;
WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId);
if (TryDropShardOutsideTransaction(qualifiedTableName, workerNode->workerName,
workerNode->workerPort))
{
DeleteCleanupRecordByRecordIdOutsideTransaction(record->recordId);
removedShardCountOnComplete++;
}
else
{
failedShardCountOnComplete++;
}
}
else if (record->policy == CLEANUP_ON_FAILURE && isSuccess)
{
DeleteCleanupRecordByRecordId(record->recordId);
}
}
}
/*
* InsertCleanupRecordInCurrentTransaction inserts a new pg_dist_cleanup entry
* as part of the current transaction. This is primarily useful for deferred drop scenarios,
* since these records are rolled back if the operation fails.
*/
void
InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy)
{
/* We must have a valid OperationId. Any operation requiring cleanup
* will call StartNewOperationNeedingCleanup.
*/
Assert(CurrentOperationId != INVALID_OPERATION_ID);
Datum values[Natts_pg_dist_cleanup];
bool isNulls[Natts_pg_dist_cleanup];
/* form new cleanup record tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
uint64 recordId = GetNextCleanupRecordId();
OperationId operationId = CurrentOperationId;
values[Anum_pg_dist_cleanup_record_id - 1] = UInt64GetDatum(recordId);
values[Anum_pg_dist_cleanup_operation_id - 1] = UInt64GetDatum(operationId);
values[Anum_pg_dist_cleanup_object_type - 1] = Int32GetDatum(objectType);
values[Anum_pg_dist_cleanup_object_name - 1] = CStringGetTextDatum(objectName);
values[Anum_pg_dist_cleanup_node_group_id - 1] = Int32GetDatum(nodeGroupId);
values[Anum_pg_dist_cleanup_policy_type - 1] = Int32GetDatum(policy);
/* open cleanup relation and insert new tuple */
Oid relationId = DistCleanupRelationId();
Relation pgDistCleanup = table_open(relationId, RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistCleanup);
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CatalogTupleInsert(pgDistCleanup, heapTuple);
CommandCounterIncrement();
table_close(pgDistCleanup, NoLock);
}
/*
* InsertCleanupRecordInSubtransaction inserts a new pg_dist_cleanup entry
* in a separate transaction to ensure the record persists after rollback. We should
* delete these records if the operation completes successfully.
*
* The insert is sent over a separate connection to localhost, so it commits
* independently of the current transaction.
*/
void
InsertCleanupRecordInSubtransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy)
{
/* We must have a valid OperationId. Any operation requiring cleanup
* will call StartNewOperationNeedingCleanup.
*/
Assert(CurrentOperationId != INVALID_OPERATION_ID);
StringInfo command = makeStringInfo();
appendStringInfo(command,
"INSERT INTO %s.%s "
" (operation_id, object_type, object_name, node_group_id, policy_type) "
" VALUES (" UINT64_FORMAT ", %d, %s, %d, %d)",
PG_CATALOG,
PG_DIST_CLEANUP,
CurrentOperationId,
objectType,
quote_literal_cstr(objectName),
nodeGroupId,
policy);
SendCommandListToWorkerOutsideTransaction(LocalHostName,
PostPortNumber,
CitusExtensionOwnerName(),
list_make1(command->data));
}
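/*
* DeleteCleanupRecordByRecordIdOutsideTransaction deletes a single pg_dist_cleanup
* entry over a separate connection to localhost, so that the delete commits
* independently of the current transaction.
*/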
static void
DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId)
{
StringInfo command = makeStringInfo();
appendStringInfo(command,
"DELETE FROM %s.%s "
"WHERE record_id = %lu",
PG_CATALOG,
PG_DIST_CLEANUP,
recordId);
SendCommandListToWorkerOutsideTransaction(LocalHostName,
PostPortNumber,
CitusExtensionOwnerName(),
list_make1(command->data));
}
/*
* TryLockRelationAndPlacementCleanup tries to lock the given relation
* and the placement cleanup. If it cannot, it returns false.
@ -245,20 +539,14 @@ TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode)
* true on success.
*/
static bool
TryDropShard(GroupShardPlacement *placement)
TryDropShardOutsideTransaction(char *qualifiedTableName, char *nodeName, int nodePort)
{
ShardPlacement *shardPlacement = LoadShardPlacement(placement->shardId,
placement->placementId);
ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
ereport(LOG, (errmsg("dropping shard placement " INT64_FORMAT " of shard "
INT64_FORMAT " on %s:%d after it was moved away",
shardPlacement->placementId, shardPlacement->shardId,
shardPlacement->nodeName, shardPlacement->nodePort)));
ereport(LOG, (errmsg("dropping shard placement %s "
"on %s:%d after it was moved away",
qualifiedTableName, nodeName, nodePort)));
/* prepare sql query to execute to drop the shard */
StringInfo dropQuery = makeStringInfo();
char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);
/*
@ -274,15 +562,250 @@ TryDropShard(GroupShardPlacement *placement)
dropQuery->data);
/* remove the shard from the node */
bool success =
SendOptionalCommandListToWorkerOutsideTransaction(shardPlacement->nodeName,
shardPlacement->nodePort,
NULL, dropCommandList);
if (success)
{
/* delete the actual placement */
DeleteShardPlacementRow(placement->placementId);
}
bool success =
SendOptionalCommandListToWorkerOutsideTransaction(nodeName,
nodePort,
NULL,
dropCommandList);
return success;
}
/*
* GetNextOperationId allocates and returns a unique operationId for an operation
* requiring potential cleanup. This allocation occurs both in shared memory and
* in write ahead logs; writing to logs avoids the risk of having operationId collisions.
*/
static OperationId
GetNextOperationId()
{
OperationId operationId = INVALID_OPERATION_ID;
/*
* In regression tests, we would like to generate operation IDs consistently
* even if the tests run in parallel. Instead of the sequence, we can use
* the next_operation_id GUC to specify which operation ID the current session should
* generate next. The GUC is automatically increased by 1 every time a new
* operation ID is generated.
*/
if (NextOperationId > 0)
{
operationId = NextOperationId;
NextOperationId += 1;
return operationId;
}
/* token location, or -1 if unknown */
const int location = -1;
RangeVar *sequenceName = makeRangeVar(PG_CATALOG,
OPERATIONID_SEQUENCE_NAME,
location);
bool missingOK = false;
Oid sequenceId = RangeVarGetRelid(sequenceName, NoLock, missingOK);
bool checkPermissions = true;
operationId = nextval_internal(sequenceId, checkPermissions);
return operationId;
}
/*
* ListCleanupRecords lists all the current cleanup records.
*/
static List *
ListCleanupRecords(void)
{
Relation pgDistCleanup = table_open(DistCleanupRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistCleanup);
List *recordList = NIL;
int scanKeyCount = 0;
bool indexOK = false;
SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanup, InvalidOid,
indexOK, NULL, scanKeyCount, NULL);
HeapTuple heapTuple = NULL;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
CleanupRecord *record = TupleToCleanupRecord(heapTuple, tupleDescriptor);
recordList = lappend(recordList, record);
}
systable_endscan(scanDescriptor);
table_close(pgDistCleanup, NoLock);
return recordList;
}
/*
* ListCleanupRecordsForCurrentOperation lists the cleanup records that belong to
* CurrentOperationId. Currently a stub that returns an empty list.
*/
static List *
ListCleanupRecordsForCurrentOperation(void)
{
return NIL;
}
/*
* TupleToCleanupRecord converts a pg_dist_cleanup record tuple into a CleanupRecord struct.
*/
static CleanupRecord *
TupleToCleanupRecord(HeapTuple heapTuple, TupleDesc tupleDescriptor)
{
Datum datumArray[Natts_pg_dist_cleanup];
bool isNullArray[Natts_pg_dist_cleanup];
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
CleanupRecord *record = palloc0(sizeof(CleanupRecord));
record->recordId =
DatumGetUInt64(datumArray[Anum_pg_dist_cleanup_record_id - 1]);
record->operationId =
DatumGetUInt64(datumArray[Anum_pg_dist_cleanup_operation_id - 1]);
record->objectType =
DatumGetInt32(datumArray[Anum_pg_dist_cleanup_object_type - 1]);
record->objectName =
TextDatumGetCString(datumArray[Anum_pg_dist_cleanup_object_name - 1]);
record->nodeGroupId =
DatumGetInt32(datumArray[Anum_pg_dist_cleanup_node_group_id - 1]);
record->policy =
DatumGetInt32(datumArray[Anum_pg_dist_cleanup_policy_type - 1]);
return record;
}
/*
* CleanupRecordExists returns whether a cleanup record with the given
* record ID exists in pg_dist_cleanup.
*/
static bool
CleanupRecordExists(uint64 recordId)
{
Relation pgDistCleanup = table_open(DistCleanupRelationId(),
AccessShareLock);
const int scanKeyCount = 1;
ScanKeyData scanKey[1];
bool indexOK = true;
ScanKeyInit(&scanKey[0], Anum_pg_dist_cleanup_record_id,
BTEqualStrategyNumber, F_INT8EQ, UInt64GetDatum(recordId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanup,
DistCleanupPrimaryKeyIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
bool recordExists = HeapTupleIsValid(heapTuple);
systable_endscan(scanDescriptor);
CommandCounterIncrement();
table_close(pgDistCleanup, NoLock);
return recordExists;
}
/*
* DeleteCleanupRecordByRecordId deletes a single pg_dist_cleanup entry.
*/
static void
DeleteCleanupRecordByRecordId(uint64 recordId)
{
Relation pgDistCleanup = table_open(DistCleanupRelationId(),
RowExclusiveLock);
const int scanKeyCount = 1;
ScanKeyData scanKey[1];
bool indexOK = true;
ScanKeyInit(&scanKey[0], Anum_pg_dist_cleanup_record_id,
BTEqualStrategyNumber, F_INT8EQ, UInt64GetDatum(recordId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanup,
DistCleanupPrimaryKeyIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find cleanup record " UINT64_FORMAT,
recordId)));
}
simple_heap_delete(pgDistCleanup, &heapTuple->t_self);
systable_endscan(scanDescriptor);
CommandCounterIncrement();
table_close(pgDistCleanup, NoLock);
}
/*
* GetNextCleanupRecordId allocates and returns a unique record ID for a cleanup entry.
* This allocation occurs both in shared memory and
* in write ahead logs; writing to logs avoids the risk of having record ID collisions.
*/
static uint64
GetNextCleanupRecordId(void)
{
uint64 recordId = INVALID_CLEANUP_RECORD_ID;
/*
* In regression tests, we would like to generate record IDs consistently
* even if the tests run in parallel. Instead of the sequence, we can use
* the next_cleanup_record_id GUC to specify which record ID the current session should
* generate next. The GUC is automatically increased by 1 every time a new
* record ID is generated.
*/
if (NextCleanupRecordId > 0)
{
recordId = NextCleanupRecordId;
NextCleanupRecordId += 1;
return recordId;
}
RangeVar *sequenceName = makeRangeVar(PG_CATALOG,
CLEANUPRECORDID_SEQUENCE_NAME,
-1);
bool missingOK = false;
Oid sequenceId = RangeVarGetRelid(sequenceName, NoLock, missingOK);
bool checkPermissions = true;
return nextval_internal(sequenceId, checkPermissions);
}
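/*
* LockOperationId takes an exclusive advisory lock on the given operation ID,
* waiting until the lock becomes available.
*/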
static void
LockOperationId(OperationId operationId)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = false;
SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId);
(void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
}
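/*
* TryLockOperationId attempts to take an exclusive advisory lock on the given
* operation ID without waiting and returns whether the lock was acquired.
*/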
static bool
TryLockOperationId(OperationId operationId)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = true;
SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId);
bool lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
return lockAcquired;
}

View File

@ -33,6 +33,7 @@
#include "distributed/multi_partitioning_utils.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "distributed/shard_cleaner.h"
#include "distributed/shared_library_init.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/metadata_sync.h"
@ -43,6 +44,9 @@
#include "distributed/shard_rebalancer.h"
#include "postmaster/postmaster.h"
/* declarations for dynamic loading */
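/* GUC that controls whether dropping the old shards after a split is deferred to the shard cleaner */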
bool DeferShardDeleteOnSplit = true;
/*
* Entry for map that tracks ShardInterval -> Placement Node
* created by split workflow.
@ -72,15 +76,13 @@ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
List *shardSplitPointsList,
List *nodeIdsForPlacementList);
static void CreateAndCopySplitShardsForShardGroup(
HTAB *mapOfShardToPlacementCreatedByWorkflow,
WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
List *shardGroupSplitIntervalListList,
static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement,
static void CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
WorkerNode *sourceWorkerNode,
@ -89,7 +91,7 @@ static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList);
static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
List *workersForPlacementList,
bool includeReplicaIdentity);
static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfDummyShardToPlacement);
static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfPlacementToDummyShardList);
static void CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerNode);
static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
@ -121,8 +123,6 @@ static void CreatePartitioningHierarchy(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow);
static HTAB * CreateEmptyMapForShardsCreatedByWorkflow();
static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode);
static StringInfo CreateSplitShardReplicationSetupUDF(
List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList,
@ -133,10 +133,8 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList);
static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId,
static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 targetNodeId,
ShardInterval *shardInterval);
static void DropDummyShards(HTAB *mapOfDummyShardToPlacement);
static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval);
static uint64 GetNextShardIdForSplitChild(void);
/* Customize error message strings based on operation type */
@ -431,6 +429,9 @@ SplitShard(SplitMode splitMode,
lappend(workersForPlacementList, (void *) workerNode);
}
/* Start operation to prepare for generating cleanup records */
StartNewOperationNeedingCleanup();
if (splitMode == BLOCKING_SPLIT)
{
EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES);
@ -450,70 +451,9 @@ SplitShard(SplitMode splitMode,
PlacementMovedUsingLogicalReplicationInTX = true;
}
}
/*
* ShardIntervalHashCode computes the hash code for a Shardinterval using
* shardId.
*/
static uint32
ShardIntervalHashCode(const void *key, Size keySize)
{
const ShardInterval *shardInterval = (const ShardInterval *) key;
const uint64 *shardId = &(shardInterval->shardId);
/* standard hash function outlined in Effective Java, Item 8 */
uint32 result = 17;
result = 37 * result + tag_hash(shardId, sizeof(uint64));
return result;
}
/*
* ShardIntervalHashCompare compares two shard intervals using shard id.
*/
static int
ShardIntervalHashCompare(const void *lhsKey, const void *rhsKey, Size keySize)
{
const ShardInterval *intervalLhs = (const ShardInterval *) lhsKey;
const ShardInterval *intervalRhs = (const ShardInterval *) rhsKey;
int shardIdCompare = 0;
/* first, compare by shard id */
if (intervalLhs->shardId < intervalRhs->shardId)
{
shardIdCompare = -1;
}
else if (intervalLhs->shardId > intervalRhs->shardId)
{
shardIdCompare = 1;
}
return shardIdCompare;
}
/* Create an empty map that tracks ShardInterval -> Placement Node as created by workflow */
static HTAB *
CreateEmptyMapForShardsCreatedByWorkflow()
{
HASHCTL info = { 0 };
info.keysize = sizeof(ShardInterval);
info.entrysize = sizeof(ShardCreatedByWorkflowEntry);
info.hash = ShardIntervalHashCode;
info.match = ShardIntervalHashCompare;
info.hcxt = CurrentMemoryContext;
/* we don't have value field as it's a set */
info.entrysize = info.keysize;
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
HTAB *splitChildrenCreatedByWorkflow = hash_create("Shard id to Node Placement Map",
32, &info, hashFlags);
return splitChildrenCreatedByWorkflow;
bool isSuccess = true;
CompleteNewOperationNeedingCleanup(isSuccess);
}
@ -549,9 +489,6 @@ BlockingShardSplit(SplitOperation splitOperation,
WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
false /* missingOk */);
HTAB *mapOfShardToPlacementCreatedByWorkflow =
CreateEmptyMapForShardsCreatedByWorkflow();
PG_TRY();
{
/*
@ -560,7 +497,6 @@ BlockingShardSplit(SplitOperation splitOperation,
* Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints).
*/
CreateAndCopySplitShardsForShardGroup(
mapOfShardToPlacementCreatedByWorkflow,
sourceShardToCopyNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
@ -601,7 +537,8 @@ BlockingShardSplit(SplitOperation splitOperation,
ShutdownAllConnections();
/* Do a best effort cleanup of shards created on workers in the above block */
TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
bool isSuccess = false;
CompleteNewOperationNeedingCleanup(isSuccess);
PG_RE_THROW();
}
@ -613,8 +550,7 @@ BlockingShardSplit(SplitOperation splitOperation,
/* Create ShardGroup split children on a list of corresponding workers. */
static void
CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
List *shardGroupSplitIntervalListList,
CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
/*
@ -642,16 +578,15 @@ CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
splitShardCreationCommandList,
shardInterval->shardId);
/* Log resource for cleanup in case of failure only. */
CleanupPolicy policy = CLEANUP_ON_FAILURE;
InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
workerPlacementNode->groupId,
policy);
/* Create new split child shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
ShardCreatedByWorkflowEntry entry;
entry.shardIntervalKey = shardInterval;
entry.workerNodeValue = workerPlacementNode;
bool found = false;
hash_search(mapOfShardToPlacementCreatedByWorkflow, &entry, HASH_ENTER,
&found);
Assert(!found);
}
}
}
@ -735,14 +670,12 @@ CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
* on a list of corresponding workers.
*/
static void
CreateAndCopySplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
WorkerNode *sourceShardNode,
CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow,
shardGroupSplitIntervalListList,
CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
/* For Blocking split, copy isn't snapshotted */
@ -1197,20 +1130,38 @@ DropShardList(List *shardIntervalList)
/* get shard name */
char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
char storageType = shardInterval->storageType;
if (storageType == SHARD_STORAGE_TABLE)
if (DeferShardDeleteOnSplit)
{
appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND,
qualifiedShardName);
}
else if (storageType == SHARD_STORAGE_FOREIGN)
{
appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND,
qualifiedShardName);
}
/* drop old shard */
SendCommandToWorker(workerName, workerPort, dropQuery->data);
/* Log shard in pg_dist_cleanup.
* Parent shards are to be dropped only on success, after the split workflow is complete,
* so mark the policy as 'CLEANUP_DEFERRED_ON_SUCCESS'.
* We also log the cleanup record in the current transaction, so that if the current
* transaction rolls back, we do not generate a record at all.
*/
CleanupPolicy policy = CLEANUP_DEFERRED_ON_SUCCESS;
InsertCleanupRecordInCurrentTransaction(CLEANUP_SHARD_PLACEMENT,
qualifiedShardName,
placement->groupId,
policy);
}
else
{
char storageType = shardInterval->storageType;
if (storageType == SHARD_STORAGE_TABLE)
{
appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND,
qualifiedShardName);
}
else if (storageType == SHARD_STORAGE_FOREIGN)
{
appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND,
qualifiedShardName);
}
/* drop old shard */
SendCommandToWorker(workerName, workerPort, dropQuery->data);
}
}
/* delete shard row */
@ -1219,50 +1170,6 @@ DropShardList(List *shardIntervalList)
}
/*
* In case of failure, TryDropSplitShardsOnFailure drops in-progress shard placements from both the
* coordinator and mx nodes.
*/
static void
TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow)
{
HASH_SEQ_STATUS status;
ShardCreatedByWorkflowEntry *entry;
hash_seq_init(&status, mapOfShardToPlacementCreatedByWorkflow);
while ((entry = (ShardCreatedByWorkflowEntry *) hash_seq_search(&status)) != 0)
{
ShardInterval *shardInterval = entry->shardIntervalKey;
WorkerNode *workerPlacementNode = entry->workerNodeValue;
char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
StringInfo dropShardQuery = makeStringInfo();
/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
qualifiedShardName);
int connectionFlags = FOR_DDL;
connectionFlags |= OUTSIDE_TRANSACTION;
MultiConnection *connnection = GetNodeUserDatabaseConnection(
connectionFlags,
workerPlacementNode->workerName,
workerPlacementNode->workerPort,
CurrentUserName(),
NULL /* databaseName */);
/*
* Perform a drop in best effort manner.
* The shard may or may not exist and the connection could have died.
*/
ExecuteOptionalRemoteCommand(
connnection,
dropShardQuery->data,
NULL /* pgResult */);
}
}
/*
* SplitShard API to split a given shard (or shard group) in non-blocking fashion
* based on specified split points to a set of destination nodes.
@ -1308,11 +1215,6 @@ NonBlockingShardSplit(SplitOperation splitOperation,
databaseName);
ClaimConnectionExclusively(sourceConnection);
HTAB *mapOfShardToPlacementCreatedByWorkflow =
CreateEmptyMapForShardsCreatedByWorkflow();
HTAB *mapOfDummyShardToPlacement = CreateSimpleHash(NodeAndOwner,
GroupedShardSplitInfos);
MultiConnection *sourceReplicationConnection =
GetReplicationConnection(sourceShardToCopyNode->workerName,
sourceShardToCopyNode->workerPort);
@ -1321,8 +1223,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
PG_TRY();
{
/* 1) Physically create split children. */
CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow,
shardGroupSplitIntervalListList,
CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
/*
@ -1330,8 +1231,10 @@ NonBlockingShardSplit(SplitOperation splitOperation,
* Refer to the comment section of 'CreateDummyShardsForShardGroup' for indepth
* information.
*/
HTAB *mapOfPlacementToDummyShardList = CreateSimpleHash(NodeAndOwner,
GroupedShardSplitInfos);
CreateDummyShardsForShardGroup(
mapOfDummyShardToPlacement,
mapOfPlacementToDummyShardList,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
sourceShardToCopyNode,
@ -1346,7 +1249,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
* initial COPY phase, like we do for the replica identities on the
* target shards.
*/
CreateReplicaIdentitiesForDummyShards(mapOfDummyShardToPlacement);
CreateReplicaIdentitiesForDummyShards(mapOfPlacementToDummyShardList);
/* 4) Create Publications. */
CreatePublications(sourceConnection, publicationInfoHash);
@ -1468,11 +1371,6 @@ NonBlockingShardSplit(SplitOperation splitOperation,
CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
workersForPlacementList);
/*
* 23) Drop dummy shards.
*/
DropDummyShards(mapOfDummyShardToPlacement);
/* 24) Close source connection */
CloseConnection(sourceConnection);
@ -1487,12 +1385,14 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* end ongoing transactions to enable us to clean up */
ShutdownAllConnections();
/* Do a best effort cleanup of shards created on workers in the above block */
TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
/* Do a best effort cleanup of shards created on workers in the above block
* TODO(niupre): We don't need to do this once shard cleaner can clean replication
* artifacts.
*/
DropAllLogicalReplicationLeftovers(SHARD_SPLIT);
DropDummyShards(mapOfDummyShardToPlacement);
bool isSuccess = false;
CompleteNewOperationNeedingCleanup(isSuccess);
PG_RE_THROW();
}
@ -1523,7 +1423,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
* Note 2 : Given there is an overlap of source and destination in Worker0, Shard1_1 and Shard2_1 need not be created.
*/
static void
CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement,
CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
WorkerNode *sourceWorkerNode,
@ -1560,11 +1460,20 @@ CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement,
splitShardCreationCommandList,
shardInterval->shardId);
/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
* we want to clean them up irrespective of operation success or failure.
*/
CleanupPolicy policy = CLEANUP_ALWAYS;
InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
workerPlacementNode->groupId,
policy);
/* Create dummy source shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
/* Add dummy source shard entry created for placement node in map */
AddDummyShardEntryInMap(mapOfDummyShardToPlacement,
AddDummyShardEntryInMap(mapOfPlacementToDummyShardList,
workerPlacementNode->nodeId,
shardInterval);
}
@ -1595,12 +1504,17 @@ CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement,
splitShardCreationCommandList,
shardInterval->shardId);
/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
* we want to clean them up irrespective of operation success or failure.
*/
CleanupPolicy policy = CLEANUP_ALWAYS;
InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
workerPlacementNode->groupId,
policy);
/* Create dummy split child shard on source worker node */
CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
/* Add dummy split child shard entry created on source node */
AddDummyShardEntryInMap(mapOfPlacementToDummyShardList, sourceWorkerNode->nodeId,
shardInterval);
}
}
}
@ -1822,7 +1736,8 @@ ParseReplicationSlotInfoFromResult(PGresult *result)
* of logical replication. We cautiously delete only the dummy shards added in the DummyShardHashMap.
*/
static void
AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId,
AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList,
uint32 targetNodeId,
ShardInterval *shardInterval)
{
NodeAndOwner key;
@ -1831,7 +1746,8 @@ AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId,
bool found = false;
GroupedDummyShards *nodeMappingEntry =
(GroupedDummyShards *) hash_search(mapOfDummyShardToPlacement, &key,
(GroupedDummyShards *) hash_search(mapOfPlacementToDummyShardList,
&key,
HASH_ENTER,
&found);
if (!found)
@ -1844,68 +1760,6 @@ AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId,
}
/*
* DropDummyShards traverses the dummy shard map and drops shard at given node.
* It fails if the shard cannot be dropped.
*/
static void
DropDummyShards(HTAB *mapOfDummyShardToPlacement)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, mapOfDummyShardToPlacement);
GroupedDummyShards *entry = NULL;
while ((entry = (GroupedDummyShards *) hash_seq_search(&status)) != NULL)
{
uint32 nodeId = entry->key.nodeId;
WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId,
false /* missingOk */);
int connectionFlags = FOR_DDL;
connectionFlags |= OUTSIDE_TRANSACTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(
connectionFlags,
shardToBeDroppedNode->workerName,
shardToBeDroppedNode->workerPort,
CurrentUserName(),
NULL /* databaseName */);
List *dummyShardIntervalList = entry->shardIntervals;
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, dummyShardIntervalList)
{
DropDummyShard(connection, shardInterval);
}
CloseConnection(connection);
}
}
/*
* DropDummyShard drops a given shard on the node connection.
* It fails if the shard cannot be dropped.
*/
static void
DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval)
{
char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
StringInfo dropShardQuery = makeStringInfo();
/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
qualifiedShardName);
/*
* Since the dummy shard is expected to be present on the given node,
* fail if it cannot be dropped during cleanup.
*/
ExecuteCriticalRemoteCommand(
connection,
dropShardQuery->data);
}
/*
* CreateReplicaIdentitiesForDummyShards creates replica indentities for split
* dummy shards.

View File

@ -980,6 +980,25 @@ RegisterCitusConfigVariables(void)
0,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.defer_drop_after_shard_split",
gettext_noop("When enabled a shard split will mark the original shards "
"for deletion after a successful split, instead of deleting "
"them right away."),
gettext_noop("The deletion of a shard can sometimes run into a conflict with a "
"long running transactions on a the shard during the drop phase of "
"the shard split. This causes some splits to be rolled back after "
"resources have been spend on moving the shard. To prevent "
"conflicts this feature lets you skip the actual deletion till a "
"later point in time. When used one should set "
"citus.defer_shard_delete_interval to make sure defered deletions "
"will be executed"),
&DeferShardDeleteOnSplit,
true,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.defer_shard_delete_interval",
gettext_noop("Sets the time to wait between background deletion for shards."),
@ -1817,6 +1836,36 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.next_operation_id",
gettext_noop("Set the next operation ID to use in operation creation."),
gettext_noop("Operation IDs are normally generated using a sequence. If "
"next_operation_id is set to a non-zero value, operation IDs will "
"instead be generated by incrementing from the value of "
"this GUC and this will be reflected in the GUC. This is "
"mainly useful to ensure consistent operation IDs when running "
"tests in parallel."),
&NextOperationId,
0, 0, INT_MAX,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.next_cleanup_record_id",
gettext_noop("Set the next cleanup record ID to use in operation creation."),
gettext_noop("Cleanup record IDs are normally generated using a sequence. If "
"next_cleanup_record_id is set to a non-zero value, cleanup record IDs will "
"instead be generated by incrementing from the value of "
"this GUC and this will be reflected in the GUC. This is "
"mainly useful to ensure consistent cleanup record IDs when running "
"tests in parallel."),
&NextCleanupRecordId,
0, 0, INT_MAX,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.node_connection_timeout",
gettext_noop("Sets the maximum duration to connect to worker nodes."),

View File

@ -76,3 +76,21 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
#include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"
-- Table of records to:
-- 1) Clean up leftover resources after a failure
-- 2) Defer the drop of old shard placements after a split.
CREATE TABLE citus.pg_dist_cleanup (
record_id bigserial primary key,
operation_id bigint not null,
object_type int not null,
object_name text not null,
node_group_id int not null,
policy_type int not null
);
ALTER TABLE citus.pg_dist_cleanup SET SCHEMA pg_catalog;
-- Sequence used to generate an operation ID for use in pg_dist_cleanup.
CREATE SEQUENCE citus.pg_dist_operationid_seq;
ALTER SEQUENCE citus.pg_dist_operationid_seq SET SCHEMA pg_catalog;

View File

@ -38,6 +38,7 @@
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/shard_cleaner.h"
#include "distributed/subplan_execution.h"
#include "distributed/version_compat.h"
#include "distributed/worker_log_messages.h"
@ -600,6 +601,7 @@ ResetGlobalVariables()
NodeMetadataSyncOnCommit = false;
InTopLevelDelegatedFunctionCall = false;
InTableTypeConversionFunctionCall = false;
CurrentOperationId = INVALID_OPERATION_ID;
ResetWorkerErrorIndication();
memset(&AllowedDistributionColumnValue, 0,
sizeof(AllowedDistributionColumn));

View File

@ -508,7 +508,11 @@ LockPlacementCleanup(void)
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = false;
SET_LOCKTAG_PLACEMENT_CLEANUP(tag);
/* Moves acquire the lock with a constant operation ID, CITUS_SHARD_MOVE.
* This will change as we add support for parallel moves.
*/
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_SHARD_MOVE);
(void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
}
@ -523,7 +527,11 @@ TryLockPlacementCleanup(void)
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = true;
SET_LOCKTAG_PLACEMENT_CLEANUP(tag);
/* Moves acquire the lock with a constant operation ID, CITUS_SHARD_MOVE.
* This will change as we add support for parallel moves.
*/
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_SHARD_MOVE);
bool lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
return lockAcquired;
}

View File

@ -222,6 +222,7 @@ extern WorkerNode * LookupNodeForGroup(int32 groupId);
extern Oid CitusCatalogNamespaceId(void);
/* relation oids */
extern Oid DistCleanupRelationId(void);
extern Oid DistColocationRelationId(void);
extern Oid DistColocationConfigurationIndexId(void);
extern Oid DistPartitionRelationId(void);
@ -246,6 +247,7 @@ extern Oid DistTransactionRelationId(void);
extern Oid DistTransactionGroupIndexId(void);
extern Oid DistPlacementGroupidIndexId(void);
extern Oid DistObjectPrimaryKeyIndexId(void);
extern Oid DistCleanupPrimaryKeyIndexId(void);
/* type oids */
extern Oid LookupTypeOid(char *schemaNameSting, char *typeNameString);

View File

@ -0,0 +1,34 @@
/*-------------------------------------------------------------------------
*
* pg_dist_cleanup.h
* definition of the relation that holds the resources to be cleaned up
* in the cluster (pg_dist_cleanup).
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_CLEANUP_H
#define PG_DIST_CLEANUP_H
/* ----------------
* compiler constants for pg_dist_cleanup
* ----------------
*/
#define Natts_pg_dist_cleanup 6
#define Anum_pg_dist_cleanup_record_id 1
#define Anum_pg_dist_cleanup_operation_id 2
#define Anum_pg_dist_cleanup_object_type 3
#define Anum_pg_dist_cleanup_object_name 4
#define Anum_pg_dist_cleanup_node_group_id 5
#define Anum_pg_dist_cleanup_policy_type 6
#define PG_CATALOG "pg_catalog"
#define PG_DIST_CLEANUP "pg_dist_cleanup"
#define OPERATIONID_SEQUENCE_NAME "pg_dist_operationid_seq"
#define CLEANUPRECORDID_SEQUENCE_NAME "pg_dist_cleanup_record_id_seq"
#endif /* PG_DIST_CLEANUP_H */

View File

@ -40,7 +40,7 @@ typedef enum AdvisoryLocktagClass
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7,
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP = 10,
ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10,
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13
} AdvisoryLocktagClass;
@ -48,7 +48,9 @@ typedef enum AdvisoryLocktagClass
/* CitusOperations has constants for citus operations */
typedef enum CitusOperations
{
CITUS_TRANSACTION_RECOVERY = 0
CITUS_TRANSACTION_RECOVERY = 0,
CITUS_SHARD_MOVE = 1
} CitusOperations;
/* reuse advisory lock, but with different, unused field 4 (4)*/
@ -108,12 +110,12 @@ typedef enum CitusOperations
/* reuse advisory lock, but with different, unused field 4 (10)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */
#define SET_LOCKTAG_PLACEMENT_CLEANUP(tag) \
#define SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId) \
SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \
(uint32) 0, \
(uint32) 0, \
ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP)
(uint32) ((operationId) >> 32), \
(uint32) operationId, \
ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID)
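For reference, a minimal illustrative sketch (the operation ID value is made up) of how this macro spreads a 64-bit operation ID across the two 32-bit advisory-lock fields:

/* Illustration only: a hypothetical operation ID of (7 << 32) | 42. */
OperationId operationId = (((uint64) 7) << 32) | 42;
LOCKTAG tag;
SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId);
/* Field 2 now holds the high 32 bits (7), field 3 the low 32 bits (42), and
 * field 4 ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID, with the database
 * field hardcoded to MyDatabaseId so the lock stays local to the database. */
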
/* reuse advisory lock, but with different, unused field 4 (12)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks

View File

@ -17,8 +17,61 @@ extern bool DeferShardDeleteOnMove;
extern double DesiredPercentFreeAfterMove;
extern bool CheckAvailableSpaceBeforeMove;
extern bool DeferShardDeleteOnSplit;
extern int NextOperationId;
extern int NextCleanupRecordId;
extern int TryDropOrphanedShards(bool waitForLocks);
extern int DropOrphanedShards(bool waitForLocks);
extern void DropOrphanedShardsInSeparateTransaction(void);
/* Members for cleanup infrastructure */
typedef uint64 OperationId;
extern OperationId CurrentOperationId;
/*
* CleanupObject represents the type of object that a cleanup record refers to.
*/
typedef enum CleanupObject
{
CLEANUP_INVALID = 0,
CLEANUP_SHARD_PLACEMENT = 1
} CleanupObject;
typedef enum CleanupPolicy
{
/*
* Resources that are transient and always need to be cleaned up after the operation completes.
* (Example: Dummy Shards for Non-Blocking splits)
*/
CLEANUP_ALWAYS = 0,
/*
* Resources that are cleaned up only on failure.
* (Example: Split Children for Blocking/Non-Blocking splits)
*/
CLEANUP_ON_FAILURE = 1,
/*
* Resources whose cleanup is to be deferred and performed only on success.
* (Example: Parent shards being split for Blocking/Non-Blocking splits)
*/
CLEANUP_DEFERRED_ON_SUCCESS = 2,
} CleanupPolicy;
#define INVALID_OPERATION_ID 0
#define INVALID_CLEANUP_RECORD_ID 0
/* APIs for cleanup infrastructure */
extern OperationId StartNewOperationNeedingCleanup(void);
extern void InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy);
extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType,
char *objectName,
int nodeGroupId,
CleanupPolicy policy);
extern void CompleteNewOperationNeedingCleanup(bool isSuccess);
#endif /*CITUS_SHARD_CLEANER_H */
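To tie the new API together, below is a minimal, hypothetical usage sketch modeled on the split code in this commit; the object names and node group ID are placeholders, and error handling is reduced to the bare PG_TRY pattern:

void
ExampleOperationNeedingCleanup(void)
{
	/* register the operation and take the advisory lock on its operation ID */
	StartNewOperationNeedingCleanup();

	PG_TRY();
	{
		/* transient resource: cleaned up regardless of how the operation ends */
		InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
											"public.dummy_shard_102010", /* placeholder */
											1 /* placeholder node group */,
											CLEANUP_ALWAYS);

		/* newly created resource: cleaned up only if the operation fails */
		InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
											"public.new_shard_102011", /* placeholder */
											1, CLEANUP_ON_FAILURE);

		/* old resource: dropped later by the shard cleaner, only on success */
		InsertCleanupRecordInCurrentTransaction(CLEANUP_SHARD_PLACEMENT,
												"public.old_shard_102008", /* placeholder */
												1, CLEANUP_DEFERRED_ON_SUCCESS);

		/* ... perform the actual operation here ... */

		CompleteNewOperationNeedingCleanup(true /* isSuccess */);
	}
	PG_CATCH();
	{
		CompleteNewOperationNeedingCleanup(false /* isSuccess */);
		PG_RE_THROW();
	}
	PG_END_TRY();
}

Note that CLEANUP_DEFERRED_ON_SUCCESS records are inserted in the current transaction on purpose: if the operation rolls back, the record disappears with it and the old shard is never dropped, whereas the subtransaction-based inserts persist so the shard cleaner can remove leftovers.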