Compare commits

...

15 Commits

Author SHA1 Message Date
naisila 147619d79a Bump version to 13.1.1 2025-10-02 12:58:19 +03:00
naisila fd8f99b18e Add changelog for 13.1.1 2025-10-02 12:58:19 +03:00
Onur Tirtir 2296e0dd3a Properly detect no-op shard-key updates via UPDATE / MERGE (#8214)
DESCRIPTION: Fixes a bug that allowed UPDATE / MERGE queries
that may change the distribution column value.

Fixes: #8087.

Probably since #769, we have not been properly checking whether an UPDATE
may change the distribution column.

In #769, we had these checks:
```c
	if (targetEntry->resno != column->varattno)
	{
		/* target entry of the form SET some_other_col = <x> */
		isColumnValueChanged = false;
	}
	else if (IsA(setExpr, Var))
	{
		Var *newValue = (Var *) setExpr;
		if (newValue->varattno == column->varattno)
		{
			/* target entry of the form SET col = table.col */
			isColumnValueChanged = false;
		}
	}
```

However, what we check in the "if" and in the "else if" is not so
different: both attempt to verify whether the SET expr of the target
entry points to the attno of the given column.
Also see this PR comment from #5220:
https://github.com/citusdata/citus/pull/5220#discussion_r699230597.
In #769, we probably actually wanted to first check whether the
SET expr of the target entry and the given variable point to the
same range table entry, but this is not what the "if" was checking,
so it has been removed.

As a result, in the cases mentioned in the linked issue, we were
incorrectly concluding that the SET expr of the target entry cannot
change the given column just because it points to the same attno as
the given variable, regardless of which range table entries the column
and the SET expr point to. Later we also started using the same
function to check for such cases for the update action of MERGE, so
we had the same bug there as well.

So with this PR, we properly check for such cases by also comparing
varno in TargetEntryChangesValue(). However, some existing tests then
started failing where the SET expr doesn't directly assign the column
to itself but the "where" clause quals actually imply that the
distribution column won't change. Previously we were not even attempting
to verify whether the "where" clause quals could imply a no-op assignment
for the SET expr in such cases, yet that was not a problem: in most
cases we qualified such SET expressions as a no-op update as long as
the SET expr's attno matched the given column's. For this reason, to
prevent regressions, this PR also adds some extra logic to understand
whether the "where" clause quals imply that the SET expr for the
distribution key is a no-op.

Ideally, we should instead use the "relation restriction equivalence"
mechanism to understand whether the "where" clause implies a no-op
update. This is because, for instance, right now we're not able to
deduce that the update is a no-op when the "where" clause only
transitively implies it, as in the case where we're setting "column a"
to "column c" and the where clause looks like:
  "column a = column b AND column b = column c".
If this turns out to be a regression for some users, we can consider
doing it that way. Until then, as a workaround, we can suggest adding
additional quals to the "where" clause that directly imply the
equivalence, as in the sketch below.

Also, after fixing TargetEntryChangesValue(), we started successfully
deducing that the update action is a no-op for such MERGE queries:
```sql
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
```
However, we then started seeing below error for above query even
though now the update is qualified as a no-op update:
```
ERROR:  Unexpected column index of the source list
```
This was because of the bug reported in #8180, which #8201 fixed.

In summary, with this PR:

* We disallow such queries,
  ```sql
  -- attno for dist_1.a, dist_1.b: 1, 2
  -- attno for dist_different_order_1.a, dist_different_order_1.b: 2, 1
  UPDATE dist_1 SET a = dist_different_order_1.b
  FROM dist_different_order_1
  WHERE dist_1.a = dist_different_order_1.a;

  -- attno for dist_1.a, dist_1.b: 1, 2
  -- but ON (..) doesn't imply a no-op update for SET expr
  MERGE INTO dist_1
  USING dist_1 src
  ON (dist_1.a = src.b)
  WHEN MATCHED THEN UPDATE SET a = src.a;
  ```

* .. and allow such queries,
  ```sql
  MERGE INTO dist_1
  USING dist_1 src
  ON (dist_1.a = src.b)
  WHEN MATCHED THEN UPDATE SET a = src.b;
  ```

(cherry picked from commit 5eb1d93be1)
(cherry picked from commit 2fd20b3bb5dcc4d24cdee5985cf97c2e37a2b5e6)
2025-09-30 14:13:56 +03:00
Onur Tirtir 6467cb9e2c Fix unexpected column index error for repartitioned merge (#8201)
DESCRIPTION: Fixes a bug that causes an unexpected error when executing
repartitioned merge.

Fixes #8180.

This was happening because of a bug in
SourceResultPartitionColumnIndex(). To fix it, this PR avoids
using DistributionColumnIndex() in SourceResultPartitionColumnIndex().
Instead, it introduces FindTargetListEntryWithVarExprAttno(), which finds
the index of the target entry in the source query's target list that
can be used to repartition the source for a repartitioned merge. In
short, to find the source target entry that references the Var used in
the ON (..) clause and that references the source rte, we should check the
varattno of the underlying expr, which is presumably always a Var for
a repartitioned merge since we always wrap the source rte with a subquery
whose target entries all point to the columns of the original source
relation.

Using DistributionColumnIndex() prior to 13.0 wasn't causing such an
issue because back then the varattno of the underlying expr of the
source target entries was almost (*1) always equal to the resno of the
target entry, as we were including all target entries of the source
relation. However, starting with #7659, which was merged to main before
13.0, we began using CreateFilteredTargetListForRelation() instead of
CreateAllTargetListForRelation() to compute the target entry list for
the source rte, in order to fix another bug. We cannot revert to
CreateAllTargetListForRelation() because that would re-introduce the
bug it helped fix, so we instead had to find a way to properly deal
with such "filtered target list"s, as in this commit. Plus (*1), even
before #7659, we would probably still fail when the source relation
has dropped attributes or similar, because that would also cause a
mismatch between the varattno of the underlying expr of the target
entry and its resno.
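A minimal sketch of the failing shape, taken from the queries added to the
regression tests further down; the resno/varattno commentary is an informal
reading of the commit message, not exact planner output:

```sql
-- dist_1(a int, b int, c int) is distributed on "a". The source is wrapped
-- in a subquery whose filtered target list may contain only the columns the
-- merge actually needs. In that filtered list, the entry for src.b can sit
-- at resno 1 even though the underlying Var's varattno is 2, so looking the
-- column up by resno picks the wrong index; matching on the Var's varattno,
-- as FindTargetListEntryWithVarExprAttno() does, finds the right entry.
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET b = src.b;
```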

(cherry picked from commit 83b25e1fb1)
2025-09-30 14:13:56 +03:00
Naisila Puka d8fcddc558
Fix HaveRegisteredOrActiveSnapshot() crashes
Part of
ce7ddc0d3d

Also updates images
2025-09-26 16:47:23 +03:00
Ibrahim Halatci d94174d7a8 Bump PG minors 2025-09-03 14:04:41 +03:00
Mehmet YILMAZ ea78497e9e Fix CTE traversal for outer Vars in FindReferencedTableColumn (remove assert; correct parentQueryList handling) (#8106)
fixes #8105 

This change lets `FindReferencedTableColumn()` correctly resolve columns
through a CTE even when the expression comes from an outer query level
(`varlevelsup > 0`, `skipOuterVars = false`). Before, we hit an
`Assert(skipOuterVars)` in this path.

**Problem**

* Hitting a CTE after walking outer Vars triggered
`Assert(skipOuterVars)`.
* Cause: we modified `parentQueryList` in place and didn’t rebuild the
correct parent chain before recursing into the CTE, so the path was
considered unsafe.

**Fix**

* Remove the `Assert(skipOuterVars)` in the `RTE_CTE` branch.
* Find the CTE’s owning level via `ctelevelsup` and compute
`cteParentListIndex`.
* Rebuild a private parent list for recursion: `list_copy` →
`list_truncate` → `lappend(current query)`.
* Add a bounds check before indexing the CTE’s `targetList`.

**Why it works**


```diff
-parentQueryList = lappend(parentQueryList, query);
-FindReferencedTableColumn(targetEntry->expr, parentQueryList,
-                          cteQuery, column, rteContainingReferencedColumn,
-                          skipOuterVars);
+    /* hand a private, bounded parent list to the recursion */
+    List *newParent = list_copy(parentQueryList);
+    newParent = list_truncate(newParent, cteParentListIndex + 1);
+    newParent = lappend(newParent, query);
+
+    FindReferencedTableColumn(targetEntry->expr,
+                              newParent,
+                              cteQuery,
+                              column,
+                              rteContainingReferencedColumn,
+                              skipOuterVars);
+}
```
**Before:** We changed `parentQueryList` in place (`parentQueryList =
lappend(...)`) and didn’t trim it to the CTE’s owner level.

**After:** We copy the list, trim it to the CTE’s owner level, then
append the current query. This keeps the parent list accurate for the
current recursion and safe when following outer Vars.


**Example: Nested subquery referencing the CTE (two levels down)**

```
WITH c AS MATERIALIZED (SELECT user_id FROM raw_events_first)
SELECT 1
FROM raw_events_first t
WHERE EXISTS (
  SELECT 1
  FROM (SELECT user_id FROM c) c2
  WHERE c2.user_id = t.user_id
);
```

Levels:
Q0 = top SELECT
Q1 = EXISTS subquery
Q2 = inner (SELECT user_id FROM c)

When resolving c2.user_id inside Q2:

- parentQueryList is [Q0, Q1, Q2].
- `ctelevelsup` is 2.
- `cteParentListIndex = length(parentQueryList) - ctelevelsup - 1 = 3 - 2 - 1 = 0`,
  i.e. the CTE's owner is Q0.
- Recurse into the CTE's query with [Q0, Q2].


**Tests (added in `multi_insert_select`)**

* **T1:** Correlated subquery that references a CTE (one level down).
Verifies that resolving through `RTE_CTE` after following an outer `Var`
succeeds and that the row count matches the source table (see the sketch
after this list).
* **T2:** Nested subquery that references a CTE (two levels down).
Exercises deeper recursion and confirms behavior identical to T1.
* **T3:** Scalar subquery in a target list that reads from the outer CTE.
Checks the expected row count and that no NULLs are inserted.

These tests cover the cases that previously hit `Assert(skipOuterVars)`
and confirm that CTE references are resolved correctly while following
outer Vars.
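A minimal sketch of the T1 shape, modeled on the two-level example above
(the exact statements in `multi_insert_select` may differ):

```sql
-- One level down: the EXISTS subquery references the CTE directly while
-- comparing against an outer Var (t.user_id), i.e. the path that used to
-- hit Assert(skipOuterVars).
WITH c AS MATERIALIZED (SELECT user_id FROM raw_events_first)
SELECT 1
FROM raw_events_first t
WHERE EXISTS (
  SELECT 1
  FROM c
  WHERE c.user_id = t.user_id
);
```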
2025-09-03 14:04:41 +03:00
SongYoungUk 0ce8fb6e2c fix #7715 - add assign hook for CDC library path adjustment (#8025)
DESCRIPTION: Automatically updates dynamic_library_path when CDC is
enabled

fix : #7715

According to the documentation and `pg_settings`, the context of the
`citus.enable_change_data_capture` parameter is user.

However, changing this parameter — even as a superuser — doesn't work as
expected: while the initial copy phase works correctly, subsequent
change events are not propagated.

This appears to be due to the fact that `dynamic_library_path` is only
updated to `$libdir/citus_decoders:$libdir` when the server is restarted
and the `_PG_init` function is invoked.

To address this, I added an `EnableChangeDataCaptureAssignHook` that
automatically updates `dynamic_library_path` at runtime when
`citus.enable_change_data_capture` is enabled, ensuring that the CDC
decoder libraries are properly loaded.
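A minimal usage sketch (the expected path value matches the regression test
output further down; it only applies when `dynamic_library_path` is still at
its default `$libdir`):

```sql
-- With the assign hook, enabling CDC at runtime prepends the citus_decoders
-- directory to dynamic_library_path, so the decoders are found without a
-- server restart.
SET citus.enable_change_data_capture = true;
SHOW dynamic_library_path;  -- expected: $libdir/citus_decoders:$libdir
```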

Note that `dynamic_library_path` is already a `superuser`-context
parameter in base PostgreSQL, so updating it from within the assign hook
should be safe and consistent with PostgreSQL’s configuration model.

If there’s any reason this approach might be problematic or if there’s a
preferred alternative, I’d appreciate any feedback.

cc. @jy-min

---------

Co-authored-by: Hanefi Onaldi <Hanefi.Onaldi@microsoft.com>
Co-authored-by: ibrahim halatci <ihalatci@gmail.com>
(cherry picked from commit 743c9bbf87)
2025-07-18 09:13:40 +00:00
Alper Kocatas ad266a2c0a
Add changelog for 13.1.0 (#8006)
Add changelog entries for 13.1.0.

The list of changes was constructed using the following method: 

- include set of commits that come up in comparison of release-13.1 and
release-13.0 branch
- exclude changes in 13.0.0, 13.0.1, 13.0.2 and 13.0.3
- exclude changes related to database sharding
- exclude changes that were cherry picked from main and released before
13.1
- review remaining commits and exclude non user-facing commits.

---------

Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
Co-authored-by: Naisila Puka <37271756+naisila@users.noreply.github.com>
2025-06-02 11:10:16 +03:00
Onur Tirtir e42847196a Add skip_qualify_public param to shard_name() to allow qualifying for "public" schema (#8014)
DESCRIPTION: Adds skip_qualify_public param to `shard_name()` UDF to
allow qualifying for "public" schema when needed.
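A short usage sketch (the shard id comes from the regression test output
further down and is only illustrative):

```sql
-- skip_qualify_public defaults to true, preserving the old behavior of
-- omitting the "public" schema prefix; pass false to get a fully qualified
-- shard name.
SELECT shard_name('t3', 99456908);         -- t3_99456908
SELECT shard_name('t3', 99456908, false);  -- public.t3_99456908
```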
2025-06-02 10:43:44 +03:00
Alper Kocatas b6b415c17a
Bump Citus version to 13.1.0 (#8012)
Bump Citus version to 13.1.0
2025-06-02 10:21:17 +03:00
ibrahim halatci 0355d8c392
bumped codeql version to v3 (#7998)
DESCRIPTION: bumped codeql version to v3
2025-05-23 14:13:49 +03:00
Naisila Puka af01fa48ec Bump PG versions to 17.5, 16.9, 15.13 (#7986)
Nontrivial bump because of the following PG 15.13 commit
317aba70e
https://github.com/postgres/postgres/commit/317aba70e

Previously, when views were converted to RTE_SUBQUERY, the relid
would be cleared in PG 15. As of this patch, relid is retained.
Therefore, we add a check on the "relkind and rtekind" to
identify the converted views in 15.13.

Sister PR https://github.com/citusdata/the-process/pull/164
Using dev image sha because I encountered the libpq
symlink issue again with "-v219b87c"
2025-05-22 15:06:42 +02:00
Onur Tirtir 1cb2462818 Fix unsafe memory access in citus_unmark_object_distributed() (#7985)
_Since we've never released a Citus release that contains the commit
that introduced this bug (see #7461), we don't need to have a
DESCRIPTION line that shows up in release changelog._

From the 8 valgrind test targets run for release-13.1 with PG 17.5, we got
1344 stack traces, and all but one of them were about the unsafe memory
access below, which sits on a very hot code path that we execute via our
drop trigger.

On main, even `make -C src/test/regress/ check-base-vg` dumps this stack
trace with PG 16/17 to src/test/regress/citus_valgrind_test_log.txt when
executing "multi_cluster_management", and this is not the case with this
PR anymore.

```c
==27337== VALGRINDERROR-BEGIN
==27337== Conditional jump or move depends on uninitialised value(s)
==27337==    at 0x7E26B68: citus_unmark_object_distributed (home/onurctirtir/citus/src/backend/distributed/metadata/distobject.c:113)
==27337==    by 0x7E26CC7: master_unmark_object_distributed (home/onurctirtir/citus/src/backend/distributed/metadata/distobject.c:153)
==27337==    by 0x4BD852: ExecInterpExpr (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/execExprInterp.c:758)
==27337==    by 0x4BFD00: ExecInterpExprStillValid (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/execExprInterp.c:1870)
==27337==    by 0x51D82C: ExecEvalExprSwitchContext (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/../../../src/include/executor/executor.h:355)
==27337==    by 0x51D8A4: ExecProject (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/../../../src/include/executor/executor.h:389)
==27337==    by 0x51DADB: ExecResult (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/nodeResult.c:136)
==27337==    by 0x4D72ED: ExecProcNodeFirst (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/execProcnode.c:464)
==27337==    by 0x4CA394: ExecProcNode (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/../../../src/include/executor/executor.h:273)
==27337==    by 0x4CD34C: ExecutePlan (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/execMain.c:1670)
==27337==    by 0x4CAA7C: standard_ExecutorRun (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/execMain.c:365)
==27337==    by 0x7E1E475: CitusExecutorRun (home/onurctirtir/citus/src/backend/distributed/executor/multi_executor.c:238)
==27337==  Uninitialised value was created by a heap allocation
==27337==    at 0x4848899: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==27337==    by 0x9AB1F7: AllocSetContextCreateInternal (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/utils/mmgr/aset.c:438)
==27337==    by 0x4E0D56: CreateExprContextInternal (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/execUtils.c:261)
==27337==    by 0x4E0E3E: CreateExprContext (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/execUtils.c:311)
==27337==    by 0x4E10D9: ExecAssignExprContext (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/execUtils.c:490)
==27337==    by 0x51EE09: ExecInitSeqScan (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/nodeSeqscan.c:147)
==27337==    by 0x4D6CE1: ExecInitNode (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/execProcnode.c:210)
==27337==    by 0x5243C7: ExecInitSubqueryScan (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/nodeSubqueryscan.c:126)
==27337==    by 0x4D6DD9: ExecInitNode (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/execProcnode.c:250)
==27337==    by 0x4F05B2: ExecInitAppend (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/nodeAppend.c:223)
==27337==    by 0x4D6C46: ExecInitNode (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/execProcnode.c:182)
==27337==    by 0x52003D: ExecInitSetOp (home/onurctirtir/.pgenv/src/postgresql-16.2/src/backend/executor/nodeSetOp.c:530)
==27337== 
==27337== VALGRINDERROR-END
```
2025-05-20 17:49:14 +03:00
Alper Kocatas 662628fe7d Add citus_nodes view (#7968)
DESCRIPTION: Adds `citus_nodes` view that displays the node name, port,
role, and "active" for nodes in the cluster.

This PR adds the `citus_nodes` view. The view is created in the `citus`
schema and displays the node name, port, role, and active status of each
node in the `pg_dist_node` table.

The view is granted `SELECT` permission to the `PUBLIC` role and is set
to the `pg_catalog` schema.
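A quick usage sketch (node names and ports match the regression test
environment shown further down):

```sql
-- List all nodes, or filter by role / active status.
SELECT * FROM citus_nodes;
SELECT * FROM citus_nodes WHERE role = 'coordinator';
--  nodename  | nodeport |    role     | active
-- -----------+----------+-------------+--------
--  localhost |    57636 | coordinator | t
```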

Test cases were added to the `multi_cluster_management` tests.

structs.py was modified to add whitespace as required by `citus_indent`.

---------

Co-authored-by: Alper Kocatas <alperkocatas@microsoft.com>
2025-05-20 17:49:14 +03:00
43 changed files with 2116 additions and 94 deletions

View File

@ -73,7 +73,7 @@ USER citus
# build postgres versions separately for effective parrallelism and caching of already built versions when changing only certain versions
FROM base AS pg15
RUN MAKEFLAGS="-j $(nproc)" pgenv build 15.12
RUN MAKEFLAGS="-j $(nproc)" pgenv build 15.14
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install
@ -85,7 +85,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
RUN rm .pgenv-staging/config/default.conf
FROM base AS pg16
RUN MAKEFLAGS="-j $(nproc)" pgenv build 16.8
RUN MAKEFLAGS="-j $(nproc)" pgenv build 16.10
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install
@ -97,7 +97,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
RUN rm .pgenv-staging/config/default.conf
FROM base AS pg17
RUN MAKEFLAGS="-j $(nproc)" pgenv build 17.4
RUN MAKEFLAGS="-j $(nproc)" pgenv build 17.6
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install
@ -216,7 +216,7 @@ COPY --chown=citus:citus .psqlrc .
RUN sudo chown --from=root:root citus:citus -R ~
# sets default pg version
RUN pgenv switch 17.4
RUN pgenv switch 17.6
# make connecting to the coordinator easy
ENV PGPORT=9700

View File

@ -31,12 +31,12 @@ jobs:
pgupgrade_image_name: "ghcr.io/citusdata/pgupgradetester"
style_checker_image_name: "ghcr.io/citusdata/stylechecker"
style_checker_tools_version: "0.8.18"
sql_snapshot_pg_version: "17.4"
image_suffix: "-veab367a"
pg15_version: '{ "major": "15", "full": "15.12" }'
pg16_version: '{ "major": "16", "full": "16.8" }'
pg17_version: '{ "major": "17", "full": "17.4" }'
upgrade_pg_versions: "15.12-16.8-17.4"
sql_snapshot_pg_version: "17.6"
image_suffix: "-v4df94a0"
pg15_version: '{ "major": "15", "full": "15.14" }'
pg16_version: '{ "major": "16", "full": "16.10" }'
pg17_version: '{ "major": "17", "full": "17.6" }'
upgrade_pg_versions: "15.14-16.10-17.6"
steps:
# Since GHA jobs need at least one step we use a noop step here.
- name: Set up parameters

View File

@ -24,7 +24,7 @@ jobs:
uses: actions/checkout@v4
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
@ -76,4 +76,4 @@ jobs:
sudo make install-all
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2
uses: github/codeql-action/analyze@v3

View File

@ -1,3 +1,145 @@
### citus v13.1.1 (Oct 1st, 2025) ###
* Adds support for latest PG minors: 14.19, 15.14, 16.10 (#8142)
* Fixes an assertion failure when an expression in the query references
a CTE (#8106)
* Fixes a bug that causes an unexpected error when executing
repartitioned MERGE (#8201)
* Fixes a bug that allowed UPDATE / MERGE queries that may
change the distribution column value (#8214)
* Updates dynamic_library_path automatically when CDC is enabled (#8025)
### citus v13.1.0 (May 30th, 2025) ###
* Adds `citus_stat_counters` view that can be used to query
stat counters that Citus collects while the feature is enabled, which is
controlled by citus.enable_stat_counters. `citus_stat_counters()` can be
used to query the stat counters for the provided database oid and
`citus_stat_counters_reset()` can be used to reset them for the provided
database oid or for the current database if nothing or 0 is provided (#7917)
* Adds `citus_nodes` view that displays the node name, port, role, and
"active" for nodes in the cluster (#7968)
* Adds `citus_is_primary_node()` UDF to determine if the current node is a
primary node in the cluster (#7720)
* Adds support for propagating `GRANT/REVOKE` rights on table columns (#7918)
* Adds support for propagating `REASSIGN OWNED BY` commands (#7319)
* Adds support for propagating `CREATE`/`DROP` database from all nodes (#7240,
#7253, #7359)
* Propagates `SECURITY LABEL ON ROLE` statement from any node (#7508)
* Adds support for issuing role management commands from worker nodes (#7278)
* Adds support for propagating `ALTER USER RENAME` commands (#7204)
* Adds support for propagating `ALTER DATABASE <db_name> SET ..` commands
(#7181)
* Adds support for propagating `SECURITY LABEL` on tables and columns (#7956)
* Adds support for propagating `COMMENT ON <database>/<role>` commands (#7388)
* Moves some of the internal citus functions from `pg_catalog` to
`citus_internal` schema (#7473, #7470, #7466, #7456, #7450)
* Adjusts `max_prepared_transactions` only when it's set to default on PG >= 16
(#7712)
* Adds skip_qualify_public param to shard_name() UDF to allow qualifying for
"public" schema when needed (#8014)
* Allows `citus_*_size` on indexes on a distributed tables (#7271)
* Allows `GRANT ADMIN` to now also be `INHERIT` or `SET` in support of PG16
* Makes sure `worker_copy_table_to_node` errors out with Citus tables (#7662)
* Adds information to explain output when using
`citus.explain_distributed_queries=false` (#7412)
* Logs username in the failed connection message (#7432)
* Makes sure to avoid incorrectly pushing-down the outer joins between
distributed tables and recurring relations (like reference tables, local
tables and `VALUES(..)` etc.) prior to PG 17 (#7937)
* Prevents incorrectly pushing `nextval()` call down to workers to avoid using
incorrect sequence value for some types of `INSERT .. SELECT`s (#7976)
* Makes sure to prevent `INSERT INTO ... SELECT` queries involving subfield or
sublink, to avoid crashes (#7912)
* Makes sure to take improvement_threshold into the account
in `citus_add_rebalance_strategy()` (#7247)
* Makes sure to disallow creating a replicated distributed
table concurrently (#7219)
* Fixes a bug that causes omitting `CASCADE` clause for the commands sent to
workers for `REVOKE` commands on tables (#7958)
* Fixes an issue detected using address sanitizer (#7948, #7949)
* Fixes a bug in deparsing of shard query in case of "output-table column" name
conflict (#7932)
* Fixes a crash in columnar custom scan that happens when a columnar table is
used in a join (#7703)
* Fixes `MERGE` command when insert value does not have source distributed
column (#7627)
* Fixes performance issue when using `\d tablename` on a server with many
tables (#7577)
* Fixes performance issue in `GetForeignKeyOids` on systems with many
constraints (#7580)
* Fixes performance issue when distributing a table that depends on an
extension (#7574)
* Fixes performance issue when creating distributed tables if many already
exist (#7575)
* Fixes a crash caused by some form of `ALTER TABLE ADD COLUMN` statements. When
adding multiple columns, if one of the `ADD COLUMN` statements contains a
`FOREIGN` constraint omitting the referenced
columns in the statement, a `SEGFAULT` occurs (#7522)
* Fixes assertion failure in maintenance daemon during Citus upgrades (#7537)
* Fixes segmentation fault when using `CASE WHEN` in `DO` block functions
(#7554)
* Fixes undefined behavior in `master_disable_node` due to argument mismatch
(#7492)
* Fixes incorrect propagating of `GRANTED BY` and `CASCADE/RESTRICT` clauses
for `REVOKE` statements (#7451)
* Fixes the incorrect column count after `ALTER TABLE` (#7379)
* Fixes timeout when underlying socket is changed for an inter-node connection
(#7377)
* Fixes memory leaks (#7441, #7440)
* Fixes leaking of memory and memory contexts when tracking foreign keys between
Citus tables (#7236)
* Fixes a potential segfault for background rebalancer (#7694)
* Fixes potential `NULL` dereference in causal clocks (#7704)
### citus v13.0.3 (March 20th, 2025) ###
* Fixes a version bump issue in 13.0.2
@ -100,9 +242,8 @@
* Allows overwriting host name for all inter-node connections by
supporting "host" parameter in citus.node_conninfo (#7541)
* Changes the order in which the locks are acquired for the target and
reference tables, when a modify request is initiated from a worker
node that is not the "FirstWorkerNode" (#7542)
* Avoids distributed deadlocks by changing the order in which the locks are
acquired for the target and reference tables (#7542)
* Fixes a performance issue when distributing a table that depends on an
extension (#7574)

configure (vendored)
View File

@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Citus 13.1devel.
# Generated by GNU Autoconf 2.69 for Citus 13.1.1.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
PACKAGE_VERSION='13.1devel'
PACKAGE_STRING='Citus 13.1devel'
PACKAGE_VERSION='13.1.1'
PACKAGE_STRING='Citus 13.1.1'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@ -1262,7 +1262,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures Citus 13.1devel to adapt to many kinds of systems.
\`configure' configures Citus 13.1.1 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1324,7 +1324,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of Citus 13.1devel:";;
short | recursive ) echo "Configuration of Citus 13.1.1:";;
esac
cat <<\_ACEOF
@ -1429,7 +1429,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
Citus configure 13.1devel
Citus configure 13.1.1
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@ -1912,7 +1912,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by Citus $as_me 13.1devel, which was
It was created by Citus $as_me 13.1.1, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@ -5393,7 +5393,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by Citus $as_me 13.1devel, which was
This file was extended by Citus $as_me 13.1.1, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@ -5455,7 +5455,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
Citus config.status 13.1devel
Citus config.status 13.1.1
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"

View File

@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.
AC_INIT([Citus], [13.1devel])
AC_INIT([Citus], [13.1.1])
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
# we'll need sed and awk for some of the version commands

View File

@ -656,7 +656,9 @@ SaveStripeSkipList(RelFileLocator relfilelocator, uint64 stripe,
nulls[Anum_columnar_chunk_maximum_value - 1] = true;
}
PushActiveSnapshot(GetTransactionSnapshot());
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
PopActiveSnapshot();
}
}

View File

@ -109,13 +109,20 @@ citus_unmark_object_distributed(PG_FUNCTION_ARGS)
Oid classid = PG_GETARG_OID(0);
Oid objid = PG_GETARG_OID(1);
int32 objsubid = PG_GETARG_INT32(2);
/*
* SQL function master_unmark_object_distributed doesn't expect the
* 4th argument but SQL function citus_unmark_object_distributed does
* so as checkobjectexistence argument. For this reason, we try to
* get the 4th argument only if this C function is called with 4
* arguments.
*/
bool checkObjectExistence = true;
if (!PG_ARGISNULL(3))
if (PG_NARGS() == 4)
{
checkObjectExistence = PG_GETARG_BOOL(3);
}
ObjectAddress address = { 0 };
ObjectAddressSubSet(address, classid, objid, objsubid);

View File

@ -2930,7 +2930,9 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
PushActiveSnapshot(GetTransactionSnapshot());
CatalogTupleInsert(pgDistNode, heapTuple);
PopActiveSnapshot();
CitusInvalidateRelcacheByRelid(DistNodeRelationId());

View File

@ -41,6 +41,7 @@
static int SourceResultPartitionColumnIndex(Query *mergeQuery,
List *sourceTargetList,
CitusTableCacheEntry *targetRelation);
static int FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno);
static Var * ValidateAndReturnVarIfSupported(Node *entryExpr);
static DeferredErrorMessage * DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
PlannerRestrictionContext *
@ -628,6 +629,22 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, Query *query,
}
}
/*
* joinTree->quals, retrieved by GetMergeJoinTree() - either from
* mergeJoinCondition (PG >= 17) or jointree->quals (PG < 17),
* only contains the quals that are present in the "ON (..)" clause. Action
* quals that can be specified for each specific action, as in
* "WHEN <match condition> AND <action quals> THEN <action>", are
* saved into the "qual" field of the corresponding action's entry in
* mergeActionList, see
* https://github.com/postgres/postgres/blob/e6da68a6e1d60a037b63a9c9ed36e5ef0a996769/src/backend/parser/parse_merge.c#L285-L293.
*
* For this reason, even though TargetEntryChangesValue() could in principle
* prove that an action's quals ensure that the action cannot change the
* distribution key, it cannot do so here because we don't provide the action
* quals to TargetEntryChangesValue(), only joinTree, which contains just
* the "ON (..)" clause quals.
*/
if (targetEntryDistributionColumn &&
TargetEntryChangesValue(targetEntry, distributionColumn, joinTree))
{
@ -1410,7 +1427,8 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
Assert(sourceRepartitionVar);
int sourceResultRepartitionColumnIndex =
DistributionColumnIndex(sourceTargetList, sourceRepartitionVar);
FindTargetListEntryWithVarExprAttno(sourceTargetList,
sourceRepartitionVar->varattno);
if (sourceResultRepartitionColumnIndex == -1)
{
@ -1561,6 +1579,33 @@ FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query)
}
/*
* FindTargetListEntryWithVarExprAttno finds the index of the target
* entry whose expr is a Var that points to input varattno.
*
* If no such target entry is found, it returns -1.
*/
static int
FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno)
{
int targetEntryIndex = 0;
TargetEntry *targetEntry = NULL;
foreach_declared_ptr(targetEntry, targetList)
{
if (IsA(targetEntry->expr, Var) &&
((Var *) targetEntry->expr)->varattno == varattno)
{
return targetEntryIndex;
}
targetEntryIndex++;
}
return -1;
}
/*
* IsLocalTableModification returns true if the table modified is a Postgres table.
* We do not support recursive planning for MERGE yet, so we could have a join

View File

@ -4560,11 +4560,10 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
else if (rangeTableEntry->rtekind == RTE_CTE)
{
/*
* When outerVars are considered, we modify parentQueryList, so this
* logic might need to change when we support outervars in CTEs.
* Resolve through a CTE even when skipOuterVars == false.
* Maintain the invariant that each recursion level owns a private,
* correctly-bounded copy of parentQueryList.
*/
Assert(skipOuterVars);
int cteParentListIndex = list_length(parentQueryList) -
rangeTableEntry->ctelevelsup - 1;
Query *cteParentQuery = NULL;
@ -4595,14 +4594,34 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
if (cte != NULL)
{
Query *cteQuery = (Query *) cte->ctequery;
List *targetEntryList = cteQuery->targetList;
AttrNumber targetEntryIndex = candidateColumn->varattno - 1;
TargetEntry *targetEntry = list_nth(targetEntryList, targetEntryIndex);
parentQueryList = lappend(parentQueryList, query);
FindReferencedTableColumn(targetEntry->expr, parentQueryList,
cteQuery, column, rteContainingReferencedColumn,
skipOuterVars);
if (targetEntryIndex >= 0 &&
targetEntryIndex < list_length(cteQuery->targetList))
{
TargetEntry *targetEntry =
list_nth(cteQuery->targetList, targetEntryIndex);
/* Build a private, bounded parentQueryList before recursing into the CTE.
* Invariant: list is [top … current], owned by this call (no aliasing).
* For RTE_CTE:
* owner_idx = list_length(parentQueryList) - rangeTableEntry->ctelevelsup - 1;
* newParent = lappend(list_truncate(list_copy(parentQueryList), owner_idx + 1), query);
* Example (Q0 owns CTE; we’re in Q2 via nested subquery):
* parent=[Q0,Q1,Q2], ctelevelsup=2 ⇒ owner_idx=0 ⇒ newParent=[Q0,Q2].
* Keeps outer-Var level math correct without mutating the caller’s list.
*/
List *newParent = list_copy(parentQueryList);
newParent = list_truncate(newParent, cteParentListIndex + 1);
newParent = lappend(newParent, query);
FindReferencedTableColumn(targetEntry->expr,
newParent,
cteQuery,
column,
rteContainingReferencedColumn,
skipOuterVars);
}
}
}
}

View File

@ -3077,16 +3077,25 @@ BuildBaseConstraint(Var *column)
/*
* MakeOpExpression builds an operator expression node. This operator expression
* implements the operator clause as defined by the variable and the strategy
* number.
* MakeOpExpressionExtended builds an operator expression node that's of
* the form "Var <op> Expr", where, Expr must either be a Const or a Var
* (*1).
*
* This operator expression implements the operator clause as defined by
* the variable and the strategy number.
*/
OpExpr *
MakeOpExpression(Var *variable, int16 strategyNumber)
MakeOpExpressionExtended(Var *leftVar, Expr *rightArg, int16 strategyNumber)
{
Oid typeId = variable->vartype;
Oid typeModId = variable->vartypmod;
Oid collationId = variable->varcollid;
/*
* Other types of expressions are probably also fine to be used, but
* none of the callers need support for them for now, so we haven't
* tested them (*1).
*/
Assert(IsA(rightArg, Const) || IsA(rightArg, Var));
Oid typeId = leftVar->vartype;
Oid collationId = leftVar->varcollid;
Oid accessMethodId = BTREE_AM_OID;
@ -3104,18 +3113,16 @@ MakeOpExpression(Var *variable, int16 strategyNumber)
*/
if (operatorClassInputType != typeId && typeType != TYPTYPE_PSEUDO)
{
variable = (Var *) makeRelabelType((Expr *) variable, operatorClassInputType,
-1, collationId, COERCE_IMPLICIT_CAST);
leftVar = (Var *) makeRelabelType((Expr *) leftVar, operatorClassInputType,
-1, collationId, COERCE_IMPLICIT_CAST);
}
Const *constantValue = makeNullConst(operatorClassInputType, typeModId, collationId);
/* Now make the expression with the given variable and a null constant */
OpExpr *expression = (OpExpr *) make_opclause(operatorId,
InvalidOid, /* no result type yet */
false, /* no return set */
(Expr *) variable,
(Expr *) constantValue,
(Expr *) leftVar,
rightArg,
InvalidOid, collationId);
/* Set implementing function id and result type */
@ -3126,6 +3133,31 @@ MakeOpExpression(Var *variable, int16 strategyNumber)
}
/*
* MakeOpExpression is a wrapper around MakeOpExpressionExtended
* that creates a null constant of the appropriate type for right
* hand side operator class input type. As a result, it builds an
* operator expression node that's of the form "Var <op> NULL".
*/
OpExpr *
MakeOpExpression(Var *leftVar, int16 strategyNumber)
{
Oid typeId = leftVar->vartype;
Oid typeModId = leftVar->vartypmod;
Oid collationId = leftVar->varcollid;
Oid accessMethodId = BTREE_AM_OID;
OperatorCacheEntry *operatorCacheEntry = LookupOperatorByType(typeId, accessMethodId,
strategyNumber);
Oid operatorClassInputType = operatorCacheEntry->operatorClassInputType;
Const *constantValue = makeNullConst(operatorClassInputType, typeModId, collationId);
return MakeOpExpressionExtended(leftVar, (Expr *) constantValue, strategyNumber);
}
/*
* LookupOperatorByType is a wrapper around GetOperatorByType(),
* operatorClassInputType() and get_typtype() functions that uses a cache to avoid

View File

@ -1604,10 +1604,19 @@ MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context)
/*
* TargetEntryChangesValue determines whether the given target entry may
* change the value in a given column, given a join tree. The result is
* true unless the expression refers directly to the column, or the
* expression is a value that is implied by the qualifiers of the join
* tree, or the target entry sets a different column.
* change the value given a column and a join tree.
*
* The function assumes that the "targetEntry" references given "column"
* Var via its "resname" and is used as part of a modify query. This means
* that, for example, for an update query, the input "targetEntry" constructs
* the following assignment operation as part of the SET clause:
* "col_a = expr_a ", where, "col_a" refers to input "column" Var (via
* "resname") as per the assumption written above. And we want to understand
* if "expr_a" (which is pointed to by targetEntry->expr) refers directly to
* the "column" Var, or "expr_a" is a value that is implied to be equal
* to "column" Var by the qualifiers of the join tree. If so, we know that
* the value of "col_a" effectively cannot be changed by this assignment
* operation.
*/
bool
TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree)
@ -1618,11 +1627,36 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
if (IsA(setExpr, Var))
{
Var *newValue = (Var *) setExpr;
if (newValue->varattno == column->varattno)
if (column->varno == newValue->varno &&
column->varattno == newValue->varattno)
{
/* target entry of the form SET col = table.col */
/*
* Target entry is of the form "SET col_a = foo.col_b",
* where foo also points to the same range table entry
* and col_a and col_b are the same. So, effectively
* they're literally referring to the same column.
*/
isColumnValueChanged = false;
}
else
{
List *restrictClauseList = WhereClauseList(joinTree);
OpExpr *equalityExpr = MakeOpExpressionExtended(column, (Expr *) newValue,
BTEqualStrategyNumber);
bool predicateIsImplied = predicate_implied_by(list_make1(equalityExpr),
restrictClauseList, false);
if (predicateIsImplied)
{
/*
* Target entry is of the form
* "SET col_a = foo.col_b WHERE col_a = foo.col_b (AND (...))",
* where foo points to a different relation or it points
* to the same relation but col_a is not the same column as col_b.
*/
isColumnValueChanged = false;
}
}
}
else if (IsA(setExpr, Const))
{
@ -1643,7 +1677,10 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
restrictClauseList, false);
if (predicateIsImplied)
{
/* target entry of the form SET col = <x> WHERE col = <x> AND ... */
/*
* Target entry is of the form
* "SET col_a = const_a WHERE col_a = const_a (AND (...))".
*/
isColumnValueChanged = false;
}
}
@ -2245,6 +2282,18 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query)
continue;
}
#if PG_VERSION_NUM >= 150013 && PG_VERSION_NUM < PG_VERSION_16
if (rangeTableEntry->rtekind == RTE_SUBQUERY && rangeTableEntry->relkind == 0)
{
/*
* In PG15.13 commit https://github.com/postgres/postgres/commit/317aba70e
* relid is retained when converting views to subqueries,
* so we need an extra check identifying those views
*/
continue;
}
#endif
if (rangeTableEntry->relkind == RELKIND_VIEW ||
rangeTableEntry->relkind == RELKIND_MATVIEW)
{

View File

@ -962,6 +962,7 @@ shard_name(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
int64 shardId = PG_GETARG_INT64(1);
bool skipQualifyPublic = PG_GETARG_BOOL(2);
char *qualifiedName = NULL;
@ -991,7 +992,7 @@ shard_name(PG_FUNCTION_ARGS)
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
if (strncmp(schemaName, "public", NAMEDATALEN) == 0)
if (skipQualifyPublic && strncmp(schemaName, "public", NAMEDATALEN) == 0)
{
qualifiedName = (char *) quote_identifier(relationName);
}

View File

@ -215,6 +215,7 @@ static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSour
static void CitusAuthHook(Port *port, int status);
static bool IsSuperuser(char *userName);
static void AdjustDynamicLibraryPathForCdcDecoders(void);
static void EnableChangeDataCaptureAssignHook(bool newval, void *extra);
static ClientAuthentication_hook_type original_client_auth_hook = NULL;
static emit_log_hook_type original_emit_log_hook = NULL;
@ -1272,7 +1273,7 @@ RegisterCitusConfigVariables(void)
false,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
NULL, EnableChangeDataCaptureAssignHook, NULL);
DefineCustomBoolVariable(
"citus.enable_cluster_clock",
@ -3272,3 +3273,19 @@ CitusObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, int su
SetCreateCitusTransactionLevel(GetCurrentTransactionNestLevel());
}
}
/*
* EnableChangeDataCaptureAssignHook is called whenever the
* citus.enable_change_data_capture setting is changed to dynamically
* adjust the dynamic_library_path based on the new value.
*/
static void
EnableChangeDataCaptureAssignHook(bool newval, void *extra)
{
if (newval)
{
/* CDC enabled: add citus_decoders to the path */
AdjustDynamicLibraryPathForCdcDecoders();
}
}

View File

@ -50,3 +50,11 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/citus_is_primary_node/13.1-1.sql"
#include "udfs/citus_stat_counters/13.1-1.sql"
#include "udfs/citus_stat_counters_reset/13.1-1.sql"
#include "udfs/citus_nodes/13.1-1.sql"
-- Since shard_name/13.1-1.sql first drops the function and then creates it, we first
-- need to drop citus_shards view since that view depends on this function. And immediately
-- after creating the function, we recreate citus_shards view again.
DROP VIEW pg_catalog.citus_shards;
#include "udfs/shard_name/13.1-1.sql"
#include "udfs/citus_shards/12.0-1.sql"

View File

@ -45,3 +45,25 @@ DROP FUNCTION citus_internal.is_replication_origin_tracking_active();
DROP VIEW pg_catalog.citus_stat_counters;
DROP FUNCTION pg_catalog.citus_stat_counters(oid);
DROP FUNCTION pg_catalog.citus_stat_counters_reset(oid);
DROP VIEW IF EXISTS pg_catalog.citus_nodes;
-- Definition of shard_name() prior to this release doesn't have a separate SQL file
-- because it's quite an old UDF whose prior definition(s) were squashed into
-- citus--8.0-1.sql. For this reason, to downgrade it, here we directly execute its old
-- definition instead of including it from such a separate file.
--
-- And before dropping and creating the function, we also need to drop citus_shards view
-- since it depends on it. And immediately after creating the function, we recreate
-- citus_shards view again.
DROP VIEW pg_catalog.citus_shards;
DROP FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint, skip_qualify_public boolean);
CREATE FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint)
RETURNS text
LANGUAGE C STABLE STRICT
AS 'MODULE_PATHNAME', $$shard_name$$;
COMMENT ON FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint)
IS 'returns schema-qualified, shard-extended identifier of object name';
#include "../udfs/citus_shards/12.0-1.sql"

View File

@ -0,0 +1,18 @@
SET search_path = 'pg_catalog';
DROP VIEW IF EXISTS pg_catalog.citus_nodes;
CREATE OR REPLACE VIEW citus.citus_nodes AS
SELECT
nodename,
nodeport,
CASE
WHEN groupid = 0 THEN 'coordinator'
ELSE 'worker'
END AS role,
isactive AS active
FROM pg_dist_node;
ALTER VIEW citus.citus_nodes SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_nodes TO PUBLIC;
RESET search_path;

View File

@ -0,0 +1,18 @@
SET search_path = 'pg_catalog';
DROP VIEW IF EXISTS pg_catalog.citus_nodes;
CREATE OR REPLACE VIEW citus.citus_nodes AS
SELECT
nodename,
nodeport,
CASE
WHEN groupid = 0 THEN 'coordinator'
ELSE 'worker'
END AS role,
isactive AS active
FROM pg_dist_node;
ALTER VIEW citus.citus_nodes SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_nodes TO PUBLIC;
RESET search_path;

View File

@ -0,0 +1,8 @@
-- skip_qualify_public is set to true by default just for backward compatibility
DROP FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint);
CREATE FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint, skip_qualify_public boolean DEFAULT true)
RETURNS text
LANGUAGE C STABLE STRICT
AS 'MODULE_PATHNAME', $$shard_name$$;
COMMENT ON FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint, skip_qualify_public boolean)
IS 'returns schema-qualified, shard-extended identifier of object name';

View File

@ -0,0 +1,8 @@
-- skip_qualify_public is set to true by default just for backward compatibility
DROP FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint);
CREATE FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint, skip_qualify_public boolean DEFAULT true)
RETURNS text
LANGUAGE C STABLE STRICT
AS 'MODULE_PATHNAME', $$shard_name$$;
COMMENT ON FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint, skip_qualify_public boolean)
IS 'returns schema-qualified, shard-extended identifier of object name';

View File

@ -106,7 +106,9 @@ LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId out
TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction);
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
PushActiveSnapshot(GetTransactionSnapshot());
CatalogTupleInsert(pgDistTransaction, heapTuple);
PopActiveSnapshot();
CommandCounterIncrement();

View File

@ -549,7 +549,8 @@ extern DistributedPlan * CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
plannerRestrictionContext);
extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
char *queryString);
extern OpExpr * MakeOpExpressionExtended(Var *leftVar, Expr *rightArg,
int16 strategyNumber);
extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
extern Node * WrapUngroupedVarsInAnyValueAggregate(Node *expression,
List *groupClauseList,

View File

@ -0,0 +1,81 @@
-- Test for CDC library path adjustment functionality
-- This test verifies that the AdjustDynamicLibraryPathForCdcDecoders function with superuser privileges
-- correctly modifies the dynamic_library_path when CDC is enabled
-- Test 1: Show initial state and reset to ensure clean state
SHOW dynamic_library_path;
dynamic_library_path
---------------------------------------------------------------------
$libdir
(1 row)
SHOW citus.enable_change_data_capture;
citus.enable_change_data_capture
---------------------------------------------------------------------
off
(1 row)
-- Test 2: Enable CDC and verify path is set to include citus_decoders
SET citus.enable_change_data_capture = true;
SHOW dynamic_library_path;
dynamic_library_path
---------------------------------------------------------------------
$libdir/citus_decoders:$libdir
(1 row)
-- Verify that the dynamic_library_path has been modified to include citus_decoders
SELECT
CASE
WHEN current_setting('dynamic_library_path') LIKE '%citus_decoders%'
THEN 'CDC path correctly set'
ELSE 'CDC path incorrectly not set'
END AS cdc_path_status;
cdc_path_status
---------------------------------------------------------------------
CDC path correctly set
(1 row)
-- Test 3: Disable CDC and verify path remains (CDC doesn't remove the path)
SET citus.enable_change_data_capture = false;
SHOW dynamic_library_path;
dynamic_library_path
---------------------------------------------------------------------
$libdir/citus_decoders:$libdir
(1 row)
-- Test 4: Edge case - function should only work when path is exactly "$libdir"
SET dynamic_library_path = '$libdir/other_path:$libdir';
SET citus.enable_change_data_capture = true;
SHOW dynamic_library_path;
dynamic_library_path
---------------------------------------------------------------------
$libdir/other_path:$libdir
(1 row)
-- Verify that path is unchanged with custom library path
SELECT
CASE
WHEN current_setting('dynamic_library_path') LIKE '%citus_decoders%'
THEN 'CDC path incorrectly set'
ELSE 'CDC path correctly not set'
END AS custom_path_test;
custom_path_test
---------------------------------------------------------------------
CDC path correctly not set
(1 row)
-- Reset dynamic_library_path to default
RESET dynamic_library_path;
RESET citus.enable_change_data_capture;
-- Test 5: Verify that dynamic_library_path reset_val is overridden to $libdir/citus_decoders:$libdir
SHOW dynamic_library_path;
dynamic_library_path
---------------------------------------------------------------------
$libdir/citus_decoders:$libdir
(1 row)
SHOW citus.enable_change_data_capture;
citus.enable_change_data_capture
---------------------------------------------------------------------
off
(1 row)

View File

@ -33,5 +33,33 @@ SELECT * FROM citus_shards;
t1 | 99456903 | citus_shards.t1_99456903 | distributed | 456900 | localhost | 57638 | 8192
(8 rows)
SET search_path TO public;
CREATE TABLE t3 (i int);
SELECT citus_add_local_table_to_metadata('t3');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT shard_name('t3', shardid) FROM pg_dist_shard WHERE logicalrelid = 't3'::regclass;
shard_name
---------------------------------------------------------------------
t3_99456908
(1 row)
SELECT shard_name('t3', shardid, true) FROM pg_dist_shard WHERE logicalrelid = 't3'::regclass;
shard_name
---------------------------------------------------------------------
t3_99456908
(1 row)
SELECT shard_name('t3', shardid, false) FROM pg_dist_shard WHERE logicalrelid = 't3'::regclass;
shard_name
---------------------------------------------------------------------
public.t3_99456908
(1 row)
DROP TABLE t3;
SET search_path TO citus_shards;
SET client_min_messages TO WARNING;
DROP SCHEMA citus_shards CASCADE;

View File

@ -193,13 +193,148 @@ SQL function "compare_data" statement 2
(1 row)
---- https://github.com/citusdata/citus/issues/8180 ----
CREATE TABLE dist_1 (a int, b int, c int);
CREATE TABLE dist_2 (a int, b int, c int);
CREATE TABLE dist_different_order_1 (b int, a int, c int);
SELECT create_distributed_table('dist_1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_2', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_different_order_1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
MERGE INTO dist_1
USING dist_2
ON (dist_1.a = dist_2.b)
WHEN MATCHED THEN UPDATE SET b = dist_2.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET b = src.b;
MERGE INTO dist_different_order_1
USING dist_1
ON (dist_different_order_1.a = dist_1.b)
WHEN MATCHED THEN UPDATE SET b = dist_1.b;
CREATE TABLE dist_1_cast (a int, b int);
CREATE TABLE dist_2_cast (a int, b numeric);
SELECT create_distributed_table('dist_1_cast', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_2_cast', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
MERGE INTO dist_1_cast
USING dist_2_cast
ON (dist_1_cast.a = dist_2_cast.b)
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
ERROR: In the MERGE ON clause, there is a datatype mismatch between target's distribution column and the expression originating from the source.
DETAIL: If the types are different, Citus uses different hash functions for the two column types, which might lead to incorrect repartitioning of the result data
MERGE INTO dist_1_cast
USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast
ON (dist_1_cast.a = dist_2_cast.b)
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
-- a more sophisticated example
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'tstamp_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_target', 'int_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i-1,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
MERGE INTO dist_target target_alias
USING dist_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- execute the same query on local tables, everything is the same except table names behind the aliases
MERGE INTO local_target target_alias
USING local_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- compare both targets
SELECT COUNT(*) = 0 AS targets_match
FROM (
SELECT * FROM dist_target
EXCEPT
SELECT * FROM local_target
UNION ALL
SELECT * FROM local_target
EXCEPT
SELECT * FROM dist_target
) q;
targets_match
---------------------------------------------------------------------
t
(1 row)
SET client_min_messages TO WARNING;
DROP SCHEMA merge_repartition2_schema CASCADE;
NOTICE: drop cascades to 8 other objects
DETAIL: drop cascades to table pg_target
drop cascades to table pg_source
drop cascades to function cleanup_data()
drop cascades to function setup_data()
drop cascades to function check_data(text,text,text,text)
drop cascades to function compare_data()
drop cascades to table citus_target
drop cascades to table citus_source

View File

@ -394,9 +394,9 @@ DEBUG: Wrapping relation "mat_view_on_part_dist" "foo" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM mixed_relkind_tests.mat_view_on_part_dist foo WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE mixed_relkind_tests.partitioned_distributed_table SET a = foo.a FROM (SELECT foo_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) foo_1) foo WHERE (foo.a OPERATOR(pg_catalog.=) partitioned_distributed_table.a)
UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: modifying the partition value of rows is not allowed
UPDATE partitioned_distributed_table SET a = foo.a FROM distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: modifying the partition value of rows is not allowed
-- should work
UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a = partitioned_distributed_table.a;
UPDATE partitioned_distributed_table SET a = foo.a FROM view_on_part_dist AS foo WHERE foo.a = partitioned_distributed_table.a;

View File

@ -72,6 +72,45 @@ SELECT master_get_active_worker_nodes();
(localhost,57637)
(2 rows)
-- get all nodes
SELECT * from citus_nodes;
nodename | nodeport | role | active
---------------------------------------------------------------------
localhost | 57637 | worker | t
localhost | 57638 | worker | t
localhost | 57636 | coordinator | t
(3 rows)
-- get active nodes
SELECT * from citus_nodes where active = 't';
nodename | nodeport | role | active
---------------------------------------------------------------------
localhost | 57637 | worker | t
localhost | 57638 | worker | t
localhost | 57636 | coordinator | t
(3 rows)
-- get coordinator nodes
SELECT * from citus_nodes where role = 'coordinator';
nodename | nodeport | role | active
---------------------------------------------------------------------
localhost | 57636 | coordinator | t
(1 row)
-- get worker nodes
SELECT * from citus_nodes where role = 'worker';
nodename | nodeport | role | active
---------------------------------------------------------------------
localhost | 57637 | worker | t
localhost | 57638 | worker | t
(2 rows)
-- get nodes with unknown role
SELECT * from citus_nodes where role = 'foo';
nodename | nodeport | role | active
---------------------------------------------------------------------
(0 rows)
-- try to add a node that is already in the cluster
SELECT * FROM master_add_node('localhost', :worker_1_port);
master_add_node
@ -126,6 +165,34 @@ SELECT master_get_active_worker_nodes();
(localhost,57637)
(1 row)
-- get active nodes
SELECT * from citus_nodes where active = 't';
nodename | nodeport | role | active
---------------------------------------------------------------------
localhost | 57636 | coordinator | t
localhost | 57637 | worker | t
(2 rows)
-- get inactive nodes
SELECT * from citus_nodes where active = 'f';
nodename | nodeport | role | active
---------------------------------------------------------------------
localhost | 57638 | worker | f
(1 row)
-- make sure non-superusers can access the view
CREATE ROLE normaluser;
SET ROLE normaluser;
SELECT * FROM citus_nodes;
nodename | nodeport | role | active
---------------------------------------------------------------------
localhost | 57636 | coordinator | t
localhost | 57638 | worker | f
localhost | 57637 | worker | t
(3 rows)
SET ROLE postgres;
DROP ROLE normaluser;
-- add some shard placements to the cluster
SET citus.shard_count TO 16;
SET citus.shard_replication_factor TO 1;

View File

@ -1455,6 +1455,7 @@ SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
function citus_unmark_object_distributed(oid,oid,integer) void |
function shard_name(regclass,bigint) text |
| function citus_internal.acquire_citus_advisory_object_class_lock(integer,cstring) void
| function citus_internal.add_colocation_metadata(integer,integer,integer,regtype,oid) void
| function citus_internal.add_object_metadata(text,text[],text[],integer,integer,boolean) void
@ -1483,15 +1484,17 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_stat_counters(oid) SETOF record
| function citus_stat_counters_reset(oid) void
| function citus_unmark_object_distributed(oid,oid,integer,boolean) void
| function shard_name(regclass,bigint,boolean) text
| view citus_nodes
| view citus_stat_counters
(30 rows)
(33 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version
SHOW citus.version;
citus.version
---------------------------------------------------------------------
13.1devel
13.1.1
(1 row)
-- ensure no unexpected objects were created outside pg_catalog

View File

@ -3637,5 +3637,157 @@ SELECT id, val FROM version_dist_union ORDER BY id;
(6 rows)
-- End of Issue #7784
-- PR #8106 — CTE traversal works when following outer Vars
-- This script exercises three shapes:
-- T1) CTE referenced inside a correlated subquery (one level down)
-- T2) CTE referenced inside a nested subquery (two levels down)
-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
CREATE SCHEMA pr8106_cte_outervar;
SET search_path = pr8106_cte_outervar, public;
-- Base tables for the tests
DROP TABLE IF EXISTS raw_events_first CASCADE;
NOTICE: table "raw_events_first" does not exist, skipping
DROP TABLE IF EXISTS agg_events CASCADE;
NOTICE: table "agg_events" does not exist, skipping
CREATE TABLE raw_events_first(
user_id int,
value_1 int
);
CREATE TABLE agg_events(
user_id int,
value_1_agg int
);
-- Distribute and colocate (distribution key = user_id)
SELECT create_distributed_table('raw_events_first', 'user_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('agg_events', 'user_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Seed data (duplicates on some user_ids; some NULL value_1’s)
INSERT INTO raw_events_first(user_id, value_1) VALUES
(1, 10), (1, 20), (1, NULL),
(2, NULL),
(3, 30),
(4, NULL),
(5, 50), (5, NULL),
(6, NULL);
---------------------------------------------------------------------
-- T1) CTE referenced inside a correlated subquery (one level down)
---------------------------------------------------------------------
TRUNCATE agg_events;
WITH c AS MATERIALIZED (
SELECT user_id FROM raw_events_first
)
INSERT INTO agg_events (user_id)
SELECT t.user_id
FROM raw_events_first t
WHERE EXISTS (SELECT 1 FROM c WHERE c.user_id = t.user_id);
-- Expect one insert per row in raw_events_first (EXISTS always true per user_id)
SELECT 't1_count_matches' AS test,
(SELECT count(*) FROM agg_events) =
(SELECT count(*) FROM raw_events_first) AS ok;
test | ok
---------------------------------------------------------------------
t1_count_matches | t
(1 row)
-- Spot-check: how many rows were inserted
SELECT 't1_rows' AS test, count(*) AS rows FROM agg_events;
test | rows
---------------------------------------------------------------------
t1_rows | 9
(1 row)
---------------------------------------------------------------------
-- T2) CTE referenced inside a nested subquery (two levels down)
---------------------------------------------------------------------
TRUNCATE agg_events;
WITH c AS MATERIALIZED (
SELECT user_id FROM raw_events_first
)
INSERT INTO agg_events (user_id)
SELECT t.user_id
FROM raw_events_first t
WHERE EXISTS (
SELECT 1
FROM (SELECT user_id FROM c) c2
WHERE c2.user_id = t.user_id
);
-- Same cardinality expectation as T1
SELECT 't2_count_matches' AS test,
(SELECT count(*) FROM agg_events) =
(SELECT count(*) FROM raw_events_first) AS ok;
test | ok
---------------------------------------------------------------------
t2_count_matches | t
(1 row)
SELECT 't2_rows' AS test, count(*) AS rows FROM agg_events;
test | rows
---------------------------------------------------------------------
t2_rows | 9
(1 row)
---------------------------------------------------------------------
-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
-- (use MAX() to keep scalar subquery single-row)
---------------------------------------------------------------------
TRUNCATE agg_events;
WITH c AS (SELECT user_id, value_1 FROM raw_events_first)
INSERT INTO agg_events (user_id, value_1_agg)
SELECT d.user_id, d.value_1_agg
FROM (
SELECT t.user_id,
(SELECT max(c.value_1) FROM c WHERE c.user_id = t.user_id) AS value_1_agg
FROM raw_events_first t
) AS d
WHERE d.value_1_agg IS NOT NULL;
-- Expect one insert per row in raw_events_first whose user_id has at least one non-NULL value_1
SELECT 't3_count_matches' AS test,
(SELECT count(*) FROM agg_events) =
(
SELECT count(*)
FROM raw_events_first t
WHERE EXISTS (
SELECT 1 FROM raw_events_first c
WHERE c.user_id = t.user_id AND c.value_1 IS NOT NULL
)
) AS ok;
test | ok
---------------------------------------------------------------------
t3_count_matches | t
(1 row)
-- Also verify no NULLs were inserted into value_1_agg
SELECT 't3_no_null_value_1_agg' AS test,
NOT EXISTS (SELECT 1 FROM agg_events WHERE value_1_agg IS NULL) AS ok;
test | ok
---------------------------------------------------------------------
t3_no_null_value_1_agg | t
(1 row)
-- Deterministic sample of results
SELECT 't3_sample' AS test, user_id, value_1_agg
FROM agg_events
ORDER BY user_id
LIMIT 5;
test | user_id | value_1_agg
---------------------------------------------------------------------
t3_sample | 1 | 20
t3_sample | 1 | 20
t3_sample | 1 | 20
t3_sample | 3 | 30
t3_sample | 5 | 50
(5 rows)
-- End of PR #8106 — CTE traversal works when following outer Vars
SET client_min_messages TO ERROR;
DROP SCHEMA pr8106_cte_outervar CASCADE;
DROP SCHEMA multi_insert_select CASCADE;

View File

@ -2,6 +2,7 @@ SET citus.shard_count TO 32;
SET citus.next_shard_id TO 750000;
SET citus.next_placement_id TO 750000;
CREATE SCHEMA multi_modifications;
SET search_path TO multi_modifications;
-- some failure messages that come from the worker nodes
-- might change due to parallel executions, so suppress those
-- using \set VERBOSITY terse
@ -31,8 +32,12 @@ SELECT create_distributed_table('limit_orders', 'id', 'hash');
(1 row)
SELECT create_distributed_table('multiple_hash', 'id', 'hash');
ERROR: column "id" of relation "multiple_hash" does not exist
SELECT create_distributed_table('multiple_hash', 'category', 'hash');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('range_partitioned', 'id', 'range');
create_distributed_table
---------------------------------------------------------------------
@ -338,22 +343,26 @@ ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001
-- Test that shards which miss a modification are marked unhealthy
-- First: Connect to the second worker node
\c - - - :worker_2_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
-- Third: Connect back to master node
\c - - - :master_port
SET search_path TO multi_modifications;
-- Fourth: Perform an INSERT on the remaining node
-- the whole transaction should fail
\set VERBOSITY terse
INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
ERROR: relation "public.limit_orders_750000" does not exist
ERROR: relation "multi_modifications.limit_orders_750000" does not exist
-- set the shard name back
\c - - - :worker_2_port
SET search_path TO multi_modifications;
-- Second: Restore the limit_orders shard name on the second worker node
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Verify the insert failed and both placements are healthy
-- or the insert succeeded and placement marked unhealthy
\c - - - :worker_1_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
count
---------------------------------------------------------------------
@ -361,6 +370,7 @@ SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
(1 row)
\c - - - :worker_2_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
count
---------------------------------------------------------------------
@ -368,6 +378,7 @@ SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
(1 row)
\c - - - :master_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders WHERE id = 276;
count
---------------------------------------------------------------------
@ -388,14 +399,16 @@ AND s.logicalrelid = 'limit_orders'::regclass;
-- Test that if all shards miss a modification, no state change occurs
-- First: Connect to the first worker node
\c - - - :worker_1_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the first worker node
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
-- Third: Connect back to master node
\c - - - :master_port
SET search_path TO multi_modifications;
-- Fourth: Perform an INSERT on the remaining node
\set VERBOSITY terse
INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
ERROR: relation "public.limit_orders_750000" does not exist
ERROR: relation "multi_modifications.limit_orders_750000" does not exist
\set VERBOSITY DEFAULT
-- Last: Verify worker is still healthy
SELECT count(*)
@ -414,10 +427,12 @@ AND s.logicalrelid = 'limit_orders'::regclass;
-- Undo our change...
-- First: Connect to the first worker node
\c - - - :worker_1_port
SET search_path TO multi_modifications;
-- Second: Restore the limit_orders shard name on the first worker node
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Third: Connect back to master node
\c - - - :master_port
SET search_path TO multi_modifications;
-- attempting to change the partition key is unsupported
UPDATE limit_orders SET id = 0 WHERE id = 246;
ERROR: modifying the partition value of rows is not allowed
@ -427,6 +442,368 @@ ERROR: modifying the partition value of rows is not allowed
UPDATE limit_orders SET id = 246 WHERE id = 246;
UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM';
UPDATE limit_orders SET id = limit_orders.id WHERE id = 246;
CREATE TABLE dist_1 (a int, b int, c int);
CREATE TABLE dist_2 (a int, b int, c int);
CREATE TABLE dist_non_colocated (a int, b int, c int);
CREATE TABLE dist_different_order_1 (b int, a int, c int);
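-- note that dist_different_order_1 declares b before a, so its distribution
-- column "a" has a different attno than dist_1's; this matters for the
-- same-attno / different-relation cases tested below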
SELECT create_distributed_table('dist_1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_2', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_non_colocated', 'a', colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_different_order_1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
--
-- https://github.com/citusdata/citus/issues/8087
--
---- update: should work ----
-- setting shard key to itself --
UPDATE dist_1 SET a = dist_1.a;
UPDATE dist_1 SET a = dist_1.a WHERE dist_1.a > dist_1.b AND dist_1.b > 10;
UPDATE dist_1 SET a = dist_1.a FROM dist_2 WHERE dist_1.a = dist_2.a;
-- setting shard key to another var that's implied to be equal to shard key --
UPDATE dist_1 SET a = b WHERE a = b;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.b = dist_2.c AND (dist_2.c > 5 OR dist_2.c < 0);
with cte as (
select a, b from dist_1
)
update dist_1 set a = cte.a from cte where dist_1.a = cte.a;
with cte as (
select a as x, b as y from (select a, b from dist_1 limit 100) dt where b > 100
)
update dist_1 set a = cte.x from cte where dist_1.a = cte.x;
with cte as (
select d2.a as x, d1.b as y
from dist_1 d1, dist_different_order_1 d2
where d1.a=d2.a)
update dist_1 set a = cte.x from cte where y != 0 and dist_1.a = cte.x;
with cte as (
select * from (select a as x, b as y from dist_2 limit 100) q
)
update dist_1 set a = cte.x from cte where b = cte.y and cte.y = a and a = cte.x;
-- supported although the where clause will certainly evaluate to false
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.a = 5 AND dist_2.a = 7;
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
-- test with extra quals
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a AND dist_1.b = dist_different_order_1.c AND (dist_different_order_1.c > 5 OR dist_different_order_1.c < 0);
---- update: errors in router planner ----
-- different column of the same relation, which is not implied to be equal to shard key --
UPDATE dist_1 SET a = dist_1.b;
ERROR: modifying the partition value of rows is not allowed
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
UPDATE dist_1 SET a = dist_2.a FROM dist_2;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a != dist_2.a;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a >= dist_2.a;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a OR dist_1.a > dist_2.a;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = dist_different_order_1.b FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = foo.a FROM dist_1 foo;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = foo.a FROM dist_1 foo WHERE dist_1.a != foo.a;
ERROR: modifying the partition value of rows is not allowed
-- (*1) We would normally expect this not to throw an error: the quals
-- dist_1.a = dist_2.b AND dist_2.b = dist_2.a transitively imply
-- dist_1.a = dist_2.a, so we should be able to deduce that the SET
-- expression doesn't change dist_1.a. However, predicate_implied_by()
-- is not smart enough to follow that transitive equality.
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a;
ERROR: modifying the partition value of rows is not allowed
-- and same here
with cte as (
select * from (select a as x, b as y from dist_different_order_1 limit 100) q
)
update dist_1 set a = cte.x from cte where a = cte.y and cte.y = b and b = cte.x;
ERROR: modifying the partition value of rows is not allowed
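-- a possible workaround for the two cases above (a sketch, not exercised by
-- this test): spell out the direct equality so that predicate_implied_by()
-- can see it, e.g.:
--   UPDATE dist_1 SET a = dist_2.a FROM dist_2
--   WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a AND dist_1.a = dist_2.a;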
---- update: errors later (in logical or physical planner) ----
-- setting shard key to itself --
UPDATE dist_1 SET a = dist_1.a FROM dist_1 foo;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
UPDATE dist_1 SET a = dist_1.a FROM dist_2 foo;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
-- setting shard key to another var that's implied to be equal to shard key --
UPDATE dist_1 SET a = dist_non_colocated.a FROM dist_non_colocated WHERE dist_1.a = dist_non_colocated.a;
ERROR: cannot push down this subquery
DETAIL: dist_1 and dist_non_colocated are not colocated
UPDATE dist_1 SET a = dist_2.b FROM dist_2 WHERE dist_1.a = dist_2.b;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
---- update: a more sophisticated example ----
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'int_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_target', 'int_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i-1,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
UPDATE dist_target target_alias
SET int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
FROM dist_source source_alias
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
-- execute the same query on local tables; everything is the same except the table names behind the aliases
UPDATE local_target target_alias
SET int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
FROM local_source source_alias
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
-- compare both targets
SELECT COUNT(*) = 0 AS targets_match
FROM (
SELECT * FROM dist_target
EXCEPT
SELECT * FROM local_target
UNION ALL
SELECT * FROM local_target
EXCEPT
SELECT * FROM dist_target
) q;
targets_match
---------------------------------------------------------------------
t
(1 row)
---- merge: should work ----
-- setting shard key to itself --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
-- We don't care about action quals when deciding whether the update
-- could change the shard key, but still add some action quals for
-- testing. See the comments above the call to TargetEntryChangesValue()
-- in MergeQualAndTargetListFunctionsSupported().
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED AND dist_1.a > dist_1.b AND dist_1.b > 10 THEN UPDATE SET a = dist_1.a;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
-- setting shard key to another var that's implied to be equal to shard key --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = dist_1.b)
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
-- test with extra quals
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b AND (dist_1.b > 1000 OR (dist_1.b < 500)))
WHEN MATCHED THEN UPDATE SET a = src.b;
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
MERGE INTO dist_1
USING dist_different_order_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
---- merge: errors in router planner ----
-- different column of the same relation, which is not implied to be equal to shard key --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
ERROR: updating the distribution column is not allowed in MERGE actions
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.a;
ERROR: updating the distribution column is not allowed in MERGE actions
-- as in (*1), this is not supported
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b AND src.b = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
ERROR: updating the distribution column is not allowed in MERGE actions
MERGE INTO dist_1
USING dist_2 src
ON (true)
WHEN MATCHED THEN UPDATE SET a = src.a;
ERROR: updating the distribution column is not allowed in MERGE actions
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a <= src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
ERROR: updating the distribution column is not allowed in MERGE actions
---- merge: a more sophisticated example ----
DROP TABLE dist_source, dist_target, local_source, local_target;
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'tstamp_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_target', 'int_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i-1,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
MERGE INTO dist_target target_alias
USING dist_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- execute the same query on local tables; everything is the same except the table names behind the aliases
MERGE INTO local_target target_alias
USING local_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- compare both targets
SELECT COUNT(*) = 0 AS targets_match
FROM (
SELECT * FROM dist_target
EXCEPT
SELECT * FROM local_target
UNION ALL
SELECT * FROM local_target
EXCEPT
SELECT * FROM dist_target
) q;
targets_match
---------------------------------------------------------------------
t
(1 row)
-- UPDATEs with a FROM clause are supported even with local tables
UPDATE limit_orders SET limit_price = 0.00 FROM bidders
WHERE limit_orders.id = 246 AND
@ -1324,10 +1701,5 @@ CREATE TABLE multi_modifications.local (a int default 1, b int);
INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table));
ERROR: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
DROP TABLE insufficient_shards;
DROP TABLE raw_table;
DROP TABLE summary_table;
DROP TABLE reference_raw_table;
DROP TABLE reference_summary_table;
SET client_min_messages TO WARNING;
DROP SCHEMA multi_modifications CASCADE;
NOTICE: drop cascades to table multi_modifications.local

View File

@ -0,0 +1,45 @@
-- Test for CDC library path adjustment functionality
-- This test verifies that AdjustDynamicLibraryPathForCdcDecoders correctly
-- modifies dynamic_library_path when CDC is enabled, even for non-superusers
-- Test 1: Non-superuser with pg_read_all_settings can see dynamic_library_path changes
CREATE USER cdc_test_user;
GRANT pg_read_all_settings TO cdc_test_user;
SET ROLE cdc_test_user;
-- Non-superuser should be able to see the current dynamic_library_path
SELECT current_setting('dynamic_library_path') AS user_visible_path;
user_visible_path
---------------------------------------------------------------------
$libdir
(1 row)
SET citus.enable_change_data_capture = true;
SHOW citus.enable_change_data_capture;
citus.enable_change_data_capture
---------------------------------------------------------------------
on
(1 row)
SHOW dynamic_library_path;
dynamic_library_path
---------------------------------------------------------------------
$libdir/citus_decoders:$libdir
(1 row)
-- Reset to superuser and cleanup
RESET ROLE;
DROP USER cdc_test_user;
-- Final cleanup
RESET citus.enable_change_data_capture;
RESET dynamic_library_path;
SHOW citus.enable_change_data_capture;
citus.enable_change_data_capture
---------------------------------------------------------------------
off
(1 row)
SHOW dynamic_library_path;
dynamic_library_path
---------------------------------------------------------------------
$libdir/citus_decoders:$libdir
(1 row)

View File

@ -289,7 +289,7 @@ ORDER BY 1;
function run_command_on_placements(regclass,text,boolean)
function run_command_on_shards(regclass,text,boolean)
function run_command_on_workers(text,boolean)
function shard_name(regclass,bigint)
function shard_name(regclass,bigint,boolean)
function start_metadata_sync_to_all_nodes()
function start_metadata_sync_to_node(text,integer)
function stop_metadata_sync_to_node(text,integer,boolean)
@ -380,6 +380,7 @@ ORDER BY 1;
view citus_dist_stat_activity
view citus_lock_waits
view citus_locks
view citus_nodes
view citus_schema.citus_schemas
view citus_schema.citus_tables
view citus_shard_indexes_on_worker
@ -392,6 +393,6 @@ ORDER BY 1;
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(361 rows)
(362 rows)
DROP TABLE extension_basic_types;

View File

@ -137,18 +137,21 @@ class Message:
class SharedMessage(Message, metaclass=MessageMeta):
"A message which could be sent by either the frontend or the backend"
_msgtypes = dict()
_classes = dict()
class FrontendMessage(Message, metaclass=MessageMeta):
"A message which will only be sent be a backend"
_msgtypes = dict()
_classes = dict()
class BackendMessage(Message, metaclass=MessageMeta):
"A message which will only be sent be a frontend"
_msgtypes = dict()
_classes = dict()

View File

@ -5,6 +5,7 @@ test: pg16
test: multi_create_fdw
test: multi_test_catalog_views
test: replicated_table_disable_node
test: cdc_library_path non_super_user_cdc_library_path
# ----------
# The following distributed tests depend on creating a partitioned table and

View File

@ -489,7 +489,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
push(@pgOptions, "citus.stat_statements_track = 'all'");
push(@pgOptions, "citus.enable_change_data_capture=on");
push(@pgOptions, "citus.enable_change_data_capture=off");
push(@pgOptions, "citus.stat_tenants_limit = 2");
push(@pgOptions, "citus.stat_tenants_track = 'ALL'");
push(@pgOptions, "citus.enable_stat_counters=on");

View File

@ -0,0 +1,46 @@
-- Test for CDC library path adjustment functionality
-- This test verifies that AdjustDynamicLibraryPathForCdcDecoders correctly
-- modifies dynamic_library_path when CDC is enabled, running as superuser
-- Test 1: Show initial state and reset to ensure clean state
SHOW dynamic_library_path;
SHOW citus.enable_change_data_capture;
-- Test 2: Enable CDC and verify path is set to include citus_decoders
SET citus.enable_change_data_capture = true;
SHOW dynamic_library_path;
-- Verify that the dynamic_library_path has been modified to include citus_decoders
SELECT
CASE
WHEN current_setting('dynamic_library_path') LIKE '%citus_decoders%'
THEN 'CDC path correctly set'
ELSE 'CDC path incorrectly not set'
END AS cdc_path_status;
-- Test 3: Disable CDC and verify path remains (CDC doesn't remove the path)
SET citus.enable_change_data_capture = false;
SHOW dynamic_library_path;
-- Test 4: Edge case - function should only work when path is exactly "$libdir"
SET dynamic_library_path = '$libdir/other_path:$libdir';
SET citus.enable_change_data_capture = true;
SHOW dynamic_library_path;
-- Verify that path is unchanged with custom library path
SELECT
CASE
WHEN current_setting('dynamic_library_path') LIKE '%citus_decoders%'
THEN 'CDC path incorrectly set'
ELSE 'CDC path correctly not set'
END AS custom_path_test;
-- Reset dynamic_library_path to default
RESET dynamic_library_path;
RESET citus.enable_change_data_capture;
-- Test 5: Verify that dynamic_library_path reset_val is overridden to $libdir/citus_decoders:$libdir
SHOW dynamic_library_path;
SHOW citus.enable_change_data_capture;

View File

@ -13,5 +13,16 @@ INSERT INTO t1 SELECT generate_series(1, 100);
INSERT INTO "t with space" SELECT generate_series(1, 1000);
SELECT * FROM citus_shards;
SET search_path TO public;
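-- exercise shard_name() with and without its new optional boolean argument,
-- using a local table that has been added to Citus metadata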
CREATE TABLE t3 (i int);
SELECT citus_add_local_table_to_metadata('t3');
SELECT shard_name('t3', shardid) FROM pg_dist_shard WHERE logicalrelid = 't3'::regclass;
SELECT shard_name('t3', shardid, true) FROM pg_dist_shard WHERE logicalrelid = 't3'::regclass;
SELECT shard_name('t3', shardid, false) FROM pg_dist_shard WHERE logicalrelid = 't3'::regclass;
DROP TABLE t3;
SET search_path TO citus_shards;
SET client_min_messages TO WARNING;
DROP SCHEMA citus_shards CASCADE;

View File

@ -126,5 +126,128 @@ WHEN NOT MATCHED THEN
SELECT compare_data();
DROP SCHEMA merge_repartition2_schema CASCADE;
---- https://github.com/citusdata/citus/issues/8180 ----
CREATE TABLE dist_1 (a int, b int, c int);
CREATE TABLE dist_2 (a int, b int, c int);
CREATE TABLE dist_different_order_1 (b int, a int, c int);
SELECT create_distributed_table('dist_1', 'a');
SELECT create_distributed_table('dist_2', 'a');
SELECT create_distributed_table('dist_different_order_1', 'a');
MERGE INTO dist_1
USING dist_2
ON (dist_1.a = dist_2.b)
WHEN MATCHED THEN UPDATE SET b = dist_2.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET b = src.b;
MERGE INTO dist_different_order_1
USING dist_1
ON (dist_different_order_1.a = dist_1.b)
WHEN MATCHED THEN UPDATE SET b = dist_1.b;
CREATE TABLE dist_1_cast (a int, b int);
CREATE TABLE dist_2_cast (a int, b numeric);
SELECT create_distributed_table('dist_1_cast', 'a');
SELECT create_distributed_table('dist_2_cast', 'a');
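-- dist_2_cast.b is numeric while dist_1_cast's distribution column is int;
-- the first MERGE is expected to fail with a datatype mismatch, and casting
-- b to int in a subquery (second MERGE) avoids it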
MERGE INTO dist_1_cast
USING dist_2_cast
ON (dist_1_cast.a = dist_2_cast.b)
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
MERGE INTO dist_1_cast
USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast
ON (dist_1_cast.a = dist_2_cast.b)
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
-- a more sophisticated example
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'tstamp_col');
SELECT create_distributed_table('dist_target', 'int_col');
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i-1,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
MERGE INTO dist_target target_alias
USING dist_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- execute the same query on local tables; everything is the same except the table names behind the aliases
MERGE INTO local_target target_alias
USING local_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- compare both targets
SELECT COUNT(*) = 0 AS targets_match
FROM (
SELECT * FROM dist_target
EXCEPT
SELECT * FROM local_target
UNION ALL
SELECT * FROM local_target
EXCEPT
SELECT * FROM dist_target
) q;
SET client_min_messages TO WARNING;
DROP SCHEMA merge_repartition2_schema CASCADE;

View File

@ -32,6 +32,21 @@ SELECT result FROM run_command_on_workers('SELECT citus_is_primary_node()');
-- get the active nodes
SELECT master_get_active_worker_nodes();
-- get all nodes
SELECT * from citus_nodes;
-- get active nodes
SELECT * from citus_nodes where active = 't';
-- get coordinator nodes
SELECT * from citus_nodes where role = 'coordinator';
-- get worker nodes
SELECT * from citus_nodes where role = 'worker';
-- get nodes with unknown role
SELECT * from citus_nodes where role = 'foo';
-- try to add a node that is already in the cluster
SELECT * FROM master_add_node('localhost', :worker_1_port);
@ -51,6 +66,20 @@ SELECT citus_disable_node('localhost', :worker_2_port);
SELECT public.wait_until_metadata_sync(20000);
SELECT master_get_active_worker_nodes();
-- get active nodes
SELECT * from citus_nodes where active = 't';
-- get inactive nodes
SELECT * from citus_nodes where active = 'f';
-- make sure non-superusers can access the view
CREATE ROLE normaluser;
SET ROLE normaluser;
SELECT * FROM citus_nodes;
SET ROLE postgres;
DROP ROLE normaluser;
-- add some shard placements to the cluster
SET citus.shard_count TO 16;
SET citus.shard_replication_factor TO 1;

View File

@ -2583,5 +2583,127 @@ SELECT id, val FROM version_dist_union ORDER BY id;
-- End of Issue #7784
-- PR #8106 — CTE traversal works when following outer Vars
-- This script exercises three shapes:
-- T1) CTE referenced inside a correlated subquery (one level down)
-- T2) CTE referenced inside a nested subquery (two levels down)
-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
CREATE SCHEMA pr8106_cte_outervar;
SET search_path = pr8106_cte_outervar, public;
-- Base tables for the tests
DROP TABLE IF EXISTS raw_events_first CASCADE;
DROP TABLE IF EXISTS agg_events CASCADE;
CREATE TABLE raw_events_first(
user_id int,
value_1 int
);
CREATE TABLE agg_events(
user_id int,
value_1_agg int
);
-- Distribute and colocate (distribution key = user_id)
SELECT create_distributed_table('raw_events_first', 'user_id');
SELECT create_distributed_table('agg_events', 'user_id');
-- Seed data (duplicates on some user_ids; some NULL value_1’s)
INSERT INTO raw_events_first(user_id, value_1) VALUES
(1, 10), (1, 20), (1, NULL),
(2, NULL),
(3, 30),
(4, NULL),
(5, 50), (5, NULL),
(6, NULL);
----------------------------------------------------------------------
-- T1) CTE referenced inside a correlated subquery (one level down)
----------------------------------------------------------------------
TRUNCATE agg_events;
WITH c AS MATERIALIZED (
SELECT user_id FROM raw_events_first
)
INSERT INTO agg_events (user_id)
SELECT t.user_id
FROM raw_events_first t
WHERE EXISTS (SELECT 1 FROM c WHERE c.user_id = t.user_id);
-- Expect one insert per row in raw_events_first (EXISTS always true per user_id)
SELECT 't1_count_matches' AS test,
(SELECT count(*) FROM agg_events) =
(SELECT count(*) FROM raw_events_first) AS ok;
-- Spot-check: how many rows were inserted
SELECT 't1_rows' AS test, count(*) AS rows FROM agg_events;
----------------------------------------------------------------------
-- T2) CTE referenced inside a nested subquery (two levels down)
----------------------------------------------------------------------
TRUNCATE agg_events;
WITH c AS MATERIALIZED (
SELECT user_id FROM raw_events_first
)
INSERT INTO agg_events (user_id)
SELECT t.user_id
FROM raw_events_first t
WHERE EXISTS (
SELECT 1
FROM (SELECT user_id FROM c) c2
WHERE c2.user_id = t.user_id
);
-- Same cardinality expectation as T1
SELECT 't2_count_matches' AS test,
(SELECT count(*) FROM agg_events) =
(SELECT count(*) FROM raw_events_first) AS ok;
SELECT 't2_rows' AS test, count(*) AS rows FROM agg_events;
----------------------------------------------------------------------
-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
-- (use MAX() to keep scalar subquery single-row)
----------------------------------------------------------------------
TRUNCATE agg_events;
WITH c AS (SELECT user_id, value_1 FROM raw_events_first)
INSERT INTO agg_events (user_id, value_1_agg)
SELECT d.user_id, d.value_1_agg
FROM (
SELECT t.user_id,
(SELECT max(c.value_1) FROM c WHERE c.user_id = t.user_id) AS value_1_agg
FROM raw_events_first t
) AS d
WHERE d.value_1_agg IS NOT NULL;
-- Expect one insert per row in raw_events_first whose user_id has at least one non-NULL value_1
SELECT 't3_count_matches' AS test,
(SELECT count(*) FROM agg_events) =
(
SELECT count(*)
FROM raw_events_first t
WHERE EXISTS (
SELECT 1 FROM raw_events_first c
WHERE c.user_id = t.user_id AND c.value_1 IS NOT NULL
)
) AS ok;
-- Also verify no NULLs were inserted into value_1_agg
SELECT 't3_no_null_value_1_agg' AS test,
NOT EXISTS (SELECT 1 FROM agg_events WHERE value_1_agg IS NULL) AS ok;
-- Deterministic sample of results
SELECT 't3_sample' AS test, user_id, value_1_agg
FROM agg_events
ORDER BY user_id
LIMIT 5;
-- End of PR #8106 — CTE traversal works when following outer Vars
SET client_min_messages TO ERROR;
DROP SCHEMA pr8106_cte_outervar CASCADE;
DROP SCHEMA multi_insert_select CASCADE;

View File

@ -3,6 +3,7 @@ SET citus.next_shard_id TO 750000;
SET citus.next_placement_id TO 750000;
CREATE SCHEMA multi_modifications;
SET search_path TO multi_modifications;
-- some failure messages that come from the worker nodes
-- might change due to parallel executions, so suppress those
@ -36,7 +37,7 @@ CREATE TABLE append_partitioned ( LIKE limit_orders );
SET citus.shard_count TO 2;
SELECT create_distributed_table('limit_orders', 'id', 'hash');
SELECT create_distributed_table('multiple_hash', 'id', 'hash');
SELECT create_distributed_table('multiple_hash', 'category', 'hash');
SELECT create_distributed_table('range_partitioned', 'id', 'range');
SELECT create_distributed_table('append_partitioned', 'id', 'append');
@ -244,12 +245,14 @@ INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell',
-- First: Connect to the second worker node
\c - - - :worker_2_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
-- Third: Connect back to master node
\c - - - :master_port
SET search_path TO multi_modifications;
-- Fourth: Perform an INSERT on the remaining node
-- the whole transaction should fail
@ -258,6 +261,7 @@ INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell',
-- set the shard name back
\c - - - :worker_2_port
SET search_path TO multi_modifications;
-- Second: Restore the limit_orders shard name on the second worker node
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
@ -265,12 +269,15 @@ ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Verify the insert failed and both placements are healthy
-- or the insert succeeded and placement marked unhealthy
\c - - - :worker_1_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
\c - - - :worker_2_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
\c - - - :master_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders WHERE id = 276;
@ -285,12 +292,14 @@ AND s.logicalrelid = 'limit_orders'::regclass;
-- First: Connect to the first worker node
\c - - - :worker_1_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the first worker node
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
-- Third: Connect back to master node
\c - - - :master_port
SET search_path TO multi_modifications;
-- Fourth: Perform an INSERT on the remaining node
\set VERBOSITY terse
@ -311,12 +320,14 @@ AND s.logicalrelid = 'limit_orders'::regclass;
-- First: Connect to the first worker node
\c - - - :worker_1_port
SET search_path TO multi_modifications;
-- Second: Restore the limit_orders shard name on the first worker node
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Third: Connect back to master node
\c - - - :master_port
SET search_path TO multi_modifications;
-- attempting to change the partition key is unsupported
UPDATE limit_orders SET id = 0 WHERE id = 246;
@ -327,6 +338,375 @@ UPDATE limit_orders SET id = 246 WHERE id = 246;
UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM';
UPDATE limit_orders SET id = limit_orders.id WHERE id = 246;
CREATE TABLE dist_1 (a int, b int, c int);
CREATE TABLE dist_2 (a int, b int, c int);
CREATE TABLE dist_non_colocated (a int, b int, c int);
CREATE TABLE dist_different_order_1 (b int, a int, c int);
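-- note that dist_different_order_1 declares b before a, so its distribution
-- column "a" has a different attno than dist_1's; this matters for the
-- same-attno / different-relation cases tested below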
SELECT create_distributed_table('dist_1', 'a');
SELECT create_distributed_table('dist_2', 'a');
SELECT create_distributed_table('dist_non_colocated', 'a', colocate_with=>'none');
SELECT create_distributed_table('dist_different_order_1', 'a');
--
-- https://github.com/citusdata/citus/issues/8087
--
---- update: should work ----
-- setting shard key to itself --
UPDATE dist_1 SET a = dist_1.a;
UPDATE dist_1 SET a = dist_1.a WHERE dist_1.a > dist_1.b AND dist_1.b > 10;
UPDATE dist_1 SET a = dist_1.a FROM dist_2 WHERE dist_1.a = dist_2.a;
-- setting shard key to another var that's implied to be equal to shard key --
UPDATE dist_1 SET a = b WHERE a = b;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.b = dist_2.c AND (dist_2.c > 5 OR dist_2.c < 0);
with cte as (
select a, b from dist_1
)
update dist_1 set a = cte.a from cte where dist_1.a = cte.a;
with cte as (
select a as x, b as y from (select a, b from dist_1 limit 100) dt where b > 100
)
update dist_1 set a = cte.x from cte where dist_1.a = cte.x;
with cte as (
select d2.a as x, d1.b as y
from dist_1 d1, dist_different_order_1 d2
where d1.a=d2.a)
update dist_1 set a = cte.x from cte where y != 0 and dist_1.a = cte.x;
with cte as (
select * from (select a as x, b as y from dist_2 limit 100) q
)
update dist_1 set a = cte.x from cte where b = cte.y and cte.y = a and a = cte.x;
-- supported although the where clause will certainly evaluate to false
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.a = 5 AND dist_2.a = 7;
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
-- test with extra quals
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a AND dist_1.b = dist_different_order_1.c AND (dist_different_order_1.c > 5 OR dist_different_order_1.c < 0);
---- update: errors in router planner ----
-- different column of the same relation, which is not implied to be equal to shard key --
UPDATE dist_1 SET a = dist_1.b;
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
UPDATE dist_1 SET a = dist_2.a FROM dist_2;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a != dist_2.a;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a >= dist_2.a;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a OR dist_1.a > dist_2.a;
UPDATE dist_1 SET a = dist_different_order_1.b FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
UPDATE dist_1 SET a = foo.a FROM dist_1 foo;
UPDATE dist_1 SET a = foo.a FROM dist_1 foo WHERE dist_1.a != foo.a;
-- (*1) We would normally expect this not to throw an error: the quals
-- dist_1.a = dist_2.b AND dist_2.b = dist_2.a transitively imply
-- dist_1.a = dist_2.a, so we should be able to deduce that the SET
-- expression doesn't change dist_1.a. However, predicate_implied_by()
-- is not smart enough to follow that transitive equality.
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a;
-- and same here
with cte as (
select * from (select a as x, b as y from dist_different_order_1 limit 100) q
)
update dist_1 set a = cte.x from cte where a = cte.y and cte.y = b and b = cte.x;
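-- a possible workaround for the two cases above (a sketch, not exercised by
-- this test): spell out the direct equality so that predicate_implied_by()
-- can see it, e.g.:
--   UPDATE dist_1 SET a = dist_2.a FROM dist_2
--   WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a AND dist_1.a = dist_2.a;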
---- update: errors later (in logical or physical planner) ----
-- setting shard key to itself --
UPDATE dist_1 SET a = dist_1.a FROM dist_1 foo;
UPDATE dist_1 SET a = dist_1.a FROM dist_2 foo;
-- setting shard key to another var that's implied to be equal to shard key --
UPDATE dist_1 SET a = dist_non_colocated.a FROM dist_non_colocated WHERE dist_1.a = dist_non_colocated.a;
UPDATE dist_1 SET a = dist_2.b FROM dist_2 WHERE dist_1.a = dist_2.b;
---- update: a more sophisticated example ----
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'int_col');
SELECT create_distributed_table('dist_target', 'int_col');
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i-1,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
UPDATE dist_target target_alias
SET int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
FROM dist_source source_alias
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
-- execute the same query on local tables; everything is the same except the table names behind the aliases
UPDATE local_target target_alias
SET int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
FROM local_source source_alias
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
-- compare both targets (their symmetric difference must be empty)
SELECT COUNT(*) = 0 AS targets_match
FROM (
(SELECT * FROM dist_target
EXCEPT
SELECT * FROM local_target)
UNION ALL
(SELECT * FROM local_target
EXCEPT
SELECT * FROM dist_target)
) q;
---- merge: should work ----
-- setting shard key to itself --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
-- We don't care about action quals when deciding whether the update could
-- change the shard key, but still add some action quals for testing. See the
-- comments above the call to TargetEntryChangesValue() in
-- MergeQualAndTargetListFunctionsSupported().
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED AND dist_1.a > dist_1.b AND dist_1.b > 10 THEN UPDATE SET a = dist_1.a;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
-- setting shard key to another var that's implied to be equal to shard key --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = dist_1.b)
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
-- test with extra quals
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b AND (dist_1.b > 1000 OR (dist_1.b < 500)))
WHEN MATCHED THEN UPDATE SET a = src.b;
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
MERGE INTO dist_1
USING dist_different_order_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
---- merge: errors in router planner ----
-- different column of the same relation, which is not implied to be equal to shard key --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.a;
-- as in (*1), this is not supported
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b AND src.b = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
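-- likewise, a sketch of a possible workaround: adding the direct equality to
-- the ON clause should let the planner deduce that the update action is a no-op
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b AND src.b = src.a AND dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;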
MERGE INTO dist_1
USING dist_2 src
ON (true)
WHEN MATCHED THEN UPDATE SET a = src.a;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a <= src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
---- merge: a more sophisticated example ----
DROP TABLE dist_source, dist_target, local_source, local_target;
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'tstamp_col');
SELECT create_distributed_table('dist_target', 'int_col');
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i-1,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
MERGE INTO dist_target target_alias
USING dist_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- execute the same query on local tables; everything is the same except the table names behind the aliases
MERGE INTO local_target target_alias
USING local_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- compare both targets (their symmetric difference must be empty)
SELECT COUNT(*) = 0 AS targets_match
FROM (
(SELECT * FROM dist_target
EXCEPT
SELECT * FROM local_target)
UNION ALL
(SELECT * FROM local_target
EXCEPT
SELECT * FROM dist_target)
) q;
-- UPDATEs with a FROM clause are supported even with local tables
UPDATE limit_orders SET limit_price = 0.00 FROM bidders
WHERE limit_orders.id = 246 AND
@ -897,9 +1277,5 @@ DELETE FROM summary_table WHERE id < (
CREATE TABLE multi_modifications.local (a int default 1, b int);
INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table));
DROP TABLE insufficient_shards;
DROP TABLE raw_table;
DROP TABLE summary_table;
DROP TABLE reference_raw_table;
DROP TABLE reference_summary_table;
SET client_min_messages TO WARNING;
DROP SCHEMA multi_modifications CASCADE;


@ -0,0 +1,27 @@
-- Test for CDC library path adjustment functionality
-- This test verifies that AdjustDynamicLibraryPathForCdcDecoders correctly modifies
-- dynamic_library_path when CDC is enabled, and that the change is visible to a
-- session running with non-superuser privileges
-- Test 1: Non-superuser with pg_read_all_settings can see dynamic_library_path changes
CREATE USER cdc_test_user;
GRANT pg_read_all_settings TO cdc_test_user;
SET ROLE cdc_test_user;
-- Non-superuser should be able to see the current dynamic_library_path
SELECT current_setting('dynamic_library_path') AS user_visible_path;
SET citus.enable_change_data_capture = true;
SHOW citus.enable_change_data_capture;
SHOW dynamic_library_path;
-- Reset to superuser and cleanup
RESET ROLE;
DROP USER cdc_test_user;
-- Final cleanup
RESET citus.enable_change_data_capture;
RESET dynamic_library_path;
SHOW citus.enable_change_data_capture;
SHOW dynamic_library_path;