diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c
index 0e2bd0ecd..0a70590cd 100644
--- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c
+++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c
@@ -1266,10 +1266,11 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId,
 
 	/* set shard storage type according to relation type */
 	char shardStorageType = ShardStorageType(citusLocalTableId);
+	ShardState shardState = SHARD_STATE_ACTIVE;
 
 	text *shardMinValue = NULL;
 	text *shardMaxValue = NULL;
-	InsertShardRow(citusLocalTableId, shardId, shardStorageType,
+	InsertShardRow(citusLocalTableId, shardId, shardStorageType, shardState,
 				   shardMinValue, shardMaxValue);
 
 	List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError());
diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c
index 6e1ff984a..3b621975b 100644
--- a/src/backend/distributed/commands/utility_hook.c
+++ b/src/backend/distributed/commands/utility_hook.c
@@ -1616,6 +1616,7 @@ List *
 DDLTaskList(Oid relationId, const char *commandString)
 {
 	List *taskList = NIL;
+	/* TODO(niupre): Should this be all shards? */
 	List *shardIntervalList = LoadShardIntervalList(relationId);
 	Oid schemaId = get_rel_namespace(relationId);
 	char *schemaName = get_namespace_name(schemaId);
diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c
index 752552343..c170f345e 100644
--- a/src/backend/distributed/executor/partitioned_intermediate_results.c
+++ b/src/backend/distributed/executor/partitioned_intermediate_results.c
@@ -357,7 +357,8 @@ QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray,
 			[Anum_pg_dist_shard_shardid - 1] = partitionIndex,
 			[Anum_pg_dist_shard_shardstorage - 1] = SHARD_STORAGE_VIRTUAL,
 			[Anum_pg_dist_shard_shardminvalue - 1] = minValues[partitionIndex],
-			[Anum_pg_dist_shard_shardmaxvalue - 1] = maxValues[partitionIndex]
+			[Anum_pg_dist_shard_shardmaxvalue - 1] = maxValues[partitionIndex],
+			[Anum_pg_dist_shard_shardstate - 1] = SHARD_STATE_INVALID_FIRST
 		};
 		bool nullsArray[Natts_pg_dist_shard] = {
 			[Anum_pg_dist_shard_shardminvalue - 1] = minValueNulls[partitionIndex],
diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c
index 83fc33720..c5e555f19 100644
--- a/src/backend/distributed/metadata/metadata_cache.c
+++ b/src/backend/distributed/metadata/metadata_cache.c
@@ -1778,7 +1778,9 @@ ErrorIfInconsistentShardIntervals(CitusTableCacheEntry *cacheEntry)
 {
 	/*
 	 * If table is hash-partitioned and has shards, there never should be any
-	 * uninitalized shards. Historically we've not prevented that for range
+	 * uninitialized shards. An exception to this is shards marked as
+	 * SHARD_STATE_TO_DELETE after a split, which are pending deletion.
+	 * Historically we've not prevented that for range
 	 * partitioned tables, but it might be a good idea to start doing so.
 	 */
 	if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
@@ -1900,7 +1902,11 @@ HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
 												curShardInterval->minValue);
 		comparisonResult = DatumGetInt32(comparisonDatum);
 
-		if (comparisonResult >= 0)
+		/* if one of the shards is marked as SHARD_STATE_TO_DELETE, ignore the overlap */
+		bool markedForDelete = (lastShardInterval->shardState == SHARD_STATE_TO_DELETE ||
+								curShardInterval->shardState == SHARD_STATE_TO_DELETE);
+
+		if (!markedForDelete && comparisonResult >= 0)
 		{
 			return true;
 		}
@@ -4815,6 +4821,8 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
 		maxValueExists = true;
 	}
 
+	int shardState = DatumGetInt32(datumArray[Anum_pg_dist_shard_shardstate - 1]);
+
 	ShardInterval *shardInterval = CitusMakeNode(ShardInterval);
 	shardInterval->relationId = relationId;
 	shardInterval->storageType = storageType;
@@ -4826,6 +4834,7 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
 	shardInterval->minValue = minValue;
 	shardInterval->maxValue = maxValue;
 	shardInterval->shardId = shardId;
+	shardInterval->shardState = shardState;
 
 	return shardInterval;
 }
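Note: the relaxed overlap cross-check above is needed because a deferred-drop split intentionally leaves the source shard's pg_dist_shard row in place next to its children, so the same hash range is temporarily covered twice. An illustrative sketch, not part of the patch; the table name, shard ids, and ranges are made up, and the state values assume SHARD_STATE_ACTIVE = 1 and SHARD_STATE_TO_DELETE = 4:

    -- hypothetical catalog content right after a deferred-drop split of shard 102008
    SELECT shardid, shardminvalue, shardmaxvalue, shardstate
    FROM pg_catalog.pg_dist_shard
    WHERE logicalrelid = 'dist_table'::regclass;

    --  shardid | shardminvalue | shardmaxvalue | shardstate
    -- ---------+---------------+---------------+------------
    --   102008 | -2147483648   | 2147483647    |          4   <- source, pending deletion
    --   102012 | -2147483648   | 0             |          1   <- split child
    --   102013 | 1             | 2147483647    |          1   <- split child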
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index b45a3641e..2fb7eb7f6 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -132,9 +132,9 @@ static void EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod,
 										  int colocationId, char replicationModel,
 										  Var *distributionKey);
 static void EnsureCoordinatorInitiatedOperation(void);
-static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
-									  text *shardMinValue,
-									  text *shardMaxValue);
+static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId,
+									  char storageType, char shardState,
+									  text *shardMinValue, text *shardMaxValue);
 static void EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId,
 											   int64 placementId, int32 shardState,
 											   int64 shardLength, int32 groupId);
@@ -162,6 +162,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
+PG_FUNCTION_INFO_V1(citus_internal_update_shard_and_placement_state_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation);
 PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata);
@@ -1295,7 +1296,7 @@ ShardListInsertCommand(List *shardIntervalList)
 	/* now add shards to insertShardCommand */
 	StringInfo insertShardCommand = makeStringInfo();
 	appendStringInfo(insertShardCommand,
-					 "WITH shard_data(relationname, shardid, storagetype, "
+					 "WITH shard_data(relationname, shardid, storagetype, shardstate, "
 					 "shardminvalue, shardmaxvalue) AS (VALUES ");
 
 	foreach_ptr(shardInterval, shardIntervalList)
@@ -1328,10 +1329,11 @@ ShardListInsertCommand(List *shardIntervalList)
 		}
 
 		appendStringInfo(insertShardCommand,
-						 "(%s::regclass, %ld, '%c'::\"char\", %s, %s)",
+						 "(%s::regclass, %ld, '%c'::\"char\", %d, %s, %s)",
 						 quote_literal_cstr(qualifiedRelationName),
 						 shardId,
 						 shardInterval->storageType,
+						 shardInterval->shardState,
 						 minHashToken->data,
 						 maxHashToken->data);
 
@@ -1345,7 +1347,7 @@ ShardListInsertCommand(List *shardIntervalList)
 
 	appendStringInfo(insertShardCommand,
 					 "SELECT citus_internal_add_shard_metadata(relationname, shardid, "
-					 "storagetype, shardminvalue, shardmaxvalue) "
+					 "storagetype, shardstate, shardminvalue, shardmaxvalue) "
 					 "FROM shard_data;");
 
 	/*
@@ -3196,16 +3198,19 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
 	PG_ENSURE_ARGNOTNULL(2, "storage type");
 	char storageType = PG_GETARG_CHAR(2);
 
+	PG_ENSURE_ARGNOTNULL(3, "shard state");
+	ShardState shardState = PG_GETARG_INT32(3);
+
 	text *shardMinValue = NULL;
-	if (!PG_ARGISNULL(3))
+	if (!PG_ARGISNULL(4))
 	{
-		shardMinValue = PG_GETARG_TEXT_P(3);
+		shardMinValue = PG_GETARG_TEXT_P(4);
 	}
 
 	text *shardMaxValue = NULL;
-	if (!PG_ARGISNULL(4))
+	if (!PG_ARGISNULL(5))
 	{
-		shardMaxValue = PG_GETARG_TEXT_P(4);
+		shardMaxValue = PG_GETARG_TEXT_P(5);
 	}
 
 	/* only owner of the table (or superuser) is allowed to add the Citus metadata */
@@ -3224,11 +3229,11 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
 		 * not sane, the user can only affect its own tables. Given that the
 		 * user is owner of the table, we should allow.
 		 */
-		EnsureShardMetadataIsSane(relationId, shardId, storageType, shardMinValue,
+		EnsureShardMetadataIsSane(relationId, shardId, storageType, shardState, shardMinValue,
 								  shardMaxValue);
 	}
 
-	InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue);
+	InsertShardRow(relationId, shardId, storageType, shardState, shardMinValue, shardMaxValue);
 
 	PG_RETURN_VOID();
 }
@@ -3262,7 +3267,8 @@ EnsureCoordinatorInitiatedOperation(void)
 * for inserting into pg_dist_shard metadata.
 */
 static void
-EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
+EnsureShardMetadataIsSane(Oid relationId, int64 shardId,
+						  char storageType, char shardState,
 						  text *shardMinValue, text *shardMaxValue)
 {
 	if (shardId <= INVALID_SHARD_ID)
@@ -3278,6 +3284,13 @@ EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
 						errmsg("Invalid shard storage type: %c", storageType)));
 	}
 
+	if (!(shardState == SHARD_STATE_ACTIVE ||
+		  shardState == SHARD_STATE_TO_DELETE))
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("Invalid shard state: %d", shardState)));
+	}
+
 	char partitionMethod = PartitionMethodViaCatalog(relationId);
 	if (partitionMethod == DISTRIBUTE_BY_INVALID)
 	{
@@ -3362,8 +3375,10 @@ EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
 
 			Datum firstMin = Int32GetDatum(shardMinValueInt);
 			Datum firstMax = Int32GetDatum(shardMaxValueInt);
+			ShardState firstState = shardState;
 			Datum secondMin = shardInterval->minValue;
 			Datum secondMax = shardInterval->maxValue;
+			ShardState secondState = shardInterval->shardState;
 			Oid collationId = InvalidOid;
 
 			/*
@@ -3380,6 +3395,7 @@ EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
 			}
 
 			if (ShardIntervalsOverlapWithParams(firstMin, firstMax, secondMin, secondMax,
+												firstState, secondState,
 												shardIntervalCompareFunction,
 												collationId))
 			{
@@ -3574,6 +3590,83 @@ citus_internal_update_placement_metadata(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * citus_internal_update_shard_and_placement_state_metadata is an internal UDF to
+ * update the shardState of a shard and its placement in pg_dist_shard and pg_dist_placement.
+ */
+Datum
+citus_internal_update_shard_and_placement_state_metadata(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+
+	int64 shardId = PG_GETARG_INT64(0);
+	ShardState newState = PG_GETARG_INT32(1);
+
+	ShardPlacement *placement = NULL;
+	if (!ShouldSkipMetadataChecks())
+	{
+		/* this UDF is not allowed to execute as a separate command */
+		EnsureCoordinatorInitiatedOperation();
+
+		if (!ShardExists(shardId))
+		{
+			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("Shard id does not exist: %ld", shardId)));
+		}
+
+		bool missingOk = false;
+		EnsureShardOwner(shardId, missingOk);
+
+		/*
+		 * Look up the single active placement of the shard so that we can
+		 * validate and update it below.
+		 */
+		List *shardPlacementList = ActiveShardPlacementList(shardId);
+
+		/* Split is only allowed for shards with a single placement */
+		Assert(list_length(shardPlacementList) == 1);
+		placement = linitial(shardPlacementList);
+
+		WorkerNode *workerNode = FindNodeWithNodeId(placement->nodeId,
+													false /* missingOk */);
+		if (!workerNode)
+		{
+			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("Node with group id %d for shard placement "
+								   "%ld does not exist", placement->groupId, shardId)));
+		}
+	}
+	else
+	{
+		/*
+		 * Look up the single active placement of the shard; a split
+		 * guarantees there is exactly one.
+		 */
+		List *shardPlacementList = ActiveShardPlacementList(shardId);
+
+		/* Split is only allowed for shards with a single placement */
+		Assert(list_length(shardPlacementList) == 1);
+		placement = linitial(shardPlacementList);
+	}
+
+	/*
+	 * At this point we should have found an active placement for the
+	 * shard; error out if we have not.
+	 */
+	if (placement == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("Active placement for shard %ld is not "
+							   "found", shardId)));
+	}
+
+	UpdateShardState(shardId, newState);
+	UpdateShardPlacementState(placement->placementId, newState);
+
+	PG_RETURN_VOID();
+}
+
+
 /*
  * citus_internal_delete_shard_metadata is an internal UDF to
  * delete a row in pg_dist_shard and corresponding placement rows
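Note: for reference, the shape of the metadata-sync commands the changes above generate. The values are illustrative, and these statements only pass the checks when sent as part of a coordinator-initiated metadata operation, not standalone:

    -- generated by ShardListInsertCommand(); note the new shardstate column
    WITH shard_data(relationname, shardid, storagetype, shardstate,
                    shardminvalue, shardmaxvalue) AS
        (VALUES ('public.dist_table'::regclass, 102012, 't'::"char", 1,
                 '-2147483648', '0'))
    SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype,
                                             shardstate, shardminvalue, shardmaxvalue)
    FROM shard_data;

    -- generated by MarkShardListForDrop() (see shard_split.c below) when a split
    -- defers the drop of source shard 102008 (4 = SHARD_STATE_TO_DELETE, assumed)
    SELECT citus_internal_update_shard_and_placement_state_metadata(102008, 4);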
diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index ff85f6930..29f4983a7 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -1198,6 +1198,7 @@ CopyShardInterval(ShardInterval *srcInterval)
 	destInterval->type = srcInterval->type;
 	destInterval->relationId = srcInterval->relationId;
 	destInterval->storageType = srcInterval->storageType;
+	destInterval->shardState = srcInterval->shardState;
 	destInterval->valueTypeId = srcInterval->valueTypeId;
 	destInterval->valueTypeLen = srcInterval->valueTypeLen;
 	destInterval->valueByVal = srcInterval->valueByVal;
@@ -1314,6 +1315,17 @@ IsActiveShardPlacement(ShardPlacement *shardPlacement)
 }
 
 
+/*
+ * IsActiveShardInterval checks if the shard interval is labelled as
+ * active.
+ */
+bool
+IsActiveShardInterval(ShardInterval *shardInterval)
+{
+	return (shardInterval->shardState == SHARD_STATE_ACTIVE);
+}
+
+
 /*
  * FilterShardPlacementList filters a list of shard placements based on a filter.
  * Keep only the shard for which the filter function returns true.
@@ -1633,7 +1645,7 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
 */
 void
 InsertShardRow(Oid relationId, uint64 shardId, char storageType,
-			   text *shardMinValue, text *shardMaxValue)
+			   int shardState, text *shardMinValue, text *shardMaxValue)
 {
 	Datum values[Natts_pg_dist_shard];
 	bool isNulls[Natts_pg_dist_shard];
@@ -1661,6 +1673,8 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
 		isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true;
 	}
 
+	values[Anum_pg_dist_shard_shardstate - 1] = Int32GetDatum(shardState);
+
 	/* open shard relation and insert new tuple */
 	Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);
 
@@ -1942,6 +1956,54 @@ DeleteShardPlacementRow(uint64 placementId)
 }
 
 
+/*
+ * UpdateShardState sets the shardState for the shard identified
+ * by shardId.
+ */
+void
+UpdateShardState(uint64 shardId, char shardState)
+{
+	ScanKeyData scanKey[1];
+	int scanKeyCount = 1;
+	bool indexOK = true;
+	Datum values[Natts_pg_dist_shard];
+	bool isnull[Natts_pg_dist_shard];
+	bool replace[Natts_pg_dist_shard];
+
+	Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);
+	TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard);
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
+				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
+
+	SysScanDesc scanDescriptor = systable_beginscan(pgDistShard,
+													DistShardShardidIndexId(), indexOK,
+													NULL, scanKeyCount, scanKey);
+
+	HeapTuple heapTuple = systable_getnext(scanDescriptor);
+	if (!HeapTupleIsValid(heapTuple))
+	{
+		ereport(ERROR, (errmsg("could not find valid entry for shard "
+							   UINT64_FORMAT, shardId)));
+	}
+
+	memset(replace, 0, sizeof(replace));
+
+	values[Anum_pg_dist_shard_shardstate - 1] = Int32GetDatum(shardState);
+	isnull[Anum_pg_dist_shard_shardstate - 1] = false;
+	replace[Anum_pg_dist_shard_shardstate - 1] = true;
+
+	heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
+	CatalogTupleUpdate(pgDistShard, &heapTuple->t_self, heapTuple);
+
+	CitusInvalidateRelcacheByShardId(shardId);
+
+	CommandCounterIncrement();
+
+	systable_endscan(scanDescriptor);
+	table_close(pgDistShard, NoLock);
+}
+
+
 /*
  * UpdateShardPlacementState sets the shardState for the placement identified
 * by placementId.
diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c
index b9841dabf..6ec84dcc0 100644
--- a/src/backend/distributed/operations/create_shards.c
+++ b/src/backend/distributed/operations/create_shards.c
@@ -195,6 +195,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 	/* set shard storage type according to relation type */
 	char shardStorageType = ShardStorageType(distributedTableId);
 
+	/* shard state is active by default */
+	ShardState shardState = SHARD_STATE_ACTIVE;
+
 	for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
 	{
 		uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
@@ -214,7 +217,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 		text *minHashTokenText = IntegerToText(shardMinHashToken);
 		text *maxHashTokenText = IntegerToText(shardMaxHashToken);
 
-		InsertShardRow(distributedTableId, shardId, shardStorageType,
+		InsertShardRow(distributedTableId, shardId, shardStorageType, shardState,
 					   minHashTokenText, maxHashTokenText);
 
 		List *currentInsertedShardPlacements = InsertShardPlacementRows(
@@ -262,6 +265,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
 	LockRelationOid(sourceRelationId, AccessShareLock);
 
 	/* prevent placement changes of the source relation until we colocate with them */
+	/* TODO(niupre): We should only return ACTIVE shards, not ALL shards. */
List *sourceShardIntervalList = LoadShardIntervalList(sourceRelationId); LockShardListMetadata(sourceShardIntervalList, ShareLock); @@ -283,6 +287,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool uint64 sourceShardId = sourceShardInterval->shardId; uint64 newShardId = GetNextShardId(); + int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue); int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); text *shardMinValueText = IntegerToText(shardMinValue); @@ -290,14 +295,14 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool List *sourceShardPlacementList = ShardPlacementListWithoutOrphanedPlacements( sourceShardId); - InsertShardRow(targetRelationId, newShardId, targetShardStorageType, + const ShardState shardState = SHARD_STATE_ACTIVE; + InsertShardRow(targetRelationId, newShardId, targetShardStorageType, shardState, shardMinValueText, shardMaxValueText); ShardPlacement *sourcePlacement = NULL; foreach_ptr(sourcePlacement, sourceShardPlacementList) { int32 groupId = sourcePlacement->groupId; - const ShardState shardState = SHARD_STATE_ACTIVE; const uint64 shardSize = 0; /* @@ -370,7 +375,10 @@ CreateReferenceTableShard(Oid distributedTableId) /* get the next shard id */ uint64 shardId = GetNextShardId(); - InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue, + /* shard state is active by default */ + ShardState shardState = SHARD_STATE_ACTIVE; + + InsertShardRow(distributedTableId, shardId, shardStorageType, shardState, shardMinValue, shardMaxValue); List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId, diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 6ef99a321..ffb80673d 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -22,6 +22,7 @@ #include "distributed/adaptive_executor.h" #include "distributed/colocation_utils.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_utility.h" #include "distributed/shardinterval_utils.h" #include "distributed/coordinator_protocol.h" #include "distributed/connection_management.h" @@ -39,6 +40,7 @@ #include "distributed/shardsplit_logical_replication.h" #include "distributed/deparse_shard_query.h" #include "distributed/shard_rebalancer.h" +#include "distributed/shard_cleaner.h" #include "postmaster/postmaster.h" /* @@ -129,7 +131,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId, static void DropDummyShards(HTAB *mapOfDummyShardToPlacement); static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval); static uint64 GetNextShardIdForSplitChild(void); - +static void MarkShardListForDrop(List *shardIntervalList); /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = @@ -143,6 +145,8 @@ static const char *const SplitTargetName[] = [ISOLATE_TENANT_TO_NEW_SHARD] = "tenant", }; +bool DeferShardDeleteOnSplit = false; + /* Function definitions */ /* @@ -550,12 +554,23 @@ BlockingShardSplit(SplitOperation splitOperation, * going forward are part of the same distributed transaction. */ - /* - * Drop old shards and delete related metadata. Have to do that before - * creating the new shard metadata, because there's cross-checks - * preventing inconsistent metadata (like overlapping shards). 
-	 */
-	DropShardList(sourceColocatedShardIntervalList);
+	if (DeferShardDeleteOnSplit)
+	{
+		/*
+		 * Defer deletion of the source shards and only mark
+		 * their metadata for deletion.
+		 */
+		MarkShardListForDrop(sourceColocatedShardIntervalList);
+	}
+	else
+	{
+		/*
+		 * Drop old shards and delete related metadata. Have to do that before
+		 * creating the new shard metadata, because there are cross-checks
+		 * preventing inconsistent metadata (like overlapping shards).
+		 */
+		DropShardList(sourceColocatedShardIntervalList);
+	}
 
 	/* Insert new shard and placement metdata */
 	InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
@@ -1013,6 +1028,7 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
 				shardInterval->relationId,
 				shardInterval->shardId,
 				shardInterval->storageType,
+				shardInterval->shardState,
 				IntegerToText(DatumGetInt32(shardInterval->minValue)),
 				IntegerToText(DatumGetInt32(shardInterval->maxValue)));
 
@@ -1131,6 +1147,43 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
 	}
 }
 
+
+/*
+ * MarkShardListForDrop marks the given shards and their placements as
+ * SHARD_STATE_TO_DELETE on the coordinator and MX nodes; the actual drop is deferred.
+ */
+static void
+MarkShardListForDrop(List *shardIntervalList)
+{
+	ShardInterval *shardInterval = NULL;
+	foreach_ptr(shardInterval, shardIntervalList)
+	{
+		uint64 shardId = shardInterval->shardId;
+		ShardState newState = SHARD_STATE_TO_DELETE;
+
+		UpdateShardState(shardId, newState);
+		List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
+
+		/* only a single placement is allowed (RelationReplicationFactor = 1 was validated) */
+		Assert(list_length(shardPlacementList) == 1);
+
+		ShardPlacement *placement = (ShardPlacement *) linitial(shardPlacementList);
+		UpdateShardPlacementState(placement->placementId, newState);
+
+		/* sync the new state to all worker nodes with metadata */
+		bool shouldSyncMetadata = ShouldSyncTableMetadata(shardInterval->relationId);
+		if (shouldSyncMetadata)
+		{
+			StringInfo updateShardCommand = makeStringInfo();
+			appendStringInfo(updateShardCommand,
+							 "SELECT citus_internal_update_shard_and_placement_state_metadata(%ld, %d)",
+							 shardId,
+							 newState);
+
+			SendCommandToWorkersWithMetadata(updateShardCommand->data);
+		}
+	}
+}
+
 
 /*
  * DropShardList drops shards and their metadata from both the coordinator and
@@ -1412,12 +1465,23 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 										 shardIntervalToSplit->shardId));
 
 	DropShardSplitReplicationSlots(sourceConnection, replicationSlotInfoList);
 
-	/*
-	 * 18) Drop old shards and delete related metadata. Have to do that before
-	 *     creating the new shard metadata, because there's cross-checks
-	 *     preventing inconsistent metadata (like overlapping shards).
-	 */
-	DropShardList(sourceColocatedShardIntervalList);
+	if (DeferShardDeleteOnSplit)
+	{
+		/*
+		 * 18) Defer deletion of the source shards and only mark
+		 *     their metadata for deletion.
+		 */
+		MarkShardListForDrop(sourceColocatedShardIntervalList);
+	}
+	else
+	{
+		/*
+		 * 18) Drop old shards and delete related metadata. Have to do that before
+		 *     creating the new shard metadata, because there are cross-checks
+		 *     preventing inconsistent metadata (like overlapping shards).
+		 */
+		DropShardList(sourceColocatedShardIntervalList);
+	}
 
 	/* 19) Insert new shard and placement metdata */
 	InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
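Note: after a deferred-drop split the source shard group stays in the catalogs until it is actually removed. This diff only marks it, on the assumption that the existing deferred shard cleaner (driven by citus.defer_shard_delete_interval) eventually drops placements in SHARD_STATE_TO_DELETE; that hook is not part of this diff. A hypothetical way to inspect pending deletions:

    -- shards marked by MarkShardListForDrop(), together with their placements
    -- (4 = SHARD_STATE_TO_DELETE, assumed value)
    SELECT s.logicalrelid, s.shardid, p.placementid, p.shardstate
    FROM pg_catalog.pg_dist_shard s
    JOIN pg_catalog.pg_dist_placement p USING (shardid)
    WHERE s.shardstate = 4;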
diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c
index e67691e44..aa6f82014 100644
--- a/src/backend/distributed/operations/stage_protocol.c
+++ b/src/backend/distributed/operations/stage_protocol.c
@@ -168,6 +168,9 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 		attemptableNodeCount = ShardReplicationFactor;
 	}
 
+	/* shard state is active by default */
+	ShardState shardState = SHARD_STATE_ACTIVE;
+
 	/* first retrieve a list of random nodes for shard placements */
 	while (candidateNodeIndex < attemptableNodeCount)
 	{
@@ -184,7 +187,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 		candidateNodeIndex++;
 	}
 
-	InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue);
+	InsertShardRow(relationId, shardId, storageType, shardState, nullMinValue, nullMaxValue);
 
 	CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
 										   ShardReplicationFactor);
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index df1f1bfcb..372162518 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -3942,13 +3942,17 @@ ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval)
 
 	Datum firstMin = firstInterval->minValue;
 	Datum firstMax = firstInterval->maxValue;
+	char firstState = firstInterval->shardState;
 	Datum secondMin = secondInterval->minValue;
 	Datum secondMax = secondInterval->maxValue;
+	char secondState = secondInterval->shardState;
 
 	FmgrInfo *comparisonFunction = intervalRelation->shardIntervalCompareFunction;
 	Oid collation = intervalRelation->partitionColumn->varcollid;
 
-	return ShardIntervalsOverlapWithParams(firstMin, firstMax, secondMin, secondMax,
+	return ShardIntervalsOverlapWithParams(firstMin, firstMax,
+										   secondMin, secondMax,
+										   firstState, secondState,
 										   comparisonFunction, collation);
 }
 
@@ -3959,8 +3963,10 @@ ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval)
 * The caller is responsible to ensure the input shard min/max values are not NULL.
 */
 bool
-ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax, Datum secondMin,
-								Datum secondMax, FmgrInfo *comparisonFunction,
+ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax,
+								Datum secondMin, Datum secondMax,
+								char firstState, char secondState,
+								FmgrInfo *comparisonFunction,
 								Oid collation)
 {
 	/*
@@ -3976,7 +3982,11 @@ ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax, Datum secondMin,
 	int firstComparison = DatumGetInt32(firstDatum);
 	int secondComparison = DatumGetInt32(secondDatum);
 
-	if (firstComparison < 0 || secondComparison < 0)
+	/* if one of the shards is marked as SHARD_STATE_TO_DELETE, ignore the overlap */
+	bool markedForDelete = (firstState == SHARD_STATE_TO_DELETE ||
+							secondState == SHARD_STATE_TO_DELETE);
+
+	if (firstComparison < 0 || secondComparison < 0 || markedForDelete)
 	{
 		return false;
 	}
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 7c04f9088..326de38e2 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -880,6 +880,25 @@ RegisterCitusConfigVariables(void)
 		0,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.defer_drop_after_shard_split",
+		gettext_noop("When enabled a shard split will mark the original shards "
+					 "for deletion after a successful split, instead of deleting "
+					 "them right away."),
+		gettext_noop("The deletion of a shard can sometimes run into a conflict with a "
+					 "long-running transaction on the shard during the drop phase of "
+					 "the shard split. This causes some splits to be rolled back after "
+					 "resources have been spent on splitting the shard. To prevent "
+					 "conflicts this feature lets you skip the actual deletion until a "
+					 "later point in time. When used, one should set "
+					 "citus.defer_shard_delete_interval to make sure deferred deletions "
+					 "will be executed."),
+		&DeferShardDeleteOnSplit,
+		false,
+		PGC_USERSET,
+		0,
+		NULL, NULL, NULL);
+
 	DefineCustomIntVariable(
 		"citus.defer_shard_delete_interval",
 		gettext_noop("Sets the time to wait between background deletion for shards."),
diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
index 7160882b0..63920e3c9 100644
--- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
+++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
@@ -72,3 +72,8 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
 #include "udfs/worker_copy_table_to_node/11.1-1.sql"
 #include "udfs/worker_split_shard_replication_setup/11.1-1.sql"
 #include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
+
+-- Changes for Shard Split Deferred Drop (the column default 1 is SHARD_STATE_ACTIVE)
+#include "udfs/citus_internal_add_shard_metadata/11.1-1.sql"
+#include "udfs/citus_internal_update_shard_and_placement_state_metadata/11.1-1.sql"
+ALTER TABLE pg_catalog.pg_dist_shard ADD COLUMN shardstate INT NOT NULL DEFAULT 1;
diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql
index 002ea471b..960fbd7eb 100644
--- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql
+++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql
@@ -89,3 +89,19 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
 
 DROP VIEW pg_catalog.citus_locks;
 DROP FUNCTION pg_catalog.citus_locks();
+
+-- Changes for Shard Split Deferred Drop
+-- drop the six-argument variant first; CREATE OR REPLACE alone would leave it behind
+DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", integer, text, text);
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
+							relation_id regclass, shard_id bigint,
+							storage_type "char", shard_min_value text,
+							shard_max_value text
+							)
+    RETURNS void
+    LANGUAGE C
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS
+    'Inserts into pg_dist_shard with user checks';
+ALTER TABLE pg_catalog.pg_dist_shard DROP COLUMN shardstate;
+DROP FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(shard_id bigint, shardState integer);
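Note: a small sanity-check sketch for the migration scripts above; illustrative only, assuming a psql session on the coordinator and SHARD_STATE_ACTIVE = 1:

    -- after upgrading to 11.1-1, pre-existing shards should all be active
    SELECT count(*) AS non_active_shards
    FROM pg_catalog.pg_dist_shard
    WHERE shardstate <> 1;

    -- after downgrading to 11.0-4, only the five-argument UDF should remain
    SELECT proname, oidvectortypes(proargtypes)
    FROM pg_catalog.pg_proc
    WHERE proname = 'citus_internal_add_shard_metadata';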
diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/11.1-1.sql
new file mode 100644
index 000000000..f7f62dc9f
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/11.1-1.sql
@@ -0,0 +1,10 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
+							relation_id regclass, shard_id bigint,
+							storage_type "char", shardstate integer,
+							shard_min_value text, shard_max_value text
+							)
+    RETURNS void
+    LANGUAGE C
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", integer, text, text) IS
+    'Inserts into pg_dist_shard with user checks';
diff --git a/src/backend/distributed/sql/udfs/citus_internal_update_shard_and_placement_state_metadata/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_internal_update_shard_and_placement_state_metadata/11.1-1.sql
new file mode 100644
index 000000000..bf88b26ce
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_internal_update_shard_and_placement_state_metadata/11.1-1.sql
@@ -0,0 +1,8 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(
+							shard_id bigint,
+							shardState integer)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(bigint, integer) IS
+    'Updates into pg_dist_shard and pg_dist_placement with user checks';
diff --git a/src/backend/distributed/sql/udfs/citus_internal_update_shard_and_placement_state_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_update_shard_and_placement_state_metadata/latest.sql
new file mode 100644
index 000000000..bf88b26ce
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_internal_update_shard_and_placement_state_metadata/latest.sql
@@ -0,0 +1,8 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(
+							shard_id bigint,
+							shardState integer)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(bigint, integer) IS
+    'Updates into pg_dist_shard and pg_dist_placement with user checks';
diff --git a/src/backend/distributed/test/distribution_metadata.c b/src/backend/distributed/test/distribution_metadata.c
index 6d769ef27..bb1ca35db 100644
--- a/src/backend/distributed/test/distribution_metadata.c
+++ b/src/backend/distributed/test/distribution_metadata.c
@@ -224,8 +224,9 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS)
 	text *minInfoText = cstring_to_text(minInfo->data);
 	text *maxInfoText = cstring_to_text(maxInfo->data);
 
-	InsertShardRow(distributedTableId, newShardId, SHARD_STORAGE_TABLE, minInfoText,
-				   maxInfoText);
+	InsertShardRow(distributedTableId, newShardId,
+				   SHARD_STORAGE_TABLE, SHARD_STATE_ACTIVE,
+				   minInfoText, maxInfoText);
 
 	PG_RETURN_INT64(newShardId);
 }
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h
index 56e15d171..d7b39b10e 100644
--- a/src/include/distributed/metadata_utility.h
+++ b/src/include/distributed/metadata_utility.h
@@ -67,6 +67,7 @@ typedef struct ShardInterval
 	Datum maxValue;     /* a shard's typed max value datum */
 	uint64 shardId;
 	int shardIndex;
+	ShardState shardState;
 } ShardInterval;
 
 
@@ -220,6 +221,7 @@ extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval);
 extern uint64 ShardLength(uint64 shardId);
 extern bool NodeGroupHasShardPlacements(int32 groupId,
 										bool onlyConsiderActivePlacements);
+extern bool IsActiveShardInterval(ShardInterval *shardInterval); extern bool IsActiveShardPlacement(ShardPlacement *ShardPlacement); extern List * FilterShardPlacementList(List *shardPlacementList, bool (*filter)( ShardPlacement *)); @@ -239,7 +241,7 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, - text *shardMinValue, text *shardMaxValue); + int shardState, text *shardMinValue, text *shardMaxValue); extern void DeleteShardRow(uint64 shardId); extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, char shardState, uint64 shardLength, @@ -250,6 +252,7 @@ extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, extern void UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted); extern void DeletePartitionRow(Oid distributedRelationId); extern void DeleteShardRow(uint64 shardId); +extern void UpdateShardState(uint64 shardId, char shardState); extern void UpdateShardPlacementState(uint64 placementId, char shardState); extern void UpdatePlacementGroupId(uint64 placementId, int groupId); extern void DeleteShardPlacementRow(uint64 placementId); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 14fdd7a0c..6e53a3a32 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -560,6 +560,7 @@ extern bool ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval); extern bool ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax, Datum secondMin, Datum secondMax, + char firstState, char secondState, FmgrInfo *comparisonFunction, Oid collation); extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); diff --git a/src/include/distributed/pg_dist_shard.h b/src/include/distributed/pg_dist_shard.h index 5c98b755f..e2f66dbae 100644 --- a/src/include/distributed/pg_dist_shard.h +++ b/src/include/distributed/pg_dist_shard.h @@ -43,13 +43,14 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard; * compiler constants for pg_dist_shards * ---------------- */ -#define Natts_pg_dist_shard 6 +#define Natts_pg_dist_shard 7 #define Anum_pg_dist_shard_logicalrelid 1 #define Anum_pg_dist_shard_shardid 2 #define Anum_pg_dist_shard_shardstorage 3 #define Anum_pg_dist_shard_shardalias_DROPPED 4 #define Anum_pg_dist_shard_shardminvalue 5 #define Anum_pg_dist_shard_shardmaxvalue 6 +#define Anum_pg_dist_shard_shardstate 7 /* * Valid values for shard storage types include foreign table, (standard) table diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index 8a98254f9..6605f2ec1 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -14,6 +14,7 @@ /* GUC to configure deferred shard deletion */ extern int DeferShardDeleteInterval; extern bool DeferShardDeleteOnMove; +extern bool DeferShardDeleteOnSplit; extern double DesiredPercentFreeAfterMove; extern bool CheckAvailableSpaceBeforeMove;
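Note: an end-to-end usage sketch of the new behavior; not part of the patch. The shard id, node ids, and split point are made-up examples, and citus.defer_shard_delete_interval is the pre-existing GUC that drives the background shard cleaner:

    -- mark source shards instead of dropping them inside the split transaction
    SET citus.defer_drop_after_shard_split TO on;

    -- make sure deferred deletions are eventually executed
    ALTER SYSTEM SET citus.defer_shard_delete_interval TO '60s';
    SELECT pg_reload_conf();

    -- split a shard; the source row stays in pg_dist_shard with shardstate = 4
    -- (SHARD_STATE_TO_DELETE, assumed value) until the cleaner removes it
    SELECT citus_split_shard_by_split_points(
        102008,             -- shard to split (example id)
        ARRAY['0'],         -- split point(s)
        ARRAY[1, 2],        -- target node ids
        'block_writes');

    -- shards still pending deletion
    SELECT shardid, logicalrelid, shardstate
    FROM pg_catalog.pg_dist_shard
    WHERE shardstate = 4;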