Merge branch 'main' into ihalatci-extension-compat-test-report

pull/8048/head
ibrahim halatci 2025-08-13 19:27:45 +03:00 committed by GitHub
commit cf9a4476e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 698 additions and 194 deletions

View File

@ -58,6 +58,7 @@
#include "distributed/argutils.h"
#include "distributed/backend_data.h"
#include "distributed/background_worker_utils.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
@ -3156,37 +3157,26 @@ MetadataSyncSigAlrmHandler(SIGNAL_ARGS)
BackgroundWorkerHandle *
SpawnSyncNodeMetadataToNodes(Oid database, Oid extensionOwner)
{
BackgroundWorker worker;
BackgroundWorkerHandle *handle = NULL;
char workerName[BGW_MAXLEN];
/* Configure a worker. */
memset(&worker, 0, sizeof(worker));
SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
SafeSnprintf(workerName, BGW_MAXLEN,
"Citus Metadata Sync: %u/%u",
database, extensionOwner);
worker.bgw_flags =
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
/* don't restart, we manage restarts from maintenance daemon */
worker.bgw_restart_time = BGW_NEVER_RESTART;
strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
"SyncNodeMetadataToNodesMain");
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
sizeof(Oid));
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
{
return NULL;
}
pid_t pid;
WaitForBackgroundWorkerStartup(handle, &pid);
return handle;
CitusBackgroundWorkerConfig config = {
.workerName = workerName,
.functionName = "SyncNodeMetadataToNodesMain",
.mainArg = ObjectIdGetDatum(MyDatabaseId),
.extensionOwner = extensionOwner,
.needsNotification = true,
.waitForStartup = false,
.restartTime = CITUS_BGW_NEVER_RESTART,
.startTime = CITUS_BGW_DEFAULT_START_TIME,
.workerType = NULL, /* use default */
.extraData = NULL,
.extraDataSize = 0
};
return RegisterCitusBackgroundWorker(&config);
}

View File

@ -3130,10 +3130,10 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
/* 2. insert new task */
{
Datum values[Natts_pg_dist_background_task] = { 0 };
bool nulls[Natts_pg_dist_background_task] = { 0 };
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTask);
memset(nulls, true, sizeof(nulls));
Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *nulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
int64 taskId = GetNextBackgroundTaskTaskId();
@ -3164,15 +3164,17 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
values[Anum_pg_dist_background_task_message - 1] = CStringGetTextDatum("");
nulls[Anum_pg_dist_background_task_message - 1] = false;
values[Anum_pg_dist_background_task_nodes_involved - 1] =
IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
nulls[Anum_pg_dist_background_task_nodes_involved - 1] =
(nodesInvolvedCount == 0);
int nodesInvolvedIndex =
GetNodesInvolvedAttrIndexInPgDistBackgroundTask(tupleDescriptor);
values[nodesInvolvedIndex] = IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
nulls[nodesInvolvedIndex] = (nodesInvolvedCount == 0);
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask),
values, nulls);
HeapTuple newTuple = heap_form_tuple(tupleDescriptor, values, nulls);
CatalogTupleInsert(pgDistBackgroundTask, newTuple);
pfree(values);
pfree(nulls);
task = palloc0(sizeof(BackgroundTask));
task->taskid = taskId;
task->status = BACKGROUND_TASK_STATUS_RUNNABLE;
@ -3285,11 +3287,12 @@ ResetRunningBackgroundTasks(void)
List *taskIdsToWait = NIL;
while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{
Datum values[Natts_pg_dist_background_task] = { 0 };
bool isnull[Natts_pg_dist_background_task] = { 0 };
bool replace[Natts_pg_dist_background_task] = { 0 };
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(taskTuple, tupleDescriptor, values, isnull);
values[Anum_pg_dist_background_task_status - 1] =
@ -3358,6 +3361,10 @@ ResetRunningBackgroundTasks(void)
replace);
CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple);
pfree(values);
pfree(isnull);
pfree(replace);
}
if (list_length(taskIdsToWait) > 0)
@ -3441,8 +3448,9 @@ DeformBackgroundJobHeapTuple(TupleDesc tupleDescriptor, HeapTuple jobTuple)
static BackgroundTask *
DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
{
Datum values[Natts_pg_dist_background_task] = { 0 };
bool nulls[Natts_pg_dist_background_task] = { 0 };
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *nulls = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls);
BackgroundTask *task = palloc0(sizeof(BackgroundTask));
@ -3480,13 +3488,18 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
TextDatumGetCString(values[Anum_pg_dist_background_task_message - 1]);
}
if (!nulls[Anum_pg_dist_background_task_nodes_involved - 1])
int nodesInvolvedIndex =
GetNodesInvolvedAttrIndexInPgDistBackgroundTask(tupleDescriptor);
if (!nulls[nodesInvolvedIndex])
{
ArrayType *nodesInvolvedArrayObject =
DatumGetArrayTypeP(values[Anum_pg_dist_background_task_nodes_involved - 1]);
DatumGetArrayTypeP(values[nodesInvolvedIndex]);
task->nodesInvolved = IntegerArrayTypeToList(nodesInvolvedArrayObject);
}
pfree(values);
pfree(nulls);
return task;
}
@ -3751,8 +3764,8 @@ JobTasksStatusCount(int64 jobId)
HeapTuple heapTuple = NULL;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
Datum values[Natts_pg_dist_background_task] = { 0 };
bool isnull[Natts_pg_dist_background_task] = { 0 };
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
@ -3760,6 +3773,9 @@ JobTasksStatusCount(int64 jobId)
1]);
BackgroundTaskStatus status = BackgroundTaskStatusByOid(statusOid);
pfree(values);
pfree(isnull);
switch (status)
{
case BACKGROUND_TASK_STATUS_BLOCKED:
@ -4012,9 +4028,9 @@ UpdateBackgroundJob(int64 jobId)
UINT64_FORMAT, jobId)));
}
Datum values[Natts_pg_dist_background_task] = { 0 };
bool isnull[Natts_pg_dist_background_task] = { 0 };
bool replace[Natts_pg_dist_background_task] = { 0 };
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
@ -4058,6 +4074,10 @@ UpdateBackgroundJob(int64 jobId)
systable_endscan(scanDescriptor);
table_close(pgDistBackgroundJobs, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
}
@ -4093,9 +4113,9 @@ UpdateBackgroundTask(BackgroundTask *task)
task->jobid, task->taskid)));
}
Datum values[Natts_pg_dist_background_task] = { 0 };
bool isnull[Natts_pg_dist_background_task] = { 0 };
bool replace[Natts_pg_dist_background_task] = { 0 };
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
@ -4164,6 +4184,10 @@ UpdateBackgroundTask(BackgroundTask *task)
systable_endscan(scanDescriptor);
table_close(pgDistBackgroundTasks, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
}
@ -4251,9 +4275,10 @@ CancelTasksForJob(int64 jobid)
HeapTuple taskTuple = NULL;
while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{
Datum values[Natts_pg_dist_background_task] = { 0 };
bool nulls[Natts_pg_dist_background_task] = { 0 };
bool replace[Natts_pg_dist_background_task] = { 0 };
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *nulls = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls);
Oid statusOid =
@ -4302,6 +4327,10 @@ CancelTasksForJob(int64 jobid)
taskTuple = heap_modify_tuple(taskTuple, tupleDescriptor, values, nulls,
replace);
CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple);
pfree(values);
pfree(nulls);
pfree(replace);
}
systable_endscan(scanDescriptor);
@ -4358,9 +4387,9 @@ UnscheduleDependentTasks(BackgroundTask *task)
"task_id: " UINT64_FORMAT, cTaskId)));
}
Datum values[Natts_pg_dist_background_task] = { 0 };
bool isnull[Natts_pg_dist_background_task] = { 0 };
bool replace[Natts_pg_dist_background_task] = { 0 };
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
values[Anum_pg_dist_background_task_status - 1] =
ObjectIdGetDatum(CitusTaskStatusUnscheduledId());
@ -4372,6 +4401,10 @@ UnscheduleDependentTasks(BackgroundTask *task)
CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple);
systable_endscan(scanDescriptor);
pfree(values);
pfree(isnull);
pfree(replace);
}
}
@ -4457,3 +4490,23 @@ GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc)
? (Anum_pg_dist_partition_autoconverted - 1)
: tupleDesc->natts - 1;
}
/*
 * GetNodesInvolvedAttrIndexInPgDistBackgroundTask returns the 0-based index of the
 * nodes_involved attribute in the values/nulls arrays of a pg_dist_background_task
 * tuple described by tupleDesc (i.e. attribute number minus one).
 *
 * nodes_involved attr was added to table pg_dist_background_task using an alter
 * operation after the version where Citus started supporting downgrades, and it's the
 * only column that we've introduced to pg_dist_background_task since then.
 *
 * In case of a downgrade + upgrade, tupleDesc->natts becomes greater than
 * Natts_pg_dist_background_task. When this happens, nodes_involved is no longer at
 * index Anum_pg_dist_background_task_nodes_involved - 1 but at tupleDesc->natts - 1.
 *
 * The attribute count is read from tupleDesc->natts on purpose: TupleDescSize()
 * yields the size of the descriptor in bytes, not its number of attributes, so it
 * must not be compared against Natts_pg_dist_background_task.
 */
int
GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc)
{
	return tupleDesc->natts == Natts_pg_dist_background_task
		   ? (Anum_pg_dist_background_task_nodes_involved - 1)
		   : tupleDesc->natts - 1;
}

View File

@ -4583,11 +4583,10 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
else if (rangeTableEntry->rtekind == RTE_CTE)
{
/*
* When outerVars are considered, we modify parentQueryList, so this
* logic might need to change when we support outervars in CTEs.
* Resolve through a CTE even when skipOuterVars == false.
* Maintain the invariant that each recursion level owns a private,
* correctly-bounded copy of parentQueryList.
*/
Assert(skipOuterVars);
int cteParentListIndex = list_length(parentQueryList) -
rangeTableEntry->ctelevelsup - 1;
Query *cteParentQuery = NULL;
@ -4618,14 +4617,34 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
if (cte != NULL)
{
Query *cteQuery = (Query *) cte->ctequery;
List *targetEntryList = cteQuery->targetList;
AttrNumber targetEntryIndex = candidateColumn->varattno - 1;
TargetEntry *targetEntry = list_nth(targetEntryList, targetEntryIndex);
parentQueryList = lappend(parentQueryList, query);
FindReferencedTableColumn(targetEntry->expr, parentQueryList,
cteQuery, column, rteContainingReferencedColumn,
skipOuterVars);
if (targetEntryIndex >= 0 &&
targetEntryIndex < list_length(cteQuery->targetList))
{
TargetEntry *targetEntry =
list_nth(cteQuery->targetList, targetEntryIndex);
/*
 * Build a private, bounded parentQueryList before recursing into the CTE.
 * Invariant: the list is [top ... current] and is owned by this call (no aliasing).
 * For RTE_CTE (owner_idx is cteParentListIndex in the code):
 *   owner_idx = list_length(parentQueryList) - rangeTableEntry->ctelevelsup - 1;
 *   newParent = lappend(list_truncate(list_copy(parentQueryList), owner_idx + 1), query);
 * Example (Q0 owns the CTE; we're in Q2 via a nested subquery):
 *   parent=[Q0,Q1,Q2], ctelevelsup=2 => owner_idx=0 => newParent=[Q0,Q2].
 * This keeps outer-Var level math correct without mutating the caller's list.
 */
List *newParent = list_copy(parentQueryList);
newParent = list_truncate(newParent, cteParentListIndex + 1);
newParent = lappend(newParent, query);
FindReferencedTableColumn(targetEntry->expr,
newParent,
cteQuery,
column,
rteContainingReferencedColumn,
skipOuterVars);
}
}
}
}

View File

@ -33,6 +33,7 @@
#include "storage/latch.h"
#include "utils/snapmgr.h"
#include "distributed/background_worker_utils.h"
#include "distributed/citus_acquire_lock.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h"
@ -65,34 +66,33 @@ static bool got_sigterm = false;
BackgroundWorkerHandle *
StartLockAcquireHelperBackgroundWorker(int backendToHelp, int32 lock_cooldown)
{
BackgroundWorkerHandle *handle = NULL;
LockAcquireHelperArgs args;
BackgroundWorker worker;
memset(&args, 0, sizeof(args));
memset(&worker, 0, sizeof(worker));
/* collect the extra arguments required for the background worker */
args.DatabaseId = MyDatabaseId;
args.lock_cooldown = lock_cooldown;
/* construct the background worker and start it */
SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
char workerName[BGW_MAXLEN];
SafeSnprintf(workerName, BGW_MAXLEN,
"Citus Lock Acquire Helper: %d/%u", backendToHelp, MyDatabaseId);
strcpy_s(worker.bgw_type, sizeof(worker.bgw_type), "citus_lock_aqcuire");
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
CitusBackgroundWorkerConfig config = {
.workerName = workerName,
.functionName = "LockAcquireHelperMain",
.mainArg = Int32GetDatum(backendToHelp),
.extensionOwner = InvalidOid,
.needsNotification = false,
.waitForStartup = false,
.restartTime = CITUS_BGW_NEVER_RESTART,
.startTime = BgWorkerStart_RecoveryFinished,
.workerType = "citus_lock_aqcuire",
.extraData = &args,
.extraDataSize = sizeof(args)
};
strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
"LockAcquireHelperMain");
worker.bgw_main_arg = Int32GetDatum(backendToHelp);
worker.bgw_notify_pid = 0;
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &args, sizeof(args));
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config);
if (!handle)
{
return NULL;
}

View File

@ -56,6 +56,7 @@
#include "utils/timeout.h"
#include "distributed/background_jobs.h"
#include "distributed/background_worker_utils.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/hash_helpers.h"
#include "distributed/listutils.h"
@ -417,37 +418,26 @@ citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus)
BackgroundWorkerHandle *
StartCitusBackgroundTaskQueueMonitor(Oid database, Oid extensionOwner)
{
BackgroundWorker worker = { 0 };
BackgroundWorkerHandle *handle = NULL;
char workerName[BGW_MAXLEN];
/* Configure a worker. */
memset(&worker, 0, sizeof(worker));
SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
SafeSnprintf(workerName, BGW_MAXLEN,
"Citus Background Task Queue Monitor: %u/%u",
database, extensionOwner);
worker.bgw_flags =
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
/* don't restart, we manage restarts from maintenance daemon */
worker.bgw_restart_time = BGW_NEVER_RESTART;
strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
"CitusBackgroundTaskQueueMonitorMain");
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
sizeof(Oid));
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
{
return NULL;
}
pid_t pid;
WaitForBackgroundWorkerStartup(handle, &pid);
return handle;
CitusBackgroundWorkerConfig config = {
.workerName = workerName,
.functionName = "CitusBackgroundTaskQueueMonitorMain",
.mainArg = ObjectIdGetDatum(MyDatabaseId),
.extensionOwner = extensionOwner,
.needsNotification = true,
.waitForStartup = true,
.restartTime = CITUS_BGW_NEVER_RESTART,
.startTime = CITUS_BGW_DEFAULT_START_TIME,
.workerType = NULL, /* use default */
.extraData = NULL,
.extraDataSize = 0
};
return RegisterCitusBackgroundWorker(&config);
}
@ -1661,33 +1651,31 @@ StartCitusBackgroundTaskExecutor(char *database, char *user, char *command,
{
dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId, jobId);
/* Configure a worker. */
BackgroundWorker worker = { 0 };
memset(&worker, 0, sizeof(worker));
SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
char workerName[BGW_MAXLEN];
SafeSnprintf(workerName, BGW_MAXLEN,
"Citus Background Task Queue Executor: %s/%s for (%ld/%ld)",
database, user, jobId, taskId);
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
/* don't restart, we manage restarts from maintenance daemon */
worker.bgw_restart_time = BGW_NEVER_RESTART;
strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
"CitusBackgroundTaskExecutor");
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
worker.bgw_notify_pid = MyProcPid;
BackgroundWorkerHandle *handle = NULL;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
CitusBackgroundWorkerConfig config = {
.workerName = workerName,
.functionName = "CitusBackgroundTaskExecutor",
.mainArg = UInt32GetDatum(dsm_segment_handle(seg)),
.extensionOwner = InvalidOid,
.needsNotification = true,
.waitForStartup = true,
.restartTime = CITUS_BGW_NEVER_RESTART,
.startTime = CITUS_BGW_DEFAULT_START_TIME,
.workerType = NULL, /* use default */
.extraData = NULL,
.extraDataSize = 0
};
BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config);
if (!handle)
{
dsm_detach(seg);
return NULL;
}
pid_t pid = { 0 };
WaitForBackgroundWorkerStartup(handle, &pid);
if (pSegment)
{
*pSegment = seg;

View File

@ -0,0 +1,123 @@
/*-------------------------------------------------------------------------
*
* background_worker_utils.c
* Common utilities for initializing PostgreSQL background workers
* used by Citus distributed infrastructure.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/proc.h"
#include "distributed/background_worker_utils.h"
#include "distributed/citus_safe_lib.h"
/*
 * InitializeCitusBackgroundWorker initializes a BackgroundWorker struct
 * with common Citus background worker settings.
 *
 * The struct is zeroed first, so every field that is not set below (for
 * example bgw_type when config->workerType is NULL, or bgw_notify_pid when
 * config->needsNotification is false) is left as zero.
 *
 * Layout of bgw_extra: when config->extensionOwner is a valid Oid it is
 * stored at offset 0; config->extraData, if any, is stored right after it
 * (or at offset 0 when no extension owner is given).
 *
 * Raises an error when config->extraData does not fit into the remaining
 * space of bgw_extra, instead of silently starting a worker that would read
 * zeroed arguments.
 */
void
InitializeCitusBackgroundWorker(BackgroundWorker *worker,
								const CitusBackgroundWorkerConfig *config)
{
	Assert(worker != NULL);
	Assert(config != NULL);
	Assert(config->workerName != NULL);
	Assert(config->functionName != NULL);

	/* Initialize the worker structure */
	memset(worker, 0, sizeof(BackgroundWorker));

	/* Set worker name */
	strcpy_s(worker->bgw_name, sizeof(worker->bgw_name), config->workerName);

	/* Set worker type if provided */
	if (config->workerType != NULL)
	{
		strcpy_s(worker->bgw_type, sizeof(worker->bgw_type), config->workerType);
	}

	/* Set standard flags for Citus workers */
	worker->bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;

	/*
	 * Set start time - use custom start time if provided, otherwise use default.
	 *
	 * NOTE: a startTime of 0 is treated as "not provided". That means a start
	 * time whose enum value is 0 cannot be requested through this helper
	 * (presumably BgWorkerStart_PostmasterStart; verify against bgworker.h).
	 */
	worker->bgw_start_time = (config->startTime != 0) ? config->startTime :
							 CITUS_BGW_DEFAULT_START_TIME;

	/* Set restart behavior */
	worker->bgw_restart_time = config->restartTime;

	/* Set library and function names */
	strcpy_s(worker->bgw_library_name, sizeof(worker->bgw_library_name), "citus");
	strcpy_s(worker->bgw_function_name, sizeof(worker->bgw_function_name),
			 config->functionName);

	/* Set main argument */
	worker->bgw_main_arg = config->mainArg;

	/* Set extension owner if provided; it occupies the start of bgw_extra */
	size_t extraOffset = 0;
	if (OidIsValid(config->extensionOwner))
	{
		memcpy_s(worker->bgw_extra, sizeof(worker->bgw_extra),
				 &config->extensionOwner, sizeof(Oid));
		extraOffset = sizeof(Oid);
	}

	/* Set additional extra data if provided, right after the extension owner */
	if (config->extraData != NULL && config->extraDataSize > 0)
	{
		size_t availableSpace = sizeof(worker->bgw_extra) - extraOffset;

		/* compare against the remaining space to avoid overflowing a size_t sum */
		if (config->extraDataSize > availableSpace)
		{
			ereport(ERROR,
					(errmsg("extra data for background worker \"%s\" does not "
							"fit into bgw_extra", config->workerName),
					 errdetail("%zu bytes were provided but only %zu bytes "
							   "are available.",
							   config->extraDataSize, availableSpace)));
		}

		memcpy_s(((char *) worker->bgw_extra) + extraOffset,
				 availableSpace,
				 config->extraData,
				 config->extraDataSize);
	}

	/* Set notification PID if needed */
	if (config->needsNotification)
	{
		worker->bgw_notify_pid = MyProcPid;
	}
}
/*
* RegisterCitusBackgroundWorker creates and registers a Citus background worker
* with the specified configuration. Returns the worker handle on success,
* NULL on failure.
*/
BackgroundWorkerHandle *
RegisterCitusBackgroundWorker(const CitusBackgroundWorkerConfig *config)
{
BackgroundWorker worker;
BackgroundWorkerHandle *handle = NULL;
/* Initialize the worker structure */
InitializeCitusBackgroundWorker(&worker, config);
/* Register the background worker */
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
{
return NULL;
}
/* Wait for startup if requested */
if (config->waitForStartup && handle != NULL)
{
pid_t pid = 0;
WaitForBackgroundWorkerStartup(handle, &pid);
}
return handle;
}

View File

@ -48,6 +48,7 @@
#include "pg_version_constants.h"
#include "distributed/background_jobs.h"
#include "distributed/background_worker_utils.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/distributed_deadlock_detection.h"
@ -188,32 +189,21 @@ InitializeMaintenanceDaemonForMainDb(void)
return;
}
CitusBackgroundWorkerConfig config = {
.workerName = "Citus Maintenance Daemon for Main DB",
.functionName = "CitusMaintenanceDaemonMain",
.mainArg = (Datum) 0,
.extensionOwner = InvalidOid,
.needsNotification = false,
.waitForStartup = false,
.restartTime = CITUS_BGW_DEFAULT_RESTART_TIME,
.startTime = CITUS_BGW_DEFAULT_START_TIME,
.workerType = NULL, /* use default */
.extraData = NULL,
.extraDataSize = 0
};
BackgroundWorker worker;
memset(&worker, 0, sizeof(worker));
strcpy_s(worker.bgw_name, sizeof(worker.bgw_name),
"Citus Maintenance Daemon for Main DB");
/* request ability to connect to target database */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
/*
* No point in getting started before able to run query, but we do
* want to get started on Hot-Standby.
*/
worker.bgw_start_time = BgWorkerStart_ConsistentState;
/* Restart after a bit after errors, but don't bog the system. */
worker.bgw_restart_time = 5;
strcpy_s(worker.bgw_library_name,
sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
"CitusMaintenanceDaemonMain");
worker.bgw_main_arg = (Datum) 0;
InitializeCitusBackgroundWorker(&worker, &config);
RegisterBackgroundWorker(&worker);
}
@ -256,37 +246,28 @@ InitializeMaintenanceDaemonBackend(void)
{
Assert(dbData->workerPid == 0);
BackgroundWorker worker;
BackgroundWorkerHandle *handle = NULL;
char workerName[BGW_MAXLEN];
memset(&worker, 0, sizeof(worker));
SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
SafeSnprintf(workerName, sizeof(workerName),
"Citus Maintenance Daemon: %u/%u",
MyDatabaseId, extensionOwner);
/* request ability to connect to target database */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
CitusBackgroundWorkerConfig config = {
.workerName = workerName,
.functionName = "CitusMaintenanceDaemonMain",
.mainArg = ObjectIdGetDatum(MyDatabaseId),
.extensionOwner = extensionOwner,
.needsNotification = true,
.waitForStartup = true,
.restartTime = CITUS_BGW_DEFAULT_RESTART_TIME,
.startTime = CITUS_BGW_DEFAULT_START_TIME,
.workerType = NULL, /* use default */
.extraData = NULL,
.extraDataSize = 0
};
BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config);
/*
* No point in getting started before able to run query, but we do
* want to get started on Hot-Standby.
*/
worker.bgw_start_time = BgWorkerStart_ConsistentState;
/* Restart after a bit after errors, but don't bog the system. */
worker.bgw_restart_time = 5;
strcpy_s(worker.bgw_library_name,
sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
"CitusMaintenanceDaemonMain");
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
sizeof(Oid));
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
if (!handle)
{
WarnMaintenanceDaemonNotStarted();
dbData->daemonStarted = false;
@ -301,9 +282,6 @@ InitializeMaintenanceDaemonBackend(void)
dbData->triggerNodeMetadataSync = false;
LWLockRelease(&MaintenanceDaemonControl->lock);
pid_t pid;
WaitForBackgroundWorkerStartup(handle, &pid);
pfree(handle);
}
else

View File

@ -0,0 +1,59 @@
/*-------------------------------------------------------------------------
*
* background_worker_utils.h
* Common utilities for initializing PostgreSQL background workers
* used by Citus distributed infrastructure.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef BACKGROUND_WORKER_UTILS_H
#define BACKGROUND_WORKER_UTILS_H
#include "postgres.h"
#include "postmaster/bgworker.h"
/*
 * CitusBackgroundWorkerConfig describes a Citus background worker that is
 * to be set up via InitializeCitusBackgroundWorker() or
 * RegisterCitusBackgroundWorker(). The library name is not configurable,
 * it is always "citus".
 */
typedef struct CitusBackgroundWorkerConfig
{
/* Worker identification */
/* copied into bgw_name; must not be NULL */
const char *workerName;
/* entry point, copied into bgw_function_name; must not be NULL */
const char *functionName;
/* copied into bgw_type; NULL leaves bgw_type zeroed */
const char *workerType;
/* Worker parameters */
/* passed to the worker as bgw_main_arg */
Datum mainArg;
/* when a valid Oid, stored at the beginning of bgw_extra */
Oid extensionOwner;
/* Worker behavior flags */
/* when true, bgw_notify_pid is set to MyProcPid of the registering process */
bool needsNotification;
/* when true, RegisterCitusBackgroundWorker() waits for the worker to start */
bool waitForStartup;
/* assigned to bgw_restart_time, e.g. CITUS_BGW_NEVER_RESTART */
int restartTime;
/* Worker timing */
/* assigned to bgw_start_time; 0 selects CITUS_BGW_DEFAULT_START_TIME */
BgWorkerStartTime startTime;
/* Optional extra data */
/*
* Copied into bgw_extra, after extensionOwner when that is a valid Oid.
* Must fit into the space that is left in bgw_extra. Ignored when
* extraData is NULL or extraDataSize is 0.
*/
const void *extraData;
size_t extraDataSize;
} CitusBackgroundWorkerConfig;
/*
* Default configuration values for the restartTime and startTime fields of
* CitusBackgroundWorkerConfig.
*/
/* restart interval handed to bgw_restart_time (presumably seconds, per bgworker.h) */
#define CITUS_BGW_DEFAULT_RESTART_TIME 5
#define CITUS_BGW_NEVER_RESTART BGW_NEVER_RESTART
#define CITUS_BGW_DEFAULT_START_TIME BgWorkerStart_ConsistentState
/*
* Function declarations
*
* RegisterCitusBackgroundWorker registers a dynamic background worker built
* from config and returns its handle, or NULL when registration fails.
*
* InitializeCitusBackgroundWorker only fills in the given BackgroundWorker
* struct from config, it does not register anything.
*/
extern BackgroundWorkerHandle * RegisterCitusBackgroundWorker(const
CitusBackgroundWorkerConfig
*config);
extern void InitializeCitusBackgroundWorker(BackgroundWorker *worker,
const CitusBackgroundWorkerConfig *config);
#endif /* BACKGROUND_WORKER_UTILS_H */

View File

@ -27,4 +27,6 @@
#define Anum_pg_dist_background_task_message 9
#define Anum_pg_dist_background_task_nodes_involved 10
extern int GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc);
#endif /* CITUS_PG_DIST_BACKGROUND_TASK_H */

View File

@ -328,3 +328,19 @@ s/\| CHECK ([a-zA-Z])(.*)/| CHECK \(\1\2\)/g
# pg18 change: strip trailing “.00” (or “.0…”) from actual rows counts
s/(actual rows=[0-9]+)\.[0-9]+/\1/g
# pg18 “Disabled” change start
# ignore any “Disabled:” lines in test output
/^\s*Disabled:/d
# ignore any JSON-style Disabled field
/^\s*"Disabled":/d
# ignore XML <Disabled>true</Disabled> or <Disabled>false</Disabled>
/^\s*<Disabled>.*<\/Disabled>/d
# pg18 “Disabled” change end
# PG18 psql: headings changed from "List of relations" to per-type titles
s/^([ \t]*)List of tables$/\1List of relations/g
s/^([ \t]*)List of indexes$/\1List of relations/g
s/^([ \t]*)List of sequences$/\1List of relations/g

View File

@ -3637,5 +3637,157 @@ SELECT id, val FROM version_dist_union ORDER BY id;
(6 rows)
-- End of Issue #7784
-- PR #8106 — CTE traversal works when following outer Vars
-- This script exercises three shapes:
-- T1) CTE referenced inside a correlated subquery (one level down)
-- T2) CTE referenced inside a nested subquery (two levels down)
-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
CREATE SCHEMA pr8106_cte_outervar;
SET search_path = pr8106_cte_outervar, public;
-- Base tables for the tests
DROP TABLE IF EXISTS raw_events_first CASCADE;
NOTICE: table "raw_events_first" does not exist, skipping
DROP TABLE IF EXISTS agg_events CASCADE;
NOTICE: table "agg_events" does not exist, skipping
CREATE TABLE raw_events_first(
user_id int,
value_1 int
);
CREATE TABLE agg_events(
user_id int,
value_1_agg int
);
-- Distribute and colocate (distribution key = user_id)
SELECT create_distributed_table('raw_events_first', 'user_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('agg_events', 'user_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Seed data (duplicates on some user_ids; some NULL value_1s)
INSERT INTO raw_events_first(user_id, value_1) VALUES
(1, 10), (1, 20), (1, NULL),
(2, NULL),
(3, 30),
(4, NULL),
(5, 50), (5, NULL),
(6, NULL);
---------------------------------------------------------------------
-- T1) CTE referenced inside a correlated subquery (one level down)
---------------------------------------------------------------------
TRUNCATE agg_events;
WITH c AS MATERIALIZED (
SELECT user_id FROM raw_events_first
)
INSERT INTO agg_events (user_id)
SELECT t.user_id
FROM raw_events_first t
WHERE EXISTS (SELECT 1 FROM c WHERE c.user_id = t.user_id);
-- Expect one insert per row in raw_events_first (EXISTS always true per user_id)
SELECT 't1_count_matches' AS test,
(SELECT count(*) FROM agg_events) =
(SELECT count(*) FROM raw_events_first) AS ok;
test | ok
---------------------------------------------------------------------
t1_count_matches | t
(1 row)
-- Spot-check: how many rows were inserted
SELECT 't1_rows' AS test, count(*) AS rows FROM agg_events;
test | rows
---------------------------------------------------------------------
t1_rows | 9
(1 row)
---------------------------------------------------------------------
-- T2) CTE referenced inside a nested subquery (two levels down)
---------------------------------------------------------------------
TRUNCATE agg_events;
WITH c AS MATERIALIZED (
SELECT user_id FROM raw_events_first
)
INSERT INTO agg_events (user_id)
SELECT t.user_id
FROM raw_events_first t
WHERE EXISTS (
SELECT 1
FROM (SELECT user_id FROM c) c2
WHERE c2.user_id = t.user_id
);
-- Same cardinality expectation as T1
SELECT 't2_count_matches' AS test,
(SELECT count(*) FROM agg_events) =
(SELECT count(*) FROM raw_events_first) AS ok;
test | ok
---------------------------------------------------------------------
t2_count_matches | t
(1 row)
SELECT 't2_rows' AS test, count(*) AS rows FROM agg_events;
test | rows
---------------------------------------------------------------------
t2_rows | 9
(1 row)
---------------------------------------------------------------------
-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
-- (use MAX() to keep scalar subquery single-row)
---------------------------------------------------------------------
TRUNCATE agg_events;
WITH c AS (SELECT user_id, value_1 FROM raw_events_first)
INSERT INTO agg_events (user_id, value_1_agg)
SELECT d.user_id, d.value_1_agg
FROM (
SELECT t.user_id,
(SELECT max(c.value_1) FROM c WHERE c.user_id = t.user_id) AS value_1_agg
FROM raw_events_first t
) AS d
WHERE d.value_1_agg IS NOT NULL;
-- Expect one insert per row in raw_events_first whose user_id has at least one non-NULL value_1
SELECT 't3_count_matches' AS test,
(SELECT count(*) FROM agg_events) =
(
SELECT count(*)
FROM raw_events_first t
WHERE EXISTS (
SELECT 1 FROM raw_events_first c
WHERE c.user_id = t.user_id AND c.value_1 IS NOT NULL
)
) AS ok;
test | ok
---------------------------------------------------------------------
t3_count_matches | t
(1 row)
-- Also verify no NULLs were inserted into value_1_agg
SELECT 't3_no_null_value_1_agg' AS test,
NOT EXISTS (SELECT 1 FROM agg_events WHERE value_1_agg IS NULL) AS ok;
test | ok
---------------------------------------------------------------------
t3_no_null_value_1_agg | t
(1 row)
-- Deterministic sample of results
SELECT 't3_sample' AS test, user_id, value_1_agg
FROM agg_events
ORDER BY user_id
LIMIT 5;
test | user_id | value_1_agg
---------------------------------------------------------------------
t3_sample | 1 | 20
t3_sample | 1 | 20
t3_sample | 1 | 20
t3_sample | 3 | 30
t3_sample | 5 | 50
(5 rows)
-- End of PR #8106 — CTE traversal works when following outer Vars
SET client_min_messages TO ERROR;
DROP SCHEMA pr8106_cte_outervar CASCADE;
DROP SCHEMA multi_insert_select CASCADE;

View File

@ -1262,15 +1262,17 @@ SET search_path TO pg17;
ALTER TABLE distributed_partitioned_table DROP CONSTRAINT dist_exclude_named;
ALTER TABLE local_partitioned_table DROP CONSTRAINT local_exclude_named;
-- Step 10: Verify the constraints were dropped
SELECT * FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x';
oid | conname | connamespace | contype | condeferrable | condeferred | convalidated | conrelid | contypid | conindid | conparentid | confrelid | confupdtype | confdeltype | confmatchtype | conislocal | coninhcount | connoinherit | conkey | confkey | conpfeqop | conppeqop | conffeqop | confdelsetcols | conexclop | conbin
SELECT COUNT(*) FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x';
count
---------------------------------------------------------------------
(0 rows)
0
(1 row)
SELECT * FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x';
oid | conname | connamespace | contype | condeferrable | condeferred | convalidated | conrelid | contypid | conindid | conparentid | confrelid | confupdtype | confdeltype | confmatchtype | conislocal | coninhcount | connoinherit | conkey | confkey | conpfeqop | conppeqop | conffeqop | confdelsetcols | conexclop | conbin
SELECT COUNT(*) FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x';
count
---------------------------------------------------------------------
(0 rows)
0
(1 row)
-- Step 11: Clean up - Drop the tables
DROP TABLE distributed_partitioned_table CASCADE;

View File

@ -2583,5 +2583,127 @@ SELECT id, val FROM version_dist_union ORDER BY id;
-- End of Issue #7784
-- PR #8106 — CTE traversal works when following outer Vars
-- This script exercises three shapes:
-- T1) CTE referenced inside a correlated subquery (one level down)
-- T2) CTE referenced inside a nested subquery (two levels down)
-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
-- Every case is an INSERT ... SELECT into agg_events that reads raw_events_first
-- and references a CTE named c from inside a subquery correlated on user_id.
CREATE SCHEMA pr8106_cte_outervar;
-- Unqualified names below resolve to the new schema first, then to public.
SET search_path = pr8106_cte_outervar, public;
-- Base tables for the tests
-- NOTE(review): the schema was created just above, so these DROPs look like
-- no-ops unless same-named tables exist in public (also on the search_path),
-- in which case they would drop those tables — confirm this is intended.
DROP TABLE IF EXISTS raw_events_first CASCADE;
DROP TABLE IF EXISTS agg_events CASCADE;
-- Source table: one row per event; value_1 is nullable.
CREATE TABLE raw_events_first(
user_id int,
value_1 int
);
-- Target table: T1/T2 fill only user_id, T3 also fills value_1_agg.
CREATE TABLE agg_events(
user_id int,
value_1_agg int
);
-- Distribute and colocate (distribution key = user_id)
SELECT create_distributed_table('raw_events_first', 'user_id');
SELECT create_distributed_table('agg_events', 'user_id');
-- Seed data (duplicates on some user_ids; some NULL value_1s)
-- 9 rows in total:
--   user_id 1: 3 rows (values 10, 20, NULL)
--   user_id 3: 1 row  (value 30)
--   user_id 5: 2 rows (values 50, NULL)
--   user_ids 2, 4, 6: 1 row each, value_1 NULL only
INSERT INTO raw_events_first(user_id, value_1) VALUES
(1, 10), (1, 20), (1, NULL),
(2, NULL),
(3, 30),
(4, NULL),
(5, 50), (5, NULL),
(6, NULL);
----------------------------------------------------------------------
-- T1) CTE referenced inside a correlated subquery (one level down)
----------------------------------------------------------------------
-- Start from an empty target so the counts below reflect only this INSERT.
TRUNCATE agg_events;
-- The CTE c is referenced only from inside the EXISTS subquery, which is
-- correlated to the outer row t through user_id.
-- NOTE(review): MATERIALIZED presumably keeps the CTE from being inlined so
-- that the reference to c is still a CTE reference at planning time — confirm.
WITH c AS MATERIALIZED (
SELECT user_id FROM raw_events_first
)
INSERT INTO agg_events (user_id)
SELECT t.user_id
FROM raw_events_first t
WHERE EXISTS (SELECT 1 FROM c WHERE c.user_id = t.user_id);
-- Expect one insert per row in raw_events_first (EXISTS always true per user_id)
-- c holds every user_id of raw_events_first, so no source row is filtered out.
SELECT 't1_count_matches' AS test,
(SELECT count(*) FROM agg_events) =
(SELECT count(*) FROM raw_events_first) AS ok;
-- Spot-check: how many rows were inserted
-- With the seed data inserted by this script that is 9 rows.
SELECT 't1_rows' AS test, count(*) AS rows FROM agg_events;
----------------------------------------------------------------------
-- T2) CTE referenced inside a nested subquery (two levels down)
----------------------------------------------------------------------
-- Start from an empty target so the counts below reflect only this INSERT.
TRUNCATE agg_events;
-- Same filter as T1, except that c is wrapped in a derived table (c2) inside
-- the EXISTS subquery, so the CTE reference sits two query levels below the
-- WITH clause that defines it.
WITH c AS MATERIALIZED (
SELECT user_id FROM raw_events_first
)
INSERT INTO agg_events (user_id)
SELECT t.user_id
FROM raw_events_first t
WHERE EXISTS (
SELECT 1
FROM (SELECT user_id FROM c) c2
WHERE c2.user_id = t.user_id
);
-- Same cardinality expectation as T1
-- (one inserted row per source row; 9 rows with this script's seed data)
SELECT 't2_count_matches' AS test,
(SELECT count(*) FROM agg_events) =
(SELECT count(*) FROM raw_events_first) AS ok;
SELECT 't2_rows' AS test, count(*) AS rows FROM agg_events;
----------------------------------------------------------------------
-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
-- (use MAX() to keep scalar subquery single-row)
----------------------------------------------------------------------
-- Start from an empty target so the checks below reflect only this INSERT.
TRUNCATE agg_events;
-- Unlike T1/T2, the CTE here is declared without MATERIALIZED and also
-- exposes value_1. The derived table d computes, for every source row t, the
-- maximum value_1 that c holds for the same user_id; rows whose user_id has
-- only NULL value_1s yield a NULL aggregate and are dropped by the outer WHERE.
WITH c AS (SELECT user_id, value_1 FROM raw_events_first)
INSERT INTO agg_events (user_id, value_1_agg)
SELECT d.user_id, d.value_1_agg
FROM (
SELECT t.user_id,
(SELECT max(c.value_1) FROM c WHERE c.user_id = t.user_id) AS value_1_agg
FROM raw_events_first t
) AS d
WHERE d.value_1_agg IS NOT NULL;
-- Expect one insert per row in raw_events_first whose user_id has at least one non-NULL value_1
-- With this script's seed data: user_ids 1, 3 and 5 qualify, i.e. 3 + 1 + 2 = 6 rows.
-- The right-hand side recomputes that count directly from the base table,
-- without going through the CTE.
SELECT 't3_count_matches' AS test,
(SELECT count(*) FROM agg_events) =
(
SELECT count(*)
FROM raw_events_first t
WHERE EXISTS (
SELECT 1 FROM raw_events_first c
WHERE c.user_id = t.user_id AND c.value_1 IS NOT NULL
)
) AS ok;
-- Also verify no NULLs were inserted into value_1_agg
SELECT 't3_no_null_value_1_agg' AS test,
NOT EXISTS (SELECT 1 FROM agg_events WHERE value_1_agg IS NULL) AS ok;
-- Deterministic sample of results
-- Rows that tie on user_id all carry the same value_1_agg (the per-user max),
-- so the first five rows are stable even though user_id is not unique.
-- With this script's seed data: (1, 20) three times, then (3, 30), then (5, 50).
SELECT 't3_sample' AS test, user_id, value_1_agg
FROM agg_events
ORDER BY user_id
LIMIT 5;
-- End of PR #8106 — CTE traversal works when following outer Vars
-- Cleanup: only messages of level ERROR and above are reported from here on.
-- NOTE(review): presumably this keeps the NOTICEs raised by DROP ... CASCADE
-- out of the expected output — confirm.
SET client_min_messages TO ERROR;
-- Drops the schema created for the PR #8106 cases together with its tables.
DROP SCHEMA pr8106_cte_outervar CASCADE;
-- NOTE(review): multi_insert_select is not created in this part of the script;
-- presumably it is the schema used by the earlier tests in this file — confirm.
DROP SCHEMA multi_insert_select CASCADE;

View File

@ -669,8 +669,8 @@ ALTER TABLE distributed_partitioned_table DROP CONSTRAINT dist_exclude_named;
ALTER TABLE local_partitioned_table DROP CONSTRAINT local_exclude_named;
-- Step 10: Verify the constraints were dropped
SELECT * FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x';
SELECT * FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x';
SELECT COUNT(*) FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x';
SELECT COUNT(*) FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x';
-- Step 11: Clean up - Drop the tables
DROP TABLE distributed_partitioned_table CASCADE;