From 6b6d959facac53aa49e21dc0751b48f75561d117 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Fri, 8 Aug 2025 13:46:11 +0300 Subject: [PATCH 1/6] PG18 - pg17.sql Simplify step 10 verification to use COUNT(*) instead of SELECT * (#8111) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fixes #8096 PostgreSQL 18 adds a `conenforced` flag allowing `CHECK` constraints to be declared `NOT ENFORCED`. https://github.com/postgres/postgres/commit/ca87c415e2fccf81cec6fd45698dde9fae0ab570 ```diff @@ -1256,26 +1278,26 @@ distributed_partitioned_table_id_partition_col_excl | x (2 rows) -- Step 9: Drop the exclusion constraints from both tables \c - - :master_host :master_port SET search_path TO pg17; ALTER TABLE distributed_partitioned_table DROP CONSTRAINT dist_exclude_named; ALTER TABLE local_partitioned_table DROP CONSTRAINT local_exclude_named; -- Step 10: Verify the constraints were dropped SELECT * FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x'; - oid | conname | connamespace | contype | condeferrable | condeferred | convalidated | conrelid | contypid | conindid | conparentid | confrelid | confupdtype | confdeltype | confmatchtype | conislocal | coninhcount | connoinherit | conkey | confkey | conpfeqop | conppeqop | conffeqop | confdelsetcols | conexclop | conbin + oid | conname | connamespace | contype | condeferrable | condeferred | conenforced | convalidated | conrelid | contypid | conindid | conparentid | confrelid | confupdtype | confdeltype | confmatchtype | conislocal | coninhcount | connoinherit | conperiod | conkey | confkey | conpfeqop | conppeqop | conffeqop | confdelsetcols | conexclop | conbin -----+---------+--------------+---------+---------------+-------------+-------------+--------------+----------+----------+----------+-------------+-----------+-------------+-------------+---------------+------------+-------------+--------------+-----------+--------+---------+-----------+-----------+-----------+----------------+-----------+-------- (0 rows) SELECT * FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x'; - oid | conname | connamespace | contype | condeferrable | condeferred | convalidated | conrelid | contypid | conindid | conparentid | confrelid | confupdtype | confdeltype | confmatchtype | conislocal | coninhcount | connoinherit | conkey | confkey | conpfeqop | conppeqop | conffeqop | confdelsetcols | conexclop | conbin + oid | conname | connamespace | contype | condeferrable | condeferred | conenforced | convalidated | conrelid | contypid | conindid | conparentid | confrelid | confupdtype | confdeltype | confmatchtype | conislocal | coninhcount | connoinherit | conperiod | conkey | confkey | conpfeqop | conppeqop | conffeqop | confdelsetcols | conexclop | conbin -----+---------+--------------+---------+---------------+-------------+-------------+--------------+----------+----------+----------+-------------+-----------+-------------+-------------+---------------+------------+-------------+--------------+-----------+--------+---------+-----------+-----------+-----------+----------------+-----------+-------- (0 rows) ``` The purpose of step 10 is merely to confirm that the exclusion constraints dist_exclude_named and local_exclude_named have been dropped. There’s no need to pull back every column from pg_constraint—we only care about whether any matching row remains. 
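For context, the PG18 feature that introduced `conenforced` can be sketched as follows; the `NOT ENFORCED` syntax is assumed from the PostgreSQL 18 commit linked above, and the table/constraint names are purely illustrative:

```sql
-- Hedged sketch, not part of the regression test: PG18 lets a CHECK constraint
-- be declared NOT ENFORCED, which is what added the conenforced catalog column.
CREATE TABLE orders (qty int,
    CONSTRAINT qty_positive CHECK (qty > 0) NOT ENFORCED);

-- The new pg_constraint.conenforced flag records whether the constraint is enforced.
SELECT conname, contype, conenforced
FROM pg_constraint
WHERE conname = 'qty_positive';
-- expected on PG18: qty_positive | c | f
```

Because the fix only asks whether any matching row remains, it stays correct no matter how many such columns future releases add.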
- Reduces noise in the output - Eliminates dependence on the full set of pg_constraint columns (which can drift across Postgres versions) - Resolves the pg18 regression diff without altering test expectations --- src/test/regress/expected/pg17.out | 14 ++++++++------ src/test/regress/sql/pg17.sql | 4 ++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/test/regress/expected/pg17.out b/src/test/regress/expected/pg17.out index 721087d3d..41b82b067 100644 --- a/src/test/regress/expected/pg17.out +++ b/src/test/regress/expected/pg17.out @@ -1262,15 +1262,17 @@ SET search_path TO pg17; ALTER TABLE distributed_partitioned_table DROP CONSTRAINT dist_exclude_named; ALTER TABLE local_partitioned_table DROP CONSTRAINT local_exclude_named; -- Step 10: Verify the constraints were dropped -SELECT * FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x'; - oid | conname | connamespace | contype | condeferrable | condeferred | convalidated | conrelid | contypid | conindid | conparentid | confrelid | confupdtype | confdeltype | confmatchtype | conislocal | coninhcount | connoinherit | conkey | confkey | conpfeqop | conppeqop | conffeqop | confdelsetcols | conexclop | conbin +SELECT COUNT(*) FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x'; + count --------------------------------------------------------------------- -(0 rows) + 0 +(1 row) -SELECT * FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x'; - oid | conname | connamespace | contype | condeferrable | condeferred | convalidated | conrelid | contypid | conindid | conparentid | confrelid | confupdtype | confdeltype | confmatchtype | conislocal | coninhcount | connoinherit | conkey | confkey | conpfeqop | conppeqop | conffeqop | confdelsetcols | conexclop | conbin +SELECT COUNT(*) FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x'; + count --------------------------------------------------------------------- -(0 rows) + 0 +(1 row) -- Step 11: Clean up - Drop the tables DROP TABLE distributed_partitioned_table CASCADE; diff --git a/src/test/regress/sql/pg17.sql b/src/test/regress/sql/pg17.sql index 60ea6d9e7..713b58952 100644 --- a/src/test/regress/sql/pg17.sql +++ b/src/test/regress/sql/pg17.sql @@ -669,8 +669,8 @@ ALTER TABLE distributed_partitioned_table DROP CONSTRAINT dist_exclude_named; ALTER TABLE local_partitioned_table DROP CONSTRAINT local_exclude_named; -- Step 10: Verify the constraints were dropped -SELECT * FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x'; -SELECT * FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x'; +SELECT COUNT(*) FROM pg_constraint WHERE conname = 'dist_exclude_named' AND contype = 'x'; +SELECT COUNT(*) FROM pg_constraint WHERE conname = 'local_exclude_named' AND contype = 'x'; -- Step 11: Clean up - Drop the tables DROP TABLE distributed_partitioned_table CASCADE; From 71d63283782a0a76d66345c79f7ccaefa347160c Mon Sep 17 00:00:00 2001 From: Karina <55838532+Green-Chan@users.noreply.github.com> Date: Mon, 11 Aug 2025 18:34:06 +0300 Subject: [PATCH 2/6] Fix memory corruptions around pg_dist_background_task accessors after a Citus downgrade is followed by an upgrade (#8114) DESCRIPTION: Fixes potential memory corruptions that could happen when accessing pg_dist_background_task after a Citus downgrade is followed by a Citus upgrade. In case of Citus downgrade and further upgrade an undefined behavior may be encountered. 
The reason is that Citus hardcoded the number of columns in the extension's tables, but in case of downgrade and following update some of these tables can have more columns, and some of them can be marked as dropped. This PR fixes all such tables using the approach introduced in #7950, which solved the problem for the pg_dist_partition table. See #7515 for a more thorough explanation. --------- Co-authored-by: Karina Litskevich Co-authored-by: Onur Tirtir --- .../distributed/metadata/metadata_utility.c | 115 +++++++++++++----- .../distributed/pg_dist_background_task.h | 2 + 2 files changed, 86 insertions(+), 31 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 2b8bd0d1c..b84260f9e 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -3130,10 +3130,10 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC /* 2. insert new task */ { - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool nulls[Natts_pg_dist_background_task] = { 0 }; + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTask); - memset(nulls, true, sizeof(nulls)); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *nulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); int64 taskId = GetNextBackgroundTaskTaskId(); @@ -3164,15 +3164,17 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC values[Anum_pg_dist_background_task_message - 1] = CStringGetTextDatum(""); nulls[Anum_pg_dist_background_task_message - 1] = false; - values[Anum_pg_dist_background_task_nodes_involved - 1] = - IntArrayToDatum(nodesInvolvedCount, nodesInvolved); - nulls[Anum_pg_dist_background_task_nodes_involved - 1] = - (nodesInvolvedCount == 0); + int nodesInvolvedIndex = + GetNodesInvolvedAttrIndexInPgDistBackgroundTask(tupleDescriptor); + values[nodesInvolvedIndex] = IntArrayToDatum(nodesInvolvedCount, nodesInvolved); + nulls[nodesInvolvedIndex] = (nodesInvolvedCount == 0); - HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask), - values, nulls); + HeapTuple newTuple = heap_form_tuple(tupleDescriptor, values, nulls); CatalogTupleInsert(pgDistBackgroundTask, newTuple); + pfree(values); + pfree(nulls); + task = palloc0(sizeof(BackgroundTask)); task->taskid = taskId; task->status = BACKGROUND_TASK_STATUS_RUNNABLE; @@ -3285,11 +3287,12 @@ ResetRunningBackgroundTasks(void) List *taskIdsToWait = NIL; while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) { - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool isnull[Natts_pg_dist_background_task] = { 0 }; - bool replace[Natts_pg_dist_background_task] = { 0 }; - TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); + + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(taskTuple, tupleDescriptor, values, isnull); values[Anum_pg_dist_background_task_status - 1] = @@ -3358,6 +3361,10 @@ ResetRunningBackgroundTasks(void) replace); CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple); + + pfree(values); + pfree(isnull); + pfree(replace); } if (list_length(taskIdsToWait) > 0) @@ -3441,8 +3448,9 @@ DeformBackgroundJobHeapTuple(TupleDesc tupleDescriptor, HeapTuple jobTuple) 
static BackgroundTask * DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple) { - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool nulls[Natts_pg_dist_background_task] = { 0 }; + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *nulls = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls); BackgroundTask *task = palloc0(sizeof(BackgroundTask)); @@ -3480,13 +3488,18 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple) TextDatumGetCString(values[Anum_pg_dist_background_task_message - 1]); } - if (!nulls[Anum_pg_dist_background_task_nodes_involved - 1]) + int nodesInvolvedIndex = + GetNodesInvolvedAttrIndexInPgDistBackgroundTask(tupleDescriptor); + if (!nulls[nodesInvolvedIndex]) { ArrayType *nodesInvolvedArrayObject = - DatumGetArrayTypeP(values[Anum_pg_dist_background_task_nodes_involved - 1]); + DatumGetArrayTypeP(values[nodesInvolvedIndex]); task->nodesInvolved = IntegerArrayTypeToList(nodesInvolvedArrayObject); } + pfree(values); + pfree(nulls); + return task; } @@ -3751,8 +3764,8 @@ JobTasksStatusCount(int64 jobId) HeapTuple heapTuple = NULL; while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) { - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool isnull[Natts_pg_dist_background_task] = { 0 }; + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); @@ -3760,6 +3773,9 @@ JobTasksStatusCount(int64 jobId) 1]); BackgroundTaskStatus status = BackgroundTaskStatusByOid(statusOid); + pfree(values); + pfree(isnull); + switch (status) { case BACKGROUND_TASK_STATUS_BLOCKED: @@ -4012,9 +4028,9 @@ UpdateBackgroundJob(int64 jobId) UINT64_FORMAT, jobId))); } - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool isnull[Natts_pg_dist_background_task] = { 0 }; - bool replace[Natts_pg_dist_background_task] = { 0 }; + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); @@ -4058,6 +4074,10 @@ UpdateBackgroundJob(int64 jobId) systable_endscan(scanDescriptor); table_close(pgDistBackgroundJobs, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -4093,9 +4113,9 @@ UpdateBackgroundTask(BackgroundTask *task) task->jobid, task->taskid))); } - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool isnull[Natts_pg_dist_background_task] = { 0 }; - bool replace[Natts_pg_dist_background_task] = { 0 }; + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); @@ -4164,6 +4184,10 @@ UpdateBackgroundTask(BackgroundTask *task) systable_endscan(scanDescriptor); table_close(pgDistBackgroundTasks, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -4251,9 +4275,10 @@ CancelTasksForJob(int64 jobid) HeapTuple taskTuple = NULL; while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) { - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool 
nulls[Natts_pg_dist_background_task] = { 0 }; - bool replace[Natts_pg_dist_background_task] = { 0 }; + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *nulls = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls); Oid statusOid = @@ -4302,6 +4327,10 @@ CancelTasksForJob(int64 jobid) taskTuple = heap_modify_tuple(taskTuple, tupleDescriptor, values, nulls, replace); CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple); + + pfree(values); + pfree(nulls); + pfree(replace); } systable_endscan(scanDescriptor); @@ -4358,9 +4387,9 @@ UnscheduleDependentTasks(BackgroundTask *task) "task_id: " UINT64_FORMAT, cTaskId))); } - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool isnull[Natts_pg_dist_background_task] = { 0 }; - bool replace[Natts_pg_dist_background_task] = { 0 }; + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); values[Anum_pg_dist_background_task_status - 1] = ObjectIdGetDatum(CitusTaskStatusUnscheduledId()); @@ -4372,6 +4401,10 @@ UnscheduleDependentTasks(BackgroundTask *task) CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple); systable_endscan(scanDescriptor); + + pfree(values); + pfree(isnull); + pfree(replace); } } @@ -4457,3 +4490,23 @@ GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc) ? (Anum_pg_dist_partition_autoconverted - 1) : tupleDesc->natts - 1; } + + +/* + * GetNodesInvolvedAttrIndexInPgDistBackgroundTask returns attrnum for nodes_involved attr. + * + * nodes_involved attr was added to table pg_dist_background_task using alter operation after + * the version where Citus started supporting downgrades, and it's only column that we've + * introduced to pg_dist_background_task since then. + * + * And in case of a downgrade + upgrade, tupleDesc->natts becomes greater than + * Natts_pg_dist_background_task and when this happens, then we know that attrnum nodes_involved is + * not Anum_pg_dist_background_task_nodes_involved anymore but tupleDesc->natts - 1. + */ +int +GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc) +{ + return TupleDescSize(tupleDesc) == Natts_pg_dist_background_task + ? 
(Anum_pg_dist_background_task_nodes_involved - 1) + : tupleDesc->natts - 1; +} diff --git a/src/include/distributed/pg_dist_background_task.h b/src/include/distributed/pg_dist_background_task.h index 9e6673a64..60e30e638 100644 --- a/src/include/distributed/pg_dist_background_task.h +++ b/src/include/distributed/pg_dist_background_task.h @@ -27,4 +27,6 @@ #define Anum_pg_dist_background_task_message 9 #define Anum_pg_dist_background_task_nodes_involved 10 +extern int GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc); + #endif /* CITUS_PG_DIST_BACKGROUND_TASK_H */ From a6161f5a21a150220f40fff5bc7ee03c71ce3ff9 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Tue, 12 Aug 2025 11:49:50 +0300 Subject: [PATCH 3/6] Fix CTE traversal for outer Vars in FindReferencedTableColumn (remove assert; correct parentQueryList handling) (#8106) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fixes #8105 This change lets `FindReferencedTableColumn()` correctly resolve columns through a CTE even when the expression comes from an outer query level (`varlevelsup > 0`, `skipOuterVars = false`). Before, we hit an `Assert(skipOuterVars)` in this path. **Problem** * Hitting a CTE after walking outer Vars triggered `Assert(skipOuterVars)`. * Cause: we modified `parentQueryList` in place and didn’t rebuild the correct parent chain before recursing into the CTE, so the path was considered unsafe. **Fix** * Remove the `Assert(skipOuterVars)` in the `RTE_CTE` branch. * Find the CTE’s owning level via `ctelevelsup` and compute `cteParentListIndex`. * Rebuild a private parent list for recursion: `list_copy` → `list_truncate` → `lappend(current query)`. * Add a bounds check before indexing the CTE’s `targetList`. **Why it works** ```diff -parentQueryList = lappend(parentQueryList, query); -FindReferencedTableColumn(targetEntry->expr, parentQueryList, - cteQuery, column, rteContainingReferencedColumn, - skipOuterVars); + /* hand a private, bounded parent list to the recursion */ + List *newParent = list_copy(parentQueryList); + newParent = list_truncate(newParent, cteParentListIndex + 1); + newParent = lappend(newParent, query); + + FindReferencedTableColumn(targetEntry->expr, + newParent, + cteQuery, + column, + rteContainingReferencedColumn, + skipOuterVars); +} ``` **Before:** We changed `parentQueryList` in place (`parentQueryList = lappend(...)`) and didn’t trim it to the CTE’s owner level. **After:** We copy the list, trim it to the CTE’s owner level, then append the current query. This keeps the parent list accurate for the current recursion and safe when following outer Vars. **Example: Nested subquery referencing the CTE (two levels down)** ``` WITH c AS MATERIALIZED (SELECT user_id FROM raw_events_first) SELECT 1 FROM raw_events_first t WHERE EXISTS ( SELECT 1 FROM (SELECT user_id FROM c) c2 WHERE c2.user_id = t.user_id ); ``` Levels: Q0 = top SELECT Q1 = EXISTS subquery Q2 = inner (SELECT user_id FROM c) When resolving c2.user_id inside Q2: - parentQueryList is [Q0, Q1, Q2]. - `ctelevelsup`: 2 `cteParentListIndex = length(parentQueryList) - ctelevelsup - 1` - Recurse into the CTE’s query with [Q0, Q2]. **Tests (added in `multi_insert_select`)** * **T1:** Correlated subquery that references a CTE (one level down) Verifies that resolving through `RTE_CTE` after following an outer `Var` succeeds, row count matches source table. * **T2:** Nested subquery that references a CTE (two levels down) Exercises deeper recursion and confirms identical to T1. 
* **T3:** Scalar subquery in a target list that reads from the outer CTE Checks expected row count and that no NULLs are inserted. These tests cover the cases that previously hit `Assert(skipOuterVars)` and confirm CTE references while following outer Vars. --- .../planner/multi_logical_optimizer.c | 39 +++-- .../regress/expected/multi_insert_select.out | 152 ++++++++++++++++++ src/test/regress/sql/multi_insert_select.sql | 122 ++++++++++++++ 3 files changed, 303 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 7deced084..c0679c14e 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -4583,11 +4583,10 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * else if (rangeTableEntry->rtekind == RTE_CTE) { /* - * When outerVars are considered, we modify parentQueryList, so this - * logic might need to change when we support outervars in CTEs. + * Resolve through a CTE even when skipOuterVars == false. + * Maintain the invariant that each recursion level owns a private, + * correctly-bounded copy of parentQueryList. */ - Assert(skipOuterVars); - int cteParentListIndex = list_length(parentQueryList) - rangeTableEntry->ctelevelsup - 1; Query *cteParentQuery = NULL; @@ -4618,14 +4617,34 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * if (cte != NULL) { Query *cteQuery = (Query *) cte->ctequery; - List *targetEntryList = cteQuery->targetList; AttrNumber targetEntryIndex = candidateColumn->varattno - 1; - TargetEntry *targetEntry = list_nth(targetEntryList, targetEntryIndex); - parentQueryList = lappend(parentQueryList, query); - FindReferencedTableColumn(targetEntry->expr, parentQueryList, - cteQuery, column, rteContainingReferencedColumn, - skipOuterVars); + if (targetEntryIndex >= 0 && + targetEntryIndex < list_length(cteQuery->targetList)) + { + TargetEntry *targetEntry = + list_nth(cteQuery->targetList, targetEntryIndex); + + /* Build a private, bounded parentQueryList before recursing into the CTE. + * Invariant: list is [top … current], owned by this call (no aliasing). + * For RTE_CTE: + * owner_idx = list_length(parentQueryList) - rangeTableEntry->ctelevelsup - 1; + * newParent = lappend(list_truncate(list_copy(parentQueryList), owner_idx + 1), query); + * Example (Q0 owns CTE; we’re in Q2 via nested subquery): + * parent=[Q0,Q1,Q2], ctelevelsup=2 ⇒ owner_idx=0 ⇒ newParent=[Q0,Q2]. + * Keeps outer-Var level math correct without mutating the caller’s list. 
+ */ + List *newParent = list_copy(parentQueryList); + newParent = list_truncate(newParent, cteParentListIndex + 1); + newParent = lappend(newParent, query); + + FindReferencedTableColumn(targetEntry->expr, + newParent, + cteQuery, + column, + rteContainingReferencedColumn, + skipOuterVars); + } } } } diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 00e335a82..76538f22d 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -3637,5 +3637,157 @@ SELECT id, val FROM version_dist_union ORDER BY id; (6 rows) -- End of Issue #7784 +-- PR #8106 — CTE traversal works when following outer Vars +-- This script exercises three shapes: +-- T1) CTE referenced inside a correlated subquery (one level down) +-- T2) CTE referenced inside a nested subquery (two levels down) +-- T3) Subquery targetlist uses a scalar sublink into the outer CTE +CREATE SCHEMA pr8106_cte_outervar; +SET search_path = pr8106_cte_outervar, public; +-- Base tables for the tests +DROP TABLE IF EXISTS raw_events_first CASCADE; +NOTICE: table "raw_events_first" does not exist, skipping +DROP TABLE IF EXISTS agg_events CASCADE; +NOTICE: table "agg_events" does not exist, skipping +CREATE TABLE raw_events_first( + user_id int, + value_1 int +); +CREATE TABLE agg_events( + user_id int, + value_1_agg int +); +-- Distribute and colocate (distribution key = user_id) +SELECT create_distributed_table('raw_events_first', 'user_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('agg_events', 'user_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Seed data (duplicates on some user_ids; some NULL value_1’s) +INSERT INTO raw_events_first(user_id, value_1) VALUES + (1, 10), (1, 20), (1, NULL), + (2, NULL), + (3, 30), + (4, NULL), + (5, 50), (5, NULL), + (6, NULL); +--------------------------------------------------------------------- +-- T1) CTE referenced inside a correlated subquery (one level down) +--------------------------------------------------------------------- +TRUNCATE agg_events; +WITH c AS MATERIALIZED ( + SELECT user_id FROM raw_events_first +) +INSERT INTO agg_events (user_id) +SELECT t.user_id +FROM raw_events_first t +WHERE EXISTS (SELECT 1 FROM c WHERE c.user_id = t.user_id); +-- Expect one insert per row in raw_events_first (EXISTS always true per user_id) +SELECT 't1_count_matches' AS test, + (SELECT count(*) FROM agg_events) = + (SELECT count(*) FROM raw_events_first) AS ok; + test | ok +--------------------------------------------------------------------- + t1_count_matches | t +(1 row) + +-- Spot-check: how many rows were inserted +SELECT 't1_rows' AS test, count(*) AS rows FROM agg_events; + test | rows +--------------------------------------------------------------------- + t1_rows | 9 +(1 row) + +--------------------------------------------------------------------- +-- T2) CTE referenced inside a nested subquery (two levels down) +--------------------------------------------------------------------- +TRUNCATE agg_events; +WITH c AS MATERIALIZED ( + SELECT user_id FROM raw_events_first +) +INSERT INTO agg_events (user_id) +SELECT t.user_id +FROM raw_events_first t +WHERE EXISTS ( + SELECT 1 + FROM (SELECT user_id FROM c) c2 + WHERE c2.user_id = t.user_id +); +-- Same cardinality expectation as T1 +SELECT 
't2_count_matches' AS test, + (SELECT count(*) FROM agg_events) = + (SELECT count(*) FROM raw_events_first) AS ok; + test | ok +--------------------------------------------------------------------- + t2_count_matches | t +(1 row) + +SELECT 't2_rows' AS test, count(*) AS rows FROM agg_events; + test | rows +--------------------------------------------------------------------- + t2_rows | 9 +(1 row) + +--------------------------------------------------------------------- +-- T3) Subquery targetlist uses a scalar sublink into the outer CTE +-- (use MAX() to keep scalar subquery single-row) +--------------------------------------------------------------------- +TRUNCATE agg_events; +WITH c AS (SELECT user_id, value_1 FROM raw_events_first) +INSERT INTO agg_events (user_id, value_1_agg) +SELECT d.user_id, d.value_1_agg +FROM ( + SELECT t.user_id, + (SELECT max(c.value_1) FROM c WHERE c.user_id = t.user_id) AS value_1_agg + FROM raw_events_first t +) AS d +WHERE d.value_1_agg IS NOT NULL; +-- Expect one insert per row in raw_events_first whose user_id has at least one non-NULL value_1 +SELECT 't3_count_matches' AS test, + (SELECT count(*) FROM agg_events) = + ( + SELECT count(*) + FROM raw_events_first t + WHERE EXISTS ( + SELECT 1 FROM raw_events_first c + WHERE c.user_id = t.user_id AND c.value_1 IS NOT NULL + ) + ) AS ok; + test | ok +--------------------------------------------------------------------- + t3_count_matches | t +(1 row) + +-- Also verify no NULLs were inserted into value_1_agg +SELECT 't3_no_null_value_1_agg' AS test, + NOT EXISTS (SELECT 1 FROM agg_events WHERE value_1_agg IS NULL) AS ok; + test | ok +--------------------------------------------------------------------- + t3_no_null_value_1_agg | t +(1 row) + +-- Deterministic sample of results +SELECT 't3_sample' AS test, user_id, value_1_agg +FROM agg_events +ORDER BY user_id +LIMIT 5; + test | user_id | value_1_agg +--------------------------------------------------------------------- + t3_sample | 1 | 20 + t3_sample | 1 | 20 + t3_sample | 1 | 20 + t3_sample | 3 | 30 + t3_sample | 5 | 50 +(5 rows) + +-- End of PR #8106 — CTE traversal works when following outer Vars SET client_min_messages TO ERROR; +DROP SCHEMA pr8106_cte_outervar CASCADE; DROP SCHEMA multi_insert_select CASCADE; diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index eabadda7c..19ae70abc 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -2583,5 +2583,127 @@ SELECT id, val FROM version_dist_union ORDER BY id; -- End of Issue #7784 +-- PR #8106 — CTE traversal works when following outer Vars +-- This script exercises three shapes: +-- T1) CTE referenced inside a correlated subquery (one level down) +-- T2) CTE referenced inside a nested subquery (two levels down) +-- T3) Subquery targetlist uses a scalar sublink into the outer CTE + +CREATE SCHEMA pr8106_cte_outervar; +SET search_path = pr8106_cte_outervar, public; + +-- Base tables for the tests +DROP TABLE IF EXISTS raw_events_first CASCADE; +DROP TABLE IF EXISTS agg_events CASCADE; + +CREATE TABLE raw_events_first( + user_id int, + value_1 int +); + +CREATE TABLE agg_events( + user_id int, + value_1_agg int +); + +-- Distribute and colocate (distribution key = user_id) +SELECT create_distributed_table('raw_events_first', 'user_id'); +SELECT create_distributed_table('agg_events', 'user_id'); + +-- Seed data (duplicates on some user_ids; some NULL value_1’s) +INSERT INTO 
raw_events_first(user_id, value_1) VALUES + (1, 10), (1, 20), (1, NULL), + (2, NULL), + (3, 30), + (4, NULL), + (5, 50), (5, NULL), + (6, NULL); + +---------------------------------------------------------------------- +-- T1) CTE referenced inside a correlated subquery (one level down) +---------------------------------------------------------------------- +TRUNCATE agg_events; + +WITH c AS MATERIALIZED ( + SELECT user_id FROM raw_events_first +) +INSERT INTO agg_events (user_id) +SELECT t.user_id +FROM raw_events_first t +WHERE EXISTS (SELECT 1 FROM c WHERE c.user_id = t.user_id); + +-- Expect one insert per row in raw_events_first (EXISTS always true per user_id) +SELECT 't1_count_matches' AS test, + (SELECT count(*) FROM agg_events) = + (SELECT count(*) FROM raw_events_first) AS ok; + +-- Spot-check: how many rows were inserted +SELECT 't1_rows' AS test, count(*) AS rows FROM agg_events; + +---------------------------------------------------------------------- +-- T2) CTE referenced inside a nested subquery (two levels down) +---------------------------------------------------------------------- +TRUNCATE agg_events; + +WITH c AS MATERIALIZED ( + SELECT user_id FROM raw_events_first +) +INSERT INTO agg_events (user_id) +SELECT t.user_id +FROM raw_events_first t +WHERE EXISTS ( + SELECT 1 + FROM (SELECT user_id FROM c) c2 + WHERE c2.user_id = t.user_id +); + +-- Same cardinality expectation as T1 +SELECT 't2_count_matches' AS test, + (SELECT count(*) FROM agg_events) = + (SELECT count(*) FROM raw_events_first) AS ok; + +SELECT 't2_rows' AS test, count(*) AS rows FROM agg_events; + +---------------------------------------------------------------------- +-- T3) Subquery targetlist uses a scalar sublink into the outer CTE +-- (use MAX() to keep scalar subquery single-row) +---------------------------------------------------------------------- +TRUNCATE agg_events; + +WITH c AS (SELECT user_id, value_1 FROM raw_events_first) +INSERT INTO agg_events (user_id, value_1_agg) +SELECT d.user_id, d.value_1_agg +FROM ( + SELECT t.user_id, + (SELECT max(c.value_1) FROM c WHERE c.user_id = t.user_id) AS value_1_agg + FROM raw_events_first t +) AS d +WHERE d.value_1_agg IS NOT NULL; + +-- Expect one insert per row in raw_events_first whose user_id has at least one non-NULL value_1 +SELECT 't3_count_matches' AS test, + (SELECT count(*) FROM agg_events) = + ( + SELECT count(*) + FROM raw_events_first t + WHERE EXISTS ( + SELECT 1 FROM raw_events_first c + WHERE c.user_id = t.user_id AND c.value_1 IS NOT NULL + ) + ) AS ok; + +-- Also verify no NULLs were inserted into value_1_agg +SELECT 't3_no_null_value_1_agg' AS test, + NOT EXISTS (SELECT 1 FROM agg_events WHERE value_1_agg IS NULL) AS ok; + +-- Deterministic sample of results +SELECT 't3_sample' AS test, user_id, value_1_agg +FROM agg_events +ORDER BY user_id +LIMIT 5; + +-- End of PR #8106 — CTE traversal works when following outer Vars + SET client_min_messages TO ERROR; +DROP SCHEMA pr8106_cte_outervar CASCADE; DROP SCHEMA multi_insert_select CASCADE; From bfc6d1f44005940cfd3e1b9a8a1cb2a165e67962 Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Tue, 12 Aug 2025 12:38:19 +0300 Subject: [PATCH 4/6] PG18 - Adjust EXPLAIN's output for disabled nodes (#8108) fixes #8097 --- src/test/regress/bin/normalize.sed | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 48396fcfd..37aa4e25f 100644 --- a/src/test/regress/bin/normalize.sed +++ 
b/src/test/regress/bin/normalize.sed @@ -328,3 +328,14 @@ s/\| CHECK ([a-zA-Z])(.*)/| CHECK \(\1\2\)/g # pg18 change: strip trailing “.00” (or “.0…”) from actual rows counts s/(actual rows=[0-9]+)\.[0-9]+/\1/g + +# pg18 “Disabled” change start +# ignore any “Disabled:” lines in test output +/^\s*Disabled:/d + +# ignore any JSON-style Disabled field +/^\s*"Disabled":/d + +# ignore XML true or false +/^\s*.*<\/Disabled>/d +# pg18 “Disabled” change end From 41883cea38e7de37d1e03997f747106d71ae3ddc Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Wed, 13 Aug 2025 12:22:23 +0300 Subject: [PATCH 5/6] =?UTF-8?q?PG18=20-=20unify=20psql=20headings=20to=20?= =?UTF-8?q?=E2=80=98List=20of=20relations=E2=80=99=20(#8119)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fixes #8110 This patch updates the `normalize.sed` script used in pg18 psql regression tests: - Replaces the headings “List of tables”, “List of indexes”, and “List of sequences” with a single, uniform heading: “List of relations”. --- src/test/regress/bin/normalize.sed | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 37aa4e25f..38fd45837 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -339,3 +339,8 @@ s/(actual rows=[0-9]+)\.[0-9]+/\1/g # ignore XML true or false /^\s*.*<\/Disabled>/d # pg18 “Disabled” change end + +# PG18 psql: headings changed from "List of relations" to per-type titles +s/^([ \t]*)List of tables$/\1List of relations/g +s/^([ \t]*)List of indexes$/\1List of relations/g +s/^([ \t]*)List of sequences$/\1List of relations/g From f73da1ed407c53283c87489141763fea30f301bd Mon Sep 17 00:00:00 2001 From: ibrahim halatci Date: Wed, 13 Aug 2025 19:25:31 +0300 Subject: [PATCH 6/6] Refactor background worker setup for security improvements (#8078) Enhance security by addressing a code scanning alert and refactoring the background worker setup code for better maintainability and clarity. --------- Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> --- .../distributed/metadata/metadata_sync.c | 44 +++---- src/backend/distributed/utils/acquire_lock.c | 36 ++--- .../distributed/utils/background_jobs.c | 80 +++++------- .../utils/background_worker_utils.c | 123 ++++++++++++++++++ src/backend/distributed/utils/maintenanced.c | 86 +++++------- .../distributed/background_worker_utils.h | 59 +++++++++ 6 files changed, 283 insertions(+), 145 deletions(-) create mode 100644 src/backend/distributed/utils/background_worker_utils.c create mode 100644 src/include/distributed/background_worker_utils.h diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index e3b655ab0..15dd6375a 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -58,6 +58,7 @@ #include "distributed/argutils.h" #include "distributed/backend_data.h" +#include "distributed/background_worker_utils.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" @@ -3156,37 +3157,26 @@ MetadataSyncSigAlrmHandler(SIGNAL_ARGS) BackgroundWorkerHandle * SpawnSyncNodeMetadataToNodes(Oid database, Oid extensionOwner) { - BackgroundWorker worker; - BackgroundWorkerHandle *handle = NULL; + char workerName[BGW_MAXLEN]; - /* Configure a worker. 
*/ - memset(&worker, 0, sizeof(worker)); - SafeSnprintf(worker.bgw_name, BGW_MAXLEN, + SafeSnprintf(workerName, BGW_MAXLEN, "Citus Metadata Sync: %u/%u", database, extensionOwner); - worker.bgw_flags = - BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - worker.bgw_start_time = BgWorkerStart_ConsistentState; - /* don't restart, we manage restarts from maintenance daemon */ - worker.bgw_restart_time = BGW_NEVER_RESTART; - strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); - strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), - "SyncNodeMetadataToNodesMain"); - worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId); - memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner, - sizeof(Oid)); - worker.bgw_notify_pid = MyProcPid; - - if (!RegisterDynamicBackgroundWorker(&worker, &handle)) - { - return NULL; - } - - pid_t pid; - WaitForBackgroundWorkerStartup(handle, &pid); - - return handle; + CitusBackgroundWorkerConfig config = { + .workerName = workerName, + .functionName = "SyncNodeMetadataToNodesMain", + .mainArg = ObjectIdGetDatum(MyDatabaseId), + .extensionOwner = extensionOwner, + .needsNotification = true, + .waitForStartup = false, + .restartTime = CITUS_BGW_NEVER_RESTART, + .startTime = CITUS_BGW_DEFAULT_START_TIME, + .workerType = NULL, /* use default */ + .extraData = NULL, + .extraDataSize = 0 + }; + return RegisterCitusBackgroundWorker(&config); } diff --git a/src/backend/distributed/utils/acquire_lock.c b/src/backend/distributed/utils/acquire_lock.c index d0f6193c2..897374ed4 100644 --- a/src/backend/distributed/utils/acquire_lock.c +++ b/src/backend/distributed/utils/acquire_lock.c @@ -33,6 +33,7 @@ #include "storage/latch.h" #include "utils/snapmgr.h" +#include "distributed/background_worker_utils.h" #include "distributed/citus_acquire_lock.h" #include "distributed/citus_safe_lib.h" #include "distributed/connection_management.h" @@ -65,34 +66,33 @@ static bool got_sigterm = false; BackgroundWorkerHandle * StartLockAcquireHelperBackgroundWorker(int backendToHelp, int32 lock_cooldown) { - BackgroundWorkerHandle *handle = NULL; LockAcquireHelperArgs args; - BackgroundWorker worker; memset(&args, 0, sizeof(args)); - memset(&worker, 0, sizeof(worker)); /* collect the extra arguments required for the background worker */ args.DatabaseId = MyDatabaseId; args.lock_cooldown = lock_cooldown; - /* construct the background worker and start it */ - SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name), + char workerName[BGW_MAXLEN]; + SafeSnprintf(workerName, BGW_MAXLEN, "Citus Lock Acquire Helper: %d/%u", backendToHelp, MyDatabaseId); - strcpy_s(worker.bgw_type, sizeof(worker.bgw_type), "citus_lock_aqcuire"); - worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - worker.bgw_start_time = BgWorkerStart_RecoveryFinished; - worker.bgw_restart_time = BGW_NEVER_RESTART; + CitusBackgroundWorkerConfig config = { + .workerName = workerName, + .functionName = "LockAcquireHelperMain", + .mainArg = Int32GetDatum(backendToHelp), + .extensionOwner = InvalidOid, + .needsNotification = false, + .waitForStartup = false, + .restartTime = CITUS_BGW_NEVER_RESTART, + .startTime = BgWorkerStart_RecoveryFinished, + .workerType = "citus_lock_aqcuire", + .extraData = &args, + .extraDataSize = sizeof(args) + }; - strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); - strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name), - "LockAcquireHelperMain"); - worker.bgw_main_arg = 
Int32GetDatum(backendToHelp); - worker.bgw_notify_pid = 0; - - memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &args, sizeof(args)); - - if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config); + if (!handle) { return NULL; } diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 2d0f03a4c..272f1d6d8 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -56,6 +56,7 @@ #include "utils/timeout.h" #include "distributed/background_jobs.h" +#include "distributed/background_worker_utils.h" #include "distributed/citus_safe_lib.h" #include "distributed/hash_helpers.h" #include "distributed/listutils.h" @@ -417,37 +418,26 @@ citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus) BackgroundWorkerHandle * StartCitusBackgroundTaskQueueMonitor(Oid database, Oid extensionOwner) { - BackgroundWorker worker = { 0 }; - BackgroundWorkerHandle *handle = NULL; + char workerName[BGW_MAXLEN]; - /* Configure a worker. */ - memset(&worker, 0, sizeof(worker)); - SafeSnprintf(worker.bgw_name, BGW_MAXLEN, + SafeSnprintf(workerName, BGW_MAXLEN, "Citus Background Task Queue Monitor: %u/%u", database, extensionOwner); - worker.bgw_flags = - BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - worker.bgw_start_time = BgWorkerStart_ConsistentState; - /* don't restart, we manage restarts from maintenance daemon */ - worker.bgw_restart_time = BGW_NEVER_RESTART; - strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); - strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), - "CitusBackgroundTaskQueueMonitorMain"); - worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId); - memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner, - sizeof(Oid)); - worker.bgw_notify_pid = MyProcPid; - - if (!RegisterDynamicBackgroundWorker(&worker, &handle)) - { - return NULL; - } - - pid_t pid; - WaitForBackgroundWorkerStartup(handle, &pid); - - return handle; + CitusBackgroundWorkerConfig config = { + .workerName = workerName, + .functionName = "CitusBackgroundTaskQueueMonitorMain", + .mainArg = ObjectIdGetDatum(MyDatabaseId), + .extensionOwner = extensionOwner, + .needsNotification = true, + .waitForStartup = true, + .restartTime = CITUS_BGW_NEVER_RESTART, + .startTime = CITUS_BGW_DEFAULT_START_TIME, + .workerType = NULL, /* use default */ + .extraData = NULL, + .extraDataSize = 0 + }; + return RegisterCitusBackgroundWorker(&config); } @@ -1661,33 +1651,31 @@ StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, { dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId, jobId); - /* Configure a worker. 
*/ - BackgroundWorker worker = { 0 }; - memset(&worker, 0, sizeof(worker)); - SafeSnprintf(worker.bgw_name, BGW_MAXLEN, + char workerName[BGW_MAXLEN]; + SafeSnprintf(workerName, BGW_MAXLEN, "Citus Background Task Queue Executor: %s/%s for (%ld/%ld)", database, user, jobId, taskId); - worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - worker.bgw_start_time = BgWorkerStart_ConsistentState; - /* don't restart, we manage restarts from maintenance daemon */ - worker.bgw_restart_time = BGW_NEVER_RESTART; - strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); - strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), - "CitusBackgroundTaskExecutor"); - worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); - worker.bgw_notify_pid = MyProcPid; - - BackgroundWorkerHandle *handle = NULL; - if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + CitusBackgroundWorkerConfig config = { + .workerName = workerName, + .functionName = "CitusBackgroundTaskExecutor", + .mainArg = UInt32GetDatum(dsm_segment_handle(seg)), + .extensionOwner = InvalidOid, + .needsNotification = true, + .waitForStartup = true, + .restartTime = CITUS_BGW_NEVER_RESTART, + .startTime = CITUS_BGW_DEFAULT_START_TIME, + .workerType = NULL, /* use default */ + .extraData = NULL, + .extraDataSize = 0 + }; + BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config); + if (!handle) { dsm_detach(seg); return NULL; } - pid_t pid = { 0 }; - WaitForBackgroundWorkerStartup(handle, &pid); - if (pSegment) { *pSegment = seg; diff --git a/src/backend/distributed/utils/background_worker_utils.c b/src/backend/distributed/utils/background_worker_utils.c new file mode 100644 index 000000000..b25ac556a --- /dev/null +++ b/src/backend/distributed/utils/background_worker_utils.c @@ -0,0 +1,123 @@ +/*------------------------------------------------------------------------- + * + * background_worker_utils.c + * Common utilities for initializing PostgreSQL background workers + * used by Citus distributed infrastructure. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" + +#include "postmaster/bgworker.h" +#include "storage/proc.h" + +#include "distributed/background_worker_utils.h" +#include "distributed/citus_safe_lib.h" + +/* + * InitializeCitusBackgroundWorker initializes a BackgroundWorker struct + * with common Citus background worker settings. + */ +void +InitializeCitusBackgroundWorker(BackgroundWorker *worker, + const CitusBackgroundWorkerConfig *config) +{ + Assert(worker != NULL); + Assert(config != NULL); + Assert(config->workerName != NULL); + Assert(config->functionName != NULL); + + /* Initialize the worker structure */ + memset(worker, 0, sizeof(BackgroundWorker)); + + /* Set worker name */ + strcpy_s(worker->bgw_name, sizeof(worker->bgw_name), config->workerName); + + /* Set worker type if provided */ + if (config->workerType != NULL) + { + strcpy_s(worker->bgw_type, sizeof(worker->bgw_type), config->workerType); + } + + /* Set standard flags for Citus workers */ + worker->bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + + /* Set start time - use custom start time if provided, otherwise use default */ + worker->bgw_start_time = (config->startTime != 0) ? 
config->startTime : + CITUS_BGW_DEFAULT_START_TIME; + + /* Set restart behavior */ + worker->bgw_restart_time = config->restartTime; + + /* Set library and function names */ + strcpy_s(worker->bgw_library_name, sizeof(worker->bgw_library_name), "citus"); + strcpy_s(worker->bgw_function_name, sizeof(worker->bgw_function_name), + config->functionName); + + /* Set main argument */ + worker->bgw_main_arg = config->mainArg; + + /* Set extension owner if provided */ + if (OidIsValid(config->extensionOwner)) + { + memcpy_s(worker->bgw_extra, sizeof(worker->bgw_extra), + &config->extensionOwner, sizeof(Oid)); + } + + /* Set additional extra data if provided */ + if (config->extraData != NULL && config->extraDataSize > 0) + { + size_t remainingSpace = sizeof(worker->bgw_extra); + size_t usedSpace = OidIsValid(config->extensionOwner) ? sizeof(Oid) : 0; + + if (usedSpace + config->extraDataSize <= remainingSpace) + { + memcpy_s(((char *) worker->bgw_extra) + usedSpace, + remainingSpace - usedSpace, + config->extraData, + config->extraDataSize); + } + } + + /* Set notification PID if needed */ + if (config->needsNotification) + { + worker->bgw_notify_pid = MyProcPid; + } +} + + +/* + * RegisterCitusBackgroundWorker creates and registers a Citus background worker + * with the specified configuration. Returns the worker handle on success, + * NULL on failure. + */ +BackgroundWorkerHandle * +RegisterCitusBackgroundWorker(const CitusBackgroundWorkerConfig *config) +{ + BackgroundWorker worker; + BackgroundWorkerHandle *handle = NULL; + + /* Initialize the worker structure */ + InitializeCitusBackgroundWorker(&worker, config); + + /* Register the background worker */ + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + { + return NULL; + } + + /* Wait for startup if requested */ + if (config->waitForStartup && handle != NULL) + { + pid_t pid = 0; + WaitForBackgroundWorkerStartup(handle, &pid); + } + + return handle; +} diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index e6bf3d00c..9ce29a99c 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -48,6 +48,7 @@ #include "pg_version_constants.h" #include "distributed/background_jobs.h" +#include "distributed/background_worker_utils.h" #include "distributed/citus_safe_lib.h" #include "distributed/coordinator_protocol.h" #include "distributed/distributed_deadlock_detection.h" @@ -188,32 +189,21 @@ InitializeMaintenanceDaemonForMainDb(void) return; } + CitusBackgroundWorkerConfig config = { + .workerName = "Citus Maintenance Daemon for Main DB", + .functionName = "CitusMaintenanceDaemonMain", + .mainArg = (Datum) 0, + .extensionOwner = InvalidOid, + .needsNotification = false, + .waitForStartup = false, + .restartTime = CITUS_BGW_DEFAULT_RESTART_TIME, + .startTime = CITUS_BGW_DEFAULT_START_TIME, + .workerType = NULL, /* use default */ + .extraData = NULL, + .extraDataSize = 0 + }; BackgroundWorker worker; - - memset(&worker, 0, sizeof(worker)); - - - strcpy_s(worker.bgw_name, sizeof(worker.bgw_name), - "Citus Maintenance Daemon for Main DB"); - - /* request ability to connect to target database */ - worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - - /* - * No point in getting started before able to run query, but we do - * want to get started on Hot-Standby. - */ - worker.bgw_start_time = BgWorkerStart_ConsistentState; - - /* Restart after a bit after errors, but don't bog the system. 
*/ - worker.bgw_restart_time = 5; - strcpy_s(worker.bgw_library_name, - sizeof(worker.bgw_library_name), "citus"); - strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), - "CitusMaintenanceDaemonMain"); - - worker.bgw_main_arg = (Datum) 0; - + InitializeCitusBackgroundWorker(&worker, &config); RegisterBackgroundWorker(&worker); } @@ -256,37 +246,28 @@ InitializeMaintenanceDaemonBackend(void) { Assert(dbData->workerPid == 0); - BackgroundWorker worker; - BackgroundWorkerHandle *handle = NULL; + char workerName[BGW_MAXLEN]; - memset(&worker, 0, sizeof(worker)); - - SafeSnprintf(worker.bgw_name, sizeof(worker.bgw_name), + SafeSnprintf(workerName, sizeof(workerName), "Citus Maintenance Daemon: %u/%u", MyDatabaseId, extensionOwner); - /* request ability to connect to target database */ - worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + CitusBackgroundWorkerConfig config = { + .workerName = workerName, + .functionName = "CitusMaintenanceDaemonMain", + .mainArg = ObjectIdGetDatum(MyDatabaseId), + .extensionOwner = extensionOwner, + .needsNotification = true, + .waitForStartup = true, + .restartTime = CITUS_BGW_DEFAULT_RESTART_TIME, + .startTime = CITUS_BGW_DEFAULT_START_TIME, + .workerType = NULL, /* use default */ + .extraData = NULL, + .extraDataSize = 0 + }; + BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config); - /* - * No point in getting started before able to run query, but we do - * want to get started on Hot-Standby. - */ - worker.bgw_start_time = BgWorkerStart_ConsistentState; - - /* Restart after a bit after errors, but don't bog the system. */ - worker.bgw_restart_time = 5; - strcpy_s(worker.bgw_library_name, - sizeof(worker.bgw_library_name), "citus"); - strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), - "CitusMaintenanceDaemonMain"); - - worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId); - memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner, - sizeof(Oid)); - worker.bgw_notify_pid = MyProcPid; - - if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + if (!handle) { WarnMaintenanceDaemonNotStarted(); dbData->daemonStarted = false; @@ -301,9 +282,6 @@ InitializeMaintenanceDaemonBackend(void) dbData->triggerNodeMetadataSync = false; LWLockRelease(&MaintenanceDaemonControl->lock); - pid_t pid; - WaitForBackgroundWorkerStartup(handle, &pid); - pfree(handle); } else diff --git a/src/include/distributed/background_worker_utils.h b/src/include/distributed/background_worker_utils.h new file mode 100644 index 000000000..c915b46b9 --- /dev/null +++ b/src/include/distributed/background_worker_utils.h @@ -0,0 +1,59 @@ +/*------------------------------------------------------------------------- + * + * background_worker_utils.h + * Common utilities for initializing PostgreSQL background workers + * used by Citus distributed infrastructure. + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef BACKGROUND_WORKER_UTILS_H +#define BACKGROUND_WORKER_UTILS_H + +#include "postgres.h" + +#include "postmaster/bgworker.h" + +/* + * Background worker configuration parameters + */ +typedef struct CitusBackgroundWorkerConfig +{ + /* Worker identification */ + const char *workerName; + const char *functionName; + const char *workerType; + + /* Worker parameters */ + Datum mainArg; + Oid extensionOwner; + + /* Worker behavior flags */ + bool needsNotification; + bool waitForStartup; + int restartTime; + + /* Worker timing */ + BgWorkerStartTime startTime; + + /* Optional extra data */ + const void *extraData; + size_t extraDataSize; +} CitusBackgroundWorkerConfig; + +/* Default configuration values */ +#define CITUS_BGW_DEFAULT_RESTART_TIME 5 +#define CITUS_BGW_NEVER_RESTART BGW_NEVER_RESTART +#define CITUS_BGW_DEFAULT_START_TIME BgWorkerStart_ConsistentState + +/* Function declarations */ +extern BackgroundWorkerHandle * RegisterCitusBackgroundWorker(const + CitusBackgroundWorkerConfig + *config); + +extern void InitializeCitusBackgroundWorker(BackgroundWorker *worker, + const CitusBackgroundWorkerConfig *config); + +#endif /* BACKGROUND_WORKER_UTILS_H */
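To show how the new helper is intended to be consumed, here is a hedged usage sketch modeled on the call sites updated in this patch. Everything except the worker name and the `MyExampleWorkerMain` entry point (both placeholders) mirrors the fields, defaults, and signature declared in `background_worker_utils.h` above:

```c
/* Hedged usage sketch of RegisterCitusBackgroundWorker(); it follows the same
 * pattern as the call sites changed in this patch. "MyExampleWorkerMain" is a
 * placeholder for a real bgworker entry point exported by the citus library. */
#include "postgres.h"

#include "miscadmin.h"
#include "postmaster/bgworker.h"

#include "distributed/background_worker_utils.h"
#include "distributed/citus_safe_lib.h"

static BackgroundWorkerHandle *
StartExampleCitusWorker(Oid database, Oid extensionOwner)
{
	char workerName[BGW_MAXLEN];

	SafeSnprintf(workerName, BGW_MAXLEN,
				 "Citus Example Worker: %u/%u", database, extensionOwner);

	CitusBackgroundWorkerConfig config = {
		.workerName = workerName,
		.functionName = "MyExampleWorkerMain", /* placeholder entry point */
		.mainArg = ObjectIdGetDatum(database),
		.extensionOwner = extensionOwner,
		.needsNotification = true,             /* sets bgw_notify_pid = MyProcPid */
		.waitForStartup = true,                /* blocks until the worker has started */
		.restartTime = CITUS_BGW_NEVER_RESTART,
		.startTime = CITUS_BGW_DEFAULT_START_TIME,
		.workerType = NULL,                    /* keep the default bgw_type */
		.extraData = NULL,
		.extraDataSize = 0
	};

	/* returns NULL when RegisterDynamicBackgroundWorker() fails */
	return RegisterCitusBackgroundWorker(&config);
}
```

Compared with the previous open-coded setup, the helper centralizes the `bgw_flags`, library name, and `bgw_extra` packing, so each call site only states what differs between workers.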