Merge pull request #3866 from citusdata/release-cache-entry-deferred

Deferred release of metadata cache entries
pull/3911/head
Philip Dubé 2020-06-15 16:41:02 +00:00 committed by GitHub
commit 56eb5ee305
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 239 additions and 250 deletions

View File

@ -210,37 +210,8 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
distSelectJob->jobId); distSelectJob->jobId);
char *distResultPrefix = distResultPrefixString->data; char *distResultPrefix = distResultPrefixString->data;
CitusTableCacheEntry *cachedTargetRelation = CitusTableCacheEntry *targetRelation =
GetCitusTableCacheEntry(targetRelationId); GetCitusTableCacheEntry(targetRelationId);
CitusTableCacheEntry *targetRelation = palloc(sizeof(CitusTableCacheEntry));
*targetRelation = *cachedTargetRelation;
#if PG_USE_ASSERT_CHECKING
/*
* These fields aren't used in the code which follows,
* therefore in assert builds NULL these fields to
* segfault if they were to be used.
*/
targetRelation->partitionKeySTring = NULL;
targetRelation->shardIntervalCompareFunction = NULL;
targetRelation->hashFunction = NULL;
targetRelation->arrayOfPlacementArrayLengths = NULL;
targetRelation->arrayOfPlacementArrays = NULL;
targetRelation->referencedRelationViaForeignKey = NULL;
targetRelation->referencingRelationsViaForeignKey = NULL;
#endif
targetRelation->partitionColumn = copyObject(
cachedTargetRelation->partitionColumn);
targetRelation->sortedShardIntervalArray =
palloc(targetRelation->shardIntervalArrayLength * sizeof(ShardInterval));
for (int shardIndex = 0; shardIndex <
targetRelation->shardIntervalArrayLength; shardIndex++)
{
targetRelation->sortedShardIntervalArray[shardIndex] =
CopyShardInterval(
cachedTargetRelation->sortedShardIntervalArray[shardIndex]);
}
int partitionColumnIndex = int partitionColumnIndex =
PartitionColumnIndex(insertTargetList, targetRelation->partitionColumn); PartitionColumnIndex(insertTargetList, targetRelation->partitionColumn);

View File

@ -210,6 +210,11 @@ CitusExecutorRun(QueryDesc *queryDesc,
} }
ExecutorLevel--; ExecutorLevel--;
if (ExecutorLevel == 0 && PlannerLevel == 0)
{
CitusTableCacheFlushInvalidatedEntries();
}
} }
PG_CATCH(); PG_CATCH();
{ {
@ -220,6 +225,11 @@ CitusExecutorRun(QueryDesc *queryDesc,
ExecutorLevel--; ExecutorLevel--;
if (ExecutorLevel == 0 && PlannerLevel == 0)
{
CitusTableCacheFlushInvalidatedEntries();
}
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();

View File

@ -73,6 +73,7 @@
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/relfilenodemap.h" #include "utils/relfilenodemap.h"
#include "utils/relmapper.h" #include "utils/relmapper.h"
#include "utils/resowner.h"
#include "utils/syscache.h" #include "utils/syscache.h"
#include "utils/typcache.h" #include "utils/typcache.h"
@ -82,36 +83,30 @@ int ReadFromSecondaries = USE_SECONDARY_NODES_NEVER;
/* /*
* ShardCacheEntry represents an entry in the shardId -> ShardInterval cache. * CitusTableCacheEntrySlot is entry type for DistTableCacheHash,
* To avoid duplicating data and invalidation logic between this cache and the * entry data outlives slot on invalidation, so requires indirection.
* DistTableCache, this only points into the CitusTableCacheEntry of the
* shard's distributed table.
*/ */
typedef struct ShardCacheEntry typedef struct CitusTableCacheEntrySlot
{ {
/* hash key, needs to be first */ /* lookup key - must be first. A pg_class.oid oid. */
int64 shardId; Oid relationId;
CitusTableCacheEntry *data;
} CitusTableCacheEntrySlot;
/*
* Cache entry for the distributed table a shard belongs to, possibly not
* valid.
*/
CitusTableCacheEntry *tableEntry;
/* /*
* Offset in tableEntry->sortedShardIntervalArray, only valid if * ShardIdIndexSlot is entry type for CitusTableCacheEntry's ShardIdIndexHash.
* tableEntry->isValid. We don't store pointers to the individual shard
* placements because that'd make invalidation a bit more complicated, and
* because there's simply no need.
*/ */
typedef struct ShardIdIndexSlot
{
uint64 shardId;
int shardIndex; int shardIndex;
} ShardCacheEntry; } ShardIdIndexSlot;
/* /*
* State which should be cleared upon DROP EXTENSION. When the configuration * State which should be cleared upon DROP EXTENSION. When the configuration
* changes, e.g. because the extension is dropped, these summarily get set to * changes, e.g. because extension is dropped, these summarily get set to 0.
* 0.
*/ */
typedef struct MetadataCacheData typedef struct MetadataCacheData
{ {
@ -169,9 +164,9 @@ static bool citusVersionKnownCompatible = false;
/* Hash table for information about each partition */ /* Hash table for information about each partition */
static HTAB *DistTableCacheHash = NULL; static HTAB *DistTableCacheHash = NULL;
static List *DistTableCacheExpired = NIL;
/* Hash table for informations about each shard */ /* Hash table for informations about each shard */
static HTAB *DistShardCacheHash = NULL;
static MemoryContext MetadataCacheMemoryContext = NULL; static MemoryContext MetadataCacheMemoryContext = NULL;
/* Hash table for information about each object */ /* Hash table for information about each object */
@ -194,7 +189,10 @@ static ScanKeyData DistObjectScanKey[3];
/* local function forward declarations */ /* local function forward declarations */
static bool IsCitusTableViaCatalog(Oid relationId); static bool IsCitusTableViaCatalog(Oid relationId);
static ShardCacheEntry * LookupShardCacheEntry(int64 shardId); static ShardIdIndexSlot * LookupShardIdIndexSlot(ShardIdIndexSlot *table,
size_t tableSize, uint64 shardId);
static CitusTableCacheEntry * LookupShardIndex(int64 shardId, int *shardIndex);
static CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId);
static CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId); static CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId);
static void BuildCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry); static void BuildCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry);
static void BuildCachedShardList(CitusTableCacheEntry *cacheEntry); static void BuildCachedShardList(CitusTableCacheEntry *cacheEntry);
@ -210,6 +208,7 @@ static void InitializeWorkerNodeCache(void);
static void RegisterForeignKeyGraphCacheCallbacks(void); static void RegisterForeignKeyGraphCacheCallbacks(void);
static void RegisterWorkerNodeCacheCallbacks(void); static void RegisterWorkerNodeCacheCallbacks(void);
static void RegisterLocalGroupIdCacheCallbacks(void); static void RegisterLocalGroupIdCacheCallbacks(void);
static void RegisterCitusTableCacheEntryReleaseCallbacks(void);
static uint32 WorkerNodeHashCode(const void *key, Size keySize); static uint32 WorkerNodeHashCode(const void *key, Size keySize);
static void ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry); static void ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry);
static void CreateDistTableCache(void); static void CreateDistTableCache(void);
@ -218,6 +217,8 @@ static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid rela
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId);
static void CitusTableCacheEntryReleaseCallback(ResourceReleasePhase phase, bool isCommit,
bool isTopLevel, void *arg);
static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId);
static List * LookupDistShardTuples(Oid relationId); static List * LookupDistShardTuples(Oid relationId);
static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
@ -231,7 +232,8 @@ static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
static void CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace, static void CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace,
Oid *cachedOid); Oid *cachedOid);
static ShardPlacement * ResolveGroupShardPlacement( static ShardPlacement * ResolveGroupShardPlacement(
GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry); GroupShardPlacement *groupShardPlacement, CitusTableCacheEntry *tableEntry,
int shardIndex);
static Oid LookupEnumValueId(Oid typeId, char *valueName); static Oid LookupEnumValueId(Oid typeId, char *valueName);
static void InvalidateDistTableCache(void); static void InvalidateDistTableCache(void);
static void InvalidateDistObjectCache(void); static void InvalidateDistObjectCache(void);
@ -365,17 +367,16 @@ CitusTableList(void)
ShardInterval * ShardInterval *
LoadShardInterval(uint64 shardId) LoadShardInterval(uint64 shardId)
{ {
ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); int shardIndex = 0;
CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex);
CitusTableCacheEntry *tableEntry = shardEntry->tableEntry;
Assert(tableEntry->isCitusTable); Assert(tableEntry->isCitusTable);
/* the offset better be in a valid range */ /* the offset better be in a valid range */
Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); Assert(shardIndex < tableEntry->shardIntervalArrayLength);
ShardInterval *sourceShardInterval = ShardInterval *sourceShardInterval =
tableEntry->sortedShardIntervalArray[shardEntry->shardIndex]; tableEntry->sortedShardIntervalArray[shardIndex];
/* copy value to return */ /* copy value to return */
ShardInterval *shardInterval = CopyShardInterval(sourceShardInterval); ShardInterval *shardInterval = CopyShardInterval(sourceShardInterval);
@ -385,19 +386,12 @@ LoadShardInterval(uint64 shardId)
/* /*
* RelationIdForShard returns the relationId of the given * RelationIdForShard returns the relationId of the given shardId.
* shardId.
*/ */
Oid Oid
RelationIdForShard(uint64 shardId) RelationIdForShard(uint64 shardId)
{ {
ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); return LookupShardRelation(shardId, false);
CitusTableCacheEntry *tableEntry = shardEntry->tableEntry;
Assert(tableEntry->isCitusTable);
return tableEntry->relationId;
} }
@ -408,10 +402,11 @@ RelationIdForShard(uint64 shardId)
bool bool
ReferenceTableShardId(uint64 shardId) ReferenceTableShardId(uint64 shardId)
{ {
ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); Oid relationId = LookupShardRelation(shardId, false);
CitusTableCacheEntry *tableEntry = shardEntry->tableEntry; CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
char partitionMethod = cacheEntry->partitionMethod;
return (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE); return partitionMethod == DISTRIBUTE_BY_NONE;
} }
@ -424,16 +419,16 @@ ReferenceTableShardId(uint64 shardId)
GroupShardPlacement * GroupShardPlacement *
LoadGroupShardPlacement(uint64 shardId, uint64 placementId) LoadGroupShardPlacement(uint64 shardId, uint64 placementId)
{ {
ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); int shardIndex = 0;
CitusTableCacheEntry *tableEntry = shardEntry->tableEntry; CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex);
/* the offset better be in a valid range */ /* the offset better be in a valid range */
Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); Assert(shardIndex < tableEntry->shardIntervalArrayLength);
GroupShardPlacement *placementArray = GroupShardPlacement *placementArray =
tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex]; tableEntry->arrayOfPlacementArrays[shardIndex];
int numberOfPlacements = int numberOfPlacements =
tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex]; tableEntry->arrayOfPlacementArrayLengths[shardIndex];
for (int i = 0; i < numberOfPlacements; i++) for (int i = 0; i < numberOfPlacements; i++)
{ {
@ -458,10 +453,11 @@ LoadGroupShardPlacement(uint64 shardId, uint64 placementId)
ShardPlacement * ShardPlacement *
LoadShardPlacement(uint64 shardId, uint64 placementId) LoadShardPlacement(uint64 shardId, uint64 placementId)
{ {
ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); int shardIndex = 0;
CitusTableCacheEntry *cacheEntry = LookupShardIndex(shardId, &shardIndex);
GroupShardPlacement *groupPlacement = LoadGroupShardPlacement(shardId, placementId); GroupShardPlacement *groupPlacement = LoadGroupShardPlacement(shardId, placementId);
ShardPlacement *nodePlacement = ResolveGroupShardPlacement(groupPlacement, ShardPlacement *nodePlacement = ResolveGroupShardPlacement(groupPlacement,
shardEntry); cacheEntry, shardIndex);
return nodePlacement; return nodePlacement;
} }
@ -477,12 +473,12 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
{ {
ShardPlacement *placementOnNode = NULL; ShardPlacement *placementOnNode = NULL;
ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); int shardIndex = 0;
CitusTableCacheEntry *tableEntry = shardEntry->tableEntry; CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex);
GroupShardPlacement *placementArray = GroupShardPlacement *placementArray =
tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex]; tableEntry->arrayOfPlacementArrays[shardIndex];
int numberOfPlacements = int numberOfPlacements =
tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex]; tableEntry->arrayOfPlacementArrayLengths[shardIndex];
for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++) for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
{ {
@ -490,7 +486,8 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
if (placement->groupId == groupId) if (placement->groupId == groupId)
{ {
placementOnNode = ResolveGroupShardPlacement(placement, shardEntry); placementOnNode = ResolveGroupShardPlacement(placement, tableEntry,
shardIndex);
break; break;
} }
} }
@ -505,10 +502,9 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
*/ */
static ShardPlacement * static ShardPlacement *
ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
ShardCacheEntry *shardEntry) CitusTableCacheEntry *tableEntry,
int shardIndex)
{ {
CitusTableCacheEntry *tableEntry = shardEntry->tableEntry;
int shardIndex = shardEntry->shardIndex;
ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex]; ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex];
ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement); ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement);
@ -662,22 +658,23 @@ ShardPlacementList(uint64 shardId)
{ {
List *placementList = NIL; List *placementList = NIL;
ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); int shardIndex = 0;
CitusTableCacheEntry *tableEntry = shardEntry->tableEntry; CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex);
/* the offset better be in a valid range */ /* the offset better be in a valid range */
Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); Assert(shardIndex < tableEntry->shardIntervalArrayLength);
GroupShardPlacement *placementArray = GroupShardPlacement *placementArray =
tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex]; tableEntry->arrayOfPlacementArrays[shardIndex];
int numberOfPlacements = int numberOfPlacements =
tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex]; tableEntry->arrayOfPlacementArrayLengths[shardIndex];
for (int i = 0; i < numberOfPlacements; i++) for (int i = 0; i < numberOfPlacements; i++)
{ {
GroupShardPlacement *groupShardPlacement = &placementArray[i]; GroupShardPlacement *groupShardPlacement = &placementArray[i];
ShardPlacement *shardPlacement = ResolveGroupShardPlacement(groupShardPlacement, ShardPlacement *shardPlacement = ResolveGroupShardPlacement(groupShardPlacement,
shardEntry); tableEntry,
shardIndex);
placementList = lappend(placementList, shardPlacement); placementList = lappend(placementList, shardPlacement);
} }
@ -694,82 +691,63 @@ ShardPlacementList(uint64 shardId)
/* /*
* LookupShardCacheEntry returns the cache entry belonging to a shard, or * LookupShardIdIndexSlot returns the hash entry mapping shard id to index.
* errors out if that shard is unknown. * The returned slot's shardId will be 0 when not found.
* While initializing the caller will set shardId to the searched value,
* while searching the caller will ignore the slot.
* There should always be spare capacity; this fails to halt otherwise.
*/ */
static ShardCacheEntry * static ShardIdIndexSlot *
LookupShardCacheEntry(int64 shardId) LookupShardIdIndexSlot(ShardIdIndexSlot *table, size_t tableSize, uint64 shardId)
{ {
bool foundInCache = false; Assert(shardId);
bool recheck = false;
size_t tableIndex = shardId % tableSize;
ShardIdIndexSlot *slot = table + tableIndex;
while (slot->shardId != 0 && slot->shardId != shardId)
{
tableIndex++;
if (tableIndex >= tableSize)
{
tableIndex = 0;
}
slot = table + tableIndex;
}
return slot;
}
/*
* LookupShardIndex returns the cache entry belonging to a shard, or
* errors out if that shard is unknown. shardIndex is set to the index
* for the shardId in sortedShardIntervalArray.
*/
static CitusTableCacheEntry *
LookupShardIndex(int64 shardId, int *shardIndex)
{
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING)); Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
InitializeCaches(); InitializeCaches();
/* lookup cache entry */
ShardCacheEntry *shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND,
&foundInCache);
if (!foundInCache)
{
/*
* A possible reason for not finding an entry in the cache is that the
* distributed table's cache entry hasn't been accessed. Thus look up
* the distributed table, and build the cache entry. Afterwards we
* know that the shard has to be in the cache if it exists. If the
* shard does *not* exist LookupShardRelation() will error out.
*/
Oid relationId = LookupShardRelation(shardId, false); Oid relationId = LookupShardRelation(shardId, false);
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
/* trigger building the cache for the shard id */ ShardIdIndexSlot *idIndexSlot =
LookupCitusTableCacheEntry(relationId); LookupShardIdIndexSlot(tableEntry->shardIdIndexHash,
tableEntry->shardIntervalArrayLength * 2, shardId);
recheck = true; if (idIndexSlot->shardId != 0)
}
else
{ {
/* *shardIndex = idIndexSlot->shardIndex;
* We might have some concurrent metadata changes. In order to get the changes, return tableEntry;
* we first need to accept the cache invalidation messages.
*/
AcceptInvalidationMessages();
if (!shardEntry->tableEntry->isValid)
{
Oid oldRelationId = shardEntry->tableEntry->relationId;
Oid currentRelationId = LookupShardRelation(shardId, false);
/*
* The relation OID to which the shard belongs could have changed,
* most notably when the extension is dropped and a shard ID is
* reused. Reload the cache entries for both old and new relation
* ID and then look up the shard entry again.
*/
LookupCitusTableCacheEntry(oldRelationId);
LookupCitusTableCacheEntry(currentRelationId);
recheck = true;
}
} }
/*
* If we (re-)loaded the table cache, re-search the shard cache - the
* shard index might have changed. If we still can't find the entry, it
* can't exist.
*/
if (recheck)
{
shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache);
if (!foundInCache)
{
ereport(ERROR, (errmsg("could not find valid entry for shard " ereport(ERROR, (errmsg("could not find valid entry for shard "
UINT64_FORMAT, shardId))); UINT64_FORMAT " in table %d", shardId, relationId)));
} return NULL;
}
return shardEntry;
} }
@ -855,31 +833,27 @@ LookupCitusTableCacheEntry(Oid relationId)
} }
} }
CitusTableCacheEntry *cacheEntry = hash_search(DistTableCacheHash, hashKey,
HASH_ENTER,
&foundInCache);
/* return valid matches */
if (foundInCache)
{
/* /*
* We might have some concurrent metadata changes. In order to get the changes, * We might have some concurrent metadata changes. In order to get the changes,
* we first need to accept the cache invalidation messages. * we first need to accept the cache invalidation messages.
*/ */
AcceptInvalidationMessages(); AcceptInvalidationMessages();
CitusTableCacheEntrySlot *cacheSlot =
hash_search(DistTableCacheHash, hashKey, HASH_ENTER, &foundInCache);
if (cacheEntry->isValid) /* return valid matches */
if (foundInCache)
{ {
return cacheEntry; Assert(cacheSlot->data->isValid);
} return cacheSlot->data;
/* free the content of old, invalid, entries */
ResetCitusTableCacheEntry(cacheEntry);
} }
/* zero out entry, but not the key part */ /* zero out entry, but not the key part */
memset(((char *) cacheEntry) + sizeof(Oid), 0, memset(((char *) cacheSlot) + sizeof(Oid), 0,
sizeof(CitusTableCacheEntry) - sizeof(Oid)); sizeof(CitusTableCacheEntrySlot) - sizeof(Oid));
cacheSlot->data =
MemoryContextAllocZero(MetadataCacheMemoryContext, sizeof(CitusTableCacheEntry));
cacheSlot->data->relationId = relationId;
/* /*
* We disable interrupts while creating the cache entry because loading * We disable interrupts while creating the cache entry because loading
@ -890,14 +864,14 @@ LookupCitusTableCacheEntry(Oid relationId)
HOLD_INTERRUPTS(); HOLD_INTERRUPTS();
/* actually fill out entry */ /* actually fill out entry */
BuildCitusTableCacheEntry(cacheEntry); BuildCitusTableCacheEntry(cacheSlot->data);
/* and finally mark as valid */ /* and finally mark as valid */
cacheEntry->isValid = true; cacheSlot->data->isValid = true;
RESUME_INTERRUPTS(); RESUME_INTERRUPTS();
return cacheEntry; return cacheSlot->data;
} }
@ -1182,25 +1156,6 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
} }
heap_close(distShardRelation, AccessShareLock); heap_close(distShardRelation, AccessShareLock);
ShardInterval *firstShardInterval = shardIntervalArray[0];
bool foundInCache = false;
ShardCacheEntry *shardEntry = hash_search(DistShardCacheHash,
&firstShardInterval->shardId, HASH_FIND,
&foundInCache);
if (foundInCache && shardEntry->tableEntry != cacheEntry)
{
/*
* Normally, all shard cache entries for a given DistTableEntry are removed
* before we get here. There is one exception: When a shard changes from
* one relation ID to another. That typically happens during metadata
* syncing when the distributed table is dropped and re-created without
* changing the shard IDs. That means that old relation no longer exists
* and we can safely wipe its entry, which will remove all corresponding
* shard cache entries.
*/
ResetCitusTableCacheEntry(shardEntry->tableEntry);
}
} }
/* look up value comparison function */ /* look up value comparison function */
@ -1285,32 +1240,19 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
ErrorIfInconsistentShardIntervals(cacheEntry); ErrorIfInconsistentShardIntervals(cacheEntry);
} }
/*
* We set these here, so ResetCitusTableCacheEntry() can see what has been
* entered into DistShardCacheHash even if the following loop is interrupted
* by throwing errors, etc.
*/
cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
cacheEntry->shardIntervalArrayLength = 0; cacheEntry->shardIntervalArrayLength = 0;
cacheEntry->shardIdIndexHash =
MemoryContextAllocZero(MetadataCacheMemoryContext,
sizeof(ShardIdIndexSlot) * shardIntervalArrayLength * 2);
/* maintain shardId->(table,ShardInterval) cache */ /* maintain shardId->(table,ShardInterval) cache */
for (int shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++) for (int shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
{ {
ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex]; ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex];
bool foundInCache = false;
int placementOffset = 0; int placementOffset = 0;
ShardCacheEntry *shardEntry = hash_search(DistShardCacheHash,
&shardInterval->shardId, HASH_ENTER,
&foundInCache);
if (foundInCache)
{
ereport(ERROR, (errmsg("cached metadata for shard " UINT64_FORMAT
" is inconsistent",
shardInterval->shardId),
errhint("Reconnect and try again.")));
}
/* /*
* We should increment this only after we are sure this hasn't already * We should increment this only after we are sure this hasn't already
* been assigned to any other relations. ResetCitusTableCacheEntry() * been assigned to any other relations. ResetCitusTableCacheEntry()
@ -1318,9 +1260,6 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
*/ */
cacheEntry->shardIntervalArrayLength++; cacheEntry->shardIntervalArrayLength++;
shardEntry->shardIndex = shardIndex;
shardEntry->tableEntry = cacheEntry;
/* build list of shard placements */ /* build list of shard placements */
List *placementList = BuildShardPlacementList(shardInterval); List *placementList = BuildShardPlacementList(shardInterval);
int numberOfPlacements = list_length(placementList); int numberOfPlacements = list_length(placementList);
@ -1342,6 +1281,13 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
/* store the shard index in the ShardInterval */ /* store the shard index in the ShardInterval */
shardInterval->shardIndex = shardIndex; shardInterval->shardIndex = shardIndex;
ShardIdIndexSlot *idIndexSlot =
LookupShardIdIndexSlot(cacheEntry->shardIdIndexHash,
shardIntervalArrayLength * 2, shardInterval->shardId);
Assert(idIndexSlot->shardId == 0);
idIndexSlot->shardId = shardInterval->shardId;
idIndexSlot->shardIndex = shardIndex;
} }
cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction; cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction;
@ -2905,6 +2851,7 @@ InitializeCaches(void)
RegisterForeignKeyGraphCacheCallbacks(); RegisterForeignKeyGraphCacheCallbacks();
RegisterWorkerNodeCacheCallbacks(); RegisterWorkerNodeCacheCallbacks();
RegisterLocalGroupIdCacheCallbacks(); RegisterLocalGroupIdCacheCallbacks();
RegisterCitusTableCacheEntryReleaseCallbacks();
} }
PG_CATCH(); PG_CATCH();
{ {
@ -2917,7 +2864,7 @@ InitializeCaches(void)
MetadataCacheMemoryContext = NULL; MetadataCacheMemoryContext = NULL;
DistTableCacheHash = NULL; DistTableCacheHash = NULL;
DistShardCacheHash = NULL; DistTableCacheExpired = NIL;
PG_RE_THROW(); PG_RE_THROW();
} }
@ -2930,8 +2877,6 @@ InitializeCaches(void)
static void static void
InitializeDistCache(void) InitializeDistCache(void)
{ {
HASHCTL info;
/* build initial scan keys, copied for every relation scan */ /* build initial scan keys, copied for every relation scan */
memset(&DistPartitionScanKey, 0, sizeof(DistPartitionScanKey)); memset(&DistPartitionScanKey, 0, sizeof(DistPartitionScanKey));
@ -2957,16 +2902,6 @@ InitializeDistCache(void)
InitializeDistObjectCache(); InitializeDistObjectCache();
/* initialize the per-shard hash table */
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(int64);
info.entrysize = sizeof(ShardCacheEntry);
info.hash = tag_hash;
info.hcxt = MetadataCacheMemoryContext;
DistShardCacheHash =
hash_create("Shard Cache", 32 * 64, &info,
HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
/* Watch for invalidation events. */ /* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback, CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback,
(Datum) 0); (Datum) 0);
@ -3170,6 +3105,17 @@ RegisterWorkerNodeCacheCallbacks(void)
} }
/*
 * RegisterCitusTableCacheEntryReleaseCallbacks hooks
 * CitusTableCacheEntryReleaseCallback into resource release processing,
 * passing no callback argument. Data should be locked by callers to avoid
 * staleness.
 */
static void
RegisterCitusTableCacheEntryReleaseCallbacks(void)
{
	/* the release callback does not need any private state */
	void *callbackArgument = NULL;

	RegisterResourceReleaseCallback(CitusTableCacheEntryReleaseCallback,
									callbackArgument);
}
/* /*
* GetLocalGroupId returns the group identifier of the local node. The function assumes * GetLocalGroupId returns the group identifier of the local node. The function assumes
* that pg_dist_local_node_group has exactly one row and has at least one column. * that pg_dist_local_node_group has exactly one row and has at least one column.
@ -3303,6 +3249,12 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
cacheEntry->partitionColumn = NULL; cacheEntry->partitionColumn = NULL;
} }
if (cacheEntry->shardIdIndexHash != NULL)
{
pfree(cacheEntry->shardIdIndexHash);
cacheEntry->shardIdIndexHash = NULL;
}
if (cacheEntry->shardIntervalArrayLength == 0) if (cacheEntry->shardIntervalArrayLength == 0)
{ {
return; return;
@ -3315,7 +3267,6 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
GroupShardPlacement *placementArray = GroupShardPlacement *placementArray =
cacheEntry->arrayOfPlacementArrays[shardIndex]; cacheEntry->arrayOfPlacementArrays[shardIndex];
bool valueByVal = shardInterval->valueByVal; bool valueByVal = shardInterval->valueByVal;
bool foundInCache = false;
/* delete the shard's placements */ /* delete the shard's placements */
if (placementArray != NULL) if (placementArray != NULL)
@ -3323,11 +3274,6 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
pfree(placementArray); pfree(placementArray);
} }
/* delete per-shard cache-entry */
hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE,
&foundInCache);
Assert(foundInCache);
/* delete data pointed to by ShardInterval */ /* delete data pointed to by ShardInterval */
if (!valueByVal) if (!valueByVal)
{ {
@ -3376,6 +3322,8 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry)
cacheEntry->hasUninitializedShardInterval = false; cacheEntry->hasUninitializedShardInterval = false;
cacheEntry->hasUniformHashDistribution = false; cacheEntry->hasUniformHashDistribution = false;
cacheEntry->hasOverlappingShardInterval = false; cacheEntry->hasOverlappingShardInterval = false;
pfree(cacheEntry);
} }
@ -3437,11 +3385,14 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
bool foundInCache = false; bool foundInCache = false;
CitusTableCacheEntry *cacheEntry = hash_search(DistTableCacheHash, hashKey, CitusTableCacheEntrySlot *cacheSlot =
HASH_FIND, &foundInCache); hash_search(DistTableCacheHash, hashKey, HASH_REMOVE, &foundInCache);
if (foundInCache) if (foundInCache)
{ {
cacheEntry->isValid = false; cacheSlot->data->isValid = false;
MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
DistTableCacheExpired = lappend(DistTableCacheExpired, cacheSlot->data);
MemoryContextSwitchTo(oldContext);
} }
/* /*
@ -3468,14 +3419,22 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
static void static void
InvalidateDistTableCache(void) InvalidateDistTableCache(void)
{ {
CitusTableCacheEntry *cacheEntry = NULL; CitusTableCacheEntrySlot *cacheSlot = NULL;
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
hash_seq_init(&status, DistTableCacheHash); hash_seq_init(&status, DistTableCacheHash);
while ((cacheEntry = (CitusTableCacheEntry *) hash_seq_search(&status)) != NULL) while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL)
{ {
cacheEntry->isValid = false; bool foundInCache = false;
CitusTableCacheEntrySlot *removedSlot PG_USED_FOR_ASSERTS_ONLY =
hash_search(DistTableCacheHash, cacheSlot, HASH_REMOVE, &foundInCache);
Assert(removedSlot == cacheSlot);
cacheSlot->data->isValid = false;
MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
DistTableCacheExpired = lappend(DistTableCacheExpired, cacheSlot->data);
MemoryContextSwitchTo(oldContext);
} }
} }
@ -3505,14 +3464,17 @@ InvalidateDistObjectCache(void)
void void
FlushDistTableCache(void) FlushDistTableCache(void)
{ {
CitusTableCacheEntry *cacheEntry = NULL; CitusTableCacheEntrySlot *cacheSlot = NULL;
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
hash_seq_init(&status, DistTableCacheHash); hash_seq_init(&status, DistTableCacheHash);
while ((cacheEntry = (CitusTableCacheEntry *) hash_seq_search(&status)) != NULL) while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL)
{ {
ResetCitusTableCacheEntry(cacheEntry); if (cacheSlot->data->isValid)
{
ResetCitusTableCacheEntry(cacheSlot->data);
}
} }
hash_destroy(DistTableCacheHash); hash_destroy(DistTableCacheHash);
@ -3527,7 +3489,7 @@ CreateDistTableCache(void)
HASHCTL info; HASHCTL info;
MemSet(&info, 0, sizeof(info)); MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(Oid); info.keysize = sizeof(Oid);
info.entrysize = sizeof(CitusTableCacheEntry); info.entrysize = sizeof(CitusTableCacheEntrySlot);
info.hash = tag_hash; info.hash = tag_hash;
info.hcxt = MetadataCacheMemoryContext; info.hcxt = MetadataCacheMemoryContext;
DistTableCacheHash = DistTableCacheHash =
@ -3686,6 +3648,41 @@ InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId)
} }
/*
 * CitusTableCacheFlushInvalidatedEntries frees invalidated cache entries,
 * i.e. the entries that were appended to DistTableCacheExpired, and then
 * empties that list.
 *
 * Invalidated entries aren't freed immediately as callers expect their
 * lifetime to extend beyond that scope. The function does nothing when
 * DistTableCacheHash has not been created or when no entry has expired.
 */
void
CitusTableCacheFlushInvalidatedEntries(void)
{
	if (DistTableCacheHash != NULL && DistTableCacheExpired != NIL)
	{
		CitusTableCacheEntry *cacheEntry = NULL;
		foreach_ptr(cacheEntry, DistTableCacheExpired)
		{
			ResetCitusTableCacheEntry(cacheEntry);
		}

		/*
		 * list_free releases only the list cells; every entry was already
		 * handed to ResetCitusTableCacheEntry above.
		 */
		list_free(DistTableCacheExpired);
		DistTableCacheExpired = NIL;
	}
}
/*
 * CitusTableCacheEntryReleaseCallback flushes invalidated cache entries,
 * but only for a top-level release in the RESOURCE_RELEASE_LOCKS phase.
 * Every other invocation is a no-op. The isCommit and arg parameters are
 * not consulted.
 */
static void
CitusTableCacheEntryReleaseCallback(ResourceReleasePhase phase, bool isCommit,
									bool isTopLevel, void *arg)
{
	if (!isTopLevel || phase != RESOURCE_RELEASE_LOCKS)
	{
		return;
	}

	CitusTableCacheFlushInvalidatedEntries();
}
/* /*
* LookupDistPartitionTuple searches pg_dist_partition for relationId's entry * LookupDistPartitionTuple searches pg_dist_partition for relationId's entry
* and returns that or, if no matching entry was found, NULL. * and returns that or, if no matching entry was found, NULL.

View File

@ -336,6 +336,8 @@ CitusMaintenanceDaemonMain(Datum main_arg)
Assert(myDbData->workerPid == MyProcPid); Assert(myDbData->workerPid == MyProcPid);
CitusTableCacheFlushInvalidatedEntries();
/* /*
* XXX: Each task should clear the metadata cache before every iteration * XXX: Each task should clear the metadata cache before every iteration
* by calling InvalidateMetadataSystemCache(), because otherwise it * by calling InvalidateMetadataSystemCache(), because otherwise it

View File

@ -37,6 +37,11 @@ extern int ReadFromSecondaries;
*/ */
#define GROUP_ID_UPGRADING -2 #define GROUP_ID_UPGRADING -2
/* internal type used by metadata_cache.c to cache shard indexes */
struct ShardIdIndexSlot;
/* /*
* Representation of a table's metadata that is frequently used for * Representation of a table's metadata that is frequently used for
* distributed execution. Cached. * distributed execution. Cached.
@ -68,6 +73,9 @@ typedef struct
int shardIntervalArrayLength; int shardIntervalArrayLength;
ShardInterval **sortedShardIntervalArray; ShardInterval **sortedShardIntervalArray;
/* map of shardId to index in sortedShardIntervalArray */
struct ShardIdIndexSlot *shardIdIndexHash;
/* comparator for partition column's type, NULL if DISTRIBUTE_BY_NONE */ /* comparator for partition column's type, NULL if DISTRIBUTE_BY_NONE */
FmgrInfo *shardColumnCompareFunction; FmgrInfo *shardColumnCompareFunction;
@ -131,6 +139,7 @@ extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid,
extern int32 GetLocalGroupId(void); extern int32 GetLocalGroupId(void);
extern List * DistTableOidList(void); extern List * DistTableOidList(void);
extern List * ReferenceTableOidList(void); extern List * ReferenceTableOidList(void);
extern void CitusTableCacheFlushInvalidatedEntries(void);
extern Oid LookupShardRelation(int64 shardId, bool missing_ok); extern Oid LookupShardRelation(int64 shardId, bool missing_ok);
extern List * ShardPlacementList(uint64 shardId); extern List * ShardPlacementList(uint64 shardId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByRelid(Oid relationId);