diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 5f49de20a..27beccef3 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -134,11 +134,54 @@ static void WarnMaintenanceDaemonNotStarted(void); void InitializeMaintenanceDaemon(void) { + elog(LOG, "InitializeMaintenanceDaemon"); prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = MaintenanceDaemonShmemInit; } +void +InitializeMaintenanceDaemonForAdminDB(void) +{ + elog(LOG, "InitializeMaintenanceDaemonForAdmin"); + + BackgroundWorker worker; + BackgroundWorkerHandle *handle = NULL; + memset(&worker, 0, sizeof(worker)); + + SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name), + "Citus Maintenance Daemon: %u/%u", + 0, 0); + + /* request ability to connect to target database */ + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + + /* + * No point in getting started before able to run query, but we do + * want to get started on Hot-Standby. + */ + worker.bgw_start_time = BgWorkerStart_ConsistentState; + + /* Restart after a bit after errors, but don't bog the system. */ + worker.bgw_restart_time = 5; + strcpy_s(worker.bgw_library_name, + sizeof(worker.bgw_library_name), "citus"); + strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), + "CitusMaintenanceDaemonMain"); + + worker.bgw_main_arg = (Datum)0; + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) { + elog(LOG, "RegisterDynamicBackgroundWorker failed for admin"); + } + + pid_t pid; + WaitForBackgroundWorkerStartup(handle, &pid); + + pfree(handle); + +} /* * InitializeMaintenanceDaemonBackend, called at backend start and * configuration changes, is responsible for starting a per-database @@ -147,6 +190,7 @@ InitializeMaintenanceDaemon(void) void InitializeMaintenanceDaemonBackend(void) { + elog(LOG, "InitializeMaintenanceDaemonBackend"); Oid extensionOwner = CitusExtensionOwner(); bool found; @@ -278,6 +322,7 @@ WarnMaintenanceDaemonNotStarted(void) void CitusMaintenanceDaemonMain(Datum main_arg) { + elog(LOG, "CitusMaintenanceDaemonMain"); Oid databaseOid = DatumGetObjectId(main_arg); TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000); @@ -292,46 +337,79 @@ CitusMaintenanceDaemonMain(Datum main_arg) BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL; bool backgroundTasksQueueWarnedForLock = false; + + MaintenanceDaemonDBData *myDbData = NULL; + + if (databaseOid == 0) + { + + /* TODO : Get the admin database name from GUC contro_db*/ + char* databaseName = "postgres"; + + BackgroundWorkerInitializeConnection(databaseName, NULL, 0); + + // Now we have a valid MyDatabaseId. + // Insert the daemon instance to the hash table. + bool found; + + LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); + + myDbData = (MaintenanceDaemonDBData *) hash_search( + MaintenanceDaemonDBHash, + &MyDatabaseId, + HASH_ENTER_NULL, + &found); + + if (!myDbData) + { + LWLockRelease(&MaintenanceDaemonControl->lock); + return; + } + + if (!found) + { + /* ensure the values in MaintenanceDaemonDBData are zero */ + memset(((char *) myDbData) + sizeof(Oid), 0, + sizeof(MaintenanceDaemonDBData) - sizeof(Oid)); + + myDbData->userOid = 0; + myDbData->workerPid = 0; + myDbData->triggerNodeMetadataSync = false; + } + + before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(MyDatabaseId)); + databaseOid = MyDatabaseId; + } + else + { + LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); + + myDbData = (MaintenanceDaemonDBData *) + hash_search(MaintenanceDaemonDBHash, &databaseOid, + HASH_FIND, NULL); + if (!myDbData || myDbData->workerPid != 0) + { + /* + * When the database crashes, background workers are restarted, but + * the state in shared memory is lost. In that case, we exit and + * wait for a session to call InitializeMaintenanceDaemonBackend + * to properly add it to the hash. + */ + + LWLockRelease(&MaintenanceDaemonControl->lock); + proc_exit(0); + + } + before_shmem_exit(MaintenanceDaemonShmemExit, main_arg); + + BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0); + } /* * We do metadata sync in a separate background worker. We need its * handle to be able to check its status. */ BackgroundWorkerHandle *metadataSyncBgwHandle = NULL; - /* - * Look up this worker's configuration. - */ - LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); - - MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) - hash_search(MaintenanceDaemonDBHash, &databaseOid, - HASH_FIND, NULL); - if (!myDbData) - { - /* - * When the database crashes, background workers are restarted, but - * the state in shared memory is lost. In that case, we exit and - * wait for a session to call InitializeMaintenanceDaemonBackend - * to properly add it to the hash. - */ - - proc_exit(0); - } - - if (myDbData->workerPid != 0) - { - /* - * Another maintenance daemon is running. This usually happens because - * postgres restarts the daemon after an non-zero exit, and - * InitializeMaintenanceDaemonBackend started one before postgres did. - * In that case, the first one stays and the last one exits. - */ - - proc_exit(0); - } - - before_shmem_exit(MaintenanceDaemonShmemExit, main_arg); - /* * Signal that I am the maintenance daemon now. * @@ -373,9 +451,6 @@ CitusMaintenanceDaemonMain(Datum main_arg) elog(LOG, "starting maintenance daemon on database %u user %u", databaseOid, myDbData->userOid); - /* connect to database, after that we can actually access catalogs */ - BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0); - /* make worker recognizable in pg_stat_activity */ pgstat_report_appname("Citus Maintenance Daemon"); @@ -870,6 +945,7 @@ MaintenanceDaemonShmemSize(void) void MaintenanceDaemonShmemInit(void) { + elog(LOG, "MaintenanceDaemonShmemInit"); bool alreadyInitialized = false; HASHCTL hashInfo; @@ -921,6 +997,7 @@ MaintenanceDaemonShmemInit(void) static void MaintenanceDaemonShmemExit(int code, Datum arg) { + elog(LOG, "MaintenanceDaemonShmemExit"); Oid databaseOid = DatumGetObjectId(arg); LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); @@ -1031,6 +1108,7 @@ LockCitusExtension(void) void StopMaintenanceDaemon(Oid databaseId) { + elog(LOG, "StopMaintenanceDaemon"); bool found = false; pid_t workerPid = 0; @@ -1062,6 +1140,7 @@ StopMaintenanceDaemon(Oid databaseId) void TriggerNodeMetadataSync(Oid databaseId) { + elog(LOG, "TriggerNodeMetadataSync"); bool found = false; LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); @@ -1089,6 +1168,7 @@ TriggerNodeMetadataSync(Oid databaseId) static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData) { + elog(LOG, "MetadataSyncTriggeredCheckAndReset"); LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); bool metadataSyncTriggered = dbData->triggerNodeMetadataSync; diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h index de1e68883..e99ca377d 100644 --- a/src/include/distributed/maintenanced.h +++ b/src/include/distributed/maintenanced.h @@ -27,6 +27,7 @@ extern void InitializeMaintenanceDaemon(void); extern size_t MaintenanceDaemonShmemSize(void); extern void MaintenanceDaemonShmemInit(void); extern void InitializeMaintenanceDaemonBackend(void); +extern void InitializeMaintenanceDaemonForAdminDB(void); extern bool LockCitusExtension(void); extern PGDLLEXPORT void CitusMaintenanceDaemonMain(Datum main_arg);