mirror of https://github.com/citusdata/citus.git
Fix some bugs
parent
87a5fcff41
commit
0ba47298a7
|
|
@ -481,6 +481,7 @@ _PG_init(void)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
InitializeMaintenanceDaemon();
|
InitializeMaintenanceDaemon();
|
||||||
|
InitializeMaintenanceDaemonForAdminDB();
|
||||||
|
|
||||||
/* initialize coordinated transaction management */
|
/* initialize coordinated transaction management */
|
||||||
InitializeTransactionManagement();
|
InitializeTransactionManagement();
|
||||||
|
|
|
||||||
|
|
@ -134,54 +134,48 @@ static void WarnMaintenanceDaemonNotStarted(void);
|
||||||
void
|
void
|
||||||
InitializeMaintenanceDaemon(void)
|
InitializeMaintenanceDaemon(void)
|
||||||
{
|
{
|
||||||
elog(LOG, "InitializeMaintenanceDaemon");
|
|
||||||
prev_shmem_startup_hook = shmem_startup_hook;
|
prev_shmem_startup_hook = shmem_startup_hook;
|
||||||
shmem_startup_hook = MaintenanceDaemonShmemInit;
|
shmem_startup_hook = MaintenanceDaemonShmemInit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InitializeMaintenanceDaemonForAdminDB is called in _PG_Init
|
||||||
|
* at which stage we are not in a transaction or have databaseOid
|
||||||
|
*/
|
||||||
void
|
void
|
||||||
InitializeMaintenanceDaemonForAdminDB(void)
|
InitializeMaintenanceDaemonForAdminDB(void)
|
||||||
{
|
{
|
||||||
elog(LOG, "InitializeMaintenanceDaemonForAdmin");
|
|
||||||
|
|
||||||
BackgroundWorker worker;
|
BackgroundWorker worker;
|
||||||
BackgroundWorkerHandle *handle = NULL;
|
|
||||||
|
|
||||||
memset(&worker, 0, sizeof(worker));
|
memset(&worker, 0, sizeof(worker));
|
||||||
|
|
||||||
SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
|
SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
|
||||||
"Citus Maintenance Daemon: %u/%u",
|
"Citus Maintenance Daemon: %u/%u",
|
||||||
0, 0);
|
0, 0);
|
||||||
|
|
||||||
/* request ability to connect to target database */
|
/* request ability to connect to target database */
|
||||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* No point in getting started before able to run query, but we do
|
* No point in getting started before able to run query, but we do
|
||||||
* want to get started on Hot-Standby.
|
* want to get started on Hot-Standby.
|
||||||
*/
|
*/
|
||||||
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
||||||
|
|
||||||
/* Restart after a bit after errors, but don't bog the system. */
|
/* Restart after a bit after errors, but don't bog the system. */
|
||||||
worker.bgw_restart_time = 5;
|
worker.bgw_restart_time = 5;
|
||||||
strcpy_s(worker.bgw_library_name,
|
strcpy_s(worker.bgw_library_name,
|
||||||
sizeof(worker.bgw_library_name), "citus");
|
sizeof(worker.bgw_library_name), "citus");
|
||||||
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
|
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
|
||||||
"CitusMaintenanceDaemonMain");
|
"CitusMaintenanceDaemonMain");
|
||||||
|
|
||||||
worker.bgw_main_arg = (Datum)0;
|
worker.bgw_main_arg = (Datum) 0;
|
||||||
worker.bgw_notify_pid = MyProcPid;
|
|
||||||
|
|
||||||
if (!RegisterDynamicBackgroundWorker(&worker, &handle)) {
|
|
||||||
elog(LOG, "RegisterDynamicBackgroundWorker failed for admin");
|
|
||||||
}
|
|
||||||
|
|
||||||
pid_t pid;
|
|
||||||
WaitForBackgroundWorkerStartup(handle, &pid);
|
|
||||||
|
|
||||||
pfree(handle);
|
|
||||||
|
|
||||||
|
RegisterBackgroundWorker(&worker);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* InitializeMaintenanceDaemonBackend, called at backend start and
|
* InitializeMaintenanceDaemonBackend, called at backend start and
|
||||||
* configuration changes, is responsible for starting a per-database
|
* configuration changes, is responsible for starting a per-database
|
||||||
|
|
@ -190,7 +184,6 @@ InitializeMaintenanceDaemonForAdminDB(void)
|
||||||
void
|
void
|
||||||
InitializeMaintenanceDaemonBackend(void)
|
InitializeMaintenanceDaemonBackend(void)
|
||||||
{
|
{
|
||||||
elog(LOG, "InitializeMaintenanceDaemonBackend");
|
|
||||||
Oid extensionOwner = CitusExtensionOwner();
|
Oid extensionOwner = CitusExtensionOwner();
|
||||||
bool found;
|
bool found;
|
||||||
|
|
||||||
|
|
@ -322,7 +315,6 @@ WarnMaintenanceDaemonNotStarted(void)
|
||||||
void
|
void
|
||||||
CitusMaintenanceDaemonMain(Datum main_arg)
|
CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
{
|
{
|
||||||
elog(LOG, "CitusMaintenanceDaemonMain");
|
|
||||||
Oid databaseOid = DatumGetObjectId(main_arg);
|
Oid databaseOid = DatumGetObjectId(main_arg);
|
||||||
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
|
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
|
||||||
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
|
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
|
||||||
|
|
@ -342,68 +334,64 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
|
|
||||||
if (databaseOid == 0)
|
if (databaseOid == 0)
|
||||||
{
|
{
|
||||||
|
|
||||||
/* TODO : Get the admin database name from GUC contro_db*/
|
/* TODO : Get the admin database name from GUC contro_db*/
|
||||||
char* databaseName = "postgres";
|
char *databaseName = "postgres";
|
||||||
|
|
||||||
BackgroundWorkerInitializeConnection(databaseName, NULL, 0);
|
BackgroundWorkerInitializeConnection(databaseName, NULL, 0);
|
||||||
|
|
||||||
// Now we have a valid MyDatabaseId.
|
/* Now we have a valid MyDatabaseId. */
|
||||||
// Insert the daemon instance to the hash table.
|
/* Insert the daemon instance to the hash table. */
|
||||||
bool found;
|
bool found;
|
||||||
|
|
||||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
myDbData = (MaintenanceDaemonDBData *) hash_search(
|
myDbData = (MaintenanceDaemonDBData *) hash_search(
|
||||||
MaintenanceDaemonDBHash,
|
MaintenanceDaemonDBHash,
|
||||||
&MyDatabaseId,
|
&MyDatabaseId,
|
||||||
HASH_ENTER_NULL,
|
HASH_ENTER_NULL,
|
||||||
&found);
|
&found);
|
||||||
|
|
||||||
if (!myDbData)
|
if (!myDbData)
|
||||||
{
|
{
|
||||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!found)
|
if (!found)
|
||||||
{
|
{
|
||||||
/* ensure the values in MaintenanceDaemonDBData are zero */
|
/* ensure the values in MaintenanceDaemonDBData are zero */
|
||||||
memset(((char *) myDbData) + sizeof(Oid), 0,
|
memset(((char *) myDbData) + sizeof(Oid), 0,
|
||||||
sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
|
sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
|
||||||
|
}
|
||||||
myDbData->userOid = 0;
|
|
||||||
myDbData->workerPid = 0;
|
|
||||||
myDbData->triggerNodeMetadataSync = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(MyDatabaseId));
|
before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(MyDatabaseId));
|
||||||
databaseOid = MyDatabaseId;
|
databaseOid = MyDatabaseId;
|
||||||
|
myDbData->userOid = GetSessionUserId();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
myDbData = (MaintenanceDaemonDBData *)
|
myDbData = (MaintenanceDaemonDBData *)
|
||||||
hash_search(MaintenanceDaemonDBHash, &databaseOid,
|
hash_search(MaintenanceDaemonDBHash, &databaseOid,
|
||||||
HASH_FIND, NULL);
|
HASH_FIND, NULL);
|
||||||
if (!myDbData || myDbData->workerPid != 0)
|
if (!myDbData || myDbData->workerPid != 0)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* When the database crashes, background workers are restarted, but
|
* When the database crashes, background workers are restarted, but
|
||||||
* the state in shared memory is lost. In that case, we exit and
|
* the state in shared memory is lost. In that case, we exit and
|
||||||
* wait for a session to call InitializeMaintenanceDaemonBackend
|
* wait for a session to call InitializeMaintenanceDaemonBackend
|
||||||
* to properly add it to the hash.
|
* to properly add it to the hash.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
|
||||||
proc_exit(0);
|
|
||||||
|
|
||||||
|
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||||
|
proc_exit(0);
|
||||||
}
|
}
|
||||||
before_shmem_exit(MaintenanceDaemonShmemExit, main_arg);
|
before_shmem_exit(MaintenanceDaemonShmemExit, main_arg);
|
||||||
|
|
||||||
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
|
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We do metadata sync in a separate background worker. We need its
|
* We do metadata sync in a separate background worker. We need its
|
||||||
* handle to be able to check its status.
|
* handle to be able to check its status.
|
||||||
|
|
@ -945,7 +933,6 @@ MaintenanceDaemonShmemSize(void)
|
||||||
void
|
void
|
||||||
MaintenanceDaemonShmemInit(void)
|
MaintenanceDaemonShmemInit(void)
|
||||||
{
|
{
|
||||||
elog(LOG, "MaintenanceDaemonShmemInit");
|
|
||||||
bool alreadyInitialized = false;
|
bool alreadyInitialized = false;
|
||||||
HASHCTL hashInfo;
|
HASHCTL hashInfo;
|
||||||
|
|
||||||
|
|
@ -997,7 +984,6 @@ MaintenanceDaemonShmemInit(void)
|
||||||
static void
|
static void
|
||||||
MaintenanceDaemonShmemExit(int code, Datum arg)
|
MaintenanceDaemonShmemExit(int code, Datum arg)
|
||||||
{
|
{
|
||||||
elog(LOG, "MaintenanceDaemonShmemExit");
|
|
||||||
Oid databaseOid = DatumGetObjectId(arg);
|
Oid databaseOid = DatumGetObjectId(arg);
|
||||||
|
|
||||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||||
|
|
@ -1108,7 +1094,6 @@ LockCitusExtension(void)
|
||||||
void
|
void
|
||||||
StopMaintenanceDaemon(Oid databaseId)
|
StopMaintenanceDaemon(Oid databaseId)
|
||||||
{
|
{
|
||||||
elog(LOG, "StopMaintenanceDaemon");
|
|
||||||
bool found = false;
|
bool found = false;
|
||||||
pid_t workerPid = 0;
|
pid_t workerPid = 0;
|
||||||
|
|
||||||
|
|
@ -1140,7 +1125,6 @@ StopMaintenanceDaemon(Oid databaseId)
|
||||||
void
|
void
|
||||||
TriggerNodeMetadataSync(Oid databaseId)
|
TriggerNodeMetadataSync(Oid databaseId)
|
||||||
{
|
{
|
||||||
elog(LOG, "TriggerNodeMetadataSync");
|
|
||||||
bool found = false;
|
bool found = false;
|
||||||
|
|
||||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||||
|
|
@ -1168,7 +1152,6 @@ TriggerNodeMetadataSync(Oid databaseId)
|
||||||
static bool
|
static bool
|
||||||
MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData)
|
MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData)
|
||||||
{
|
{
|
||||||
elog(LOG, "MetadataSyncTriggeredCheckAndReset");
|
|
||||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
bool metadataSyncTriggered = dbData->triggerNodeMetadataSync;
|
bool metadataSyncTriggered = dbData->triggerNodeMetadataSync;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue