From f6e864733760e54e51005acf95114f2f39b73ee5 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Mon, 9 Jan 2017 17:56:41 -0800 Subject: [PATCH 1/3] Split DistTableCacheEntry() into separate functions. Previously the function was getting too large. Thus this splits the function into separate parts for looking up the cache entry and building the cache contents. --- .../distributed/utils/metadata_cache.c | 275 ++++++++++-------- 1 file changed, 153 insertions(+), 122 deletions(-) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 55b5329c1..190c66946 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -90,6 +90,8 @@ static ScanKeyData DistShardScanKey[1]; /* local function forward declarations */ static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId); +static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry); +static void BuildCachedShardList(DistTableCacheEntry *cacheEntry); static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray, char partitionMethod); static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray, @@ -276,99 +278,171 @@ LookupDistTableCacheEntry(Oid relationId) { DistTableCacheEntry *cacheEntry = NULL; bool foundInCache = false; - HeapTuple distPartitionTuple = NULL; - char *partitionKeyString = NULL; - char partitionMethod = 0; - uint32 colocationId = INVALID_COLOCATION_ID; - char replicationModel = 0; - List *distShardTupleList = NIL; - int shardIntervalArrayLength = 0; - ShardInterval **shardIntervalArray = NULL; - ShardInterval **sortedShardIntervalArray = NULL; - FmgrInfo *shardIntervalCompareFunction = NULL; - FmgrInfo *hashFunction = NULL; - bool hasUninitializedShardInterval = false; - bool hasUniformHashDistribution = false; void *hashKey = (void *) &relationId; - Relation pgDistPartition = NULL; if (DistTableCacheHash == NULL) { InitializeDistTableCache(); } - cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache); + cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_ENTER, &foundInCache); /* return valid matches */ - if ((cacheEntry != NULL) && (cacheEntry->isValid)) + if (foundInCache) { - return cacheEntry; - } + if (cacheEntry->isValid) + { + return cacheEntry; + } - /* free the content of old, invalid, entries */ - if (cacheEntry != NULL) - { + /* free the content of old, invalid, entries */ ResetDistTableCacheEntry(cacheEntry); } + /* zero out entry, but not the key part */ + memset(((char *) cacheEntry) + sizeof(Oid), 0, + sizeof(DistTableCacheEntry) - sizeof(Oid)); + + /* actually fill out entry */ + BuildDistTableCacheEntry(cacheEntry); + + /* and finally mark as valid */ + cacheEntry->isValid = true; + + return cacheEntry; +} + + +/* + * BuildDistTableCacheEntry is a helper routine for + * LookupDistTableCacheEntry() for building the cache contents. 
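+ *
+ * The passed-in entry's relationId is expected to be set; everything else is
+ * filled in here: the pg_dist_partition derived fields (partition method and
+ * key, colocation id, replication model), the shard interval array via
+ * BuildCachedShardList(), and, for hash-distributed tables, the hash function
+ * for the partition column's type.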
+ */ +static void +BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry) +{ + HeapTuple distPartitionTuple = NULL; + Relation pgDistPartition = NULL; + Form_pg_dist_partition partitionForm = NULL; + Datum partitionKeyDatum = 0; + Datum replicationModelDatum = 0; + MemoryContext oldContext = NULL; + TupleDesc tupleDescriptor = NULL; + bool isNull = false; + bool partitionKeyIsNull = false; + pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock); - distPartitionTuple = LookupDistPartitionTuple(pgDistPartition, relationId); - if (distPartitionTuple != NULL) + distPartitionTuple = + LookupDistPartitionTuple(pgDistPartition, cacheEntry->relationId); + + /* not a distributed table, done */ + if (distPartitionTuple == NULL) { - Form_pg_dist_partition partitionForm = - (Form_pg_dist_partition) GETSTRUCT(distPartitionTuple); - Datum partitionKeyDatum = 0; - Datum replicationModelDatum = 0; - MemoryContext oldContext = NULL; - TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - bool isNull = false; - bool partitionKeyIsNull = false; + cacheEntry->isDistributedTable = false; + heap_close(pgDistPartition, NoLock); + return; + } - partitionKeyDatum = heap_getattr(distPartitionTuple, - Anum_pg_dist_partition_partkey, - tupleDescriptor, - &partitionKeyIsNull); + cacheEntry->isDistributedTable = true; - colocationId = heap_getattr(distPartitionTuple, - Anum_pg_dist_partition_colocationid, tupleDescriptor, - &isNull); - if (isNull) - { - colocationId = INVALID_COLOCATION_ID; - } + tupleDescriptor = RelationGetDescr(pgDistPartition); + partitionForm = (Form_pg_dist_partition) GETSTRUCT(distPartitionTuple); - replicationModelDatum = heap_getattr(distPartitionTuple, - Anum_pg_dist_partition_repmodel, - tupleDescriptor, - &isNull); + cacheEntry->partitionMethod = partitionForm->partmethod; - if (isNull) - { - /* - * repmodel is NOT NULL but before ALTER EXTENSION citus UPGRADE the column - * doesn't exist - */ - replicationModelDatum = CharGetDatum('c'); - } + partitionKeyDatum = heap_getattr(distPartitionTuple, + Anum_pg_dist_partition_partkey, + tupleDescriptor, + &partitionKeyIsNull); + /* note that for reference tables partitionKeyisNull is true */ + if (!partitionKeyIsNull) + { oldContext = MemoryContextSwitchTo(CacheMemoryContext); - partitionMethod = partitionForm->partmethod; - replicationModel = DatumGetChar(replicationModelDatum); - - /* note that for reference tables isNull becomes true */ - if (!partitionKeyIsNull) - { - partitionKeyString = TextDatumGetCString(partitionKeyDatum); - } - + cacheEntry->partitionKeyString = TextDatumGetCString(partitionKeyDatum); MemoryContextSwitchTo(oldContext); + } + else + { + cacheEntry->partitionKeyString = NULL; + } - heap_freetuple(distPartitionTuple); + cacheEntry->colocationId = heap_getattr(distPartitionTuple, + Anum_pg_dist_partition_colocationid, + tupleDescriptor, + &isNull); + if (isNull) + { + cacheEntry->colocationId = INVALID_COLOCATION_ID; + } + + replicationModelDatum = heap_getattr(distPartitionTuple, + Anum_pg_dist_partition_repmodel, + tupleDescriptor, + &isNull); + if (isNull) + { + /* + * repmodel is NOT NULL but before ALTER EXTENSION citus UPGRADE the column + * doesn't exist + */ + cacheEntry->replicationModel = 'c'; + } + else + { + cacheEntry->replicationModel = DatumGetChar(replicationModelDatum); + } + + heap_freetuple(distPartitionTuple); + + BuildCachedShardList(cacheEntry); + + /* we only need hash functions for hash distributed tables */ + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH) + { 
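+		/*
+		 * Look up the hash proc for the partition column's type and copy it
+		 * into CacheMemoryContext, so that it survives as long as the cache
+		 * entry itself.
+		 */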
+ TypeCacheEntry *typeEntry = NULL; + Node *partitionNode = stringToNode(cacheEntry->partitionKeyString); + Var *partitionColumn = (Var *) partitionNode; + FmgrInfo *hashFunction = NULL; + + Assert(IsA(partitionNode, Var)); + typeEntry = lookup_type_cache(partitionColumn->vartype, + TYPECACHE_HASH_PROC_FINFO); + + hashFunction = MemoryContextAllocZero(CacheMemoryContext, + sizeof(FmgrInfo)); + + fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CacheMemoryContext); + + cacheEntry->hashFunction = hashFunction; + + /* check the shard distribution for hash partitioned tables */ + cacheEntry->hasUniformHashDistribution = + HasUniformHashDistribution(cacheEntry->sortedShardIntervalArray, + cacheEntry->shardIntervalArrayLength); + } + else + { + cacheEntry->hashFunction = NULL; } heap_close(pgDistPartition, NoLock); +} - distShardTupleList = LookupDistShardTuples(relationId); + +/* + * BuildCachedShardList() is a helper routine for BuildDistTableCacheEntry() + * building up the list of shards in a distributed relation. + */ +static void +BuildCachedShardList(DistTableCacheEntry *cacheEntry) +{ + ShardInterval **shardIntervalArray = NULL; + ShardInterval **sortedShardIntervalArray = NULL; + FmgrInfo *shardIntervalCompareFunction = NULL; + List *distShardTupleList = NIL; + int shardIntervalArrayLength = 0; + + distShardTupleList = LookupDistShardTuples(cacheEntry->relationId); shardIntervalArrayLength = list_length(distShardTupleList); if (shardIntervalArrayLength > 0) { @@ -379,7 +453,9 @@ LookupDistTableCacheEntry(Oid relationId) Oid intervalTypeId = InvalidOid; int32 intervalTypeMod = -1; - GetPartitionTypeInputInfo(partitionKeyString, partitionMethod, &intervalTypeId, + GetPartitionTypeInputInfo(cacheEntry->partitionKeyString, + cacheEntry->partitionMethod, + &intervalTypeId, &intervalTypeMod); shardIntervalArray = MemoryContextAllocZero(CacheMemoryContext, @@ -411,7 +487,7 @@ LookupDistTableCacheEntry(Oid relationId) } /* decide and allocate interval comparison function */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) { shardIntervalCompareFunction = NULL; } @@ -422,16 +498,17 @@ LookupDistTableCacheEntry(Oid relationId) /* allocate the comparison function in the cache context */ oldContext = MemoryContextSwitchTo(CacheMemoryContext); - shardIntervalCompareFunction = ShardIntervalCompareFunction(shardIntervalArray, - partitionMethod); + shardIntervalCompareFunction = + ShardIntervalCompareFunction(shardIntervalArray, + cacheEntry->partitionMethod); MemoryContextSwitchTo(oldContext); } /* reference tables has a single shard which is not initialized */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) { - hasUninitializedShardInterval = true; + cacheEntry->hasUninitializedShardInterval = true; /* * Note that during create_reference_table() call, @@ -439,7 +516,7 @@ LookupDistTableCacheEntry(Oid relationId) */ if (shardIntervalArrayLength > 1) { - char *relationName = get_rel_name(relationId); + char *relationName = get_rel_name(cacheEntry->relationId); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("reference table \"%s\" has more than 1 shard", @@ -457,60 +534,14 @@ LookupDistTableCacheEntry(Oid relationId) shardIntervalCompareFunction); /* check if there exists any shard intervals with no min/max values */ - hasUninitializedShardInterval = + cacheEntry->hasUninitializedShardInterval = HasUninitializedShardInterval(sortedShardIntervalArray, 
shardIntervalArrayLength); } - /* we only need hash functions for hash distributed tables */ - if (partitionMethod == DISTRIBUTE_BY_HASH) - { - TypeCacheEntry *typeEntry = NULL; - Node *partitionNode = stringToNode(partitionKeyString); - Var *partitionColumn = (Var *) partitionNode; - Assert(IsA(partitionNode, Var)); - typeEntry = lookup_type_cache(partitionColumn->vartype, - TYPECACHE_HASH_PROC_FINFO); - - hashFunction = MemoryContextAllocZero(CacheMemoryContext, - sizeof(FmgrInfo)); - - fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CacheMemoryContext); - - /* check the shard distribution for hash partitioned tables */ - hasUniformHashDistribution = - HasUniformHashDistribution(sortedShardIntervalArray, - shardIntervalArrayLength); - } - - cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_ENTER, NULL); - - /* zero out entry, but not the key part */ - memset(((char *) cacheEntry) + sizeof(Oid), 0, - sizeof(DistTableCacheEntry) - sizeof(Oid)); - - if (distPartitionTuple == NULL) - { - cacheEntry->isValid = true; - cacheEntry->isDistributedTable = false; - } - else - { - cacheEntry->isValid = true; - cacheEntry->isDistributedTable = true; - cacheEntry->partitionKeyString = partitionKeyString; - cacheEntry->partitionMethod = partitionMethod; - cacheEntry->colocationId = colocationId; - cacheEntry->replicationModel = replicationModel; - cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; - cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; - cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction; - cacheEntry->hashFunction = hashFunction; - cacheEntry->hasUninitializedShardInterval = hasUninitializedShardInterval; - cacheEntry->hasUniformHashDistribution = hasUniformHashDistribution; - } - - return cacheEntry; + cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; + cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; + cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction; } From 8cb47195baeaf795f2e2c67441702c56c7c73dd0 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 10 Jan 2017 00:38:16 -0800 Subject: [PATCH 2/3] Make LoadShardInterval() backed by the metadata cache. Doing so requires adding a mapping from shardId to the cache entries. For that metadata_cache.c now maintains an additional hashtable. That hashtable only references shard intervals in the dist table cache. --- .../distributed/utils/metadata_cache.c | 227 +++++++++++++++--- 1 file changed, 189 insertions(+), 38 deletions(-) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 190c66946..a786398d0 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -50,6 +50,33 @@ #include "utils/typcache.h" +/* + * ShardCacheEntry represents an entry in the shardId -> ShardInterval cache. + * To avoid duplicating data and invalidation logic between this cache and the + * DistTableCache, this only points into the DistTableCacheEntry of the + * shard's distributed table. + */ +typedef struct ShardCacheEntry +{ + /* hash key, needs to be first */ + int64 shardId; + + /* + * Cache entry for the distributed table a shard belongs to, possibly not + * valid. + */ + DistTableCacheEntry *tableEntry; + + /* + * Offset in tableEntry->sortedShardIntervalArray, only valid if + * tableEntry->isValid. 
We don't store pointers to the individual shard + * placements because that'd make invalidation a bit more complicated, and + * because there's simply no need. + */ + int shardIndex; +} ShardCacheEntry; + + /* state which should be cleared upon DROP EXTENSION */ static bool extensionLoaded = false; static Oid distShardRelationId = InvalidOid; @@ -74,6 +101,9 @@ static Oid extraDataContainerFuncId = InvalidOid; /* Hash table for informations about each partition */ static HTAB *DistTableCacheHash = NULL; +/* Hash table for informations about each shard */ +static HTAB *DistShardCacheHash = NULL; + /* Hash table for informations about worker nodes */ static HTAB *WorkerNodeHash = NULL; static bool workerNodeHashValid = false; @@ -89,6 +119,7 @@ static ScanKeyData DistShardScanKey[1]; /* local function forward declarations */ +static ShardCacheEntry * LookupShardCacheEntry(int64 shardId); static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId); static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void BuildCachedShardList(DistTableCacheEntry *cacheEntry); @@ -111,6 +142,7 @@ static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static List * LookupDistShardTuples(Oid relationId); +static Oid LookupShardRelation(int64 shardId); static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, Oid *intervalTypeId, int32 *intervalTypeMod); static ShardInterval * TupleToShardInterval(HeapTuple heapTuple, @@ -181,58 +213,104 @@ DistributedTableList(void) /* - * LoadShardInterval reads shard metadata for given shardId from pg_dist_shard, - * and converts min/max values in these metadata to their properly typed datum - * representations. The function then allocates a structure that stores the read - * and converted values, and returns this structure. + * LoadShardInterval returns the, cached, metadata about a shard. + * + * The return value is a copy of the cached ShardInterval struct and may + * therefore be modified and/or freed. 
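+ *
+ * A typical use is simply (illustrative sketch):
+ *
+ *   ShardInterval *shardInterval = LoadShardInterval(shardId);
+ *   ... use shardInterval->minValue, shardInterval->maxValue, etc ...
+ *   pfree(shardInterval);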
*/ ShardInterval * LoadShardInterval(uint64 shardId) { - ShardInterval *shardInterval; - SysScanDesc scanDescriptor = NULL; - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - HeapTuple heapTuple = NULL; - Form_pg_dist_shard shardForm = NULL; - DistTableCacheEntry *partitionEntry; - Oid intervalTypeId = InvalidOid; - int32 intervalTypeMod = -1; + ShardInterval *shardInterval = NULL; + ShardInterval *sourceShardInterval = NULL; + ShardCacheEntry *shardEntry = NULL; + DistTableCacheEntry *tableEntry = NULL; - Relation pgDistShard = heap_open(DistShardRelationId(), AccessShareLock); - TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard); + shardEntry = LookupShardCacheEntry(shardId); - ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); + tableEntry = shardEntry->tableEntry; - scanDescriptor = systable_beginscan(pgDistShard, - DistShardShardidIndexId(), true, - NULL, scanKeyCount, scanKey); + Assert(tableEntry->isDistributedTable); - heapTuple = systable_getnext(scanDescriptor); - if (!HeapTupleIsValid(heapTuple)) - { - ereport(ERROR, (errmsg("could not find valid entry for shard " - UINT64_FORMAT, shardId))); - } + /* the offset better be in a valid range */ + Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); - shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple); - partitionEntry = DistributedTableCacheEntry(shardForm->logicalrelid); + sourceShardInterval = tableEntry->sortedShardIntervalArray[shardEntry->shardIndex]; - GetPartitionTypeInputInfo(partitionEntry->partitionKeyString, - partitionEntry->partitionMethod, &intervalTypeId, - &intervalTypeMod); - - shardInterval = TupleToShardInterval(heapTuple, tupleDescriptor, intervalTypeId, - intervalTypeMod); - - systable_endscan(scanDescriptor); - heap_close(pgDistShard, AccessShareLock); + /* copy value to return */ + shardInterval = (ShardInterval *) palloc0(sizeof(ShardInterval)); + CopyShardInterval(sourceShardInterval, shardInterval); return shardInterval; } +/* + * LookupShardCacheEntry returns the cache entry belonging to a shard, or + * errors out if that shard is unknown. + */ +static ShardCacheEntry * +LookupShardCacheEntry(int64 shardId) +{ + ShardCacheEntry *shardEntry = NULL; + bool foundInCache = false; + bool recheck = false; + + /* probably not reachable */ + if (DistShardCacheHash == NULL) + { + InitializeDistTableCache(); + } + + /* lookup cache entry */ + shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache); + + if (!foundInCache) + { + /* + * A possible reason for not finding an entry in the cache is that the + * distributed table's cache entry hasn't been accessed. Thus look up + * the distributed table, and build the cache entry. Afterwards we + * know that the shard has to be in the cache if it exists. If the + * shard does *not* exist LookupShardRelation() will error out. + */ + Oid relationId = LookupShardRelation(shardId); + + /* trigger building the cache for the shard id */ + LookupDistTableCacheEntry(relationId); + + recheck = true; + } + else if (!shardEntry->tableEntry->isValid) + { + /* + * The cache entry might not be valid right now. Reload cache entry + * and recheck (as the offset might have changed). + */ + LookupDistTableCacheEntry(shardEntry->tableEntry->relationId); + recheck = true; + } + + /* + * If we (re-)loaded the table cache, re-search the shard cache - the + * shard index might have changed. If we still can't find the entry, it + * can't exist. 
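+	 *
+	 * (Rebuilding the table's cache entry re-inserts all of its shards into
+	 * DistShardCacheHash via BuildCachedShardList(), so a shard that still
+	 * exists is guaranteed to be found by this second lookup.)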
+ */ + if (recheck) + { + shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache); + + if (!foundInCache) + { + ereport(ERROR, (errmsg("could not find valid entry for shard " + UINT64_FORMAT, shardId))); + } + } + + return shardEntry; +} + + /* * DistributedTableCacheEntry looks up a pg_dist_partition entry for a * relation. @@ -441,6 +519,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) FmgrInfo *shardIntervalCompareFunction = NULL; List *distShardTupleList = NIL; int shardIntervalArrayLength = 0; + int shardIndex = 0; distShardTupleList = LookupDistShardTuples(cacheEntry->relationId); shardIntervalArrayLength = list_length(distShardTupleList); @@ -539,6 +618,21 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) shardIntervalArrayLength); } + + /* maintain shardId->(table,ShardInterval) cache */ + for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++) + { + ShardCacheEntry *shardEntry = NULL; + ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex]; + bool foundInCache = false; + + shardEntry = hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_ENTER, + &foundInCache); + Assert(!foundInCache); + shardEntry->shardIndex = shardIndex; + shardEntry->tableEntry = cacheEntry; + } + cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray; cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction; @@ -1280,7 +1374,7 @@ InitializeDistTableCache(void) DistShardScanKey[0].sk_collation = InvalidOid; DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid; - /* initialize the hash table */ + /* initialize the per-table hash table */ MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(Oid); info.entrysize = sizeof(DistTableCacheEntry); @@ -1289,6 +1383,15 @@ InitializeDistTableCache(void) hash_create("Distributed Relation Cache", 32, &info, HASH_ELEM | HASH_FUNCTION); + /* initialize the per-shard hash table */ + MemSet(&info, 0, sizeof(info)); + info.keysize = sizeof(int64); + info.entrysize = sizeof(ShardCacheEntry); + info.hash = tag_hash; + DistShardCacheHash = + hash_create("Shard Cache", 32 * 64, &info, + HASH_ELEM | HASH_FUNCTION); + /* Watch for invalidation events. */ CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback, (Datum) 0); @@ -1520,7 +1623,14 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) { ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[i]; bool valueByVal = shardInterval->valueByVal; + bool foundInCache = false; + /* delete per-shard cache-entry */ + hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE, + &foundInCache); + Assert(foundInCache); + + /* delete data pointed to by ShardInterval */ if (!valueByVal) { if (shardInterval->minValueExists) @@ -1534,6 +1644,7 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) } } + /* and finally the ShardInterval itself */ pfree(shardInterval); } @@ -1772,6 +1883,46 @@ LookupDistShardTuples(Oid relationId) } +/* + * LookupShardRelation returns the logical relation oid a shard belongs to. + * + * Errors out if the shardId does not exist. 
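+ *
+ * The lookup scans pg_dist_shard directly (via the shardid index) rather than
+ * going through the metadata cache, as it is needed precisely when the shard
+ * is not yet present in the cache.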
+ */ +static Oid +LookupShardRelation(int64 shardId) +{ + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + HeapTuple heapTuple = NULL; + Form_pg_dist_shard shardForm = NULL; + Relation pgDistShard = heap_open(DistShardRelationId(), AccessShareLock); + Oid relationId = InvalidOid; + + ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); + + scanDescriptor = systable_beginscan(pgDistShard, + DistShardShardidIndexId(), true, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for shard " + UINT64_FORMAT, shardId))); + } + + shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple); + relationId = shardForm->logicalrelid; + + systable_endscan(scanDescriptor); + heap_close(pgDistShard, NoLock); + + return relationId; +} + + /* * GetPartitionTypeInputInfo populates output parameters with the interval type * identifier and modifier for the specified partition key/method combination. From b813b39241f7ffd238250d8aaac4d61cc955934f Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 10 Jan 2017 02:03:00 -0800 Subject: [PATCH 3/3] Cache ShardPlacements in metadata cache. So far we've reloaded them frequently. Besides avoiding that cost - noticeable for some workloads with large shard counts - it makes it easier to add information to ShardPlacements that help us make placement_connection.c colocation aware. --- .../distributed/master/master_create_shards.c | 2 +- .../master/master_metadata_utility.c | 41 ++-- .../distributed/utils/metadata_cache.c | 197 ++++++++++++++---- .../distributed/master_metadata_utility.h | 6 +- src/include/distributed/metadata_cache.h | 5 + 5 files changed, 194 insertions(+), 57 deletions(-) diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 874bda931..5cbe6130d 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -28,8 +28,8 @@ #include "catalog/pg_class.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" -#include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 74c9f39d5..d03d7a407 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -55,6 +55,8 @@ static uint64 * AllocateUint64(uint64 value); static void RecordDistributedRelationDependencies(Oid distributedRelationId, Node *distributionKey); +static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc, + HeapTuple heapTuple); /* exports for SQL callable functions */ @@ -240,6 +242,24 @@ CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval) } +/* + * CopyShardPlacement copies the values of the source placement into the + * target placement. 
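+ *
+ * By-value fields are copied with a plain memcpy(); nodeName is pstrdup()ed
+ * into the current memory context, so callers that want the copy to live in
+ * the metadata cache have to switch to CacheMemoryContext beforehand.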
+ */ +void +CopyShardPlacement(ShardPlacement *srcPlacement, ShardPlacement *destPlacement) +{ + /* first copy all by-value fields */ + memcpy(destPlacement, srcPlacement, sizeof(ShardPlacement)); + + /* and then the fields pointing to external values */ + if (srcPlacement->nodeName) + { + destPlacement->nodeName = pstrdup(srcPlacement->nodeName); + } +} + + /* * ShardLength finds shard placements for the given shardId, extracts the length * of a finalized shard, and returns the shard's length. This function errors @@ -361,13 +381,17 @@ FinalizedShardPlacement(uint64 shardId, bool missingOk) /* - * ShardPlacementList finds shard placements for the given shardId from system - * catalogs, converts these placements to their in-memory representation, and - * returns the converted shard placements in a new list. + * BuildShardPlacementList finds shard placements for the given shardId from + * system catalogs, converts these placements to their in-memory + * representation, and returns the converted shard placements in a new list. + * + * This probably only should be called from metadata_cache.c. Resides here + * because it shares code with other routines in this file. */ List * -ShardPlacementList(uint64 shardId) +BuildShardPlacementList(ShardInterval *shardInterval) { + int64 shardId = shardInterval->shardId; List *shardPlacementList = NIL; Relation pgShardPlacement = NULL; SysScanDesc scanDescriptor = NULL; @@ -399,13 +423,6 @@ ShardPlacementList(uint64 shardId) systable_endscan(scanDescriptor); heap_close(pgShardPlacement, AccessShareLock); - /* if no shard placements are found, warn the user */ - if (shardPlacementList == NIL) - { - ereport(WARNING, (errmsg("could not find any shard placements for shardId " - UINT64_FORMAT, shardId))); - } - return shardPlacementList; } @@ -415,7 +432,7 @@ ShardPlacementList(uint64 shardId) * and converts this tuple to in-memory struct. The function assumes the * caller already has locks on the tuple, and doesn't perform any locking. */ -ShardPlacement * +static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) { ShardPlacement *shardPlacement = NULL; diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index a786398d0..98623fc5d 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -245,6 +245,52 @@ LoadShardInterval(uint64 shardId) } +/* + * ShardPlacementList returns the list of placements for the given shard from + * the cache. + * + * The returned list is deep copied from the cache and thus can be modified + * and pfree()d freely. 
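+ *
+ * For example (illustrative sketch):
+ *
+ *   List *placementList = ShardPlacementList(shardId);
+ *   ListCell *placementCell = NULL;
+ *
+ *   foreach(placementCell, placementList)
+ *   {
+ *       ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
+ *       ... use placement->nodeName etc ...
+ *   }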
+ */ +List * +ShardPlacementList(uint64 shardId) +{ + ShardCacheEntry *shardEntry = NULL; + DistTableCacheEntry *tableEntry = NULL; + ShardPlacement *placementArray = NULL; + int numberOfPlacements = 0; + List *placementList = NIL; + int i = 0; + + shardEntry = LookupShardCacheEntry(shardId); + tableEntry = shardEntry->tableEntry; + + /* the offset better be in a valid range */ + Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); + + placementArray = tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex]; + numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex]; + + for (i = 0; i < numberOfPlacements; i++) + { + /* copy placement into target context */ + ShardPlacement *placement = CitusMakeNode(ShardPlacement); + CopyShardPlacement(&placementArray[i], placement); + + placementList = lappend(placementList, placement); + } + + /* if no shard placements are found, warn the user */ + if (numberOfPlacements == 0) + { + ereport(WARNING, (errmsg("could not find any shard placements for shardId " + UINT64_FORMAT, shardId))); + } + + return placementList; +} + + /* * LookupShardCacheEntry returns the cache entry belonging to a shard, or * errors out if that shard is unknown. @@ -541,6 +587,15 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) shardIntervalArrayLength * sizeof(ShardInterval *)); + cacheEntry->arrayOfPlacementArrays = + MemoryContextAllocZero(CacheMemoryContext, + shardIntervalArrayLength * + sizeof(ShardPlacement *)); + cacheEntry->arrayOfPlacementArrayLengths = + MemoryContextAllocZero(CacheMemoryContext, + shardIntervalArrayLength * + sizeof(int)); + foreach(distShardTupleCell, distShardTupleList) { HeapTuple shardTuple = lfirst(distShardTupleCell); @@ -625,12 +680,38 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) ShardCacheEntry *shardEntry = NULL; ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex]; bool foundInCache = false; + List *placementList = NIL; + MemoryContext oldContext = NULL; + ListCell *placementCell = NULL; + ShardPlacement *placementArray = NULL; + int placementOffset = 0; + int numberOfPlacements = 0; shardEntry = hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_ENTER, &foundInCache); Assert(!foundInCache); shardEntry->shardIndex = shardIndex; shardEntry->tableEntry = cacheEntry; + + /* build list of shard placements */ + placementList = BuildShardPlacementList(shardInterval); + numberOfPlacements = list_length(placementList); + + /* and copy that list into the cache entry */ + oldContext = MemoryContextSwitchTo(CacheMemoryContext); + placementArray = palloc0(numberOfPlacements * sizeof(ShardPlacement)); + foreach(placementCell, placementList) + { + ShardPlacement *srcPlacement = (ShardPlacement *) lfirst(placementCell); + + CopyShardPlacement(srcPlacement, &placementArray[placementOffset]); + + placementOffset++; + } + MemoryContextSwitchTo(oldContext); + + cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray; + cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements; } cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; @@ -1609,65 +1690,99 @@ WorkerNodeHashCode(const void *key, Size keySize) void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) { + int shardIndex = 0; + if (cacheEntry->partitionKeyString != NULL) { pfree(cacheEntry->partitionKeyString); cacheEntry->partitionKeyString = NULL; } - if (cacheEntry->shardIntervalArrayLength > 0) + if (cacheEntry->shardIntervalCompareFunction != NULL) { - int i = 0; 
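+		/* allocated in CacheMemoryContext by BuildCachedShardList() */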
+ pfree(cacheEntry->shardIntervalCompareFunction); + cacheEntry->shardIntervalCompareFunction = NULL; + } - for (i = 0; i < cacheEntry->shardIntervalArrayLength; i++) + if (cacheEntry->hashFunction) + { + pfree(cacheEntry->hashFunction); + cacheEntry->hashFunction = NULL; + } + + if (cacheEntry->shardIntervalArrayLength == 0) + { + return; + } + + for (shardIndex = 0; shardIndex < cacheEntry->shardIntervalArrayLength; + shardIndex++) + { + ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex]; + ShardPlacement *placementArray = cacheEntry->arrayOfPlacementArrays[shardIndex]; + int numberOfPlacements = cacheEntry->arrayOfPlacementArrayLengths[shardIndex]; + bool valueByVal = shardInterval->valueByVal; + bool foundInCache = false; + int placementIndex = 0; + + /* delete the shard's placements */ + for (placementIndex = 0; + placementIndex < numberOfPlacements; + placementIndex++) { - ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[i]; - bool valueByVal = shardInterval->valueByVal; - bool foundInCache = false; + ShardPlacement *placement = &placementArray[placementIndex]; - /* delete per-shard cache-entry */ - hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE, - &foundInCache); - Assert(foundInCache); - - /* delete data pointed to by ShardInterval */ - if (!valueByVal) + if (placement->nodeName) { - if (shardInterval->minValueExists) - { - pfree(DatumGetPointer(shardInterval->minValue)); - } - - if (shardInterval->maxValueExists) - { - pfree(DatumGetPointer(shardInterval->maxValue)); - } + pfree(placement->nodeName); } - /* and finally the ShardInterval itself */ - pfree(shardInterval); + /* placement itself is deleted as part of the array */ + } + pfree(placementArray); + + /* delete per-shard cache-entry */ + hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE, + &foundInCache); + Assert(foundInCache); + + /* delete data pointed to by ShardInterval */ + if (!valueByVal) + { + if (shardInterval->minValueExists) + { + pfree(DatumGetPointer(shardInterval->minValue)); + } + + if (shardInterval->maxValueExists) + { + pfree(DatumGetPointer(shardInterval->maxValue)); + } } + /* and finally the ShardInterval itself */ + pfree(shardInterval); + } + + if (cacheEntry->sortedShardIntervalArray) + { pfree(cacheEntry->sortedShardIntervalArray); cacheEntry->sortedShardIntervalArray = NULL; - cacheEntry->shardIntervalArrayLength = 0; - - cacheEntry->hasUninitializedShardInterval = false; - cacheEntry->hasUniformHashDistribution = false; - - if (cacheEntry->shardIntervalCompareFunction != NULL) - { - pfree(cacheEntry->shardIntervalCompareFunction); - cacheEntry->shardIntervalCompareFunction = NULL; - } - - /* we only allocated hash function for hash distributed tables */ - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH) - { - pfree(cacheEntry->hashFunction); - cacheEntry->hashFunction = NULL; - } } + if (cacheEntry->arrayOfPlacementArrayLengths) + { + pfree(cacheEntry->arrayOfPlacementArrayLengths); + cacheEntry->arrayOfPlacementArrayLengths = NULL; + } + if (cacheEntry->arrayOfPlacementArrays) + { + pfree(cacheEntry->arrayOfPlacementArrays); + cacheEntry->arrayOfPlacementArrays = NULL; + } + + cacheEntry->shardIntervalArrayLength = 0; + cacheEntry->hasUninitializedShardInterval = false; + cacheEntry->hasUniformHashDistribution = false; } diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index a6e6c9070..696016f78 100644 --- 
a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -62,13 +62,13 @@ extern List * LoadShardIntervalList(Oid relationId); extern int ShardIntervalCount(Oid relationId); extern List * LoadShardList(Oid relationId); extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval); +extern void CopyShardPlacement(ShardPlacement *srcPlacement, + ShardPlacement *destPlacement); extern uint64 ShardLength(uint64 shardId); extern bool NodeHasActiveShardPlacements(char *nodeName, int32 nodePort); extern List * FinalizedShardPlacementList(uint64 shardId); extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk); -extern List * ShardPlacementList(uint64 shardId); -extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc, - HeapTuple heapTuple); +extern List * BuildShardPlacementList(ShardInterval *shardInterval); /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 38fbb9cb1..314002acc 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -49,6 +49,10 @@ typedef struct FmgrInfo *shardIntervalCompareFunction; /* NULL if no shard intervals exist */ FmgrInfo *hashFunction; /* NULL if table is not distributed by hash */ + + /* pg_dist_shard_placement metadata */ + ShardPlacement **arrayOfPlacementArrays; + int *arrayOfPlacementArrayLengths; } DistTableCacheEntry; @@ -58,6 +62,7 @@ extern ShardInterval * LoadShardInterval(uint64 shardId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern int GetLocalGroupId(void); extern List * DistTableOidList(void); +extern List * ShardPlacementList(uint64 shardId); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId);
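
For illustration, a caller of the now cache-backed API could look like the minimal sketch below. ProcessShardPlacementsExample and its shardId argument are hypothetical and exist only for this sketch; the only ShardPlacement field it touches is nodeName, which the series itself references.

#include "postgres.h"

#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "nodes/pg_list.h"

static void
ProcessShardPlacementsExample(uint64 shardId)
{
	/* both lookups are served from the metadata cache after this series */
	ShardInterval *shardInterval = LoadShardInterval(shardId);
	List *placementList = ShardPlacementList(shardId);
	ListCell *placementCell = NULL;

	foreach(placementCell, placementList)
	{
		ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);

		/* the copies are private to the caller and may be modified freely */
		elog(DEBUG1, "shard " UINT64_FORMAT " has a placement on %s",
			 shardInterval->shardId, placement->nodeName);
	}

	pfree(shardInterval);
}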