mirror of https://github.com/citusdata/citus.git
Start Daemon in server startup
parent
d0b093c975
commit
87a5fcff41
|
@ -134,11 +134,54 @@ static void WarnMaintenanceDaemonNotStarted(void);
|
|||
void
|
||||
InitializeMaintenanceDaemon(void)
|
||||
{
|
||||
elog(LOG, "InitializeMaintenanceDaemon");
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = MaintenanceDaemonShmemInit;
|
||||
}
|
||||
|
||||
void
|
||||
InitializeMaintenanceDaemonForAdminDB(void)
|
||||
{
|
||||
elog(LOG, "InitializeMaintenanceDaemonForAdmin");
|
||||
|
||||
BackgroundWorker worker;
|
||||
BackgroundWorkerHandle *handle = NULL;
|
||||
|
||||
memset(&worker, 0, sizeof(worker));
|
||||
|
||||
SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
|
||||
"Citus Maintenance Daemon: %u/%u",
|
||||
0, 0);
|
||||
|
||||
/* request ability to connect to target database */
|
||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
||||
|
||||
/*
|
||||
* No point in getting started before able to run query, but we do
|
||||
* want to get started on Hot-Standby.
|
||||
*/
|
||||
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
||||
|
||||
/* Restart after a bit after errors, but don't bog the system. */
|
||||
worker.bgw_restart_time = 5;
|
||||
strcpy_s(worker.bgw_library_name,
|
||||
sizeof(worker.bgw_library_name), "citus");
|
||||
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
|
||||
"CitusMaintenanceDaemonMain");
|
||||
|
||||
worker.bgw_main_arg = (Datum)0;
|
||||
worker.bgw_notify_pid = MyProcPid;
|
||||
|
||||
if (!RegisterDynamicBackgroundWorker(&worker, &handle)) {
|
||||
elog(LOG, "RegisterDynamicBackgroundWorker failed for admin");
|
||||
}
|
||||
|
||||
pid_t pid;
|
||||
WaitForBackgroundWorkerStartup(handle, &pid);
|
||||
|
||||
pfree(handle);
|
||||
|
||||
}
|
||||
/*
|
||||
* InitializeMaintenanceDaemonBackend, called at backend start and
|
||||
* configuration changes, is responsible for starting a per-database
|
||||
|
@ -147,6 +190,7 @@ InitializeMaintenanceDaemon(void)
|
|||
void
|
||||
InitializeMaintenanceDaemonBackend(void)
|
||||
{
|
||||
elog(LOG, "InitializeMaintenanceDaemonBackend");
|
||||
Oid extensionOwner = CitusExtensionOwner();
|
||||
bool found;
|
||||
|
||||
|
@ -278,6 +322,7 @@ WarnMaintenanceDaemonNotStarted(void)
|
|||
void
|
||||
CitusMaintenanceDaemonMain(Datum main_arg)
|
||||
{
|
||||
elog(LOG, "CitusMaintenanceDaemonMain");
|
||||
Oid databaseOid = DatumGetObjectId(main_arg);
|
||||
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
|
||||
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
|
||||
|
@ -292,46 +337,79 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
|||
BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
|
||||
bool backgroundTasksQueueWarnedForLock = false;
|
||||
|
||||
|
||||
MaintenanceDaemonDBData *myDbData = NULL;
|
||||
|
||||
if (databaseOid == 0)
|
||||
{
|
||||
|
||||
/* TODO : Get the admin database name from GUC contro_db*/
|
||||
char* databaseName = "postgres";
|
||||
|
||||
BackgroundWorkerInitializeConnection(databaseName, NULL, 0);
|
||||
|
||||
// Now we have a valid MyDatabaseId.
|
||||
// Insert the daemon instance to the hash table.
|
||||
bool found;
|
||||
|
||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||
|
||||
myDbData = (MaintenanceDaemonDBData *) hash_search(
|
||||
MaintenanceDaemonDBHash,
|
||||
&MyDatabaseId,
|
||||
HASH_ENTER_NULL,
|
||||
&found);
|
||||
|
||||
if (!myDbData)
|
||||
{
|
||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!found)
|
||||
{
|
||||
/* ensure the values in MaintenanceDaemonDBData are zero */
|
||||
memset(((char *) myDbData) + sizeof(Oid), 0,
|
||||
sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
|
||||
|
||||
myDbData->userOid = 0;
|
||||
myDbData->workerPid = 0;
|
||||
myDbData->triggerNodeMetadataSync = false;
|
||||
}
|
||||
|
||||
before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(MyDatabaseId));
|
||||
databaseOid = MyDatabaseId;
|
||||
}
|
||||
else
|
||||
{
|
||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||
|
||||
myDbData = (MaintenanceDaemonDBData *)
|
||||
hash_search(MaintenanceDaemonDBHash, &databaseOid,
|
||||
HASH_FIND, NULL);
|
||||
if (!myDbData || myDbData->workerPid != 0)
|
||||
{
|
||||
/*
|
||||
* When the database crashes, background workers are restarted, but
|
||||
* the state in shared memory is lost. In that case, we exit and
|
||||
* wait for a session to call InitializeMaintenanceDaemonBackend
|
||||
* to properly add it to the hash.
|
||||
*/
|
||||
|
||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||
proc_exit(0);
|
||||
|
||||
}
|
||||
before_shmem_exit(MaintenanceDaemonShmemExit, main_arg);
|
||||
|
||||
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
|
||||
}
|
||||
/*
|
||||
* We do metadata sync in a separate background worker. We need its
|
||||
* handle to be able to check its status.
|
||||
*/
|
||||
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
|
||||
|
||||
/*
|
||||
* Look up this worker's configuration.
|
||||
*/
|
||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||
|
||||
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
|
||||
hash_search(MaintenanceDaemonDBHash, &databaseOid,
|
||||
HASH_FIND, NULL);
|
||||
if (!myDbData)
|
||||
{
|
||||
/*
|
||||
* When the database crashes, background workers are restarted, but
|
||||
* the state in shared memory is lost. In that case, we exit and
|
||||
* wait for a session to call InitializeMaintenanceDaemonBackend
|
||||
* to properly add it to the hash.
|
||||
*/
|
||||
|
||||
proc_exit(0);
|
||||
}
|
||||
|
||||
if (myDbData->workerPid != 0)
|
||||
{
|
||||
/*
|
||||
* Another maintenance daemon is running. This usually happens because
|
||||
* postgres restarts the daemon after an non-zero exit, and
|
||||
* InitializeMaintenanceDaemonBackend started one before postgres did.
|
||||
* In that case, the first one stays and the last one exits.
|
||||
*/
|
||||
|
||||
proc_exit(0);
|
||||
}
|
||||
|
||||
before_shmem_exit(MaintenanceDaemonShmemExit, main_arg);
|
||||
|
||||
/*
|
||||
* Signal that I am the maintenance daemon now.
|
||||
*
|
||||
|
@ -373,9 +451,6 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
|||
elog(LOG, "starting maintenance daemon on database %u user %u",
|
||||
databaseOid, myDbData->userOid);
|
||||
|
||||
/* connect to database, after that we can actually access catalogs */
|
||||
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
|
||||
|
||||
/* make worker recognizable in pg_stat_activity */
|
||||
pgstat_report_appname("Citus Maintenance Daemon");
|
||||
|
||||
|
@ -870,6 +945,7 @@ MaintenanceDaemonShmemSize(void)
|
|||
void
|
||||
MaintenanceDaemonShmemInit(void)
|
||||
{
|
||||
elog(LOG, "MaintenanceDaemonShmemInit");
|
||||
bool alreadyInitialized = false;
|
||||
HASHCTL hashInfo;
|
||||
|
||||
|
@ -921,6 +997,7 @@ MaintenanceDaemonShmemInit(void)
|
|||
static void
|
||||
MaintenanceDaemonShmemExit(int code, Datum arg)
|
||||
{
|
||||
elog(LOG, "MaintenanceDaemonShmemExit");
|
||||
Oid databaseOid = DatumGetObjectId(arg);
|
||||
|
||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||
|
@ -1031,6 +1108,7 @@ LockCitusExtension(void)
|
|||
void
|
||||
StopMaintenanceDaemon(Oid databaseId)
|
||||
{
|
||||
elog(LOG, "StopMaintenanceDaemon");
|
||||
bool found = false;
|
||||
pid_t workerPid = 0;
|
||||
|
||||
|
@ -1062,6 +1140,7 @@ StopMaintenanceDaemon(Oid databaseId)
|
|||
void
|
||||
TriggerNodeMetadataSync(Oid databaseId)
|
||||
{
|
||||
elog(LOG, "TriggerNodeMetadataSync");
|
||||
bool found = false;
|
||||
|
||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||
|
@ -1089,6 +1168,7 @@ TriggerNodeMetadataSync(Oid databaseId)
|
|||
static bool
|
||||
MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData)
|
||||
{
|
||||
elog(LOG, "MetadataSyncTriggeredCheckAndReset");
|
||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||
|
||||
bool metadataSyncTriggered = dbData->triggerNodeMetadataSync;
|
||||
|
|
|
@ -27,6 +27,7 @@ extern void InitializeMaintenanceDaemon(void);
|
|||
extern size_t MaintenanceDaemonShmemSize(void);
|
||||
extern void MaintenanceDaemonShmemInit(void);
|
||||
extern void InitializeMaintenanceDaemonBackend(void);
|
||||
extern void InitializeMaintenanceDaemonForAdminDB(void);
|
||||
extern bool LockCitusExtension(void);
|
||||
|
||||
extern PGDLLEXPORT void CitusMaintenanceDaemonMain(Datum main_arg);
|
||||
|
|
Loading…
Reference in New Issue