diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c
index 0e2bd0ecd..cc1ee7083 100644
--- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c
+++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c
@@ -1267,9 +1267,13 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId,
     /* set shard storage type according to relation type */
     char shardStorageType = ShardStorageType(citusLocalTableId);
 
+    /* default shard state is active */
+    ShardState shardState = SHARD_STATE_ACTIVE;
+
     text *shardMinValue = NULL;
     text *shardMaxValue = NULL;
-    InsertShardRow(citusLocalTableId, shardId, shardStorageType,
+    InsertShardRow(citusLocalTableId, shardId,
+                   shardStorageType, shardState,
                    shardMinValue, shardMaxValue);
 
     List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError());
diff --git a/src/backend/distributed/commands/grant.c b/src/backend/distributed/commands/grant.c
index c7861060a..b5fb1f0e4 100644
--- a/src/backend/distributed/commands/grant.c
+++ b/src/backend/distributed/commands/grant.c
@@ -160,7 +160,9 @@ PreprocessGrantStmt(Node *node, const char *queryString,
         ddlJob->taskList = NIL;
         if (IsCitusTable(relationId))
         {
-            ddlJob->taskList = DDLTaskList(relationId, ddlString.data);
+            /* propagate the latest policies to orphaned shards as well, to avoid any potential issues */
+            bool includeOrphanedShards = true;
+            ddlJob->taskList = DDLTaskListExtended(relationId, ddlString.data, includeOrphanedShards);
         }
         ddlJobs = lappend(ddlJobs, ddlJob);
diff --git a/src/backend/distributed/commands/rename.c b/src/backend/distributed/commands/rename.c
index 5e313d68c..657e62a9d 100644
--- a/src/backend/distributed/commands/rename.c
+++ b/src/backend/distributed/commands/rename.c
@@ -179,7 +179,14 @@ PreprocessRenameStmt(Node *node, const char *renameCommand,
     DDLJob *ddlJob = palloc0(sizeof(DDLJob));
     ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, tableRelationId);
     ddlJob->metadataSyncCommand = renameCommand;
-    ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand);
+
+    /*
+     * Rename orphaned shards as well, otherwise the shard's name would differ
+     * from the distributed table's. That would cause the shard cleaner to fail,
+     * since it would try to delete the orphaned shard under the wrong (new) name.
+     */
+    bool includeOrphanedShards = true;
+    ddlJob->taskList = DDLTaskListExtended(tableRelationId, renameCommand, includeOrphanedShards);
 
     return list_make1(ddlJob);
 }
diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c
index 9b0a79e22..6ac549ad7 100644
--- a/src/backend/distributed/commands/utility_hook.c
+++ b/src/backend/distributed/commands/utility_hook.c
@@ -1626,7 +1626,9 @@ InvalidateForeignKeyGraphForDDL(void)
  * given list of shards.
  */
 List *
-DDLTaskList(Oid relationId, const char *commandString)
+DDLTaskListExtended(Oid relationId, const char *commandString, bool includeOrphanedShards)
 {
     List *taskList = NIL;
-    List *shardIntervalList = LoadShardIntervalList(relationId);
+    List *shardIntervalList = includeOrphanedShards ?
+                              LoadShardIntervalListIncludingOrphansViaCatalog(relationId) :
+                              LoadShardIntervalList(relationId);
@@ -1670,6 +1670,18 @@
 }
 
 
+/*
+ * DDLTaskList builds a list of tasks to execute a DDL command on the
+ * shards of the given relation, excluding orphaned shards.
+ */
+List *
+DDLTaskList(Oid relationId, const char *commandString)
+{
+    bool includeOrphanedShards = false;
+    return DDLTaskListExtended(relationId, commandString, includeOrphanedShards);
+}
+
+
 /*
  * NodeDDLTaskList builds a list of tasks to execute a DDL command on a
  * given target set of nodes.
diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c
index 83fc33720..76e600e30 100644
--- a/src/backend/distributed/metadata/metadata_cache.c
+++ b/src/backend/distributed/metadata/metadata_cache.c
@@ -225,7 +225,7 @@ static int32 LocalNodeId = -1;
 
 /* built first time through in InitializeDistCache */
 static ScanKeyData DistPartitionScanKey[1];
-static ScanKeyData DistShardScanKey[1];
+static ScanKeyData DistShardScanKey[2];
 static ScanKeyData DistObjectScanKey[3];
 
@@ -1590,7 +1590,9 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
                                           &intervalTypeId,
                                           &intervalTypeMod);
 
-    List *distShardTupleList = LookupDistShardTuples(cacheEntry->relationId);
+    /* only load active shards into the cache */
+    bool activeShardsOnly = true;
+    List *distShardTupleList = LookupDistShardTuples(cacheEntry->relationId, activeShardsOnly);
     int shardIntervalArrayLength = list_length(distShardTupleList);
     if (shardIntervalArrayLength > 0)
     {
@@ -3596,6 +3598,14 @@ InitializeDistCache(void)
     DistShardScanKey[0].sk_collation = InvalidOid;
     DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid;
 
+    fmgr_info_cxt(F_INT4LE,
+                  &DistShardScanKey[1].sk_func,
+                  MetadataCacheMemoryContext);
+    DistShardScanKey[1].sk_strategy = BTLessEqualStrategyNumber;
+    DistShardScanKey[1].sk_subtype = InvalidOid;
+    DistShardScanKey[1].sk_collation = InvalidOid;
+    DistShardScanKey[1].sk_attno = Anum_pg_dist_shard_shardstate;
+
     CreateDistTableCache();
     CreateShardIdCache();
 
@@ -4539,18 +4549,20 @@ LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId)
  * specified relation.
  */
 List *
-LookupDistShardTuples(Oid relationId)
+LookupDistShardTuples(Oid relationId, bool activeShardsOnly)
 {
     List *distShardTupleList = NIL;
-    ScanKeyData scanKey[1];
+    ScanKeyData scanKey[2];
 
     Relation pgDistShard = table_open(DistShardRelationId(), AccessShareLock);
 
     /* copy scankey to local copy, it will be modified during the scan */
     scanKey[0] = DistShardScanKey[0];
+    scanKey[1] = DistShardScanKey[1];
 
     /* set scan arguments */
     scanKey[0].sk_argument = ObjectIdGetDatum(relationId);
+    scanKey[1].sk_argument = Int32GetDatum(activeShardsOnly ?
+                                           SHARD_STATE_ACTIVE : SHARD_STATE_INVALID_LAST);
 
     SysScanDesc scanDescriptor = systable_beginscan(pgDistShard,
                                                     DistShardLogicalRelidIndexId(), true,
@@ -4815,9 +4827,12 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
         maxValueExists = true;
     }
 
+    int shardState = DatumGetInt32(datumArray[Anum_pg_dist_shard_shardstate - 1]);
+
     ShardInterval *shardInterval = CitusMakeNode(ShardInterval);
     shardInterval->relationId = relationId;
     shardInterval->storageType = storageType;
+    shardInterval->shardState = shardState;
     shardInterval->valueTypeId = intervalTypeId;
     shardInterval->valueTypeLen = intervalTypeLen;
     shardInterval->valueByVal = intervalByVal;
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index b45a3641e..2f94f2334 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -132,9 +132,9 @@ static void EnsurePartitionMetadataIsSane(Oid relationId, char distributionMetho
                                           int colocationId, char replicationModel,
                                           Var *distributionKey);
 static void EnsureCoordinatorInitiatedOperation(void);
-static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
-                                      text *shardMinValue,
-                                      text *shardMaxValue);
+static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId,
+                                      char storageType, int shardState,
+                                      text *shardMinValue, text *shardMaxValue);
 static void EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId,
                                                int64 placementId, int32 shardState,
                                                int64 shardLength, int32 groupId);
@@ -162,6 +162,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
+PG_FUNCTION_INFO_V1(citus_internal_update_shard_and_placement_state_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation);
 PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata);
@@ -1295,7 +1296,7 @@ ShardListInsertCommand(List *shardIntervalList)
     /* now add shards to insertShardCommand */
     StringInfo insertShardCommand = makeStringInfo();
     appendStringInfo(insertShardCommand,
-                     "WITH shard_data(relationname, shardid, storagetype, "
+                     "WITH shard_data(relationname, shardid, storagetype, shardstate, "
                      "shardminvalue, shardmaxvalue) AS (VALUES ");
 
     foreach_ptr(shardInterval, shardIntervalList)
@@ -1328,10 +1329,11 @@ ShardListInsertCommand(List *shardIntervalList)
         }
 
         appendStringInfo(insertShardCommand,
-                         "(%s::regclass, %ld, '%c'::\"char\", %s, %s)",
+                         "(%s::regclass, %ld, '%c'::\"char\", %d, %s, %s)",
                          quote_literal_cstr(qualifiedRelationName),
                          shardId,
                          shardInterval->storageType,
+                         shardInterval->shardState,
                          minHashToken->data,
                          maxHashToken->data);
 
@@ -1345,7 +1347,7 @@ ShardListInsertCommand(List *shardIntervalList)
 
     appendStringInfo(insertShardCommand,
                      "SELECT citus_internal_add_shard_metadata(relationname, shardid, "
-                     "storagetype, shardminvalue, shardmaxvalue) "
+                     "storagetype, shardstate, shardminvalue, shardmaxvalue) "
                      "FROM shard_data;");
 
     /*
@@ -3196,16 +3198,19 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
     PG_ENSURE_ARGNOTNULL(2, "storage type");
     char storageType = PG_GETARG_CHAR(2);
 
+    PG_ENSURE_ARGNOTNULL(3, "shardstate");
+    ShardState shardState = PG_GETARG_INT32(3);
+
     text *shardMinValue = NULL;
-    if (!PG_ARGISNULL(3))
+    if (!PG_ARGISNULL(4))
     {
-        shardMinValue = PG_GETARG_TEXT_P(3);
+        shardMinValue = PG_GETARG_TEXT_P(4);
     }
 
     text *shardMaxValue = NULL;
-    if (!PG_ARGISNULL(4))
+    if (!PG_ARGISNULL(5))
     {
-        shardMaxValue = PG_GETARG_TEXT_P(4);
+        shardMaxValue = PG_GETARG_TEXT_P(5);
     }
 
     /* only owner of the table (or superuser) is allowed to add the Citus metadata */
@@ -3224,11 +3229,14 @@
      * not sane, the user can only affect its own tables. Given that the
      * user is owner of the table, we should allow.
      */
-    EnsureShardMetadataIsSane(relationId, shardId, storageType, shardMinValue,
-                              shardMaxValue);
+    EnsureShardMetadataIsSane(relationId, shardId,
+                              storageType, shardState,
+                              shardMinValue, shardMaxValue);
     }
 
-    InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue);
+    InsertShardRow(relationId, shardId,
+                   storageType, shardState,
+                   shardMinValue, shardMaxValue);
 
     PG_RETURN_VOID();
 }
@@ -3262,7 +3270,8 @@ EnsureCoordinatorInitiatedOperation(void)
  * for inserting into pg_dist_shard metadata.
 */
 static void
-EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
+EnsureShardMetadataIsSane(Oid relationId, int64 shardId,
+                          char storageType, int shardState,
                           text *shardMinValue, text *shardMaxValue)
 {
     if (shardId <= INVALID_SHARD_ID)
@@ -3278,6 +3287,13 @@
                         errmsg("Invalid shard storage type: %c", storageType)));
     }
 
+    if (!(shardState == SHARD_STATE_ACTIVE ||
+          shardState == SHARD_STATE_TO_DELETE))
+    {
+        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                        errmsg("Invalid shard state: %d", shardState)));
+    }
+
     char partitionMethod = PartitionMethodViaCatalog(relationId);
     if (partitionMethod == DISTRIBUTE_BY_INVALID)
     {
@@ -3296,7 +3312,9 @@
                                "reference and local tables: %c",
                                partitionMethod)));
     }
 
-    List *distShardTupleList = LookupDistShardTuples(relationId);
+
+    bool activeShardsOnly = true;
+    List *distShardTupleList = LookupDistShardTuples(relationId, activeShardsOnly);
     if (partitionMethod == DISTRIBUTE_BY_NONE)
     {
         if (shardMinValue != NULL || shardMaxValue != NULL)
@@ -3574,6 +3592,84 @@ citus_internal_update_placement_metadata(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * citus_internal_update_shard_and_placement_state_metadata is an internal UDF to
+ * update the shard state of a shard and of its placements in the pg_dist_shard
+ * and pg_dist_placement catalogs.
+ */
+Datum
+citus_internal_update_shard_and_placement_state_metadata(PG_FUNCTION_ARGS)
+{
+    CheckCitusVersion(ERROR);
+
+    int64 shardId = PG_GETARG_INT64(0);
+    ShardState newState = PG_GETARG_INT32(1);
+
+    ShardPlacement *placement = NULL;
+    if (!ShouldSkipMetadataChecks())
+    {
+        /* this UDF is not allowed to be executed as a separate command */
+        EnsureCoordinatorInitiatedOperation();
+
+        if (!ShardExists(shardId))
+        {
+            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                            errmsg("Shard id does not exist: %ld", shardId)));
+        }
+
+        bool missingOk = false;
+        EnsureShardOwner(shardId, missingOk);
+
+        /*
+         * Look up the shard's single active placement so that we can
+         * update its state along with the shard's.
+         */
+        List *shardPlacementList = ActiveShardPlacementList(shardId);
+
+        /* Split is only allowed for shards with a single placement */
+        Assert(list_length(shardPlacementList) == 1);
+        placement = linitial(shardPlacementList);
+
+        WorkerNode *workerNode = FindNodeWithNodeId(placement->nodeId,
+                                                    false /* missingOk */);
+        if (!workerNode)
+        {
+            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                            errmsg("Node with group id %d for shard placement "
+                                   "%ld does not exist", placement->groupId,
+                                   shardId)));
+        }
+    }
+    else
+    {
+        /*
+         * Look up the shard's single active placement so that we can
+         * update its state along with the shard's.
+         */
+        List *shardPlacementList = ActiveShardPlacementList(shardId);
+
+        /* Split is only allowed for shards with a single placement */
+        Assert(list_length(shardPlacementList) == 1);
+        placement = linitial(shardPlacementList);
+    }
+
+    /* bail out if we could not find an active placement for the shard */
+    if (placement == NULL)
+    {
+        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                        errmsg("Active placement for shard %ld was not "
+                               "found", shardId)));
+    }
+
+    UpdateShardState(shardId, newState);
+    UpdateShardPlacementState(placement->placementId, newState);
+
+    PG_RETURN_VOID();
+}
+
+
 /*
  * citus_internal_delete_shard_metadata is an internal UDF to
  * delete a row in pg_dist_shard and corresponding placement rows
diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index ff85f6930..71c3c0ed2 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -55,6 +55,7 @@
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/version_compat.h"
+#include "distributed/shardinterval_utils.h"
 #include "nodes/makefuncs.h"
 #include "parser/scansup.h"
 #include "storage/lmgr.h"
@@ -1070,7 +1071,48 @@ LoadShardIntervalList(Oid relationId)
 
 
 /*
- * LoadUnsortedShardIntervalListViaCatalog returns a list of shard intervals related for a
+ * LoadShardIntervalListIncludingOrphansViaCatalog returns a list of sorted shard intervals
+ * for a given distributed table. The function returns an empty list if no shards can be found
+ * for the given relation.
+ *
+ * This function does not use CitusTableCache and instead reads from catalog tables
+ * directly.
+ */
+List *
+LoadShardIntervalListIncludingOrphansViaCatalog(Oid relationId)
+{
+    List *shardIntervalList =
+        LoadUnsortedShardIntervalListIncludingOrphansViaCatalog(relationId);
+
+    /* transform into a temporary array to sort */
+    ShardInterval **shardIntervalArray =
+        (ShardInterval **) PointerArrayFromList(shardIntervalList);
+    int shardIntervalArrayLength = list_length(shardIntervalList);
+
+    /*
+     * Although we are loading intervals from the catalog, we use the cache
+     * to get the partition column and the compare function.
+     */
+    CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
+    ShardInterval **sortedShardIntervalArray = NULL;
+    if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
+    {
+        sortedShardIntervalArray = shardIntervalArray;
+    }
+    else
+    {
+        sortedShardIntervalArray = SortShardIntervalArray(shardIntervalArray,
+                                                          shardIntervalArrayLength,
+                                                          cacheEntry->partitionColumn->
+                                                          varcollid,
+                                                          cacheEntry->
+                                                          shardIntervalCompareFunction);
+    }
+
+    List *sortedShardIntervalList = ShardArrayToList(sortedShardIntervalArray,
+                                                     shardIntervalArrayLength);
+    return sortedShardIntervalList;
+}
+
+
+/*
+ * LoadUnsortedShardIntervalListIncludingOrphansViaCatalog returns a list of shard intervals related to a
  * given distributed table. The function returns an empty list if no shards can be found
  * for the given relation.
  *
@@ -1078,10 +1120,12 @@ LoadShardIntervalList(Oid relationId)
  * directly.
  */
 List *
-LoadUnsortedShardIntervalListViaCatalog(Oid relationId)
+LoadUnsortedShardIntervalListIncludingOrphansViaCatalog(Oid relationId)
 {
     List *shardIntervalList = NIL;
-    List *distShardTuples = LookupDistShardTuples(relationId);
+
+    bool activeShardsOnly = false;
+    List *distShardTuples = LookupDistShardTuples(relationId, activeShardsOnly);
     Relation distShardRelation = table_open(DistShardRelationId(), AccessShareLock);
     TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation);
     Oid intervalTypeId = InvalidOid;
@@ -1198,6 +1242,7 @@ CopyShardInterval(ShardInterval *srcInterval)
     destInterval->type = srcInterval->type;
     destInterval->relationId = srcInterval->relationId;
     destInterval->storageType = srcInterval->storageType;
+    destInterval->shardState = srcInterval->shardState;
     destInterval->valueTypeId = srcInterval->valueTypeId;
     destInterval->valueTypeLen = srcInterval->valueTypeLen;
     destInterval->valueByVal = srcInterval->valueByVal;
@@ -1632,7 +1677,8 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
  * null min/max values in case they are creating an empty shard.
 */
 void
-InsertShardRow(Oid relationId, uint64 shardId, char storageType,
+InsertShardRow(Oid relationId, uint64 shardId,
+               char storageType, int shardState,
                text *shardMinValue, text *shardMaxValue)
 {
     Datum values[Natts_pg_dist_shard];
@@ -1661,6 +1707,8 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
         isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true;
     }
 
+    values[Anum_pg_dist_shard_shardstate - 1] = Int32GetDatum(shardState);
+
     /* open shard relation and insert new tuple */
     Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);
 
@@ -1942,6 +1990,55 @@ DeleteShardPlacementRow(uint64 placementId)
 }
 
 
+/*
+ * UpdateShardState sets the shardState for the shard identified by shardId.
+ */
+void
+UpdateShardState(uint64 shardId, int shardState)
+{
+    ScanKeyData scanKey[1];
+    int scanKeyCount = 1;
+    bool indexOK = true;
+    Datum values[Natts_pg_dist_shard];
+    bool isnull[Natts_pg_dist_shard];
+    bool replace[Natts_pg_dist_shard];
+
+    Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);
+    TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard);
+    ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
+                BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
+
+    SysScanDesc scanDescriptor = systable_beginscan(pgDistShard,
+                                                    DistShardShardidIndexId(), indexOK,
+                                                    NULL, scanKeyCount, scanKey);
+
+    HeapTuple heapTuple = systable_getnext(scanDescriptor);
+    if (!HeapTupleIsValid(heapTuple))
+    {
+        ereport(ERROR, (errmsg("could not find valid entry for shard "
+                               UINT64_FORMAT, shardId)));
+    }
+
+    memset(replace, 0, sizeof(replace));
+
+    values[Anum_pg_dist_shard_shardstate - 1] = Int32GetDatum(shardState);
+    isnull[Anum_pg_dist_shard_shardstate - 1] = false;
+    replace[Anum_pg_dist_shard_shardstate - 1] = true;
+
+    heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
+    CatalogTupleUpdate(pgDistShard, &heapTuple->t_self, heapTuple);
+
+    CitusInvalidateRelcacheByShardId(shardId);
+
+    CommandCounterIncrement();
+
+    systable_endscan(scanDescriptor);
+    table_close(pgDistShard, NoLock);
+}
+
+
 /*
  * UpdateShardPlacementState sets the shardState for the placement identified
  * by placementId.
diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c
index b9841dabf..c43846a8e 100644
--- a/src/backend/distributed/operations/create_shards.c
+++ b/src/backend/distributed/operations/create_shards.c
@@ -195,6 +195,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
     /* set shard storage type according to relation type */
     char shardStorageType = ShardStorageType(distributedTableId);
 
+    /* shard state is active by default */
+    ShardState shardState = SHARD_STATE_ACTIVE;
+
     for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
     {
         uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
@@ -214,7 +217,8 @@
         text *minHashTokenText = IntegerToText(shardMinHashToken);
         text *maxHashTokenText = IntegerToText(shardMaxHashToken);
 
-        InsertShardRow(distributedTableId, shardId, shardStorageType,
+        InsertShardRow(distributedTableId, shardId,
+                       shardStorageType, shardState,
                        minHashTokenText, maxHashTokenText);
 
         List *currentInsertedShardPlacements = InsertShardPlacementRows(
@@ -289,8 +293,10 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
         text *shardMaxValueText = IntegerToText(shardMaxValue);
         List *sourceShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
             sourceShardId);
+        const ShardState targetShardState = SHARD_STATE_ACTIVE;
 
-        InsertShardRow(targetRelationId, newShardId, targetShardStorageType,
+        InsertShardRow(targetRelationId, newShardId,
+                       targetShardStorageType, targetShardState,
                        shardMinValueText, shardMaxValueText);
 
         ShardPlacement *sourcePlacement = NULL;
@@ -370,8 +376,12 @@ CreateReferenceTableShard(Oid distributedTableId)
     /* get the next shard id */
     uint64 shardId = GetNextShardId();
 
-    InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
-                   shardMaxValue);
+    /* shard state is active by default */
+    ShardState shardState = SHARD_STATE_ACTIVE;
+
+    InsertShardRow(distributedTableId, shardId,
+                   shardStorageType, shardState,
+                   shardMinValue, shardMaxValue);
 
     List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId,
                                                              shardId, nodeList,
                                                              workerStartIndex,
diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c
index 84a22737b..6fc2cb88e 100644
--- a/src/backend/distributed/operations/delete_protocol.c
+++ b/src/backend/distributed/operations/delete_protocol.c
@@ -139,7 +139,7 @@ citus_drop_all_shards(PG_FUNCTION_ARGS)
      */
     LockRelationOid(relationId, AccessExclusiveLock);
 
-    List *shardIntervalList = LoadUnsortedShardIntervalListViaCatalog(relationId);
+    List *shardIntervalList = LoadUnsortedShardIntervalListIncludingOrphansViaCatalog(relationId);
     int droppedShardCount = DropShards(relationId, schemaName, relationName,
                                        shardIntervalList, dropShardsMetadataOnly);
 
diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index f446bbadd..8a672ca55 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -23,6 +23,7 @@
 #include "distributed/colocation_utils.h"
 #include "distributed/hash_helpers.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/metadata_utility.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/coordinator_protocol.h"
 #include "distributed/connection_management.h"
@@ -40,6 +41,7 @@
 #include "distributed/shardsplit_logical_replication.h"
 #include "distributed/deparse_shard_query.h"
 #include "distributed/shard_rebalancer.h"
+#include "distributed/shard_cleaner.h"
 #include "postmaster/postmaster.h"
 
 /*
@@ -135,7 +137,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId,
 static void DropDummyShards(HTAB *mapOfDummyShardToPlacement);
 static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval);
 static uint64 GetNextShardIdForSplitChild(void);
-
+static void MarkShardListWithPlacementsForDrop(List *shardIntervalList);
 
 /* Customize error message strings based on operation type */
 static const char *const SplitOperationName[] =
@@ -149,6 +151,8 @@ static const char *const SplitTargetName[] =
     [ISOLATE_TENANT_TO_NEW_SHARD] = "tenant",
 };
 
+bool DeferShardDeleteOnSplit = false;
+
 /* Function definitions */
 
 /*
@@ -555,14 +559,23 @@ BlockingShardSplit(SplitOperation splitOperation,
      * require additional clean-up in case of failure. The remaining operations
      * going forward are part of the same distributed transaction.
      */
-
-    /*
-     * Drop old shards and delete related metadata. Have to do that before
-     * creating the new shard metadata, because there's cross-checks
-     * preventing inconsistent metadata (like overlapping shards).
-     */
-    DropShardList(sourceColocatedShardIntervalList);
-
+    if (DeferShardDeleteOnSplit)
+    {
+        /*
+         * Defer the deletion of the source shards and only mark
+         * their metadata for deletion.
+         */
+        MarkShardListWithPlacementsForDrop(sourceColocatedShardIntervalList);
+    }
+    else
+    {
+        /*
+         * Drop old shards and delete related metadata. Have to do that before
+         * creating the new shard metadata, because there are cross-checks
+         * preventing inconsistent metadata (like overlapping shards).
+         */
+        DropShardList(sourceColocatedShardIntervalList);
+    }
 
     /* Insert new shard and placement metdata */
     InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
                                      workersForPlacementList);
 
@@ -1019,6 +1032,7 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
                 shardInterval->relationId,
                 shardInterval->shardId,
                 shardInterval->storageType,
+                shardInterval->shardState,
                 IntegerToText(DatumGetInt32(shardInterval->minValue)),
                 IntegerToText(DatumGetInt32(shardInterval->maxValue)));
 
@@ -1137,6 +1151,43 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
 }
 
 
+/*
+ * MarkShardListWithPlacementsForDrop updates the shard and placement metadata,
+ * on both the coordinator and the MX worker nodes, to the TO_DELETE state.
+ */
+static void
+MarkShardListWithPlacementsForDrop(List *shardIntervalList)
+{
+    ShardInterval *shardInterval = NULL;
+    foreach_ptr(shardInterval, shardIntervalList)
+    {
+        uint64 shardId = shardInterval->shardId;
+        ShardState newState = SHARD_STATE_TO_DELETE;
+
+        UpdateShardState(shardId, newState);
+        List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
+
+        /* only a single placement is allowed (RelationReplicationFactor = 1 was already validated) */
+        Assert(list_length(shardPlacementList) == 1);
+
+        ShardPlacement *placement = (ShardPlacement *) linitial(shardPlacementList);
+        UpdateShardPlacementState(placement->placementId, newState);
+
+        /* sync metadata with all other worker nodes */
+        bool shouldSyncMetadata = ShouldSyncTableMetadata(shardInterval->relationId);
+        if (shouldSyncMetadata)
+        {
+            StringInfo updateShardCommand = makeStringInfo();
+            appendStringInfo(updateShardCommand,
                             "SELECT citus_internal_update_shard_and_placement_state_metadata(%ld, %d)",
+                             shardId,
+                             newState);
+
+            SendCommandToWorkersWithMetadata(updateShardCommand->data);
+        }
+    }
+}
+
+
 /*
  * DropShardList drops shards and their metadata from both the coordinator and
  * mx nodes.
@@ -1430,13 +1481,23 @@ NonBlockingShardSplit(SplitOperation splitOperation,
     /* 19) Drop Publications */
     DropPublications(sourceConnection, publicationInfoHash);
 
-
-    /*
-     * 20) Drop old shards and delete related metadata. Have to do that before
-     * creating the new shard metadata, because there's cross-checks
-     * preventing inconsistent metadata (like overlapping shards).
-     */
-    DropShardList(sourceColocatedShardIntervalList);
+    if (DeferShardDeleteOnSplit)
+    {
+        /*
+         * 20) Defer the deletion of the source shards and only mark
+         * their metadata for deletion.
+         */
+        MarkShardListWithPlacementsForDrop(sourceColocatedShardIntervalList);
+    }
+    else
+    {
+        /*
+         * 20) Drop old shards and delete related metadata. Have to do that before
+         * creating the new shard metadata, because there are cross-checks
+         * preventing inconsistent metadata (like overlapping shards).
+         */
+        DropShardList(sourceColocatedShardIntervalList);
+    }
 
     /* 21) Insert new shard and placement metdata */
     InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
                                      workersForPlacementList);
diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c
index e67691e44..e4cbff5b6 100644
--- a/src/backend/distributed/operations/stage_protocol.c
+++ b/src/backend/distributed/operations/stage_protocol.c
@@ -103,6 +103,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
     text *nullMinValue = NULL;
     text *nullMaxValue = NULL;
     char storageType = SHARD_STORAGE_TABLE;
+    char shardState = SHARD_STATE_ACTIVE;
 
     Oid relationId = ResolveRelationId(relationNameText, false);
 
@@ -184,7 +185,9 @@
         candidateNodeIndex++;
     }
 
-    InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue);
+    InsertShardRow(relationId, shardId,
+                   storageType, shardState,
+                   nullMinValue, nullMaxValue);
 
     CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
                                            ShardReplicationFactor);
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index df1f1bfcb..755454b7d 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -4543,6 +4543,7 @@ GenerateSyntheticShardIntervalArray(int partitionCount)
 
         shardInterval->shardId = INVALID_SHARD_ID;
         shardInterval->valueTypeId = INT4OID;
+        shardInterval->shardState = SHARD_STATE_INVALID_FIRST;
 
         shardIntervalArray[shardIndex] = shardInterval;
     }
diff --git a/src/backend/distributed/planner/shard_pruning.c b/src/backend/distributed/planner/shard_pruning.c
index 665c9a75b..d7561722d 100644
--- a/src/backend/distributed/planner/shard_pruning.c
+++ b/src/backend/distributed/planner/shard_pruning.c
@@ -263,7 +263,6 @@ static void AddHashRestrictionToInstance(ClauseWalkerContext *context, OpExpr *o
                                          Var *varClause, Const *constantClause);
 static void AddNewConjuction(ClauseWalkerContext *context, PruningTreeNode *node);
 static PruningInstance * CopyPartialPruningInstance(PruningInstance *sourceInstance);
-static List * ShardArrayToList(ShardInterval **shardArray, int length);
 static List * DeepCopyShardIntervalList(List *originalShardIntervalList);
 static int PerformValueCompare(FunctionCallInfo compareFunctionCall, Datum a,
                                Datum b);
@@ -1361,25 +1360,6 @@ CopyPartialPruningInstance(PruningInstance *sourceInstance)
 }
 
 
-/*
- * ShardArrayToList builds a list of out the array of ShardInterval*.
- */
-static List *
-ShardArrayToList(ShardInterval **shardArray, int length)
-{
-    List *shardIntervalList = NIL;
-
-    for (int shardIndex = 0; shardIndex < length; shardIndex++)
-    {
-        ShardInterval *shardInterval =
-            shardArray[shardIndex];
-        shardIntervalList = lappend(shardIntervalList, shardInterval);
-    }
-
-    return shardIntervalList;
-}
-
-
 /*
  * DeepCopyShardIntervalList copies originalShardIntervalList and the
  * contained ShardIntervals, into a new list.
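
Reviewer note: to make the deferred-drop path introduced by the shard_split.c changes above concrete, here is a minimal usage sketch. It assumes the citus.defer_drop_after_shard_split GUC registered in shared_library_init.c below and the citus_split_shard_by_split_points() entry point from the 11.1 split work; the shard id, split point, and node ids are hypothetical:

    -- enable deferred drop for splits (GUC defined in shared_library_init.c below)
    SET citus.defer_drop_after_shard_split TO on;
    SELECT citus_split_shard_by_split_points(
        102008,          -- hypothetical source shard id
        ARRAY['0'],      -- split point in the hash range
        ARRAY[1, 2],     -- target node ids for the children
        'block_writes');
    -- the source shard row is kept with shardstate = 4 (SHARD_STATE_TO_DELETE) ...
    SELECT shardid, shardstate FROM pg_catalog.pg_dist_shard WHERE shardstate = 4;
    -- ... until the deferred shard cleaner (citus.defer_shard_delete_interval) drops it
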
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 1d00ccf5c..9073b2e01 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -880,6 +880,25 @@ RegisterCitusConfigVariables(void)
         0,
         NULL, NULL, NULL);
 
+    DefineCustomBoolVariable(
+        "citus.defer_drop_after_shard_split",
+        gettext_noop("When enabled, a shard split marks the original shards "
+                     "for deletion after a successful split, instead of deleting "
+                     "them right away."),
+        gettext_noop("The deletion of a shard can sometimes run into a conflict with a "
+                     "long running transaction on the shard during the drop phase of "
+                     "the shard split. This causes some splits to be rolled back after "
+                     "resources have been spent on splitting the shard. To prevent "
+                     "such conflicts, this feature lets you skip the actual deletion until a "
+                     "later point in time. When enabled, one should set "
+                     "citus.defer_shard_delete_interval to make sure deferred deletions "
+                     "will be executed."),
+        &DeferShardDeleteOnSplit,
+        false,
+        PGC_USERSET,
+        0,
+        NULL, NULL, NULL);
+
     DefineCustomIntVariable(
         "citus.defer_shard_delete_interval",
         gettext_noop("Sets the time to wait between background deletion for shards."),
diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
index 80c40238e..cfafee53a 100644
--- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
+++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql
@@ -73,3 +73,8 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
 #include "udfs/worker_split_shard_replication_setup/11.1-1.sql"
 #include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
 #include "udfs/replicate_reference_tables/11.1-1.sql"
+
+-- Changes for Shard Split Deferred Drop (shardstate defaults to 1, SHARD_STATE_ACTIVE)
+#include "udfs/citus_internal_add_shard_metadata/11.1-1.sql"
+#include "udfs/citus_internal_update_shard_and_placement_state_metadata/11.1-1.sql"
+ALTER TABLE pg_catalog.pg_dist_shard ADD COLUMN shardstate INT NOT NULL DEFAULT 1;
diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql
index be213c26e..32818ed4a 100644
--- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql
+++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql
@@ -92,3 +92,9 @@ DROP FUNCTION pg_catalog.citus_locks();
 DROP FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode);
 #include "../udfs/replicate_reference_tables/9.3-2.sql"
+
+-- Changes for Shard Split Deferred Drop
+DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", integer, text, text);
+#include "../udfs/citus_internal_add_shard_metadata/10.2-1.sql"
+ALTER TABLE pg_catalog.pg_dist_shard DROP COLUMN shardstate;
+DROP FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(shard_id bigint, shardState integer);
diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/11.1-1.sql
new file mode 100644
index 000000000..f7f62dc9f
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/11.1-1.sql
@@ -0,0 +1,10 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
+                            relation_id regclass, shard_id bigint,
+                            storage_type "char", shardstate integer,
+                            shard_min_value text, shard_max_value text
+                            )
+    RETURNS void
+    LANGUAGE C
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", integer, text, text) IS
+    'Inserts into pg_dist_shard with user checks';
diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql
index 7411d9179..f7f62dc9f 100644
--- a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql
+++ b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql
@@ -1,10 +1,10 @@
 CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
                             relation_id regclass, shard_id bigint,
-                            storage_type "char", shard_min_value text,
-                            shard_max_value text
+                            storage_type "char", shardstate integer,
+                            shard_min_value text, shard_max_value text
                             )
     RETURNS void
     LANGUAGE C
     AS 'MODULE_PATHNAME';
-COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", integer, text, text) IS
     'Inserts into pg_dist_shard with user checks';
diff --git a/src/backend/distributed/sql/udfs/citus_internal_update_shard_and_placement_state_metadata/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_internal_update_shard_and_placement_state_metadata/11.1-1.sql
new file mode 100644
index 000000000..bf88b26ce
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_internal_update_shard_and_placement_state_metadata/11.1-1.sql
@@ -0,0 +1,8 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(
+                            shard_id bigint,
+                            shardState integer)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(bigint, integer) IS
+    'Updates pg_dist_shard and pg_dist_placement with user checks';
diff --git a/src/backend/distributed/sql/udfs/citus_internal_update_shard_and_placement_state_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_update_shard_and_placement_state_metadata/latest.sql
new file mode 100644
index 000000000..bf88b26ce
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_internal_update_shard_and_placement_state_metadata/latest.sql
@@ -0,0 +1,8 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(
+                            shard_id bigint,
+                            shardState integer)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(bigint, integer) IS
+    'Updates pg_dist_shard and pg_dist_placement with user checks';
diff --git a/src/backend/distributed/test/distribution_metadata.c b/src/backend/distributed/test/distribution_metadata.c
index 6d769ef27..bb1ca35db 100644
--- a/src/backend/distributed/test/distribution_metadata.c
+++ b/src/backend/distributed/test/distribution_metadata.c
@@ -224,8 +224,9 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS)
     text *minInfoText = cstring_to_text(minInfo->data);
     text *maxInfoText = cstring_to_text(maxInfo->data);
 
-    InsertShardRow(distributedTableId, newShardId, SHARD_STORAGE_TABLE, minInfoText,
-                   maxInfoText);
+    InsertShardRow(distributedTableId, newShardId,
+                   SHARD_STORAGE_TABLE, SHARD_STATE_ACTIVE,
+                   minInfoText, maxInfoText);
 
     PG_RETURN_INT64(newShardId);
 }
diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c
index c7453189b..dcd8f0336 100644
--- a/src/backend/distributed/utils/shardinterval_utils.c
+++ b/src/backend/distributed/utils/shardinterval_utils.c
@@ -485,3 +485,21 @@ SingleReplicatedTable(Oid relationId)
 
     return true;
 }
+
+
+/*
+ * ShardArrayToList builds a list out of the given array of ShardInterval pointers.
+ */
+List *
+ShardArrayToList(ShardInterval **shardArray, int length)
+{
+    List *shardIntervalList = NIL;
+
+    for (int shardIndex = 0; shardIndex < length; shardIndex++)
+    {
+        ShardInterval *shardInterval = shardArray[shardIndex];
+        shardIntervalList = lappend(shardIntervalList, shardInterval);
+    }
+
+    return shardIntervalList;
+}
diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h
index b0b55a2cd..9b7e53d53 100644
--- a/src/include/distributed/commands/utility_hook.h
+++ b/src/include/distributed/commands/utility_hook.h
@@ -95,6 +95,7 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString,
 extern void MarkInvalidateForeignKeyGraph(void);
 extern void InvalidateForeignKeyGraphForDDL(void);
 extern List * DDLTaskList(Oid relationId, const char *commandString);
+extern List * DDLTaskListExtended(Oid relationId, const char *commandString, bool includeOrphanedShards);
 extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands);
 extern bool AlterTableInProgress(void);
 extern bool DropSchemaOrDBInProgress(void);
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index 92f8a4514..32732e25b 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -149,7 +149,7 @@ extern int GetCitusCreationLevel(void);
 extern bool IsCitusTable(Oid relationId);
 extern bool IsCitusTableViaCatalog(Oid relationId);
 extern char PgDistPartitionViaCatalog(Oid relationId);
-extern List * LookupDistShardTuples(Oid relationId);
+extern List * LookupDistShardTuples(Oid relationId, bool activeShardsOnly);
 extern char PartitionMethodViaCatalog(Oid relationId);
 extern Var * PartitionColumnViaCatalog(Oid relationId);
 extern bool IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel);
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h
index 56e15d171..1b05aa4e4 100644
--- a/src/include/distributed/metadata_utility.h
+++ b/src/include/distributed/metadata_utility.h
@@ -67,6 +67,7 @@ typedef struct ShardInterval
     Datum maxValue;     /* a shard's typed max value datum */
     uint64 shardId;
     int shardIndex;
+    ShardState shardState;
 } ShardInterval;
 
 
@@ -212,7 +213,8 @@ extern Datum citus_relation_size(PG_FUNCTION_ARGS);
 /* Function declarations to read shard and shard placement data */
 extern uint32 TableShardReplicationFactor(Oid relationId);
 extern List * LoadShardIntervalList(Oid relationId);
-extern List * LoadUnsortedShardIntervalListViaCatalog(Oid relationId);
+extern List * LoadShardIntervalListIncludingOrphansViaCatalog(Oid relationId);
+extern List * LoadUnsortedShardIntervalListIncludingOrphansViaCatalog(Oid relationId);
 extern ShardInterval * LoadShardIntervalWithLongestShardName(Oid relationId);
 extern int ShardIntervalCount(Oid relationId);
 extern List * LoadShardList(Oid relationId);
@@ -238,9 +240,11 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
 extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
 
 /* Function declarations to modify shard and shard placement data */
-extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
+extern void InsertShardRow(Oid relationId, uint64 shardId,
+                           char storageType, int shardState,
                            text *shardMinValue, text *shardMaxValue);
 extern void DeleteShardRow(uint64 shardId);
+extern void UpdateShardState(uint64 shardId, int shardState);
 extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
                                       char shardState, uint64 shardLength,
                                       int32 groupId);
diff --git a/src/include/distributed/pg_dist_shard.h b/src/include/distributed/pg_dist_shard.h
index 5c98b755f..e2f66dbae 100644
--- a/src/include/distributed/pg_dist_shard.h
+++ b/src/include/distributed/pg_dist_shard.h
@@ -43,13 +43,14 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard;
  * compiler constants for pg_dist_shards
  * ----------------
  */
-#define Natts_pg_dist_shard 6
+#define Natts_pg_dist_shard 7
 #define Anum_pg_dist_shard_logicalrelid 1
 #define Anum_pg_dist_shard_shardid 2
 #define Anum_pg_dist_shard_shardstorage 3
 #define Anum_pg_dist_shard_shardalias_DROPPED 4
 #define Anum_pg_dist_shard_shardminvalue 5
 #define Anum_pg_dist_shard_shardmaxvalue 6
+#define Anum_pg_dist_shard_shardstate 7
 
 /*
  * Valid values for shard storage types include foreign table, (standard) table
diff --git a/src/include/distributed/relay_utility.h b/src/include/distributed/relay_utility.h
index 35c66761d..e3b228069 100644
--- a/src/include/distributed/relay_utility.h
+++ b/src/include/distributed/relay_utility.h
@@ -30,12 +30,15 @@
  *
  * The numbers assigned per state used for historical reason and should
  * not be changed since they correspond to shardstate in pg_dist_placement.
+ * LookupDistShardTuples also depends on the order of these states.
  */
 typedef enum
 {
     SHARD_STATE_INVALID_FIRST = 0,
     SHARD_STATE_ACTIVE = 1,
     SHARD_STATE_TO_DELETE = 4,
+
+    SHARD_STATE_INVALID_LAST = 5
 } ShardState;
 
diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h
index 8a98254f9..6605f2ec1 100644
--- a/src/include/distributed/shard_cleaner.h
+++ b/src/include/distributed/shard_cleaner.h
@@ -14,6 +14,7 @@
 /* GUC to configure deferred shard deletion */
 extern int DeferShardDeleteInterval;
 extern bool DeferShardDeleteOnMove;
+extern bool DeferShardDeleteOnSplit;
 extern double DesiredPercentFreeAfterMove;
 extern bool CheckAvailableSpaceBeforeMove;
 
diff --git a/src/include/distributed/shardinterval_utils.h b/src/include/distributed/shardinterval_utils.h
index 4cc99e6d5..5c0f3180e 100644
--- a/src/include/distributed/shardinterval_utils.h
+++ b/src/include/distributed/shardinterval_utils.h
@@ -55,6 +55,7 @@ extern int SearchCachedShardInterval(Datum partitionColumnValue,
                                      int shardCount, Oid shardIntervalCollation,
                                      FmgrInfo *compareFunction);
 extern bool SingleReplicatedTable(Oid relationId);
+extern List * ShardArrayToList(ShardInterval **shardArray, int length);
 
 #endif /* SHARDINTERVAL_UTILS_H_ */
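
Reviewer note: for reference, the worker-side metadata commands generated by this change take the following shapes (all values hypothetical). The first mirrors the string built by ShardListInsertCommand() with the new shardstate column; the second is the command MarkShardListWithPlacementsForDrop() sends to metadata workers:

    WITH shard_data(relationname, shardid, storagetype, shardstate,
                    shardminvalue, shardmaxvalue)
      AS (VALUES ('public.events'::regclass, 102008, 't'::"char", 1,
                  '-2147483648', '-1'))
    SELECT citus_internal_add_shard_metadata(relationname, shardid,
                                             storagetype, shardstate,
                                             shardminvalue, shardmaxvalue)
    FROM shard_data;

    -- mark shard 102008 and its placement as TO_DELETE (state 4)
    SELECT citus_internal_update_shard_and_placement_state_metadata(102008, 4);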