Cache ShardPlacements in metadata cache.

So far we've reloaded them frequently. Besides avoiding that cost -
noticeable for some workloads with large shard counts - it makes it
easier to add information to ShardPlacements that help us make
placement_connection.c colocation aware.
pull/1113/head
Andres Freund 2017-01-10 02:03:00 -08:00
parent 8cb47195ba
commit b813b39241
5 changed files with 194 additions and 57 deletions

View File

@ -28,8 +28,8 @@
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "distributed/connection_cache.h" #include "distributed/connection_cache.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"

View File

@ -55,6 +55,8 @@
static uint64 * AllocateUint64(uint64 value); static uint64 * AllocateUint64(uint64 value);
static void RecordDistributedRelationDependencies(Oid distributedRelationId, static void RecordDistributedRelationDependencies(Oid distributedRelationId,
Node *distributionKey); Node *distributionKey);
static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
HeapTuple heapTuple);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
@ -240,6 +242,24 @@ CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval)
} }
/*
* CopyShardPlacement copies the values of the source placement into the
* target placement.
*/
void
CopyShardPlacement(ShardPlacement *srcPlacement, ShardPlacement *destPlacement)
{
/* first copy all by-value fields */
memcpy(destPlacement, srcPlacement, sizeof(ShardPlacement));
/* and then the fields pointing to external values */
if (srcPlacement->nodeName)
{
destPlacement->nodeName = pstrdup(srcPlacement->nodeName);
}
}
/* /*
* ShardLength finds shard placements for the given shardId, extracts the length * ShardLength finds shard placements for the given shardId, extracts the length
* of a finalized shard, and returns the shard's length. This function errors * of a finalized shard, and returns the shard's length. This function errors
@ -361,13 +381,17 @@ FinalizedShardPlacement(uint64 shardId, bool missingOk)
/* /*
* ShardPlacementList finds shard placements for the given shardId from system * BuildShardPlacementList finds shard placements for the given shardId from
* catalogs, converts these placements to their in-memory representation, and * system catalogs, converts these placements to their in-memory
* returns the converted shard placements in a new list. * representation, and returns the converted shard placements in a new list.
*
* This probably only should be called from metadata_cache.c. Resides here
* because it shares code with other routines in this file.
*/ */
List * List *
ShardPlacementList(uint64 shardId) BuildShardPlacementList(ShardInterval *shardInterval)
{ {
int64 shardId = shardInterval->shardId;
List *shardPlacementList = NIL; List *shardPlacementList = NIL;
Relation pgShardPlacement = NULL; Relation pgShardPlacement = NULL;
SysScanDesc scanDescriptor = NULL; SysScanDesc scanDescriptor = NULL;
@ -399,13 +423,6 @@ ShardPlacementList(uint64 shardId)
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
heap_close(pgShardPlacement, AccessShareLock); heap_close(pgShardPlacement, AccessShareLock);
/* if no shard placements are found, warn the user */
if (shardPlacementList == NIL)
{
ereport(WARNING, (errmsg("could not find any shard placements for shardId "
UINT64_FORMAT, shardId)));
}
return shardPlacementList; return shardPlacementList;
} }
@ -415,7 +432,7 @@ ShardPlacementList(uint64 shardId)
* and converts this tuple to in-memory struct. The function assumes the * and converts this tuple to in-memory struct. The function assumes the
* caller already has locks on the tuple, and doesn't perform any locking. * caller already has locks on the tuple, and doesn't perform any locking.
*/ */
ShardPlacement * static ShardPlacement *
TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
{ {
ShardPlacement *shardPlacement = NULL; ShardPlacement *shardPlacement = NULL;

View File

@ -245,6 +245,52 @@ LoadShardInterval(uint64 shardId)
} }
/*
* ShardPlacementList returns the list of placements for the given shard from
* the cache.
*
* The returned list is deep copied from the cache and thus can be modified
* and pfree()d freely.
*/
List *
ShardPlacementList(uint64 shardId)
{
ShardCacheEntry *shardEntry = NULL;
DistTableCacheEntry *tableEntry = NULL;
ShardPlacement *placementArray = NULL;
int numberOfPlacements = 0;
List *placementList = NIL;
int i = 0;
shardEntry = LookupShardCacheEntry(shardId);
tableEntry = shardEntry->tableEntry;
/* the offset better be in a valid range */
Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength);
placementArray = tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex];
numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex];
for (i = 0; i < numberOfPlacements; i++)
{
/* copy placement into target context */
ShardPlacement *placement = CitusMakeNode(ShardPlacement);
CopyShardPlacement(&placementArray[i], placement);
placementList = lappend(placementList, placement);
}
/* if no shard placements are found, warn the user */
if (numberOfPlacements == 0)
{
ereport(WARNING, (errmsg("could not find any shard placements for shardId "
UINT64_FORMAT, shardId)));
}
return placementList;
}
/* /*
* LookupShardCacheEntry returns the cache entry belonging to a shard, or * LookupShardCacheEntry returns the cache entry belonging to a shard, or
* errors out if that shard is unknown. * errors out if that shard is unknown.
@ -541,6 +587,15 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
shardIntervalArrayLength * shardIntervalArrayLength *
sizeof(ShardInterval *)); sizeof(ShardInterval *));
cacheEntry->arrayOfPlacementArrays =
MemoryContextAllocZero(CacheMemoryContext,
shardIntervalArrayLength *
sizeof(ShardPlacement *));
cacheEntry->arrayOfPlacementArrayLengths =
MemoryContextAllocZero(CacheMemoryContext,
shardIntervalArrayLength *
sizeof(int));
foreach(distShardTupleCell, distShardTupleList) foreach(distShardTupleCell, distShardTupleList)
{ {
HeapTuple shardTuple = lfirst(distShardTupleCell); HeapTuple shardTuple = lfirst(distShardTupleCell);
@ -625,12 +680,38 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
ShardCacheEntry *shardEntry = NULL; ShardCacheEntry *shardEntry = NULL;
ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex]; ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex];
bool foundInCache = false; bool foundInCache = false;
List *placementList = NIL;
MemoryContext oldContext = NULL;
ListCell *placementCell = NULL;
ShardPlacement *placementArray = NULL;
int placementOffset = 0;
int numberOfPlacements = 0;
shardEntry = hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_ENTER, shardEntry = hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_ENTER,
&foundInCache); &foundInCache);
Assert(!foundInCache); Assert(!foundInCache);
shardEntry->shardIndex = shardIndex; shardEntry->shardIndex = shardIndex;
shardEntry->tableEntry = cacheEntry; shardEntry->tableEntry = cacheEntry;
/* build list of shard placements */
placementList = BuildShardPlacementList(shardInterval);
numberOfPlacements = list_length(placementList);
/* and copy that list into the cache entry */
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
placementArray = palloc0(numberOfPlacements * sizeof(ShardPlacement));
foreach(placementCell, placementList)
{
ShardPlacement *srcPlacement = (ShardPlacement *) lfirst(placementCell);
CopyShardPlacement(srcPlacement, &placementArray[placementOffset]);
placementOffset++;
}
MemoryContextSwitchTo(oldContext);
cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray;
cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements;
} }
cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;
@ -1609,65 +1690,99 @@ WorkerNodeHashCode(const void *key, Size keySize)
void void
ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry) ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
{ {
int shardIndex = 0;
if (cacheEntry->partitionKeyString != NULL) if (cacheEntry->partitionKeyString != NULL)
{ {
pfree(cacheEntry->partitionKeyString); pfree(cacheEntry->partitionKeyString);
cacheEntry->partitionKeyString = NULL; cacheEntry->partitionKeyString = NULL;
} }
if (cacheEntry->shardIntervalArrayLength > 0) if (cacheEntry->shardIntervalCompareFunction != NULL)
{ {
int i = 0; pfree(cacheEntry->shardIntervalCompareFunction);
cacheEntry->shardIntervalCompareFunction = NULL;
}
for (i = 0; i < cacheEntry->shardIntervalArrayLength; i++) if (cacheEntry->hashFunction)
{
pfree(cacheEntry->hashFunction);
cacheEntry->hashFunction = NULL;
}
if (cacheEntry->shardIntervalArrayLength == 0)
{
return;
}
for (shardIndex = 0; shardIndex < cacheEntry->shardIntervalArrayLength;
shardIndex++)
{
ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex];
ShardPlacement *placementArray = cacheEntry->arrayOfPlacementArrays[shardIndex];
int numberOfPlacements = cacheEntry->arrayOfPlacementArrayLengths[shardIndex];
bool valueByVal = shardInterval->valueByVal;
bool foundInCache = false;
int placementIndex = 0;
/* delete the shard's placements */
for (placementIndex = 0;
placementIndex < numberOfPlacements;
placementIndex++)
{ {
ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[i]; ShardPlacement *placement = &placementArray[placementIndex];
bool valueByVal = shardInterval->valueByVal;
bool foundInCache = false;
/* delete per-shard cache-entry */ if (placement->nodeName)
hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE,
&foundInCache);
Assert(foundInCache);
/* delete data pointed to by ShardInterval */
if (!valueByVal)
{ {
if (shardInterval->minValueExists) pfree(placement->nodeName);
{
pfree(DatumGetPointer(shardInterval->minValue));
}
if (shardInterval->maxValueExists)
{
pfree(DatumGetPointer(shardInterval->maxValue));
}
} }
/* and finally the ShardInterval itself */ /* placement itself is deleted as part of the array */
pfree(shardInterval); }
pfree(placementArray);
/* delete per-shard cache-entry */
hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE,
&foundInCache);
Assert(foundInCache);
/* delete data pointed to by ShardInterval */
if (!valueByVal)
{
if (shardInterval->minValueExists)
{
pfree(DatumGetPointer(shardInterval->minValue));
}
if (shardInterval->maxValueExists)
{
pfree(DatumGetPointer(shardInterval->maxValue));
}
} }
/* and finally the ShardInterval itself */
pfree(shardInterval);
}
if (cacheEntry->sortedShardIntervalArray)
{
pfree(cacheEntry->sortedShardIntervalArray); pfree(cacheEntry->sortedShardIntervalArray);
cacheEntry->sortedShardIntervalArray = NULL; cacheEntry->sortedShardIntervalArray = NULL;
cacheEntry->shardIntervalArrayLength = 0;
cacheEntry->hasUninitializedShardInterval = false;
cacheEntry->hasUniformHashDistribution = false;
if (cacheEntry->shardIntervalCompareFunction != NULL)
{
pfree(cacheEntry->shardIntervalCompareFunction);
cacheEntry->shardIntervalCompareFunction = NULL;
}
/* we only allocated hash function for hash distributed tables */
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH)
{
pfree(cacheEntry->hashFunction);
cacheEntry->hashFunction = NULL;
}
} }
if (cacheEntry->arrayOfPlacementArrayLengths)
{
pfree(cacheEntry->arrayOfPlacementArrayLengths);
cacheEntry->arrayOfPlacementArrayLengths = NULL;
}
if (cacheEntry->arrayOfPlacementArrays)
{
pfree(cacheEntry->arrayOfPlacementArrays);
cacheEntry->arrayOfPlacementArrays = NULL;
}
cacheEntry->shardIntervalArrayLength = 0;
cacheEntry->hasUninitializedShardInterval = false;
cacheEntry->hasUniformHashDistribution = false;
} }

View File

@ -62,13 +62,13 @@ extern List * LoadShardIntervalList(Oid relationId);
extern int ShardIntervalCount(Oid relationId); extern int ShardIntervalCount(Oid relationId);
extern List * LoadShardList(Oid relationId); extern List * LoadShardList(Oid relationId);
extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval); extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval);
extern void CopyShardPlacement(ShardPlacement *srcPlacement,
ShardPlacement *destPlacement);
extern uint64 ShardLength(uint64 shardId); extern uint64 ShardLength(uint64 shardId);
extern bool NodeHasActiveShardPlacements(char *nodeName, int32 nodePort); extern bool NodeHasActiveShardPlacements(char *nodeName, int32 nodePort);
extern List * FinalizedShardPlacementList(uint64 shardId); extern List * FinalizedShardPlacementList(uint64 shardId);
extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk); extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk);
extern List * ShardPlacementList(uint64 shardId); extern List * BuildShardPlacementList(ShardInterval *shardInterval);
extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
HeapTuple heapTuple);
/* Function declarations to modify shard and shard placement data */ /* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,

View File

@ -49,6 +49,10 @@ typedef struct
FmgrInfo *shardIntervalCompareFunction; /* NULL if no shard intervals exist */ FmgrInfo *shardIntervalCompareFunction; /* NULL if no shard intervals exist */
FmgrInfo *hashFunction; /* NULL if table is not distributed by hash */ FmgrInfo *hashFunction; /* NULL if table is not distributed by hash */
/* pg_dist_shard_placement metadata */
ShardPlacement **arrayOfPlacementArrays;
int *arrayOfPlacementArrayLengths;
} DistTableCacheEntry; } DistTableCacheEntry;
@ -58,6 +62,7 @@ extern ShardInterval * LoadShardInterval(uint64 shardId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern int GetLocalGroupId(void); extern int GetLocalGroupId(void);
extern List * DistTableOidList(void); extern List * DistTableOidList(void);
extern List * ShardPlacementList(uint64 shardId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId); extern void CitusInvalidateRelcacheByShardId(int64 shardId);