From f645dca593b094e54d19e79ed623e3ed0a8b59d5 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 20 Jun 2017 16:48:46 -0700 Subject: [PATCH 1/5] Centralized metadata_cache cache variables into one struct, to avoid missing resets. E.g. extensionOwner was already missed. --- .../distributed/utils/metadata_cache.c | 191 +++++++++--------- 1 file changed, 98 insertions(+), 93 deletions(-) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 5f4f14f97..66d652090 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -81,27 +81,39 @@ typedef struct ShardCacheEntry } ShardCacheEntry; -/* state which should be cleared upon DROP EXTENSION */ -static bool extensionLoaded = false; -static Oid distShardRelationId = InvalidOid; -static Oid distShardPlacementRelationId = InvalidOid; -static Oid distNodeRelationId = InvalidOid; -static Oid distLocalGroupRelationId = InvalidOid; -static Oid distColocationRelationId = InvalidOid; -static Oid distColocationConfigurationIndexId = InvalidOid; -static Oid distColocationColocationidIndexId = InvalidOid; -static Oid distPartitionRelationId = InvalidOid; -static Oid distPartitionLogicalRelidIndexId = InvalidOid; -static Oid distPartitionColocationidIndexId = InvalidOid; -static Oid distShardLogicalRelidIndexId = InvalidOid; -static Oid distShardShardidIndexId = InvalidOid; -static Oid distShardPlacementShardidIndexId = InvalidOid; -static Oid distShardPlacementPlacementidIndexId = InvalidOid; -static Oid distShardPlacementNodeidIndexId = InvalidOid; -static Oid distTransactionRelationId = InvalidOid; -static Oid distTransactionGroupIndexId = InvalidOid; -static Oid extraDataContainerFuncId = InvalidOid; -static Oid workerHashFunctionId = InvalidOid; +/* + * State which should be cleared upon DROP EXTENSION. When the configuration + * changes, e.g. because the extension is dropped, these summarily get set to + * 0. 
+ */ +typedef struct MetadataCacheData +{ + bool extensionLoaded; + Oid distShardRelationId; + Oid distShardPlacementRelationId; + Oid distNodeRelationId; + Oid distLocalGroupRelationId; + Oid distColocationRelationId; + Oid distColocationConfigurationIndexId; + Oid distColocationColocationidIndexId; + Oid distPartitionRelationId; + Oid distPartitionLogicalRelidIndexId; + Oid distPartitionColocationidIndexId; + Oid distShardLogicalRelidIndexId; + Oid distShardShardidIndexId; + Oid distShardPlacementShardidIndexId; + Oid distShardPlacementPlacementidIndexId; + Oid distShardPlacementNodeidIndexId; + Oid distTransactionRelationId; + Oid distTransactionGroupIndexId; + Oid extraDataContainerFuncId; + Oid workerHashFunctionId; + Oid extensionOwner; +} MetadataCacheData; + + +static MetadataCacheData MetadataCache; + /* Citus extension version variables */ bool EnableVersionChecks = true; /* version checks are enabled */ @@ -1094,7 +1106,7 @@ bool CitusHasBeenLoaded(void) { /* recheck presence until citus has been loaded */ - if (!extensionLoaded || creating_extension) + if (!MetadataCache.extensionLoaded || creating_extension) { bool extensionPresent = false; bool extensionScriptExecuted = true; @@ -1115,9 +1127,11 @@ CitusHasBeenLoaded(void) } /* we disable extension features during pg_upgrade */ - extensionLoaded = extensionPresent && extensionScriptExecuted && !IsBinaryUpgrade; + MetadataCache.extensionLoaded = extensionPresent && + extensionScriptExecuted && + !IsBinaryUpgrade; - if (extensionLoaded) + if (MetadataCache.extensionLoaded) { /* * InvalidateDistRelationCacheCallback resets state such as extensionLoaded @@ -1140,7 +1154,7 @@ CitusHasBeenLoaded(void) } } - return extensionLoaded; + return MetadataCache.extensionLoaded; } @@ -1433,9 +1447,10 @@ InstalledExtensionVersion(void) Oid DistShardRelationId(void) { - CachedRelationLookup("pg_dist_shard", &distShardRelationId); + CachedRelationLookup("pg_dist_shard", + &MetadataCache.distShardRelationId); - return distShardRelationId; + return MetadataCache.distShardRelationId; } @@ -1443,9 +1458,10 @@ DistShardRelationId(void) Oid DistShardPlacementRelationId(void) { - CachedRelationLookup("pg_dist_shard_placement", &distShardPlacementRelationId); + CachedRelationLookup("pg_dist_shard_placement", + &MetadataCache.distShardPlacementRelationId); - return distShardPlacementRelationId; + return MetadataCache.distShardPlacementRelationId; } @@ -1453,9 +1469,10 @@ DistShardPlacementRelationId(void) Oid DistNodeRelationId(void) { - CachedRelationLookup("pg_dist_node", &distNodeRelationId); + CachedRelationLookup("pg_dist_node", + &MetadataCache.distNodeRelationId); - return distNodeRelationId; + return MetadataCache.distNodeRelationId; } @@ -1463,9 +1480,10 @@ DistNodeRelationId(void) Oid DistLocalGroupIdRelationId(void) { - CachedRelationLookup("pg_dist_local_group", &distLocalGroupRelationId); + CachedRelationLookup("pg_dist_local_group", + &MetadataCache.distLocalGroupRelationId); - return distLocalGroupRelationId; + return MetadataCache.distLocalGroupRelationId; } @@ -1473,9 +1491,10 @@ DistLocalGroupIdRelationId(void) Oid DistColocationRelationId(void) { - CachedRelationLookup("pg_dist_colocation", &distColocationRelationId); + CachedRelationLookup("pg_dist_colocation", + &MetadataCache.distColocationRelationId); - return distColocationRelationId; + return MetadataCache.distColocationRelationId; } @@ -1484,9 +1503,9 @@ Oid DistColocationConfigurationIndexId(void) { CachedRelationLookup("pg_dist_colocation_configuration_index", - 
&distColocationConfigurationIndexId); + &MetadataCache.distColocationConfigurationIndexId); - return distColocationConfigurationIndexId; + return MetadataCache.distColocationConfigurationIndexId; } @@ -1495,9 +1514,9 @@ Oid DistColocationColocationidIndexId(void) { CachedRelationLookup("pg_dist_colocation_pkey", - &distColocationColocationidIndexId); + &MetadataCache.distColocationColocationidIndexId); - return distColocationColocationidIndexId; + return MetadataCache.distColocationColocationidIndexId; } @@ -1505,9 +1524,10 @@ DistColocationColocationidIndexId(void) Oid DistPartitionRelationId(void) { - CachedRelationLookup("pg_dist_partition", &distPartitionRelationId); + CachedRelationLookup("pg_dist_partition", + &MetadataCache.distPartitionRelationId); - return distPartitionRelationId; + return MetadataCache.distPartitionRelationId; } @@ -1516,9 +1536,9 @@ Oid DistPartitionLogicalRelidIndexId(void) { CachedRelationLookup("pg_dist_partition_logical_relid_index", - &distPartitionLogicalRelidIndexId); + &MetadataCache.distPartitionLogicalRelidIndexId); - return distPartitionLogicalRelidIndexId; + return MetadataCache.distPartitionLogicalRelidIndexId; } @@ -1527,9 +1547,9 @@ Oid DistPartitionColocationidIndexId(void) { CachedRelationLookup("pg_dist_partition_colocationid_index", - &distPartitionColocationidIndexId); + &MetadataCache.distPartitionColocationidIndexId); - return distPartitionColocationidIndexId; + return MetadataCache.distPartitionColocationidIndexId; } @@ -1538,9 +1558,9 @@ Oid DistShardLogicalRelidIndexId(void) { CachedRelationLookup("pg_dist_shard_logical_relid_index", - &distShardLogicalRelidIndexId); + &MetadataCache.distShardLogicalRelidIndexId); - return distShardLogicalRelidIndexId; + return MetadataCache.distShardLogicalRelidIndexId; } @@ -1548,9 +1568,10 @@ DistShardLogicalRelidIndexId(void) Oid DistShardShardidIndexId(void) { - CachedRelationLookup("pg_dist_shard_shardid_index", &distShardShardidIndexId); + CachedRelationLookup("pg_dist_shard_shardid_index", + &MetadataCache.distShardShardidIndexId); - return distShardShardidIndexId; + return MetadataCache.distShardShardidIndexId; } @@ -1559,9 +1580,9 @@ Oid DistShardPlacementShardidIndexId(void) { CachedRelationLookup("pg_dist_shard_placement_shardid_index", - &distShardPlacementShardidIndexId); + &MetadataCache.distShardPlacementShardidIndexId); - return distShardPlacementShardidIndexId; + return MetadataCache.distShardPlacementShardidIndexId; } @@ -1570,9 +1591,9 @@ Oid DistShardPlacementPlacementidIndexId(void) { CachedRelationLookup("pg_dist_shard_placement_placementid_index", - &distShardPlacementPlacementidIndexId); + &MetadataCache.distShardPlacementPlacementidIndexId); - return distShardPlacementPlacementidIndexId; + return MetadataCache.distShardPlacementPlacementidIndexId; } @@ -1580,9 +1601,10 @@ DistShardPlacementPlacementidIndexId(void) Oid DistTransactionRelationId(void) { - CachedRelationLookup("pg_dist_transaction", &distTransactionRelationId); + CachedRelationLookup("pg_dist_transaction", + &MetadataCache.distTransactionRelationId); - return distTransactionRelationId; + return MetadataCache.distTransactionRelationId; } @@ -1591,9 +1613,9 @@ Oid DistTransactionGroupIndexId(void) { CachedRelationLookup("pg_dist_transaction_group_index", - &distTransactionGroupIndexId); + &MetadataCache.distTransactionGroupIndexId); - return distTransactionGroupIndexId; + return MetadataCache.distTransactionGroupIndexId; } @@ -1602,9 +1624,9 @@ Oid DistShardPlacementNodeidIndexId(void) { 
CachedRelationLookup("pg_dist_shard_placement_nodeid_index", - &distShardPlacementNodeidIndexId); + &MetadataCache.distShardPlacementNodeidIndexId); - return distShardPlacementNodeidIndexId; + return MetadataCache.distShardPlacementNodeidIndexId; } @@ -1615,14 +1637,15 @@ CitusExtraDataContainerFuncId(void) List *nameList = NIL; Oid paramOids[1] = { INTERNALOID }; - if (extraDataContainerFuncId == InvalidOid) + if (MetadataCache.extraDataContainerFuncId == InvalidOid) { nameList = list_make2(makeString("pg_catalog"), makeString("citus_extradata_container")); - extraDataContainerFuncId = LookupFuncName(nameList, 1, paramOids, false); + MetadataCache.extraDataContainerFuncId = + LookupFuncName(nameList, 1, paramOids, false); } - return extraDataContainerFuncId; + return MetadataCache.extraDataContainerFuncId; } @@ -1630,17 +1653,18 @@ CitusExtraDataContainerFuncId(void) Oid CitusWorkerHashFunctionId(void) { - if (workerHashFunctionId == InvalidOid) + if (MetadataCache.workerHashFunctionId == InvalidOid) { Oid citusExtensionOid = get_extension_oid("citus", false); Oid citusSchemaOid = get_extension_schema(citusExtensionOid); char *citusSchemaName = get_namespace_name(citusSchemaOid); const int argCount = 1; - workerHashFunctionId = FunctionOid(citusSchemaName, "worker_hash", argCount); + MetadataCache.workerHashFunctionId = + FunctionOid(citusSchemaName, "worker_hash", argCount); } - return workerHashFunctionId; + return MetadataCache.workerHashFunctionId; } @@ -1657,11 +1681,10 @@ CitusExtensionOwner(void) ScanKeyData entry[1]; HeapTuple extensionTuple = NULL; Form_pg_extension extensionForm = NULL; - static Oid extensionOwner = InvalidOid; - if (extensionOwner != InvalidOid) + if (MetadataCache.extensionOwner != InvalidOid) { - return extensionOwner; + return MetadataCache.extensionOwner; } relation = heap_open(ExtensionRelationId, AccessShareLock); @@ -1693,8 +1716,8 @@ CitusExtensionOwner(void) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("citus extension needs to be owned by superuser"))); } - extensionOwner = extensionForm->extowner; - Assert(OidIsValid(extensionOwner)); + MetadataCache.extensionOwner = extensionForm->extowner; + Assert(OidIsValid(MetadataCache.extensionOwner)); } else { @@ -1706,7 +1729,7 @@ CitusExtensionOwner(void) heap_close(relation, AccessShareLock); - return extensionOwner; + return MetadataCache.extensionOwner; } @@ -2382,27 +2405,9 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) * This happens pretty rarely, but most importantly happens during * DROP EXTENSION citus; */ - if (relationId != InvalidOid && relationId == distPartitionRelationId) + if (relationId != InvalidOid && relationId == MetadataCache.distPartitionRelationId) { - extensionLoaded = false; - distShardRelationId = InvalidOid; - distShardPlacementRelationId = InvalidOid; - distLocalGroupRelationId = InvalidOid; - distNodeRelationId = InvalidOid; - distColocationRelationId = InvalidOid; - distColocationConfigurationIndexId = InvalidOid; - distColocationColocationidIndexId = InvalidOid; - distPartitionRelationId = InvalidOid; - distPartitionLogicalRelidIndexId = InvalidOid; - distPartitionColocationidIndexId = InvalidOid; - distShardLogicalRelidIndexId = InvalidOid; - distShardShardidIndexId = InvalidOid; - distShardPlacementShardidIndexId = InvalidOid; - distShardPlacementPlacementidIndexId = InvalidOid; - distTransactionRelationId = InvalidOid; - distTransactionGroupIndexId = InvalidOid; - extraDataContainerFuncId = InvalidOid; - 
workerHashFunctionId = InvalidOid;
+		memset(&MetadataCache, 0, sizeof(MetadataCache));
 	}
 }

@@ -2461,7 +2466,7 @@ DistTableOidList(void)
 static void
 InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId)
 {
-	if (relationId == InvalidOid || relationId == distNodeRelationId)
+	if (relationId == InvalidOid || relationId == MetadataCache.distNodeRelationId)
 	{
 		workerNodeHashValid = false;
 	}
@@ -2476,7 +2481,7 @@ static void
 InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId)
 {
 	/* when invalidation happens simply set the LocalGroupId to the default value */
-	if (relationId == InvalidOid || relationId == distLocalGroupRelationId)
+	if (relationId == InvalidOid || relationId == MetadataCache.distLocalGroupRelationId)
 	{
 		LocalGroupId = -1;
 	}

From 1691f780fd6ace1d1af62d94dcc36477d8609179 Mon Sep 17 00:00:00 2001
From: Andres Freund
Date: Tue, 20 Jun 2017 16:55:11 -0700
Subject: [PATCH 2/5] Force cache invalidation machinery to be initialized
 earlier.

Previously it was not guaranteed that invalidation callbacks were
registered when the extension was created; they only were once the
extension was actually used afterwards.
---
 .../distributed/executor/multi_utility.c      |   6 +
 .../distributed/utils/metadata_cache.c        | 119 ++++++++++--------
 src/test/regress/expected/multi_extension.out |   8 --
 src/test/regress/sql/multi_extension.sql      |   4 +-
 4 files changed, 76 insertions(+), 61 deletions(-)

diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c
index c4af3de3a..fb9601115 100644
--- a/src/backend/distributed/executor/multi_utility.c
+++ b/src/backend/distributed/executor/multi_utility.c
@@ -453,6 +453,12 @@ multi_ProcessUtility(Node *parsetree,
 		ProcessVacuumStmt(vacuumStmt, queryString);
 	}

+	/*
+	 * Ensure the cached extension-loaded state is valid; we can't perform
+	 * some of these checks during CREATE EXTENSION. Importantly, this call
+	 * also registers the cache invalidation callbacks.
+ */ + CitusHasBeenLoaded(); } diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 66d652090..d0ec677bf 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -130,8 +130,6 @@ static HTAB *DistShardCacheHash = NULL; static HTAB *WorkerNodeHash = NULL; static bool workerNodeHashValid = false; -static bool invalidationRegistered = false; - /* default value is -1, for coordinator it's 0 and for worker nodes > 0 */ static int LocalGroupId = -1; @@ -160,8 +158,11 @@ static char * InstalledExtensionVersion(void); static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray, int shardIntervalArrayLength, FmgrInfo *shardIntervalSortCompareFunction); +static void InitializeCaches(void); static void InitializeDistTableCache(void); static void InitializeWorkerNodeCache(void); +static void RegisterWorkerNodeCacheCallbacks(void); +static void RegisterLocalGroupIdCacheCallbacks(void); static uint32 WorkerNodeHashCode(const void *key, Size keySize); static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); @@ -413,11 +414,7 @@ LookupShardCacheEntry(int64 shardId) Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING)); - /* probably not reachable */ - if (DistShardCacheHash == NULL) - { - InitializeDistTableCache(); - } + InitializeCaches(); /* lookup cache entry */ shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache); @@ -523,10 +520,7 @@ LookupDistTableCacheEntry(Oid relationId) return NULL; } - if (DistTableCacheHash == NULL) - { - InitializeDistTableCache(); - } + InitializeCaches(); /* * If the version is not known to be compatible, perform thorough check, @@ -1316,6 +1310,8 @@ AvailableExtensionVersion(void) bool doCopy = false; char *availableExtensionVersion; + InitializeCaches(); + estate = CreateExecutorState(); extensionsResultSet = makeNode(ReturnSetInfo); extensionsResultSet->econtext = GetPerTupleExprContext(estate); @@ -1348,10 +1344,6 @@ AvailableExtensionVersion(void) Datum availableVersion = slot_getattr(tupleTableSlot, 2, &isNull); /* we will cache the result of citus version to prevent catalog access */ - if (CacheMemoryContext == NULL) - { - CreateCacheMemoryContext(); - } oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext); availableExtensionVersion = text_to_cstring(DatumGetTextPP(availableVersion)); @@ -1418,11 +1410,6 @@ InstalledExtensionVersion(void) } /* we will cache the result of citus version to prevent catalog access */ - if (CacheMemoryContext == NULL) - { - CreateCacheMemoryContext(); - } - oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext); installedExtensionVersion = text_to_cstring(DatumGetTextPP(installedVersion)); @@ -1995,18 +1982,39 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS) } +/* + * InitializeCaches() registers invalidation handlers for metadata_cache.c's + * caches. 
+ */ +static void +InitializeCaches(void) +{ + static bool performedInitialization = false; + + if (!performedInitialization) + { + /* set first, to avoid recursion dangers */ + performedInitialization = true; + + /* make sure we've initialized CacheMemoryContext */ + if (CacheMemoryContext == NULL) + { + CreateCacheMemoryContext(); + } + + InitializeDistTableCache(); + RegisterWorkerNodeCacheCallbacks(); + RegisterLocalGroupIdCacheCallbacks(); + } +} + + /* initialize the infrastructure for the metadata cache */ static void InitializeDistTableCache(void) { HASHCTL info; - /* make sure we've initialized CacheMemoryContext */ - if (CacheMemoryContext == NULL) - { - CreateCacheMemoryContext(); - } - /* build initial scan keys, copied for every relation scan */ memset(&DistPartitionScanKey, 0, sizeof(DistPartitionScanKey)); @@ -2060,6 +2068,8 @@ InitializeDistTableCache(void) HTAB * GetWorkerNodeHash(void) { + InitializeCaches(); /* ensure relevant callbacks are registered */ + /* * We might have some concurrent metadata changes. In order to get the changes, * we first need to accept the cache invalidation messages. @@ -2085,7 +2095,6 @@ GetWorkerNodeHash(void) static void InitializeWorkerNodeCache(void) { - static bool invalidationRegistered = false; HTAB *oldWorkerNodeHash = NULL; List *workerNodeList = NIL; ListCell *workerNodeCell = NULL; @@ -2093,11 +2102,7 @@ InitializeWorkerNodeCache(void) int hashFlags = 0; long maxTableSize = (long) MaxWorkerNodesTracked; - /* make sure we've initialized CacheMemoryContext */ - if (CacheMemoryContext == NULL) - { - CreateCacheMemoryContext(); - } + InitializeCaches(); /* * Create the hash that holds the worker nodes. The key is the combination of @@ -2155,16 +2160,20 @@ InitializeWorkerNodeCache(void) /* now, safe to destroy the old hash */ hash_destroy(oldWorkerNodeHash); +} - /* prevent multiple invalidation registrations */ - if (!invalidationRegistered) - { - /* Watch for invalidation events. */ - CacheRegisterRelcacheCallback(InvalidateNodeRelationCacheCallback, - (Datum) 0); - invalidationRegistered = true; - } +/* + * RegisterWorkerNodeCacheCallbacks registers the callbacks required for the + * worker node cache. It's separate from InitializeWorkerNodeCache so the + * callback can be registered early, before the metadata tables exist. + */ +static void +RegisterWorkerNodeCacheCallbacks(void) +{ + /* Watch for invalidation events. */ + CacheRegisterRelcacheCallback(InvalidateNodeRelationCacheCallback, + (Datum) 0); } @@ -2185,6 +2194,8 @@ GetLocalGroupId(void) Relation pgDistLocalGroupId = NULL; Oid localGroupTableOid = InvalidOid; + InitializeCaches(); + /* * Already set the group id, no need to read the heap again. */ @@ -2226,16 +2237,6 @@ GetLocalGroupId(void) systable_endscan(scanDescriptor); heap_close(pgDistLocalGroupId, AccessShareLock); - /* prevent multiple invalidation registrations */ - if (!invalidationRegistered) - { - /* Watch for invalidation events. */ - CacheRegisterRelcacheCallback(InvalidateLocalGroupIdRelationCacheCallback, - (Datum) 0); - - invalidationRegistered = true; - } - /* set the local cache variable */ LocalGroupId = groupId; @@ -2243,6 +2244,21 @@ GetLocalGroupId(void) } +/* + * RegisterLocalGroupIdCacheCallbacks registers the callbacks required to + * maintain LocalGroupId at a consistent value. It's separate from + * GetLocalGroupId so the callback can be registered early, before metadata + * tables exist. + */ +static void +RegisterLocalGroupIdCacheCallbacks(void) +{ + /* Watch for invalidation events. 
*/ + CacheRegisterRelcacheCallback(InvalidateLocalGroupIdRelationCacheCallback, + (Datum) 0); +} + + /* * WorkerNodeHashCode computes the hash code for a worker node from the node's * host name and port number. Nodes that only differ by their rack locations @@ -2739,6 +2755,9 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva static void CachedRelationLookup(const char *relationName, Oid *cachedOid) { + /* force callbacks to be registered, so we always get notified upon changes */ + InitializeCaches(); + if (*cachedOid == InvalidOid) { *cachedOid = get_relname_relid(relationName, PG_CATALOG_NAMESPACE); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 08762ae0e..1e026a8d8 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -119,7 +119,6 @@ CREATE TABLE version_mismatch_table(column1 int); \copy version_mismatch_table FROM STDIN; -- Test INSERT INSERT INTO version_mismatch_table(column1) VALUES(5); - -- Test SELECT SELECT * FROM version_mismatch_table ORDER BY column1; column1 @@ -177,13 +176,6 @@ DROP EXTENSION citus; CREATE EXTENSION citus; -- test cache invalidation in workers \c - - - :worker_1_port --- this will initialize the cache -\d - List of relations - Schema | Name | Type | Owner ---------+------+------+------- -(0 rows) - DROP EXTENSION citus; SET citus.enable_version_checks TO 'false'; CREATE EXTENSION citus VERSION '5.2-4'; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 7c6ed0b7f..cfbb677c9 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -120,7 +120,7 @@ CREATE TABLE version_mismatch_table(column1 int); -- Test INSERT INSERT INTO version_mismatch_table(column1) VALUES(5); - + -- Test SELECT SELECT * FROM version_mismatch_table ORDER BY column1; @@ -164,8 +164,6 @@ CREATE EXTENSION citus; -- test cache invalidation in workers \c - - - :worker_1_port --- this will initialize the cache -\d DROP EXTENSION citus; SET citus.enable_version_checks TO 'false'; CREATE EXTENSION citus VERSION '5.2-4'; From 3483bb99eb1329f4ac90bcb8b8a04ec1a7ab6045 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sun, 11 Jun 2017 18:22:29 -0700 Subject: [PATCH 3/5] Minimal infrastructure for per-backend citus initialization. --- src/backend/distributed/shared_library_init.c | 17 ++++++++++++++++- src/backend/distributed/utils/metadata_cache.c | 8 ++++++++ src/include/distributed/shared_library_init.h | 16 ++++++++++++++++ 3 files changed, 40 insertions(+), 1 deletion(-) create mode 100644 src/include/distributed/shared_library_init.h diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 181242e8b..f6a03702b 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * shared_library_init.c - * Initialize Citus extension + * Functionality related to the initialization of the Citus extension. * * Copyright (c) 2012-2016, Citus Data, Inc. 
*-------------------------------------------------------------------------
 */
@@ -36,6 +36,7 @@
 #include "distributed/pg_dist_partition.h"
 #include "distributed/placement_connection.h"
 #include "distributed/remote_commands.h"
+#include "distributed/shared_library_init.h"
 #include "distributed/task_tracker.h"
 #include "distributed/transaction_management.h"
 #include "distributed/worker_manager.h"
@@ -177,6 +178,20 @@ _PG_init(void)
 }


+/*
+ * StartupCitusBackend initializes per-backend infrastructure, and is called
+ * the first time citus is used in a database.
+ *
+ * NB: All code here has to be able to cope with this routine being called
+ * multiple times in the same backend. This will e.g. happen when the
+ * extension is created or upgraded.
+ */
+void
+StartupCitusBackend(void)
+{
+}
+
+
 /*
  * CreateRequiredDirectories - Create directories required for Citus to
  * function.
diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c
index d0ec677bf..9dd62087e 100644
--- a/src/backend/distributed/utils/metadata_cache.c
+++ b/src/backend/distributed/utils/metadata_cache.c
@@ -35,6 +35,7 @@
 #include "distributed/pg_dist_partition.h"
 #include "distributed/pg_dist_shard.h"
 #include "distributed/pg_dist_shard_placement.h"
+#include "distributed/shared_library_init.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
@@ -1118,6 +1119,13 @@ CitusHasBeenLoaded(void)
 		{
 			extensionScriptExecuted = false;
 		}
+
+		/*
+		 * Whenever the extension exists, even while it is still being
+		 * created, the infrastructure to run citus in this database
+		 * needs to be ready.
+		 */
+		StartupCitusBackend();
 	}

 	/* we disable extension features during pg_upgrade */
diff --git a/src/include/distributed/shared_library_init.h b/src/include/distributed/shared_library_init.h
new file mode 100644
index 000000000..35f3d0e16
--- /dev/null
+++ b/src/include/distributed/shared_library_init.h
@@ -0,0 +1,16 @@
+/*-------------------------------------------------------------------------
+ *
+ * shared_library_init.h
+ *	  Functionality related to the initialization of the Citus extension.
+ *
+ * Copyright (c) 2017, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef SHARED_LIBRARY_INIT_H
+#define SHARED_LIBRARY_INIT_H
+
+extern void StartupCitusBackend(void);
+
+#endif /* SHARED_LIBRARY_INIT_H */

From c3b7c5dc336f2c83b840d2a50479c3e6858e8fbf Mon Sep 17 00:00:00 2001
From: Andres Freund
Date: Sun, 11 Jun 2017 18:24:12 -0700
Subject: [PATCH 4/5] Introduce per-database maintenance process.

This will be used for deadlock detection and prepared transaction
recovery, among other things, but for now it just idles.
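For readers less familiar with the dynamic background worker API this patch
builds on: the registration performed in InitializeMaintenanceDaemonBackend
below follows the standard PostgreSQL 9.6-era pattern sketched here. This is
a minimal, self-contained illustration only; the worker name, library name,
and entry point (ExampleWorkerMain) are hypothetical placeholders, not the
values Citus actually uses.

	#include "postgres.h"

	#include "miscadmin.h"
	#include "postmaster/bgworker.h"

	/* hypothetical example: launch one worker and wait until it has started */
	static void
	StartExampleWorker(void)
	{
		BackgroundWorker worker;
		BackgroundWorkerHandle *handle = NULL;
		BgwHandleStatus status;
		pid_t pid = 0;

		memset(&worker, 0, sizeof(worker));
		snprintf(worker.bgw_name, BGW_MAXLEN, "example worker");

		/* shared memory access, plus the ability to connect to a database */
		worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
						   BGWORKER_BACKEND_DATABASE_CONNECTION;

		/* don't start before the cluster has reached a consistent state */
		worker.bgw_start_time = BgWorkerStart_ConsistentState;
		worker.bgw_restart_time = 5;	/* seconds between restarts on error */

		/* the entry point is resolved by library + function name */
		snprintf(worker.bgw_library_name, BGW_MAXLEN, "example");
		snprintf(worker.bgw_function_name, BGW_MAXLEN, "ExampleWorkerMain");
		worker.bgw_main_arg = (Datum) 0;
		worker.bgw_notify_pid = MyProcPid;	/* be notified about startup */

		if (!RegisterDynamicBackgroundWorker(&worker, &handle))
		{
			ereport(ERROR, (errmsg("could not register example worker")));
		}

		status = WaitForBackgroundWorkerStartup(handle, &pid);
		if (status != BGWH_STARTED)
		{
			ereport(ERROR, (errmsg("example worker failed to start")));
		}
	}

Setting bgw_notify_pid is what allows the registering backend to block in
WaitForBackgroundWorkerStartup() until the postmaster has actually forked
the worker.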
---
 src/backend/distributed/shared_library_init.c |   4 +
 src/backend/distributed/utils/maintenanced.c  | 412 ++++++++++++++++++
 src/include/distributed/maintenanced.h        |  20 +
 3 files changed, 436 insertions(+)
 create mode 100644 src/backend/distributed/utils/maintenanced.c
 create mode 100644 src/include/distributed/maintenanced.h

diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index f6a03702b..0c09eb6f6 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -22,6 +22,7 @@
 #include "distributed/citus_nodefuncs.h"
 #include "distributed/connection_management.h"
 #include "distributed/connection_management.h"
+#include "distributed/maintenanced.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/master_protocol.h"
 #include "distributed/multi_copy.h"
@@ -161,6 +162,8 @@ _PG_init(void)
 	set_rel_pathlist_hook = multi_relation_restriction_hook;
 	set_join_pathlist_hook = multi_join_restriction_hook;

+	InitializeMaintenanceDaemon();
+
 	/* organize that task tracker is started once server is up */
 	TaskTrackerRegister();

@@ -189,6 +192,7 @@ _PG_init(void)
 void
 StartupCitusBackend(void)
 {
+	InitializeMaintenanceDaemonBackend();
 }


diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c
new file mode 100644
index 000000000..bf90c6df7
--- /dev/null
+++ b/src/backend/distributed/utils/maintenanced.c
@@ -0,0 +1,412 @@
+/*-------------------------------------------------------------------------
+ *
+ * maintenanced.c
+ *	  Background worker run for each Citus-using database in a postgres
+ *	  cluster.
+ *
+ * This file provides infrastructure for launching exactly one background
+ * worker for every database in which citus is used. That background worker
+ * can then perform work like deadlock detection, prepared transaction
+ * recovery, and cleanup.
+ *
+ * Copyright (c) 2017, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+
+#include "miscadmin.h"
+#include "pgstat.h"
+
+#include "access/xact.h"
+#include "libpq/pqsignal.h"
+#include "distributed/maintenanced.h"
+#include "distributed/metadata_cache.h"
+#include "postmaster/bgworker.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "tcop/tcopprot.h"
+
+
+/*
+ * Shared memory data for all maintenance workers.
+ */
+typedef struct MaintenanceDaemonControlData
+{
+	/*
+	 * Lock protecting the shared memory state. This is to be taken when
+	 * looking up (shared mode) or inserting (exclusive mode) per-database
+	 * data in dbHash.
+	 */
+	int trancheId;
+	LWLockTranche lockTranche;
+	LWLock lock;
+
+	/*
+	 * Hash-table of workers, one entry for each database with citus
+	 * activated.
+	 */
+	HTAB *dbHash;
+} MaintenanceDaemonControlData;
+
+
+/*
+ * Per-database worker state.
+ */ +typedef struct MaintenanceDaemonDBData +{ + /* hash key: database to run on */ + Oid databaseOid; + + /* information: which user to use */ + Oid userOid; + bool daemonStarted; + Latch *latch; /* pointer to the background worker's latch */ +} MaintenanceDaemonDBData; + + +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; +static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL; + +static volatile sig_atomic_t got_SIGHUP = false; + +static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS); +static size_t MaintenanceDaemonShmemSize(void); +static void MaintenanceDaemonShmemInit(void); +static void MaintenanceDaemonErrorContext(void *arg); + +/* + * InitializeMaintenanceDaemon, called at server start, is responsible for + * requesting shared memory and related infrastructure required by maintenance + * daemons. + */ +void +InitializeMaintenanceDaemon(void) +{ + RequestAddinShmemSpace(MaintenanceDaemonShmemSize()); + + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = MaintenanceDaemonShmemInit; +} + + +/* + * InitializeMaintenanceDaemonBackend, called at backend start and + * configuration changes, is responsible for starting a per-database + * maintenance worker if necessary. + */ +void +InitializeMaintenanceDaemonBackend(void) +{ + MaintenanceDaemonDBData *dbData = NULL; + Oid extensionOwner = CitusExtensionOwner(); + bool found; + + LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); + + dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonControl->dbHash, + &MyDatabaseId, + HASH_ENTER_NULL, &found); + + if (dbData == NULL) + { + /* FIXME: better message, reference relevant guc in hint */ + ereport(ERROR, (errmsg("ran out of database slots"))); + } + + if (!found || !dbData->daemonStarted) + { + BackgroundWorker worker; + BackgroundWorkerHandle *handle = NULL; + int pid = 0; + + dbData->userOid = extensionOwner; + + memset(&worker, 0, sizeof(worker)); + + snprintf(worker.bgw_name, BGW_MAXLEN, + "Citus Maintenance Daemon: %u/%u", + MyDatabaseId, extensionOwner); + + /* request ability to connect to target database */ + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + + /* + * No point in getting started before able to run query, but we do + * want to get started on Hot-Stanby standbys. + */ + worker.bgw_start_time = BgWorkerStart_ConsistentState; + + /* + * Restart after a bit after errors, but don't bog the system. + */ + worker.bgw_restart_time = 5; + sprintf(worker.bgw_library_name, "citus"); + sprintf(worker.bgw_function_name, "CitusMaintenanceDaemonMain"); + worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId); + memcpy(worker.bgw_extra, &extensionOwner, sizeof(Oid)); + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + { + ereport(ERROR, (errmsg("could not start maintenance background worker"), + errhint("Increasing max_worker_processes might help."))); + } + + dbData->daemonStarted = true; + LWLockRelease(&MaintenanceDaemonControl->lock); + + WaitForBackgroundWorkerStartup(handle, &pid); + } + else + { + Assert(dbData->daemonStarted); + + /* + * If owner of extension changed, wake up daemon. It'll notice and + * restart. 
+ */ + if (dbData->userOid != extensionOwner) + { + dbData->userOid = extensionOwner; + if (dbData->latch) + { + SetLatch(dbData->latch); + } + } + LWLockRelease(&MaintenanceDaemonControl->lock); + } +} + + +/* + * CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll + * be started by the background worker infrastructure. If it errors out, + * it'll be restarted after a few seconds. + */ +void +CitusMaintenanceDaemonMain(Datum main_arg) +{ + Oid databaseOid = DatumGetObjectId(main_arg); + MaintenanceDaemonDBData *myDbData = NULL; + ErrorContextCallback errorCallback; + + /* + * Look up this worker's configuration. + */ + LWLockAcquire(&MaintenanceDaemonControl->lock, LW_SHARED); + + myDbData = (MaintenanceDaemonDBData *) + hash_search(MaintenanceDaemonControl->dbHash, &databaseOid, + HASH_FIND, NULL); + if (!myDbData) + { + /* should never happen */ + ereport(ERROR, (errmsg("got lost finding myself"))); + } + LWLockRelease(&MaintenanceDaemonControl->lock); + + + myDbData->latch = MyLatch; + + /* + * Setup error context so log messages can be properly attributed. Some of + * them otherwise sound like they might be from a normal user connection. + * Do so before setting up signals etc, so we never exit without the + * context setup. + */ + memset(&errorCallback, 0, sizeof(errorCallback)); + errorCallback.callback = MaintenanceDaemonErrorContext; + errorCallback.arg = (void *) myDbData; + errorCallback.previous = error_context_stack; + error_context_stack = &errorCallback; + + /* wire up signals */ + pqsignal(SIGTERM, die); + pqsignal(SIGHUP, MaintenanceDaemonSigHupHandler); + BackgroundWorkerUnblockSignals(); + + elog(LOG, "starting maintenance daemon on database %u user %u", + databaseOid, myDbData->userOid); + + /* connect to database, after that we can actually access catalogs */ + BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid); + + /* make worker recognizable in pg_stat_activity */ + pgstat_report_appname("Citus Maintenance Daemon"); + + /* enter main loop */ + for (;;) + { + int rc; + int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; + int timeout = 10000; /* wake up at least every so often */ + + CHECK_FOR_INTERRUPTS(); + + /* + * Perform Work. If a specific task needs to be called sooner than + * timeout indicates, it's ok to lower it to that value. Expensive + * tasks should do their own time math about whether to re-run checks. + */ + + /* + * Wait until timeout, or until somebody wakes us up. + */ + rc = WaitLatch(MyLatch, latchFlags, timeout); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + { + proc_exit(1); + } + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + + /* check for changed configuration */ + if (myDbData->userOid != GetSessionUserId()) + { + /* return code of 1 requests worker restart */ + proc_exit(1); + } + + /* + * Could also add code checking whether extension still exists, + * but that'd complicate things a bit, because we'd have to delete + * the shared memory entry. There'd potentially be a race + * condition where the extension gets re-created, checking that + * this entry still exists, and it getting deleted just after. + * Doesn't seem worth catering for that. + */ + } + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + } +} + + +/* + * MaintenanceDaemonShmemSize computes how much shared memory is required. 
+ */ +static size_t +MaintenanceDaemonShmemSize(void) +{ + Size size = 0; + Size hashSize = 0; + + size = add_size(size, sizeof(MaintenanceDaemonControlData)); + + /* + * We request enough shared memory to have one hash-table entry for each + * worker process. We couldn't start more anyway, so there's little point + * in allocating more. + */ + hashSize = hash_estimate_size(max_worker_processes, sizeof(MaintenanceDaemonDBData)); + size = add_size(size, hashSize); + + return size; +} + + +/* + * MaintenanceDaemonShmemInit initializes the requested shared memory for the + * maintenance daemon. + */ +static void +MaintenanceDaemonShmemInit(void) +{ + bool alreadyInitialized = false; + HASHCTL hashInfo; + int hashFlags = 0; + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + MaintenanceDaemonControl = + (MaintenanceDaemonControlData *) ShmemInitStruct("Citus Maintenance Daemon", + MaintenanceDaemonShmemSize(), + &alreadyInitialized); + + /* + * Might already be initialized on EXEC_BACKEND type platforms that call + * shared library initialization functions in every backend. + */ + if (!alreadyInitialized) + { + /* initialize lwlock */ + LWLockTranche *tranche = &MaintenanceDaemonControl->lockTranche; + + /* start by zeroing out all the memory */ + memset(MaintenanceDaemonControl, 0, MaintenanceDaemonShmemSize()); + + /* initialize lock */ + MaintenanceDaemonControl->trancheId = LWLockNewTrancheId(); + tranche->array_base = &MaintenanceDaemonControl->lock; + tranche->array_stride = sizeof(LWLock); + tranche->name = "Citus Maintenance Daemon"; + LWLockRegisterTranche(MaintenanceDaemonControl->trancheId, tranche); + LWLockInitialize(&MaintenanceDaemonControl->lock, + MaintenanceDaemonControl->trancheId); + } + + + memset(&hashInfo, 0, sizeof(hashInfo)); + hashInfo.keysize = sizeof(Oid); + hashInfo.entrysize = sizeof(MaintenanceDaemonDBData); + hashInfo.hash = tag_hash; + hashFlags = (HASH_ELEM | HASH_FUNCTION); + + MaintenanceDaemonControl->dbHash = + ShmemInitHash("Maintenance Database Hash", + max_worker_processes, max_worker_processes, + &hashInfo, hashFlags); + + LWLockRelease(AddinShmemInitLock); + + if (prev_shmem_startup_hook != NULL) + { + prev_shmem_startup_hook(); + } +} + + +/* + * MaintenanceDaemonSigHupHandler set a flag to re-read config file at next + * convenient time. + */ +static void +MaintenanceDaemonSigHupHandler(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + if (MyProc != NULL) + { + SetLatch(&MyProc->procLatch); + } + + errno = save_errno; +} + + +/* + * MaintenanceDaemonErrorContext adds some context to log messages to make it + * easier to associate them with the maintenance daemon. + */ +static void +MaintenanceDaemonErrorContext(void *arg) +{ + MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) arg; + errcontext("Citus maintenance daemon for database %u user %u", + myDbData->databaseOid, myDbData->userOid); +} diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h new file mode 100644 index 000000000..bb6ebcd6b --- /dev/null +++ b/src/include/distributed/maintenanced.h @@ -0,0 +1,20 @@ +/*------------------------------------------------------------------------- + * + * maintenanced.h + * Background worker run for each citus using database in a postgres + * cluster. + * + * Copyright (c) 2017, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef MAINTENANCED_H +#define MAINTENANCED_H + +extern void InitializeMaintenanceDaemon(void); +extern void InitializeMaintenanceDaemonBackend(void); + +extern void CitusMaintenanceDaemonMain(Datum main_arg); + +#endif /* MAINTENANCED_H */ From 4a3b2de4c55fe1baff46d877acee27a28e680550 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 20 Jun 2017 17:57:18 -0700 Subject: [PATCH 5/5] Add some tests checking that maintenance daemon gets started. The 2nd database one is a bit slow, but also shows something important, so we might want to keep it? --- src/test/regress/expected/multi_extension.out | 101 +++++++++++++++++- src/test/regress/sql/multi_extension.sql | 92 +++++++++++++++- 2 files changed, 189 insertions(+), 4 deletions(-) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 1e026a8d8..5e30d5868 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -7,6 +7,36 @@ -- not done yet. ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 580000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 580000; +CREATE SCHEMA test; +CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database()) + RETURNS pg_stat_activity + LANGUAGE plpgsql +AS $$ +DECLARE + activity record; +BEGIN + LOOP + SELECT * INTO activity FROM pg_stat_activity + WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname; + IF activity.pid IS NOT NULL THEN + RETURN activity; + ELSE + PERFORM pg_sleep(0.1); + PERFORM pg_stat_clear_snapshot(); + END IF ; + END LOOP; +END; +$$; +-- check maintenance daemon is started +SELECT datname, + datname = current_database(), + usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus') +FROM test.maintenance_worker(); + datname | ?column? | ?column? +------------+----------+---------- + regression | t | t +(1 row) + -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, @@ -15,7 +45,7 @@ FROM pg_depend AS pgd, WHERE pgd.refclassid = 'pg_extension'::regclass AND pgd.refobjid = pge.oid AND pge.extname = 'citus' AND - pgio.schema NOT IN ('pg_catalog', 'citus'); + pgio.schema NOT IN ('pg_catalog', 'citus', 'test'); count ------- 0 @@ -96,7 +126,7 @@ FROM pg_depend AS pgd, WHERE pgd.refclassid = 'pg_extension'::regclass AND pgd.refobjid = pge.oid AND pge.extname = 'citus' AND - pgio.schema NOT IN ('pg_catalog', 'citus'); + pgio.schema NOT IN ('pg_catalog', 'citus', 'test'); count ------- 0 @@ -189,3 +219,70 @@ ALTER EXTENSION citus UPDATE; --------+------+------+------- (0 rows) +\c - - - :master_port +-- check that maintenance daemon gets (re-)started for the right user +DROP EXTENSION citus; +CREATE USER testuser SUPERUSER; +SET ROLE testuser; +CREATE EXTENSION citus; +SELECT datname, + datname = current_database(), + usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus') +FROM test.maintenance_worker(); + datname | ?column? | ?column? 
+------------+----------+---------- + regression | t | t +(1 row) + +-- and recreate as the right owner +RESET ROLE; +DROP EXTENSION citus; +CREATE EXTENSION citus; +-- Check that maintenance daemon can also be started in another database +CREATE DATABASE another; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +\c another +CREATE EXTENSION citus; +CREATE SCHEMA test; +CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database()) + RETURNS pg_stat_activity + LANGUAGE plpgsql +AS $$ +DECLARE + activity record; +BEGIN + LOOP + SELECT * INTO activity FROM pg_stat_activity + WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname; + IF activity.pid IS NOT NULL THEN + RETURN activity; + ELSE + PERFORM pg_sleep(0.1); + PERFORM pg_stat_clear_snapshot(); + END IF ; + END LOOP; +END; +$$; +SELECT datname, + datname = current_database(), + usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus') +FROM test.maintenance_worker(); + datname | ?column? | ?column? +---------+----------+---------- + another | t | t +(1 row) + +-- Test that database with active worker can be dropped. That'll +-- require killing the maintenance worker. +\c regression +SELECT datname, + pg_terminate_backend(pid) +FROM test.maintenance_worker('another'); + datname | pg_terminate_backend +---------+---------------------- + another | t +(1 row) + +DROP DATABASE another; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index cfbb677c9..5f5c40c47 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -10,6 +10,34 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 580000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 580000; +CREATE SCHEMA test; + +CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database()) + RETURNS pg_stat_activity + LANGUAGE plpgsql +AS $$ +DECLARE + activity record; +BEGIN + LOOP + SELECT * INTO activity FROM pg_stat_activity + WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname; + IF activity.pid IS NOT NULL THEN + RETURN activity; + ELSE + PERFORM pg_sleep(0.1); + PERFORM pg_stat_clear_snapshot(); + END IF ; + END LOOP; +END; +$$; + +-- check maintenance daemon is started +SELECT datname, + datname = current_database(), + usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus') +FROM test.maintenance_worker(); + -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, @@ -18,7 +46,8 @@ FROM pg_depend AS pgd, WHERE pgd.refclassid = 'pg_extension'::regclass AND pgd.refobjid = pge.oid AND pge.extname = 'citus' AND - pgio.schema NOT IN ('pg_catalog', 'citus'); + pgio.schema NOT IN ('pg_catalog', 'citus', 'test'); + -- DROP EXTENSION pre-created by the regression suite DROP EXTENSION citus; @@ -94,7 +123,7 @@ FROM pg_depend AS pgd, WHERE pgd.refclassid = 'pg_extension'::regclass AND pgd.refobjid = pge.oid AND pge.extname = 'citus' AND - pgio.schema NOT IN ('pg_catalog', 'citus'); + pgio.schema NOT IN ('pg_catalog', 'citus', 'test'); -- see incompatible version errors out RESET citus.enable_version_checks; @@ -173,3 +202,62 @@ ALTER EXTENSION citus UPDATE; -- if cache is invalidated succesfull, this \d should work without any problem \d + +\c - - 
- :master_port + +-- check that maintenance daemon gets (re-)started for the right user +DROP EXTENSION citus; +CREATE USER testuser SUPERUSER; +SET ROLE testuser; +CREATE EXTENSION citus; + +SELECT datname, + datname = current_database(), + usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus') +FROM test.maintenance_worker(); + +-- and recreate as the right owner +RESET ROLE; +DROP EXTENSION citus; +CREATE EXTENSION citus; + + +-- Check that maintenance daemon can also be started in another database +CREATE DATABASE another; +\c another +CREATE EXTENSION citus; + +CREATE SCHEMA test; + +CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database()) + RETURNS pg_stat_activity + LANGUAGE plpgsql +AS $$ +DECLARE + activity record; +BEGIN + LOOP + SELECT * INTO activity FROM pg_stat_activity + WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname; + IF activity.pid IS NOT NULL THEN + RETURN activity; + ELSE + PERFORM pg_sleep(0.1); + PERFORM pg_stat_clear_snapshot(); + END IF ; + END LOOP; +END; +$$; + +SELECT datname, + datname = current_database(), + usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus') +FROM test.maintenance_worker(); + +-- Test that database with active worker can be dropped. That'll +-- require killing the maintenance worker. +\c regression +SELECT datname, + pg_terminate_backend(pid) +FROM test.maintenance_worker('another'); +DROP DATABASE another;
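
A closing note on the mechanism the earlier patches in this series rely on:
metadata_cache.c caches catalog Oids and resets them from a relcache
invalidation callback. The following is a minimal, hypothetical sketch of
that pattern — the Example* names are illustrative placeholders; the real
implementation is CachedRelationLookup() together with InitializeCaches()
above.

	#include "postgres.h"

	#include "catalog/namespace.h"
	#include "catalog/pg_namespace.h"
	#include "utils/inval.h"

	/* hypothetical cached catalog Oid, reset by invalidation */
	static Oid cachedExampleRelationId = InvalidOid;
	static bool exampleCallbackRegistered = false;

	/* reset the cached Oid whenever a relevant invalidation arrives */
	static void
	InvalidateExampleCacheCallback(Datum argument, Oid relationId)
	{
		if (relationId == InvalidOid || relationId == cachedExampleRelationId)
		{
			cachedExampleRelationId = InvalidOid;
		}
	}

	/* look the Oid up lazily, registering the callback exactly once */
	static Oid
	ExampleRelationId(void)
	{
		if (!exampleCallbackRegistered)
		{
			CacheRegisterRelcacheCallback(InvalidateExampleCacheCallback,
										  (Datum) 0);
			exampleCallbackRegistered = true;
		}

		if (cachedExampleRelationId == InvalidOid)
		{
			cachedExampleRelationId = get_relname_relid("pg_dist_partition",
														PG_CATALOG_NAMESPACE);
		}

		return cachedExampleRelationId;
	}

Patch 2 exists precisely because the registration half of this pattern
previously only happened once the extension was used; registering the
callbacks eagerly makes DROP EXTENSION/CREATE EXTENSION sequences behave
reliably.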