From b813b39241f7ffd238250d8aaac4d61cc955934f Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 10 Jan 2017 02:03:00 -0800 Subject: [PATCH] Cache ShardPlacements in metadata cache. So far we've reloaded them frequently. Besides avoiding that cost - noticeable for some workloads with large shard counts - it makes it easier to add information to ShardPlacements that helps us make placement_connection.c colocation-aware. --- .../distributed/master/master_create_shards.c | 2 +- .../master/master_metadata_utility.c | 41 ++-- .../distributed/utils/metadata_cache.c | 197 ++++++++++++++---- .../distributed/master_metadata_utility.h | 6 +- src/include/distributed/metadata_cache.h | 5 + 5 files changed, 194 insertions(+), 57 deletions(-) diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 874bda931..5cbe6130d 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -28,8 +28,8 @@ #include "catalog/pg_class.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" -#include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 74c9f39d5..d03d7a407 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -55,6 +55,8 @@ static uint64 * AllocateUint64(uint64 value); static void RecordDistributedRelationDependencies(Oid distributedRelationId, Node *distributionKey); +static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc, + HeapTuple heapTuple); /* exports for SQL callable functions */ @@ -240,6
+242,24 @@ CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval) } +/* + * CopyShardPlacement copies the values of the source placement into the + * target placement. + */ +void +CopyShardPlacement(ShardPlacement *srcPlacement, ShardPlacement *destPlacement) +{ + /* first copy all by-value fields */ + memcpy(destPlacement, srcPlacement, sizeof(ShardPlacement)); + + /* and then the fields pointing to external values */ + if (srcPlacement->nodeName) + { + destPlacement->nodeName = pstrdup(srcPlacement->nodeName); + } +} + + /* * ShardLength finds shard placements for the given shardId, extracts the length * of a finalized shard, and returns the shard's length. This function errors @@ -361,13 +381,17 @@ FinalizedShardPlacement(uint64 shardId, bool missingOk) /* - * ShardPlacementList finds shard placements for the given shardId from system - * catalogs, converts these placements to their in-memory representation, and - * returns the converted shard placements in a new list. + * BuildShardPlacementList finds shard placements for the given shardId from + * system catalogs, converts these placements to their in-memory + * representation, and returns the converted shard placements in a new list. + * + * This probably only should be called from metadata_cache.c. Resides here + * because it shares code with other routines in this file. 
*/ List * -ShardPlacementList(uint64 shardId) +BuildShardPlacementList(ShardInterval *shardInterval) { + int64 shardId = shardInterval->shardId; List *shardPlacementList = NIL; Relation pgShardPlacement = NULL; SysScanDesc scanDescriptor = NULL; @@ -399,13 +423,6 @@ ShardPlacementList(uint64 shardId) systable_endscan(scanDescriptor); heap_close(pgShardPlacement, AccessShareLock); - /* if no shard placements are found, warn the user */ - if (shardPlacementList == NIL) - { - ereport(WARNING, (errmsg("could not find any shard placements for shardId " - UINT64_FORMAT, shardId))); - } - return shardPlacementList; } @@ -415,7 +432,7 @@ ShardPlacementList(uint64 shardId) * and converts this tuple to in-memory struct. The function assumes the * caller already has locks on the tuple, and doesn't perform any locking. */ -ShardPlacement * +static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) { ShardPlacement *shardPlacement = NULL; diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index a786398d0..98623fc5d 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -245,6 +245,52 @@ LoadShardInterval(uint64 shardId) } +/* + * ShardPlacementList returns the list of placements for the given shard from + * the cache. + * + * The returned list is deep copied from the cache and thus can be modified + * and pfree()d freely. 
+ */ +List * +ShardPlacementList(uint64 shardId) +{ + ShardCacheEntry *shardEntry = NULL; + DistTableCacheEntry *tableEntry = NULL; + ShardPlacement *placementArray = NULL; + int numberOfPlacements = 0; + List *placementList = NIL; + int i = 0; + + shardEntry = LookupShardCacheEntry(shardId); + tableEntry = shardEntry->tableEntry; + + /* the offset better be in a valid range */ + Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); + + placementArray = tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex]; + numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex]; + + for (i = 0; i < numberOfPlacements; i++) + { + /* copy placement into target context */ + ShardPlacement *placement = CitusMakeNode(ShardPlacement); + CopyShardPlacement(&placementArray[i], placement); + + placementList = lappend(placementList, placement); + } + + /* if no shard placements are found, warn the user */ + if (numberOfPlacements == 0) + { + ereport(WARNING, (errmsg("could not find any shard placements for shardId " + UINT64_FORMAT, shardId))); + } + + return placementList; +} + + /* * LookupShardCacheEntry returns the cache entry belonging to a shard, or * errors out if that shard is unknown. 
@@ -541,6 +587,15 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) shardIntervalArrayLength * sizeof(ShardInterval *)); + cacheEntry->arrayOfPlacementArrays = + MemoryContextAllocZero(CacheMemoryContext, + shardIntervalArrayLength * + sizeof(ShardPlacement *)); + cacheEntry->arrayOfPlacementArrayLengths = + MemoryContextAllocZero(CacheMemoryContext, + shardIntervalArrayLength * + sizeof(int)); + foreach(distShardTupleCell, distShardTupleList) { HeapTuple shardTuple = lfirst(distShardTupleCell); @@ -625,12 +680,38 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry) ShardCacheEntry *shardEntry = NULL; ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex]; bool foundInCache = false; + List *placementList = NIL; + MemoryContext oldContext = NULL; + ListCell *placementCell = NULL; + ShardPlacement *placementArray = NULL; + int placementOffset = 0; + int numberOfPlacements = 0; shardEntry = hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_ENTER, &foundInCache); Assert(!foundInCache); shardEntry->shardIndex = shardIndex; shardEntry->tableEntry = cacheEntry; + + /* build list of shard placements */ + placementList = BuildShardPlacementList(shardInterval); + numberOfPlacements = list_length(placementList); + + /* and copy that list into the cache entry */ + oldContext = MemoryContextSwitchTo(CacheMemoryContext); + placementArray = palloc0(numberOfPlacements * sizeof(ShardPlacement)); + foreach(placementCell, placementList) + { + ShardPlacement *srcPlacement = (ShardPlacement *) lfirst(placementCell); + + CopyShardPlacement(srcPlacement, &placementArray[placementOffset]); + + placementOffset++; + } + MemoryContextSwitchTo(oldContext); + + cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray; + cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements; } cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; @@ -1609,65 +1690,99 @@ WorkerNodeHashCode(const void *key, Size keySize) void 
ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) { + int shardIndex = 0; + if (cacheEntry->partitionKeyString != NULL) { pfree(cacheEntry->partitionKeyString); cacheEntry->partitionKeyString = NULL; } - if (cacheEntry->shardIntervalArrayLength > 0) + if (cacheEntry->shardIntervalCompareFunction != NULL) { - int i = 0; + pfree(cacheEntry->shardIntervalCompareFunction); + cacheEntry->shardIntervalCompareFunction = NULL; + } - for (i = 0; i < cacheEntry->shardIntervalArrayLength; i++) + if (cacheEntry->hashFunction) + { + pfree(cacheEntry->hashFunction); + cacheEntry->hashFunction = NULL; + } + + if (cacheEntry->shardIntervalArrayLength == 0) + { + return; + } + + for (shardIndex = 0; shardIndex < cacheEntry->shardIntervalArrayLength; + shardIndex++) + { + ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex]; + ShardPlacement *placementArray = cacheEntry->arrayOfPlacementArrays[shardIndex]; + int numberOfPlacements = cacheEntry->arrayOfPlacementArrayLengths[shardIndex]; + bool valueByVal = shardInterval->valueByVal; + bool foundInCache = false; + int placementIndex = 0; + + /* delete the shard's placements */ + for (placementIndex = 0; + placementIndex < numberOfPlacements; + placementIndex++) { - ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[i]; - bool valueByVal = shardInterval->valueByVal; - bool foundInCache = false; + ShardPlacement *placement = &placementArray[placementIndex]; - /* delete per-shard cache-entry */ - hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE, - &foundInCache); - Assert(foundInCache); - - /* delete data pointed to by ShardInterval */ - if (!valueByVal) + if (placement->nodeName) { - if (shardInterval->minValueExists) - { - pfree(DatumGetPointer(shardInterval->minValue)); - } - - if (shardInterval->maxValueExists) - { - pfree(DatumGetPointer(shardInterval->maxValue)); - } + pfree(placement->nodeName); } - /* and finally the ShardInterval itself */ - 
pfree(shardInterval); + /* placement itself is deleted as part of the array */ + } + pfree(placementArray); + + /* delete per-shard cache-entry */ + hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE, + &foundInCache); + Assert(foundInCache); + + /* delete data pointed to by ShardInterval */ + if (!valueByVal) + { + if (shardInterval->minValueExists) + { + pfree(DatumGetPointer(shardInterval->minValue)); + } + + if (shardInterval->maxValueExists) + { + pfree(DatumGetPointer(shardInterval->maxValue)); + } } + /* and finally the ShardInterval itself */ + pfree(shardInterval); + } + + if (cacheEntry->sortedShardIntervalArray) + { pfree(cacheEntry->sortedShardIntervalArray); cacheEntry->sortedShardIntervalArray = NULL; - cacheEntry->shardIntervalArrayLength = 0; - - cacheEntry->hasUninitializedShardInterval = false; - cacheEntry->hasUniformHashDistribution = false; - - if (cacheEntry->shardIntervalCompareFunction != NULL) - { - pfree(cacheEntry->shardIntervalCompareFunction); - cacheEntry->shardIntervalCompareFunction = NULL; - } - - /* we only allocated hash function for hash distributed tables */ - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH) - { - pfree(cacheEntry->hashFunction); - cacheEntry->hashFunction = NULL; - } } + if (cacheEntry->arrayOfPlacementArrayLengths) + { + pfree(cacheEntry->arrayOfPlacementArrayLengths); + cacheEntry->arrayOfPlacementArrayLengths = NULL; + } + if (cacheEntry->arrayOfPlacementArrays) + { + pfree(cacheEntry->arrayOfPlacementArrays); + cacheEntry->arrayOfPlacementArrays = NULL; + } + + cacheEntry->shardIntervalArrayLength = 0; + cacheEntry->hasUninitializedShardInterval = false; + cacheEntry->hasUniformHashDistribution = false; } diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index a6e6c9070..696016f78 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -62,13 +62,13 @@ 
extern List * LoadShardIntervalList(Oid relationId); extern int ShardIntervalCount(Oid relationId); extern List * LoadShardList(Oid relationId); extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval); +extern void CopyShardPlacement(ShardPlacement *srcPlacement, + ShardPlacement *destPlacement); extern uint64 ShardLength(uint64 shardId); extern bool NodeHasActiveShardPlacements(char *nodeName, int32 nodePort); extern List * FinalizedShardPlacementList(uint64 shardId); extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk); -extern List * ShardPlacementList(uint64 shardId); -extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc, - HeapTuple heapTuple); +extern List * BuildShardPlacementList(ShardInterval *shardInterval); /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 38fbb9cb1..314002acc 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -49,6 +49,10 @@ typedef struct FmgrInfo *shardIntervalCompareFunction; /* NULL if no shard intervals exist */ FmgrInfo *hashFunction; /* NULL if table is not distributed by hash */ + + /* pg_dist_shard_placement metadata */ + ShardPlacement **arrayOfPlacementArrays; + int *arrayOfPlacementArrayLengths; } DistTableCacheEntry; @@ -58,6 +62,7 @@ extern ShardInterval * LoadShardInterval(uint64 shardId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern int GetLocalGroupId(void); extern List * DistTableOidList(void); +extern List * ShardPlacementList(uint64 shardId); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId);