diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index b2382485b..b89da1af8 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -213,6 +213,11 @@ CitusExecutorRun(QueryDesc *queryDesc, if (ExecutorLevel == 0 && PlannerLevel == 0) { + /* + * We are leaving Citus code so no one should have any references to + * cache entries. Release them now to not hold onto memory in long + * transactions. + */ CitusTableCacheFlushInvalidatedEntries(); } } @@ -225,11 +230,6 @@ CitusExecutorRun(QueryDesc *queryDesc, ExecutorLevel--; - if (ExecutorLevel == 0 && PlannerLevel == 0) - { - CitusTableCacheFlushInvalidatedEntries(); - } - PG_RE_THROW(); } PG_END_TRY(); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index af8235589..2537593af 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -90,18 +90,34 @@ typedef struct CitusTableCacheEntrySlot { /* lookup key - must be first. A pg_class.oid oid. */ Oid relationId; - CitusTableCacheEntry *data; + + /* Citus table metadata (NULL for local tables) */ + CitusTableCacheEntry *citusTableMetadata; + + /* + * If isValid is false, we need to recheck whether the relation ID + * belongs to a Citus table or not. + */ + bool isValid; } CitusTableCacheEntrySlot; /* - * ShardIdIndexSlot is entry type for CitusTableCacheEntry's ShardIdIndexHash. + * ShardIdCacheEntry is the entry type for ShardIdCacheHash. + * + * This should never be used outside of this file. Use ShardInterval instead. 
*/ -typedef struct ShardIdIndexSlot +typedef struct ShardIdCacheEntry { + /* hash key, needs to be first */ uint64 shardId; + + /* pointer to the table entry to which this shard currently belongs */ + CitusTableCacheEntry *tableEntry; + + /* index of the shard interval in the sortedShardIntervalArray of the table entry */ int shardIndex; -} ShardIdIndexSlot; +} ShardIdCacheEntry; /* @@ -167,6 +183,8 @@ static HTAB *DistTableCacheHash = NULL; static List *DistTableCacheExpired = NIL; /* Hash table for informations about each shard */ +static HTAB *ShardIdCacheHash = NULL; + static MemoryContext MetadataCacheMemoryContext = NULL; /* Hash table for information about each object */ @@ -189,12 +207,9 @@ static ScanKeyData DistObjectScanKey[3]; /* local function forward declarations */ static bool IsCitusTableViaCatalog(Oid relationId); -static ShardIdIndexSlot * LookupShardIdIndexSlot(ShardIdIndexSlot *table, - size_t tableSize, uint64 shardId); -static CitusTableCacheEntry * LookupShardIndex(int64 shardId, int *shardIndex); +static ShardIdCacheEntry * LookupShardIdCacheEntry(int64 shardId); static CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId); -static CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId); -static void BuildCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry); +static CitusTableCacheEntry * BuildCitusTableCacheEntry(Oid relationId); static void BuildCachedShardList(CitusTableCacheEntry *cacheEntry); static void PrepareWorkerNodeCache(void); static bool CheckInstalledVersion(int elevel); @@ -211,7 +226,9 @@ static void RegisterLocalGroupIdCacheCallbacks(void); static void RegisterCitusTableCacheEntryReleaseCallbacks(void); static uint32 WorkerNodeHashCode(const void *key, Size keySize); static void ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry); +static void RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *tableEntry); static void CreateDistTableCache(void); +static void CreateShardIdCache(void); static 
void CreateDistObjectCache(void); static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); @@ -235,8 +252,11 @@ static ShardPlacement * ResolveGroupShardPlacement( GroupShardPlacement *groupShardPlacement, CitusTableCacheEntry *tableEntry, int shardIndex); static Oid LookupEnumValueId(Oid typeId, char *valueName); +static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSlot); static void InvalidateDistTableCache(void); static void InvalidateDistObjectCache(void); +static void InitializeTableCacheEntry(int64 shardId); +static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry); /* exports for SQL callable functions */ @@ -280,18 +300,7 @@ EnsureModificationsCanRun(void) bool IsCitusTable(Oid relationId) { - CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(relationId); - - /* - * If extension hasn't been created, or has the wrong version and the - * table isn't a distributed one, LookupCitusTableCacheEntry() will return NULL. 
- */ - if (!cacheEntry) - { - return false; - } - - return cacheEntry->isCitusTable; + return LookupCitusTableCacheEntry(relationId) != NULL; } @@ -349,7 +358,6 @@ CitusTableList(void) foreach_oid(relationId, distTableOidList) { CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - Assert(cacheEntry->isCitusTable); distributedTableList = lappend(distributedTableList, cacheEntry); } @@ -367,10 +375,9 @@ CitusTableList(void) ShardInterval * LoadShardInterval(uint64 shardId) { - int shardIndex = 0; - CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex); - - Assert(tableEntry->isCitusTable); + ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); + CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; + int shardIndex = shardIdEntry->shardIndex; /* the offset better be in a valid range */ Assert(shardIndex < tableEntry->shardIntervalArrayLength); @@ -391,7 +398,9 @@ LoadShardInterval(uint64 shardId) Oid RelationIdForShard(uint64 shardId) { - return LookupShardRelation(shardId, false); + ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); + CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; + return tableEntry->relationId; } @@ -402,9 +411,9 @@ RelationIdForShard(uint64 shardId) bool ReferenceTableShardId(uint64 shardId) { - Oid relationId = LookupShardRelation(shardId, false); - CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - char partitionMethod = cacheEntry->partitionMethod; + ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); + CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; + char partitionMethod = tableEntry->partitionMethod; return partitionMethod == DISTRIBUTE_BY_NONE; } @@ -419,8 +428,9 @@ ReferenceTableShardId(uint64 shardId) GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId) { - int shardIndex = 0; - CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex); + 
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); + CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; + int shardIndex = shardIdEntry->shardIndex; /* the offset better be in a valid range */ Assert(shardIndex < tableEntry->shardIntervalArrayLength); @@ -453,11 +463,12 @@ LoadGroupShardPlacement(uint64 shardId, uint64 placementId) ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId) { - int shardIndex = 0; - CitusTableCacheEntry *cacheEntry = LookupShardIndex(shardId, &shardIndex); + ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); + CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; + int shardIndex = shardIdEntry->shardIndex; GroupShardPlacement *groupPlacement = LoadGroupShardPlacement(shardId, placementId); ShardPlacement *nodePlacement = ResolveGroupShardPlacement(groupPlacement, - cacheEntry, shardIndex); + tableEntry, shardIndex); return nodePlacement; } @@ -473,8 +484,9 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId) { ShardPlacement *placementOnNode = NULL; - int shardIndex = 0; - CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex); + ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); + CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; + int shardIndex = shardIdEntry->shardIndex; GroupShardPlacement *placementArray = tableEntry->arrayOfPlacementArrays[shardIndex]; int numberOfPlacements = @@ -658,8 +670,9 @@ ShardPlacementList(uint64 shardId) { List *placementList = NIL; - int shardIndex = 0; - CitusTableCacheEntry *tableEntry = LookupShardIndex(shardId, &shardIndex); + ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); + CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; + int shardIndex = shardIdEntry->shardIndex; /* the offset better be in a valid range */ Assert(shardIndex < tableEntry->shardIntervalArrayLength); @@ -691,63 +704,100 @@ ShardPlacementList(uint64 shardId) /* - * 
LookupShardIdIndexSlot returns the hash entry mapping shard id to index. - * The returned slot's shardId will be 0 when not found. - * While initializing the caller will set shardId to the searched value, - * while searching the caller will ignore the slot. - * There should always be spare capacity; this fails to halt otherwise. + * InitializeTableCacheEntry initializes a shard in cache. A possible reason + * for not finding an entry in the cache is that the distributed table's cache + * entry hasn't been accessed yet. Thus look up the distributed table, and + * build the cache entry. Afterwards we know that the shard has to be in the + * cache if it exists. If the shard does *not* exist, this function errors + * (because LookupShardRelationFromCatalog errors out). */ -static ShardIdIndexSlot * -LookupShardIdIndexSlot(ShardIdIndexSlot *table, size_t tableSize, uint64 shardId) +static void +InitializeTableCacheEntry(int64 shardId) { - Assert(shardId); + bool missingOk = false; + Oid relationId = LookupShardRelationFromCatalog(shardId, missingOk); - size_t tableIndex = shardId % tableSize; - ShardIdIndexSlot *slot = table + tableIndex; - - while (slot->shardId != 0 && slot->shardId != shardId) - { - tableIndex++; - if (tableIndex >= tableSize) - { - tableIndex = 0; - } - - slot = table + tableIndex; - } - - return slot; + /* trigger building the cache for the shard id */ + GetCitusTableCacheEntry(relationId); } /* - * LookupShardIndex returns the cache entry belonging to a shard, or - * errors out if that shard is unknown. shardIndex is set to the index - * for the shardId in sortedShardIntervalArray. + * RefreshTableCacheEntryIfInvalid checks if the cache entry is still valid and + * refreshes it in cache when it's not. It returns true if it refreshed the + * entry in the cache and false if it didn't. 
*/ -static CitusTableCacheEntry * -LookupShardIndex(int64 shardId, int *shardIndex) +static bool +RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry) { + /* + * We might have some concurrent metadata changes. In order to get the changes, + * we first need to accept the cache invalidation messages. + */ + AcceptInvalidationMessages(); + if (shardEntry->tableEntry->isValid) + { + return false; + } + Oid oldRelationId = shardEntry->tableEntry->relationId; + Oid currentRelationId = LookupShardRelationFromCatalog(shardEntry->shardId, false); + + /* + * The relation OID to which the shard belongs could have changed, + * most notably when the extension is dropped and a shard ID is + * reused. Reload the cache entries for both old and new relation + * ID and then look up the shard entry again. + */ + LookupCitusTableCacheEntry(oldRelationId); + LookupCitusTableCacheEntry(currentRelationId); + return true; +} + + +/* + * LookupShardIdCacheEntry returns the cache entry belonging to a shard, or + * errors out if that shard is unknown. 
+ */ +static ShardIdCacheEntry * +LookupShardIdCacheEntry(int64 shardId) +{ + bool foundInCache = false; + bool recheck = false; + Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING)); InitializeCaches(); - Oid relationId = LookupShardRelation(shardId, false); - CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); + ShardIdCacheEntry *shardEntry = + hash_search(ShardIdCacheHash, &shardId, HASH_FIND, &foundInCache); - ShardIdIndexSlot *idIndexSlot = - LookupShardIdIndexSlot(tableEntry->shardIdIndexHash, - tableEntry->shardIntervalArrayLength * 2, shardId); - - if (idIndexSlot->shardId != 0) + if (!foundInCache) { - *shardIndex = idIndexSlot->shardIndex; - return tableEntry; + InitializeTableCacheEntry(shardId); + recheck = true; + } + else + { + recheck = RefreshTableCacheEntryIfInvalid(shardEntry); } - ereport(ERROR, (errmsg("could not find valid entry for shard " - UINT64_FORMAT " in table %d", shardId, relationId))); - return NULL; + /* + * If we (re-)loaded the table cache, re-search the shard cache - the + * shard index might have changed. If we still can't find the entry, it + * can't exist. + */ + if (recheck) + { + shardEntry = hash_search(ShardIdCacheHash, &shardId, HASH_FIND, &foundInCache); + + if (!foundInCache) + { + ereport(ERROR, (errmsg("could not find valid entry for shard " + UINT64_FORMAT, shardId))); + } + } + + return shardEntry; } @@ -763,7 +813,7 @@ GetCitusTableCacheEntry(Oid distributedRelationId) CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(distributedRelationId); - if (cacheEntry && cacheEntry->isCitusTable) + if (cacheEntry) { return cacheEntry; } @@ -786,7 +836,8 @@ GetCitusTableCacheEntry(Oid distributedRelationId) /* * GetCitusTableCacheEntry returns the distributed table metadata for the - * passed relationId. For efficiency it caches lookups. + * passed relationId. For efficiency it caches lookups. This function returns + * NULL if the relation isn't a distributed table. 
*/ static CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId) @@ -844,16 +895,37 @@ LookupCitusTableCacheEntry(Oid relationId) /* return valid matches */ if (foundInCache) { - Assert(cacheSlot->data->isValid); - return cacheSlot->data; + if (cacheSlot->isValid) + { + return cacheSlot->citusTableMetadata; + } + else + { + /* + * An invalidation was received or we encountered an OOM while building + * the cache entry. We need to rebuild it. + */ + + if (cacheSlot->citusTableMetadata) + { + /* + * The CitusTableCacheEntry might still be in use. We therefore do + * not reset it until the end of the transaction. + */ + MemoryContext oldContext = + MemoryContextSwitchTo(MetadataCacheMemoryContext); + + DistTableCacheExpired = lappend(DistTableCacheExpired, + cacheSlot->citusTableMetadata); + + MemoryContextSwitchTo(oldContext); + } + } } /* zero out entry, but not the key part */ memset(((char *) cacheSlot) + sizeof(Oid), 0, sizeof(CitusTableCacheEntrySlot) - sizeof(Oid)); - cacheSlot->data = - MemoryContextAllocZero(MetadataCacheMemoryContext, sizeof(CitusTableCacheEntry)); - cacheSlot->data->relationId = relationId; /* * We disable interrupts while creating the cache entry because loading @@ -863,15 +935,17 @@ LookupCitusTableCacheEntry(Oid relationId) */ HOLD_INTERRUPTS(); - /* actually fill out entry */ - BuildCitusTableCacheEntry(cacheSlot->data); + cacheSlot->citusTableMetadata = BuildCitusTableCacheEntry(relationId); - /* and finally mark as valid */ - cacheSlot->data->isValid = true; + /* + * Mark it as valid only after building the full entry, such that any + * error that happened during the build would trigger a rebuild. + */ + cacheSlot->isValid = true; RESUME_INTERRUPTS(); - return cacheSlot->data; + return cacheSlot->citusTableMetadata; } @@ -980,31 +1054,34 @@ LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid) /* * BuildCitusTableCacheEntry is a helper routine for * LookupCitusTableCacheEntry() for building the cache contents. 
+ * This function returns NULL if the relation isn't a distributed table. */ -static void -BuildCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) +static CitusTableCacheEntry * +BuildCitusTableCacheEntry(Oid relationId) { + Relation pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock); + HeapTuple distPartitionTuple = + LookupDistPartitionTuple(pgDistPartition, relationId); + + if (distPartitionTuple == NULL) + { + /* not a distributed table, done */ + heap_close(pgDistPartition, NoLock); + return NULL; + } + MemoryContext oldContext = NULL; Datum datumArray[Natts_pg_dist_partition]; bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock); - HeapTuple distPartitionTuple = - LookupDistPartitionTuple(pgDistPartition, cacheEntry->relationId); - - /* not a distributed table, done */ - if (distPartitionTuple == NULL) - { - cacheEntry->isCitusTable = false; - heap_close(pgDistPartition, NoLock); - return; - } - - cacheEntry->isCitusTable = true; - TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray); + CitusTableCacheEntry *cacheEntry = + MemoryContextAllocZero(MetadataCacheMemoryContext, sizeof(CitusTableCacheEntry)); + + cacheEntry->relationId = relationId; + cacheEntry->partitionMethod = datumArray[Anum_pg_dist_partition_partmethod - 1]; Datum partitionKeyDatum = datumArray[Anum_pg_dist_partition_partkey - 1]; bool partitionKeyIsNull = isNullArray[Anum_pg_dist_partition_partkey - 1]; @@ -1090,6 +1167,10 @@ BuildCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) MemoryContextSwitchTo(oldContext); heap_close(pgDistPartition, NoLock); + + cacheEntry->isValid = true; + + return cacheEntry; } @@ -1243,16 +1324,23 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; cacheEntry->shardIntervalArrayLength = 0; - 
cacheEntry->shardIdIndexHash = - MemoryContextAllocZero(MetadataCacheMemoryContext, - sizeof(ShardIdIndexSlot) * shardIntervalArrayLength * 2); - /* maintain shardId->(table,ShardInterval) cache */ for (int shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++) { ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex]; + int64 shardId = shardInterval->shardId; int placementOffset = 0; + /* + * Enable quick lookups of this shard ID by adding it to ShardIdCacheHash + * or overwriting the previous values. + */ + ShardIdCacheEntry *shardIdCacheEntry = + hash_search(ShardIdCacheHash, &shardId, HASH_ENTER, NULL); + + shardIdCacheEntry->tableEntry = cacheEntry; + shardIdCacheEntry->shardIndex = shardIndex; + /* * We should increment this only after we are sure this hasn't already * been assigned to any other relations. ResetCitusTableCacheEntry() @@ -1281,13 +1369,6 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) /* store the shard index in the ShardInterval */ shardInterval->shardIndex = shardIndex; - - ShardIdIndexSlot *idIndexSlot = - LookupShardIdIndexSlot(cacheEntry->shardIdIndexHash, - shardIntervalArrayLength * 2, shardInterval->shardId); - Assert(idIndexSlot->shardId == 0); - idIndexSlot->shardId = shardInterval->shardId; - idIndexSlot->shardIndex = shardIndex; } cacheEntry->shardColumnCompareFunction = shardColumnCompareFunction; @@ -2865,6 +2946,7 @@ InitializeCaches(void) MetadataCacheMemoryContext = NULL; DistTableCacheHash = NULL; DistTableCacheExpired = NIL; + ShardIdCacheHash = NULL; PG_RE_THROW(); } @@ -2899,6 +2981,7 @@ InitializeDistCache(void) DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid; CreateDistTableCache(); + CreateShardIdCache(); InitializeDistObjectCache(); @@ -3249,17 +3332,14 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) cacheEntry->partitionColumn = NULL; } - if (cacheEntry->shardIdIndexHash != NULL) - { - pfree(cacheEntry->shardIdIndexHash); - cacheEntry->shardIdIndexHash 
= NULL; - } - if (cacheEntry->shardIntervalArrayLength == 0) { return; } + /* clean up ShardIdCacheHash */ + RemoveStaleShardIdCacheEntries(cacheEntry); + for (int shardIndex = 0; shardIndex < cacheEntry->shardIntervalArrayLength; shardIndex++) { @@ -3327,6 +3407,35 @@ ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry) } +/* + * RemoveStaleShardIdCacheEntries removes all shard ID cache entries belonging to the + * given table entry. If the shard ID belongs to a different (newer) table entry, + * we leave it in place. + */ +static void +RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *invalidatedTableEntry) +{ + int shardIndex = 0; + int shardCount = invalidatedTableEntry->shardIntervalArrayLength; + + for (shardIndex = 0; shardIndex < shardCount; shardIndex++) + { + ShardInterval *shardInterval = + invalidatedTableEntry->sortedShardIntervalArray[shardIndex]; + int64 shardId = shardInterval->shardId; + bool foundInCache = false; + + ShardIdCacheEntry *shardIdCacheEntry = + hash_search(ShardIdCacheHash, &shardId, HASH_FIND, &foundInCache); + + if (foundInCache && shardIdCacheEntry->tableEntry == invalidatedTableEntry) + { + hash_search(ShardIdCacheHash, &shardId, HASH_REMOVE, &foundInCache); + } + } +} + + /* * InvalidateForeignRelationGraphCacheCallback invalidates the foreign key relation * graph and entire distributed cache entries. 
@@ -3384,15 +3493,12 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) void *hashKey = (void *) &relationId; bool foundInCache = false; CitusTableCacheEntrySlot *cacheSlot = - hash_search(DistTableCacheHash, hashKey, HASH_REMOVE, &foundInCache); + hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache); if (foundInCache) { - cacheSlot->data->isValid = false; - MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext); - DistTableCacheExpired = lappend(DistTableCacheExpired, cacheSlot->data); - MemoryContextSwitchTo(oldContext); + InvalidateCitusTableCacheEntrySlot(cacheSlot); } /* @@ -3413,6 +3519,25 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) } +/* + * InvalidateCitusTableCacheEntrySlot marks a CitusTableCacheEntrySlot as invalid, + * meaning it needs to be rebuilt and the citusTableMetadata (if any) should be + * released. + */ +static void +InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSlot) +{ + /* recheck whether this is a distributed table */ + cacheSlot->isValid = false; + + if (cacheSlot->citusTableMetadata != NULL) + { + /* reload the metadata */ + cacheSlot->citusTableMetadata->isValid = false; + } +} + + /* * InvalidateDistTableCache marks all DistTableCacheHash entries invalid. 
*/ @@ -3426,15 +3551,7 @@ InvalidateDistTableCache(void) while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL) { - bool foundInCache = false; - CitusTableCacheEntrySlot *removedSlot PG_USED_FOR_ASSERTS_ONLY = - hash_search(DistTableCacheHash, cacheSlot, HASH_REMOVE, &foundInCache); - Assert(removedSlot == cacheSlot); - - cacheSlot->data->isValid = false; - MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext); - DistTableCacheExpired = lappend(DistTableCacheExpired, cacheSlot->data); - MemoryContextSwitchTo(oldContext); + InvalidateCitusTableCacheEntrySlot(cacheSlot); } } @@ -3471,14 +3588,13 @@ FlushDistTableCache(void) while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL) { - if (cacheSlot->data->isValid) - { - ResetCitusTableCacheEntry(cacheSlot->data); - } + ResetCitusTableCacheEntry(cacheSlot->citusTableMetadata); } hash_destroy(DistTableCacheHash); + hash_destroy(ShardIdCacheHash); CreateDistTableCache(); + CreateShardIdCache(); } @@ -3498,6 +3614,22 @@ CreateDistTableCache(void) } +/* CreateShardIdCache initializes the shard ID mapping */ +static void +CreateShardIdCache(void) +{ + HASHCTL info; + MemSet(&info, 0, sizeof(info)); + info.keysize = sizeof(int64); + info.entrysize = sizeof(ShardIdCacheEntry); + info.hash = tag_hash; + info.hcxt = MetadataCacheMemoryContext; + ShardIdCacheHash = + hash_create("Shard Id Cache", 128, &info, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); +} + + /* CreateDistObjectCache initializes the per-object hash table */ static void CreateDistObjectCache(void) @@ -3754,13 +3886,13 @@ LookupDistShardTuples(Oid relationId) /* - * LookupShardRelation returns the logical relation oid a shard belongs to. + * LookupShardRelationFromCatalog returns the logical relation oid a shard belongs to. * * Errors out if the shardId does not exist and missingOk is false. * Returns InvalidOid if the shardId does not exist and missingOk is true. 
*/ Oid -LookupShardRelation(int64 shardId, bool missingOk) +LookupShardRelationFromCatalog(int64 shardId, bool missingOk) { ScanKeyData scanKey[1]; int scanKeyCount = 1; diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c index 78fe44118..4ea33f8b8 100644 --- a/src/backend/distributed/worker/worker_shard_visibility.c +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -171,7 +171,7 @@ RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath) } /* try to get the relation id */ - Oid relationId = LookupShardRelation(shardId, true); + Oid relationId = LookupShardRelationFromCatalog(shardId, true); if (!OidIsValid(relationId)) { /* there is no such relation */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 657a71286..9e28dadd8 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -38,10 +38,6 @@ extern int ReadFromSecondaries; #define GROUP_ID_UPGRADING -2 -/* internal type used by metadata_cache.c to cache shard indexes */ -struct ShardIdIndexSlot; - - /* * Representation of a table's metadata that is frequently used for * distributed execution. Cached. 
@@ -57,7 +53,6 @@ typedef struct */ bool isValid; - bool isCitusTable; bool hasUninitializedShardInterval; bool hasUniformHashDistribution; /* valid for hash partitioned tables */ bool hasOverlappingShardInterval; @@ -73,9 +68,6 @@ typedef struct int shardIntervalArrayLength; ShardInterval **sortedShardIntervalArray; - /* map of shardId to index in sortedShardIntervalArray */ - struct ShardIdIndexSlot *shardIdIndexHash; - /* comparator for partition column's type, NULL if DISTRIBUTE_BY_NONE */ FmgrInfo *shardColumnCompareFunction; @@ -140,7 +132,7 @@ extern int32 GetLocalGroupId(void); extern List * DistTableOidList(void); extern List * ReferenceTableOidList(void); extern void CitusTableCacheFlushInvalidatedEntries(void); -extern Oid LookupShardRelation(int64 shardId, bool missing_ok); +extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok); extern List * ShardPlacementList(uint64 shardId); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId);