More fixes and comments

niupre/DeferredDrop
Nitish Upreti 2022-08-13 09:06:47 -07:00
parent 47814141fa
commit 96b389d8ef
7 changed files with 39 additions and 33 deletions

View File

@ -816,9 +816,9 @@ LoadGroupShardPlacement(uint64 shardId, uint64 placementId)
{ {
/* the offset better be in a valid range */ /* the offset better be in a valid range */
Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength);
placementArray = tableEntry->arrayOfOrphanedPlacementArrays[shardIndex]; placementArray = tableEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex];
numberOfPlacements = numberOfPlacements =
tableEntry->arrayOfOrphanedPlacementArrayLengths[shardIndex]; tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex];
} }
for (int i = 0; i < numberOfPlacements; i++) for (int i = 0; i < numberOfPlacements; i++)
@ -888,9 +888,9 @@ ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId)
{ {
/* the offset better be in a valid range */ /* the offset better be in a valid range */
Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength);
placementArray = tableEntry->arrayOfOrphanedPlacementArrays[shardIndex]; placementArray = tableEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex];
numberOfPlacements = numberOfPlacements =
tableEntry->arrayOfOrphanedPlacementArrayLengths[shardIndex]; tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex];
} }
for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++) for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
@ -1133,8 +1133,8 @@ ShardPlacementListIncludingOrphanedPlacements(uint64 shardId)
{ {
/* the offset better be in a valid range */ /* the offset better be in a valid range */
Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength); Assert(shardIndex < tableEntry->orphanedShardIntervalArrayLength);
placementArray = tableEntry->arrayOfOrphanedPlacementArrays[shardIndex]; placementArray = tableEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex];
numberOfPlacements = tableEntry->arrayOfOrphanedPlacementArrayLengths[shardIndex]; numberOfPlacements = tableEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex];
} }
for (int i = 0; i < numberOfPlacements; i++) for (int i = 0; i < numberOfPlacements; i++)
@ -1726,11 +1726,11 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
activeShardCount * activeShardCount *
sizeof(int)); sizeof(int));
cacheEntry->arrayOfOrphanedPlacementArrays = cacheEntry->arrayOfOrphanedShardsPlacementArrays =
MemoryContextAllocZero(MetadataCacheMemoryContext, MemoryContextAllocZero(MetadataCacheMemoryContext,
orphanedShardCount * orphanedShardCount *
sizeof(GroupShardPlacement *)); sizeof(GroupShardPlacement *));
cacheEntry->arrayOfOrphanedPlacementArrayLengths = cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths =
MemoryContextAllocZero(MetadataCacheMemoryContext, MemoryContextAllocZero(MetadataCacheMemoryContext,
orphanedShardCount * orphanedShardCount *
sizeof(int)); sizeof(int));
@ -1920,8 +1920,16 @@ BuildShardIdCacheAndPlacementList(ShardInterval **shardIntervalArray, int arrayL
} }
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray; if(arrayType == ACTIVE_SHARD_ARRAY)
cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements; {
cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray;
cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements;
}
else
{
cacheEntry->arrayOfOrphanedShardsPlacementArrays[shardIndex] = placementArray;
cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths[shardIndex] = numberOfPlacements;
}
/* store the shard index in the ShardInterval */ /* store the shard index in the ShardInterval */
shardInterval->shardIndex = shardIndex; shardInterval->shardIndex = shardIndex;
@ -4219,8 +4227,8 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
FreeCitusTableCacheShardAndPlacementEntryFromArray( FreeCitusTableCacheShardAndPlacementEntryFromArray(
cacheEntry->sortedOrphanedShardIntervalArray, cacheEntry->sortedOrphanedShardIntervalArray,
cacheEntry->orphanedShardIntervalArrayLength, cacheEntry->orphanedShardIntervalArrayLength,
cacheEntry->arrayOfOrphanedPlacementArrays, cacheEntry->arrayOfOrphanedShardsPlacementArrays,
cacheEntry->arrayOfOrphanedPlacementArrayLengths); cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths);
if (cacheEntry->sortedShardIntervalArray) if (cacheEntry->sortedShardIntervalArray)
{ {
@ -4242,15 +4250,15 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
pfree(cacheEntry->arrayOfPlacementArrays); pfree(cacheEntry->arrayOfPlacementArrays);
cacheEntry->arrayOfPlacementArrays = NULL; cacheEntry->arrayOfPlacementArrays = NULL;
} }
if (cacheEntry->arrayOfOrphanedPlacementArrayLengths) if (cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths)
{ {
pfree(cacheEntry->arrayOfOrphanedPlacementArrayLengths); pfree(cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths);
cacheEntry->arrayOfOrphanedPlacementArrayLengths = NULL; cacheEntry->arrayOfOrphanedShardsPlacementArrayLengths = NULL;
} }
if (cacheEntry->arrayOfOrphanedPlacementArrays) if (cacheEntry->arrayOfOrphanedShardsPlacementArrays)
{ {
pfree(cacheEntry->arrayOfOrphanedPlacementArrays); pfree(cacheEntry->arrayOfOrphanedShardsPlacementArrays);
cacheEntry->arrayOfOrphanedPlacementArrays = NULL; cacheEntry->arrayOfOrphanedShardsPlacementArrays = NULL;
} }
if (cacheEntry->referencedRelationsViaForeignKey) if (cacheEntry->referencedRelationsViaForeignKey)
{ {

View File

@ -631,8 +631,8 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
/* /*
* GroupShardPlacementsForTableOnGroup accepts a relationId and a group and returns a list * GroupShardPlacementsForTableOnGroup accepts a relationId and a group and returns a list
* of GroupShardPlacement's representing all of the placements for the table which reside * of GroupShardPlacement's representing all of the active shard's placements for the table
* on the group. * which reside on the group.
*/ */
List * List *
GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId) GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId)
@ -666,7 +666,7 @@ GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId)
/* /*
* ShardIntervalsOnWorkerGroup accepts a WorkerNode and returns a list of the shard * ShardIntervalsOnWorkerGroup accepts a WorkerNode and returns a list of active shard
* intervals of the given table which are placed on the group the node is a part of. * intervals of the given table which are placed on the group the node is a part of.
*/ */
static List * static List *

View File

@ -235,7 +235,7 @@ PrunedShardIdsForTable(Oid distributedTableId, List *whereClauseList)
/* /*
* SortedShardIntervalArray simply returns the shard interval ids in the sorted shard * SortedShardIntervalArray simply returns active shard interval ids in the sorted shard
* interval cache as a datum array. * interval cache as a datum array.
*/ */
static ArrayType * static ArrayType *

View File

@ -984,7 +984,7 @@ ColocationGroupTableList(uint32 colocationId, uint32 count)
/* /*
* ColocatedShardIntervalList function returns list of shard intervals which are * ColocatedShardIntervalList function returns list of active shard intervals which are
* co-located with given shard. If given shard is belong to append or range distributed * co-located with given shard. If given shard is belong to append or range distributed
* table, co-location is not valid for that shard. Therefore such shard is only co-located * table, co-location is not valid for that shard. Therefore such shard is only co-located
* with itself. * with itself.
@ -1045,7 +1045,7 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
/* /*
* ColocatedNonPartitionShardIntervalList function returns list of shard intervals * ColocatedNonPartitionShardIntervalList function returns list of active shard intervals
* which are co-located with given shard, except partitions. If given shard is belong * which are co-located with given shard, except partitions. If given shard is belong
* to append or range distributed table, co-location is not valid for that shard. * to append or range distributed table, co-location is not valid for that shard.
* Therefore such shard is only co-located with itself. * Therefore such shard is only co-located with itself.
@ -1187,7 +1187,7 @@ ColocatedTableId(Oid colocationId)
/* /*
* ColocatedShardIdInRelation returns shardId of the shard from given relation, so that * ColocatedShardIdInRelation returns shardId of an active shard from given relation, so that
* returned shard is co-located with given shard. * returned shard is co-located with given shard.
*/ */
uint64 uint64

View File

@ -28,7 +28,7 @@
/* /*
* SortedShardIntervalArray sorts the input shardIntervalArray. Shard intervals with * SortShardIntervalArray sorts the input shardIntervalArray. Shard intervals with
* no min/max values are placed at the end of the array. * no min/max values are placed at the end of the array.
*/ */
ShardInterval ** ShardInterval **
@ -251,7 +251,7 @@ ShardIndex(ShardInterval *shardInterval)
/* /*
* FindShardInterval finds a single shard interval in the cache for the * FindShardInterval finds a single active shard interval in the cache for the
* given partition column value. Note that reference tables do not have * given partition column value. Note that reference tables do not have
* partition columns, thus, pass partitionColumnValue and compareFunction * partition columns, thus, pass partitionColumnValue and compareFunction
* as NULL for them. * as NULL for them.
@ -280,7 +280,7 @@ FindShardInterval(Datum partitionColumnValue, CitusTableCacheEntry *cacheEntry)
/* /*
* FindShardIntervalIndex finds the index of the shard interval which covers * FindShardIntervalIndex finds the index of an active shard interval which covers
* the searched value. Note that the searched value must be the hashed value * the searched value. Note that the searched value must be the hashed value
* of the original value if the distribution method is hash. * of the original value if the distribution method is hash.
* *

View File

@ -104,9 +104,9 @@ typedef struct
int *arrayOfPlacementArrayLengths; int *arrayOfPlacementArrayLengths;
/* pg_dist_placement metadata */ /* pg_dist_placement metadata */
/* The list includes only TO_DELETE shards */ /* The list includes only TO_DELETE shard's placements */
GroupShardPlacement **arrayOfOrphanedPlacementArrays; GroupShardPlacement **arrayOfOrphanedShardsPlacementArrays;
int *arrayOfOrphanedPlacementArrayLengths; int *arrayOfOrphanedShardsPlacementArrayLengths;
} CitusTableCacheEntry; } CitusTableCacheEntry;
typedef struct DistObjectCacheEntryKey typedef struct DistObjectCacheEntryKey

View File

@ -47,8 +47,6 @@ extern int CompareRelationShards(const void *leftElement,
const void *rightElement); const void *rightElement);
extern int ShardIndex(ShardInterval *shardInterval); extern int ShardIndex(ShardInterval *shardInterval);
extern int CalculateUniformHashRangeIndex(int hashedValue, int shardCount); extern int CalculateUniformHashRangeIndex(int hashedValue, int shardCount);
// TODO(niupre): Review API.
extern ShardInterval * FindShardInterval(Datum partitionColumnValue, extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
CitusTableCacheEntry *cacheEntry); CitusTableCacheEntry *cacheEntry);
extern int FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry); extern int FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry);