From 8cb47195baeaf795f2e2c67441702c56c7c73dd0 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 10 Jan 2017 00:38:16 -0800 Subject: [PATCH] Make LoadShardInterval() backed by the metadata cache. Doing so requires adding a mapping from shardId to the cache entries. For that metadata_cache.c now maintains an additional hashtable. That hashtable only references shard intervals in the dist table cache. --- .../distributed/utils/metadata_cache.c | 227 +++++++++++++++--- 1 file changed, 189 insertions(+), 38 deletions(-) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 190c66946..a786398d0 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -50,6 +50,33 @@ #include "utils/typcache.h" +/* + * ShardCacheEntry represents an entry in the shardId -> ShardInterval cache. + * To avoid duplicating data and invalidation logic between this cache and the + * DistTableCache, this only points into the DistTableCacheEntry of the + * shard's distributed table. + */ +typedef struct ShardCacheEntry +{ + /* hash key, needs to be first */ + int64 shardId; + + /* + * Cache entry for the distributed table a shard belongs to, possibly not + * valid. + */ + DistTableCacheEntry *tableEntry; + + /* + * Offset in tableEntry->sortedShardIntervalArray, only valid if + * tableEntry->isValid. We don't store pointers to the individual shard + * intervals because that'd make invalidation a bit more complicated, and + * because there's simply no need. 
+ */ + int shardIndex; +} ShardCacheEntry; + + /* state which should be cleared upon DROP EXTENSION */ static bool extensionLoaded = false; static Oid distShardRelationId = InvalidOid; @@ -74,6 +101,9 @@ static Oid extraDataContainerFuncId = InvalidOid; /* Hash table for informations about each partition */ static HTAB *DistTableCacheHash = NULL; +/* Hash table for information about each shard */ +static HTAB *DistShardCacheHash = NULL; + /* Hash table for informations about worker nodes */ static HTAB *WorkerNodeHash = NULL; static bool workerNodeHashValid = false; @@ -89,6 +119,7 @@ static ScanKeyData DistShardScanKey[1]; /* local function forward declarations */ +static ShardCacheEntry * LookupShardCacheEntry(int64 shardId); static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId); static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void BuildCachedShardList(DistTableCacheEntry *cacheEntry); @@ -111,6 +142,7 @@ static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static List * LookupDistShardTuples(Oid relationId); +static Oid LookupShardRelation(int64 shardId); static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, Oid *intervalTypeId, int32 *intervalTypeMod); static ShardInterval * TupleToShardInterval(HeapTuple heapTuple, @@ -181,58 +213,104 @@ DistributedTableList(void) /* - * LoadShardInterval reads shard metadata for given shardId from pg_dist_shard, - * and converts min/max values in these metadata to their properly typed datum - * representations. The function then allocates a structure that stores the read - * and converted values, and returns this structure. + * LoadShardInterval returns the cached metadata about a shard. 
+ * + * The return value is a copy of the cached ShardInterval struct and may + * therefore be modified and/or freed. */ ShardInterval * LoadShardInterval(uint64 shardId) { - ShardInterval *shardInterval; - SysScanDesc scanDescriptor = NULL; - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - HeapTuple heapTuple = NULL; - Form_pg_dist_shard shardForm = NULL; - DistTableCacheEntry *partitionEntry; - Oid intervalTypeId = InvalidOid; - int32 intervalTypeMod = -1; + ShardInterval *shardInterval = NULL; + ShardInterval *sourceShardInterval = NULL; + ShardCacheEntry *shardEntry = NULL; + DistTableCacheEntry *tableEntry = NULL; - Relation pgDistShard = heap_open(DistShardRelationId(), AccessShareLock); - TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard); + shardEntry = LookupShardCacheEntry(shardId); - ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); + tableEntry = shardEntry->tableEntry; - scanDescriptor = systable_beginscan(pgDistShard, - DistShardShardidIndexId(), true, - NULL, scanKeyCount, scanKey); + Assert(tableEntry->isDistributedTable); - heapTuple = systable_getnext(scanDescriptor); - if (!HeapTupleIsValid(heapTuple)) - { - ereport(ERROR, (errmsg("could not find valid entry for shard " - UINT64_FORMAT, shardId))); - } + /* the offset better be in a valid range */ + Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); - shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple); - partitionEntry = DistributedTableCacheEntry(shardForm->logicalrelid); + sourceShardInterval = tableEntry->sortedShardIntervalArray[shardEntry->shardIndex]; - GetPartitionTypeInputInfo(partitionEntry->partitionKeyString, - partitionEntry->partitionMethod, &intervalTypeId, - &intervalTypeMod); - - shardInterval = TupleToShardInterval(heapTuple, tupleDescriptor, intervalTypeId, - intervalTypeMod); - - systable_endscan(scanDescriptor); - heap_close(pgDistShard, AccessShareLock); + /* copy value to return */ 
+ shardInterval = (ShardInterval *) palloc0(sizeof(ShardInterval)); + CopyShardInterval(sourceShardInterval, shardInterval); return shardInterval; } +/* + * LookupShardCacheEntry returns the cache entry belonging to a shard, or + * errors out if that shard is unknown. + */ +static ShardCacheEntry * +LookupShardCacheEntry(int64 shardId) +{ + ShardCacheEntry *shardEntry = NULL; + bool foundInCache = false; + bool recheck = false; + + /* probably not reachable */ + if (DistShardCacheHash == NULL) + { + InitializeDistTableCache(); + } + + /* lookup cache entry */ + shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache); + + if (!foundInCache) + { + /* + * A possible reason for not finding an entry in the cache is that the + * distributed table's cache entry hasn't been accessed. Thus look up + * the distributed table, and build the cache entry. Afterwards we + * know that the shard has to be in the cache if it exists. If the + * shard does *not* exist LookupShardRelation() will error out. + */ + Oid relationId = LookupShardRelation(shardId); + + /* trigger building the cache for the shard id */ + LookupDistTableCacheEntry(relationId); + + recheck = true; + } + else if (!shardEntry->tableEntry->isValid) + { + /* + * The cache entry might not be valid right now. Reload cache entry + * and recheck (as the offset might have changed). + */ + LookupDistTableCacheEntry(shardEntry->tableEntry->relationId); + recheck = true; + } + + /* + * If we (re-)loaded the table cache, re-search the shard cache - the + * shard index might have changed. If we still can't find the entry, it + * can't exist. + */ + if (recheck) + { + shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache); + + if (!foundInCache) + { + ereport(ERROR, (errmsg("could not find valid entry for shard " + UINT64_FORMAT, shardId))); + } + } + + return shardEntry; +} + + /* * DistributedTableCacheEntry looks up a pg_dist_partition entry for a * relation. 
@@ -441,6 +519,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) FmgrInfo *shardIntervalCompareFunction = NULL; List *distShardTupleList = NIL; int shardIntervalArrayLength = 0; + int shardIndex = 0; distShardTupleList = LookupDistShardTuples(cacheEntry->relationId); shardIntervalArrayLength = list_length(distShardTupleList); @@ -539,6 +618,21 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) shardIntervalArrayLength); } + + /* maintain shardId->(table,ShardInterval) cache */ + for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++) + { + ShardCacheEntry *shardEntry = NULL; + ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex]; + bool foundInCache = false; + + shardEntry = hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_ENTER, + &foundInCache); + Assert(!foundInCache); + shardEntry->shardIndex = shardIndex; + shardEntry->tableEntry = cacheEntry; + } + cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction; @@ -1280,7 +1374,7 @@ InitializeDistTableCache(void) DistShardScanKey[0].sk_collation = InvalidOid; DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid; - /* initialize the hash table */ + /* initialize the per-table hash table */ MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(Oid); info.entrysize = sizeof(DistTableCacheEntry); @@ -1289,6 +1383,15 @@ InitializeDistTableCache(void) hash_create("Distributed Relation Cache", 32, &info, HASH_ELEM | HASH_FUNCTION); + /* initialize the per-shard hash table */ + MemSet(&info, 0, sizeof(info)); + info.keysize = sizeof(int64); + info.entrysize = sizeof(ShardCacheEntry); + info.hash = tag_hash; + DistShardCacheHash = + hash_create("Shard Cache", 32 * 64, &info, + HASH_ELEM | HASH_FUNCTION); + /* Watch for invalidation events. 
*/ CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback, (Datum) 0); @@ -1520,7 +1623,14 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) { ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[i]; bool valueByVal = shardInterval->valueByVal; + bool foundInCache = false; + /* delete per-shard cache-entry */ + hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE, + &foundInCache); + Assert(foundInCache); + + /* delete data pointed to by ShardInterval */ if (!valueByVal) { if (shardInterval->minValueExists) @@ -1534,6 +1644,7 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) } } + /* and finally the ShardInterval itself */ pfree(shardInterval); } @@ -1772,6 +1883,46 @@ LookupDistShardTuples(Oid relationId) } +/* + * LookupShardRelation returns the logical relation oid a shard belongs to. + * + * Errors out if the shardId does not exist. + */ +static Oid +LookupShardRelation(int64 shardId) +{ + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + HeapTuple heapTuple = NULL; + Form_pg_dist_shard shardForm = NULL; + Relation pgDistShard = heap_open(DistShardRelationId(), AccessShareLock); + Oid relationId = InvalidOid; + + ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); + + scanDescriptor = systable_beginscan(pgDistShard, + DistShardShardidIndexId(), true, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for shard " + UINT64_FORMAT, shardId))); + } + + shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple); + relationId = shardForm->logicalrelid; + + systable_endscan(scanDescriptor); + heap_close(pgDistShard, NoLock); + + return relationId; +} + + /* * GetPartitionTypeInputInfo populates output parameters with the interval type * identifier and modifier for the specified partition 
key/method combination.