niupre/DeferredDrop
Nitish Upreti 2022-08-12 17:26:44 -07:00
parent 479ccfae89
commit 47814141fa
17 changed files with 464 additions and 122 deletions

View File

@ -79,7 +79,10 @@ PreprocessClusterStmt(Node *node, const char *clusterCommand,
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->metadataSyncCommand = clusterCommand;
ddlJob->taskList = DDLTaskList(relationId, clusterCommand);
/* we don't need to cluster deleted shards */
bool includeOrphanedShards = false;
ddlJob->taskList = DDLTaskList(relationId, clusterCommand, includeOrphanedShards);
return list_make1(ddlJob);
}

View File

@ -160,7 +160,9 @@ PreprocessGrantStmt(Node *node, const char *queryString,
ddlJob->taskList = NIL;
if (IsCitusTable(relationId))
{
ddlJob->taskList = DDLTaskList(relationId, ddlString.data);
/* Propagate the latest privilege changes to deleted shards as well, to avoid any potential issues */
bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskList(relationId, ddlString.data, includeOrphanedShards);
}
ddlJobs = lappend(ddlJobs, ddlJob);

View File

@ -275,7 +275,10 @@ PostprocessCreatePolicyStmt(Node *node, const char *queryString)
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->metadataSyncCommand = pstrdup(ddlCommand);
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
/* Propagate the latest policy changes to deleted shards as well, to avoid any potential issues */
bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards);
relation_close(relation, NoLock);
@ -408,7 +411,10 @@ PreprocessAlterPolicyStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relOid);
ddlJob->metadataSyncCommand = pstrdup(ddlString.data);
ddlJob->taskList = DDLTaskList(relOid, ddlString.data);
/* Propagate the latest policy changes to deleted shards as well, to avoid any potential issues */
bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskList(relOid, ddlString.data, includeOrphanedShards);
relation_close(relation, NoLock);
@ -516,7 +522,10 @@ PreprocessDropPolicyStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relOid);
ddlJob->metadataSyncCommand = queryString;
ddlJob->taskList = DDLTaskList(relOid, queryString);
/* Propagate the latest policy changes to deleted shards as well, to avoid potential issues */
bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskList(relOid, queryString, includeOrphanedShards);
ddlJobs = lappend(ddlJobs, ddlJob);
}

View File

@ -179,7 +179,14 @@ PreprocessRenameStmt(Node *node, const char *renameCommand,
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, tableRelationId);
ddlJob->metadataSyncCommand = renameCommand;
ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand);
/*
* Rename orphaned shards as well, otherwise the shard's name will be different
* from the distributed table. This will cause shard cleaner to fail as we will
* try to delete the orphaned shard with the wrong (new) name.
*/
bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand, includeOrphanedShards);
return list_make1(ddlJob);
}

View File

@ -93,7 +93,10 @@ PreprocessCreateStatisticsStmt(Node *node, const char *queryString,
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
/* We don't need to do create statistics on deleted shards */
bool includeOrphanedShards = false;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards);
List *ddlJobs = list_make1(ddlJob);
@ -202,7 +205,10 @@ PreprocessDropStatisticsStmt(Node *node, const char *queryString,
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
/* we don't need to do drop statistics on deleted shards */
bool includeOrphanedShards = false;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards);
ddlJobs = lappend(ddlJobs, ddlJob);
}
@ -268,7 +274,10 @@ PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString,
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
/* we don't need to do ALTER statistics on deleted shards */
bool includeOrphanedShards = false;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards);
List *ddlJobs = list_make1(ddlJob);
@ -306,7 +315,10 @@ PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString,
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
/* we don't need to do ALTER statistics on deleted shards */
bool includeOrphanedShards = false;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards);
List *ddlJobs = list_make1(ddlJob);
@ -426,7 +438,10 @@ PreprocessAlterStatisticsStmt(Node *node, const char *queryString,
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
/* we don't need to do ALTER statistics on deleted shards */
bool includeOrphanedShards = false;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards);
List *ddlJobs = list_make1(ddlJob);
@ -464,7 +479,12 @@ PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString,
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
/* Propagate owner changes to deleted shards as well, to avoid any potential issues */
// TODO(niupre): Can this cause failure when we try to drop orphaned table?
// If this is the case, do we need to allow CREATE STATS as well?
bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand, includeOrphanedShards);
List *ddlJobs = list_make1(ddlJob);

View File

@ -1162,7 +1162,10 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
else
{
/* ... otherwise use standard DDL task list function */
ddlJob->taskList = DDLTaskList(leftRelationId, sqlForTaskList);
/* Propagate the latest updates to deleted shards as well, to avoid any potential issues */
bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskList(leftRelationId, sqlForTaskList, includeOrphanedShards);
if (!propagateCommandToWorkers)
{
ddlJob->taskList = NIL;
@ -1835,7 +1838,10 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
QualifyTreeNode((Node *) stmt);
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->metadataSyncCommand = DeparseTreeNode((Node *) stmt);
ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand);
/* Propagate the latest schema to deleted shards as well, to avoid any potential issues */
bool includeOrphanedShards = true;
ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand, includeOrphanedShards);
return list_make1(ddlJob);
}

View File

@ -719,13 +719,16 @@ CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName,
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->metadataSyncCommand = queryString;
/* we don't need to do CREATE trigger on deleted shards */
bool includeOrphanedShards = false;
if (!triggerName)
{
/*
* ENABLE/DISABLE TRIGGER ALL/USER commands do not specify trigger
* name.
*/
ddlJob->taskList = DDLTaskList(relationId, queryString);
ddlJob->taskList = DDLTaskList(relationId, queryString, includeOrphanedShards);
return list_make1(ddlJob);
}
@ -745,7 +748,7 @@ CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName,
/* we don't have truncate triggers on shard relations */
if (!TRIGGER_FOR_TRUNCATE(triggerType))
{
ddlJob->taskList = DDLTaskList(relationId, queryString);
ddlJob->taskList = DDLTaskList(relationId, queryString, includeOrphanedShards);
}
return list_make1(ddlJob);

View File

@ -127,6 +127,7 @@ TruncateTaskList(Oid relationId)
char *schemaName = get_namespace_name(schemaId);
char *relationName = get_rel_name(relationId);
/* No need to propagate TRUNCATE to orphaned shards, as those shards will be dropped anyway */
List *shardIntervalList = LoadShardIntervalList(relationId);
/* lock metadata before getting placement lists */

View File

@ -1613,11 +1613,12 @@ InvalidateForeignKeyGraphForDDL(void)
* given list of shards.
*/
List *
DDLTaskList(Oid relationId, const char *commandString)
DDLTaskList(Oid relationId, const char *commandString, bool includeOrphanedShards)
{
List *taskList = NIL;
// TODO(niupre): This should be all shards?
List *shardIntervalList = LoadShardIntervalList(relationId);
List *shardIntervalList =
(includeOrphanedShards) ? LoadShardIntervalListWithOrphanedShards(relationId)
: LoadShardIntervalList(relationId);
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName);

View File

@ -113,6 +113,15 @@ typedef struct CitusTableCacheEntrySlot
bool isValid;
} CitusTableCacheEntrySlot;
/*
 * ShardIntervalArrayType tells which of a table cache entry's shard interval
 * arrays (and matching placement arrays) a cached shard is stored in.
 */
typedef enum ShardIntervalArrayType
{
	/* shard lives in the table entry's sortedShardIntervalArray */
	ACTIVE_SHARD_ARRAY = 0,

	/* shard lives in the table entry's sortedOrphanedShardIntervalArray */
	TO_DELETE_SHARD_ARRAY = 1
} ShardIntervalArrayType;
/*
* ShardIdCacheEntry is the entry type for ShardIdCacheHash.
@ -127,6 +136,9 @@ typedef struct ShardIdCacheEntry
/* pointer to the table entry to which this shard currently belongs */
CitusTableCacheEntry *tableEntry;
/* shard interval type */
ShardIntervalArrayType arrayType;
/* index of the shard interval in the sortedShardIntervalArray of the table entry */
int shardIndex;
} ShardIdCacheEntry;
@ -249,7 +261,9 @@ static void RegisterLocalGroupIdCacheCallbacks(void);
static void RegisterAuthinfoCacheCallbacks(void);
static void RegisterCitusTableCacheEntryReleaseCallbacks(void);
static void ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry);
static void RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *tableEntry);
static void RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *tableEntry,
ShardInterval **sortedShardIntervalArray,
int shardIntervalArrayLength);
static void CreateDistTableCache(void);
static void CreateShardIdCache(void);
static void CreateDistObjectCache(void);
@ -275,7 +289,7 @@ static void CachedRelationNamespaceLookupExtended(const char *relationName,
bool missing_ok);
static ShardPlacement * ResolveGroupShardPlacement(
GroupShardPlacement *groupShardPlacement, CitusTableCacheEntry *tableEntry,
int shardIndex);
ShardIntervalArrayType arrayType, int shardIndex);
static Oid LookupEnumValueId(Oid typeId, char *valueName);
static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSlot);
static void InvalidateDistTableCache(void);
@ -284,6 +298,13 @@ static void InitializeTableCacheEntry(int64 shardId);
static bool IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
CitusTableType tableType);
static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry);
static void BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayLength,
ShardIntervalArrayType arrayType, CitusTableCacheEntry *cacheEntry);
static void FreeCitusTableCacheShardAndPlacementEntryFromArray(
ShardInterval **shardIntervalArray,
int shardIntervalArrayLength,
GroupShardPlacement **arrayOfPlacementArrays,
int *arrayOfPlacementArrayLengths);
static Oid DistAuthinfoRelationId(void);
static Oid DistAuthinfoIndexId(void);
@ -702,12 +723,20 @@ LoadShardInterval(uint64 shardId)
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
int shardIndex = shardIdEntry->shardIndex;
ShardIntervalArrayType arrayType = shardIdEntry->arrayType;
/* the offset better be in a valid range */
Assert(shardIndex < tableEntry->shardIntervalArrayLength);
ShardInterval *sourceShardInterval =
tableEntry->sortedShardIntervalArray[shardIndex];
ShardInterval *sourceShardInterval = NULL;
if (arrayType == ACTIVE_SHARD_ARRAY)
{
Assert(shardIndex < tableEntry->shardIntervalArrayLength);
sourceShardInterval = tableEntry->sortedShardIntervalArray[shardIndex];
}
else
{
Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength);
sourceShardInterval = tableEntry->sortedOrphanedShardIntervalArray[shardIndex];
}
/* copy value to return */
ShardInterval *shardInterval = CopyShardInterval(sourceShardInterval);
@ -771,14 +800,26 @@ LoadGroupShardPlacement(uint64 shardId, uint64 placementId)
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
int shardIndex = shardIdEntry->shardIndex;
ShardIntervalArrayType arrayType = shardIdEntry->arrayType;
/* the offset better be in a valid range */
Assert(shardIndex < tableEntry->shardIntervalArrayLength);
GroupShardPlacement *placementArray =
tableEntry->arrayOfPlacementArrays[shardIndex];
int numberOfPlacements =
tableEntry->arrayOfPlacementArrayLengths[shardIndex];
GroupShardPlacement *placementArray = NULL;
int numberOfPlacements = 0;
if (arrayType == ACTIVE_SHARD_ARRAY)
{
/* the offset better be in a valid range */
Assert(shardIndex < tableEntry->shardIntervalArrayLength);
placementArray = tableEntry->arrayOfPlacementArrays[shardIndex];
numberOfPlacements =
tableEntry->arrayOfPlacementArrayLengths[shardIndex];
}
else if (arrayType == TO_DELETE_SHARD_ARRAY)
{
/* the offset better be in a valid range */
Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength);
placementArray = tableEntry->arrayOfOrphanedPlacementArrays[shardIndex];
numberOfPlacements =
tableEntry->arrayOfOrphanedPlacementArrayLengths[shardIndex];
}
for (int i = 0; i < numberOfPlacements; i++)
{
@ -806,9 +847,11 @@ LoadShardPlacement(uint64 shardId, uint64 placementId)
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
int shardIndex = shardIdEntry->shardIndex;
ShardIntervalArrayType arrayType = shardIdEntry->arrayType;
GroupShardPlacement *groupPlacement = LoadGroupShardPlacement(shardId, placementId);
ShardPlacement *nodePlacement = ResolveGroupShardPlacement(groupPlacement,
tableEntry, shardIndex);
tableEntry, arrayType, shardIndex);
return nodePlacement;
}
@ -829,10 +872,26 @@ ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId)
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
int shardIndex = shardIdEntry->shardIndex;
GroupShardPlacement *placementArray =
tableEntry->arrayOfPlacementArrays[shardIndex];
int numberOfPlacements =
tableEntry->arrayOfPlacementArrayLengths[shardIndex];
ShardIntervalArrayType arrayType = shardIdEntry->arrayType;
GroupShardPlacement *placementArray = NULL;
int numberOfPlacements = 0;
if (arrayType == ACTIVE_SHARD_ARRAY)
{
/* the offset better be in a valid range */
Assert(shardIndex < tableEntry->shardIntervalArrayLength);
placementArray = tableEntry->arrayOfPlacementArrays[shardIndex];
numberOfPlacements =
tableEntry->arrayOfPlacementArrayLengths[shardIndex];
}
else if (arrayType == TO_DELETE_SHARD_ARRAY)
{
/* the offset better be in a valid range */
Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength);
placementArray = tableEntry->arrayOfOrphanedPlacementArrays[shardIndex];
numberOfPlacements =
tableEntry->arrayOfOrphanedPlacementArrayLengths[shardIndex];
}
for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
{
@ -840,7 +899,7 @@ ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId)
if (placement->groupId == groupId)
{
placementOnNode = ResolveGroupShardPlacement(placement, tableEntry,
shardIndex);
arrayType, shardIndex);
break;
}
}
@ -878,9 +937,19 @@ ActiveShardPlacementOnGroup(int32 groupId, uint64 shardId)
static ShardPlacement *
ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
CitusTableCacheEntry *tableEntry,
ShardIntervalArrayType arrayType,
int shardIndex)
{
ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex];
ShardInterval *shardInterval = NULL;
if (arrayType == ACTIVE_SHARD_ARRAY)
{
shardInterval = tableEntry->sortedShardIntervalArray[shardIndex];
}
else if (arrayType == TO_DELETE_SHARD_ARRAY)
{
shardInterval = tableEntry->sortedOrphanedShardIntervalArray[shardIndex];
}
ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement);
int32 groupId = groupShardPlacement->groupId;
@ -1049,20 +1118,31 @@ ShardPlacementListIncludingOrphanedPlacements(uint64 shardId)
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
int shardIndex = shardIdEntry->shardIndex;
ShardIntervalArrayType arrayType = shardIdEntry->arrayType;
/* the offset better be in a valid range */
Assert(shardIndex < tableEntry->shardIntervalArrayLength);
GroupShardPlacement *placementArray =
tableEntry->arrayOfPlacementArrays[shardIndex];
int numberOfPlacements =
tableEntry->arrayOfPlacementArrayLengths[shardIndex];
GroupShardPlacement *placementArray = NULL;
int numberOfPlacements = 0;
if (arrayType == ACTIVE_SHARD_ARRAY)
{
/* the offset better be in a valid range */
Assert(shardIndex < tableEntry->shardIntervalArrayLength);
placementArray = tableEntry->arrayOfPlacementArrays[shardIndex];
numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardIndex];
}
else if (arrayType == TO_DELETE_SHARD_ARRAY)
{
/* the offset better be in a valid range */
Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength);
placementArray = tableEntry->arrayOfOrphanedPlacementArrays[shardIndex];
numberOfPlacements = tableEntry->arrayOfOrphanedPlacementArrayLengths[shardIndex];
}
for (int i = 0; i < numberOfPlacements; i++)
{
GroupShardPlacement *groupShardPlacement = &placementArray[i];
ShardPlacement *shardPlacement = ResolveGroupShardPlacement(groupShardPlacement,
tableEntry,
arrayType,
shardIndex);
placementList = lappend(placementList, shardPlacement);
@ -1540,7 +1620,7 @@ BuildCitusTableCacheEntry(Oid relationId)
cacheEntry->hashFunction = hashFunction;
/* check the shard distribution for hash partitioned tables */
/* check the shard distribution of ACTIVE shards for hash partitioned tables */
cacheEntry->hasUniformHashDistribution =
HasUniformHashDistribution(cacheEntry->sortedShardIntervalArray,
cacheEntry->shardIntervalArrayLength);
@ -1574,8 +1654,11 @@ BuildCitusTableCacheEntry(Oid relationId)
static void
BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
{
List *shardIntervalList = NULL;
ShardInterval **shardIntervalArray = NULL;
ShardInterval **shardIntervalToDeleteArray = NULL;
ShardInterval **sortedShardIntervalArray = NULL;
ShardInterval **sortedOrphanedShardIntervalArray = NULL;
FmgrInfo *shardIntervalCompareFunction = NULL;
FmgrInfo *shardColumnCompareFunction = NULL;
Oid columnTypeId = InvalidOid;
@ -1592,24 +1675,13 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
List *distShardTupleList = LookupDistShardTuples(cacheEntry->relationId);
int shardIntervalArrayLength = list_length(distShardTupleList);
int activeShardCount = 0;
int orphanedShardCount = 0;
if (shardIntervalArrayLength > 0)
{
Relation distShardRelation = table_open(DistShardRelationId(), AccessShareLock);
TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation);
int arrayIndex = 0;
shardIntervalArray = MemoryContextAllocZero(MetadataCacheMemoryContext,
shardIntervalArrayLength *
sizeof(ShardInterval *));
cacheEntry->arrayOfPlacementArrays =
MemoryContextAllocZero(MetadataCacheMemoryContext,
shardIntervalArrayLength *
sizeof(GroupShardPlacement *));
cacheEntry->arrayOfPlacementArrayLengths =
MemoryContextAllocZero(MetadataCacheMemoryContext,
shardIntervalArrayLength *
sizeof(int));
HeapTuple shardTuple = NULL;
foreach_ptr(shardTuple, distShardTupleList)
@ -1618,18 +1690,71 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
distShardTupleDesc,
intervalTypeId,
intervalTypeMod);
MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
shardIntervalArray[arrayIndex] = CopyShardInterval(shardInterval);
Assert(shardInterval->shardState == SHARD_STATE_ACTIVE ||
shardInterval->shardState == SHARD_STATE_TO_DELETE);
MemoryContextSwitchTo(oldContext);
if (shardInterval->shardState == SHARD_STATE_ACTIVE)
{
activeShardCount++;
}
else
{
orphanedShardCount++;
}
shardIntervalList = lappend(shardIntervalList, shardInterval);
heap_freetuple(shardTuple);
arrayIndex++;
}
table_close(distShardRelation, AccessShareLock);
shardIntervalArray = MemoryContextAllocZero(MetadataCacheMemoryContext,
activeShardCount *
sizeof(ShardInterval *));
shardIntervalToDeleteArray = MemoryContextAllocZero(MetadataCacheMemoryContext,
orphanedShardCount *
sizeof(ShardInterval *));
cacheEntry->arrayOfPlacementArrays =
MemoryContextAllocZero(MetadataCacheMemoryContext,
activeShardCount *
sizeof(GroupShardPlacement *));
cacheEntry->arrayOfPlacementArrayLengths =
MemoryContextAllocZero(MetadataCacheMemoryContext,
activeShardCount *
sizeof(int));
cacheEntry->arrayOfOrphanedPlacementArrays =
MemoryContextAllocZero(MetadataCacheMemoryContext,
orphanedShardCount *
sizeof(GroupShardPlacement *));
cacheEntry->arrayOfOrphanedPlacementArrayLengths =
MemoryContextAllocZero(MetadataCacheMemoryContext,
orphanedShardCount *
sizeof(int));
}
int arrayIndexOne = 0;
int arrayIndexTwo = 0;
ShardInterval *interval = NULL;
foreach_ptr(interval, shardIntervalList)
{
MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
ShardInterval *copiedShardInterval = CopyShardInterval(interval);
if (interval->shardState == SHARD_STATE_ACTIVE)
{
shardIntervalArray[arrayIndexOne] = copiedShardInterval;
arrayIndexOne++;
}
else
{
shardIntervalToDeleteArray[arrayIndexTwo] = copiedShardInterval;
arrayIndexTwo++;
}
MemoryContextSwitchTo(oldContext);
}
/* look up value comparison function */
@ -1683,12 +1808,19 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
/* since there is a zero or one shard, it is already sorted */
sortedShardIntervalArray = shardIntervalArray;
sortedOrphanedShardIntervalArray = shardIntervalToDeleteArray;
}
else
{
/* sort the interval array */
sortedShardIntervalArray = SortShardIntervalArray(shardIntervalArray,
shardIntervalArrayLength,
activeShardCount,
cacheEntry->partitionColumn->
varcollid,
shardIntervalCompareFunction);
sortedOrphanedShardIntervalArray = SortShardIntervalArray(shardIntervalToDeleteArray,
orphanedShardCount,
cacheEntry->partitionColumn->
varcollid,
shardIntervalCompareFunction);
@ -1715,12 +1847,35 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
}
cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
cacheEntry->shardIntervalArrayLength = 0;
cacheEntry->sortedOrphanedShardIntervalArray = sortedOrphanedShardIntervalArray;
/* initialize to zero and increment as we go */
cacheEntry->shardIntervalArrayLength = 0;
cacheEntry->orphanedShardIntervalArrayLength = 0;
/* maintain shardId->(table,ShardInterval) cache and placement list. */
BuildShardIdCacheAndPlacementList(sortedShardIntervalArray, activeShardCount,
ACTIVE_SHARD_ARRAY, cacheEntry);
BuildShardIdCacheAndPlacementList(sortedOrphanedShardIntervalArray, orphanedShardCount,
TO_DELETE_SHARD_ARRAY, cacheEntry);
cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction;
cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction;
}
/*
* Maintain shardId->(table,ShardInterval) cache and placement list.
*/
void
BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayLength,
ShardIntervalArrayType arrayType, CitusTableCacheEntry *cacheEntry)
{
/* maintain shardId->(table,ShardInterval) cache */
for (int shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
for (int shardIndex = 0; shardIndex < arrayLength; shardIndex++)
{
ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex];
ShardInterval *shardInterval = shardIntervalArray[shardIndex];
int64 shardId = shardInterval->shardId;
int placementOffset = 0;
@ -1733,13 +1888,21 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
shardIdCacheEntry->tableEntry = cacheEntry;
shardIdCacheEntry->shardIndex = shardIndex;
shardIdCacheEntry->arrayType = arrayType;
/*
* We should increment this only after we are sure this hasn't already
* been assigned to any other relations. ResetCitusTableCacheEntry()
* depends on this.
*/
cacheEntry->shardIntervalArrayLength++;
if(arrayType == ACTIVE_SHARD_ARRAY)
{
cacheEntry->shardIntervalArrayLength++;
}
else if(arrayType == TO_DELETE_SHARD_ARRAY)
{
cacheEntry->orphanedShardIntervalArrayLength++;
}
/* build list of shard placements */
List *placementList = BuildShardPlacementList(shardId);
@ -1763,9 +1926,6 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
/* store the shard index in the ShardInterval */
shardInterval->shardIndex = shardIndex;
}
cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction;
cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction;
}
@ -1902,11 +2062,7 @@ HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
curShardInterval->minValue);
comparisonResult = DatumGetInt32(comparisonDatum);
// If one of the shards are marked as TO_DELETED, ignore the overlap.
bool markedForDelete = (lastShardInterval->shardState == SHARD_STATE_TO_DELETE ||
curShardInterval->shardState == SHARD_STATE_TO_DELETE);
if (!markedForDelete && comparisonResult >= 0)
if (comparisonResult >= 0)
{
return true;
}
@ -3971,6 +4127,49 @@ RegisterAuthinfoCacheCallbacks(void)
}
/*
 * FreeCitusTableCacheShardAndPlacementEntryFromArray releases the per-shard
 * memory referenced by one shard interval array and its companion array of
 * placement arrays.
 *
 * For every index below shardIntervalArrayLength it frees, in this order:
 *   - the placement array stored at that index, when there is one,
 *   - the min/max values of the shard interval, when the interval is not
 *     pass-by-value and the respective value exists,
 *   - the shard interval itself.
 *
 * The container arrays are neither freed nor cleared here, so their slots
 * must not be dereferenced after this call. arrayOfPlacementArrayLengths is
 * part of the signature but is currently not read by this function.
 */
static void
FreeCitusTableCacheShardAndPlacementEntryFromArray(
	ShardInterval **shardIntervalArray,
	int shardIntervalArrayLength,
	GroupShardPlacement **arrayOfPlacementArrays,
	int *arrayOfPlacementArrayLengths)
{
	int position = 0;

	while (position < shardIntervalArrayLength)
	{
		ShardInterval *interval = shardIntervalArray[position];
		GroupShardPlacement *placements = arrayOfPlacementArrays[position];
		bool boundsAreByReference = !interval->valueByVal;

		/* the shard's placements go first */
		if (placements != NULL)
		{
			pfree(placements);
		}

		/* then whatever the interval's min/max datums point to */
		if (boundsAreByReference && interval->minValueExists)
		{
			pfree(DatumGetPointer(interval->minValue));
		}

		if (boundsAreByReference && interval->maxValueExists)
		{
			pfree(DatumGetPointer(interval->maxValue));
		}

		/* the interval struct is released last, after its members were read */
		pfree(interval);

		position++;
	}
}
/*
* ResetCitusTableCacheEntry frees any out-of-band memory used by a cache entry,
* but does not free the entry itself.
@ -4008,45 +4207,31 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
}
/* clean up ShardIdCacheHash */
RemoveStaleShardIdCacheEntries(cacheEntry);
RemoveStaleShardIdCacheEntries(cacheEntry, cacheEntry->sortedShardIntervalArray, cacheEntry->shardIntervalArrayLength);
RemoveStaleShardIdCacheEntries(cacheEntry, cacheEntry->sortedOrphanedShardIntervalArray, cacheEntry->orphanedShardIntervalArrayLength);
for (int shardIndex = 0; shardIndex < cacheEntry->shardIntervalArrayLength;
shardIndex++)
{
ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex];
GroupShardPlacement *placementArray =
cacheEntry->arrayOfPlacementArrays[shardIndex];
bool valueByVal = shardInterval->valueByVal;
FreeCitusTableCacheShardAndPlacementEntryFromArray(
cacheEntry->sortedShardIntervalArray,
cacheEntry->shardIntervalArrayLength,
cacheEntry->arrayOfPlacementArrays,
cacheEntry->arrayOfPlacementArrayLengths);
/* delete the shard's placements */
if (placementArray != NULL)
{
pfree(placementArray);
}
/* delete data pointed to by ShardInterval */
if (!valueByVal)
{
if (shardInterval->minValueExists)
{
pfree(DatumGetPointer(shardInterval->minValue));
}
if (shardInterval->maxValueExists)
{
pfree(DatumGetPointer(shardInterval->maxValue));
}
}
/* and finally the ShardInterval itself */
pfree(shardInterval);
}
FreeCitusTableCacheShardAndPlacementEntryFromArray(
cacheEntry->sortedOrphanedShardIntervalArray,
cacheEntry->orphanedShardIntervalArrayLength,
cacheEntry->arrayOfOrphanedPlacementArrays,
cacheEntry->arrayOfOrphanedPlacementArrayLengths);
if (cacheEntry->sortedShardIntervalArray)
{
pfree(cacheEntry->sortedShardIntervalArray);
cacheEntry->sortedShardIntervalArray = NULL;
}
if (cacheEntry->sortedOrphanedShardIntervalArray)
{
pfree(cacheEntry->sortedOrphanedShardIntervalArray);
cacheEntry->sortedOrphanedShardIntervalArray = NULL;
}
if (cacheEntry->arrayOfPlacementArrayLengths)
{
pfree(cacheEntry->arrayOfPlacementArrayLengths);
@ -4057,6 +4242,16 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
pfree(cacheEntry->arrayOfPlacementArrays);
cacheEntry->arrayOfPlacementArrays = NULL;
}
if (cacheEntry->arrayOfOrphanedPlacementArrayLengths)
{
pfree(cacheEntry->arrayOfOrphanedPlacementArrayLengths);
cacheEntry->arrayOfOrphanedPlacementArrayLengths = NULL;
}
if (cacheEntry->arrayOfOrphanedPlacementArrays)
{
pfree(cacheEntry->arrayOfOrphanedPlacementArrays);
cacheEntry->arrayOfOrphanedPlacementArrays = NULL;
}
if (cacheEntry->referencedRelationsViaForeignKey)
{
list_free(cacheEntry->referencedRelationsViaForeignKey);
@ -4069,6 +4264,7 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
}
cacheEntry->shardIntervalArrayLength = 0;
cacheEntry->orphanedShardIntervalArrayLength = 0;
cacheEntry->hasUninitializedShardInterval = false;
cacheEntry->hasUniformHashDistribution = false;
cacheEntry->hasOverlappingShardInterval = false;
@ -4084,15 +4280,16 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
* we leave it in place.
*/
static void
RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *invalidatedTableEntry)
RemoveStaleShardIdCacheEntries(
CitusTableCacheEntry *invalidatedTableEntry,
ShardInterval **sortedShardIntervalArray,
int shardIntervalArrayLength)
{
int shardIndex = 0;
int shardCount = invalidatedTableEntry->shardIntervalArrayLength;
for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
{
ShardInterval *shardInterval =
invalidatedTableEntry->sortedShardIntervalArray[shardIndex];
ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex];
int64 shardId = shardInterval->shardId;
bool foundInCache = false;

View File

@ -856,8 +856,12 @@ CitusTableMetadataCreateCommandList(Oid relationId)
char *metadataCommand = DistributionCreateCommand(cacheEntry);
commandList = lappend(commandList, metadataCommand);
/* commands to insert pg_dist_shard & pg_dist_placement entries */
List *shardIntervalList = LoadShardIntervalList(relationId);
/*
 * Commands to insert pg_dist_shard & pg_dist_placement entries.
 * Propagate orphaned shards as well, so that all workers have the same
 * metadata and behave consistently. This information is not strictly
 * needed and will be deleted by the cleaner anyway.
 */
List *shardIntervalList = LoadShardIntervalListWithOrphanedShards(relationId);
List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList);
commandList = list_concat(commandList, shardMetadataInsertCommandList);

View File

@ -1068,11 +1068,66 @@ LoadShardIntervalList(Oid relationId)
return shardList;
}
/*
 * LoadShardIntervalListWithOrphanedShards returns a list of shard intervals for
 * the given distributed table, including both ACTIVE and TO_DELETE (orphaned)
 * shards. The function returns an empty list if no shards can be found for the
 * given relation.
 *
 * The cache entry keeps ACTIVE and orphaned shards in two separately sorted
 * arrays, so the result is produced by merging them with CompareShardIntervals.
 * The returned list is therefore sorted on shardminvalue, and shard intervals
 * with uninitialized shard min/max values are placed at the end of the list.
 * When an ACTIVE and an orphaned shard compare equal, the ACTIVE one comes
 * first.
 *
 * NOTE(review): the list elements point at the ShardIntervals stored in the
 * cache entry, not at copies -- confirm that callers do not hold on to them
 * across a cache invalidation.
 */
List *
LoadShardIntervalListWithOrphanedShards(Oid relationId)
{
	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
	List *shardList = NIL;
	int activeIndex = 0;
	int orphanedIndex = 0;

	/*
	 * Tables without a distribution column (DISTRIBUTE_BY_NONE) have no
	 * partitionColumn to read a collation from. Leave the collation
	 * zero-initialized (InvalidOid) for them instead of dereferencing a
	 * NULL pointer.
	 */
	SortShardIntervalContext sortContext = {
		.comparisonFunction = cacheEntry->shardIntervalCompareFunction
	};
	if (cacheEntry->partitionColumn != NULL)
	{
		sortContext.collation = cacheEntry->partitionColumn->varcollid;
	}

	/* merge step: repeatedly take the smaller head of the two sorted arrays */
	while (activeIndex < cacheEntry->shardIntervalArrayLength &&
		   orphanedIndex < cacheEntry->orphanedShardIntervalArrayLength)
	{
		ShardInterval *activeInterval =
			cacheEntry->sortedShardIntervalArray[activeIndex];
		ShardInterval *orphanedInterval =
			cacheEntry->sortedOrphanedShardIntervalArray[orphanedIndex];

		int cmp = CompareShardIntervals(activeInterval, orphanedInterval,
										&sortContext);
		if (cmp <= 0)
		{
			shardList = lappend(shardList, activeInterval);
			activeIndex++;
		}
		else
		{
			shardList = lappend(shardList, orphanedInterval);
			orphanedIndex++;
		}
	}

	/* at most one of the two arrays still has elements left; append them */
	while (activeIndex < cacheEntry->shardIntervalArrayLength)
	{
		shardList = lappend(shardList,
							cacheEntry->sortedShardIntervalArray[activeIndex]);
		activeIndex++;
	}

	while (orphanedIndex < cacheEntry->orphanedShardIntervalArrayLength)
	{
		shardList = lappend(shardList,
							cacheEntry->sortedOrphanedShardIntervalArray[orphanedIndex]);
		orphanedIndex++;
	}

	return shardList;
}
/*
* LoadUnsortedShardIntervalListViaCatalog returns a list of shard intervals related for a
* given distributed table. The function returns an empty list if no shards can be found
* for the given relation.
* given distributed table.
* The function returns both ACTIVE and TO_DELETE (orphaned) shards in the list.
* The function returns an empty list if no shards can be found for the given relation.
*
* This function does not use CitusTableCache and instead reads from catalog tables
* directly.
@ -1121,6 +1176,7 @@ LoadShardIntervalWithLongestShardName(Oid relationId)
int maxShardIndex = shardIntervalCount - 1;
uint64 largestShardId = INVALID_SHARD_ID;
/* Given ACTIVE split shards have higher sequence number, we don't need to look into orphaned shards */
for (int shardIndex = 0; shardIndex <= maxShardIndex; ++shardIndex)
{
ShardInterval *currentShardInterval =
@ -1149,6 +1205,21 @@ ShardIntervalCount(Oid relationId)
}
/*
 * ShardIntervalCountWithOrphanedShards returns the total number of shard
 * intervals cached for the given distributed table, counting ACTIVE shards
 * as well as TO_DELETE (orphaned) ones. The result is 0 when the table has
 * no shards of either kind.
 */
int
ShardIntervalCountWithOrphanedShards(Oid relationId)
{
	CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
	int activeShardCount = tableEntry->shardIntervalArrayLength;
	int orphanedShardCount = tableEntry->orphanedShardIntervalArrayLength;

	return activeShardCount + orphanedShardCount;
}
/*
* LoadShardList reads list of shards for given relationId from pg_dist_shard,
* and returns the list of found shardIds.

View File

@ -4553,6 +4553,7 @@ GenerateSyntheticShardIntervalArray(int partitionCount)
shardInterval->shardId = INVALID_SHARD_ID;
shardInterval->valueTypeId = INT4OID;
shardInterval->shardState = SHARD_STATE_INVALID_FIRST;
shardIntervalArray[shardIndex] = shardInterval;
}

View File

@ -94,7 +94,7 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString,
);
extern void MarkInvalidateForeignKeyGraph(void);
extern void InvalidateForeignKeyGraphForDDL(void);
extern List * DDLTaskList(Oid relationId, const char *commandString);
extern List * DDLTaskList(Oid relationId, const char *commandString, bool includeOrphanedShards);
extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands);
extern bool AlterTableInProgress(void);
extern bool DropSchemaOrDBInProgress(void);

View File

@ -66,9 +66,15 @@ typedef struct
bool autoConverted; /* table auto-added to metadata, valid for citus local tables */
/* pg_dist_shard metadata (variable-length ShardInterval array) for this table */
/* The list includes only ACTIVE shards */
int shardIntervalArrayLength;
ShardInterval **sortedShardIntervalArray;
/* pg_dist_shard metadata (variable-length ShardInterval array) for this table */
/* The list includes only TO_DELETE shards */
int orphanedShardIntervalArrayLength;
ShardInterval **sortedOrphanedShardIntervalArray;
/* comparator for partition column's type, NULL if DISTRIBUTE_BY_NONE */
FmgrInfo *shardColumnCompareFunction;
@ -96,6 +102,11 @@ typedef struct
/* pg_dist_placement metadata */
GroupShardPlacement **arrayOfPlacementArrays;
int *arrayOfPlacementArrayLengths;
/* pg_dist_placement metadata */
/* The list includes only TO_DELETE shards */
GroupShardPlacement **arrayOfOrphanedPlacementArrays;
int *arrayOfOrphanedPlacementArrayLengths;
} CitusTableCacheEntry;
typedef struct DistObjectCacheEntryKey
@ -143,7 +154,6 @@ extern List * AllCitusTableIds(void);
extern bool IsCitusTableType(Oid relationId, CitusTableType tableType);
extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry,
CitusTableType tableType);
extern void SetCreateCitusTransactionLevel(int val);
extern int GetCitusCreationLevel(void);
extern bool IsCitusTable(Oid relationId);

View File

@ -212,11 +212,15 @@ extern Datum citus_relation_size(PG_FUNCTION_ARGS);
/* Function declarations to read shard and shard placement data */
extern uint32 TableShardReplicationFactor(Oid relationId);
extern List * LoadShardIntervalList(Oid relationId);
extern List * LoadShardIntervalListWithOrphanedShards(Oid relationId);
extern List * LoadUnsortedShardIntervalListViaCatalog(Oid relationId);
extern ShardInterval * LoadShardIntervalWithLongestShardName(Oid relationId);
extern int ShardIntervalCount(Oid relationId);
extern int ShardIntervalCountWithOrphanedShards(Oid relationId);
extern List * LoadShardList(Oid relationId);
extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval);
extern uint64 ShardLength(uint64 shardId);
extern bool NodeGroupHasShardPlacements(int32 groupId,

View File

@ -47,6 +47,8 @@ extern int CompareRelationShards(const void *leftElement,
const void *rightElement);
extern int ShardIndex(ShardInterval *shardInterval);
extern int CalculateUniformHashRangeIndex(int hashedValue, int shardCount);
// TODO(niupre): Review API.
extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
CitusTableCacheEntry *cacheEntry);
extern int FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry);
@ -54,6 +56,7 @@ extern int SearchCachedShardInterval(Datum partitionColumnValue,
ShardInterval **shardIntervalCache,
int shardCount, Oid shardIntervalCollation,
FmgrInfo *compareFunction);
extern bool SingleReplicatedTable(Oid relationId);