mirror of https://github.com/citusdata/citus.git
Force cache invalidation machinery to be initialized earlier.
Previously it was not guaranteed that invalidations were registered after creating the extension, only if the extension was used afterwards.pull/1461/head
parent
f645dca593
commit
1691f780fd
|
@ -453,6 +453,12 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
|
|
||||||
ProcessVacuumStmt(vacuumStmt, queryString);
|
ProcessVacuumStmt(vacuumStmt, queryString);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Ensure value is valid, we can't do some checks during CREATE
|
||||||
|
* EXTENSION. This is important to register some invalidation callbacks.
|
||||||
|
*/
|
||||||
|
CitusHasBeenLoaded();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -130,8 +130,6 @@ static HTAB *DistShardCacheHash = NULL;
|
||||||
static HTAB *WorkerNodeHash = NULL;
|
static HTAB *WorkerNodeHash = NULL;
|
||||||
static bool workerNodeHashValid = false;
|
static bool workerNodeHashValid = false;
|
||||||
|
|
||||||
static bool invalidationRegistered = false;
|
|
||||||
|
|
||||||
/* default value is -1, for coordinator it's 0 and for worker nodes > 0 */
|
/* default value is -1, for coordinator it's 0 and for worker nodes > 0 */
|
||||||
static int LocalGroupId = -1;
|
static int LocalGroupId = -1;
|
||||||
|
|
||||||
|
@ -160,8 +158,11 @@ static char * InstalledExtensionVersion(void);
|
||||||
static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
|
static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
|
||||||
int shardIntervalArrayLength,
|
int shardIntervalArrayLength,
|
||||||
FmgrInfo *shardIntervalSortCompareFunction);
|
FmgrInfo *shardIntervalSortCompareFunction);
|
||||||
|
static void InitializeCaches(void);
|
||||||
static void InitializeDistTableCache(void);
|
static void InitializeDistTableCache(void);
|
||||||
static void InitializeWorkerNodeCache(void);
|
static void InitializeWorkerNodeCache(void);
|
||||||
|
static void RegisterWorkerNodeCacheCallbacks(void);
|
||||||
|
static void RegisterLocalGroupIdCacheCallbacks(void);
|
||||||
static uint32 WorkerNodeHashCode(const void *key, Size keySize);
|
static uint32 WorkerNodeHashCode(const void *key, Size keySize);
|
||||||
static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
|
static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
|
||||||
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
|
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
|
||||||
|
@ -413,11 +414,7 @@ LookupShardCacheEntry(int64 shardId)
|
||||||
|
|
||||||
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
|
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
|
||||||
|
|
||||||
/* probably not reachable */
|
InitializeCaches();
|
||||||
if (DistShardCacheHash == NULL)
|
|
||||||
{
|
|
||||||
InitializeDistTableCache();
|
|
||||||
}
|
|
||||||
|
|
||||||
/* lookup cache entry */
|
/* lookup cache entry */
|
||||||
shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache);
|
shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache);
|
||||||
|
@ -523,10 +520,7 @@ LookupDistTableCacheEntry(Oid relationId)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (DistTableCacheHash == NULL)
|
InitializeCaches();
|
||||||
{
|
|
||||||
InitializeDistTableCache();
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If the version is not known to be compatible, perform thorough check,
|
* If the version is not known to be compatible, perform thorough check,
|
||||||
|
@ -1316,6 +1310,8 @@ AvailableExtensionVersion(void)
|
||||||
bool doCopy = false;
|
bool doCopy = false;
|
||||||
char *availableExtensionVersion;
|
char *availableExtensionVersion;
|
||||||
|
|
||||||
|
InitializeCaches();
|
||||||
|
|
||||||
estate = CreateExecutorState();
|
estate = CreateExecutorState();
|
||||||
extensionsResultSet = makeNode(ReturnSetInfo);
|
extensionsResultSet = makeNode(ReturnSetInfo);
|
||||||
extensionsResultSet->econtext = GetPerTupleExprContext(estate);
|
extensionsResultSet->econtext = GetPerTupleExprContext(estate);
|
||||||
|
@ -1348,10 +1344,6 @@ AvailableExtensionVersion(void)
|
||||||
Datum availableVersion = slot_getattr(tupleTableSlot, 2, &isNull);
|
Datum availableVersion = slot_getattr(tupleTableSlot, 2, &isNull);
|
||||||
|
|
||||||
/* we will cache the result of citus version to prevent catalog access */
|
/* we will cache the result of citus version to prevent catalog access */
|
||||||
if (CacheMemoryContext == NULL)
|
|
||||||
{
|
|
||||||
CreateCacheMemoryContext();
|
|
||||||
}
|
|
||||||
oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext);
|
oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext);
|
||||||
|
|
||||||
availableExtensionVersion = text_to_cstring(DatumGetTextPP(availableVersion));
|
availableExtensionVersion = text_to_cstring(DatumGetTextPP(availableVersion));
|
||||||
|
@ -1418,11 +1410,6 @@ InstalledExtensionVersion(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we will cache the result of citus version to prevent catalog access */
|
/* we will cache the result of citus version to prevent catalog access */
|
||||||
if (CacheMemoryContext == NULL)
|
|
||||||
{
|
|
||||||
CreateCacheMemoryContext();
|
|
||||||
}
|
|
||||||
|
|
||||||
oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext);
|
oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext);
|
||||||
|
|
||||||
installedExtensionVersion = text_to_cstring(DatumGetTextPP(installedVersion));
|
installedExtensionVersion = text_to_cstring(DatumGetTextPP(installedVersion));
|
||||||
|
@ -1995,18 +1982,39 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InitializeCaches() registers invalidation handlers for metadata_cache.c's
|
||||||
|
* caches.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
InitializeCaches(void)
|
||||||
|
{
|
||||||
|
static bool performedInitialization = false;
|
||||||
|
|
||||||
|
if (!performedInitialization)
|
||||||
|
{
|
||||||
|
/* set first, to avoid recursion dangers */
|
||||||
|
performedInitialization = true;
|
||||||
|
|
||||||
|
/* make sure we've initialized CacheMemoryContext */
|
||||||
|
if (CacheMemoryContext == NULL)
|
||||||
|
{
|
||||||
|
CreateCacheMemoryContext();
|
||||||
|
}
|
||||||
|
|
||||||
|
InitializeDistTableCache();
|
||||||
|
RegisterWorkerNodeCacheCallbacks();
|
||||||
|
RegisterLocalGroupIdCacheCallbacks();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* initialize the infrastructure for the metadata cache */
|
/* initialize the infrastructure for the metadata cache */
|
||||||
static void
|
static void
|
||||||
InitializeDistTableCache(void)
|
InitializeDistTableCache(void)
|
||||||
{
|
{
|
||||||
HASHCTL info;
|
HASHCTL info;
|
||||||
|
|
||||||
/* make sure we've initialized CacheMemoryContext */
|
|
||||||
if (CacheMemoryContext == NULL)
|
|
||||||
{
|
|
||||||
CreateCacheMemoryContext();
|
|
||||||
}
|
|
||||||
|
|
||||||
/* build initial scan keys, copied for every relation scan */
|
/* build initial scan keys, copied for every relation scan */
|
||||||
memset(&DistPartitionScanKey, 0, sizeof(DistPartitionScanKey));
|
memset(&DistPartitionScanKey, 0, sizeof(DistPartitionScanKey));
|
||||||
|
|
||||||
|
@ -2060,6 +2068,8 @@ InitializeDistTableCache(void)
|
||||||
HTAB *
|
HTAB *
|
||||||
GetWorkerNodeHash(void)
|
GetWorkerNodeHash(void)
|
||||||
{
|
{
|
||||||
|
InitializeCaches(); /* ensure relevant callbacks are registered */
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We might have some concurrent metadata changes. In order to get the changes,
|
* We might have some concurrent metadata changes. In order to get the changes,
|
||||||
* we first need to accept the cache invalidation messages.
|
* we first need to accept the cache invalidation messages.
|
||||||
|
@ -2085,7 +2095,6 @@ GetWorkerNodeHash(void)
|
||||||
static void
|
static void
|
||||||
InitializeWorkerNodeCache(void)
|
InitializeWorkerNodeCache(void)
|
||||||
{
|
{
|
||||||
static bool invalidationRegistered = false;
|
|
||||||
HTAB *oldWorkerNodeHash = NULL;
|
HTAB *oldWorkerNodeHash = NULL;
|
||||||
List *workerNodeList = NIL;
|
List *workerNodeList = NIL;
|
||||||
ListCell *workerNodeCell = NULL;
|
ListCell *workerNodeCell = NULL;
|
||||||
|
@ -2093,11 +2102,7 @@ InitializeWorkerNodeCache(void)
|
||||||
int hashFlags = 0;
|
int hashFlags = 0;
|
||||||
long maxTableSize = (long) MaxWorkerNodesTracked;
|
long maxTableSize = (long) MaxWorkerNodesTracked;
|
||||||
|
|
||||||
/* make sure we've initialized CacheMemoryContext */
|
InitializeCaches();
|
||||||
if (CacheMemoryContext == NULL)
|
|
||||||
{
|
|
||||||
CreateCacheMemoryContext();
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Create the hash that holds the worker nodes. The key is the combination of
|
* Create the hash that holds the worker nodes. The key is the combination of
|
||||||
|
@ -2155,16 +2160,20 @@ InitializeWorkerNodeCache(void)
|
||||||
|
|
||||||
/* now, safe to destroy the old hash */
|
/* now, safe to destroy the old hash */
|
||||||
hash_destroy(oldWorkerNodeHash);
|
hash_destroy(oldWorkerNodeHash);
|
||||||
|
}
|
||||||
|
|
||||||
/* prevent multiple invalidation registrations */
|
|
||||||
if (!invalidationRegistered)
|
|
||||||
{
|
|
||||||
/* Watch for invalidation events. */
|
|
||||||
CacheRegisterRelcacheCallback(InvalidateNodeRelationCacheCallback,
|
|
||||||
(Datum) 0);
|
|
||||||
|
|
||||||
invalidationRegistered = true;
|
/*
|
||||||
}
|
* RegisterWorkerNodeCacheCallbacks registers the callbacks required for the
|
||||||
|
* worker node cache. It's separate from InitializeWorkerNodeCache so the
|
||||||
|
* callback can be registered early, before the metadata tables exist.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
RegisterWorkerNodeCacheCallbacks(void)
|
||||||
|
{
|
||||||
|
/* Watch for invalidation events. */
|
||||||
|
CacheRegisterRelcacheCallback(InvalidateNodeRelationCacheCallback,
|
||||||
|
(Datum) 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2185,6 +2194,8 @@ GetLocalGroupId(void)
|
||||||
Relation pgDistLocalGroupId = NULL;
|
Relation pgDistLocalGroupId = NULL;
|
||||||
Oid localGroupTableOid = InvalidOid;
|
Oid localGroupTableOid = InvalidOid;
|
||||||
|
|
||||||
|
InitializeCaches();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Already set the group id, no need to read the heap again.
|
* Already set the group id, no need to read the heap again.
|
||||||
*/
|
*/
|
||||||
|
@ -2226,16 +2237,6 @@ GetLocalGroupId(void)
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
heap_close(pgDistLocalGroupId, AccessShareLock);
|
heap_close(pgDistLocalGroupId, AccessShareLock);
|
||||||
|
|
||||||
/* prevent multiple invalidation registrations */
|
|
||||||
if (!invalidationRegistered)
|
|
||||||
{
|
|
||||||
/* Watch for invalidation events. */
|
|
||||||
CacheRegisterRelcacheCallback(InvalidateLocalGroupIdRelationCacheCallback,
|
|
||||||
(Datum) 0);
|
|
||||||
|
|
||||||
invalidationRegistered = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* set the local cache variable */
|
/* set the local cache variable */
|
||||||
LocalGroupId = groupId;
|
LocalGroupId = groupId;
|
||||||
|
|
||||||
|
@ -2243,6 +2244,21 @@ GetLocalGroupId(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RegisterLocalGroupIdCacheCallbacks registers the callbacks required to
|
||||||
|
* maintain LocalGroupId at a consistent value. It's separate from
|
||||||
|
* GetLocalGroupId so the callback can be registered early, before metadata
|
||||||
|
* tables exist.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
RegisterLocalGroupIdCacheCallbacks(void)
|
||||||
|
{
|
||||||
|
/* Watch for invalidation events. */
|
||||||
|
CacheRegisterRelcacheCallback(InvalidateLocalGroupIdRelationCacheCallback,
|
||||||
|
(Datum) 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* WorkerNodeHashCode computes the hash code for a worker node from the node's
|
* WorkerNodeHashCode computes the hash code for a worker node from the node's
|
||||||
* host name and port number. Nodes that only differ by their rack locations
|
* host name and port number. Nodes that only differ by their rack locations
|
||||||
|
@ -2739,6 +2755,9 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva
|
||||||
static void
|
static void
|
||||||
CachedRelationLookup(const char *relationName, Oid *cachedOid)
|
CachedRelationLookup(const char *relationName, Oid *cachedOid)
|
||||||
{
|
{
|
||||||
|
/* force callbacks to be registered, so we always get notified upon changes */
|
||||||
|
InitializeCaches();
|
||||||
|
|
||||||
if (*cachedOid == InvalidOid)
|
if (*cachedOid == InvalidOid)
|
||||||
{
|
{
|
||||||
*cachedOid = get_relname_relid(relationName, PG_CATALOG_NAMESPACE);
|
*cachedOid = get_relname_relid(relationName, PG_CATALOG_NAMESPACE);
|
||||||
|
|
|
@ -119,7 +119,6 @@ CREATE TABLE version_mismatch_table(column1 int);
|
||||||
\copy version_mismatch_table FROM STDIN;
|
\copy version_mismatch_table FROM STDIN;
|
||||||
-- Test INSERT
|
-- Test INSERT
|
||||||
INSERT INTO version_mismatch_table(column1) VALUES(5);
|
INSERT INTO version_mismatch_table(column1) VALUES(5);
|
||||||
|
|
||||||
-- Test SELECT
|
-- Test SELECT
|
||||||
SELECT * FROM version_mismatch_table ORDER BY column1;
|
SELECT * FROM version_mismatch_table ORDER BY column1;
|
||||||
column1
|
column1
|
||||||
|
@ -177,13 +176,6 @@ DROP EXTENSION citus;
|
||||||
CREATE EXTENSION citus;
|
CREATE EXTENSION citus;
|
||||||
-- test cache invalidation in workers
|
-- test cache invalidation in workers
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
-- this will initialize the cache
|
|
||||||
\d
|
|
||||||
List of relations
|
|
||||||
Schema | Name | Type | Owner
|
|
||||||
--------+------+------+-------
|
|
||||||
(0 rows)
|
|
||||||
|
|
||||||
DROP EXTENSION citus;
|
DROP EXTENSION citus;
|
||||||
SET citus.enable_version_checks TO 'false';
|
SET citus.enable_version_checks TO 'false';
|
||||||
CREATE EXTENSION citus VERSION '5.2-4';
|
CREATE EXTENSION citus VERSION '5.2-4';
|
||||||
|
|
|
@ -120,7 +120,7 @@ CREATE TABLE version_mismatch_table(column1 int);
|
||||||
|
|
||||||
-- Test INSERT
|
-- Test INSERT
|
||||||
INSERT INTO version_mismatch_table(column1) VALUES(5);
|
INSERT INTO version_mismatch_table(column1) VALUES(5);
|
||||||
|
|
||||||
-- Test SELECT
|
-- Test SELECT
|
||||||
SELECT * FROM version_mismatch_table ORDER BY column1;
|
SELECT * FROM version_mismatch_table ORDER BY column1;
|
||||||
|
|
||||||
|
@ -164,8 +164,6 @@ CREATE EXTENSION citus;
|
||||||
-- test cache invalidation in workers
|
-- test cache invalidation in workers
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
|
||||||
-- this will initialize the cache
|
|
||||||
\d
|
|
||||||
DROP EXTENSION citus;
|
DROP EXTENSION citus;
|
||||||
SET citus.enable_version_checks TO 'false';
|
SET citus.enable_version_checks TO 'false';
|
||||||
CREATE EXTENSION citus VERSION '5.2-4';
|
CREATE EXTENSION citus VERSION '5.2-4';
|
||||||
|
|
Loading…
Reference in New Issue