mirror of https://github.com/citusdata/citus.git
Some tests with changing hash key
parent
0dca65c84d
commit
b4584cfcd7
|
@ -27,6 +27,7 @@
|
||||||
#include "catalog/objectaccess.h"
|
#include "catalog/objectaccess.h"
|
||||||
#include "catalog/pg_extension.h"
|
#include "catalog/pg_extension.h"
|
||||||
#include "citus_version.h"
|
#include "citus_version.h"
|
||||||
|
#include "commands/dbcommands.h"
|
||||||
#include "commands/explain.h"
|
#include "commands/explain.h"
|
||||||
#include "commands/extension.h"
|
#include "commands/extension.h"
|
||||||
#include "common/string.h"
|
#include "common/string.h"
|
||||||
|
@ -767,7 +768,9 @@ IsSequenceOverflowError(ErrorData *edata)
|
||||||
void
|
void
|
||||||
StartupCitusBackend(void)
|
StartupCitusBackend(void)
|
||||||
{
|
{
|
||||||
InitializeMaintenanceDaemonBackend();
|
Oid superUser = CitusExtensionOwner();
|
||||||
|
|
||||||
|
InitializeMaintenanceDaemonBackend(get_database_name(MyDatabaseId), superUser);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* For query backends this will be a no-op, because InitializeBackendData
|
* For query backends this will be a no-op, because InitializeBackendData
|
||||||
|
|
|
@ -30,6 +30,7 @@
|
||||||
#include "catalog/pg_authid.h"
|
#include "catalog/pg_authid.h"
|
||||||
#include "catalog/pg_namespace.h"
|
#include "catalog/pg_namespace.h"
|
||||||
#include "commands/async.h"
|
#include "commands/async.h"
|
||||||
|
#include "commands/dbcommands.h"
|
||||||
#include "commands/extension.h"
|
#include "commands/extension.h"
|
||||||
#include "libpq/pqsignal.h"
|
#include "libpq/pqsignal.h"
|
||||||
#include "catalog/namespace.h"
|
#include "catalog/namespace.h"
|
||||||
|
@ -83,9 +84,10 @@ typedef struct MaintenanceDaemonControlData
|
||||||
typedef struct MaintenanceDaemonDBData
|
typedef struct MaintenanceDaemonDBData
|
||||||
{
|
{
|
||||||
/* hash key: database to run on */
|
/* hash key: database to run on */
|
||||||
Oid databaseOid;
|
char databaseName[NAMEDATALEN];
|
||||||
|
|
||||||
/* information: which user to use */
|
/* information: which user to use */
|
||||||
|
NameData dbname;
|
||||||
Oid userOid;
|
Oid userOid;
|
||||||
pid_t workerPid;
|
pid_t workerPid;
|
||||||
bool daemonStarted;
|
bool daemonStarted;
|
||||||
|
@ -145,32 +147,32 @@ InitializeMaintenanceDaemon(void)
|
||||||
* maintenance worker if necessary.
|
* maintenance worker if necessary.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
InitializeMaintenanceDaemonBackend(void)
|
InitializeMaintenanceDaemonBackend(const char* databaseName, Oid userOid)
|
||||||
{
|
{
|
||||||
Oid extensionOwner = CitusExtensionOwner();
|
|
||||||
bool found;
|
bool found;
|
||||||
|
|
||||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
|
MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
|
||||||
MaintenanceDaemonDBHash,
|
MaintenanceDaemonDBHash,
|
||||||
&MyDatabaseId,
|
databaseName,
|
||||||
HASH_ENTER_NULL,
|
HASH_ENTER,
|
||||||
&found);
|
&found);
|
||||||
|
|
||||||
if (dbData == NULL)
|
if (dbData == NULL)
|
||||||
{
|
{
|
||||||
WarnMaintenanceDaemonNotStarted();
|
WarnMaintenanceDaemonNotStarted();
|
||||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||||
|
elog(LOG, "dbData is NULL");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!found)
|
if (!found)
|
||||||
{
|
{
|
||||||
/* ensure the values in MaintenanceDaemonDBData are zero */
|
/* ensure the values in MaintenanceDaemonDBData are zero */
|
||||||
memset(((char *) dbData) + sizeof(Oid), 0,
|
memset(((char *) dbData) + sizeof(char)*NAMEDATALEN, 0,
|
||||||
sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
|
sizeof(MaintenanceDaemonDBData) - sizeof(char)*NAMEDATALEN);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsMaintenanceDaemon)
|
if (IsMaintenanceDaemon)
|
||||||
|
@ -194,8 +196,8 @@ InitializeMaintenanceDaemonBackend(void)
|
||||||
memset(&worker, 0, sizeof(worker));
|
memset(&worker, 0, sizeof(worker));
|
||||||
|
|
||||||
SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
|
SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
|
||||||
"Citus Maintenance Daemon: %u/%u",
|
"Citus Maintenance Daemon: %s/%u",
|
||||||
MyDatabaseId, extensionOwner);
|
databaseName, userOid);
|
||||||
|
|
||||||
/* request ability to connect to target database */
|
/* request ability to connect to target database */
|
||||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
||||||
|
@ -213,8 +215,11 @@ InitializeMaintenanceDaemonBackend(void)
|
||||||
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
|
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
|
||||||
"CitusMaintenanceDaemonMain");
|
"CitusMaintenanceDaemonMain");
|
||||||
|
|
||||||
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
|
namestrcpy(&(dbData->dbname), databaseName);
|
||||||
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
|
worker.bgw_main_arg = NameGetDatum(&(dbData->dbname));
|
||||||
|
elog(LOG, "worker.bgw_main_arg databaseName = %s", databaseName);
|
||||||
|
|
||||||
|
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &userOid,
|
||||||
sizeof(Oid));
|
sizeof(Oid));
|
||||||
worker.bgw_notify_pid = MyProcPid;
|
worker.bgw_notify_pid = MyProcPid;
|
||||||
|
|
||||||
|
@ -227,8 +232,10 @@ InitializeMaintenanceDaemonBackend(void)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
elog(LOG, "RegisterDynamicBackgroundWorker");
|
||||||
|
|
||||||
dbData->daemonStarted = true;
|
dbData->daemonStarted = true;
|
||||||
dbData->userOid = extensionOwner;
|
dbData->userOid = userOid;
|
||||||
dbData->workerPid = 0;
|
dbData->workerPid = 0;
|
||||||
dbData->triggerNodeMetadataSync = false;
|
dbData->triggerNodeMetadataSync = false;
|
||||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||||
|
@ -246,9 +253,9 @@ InitializeMaintenanceDaemonBackend(void)
|
||||||
* If owner of extension changed, wake up daemon. It'll notice and
|
* If owner of extension changed, wake up daemon. It'll notice and
|
||||||
* restart.
|
* restart.
|
||||||
*/
|
*/
|
||||||
if (dbData->userOid != extensionOwner)
|
if (dbData->userOid != userOid)
|
||||||
{
|
{
|
||||||
dbData->userOid = extensionOwner;
|
dbData->userOid = userOid;
|
||||||
if (dbData->latch)
|
if (dbData->latch)
|
||||||
{
|
{
|
||||||
SetLatch(dbData->latch);
|
SetLatch(dbData->latch);
|
||||||
|
@ -278,7 +285,9 @@ WarnMaintenanceDaemonNotStarted(void)
|
||||||
void
|
void
|
||||||
CitusMaintenanceDaemonMain(Datum main_arg)
|
CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
{
|
{
|
||||||
Oid databaseOid = DatumGetObjectId(main_arg);
|
char* databaseName = NameStr(*DatumGetName(main_arg));
|
||||||
|
|
||||||
|
elog(LOG, "databaseName = %s", databaseName);
|
||||||
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
|
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
|
||||||
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
|
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
|
||||||
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
|
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
|
||||||
|
@ -304,7 +313,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
|
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
|
||||||
hash_search(MaintenanceDaemonDBHash, &databaseOid,
|
hash_search(MaintenanceDaemonDBHash, databaseName,
|
||||||
HASH_FIND, NULL);
|
HASH_FIND, NULL);
|
||||||
if (!myDbData)
|
if (!myDbData)
|
||||||
{
|
{
|
||||||
|
@ -315,6 +324,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
* to properly add it to the hash.
|
* to properly add it to the hash.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
elog(LOG, "!myDbData");
|
||||||
proc_exit(0);
|
proc_exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -327,6 +337,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
* In that case, the first one stays and the last one exits.
|
* In that case, the first one stays and the last one exits.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
elog(LOG, "Another maintenance daemon is running.");
|
||||||
proc_exit(0);
|
proc_exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -370,11 +381,14 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
error_context_stack = &errorCallback;
|
error_context_stack = &errorCallback;
|
||||||
|
|
||||||
|
|
||||||
elog(LOG, "starting maintenance daemon on database %u user %u",
|
elog(LOG, "starting maintenance daemon on database %s user %u",
|
||||||
databaseOid, myDbData->userOid);
|
databaseName, myDbData->userOid);
|
||||||
|
|
||||||
/* connect to database, after that we can actually access catalogs */
|
/* connect to database, after that we can actually access catalogs */
|
||||||
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
|
BackgroundWorkerInitializeConnection(databaseName, NULL, BGWORKER_BYPASS_ALLOWCONN);
|
||||||
|
|
||||||
|
elog(LOG, "BackgroundWorkerInitializeConnection %s DBId %u userid %u",
|
||||||
|
databaseName, MyDatabaseId, GetAuthenticatedUserId());
|
||||||
|
|
||||||
/* make worker recognizable in pg_stat_activity */
|
/* make worker recognizable in pg_stat_activity */
|
||||||
pgstat_report_appname("Citus Maintenance Daemon");
|
pgstat_report_appname("Citus Maintenance Daemon");
|
||||||
|
@ -383,7 +397,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
* Terminate orphaned metadata sync daemons spawned from previously terminated
|
* Terminate orphaned metadata sync daemons spawned from previously terminated
|
||||||
* or crashed maintenanced instances.
|
* or crashed maintenanced instances.
|
||||||
*/
|
*/
|
||||||
SignalMetadataSyncDaemon(databaseOid, SIGTERM);
|
SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM);
|
||||||
|
|
||||||
/* enter main loop */
|
/* enter main loop */
|
||||||
while (!got_SIGTERM)
|
while (!got_SIGTERM)
|
||||||
|
@ -773,8 +787,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
"background task queue monitor")));
|
"background task queue monitor")));
|
||||||
|
|
||||||
backgroundTasksQueueBgwHandle =
|
backgroundTasksQueueBgwHandle =
|
||||||
StartCitusBackgroundTaskQueueMonitor(MyDatabaseId,
|
StartCitusBackgroundTaskQueueMonitor(MyDatabaseId, myDbData->userOid);
|
||||||
myDbData->userOid);
|
|
||||||
|
|
||||||
if (!backgroundTasksQueueBgwHandle ||
|
if (!backgroundTasksQueueBgwHandle ||
|
||||||
GetBackgroundWorkerPid(backgroundTasksQueueBgwHandle,
|
GetBackgroundWorkerPid(backgroundTasksQueueBgwHandle,
|
||||||
|
@ -810,9 +823,11 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
CHECK_FOR_INTERRUPTS();
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
|
||||||
/* check for changed configuration */
|
/* check for changed configuration */
|
||||||
|
|
||||||
if (myDbData->userOid != GetSessionUserId())
|
if (myDbData->userOid != GetSessionUserId())
|
||||||
{
|
{
|
||||||
/* return code of 1 requests worker restart */
|
/* return code of 1 requests worker restart */
|
||||||
|
elog(LOG, "userOid != GetSessionUserId()");
|
||||||
proc_exit(1);
|
proc_exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -897,10 +912,10 @@ MaintenanceDaemonShmemInit(void)
|
||||||
|
|
||||||
|
|
||||||
memset(&hashInfo, 0, sizeof(hashInfo));
|
memset(&hashInfo, 0, sizeof(hashInfo));
|
||||||
hashInfo.keysize = sizeof(Oid);
|
hashInfo.keysize = NAMEDATALEN;
|
||||||
hashInfo.entrysize = sizeof(MaintenanceDaemonDBData);
|
hashInfo.entrysize = sizeof(MaintenanceDaemonDBData);
|
||||||
hashInfo.hash = tag_hash;
|
|
||||||
int hashFlags = (HASH_ELEM | HASH_FUNCTION);
|
int hashFlags = (HASH_ELEM | HASH_STRINGS);
|
||||||
|
|
||||||
MaintenanceDaemonDBHash = ShmemInitHash("Maintenance Database Hash",
|
MaintenanceDaemonDBHash = ShmemInitHash("Maintenance Database Hash",
|
||||||
max_worker_processes, max_worker_processes,
|
max_worker_processes, max_worker_processes,
|
||||||
|
@ -921,12 +936,13 @@ MaintenanceDaemonShmemInit(void)
|
||||||
static void
|
static void
|
||||||
MaintenanceDaemonShmemExit(int code, Datum arg)
|
MaintenanceDaemonShmemExit(int code, Datum arg)
|
||||||
{
|
{
|
||||||
Oid databaseOid = DatumGetObjectId(arg);
|
|
||||||
|
char* databaseName = NameStr(*DatumGetName(arg));
|
||||||
|
|
||||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
|
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
|
||||||
hash_search(MaintenanceDaemonDBHash, &databaseOid,
|
hash_search(MaintenanceDaemonDBHash, &databaseName,
|
||||||
HASH_FIND, NULL);
|
HASH_FIND, NULL);
|
||||||
|
|
||||||
/* myDbData is NULL after StopMaintenanceDaemon */
|
/* myDbData is NULL after StopMaintenanceDaemon */
|
||||||
|
@ -988,8 +1004,8 @@ static void
|
||||||
MaintenanceDaemonErrorContext(void *arg)
|
MaintenanceDaemonErrorContext(void *arg)
|
||||||
{
|
{
|
||||||
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) arg;
|
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) arg;
|
||||||
errcontext("Citus maintenance daemon for database %u user %u",
|
errcontext("Citus maintenance daemon for database %s user %u",
|
||||||
myDbData->databaseOid, myDbData->userOid);
|
myDbData->databaseName, myDbData->userOid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1034,11 +1050,15 @@ StopMaintenanceDaemon(Oid databaseId)
|
||||||
bool found = false;
|
bool found = false;
|
||||||
pid_t workerPid = 0;
|
pid_t workerPid = 0;
|
||||||
|
|
||||||
|
StartTransactionCommand();
|
||||||
|
char* databaseName = get_database_name(databaseId);
|
||||||
|
CommitTransactionCommand();
|
||||||
|
|
||||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
|
MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
|
||||||
MaintenanceDaemonDBHash,
|
MaintenanceDaemonDBHash,
|
||||||
&databaseId,
|
&databaseName,
|
||||||
HASH_REMOVE, &found);
|
HASH_REMOVE, &found);
|
||||||
|
|
||||||
if (found)
|
if (found)
|
||||||
|
@ -1064,11 +1084,12 @@ TriggerNodeMetadataSync(Oid databaseId)
|
||||||
{
|
{
|
||||||
bool found = false;
|
bool found = false;
|
||||||
|
|
||||||
|
char* databaseName = get_database_name(databaseId);
|
||||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
|
MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
|
||||||
MaintenanceDaemonDBHash,
|
MaintenanceDaemonDBHash,
|
||||||
&databaseId,
|
&databaseName,
|
||||||
HASH_FIND, &found);
|
HASH_FIND, &found);
|
||||||
if (found)
|
if (found)
|
||||||
{
|
{
|
||||||
|
|
|
@ -26,7 +26,7 @@ extern void TriggerNodeMetadataSync(Oid databaseId);
|
||||||
extern void InitializeMaintenanceDaemon(void);
|
extern void InitializeMaintenanceDaemon(void);
|
||||||
extern size_t MaintenanceDaemonShmemSize(void);
|
extern size_t MaintenanceDaemonShmemSize(void);
|
||||||
extern void MaintenanceDaemonShmemInit(void);
|
extern void MaintenanceDaemonShmemInit(void);
|
||||||
extern void InitializeMaintenanceDaemonBackend(void);
|
extern void InitializeMaintenanceDaemonBackend(const char* databaseName, Oid userOid);
|
||||||
extern bool LockCitusExtension(void);
|
extern bool LockCitusExtension(void);
|
||||||
|
|
||||||
extern PGDLLEXPORT void CitusMaintenanceDaemonMain(Datum main_arg);
|
extern PGDLLEXPORT void CitusMaintenanceDaemonMain(Datum main_arg);
|
||||||
|
|
Loading…
Reference in New Issue