Improve performance of metadata cache (#3924)

#3866 removed the shard ID hash in metadata_cache.c to simplify cache management, 
but we observed a significant performance regression that was being masked by the
performance improvement provided by #3654 in our benchmarks, but #3654 only 
applies to specific workloads.

This PR brings back the shard ID cache as it existed before #3866 with some extra
 measures to handle invalidation. When we load a table entry, we overwrite 
ShardIdCacheEntry->tableEntry pointers for all the shards in that table, though 
it's possible that the table no longer contains the old shard ID or the table 
entry is never reloaded, which would leave a dangling pointer once the table 
entry is freed. To handle that case, we remove all shard ID cache entries that 
point exactly to that table entry when a table is freed (at the end of the 
transaction or any call to CitusTableCacheFlushInvalidatedEntries).

Co-authored-by: SaitTalhaNisanci <s.talhanisanci@gmail.com>
Co-authored-by: Marco Slot <marco.slot@gmail.com>
Co-authored-by: Jelte Fennema <github-tech@jeltef.nl>
pull/3956/head^2
Marco Slot 2020-06-30 12:10:10 +02:00 committed by GitHub
parent 02fa942be1
commit 634d6cf9d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 283 additions and 160 deletions

View File

@ -213,6 +213,11 @@ CitusExecutorRun(QueryDesc *queryDesc,
if (ExecutorLevel == 0 && PlannerLevel == 0)
{
/*
* We are leaving Citus code so no one should have any references to
* cache entries. Release them now to not hold onto memory in long
* transactions.
*/
CitusTableCacheFlushInvalidatedEntries();
}
}
@ -225,11 +230,6 @@ CitusExecutorRun(QueryDesc *queryDesc,
ExecutorLevel--;
if (ExecutorLevel == 0 && PlannerLevel == 0)
{
CitusTableCacheFlushInvalidatedEntries();
}
PG_RE_THROW();
}
PG_END_TRY();

View File

@ -90,18 +90,34 @@ typedef struct CitusTableCacheEntrySlot
{
/* lookup key - must be first. A pg_class.oid oid. */
Oid relationId;
CitusTableCacheEntry *data;
/* Citus table metadata (NULL for local tables) */
CitusTableCacheEntry *citusTableMetadata;
/*
* If isValid is false, we need to recheck whether the relation ID
* belongs to a Citus or not.
*/
bool isValid;
} CitusTableCacheEntrySlot;
/*
* ShardIdIndexSlot is entry type for CitusTableCacheEntry's ShardIdIndexHash.
* ShardIdCacheEntry is the entry type for ShardIdCacheHash.
*
* This should never be used outside of this file. Use ShardInterval instead.
*/
typedef struct ShardIdIndexSlot
typedef struct ShardIdCacheEntry
{
/* hash key, needs to be first */
uint64 shardId;
/* pointer to the table entry to which this shard currently belongs */
CitusTableCacheEntry *tableEntry;
/* index of the shard interval in the sortedShardIntervalArray of the table entry */
int shardIndex;
} ShardIdIndexSlot;
} ShardIdCacheEntry;
/*
@ -167,6 +183,8 @@ static HTAB *DistTableCacheHash = NULL;
static List *DistTableCacheExpired = NIL;
/* Hash table for informations about each shard */
static HTAB *ShardIdCacheHash = NULL;
static MemoryContext MetadataCacheMemoryContext = NULL;
/* Hash table for information about each object */
@ -189,12 +207,9 @@ static ScanKeyData DistObjectScanKey[3];
/* local function forward declarations */
static bool IsCitusTableViaCatalog(Oid relationId);
static ShardIdIndexSlot * LookupShardIdIndexSlot(ShardIdIndexSlot *table,
size_t tableSize, uint64 shardId);
static CitusTableCacheEntry * LookupShardIndex(int64 shardId, int *shardIndex);
static ShardIdCacheEntry * LookupShardIdCacheEntry(int64 shardId);
static CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId);
static CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId);
static void BuildCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry);
static CitusTableCacheEntry * BuildCitusTableCacheEntry(Oid relationId);
static void BuildCachedShardList(CitusTableCacheEntry *cacheEntry);
static void PrepareWorkerNodeCache(void);
static bool CheckInstalledVersion(int elevel);
@ -211,7 +226,9 @@ static void RegisterLocalGroupIdCacheCallbacks(void);
static void RegisterCitusTableCacheEntryReleaseCallbacks(void);
static uint32 WorkerNodeHashCode(const void *key, Size keySize);
static void ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry);
static void RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *tableEntry);
static void CreateDistTableCache(void);
static void CreateShardIdCache(void);
static void CreateDistObjectCache(void);
static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId);
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
@ -235,8 +252,11 @@ static ShardPlacement * ResolveGroupShardPlacement(
GroupShardPlacement *groupShardPlacement, CitusTableCacheEntry *tableEntry,
int shardIndex);
static Oid LookupEnumValueId(Oid typeId, char *valueName);
static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSlot);
static void InvalidateDistTableCache(void);
static void InvalidateDistObjectCache(void);
static void InitializeTableCacheEntry(int64 shardId);
static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry);
/* exports for SQL callable functions */
@ -280,18 +300,7 @@ EnsureModificationsCanRun(void)
bool
IsCitusTable(Oid relationId)
{
CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(relationId);
/*
* If extension hasn't been created, or has the wrong version and the
* table isn't a distributed one, LookupCitusTableCacheEntry() will return NULL.
*/
if (!cacheEntry)
{
return false;
}
return cacheEntry->isCitusTable;
return LookupCitusTableCacheEntry(relationId) != NULL;
}
@ -349,7 +358,6 @@ CitusTableList(void)
foreach_oid(relationId, distTableOidList)
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
Assert(cacheEntry->isCitusTable);
distributedTableList = lappend(distributedTableList, cacheEntry);
}
@ -367,10 +375,9 @@ CitusTableList(void)
ShardInterval *
LoadShardInterval(uint64 shardId)
{
int shardIndex = 0;
CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex);
Assert(tableEntry->isCitusTable);
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
int shardIndex = shardIdEntry->shardIndex;
/* the offset better be in a valid range */
Assert(shardIndex < tableEntry->shardIntervalArrayLength);
@ -391,7 +398,9 @@ LoadShardInterval(uint64 shardId)
Oid
RelationIdForShard(uint64 shardId)
{
return LookupShardRelation(shardId, false);
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
return tableEntry->relationId;
}
@ -402,9 +411,9 @@ RelationIdForShard(uint64 shardId)
bool
ReferenceTableShardId(uint64 shardId)
{
Oid relationId = LookupShardRelation(shardId, false);
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
char partitionMethod = cacheEntry->partitionMethod;
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
char partitionMethod = tableEntry->partitionMethod;
return partitionMethod == DISTRIBUTE_BY_NONE;
}
@ -419,8 +428,9 @@ ReferenceTableShardId(uint64 shardId)
GroupShardPlacement *
LoadGroupShardPlacement(uint64 shardId, uint64 placementId)
{
int shardIndex = 0;
CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex);
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
int shardIndex = shardIdEntry->shardIndex;
/* the offset better be in a valid range */
Assert(shardIndex < tableEntry->shardIntervalArrayLength);
@ -453,11 +463,12 @@ LoadGroupShardPlacement(uint64 shardId, uint64 placementId)
ShardPlacement *
LoadShardPlacement(uint64 shardId, uint64 placementId)
{
int shardIndex = 0;
CitusTableCacheEntry *cacheEntry = LookupShardIndex(shardId, &shardIndex);
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
int shardIndex = shardIdEntry->shardIndex;
GroupShardPlacement *groupPlacement = LoadGroupShardPlacement(shardId, placementId);
ShardPlacement *nodePlacement = ResolveGroupShardPlacement(groupPlacement,
cacheEntry, shardIndex);
tableEntry, shardIndex);
return nodePlacement;
}
@ -473,8 +484,9 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
{
ShardPlacement *placementOnNode = NULL;
int shardIndex = 0;
CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex);
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
int shardIndex = shardIdEntry->shardIndex;
GroupShardPlacement *placementArray =
tableEntry->arrayOfPlacementArrays[shardIndex];
int numberOfPlacements =
@ -658,8 +670,9 @@ ShardPlacementList(uint64 shardId)
{
List *placementList = NIL;
int shardIndex = 0;
CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex);
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
int shardIndex = shardIdEntry->shardIndex;
/* the offset better be in a valid range */
Assert(shardIndex < tableEntry->shardIntervalArrayLength);
@ -691,63 +704,100 @@ ShardPlacementList(uint64 shardId)
/*
* LookupShardIdIndexSlot returns the hash entry mapping shard id to index.
* The returned slot's shardId will be 0 when not found.
* While initializing the caller will set shardId to the searched value,
* while searching the caller will ignore the slot.
* There should always be spare capacity; this fails to halt otherwise.
* InitializeTableCacheEntry initializes a shard in cache. A possible reason
* for not finding an entry in the cache is that the distributed table's cache
* entry hasn't been accessed yet. Thus look up the distributed table, and
* build the cache entry. Afterwards we know that the shard has to be in the
* cache if it exists. If the shard does *not* exist, this function errors
* (because LookupShardRelationFromCatalog errors out).
*/
static ShardIdIndexSlot *
LookupShardIdIndexSlot(ShardIdIndexSlot *table, size_t tableSize, uint64 shardId)
static void
InitializeTableCacheEntry(int64 shardId)
{
Assert(shardId);
bool missingOk = false;
Oid relationId = LookupShardRelationFromCatalog(shardId, missingOk);
size_t tableIndex = shardId % tableSize;
ShardIdIndexSlot *slot = table + tableIndex;
while (slot->shardId != 0 && slot->shardId != shardId)
{
tableIndex++;
if (tableIndex >= tableSize)
{
tableIndex = 0;
}
slot = table + tableIndex;
}
return slot;
/* trigger building the cache for the shard id */
GetCitusTableCacheEntry(relationId);
}
/*
* LookupShardIndex returns the cache entry belonging to a shard, or
* errors out if that shard is unknown. shardIndex is set to the index
* for the shardId in sortedShardIntervalArray.
* RefreshInvalidTableCacheEntry checks if the cache entry is still valid and
* refreshes it in cache when it's not. It returns true if it refreshed the
* entry in the cache and false if it didn't.
*/
static CitusTableCacheEntry *
LookupShardIndex(int64 shardId, int *shardIndex)
static bool
RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry)
{
/*
* We might have some concurrent metadata changes. In order to get the changes,
* we first need to accept the cache invalidation messages.
*/
AcceptInvalidationMessages();
if (shardEntry->tableEntry->isValid)
{
return false;
}
Oid oldRelationId = shardEntry->tableEntry->relationId;
Oid currentRelationId = LookupShardRelationFromCatalog(shardEntry->shardId, false);
/*
* The relation OID to which the shard belongs could have changed,
* most notably when the extension is dropped and a shard ID is
* reused. Reload the cache entries for both old and new relation
* ID and then look up the shard entry again.
*/
LookupCitusTableCacheEntry(oldRelationId);
LookupCitusTableCacheEntry(currentRelationId);
return true;
}
/*
* LookupShardCacheEntry returns the cache entry belonging to a shard, or
* errors out if that shard is unknown.
*/
static ShardIdCacheEntry *
LookupShardIdCacheEntry(int64 shardId)
{
bool foundInCache = false;
bool recheck = false;
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
InitializeCaches();
Oid relationId = LookupShardRelation(shardId, false);
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
ShardIdCacheEntry *shardEntry =
hash_search(ShardIdCacheHash, &shardId, HASH_FIND, &foundInCache);
ShardIdIndexSlot *idIndexSlot =
LookupShardIdIndexSlot(tableEntry->shardIdIndexHash,
tableEntry->shardIntervalArrayLength * 2, shardId);
if (idIndexSlot->shardId != 0)
if (!foundInCache)
{
*shardIndex = idIndexSlot->shardIndex;
return tableEntry;
InitializeTableCacheEntry(shardId);
recheck = true;
}
else
{
recheck = RefreshTableCacheEntryIfInvalid(shardEntry);
}
/*
* If we (re-)loaded the table cache, re-search the shard cache - the
* shard index might have changed. If we still can't find the entry, it
* can't exist.
*/
if (recheck)
{
shardEntry = hash_search(ShardIdCacheHash, &shardId, HASH_FIND, &foundInCache);
if (!foundInCache)
{
ereport(ERROR, (errmsg("could not find valid entry for shard "
UINT64_FORMAT " in table %d", shardId, relationId)));
return NULL;
UINT64_FORMAT, shardId)));
}
}
return shardEntry;
}
@ -763,7 +813,7 @@ GetCitusTableCacheEntry(Oid distributedRelationId)
CitusTableCacheEntry *cacheEntry =
LookupCitusTableCacheEntry(distributedRelationId);
if (cacheEntry && cacheEntry->isCitusTable)
if (cacheEntry)
{
return cacheEntry;
}
@ -786,7 +836,8 @@ GetCitusTableCacheEntry(Oid distributedRelationId)
/*
* GetCitusTableCacheEntry returns the distributed table metadata for the
* passed relationId. For efficiency it caches lookups.
* passed relationId. For efficiency it caches lookups. This function returns
* NULL if the relation isn't a distributed table.
*/
static CitusTableCacheEntry *
LookupCitusTableCacheEntry(Oid relationId)
@ -844,16 +895,37 @@ LookupCitusTableCacheEntry(Oid relationId)
/* return valid matches */
if (foundInCache)
{
Assert(cacheSlot->data->isValid);
return cacheSlot->data;
if (cacheSlot->isValid)
{
return cacheSlot->citusTableMetadata;
}
else
{
/*
* An invalidation was received or we encountered an OOM while building
* the cache entry. We need to rebuild it.
*/
if (cacheSlot->citusTableMetadata)
{
/*
* The CitusTableCacheEntry might still be in use. We therefore do
* not reset it until the end of the transaction.
*/
MemoryContext oldContext =
MemoryContextSwitchTo(MetadataCacheMemoryContext);
DistTableCacheExpired = lappend(DistTableCacheExpired,
cacheSlot->citusTableMetadata);
MemoryContextSwitchTo(oldContext);
}
}
}
/* zero out entry, but not the key part */
memset(((char *) cacheSlot) + sizeof(Oid), 0,
sizeof(CitusTableCacheEntrySlot) - sizeof(Oid));
cacheSlot->data =
MemoryContextAllocZero(MetadataCacheMemoryContext, sizeof(CitusTableCacheEntry));
cacheSlot->data->relationId = relationId;
/*
* We disable interrupts while creating the cache entry because loading
@ -863,15 +935,17 @@ LookupCitusTableCacheEntry(Oid relationId)
*/
HOLD_INTERRUPTS();
/* actually fill out entry */
BuildCitusTableCacheEntry(cacheSlot->data);
cacheSlot->citusTableMetadata = BuildCitusTableCacheEntry(relationId);
/* and finally mark as valid */
cacheSlot->data->isValid = true;
/*
* Mark it as valid only after building the full entry, such that any
* error that happened during the build would trigger a rebuild.
*/
cacheSlot->isValid = true;
RESUME_INTERRUPTS();
return cacheSlot->data;
return cacheSlot->citusTableMetadata;
}
@ -980,31 +1054,34 @@ LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid)
/*
* BuildCitusTableCacheEntry is a helper routine for
* LookupCitusTableCacheEntry() for building the cache contents.
* This function returns NULL if the relation isn't a distributed table.
*/
static void
BuildCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
static CitusTableCacheEntry *
BuildCitusTableCacheEntry(Oid relationId)
{
Relation pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
HeapTuple distPartitionTuple =
LookupDistPartitionTuple(pgDistPartition, relationId);
if (distPartitionTuple == NULL)
{
/* not a distributed table, done */
heap_close(pgDistPartition, NoLock);
return NULL;
}
MemoryContext oldContext = NULL;
Datum datumArray[Natts_pg_dist_partition];
bool isNullArray[Natts_pg_dist_partition];
Relation pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
HeapTuple distPartitionTuple =
LookupDistPartitionTuple(pgDistPartition, cacheEntry->relationId);
/* not a distributed table, done */
if (distPartitionTuple == NULL)
{
cacheEntry->isCitusTable = false;
heap_close(pgDistPartition, NoLock);
return;
}
cacheEntry->isCitusTable = true;
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray);
CitusTableCacheEntry *cacheEntry =
MemoryContextAllocZero(MetadataCacheMemoryContext, sizeof(CitusTableCacheEntry));
cacheEntry->relationId = relationId;
cacheEntry->partitionMethod = datumArray[Anum_pg_dist_partition_partmethod - 1];
Datum partitionKeyDatum = datumArray[Anum_pg_dist_partition_partkey - 1];
bool partitionKeyIsNull = isNullArray[Anum_pg_dist_partition_partkey - 1];
@ -1090,6 +1167,10 @@ BuildCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
MemoryContextSwitchTo(oldContext);
heap_close(pgDistPartition, NoLock);
cacheEntry->isValid = true;
return cacheEntry;
}
@ -1243,16 +1324,23 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
cacheEntry->shardIntervalArrayLength = 0;
cacheEntry->shardIdIndexHash =
MemoryContextAllocZero(MetadataCacheMemoryContext,
sizeof(ShardIdIndexSlot) * shardIntervalArrayLength * 2);
/* maintain shardId->(table,ShardInterval) cache */
for (int shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
{
ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex];
int64 shardId = shardInterval->shardId;
int placementOffset = 0;
/*
* Enable quick lookups of this shard ID by adding it to ShardIdCacheHash
* or overwriting the previous values.
*/
ShardIdCacheEntry *shardIdCacheEntry =
hash_search(ShardIdCacheHash, &shardId, HASH_ENTER, NULL);
shardIdCacheEntry->tableEntry = cacheEntry;
shardIdCacheEntry->shardIndex = shardIndex;
/*
* We should increment this only after we are sure this hasn't already
* been assigned to any other relations. ResetCitusTableCacheEntry()
@ -1281,13 +1369,6 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
/* store the shard index in the ShardInterval */
shardInterval->shardIndex = shardIndex;
ShardIdIndexSlot *idIndexSlot =
LookupShardIdIndexSlot(cacheEntry->shardIdIndexHash,
shardIntervalArrayLength * 2, shardInterval->shardId);
Assert(idIndexSlot->shardId == 0);
idIndexSlot->shardId = shardInterval->shardId;
idIndexSlot->shardIndex = shardIndex;
}
cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction;
@ -2865,6 +2946,7 @@ InitializeCaches(void)
MetadataCacheMemoryContext = NULL;
DistTableCacheHash = NULL;
DistTableCacheExpired = NIL;
ShardIdCacheHash = NULL;
PG_RE_THROW();
}
@ -2899,6 +2981,7 @@ InitializeDistCache(void)
DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid;
CreateDistTableCache();
CreateShardIdCache();
InitializeDistObjectCache();
@ -3249,17 +3332,14 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
cacheEntry->partitionColumn = NULL;
}
if (cacheEntry->shardIdIndexHash != NULL)
{
pfree(cacheEntry->shardIdIndexHash);
cacheEntry->shardIdIndexHash = NULL;
}
if (cacheEntry->shardIntervalArrayLength == 0)
{
return;
}
/* clean up ShardIdCacheHash */
RemoveStaleShardIdCacheEntries(cacheEntry);
for (int shardIndex = 0; shardIndex < cacheEntry->shardIntervalArrayLength;
shardIndex++)
{
@ -3327,6 +3407,35 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
}
/*
* RemoveShardIdCacheEntries removes all shard ID cache entries belonging to the
* given table entry. If the shard ID belongs to a different (newer) table entry,
* we leave it in place.
*/
static void
RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *invalidatedTableEntry)
{
int shardIndex = 0;
int shardCount = invalidatedTableEntry->shardIntervalArrayLength;
for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
{
ShardInterval *shardInterval =
invalidatedTableEntry->sortedShardIntervalArray[shardIndex];
int64 shardId = shardInterval->shardId;
bool foundInCache = false;
ShardIdCacheEntry *shardIdCacheEntry =
hash_search(ShardIdCacheHash, &shardId, HASH_FIND, &foundInCache);
if (foundInCache && shardIdCacheEntry->tableEntry == invalidatedTableEntry)
{
hash_search(ShardIdCacheHash, &shardId, HASH_REMOVE, &foundInCache);
}
}
}
/*
* InvalidateForeignRelationGraphCacheCallback invalidates the foreign key relation
* graph and entire distributed cache entries.
@ -3384,15 +3493,11 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
void *hashKey = (void *) &relationId;
bool foundInCache = false;
CitusTableCacheEntrySlot *cacheSlot =
hash_search(DistTableCacheHash, hashKey, HASH_REMOVE, &foundInCache);
hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache);
if (foundInCache)
{
cacheSlot->data->isValid = false;
MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
DistTableCacheExpired = lappend(DistTableCacheExpired, cacheSlot->data);
MemoryContextSwitchTo(oldContext);
InvalidateCitusTableCacheEntrySlot(cacheSlot);
}
/*
@ -3413,6 +3518,25 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
}
/*
* InvalidateCitusTableCacheEntrySlot marks a CitusTableCacheEntrySlot as invalid,
* meaning it needs to be rebuilt and the citusTableMetadata (if any) should be
* released.
*/
static void
InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSlot)
{
/* recheck whether this is a distributed table */
cacheSlot->isValid = false;
if (cacheSlot->citusTableMetadata != NULL)
{
/* reload the metadata */
cacheSlot->citusTableMetadata->isValid = false;
}
}
/*
* InvalidateDistTableCache marks all DistTableCacheHash entries invalid.
*/
@ -3426,15 +3550,7 @@ InvalidateDistTableCache(void)
while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL)
{
bool foundInCache = false;
CitusTableCacheEntrySlot *removedSlot PG_USED_FOR_ASSERTS_ONLY =
hash_search(DistTableCacheHash, cacheSlot, HASH_REMOVE, &foundInCache);
Assert(removedSlot == cacheSlot);
cacheSlot->data->isValid = false;
MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
DistTableCacheExpired = lappend(DistTableCacheExpired, cacheSlot->data);
MemoryContextSwitchTo(oldContext);
InvalidateCitusTableCacheEntrySlot(cacheSlot);
}
}
@ -3471,14 +3587,13 @@ FlushDistTableCache(void)
while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL)
{
if (cacheSlot->data->isValid)
{
ResetCitusTableCacheEntry(cacheSlot->data);
}
ResetCitusTableCacheEntry(cacheSlot->citusTableMetadata);
}
hash_destroy(DistTableCacheHash);
hash_destroy(ShardIdCacheHash);
CreateDistTableCache();
CreateShardIdCache();
}
@ -3498,6 +3613,22 @@ CreateDistTableCache(void)
}
/* CreateShardIdCache initializes the shard ID mapping */
static void
CreateShardIdCache(void)
{
HASHCTL info;
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(int64);
info.entrysize = sizeof(ShardIdCacheEntry);
info.hash = tag_hash;
info.hcxt = MetadataCacheMemoryContext;
ShardIdCacheHash =
hash_create("Shard Id Cache", 128, &info,
HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
}
/* CreateDistObjectCache initializes the per-object hash table */
static void
CreateDistObjectCache(void)
@ -3754,13 +3885,13 @@ LookupDistShardTuples(Oid relationId)
/*
* LookupShardRelation returns the logical relation oid a shard belongs to.
* LookupShardRelationFromCatalog returns the logical relation oid a shard belongs to.
*
* Errors out if the shardId does not exist and missingOk is false.
* Returns InvalidOid if the shardId does not exist and missingOk is true.
*/
Oid
LookupShardRelation(int64 shardId, bool missingOk)
LookupShardRelationFromCatalog(int64 shardId, bool missingOk)
{
ScanKeyData scanKey[1];
int scanKeyCount = 1;

View File

@ -171,7 +171,7 @@ RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath)
}
/* try to get the relation id */
Oid relationId = LookupShardRelation(shardId, true);
Oid relationId = LookupShardRelationFromCatalog(shardId, true);
if (!OidIsValid(relationId))
{
/* there is no such relation */

View File

@ -38,10 +38,6 @@ extern int ReadFromSecondaries;
#define GROUP_ID_UPGRADING -2
/* internal type used by metadata_cache.c to cache shard indexes */
struct ShardIdIndexSlot;
/*
* Representation of a table's metadata that is frequently used for
* distributed execution. Cached.
@ -57,7 +53,6 @@ typedef struct
*/
bool isValid;
bool isCitusTable;
bool hasUninitializedShardInterval;
bool hasUniformHashDistribution; /* valid for hash partitioned tables */
bool hasOverlappingShardInterval;
@ -73,9 +68,6 @@ typedef struct
int shardIntervalArrayLength;
ShardInterval **sortedShardIntervalArray;
/* map of shardId to index in sortedShardIntervalArray */
struct ShardIdIndexSlot *shardIdIndexHash;
/* comparator for partition column's type, NULL if DISTRIBUTE_BY_NONE */
FmgrInfo *shardColumnCompareFunction;
@ -140,7 +132,7 @@ extern int32 GetLocalGroupId(void);
extern List * DistTableOidList(void);
extern List * ReferenceTableOidList(void);
extern void CitusTableCacheFlushInvalidatedEntries(void);
extern Oid LookupShardRelation(int64 shardId, bool missing_ok);
extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok);
extern List * ShardPlacementList(uint64 shardId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId);