mirror of https://github.com/citusdata/citus.git
Refactor background worker setup for security improvements (#8078)
Enhance security by addressing a code scanning alert and refactoring the background worker setup code for better maintainability and clarity. --------- Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>pull/8048/head^2
parent
41883cea38
commit
f73da1ed40
|
|
@ -58,6 +58,7 @@
|
||||||
|
|
||||||
#include "distributed/argutils.h"
|
#include "distributed/argutils.h"
|
||||||
#include "distributed/backend_data.h"
|
#include "distributed/backend_data.h"
|
||||||
|
#include "distributed/background_worker_utils.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
#include "distributed/commands.h"
|
#include "distributed/commands.h"
|
||||||
|
|
@ -3156,37 +3157,26 @@ MetadataSyncSigAlrmHandler(SIGNAL_ARGS)
|
||||||
BackgroundWorkerHandle *
|
BackgroundWorkerHandle *
|
||||||
SpawnSyncNodeMetadataToNodes(Oid database, Oid extensionOwner)
|
SpawnSyncNodeMetadataToNodes(Oid database, Oid extensionOwner)
|
||||||
{
|
{
|
||||||
BackgroundWorker worker;
|
char workerName[BGW_MAXLEN];
|
||||||
BackgroundWorkerHandle *handle = NULL;
|
|
||||||
|
|
||||||
/* Configure a worker. */
|
SafeSnprintf(workerName, BGW_MAXLEN,
|
||||||
memset(&worker, 0, sizeof(worker));
|
|
||||||
SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
|
|
||||||
"Citus Metadata Sync: %u/%u",
|
"Citus Metadata Sync: %u/%u",
|
||||||
database, extensionOwner);
|
database, extensionOwner);
|
||||||
worker.bgw_flags =
|
|
||||||
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
|
||||||
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
|
||||||
|
|
||||||
/* don't restart, we manage restarts from maintenance daemon */
|
CitusBackgroundWorkerConfig config = {
|
||||||
worker.bgw_restart_time = BGW_NEVER_RESTART;
|
.workerName = workerName,
|
||||||
strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
|
.functionName = "SyncNodeMetadataToNodesMain",
|
||||||
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
|
.mainArg = ObjectIdGetDatum(MyDatabaseId),
|
||||||
"SyncNodeMetadataToNodesMain");
|
.extensionOwner = extensionOwner,
|
||||||
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
|
.needsNotification = true,
|
||||||
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
|
.waitForStartup = false,
|
||||||
sizeof(Oid));
|
.restartTime = CITUS_BGW_NEVER_RESTART,
|
||||||
worker.bgw_notify_pid = MyProcPid;
|
.startTime = CITUS_BGW_DEFAULT_START_TIME,
|
||||||
|
.workerType = NULL, /* use default */
|
||||||
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
|
.extraData = NULL,
|
||||||
{
|
.extraDataSize = 0
|
||||||
return NULL;
|
};
|
||||||
}
|
return RegisterCitusBackgroundWorker(&config);
|
||||||
|
|
||||||
pid_t pid;
|
|
||||||
WaitForBackgroundWorkerStartup(handle, &pid);
|
|
||||||
|
|
||||||
return handle;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,7 @@
|
||||||
#include "storage/latch.h"
|
#include "storage/latch.h"
|
||||||
#include "utils/snapmgr.h"
|
#include "utils/snapmgr.h"
|
||||||
|
|
||||||
|
#include "distributed/background_worker_utils.h"
|
||||||
#include "distributed/citus_acquire_lock.h"
|
#include "distributed/citus_acquire_lock.h"
|
||||||
#include "distributed/citus_safe_lib.h"
|
#include "distributed/citus_safe_lib.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
|
|
@ -65,34 +66,33 @@ static bool got_sigterm = false;
|
||||||
BackgroundWorkerHandle *
|
BackgroundWorkerHandle *
|
||||||
StartLockAcquireHelperBackgroundWorker(int backendToHelp, int32 lock_cooldown)
|
StartLockAcquireHelperBackgroundWorker(int backendToHelp, int32 lock_cooldown)
|
||||||
{
|
{
|
||||||
BackgroundWorkerHandle *handle = NULL;
|
|
||||||
LockAcquireHelperArgs args;
|
LockAcquireHelperArgs args;
|
||||||
BackgroundWorker worker;
|
|
||||||
memset(&args, 0, sizeof(args));
|
memset(&args, 0, sizeof(args));
|
||||||
memset(&worker, 0, sizeof(worker));
|
|
||||||
|
|
||||||
/* collect the extra arguments required for the background worker */
|
/* collect the extra arguments required for the background worker */
|
||||||
args.DatabaseId = MyDatabaseId;
|
args.DatabaseId = MyDatabaseId;
|
||||||
args.lock_cooldown = lock_cooldown;
|
args.lock_cooldown = lock_cooldown;
|
||||||
|
|
||||||
/* construct the background worker and start it */
|
char workerName[BGW_MAXLEN];
|
||||||
SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
|
SafeSnprintf(workerName, BGW_MAXLEN,
|
||||||
"Citus Lock Acquire Helper: %d/%u", backendToHelp, MyDatabaseId);
|
"Citus Lock Acquire Helper: %d/%u", backendToHelp, MyDatabaseId);
|
||||||
strcpy_s(worker.bgw_type, sizeof(worker.bgw_type), "citus_lock_aqcuire");
|
|
||||||
|
|
||||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
CitusBackgroundWorkerConfig config = {
|
||||||
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
.workerName = workerName,
|
||||||
worker.bgw_restart_time = BGW_NEVER_RESTART;
|
.functionName = "LockAcquireHelperMain",
|
||||||
|
.mainArg = Int32GetDatum(backendToHelp),
|
||||||
|
.extensionOwner = InvalidOid,
|
||||||
|
.needsNotification = false,
|
||||||
|
.waitForStartup = false,
|
||||||
|
.restartTime = CITUS_BGW_NEVER_RESTART,
|
||||||
|
.startTime = BgWorkerStart_RecoveryFinished,
|
||||||
|
.workerType = "citus_lock_aqcuire",
|
||||||
|
.extraData = &args,
|
||||||
|
.extraDataSize = sizeof(args)
|
||||||
|
};
|
||||||
|
|
||||||
strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
|
BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config);
|
||||||
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
|
if (!handle)
|
||||||
"LockAcquireHelperMain");
|
|
||||||
worker.bgw_main_arg = Int32GetDatum(backendToHelp);
|
|
||||||
worker.bgw_notify_pid = 0;
|
|
||||||
|
|
||||||
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &args, sizeof(args));
|
|
||||||
|
|
||||||
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
|
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -56,6 +56,7 @@
|
||||||
#include "utils/timeout.h"
|
#include "utils/timeout.h"
|
||||||
|
|
||||||
#include "distributed/background_jobs.h"
|
#include "distributed/background_jobs.h"
|
||||||
|
#include "distributed/background_worker_utils.h"
|
||||||
#include "distributed/citus_safe_lib.h"
|
#include "distributed/citus_safe_lib.h"
|
||||||
#include "distributed/hash_helpers.h"
|
#include "distributed/hash_helpers.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
|
|
@ -417,37 +418,26 @@ citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus)
|
||||||
BackgroundWorkerHandle *
|
BackgroundWorkerHandle *
|
||||||
StartCitusBackgroundTaskQueueMonitor(Oid database, Oid extensionOwner)
|
StartCitusBackgroundTaskQueueMonitor(Oid database, Oid extensionOwner)
|
||||||
{
|
{
|
||||||
BackgroundWorker worker = { 0 };
|
char workerName[BGW_MAXLEN];
|
||||||
BackgroundWorkerHandle *handle = NULL;
|
|
||||||
|
|
||||||
/* Configure a worker. */
|
SafeSnprintf(workerName, BGW_MAXLEN,
|
||||||
memset(&worker, 0, sizeof(worker));
|
|
||||||
SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
|
|
||||||
"Citus Background Task Queue Monitor: %u/%u",
|
"Citus Background Task Queue Monitor: %u/%u",
|
||||||
database, extensionOwner);
|
database, extensionOwner);
|
||||||
worker.bgw_flags =
|
|
||||||
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
|
||||||
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
|
||||||
|
|
||||||
/* don't restart, we manage restarts from maintenance daemon */
|
CitusBackgroundWorkerConfig config = {
|
||||||
worker.bgw_restart_time = BGW_NEVER_RESTART;
|
.workerName = workerName,
|
||||||
strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
|
.functionName = "CitusBackgroundTaskQueueMonitorMain",
|
||||||
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
|
.mainArg = ObjectIdGetDatum(MyDatabaseId),
|
||||||
"CitusBackgroundTaskQueueMonitorMain");
|
.extensionOwner = extensionOwner,
|
||||||
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
|
.needsNotification = true,
|
||||||
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
|
.waitForStartup = true,
|
||||||
sizeof(Oid));
|
.restartTime = CITUS_BGW_NEVER_RESTART,
|
||||||
worker.bgw_notify_pid = MyProcPid;
|
.startTime = CITUS_BGW_DEFAULT_START_TIME,
|
||||||
|
.workerType = NULL, /* use default */
|
||||||
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
|
.extraData = NULL,
|
||||||
{
|
.extraDataSize = 0
|
||||||
return NULL;
|
};
|
||||||
}
|
return RegisterCitusBackgroundWorker(&config);
|
||||||
|
|
||||||
pid_t pid;
|
|
||||||
WaitForBackgroundWorkerStartup(handle, &pid);
|
|
||||||
|
|
||||||
return handle;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -1661,33 +1651,31 @@ StartCitusBackgroundTaskExecutor(char *database, char *user, char *command,
|
||||||
{
|
{
|
||||||
dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId, jobId);
|
dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId, jobId);
|
||||||
|
|
||||||
/* Configure a worker. */
|
char workerName[BGW_MAXLEN];
|
||||||
BackgroundWorker worker = { 0 };
|
SafeSnprintf(workerName, BGW_MAXLEN,
|
||||||
memset(&worker, 0, sizeof(worker));
|
|
||||||
SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
|
|
||||||
"Citus Background Task Queue Executor: %s/%s for (%ld/%ld)",
|
"Citus Background Task Queue Executor: %s/%s for (%ld/%ld)",
|
||||||
database, user, jobId, taskId);
|
database, user, jobId, taskId);
|
||||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
|
||||||
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
|
||||||
|
|
||||||
/* don't restart, we manage restarts from maintenance daemon */
|
CitusBackgroundWorkerConfig config = {
|
||||||
worker.bgw_restart_time = BGW_NEVER_RESTART;
|
.workerName = workerName,
|
||||||
strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
|
.functionName = "CitusBackgroundTaskExecutor",
|
||||||
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
|
.mainArg = UInt32GetDatum(dsm_segment_handle(seg)),
|
||||||
"CitusBackgroundTaskExecutor");
|
.extensionOwner = InvalidOid,
|
||||||
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
|
.needsNotification = true,
|
||||||
worker.bgw_notify_pid = MyProcPid;
|
.waitForStartup = true,
|
||||||
|
.restartTime = CITUS_BGW_NEVER_RESTART,
|
||||||
BackgroundWorkerHandle *handle = NULL;
|
.startTime = CITUS_BGW_DEFAULT_START_TIME,
|
||||||
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
|
.workerType = NULL, /* use default */
|
||||||
|
.extraData = NULL,
|
||||||
|
.extraDataSize = 0
|
||||||
|
};
|
||||||
|
BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config);
|
||||||
|
if (!handle)
|
||||||
{
|
{
|
||||||
dsm_detach(seg);
|
dsm_detach(seg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pid_t pid = { 0 };
|
|
||||||
WaitForBackgroundWorkerStartup(handle, &pid);
|
|
||||||
|
|
||||||
if (pSegment)
|
if (pSegment)
|
||||||
{
|
{
|
||||||
*pSegment = seg;
|
*pSegment = seg;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,123 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* background_worker_utils.c
|
||||||
|
* Common utilities for initializing PostgreSQL background workers
|
||||||
|
* used by Citus distributed infrastructure.
|
||||||
|
*
|
||||||
|
* Copyright (c) Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include "postmaster/bgworker.h"
|
||||||
|
#include "storage/proc.h"
|
||||||
|
|
||||||
|
#include "distributed/background_worker_utils.h"
|
||||||
|
#include "distributed/citus_safe_lib.h"
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InitializeCitusBackgroundWorker initializes a BackgroundWorker struct
|
||||||
|
* with common Citus background worker settings.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
InitializeCitusBackgroundWorker(BackgroundWorker *worker,
|
||||||
|
const CitusBackgroundWorkerConfig *config)
|
||||||
|
{
|
||||||
|
Assert(worker != NULL);
|
||||||
|
Assert(config != NULL);
|
||||||
|
Assert(config->workerName != NULL);
|
||||||
|
Assert(config->functionName != NULL);
|
||||||
|
|
||||||
|
/* Initialize the worker structure */
|
||||||
|
memset(worker, 0, sizeof(BackgroundWorker));
|
||||||
|
|
||||||
|
/* Set worker name */
|
||||||
|
strcpy_s(worker->bgw_name, sizeof(worker->bgw_name), config->workerName);
|
||||||
|
|
||||||
|
/* Set worker type if provided */
|
||||||
|
if (config->workerType != NULL)
|
||||||
|
{
|
||||||
|
strcpy_s(worker->bgw_type, sizeof(worker->bgw_type), config->workerType);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Set standard flags for Citus workers */
|
||||||
|
worker->bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
||||||
|
|
||||||
|
/* Set start time - use custom start time if provided, otherwise use default */
|
||||||
|
worker->bgw_start_time = (config->startTime != 0) ? config->startTime :
|
||||||
|
CITUS_BGW_DEFAULT_START_TIME;
|
||||||
|
|
||||||
|
/* Set restart behavior */
|
||||||
|
worker->bgw_restart_time = config->restartTime;
|
||||||
|
|
||||||
|
/* Set library and function names */
|
||||||
|
strcpy_s(worker->bgw_library_name, sizeof(worker->bgw_library_name), "citus");
|
||||||
|
strcpy_s(worker->bgw_function_name, sizeof(worker->bgw_function_name),
|
||||||
|
config->functionName);
|
||||||
|
|
||||||
|
/* Set main argument */
|
||||||
|
worker->bgw_main_arg = config->mainArg;
|
||||||
|
|
||||||
|
/* Set extension owner if provided */
|
||||||
|
if (OidIsValid(config->extensionOwner))
|
||||||
|
{
|
||||||
|
memcpy_s(worker->bgw_extra, sizeof(worker->bgw_extra),
|
||||||
|
&config->extensionOwner, sizeof(Oid));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Set additional extra data if provided */
|
||||||
|
if (config->extraData != NULL && config->extraDataSize > 0)
|
||||||
|
{
|
||||||
|
size_t remainingSpace = sizeof(worker->bgw_extra);
|
||||||
|
size_t usedSpace = OidIsValid(config->extensionOwner) ? sizeof(Oid) : 0;
|
||||||
|
|
||||||
|
if (usedSpace + config->extraDataSize <= remainingSpace)
|
||||||
|
{
|
||||||
|
memcpy_s(((char *) worker->bgw_extra) + usedSpace,
|
||||||
|
remainingSpace - usedSpace,
|
||||||
|
config->extraData,
|
||||||
|
config->extraDataSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Set notification PID if needed */
|
||||||
|
if (config->needsNotification)
|
||||||
|
{
|
||||||
|
worker->bgw_notify_pid = MyProcPid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RegisterCitusBackgroundWorker creates and registers a Citus background worker
|
||||||
|
* with the specified configuration. Returns the worker handle on success,
|
||||||
|
* NULL on failure.
|
||||||
|
*/
|
||||||
|
BackgroundWorkerHandle *
|
||||||
|
RegisterCitusBackgroundWorker(const CitusBackgroundWorkerConfig *config)
|
||||||
|
{
|
||||||
|
BackgroundWorker worker;
|
||||||
|
BackgroundWorkerHandle *handle = NULL;
|
||||||
|
|
||||||
|
/* Initialize the worker structure */
|
||||||
|
InitializeCitusBackgroundWorker(&worker, config);
|
||||||
|
|
||||||
|
/* Register the background worker */
|
||||||
|
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Wait for startup if requested */
|
||||||
|
if (config->waitForStartup && handle != NULL)
|
||||||
|
{
|
||||||
|
pid_t pid = 0;
|
||||||
|
WaitForBackgroundWorkerStartup(handle, &pid);
|
||||||
|
}
|
||||||
|
|
||||||
|
return handle;
|
||||||
|
}
|
||||||
|
|
@ -48,6 +48,7 @@
|
||||||
#include "pg_version_constants.h"
|
#include "pg_version_constants.h"
|
||||||
|
|
||||||
#include "distributed/background_jobs.h"
|
#include "distributed/background_jobs.h"
|
||||||
|
#include "distributed/background_worker_utils.h"
|
||||||
#include "distributed/citus_safe_lib.h"
|
#include "distributed/citus_safe_lib.h"
|
||||||
#include "distributed/coordinator_protocol.h"
|
#include "distributed/coordinator_protocol.h"
|
||||||
#include "distributed/distributed_deadlock_detection.h"
|
#include "distributed/distributed_deadlock_detection.h"
|
||||||
|
|
@ -188,32 +189,21 @@ InitializeMaintenanceDaemonForMainDb(void)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CitusBackgroundWorkerConfig config = {
|
||||||
|
.workerName = "Citus Maintenance Daemon for Main DB",
|
||||||
|
.functionName = "CitusMaintenanceDaemonMain",
|
||||||
|
.mainArg = (Datum) 0,
|
||||||
|
.extensionOwner = InvalidOid,
|
||||||
|
.needsNotification = false,
|
||||||
|
.waitForStartup = false,
|
||||||
|
.restartTime = CITUS_BGW_DEFAULT_RESTART_TIME,
|
||||||
|
.startTime = CITUS_BGW_DEFAULT_START_TIME,
|
||||||
|
.workerType = NULL, /* use default */
|
||||||
|
.extraData = NULL,
|
||||||
|
.extraDataSize = 0
|
||||||
|
};
|
||||||
BackgroundWorker worker;
|
BackgroundWorker worker;
|
||||||
|
InitializeCitusBackgroundWorker(&worker, &config);
|
||||||
memset(&worker, 0, sizeof(worker));
|
|
||||||
|
|
||||||
|
|
||||||
strcpy_s(worker.bgw_name, sizeof(worker.bgw_name),
|
|
||||||
"Citus Maintenance Daemon for Main DB");
|
|
||||||
|
|
||||||
/* request ability to connect to target database */
|
|
||||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* No point in getting started before able to run query, but we do
|
|
||||||
* want to get started on Hot-Standby.
|
|
||||||
*/
|
|
||||||
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
|
||||||
|
|
||||||
/* Restart after a bit after errors, but don't bog the system. */
|
|
||||||
worker.bgw_restart_time = 5;
|
|
||||||
strcpy_s(worker.bgw_library_name,
|
|
||||||
sizeof(worker.bgw_library_name), "citus");
|
|
||||||
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
|
|
||||||
"CitusMaintenanceDaemonMain");
|
|
||||||
|
|
||||||
worker.bgw_main_arg = (Datum) 0;
|
|
||||||
|
|
||||||
RegisterBackgroundWorker(&worker);
|
RegisterBackgroundWorker(&worker);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -256,37 +246,28 @@ InitializeMaintenanceDaemonBackend(void)
|
||||||
{
|
{
|
||||||
Assert(dbData->workerPid == 0);
|
Assert(dbData->workerPid == 0);
|
||||||
|
|
||||||
BackgroundWorker worker;
|
char workerName[BGW_MAXLEN];
|
||||||
BackgroundWorkerHandle *handle = NULL;
|
|
||||||
|
|
||||||
memset(&worker, 0, sizeof(worker));
|
SafeSnprintf(workerName, sizeof(workerName),
|
||||||
|
|
||||||
SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
|
|
||||||
"Citus Maintenance Daemon: %u/%u",
|
"Citus Maintenance Daemon: %u/%u",
|
||||||
MyDatabaseId, extensionOwner);
|
MyDatabaseId, extensionOwner);
|
||||||
|
|
||||||
/* request ability to connect to target database */
|
CitusBackgroundWorkerConfig config = {
|
||||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
.workerName = workerName,
|
||||||
|
.functionName = "CitusMaintenanceDaemonMain",
|
||||||
|
.mainArg = ObjectIdGetDatum(MyDatabaseId),
|
||||||
|
.extensionOwner = extensionOwner,
|
||||||
|
.needsNotification = true,
|
||||||
|
.waitForStartup = true,
|
||||||
|
.restartTime = CITUS_BGW_DEFAULT_RESTART_TIME,
|
||||||
|
.startTime = CITUS_BGW_DEFAULT_START_TIME,
|
||||||
|
.workerType = NULL, /* use default */
|
||||||
|
.extraData = NULL,
|
||||||
|
.extraDataSize = 0
|
||||||
|
};
|
||||||
|
BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config);
|
||||||
|
|
||||||
/*
|
if (!handle)
|
||||||
* No point in getting started before able to run query, but we do
|
|
||||||
* want to get started on Hot-Standby.
|
|
||||||
*/
|
|
||||||
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
|
||||||
|
|
||||||
/* Restart after a bit after errors, but don't bog the system. */
|
|
||||||
worker.bgw_restart_time = 5;
|
|
||||||
strcpy_s(worker.bgw_library_name,
|
|
||||||
sizeof(worker.bgw_library_name), "citus");
|
|
||||||
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
|
|
||||||
"CitusMaintenanceDaemonMain");
|
|
||||||
|
|
||||||
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
|
|
||||||
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
|
|
||||||
sizeof(Oid));
|
|
||||||
worker.bgw_notify_pid = MyProcPid;
|
|
||||||
|
|
||||||
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
|
|
||||||
{
|
{
|
||||||
WarnMaintenanceDaemonNotStarted();
|
WarnMaintenanceDaemonNotStarted();
|
||||||
dbData->daemonStarted = false;
|
dbData->daemonStarted = false;
|
||||||
|
|
@ -301,9 +282,6 @@ InitializeMaintenanceDaemonBackend(void)
|
||||||
dbData->triggerNodeMetadataSync = false;
|
dbData->triggerNodeMetadataSync = false;
|
||||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||||
|
|
||||||
pid_t pid;
|
|
||||||
WaitForBackgroundWorkerStartup(handle, &pid);
|
|
||||||
|
|
||||||
pfree(handle);
|
pfree(handle);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,59 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* background_worker_utils.h
|
||||||
|
* Common utilities for initializing PostgreSQL background workers
|
||||||
|
* used by Citus distributed infrastructure.
|
||||||
|
*
|
||||||
|
* Copyright (c) Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef BACKGROUND_WORKER_UTILS_H
|
||||||
|
#define BACKGROUND_WORKER_UTILS_H
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "postmaster/bgworker.h"
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Background worker configuration parameters
|
||||||
|
*/
|
||||||
|
typedef struct CitusBackgroundWorkerConfig
|
||||||
|
{
|
||||||
|
/* Worker identification */
|
||||||
|
const char *workerName;
|
||||||
|
const char *functionName;
|
||||||
|
const char *workerType;
|
||||||
|
|
||||||
|
/* Worker parameters */
|
||||||
|
Datum mainArg;
|
||||||
|
Oid extensionOwner;
|
||||||
|
|
||||||
|
/* Worker behavior flags */
|
||||||
|
bool needsNotification;
|
||||||
|
bool waitForStartup;
|
||||||
|
int restartTime;
|
||||||
|
|
||||||
|
/* Worker timing */
|
||||||
|
BgWorkerStartTime startTime;
|
||||||
|
|
||||||
|
/* Optional extra data */
|
||||||
|
const void *extraData;
|
||||||
|
size_t extraDataSize;
|
||||||
|
} CitusBackgroundWorkerConfig;
|
||||||
|
|
||||||
|
/* Default configuration values */
|
||||||
|
#define CITUS_BGW_DEFAULT_RESTART_TIME 5
|
||||||
|
#define CITUS_BGW_NEVER_RESTART BGW_NEVER_RESTART
|
||||||
|
#define CITUS_BGW_DEFAULT_START_TIME BgWorkerStart_ConsistentState
|
||||||
|
|
||||||
|
/* Function declarations */
|
||||||
|
extern BackgroundWorkerHandle * RegisterCitusBackgroundWorker(const
|
||||||
|
CitusBackgroundWorkerConfig
|
||||||
|
*config);
|
||||||
|
|
||||||
|
extern void InitializeCitusBackgroundWorker(BackgroundWorker *worker,
|
||||||
|
const CitusBackgroundWorkerConfig *config);
|
||||||
|
|
||||||
|
#endif /* BACKGROUND_WORKER_UTILS_H */
|
||||||
Loading…
Reference in New Issue