/*-------------------------------------------------------------------------
 *
 * maintenanced.c
 *	  Background worker run for each database that uses citus in a postgres
 *	  cluster.
 *
 * This file provides infrastructure for launching exactly one background
 * worker for every database in which citus is used. That background worker
 * can then perform work like deadlock detection, prepared transaction
 * recovery, and cleanup.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */
#include <time.h>

#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"

#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/namespace.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_namespace.h"
#include "commands/async.h"
#include "commands/extension.h"
#include "common/hashfn.h"
#include "libpq/pqsignal.h"
#include "nodes/makefuncs.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"

#include "citus_version.h"
#include "pg_version_constants.h"

#include "distributed/background_jobs.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/distributed_deadlock_detection.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/query_stats.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_cleaner.h"
#include "distributed/statistics_collection.h"
#include "distributed/transaction_recovery.h"
#include "distributed/version_compat.h"

/*
 * Shared memory data for all maintenance workers.
 */
typedef struct MaintenanceDaemonControlData
{
	/*
	 * Lock protecting the shared memory state. This is to be taken when
	 * looking up (shared mode) or inserting (exclusive mode) per-database
	 * data in MaintenanceDaemonDBHash.
	 */
	int trancheId;
	char *lockTrancheName;
	LWLock lock;
} MaintenanceDaemonControlData;


/*
 * Per database worker state.
 */
typedef struct MaintenanceDaemonDBData
{
	/* hash key: database to run on */
	Oid databaseOid;

	/* information: which user to use */
	Oid userOid;
	pid_t workerPid;
	bool daemonStarted;
	bool daemonShuttingDown;
	bool triggerNodeMetadataSync;
	Latch *latch; /* pointer to the background worker's latch */
} MaintenanceDaemonDBData;

/* config variable for distributed deadlock detection timeout */
double DistributedDeadlockDetectionTimeoutFactor = 2.0;

char *MaintenanceManagementDatabase = "";

int Recover2PCInterval = 60000;
int DeferShardDeleteInterval = 15000;
int BackgroundTaskQueueCheckInterval = 5000;
int MaxBackgroundTaskExecutors = 4;
char *MainDb = "";

/* config variables for metadata sync timeout */
int MetadataSyncInterval = 60000;
int MetadataSyncRetryInterval = 5000;

static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;

/*
 * Hash-table of workers, one entry for each database with citus
 * activated.
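 * The hash is keyed by database OID and lives in shared memory, sized for at
 * most max_worker_processes entries (see MaintenanceDaemonShmemSize below).
 * An illustrative sketch of a plain lookup under the control lock (the
 * functions below typically take the lock in exclusive mode because they also
 * modify the entry they find):
 *
 *     LWLockAcquire(&MaintenanceDaemonControl->lock, LW_SHARED);
 *     MaintenanceDaemonDBData *entry = (MaintenanceDaemonDBData *)
 *         hash_search(MaintenanceDaemonDBHash, &databaseOid, HASH_FIND, NULL);
 *     LWLockRelease(&MaintenanceDaemonControl->lock);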
 */
static HTAB *MaintenanceDaemonDBHash;

static ErrorContextCallback errorCallback = { 0 };
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;

/* set to true when becoming a maintenance daemon */
bool IsMaintenanceDaemon = false;

static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS);
static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS);
static void MaintenanceDaemonShmemExit(int code, Datum arg);
static void MaintenanceDaemonErrorContext(void *arg);
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
static void WarnMaintenanceDaemonNotStarted(void);
static MaintenanceDaemonDBData * GetMaintenanceDaemonDBHashEntry(Oid databaseId,
																 bool *found);

/*
 * InitializeMaintenanceDaemon, called at server start, is responsible for
 * requesting shared memory and related infrastructure required by maintenance
 * daemons.
 */
void
InitializeMaintenanceDaemon(void)
{
	prev_shmem_startup_hook = shmem_startup_hook;
	shmem_startup_hook = MaintenanceDaemonShmemInit;
}


/*
 * GetMaintenanceDaemonDBHashEntry searches the MaintenanceDaemonDBHash for the
 * databaseId. It returns the entry if found or creates a new entry and
 * initializes the value with zeroes.
 */
MaintenanceDaemonDBData *
GetMaintenanceDaemonDBHashEntry(Oid databaseId, bool *found)
{
	MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
		MaintenanceDaemonDBHash,
		&databaseId,
		HASH_ENTER_NULL,
		found);

	if (!dbData)
	{
		elog(LOG,
			 "cannot create or find the maintenance daemon hash entry for database %u",
			 databaseId);
		return NULL;
	}

	if (!*found)
	{
		/* ensure the values in MaintenanceDaemonDBData are zero */
		memset(((char *) dbData) + sizeof(Oid), 0,
			   sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
	}

	return dbData;
}


/*
 * InitializeMaintenanceDaemonForMainDb is called in _PG_init, at which stage
 * we are not in a transaction and do not have a databaseOid.
 */
void
InitializeMaintenanceDaemonForMainDb(void)
{
	if (strcmp(MainDb, "") == 0)
	{
		elog(LOG, "There is no designated Main database.");
		return;
	}

	BackgroundWorker worker;

	memset(&worker, 0, sizeof(worker));

	strcpy_s(worker.bgw_name, sizeof(worker.bgw_name),
			 "Citus Maintenance Daemon for Main DB");

	/* request ability to connect to target database */
	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;

	/*
	 * No point in getting started before being able to run queries, but we
	 * do want to get started on Hot-Standby.
	 */
	worker.bgw_start_time = BgWorkerStart_ConsistentState;

	/* Restart a bit after errors, but don't bog down the system. */
	worker.bgw_restart_time = 5;
	strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
			 "CitusMaintenanceDaemonMain");

	worker.bgw_main_arg = (Datum) 0;

	RegisterBackgroundWorker(&worker);
}


/*
 * InitializeMaintenanceDaemonBackend, called at backend start and
 * configuration changes, is responsible for starting a per-database
 * maintenance worker if necessary.
 */
void
InitializeMaintenanceDaemonBackend(void)
{
	Oid extensionOwner = CitusExtensionOwner();
	bool found = false;

	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

	MaintenanceDaemonDBData *dbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId,
																	  &found);

	if (dbData == NULL)
	{
		WarnMaintenanceDaemonNotStarted();
		LWLockRelease(&MaintenanceDaemonControl->lock);

		return;
	}

	if (dbData->daemonShuttingDown)
	{
		elog(DEBUG1,
			 "Another maintenance daemon for database %u is shutting down. "
" "Aborting current initialization", MyDatabaseId); LWLockRelease(&MaintenanceDaemonControl->lock); return; } if (IsMaintenanceDaemon) { /* * InitializeMaintenanceDaemonBackend is called by the maintenance daemon * itself. In that case, we clearly don't need to start another maintenance * daemon. */ LWLockRelease(&MaintenanceDaemonControl->lock); return; } if (!found || !dbData->daemonStarted) { Assert(dbData->workerPid == 0); BackgroundWorker worker; BackgroundWorkerHandle *handle = NULL; memset(&worker, 0, sizeof(worker)); SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name), "Citus Maintenance Daemon: %u/%u", MyDatabaseId, extensionOwner); /* request ability to connect to target database */ worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; /* * No point in getting started before able to run query, but we do * want to get started on Hot-Standby. */ worker.bgw_start_time = BgWorkerStart_ConsistentState; /* Restart after a bit after errors, but don't bog the system. */ worker.bgw_restart_time = 5; strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), "CitusMaintenanceDaemonMain"); worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId); memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner, sizeof(Oid)); worker.bgw_notify_pid = MyProcPid; if (!RegisterDynamicBackgroundWorker(&worker, &handle)) { WarnMaintenanceDaemonNotStarted(); dbData->daemonStarted = false; LWLockRelease(&MaintenanceDaemonControl->lock); return; } dbData->daemonStarted = true; dbData->userOid = extensionOwner; dbData->workerPid = 0; dbData->triggerNodeMetadataSync = false; LWLockRelease(&MaintenanceDaemonControl->lock); pid_t pid; WaitForBackgroundWorkerStartup(handle, &pid); pfree(handle); } else { Assert(dbData->daemonStarted); /* * If owner of extension changed, wake up daemon. It'll notice and * restart. */ if (dbData->userOid != extensionOwner) { dbData->userOid = extensionOwner; if (dbData->latch) { SetLatch(dbData->latch); } } LWLockRelease(&MaintenanceDaemonControl->lock); } } /* * WarnMaintenanceDaemonNotStarted warns that maintenanced couldn't be started. */ static void WarnMaintenanceDaemonNotStarted(void) { ereport(WARNING, (errmsg("could not start maintenance background worker"), errhint("Increasing max_worker_processes might help."))); } /* * ConnectToDatabase connects to the database for the given databaseOid. * if databaseOid is 0, connects to MainDb and then creates a hash entry. * If a hash entry cannot be created for MainDb it exits the process requesting a restart. * However for regular databases, it exits without requesting a restart since another * subsequent backend is expected to start the Maintenance Daemon. * If the found hash entry has a valid workerPid, it exits * without requesting a restart since there is already a daemon running. */ static MaintenanceDaemonDBData * ConnectToDatabase(Oid databaseOid) { MaintenanceDaemonDBData *myDbData = NULL; bool isMainDb = false; LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); if (databaseOid == 0) { char *databaseName = MainDb; /* * Since we cannot query databaseOid without initializing Postgres * first, connect to the database by name. */ BackgroundWorkerInitializeConnection(databaseName, NULL, 0); /* * Now we have a valid MyDatabaseId. * Insert the hash entry for the database to the Maintenance Deamon Hash. 
		 */
		bool found = false;
		myDbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId, &found);
		if (!myDbData)
		{
			/*
			 * If an entry cannot be created, exit with return code 1 to request
			 * a worker restart. Since the BackgroundWorker for the MainDb is
			 * only registered once during server startup, we need to retry.
			 */
			proc_exit(1);
		}

		if (found && myDbData->workerPid != 0)
		{
			/* Another maintenance daemon is running. */
			proc_exit(0);
		}

		databaseOid = MyDatabaseId;
		myDbData->userOid = GetSessionUserId();
		isMainDb = true;
	}
	else
	{
		myDbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonDBHash,
														   &databaseOid, HASH_FIND,
														   NULL);
		if (!myDbData)
		{
			/*
			 * When the database crashes, background workers are restarted, but
			 * the state in shared memory is lost. In that case, we exit and
			 * wait for a session to call InitializeMaintenanceDaemonBackend
			 * to properly add it to the hash.
			 */
			proc_exit(0);
		}

		if (myDbData->workerPid != 0)
		{
			/*
			 * Another maintenance daemon is running. This usually happens because
			 * postgres restarts the daemon after a non-zero exit, and
			 * InitializeMaintenanceDaemonBackend started one before postgres did.
			 * In that case, the first one stays and the last one exits.
			 */
			proc_exit(0);
		}
	}

	before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(databaseOid));

	/*
	 * Signal that I am the maintenance daemon now.
	 *
	 * From this point, DROP DATABASE/EXTENSION will send a SIGTERM to me.
	 */
	myDbData->workerPid = MyProcPid;

	/*
	 * Signal that we are running. This is mainly needed in case of restart after
	 * an error, otherwise the daemonStarted flag is already true.
	 */
	myDbData->daemonStarted = true;

	/* wire up signals */
	pqsignal(SIGTERM, MaintenanceDaemonSigTermHandler);
	pqsignal(SIGHUP, MaintenanceDaemonSigHupHandler);
	BackgroundWorkerUnblockSignals();

	myDbData->latch = MyLatch;

	IsMaintenanceDaemon = true;

	LWLockRelease(&MaintenanceDaemonControl->lock);

	memset(&errorCallback, 0, sizeof(errorCallback));

	errorCallback.callback = MaintenanceDaemonErrorContext;
	errorCallback.arg = (void *) myDbData;
	errorCallback.previous = error_context_stack;
	error_context_stack = &errorCallback;

	elog(LOG, "starting maintenance daemon on database %u user %u",
		 databaseOid, myDbData->userOid);

	if (!isMainDb)
	{
		/* connect to database, after that we can actually access catalogs */
		BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
	}

	return myDbData;
}


/*
 * CitusMaintenanceDaemonMain is the maintenance daemon's main routine; it'll
 * be started by the background worker infrastructure. If it errors out,
 * it'll be restarted after a few seconds.
 */
void
CitusMaintenanceDaemonMain(Datum main_arg)
{
	Oid databaseOid = DatumGetObjectId(main_arg);
	TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
		TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
	bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
	TimestampTz lastRecoveryTime = 0;
	TimestampTz lastShardCleanTime = 0;
	TimestampTz lastStatStatementsPurgeTime = 0;
	TimestampTz nextMetadataSyncTime = 0;

	/* state kept for the background tasks queue monitor */
	TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
	BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
	bool backgroundTasksQueueWarnedForLock = false;

	/*
	 * We do metadata sync in a separate background worker. We need its
	 * handle to be able to check its status.
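	 * Each loop iteration below asks the handle whether that worker is still
	 * running, roughly like this (illustrative sketch of the check performed
	 * further down in the main loop):
	 *
	 *     pid_t pid = 0;
	 *     BgwHandleStatus status = metadataSyncBgwHandle != NULL
	 *         ? GetBackgroundWorkerPid(metadataSyncBgwHandle, &pid)
	 *         : BGWH_STOPPED;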
	 */
	BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;

	MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid);

	/* make worker recognizable in pg_stat_activity */
	pgstat_report_appname("Citus Maintenance Daemon");

	/*
	 * Terminate orphaned metadata sync daemons spawned from previously terminated
	 * or crashed maintenanced instances.
	 */
	SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM);

	/* enter main loop */
	while (!got_SIGTERM)
	{
		int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
		double timeout = 10000.0; /* use this if the deadlock detection is disabled */
		bool foundDeadlock = false;

		CHECK_FOR_INTERRUPTS();

		CitusTableCacheFlushInvalidatedEntries();

		/*
		 * XXX: Each task should clear the metadata cache before every iteration
		 * by calling InvalidateMetadataSystemCache(), because otherwise it
		 * might contain stale OIDs. It appears that in some cases invalidation
		 * messages for a DROP EXTENSION may arrive during these tasks and
		 * this causes us to cache a stale pg_dist_node OID. We'd actually expect
		 * all invalidations to arrive after obtaining a lock in LockCitusExtension.
		 */

		/*
		 * Perform work. If a specific task needs to be called sooner than
		 * timeout indicates, it's ok to lower it to that value. Expensive
		 * tasks should do their own time math about whether to re-run checks.
		 */
#ifdef HAVE_LIBCURL
		if (EnableStatisticsCollection &&
			GetCurrentTimestamp() >= nextStatsCollectionTime)
		{
			bool statsCollectionSuccess = false;
			InvalidateMetadataSystemCache();
			StartTransactionCommand();

			/*
			 * Lock the extension such that it cannot be dropped or created
			 * concurrently. Skip statistics collection if the citus extension
			 * is not accessible.
			 *
			 * Similarly, we skip statistics collection if there exists any
			 * version mismatch or the extension is not fully created yet.
			 */
			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
										"skipping statistics collection")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				FlushDistTableCache();
				WarnIfSyncDNS();
				statsCollectionSuccess = CollectBasicUsageStatistics();
			}

			/*
			 * If statistics collection was successful, the next collection is
			 * 24 hours later. Also, if this was a retry attempt we don't do
			 * any more retries until 24 hours later, so we limit the number of
			 * retries to one.
			 */
			if (statsCollectionSuccess || retryStatsCollection)
			{
				nextStatsCollectionTime =
					TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
												STATS_COLLECTION_TIMEOUT_MILLIS);
				retryStatsCollection = false;
			}
			else
			{
				nextStatsCollectionTime =
					TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
												STATS_COLLECTION_RETRY_TIMEOUT_MILLIS);
				retryStatsCollection = true;
			}

			CommitTransactionCommand();
		}
#endif

		pid_t metadataSyncBgwPid = 0;
		BgwHandleStatus metadataSyncStatus =
			metadataSyncBgwHandle != NULL ?
			GetBackgroundWorkerPid(metadataSyncBgwHandle, &metadataSyncBgwPid) :
			BGWH_STOPPED;

		if (metadataSyncStatus != BGWH_STOPPED &&
			GetCurrentTimestamp() >= nextMetadataSyncTime)
		{
			/*
			 * Metadata sync is still running, recheck in a short while.
			 */
			int nextTimeout = MetadataSyncRetryInterval;
			nextMetadataSyncTime =
				TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
			timeout = Min(timeout, nextTimeout);
		}
		else if (!RecoveryInProgress() &&
				 metadataSyncStatus == BGWH_STOPPED &&
				 (MetadataSyncTriggeredCheckAndReset(myDbData) ||
				  GetCurrentTimestamp() >= nextMetadataSyncTime))
		{
			if (metadataSyncBgwHandle)
			{
				pfree(metadataSyncBgwHandle);
				metadataSyncBgwHandle = NULL;
			}

			InvalidateMetadataSystemCache();
			StartTransactionCommand();
			PushActiveSnapshot(GetTransactionSnapshot());

			int nextTimeout = MetadataSyncRetryInterval;
			bool syncMetadata = false;

			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
										"skipping metadata sync")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				bool lockFailure = false;
				syncMetadata = ShouldInitiateMetadataSync(&lockFailure);

				/*
				 * If the lock fails, we need to recheck in a short while. If we
				 * are going to sync metadata, we should recheck in a short while
				 * to see if it failed. Otherwise, we can wait longer.
				 */
				nextTimeout = (lockFailure || syncMetadata) ?
							  MetadataSyncRetryInterval :
							  MetadataSyncInterval;
			}

			PopActiveSnapshot();
			CommitTransactionCommand();

			if (syncMetadata)
			{
				metadataSyncBgwHandle =
					SpawnSyncNodeMetadataToNodes(MyDatabaseId, myDbData->userOid);
			}

			nextMetadataSyncTime =
				TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
			timeout = Min(timeout, nextTimeout);
		}

		/*
		 * If enabled, run 2PC recovery on primary nodes (where
		 * !RecoveryInProgress()), since we'll write to the pg_dist_transaction
		 * log.
		 */
		if (Recover2PCInterval > 0 && !RecoveryInProgress() &&
			TimestampDifferenceExceeds(lastRecoveryTime, GetCurrentTimestamp(),
									   Recover2PCInterval))
		{
			int recoveredTransactionCount = 0;

			InvalidateMetadataSystemCache();
			StartTransactionCommand();

			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
										"skipping 2PC recovery")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				/*
				 * Record last recovery time at start to ensure we run once per
				 * Recover2PCInterval even if RecoverTwoPhaseCommits takes some
				 * time.
				 */
				lastRecoveryTime = GetCurrentTimestamp();

				recoveredTransactionCount = RecoverTwoPhaseCommits();
			}

			CommitTransactionCommand();

			if (recoveredTransactionCount > 0)
			{
				ereport(LOG, (errmsg("maintenance daemon recovered %d distributed "
									 "transactions",
									 recoveredTransactionCount)));
			}

			/* make sure we don't wait too long */
			timeout = Min(timeout, Recover2PCInterval);
		}

		/* the config value -1 disables the distributed deadlock detection */
		if (DistributedDeadlockDetectionTimeoutFactor != -1.0)
		{
			double deadlockTimeout =
				DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout;

			InvalidateMetadataSystemCache();
			StartTransactionCommand();

			/*
			 * We skip the deadlock detection if the citus extension
			 * is not accessible.
			 *
			 * Similarly, we skip running the deadlock checks if
			 * there exists any version mismatch or the extension
			 * is not fully created yet.
			 */
			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
										"skipping deadlock detection")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				foundDeadlock = CheckForDistributedDeadlocks();
			}

			CommitTransactionCommand();

			/*
			 * If we find any deadlocks, run the distributed deadlock detection
			 * more often since it is quite possible that there are other
			 * deadlocks that need to be resolved.
			 *
			 * Thus, we use 1/20 of the calculated value.
			 * With the default values (i.e., deadlock_timeout 1 second,
			 * citus.distributed_deadlock_detection_factor 2), we'd be able to
			 * cancel ~10 distributed deadlocks per second.
			 */
			if (foundDeadlock)
			{
				deadlockTimeout = deadlockTimeout / 20.0;
			}

			/* make sure we don't wait too long */
			timeout = Min(timeout, deadlockTimeout);
		}

		if (!RecoveryInProgress() && DeferShardDeleteInterval > 0 &&
			TimestampDifferenceExceeds(lastShardCleanTime, GetCurrentTimestamp(),
									   DeferShardDeleteInterval))
		{
			int numberOfDroppedResources = 0;

			InvalidateMetadataSystemCache();
			StartTransactionCommand();

			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg(
									 "could not lock the citus extension, skipping shard cleaning")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				/*
				 * Record last shard clean time at start to ensure we run once per
				 * DeferShardDeleteInterval.
				 */
				lastShardCleanTime = GetCurrentTimestamp();

				numberOfDroppedResources = TryDropOrphanedResources();
			}

			CommitTransactionCommand();

			if (numberOfDroppedResources > 0)
			{
				ereport(LOG, (errmsg("maintenance daemon dropped %d "
									 "resources previously marked to be removed",
									 numberOfDroppedResources)));
			}

			/* make sure we don't wait too long */
			timeout = Min(timeout, DeferShardDeleteInterval);
		}

		if (StatStatementsPurgeInterval > 0 &&
			StatStatementsTrack != STAT_STATEMENTS_TRACK_NONE &&
			TimestampDifferenceExceeds(lastStatStatementsPurgeTime,
									   GetCurrentTimestamp(),
									   (StatStatementsPurgeInterval * 1000)))
		{
			StartTransactionCommand();

			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
										"skipping stat statements purging")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				/*
				 * Record the last time we performed the purge to ensure we run
				 * once per StatStatementsPurgeInterval.
				 */
				lastStatStatementsPurgeTime = GetCurrentTimestamp();

				CitusQueryStatsSynchronizeEntries();
			}

			CommitTransactionCommand();

			/* make sure we don't wait too long, need to convert seconds to milliseconds */
			timeout = Min(timeout, (StatStatementsPurgeInterval * 1000));
		}

		pid_t backgroundTaskQueueWorkerPid = 0;
		BgwHandleStatus backgroundTaskQueueWorkerStatus =
			backgroundTasksQueueBgwHandle != NULL ? GetBackgroundWorkerPid(
				backgroundTasksQueueBgwHandle, &backgroundTaskQueueWorkerPid) :
			BGWH_STOPPED;
		if (!RecoveryInProgress() && BackgroundTaskQueueCheckInterval > 0 &&
			TimestampDifferenceExceeds(lastBackgroundTaskQueueCheck,
									   GetCurrentTimestamp(),
									   BackgroundTaskQueueCheckInterval) &&
			backgroundTaskQueueWorkerStatus == BGWH_STOPPED)
		{
			/* clear old background worker for task queue before checking for new tasks */
			if (backgroundTasksQueueBgwHandle)
			{
				pfree(backgroundTasksQueueBgwHandle);
				backgroundTasksQueueBgwHandle = NULL;
			}

			StartTransactionCommand();

			bool shouldStartBackgroundTaskQueueBackgroundWorker = false;
			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
										"skipping background task queue checks")));
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
				/* perform catalog precheck */
				shouldStartBackgroundTaskQueueBackgroundWorker =
					HasRunnableBackgroundTask();
			}

			CommitTransactionCommand();

			if (shouldStartBackgroundTaskQueueBackgroundWorker)
			{
				/*
				 * Before we start the background worker we want to check if an
				 * orphaned one is still running. This could happen when the
				 * maintenance daemon restarted in a way where the background task
				 * queue monitor wasn't restarted.
				 *
				 * To check if an orphaned background task queue monitor is still
				 * running we quickly acquire the lock without waiting.
				 * If we can't acquire the lock, this means that some other
				 * backend still holds it. We prevent a new background task queue
				 * monitor from starting and log a warning that another process
				 * still holds the lock.
				 */
				LOCKTAG tag = { 0 };
				SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR);

				const bool sessionLock = false;
				const bool dontWait = true;
				LockAcquireResult locked =
					LockAcquire(&tag, AccessExclusiveLock, sessionLock, dontWait);

				if (locked == LOCKACQUIRE_NOT_AVAIL)
				{
					if (!backgroundTasksQueueWarnedForLock)
					{
						ereport(WARNING, (errmsg("background task queue monitor "
												 "lock is already held"),
										  errdetail("the background task queue monitor "
													"lock is held by another backend, "
													"indicating the maintenance daemon "
													"has lost track of an already "
													"running background task queue "
													"monitor, not starting a new one")));
						backgroundTasksQueueWarnedForLock = true;
					}
				}
				else
				{
					LockRelease(&tag, AccessExclusiveLock, sessionLock);

					/* we were able to acquire the lock, reset the warning tracker */
					backgroundTasksQueueWarnedForLock = false;

					/* spawn background worker */
					ereport(LOG, (errmsg("found scheduled background tasks, starting new "
										 "background task queue monitor")));

					backgroundTasksQueueBgwHandle =
						StartCitusBackgroundTaskQueueMonitor(MyDatabaseId,
															 myDbData->userOid);

					if (!backgroundTasksQueueBgwHandle ||
						GetBackgroundWorkerPid(backgroundTasksQueueBgwHandle,
											   &backgroundTaskQueueWorkerPid) ==
						BGWH_STOPPED)
					{
						ereport(WARNING, (errmsg("unable to start background worker for "
												 "background task execution")));
					}
				}
			}

			/* interval management */
			lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
			timeout = Min(timeout, BackgroundTaskQueueCheckInterval);
		}

		/*
		 * Wait until timeout, or until somebody wakes us up. Also cast the
		 * timeout to an integer; we calculated it as a double to avoid losing
		 * precision.
		 */
		int rc = WaitLatch(MyLatch, latchFlags, (long) timeout, PG_WAIT_EXTENSION);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
		{
			proc_exit(1);
		}

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();

			/* check for changed configuration */
			if (myDbData->userOid != GetSessionUserId())
			{
				/* return code of 1 requests worker restart */
				proc_exit(1);
			}

			/*
			 * Could also add code checking whether extension still exists,
			 * but that'd complicate things a bit, because we'd have to delete
			 * the shared memory entry. There'd potentially be a race
			 * condition where the extension gets re-created, checking that
			 * this entry still exists, and it getting deleted just after.
			 * Doesn't seem worth catering for that.
			 */
		}

		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}
	}

	if (metadataSyncBgwHandle)
	{
		TerminateBackgroundWorker(metadataSyncBgwHandle);
	}
}


/*
 * MaintenanceDaemonShmemSize computes how much shared memory is required.
 */
size_t
MaintenanceDaemonShmemSize(void)
{
	Size size = 0;

	size = add_size(size, sizeof(MaintenanceDaemonControlData));

	/*
	 * We request enough shared memory to have one hash-table entry for each
	 * worker process. We couldn't start more anyway, so there's little point
	 * in allocating more.
	 */
	Size hashSize = hash_estimate_size(max_worker_processes,
									   sizeof(MaintenanceDaemonDBData));
	size = add_size(size, hashSize);

	return size;
}


/*
 * MaintenanceDaemonShmemInit initializes the requested shared memory for the
 * maintenance daemon.
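 * It is installed as the shmem_startup_hook from InitializeMaintenanceDaemon
 * above, so it may run more than once on EXEC_BACKEND platforms; hence the
 * alreadyInitialized check below only initializes the control data once.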
 */
void
MaintenanceDaemonShmemInit(void)
{
	bool alreadyInitialized = false;
	HASHCTL hashInfo;

	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

	MaintenanceDaemonControl =
		(MaintenanceDaemonControlData *) ShmemInitStruct("Citus Maintenance Daemon",
														 MaintenanceDaemonShmemSize(),
														 &alreadyInitialized);

	/*
	 * Might already be initialized on EXEC_BACKEND type platforms that call
	 * shared library initialization functions in every backend.
	 */
	if (!alreadyInitialized)
	{
		MaintenanceDaemonControl->trancheId = LWLockNewTrancheId();
		MaintenanceDaemonControl->lockTrancheName = "Citus Maintenance Daemon";
		LWLockRegisterTranche(MaintenanceDaemonControl->trancheId,
							  MaintenanceDaemonControl->lockTrancheName);

		LWLockInitialize(&MaintenanceDaemonControl->lock,
						 MaintenanceDaemonControl->trancheId);
	}

	memset(&hashInfo, 0, sizeof(hashInfo));
	hashInfo.keysize = sizeof(Oid);
	hashInfo.entrysize = sizeof(MaintenanceDaemonDBData);
	hashInfo.hash = tag_hash;
	int hashFlags = (HASH_ELEM | HASH_FUNCTION);

	MaintenanceDaemonDBHash = ShmemInitHash("Maintenance Database Hash",
											max_worker_processes,
											max_worker_processes,
											&hashInfo, hashFlags);

	LWLockRelease(AddinShmemInitLock);

	if (prev_shmem_startup_hook != NULL)
	{
		prev_shmem_startup_hook();
	}
}


/*
 * MaintenanceDaemonShmemExit is the before_shmem_exit handler for cleaning up
 * MaintenanceDaemonDBHash.
 */
static void
MaintenanceDaemonShmemExit(int code, Datum arg)
{
	Oid databaseOid = DatumGetObjectId(arg);

	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

	MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
		hash_search(MaintenanceDaemonDBHash, &databaseOid, HASH_REMOVE, NULL);

	/* Workaround for -Werror=unused-variable */
	(void) myDbData;
	Assert(myDbData->workerPid == MyProcPid);

	LWLockRelease(&MaintenanceDaemonControl->lock);
}


/* MaintenanceDaemonSigTermHandler sets the got_SIGTERM flag. */
static void
MaintenanceDaemonSigTermHandler(SIGNAL_ARGS)
{
	int save_errno = errno;

	got_SIGTERM = true;
	if (MyProc != NULL)
	{
		SetLatch(&MyProc->procLatch);
	}

	errno = save_errno;
}


/*
 * MaintenanceDaemonSigHupHandler sets a flag to re-read the config file at the
 * next convenient time.
 */
static void
MaintenanceDaemonSigHupHandler(SIGNAL_ARGS)
{
	int save_errno = errno;

	got_SIGHUP = true;
	if (MyProc != NULL)
	{
		SetLatch(&MyProc->procLatch);
	}

	errno = save_errno;
}


/*
 * MaintenanceDaemonErrorContext adds some context to log messages to make it
 * easier to associate them with the maintenance daemon.
 */
static void
MaintenanceDaemonErrorContext(void *arg)
{
	MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) arg;
	errcontext("Citus maintenance daemon for database %u user %u",
			   myDbData->databaseOid, myDbData->userOid);
}


/*
 * LockCitusExtension acquires a lock on the Citus extension or returns
 * false if the extension does not exist or is being dropped.
 */
bool
LockCitusExtension(void)
{
	Oid extensionOid = get_extension_oid("citus", true);
	if (extensionOid == InvalidOid)
	{
		/* citus extension does not exist */
		return false;
	}

	LockDatabaseObject(ExtensionRelationId, extensionOid, 0, AccessShareLock);

	/*
	 * The extension may have been dropped and possibly recreated prior to
	 * obtaining a lock. Check whether we still get the expected OID.
	 */
	Oid recheckExtensionOid = get_extension_oid("citus", true);
	if (recheckExtensionOid != extensionOid)
	{
		return false;
	}

	return true;
}


/*
 * StopMaintenanceDaemon stops the maintenance daemon for the
 * given database and removes it from the maintenance daemon
 * control hash.
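 * The entry removal itself happens in MaintenanceDaemonShmemExit once the
 * signalled daemon exits; here we only mark the entry as shutting down and
 * send SIGTERM. Callers are assumed to include the DROP DATABASE / DROP
 * EXTENSION paths referenced in ConnectToDatabase above.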
 */
void
StopMaintenanceDaemon(Oid databaseId)
{
	bool found = false;
	pid_t workerPid = 0;

	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

	MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
		MaintenanceDaemonDBHash, &databaseId, HASH_FIND, &found);

	if (found)
	{
		workerPid = dbData->workerPid;
		dbData->daemonShuttingDown = true;
	}

	LWLockRelease(&MaintenanceDaemonControl->lock);

	if (workerPid > 0)
	{
		kill(workerPid, SIGTERM);
	}
}


/*
 * TriggerNodeMetadataSync triggers the maintenance daemon to do
 * a node metadata sync for the given database.
 */
void
TriggerNodeMetadataSync(Oid databaseId)
{
	bool found = false;

	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

	MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
		MaintenanceDaemonDBHash, &databaseId, HASH_FIND, &found);
	if (found)
	{
		dbData->triggerNodeMetadataSync = true;

		/* set latch to wake up the maintenance loop */
		SetLatch(dbData->latch);
	}

	LWLockRelease(&MaintenanceDaemonControl->lock);
}


/*
 * MetadataSyncTriggeredCheckAndReset checks if metadata sync has been
 * triggered for the given database, and resets the flag.
 */
static bool
MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData)
{
	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

	bool metadataSyncTriggered = dbData->triggerNodeMetadataSync;
	dbData->triggerNodeMetadataSync = false;

	LWLockRelease(&MaintenanceDaemonControl->lock);

	return metadataSyncTriggered;
}