diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 2ef5c6f2b..d629f7498 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -210,37 +210,8 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) distSelectJob->jobId); char *distResultPrefix = distResultPrefixString->data; - CitusTableCacheEntry *cachedTargetRelation = + CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId); - CitusTableCacheEntry *targetRelation = palloc(sizeof(CitusTableCacheEntry)); - *targetRelation = *cachedTargetRelation; - -#if PG_USE_ASSERT_CHECKING - - /* - * These fields aren't used in the code which follows, - * therefore in assert builds NULL these fields to - * segfault if they were to be used. - */ - targetRelation->partitionKeySTring = NULL; - targetRelation->shardIntervalCompareFunction = NULL; - targetRelation->hashFunction = NULL; - targetRelation->arrayOfPlacementArrayLengths = NULL; - targetRelation->arrayOfPlacementArrays = NULL; - targetRelation->referencedRelationViaForeignKey = NULL; - targetRelation->referencingRelationsViaForeignKey = NULL; -#endif - targetRelation->partitionColumn = copyObject( - cachedTargetRelation->partitionColumn); - targetRelation->sortedShardIntervalArray = - palloc(targetRelation->shardIntervalArrayLength * sizeof(ShardInterval)); - for (int shardIndex = 0; shardIndex < - targetRelation->shardIntervalArrayLength; shardIndex++) - { - targetRelation->sortedShardIntervalArray[shardIndex] = - CopyShardInterval( - cachedTargetRelation->sortedShardIntervalArray[shardIndex]); - } int partitionColumnIndex = PartitionColumnIndex(insertTargetList, targetRelation->partitionColumn); diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index e21a2c935..c0b645650 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -210,6 +210,11 @@ CitusExecutorRun(QueryDesc *queryDesc, } ExecutorLevel--; + + if (ExecutorLevel == 0 && PlannerLevel == 0) + { + CitusTableCacheFlushInvalidatedEntries(); + } } PG_CATCH(); { @@ -220,6 +225,11 @@ CitusExecutorRun(QueryDesc *queryDesc, ExecutorLevel--; + if (ExecutorLevel == 0 && PlannerLevel == 0) + { + CitusTableCacheFlushInvalidatedEntries(); + } + PG_RE_THROW(); } PG_END_TRY(); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index a79b43525..f7ba48911 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -73,6 +73,7 @@ #include "utils/rel.h" #include "utils/relfilenodemap.h" #include "utils/relmapper.h" +#include "utils/resowner.h" #include "utils/syscache.h" #include "utils/typcache.h" @@ -82,36 +83,30 @@ int ReadFromSecondaries = USE_SECONDARY_NODES_NEVER; /* - * ShardCacheEntry represents an entry in the shardId -> ShardInterval cache. - * To avoid duplicating data and invalidation logic between this cache and the - * DistTableCache, this only points into the CitusTableCacheEntry of the - * shard's distributed table. + * CitusTableCacheEntrySlot is entry type for DistTableCacheHash, + * entry data outlives slot on invalidation, so requires indirection. */ -typedef struct ShardCacheEntry +typedef struct CitusTableCacheEntrySlot { - /* hash key, needs to be first */ - int64 shardId; - - /* - * Cache entry for the distributed table a shard belongs to, possibly not - * valid. - */ - CitusTableCacheEntry *tableEntry; - - /* - * Offset in tableEntry->sortedShardIntervalArray, only valid if - * tableEntry->isValid. We don't store pointers to the individual shard - * placements because that'd make invalidation a bit more complicated, and - * because there's simply no need. - */ - int shardIndex; -} ShardCacheEntry; + /* lookup key - must be first. A pg_class.oid oid. */ + Oid relationId; + CitusTableCacheEntry *data; +} CitusTableCacheEntrySlot; /* - * State which should be cleared upon DROP EXTENSION. When the configuration - * changes, e.g. because the extension is dropped, these summarily get set to - * 0. + * ShardIdIndexSlot is entry type for CitusTableCacheEntry's ShardIdIndexHash. + */ +typedef struct ShardIdIndexSlot +{ + uint64 shardId; + int shardIndex; +} ShardIdIndexSlot; + + +/* + * State which should be cleared upon DROP EXTENSION. When the configuration + * changes, e.g. because extension is dropped, these summarily get set to 0. */ typedef struct MetadataCacheData { @@ -169,9 +164,9 @@ static bool citusVersionKnownCompatible = false; /* Hash table for informations about each partition */ static HTAB *DistTableCacheHash = NULL; +static List *DistTableCacheExpired = NIL; /* Hash table for informations about each shard */ -static HTAB *DistShardCacheHash = NULL; static MemoryContext MetadataCacheMemoryContext = NULL; /* Hash table for information about each object */ @@ -194,7 +189,10 @@ static ScanKeyData DistObjectScanKey[3]; /* local function forward declarations */ static bool IsCitusTableViaCatalog(Oid relationId); -static ShardCacheEntry * LookupShardCacheEntry(int64 shardId); +static ShardIdIndexSlot * LookupShardIdIndexSlot(ShardIdIndexSlot *table, + size_t tableSize, uint64 shardId); +static CitusTableCacheEntry * LookupShardIndex(int64 shardId, int *shardIndex); +static CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId); static CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId); static void BuildCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry); static void BuildCachedShardList(CitusTableCacheEntry *cacheEntry); @@ -210,6 +208,7 @@ static void InitializeWorkerNodeCache(void); static void RegisterForeignKeyGraphCacheCallbacks(void); static void RegisterWorkerNodeCacheCallbacks(void); static void RegisterLocalGroupIdCacheCallbacks(void); +static void RegisterCitusTableCacheEntryReleaseCallbacks(void); static uint32 WorkerNodeHashCode(const void *key, Size keySize); static void ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry); static void CreateDistTableCache(void); @@ -218,6 +217,8 @@ static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid rela static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); +static void CitusTableCacheEntryReleaseCallback(ResourceReleasePhase phase, bool isCommit, + bool isTopLevel, void *arg); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static List * LookupDistShardTuples(Oid relationId); static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, @@ -231,7 +232,8 @@ static void CachedRelationLookup(const char *relationName, Oid *cachedOid); static void CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace, Oid *cachedOid); static ShardPlacement * ResolveGroupShardPlacement( - GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry); + GroupShardPlacement *groupShardPlacement, CitusTableCacheEntry *tableEntry, + int shardIndex); static Oid LookupEnumValueId(Oid typeId, char *valueName); static void InvalidateDistTableCache(void); static void InvalidateDistObjectCache(void); @@ -365,17 +367,16 @@ CitusTableList(void) ShardInterval * LoadShardInterval(uint64 shardId) { - ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); - - CitusTableCacheEntry *tableEntry = shardEntry->tableEntry; + int shardIndex = 0; + CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex); Assert(tableEntry->isCitusTable); /* the offset better be in a valid range */ - Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); + Assert(shardIndex < tableEntry->shardIntervalArrayLength); ShardInterval *sourceShardInterval = - tableEntry->sortedShardIntervalArray[shardEntry->shardIndex]; + tableEntry->sortedShardIntervalArray[shardIndex]; /* copy value to return */ ShardInterval *shardInterval = CopyShardInterval(sourceShardInterval); @@ -385,19 +386,12 @@ LoadShardInterval(uint64 shardId) /* - * RelationIdOfShard returns the relationId of the given - * shardId. + * RelationIdOfShard returns the relationId of the given shardId. */ Oid RelationIdForShard(uint64 shardId) { - ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); - - CitusTableCacheEntry *tableEntry = shardEntry->tableEntry; - - Assert(tableEntry->isCitusTable); - - return tableEntry->relationId; + return LookupShardRelation(shardId, false); } @@ -408,10 +402,11 @@ RelationIdForShard(uint64 shardId) bool ReferenceTableShardId(uint64 shardId) { - ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); - CitusTableCacheEntry *tableEntry = shardEntry->tableEntry; + Oid relationId = LookupShardRelation(shardId, false); + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); + char partitionMethod = cacheEntry->partitionMethod; - return (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE); + return partitionMethod == DISTRIBUTE_BY_NONE; } @@ -424,16 +419,16 @@ ReferenceTableShardId(uint64 shardId) GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId) { - ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); - CitusTableCacheEntry *tableEntry = shardEntry->tableEntry; + int shardIndex = 0; + CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex); /* the offset better be in a valid range */ - Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); + Assert(shardIndex < tableEntry->shardIntervalArrayLength); GroupShardPlacement *placementArray = - tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex]; + tableEntry->arrayOfPlacementArrays[shardIndex]; int numberOfPlacements = - tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex]; + tableEntry->arrayOfPlacementArrayLengths[shardIndex]; for (int i = 0; i < numberOfPlacements; i++) { @@ -458,10 +453,11 @@ LoadGroupShardPlacement(uint64 shardId, uint64 placementId) ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId) { - ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); + int shardIndex = 0; + CitusTableCacheEntry *cacheEntry = LookupShardIndex(shardId, &shardIndex); GroupShardPlacement *groupPlacement = LoadGroupShardPlacement(shardId, placementId); ShardPlacement *nodePlacement = ResolveGroupShardPlacement(groupPlacement, - shardEntry); + cacheEntry, shardIndex); return nodePlacement; } @@ -477,12 +473,12 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId) { ShardPlacement *placementOnNode = NULL; - ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); - CitusTableCacheEntry *tableEntry = shardEntry->tableEntry; + int shardIndex = 0; + CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex); GroupShardPlacement *placementArray = - tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex]; + tableEntry->arrayOfPlacementArrays[shardIndex]; int numberOfPlacements = - tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex]; + tableEntry->arrayOfPlacementArrayLengths[shardIndex]; for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++) { @@ -490,7 +486,8 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId) if (placement->groupId == groupId) { - placementOnNode = ResolveGroupShardPlacement(placement, shardEntry); + placementOnNode = ResolveGroupShardPlacement(placement, tableEntry, + shardIndex); break; } } @@ -505,10 +502,9 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId) */ static ShardPlacement * ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, - ShardCacheEntry *shardEntry) + CitusTableCacheEntry *tableEntry, + int shardIndex) { - CitusTableCacheEntry *tableEntry = shardEntry->tableEntry; - int shardIndex = shardEntry->shardIndex; ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex]; ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement); @@ -662,22 +658,23 @@ ShardPlacementList(uint64 shardId) { List *placementList = NIL; - ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); - CitusTableCacheEntry *tableEntry = shardEntry->tableEntry; + int shardIndex = 0; + CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex); /* the offset better be in a valid range */ - Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); + Assert(shardIndex < tableEntry->shardIntervalArrayLength); GroupShardPlacement *placementArray = - tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex]; + tableEntry->arrayOfPlacementArrays[shardIndex]; int numberOfPlacements = - tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex]; + tableEntry->arrayOfPlacementArrayLengths[shardIndex]; for (int i = 0; i < numberOfPlacements; i++) { GroupShardPlacement *groupShardPlacement = &placementArray[i]; ShardPlacement *shardPlacement = ResolveGroupShardPlacement(groupShardPlacement, - shardEntry); + tableEntry, + shardIndex); placementList = lappend(placementList, shardPlacement); } @@ -694,82 +691,63 @@ ShardPlacementList(uint64 shardId) /* - * LookupShardCacheEntry returns the cache entry belonging to a shard, or - * errors out if that shard is unknown. + * LookupShardIdIndexSlot returns the hash entry mapping shard id to index. + * The returned slot's shardId will be 0 when not found. + * While initializing the caller will set shardId to the searched value, + * while searching the caller will ignore the slot. + * There should always be spare capacity; this fails to halt otherwise. */ -static ShardCacheEntry * -LookupShardCacheEntry(int64 shardId) +static ShardIdIndexSlot * +LookupShardIdIndexSlot(ShardIdIndexSlot *table, size_t tableSize, uint64 shardId) { - bool foundInCache = false; - bool recheck = false; + Assert(shardId); + size_t tableIndex = shardId % tableSize; + ShardIdIndexSlot *slot = table + tableIndex; + + while (slot->shardId != 0 && slot->shardId != shardId) + { + tableIndex++; + if (tableIndex >= tableSize) + { + tableIndex = 0; + } + + slot = table + tableIndex; + } + + return slot; +} + + +/* + * LookupShardIndex returns the cache entry belonging to a shard, or + * errors out if that shard is unknown. shardIndex is set to the index + * for the shardId in sortedShardIntervalArray. + */ +static CitusTableCacheEntry * +LookupShardIndex(int64 shardId, int *shardIndex) +{ Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING)); InitializeCaches(); - /* lookup cache entry */ - ShardCacheEntry *shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, - &foundInCache); + Oid relationId = LookupShardRelation(shardId, false); + CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); - if (!foundInCache) + ShardIdIndexSlot *idIndexSlot = + LookupShardIdIndexSlot(tableEntry->shardIdIndexHash, + tableEntry->shardIntervalArrayLength * 2, shardId); + + if (idIndexSlot->shardId != 0) { - /* - * A possible reason for not finding an entry in the cache is that the - * distributed table's cache entry hasn't been accessed. Thus look up - * the distributed table, and build the cache entry. Afterwards we - * know that the shard has to be in the cache if it exists. If the - * shard does *not* exist LookupShardRelation() will error out. - */ - Oid relationId = LookupShardRelation(shardId, false); - - /* trigger building the cache for the shard id */ - LookupCitusTableCacheEntry(relationId); - - recheck = true; - } - else - { - /* - * We might have some concurrent metadata changes. In order to get the changes, - * we first need to accept the cache invalidation messages. - */ - AcceptInvalidationMessages(); - - if (!shardEntry->tableEntry->isValid) - { - Oid oldRelationId = shardEntry->tableEntry->relationId; - Oid currentRelationId = LookupShardRelation(shardId, false); - - /* - * The relation OID to which the shard belongs could have changed, - * most notably when the extension is dropped and a shard ID is - * reused. Reload the cache entries for both old and new relation - * ID and then look up the shard entry again. - */ - LookupCitusTableCacheEntry(oldRelationId); - LookupCitusTableCacheEntry(currentRelationId); - - recheck = true; - } + *shardIndex = idIndexSlot->shardIndex; + return tableEntry; } - /* - * If we (re-)loaded the table cache, re-search the shard cache - the - * shard index might have changed. If we still can't find the entry, it - * can't exist. - */ - if (recheck) - { - shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache); - - if (!foundInCache) - { - ereport(ERROR, (errmsg("could not find valid entry for shard " - UINT64_FORMAT, shardId))); - } - } - - return shardEntry; + ereport(ERROR, (errmsg("could not find valid entry for shard " + UINT64_FORMAT " in table %d", shardId, relationId))); + return NULL; } @@ -855,31 +833,27 @@ LookupCitusTableCacheEntry(Oid relationId) } } - CitusTableCacheEntry *cacheEntry = hash_search(DistTableCacheHash, hashKey, - HASH_ENTER, - &foundInCache); + /* + * We might have some concurrent metadata changes. In order to get the changes, + * we first need to accept the cache invalidation messages. + */ + AcceptInvalidationMessages(); + CitusTableCacheEntrySlot *cacheSlot = + hash_search(DistTableCacheHash, hashKey, HASH_ENTER, &foundInCache); /* return valid matches */ if (foundInCache) { - /* - * We might have some concurrent metadata changes. In order to get the changes, - * we first need to accept the cache invalidation messages. - */ - AcceptInvalidationMessages(); - - if (cacheEntry->isValid) - { - return cacheEntry; - } - - /* free the content of old, invalid, entries */ - ResetCitusTableCacheEntry(cacheEntry); + Assert(cacheSlot->data->isValid); + return cacheSlot->data; } /* zero out entry, but not the key part */ - memset(((char *) cacheEntry) + sizeof(Oid), 0, - sizeof(CitusTableCacheEntry) - sizeof(Oid)); + memset(((char *) cacheSlot) + sizeof(Oid), 0, + sizeof(CitusTableCacheEntrySlot) - sizeof(Oid)); + cacheSlot->data = + MemoryContextAllocZero(MetadataCacheMemoryContext, sizeof(CitusTableCacheEntry)); + cacheSlot->data->relationId = relationId; /* * We disable interrupts while creating the cache entry because loading @@ -890,14 +864,14 @@ LookupCitusTableCacheEntry(Oid relationId) HOLD_INTERRUPTS(); /* actually fill out entry */ - BuildCitusTableCacheEntry(cacheEntry); + BuildCitusTableCacheEntry(cacheSlot->data); /* and finally mark as valid */ - cacheEntry->isValid = true; + cacheSlot->data->isValid = true; RESUME_INTERRUPTS(); - return cacheEntry; + return cacheSlot->data; } @@ -1182,25 +1156,6 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) } heap_close(distShardRelation, AccessShareLock); - - ShardInterval *firstShardInterval = shardIntervalArray[0]; - bool foundInCache = false; - ShardCacheEntry *shardEntry = hash_search(DistShardCacheHash, - &firstShardInterval->shardId, HASH_FIND, - &foundInCache); - if (foundInCache && shardEntry->tableEntry != cacheEntry) - { - /* - * Normally, all shard cache entries for a given DistTableEntry are removed - * before we get here. There is one exception: When a shard changes from - * one relation ID to another. That typically happens during metadata - * syncing when the distributed table is dropped and re-created without - * changing the shard IDs. That means that old relation no longer exists - * and we can safely wipe its entry, which will remove all corresponding - * shard cache entries. - */ - ResetCitusTableCacheEntry(shardEntry->tableEntry); - } } /* look up value comparison function */ @@ -1285,32 +1240,19 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) ErrorIfInconsistentShardIntervals(cacheEntry); } - /* - * We set these here, so ResetCitusTableCacheEntry() can see what has been - * entered into DistShardCacheHash even if the following loop is interrupted - * by throwing errors, etc. - */ cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; cacheEntry->shardIntervalArrayLength = 0; + cacheEntry->shardIdIndexHash = + MemoryContextAllocZero(MetadataCacheMemoryContext, + sizeof(ShardIdIndexSlot) * shardIntervalArrayLength * 2); + /* maintain shardId->(table,ShardInterval) cache */ for (int shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++) { ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex]; - bool foundInCache = false; int placementOffset = 0; - ShardCacheEntry *shardEntry = hash_search(DistShardCacheHash, - &shardInterval->shardId, HASH_ENTER, - &foundInCache); - if (foundInCache) - { - ereport(ERROR, (errmsg("cached metadata for shard " UINT64_FORMAT - " is inconsistent", - shardInterval->shardId), - errhint("Reconnect and try again."))); - } - /* * We should increment this only after we are sure this hasn't already * been assigned to any other relations. ResetCitusTableCacheEntry() @@ -1318,9 +1260,6 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) */ cacheEntry->shardIntervalArrayLength++; - shardEntry->shardIndex = shardIndex; - shardEntry->tableEntry = cacheEntry; - /* build list of shard placements */ List *placementList = BuildShardPlacementList(shardInterval); int numberOfPlacements = list_length(placementList); @@ -1342,6 +1281,13 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) /* store the shard index in the ShardInterval */ shardInterval->shardIndex = shardIndex; + + ShardIdIndexSlot *idIndexSlot = + LookupShardIdIndexSlot(cacheEntry->shardIdIndexHash, + shardIntervalArrayLength * 2, shardInterval->shardId); + Assert(idIndexSlot->shardId == 0); + idIndexSlot->shardId = shardInterval->shardId; + idIndexSlot->shardIndex = shardIndex; } cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction; @@ -2905,6 +2851,7 @@ InitializeCaches(void) RegisterForeignKeyGraphCacheCallbacks(); RegisterWorkerNodeCacheCallbacks(); RegisterLocalGroupIdCacheCallbacks(); + RegisterCitusTableCacheEntryReleaseCallbacks(); } PG_CATCH(); { @@ -2917,7 +2864,7 @@ InitializeCaches(void) MetadataCacheMemoryContext = NULL; DistTableCacheHash = NULL; - DistShardCacheHash = NULL; + DistTableCacheExpired = NIL; PG_RE_THROW(); } @@ -2930,8 +2877,6 @@ InitializeCaches(void) static void InitializeDistCache(void) { - HASHCTL info; - /* build initial scan keys, copied for every relation scan */ memset(&DistPartitionScanKey, 0, sizeof(DistPartitionScanKey)); @@ -2957,16 +2902,6 @@ InitializeDistCache(void) InitializeDistObjectCache(); - /* initialize the per-shard hash table */ - MemSet(&info, 0, sizeof(info)); - info.keysize = sizeof(int64); - info.entrysize = sizeof(ShardCacheEntry); - info.hash = tag_hash; - info.hcxt = MetadataCacheMemoryContext; - DistShardCacheHash = - hash_create("Shard Cache", 32 * 64, &info, - HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); - /* Watch for invalidation events. */ CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback, (Datum) 0); @@ -3170,6 +3105,17 @@ RegisterWorkerNodeCacheCallbacks(void) } +/* + * RegisterCitusTableCacheEntryReleaseCallbacks registers callbacks to release + * cache entries. Data should be locked by callers to avoid staleness. + */ +static void +RegisterCitusTableCacheEntryReleaseCallbacks(void) +{ + RegisterResourceReleaseCallback(CitusTableCacheEntryReleaseCallback, NULL); +} + + /* * GetLocalGroupId returns the group identifier of the local node. The function assumes * that pg_dist_local_node_group has exactly one row and has at least one column. @@ -3303,6 +3249,12 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) cacheEntry->partitionColumn = NULL; } + if (cacheEntry->shardIdIndexHash != NULL) + { + pfree(cacheEntry->shardIdIndexHash); + cacheEntry->shardIdIndexHash = NULL; + } + if (cacheEntry->shardIntervalArrayLength == 0) { return; @@ -3315,7 +3267,6 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) GroupShardPlacement *placementArray = cacheEntry->arrayOfPlacementArrays[shardIndex]; bool valueByVal = shardInterval->valueByVal; - bool foundInCache = false; /* delete the shard's placements */ if (placementArray != NULL) @@ -3323,11 +3274,6 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) pfree(placementArray); } - /* delete per-shard cache-entry */ - hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE, - &foundInCache); - Assert(foundInCache); - /* delete data pointed to by ShardInterval */ if (!valueByVal) { @@ -3376,6 +3322,8 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) cacheEntry->hasUninitializedShardInterval = false; cacheEntry->hasUniformHashDistribution = false; cacheEntry->hasOverlappingShardInterval = false; + + pfree(cacheEntry); } @@ -3437,11 +3385,14 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) bool foundInCache = false; - CitusTableCacheEntry *cacheEntry = hash_search(DistTableCacheHash, hashKey, - HASH_FIND, &foundInCache); + CitusTableCacheEntrySlot *cacheSlot = + hash_search(DistTableCacheHash, hashKey, HASH_REMOVE, &foundInCache); if (foundInCache) { - cacheEntry->isValid = false; + cacheSlot->data->isValid = false; + MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext); + DistTableCacheExpired = lappend(DistTableCacheExpired, cacheSlot->data); + MemoryContextSwitchTo(oldContext); } /* @@ -3468,14 +3419,22 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) static void InvalidateDistTableCache(void) { - CitusTableCacheEntry *cacheEntry = NULL; + CitusTableCacheEntrySlot *cacheSlot = NULL; HASH_SEQ_STATUS status; hash_seq_init(&status, DistTableCacheHash); - while ((cacheEntry = (CitusTableCacheEntry *) hash_seq_search(&status)) != NULL) + while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL) { - cacheEntry->isValid = false; + bool foundInCache = false; + CitusTableCacheEntrySlot *removedSlot PG_USED_FOR_ASSERTS_ONLY = + hash_search(DistTableCacheHash, cacheSlot, HASH_REMOVE, &foundInCache); + Assert(removedSlot == cacheSlot); + + cacheSlot->data->isValid = false; + MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext); + DistTableCacheExpired = lappend(DistTableCacheExpired, cacheSlot->data); + MemoryContextSwitchTo(oldContext); } } @@ -3505,14 +3464,17 @@ InvalidateDistObjectCache(void) void FlushDistTableCache(void) { - CitusTableCacheEntry *cacheEntry = NULL; + CitusTableCacheEntrySlot *cacheSlot = NULL; HASH_SEQ_STATUS status; hash_seq_init(&status, DistTableCacheHash); - while ((cacheEntry = (CitusTableCacheEntry *) hash_seq_search(&status)) != NULL) + while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL) { - ResetCitusTableCacheEntry(cacheEntry); + if (cacheSlot->data->isValid) + { + ResetCitusTableCacheEntry(cacheSlot->data); + } } hash_destroy(DistTableCacheHash); @@ -3527,7 +3489,7 @@ CreateDistTableCache(void) HASHCTL info; MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(Oid); - info.entrysize = sizeof(CitusTableCacheEntry); + info.entrysize = sizeof(CitusTableCacheEntrySlot); info.hash = tag_hash; info.hcxt = MetadataCacheMemoryContext; DistTableCacheHash = @@ -3686,6 +3648,41 @@ InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId) } +/* + * CitusTableCacheFlushInvalidatedEntries frees invalidated cache entries. + * Invalidated entries aren't freed immediately as callers expect their lifetime + * to extend beyond that scope. + */ +void +CitusTableCacheFlushInvalidatedEntries() +{ + if (DistTableCacheHash != NULL && DistTableCacheExpired != NIL) + { + CitusTableCacheEntry *cacheEntry = NULL; + foreach_ptr(cacheEntry, DistTableCacheExpired) + { + ResetCitusTableCacheEntry(cacheEntry); + } + list_free(DistTableCacheExpired); + DistTableCacheExpired = NIL; + } +} + + +/* + * CitusTableCacheEntryReleaseCallback frees invalidated cache entries. + */ +static void +CitusTableCacheEntryReleaseCallback(ResourceReleasePhase phase, bool isCommit, + bool isTopLevel, void *arg) +{ + if (isTopLevel && phase == RESOURCE_RELEASE_LOCKS) + { + CitusTableCacheFlushInvalidatedEntries(); + } +} + + /* * LookupDistPartitionTuple searches pg_dist_partition for relationId's entry * and returns that or, if no matching entry was found, NULL. diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 6a7c509b5..dad2db602 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -336,6 +336,8 @@ CitusMaintenanceDaemonMain(Datum main_arg) Assert(myDbData->workerPid == MyProcPid); + CitusTableCacheFlushInvalidatedEntries(); + /* * XXX: Each task should clear the metadata cache before every iteration * by calling InvalidateMetadataSystemCache(), because otherwise it diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index d6c9e4168..e8a2a7b19 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -37,6 +37,11 @@ extern int ReadFromSecondaries; */ #define GROUP_ID_UPGRADING -2 + +/* internal type used by metadata_cache.c to cache shard indexes */ +struct ShardIdIndexSlot; + + /* * Representation of a table's metadata that is frequently used for * distributed execution. Cached. @@ -68,6 +73,9 @@ typedef struct int shardIntervalArrayLength; ShardInterval **sortedShardIntervalArray; + /* map of shardId to index in sortedShardIntervalArray */ + struct ShardIdIndexSlot *shardIdIndexHash; + /* comparator for partition column's type, NULL if DISTRIBUTE_BY_NONE */ FmgrInfo *shardColumnCompareFunction; @@ -131,6 +139,7 @@ extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, extern int32 GetLocalGroupId(void); extern List * DistTableOidList(void); extern List * ReferenceTableOidList(void); +extern void CitusTableCacheFlushInvalidatedEntries(void); extern Oid LookupShardRelation(int64 shardId, bool missing_ok); extern List * ShardPlacementList(uint64 shardId); extern void CitusInvalidateRelcacheByRelid(Oid relationId);