mirror of https://github.com/citusdata/citus.git
Split DistTableCacheEntry() into separate functions.
Previously the function was getting too large. Thus this splits the function into separate parts for looking up the cache entry and building the cache contents.pull/1113/head
parent
8624ef5ac4
commit
f6e8647337
|
@ -90,6 +90,8 @@ static ScanKeyData DistShardScanKey[1];
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
|
static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
|
||||||
|
static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
|
||||||
|
static void BuildCachedShardList(DistTableCacheEntry *cacheEntry);
|
||||||
static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray,
|
static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray,
|
||||||
char partitionMethod);
|
char partitionMethod);
|
||||||
static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray,
|
static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray,
|
||||||
|
@ -276,99 +278,171 @@ LookupDistTableCacheEntry(Oid relationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = NULL;
|
DistTableCacheEntry *cacheEntry = NULL;
|
||||||
bool foundInCache = false;
|
bool foundInCache = false;
|
||||||
HeapTuple distPartitionTuple = NULL;
|
|
||||||
char *partitionKeyString = NULL;
|
|
||||||
char partitionMethod = 0;
|
|
||||||
uint32 colocationId = INVALID_COLOCATION_ID;
|
|
||||||
char replicationModel = 0;
|
|
||||||
List *distShardTupleList = NIL;
|
|
||||||
int shardIntervalArrayLength = 0;
|
|
||||||
ShardInterval **shardIntervalArray = NULL;
|
|
||||||
ShardInterval **sortedShardIntervalArray = NULL;
|
|
||||||
FmgrInfo *shardIntervalCompareFunction = NULL;
|
|
||||||
FmgrInfo *hashFunction = NULL;
|
|
||||||
bool hasUninitializedShardInterval = false;
|
|
||||||
bool hasUniformHashDistribution = false;
|
|
||||||
void *hashKey = (void *) &relationId;
|
void *hashKey = (void *) &relationId;
|
||||||
Relation pgDistPartition = NULL;
|
|
||||||
|
|
||||||
if (DistTableCacheHash == NULL)
|
if (DistTableCacheHash == NULL)
|
||||||
{
|
{
|
||||||
InitializeDistTableCache();
|
InitializeDistTableCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache);
|
cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_ENTER, &foundInCache);
|
||||||
|
|
||||||
/* return valid matches */
|
/* return valid matches */
|
||||||
if ((cacheEntry != NULL) && (cacheEntry->isValid))
|
if (foundInCache)
|
||||||
|
{
|
||||||
|
if (cacheEntry->isValid)
|
||||||
{
|
{
|
||||||
return cacheEntry;
|
return cacheEntry;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* free the content of old, invalid, entries */
|
/* free the content of old, invalid, entries */
|
||||||
if (cacheEntry != NULL)
|
|
||||||
{
|
|
||||||
ResetDistTableCacheEntry(cacheEntry);
|
ResetDistTableCacheEntry(cacheEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
|
/* zero out entry, but not the key part */
|
||||||
distPartitionTuple = LookupDistPartitionTuple(pgDistPartition, relationId);
|
memset(((char *) cacheEntry) + sizeof(Oid), 0,
|
||||||
if (distPartitionTuple != NULL)
|
sizeof(DistTableCacheEntry) - sizeof(Oid));
|
||||||
|
|
||||||
|
/* actually fill out entry */
|
||||||
|
BuildDistTableCacheEntry(cacheEntry);
|
||||||
|
|
||||||
|
/* and finally mark as valid */
|
||||||
|
cacheEntry->isValid = true;
|
||||||
|
|
||||||
|
return cacheEntry;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BuildDistTableCacheEntry is a helper routine for
|
||||||
|
* LookupDistTableCacheEntry() for building the cache contents.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
|
||||||
{
|
{
|
||||||
Form_pg_dist_partition partitionForm =
|
HeapTuple distPartitionTuple = NULL;
|
||||||
(Form_pg_dist_partition) GETSTRUCT(distPartitionTuple);
|
Relation pgDistPartition = NULL;
|
||||||
|
Form_pg_dist_partition partitionForm = NULL;
|
||||||
Datum partitionKeyDatum = 0;
|
Datum partitionKeyDatum = 0;
|
||||||
Datum replicationModelDatum = 0;
|
Datum replicationModelDatum = 0;
|
||||||
MemoryContext oldContext = NULL;
|
MemoryContext oldContext = NULL;
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
TupleDesc tupleDescriptor = NULL;
|
||||||
bool isNull = false;
|
bool isNull = false;
|
||||||
bool partitionKeyIsNull = false;
|
bool partitionKeyIsNull = false;
|
||||||
|
|
||||||
|
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
|
||||||
|
distPartitionTuple =
|
||||||
|
LookupDistPartitionTuple(pgDistPartition, cacheEntry->relationId);
|
||||||
|
|
||||||
|
/* not a distributed table, done */
|
||||||
|
if (distPartitionTuple == NULL)
|
||||||
|
{
|
||||||
|
cacheEntry->isDistributedTable = false;
|
||||||
|
heap_close(pgDistPartition, NoLock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheEntry->isDistributedTable = true;
|
||||||
|
|
||||||
|
tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
partitionForm = (Form_pg_dist_partition) GETSTRUCT(distPartitionTuple);
|
||||||
|
|
||||||
|
cacheEntry->partitionMethod = partitionForm->partmethod;
|
||||||
|
|
||||||
partitionKeyDatum = heap_getattr(distPartitionTuple,
|
partitionKeyDatum = heap_getattr(distPartitionTuple,
|
||||||
Anum_pg_dist_partition_partkey,
|
Anum_pg_dist_partition_partkey,
|
||||||
tupleDescriptor,
|
tupleDescriptor,
|
||||||
&partitionKeyIsNull);
|
&partitionKeyIsNull);
|
||||||
|
|
||||||
colocationId = heap_getattr(distPartitionTuple,
|
/* note that for reference tables partitionKeyisNull is true */
|
||||||
Anum_pg_dist_partition_colocationid, tupleDescriptor,
|
if (!partitionKeyIsNull)
|
||||||
|
{
|
||||||
|
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
|
||||||
|
cacheEntry->partitionKeyString = TextDatumGetCString(partitionKeyDatum);
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
cacheEntry->partitionKeyString = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheEntry->colocationId = heap_getattr(distPartitionTuple,
|
||||||
|
Anum_pg_dist_partition_colocationid,
|
||||||
|
tupleDescriptor,
|
||||||
&isNull);
|
&isNull);
|
||||||
if (isNull)
|
if (isNull)
|
||||||
{
|
{
|
||||||
colocationId = INVALID_COLOCATION_ID;
|
cacheEntry->colocationId = INVALID_COLOCATION_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
replicationModelDatum = heap_getattr(distPartitionTuple,
|
replicationModelDatum = heap_getattr(distPartitionTuple,
|
||||||
Anum_pg_dist_partition_repmodel,
|
Anum_pg_dist_partition_repmodel,
|
||||||
tupleDescriptor,
|
tupleDescriptor,
|
||||||
&isNull);
|
&isNull);
|
||||||
|
|
||||||
if (isNull)
|
if (isNull)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* repmodel is NOT NULL but before ALTER EXTENSION citus UPGRADE the column
|
* repmodel is NOT NULL but before ALTER EXTENSION citus UPGRADE the column
|
||||||
* doesn't exist
|
* doesn't exist
|
||||||
*/
|
*/
|
||||||
replicationModelDatum = CharGetDatum('c');
|
cacheEntry->replicationModel = 'c';
|
||||||
}
|
}
|
||||||
|
else
|
||||||
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
|
|
||||||
partitionMethod = partitionForm->partmethod;
|
|
||||||
replicationModel = DatumGetChar(replicationModelDatum);
|
|
||||||
|
|
||||||
/* note that for reference tables isNull becomes true */
|
|
||||||
if (!partitionKeyIsNull)
|
|
||||||
{
|
{
|
||||||
partitionKeyString = TextDatumGetCString(partitionKeyDatum);
|
cacheEntry->replicationModel = DatumGetChar(replicationModelDatum);
|
||||||
}
|
}
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
|
||||||
|
|
||||||
heap_freetuple(distPartitionTuple);
|
heap_freetuple(distPartitionTuple);
|
||||||
|
|
||||||
|
BuildCachedShardList(cacheEntry);
|
||||||
|
|
||||||
|
/* we only need hash functions for hash distributed tables */
|
||||||
|
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH)
|
||||||
|
{
|
||||||
|
TypeCacheEntry *typeEntry = NULL;
|
||||||
|
Node *partitionNode = stringToNode(cacheEntry->partitionKeyString);
|
||||||
|
Var *partitionColumn = (Var *) partitionNode;
|
||||||
|
FmgrInfo *hashFunction = NULL;
|
||||||
|
|
||||||
|
Assert(IsA(partitionNode, Var));
|
||||||
|
typeEntry = lookup_type_cache(partitionColumn->vartype,
|
||||||
|
TYPECACHE_HASH_PROC_FINFO);
|
||||||
|
|
||||||
|
hashFunction = MemoryContextAllocZero(CacheMemoryContext,
|
||||||
|
sizeof(FmgrInfo));
|
||||||
|
|
||||||
|
fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CacheMemoryContext);
|
||||||
|
|
||||||
|
cacheEntry->hashFunction = hashFunction;
|
||||||
|
|
||||||
|
/* check the shard distribution for hash partitioned tables */
|
||||||
|
cacheEntry->hasUniformHashDistribution =
|
||||||
|
HasUniformHashDistribution(cacheEntry->sortedShardIntervalArray,
|
||||||
|
cacheEntry->shardIntervalArrayLength);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
cacheEntry->hashFunction = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
heap_close(pgDistPartition, NoLock);
|
heap_close(pgDistPartition, NoLock);
|
||||||
|
}
|
||||||
|
|
||||||
distShardTupleList = LookupDistShardTuples(relationId);
|
|
||||||
|
/*
|
||||||
|
* BuildCachedShardList() is a helper routine for BuildDistTableCacheEntry()
|
||||||
|
* building up the list of shards in a distributed relation.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
BuildCachedShardList(DistTableCacheEntry *cacheEntry)
|
||||||
|
{
|
||||||
|
ShardInterval **shardIntervalArray = NULL;
|
||||||
|
ShardInterval **sortedShardIntervalArray = NULL;
|
||||||
|
FmgrInfo *shardIntervalCompareFunction = NULL;
|
||||||
|
List *distShardTupleList = NIL;
|
||||||
|
int shardIntervalArrayLength = 0;
|
||||||
|
|
||||||
|
distShardTupleList = LookupDistShardTuples(cacheEntry->relationId);
|
||||||
shardIntervalArrayLength = list_length(distShardTupleList);
|
shardIntervalArrayLength = list_length(distShardTupleList);
|
||||||
if (shardIntervalArrayLength > 0)
|
if (shardIntervalArrayLength > 0)
|
||||||
{
|
{
|
||||||
|
@ -379,7 +453,9 @@ LookupDistTableCacheEntry(Oid relationId)
|
||||||
Oid intervalTypeId = InvalidOid;
|
Oid intervalTypeId = InvalidOid;
|
||||||
int32 intervalTypeMod = -1;
|
int32 intervalTypeMod = -1;
|
||||||
|
|
||||||
GetPartitionTypeInputInfo(partitionKeyString, partitionMethod, &intervalTypeId,
|
GetPartitionTypeInputInfo(cacheEntry->partitionKeyString,
|
||||||
|
cacheEntry->partitionMethod,
|
||||||
|
&intervalTypeId,
|
||||||
&intervalTypeMod);
|
&intervalTypeMod);
|
||||||
|
|
||||||
shardIntervalArray = MemoryContextAllocZero(CacheMemoryContext,
|
shardIntervalArray = MemoryContextAllocZero(CacheMemoryContext,
|
||||||
|
@ -411,7 +487,7 @@ LookupDistTableCacheEntry(Oid relationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* decide and allocate interval comparison function */
|
/* decide and allocate interval comparison function */
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
shardIntervalCompareFunction = NULL;
|
shardIntervalCompareFunction = NULL;
|
||||||
}
|
}
|
||||||
|
@ -422,16 +498,17 @@ LookupDistTableCacheEntry(Oid relationId)
|
||||||
/* allocate the comparison function in the cache context */
|
/* allocate the comparison function in the cache context */
|
||||||
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
|
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
|
||||||
|
|
||||||
shardIntervalCompareFunction = ShardIntervalCompareFunction(shardIntervalArray,
|
shardIntervalCompareFunction =
|
||||||
partitionMethod);
|
ShardIntervalCompareFunction(shardIntervalArray,
|
||||||
|
cacheEntry->partitionMethod);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* reference tables has a single shard which is not initialized */
|
/* reference tables has a single shard which is not initialized */
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
hasUninitializedShardInterval = true;
|
cacheEntry->hasUninitializedShardInterval = true;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Note that during create_reference_table() call,
|
* Note that during create_reference_table() call,
|
||||||
|
@ -439,7 +516,7 @@ LookupDistTableCacheEntry(Oid relationId)
|
||||||
*/
|
*/
|
||||||
if (shardIntervalArrayLength > 1)
|
if (shardIntervalArrayLength > 1)
|
||||||
{
|
{
|
||||||
char *relationName = get_rel_name(relationId);
|
char *relationName = get_rel_name(cacheEntry->relationId);
|
||||||
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
errmsg("reference table \"%s\" has more than 1 shard",
|
errmsg("reference table \"%s\" has more than 1 shard",
|
||||||
|
@ -457,60 +534,14 @@ LookupDistTableCacheEntry(Oid relationId)
|
||||||
shardIntervalCompareFunction);
|
shardIntervalCompareFunction);
|
||||||
|
|
||||||
/* check if there exists any shard intervals with no min/max values */
|
/* check if there exists any shard intervals with no min/max values */
|
||||||
hasUninitializedShardInterval =
|
cacheEntry->hasUninitializedShardInterval =
|
||||||
HasUninitializedShardInterval(sortedShardIntervalArray,
|
HasUninitializedShardInterval(sortedShardIntervalArray,
|
||||||
shardIntervalArrayLength);
|
shardIntervalArrayLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we only need hash functions for hash distributed tables */
|
|
||||||
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
|
||||||
{
|
|
||||||
TypeCacheEntry *typeEntry = NULL;
|
|
||||||
Node *partitionNode = stringToNode(partitionKeyString);
|
|
||||||
Var *partitionColumn = (Var *) partitionNode;
|
|
||||||
Assert(IsA(partitionNode, Var));
|
|
||||||
typeEntry = lookup_type_cache(partitionColumn->vartype,
|
|
||||||
TYPECACHE_HASH_PROC_FINFO);
|
|
||||||
|
|
||||||
hashFunction = MemoryContextAllocZero(CacheMemoryContext,
|
|
||||||
sizeof(FmgrInfo));
|
|
||||||
|
|
||||||
fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CacheMemoryContext);
|
|
||||||
|
|
||||||
/* check the shard distribution for hash partitioned tables */
|
|
||||||
hasUniformHashDistribution =
|
|
||||||
HasUniformHashDistribution(sortedShardIntervalArray,
|
|
||||||
shardIntervalArrayLength);
|
|
||||||
}
|
|
||||||
|
|
||||||
cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_ENTER, NULL);
|
|
||||||
|
|
||||||
/* zero out entry, but not the key part */
|
|
||||||
memset(((char *) cacheEntry) + sizeof(Oid), 0,
|
|
||||||
sizeof(DistTableCacheEntry) - sizeof(Oid));
|
|
||||||
|
|
||||||
if (distPartitionTuple == NULL)
|
|
||||||
{
|
|
||||||
cacheEntry->isValid = true;
|
|
||||||
cacheEntry->isDistributedTable = false;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
cacheEntry->isValid = true;
|
|
||||||
cacheEntry->isDistributedTable = true;
|
|
||||||
cacheEntry->partitionKeyString = partitionKeyString;
|
|
||||||
cacheEntry->partitionMethod = partitionMethod;
|
|
||||||
cacheEntry->colocationId = colocationId;
|
|
||||||
cacheEntry->replicationModel = replicationModel;
|
|
||||||
cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;
|
cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;
|
||||||
cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
|
cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
|
||||||
cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction;
|
cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction;
|
||||||
cacheEntry->hashFunction = hashFunction;
|
|
||||||
cacheEntry->hasUninitializedShardInterval = hasUninitializedShardInterval;
|
|
||||||
cacheEntry->hasUniformHashDistribution = hasUniformHashDistribution;
|
|
||||||
}
|
|
||||||
|
|
||||||
return cacheEntry;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue