diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index c4af3de3a..fb9601115 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -453,6 +453,12 @@ multi_ProcessUtility(Node *parsetree, ProcessVacuumStmt(vacuumStmt, queryString); } + + /* + * Ensure value is valid, we can't do some checks during CREATE + * EXTENSION. This is important to register some invalidation callbacks. + */ + CitusHasBeenLoaded(); } diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 66d652090..d0ec677bf 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -130,8 +130,6 @@ static HTAB *DistShardCacheHash = NULL; static HTAB *WorkerNodeHash = NULL; static bool workerNodeHashValid = false; -static bool invalidationRegistered = false; - /* default value is -1, for coordinator it's 0 and for worker nodes > 0 */ static int LocalGroupId = -1; @@ -160,8 +158,11 @@ static char * InstalledExtensionVersion(void); static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray, int shardIntervalArrayLength, FmgrInfo *shardIntervalSortCompareFunction); +static void InitializeCaches(void); static void InitializeDistTableCache(void); static void InitializeWorkerNodeCache(void); +static void RegisterWorkerNodeCacheCallbacks(void); +static void RegisterLocalGroupIdCacheCallbacks(void); static uint32 WorkerNodeHashCode(const void *key, Size keySize); static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); @@ -413,11 +414,7 @@ LookupShardCacheEntry(int64 shardId) Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING)); - /* probably not reachable */ - if (DistShardCacheHash == NULL) - { - InitializeDistTableCache(); - } + InitializeCaches(); /* lookup cache entry */ shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache); @@ -523,10 +520,7 @@ LookupDistTableCacheEntry(Oid relationId) return NULL; } - if (DistTableCacheHash == NULL) - { - InitializeDistTableCache(); - } + InitializeCaches(); /* * If the version is not known to be compatible, perform thorough check, @@ -1316,6 +1310,8 @@ AvailableExtensionVersion(void) bool doCopy = false; char *availableExtensionVersion; + InitializeCaches(); + estate = CreateExecutorState(); extensionsResultSet = makeNode(ReturnSetInfo); extensionsResultSet->econtext = GetPerTupleExprContext(estate); @@ -1348,10 +1344,6 @@ AvailableExtensionVersion(void) Datum availableVersion = slot_getattr(tupleTableSlot, 2, &isNull); /* we will cache the result of citus version to prevent catalog access */ - if (CacheMemoryContext == NULL) - { - CreateCacheMemoryContext(); - } oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext); availableExtensionVersion = text_to_cstring(DatumGetTextPP(availableVersion)); @@ -1418,11 +1410,6 @@ InstalledExtensionVersion(void) } /* we will cache the result of citus version to prevent catalog access */ - if (CacheMemoryContext == NULL) - { - CreateCacheMemoryContext(); - } - oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext); installedExtensionVersion = text_to_cstring(DatumGetTextPP(installedVersion)); @@ -1995,18 +1982,39 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS) } +/* + * InitializeCaches() registers invalidation handlers for metadata_cache.c's + * caches. + */ +static void +InitializeCaches(void) +{ + static bool performedInitialization = false; + + if (!performedInitialization) + { + /* set first, to avoid recursion dangers */ + performedInitialization = true; + + /* make sure we've initialized CacheMemoryContext */ + if (CacheMemoryContext == NULL) + { + CreateCacheMemoryContext(); + } + + InitializeDistTableCache(); + RegisterWorkerNodeCacheCallbacks(); + RegisterLocalGroupIdCacheCallbacks(); + } +} + + /* initialize the infrastructure for the metadata cache */ static void InitializeDistTableCache(void) { HASHCTL info; - /* make sure we've initialized CacheMemoryContext */ - if (CacheMemoryContext == NULL) - { - CreateCacheMemoryContext(); - } - /* build initial scan keys, copied for every relation scan */ memset(&DistPartitionScanKey, 0, sizeof(DistPartitionScanKey)); @@ -2060,6 +2068,8 @@ InitializeDistTableCache(void) HTAB * GetWorkerNodeHash(void) { + InitializeCaches(); /* ensure relevant callbacks are registered */ + /* * We might have some concurrent metadata changes. In order to get the changes, * we first need to accept the cache invalidation messages. @@ -2085,7 +2095,6 @@ GetWorkerNodeHash(void) static void InitializeWorkerNodeCache(void) { - static bool invalidationRegistered = false; HTAB *oldWorkerNodeHash = NULL; List *workerNodeList = NIL; ListCell *workerNodeCell = NULL; @@ -2093,11 +2102,7 @@ InitializeWorkerNodeCache(void) int hashFlags = 0; long maxTableSize = (long) MaxWorkerNodesTracked; - /* make sure we've initialized CacheMemoryContext */ - if (CacheMemoryContext == NULL) - { - CreateCacheMemoryContext(); - } + InitializeCaches(); /* * Create the hash that holds the worker nodes. The key is the combination of @@ -2155,16 +2160,20 @@ InitializeWorkerNodeCache(void) /* now, safe to destroy the old hash */ hash_destroy(oldWorkerNodeHash); +} - /* prevent multiple invalidation registrations */ - if (!invalidationRegistered) - { - /* Watch for invalidation events. */ - CacheRegisterRelcacheCallback(InvalidateNodeRelationCacheCallback, - (Datum) 0); - invalidationRegistered = true; - } +/* + * RegisterWorkerNodeCacheCallbacks registers the callbacks required for the + * worker node cache. It's separate from InitializeWorkerNodeCache so the + * callback can be registered early, before the metadata tables exist. + */ +static void +RegisterWorkerNodeCacheCallbacks(void) +{ + /* Watch for invalidation events. */ + CacheRegisterRelcacheCallback(InvalidateNodeRelationCacheCallback, + (Datum) 0); } @@ -2185,6 +2194,8 @@ GetLocalGroupId(void) Relation pgDistLocalGroupId = NULL; Oid localGroupTableOid = InvalidOid; + InitializeCaches(); + /* * Already set the group id, no need to read the heap again. */ @@ -2226,16 +2237,6 @@ GetLocalGroupId(void) systable_endscan(scanDescriptor); heap_close(pgDistLocalGroupId, AccessShareLock); - /* prevent multiple invalidation registrations */ - if (!invalidationRegistered) - { - /* Watch for invalidation events. */ - CacheRegisterRelcacheCallback(InvalidateLocalGroupIdRelationCacheCallback, - (Datum) 0); - - invalidationRegistered = true; - } - /* set the local cache variable */ LocalGroupId = groupId; @@ -2243,6 +2244,21 @@ GetLocalGroupId(void) } +/* + * RegisterLocalGroupIdCacheCallbacks registers the callbacks required to + * maintain LocalGroupId at a consistent value. It's separate from + * GetLocalGroupId so the callback can be registered early, before metadata + * tables exist. + */ +static void +RegisterLocalGroupIdCacheCallbacks(void) +{ + /* Watch for invalidation events. */ + CacheRegisterRelcacheCallback(InvalidateLocalGroupIdRelationCacheCallback, + (Datum) 0); +} + + /* * WorkerNodeHashCode computes the hash code for a worker node from the node's * host name and port number. Nodes that only differ by their rack locations @@ -2739,6 +2755,9 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva static void CachedRelationLookup(const char *relationName, Oid *cachedOid) { + /* force callbacks to be registered, so we always get notified upon changes */ + InitializeCaches(); + if (*cachedOid == InvalidOid) { *cachedOid = get_relname_relid(relationName, PG_CATALOG_NAMESPACE); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 08762ae0e..1e026a8d8 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -119,7 +119,6 @@ CREATE TABLE version_mismatch_table(column1 int); \copy version_mismatch_table FROM STDIN; -- Test INSERT INSERT INTO version_mismatch_table(column1) VALUES(5); - -- Test SELECT SELECT * FROM version_mismatch_table ORDER BY column1; column1 @@ -177,13 +176,6 @@ DROP EXTENSION citus; CREATE EXTENSION citus; -- test cache invalidation in workers \c - - - :worker_1_port --- this will initialize the cache -\d - List of relations - Schema | Name | Type | Owner ---------+------+------+------- -(0 rows) - DROP EXTENSION citus; SET citus.enable_version_checks TO 'false'; CREATE EXTENSION citus VERSION '5.2-4'; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 7c6ed0b7f..cfbb677c9 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -120,7 +120,7 @@ CREATE TABLE version_mismatch_table(column1 int); -- Test INSERT INSERT INTO version_mismatch_table(column1) VALUES(5); - + -- Test SELECT SELECT * FROM version_mismatch_table ORDER BY column1; @@ -164,8 +164,6 @@ CREATE EXTENSION citus; -- test cache invalidation in workers \c - - - :worker_1_port --- this will initialize the cache -\d DROP EXTENSION citus; SET citus.enable_version_checks TO 'false'; CREATE EXTENSION citus VERSION '5.2-4';