mirror of https://github.com/citusdata/citus.git
Fix DistShardCacheHash initialization
InitializeCaches() method may prematurely set performedInitialization without actually creating DistShardCacheHash. Fix makes sure flag is set only if DistShardCacheHash is created successfully. Also introduced a new memory context to allocate aforementioned hash tables. If allocation/initialization fails for any reason we make sure memory is reclaimed by deleting the memory context.pull/2703/head
parent
986ef6651a
commit
3fe482adbc
|
@ -60,6 +60,7 @@
|
|||
#include "utils/inval.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/palloc.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/relfilenodemap.h"
|
||||
|
@ -155,6 +156,7 @@ static HTAB *DistTableCacheHash = NULL;
|
|||
|
||||
/* Hash table for informations about each shard */
|
||||
static HTAB *DistShardCacheHash = NULL;
|
||||
static MemoryContext MetadataCacheMemoryContext = NULL;
|
||||
|
||||
/* Hash table for informations about worker nodes */
|
||||
static HTAB *WorkerNodeHash = NULL;
|
||||
|
@ -927,7 +929,7 @@ BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
|
|||
{
|
||||
Node *partitionNode = NULL;
|
||||
|
||||
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
|
||||
oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
|
||||
|
||||
/* get the string representation of the partition column Var */
|
||||
cacheEntry->partitionKeyString = TextDatumGetCString(partitionKeyDatum);
|
||||
|
@ -979,10 +981,11 @@ BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
|
|||
typeEntry = lookup_type_cache(partitionColumn->vartype,
|
||||
TYPECACHE_HASH_PROC_FINFO);
|
||||
|
||||
hashFunction = MemoryContextAllocZero(CacheMemoryContext,
|
||||
hashFunction = MemoryContextAllocZero(MetadataCacheMemoryContext,
|
||||
sizeof(FmgrInfo));
|
||||
|
||||
fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CacheMemoryContext);
|
||||
fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo),
|
||||
MetadataCacheMemoryContext);
|
||||
|
||||
cacheEntry->hashFunction = hashFunction;
|
||||
|
||||
|
@ -996,7 +999,7 @@ BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
|
|||
cacheEntry->hashFunction = NULL;
|
||||
}
|
||||
|
||||
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
|
||||
oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
|
||||
|
||||
cacheEntry->referencedRelationsViaForeignKey = ReferencedRelationIdList(
|
||||
cacheEntry->relationId);
|
||||
|
@ -1044,16 +1047,16 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
|
|||
ListCell *distShardTupleCell = NULL;
|
||||
int arrayIndex = 0;
|
||||
|
||||
shardIntervalArray = MemoryContextAllocZero(CacheMemoryContext,
|
||||
shardIntervalArray = MemoryContextAllocZero(MetadataCacheMemoryContext,
|
||||
shardIntervalArrayLength *
|
||||
sizeof(ShardInterval *));
|
||||
|
||||
cacheEntry->arrayOfPlacementArrays =
|
||||
MemoryContextAllocZero(CacheMemoryContext,
|
||||
MemoryContextAllocZero(MetadataCacheMemoryContext,
|
||||
shardIntervalArrayLength *
|
||||
sizeof(GroupShardPlacement *));
|
||||
cacheEntry->arrayOfPlacementArrayLengths =
|
||||
MemoryContextAllocZero(CacheMemoryContext,
|
||||
MemoryContextAllocZero(MetadataCacheMemoryContext,
|
||||
shardIntervalArrayLength *
|
||||
sizeof(int));
|
||||
|
||||
|
@ -1065,7 +1068,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
|
|||
intervalTypeId,
|
||||
intervalTypeMod);
|
||||
ShardInterval *newShardInterval = NULL;
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(CacheMemoryContext);
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
|
||||
|
||||
newShardInterval = (ShardInterval *) palloc0(sizeof(ShardInterval));
|
||||
CopyShardInterval(shardInterval, newShardInterval);
|
||||
|
@ -1085,7 +1088,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
|
|||
if (columnTypeId != InvalidOid)
|
||||
{
|
||||
/* allocate the comparison function in the cache context */
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(CacheMemoryContext);
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
|
||||
|
||||
shardColumnCompareFunction = GetFunctionInfo(columnTypeId, BTREE_AM_OID,
|
||||
BTORDER_PROC);
|
||||
|
@ -1100,7 +1103,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
|
|||
if (intervalTypeId != InvalidOid)
|
||||
{
|
||||
/* allocate the comparison function in the cache context */
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(CacheMemoryContext);
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
|
||||
|
||||
shardIntervalCompareFunction = GetFunctionInfo(intervalTypeId, BTREE_AM_OID,
|
||||
BTORDER_PROC);
|
||||
|
@ -1221,7 +1224,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
|
|||
numberOfPlacements = list_length(placementList);
|
||||
|
||||
/* and copy that list into the cache entry */
|
||||
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
|
||||
oldContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
|
||||
placementArray = palloc0(numberOfPlacements * sizeof(GroupShardPlacement));
|
||||
foreach(placementCell, placementList)
|
||||
{
|
||||
|
@ -1654,7 +1657,7 @@ AvailableExtensionVersion(void)
|
|||
Datum availableVersion = slot_getattr(tupleTableSlot, 2, &isNull);
|
||||
|
||||
/* we will cache the result of citus version to prevent catalog access */
|
||||
oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext);
|
||||
oldMemoryContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
|
||||
|
||||
availableExtensionVersion = text_to_cstring(DatumGetTextPP(availableVersion));
|
||||
|
||||
|
@ -1692,6 +1695,8 @@ InstalledExtensionVersion(void)
|
|||
HeapTuple extensionTuple = NULL;
|
||||
char *installedExtensionVersion = NULL;
|
||||
|
||||
InitializeCaches();
|
||||
|
||||
relation = heap_open(ExtensionRelationId, AccessShareLock);
|
||||
|
||||
ScanKeyInit(&entry[0], Anum_pg_extension_extname, BTEqualStrategyNumber, F_NAMEEQ,
|
||||
|
@ -1720,7 +1725,7 @@ InstalledExtensionVersion(void)
|
|||
}
|
||||
|
||||
/* we will cache the result of citus version to prevent catalog access */
|
||||
oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext);
|
||||
oldMemoryContext = MemoryContextSwitchTo(MetadataCacheMemoryContext);
|
||||
|
||||
installedExtensionVersion = text_to_cstring(DatumGetTextPP(installedVersion));
|
||||
|
||||
|
@ -2598,19 +2603,54 @@ InitializeCaches(void)
|
|||
|
||||
if (!performedInitialization)
|
||||
{
|
||||
/* set first, to avoid recursion dangers */
|
||||
performedInitialization = true;
|
||||
MetadataCacheMemoryContext = NULL;
|
||||
|
||||
/* make sure we've initialized CacheMemoryContext */
|
||||
if (CacheMemoryContext == NULL)
|
||||
/*
|
||||
* If either of dist table cache or shard cache
|
||||
* allocation and initializations fail due to an exception
|
||||
* that is caused by OOM or any other reason,
|
||||
* we reset the flag, and delete the shard cache memory
|
||||
* context to reclaim partially allocated memory.
|
||||
*
|
||||
* Command will continue to fail since we re-throw the exception.
|
||||
*/
|
||||
PG_TRY();
|
||||
{
|
||||
CreateCacheMemoryContext();
|
||||
}
|
||||
/* set first, to avoid recursion dangers */
|
||||
performedInitialization = true;
|
||||
|
||||
InitializeDistTableCache();
|
||||
RegisterForeignKeyGraphCacheCallbacks();
|
||||
RegisterWorkerNodeCacheCallbacks();
|
||||
RegisterLocalGroupIdCacheCallbacks();
|
||||
/* make sure we've initialized CacheMemoryContext */
|
||||
if (CacheMemoryContext == NULL)
|
||||
{
|
||||
CreateCacheMemoryContext();
|
||||
}
|
||||
|
||||
MetadataCacheMemoryContext = AllocSetContextCreate(
|
||||
CacheMemoryContext,
|
||||
"MetadataCacheMemoryContext",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
InitializeDistTableCache();
|
||||
RegisterForeignKeyGraphCacheCallbacks();
|
||||
RegisterWorkerNodeCacheCallbacks();
|
||||
RegisterLocalGroupIdCacheCallbacks();
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
performedInitialization = false;
|
||||
|
||||
if (MetadataCacheMemoryContext != NULL)
|
||||
{
|
||||
MemoryContextDelete(MetadataCacheMemoryContext);
|
||||
}
|
||||
|
||||
MetadataCacheMemoryContext = NULL;
|
||||
DistTableCacheHash = NULL;
|
||||
DistShardCacheHash = NULL;
|
||||
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2626,7 +2666,7 @@ InitializeDistTableCache(void)
|
|||
|
||||
fmgr_info_cxt(F_OIDEQ,
|
||||
&DistPartitionScanKey[0].sk_func,
|
||||
CacheMemoryContext);
|
||||
MetadataCacheMemoryContext);
|
||||
DistPartitionScanKey[0].sk_strategy = BTEqualStrategyNumber;
|
||||
DistPartitionScanKey[0].sk_subtype = InvalidOid;
|
||||
DistPartitionScanKey[0].sk_collation = InvalidOid;
|
||||
|
@ -2636,7 +2676,7 @@ InitializeDistTableCache(void)
|
|||
|
||||
fmgr_info_cxt(F_OIDEQ,
|
||||
&DistShardScanKey[0].sk_func,
|
||||
CacheMemoryContext);
|
||||
MetadataCacheMemoryContext);
|
||||
DistShardScanKey[0].sk_strategy = BTEqualStrategyNumber;
|
||||
DistShardScanKey[0].sk_subtype = InvalidOid;
|
||||
DistShardScanKey[0].sk_collation = InvalidOid;
|
||||
|
@ -2650,9 +2690,10 @@ InitializeDistTableCache(void)
|
|||
info.keysize = sizeof(int64);
|
||||
info.entrysize = sizeof(ShardCacheEntry);
|
||||
info.hash = tag_hash;
|
||||
info.hcxt = MetadataCacheMemoryContext;
|
||||
DistShardCacheHash =
|
||||
hash_create("Shard Cache", 32 * 64, &info,
|
||||
HASH_ELEM | HASH_FUNCTION);
|
||||
HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
|
||||
|
||||
/* Watch for invalidation events. */
|
||||
CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback,
|
||||
|
@ -2735,7 +2776,7 @@ InitializeWorkerNodeCache(void)
|
|||
memset(&info, 0, sizeof(info));
|
||||
info.keysize = sizeof(uint32) + WORKER_LENGTH + sizeof(uint32);
|
||||
info.entrysize = sizeof(WorkerNode);
|
||||
info.hcxt = CacheMemoryContext;
|
||||
info.hcxt = MetadataCacheMemoryContext;
|
||||
info.hash = WorkerNodeHashCode;
|
||||
info.match = WorkerNodeCompare;
|
||||
hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE;
|
||||
|
@ -2746,7 +2787,7 @@ InitializeWorkerNodeCache(void)
|
|||
workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters);
|
||||
|
||||
newWorkerNodeCount = list_length(workerNodeList);
|
||||
newWorkerNodeArray = MemoryContextAlloc(CacheMemoryContext,
|
||||
newWorkerNodeArray = MemoryContextAlloc(MetadataCacheMemoryContext,
|
||||
sizeof(WorkerNode *) * newWorkerNodeCount);
|
||||
|
||||
/* iterate over the worker node list */
|
||||
|
@ -3165,9 +3206,10 @@ CreateDistTableCache(void)
|
|||
info.keysize = sizeof(Oid);
|
||||
info.entrysize = sizeof(DistTableCacheEntry);
|
||||
info.hash = tag_hash;
|
||||
info.hcxt = MetadataCacheMemoryContext;
|
||||
DistTableCacheHash =
|
||||
hash_create("Distributed Relation Cache", 32, &info,
|
||||
HASH_ELEM | HASH_FUNCTION);
|
||||
HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue