Merge pull request #1113 from citusdata/feature/colocation_mapping

ShardInterval (improved) and ShardPlacement (new) caching.
pull/1112/head
Andres Freund 2017-01-10 18:31:15 -08:00 committed by GitHub
commit c4b18da0dd
5 changed files with 522 additions and 203 deletions

View File

@ -28,8 +28,8 @@
#include "catalog/pg_class.h"
#include "distributed/connection_cache.h"
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"

View File

@ -55,6 +55,8 @@
static uint64 * AllocateUint64(uint64 value);
static void RecordDistributedRelationDependencies(Oid distributedRelationId,
Node *distributionKey);
static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
HeapTuple heapTuple);
/* exports for SQL callable functions */
@ -240,6 +242,24 @@ CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval)
}
/*
* CopyShardPlacement copies the values of the source placement into the
* target placement.
*/
void
CopyShardPlacement(ShardPlacement *srcPlacement, ShardPlacement *destPlacement)
{
/* first copy all by-value fields */
memcpy(destPlacement, srcPlacement, sizeof(ShardPlacement));
/* and then the fields pointing to external values */
if (srcPlacement->nodeName)
{
destPlacement->nodeName = pstrdup(srcPlacement->nodeName);
}
}
/*
* ShardLength finds shard placements for the given shardId, extracts the length
* of a finalized shard, and returns the shard's length. This function errors
@ -361,13 +381,17 @@ FinalizedShardPlacement(uint64 shardId, bool missingOk)
/*
* ShardPlacementList finds shard placements for the given shardId from system
* catalogs, converts these placements to their in-memory representation, and
* returns the converted shard placements in a new list.
* BuildShardPlacementList finds shard placements for the given shardId from
* system catalogs, converts these placements to their in-memory
* representation, and returns the converted shard placements in a new list.
*
* This probably only should be called from metadata_cache.c. Resides here
* because it shares code with other routines in this file.
*/
List *
ShardPlacementList(uint64 shardId)
BuildShardPlacementList(ShardInterval *shardInterval)
{
int64 shardId = shardInterval->shardId;
List *shardPlacementList = NIL;
Relation pgShardPlacement = NULL;
SysScanDesc scanDescriptor = NULL;
@ -399,13 +423,6 @@ ShardPlacementList(uint64 shardId)
systable_endscan(scanDescriptor);
heap_close(pgShardPlacement, AccessShareLock);
/* if no shard placements are found, warn the user */
if (shardPlacementList == NIL)
{
ereport(WARNING, (errmsg("could not find any shard placements for shardId "
UINT64_FORMAT, shardId)));
}
return shardPlacementList;
}
@ -415,7 +432,7 @@ ShardPlacementList(uint64 shardId)
* and converts this tuple to in-memory struct. The function assumes the
* caller already has locks on the tuple, and doesn't perform any locking.
*/
ShardPlacement *
static ShardPlacement *
TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
{
ShardPlacement *shardPlacement = NULL;

View File

@ -50,6 +50,33 @@
#include "utils/typcache.h"
/*
* ShardCacheEntry represents an entry in the shardId -> ShardInterval cache.
* To avoid duplicating data and invalidation logic between this cache and the
* DistTableCache, this only points into the DistTableCacheEntry of the
* shard's distributed table.
*/
typedef struct ShardCacheEntry
{
/* hash key, needs to be first */
int64 shardId;
/*
* Cache entry for the distributed table a shard belongs to, possibly not
* valid.
*/
DistTableCacheEntry *tableEntry;
/*
* Offset in tableEntry->sortedShardIntervalArray, only valid if
* tableEntry->isValid. We don't store pointers to the individual shard
* placements because that'd make invalidation a bit more complicated, and
* because there's simply no need.
*/
int shardIndex;
} ShardCacheEntry;
/* state which should be cleared upon DROP EXTENSION */
static bool extensionLoaded = false;
static Oid distShardRelationId = InvalidOid;
@ -74,6 +101,9 @@ static Oid extraDataContainerFuncId = InvalidOid;
/* Hash table for informations about each partition */
static HTAB *DistTableCacheHash = NULL;
/* Hash table for informations about each shard */
static HTAB *DistShardCacheHash = NULL;
/* Hash table for informations about worker nodes */
static HTAB *WorkerNodeHash = NULL;
static bool workerNodeHashValid = false;
@ -89,7 +119,10 @@ static ScanKeyData DistShardScanKey[1];
/* local function forward declarations */
static ShardCacheEntry * LookupShardCacheEntry(int64 shardId);
static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void BuildCachedShardList(DistTableCacheEntry *cacheEntry);
static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray,
char partitionMethod);
static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray,
@ -109,6 +142,7 @@ static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId);
static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId);
static List * LookupDistShardTuples(Oid relationId);
static Oid LookupShardRelation(int64 shardId);
static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
Oid *intervalTypeId, int32 *intervalTypeMod);
static ShardInterval * TupleToShardInterval(HeapTuple heapTuple,
@ -179,55 +213,147 @@ DistributedTableList(void)
/*
* LoadShardInterval reads shard metadata for given shardId from pg_dist_shard,
* and converts min/max values in these metadata to their properly typed datum
* representations. The function then allocates a structure that stores the read
* and converted values, and returns this structure.
* LoadShardInterval returns the, cached, metadata about a shard.
*
* The return value is a copy of the cached ShardInterval struct and may
* therefore be modified and/or freed.
*/
ShardInterval *
LoadShardInterval(uint64 shardId)
{
ShardInterval *shardInterval;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
HeapTuple heapTuple = NULL;
Form_pg_dist_shard shardForm = NULL;
DistTableCacheEntry *partitionEntry;
Oid intervalTypeId = InvalidOid;
int32 intervalTypeMod = -1;
ShardInterval *shardInterval = NULL;
ShardInterval *sourceShardInterval = NULL;
ShardCacheEntry *shardEntry = NULL;
DistTableCacheEntry *tableEntry = NULL;
Relation pgDistShard = heap_open(DistShardRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard);
shardEntry = LookupShardCacheEntry(shardId);
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
tableEntry = shardEntry->tableEntry;
scanDescriptor = systable_beginscan(pgDistShard,
DistShardShardidIndexId(), true,
NULL, scanKeyCount, scanKey);
Assert(tableEntry->isDistributedTable);
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
/* the offset better be in a valid range */
Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength);
sourceShardInterval = tableEntry->sortedShardIntervalArray[shardEntry->shardIndex];
/* copy value to return */
shardInterval = (ShardInterval *) palloc0(sizeof(ShardInterval));
CopyShardInterval(sourceShardInterval, shardInterval);
return shardInterval;
}
/*
* ShardPlacementList returns the list of placements for the given shard from
* the cache.
*
* The returned list is deep copied from the cache and thus can be modified
* and pfree()d freely.
*/
List *
ShardPlacementList(uint64 shardId)
{
ShardCacheEntry *shardEntry = NULL;
DistTableCacheEntry *tableEntry = NULL;
ShardPlacement *placementArray = NULL;
int numberOfPlacements = 0;
List *placementList = NIL;
int i = 0;
shardEntry = LookupShardCacheEntry(shardId);
tableEntry = shardEntry->tableEntry;
/* the offset better be in a valid range */
Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength);
placementArray = tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex];
numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex];
for (i = 0; i < numberOfPlacements; i++)
{
/* copy placement into target context */
ShardPlacement *placement = CitusMakeNode(ShardPlacement);
CopyShardPlacement(&placementArray[i], placement);
placementList = lappend(placementList, placement);
}
/* if no shard placements are found, warn the user */
if (numberOfPlacements == 0)
{
ereport(WARNING, (errmsg("could not find any shard placements for shardId "
UINT64_FORMAT, shardId)));
}
return placementList;
}
/*
* LookupShardCacheEntry returns the cache entry belonging to a shard, or
* errors out if that shard is unknown.
*/
static ShardCacheEntry *
LookupShardCacheEntry(int64 shardId)
{
ShardCacheEntry *shardEntry = NULL;
bool foundInCache = false;
bool recheck = false;
/* probably not reachable */
if (DistShardCacheHash == NULL)
{
InitializeDistTableCache();
}
/* lookup cache entry */
shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache);
if (!foundInCache)
{
/*
* A possible reason for not finding an entry in the cache is that the
* distributed table's cache entry hasn't been accessed. Thus look up
* the distributed table, and build the cache entry. Afterwards we
* know that the shard has to be in the cache if it exists. If the
* shard does *not* exist LookupShardRelation() will error out.
*/
Oid relationId = LookupShardRelation(shardId);
/* trigger building the cache for the shard id */
LookupDistTableCacheEntry(relationId);
recheck = true;
}
else if (!shardEntry->tableEntry->isValid)
{
/*
* The cache entry might not be valid right now. Reload cache entry
* and recheck (as the offset might have changed).
*/
LookupDistTableCacheEntry(shardEntry->tableEntry->relationId);
recheck = true;
}
/*
* If we (re-)loaded the table cache, re-search the shard cache - the
* shard index might have changed. If we still can't find the entry, it
* can't exist.
*/
if (recheck)
{
shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache);
if (!foundInCache)
{
ereport(ERROR, (errmsg("could not find valid entry for shard "
UINT64_FORMAT, shardId)));
}
}
shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple);
partitionEntry = DistributedTableCacheEntry(shardForm->logicalrelid);
GetPartitionTypeInputInfo(partitionEntry->partitionKeyString,
partitionEntry->partitionMethod, &intervalTypeId,
&intervalTypeMod);
shardInterval = TupleToShardInterval(heapTuple, tupleDescriptor, intervalTypeId,
intervalTypeMod);
systable_endscan(scanDescriptor);
heap_close(pgDistShard, AccessShareLock);
return shardInterval;
return shardEntry;
}
@ -276,99 +402,172 @@ LookupDistTableCacheEntry(Oid relationId)
{
DistTableCacheEntry *cacheEntry = NULL;
bool foundInCache = false;
HeapTuple distPartitionTuple = NULL;
char *partitionKeyString = NULL;
char partitionMethod = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
char replicationModel = 0;
List *distShardTupleList = NIL;
int shardIntervalArrayLength = 0;
ShardInterval **shardIntervalArray = NULL;
ShardInterval **sortedShardIntervalArray = NULL;
FmgrInfo *shardIntervalCompareFunction = NULL;
FmgrInfo *hashFunction = NULL;
bool hasUninitializedShardInterval = false;
bool hasUniformHashDistribution = false;
void *hashKey = (void *) &relationId;
Relation pgDistPartition = NULL;
if (DistTableCacheHash == NULL)
{
InitializeDistTableCache();
}
cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache);
cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_ENTER, &foundInCache);
/* return valid matches */
if ((cacheEntry != NULL) && (cacheEntry->isValid))
if (foundInCache)
{
if (cacheEntry->isValid)
{
return cacheEntry;
}
/* free the content of old, invalid, entries */
if (cacheEntry != NULL)
{
ResetDistTableCacheEntry(cacheEntry);
}
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
distPartitionTuple = LookupDistPartitionTuple(pgDistPartition, relationId);
if (distPartitionTuple != NULL)
/* zero out entry, but not the key part */
memset(((char *) cacheEntry) + sizeof(Oid), 0,
sizeof(DistTableCacheEntry) - sizeof(Oid));
/* actually fill out entry */
BuildDistTableCacheEntry(cacheEntry);
/* and finally mark as valid */
cacheEntry->isValid = true;
return cacheEntry;
}
/*
* BuildDistTableCacheEntry is a helper routine for
* LookupDistTableCacheEntry() for building the cache contents.
*/
static void
BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
{
Form_pg_dist_partition partitionForm =
(Form_pg_dist_partition) GETSTRUCT(distPartitionTuple);
HeapTuple distPartitionTuple = NULL;
Relation pgDistPartition = NULL;
Form_pg_dist_partition partitionForm = NULL;
Datum partitionKeyDatum = 0;
Datum replicationModelDatum = 0;
MemoryContext oldContext = NULL;
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
TupleDesc tupleDescriptor = NULL;
bool isNull = false;
bool partitionKeyIsNull = false;
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
distPartitionTuple =
LookupDistPartitionTuple(pgDistPartition, cacheEntry->relationId);
/* not a distributed table, done */
if (distPartitionTuple == NULL)
{
cacheEntry->isDistributedTable = false;
heap_close(pgDistPartition, NoLock);
return;
}
cacheEntry->isDistributedTable = true;
tupleDescriptor = RelationGetDescr(pgDistPartition);
partitionForm = (Form_pg_dist_partition) GETSTRUCT(distPartitionTuple);
cacheEntry->partitionMethod = partitionForm->partmethod;
partitionKeyDatum = heap_getattr(distPartitionTuple,
Anum_pg_dist_partition_partkey,
tupleDescriptor,
&partitionKeyIsNull);
colocationId = heap_getattr(distPartitionTuple,
Anum_pg_dist_partition_colocationid, tupleDescriptor,
/* note that for reference tables partitionKeyisNull is true */
if (!partitionKeyIsNull)
{
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
cacheEntry->partitionKeyString = TextDatumGetCString(partitionKeyDatum);
MemoryContextSwitchTo(oldContext);
}
else
{
cacheEntry->partitionKeyString = NULL;
}
cacheEntry->colocationId = heap_getattr(distPartitionTuple,
Anum_pg_dist_partition_colocationid,
tupleDescriptor,
&isNull);
if (isNull)
{
colocationId = INVALID_COLOCATION_ID;
cacheEntry->colocationId = INVALID_COLOCATION_ID;
}
replicationModelDatum = heap_getattr(distPartitionTuple,
Anum_pg_dist_partition_repmodel,
tupleDescriptor,
&isNull);
if (isNull)
{
/*
* repmodel is NOT NULL but before ALTER EXTENSION citus UPGRADE the column
* doesn't exist
*/
replicationModelDatum = CharGetDatum('c');
cacheEntry->replicationModel = 'c';
}
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
partitionMethod = partitionForm->partmethod;
replicationModel = DatumGetChar(replicationModelDatum);
/* note that for reference tables isNull becomes true */
if (!partitionKeyIsNull)
else
{
partitionKeyString = TextDatumGetCString(partitionKeyDatum);
cacheEntry->replicationModel = DatumGetChar(replicationModelDatum);
}
MemoryContextSwitchTo(oldContext);
heap_freetuple(distPartitionTuple);
BuildCachedShardList(cacheEntry);
/* we only need hash functions for hash distributed tables */
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH)
{
TypeCacheEntry *typeEntry = NULL;
Node *partitionNode = stringToNode(cacheEntry->partitionKeyString);
Var *partitionColumn = (Var *) partitionNode;
FmgrInfo *hashFunction = NULL;
Assert(IsA(partitionNode, Var));
typeEntry = lookup_type_cache(partitionColumn->vartype,
TYPECACHE_HASH_PROC_FINFO);
hashFunction = MemoryContextAllocZero(CacheMemoryContext,
sizeof(FmgrInfo));
fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CacheMemoryContext);
cacheEntry->hashFunction = hashFunction;
/* check the shard distribution for hash partitioned tables */
cacheEntry->hasUniformHashDistribution =
HasUniformHashDistribution(cacheEntry->sortedShardIntervalArray,
cacheEntry->shardIntervalArrayLength);
}
else
{
cacheEntry->hashFunction = NULL;
}
heap_close(pgDistPartition, NoLock);
}
distShardTupleList = LookupDistShardTuples(relationId);
/*
* BuildCachedShardList() is a helper routine for BuildDistTableCacheEntry()
* building up the list of shards in a distributed relation.
*/
static void
BuildCachedShardList(DistTableCacheEntry *cacheEntry)
{
ShardInterval **shardIntervalArray = NULL;
ShardInterval **sortedShardIntervalArray = NULL;
FmgrInfo *shardIntervalCompareFunction = NULL;
List *distShardTupleList = NIL;
int shardIntervalArrayLength = 0;
int shardIndex = 0;
distShardTupleList = LookupDistShardTuples(cacheEntry->relationId);
shardIntervalArrayLength = list_length(distShardTupleList);
if (shardIntervalArrayLength > 0)
{
@ -379,13 +578,24 @@ LookupDistTableCacheEntry(Oid relationId)
Oid intervalTypeId = InvalidOid;
int32 intervalTypeMod = -1;
GetPartitionTypeInputInfo(partitionKeyString, partitionMethod, &intervalTypeId,
GetPartitionTypeInputInfo(cacheEntry->partitionKeyString,
cacheEntry->partitionMethod,
&intervalTypeId,
&intervalTypeMod);
shardIntervalArray = MemoryContextAllocZero(CacheMemoryContext,
shardIntervalArrayLength *
sizeof(ShardInterval *));
cacheEntry->arrayOfPlacementArrays =
MemoryContextAllocZero(CacheMemoryContext,
shardIntervalArrayLength *
sizeof(ShardPlacement *));
cacheEntry->arrayOfPlacementArrayLengths =
MemoryContextAllocZero(CacheMemoryContext,
shardIntervalArrayLength *
sizeof(int));
foreach(distShardTupleCell, distShardTupleList)
{
HeapTuple shardTuple = lfirst(distShardTupleCell);
@ -411,7 +621,7 @@ LookupDistTableCacheEntry(Oid relationId)
}
/* decide and allocate interval comparison function */
if (partitionMethod == DISTRIBUTE_BY_NONE)
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{
shardIntervalCompareFunction = NULL;
}
@ -422,16 +632,17 @@ LookupDistTableCacheEntry(Oid relationId)
/* allocate the comparison function in the cache context */
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
shardIntervalCompareFunction = ShardIntervalCompareFunction(shardIntervalArray,
partitionMethod);
shardIntervalCompareFunction =
ShardIntervalCompareFunction(shardIntervalArray,
cacheEntry->partitionMethod);
MemoryContextSwitchTo(oldContext);
}
/* reference tables has a single shard which is not initialized */
if (partitionMethod == DISTRIBUTE_BY_NONE)
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{
hasUninitializedShardInterval = true;
cacheEntry->hasUninitializedShardInterval = true;
/*
* Note that during create_reference_table() call,
@ -439,7 +650,7 @@ LookupDistTableCacheEntry(Oid relationId)
*/
if (shardIntervalArrayLength > 1)
{
char *relationName = get_rel_name(relationId);
char *relationName = get_rel_name(cacheEntry->relationId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("reference table \"%s\" has more than 1 shard",
@ -457,60 +668,55 @@ LookupDistTableCacheEntry(Oid relationId)
shardIntervalCompareFunction);
/* check if there exists any shard intervals with no min/max values */
hasUninitializedShardInterval =
cacheEntry->hasUninitializedShardInterval =
HasUninitializedShardInterval(sortedShardIntervalArray,
shardIntervalArrayLength);
}
/* we only need hash functions for hash distributed tables */
if (partitionMethod == DISTRIBUTE_BY_HASH)
/* maintain shardId->(table,ShardInterval) cache */
for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
{
TypeCacheEntry *typeEntry = NULL;
Node *partitionNode = stringToNode(partitionKeyString);
Var *partitionColumn = (Var *) partitionNode;
Assert(IsA(partitionNode, Var));
typeEntry = lookup_type_cache(partitionColumn->vartype,
TYPECACHE_HASH_PROC_FINFO);
ShardCacheEntry *shardEntry = NULL;
ShardInterval *shardInterval = sortedShardIntervalArray[shardIndex];
bool foundInCache = false;
List *placementList = NIL;
MemoryContext oldContext = NULL;
ListCell *placementCell = NULL;
ShardPlacement *placementArray = NULL;
int placementOffset = 0;
int numberOfPlacements = 0;
hashFunction = MemoryContextAllocZero(CacheMemoryContext,
sizeof(FmgrInfo));
shardEntry = hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_ENTER,
&foundInCache);
Assert(!foundInCache);
shardEntry->shardIndex = shardIndex;
shardEntry->tableEntry = cacheEntry;
fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CacheMemoryContext);
/* build list of shard placements */
placementList = BuildShardPlacementList(shardInterval);
numberOfPlacements = list_length(placementList);
/* check the shard distribution for hash partitioned tables */
hasUniformHashDistribution =
HasUniformHashDistribution(sortedShardIntervalArray,
shardIntervalArrayLength);
/* and copy that list into the cache entry */
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
placementArray = palloc0(numberOfPlacements * sizeof(ShardPlacement));
foreach(placementCell, placementList)
{
ShardPlacement *srcPlacement = (ShardPlacement *) lfirst(placementCell);
CopyShardPlacement(srcPlacement, &placementArray[placementOffset]);
placementOffset++;
}
MemoryContextSwitchTo(oldContext);
cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray;
cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements;
}
cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_ENTER, NULL);
/* zero out entry, but not the key part */
memset(((char *) cacheEntry) + sizeof(Oid), 0,
sizeof(DistTableCacheEntry) - sizeof(Oid));
if (distPartitionTuple == NULL)
{
cacheEntry->isValid = true;
cacheEntry->isDistributedTable = false;
}
else
{
cacheEntry->isValid = true;
cacheEntry->isDistributedTable = true;
cacheEntry->partitionKeyString = partitionKeyString;
cacheEntry->partitionMethod = partitionMethod;
cacheEntry->colocationId = colocationId;
cacheEntry->replicationModel = replicationModel;
cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;
cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction;
cacheEntry->hashFunction = hashFunction;
cacheEntry->hasUninitializedShardInterval = hasUninitializedShardInterval;
cacheEntry->hasUniformHashDistribution = hasUniformHashDistribution;
}
return cacheEntry;
}
@ -1249,7 +1455,7 @@ InitializeDistTableCache(void)
DistShardScanKey[0].sk_collation = InvalidOid;
DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid;
/* initialize the hash table */
/* initialize the per-table hash table */
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.entrysize = sizeof(DistTableCacheEntry);
@ -1258,6 +1464,15 @@ InitializeDistTableCache(void)
hash_create("Distributed Relation Cache", 32, &info,
HASH_ELEM | HASH_FUNCTION);
/* initialize the per-shard hash table */
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(int64);
info.entrysize = sizeof(ShardCacheEntry);
info.hash = tag_hash;
DistShardCacheHash =
hash_create("Shard Cache", 32 * 64, &info,
HASH_ELEM | HASH_FUNCTION);
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback,
(Datum) 0);
@ -1475,21 +1690,63 @@ WorkerNodeHashCode(const void *key, Size keySize)
void
ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
{
int shardIndex = 0;
if (cacheEntry->partitionKeyString != NULL)
{
pfree(cacheEntry->partitionKeyString);
cacheEntry->partitionKeyString = NULL;
}
if (cacheEntry->shardIntervalArrayLength > 0)
if (cacheEntry->shardIntervalCompareFunction != NULL)
{
int i = 0;
pfree(cacheEntry->shardIntervalCompareFunction);
cacheEntry->shardIntervalCompareFunction = NULL;
}
for (i = 0; i < cacheEntry->shardIntervalArrayLength; i++)
if (cacheEntry->hashFunction)
{
ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[i];
pfree(cacheEntry->hashFunction);
cacheEntry->hashFunction = NULL;
}
if (cacheEntry->shardIntervalArrayLength == 0)
{
return;
}
for (shardIndex = 0; shardIndex < cacheEntry->shardIntervalArrayLength;
shardIndex++)
{
ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex];
ShardPlacement *placementArray = cacheEntry->arrayOfPlacementArrays[shardIndex];
int numberOfPlacements = cacheEntry->arrayOfPlacementArrayLengths[shardIndex];
bool valueByVal = shardInterval->valueByVal;
bool foundInCache = false;
int placementIndex = 0;
/* delete the shard's placements */
for (placementIndex = 0;
placementIndex < numberOfPlacements;
placementIndex++)
{
ShardPlacement *placement = &placementArray[placementIndex];
if (placement->nodeName)
{
pfree(placement->nodeName);
}
/* placement itself is deleted as part of the array */
}
pfree(placementArray);
/* delete per-shard cache-entry */
hash_search(DistShardCacheHash, &shardInterval->shardId, HASH_REMOVE,
&foundInCache);
Assert(foundInCache);
/* delete data pointed to by ShardInterval */
if (!valueByVal)
{
if (shardInterval->minValueExists)
@ -1503,29 +1760,29 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
}
}
/* and finally the ShardInterval itself */
pfree(shardInterval);
}
if (cacheEntry->sortedShardIntervalArray)
{
pfree(cacheEntry->sortedShardIntervalArray);
cacheEntry->sortedShardIntervalArray = NULL;
cacheEntry->shardIntervalArrayLength = 0;
}
if (cacheEntry->arrayOfPlacementArrayLengths)
{
pfree(cacheEntry->arrayOfPlacementArrayLengths);
cacheEntry->arrayOfPlacementArrayLengths = NULL;
}
if (cacheEntry->arrayOfPlacementArrays)
{
pfree(cacheEntry->arrayOfPlacementArrays);
cacheEntry->arrayOfPlacementArrays = NULL;
}
cacheEntry->shardIntervalArrayLength = 0;
cacheEntry->hasUninitializedShardInterval = false;
cacheEntry->hasUniformHashDistribution = false;
if (cacheEntry->shardIntervalCompareFunction != NULL)
{
pfree(cacheEntry->shardIntervalCompareFunction);
cacheEntry->shardIntervalCompareFunction = NULL;
}
/* we only allocated hash function for hash distributed tables */
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH)
{
pfree(cacheEntry->hashFunction);
cacheEntry->hashFunction = NULL;
}
}
}
@ -1741,6 +1998,46 @@ LookupDistShardTuples(Oid relationId)
}
/*
* LookupShardRelation returns the logical relation oid a shard belongs to.
*
* Errors out if the shardId does not exist.
*/
static Oid
LookupShardRelation(int64 shardId)
{
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
HeapTuple heapTuple = NULL;
Form_pg_dist_shard shardForm = NULL;
Relation pgDistShard = heap_open(DistShardRelationId(), AccessShareLock);
Oid relationId = InvalidOid;
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
scanDescriptor = systable_beginscan(pgDistShard,
DistShardShardidIndexId(), true,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for shard "
UINT64_FORMAT, shardId)));
}
shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple);
relationId = shardForm->logicalrelid;
systable_endscan(scanDescriptor);
heap_close(pgDistShard, NoLock);
return relationId;
}
/*
* GetPartitionTypeInputInfo populates output parameters with the interval type
* identifier and modifier for the specified partition key/method combination.

View File

@ -62,13 +62,13 @@ extern List * LoadShardIntervalList(Oid relationId);
extern int ShardIntervalCount(Oid relationId);
extern List * LoadShardList(Oid relationId);
extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval);
extern void CopyShardPlacement(ShardPlacement *srcPlacement,
ShardPlacement *destPlacement);
extern uint64 ShardLength(uint64 shardId);
extern bool NodeHasActiveShardPlacements(char *nodeName, int32 nodePort);
extern List * FinalizedShardPlacementList(uint64 shardId);
extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk);
extern List * ShardPlacementList(uint64 shardId);
extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
HeapTuple heapTuple);
extern List * BuildShardPlacementList(ShardInterval *shardInterval);
/* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,

View File

@ -49,6 +49,10 @@ typedef struct
FmgrInfo *shardIntervalCompareFunction; /* NULL if no shard intervals exist */
FmgrInfo *hashFunction; /* NULL if table is not distributed by hash */
/* pg_dist_shard_placement metadata */
ShardPlacement **arrayOfPlacementArrays;
int *arrayOfPlacementArrayLengths;
} DistTableCacheEntry;
@ -58,6 +62,7 @@ extern ShardInterval * LoadShardInterval(uint64 shardId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern int GetLocalGroupId(void);
extern List * DistTableOidList(void);
extern List * ShardPlacementList(uint64 shardId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId);