users/niupre/DeferredDropSimplified
Nitish Upreti 2022-08-15 16:26:56 -07:00
parent 0653200151
commit 3b1dbed8d0
9 changed files with 29 additions and 16 deletions

View File

@ -162,7 +162,8 @@ PreprocessGrantStmt(Node *node, const char *queryString,
{
/* Propogate latest policies issue on deleted shards to avoid any potential issues */
bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskListExtended(relationId, ddlString.data, includeOrphanedShards);
ddlJob->taskList = DDLTaskListExtended(relationId, ddlString.data,
includeOrphanedShards);
}
ddlJobs = lappend(ddlJobs, ddlJob);

View File

@ -186,7 +186,8 @@ PreprocessRenameStmt(Node *node, const char *renameCommand,
* try to delete the orphaned shard with the wrong (new) name.
*/
bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskListExtended(tableRelationId, renameCommand, includeOrphanedShards);
ddlJob->taskList = DDLTaskListExtended(tableRelationId, renameCommand,
includeOrphanedShards);
return list_make1(ddlJob);
}

View File

@ -1592,7 +1592,8 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
/* Only load Active shards in the cache */
bool activeShardsOnly = true;
List *distShardTupleList = LookupDistShardTuples(cacheEntry->relationId, activeShardsOnly);
List *distShardTupleList = LookupDistShardTuples(cacheEntry->relationId,
activeShardsOnly);
int shardIntervalArrayLength = list_length(distShardTupleList);
if (shardIntervalArrayLength > 0)
{
@ -4562,7 +4563,8 @@ LookupDistShardTuples(Oid relationId, bool activeShardsOnly)
/* set scan arguments */
scanKey[0].sk_argument = ObjectIdGetDatum(relationId);
scanKey[1].sk_argument = (activeShardsOnly) ? SHARD_STATE_ACTIVE : SHARD_STATE_INVALID_LAST;
scanKey[1].sk_argument = (activeShardsOnly) ? SHARD_STATE_ACTIVE :
SHARD_STATE_INVALID_LAST;
SysScanDesc scanDescriptor = systable_beginscan(pgDistShard,
DistShardLogicalRelidIndexId(), true,

View File

@ -3271,7 +3271,7 @@ EnsureCoordinatorInitiatedOperation(void)
*/
static void
EnsureShardMetadataIsSane(Oid relationId, int64 shardId,
char storageType, int shardState,
char storageType, int shardState,
text *shardMinValue, text *shardMaxValue)
{
if (shardId <= INVALID_SHARD_ID)

View File

@ -1081,10 +1081,12 @@ LoadShardIntervalList(Oid relationId)
List *
LoadShardIntervalListIncludingOrphansViaCatalog(Oid relationId)
{
List *shardIntervalList = LoadUnsortedShardIntervalListIncludingOrphansViaCatalog(relationId);
List *shardIntervalList = LoadUnsortedShardIntervalListIncludingOrphansViaCatalog(
relationId);
// Transform into a temporary array to sort.
ShardInterval **shardIntervalArray = (ShardInterval **) PointerArrayFromList(shardIntervalList);
/* Transform into a temporary array to sort. */
ShardInterval **shardIntervalArray = (ShardInterval **) PointerArrayFromList(
shardIntervalList);
int shardIntervalArrayLength = list_length(shardIntervalList);
/*
@ -1103,10 +1105,12 @@ LoadShardIntervalListIncludingOrphansViaCatalog(Oid relationId)
shardIntervalArrayLength,
cacheEntry->partitionColumn->
varcollid,
cacheEntry->shardIntervalCompareFunction);
cacheEntry->
shardIntervalCompareFunction);
}
List *sortedShardIntervalList = ShardArrayToList(sortedShardIntervalArray, shardIntervalArrayLength);
List *sortedShardIntervalList = ShardArrayToList(sortedShardIntervalArray,
shardIntervalArrayLength);
return sortedShardIntervalList;
}

View File

@ -139,7 +139,8 @@ citus_drop_all_shards(PG_FUNCTION_ARGS)
*/
LockRelationOid(relationId, AccessExclusiveLock);
List *shardIntervalList = LoadUnsortedShardIntervalListIncludingOrphansViaCatalog(relationId);
List *shardIntervalList = LoadUnsortedShardIntervalListIncludingOrphansViaCatalog(
relationId);
int droppedShardCount = DropShards(relationId, schemaName, relationName,
shardIntervalList, dropShardsMetadataOnly);

View File

@ -576,6 +576,7 @@ BlockingShardSplit(SplitOperation splitOperation,
*/
DropShardList(sourceColocatedShardIntervalList);
}
/* Insert new shard and placement metdata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
@ -1188,6 +1189,7 @@ MarkShardListWithPlacementsForDrop(List *shardIntervalList)
}
}
/*
* DropShardList drops shards and their metadata from both the coordinator and
* mx nodes.
@ -1492,10 +1494,10 @@ NonBlockingShardSplit(SplitOperation splitOperation,
else
{
/*
* 18) Drop old shards and delete related metadata. Have to do that before
* creating the new shard metadata, because there's cross-checks
* preventing inconsistent metadata (like overlapping shards).
*/
* 18) Drop old shards and delete related metadata. Have to do that before
* creating the new shard metadata, because there's cross-checks
* preventing inconsistent metadata (like overlapping shards).
*/
DropShardList(sourceColocatedShardIntervalList);
}

View File

@ -486,6 +486,7 @@ SingleReplicatedTable(Oid relationId)
return true;
}
/*
* ShardArrayToList builds a list of out the array of ShardInterval*.
*/

View File

@ -95,7 +95,8 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString,
extern void MarkInvalidateForeignKeyGraph(void);
extern void InvalidateForeignKeyGraphForDDL(void);
extern List * DDLTaskList(Oid relationId, const char *commandString);
extern List * DDLTaskListExtended(Oid relationId, const char *commandString, bool includeOrphanedShards);
extern List * DDLTaskListExtended(Oid relationId, const char *commandString, bool
includeOrphanedShards);
extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands);
extern bool AlterTableInProgress(void);
extern bool DropSchemaOrDBInProgress(void);