From c3b7c5dc336f2c83b840d2a50479c3e6858e8fbf Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sun, 11 Jun 2017 18:24:12 -0700 Subject: [PATCH] Introduce per-database maintenance process. This will be used for deadlock detection, prepared transaction recovery amongst others, but currently is just idling around. --- src/backend/distributed/shared_library_init.c | 4 + src/backend/distributed/utils/maintenanced.c | 412 ++++++++++++++++++ src/include/distributed/maintenanced.h | 20 + 3 files changed, 436 insertions(+) create mode 100644 src/backend/distributed/utils/maintenanced.c create mode 100644 src/include/distributed/maintenanced.h diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index f6a03702b..0c09eb6f6 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -22,6 +22,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/connection_management.h" #include "distributed/connection_management.h" +#include "distributed/maintenanced.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" @@ -161,6 +162,8 @@ _PG_init(void) set_rel_pathlist_hook = multi_relation_restriction_hook; set_join_pathlist_hook = multi_join_restriction_hook; + InitializeMaintenanceDaemon(); + /* organize that task tracker is started once server is up */ TaskTrackerRegister(); @@ -189,6 +192,7 @@ _PG_init(void) void StartupCitusBackend(void) { + InitializeMaintenanceDaemonBackend(); } diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c new file mode 100644 index 000000000..bf90c6df7 --- /dev/null +++ b/src/backend/distributed/utils/maintenanced.c @@ -0,0 +1,412 @@ +/*------------------------------------------------------------------------- + * + * maintenanced.c + * Background worker run for each citus using database in a postgres + * 
cluster.
+ *
+ * This file provides infrastructure for launching exactly one background
+ * worker for every database in which citus is used. That background worker
+ * can then perform work like deadlock detection, prepared transaction
+ * recovery, and cleanup.
+ *
+ * Copyright (c) 2017, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+
+#include "miscadmin.h"
+#include "pgstat.h"
+
+#include "access/xact.h"
+#include "libpq/pqsignal.h"
+#include "distributed/maintenanced.h"
+#include "distributed/metadata_cache.h"
+#include "postmaster/bgworker.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "tcop/tcopprot.h"
+
+
+/*
+ * Shared memory data for all maintenance workers. A single instance is
+ * allocated and initialized by MaintenanceDaemonShmemInit().
+ */
+typedef struct MaintenanceDaemonControlData
+{
+	/*
+	 * Lock protecting the shared memory state. This is to be taken when
+	 * looking up (shared mode) or inserting (exclusive mode) per-database
+	 * data in dbHash. trancheId and lockTranche describe the LWLock tranche
+	 * that the lock is registered under.
+	 */
+	int trancheId; /* tranche id obtained from LWLockNewTrancheId() */
+	LWLockTranche lockTranche; /* tranche descriptor covering just "lock" */
+	LWLock lock;
+
+	/*
+	 * Hash-table of workers, one entry for each database with citus
+	 * activated. Keyed by database oid, entries are MaintenanceDaemonDBData.
+	 */
+	HTAB *dbHash;
+} MaintenanceDaemonControlData;
+
+
+/*
+ * Per database worker state.
+ */
+typedef struct MaintenanceDaemonDBData
+{
+	/* hash key: oid of the database the worker runs on */
+	Oid databaseOid;
+
+	/* role the worker connects as; backends store the extension owner here */
+	Oid userOid;
+
+	/* set once a background worker has been registered for the database */
+	bool daemonStarted;
+
+	/* the background worker's latch, set to wake the worker up */
+	Latch *latch;
+} MaintenanceDaemonDBData;
+
+
+/* shmem startup hook that was installed before ours, called from our hook */
+static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
+
+/* shared memory control structure, set up by MaintenanceDaemonShmemInit() */
+static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
+
+/* set by the SIGHUP handler, consumed by the worker's main loop */
+static volatile sig_atomic_t got_SIGHUP = false;
+
+static size_t MaintenanceDaemonShmemSize(void);
+static void MaintenanceDaemonShmemInit(void);
+static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS);
+static void MaintenanceDaemonErrorContext(void *arg);
+
+/*
+ * InitializeMaintenanceDaemon reserves the shared memory needed by the
+ * maintenance daemons and installs the hook that initializes it. It is
+ * called at server start, while the library is being loaded.
+ */
+void
+InitializeMaintenanceDaemon(void)
+{
+	size_t requiredShmemSize = MaintenanceDaemonShmemSize();
+
+	RequestAddinShmemSpace(requiredShmemSize);
+
+	/* remember the previous hook, our hook calls it once it is done */
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = MaintenanceDaemonShmemInit;
+}
+
+
+/*
+ * InitializeMaintenanceDaemonBackend, called at backend start and
+ * configuration changes, is responsible for starting a per-database
+ * maintenance worker if necessary.
+ */
+void
+InitializeMaintenanceDaemonBackend(void)
+{
+	MaintenanceDaemonDBData *dbData = NULL;
+	Oid extensionOwner = CitusExtensionOwner();
+	bool found = false;
+
+	/* exclusive mode: we may insert a new entry and we modify existing ones */
+	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
+
+	dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonControl->dbHash,
+													 &MyDatabaseId,
+													 HASH_ENTER_NULL, &found);
+
+	if (dbData == NULL)
+	{
+		/* FIXME: better message, reference relevant guc in hint */
+		ereport(ERROR, (errmsg("ran out of database slots")));
+	}
+
+	if (!found)
+	{
+		/*
+		 * hash_search() only fills in the key of a freshly created entry.
+		 * Initialize the remaining fields, so that neither a failed worker
+		 * registration below nor a later owner change acts on garbage in
+		 * daemonStarted or latch.
+		 */
+		dbData->userOid = InvalidOid;
+		dbData->daemonStarted = false;
+		dbData->latch = NULL;
+	}
+
+	if (!dbData->daemonStarted)
+	{
+		BackgroundWorker worker;
+		BackgroundWorkerHandle *handle = NULL;
+		pid_t pid = 0;
+
+		dbData->userOid = extensionOwner;
+
+		memset(&worker, 0, sizeof(worker));
+
+		snprintf(worker.bgw_name, sizeof(worker.bgw_name),
+				 "Citus Maintenance Daemon: %u/%u",
+				 MyDatabaseId, extensionOwner);
+
+		/* request ability to connect to target database */
+		worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+
+		/*
+		 * No point in getting started before able to run query, but we do
+		 * want to get started on hot standby servers.
+		 */
+		worker.bgw_start_time = BgWorkerStart_ConsistentState;
+
+		/*
+		 * Restart after a bit after errors, but don't bog the system.
+		 */
+		worker.bgw_restart_time = 5;
+
+		/* bounded copies, the bgw_* name fields are fixed-size buffers */
+		snprintf(worker.bgw_library_name, sizeof(worker.bgw_library_name),
+				 "citus");
+		snprintf(worker.bgw_function_name, sizeof(worker.bgw_function_name),
+				 "CitusMaintenanceDaemonMain");
+
+		/* the worker finds its hash entry via the database oid */
+		worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
+		memcpy(worker.bgw_extra, &extensionOwner, sizeof(Oid));
+		worker.bgw_notify_pid = MyProcPid;
+
+		if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+		{
+			ereport(ERROR, (errmsg("could not start maintenance background worker"),
+							errhint("Increasing max_worker_processes might help.")));
+		}
+
+		dbData->daemonStarted = true;
+		LWLockRelease(&MaintenanceDaemonControl->lock);
+
+		/* wait outside the lock, the worker takes it during its startup */
+		WaitForBackgroundWorkerStartup(handle, &pid);
+	}
+	else
+	{
+		/*
+		 * If owner of extension changed, wake up daemon. It'll notice and
+		 * restart.
+		 */
+		if (dbData->userOid != extensionOwner)
+		{
+			dbData->userOid = extensionOwner;
+			if (dbData->latch)
+			{
+				SetLatch(dbData->latch);
+			}
+		}
+		LWLockRelease(&MaintenanceDaemonControl->lock);
+	}
+}
+
+
+/*
+ * CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
+ * be started by the background worker infrastructure. If it errors out,
+ * it'll be restarted after a few seconds.
+ *
+ * main_arg carries the oid of the database the worker is responsible for.
+ */
+void
+CitusMaintenanceDaemonMain(Datum main_arg)
+{
+	Oid databaseOid = DatumGetObjectId(main_arg);
+	MaintenanceDaemonDBData *myDbData = NULL;
+	ErrorContextCallback errorCallback;
+
+	/*
+	 * Look up this worker's configuration and publish our latch. Backends
+	 * read the latch pointer while holding the lock, so it is stored under
+	 * the lock as well, in exclusive mode because the entry is modified.
+	 */
+	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
+
+	myDbData = (MaintenanceDaemonDBData *)
+			   hash_search(MaintenanceDaemonControl->dbHash, &databaseOid,
+						   HASH_FIND, NULL);
+	if (myDbData != NULL)
+	{
+		myDbData->latch = MyLatch;
+	}
+
+	LWLockRelease(&MaintenanceDaemonControl->lock);
+
+	if (myDbData == NULL)
+	{
+		/* should never happen */
+		ereport(ERROR, (errmsg("got lost finding myself")));
+	}
+
+	/*
+	 * Setup error context so log messages can be properly attributed. Some of
+	 * them otherwise sound like they might be from a normal user connection.
+	 * Do so before setting up signals etc, so we never exit without the
+	 * context setup.
+	 */
+	memset(&errorCallback, 0, sizeof(errorCallback));
+	errorCallback.callback = MaintenanceDaemonErrorContext;
+	errorCallback.arg = (void *) myDbData;
+	errorCallback.previous = error_context_stack;
+	error_context_stack = &errorCallback;
+
+	/* wire up signals */
+	pqsignal(SIGTERM, die);
+	pqsignal(SIGHUP, MaintenanceDaemonSigHupHandler);
+	BackgroundWorkerUnblockSignals();
+
+	elog(LOG, "starting maintenance daemon on database %u user %u",
+		 databaseOid, myDbData->userOid);
+
+	/* connect to database, after that we can actually access catalogs */
+	BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid);
+
+	/* make worker recognizable in pg_stat_activity */
+	pgstat_report_appname("Citus Maintenance Daemon");
+
+	/* enter main loop */
+	for (;;)
+	{
+		int rc;
+		int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
+		int timeout = 10000; /* wake up at least every so often */
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * Perform Work. If a specific task needs to be called sooner than
+		 * timeout indicates, it's ok to lower it to that value. Expensive
+		 * tasks should do their own time math about whether to re-run checks.
+		 */
+
+		/*
+		 * Wait until timeout, or until somebody wakes us up.
+		 */
+		rc = WaitLatch(MyLatch, latchFlags, timeout);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+		{
+			proc_exit(1);
+		}
+
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+
+			/*
+			 * Check for changed configuration: backends update userOid in
+			 * our shared entry when the extension owner changes.
+			 */
+			if (myDbData->userOid != GetSessionUserId())
+			{
+				/* return code of 1 requests worker restart */
+				proc_exit(1);
+			}
+
+			/*
+			 * Could also add code checking whether extension still exists,
+			 * but that'd complicate things a bit, because we'd have to delete
+			 * the shared memory entry. There'd potentially be a race
+			 * condition where the extension gets re-created, checking that
+			 * this entry still exists, and it getting deleted just after.
+			 * Doesn't seem worth catering for that.
+			 */
+		}
+
+		if (got_SIGHUP)
+		{
+			got_SIGHUP = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+	}
+}
+
+
+/*
+ * MaintenanceDaemonShmemSize computes how much shared memory is required:
+ * the control structure plus the estimated size of the per-database hash.
+ */
+static size_t
+MaintenanceDaemonShmemSize(void)
+{
+	Size size = 0;
+	Size hashSize = 0;
+
+	size = add_size(size, sizeof(MaintenanceDaemonControlData));
+
+	/*
+	 * We request enough shared memory to have one hash-table entry for each
+	 * worker process. We couldn't start more anyway, so there's little point
+	 * in allocating more.
+	 */
+	hashSize = hash_estimate_size(max_worker_processes, sizeof(MaintenanceDaemonDBData));
+	size = add_size(size, hashSize);
+
+	return size;
+}
+
+
+/*
+ * MaintenanceDaemonShmemInit initializes the requested shared memory for the
+ * maintenance daemon. It is installed as shmem_startup_hook and calls the
+ * previously installed hook, if any, once it is done.
+ */
+static void
+MaintenanceDaemonShmemInit(void)
+{
+	bool alreadyInitialized = false;
+	HASHCTL hashInfo;
+	int hashFlags = 0;
+
+	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+
+	MaintenanceDaemonControl =
+		(MaintenanceDaemonControlData *) ShmemInitStruct("Citus Maintenance Daemon",
+														 MaintenanceDaemonShmemSize(),
+														 &alreadyInitialized);
+
+	/*
+	 * Might already be initialized on EXEC_BACKEND type platforms that call
+	 * shared library initialization functions in every backend.
+	 */
+	if (!alreadyInitialized)
+	{
+		/* initialize lwlock */
+		LWLockTranche *tranche = &MaintenanceDaemonControl->lockTranche;
+
+		/* start by zeroing out all the memory */
+		memset(MaintenanceDaemonControl, 0, MaintenanceDaemonShmemSize());
+
+		/* initialize lock */
+		MaintenanceDaemonControl->trancheId = LWLockNewTrancheId();
+		tranche->array_base = &MaintenanceDaemonControl->lock;
+		tranche->array_stride = sizeof(LWLock);
+		tranche->name = "Citus Maintenance Daemon";
+		LWLockRegisterTranche(MaintenanceDaemonControl->trancheId, tranche);
+		LWLockInitialize(&MaintenanceDaemonControl->lock,
+						 MaintenanceDaemonControl->trancheId);
+	}
+
+
+	/* entries are keyed by database oid, the first field of the entry */
+	memset(&hashInfo, 0, sizeof(hashInfo));
+	hashInfo.keysize = sizeof(Oid);
+	hashInfo.entrysize = sizeof(MaintenanceDaemonDBData);
+	hashInfo.hash = tag_hash;
+	hashFlags = (HASH_ELEM | HASH_FUNCTION);
+
+	/*
+	 * NOTE(review): the HTAB pointer returned here is process-local, yet it
+	 * is stored in the shared control structure. That looks unsafe on
+	 * EXEC_BACKEND builds, where every process runs this and overwrites the
+	 * field; a file-level static may be the better home - confirm.
+	 */
+	MaintenanceDaemonControl->dbHash =
+		ShmemInitHash("Maintenance Database Hash",
+					  max_worker_processes, max_worker_processes,
+					  &hashInfo, hashFlags);
+
+	LWLockRelease(AddinShmemInitLock);
+
+	if (prev_shmem_startup_hook != NULL)
+	{
+		prev_shmem_startup_hook();
+	}
+}
+
+
+/*
+ * MaintenanceDaemonSigHupHandler set a flag to re-read config file at next
+ * convenient time, and sets the process latch so the main loop wakes up.
+ */
+static void
+MaintenanceDaemonSigHupHandler(SIGNAL_ARGS)
+{
+	/* SetLatch() may clobber errno, restore it before returning */
+	int save_errno = errno;
+
+	got_SIGHUP = true;
+	if (MyProc != NULL)
+	{
+		SetLatch(&MyProc->procLatch);
+	}
+
+	errno = save_errno;
+}
+
+
+/*
+ * MaintenanceDaemonErrorContext adds some context to log messages to make it
+ * easier to associate them with the maintenance daemon.
+ */
+static void
+MaintenanceDaemonErrorContext(void *arg)
+{
+	/* arg is the worker's shared hash entry, as set up by the daemon's main */
+	MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) arg;
+	Oid databaseOid = dbData->databaseOid;
+	Oid userOid = dbData->userOid;
+
+	errcontext("Citus maintenance daemon for database %u user %u",
+			   databaseOid, userOid);
+}
diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h
new file mode 100644
index 000000000..bb6ebcd6b
--- /dev/null
+++ b/src/include/distributed/maintenanced.h
@@ -0,0 +1,20 @@
+/*-------------------------------------------------------------------------
+ *
+ * maintenanced.h
+ *	  Background worker run for each citus using database in a postgres
+ *	  cluster.
+ *
+ * Copyright (c) 2017, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef MAINTENANCED_H
+#define MAINTENANCED_H
+
+extern void InitializeMaintenanceDaemon(void);
+extern void InitializeMaintenanceDaemonBackend(void);
+
+extern void CitusMaintenanceDaemonMain(Datum main_arg);
+
+#endif /* MAINTENANCED_H */