mirror of https://github.com/citusdata/citus.git
Refactor
parent
9f6e0b83af
commit
297ef2056e
|
@ -113,7 +113,7 @@ static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
|
|||
* activated.
|
||||
*/
|
||||
static HTAB *MaintenanceDaemonDBHash;
|
||||
|
||||
static ErrorContextCallback errorCallback = { 0 };
|
||||
static volatile sig_atomic_t got_SIGHUP = false;
|
||||
static volatile sig_atomic_t got_SIGTERM = false;
|
||||
|
||||
|
@ -339,50 +339,50 @@ WarnMaintenanceDaemonNotStarted(void)
|
|||
|
||||
|
||||
/*
|
||||
* CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
|
||||
* be started by the background worker infrastructure. If it errors out,
|
||||
* it'll be restarted after a few seconds.
|
||||
* ConnectToDatabase connects to the database for the given databaseOid.
|
||||
* if databaseOid is 0, it connects to MainDb. It returns the hash entry
|
||||
* for the database. If there hash entry already has a valid workerPid,
|
||||
* it exits the process. If a hash entry cannot be created it exists the
|
||||
* process requesting a restart.
|
||||
*/
|
||||
void
|
||||
CitusMaintenanceDaemonMain(Datum main_arg)
|
||||
static MaintenanceDaemonDBData *
|
||||
ConnectToDatabase(Oid databaseOid)
|
||||
{
|
||||
Oid databaseOid = DatumGetObjectId(main_arg);
|
||||
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
|
||||
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
|
||||
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
|
||||
TimestampTz lastRecoveryTime = 0;
|
||||
TimestampTz lastShardCleanTime = 0;
|
||||
TimestampTz lastStatStatementsPurgeTime = 0;
|
||||
TimestampTz nextMetadataSyncTime = 0;
|
||||
|
||||
/* state kept for the background tasks queue monitor */
|
||||
TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
|
||||
BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
|
||||
bool backgroundTasksQueueWarnedForLock = false;
|
||||
|
||||
|
||||
MaintenanceDaemonDBData *myDbData = NULL;
|
||||
|
||||
|
||||
bool isMainDb = false;
|
||||
|
||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||
|
||||
|
||||
if (databaseOid == 0)
|
||||
{
|
||||
char *databaseName = MainDb;
|
||||
isMainDb = true;
|
||||
|
||||
/*
|
||||
* Since we cannot query databaseOid without initializing Postgres
|
||||
* first, connect to the database by name.
|
||||
*/
|
||||
BackgroundWorkerInitializeConnection(databaseName, NULL, 0);
|
||||
|
||||
/* Now we have a valid MyDatabaseId. */
|
||||
/* Insert the daemon instance to the hash table. */
|
||||
/*
|
||||
* Now we have a valid MyDatabaseId.
|
||||
* Insert the hash entry for the database to the Maintenance Deamon Hash.
|
||||
*/
|
||||
bool found = false;
|
||||
|
||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||
|
||||
myDbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId, &found);
|
||||
|
||||
if (!myDbData)
|
||||
{
|
||||
proc_exit(0);
|
||||
/*
|
||||
* If an entry cannot be created,
|
||||
* return code of 1 requests worker restart
|
||||
* Since BackgroundWorker for the MainDb is only registered
|
||||
* once during server startup, we need to retry.
|
||||
*/
|
||||
proc_exit(1);
|
||||
}
|
||||
|
||||
if (found && myDbData->workerPid != 0)
|
||||
|
@ -392,15 +392,12 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
|||
proc_exit(0);
|
||||
}
|
||||
|
||||
before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(MyDatabaseId));
|
||||
|
||||
databaseOid = MyDatabaseId;
|
||||
myDbData->userOid = GetSessionUserId();
|
||||
isMainDb = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||
|
||||
myDbData = (MaintenanceDaemonDBData *)
|
||||
hash_search(MaintenanceDaemonDBHash, &databaseOid,
|
||||
HASH_FIND, NULL);
|
||||
|
@ -428,15 +425,9 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
|||
|
||||
proc_exit(0);
|
||||
}
|
||||
|
||||
before_shmem_exit(MaintenanceDaemonShmemExit, main_arg);
|
||||
}
|
||||
|
||||
/*
|
||||
* We do metadata sync in a separate background worker. We need its
|
||||
* handle to be able to check its status.
|
||||
*/
|
||||
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
|
||||
before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(databaseOid));
|
||||
|
||||
/*
|
||||
* Signal that I am the maintenance daemon now.
|
||||
|
@ -462,28 +453,56 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
|||
|
||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||
|
||||
/*
|
||||
* Setup error context so log messages can be properly attributed. Some of
|
||||
* them otherwise sound like they might be from a normal user connection.
|
||||
* Do so before setting up signals etc, so we never exit without the
|
||||
* context setup.
|
||||
*/
|
||||
ErrorContextCallback errorCallback = { 0 };
|
||||
memset(&errorCallback, 0, sizeof(errorCallback));
|
||||
errorCallback.callback = MaintenanceDaemonErrorContext;
|
||||
errorCallback.arg = (void *) myDbData;
|
||||
errorCallback.previous = error_context_stack;
|
||||
error_context_stack = &errorCallback;
|
||||
|
||||
|
||||
elog(LOG, "starting maintenance daemon on database %u user %u",
|
||||
databaseOid, myDbData->userOid);
|
||||
|
||||
if (!isMainDb)
|
||||
{
|
||||
/* connect to database, after that we can actually access catalogs */
|
||||
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
|
||||
}
|
||||
|
||||
return myDbData;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
|
||||
* be started by the background worker infrastructure. If it errors out,
|
||||
* it'll be restarted after a few seconds.
|
||||
*/
|
||||
void
|
||||
CitusMaintenanceDaemonMain(Datum main_arg)
|
||||
{
|
||||
Oid databaseOid = DatumGetObjectId(main_arg);
|
||||
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
|
||||
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
|
||||
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
|
||||
TimestampTz lastRecoveryTime = 0;
|
||||
TimestampTz lastShardCleanTime = 0;
|
||||
TimestampTz lastStatStatementsPurgeTime = 0;
|
||||
TimestampTz nextMetadataSyncTime = 0;
|
||||
|
||||
/* state kept for the background tasks queue monitor */
|
||||
TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
|
||||
BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
|
||||
bool backgroundTasksQueueWarnedForLock = false;
|
||||
|
||||
|
||||
/*
|
||||
* We do metadata sync in a separate background worker. We need its
|
||||
* handle to be able to check its status.
|
||||
*/
|
||||
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
|
||||
|
||||
MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid);
|
||||
|
||||
/* make worker recognizable in pg_stat_activity */
|
||||
pgstat_report_appname("Citus Maintenance Daemon");
|
||||
|
||||
|
@ -491,7 +510,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
|||
* Terminate orphaned metadata sync daemons spawned from previously terminated
|
||||
* or crashed maintenanced instances.
|
||||
*/
|
||||
SignalMetadataSyncDaemon(databaseOid, SIGTERM);
|
||||
SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM);
|
||||
|
||||
/* enter main loop */
|
||||
while (!got_SIGTERM)
|
||||
|
|
Loading…
Reference in New Issue