diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index e3b655ab0..15dd6375a 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -58,6 +58,7 @@
#include "distributed/argutils.h"
#include "distributed/backend_data.h"
+#include "distributed/background_worker_utils.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
@@ -3156,37 +3157,26 @@ MetadataSyncSigAlrmHandler(SIGNAL_ARGS)
BackgroundWorkerHandle *
SpawnSyncNodeMetadataToNodes(Oid database, Oid extensionOwner)
{
- BackgroundWorker worker;
- BackgroundWorkerHandle *handle = NULL;
+ char workerName[BGW_MAXLEN];
- /* Configure a worker. */
- memset(&worker, 0, sizeof(worker));
- SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
+ SafeSnprintf(workerName, BGW_MAXLEN,
"Citus Metadata Sync: %u/%u",
database, extensionOwner);
- worker.bgw_flags =
- BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
- worker.bgw_start_time = BgWorkerStart_ConsistentState;
- /* don't restart, we manage restarts from maintenance daemon */
- worker.bgw_restart_time = BGW_NEVER_RESTART;
- strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
- strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
- "SyncNodeMetadataToNodesMain");
- worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
- memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
- sizeof(Oid));
- worker.bgw_notify_pid = MyProcPid;
-
- if (!RegisterDynamicBackgroundWorker(&worker, &handle))
- {
- return NULL;
- }
-
- pid_t pid;
- WaitForBackgroundWorkerStartup(handle, &pid);
-
- return handle;
+ CitusBackgroundWorkerConfig config = {
+ .workerName = workerName,
+ .functionName = "SyncNodeMetadataToNodesMain",
+ .mainArg = ObjectIdGetDatum(MyDatabaseId),
+ .extensionOwner = extensionOwner,
+ .needsNotification = true,
+ .waitForStartup = true, /* as before the refactor: block until the worker has started */
+ .restartTime = CITUS_BGW_NEVER_RESTART,
+ .startTime = CITUS_BGW_DEFAULT_START_TIME,
+ .workerType = NULL, /* use default */
+ .extraData = NULL,
+ .extraDataSize = 0
+ };
+ return RegisterCitusBackgroundWorker(&config);
}
diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index 2b8bd0d1c..b84260f9e 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -3130,10 +3130,12 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
/* 2. insert new task */
{
- Datum values[Natts_pg_dist_background_task] = { 0 };
- bool nulls[Natts_pg_dist_background_task] = { 0 };
+ TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTask);
- memset(nulls, true, sizeof(nulls));
+ Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
+ bool *nulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
+ /* default every column to NULL; each column filled in below clears its own flag */
+ memset(nulls, true, tupleDescriptor->natts * sizeof(bool));
int64 taskId = GetNextBackgroundTaskTaskId();
@@ -3164,15 +3164,17 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
values[Anum_pg_dist_background_task_message - 1] = CStringGetTextDatum("");
nulls[Anum_pg_dist_background_task_message - 1] = false;
- values[Anum_pg_dist_background_task_nodes_involved - 1] =
- IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
- nulls[Anum_pg_dist_background_task_nodes_involved - 1] =
- (nodesInvolvedCount == 0);
+ int nodesInvolvedIndex =
+ GetNodesInvolvedAttrIndexInPgDistBackgroundTask(tupleDescriptor);
+ values[nodesInvolvedIndex] = IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
+ nulls[nodesInvolvedIndex] = (nodesInvolvedCount == 0);
- HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask),
- values, nulls);
+ HeapTuple newTuple = heap_form_tuple(tupleDescriptor, values, nulls);
CatalogTupleInsert(pgDistBackgroundTask, newTuple);
+ pfree(values);
+ pfree(nulls);
+
task = palloc0(sizeof(BackgroundTask));
task->taskid = taskId;
task->status = BACKGROUND_TASK_STATUS_RUNNABLE;
@@ -3285,11 +3287,12 @@ ResetRunningBackgroundTasks(void)
List *taskIdsToWait = NIL;
while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{
- Datum values[Natts_pg_dist_background_task] = { 0 };
- bool isnull[Natts_pg_dist_background_task] = { 0 };
- bool replace[Natts_pg_dist_background_task] = { 0 };
-
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
+
+ Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
+ bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
+ bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
+
heap_deform_tuple(taskTuple, tupleDescriptor, values, isnull);
values[Anum_pg_dist_background_task_status - 1] =
@@ -3358,6 +3361,10 @@ ResetRunningBackgroundTasks(void)
replace);
CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple);
+
+ pfree(values);
+ pfree(isnull);
+ pfree(replace);
}
if (list_length(taskIdsToWait) > 0)
@@ -3441,8 +3448,9 @@ DeformBackgroundJobHeapTuple(TupleDesc tupleDescriptor, HeapTuple jobTuple)
static BackgroundTask *
DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
{
- Datum values[Natts_pg_dist_background_task] = { 0 };
- bool nulls[Natts_pg_dist_background_task] = { 0 };
+ Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
+ bool *nulls = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
+
heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls);
BackgroundTask *task = palloc0(sizeof(BackgroundTask));
@@ -3480,13 +3488,18 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
TextDatumGetCString(values[Anum_pg_dist_background_task_message - 1]);
}
- if (!nulls[Anum_pg_dist_background_task_nodes_involved - 1])
+ int nodesInvolvedIndex =
+ GetNodesInvolvedAttrIndexInPgDistBackgroundTask(tupleDescriptor);
+ if (!nulls[nodesInvolvedIndex])
{
ArrayType *nodesInvolvedArrayObject =
- DatumGetArrayTypeP(values[Anum_pg_dist_background_task_nodes_involved - 1]);
+ DatumGetArrayTypeP(values[nodesInvolvedIndex]);
task->nodesInvolved = IntegerArrayTypeToList(nodesInvolvedArrayObject);
}
+ pfree(values);
+ pfree(nulls);
+
return task;
}
@@ -3751,8 +3764,8 @@ JobTasksStatusCount(int64 jobId)
HeapTuple heapTuple = NULL;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
- Datum values[Natts_pg_dist_background_task] = { 0 };
- bool isnull[Natts_pg_dist_background_task] = { 0 };
+ Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
+ bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
@@ -3760,6 +3773,9 @@ JobTasksStatusCount(int64 jobId)
1]);
BackgroundTaskStatus status = BackgroundTaskStatusByOid(statusOid);
+ pfree(values);
+ pfree(isnull);
+
switch (status)
{
case BACKGROUND_TASK_STATUS_BLOCKED:
@@ -4012,9 +4028,9 @@ UpdateBackgroundJob(int64 jobId)
UINT64_FORMAT, jobId)));
}
- Datum values[Natts_pg_dist_background_task] = { 0 };
- bool isnull[Natts_pg_dist_background_task] = { 0 };
- bool replace[Natts_pg_dist_background_task] = { 0 };
+ Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
+ bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
+ bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
@@ -4058,6 +4074,10 @@ UpdateBackgroundJob(int64 jobId)
systable_endscan(scanDescriptor);
table_close(pgDistBackgroundJobs, NoLock);
+
+ pfree(values);
+ pfree(isnull);
+ pfree(replace);
}
@@ -4093,9 +4113,9 @@ UpdateBackgroundTask(BackgroundTask *task)
task->jobid, task->taskid)));
}
- Datum values[Natts_pg_dist_background_task] = { 0 };
- bool isnull[Natts_pg_dist_background_task] = { 0 };
- bool replace[Natts_pg_dist_background_task] = { 0 };
+ Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
+ bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
+ bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
@@ -4164,6 +4184,10 @@ UpdateBackgroundTask(BackgroundTask *task)
systable_endscan(scanDescriptor);
table_close(pgDistBackgroundTasks, NoLock);
+
+ pfree(values);
+ pfree(isnull);
+ pfree(replace);
}
@@ -4251,9 +4275,10 @@ CancelTasksForJob(int64 jobid)
HeapTuple taskTuple = NULL;
while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{
- Datum values[Natts_pg_dist_background_task] = { 0 };
- bool nulls[Natts_pg_dist_background_task] = { 0 };
- bool replace[Natts_pg_dist_background_task] = { 0 };
+ Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
+ bool *nulls = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
+ bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
+
heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls);
Oid statusOid =
@@ -4302,6 +4327,10 @@ CancelTasksForJob(int64 jobid)
taskTuple = heap_modify_tuple(taskTuple, tupleDescriptor, values, nulls,
replace);
CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple);
+
+ pfree(values);
+ pfree(nulls);
+ pfree(replace);
}
systable_endscan(scanDescriptor);
@@ -4358,9 +4387,9 @@ UnscheduleDependentTasks(BackgroundTask *task)
"task_id: " UINT64_FORMAT, cTaskId)));
}
- Datum values[Natts_pg_dist_background_task] = { 0 };
- bool isnull[Natts_pg_dist_background_task] = { 0 };
- bool replace[Natts_pg_dist_background_task] = { 0 };
+ Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
+ bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
+ bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
values[Anum_pg_dist_background_task_status - 1] =
ObjectIdGetDatum(CitusTaskStatusUnscheduledId());
@@ -4372,6 +4401,10 @@ UnscheduleDependentTasks(BackgroundTask *task)
CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple);
systable_endscan(scanDescriptor);
+
+ pfree(values);
+ pfree(isnull);
+ pfree(replace);
}
}
@@ -4457,3 +4490,23 @@ GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc)
? (Anum_pg_dist_partition_autoconverted - 1)
: tupleDesc->natts - 1;
}
+
+
+/*
+ * GetNodesInvolvedAttrIndexInPgDistBackgroundTask returns the 0-based index of nodes_involved.
+ *
+ * nodes_involved attr was added to table pg_dist_background_task using alter operation after
+ * the version where Citus started supporting downgrades, and it's the only column that we've
+ * introduced to pg_dist_background_task since then.
+ *
+ * In case of a downgrade + upgrade, tupleDesc->natts becomes greater than
+ * Natts_pg_dist_background_task; nodes_involved is then the last attribute, at index
+ * tupleDesc->natts - 1 rather than Anum_pg_dist_background_task_nodes_involved - 1.
+ */
+int
+GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc)
+{
+ return tupleDesc->natts == Natts_pg_dist_background_task
+ ? (Anum_pg_dist_background_task_nodes_involved - 1)
+ : tupleDesc->natts - 1;
+}
diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c
index 7deced084..c0679c14e 100644
--- a/src/backend/distributed/planner/multi_logical_optimizer.c
+++ b/src/backend/distributed/planner/multi_logical_optimizer.c
@@ -4583,11 +4583,10 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
else if (rangeTableEntry->rtekind == RTE_CTE)
{
/*
- * When outerVars are considered, we modify parentQueryList, so this
- * logic might need to change when we support outervars in CTEs.
+ * Resolve through a CTE even when skipOuterVars == false.
+ * Maintain the invariant that each recursion level owns a private,
+ * correctly-bounded copy of parentQueryList.
*/
- Assert(skipOuterVars);
-
int cteParentListIndex = list_length(parentQueryList) -
rangeTableEntry->ctelevelsup - 1;
Query *cteParentQuery = NULL;
@@ -4618,14 +4617,34 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
if (cte != NULL)
{
Query *cteQuery = (Query *) cte->ctequery;
- List *targetEntryList = cteQuery->targetList;
AttrNumber targetEntryIndex = candidateColumn->varattno - 1;
- TargetEntry *targetEntry = list_nth(targetEntryList, targetEntryIndex);
- parentQueryList = lappend(parentQueryList, query);
- FindReferencedTableColumn(targetEntry->expr, parentQueryList,
- cteQuery, column, rteContainingReferencedColumn,
- skipOuterVars);
+ if (targetEntryIndex >= 0 &&
+ targetEntryIndex < list_length(cteQuery->targetList))
+ {
+ TargetEntry *targetEntry =
+ list_nth(cteQuery->targetList, targetEntryIndex);
+
+ /* Build a private, bounded parentQueryList before recursing into the CTE.
+ * Invariant: list is [top … current], owned by this call (no aliasing).
+ * For RTE_CTE:
+ * owner_idx = list_length(parentQueryList) - rangeTableEntry->ctelevelsup - 1;
+ * newParent = lappend(list_truncate(list_copy(parentQueryList), owner_idx + 1), query);
+ * Example (Q0 owns CTE; we’re in Q2 via nested subquery):
+ * parent=[Q0,Q1,Q2], ctelevelsup=2 ⇒ owner_idx=0 ⇒ newParent=[Q0,Q2].
+ * Keeps outer-Var level math correct without mutating the caller’s list.
+ */
+ List *newParent = list_copy(parentQueryList);
+ newParent = list_truncate(newParent, cteParentListIndex + 1);
+ newParent = lappend(newParent, query);
+
+ FindReferencedTableColumn(targetEntry->expr,
+ newParent,
+ cteQuery,
+ column,
+ rteContainingReferencedColumn,
+ skipOuterVars);
+ }
}
}
}
diff --git a/src/backend/distributed/utils/acquire_lock.c b/src/backend/distributed/utils/acquire_lock.c
index d0f6193c2..897374ed4 100644
--- a/src/backend/distributed/utils/acquire_lock.c
+++ b/src/backend/distributed/utils/acquire_lock.c
@@ -33,6 +33,7 @@
#include "storage/latch.h"
#include "utils/snapmgr.h"
+#include "distributed/background_worker_utils.h"
#include "distributed/citus_acquire_lock.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h"
@@ -65,34 +66,33 @@ static bool got_sigterm = false;
BackgroundWorkerHandle *
StartLockAcquireHelperBackgroundWorker(int backendToHelp, int32 lock_cooldown)
{
- BackgroundWorkerHandle *handle = NULL;
LockAcquireHelperArgs args;
- BackgroundWorker worker;
memset(&args, 0, sizeof(args));
- memset(&worker, 0, sizeof(worker));
/* collect the extra arguments required for the background worker */
args.DatabaseId = MyDatabaseId;
args.lock_cooldown = lock_cooldown;
- /* construct the background worker and start it */
- SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
+ char workerName[BGW_MAXLEN];
+ SafeSnprintf(workerName, BGW_MAXLEN,
"Citus Lock Acquire Helper: %d/%u", backendToHelp, MyDatabaseId);
- strcpy_s(worker.bgw_type, sizeof(worker.bgw_type), "citus_lock_aqcuire");
- worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
- worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
- worker.bgw_restart_time = BGW_NEVER_RESTART;
+ CitusBackgroundWorkerConfig config = {
+ .workerName = workerName,
+ .functionName = "LockAcquireHelperMain",
+ .mainArg = Int32GetDatum(backendToHelp),
+ .extensionOwner = InvalidOid,
+ .needsNotification = false,
+ .waitForStartup = false,
+ .restartTime = CITUS_BGW_NEVER_RESTART,
+ .startTime = BgWorkerStart_RecoveryFinished,
+ .workerType = "citus_lock_aqcuire",
+ .extraData = &args,
+ .extraDataSize = sizeof(args)
+ };
- strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
- strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
- "LockAcquireHelperMain");
- worker.bgw_main_arg = Int32GetDatum(backendToHelp);
- worker.bgw_notify_pid = 0;
-
- memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &args, sizeof(args));
-
- if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+ BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config);
+ if (!handle)
{
return NULL;
}
diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c
index 2d0f03a4c..272f1d6d8 100644
--- a/src/backend/distributed/utils/background_jobs.c
+++ b/src/backend/distributed/utils/background_jobs.c
@@ -56,6 +56,7 @@
#include "utils/timeout.h"
#include "distributed/background_jobs.h"
+#include "distributed/background_worker_utils.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/hash_helpers.h"
#include "distributed/listutils.h"
@@ -417,37 +418,26 @@ citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus)
BackgroundWorkerHandle *
StartCitusBackgroundTaskQueueMonitor(Oid database, Oid extensionOwner)
{
- BackgroundWorker worker = { 0 };
- BackgroundWorkerHandle *handle = NULL;
+ char workerName[BGW_MAXLEN];
- /* Configure a worker. */
- memset(&worker, 0, sizeof(worker));
- SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
+ SafeSnprintf(workerName, BGW_MAXLEN,
"Citus Background Task Queue Monitor: %u/%u",
database, extensionOwner);
- worker.bgw_flags =
- BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
- worker.bgw_start_time = BgWorkerStart_ConsistentState;
- /* don't restart, we manage restarts from maintenance daemon */
- worker.bgw_restart_time = BGW_NEVER_RESTART;
- strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
- strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
- "CitusBackgroundTaskQueueMonitorMain");
- worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
- memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
- sizeof(Oid));
- worker.bgw_notify_pid = MyProcPid;
-
- if (!RegisterDynamicBackgroundWorker(&worker, &handle))
- {
- return NULL;
- }
-
- pid_t pid;
- WaitForBackgroundWorkerStartup(handle, &pid);
-
- return handle;
+ CitusBackgroundWorkerConfig config = {
+ .workerName = workerName,
+ .functionName = "CitusBackgroundTaskQueueMonitorMain",
+ .mainArg = ObjectIdGetDatum(MyDatabaseId),
+ .extensionOwner = extensionOwner,
+ .needsNotification = true,
+ .waitForStartup = true,
+ .restartTime = CITUS_BGW_NEVER_RESTART,
+ .startTime = CITUS_BGW_DEFAULT_START_TIME,
+ .workerType = NULL, /* use default */
+ .extraData = NULL,
+ .extraDataSize = 0
+ };
+ return RegisterCitusBackgroundWorker(&config);
}
@@ -1661,33 +1651,31 @@ StartCitusBackgroundTaskExecutor(char *database, char *user, char *command,
{
dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId, jobId);
- /* Configure a worker. */
- BackgroundWorker worker = { 0 };
- memset(&worker, 0, sizeof(worker));
- SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
+ char workerName[BGW_MAXLEN];
+ SafeSnprintf(workerName, BGW_MAXLEN,
"Citus Background Task Queue Executor: %s/%s for (%ld/%ld)",
database, user, jobId, taskId);
- worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
- worker.bgw_start_time = BgWorkerStart_ConsistentState;
- /* don't restart, we manage restarts from maintenance daemon */
- worker.bgw_restart_time = BGW_NEVER_RESTART;
- strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
- strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
- "CitusBackgroundTaskExecutor");
- worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
- worker.bgw_notify_pid = MyProcPid;
-
- BackgroundWorkerHandle *handle = NULL;
- if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+ CitusBackgroundWorkerConfig config = {
+ .workerName = workerName,
+ .functionName = "CitusBackgroundTaskExecutor",
+ .mainArg = UInt32GetDatum(dsm_segment_handle(seg)),
+ .extensionOwner = InvalidOid,
+ .needsNotification = true,
+ .waitForStartup = true,
+ .restartTime = CITUS_BGW_NEVER_RESTART,
+ .startTime = CITUS_BGW_DEFAULT_START_TIME,
+ .workerType = NULL, /* use default */
+ .extraData = NULL,
+ .extraDataSize = 0
+ };
+ BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config);
+ if (!handle)
{
dsm_detach(seg);
return NULL;
}
- pid_t pid = { 0 };
- WaitForBackgroundWorkerStartup(handle, &pid);
-
if (pSegment)
{
*pSegment = seg;
diff --git a/src/backend/distributed/utils/background_worker_utils.c b/src/backend/distributed/utils/background_worker_utils.c
new file mode 100644
index 000000000..b25ac556a
--- /dev/null
+++ b/src/backend/distributed/utils/background_worker_utils.c
@@ -0,0 +1,123 @@
+/*-------------------------------------------------------------------------
+ *
+ * background_worker_utils.c
+ * Common utilities for initializing PostgreSQL background workers
+ * used by Citus distributed infrastructure.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+
+#include "postmaster/bgworker.h"
+#include "storage/proc.h"
+
+#include "distributed/background_worker_utils.h"
+#include "distributed/citus_safe_lib.h"
+
+/*
+ * InitializeCitusBackgroundWorker zeroes *worker and fills it from config. bgw_extra holds
+ * the extension owner Oid (when valid) followed by extraData; oversized extraData errors out.
+ */
+void
+InitializeCitusBackgroundWorker(BackgroundWorker *worker,
+ const CitusBackgroundWorkerConfig *config)
+{
+ Assert(worker != NULL);
+ Assert(config != NULL);
+ Assert(config->workerName != NULL);
+ Assert(config->functionName != NULL);
+
+ /* Initialize the worker structure */
+ memset(worker, 0, sizeof(BackgroundWorker));
+
+ /* Set worker name */
+ strcpy_s(worker->bgw_name, sizeof(worker->bgw_name), config->workerName);
+
+ /* Set worker type if provided */
+ if (config->workerType != NULL)
+ {
+ strcpy_s(worker->bgw_type, sizeof(worker->bgw_type), config->workerType);
+ }
+
+ /* Set standard flags for Citus workers */
+ worker->bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+
+ /* Set start time - a zero (unset) startTime selects the Citus default */
+ worker->bgw_start_time = (config->startTime != 0) ? config->startTime :
+ CITUS_BGW_DEFAULT_START_TIME;
+
+ /* Set restart behavior */
+ worker->bgw_restart_time = config->restartTime;
+
+ /* Set library and function names */
+ strcpy_s(worker->bgw_library_name, sizeof(worker->bgw_library_name), "citus");
+ strcpy_s(worker->bgw_function_name, sizeof(worker->bgw_function_name),
+ config->functionName);
+
+ /* Set main argument */
+ worker->bgw_main_arg = config->mainArg;
+
+ /* Set extension owner if provided */
+ if (OidIsValid(config->extensionOwner))
+ {
+ memcpy_s(worker->bgw_extra, sizeof(worker->bgw_extra),
+ &config->extensionOwner, sizeof(Oid));
+ }
+
+ /* Append extra data after the owner Oid; never start a worker with dropped arguments */
+ if (config->extraData != NULL && config->extraDataSize > 0)
+ {
+ size_t totalSpace = sizeof(worker->bgw_extra);
+ size_t usedSpace = OidIsValid(config->extensionOwner) ? sizeof(Oid) : 0;
+
+ if (config->extraDataSize > totalSpace - usedSpace)
+ {
+ ereport(ERROR, (errmsg("background worker extra data does not fit in bgw_extra")));
+ }
+
+ memcpy_s(((char *) worker->bgw_extra) + usedSpace, totalSpace - usedSpace,
+ config->extraData, config->extraDataSize);
+ }
+
+ /* Set notification PID if needed */
+ if (config->needsNotification)
+ {
+ worker->bgw_notify_pid = MyProcPid;
+ }
+}
+
+
+/*
+ * RegisterCitusBackgroundWorker creates and registers a Citus background worker
+ * with the specified configuration. Returns the worker handle on success,
+ * NULL on failure.
+ */
+BackgroundWorkerHandle *
+RegisterCitusBackgroundWorker(const CitusBackgroundWorkerConfig *config)
+{
+ BackgroundWorker worker;
+ BackgroundWorkerHandle *handle = NULL;
+
+ /* Initialize the worker structure */
+ InitializeCitusBackgroundWorker(&worker, config);
+
+ /* Register the background worker */
+ if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+ {
+ return NULL;
+ }
+
+ /* Wait for startup if requested */
+ if (config->waitForStartup && handle != NULL)
+ {
+ pid_t pid = 0;
+ WaitForBackgroundWorkerStartup(handle, &pid);
+ }
+
+ return handle;
+}
diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c
index e6bf3d00c..9ce29a99c 100644
--- a/src/backend/distributed/utils/maintenanced.c
+++ b/src/backend/distributed/utils/maintenanced.c
@@ -48,6 +48,7 @@
#include "pg_version_constants.h"
#include "distributed/background_jobs.h"
+#include "distributed/background_worker_utils.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/distributed_deadlock_detection.h"
@@ -188,32 +189,21 @@ InitializeMaintenanceDaemonForMainDb(void)
return;
}
+ CitusBackgroundWorkerConfig config = {
+ .workerName = "Citus Maintenance Daemon for Main DB",
+ .functionName = "CitusMaintenanceDaemonMain",
+ .mainArg = (Datum) 0,
+ .extensionOwner = InvalidOid,
+ .needsNotification = false,
+ .waitForStartup = false,
+ .restartTime = CITUS_BGW_DEFAULT_RESTART_TIME,
+ .startTime = CITUS_BGW_DEFAULT_START_TIME,
+ .workerType = NULL, /* use default */
+ .extraData = NULL,
+ .extraDataSize = 0
+ };
BackgroundWorker worker;
-
- memset(&worker, 0, sizeof(worker));
-
-
- strcpy_s(worker.bgw_name, sizeof(worker.bgw_name),
- "Citus Maintenance Daemon for Main DB");
-
- /* request ability to connect to target database */
- worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
-
- /*
- * No point in getting started before able to run query, but we do
- * want to get started on Hot-Standby.
- */
- worker.bgw_start_time = BgWorkerStart_ConsistentState;
-
- /* Restart after a bit after errors, but don't bog the system. */
- worker.bgw_restart_time = 5;
- strcpy_s(worker.bgw_library_name,
- sizeof(worker.bgw_library_name), "citus");
- strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
- "CitusMaintenanceDaemonMain");
-
- worker.bgw_main_arg = (Datum) 0;
-
+ InitializeCitusBackgroundWorker(&worker, &config);
RegisterBackgroundWorker(&worker);
}
@@ -256,37 +246,28 @@ InitializeMaintenanceDaemonBackend(void)
{
Assert(dbData->workerPid == 0);
- BackgroundWorker worker;
- BackgroundWorkerHandle *handle = NULL;
+ char workerName[BGW_MAXLEN];
- memset(&worker, 0, sizeof(worker));
-
- SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name),
+ SafeSnprintf(workerName, sizeof(workerName),
"Citus Maintenance Daemon: %u/%u",
MyDatabaseId, extensionOwner);
- /* request ability to connect to target database */
- worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+ CitusBackgroundWorkerConfig config = {
+ .workerName = workerName,
+ .functionName = "CitusMaintenanceDaemonMain",
+ .mainArg = ObjectIdGetDatum(MyDatabaseId),
+ .extensionOwner = extensionOwner,
+ .needsNotification = true,
+ .waitForStartup = true,
+ .restartTime = CITUS_BGW_DEFAULT_RESTART_TIME,
+ .startTime = CITUS_BGW_DEFAULT_START_TIME,
+ .workerType = NULL, /* use default */
+ .extraData = NULL,
+ .extraDataSize = 0
+ };
+ BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config);
- /*
- * No point in getting started before able to run query, but we do
- * want to get started on Hot-Standby.
- */
- worker.bgw_start_time = BgWorkerStart_ConsistentState;
-
- /* Restart after a bit after errors, but don't bog the system. */
- worker.bgw_restart_time = 5;
- strcpy_s(worker.bgw_library_name,
- sizeof(worker.bgw_library_name), "citus");
- strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
- "CitusMaintenanceDaemonMain");
-
- worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
- memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
- sizeof(Oid));
- worker.bgw_notify_pid = MyProcPid;
-
- if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+ if (!handle)
{
WarnMaintenanceDaemonNotStarted();
dbData->daemonStarted = false;
@@ -301,9 +282,6 @@ InitializeMaintenanceDaemonBackend(void)
dbData->triggerNodeMetadataSync = false;
LWLockRelease(&MaintenanceDaemonControl->lock);
- pid_t pid;
- WaitForBackgroundWorkerStartup(handle, &pid);
-
pfree(handle);
}
else
diff --git a/src/include/distributed/background_worker_utils.h b/src/include/distributed/background_worker_utils.h
new file mode 100644
index 000000000..c915b46b9
--- /dev/null
+++ b/src/include/distributed/background_worker_utils.h
@@ -0,0 +1,59 @@
+/*-------------------------------------------------------------------------
+ *
+ * background_worker_utils.h
+ * Common utilities for initializing PostgreSQL background workers
+ * used by Citus distributed infrastructure.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef BACKGROUND_WORKER_UTILS_H
+#define BACKGROUND_WORKER_UTILS_H
+
+#include "postgres.h"
+
+#include "postmaster/bgworker.h"
+
+/*
+ * Background worker configuration parameters
+ */
+typedef struct CitusBackgroundWorkerConfig
+{
+ /* Worker identification */
+ const char *workerName;
+ const char *functionName;
+ const char *workerType;
+
+ /* Worker parameters */
+ Datum mainArg;
+ Oid extensionOwner;
+
+ /* Worker behavior flags */
+ bool needsNotification;
+ bool waitForStartup;
+ int restartTime;
+
+ /* Worker timing */
+ BgWorkerStartTime startTime;
+
+ /* Optional extra data */
+ const void *extraData;
+ size_t extraDataSize;
+} CitusBackgroundWorkerConfig;
+
+/* Default configuration values */
+#define CITUS_BGW_DEFAULT_RESTART_TIME 5
+#define CITUS_BGW_NEVER_RESTART BGW_NEVER_RESTART
+#define CITUS_BGW_DEFAULT_START_TIME BgWorkerStart_ConsistentState
+
+/* Function declarations */
+extern BackgroundWorkerHandle * RegisterCitusBackgroundWorker(const
+ CitusBackgroundWorkerConfig
+ *config);
+
+extern void InitializeCitusBackgroundWorker(BackgroundWorker *worker,
+ const CitusBackgroundWorkerConfig *config);
+
+#endif /* BACKGROUND_WORKER_UTILS_H */
diff --git a/src/include/distributed/pg_dist_background_task.h b/src/include/distributed/pg_dist_background_task.h
index 9e6673a64..60e30e638 100644
--- a/src/include/distributed/pg_dist_background_task.h
+++ b/src/include/distributed/pg_dist_background_task.h
@@ -27,4 +27,6 @@
#define Anum_pg_dist_background_task_message 9
#define Anum_pg_dist_background_task_nodes_involved 10
+extern int GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc);
+
#endif /* CITUS_PG_DIST_BACKGROUND_TASK_H */
diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed
index 48396fcfd..38fd45837 100644
--- a/src/test/regress/bin/normalize.sed
+++ b/src/test/regress/bin/normalize.sed
@@ -328,3 +328,19 @@ s/\| CHECK ([a-zA-Z])(.*)/| CHECK \(\1\2\)/g
# pg18 change: strip trailing “.00” (or “.0…”) from actual rows counts
s/(actual rows=[0-9]+)\.[0-9]+/\1/g
+
+# pg18 “Disabled” change start
+# ignore any “Disabled:” lines in test output
+/^\s*Disabled:/d
+
+# ignore any JSON-style Disabled field
+/^\s*"Disabled":/d
+
+# ignore XML-format lines such as <Disabled>true</Disabled> or <Disabled>false</Disabled>
+/^\s*.*<\/Disabled>/d
+# pg18 “Disabled” change end
+
+# PG18 psql: headings changed from "List of relations" to per-type titles
+s/^([ \t]*)List of tables$/\1List of relations/g
+s/^([ \t]*)List of indexes$/\1List of relations/g
+s/^([ \t]*)List of sequences$/\1List of relations/g
diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out
index 00e335a82..76538f22d 100644
--- a/src/test/regress/expected/multi_insert_select.out
+++ b/src/test/regress/expected/multi_insert_select.out
@@ -3637,5 +3637,157 @@ SELECT id, val FROM version_dist_union ORDER BY id;
(6 rows)
-- End of Issue #7784
+-- PR #8106 — CTE traversal works when following outer Vars
+-- This script exercises three shapes:
+-- T1) CTE referenced inside a correlated subquery (one level down)
+-- T2) CTE referenced inside a nested subquery (two levels down)
+-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
+CREATE SCHEMA pr8106_cte_outervar;
+SET search_path = pr8106_cte_outervar, public;
+-- Base tables for the tests
+DROP TABLE IF EXISTS raw_events_first CASCADE;
+NOTICE: table "raw_events_first" does not exist, skipping
+DROP TABLE IF EXISTS agg_events CASCADE;
+NOTICE: table "agg_events" does not exist, skipping
+CREATE TABLE raw_events_first(
+ user_id int,
+ value_1 int
+);
+CREATE TABLE agg_events(
+ user_id int,
+ value_1_agg int
+);
+-- Distribute and colocate (distribution key = user_id)
+SELECT create_distributed_table('raw_events_first', 'user_id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT create_distributed_table('agg_events', 'user_id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- Seed data (duplicates on some user_ids; some NULL value_1’s)
+INSERT INTO raw_events_first(user_id, value_1) VALUES
+ (1, 10), (1, 20), (1, NULL),
+ (2, NULL),
+ (3, 30),
+ (4, NULL),
+ (5, 50), (5, NULL),
+ (6, NULL);
+---------------------------------------------------------------------
+-- T1) CTE referenced inside a correlated subquery (one level down)
+---------------------------------------------------------------------
+TRUNCATE agg_events;
+WITH c AS MATERIALIZED (
+ SELECT user_id FROM raw_events_first
+)
+INSERT INTO agg_events (user_id)
+SELECT t.user_id
+FROM raw_events_first t
+WHERE EXISTS (SELECT 1 FROM c WHERE c.user_id = t.user_id);
+-- Expect one insert per row in raw_events_first (EXISTS always true per user_id)
+SELECT 't1_count_matches' AS test,
+ (SELECT count(*) FROM agg_events) =
+ (SELECT count(*) FROM raw_events_first) AS ok;
+ test | ok
+---------------------------------------------------------------------
+ t1_count_matches | t
+(1 row)
+
+-- Spot-check: how many rows were inserted
+SELECT 't1_rows' AS test, count(*) AS rows FROM agg_events;
+ test | rows
+---------------------------------------------------------------------
+ t1_rows | 9
+(1 row)
+
+---------------------------------------------------------------------
+-- T2) CTE referenced inside a nested subquery (two levels down)
+---------------------------------------------------------------------
+TRUNCATE agg_events;
+WITH c AS MATERIALIZED (
+ SELECT user_id FROM raw_events_first
+)
+INSERT INTO agg_events (user_id)
+SELECT t.user_id
+FROM raw_events_first t
+WHERE EXISTS (
+ SELECT 1
+ FROM (SELECT user_id FROM c) c2
+ WHERE c2.user_id = t.user_id
+);
+-- Same cardinality expectation as T1
+SELECT 't2_count_matches' AS test,
+ (SELECT count(*) FROM agg_events) =
+ (SELECT count(*) FROM raw_events_first) AS ok;
+ test | ok
+---------------------------------------------------------------------
+ t2_count_matches | t
+(1 row)
+
+SELECT 't2_rows' AS test, count(*) AS rows FROM agg_events;
+ test | rows
+---------------------------------------------------------------------
+ t2_rows | 9
+(1 row)
+
+---------------------------------------------------------------------
+-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
+-- (use MAX() to keep scalar subquery single-row)
+---------------------------------------------------------------------
+TRUNCATE agg_events;
+WITH c AS (SELECT user_id, value_1 FROM raw_events_first)
+INSERT INTO agg_events (user_id, value_1_agg)
+SELECT d.user_id, d.value_1_agg
+FROM (
+ SELECT t.user_id,
+ (SELECT max(c.value_1) FROM c WHERE c.user_id = t.user_id) AS value_1_agg
+ FROM raw_events_first t
+) AS d
+WHERE d.value_1_agg IS NOT NULL;
+-- Expect one insert per row in raw_events_first whose user_id has at least one non-NULL value_1
+SELECT 't3_count_matches' AS test,
+ (SELECT count(*) FROM agg_events) =
+ (
+ SELECT count(*)
+ FROM raw_events_first t
+ WHERE EXISTS (
+ SELECT 1 FROM raw_events_first c
+ WHERE c.user_id = t.user_id AND c.value_1 IS NOT NULL
+ )
+ ) AS ok;
+ test | ok
+---------------------------------------------------------------------
+ t3_count_matches | t
+(1 row)
+
+-- Also verify no NULLs were inserted into value_1_agg
+SELECT 't3_no_null_value_1_agg' AS test,
+ NOT EXISTS (SELECT 1 FROM agg_events WHERE value_1_agg IS NULL) AS ok;
+ test | ok
+---------------------------------------------------------------------
+ t3_no_null_value_1_agg | t
+(1 row)
+
+-- Deterministic sample of results
+SELECT 't3_sample' AS test, user_id, value_1_agg
+FROM agg_events
+ORDER BY user_id
+LIMIT 5;
+ test | user_id | value_1_agg
+---------------------------------------------------------------------
+ t3_sample | 1 | 20
+ t3_sample | 1 | 20
+ t3_sample | 1 | 20
+ t3_sample | 3 | 30
+ t3_sample | 5 | 50
+(5 rows)
+
+-- End of PR #8106 — CTE traversal works when following outer Vars
SET client_min_messages TO ERROR;
+DROP SCHEMA pr8106_cte_outervar CASCADE;
DROP SCHEMA multi_insert_select CASCADE;
diff --git a/src/test/regress/expected/pg17.out b/src/test/regress/expected/pg17.out
index 721087d3d..41b82b067 100644
--- a/src/test/regress/expected/pg17.out
+++ b/src/test/regress/expected/pg17.out
@@ -1262,15 +1262,17 @@ SET search_path TO pg17;
ALTER TABLE distributed_partitioned_table DROP CONSTRAINT dist_exclude_named;
ALTER TABLE local_partitioned_table DROP CONSTRAINT local_exclude_named;
-- Step 10: Verify the constraints were dropped
-SELECT * FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x';
- oid | conname | connamespace | contype | condeferrable | condeferred | convalidated | conrelid | contypid | conindid | conparentid | confrelid | confupdtype | confdeltype | confmatchtype | conislocal | coninhcount | connoinherit | conkey | confkey | conpfeqop | conppeqop | conffeqop | confdelsetcols | conexclop | conbin
+SELECT COUNT(*) FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x';
+ count
---------------------------------------------------------------------
-(0 rows)
+ 0
+(1 row)
-SELECT * FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x';
- oid | conname | connamespace | contype | condeferrable | condeferred | convalidated | conrelid | contypid | conindid | conparentid | confrelid | confupdtype | confdeltype | confmatchtype | conislocal | coninhcount | connoinherit | conkey | confkey | conpfeqop | conppeqop | conffeqop | confdelsetcols | conexclop | conbin
+SELECT COUNT(*) FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x';
+ count
---------------------------------------------------------------------
-(0 rows)
+ 0
+(1 row)
-- Step 11: Clean up - Drop the tables
DROP TABLE distributed_partitioned_table CASCADE;
diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql
index eabadda7c..19ae70abc 100644
--- a/src/test/regress/sql/multi_insert_select.sql
+++ b/src/test/regress/sql/multi_insert_select.sql
@@ -2583,5 +2583,127 @@ SELECT id, val FROM version_dist_union ORDER BY id;
-- End of Issue #7784
+-- PR #8106 — CTE traversal works when following outer Vars
+-- This script exercises three shapes:
+-- T1) CTE referenced inside a correlated subquery (one level down)
+-- T2) CTE referenced inside a nested subquery (two levels down)
+-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
+
+CREATE SCHEMA pr8106_cte_outervar;
+SET search_path = pr8106_cte_outervar, public;
+
+-- Base tables for the tests
+DROP TABLE IF EXISTS raw_events_first CASCADE;
+DROP TABLE IF EXISTS agg_events CASCADE;
+
+CREATE TABLE raw_events_first(
+ user_id int,
+ value_1 int
+);
+
+CREATE TABLE agg_events(
+ user_id int,
+ value_1_agg int
+);
+
+-- Distribute and colocate (distribution key = user_id)
+SELECT create_distributed_table('raw_events_first', 'user_id');
+SELECT create_distributed_table('agg_events', 'user_id');
+
+-- Seed data (duplicates on some user_ids; some NULL value_1’s)
+INSERT INTO raw_events_first(user_id, value_1) VALUES
+ (1, 10), (1, 20), (1, NULL),
+ (2, NULL),
+ (3, 30),
+ (4, NULL),
+ (5, 50), (5, NULL),
+ (6, NULL);
+
+----------------------------------------------------------------------
+-- T1) CTE referenced inside a correlated subquery (one level down)
+----------------------------------------------------------------------
+TRUNCATE agg_events;
+
+WITH c AS MATERIALIZED (
+ SELECT user_id FROM raw_events_first
+)
+INSERT INTO agg_events (user_id)
+SELECT t.user_id
+FROM raw_events_first t
+WHERE EXISTS (SELECT 1 FROM c WHERE c.user_id = t.user_id);
+
+-- Expect one insert per row in raw_events_first (EXISTS always true per user_id)
+SELECT 't1_count_matches' AS test,
+ (SELECT count(*) FROM agg_events) =
+ (SELECT count(*) FROM raw_events_first) AS ok;
+
+-- Spot-check: how many rows were inserted
+SELECT 't1_rows' AS test, count(*) AS rows FROM agg_events;
+
+----------------------------------------------------------------------
+-- T2) CTE referenced inside a nested subquery (two levels down)
+----------------------------------------------------------------------
+TRUNCATE agg_events;
+
+WITH c AS MATERIALIZED (
+ SELECT user_id FROM raw_events_first
+)
+INSERT INTO agg_events (user_id)
+SELECT t.user_id
+FROM raw_events_first t
+WHERE EXISTS (
+ SELECT 1
+ FROM (SELECT user_id FROM c) c2
+ WHERE c2.user_id = t.user_id
+);
+
+-- Same cardinality expectation as T1
+SELECT 't2_count_matches' AS test,
+ (SELECT count(*) FROM agg_events) =
+ (SELECT count(*) FROM raw_events_first) AS ok;
+
+SELECT 't2_rows' AS test, count(*) AS rows FROM agg_events;
+
+----------------------------------------------------------------------
+-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
+-- (use MAX() to keep scalar subquery single-row)
+----------------------------------------------------------------------
+TRUNCATE agg_events;
+
+WITH c AS (SELECT user_id, value_1 FROM raw_events_first)
+INSERT INTO agg_events (user_id, value_1_agg)
+SELECT d.user_id, d.value_1_agg
+FROM (
+ SELECT t.user_id,
+ (SELECT max(c.value_1) FROM c WHERE c.user_id = t.user_id) AS value_1_agg
+ FROM raw_events_first t
+) AS d
+WHERE d.value_1_agg IS NOT NULL;
+
+-- Expect one insert per row in raw_events_first whose user_id has at least one non-NULL value_1
+SELECT 't3_count_matches' AS test,
+ (SELECT count(*) FROM agg_events) =
+ (
+ SELECT count(*)
+ FROM raw_events_first t
+ WHERE EXISTS (
+ SELECT 1 FROM raw_events_first c
+ WHERE c.user_id = t.user_id AND c.value_1 IS NOT NULL
+ )
+ ) AS ok;
+
+-- Also verify no NULLs were inserted into value_1_agg
+SELECT 't3_no_null_value_1_agg' AS test,
+ NOT EXISTS (SELECT 1 FROM agg_events WHERE value_1_agg IS NULL) AS ok;
+
+-- Deterministic sample of results
+SELECT 't3_sample' AS test, user_id, value_1_agg
+FROM agg_events
+ORDER BY user_id
+LIMIT 5;
+
+-- End of PR #8106 — CTE traversal works when following outer Vars
+
SET client_min_messages TO ERROR;
+DROP SCHEMA pr8106_cte_outervar CASCADE;
DROP SCHEMA multi_insert_select CASCADE;
diff --git a/src/test/regress/sql/pg17.sql b/src/test/regress/sql/pg17.sql
index 60ea6d9e7..713b58952 100644
--- a/src/test/regress/sql/pg17.sql
+++ b/src/test/regress/sql/pg17.sql
@@ -669,8 +669,8 @@ ALTER TABLE distributed_partitioned_table DROP CONSTRAINT dist_exclude_named;
ALTER TABLE local_partitioned_table DROP CONSTRAINT local_exclude_named;
-- Step 10: Verify the constraints were dropped
-SELECT * FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x';
-SELECT * FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x';
+SELECT COUNT(*) FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x';
+SELECT COUNT(*) FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x';
-- Step 11: Clean up - Drop the tables
DROP TABLE distributed_partitioned_table CASCADE;