Make LoadShardInterval() backed by the metadata cache.

Doing so requires adding a mapping from shardId to the cache
entries. For that metadata_cache.c now maintains an additional
hashtable. That hashtable only references shard intervals in the
dist table cache.
pull/1113/head
Andres Freund 2017-01-10 00:38:16 -08:00
parent f6e8647337
commit 8cb47195ba
1 changed files with 189 additions and 38 deletions

View File

@ -50,6 +50,33 @@
#include "utils/typcache.h" #include "utils/typcache.h"
/*
* ShardCacheEntry represents an entry in the shardId -> ShardInterval cache.
* To avoid duplicating data and invalidation logic between this cache and the
* DistTableCache, this only points into the DistTableCacheEntry of the
* shard's distributed table.
*/
typedef struct ShardCacheEntry
{
/* hash key, needs to be first */
int64 shardId;
/*
* Cache entry for the distributed table a shard belongs to, possibly not
* valid.
*/
DistTableCacheEntry *tableEntry;
/*
* Offset in tableEntry->sortedShardIntervalArray, only valid if
* tableEntry->isValid. We don't store pointers to the individual shard
* placements because that'd make invalidation a bit more complicated, and
* because there's simply no need.
*/
int shardIndex;
} ShardCacheEntry;
/* state which should be cleared upon DROP EXTENSION */ /* state which should be cleared upon DROP EXTENSION */
static bool extensionLoaded = false; static bool extensionLoaded = false;
static Oid distShardRelationId = InvalidOid; static Oid distShardRelationId = InvalidOid;
@ -74,6 +101,9 @@ static Oid extraDataContainerFuncId = InvalidOid;
/* Hash table for informations about each partition */ /* Hash table for informations about each partition */
static HTAB *DistTableCacheHash = NULL; static HTAB *DistTableCacheHash = NULL;
/* Hash table for informations about each shard */
static HTAB *DistShardCacheHash = NULL;
/* Hash table for informations about worker nodes */ /* Hash table for informations about worker nodes */
static HTAB *WorkerNodeHash = NULL; static HTAB *WorkerNodeHash = NULL;
static bool workerNodeHashValid = false; static bool workerNodeHashValid = false;
@ -89,6 +119,7 @@ static ScanKeyData DistShardScanKey[1];
/* local function forward declarations */ /* local function forward declarations */
static ShardCacheEntry * LookupShardCacheEntry(int64 shardId);
static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId); static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void BuildCachedShardList(DistTableCacheEntry *cacheEntry); static void BuildCachedShardList(DistTableCacheEntry *cacheEntry);
@ -111,6 +142,7 @@ static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId);
static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId);
static List * LookupDistShardTuples(Oid relationId); static List * LookupDistShardTuples(Oid relationId);
static Oid LookupShardRelation(int64 shardId);
static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
Oid *intervalTypeId, int32 *intervalTypeMod); Oid *intervalTypeId, int32 *intervalTypeMod);
static ShardInterval * TupleToShardInterval(HeapTuple heapTuple, static ShardInterval * TupleToShardInterval(HeapTuple heapTuple,
@ -181,58 +213,104 @@ DistributedTableList(void)
/* /*
* LoadShardInterval reads shard metadata for given shardId from pg_dist_shard, * LoadShardInterval returns the, cached, metadata about a shard.
* and converts min/max values in these metadata to their properly typed datum *
* representations. The function then allocates a structure that stores the read * The return value is a copy of the cached ShardInterval struct and may
* and converted values, and returns this structure. * therefore be modified and/or freed.
*/ */
ShardInterval * ShardInterval *
LoadShardInterval(uint64 shardId) LoadShardInterval(uint64 shardId)
{ {
ShardInterval *shardInterval; ShardInterval *shardInterval = NULL;
SysScanDesc scanDescriptor = NULL; ShardInterval *sourceShardInterval = NULL;
ScanKeyData scanKey[1]; ShardCacheEntry *shardEntry = NULL;
int scanKeyCount = 1; DistTableCacheEntry *tableEntry = NULL;
HeapTuple heapTuple = NULL;
Form_pg_dist_shard shardForm = NULL;
DistTableCacheEntry *partitionEntry;
Oid intervalTypeId = InvalidOid;
int32 intervalTypeMod = -1;
Relation pgDistShard = heap_open(DistShardRelationId(), AccessShareLock); shardEntry = LookupShardCacheEntry(shardId);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard);
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, tableEntry = shardEntry->tableEntry;
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
scanDescriptor = systable_beginscan(pgDistShard, Assert(tableEntry->isDistributedTable);
DistShardShardidIndexId(), true,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor); /* the offset better be in a valid range */
if (!HeapTupleIsValid(heapTuple)) Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength);
{
ereport(ERROR, (errmsg("could not find valid entry for shard "
UINT64_FORMAT, shardId)));
}
shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple); sourceShardInterval = tableEntry->sortedShardIntervalArray[shardEntry->shardIndex];
partitionEntry = DistributedTableCacheEntry(shardForm->logicalrelid);
GetPartitionTypeInputInfo(partitionEntry->partitionKeyString, /* copy value to return */
partitionEntry->partitionMethod, &intervalTypeId, shardInterval = (ShardInterval *) palloc0(sizeof(ShardInterval));
&intervalTypeMod); CopyShardInterval(sourceShardInterval, shardInterval);
shardInterval = TupleToShardInterval(heapTuple, tupleDescriptor, intervalTypeId,
intervalTypeMod);
systable_endscan(scanDescriptor);
heap_close(pgDistShard, AccessShareLock);
return shardInterval; return shardInterval;
} }
/*
* LookupShardCacheEntry returns the cache entry belonging to a shard, or
* errors out if that shard is unknown.
*/
static ShardCacheEntry *
LookupShardCacheEntry(int64 shardId)
{
ShardCacheEntry *shardEntry = NULL;
bool foundInCache = false;
bool recheck = false;
/* probably not reachable */
if (DistShardCacheHash == NULL)
{
InitializeDistTableCache();
}
/* lookup cache entry */
shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache);
if (!foundInCache)
{
/*
* A possible reason for not finding an entry in the cache is that the
* distributed table's cache entry hasn't been accessed. Thus look up
* the distributed table, and build the cache entry. Afterwards we
* know that the shard has to be in the cache if it exists. If the
* shard does *not* exist LookupShardRelation() will error out.
*/
Oid relationId = LookupShardRelation(shardId);
/* trigger building the cache for the shard id */
LookupDistTableCacheEntry(relationId);
recheck = true;
}
else if (!shardEntry->tableEntry->isValid)
{
/*
* The cache entry might not be valid right now. Reload cache entry
* and recheck (as the offset might have changed).
*/
LookupDistTableCacheEntry(shardEntry->tableEntry->relationId);
recheck = true;
}
/*
* If we (re-)loaded the table cache, re-search the shard cache - the
* shard index might have changed. If we still can't find the entry, it
* can't exist.
*/
if (recheck)
{
shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache);
if (!foundInCache)
{
ereport(ERROR, (errmsg("could not find valid entry for shard "
UINT64_FORMAT, shardId)));
}
}
return shardEntry;
}
/* /*
* DistributedTableCacheEntry looks up a pg_dist_partition entry for a * DistributedTableCacheEntry looks up a pg_dist_partition entry for a
* relation. * relation.
@ -441,6 +519,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
FmgrInfo *shardIntervalCompareFunction = NULL; FmgrInfo *shardIntervalCompareFunction = NULL;
List *distShardTupleList = NIL; List *distShardTupleList = NIL;
int shardIntervalArrayLength = 0; int shardIntervalArrayLength = 0;
int shardIndex = 0;
distShardTupleList = LookupDistShardTuples(cacheEntry->relationId); distShardTupleList = LookupDistShardTuples(cacheEntry->relationId);
shardIntervalArrayLength = list_length(distShardTupleList); shardIntervalArrayLength = list_length(distShardTupleList);
@ -539,6 +618,21 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
shardIntervalArrayLength); shardIntervalArrayLength);
} }
/* maintain shardId->(table,ShardInterval) cache */
for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
{
ShardCacheEntry *shardEntry = NULL;
ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex];
bool foundInCache = false;
shardEntry = hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_ENTER,
&foundInCache);
Assert(!foundInCache);
shardEntry->shardIndex = shardIndex;
shardEntry->tableEntry = cacheEntry;
}
cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;
cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction; cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction;
@ -1280,7 +1374,7 @@ InitializeDistTableCache(void)
DistShardScanKey[0].sk_collation = InvalidOid; DistShardScanKey[0].sk_collation = InvalidOid;
DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid; DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid;
/* initialize the hash table */ /* initialize the per-table hash table */
MemSet(&info, 0, sizeof(info)); MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(Oid); info.keysize = sizeof(Oid);
info.entrysize = sizeof(DistTableCacheEntry); info.entrysize = sizeof(DistTableCacheEntry);
@ -1289,6 +1383,15 @@ InitializeDistTableCache(void)
hash_create("Distributed Relation Cache", 32, &info, hash_create("Distributed Relation Cache", 32, &info,
HASH_ELEM | HASH_FUNCTION); HASH_ELEM | HASH_FUNCTION);
/* initialize the per-shard hash table */
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(int64);
info.entrysize = sizeof(ShardCacheEntry);
info.hash = tag_hash;
DistShardCacheHash =
hash_create("Shard Cache", 32 * 64, &info,
HASH_ELEM | HASH_FUNCTION);
/* Watch for invalidation events. */ /* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback, CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback,
(Datum) 0); (Datum) 0);
@ -1520,7 +1623,14 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
{ {
ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[i]; ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[i];
bool valueByVal = shardInterval->valueByVal; bool valueByVal = shardInterval->valueByVal;
bool foundInCache = false;
/* delete per-shard cache-entry */
hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE,
&foundInCache);
Assert(foundInCache);
/* delete data pointed to by ShardInterval */
if (!valueByVal) if (!valueByVal)
{ {
if (shardInterval->minValueExists) if (shardInterval->minValueExists)
@ -1534,6 +1644,7 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
} }
} }
/* and finally the ShardInterval itself */
pfree(shardInterval); pfree(shardInterval);
} }
@ -1772,6 +1883,46 @@ LookupDistShardTuples(Oid relationId)
} }
/*
* LookupShardRelation returns the logical relation oid a shard belongs to.
*
* Errors out if the shardId does not exist.
*/
static Oid
LookupShardRelation(int64 shardId)
{
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
HeapTuple heapTuple = NULL;
Form_pg_dist_shard shardForm = NULL;
Relation pgDistShard = heap_open(DistShardRelationId(), AccessShareLock);
Oid relationId = InvalidOid;
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
scanDescriptor = systable_beginscan(pgDistShard,
DistShardShardidIndexId(), true,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for shard "
UINT64_FORMAT, shardId)));
}
shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple);
relationId = shardForm->logicalrelid;
systable_endscan(scanDescriptor);
heap_close(pgDistShard, NoLock);
return relationId;
}
/* /*
* GetPartitionTypeInputInfo populates output parameters with the interval type * GetPartitionTypeInputInfo populates output parameters with the interval type
* identifier and modifier for the specified partition key/method combination. * identifier and modifier for the specified partition key/method combination.