From b4584cfcd7fbbc55b5a04b838827070a26a3a32a Mon Sep 17 00:00:00 2001 From: EmelSimsek Date: Fri, 6 Oct 2023 10:01:07 +0300 Subject: [PATCH] Some tests with changing hash key --- src/backend/distributed/shared_library_init.c | 5 +- src/backend/distributed/utils/maintenanced.c | 85 ++++++++++++------- src/include/distributed/maintenanced.h | 2 +- 3 files changed, 58 insertions(+), 34 deletions(-) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index e5d593295..d7397f666 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -27,6 +27,7 @@ #include "catalog/objectaccess.h" #include "catalog/pg_extension.h" #include "citus_version.h" +#include "commands/dbcommands.h" #include "commands/explain.h" #include "commands/extension.h" #include "common/string.h" @@ -767,7 +768,9 @@ IsSequenceOverflowError(ErrorData *edata) void StartupCitusBackend(void) { - InitializeMaintenanceDaemonBackend(); + Oid superUser = CitusExtensionOwner(); + + InitializeMaintenanceDaemonBackend(get_database_name(MyDatabaseId), superUser); /* * For query backends this will be a no-op, because InitializeBackendData diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 5f49de20a..536a01f93 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -30,6 +30,7 @@ #include "catalog/pg_authid.h" #include "catalog/pg_namespace.h" #include "commands/async.h" +#include "commands/dbcommands.h" #include "commands/extension.h" #include "libpq/pqsignal.h" #include "catalog/namespace.h" @@ -83,9 +84,10 @@ typedef struct MaintenanceDaemonControlData typedef struct MaintenanceDaemonDBData { /* hash key: database to run on */ - Oid databaseOid; + char databaseName[NAMEDATALEN]; /* information: which user to use */ + NameData dbname; Oid userOid; pid_t workerPid; bool daemonStarted; @@ -145,32 +147,32 @@ InitializeMaintenanceDaemon(void) * maintenance worker if necessary. */ void -InitializeMaintenanceDaemonBackend(void) +InitializeMaintenanceDaemonBackend(const char* databaseName, Oid userOid) { - Oid extensionOwner = CitusExtensionOwner(); bool found; LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search( MaintenanceDaemonDBHash, - &MyDatabaseId, - HASH_ENTER_NULL, + databaseName, + HASH_ENTER, &found); if (dbData == NULL) { WarnMaintenanceDaemonNotStarted(); LWLockRelease(&MaintenanceDaemonControl->lock); - + elog(LOG, "dbData is NULL"); return; } if (!found) { /* ensure the values in MaintenanceDaemonDBData are zero */ - memset(((char *) dbData) + sizeof(Oid), 0, - sizeof(MaintenanceDaemonDBData) - sizeof(Oid)); + memset(((char *) dbData) + sizeof(char)*NAMEDATALEN, 0, + sizeof(MaintenanceDaemonDBData) - sizeof(char)*NAMEDATALEN); + } if (IsMaintenanceDaemon) @@ -194,8 +196,8 @@ InitializeMaintenanceDaemonBackend(void) memset(&worker, 0, sizeof(worker)); SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name), - "Citus Maintenance Daemon: %u/%u", - MyDatabaseId, extensionOwner); + "Citus Maintenance Daemon: %s/%u", + databaseName, userOid); /* request ability to connect to target database */ worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; @@ -213,8 +215,11 @@ InitializeMaintenanceDaemonBackend(void) strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), "CitusMaintenanceDaemonMain"); - worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId); - memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner, + namestrcpy(&(dbData->dbname), databaseName); + worker.bgw_main_arg = NameGetDatum(&(dbData->dbname)); + elog(LOG, "worker.bgw_main_arg databaseName = %s", databaseName); + + memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &userOid, sizeof(Oid)); worker.bgw_notify_pid = MyProcPid; @@ -227,8 +232,10 @@ InitializeMaintenanceDaemonBackend(void) return; } + elog(LOG, "RegisterDynamicBackgroundWorker"); + dbData->daemonStarted = true; - dbData->userOid = extensionOwner; + dbData->userOid = userOid; dbData->workerPid = 0; dbData->triggerNodeMetadataSync = false; LWLockRelease(&MaintenanceDaemonControl->lock); @@ -246,9 +253,9 @@ InitializeMaintenanceDaemonBackend(void) * If owner of extension changed, wake up daemon. It'll notice and * restart. */ - if (dbData->userOid != extensionOwner) + if (dbData->userOid != userOid) { - dbData->userOid = extensionOwner; + dbData->userOid = userOid; if (dbData->latch) { SetLatch(dbData->latch); @@ -278,7 +285,9 @@ WarnMaintenanceDaemonNotStarted(void) void CitusMaintenanceDaemonMain(Datum main_arg) { - Oid databaseOid = DatumGetObjectId(main_arg); + char* databaseName = NameStr(*DatumGetName(main_arg)); + + elog(LOG, "databaseName = %s", databaseName); TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000); bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false; @@ -304,7 +313,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) - hash_search(MaintenanceDaemonDBHash, &databaseOid, + hash_search(MaintenanceDaemonDBHash, databaseName, HASH_FIND, NULL); if (!myDbData) { @@ -315,6 +324,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) * to properly add it to the hash. */ + elog(LOG, "!myDbData"); proc_exit(0); } @@ -327,6 +337,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) * In that case, the first one stays and the last one exits. */ + elog(LOG, "Another maintenance daemon is running."); proc_exit(0); } @@ -370,11 +381,14 @@ CitusMaintenanceDaemonMain(Datum main_arg) error_context_stack = &errorCallback; - elog(LOG, "starting maintenance daemon on database %u user %u", - databaseOid, myDbData->userOid); + elog(LOG, "starting maintenance daemon on database %s user %u", + databaseName, myDbData->userOid); /* connect to database, after that we can actually access catalogs */ - BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0); + BackgroundWorkerInitializeConnection(databaseName, NULL, BGWORKER_BYPASS_ALLOWCONN); + + elog(LOG, "BackgroundWorkerInitializeConnection %s DBId %u userid %u", + databaseName, MyDatabaseId, GetAuthenticatedUserId()); /* make worker recognizable in pg_stat_activity */ pgstat_report_appname("Citus Maintenance Daemon"); @@ -383,7 +397,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) * Terminate orphaned metadata sync daemons spawned from previously terminated * or crashed maintenanced instances. */ - SignalMetadataSyncDaemon(databaseOid, SIGTERM); + SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM); /* enter main loop */ while (!got_SIGTERM) @@ -773,8 +787,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) "background task queue monitor"))); backgroundTasksQueueBgwHandle = - StartCitusBackgroundTaskQueueMonitor(MyDatabaseId, - myDbData->userOid); + StartCitusBackgroundTaskQueueMonitor(MyDatabaseId, myDbData->userOid); if (!backgroundTasksQueueBgwHandle || GetBackgroundWorkerPid(backgroundTasksQueueBgwHandle, @@ -810,9 +823,11 @@ CitusMaintenanceDaemonMain(Datum main_arg) CHECK_FOR_INTERRUPTS(); /* check for changed configuration */ + if (myDbData->userOid != GetSessionUserId()) { /* return code of 1 requests worker restart */ + elog(LOG, "userOid != GetSessionUserId()"); proc_exit(1); } @@ -897,10 +912,10 @@ MaintenanceDaemonShmemInit(void) memset(&hashInfo, 0, sizeof(hashInfo)); - hashInfo.keysize = sizeof(Oid); + hashInfo.keysize = NAMEDATALEN; hashInfo.entrysize = sizeof(MaintenanceDaemonDBData); - hashInfo.hash = tag_hash; - int hashFlags = (HASH_ELEM | HASH_FUNCTION); + + int hashFlags = (HASH_ELEM | HASH_STRINGS); MaintenanceDaemonDBHash = ShmemInitHash("Maintenance Database Hash", max_worker_processes, max_worker_processes, @@ -921,12 +936,13 @@ MaintenanceDaemonShmemInit(void) static void MaintenanceDaemonShmemExit(int code, Datum arg) { - Oid databaseOid = DatumGetObjectId(arg); + + char* databaseName = NameStr(*DatumGetName(arg)); LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) - hash_search(MaintenanceDaemonDBHash, &databaseOid, + hash_search(MaintenanceDaemonDBHash, &databaseName, HASH_FIND, NULL); /* myDbData is NULL after StopMaintenanceDaemon */ @@ -988,8 +1004,8 @@ static void MaintenanceDaemonErrorContext(void *arg) { MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) arg; - errcontext("Citus maintenance daemon for database %u user %u", - myDbData->databaseOid, myDbData->userOid); + errcontext("Citus maintenance daemon for database %s user %u", + myDbData->databaseName, myDbData->userOid); } @@ -1034,11 +1050,15 @@ StopMaintenanceDaemon(Oid databaseId) bool found = false; pid_t workerPid = 0; + StartTransactionCommand(); + char* databaseName = get_database_name(databaseId); + CommitTransactionCommand(); + LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search( MaintenanceDaemonDBHash, - &databaseId, + &databaseName, HASH_REMOVE, &found); if (found) @@ -1064,11 +1084,12 @@ TriggerNodeMetadataSync(Oid databaseId) { bool found = false; + char* databaseName = get_database_name(databaseId); LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search( MaintenanceDaemonDBHash, - &databaseId, + &databaseName, HASH_FIND, &found); if (found) { diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h index de1e68883..717cc9648 100644 --- a/src/include/distributed/maintenanced.h +++ b/src/include/distributed/maintenanced.h @@ -26,7 +26,7 @@ extern void TriggerNodeMetadataSync(Oid databaseId); extern void InitializeMaintenanceDaemon(void); extern size_t MaintenanceDaemonShmemSize(void); extern void MaintenanceDaemonShmemInit(void); -extern void InitializeMaintenanceDaemonBackend(void); +extern void InitializeMaintenanceDaemonBackend(const char* databaseName, Oid userOid); extern bool LockCitusExtension(void); extern PGDLLEXPORT void CitusMaintenanceDaemonMain(Datum main_arg);