mirror of https://github.com/citusdata/citus.git
Compare commits
15 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 147619d79a | |
| | fd8f99b18e | |
| | 2296e0dd3a | |
| | 6467cb9e2c | |
| | d8fcddc558 | |
| | d94174d7a8 | |
| | ea78497e9e | |
| | 0ce8fb6e2c | |
| | ad266a2c0a | |
| | e42847196a | |
| | b6b415c17a | |
| | 0355d8c392 | |
| | af01fa48ec | |
| | 1cb2462818 | |
| | 662628fe7d | |
@@ -73,7 +73,7 @@ USER citus

# build postgres versions separately for effective parallelism and caching of already built versions when changing only certain versions
FROM base AS pg15
-RUN MAKEFLAGS="-j $(nproc)" pgenv build 15.12
+RUN MAKEFLAGS="-j $(nproc)" pgenv build 15.14
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install

@@ -85,7 +85,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
RUN rm .pgenv-staging/config/default.conf

FROM base AS pg16
-RUN MAKEFLAGS="-j $(nproc)" pgenv build 16.8
+RUN MAKEFLAGS="-j $(nproc)" pgenv build 16.10
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install

@@ -97,7 +97,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
RUN rm .pgenv-staging/config/default.conf

FROM base AS pg17
-RUN MAKEFLAGS="-j $(nproc)" pgenv build 17.4
+RUN MAKEFLAGS="-j $(nproc)" pgenv build 17.6
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install

@@ -216,7 +216,7 @@ COPY --chown=citus:citus .psqlrc .
RUN sudo chown --from=root:root citus:citus -R ~

# sets default pg version
-RUN pgenv switch 17.4
+RUN pgenv switch 17.6

# make connecting to the coordinator easy
ENV PGPORT=9700

@@ -31,12 +31,12 @@ jobs:
      pgupgrade_image_name: "ghcr.io/citusdata/pgupgradetester"
      style_checker_image_name: "ghcr.io/citusdata/stylechecker"
      style_checker_tools_version: "0.8.18"
-     sql_snapshot_pg_version: "17.4"
-     image_suffix: "-veab367a"
-     pg15_version: '{ "major": "15", "full": "15.12" }'
-     pg16_version: '{ "major": "16", "full": "16.8" }'
-     pg17_version: '{ "major": "17", "full": "17.4" }'
-     upgrade_pg_versions: "15.12-16.8-17.4"
+     sql_snapshot_pg_version: "17.6"
+     image_suffix: "-v4df94a0"
+     pg15_version: '{ "major": "15", "full": "15.14" }'
+     pg16_version: '{ "major": "16", "full": "16.10" }'
+     pg17_version: '{ "major": "17", "full": "17.6" }'
+     upgrade_pg_versions: "15.14-16.10-17.6"
    steps:
      # Since GHA jobs need at least one step we use a noop step here.
      - name: Set up parameters

@@ -24,7 +24,7 @@ jobs:
        uses: actions/checkout@v4

      - name: Initialize CodeQL
-       uses: github/codeql-action/init@v2
+       uses: github/codeql-action/init@v3
        with:
          languages: ${{ matrix.language }}

@@ -76,4 +76,4 @@ jobs:
        sudo make install-all

      - name: Perform CodeQL Analysis
-       uses: github/codeql-action/analyze@v2
+       uses: github/codeql-action/analyze@v3

CHANGELOG.md
@@ -1,3 +1,145 @@
### citus v13.1.1 (Oct 1st, 2025) ###

* Adds support for latest PG minors: 14.19, 15.14, 16.10 (#8142)

* Fixes an assertion failure when an expression in the query references
  a CTE (#8106)

* Fixes a bug that causes an unexpected error when executing
  repartitioned MERGE (#8201)
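
  For reference, the shape that triggered the unexpected error is exercised
  by the merge_repartition2 regression test later in this compare; joining
  the target's distribution column to a non-distribution source column
  forces a repartitioned MERGE:

  ```sql
  -- Tables and statement taken from the regression test included in this compare.
  MERGE INTO dist_1
  USING dist_2
  ON (dist_1.a = dist_2.b)
  WHEN MATCHED THEN UPDATE SET b = dist_2.b;
  ```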

* Fixes a bug that allowed UPDATE / MERGE queries that may
  change the distribution column value (#8214)
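
  The mixed_relkind regression test later in this compare shows the new
  behavior; such statements now fail up front:

  ```sql
  -- Adapted from the updated expected output below.
  UPDATE partitioned_distributed_table SET a = foo.a
  FROM distributed_table AS foo
  WHERE foo.a < partitioned_distributed_table.a;
  -- ERROR: modifying the partition value of rows is not allowed
  ```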

* Updates dynamic_library_path automatically when CDC is enabled (#8025)
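
  A minimal sketch of the behavior, matching the CDC regression test added
  later in this compare (the path is only adjusted while dynamic_library_path
  is the default `$libdir`):

  ```sql
  SET citus.enable_change_data_capture = true;
  SHOW dynamic_library_path;
  -- $libdir/citus_decoders:$libdir
  ```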

### citus v13.1.0 (May 30th, 2025) ###

* Adds a `citus_stat_counters` view that can be used to query
  the stat counters that Citus collects while the feature is enabled, which is
  controlled by `citus.enable_stat_counters`. `citus_stat_counters()` can be
  used to query the stat counters for the provided database oid, and
  `citus_stat_counters_reset()` can be used to reset them for the provided
  database oid, or for the current database if nothing or 0 is provided (#7917)
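
  A usage sketch based on the description above; the signatures shipped in
  this release are `citus_stat_counters(oid)` and `citus_stat_counters_reset(oid)`
  (see the multi_extension output below):

  ```sql
  SET citus.enable_stat_counters TO on;
  SELECT * FROM citus_stat_counters;    -- counters collected so far
  SELECT citus_stat_counters_reset(0);  -- reset for the current database
  ```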

* Adds a `citus_nodes` view that displays the node name, port, role, and
  active status for nodes in the cluster (#7968)
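
  As exercised in the multi_cluster_management test output later in this
  compare:

  ```sql
  SELECT * FROM citus_nodes WHERE role = 'coordinator';
  --  nodename  | nodeport |    role     | active
  --  localhost |    57636 | coordinator | t
  ```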

* Adds `citus_is_primary_node()` UDF to determine if the current node is a
  primary node in the cluster (#7720)
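
  A trivial usage sketch, assuming the UDF takes no arguments as described
  above:

  ```sql
  SELECT citus_is_primary_node();  -- true if the current node is a primary node
  ```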

* Adds support for propagating `GRANT/REVOKE` rights on table columns (#7918)

* Adds support for propagating `REASSIGN OWNED BY` commands (#7319)

* Adds support for propagating `CREATE`/`DROP` database from all nodes (#7240,
  #7253, #7359)

* Propagates `SECURITY LABEL ON ROLE` statements from any node (#7508)

* Adds support for issuing role management commands from worker nodes (#7278)

* Adds support for propagating `ALTER USER RENAME` commands (#7204)

* Adds support for propagating `ALTER DATABASE <db_name> SET ..` commands
  (#7181)

* Adds support for propagating `SECURITY LABEL` on tables and columns (#7956)

* Adds support for propagating `COMMENT ON <database>/<role>` commands (#7388)

* Moves some of the internal Citus functions from `pg_catalog` to the
  `citus_internal` schema (#7473, #7470, #7466, #7456, #7450)

* Adjusts `max_prepared_transactions` only when it's set to the default on
  PG >= 16 (#7712)

* Adds a `skip_qualify_public` param to the `shard_name()` UDF to allow
  qualifying with the "public" schema when needed (#8014)

* Allows `citus_*_size` on indexes of distributed tables (#7271)

* Allows `GRANT ADMIN` to also be `INHERIT` or `SET` in support of PG16

* Makes sure `worker_copy_table_to_node` errors out with Citus tables (#7662)

* Adds information to explain output when using
  `citus.explain_distributed_queries=false` (#7412)

* Logs username in the failed connection message (#7432)

* Makes sure to avoid incorrectly pushing down the outer joins between
  distributed tables and recurring relations (like reference tables, local
  tables and `VALUES(..)` etc.) prior to PG 17 (#7937)

* Prevents incorrectly pushing `nextval()` calls down to workers to avoid using
  an incorrect sequence value for some types of `INSERT .. SELECT`s (#7976)

* Makes sure to prevent `INSERT INTO ... SELECT` queries involving subfield or
  sublink, to avoid crashes (#7912)

* Makes sure to take improvement_threshold into account
  in `citus_add_rebalance_strategy()` (#7247)

* Makes sure to disallow creating a replicated distributed
  table concurrently (#7219)

* Fixes a bug that causes the `CASCADE` clause to be omitted from the commands
  sent to workers for `REVOKE` commands on tables (#7958)

* Fixes an issue detected using address sanitizer (#7948, #7949)

* Fixes a bug in deparsing of shard query in case of "output-table column" name
  conflict (#7932)

* Fixes a crash in columnar custom scan that happens when a columnar table is
  used in a join (#7703)

* Fixes `MERGE` command when insert value does not have source distribution
  column (#7627)

* Fixes performance issue when using `\d tablename` on a server with many
  tables (#7577)

* Fixes performance issue in `GetForeignKeyOids` on systems with many
  constraints (#7580)

* Fixes performance issue when distributing a table that depends on an
  extension (#7574)

* Fixes performance issue when creating distributed tables if many already
  exist (#7575)

* Fixes a crash caused by some form of `ALTER TABLE ADD COLUMN` statements. When
  adding multiple columns, if one of the `ADD COLUMN` statements contains a
  `FOREIGN` constraint omitting the referenced
  columns in the statement, a `SEGFAULT` occurs (#7522)

* Fixes assertion failure in maintenance daemon during Citus upgrades (#7537)

* Fixes segmentation fault when using `CASE WHEN` in `DO` block functions
  (#7554)

* Fixes undefined behavior in `master_disable_node` due to argument mismatch
  (#7492)

* Fixes incorrect propagation of `GRANTED BY` and `CASCADE/RESTRICT` clauses
  for `REVOKE` statements (#7451)

* Fixes the incorrect column count after `ALTER TABLE` (#7379)

* Fixes timeout when underlying socket is changed for an inter-node connection
  (#7377)

* Fixes memory leaks (#7441, #7440)

* Fixes leaking of memory and memory contexts when tracking foreign keys between
  Citus tables (#7236)

* Fixes a potential segfault for background rebalancer (#7694)

* Fixes potential `NULL` dereference in causal clocks (#7704)

### citus v13.0.3 (March 20th, 2025) ###

* Fixes a version bump issue in 13.0.2

@@ -100,9 +242,8 @@
* Allows overwriting host name for all inter-node connections by
  supporting "host" parameter in citus.node_conninfo (#7541)

-* Changes the order in which the locks are acquired for the target and
-  reference tables, when a modify request is initiated from a worker
-  node that is not the "FirstWorkerNode" (#7542)
+* Avoids distributed deadlocks by changing the order in which the locks are
+  acquired for the target and reference tables (#7542)

* Fixes a performance issue when distributing a table that depends on an
  extension (#7574)

@@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
-# Generated by GNU Autoconf 2.69 for Citus 13.1devel.
+# Generated by GNU Autoconf 2.69 for Citus 13.1.1.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.

@@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
-PACKAGE_VERSION='13.1devel'
-PACKAGE_STRING='Citus 13.1devel'
+PACKAGE_VERSION='13.1.1'
+PACKAGE_STRING='Citus 13.1.1'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''

@@ -1262,7 +1262,7 @@ if test "$ac_init_help" = "long"; then
  # Omit some internal or obsolete options to make the list less imposing.
  # This message is too long to be a string in the A/UX 3.1 sh.
  cat <<_ACEOF
-\`configure' configures Citus 13.1devel to adapt to many kinds of systems.
+\`configure' configures Citus 13.1.1 to adapt to many kinds of systems.

Usage: $0 [OPTION]... [VAR=VALUE]...

@@ -1324,7 +1324,7 @@ fi

if test -n "$ac_init_help"; then
  case $ac_init_help in
-     short | recursive ) echo "Configuration of Citus 13.1devel:";;
+     short | recursive ) echo "Configuration of Citus 13.1.1:";;
   esac
  cat <<\_ACEOF

@@ -1429,7 +1429,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
  cat <<\_ACEOF
-Citus configure 13.1devel
+Citus configure 13.1.1
generated by GNU Autoconf 2.69

Copyright (C) 2012 Free Software Foundation, Inc.

@@ -1912,7 +1912,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.

-It was created by Citus $as_me 13.1devel, which was
+It was created by Citus $as_me 13.1.1, which was
generated by GNU Autoconf 2.69. Invocation command line was

  $ $0 $@

@@ -5393,7 +5393,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
-This file was extended by Citus $as_me 13.1devel, which was
+This file was extended by Citus $as_me 13.1.1, which was
generated by GNU Autoconf 2.69. Invocation command line was

  CONFIG_FILES = $CONFIG_FILES

@@ -5455,7 +5455,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
-Citus config.status 13.1devel
+Citus config.status 13.1.1
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"

@@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.

-AC_INIT([Citus], [13.1devel])
+AC_INIT([Citus], [13.1.1])
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])

# we'll need sed and awk for some of the version commands

@@ -656,7 +656,9 @@ SaveStripeSkipList(RelFileLocator relfilelocator, uint64 stripe,
			nulls[Anum_columnar_chunk_maximum_value - 1] = true;
		}

+		PushActiveSnapshot(GetTransactionSnapshot());
		InsertTupleAndEnforceConstraints(modifyState, values, nulls);
+		PopActiveSnapshot();
	}
}

@@ -109,13 +109,20 @@ citus_unmark_object_distributed(PG_FUNCTION_ARGS)
	Oid classid = PG_GETARG_OID(0);
	Oid objid = PG_GETARG_OID(1);
	int32 objsubid = PG_GETARG_INT32(2);

+	/*
+	 * SQL function master_unmark_object_distributed doesn't expect the
+	 * 4th argument but SQL function citus_unmark_object_distributed does,
+	 * as its checkobjectexistence argument. For this reason, we try to
+	 * get the 4th argument only if this C function is called with 4
+	 * arguments.
+	 */
	bool checkObjectExistence = true;
-	if (!PG_ARGISNULL(3))
+	if (PG_NARGS() == 4)
	{
		checkObjectExistence = PG_GETARG_BOOL(3);
	}

	ObjectAddress address = { 0 };
	ObjectAddressSubSet(address, classid, objid, objsubid);

@@ -2930,7 +2930,9 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
	HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);

+	PushActiveSnapshot(GetTransactionSnapshot());
	CatalogTupleInsert(pgDistNode, heapTuple);
+	PopActiveSnapshot();

	CitusInvalidateRelcacheByRelid(DistNodeRelationId());

@@ -41,6 +41,7 @@
static int SourceResultPartitionColumnIndex(Query *mergeQuery,
											List *sourceTargetList,
											CitusTableCacheEntry *targetRelation);
+static int FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno);
static Var * ValidateAndReturnVarIfSupported(Node *entryExpr);
static DeferredErrorMessage * DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
															   PlannerRestrictionContext *

@@ -628,6 +629,22 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, Query *query,
		}
	}

+	/*
+	 * joinTree->quals, retrieved by GetMergeJoinTree() - either from
+	 * mergeJoinCondition (PG >= 17) or jointree->quals (PG < 17) -
+	 * only contains the quals present in the "ON (..)" clause. Action
+	 * quals that can be specified for each specific action, as in
+	 * "WHEN <match condition> AND <action quals> THEN <action>", are
+	 * saved into the "qual" field of the corresponding action's entry in
+	 * mergeActionList, see
+	 * https://github.com/postgres/postgres/blob/e6da68a6e1d60a037b63a9c9ed36e5ef0a996769/src/backend/parser/parse_merge.c#L285-L293.
+	 *
+	 * For this reason, even if TargetEntryChangesValue() could prove that
+	 * an action's quals ensure that the action cannot change the distribution
+	 * key, this is not the case as we don't provide action quals to
+	 * TargetEntryChangesValue(), but just joinTree, which only contains
+	 * the "ON (..)" clause quals.
+	 */
	if (targetEntryDistributionColumn &&
		TargetEntryChangesValue(targetEntry, distributionColumn, joinTree))
	{

@@ -1410,7 +1427,8 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
	Assert(sourceRepartitionVar);

	int sourceResultRepartitionColumnIndex =
-		DistributionColumnIndex(sourceTargetList, sourceRepartitionVar);
+		FindTargetListEntryWithVarExprAttno(sourceTargetList,
+											sourceRepartitionVar->varattno);

	if (sourceResultRepartitionColumnIndex == -1)
	{

@@ -1561,6 +1579,33 @@ FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query)
}


+/*
+ * FindTargetListEntryWithVarExprAttno finds the index of the target
+ * entry whose expr is a Var that points to the input varattno.
+ *
+ * If no such target entry is found, it returns -1.
+ */
+static int
+FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno)
+{
+	int targetEntryIndex = 0;
+
+	TargetEntry *targetEntry = NULL;
+	foreach_declared_ptr(targetEntry, targetList)
+	{
+		if (IsA(targetEntry->expr, Var) &&
+			((Var *) targetEntry->expr)->varattno == varattno)
+		{
+			return targetEntryIndex;
+		}
+
+		targetEntryIndex++;
+	}
+
+	return -1;
+}
+
+
/*
 * IsLocalTableModification returns true if the table modified is a Postgres table.
 * We do not support recursive planning for MERGE yet, so we could have a join

@@ -4560,11 +4560,10 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
	else if (rangeTableEntry->rtekind == RTE_CTE)
	{
		/*
-		 * When outerVars are considered, we modify parentQueryList, so this
-		 * logic might need to change when we support outervars in CTEs.
+		 * Resolve through a CTE even when skipOuterVars == false.
+		 * Maintain the invariant that each recursion level owns a private,
+		 * correctly-bounded copy of parentQueryList.
		 */
-		Assert(skipOuterVars);
-
		int cteParentListIndex = list_length(parentQueryList) -
								 rangeTableEntry->ctelevelsup - 1;
		Query *cteParentQuery = NULL;

@@ -4595,14 +4594,34 @@
		if (cte != NULL)
		{
			Query *cteQuery = (Query *) cte->ctequery;
-			List *targetEntryList = cteQuery->targetList;
			AttrNumber targetEntryIndex = candidateColumn->varattno - 1;
-			TargetEntry *targetEntry = list_nth(targetEntryList, targetEntryIndex);
-
-			parentQueryList = lappend(parentQueryList, query);
-			FindReferencedTableColumn(targetEntry->expr, parentQueryList,
-									  cteQuery, column, rteContainingReferencedColumn,
-									  skipOuterVars);
+			if (targetEntryIndex >= 0 &&
+				targetEntryIndex < list_length(cteQuery->targetList))
+			{
+				TargetEntry *targetEntry =
+					list_nth(cteQuery->targetList, targetEntryIndex);
+
+				/*
+				 * Build a private, bounded parentQueryList before recursing into the CTE.
+				 * Invariant: list is [top … current], owned by this call (no aliasing).
+				 * For RTE_CTE:
+				 *   owner_idx = list_length(parentQueryList) - rangeTableEntry->ctelevelsup - 1;
+				 *   newParent = lappend(list_truncate(list_copy(parentQueryList), owner_idx + 1), query);
+				 * Example (Q0 owns CTE; we're in Q2 via nested subquery):
+				 *   parent=[Q0,Q1,Q2], ctelevelsup=2 ⇒ owner_idx=0 ⇒ newParent=[Q0,Q2].
+				 * Keeps outer-Var level math correct without mutating the caller's list.
+				 */
+				List *newParent = list_copy(parentQueryList);
+				newParent = list_truncate(newParent, cteParentListIndex + 1);
+				newParent = lappend(newParent, query);
+
+				FindReferencedTableColumn(targetEntry->expr,
+										  newParent,
+										  cteQuery,
+										  column,
+										  rteContainingReferencedColumn,
+										  skipOuterVars);
+			}
		}
	}
}

@@ -3077,16 +3077,25 @@ BuildBaseConstraint(Var *column)


/*
- * MakeOpExpression builds an operator expression node. This operator expression
- * implements the operator clause as defined by the variable and the strategy
- * number.
+ * MakeOpExpressionExtended builds an operator expression node that's of
+ * the form "Var <op> Expr", where Expr must either be a Const or a Var
+ * (*1).
+ *
+ * This operator expression implements the operator clause as defined by
+ * the variable and the strategy number.
 */
OpExpr *
-MakeOpExpression(Var *variable, int16 strategyNumber)
+MakeOpExpressionExtended(Var *leftVar, Expr *rightArg, int16 strategyNumber)
{
-	Oid typeId = variable->vartype;
-	Oid typeModId = variable->vartypmod;
-	Oid collationId = variable->varcollid;
+	/*
+	 * Other types of expressions are probably also fine to be used, but
+	 * none of the callers need support for them for now, so we haven't
+	 * tested them (*1).
+	 */
+	Assert(IsA(rightArg, Const) || IsA(rightArg, Var));
+
+	Oid typeId = leftVar->vartype;
+	Oid collationId = leftVar->varcollid;

	Oid accessMethodId = BTREE_AM_OID;

@@ -3104,18 +3113,16 @@ MakeOpExpression(Var *variable, int16 strategyNumber)
	 */
	if (operatorClassInputType != typeId && typeType != TYPTYPE_PSEUDO)
	{
-		variable = (Var *) makeRelabelType((Expr *) variable, operatorClassInputType,
-										   -1, collationId, COERCE_IMPLICIT_CAST);
+		leftVar = (Var *) makeRelabelType((Expr *) leftVar, operatorClassInputType,
+										  -1, collationId, COERCE_IMPLICIT_CAST);
	}

-	Const *constantValue = makeNullConst(operatorClassInputType, typeModId, collationId);
-
-	/* Now make the expression with the given variable and a null constant */
	OpExpr *expression = (OpExpr *) make_opclause(operatorId,
												  InvalidOid, /* no result type yet */
												  false, /* no return set */
-												  (Expr *) variable,
-												  (Expr *) constantValue,
+												  (Expr *) leftVar,
+												  rightArg,
												  InvalidOid, collationId);

	/* Set implementing function id and result type */

@@ -3126,6 +3133,31 @@ MakeOpExpression(Var *variable, int16 strategyNumber)
}


+/*
+ * MakeOpExpression is a wrapper around MakeOpExpressionExtended
+ * that creates a null constant of the appropriate type for the right
+ * hand side operator class input type. As a result, it builds an
+ * operator expression node that's of the form "Var <op> NULL".
+ */
+OpExpr *
+MakeOpExpression(Var *leftVar, int16 strategyNumber)
+{
+	Oid typeId = leftVar->vartype;
+	Oid typeModId = leftVar->vartypmod;
+	Oid collationId = leftVar->varcollid;
+
+	Oid accessMethodId = BTREE_AM_OID;
+
+	OperatorCacheEntry *operatorCacheEntry = LookupOperatorByType(typeId, accessMethodId,
+																  strategyNumber);
+	Oid operatorClassInputType = operatorCacheEntry->operatorClassInputType;
+
+	Const *constantValue = makeNullConst(operatorClassInputType, typeModId, collationId);
+
+	return MakeOpExpressionExtended(leftVar, (Expr *) constantValue, strategyNumber);
+}
+
+
/*
 * LookupOperatorByType is a wrapper around GetOperatorByType(),
 * operatorClassInputType() and get_typtype() functions that uses a cache to avoid

@@ -1604,10 +1604,19 @@ MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context)

/*
 * TargetEntryChangesValue determines whether the given target entry may
- * change the value in a given column, given a join tree. The result is
- * true unless the expression refers directly to the column, or the
- * expression is a value that is implied by the qualifiers of the join
- * tree, or the target entry sets a different column.
+ * change the value given a column and a join tree.
+ *
+ * The function assumes that the "targetEntry" references the given "column"
+ * Var via its "resname" and is used as part of a modify query. This means
+ * that, for example, for an update query, the input "targetEntry" constructs
+ * the following assignment operation as part of the SET clause:
+ * "col_a = expr_a", where "col_a" refers to the input "column" Var (via
+ * "resname") as per the assumption written above. And we want to understand
+ * if "expr_a" (which is pointed to by targetEntry->expr) refers directly to
+ * the "column" Var, or "expr_a" is a value that is implied to be equal
+ * to the "column" Var by the qualifiers of the join tree. If so, we know that
+ * the value of "col_a" effectively cannot be changed by this assignment
+ * operation.
 */
bool
TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree)

@@ -1618,11 +1627,36 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
	if (IsA(setExpr, Var))
	{
		Var *newValue = (Var *) setExpr;
-		if (newValue->varattno == column->varattno)
+		if (column->varno == newValue->varno &&
+			column->varattno == newValue->varattno)
		{
-			/* target entry of the form SET col = table.col */
+			/*
+			 * Target entry is of the form "SET col_a = foo.col_b",
+			 * where foo also points to the same range table entry
+			 * and col_a and col_b are the same. So, effectively
+			 * they're literally referring to the same column.
+			 */
			isColumnValueChanged = false;
		}
+		else
+		{
+			List *restrictClauseList = WhereClauseList(joinTree);
+			OpExpr *equalityExpr = MakeOpExpressionExtended(column, (Expr *) newValue,
+															BTEqualStrategyNumber);
+
+			bool predicateIsImplied = predicate_implied_by(list_make1(equalityExpr),
+														   restrictClauseList, false);
+			if (predicateIsImplied)
+			{
+				/*
+				 * Target entry is of the form
+				 * "SET col_a = foo.col_b WHERE col_a = foo.col_b (AND (...))",
+				 * where foo points to a different relation or it points
+				 * to the same relation but col_a is not the same column as col_b.
+				 */
+				isColumnValueChanged = false;
+			}
+		}
	}
	else if (IsA(setExpr, Const))
	{

@@ -1643,7 +1677,10 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
													   restrictClauseList, false);
		if (predicateIsImplied)
		{
-			/* target entry of the form SET col = <x> WHERE col = <x> AND ... */
+			/*
+			 * Target entry is of the form
+			 * "SET col_a = const_a WHERE col_a = const_a (AND (...))".
+			 */
			isColumnValueChanged = false;
		}
	}

@@ -2245,6 +2282,18 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query)
			continue;
		}

+#if PG_VERSION_NUM >= 150013 && PG_VERSION_NUM < PG_VERSION_16
+		if (rangeTableEntry->rtekind == RTE_SUBQUERY && rangeTableEntry->relkind == 0)
+		{
+			/*
+			 * In PG15.13 commit https://github.com/postgres/postgres/commit/317aba70e
+			 * relid is retained when converting views to subqueries,
+			 * so we need an extra check identifying those views.
+			 */
+			continue;
+		}
+#endif
+
		if (rangeTableEntry->relkind == RELKIND_VIEW ||
			rangeTableEntry->relkind == RELKIND_MATVIEW)
		{

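At the SQL level, the new `else` branch covers assignments like the following hypothetical one, where the join qualifier implies the SET clause cannot actually change the column's value:

```sql
-- Hypothetical tables: the WHERE clause implies col_a = foo.col_b, so
-- "SET col_a = foo.col_b" provably leaves col_a unchanged and is allowed.
UPDATE target_table
SET col_a = foo.col_b
FROM foo
WHERE target_table.col_a = foo.col_b;
```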
@@ -962,6 +962,7 @@ shard_name(PG_FUNCTION_ARGS)

	Oid relationId = PG_GETARG_OID(0);
	int64 shardId = PG_GETARG_INT64(1);
+	bool skipQualifyPublic = PG_GETARG_BOOL(2);

	char *qualifiedName = NULL;

@@ -991,7 +992,7 @@ shard_name(PG_FUNCTION_ARGS)
		Oid schemaId = get_rel_namespace(relationId);
		char *schemaName = get_namespace_name(schemaId);

-		if (strncmp(schemaName, "public", NAMEDATALEN) == 0)
+		if (skipQualifyPublic && strncmp(schemaName, "public", NAMEDATALEN) == 0)
		{
			qualifiedName = (char *) quote_identifier(relationName);
		}

@@ -215,6 +215,7 @@ static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSour
static void CitusAuthHook(Port *port, int status);
static bool IsSuperuser(char *userName);
static void AdjustDynamicLibraryPathForCdcDecoders(void);
+static void EnableChangeDataCaptureAssignHook(bool newval, void *extra);

static ClientAuthentication_hook_type original_client_auth_hook = NULL;
static emit_log_hook_type original_emit_log_hook = NULL;

@@ -1272,7 +1273,7 @@ RegisterCitusConfigVariables(void)
		false,
		PGC_USERSET,
		GUC_STANDARD,
-		NULL, NULL, NULL);
+		NULL, EnableChangeDataCaptureAssignHook, NULL);

	DefineCustomBoolVariable(
		"citus.enable_cluster_clock",

@@ -3272,3 +3273,19 @@ CitusObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, int su
		SetCreateCitusTransactionLevel(GetCurrentTransactionNestLevel());
	}
}
+
+
+/*
+ * EnableChangeDataCaptureAssignHook is called whenever the
+ * citus.enable_change_data_capture setting is changed to dynamically
+ * adjust the dynamic_library_path based on the new value.
+ */
+static void
+EnableChangeDataCaptureAssignHook(bool newval, void *extra)
+{
+	if (newval)
+	{
+		/* CDC enabled: add citus_decoders to the path */
+		AdjustDynamicLibraryPathForCdcDecoders();
+	}
+}

@@ -50,3 +50,11 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/citus_is_primary_node/13.1-1.sql"
#include "udfs/citus_stat_counters/13.1-1.sql"
#include "udfs/citus_stat_counters_reset/13.1-1.sql"
+#include "udfs/citus_nodes/13.1-1.sql"
+
+-- Since shard_name/13.1-1.sql first drops the function and then creates it, we first
+-- need to drop citus_shards view since that view depends on this function. And immediately
+-- after creating the function, we recreate citus_shards view again.
+DROP VIEW pg_catalog.citus_shards;
+#include "udfs/shard_name/13.1-1.sql"
+#include "udfs/citus_shards/12.0-1.sql"

@@ -45,3 +45,25 @@ DROP FUNCTION citus_internal.is_replication_origin_tracking_active();
DROP VIEW pg_catalog.citus_stat_counters;
DROP FUNCTION pg_catalog.citus_stat_counters(oid);
DROP FUNCTION pg_catalog.citus_stat_counters_reset(oid);
+DROP VIEW IF EXISTS pg_catalog.citus_nodes;
+
+-- The definition of shard_name() prior to this release doesn't have a separate SQL file
+-- because it's quite an old UDF whose prior definition(s) was(were) squashed into
+-- citus--8.0-1.sql. For this reason, to downgrade it, here we directly execute its old
+-- definition instead of including it from such a separate file.
+--
+-- And before dropping and creating the function, we also need to drop citus_shards view
+-- since it depends on it. And immediately after creating the function, we recreate
+-- citus_shards view again.
+
+DROP VIEW pg_catalog.citus_shards;
+
+DROP FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint, skip_qualify_public boolean);
+CREATE FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint)
+    RETURNS text
+    LANGUAGE C STABLE STRICT
+    AS 'MODULE_PATHNAME', $$shard_name$$;
+COMMENT ON FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint)
+    IS 'returns schema-qualified, shard-extended identifier of object name';
+
+#include "../udfs/citus_shards/12.0-1.sql"

@@ -0,0 +1,18 @@
SET search_path = 'pg_catalog';
DROP VIEW IF EXISTS pg_catalog.citus_nodes;

CREATE OR REPLACE VIEW citus.citus_nodes AS
SELECT
    nodename,
    nodeport,
    CASE
        WHEN groupid = 0 THEN 'coordinator'
        ELSE 'worker'
    END AS role,
    isactive AS active
FROM pg_dist_node;

ALTER VIEW citus.citus_nodes SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_nodes TO PUBLIC;

RESET search_path;

@@ -0,0 +1,18 @@
SET search_path = 'pg_catalog';
DROP VIEW IF EXISTS pg_catalog.citus_nodes;

CREATE OR REPLACE VIEW citus.citus_nodes AS
SELECT
    nodename,
    nodeport,
    CASE
        WHEN groupid = 0 THEN 'coordinator'
        ELSE 'worker'
    END AS role,
    isactive AS active
FROM pg_dist_node;

ALTER VIEW citus.citus_nodes SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_nodes TO PUBLIC;

RESET search_path;

@@ -0,0 +1,8 @@
-- skip_qualify_public is set to true by default just for backward compatibility
DROP FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint);
CREATE FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint, skip_qualify_public boolean DEFAULT true)
    RETURNS text
    LANGUAGE C STABLE STRICT
    AS 'MODULE_PATHNAME', $$shard_name$$;
COMMENT ON FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint, skip_qualify_public boolean)
    IS 'returns schema-qualified, shard-extended identifier of object name';

@@ -0,0 +1,8 @@
-- skip_qualify_public is set to true by default just for backward compatibility
DROP FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint);
CREATE FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint, skip_qualify_public boolean DEFAULT true)
    RETURNS text
    LANGUAGE C STABLE STRICT
    AS 'MODULE_PATHNAME', $$shard_name$$;
COMMENT ON FUNCTION pg_catalog.shard_name(object_name regclass, shard_id bigint, skip_qualify_public boolean)
    IS 'returns schema-qualified, shard-extended identifier of object name';

@@ -106,7 +106,9 @@ LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId out
	TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction);
	HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);

+	PushActiveSnapshot(GetTransactionSnapshot());
	CatalogTupleInsert(pgDistTransaction, heapTuple);
+	PopActiveSnapshot();

	CommandCounterIncrement();

@@ -549,7 +549,8 @@ extern DistributedPlan * CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
														plannerRestrictionContext);
extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
							  char *queryString);
+extern OpExpr * MakeOpExpressionExtended(Var *leftVar, Expr *rightArg,
+										 int16 strategyNumber);
extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
extern Node * WrapUngroupedVarsInAnyValueAggregate(Node *expression,
												   List *groupClauseList,

@@ -0,0 +1,81 @@
-- Test for CDC library path adjustment functionality
-- This test verifies that the AdjustDynamicLibraryPathForCdcDecoders function with superuser privileges
-- correctly modifies the dynamic_library_path when CDC is enabled
-- Test 1: Show initial state and reset to ensure clean state
SHOW dynamic_library_path;
 dynamic_library_path
---------------------------------------------------------------------
 $libdir
(1 row)

SHOW citus.enable_change_data_capture;
 citus.enable_change_data_capture
---------------------------------------------------------------------
 off
(1 row)

-- Test 2: Enable CDC and verify path is set to include citus_decoders
SET citus.enable_change_data_capture = true;
SHOW dynamic_library_path;
 dynamic_library_path
---------------------------------------------------------------------
 $libdir/citus_decoders:$libdir
(1 row)

-- Verify that the dynamic_library_path has been modified to include citus_decoders
SELECT
    CASE
        WHEN current_setting('dynamic_library_path') LIKE '%citus_decoders%'
        THEN 'CDC path correctly set'
        ELSE 'CDC path incorrectly not set'
    END AS cdc_path_status;
 cdc_path_status
---------------------------------------------------------------------
 CDC path correctly set
(1 row)

-- Test 3: Disable CDC and verify path remains (CDC doesn't remove the path)
SET citus.enable_change_data_capture = false;
SHOW dynamic_library_path;
 dynamic_library_path
---------------------------------------------------------------------
 $libdir/citus_decoders:$libdir
(1 row)

-- Test 4: Edge case - function should only work when path is exactly "$libdir"
SET dynamic_library_path = '$libdir/other_path:$libdir';
SET citus.enable_change_data_capture = true;
SHOW dynamic_library_path;
 dynamic_library_path
---------------------------------------------------------------------
 $libdir/other_path:$libdir
(1 row)

-- Verify that path is unchanged with custom library path
SELECT
    CASE
        WHEN current_setting('dynamic_library_path') LIKE '%citus_decoders%'
        THEN 'CDC path incorrectly set'
        ELSE 'CDC path correctly not set'
    END AS custom_path_test;
 custom_path_test
---------------------------------------------------------------------
 CDC path correctly not set
(1 row)

-- Reset dynamic_library_path to default
RESET dynamic_library_path;
RESET citus.enable_change_data_capture;
-- Test 5: Verify that dynamic_library_path reset_val is overridden to $libdir/citus_decoders:$libdir
SHOW dynamic_library_path;
 dynamic_library_path
---------------------------------------------------------------------
 $libdir/citus_decoders:$libdir
(1 row)

SHOW citus.enable_change_data_capture;
 citus.enable_change_data_capture
---------------------------------------------------------------------
 off
(1 row)

@@ -33,5 +33,33 @@ SELECT * FROM citus_shards;
 t1 | 99456903 | citus_shards.t1_99456903 | distributed | 456900 | localhost | 57638 | 8192
(8 rows)

SET search_path TO public;
CREATE TABLE t3 (i int);
SELECT citus_add_local_table_to_metadata('t3');
 citus_add_local_table_to_metadata
---------------------------------------------------------------------

(1 row)

SELECT shard_name('t3', shardid) FROM pg_dist_shard WHERE logicalrelid = 't3'::regclass;
 shard_name
---------------------------------------------------------------------
 t3_99456908
(1 row)

SELECT shard_name('t3', shardid, true) FROM pg_dist_shard WHERE logicalrelid = 't3'::regclass;
 shard_name
---------------------------------------------------------------------
 t3_99456908
(1 row)

SELECT shard_name('t3', shardid, false) FROM pg_dist_shard WHERE logicalrelid = 't3'::regclass;
 shard_name
---------------------------------------------------------------------
 public.t3_99456908
(1 row)

DROP TABLE t3;
SET search_path TO citus_shards;
SET client_min_messages TO WARNING;
DROP SCHEMA citus_shards CASCADE;

@@ -193,13 +193,148 @@ SQL function "compare_data" statement 2

(1 row)

---- https://github.com/citusdata/citus/issues/8180 ----
CREATE TABLE dist_1 (a int, b int, c int);
CREATE TABLE dist_2 (a int, b int, c int);
CREATE TABLE dist_different_order_1 (b int, a int, c int);
SELECT create_distributed_table('dist_1', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('dist_2', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('dist_different_order_1', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

MERGE INTO dist_1
USING dist_2
ON (dist_1.a = dist_2.b)
WHEN MATCHED THEN UPDATE SET b = dist_2.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET b = src.b;
MERGE INTO dist_different_order_1
USING dist_1
ON (dist_different_order_1.a = dist_1.b)
WHEN MATCHED THEN UPDATE SET b = dist_1.b;
CREATE TABLE dist_1_cast (a int, b int);
CREATE TABLE dist_2_cast (a int, b numeric);
SELECT create_distributed_table('dist_1_cast', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('dist_2_cast', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

MERGE INTO dist_1_cast
USING dist_2_cast
ON (dist_1_cast.a = dist_2_cast.b)
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
ERROR: In the MERGE ON clause, there is a datatype mismatch between target's distribution column and the expression originating from the source.
DETAIL: If the types are different, Citus uses different hash functions for the two column types, which might lead to incorrect repartitioning of the result data
MERGE INTO dist_1_cast
USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast
ON (dist_1_cast.a = dist_2_cast.b)
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
-- a more sophisticated example
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'tstamp_col');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('dist_target', 'int_col');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[i::text, (i+1)::text, (i+2)::text],
       'source_' || i,
       ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[i::text, (i+1)::text, (i+2)::text],
       'source_' || i,
       ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[(i-1)::text, (i)::text, (i+1)::text],
       'source_' || i,
       ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[(i-1)::text, (i)::text, (i+1)::text],
       'source_' || i-1,
       ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
MERGE INTO dist_target target_alias
USING dist_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
    tstamp_col = source_alias.tstamp_col + interval '3 day',
    text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
    json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
    text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
    INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col);
-- execute the same query on local tables, everything is the same except the table names behind the aliases
MERGE INTO local_target target_alias
USING local_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
    tstamp_col = source_alias.tstamp_col + interval '3 day',
    text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
    json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
    text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
    INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col);
-- compare both targets
SELECT COUNT(*) = 0 AS targets_match
FROM (
    SELECT * FROM dist_target
    EXCEPT
    SELECT * FROM local_target
    UNION ALL
    SELECT * FROM local_target
    EXCEPT
    SELECT * FROM dist_target
) q;
 targets_match
---------------------------------------------------------------------
 t
(1 row)

SET client_min_messages TO WARNING;
DROP SCHEMA merge_repartition2_schema CASCADE;
NOTICE: drop cascades to 8 other objects
DETAIL: drop cascades to table pg_target
drop cascades to table pg_source
drop cascades to function cleanup_data()
drop cascades to function setup_data()
drop cascades to function check_data(text,text,text,text)
drop cascades to function compare_data()
drop cascades to table citus_target
drop cascades to table citus_source

@@ -394,9 +394,9 @@ DEBUG: Wrapping relation "mat_view_on_part_dist" "foo" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM mixed_relkind_tests.mat_view_on_part_dist foo WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE mixed_relkind_tests.partitioned_distributed_table SET a = foo.a FROM (SELECT foo_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) foo_1) foo WHERE (foo.a OPERATOR(pg_catalog.=) partitioned_distributed_table.a)
UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a;
-ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
+ERROR: modifying the partition value of rows is not allowed
UPDATE partitioned_distributed_table SET a = foo.a FROM distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a;
-ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
+ERROR: modifying the partition value of rows is not allowed
-- should work
UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a = partitioned_distributed_table.a;
UPDATE partitioned_distributed_table SET a = foo.a FROM view_on_part_dist AS foo WHERE foo.a = partitioned_distributed_table.a;

@@ -72,6 +72,45 @@ SELECT master_get_active_worker_nodes();
 (localhost,57637)
(2 rows)

-- get all nodes
SELECT * from citus_nodes;
 nodename | nodeport | role | active
---------------------------------------------------------------------
 localhost | 57637 | worker | t
 localhost | 57638 | worker | t
 localhost | 57636 | coordinator | t
(3 rows)

-- get active nodes
SELECT * from citus_nodes where active = 't';
 nodename | nodeport | role | active
---------------------------------------------------------------------
 localhost | 57637 | worker | t
 localhost | 57638 | worker | t
 localhost | 57636 | coordinator | t
(3 rows)

-- get coordinator nodes
SELECT * from citus_nodes where role = 'coordinator';
 nodename | nodeport | role | active
---------------------------------------------------------------------
 localhost | 57636 | coordinator | t
(1 row)

-- get worker nodes
SELECT * from citus_nodes where role = 'worker';
 nodename | nodeport | role | active
---------------------------------------------------------------------
 localhost | 57637 | worker | t
 localhost | 57638 | worker | t
(2 rows)

-- get nodes with unknown role
SELECT * from citus_nodes where role = 'foo';
 nodename | nodeport | role | active
---------------------------------------------------------------------
(0 rows)

-- try to add a node that is already in the cluster
SELECT * FROM master_add_node('localhost', :worker_1_port);
 master_add_node

@@ -126,6 +165,34 @@ SELECT master_get_active_worker_nodes();
 (localhost,57637)
(1 row)

-- get active nodes
SELECT * from citus_nodes where active = 't';
 nodename | nodeport | role | active
---------------------------------------------------------------------
 localhost | 57636 | coordinator | t
 localhost | 57637 | worker | t
(2 rows)

-- get inactive nodes
SELECT * from citus_nodes where active = 'f';
 nodename | nodeport | role | active
---------------------------------------------------------------------
 localhost | 57638 | worker | f
(1 row)

-- make sure non-superusers can access the view
CREATE ROLE normaluser;
SET ROLE normaluser;
SELECT * FROM citus_nodes;
 nodename | nodeport | role | active
---------------------------------------------------------------------
 localhost | 57636 | coordinator | t
 localhost | 57638 | worker | f
 localhost | 57637 | worker | t
(3 rows)

SET ROLE postgres;
DROP ROLE normaluser;
-- add some shard placements to the cluster
SET citus.shard_count TO 16;
SET citus.shard_replication_factor TO 1;

@@ -1455,6 +1455,7 @@ SELECT * FROM multi_extension.print_extension_changes();
 previous_object | current_object
---------------------------------------------------------------------
 function citus_unmark_object_distributed(oid,oid,integer) void |
+ function shard_name(regclass,bigint) text |
 | function citus_internal.acquire_citus_advisory_object_class_lock(integer,cstring) void
 | function citus_internal.add_colocation_metadata(integer,integer,integer,regtype,oid) void
 | function citus_internal.add_object_metadata(text,text[],text[],integer,integer,boolean) void

@@ -1483,15 +1484,17 @@ SELECT * FROM multi_extension.print_extension_changes();
 | function citus_stat_counters(oid) SETOF record
 | function citus_stat_counters_reset(oid) void
 | function citus_unmark_object_distributed(oid,oid,integer,boolean) void
+ | function shard_name(regclass,bigint,boolean) text
+ | view citus_nodes
 | view citus_stat_counters
-(30 rows)
+(33 rows)

DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version
SHOW citus.version;
 citus.version
---------------------------------------------------------------------
-13.1devel
+13.1.1
(1 row)

-- ensure no unexpected objects were created outside pg_catalog

@ -3637,5 +3637,157 @@ SELECT id, val FROM version_dist_union ORDER BY id;
(6 rows)

-- End of Issue #7784
-- PR #8106 — CTE traversal works when following outer Vars
-- This script exercises three shapes:
-- T1) CTE referenced inside a correlated subquery (one level down)
-- T2) CTE referenced inside a nested subquery (two levels down)
-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
CREATE SCHEMA pr8106_cte_outervar;
SET search_path = pr8106_cte_outervar, public;
-- Base tables for the tests
DROP TABLE IF EXISTS raw_events_first CASCADE;
NOTICE: table "raw_events_first" does not exist, skipping
DROP TABLE IF EXISTS agg_events CASCADE;
NOTICE: table "agg_events" does not exist, skipping
CREATE TABLE raw_events_first(
    user_id int,
    value_1 int
);
CREATE TABLE agg_events(
    user_id int,
    value_1_agg int
);
-- Distribute and colocate (distribution key = user_id)
SELECT create_distributed_table('raw_events_first', 'user_id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('agg_events', 'user_id');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

-- Seed data (duplicates on some user_ids; some NULL value_1’s)
INSERT INTO raw_events_first(user_id, value_1) VALUES
    (1, 10), (1, 20), (1, NULL),
    (2, NULL),
    (3, 30),
    (4, NULL),
    (5, 50), (5, NULL),
    (6, NULL);
---------------------------------------------------------------------
-- T1) CTE referenced inside a correlated subquery (one level down)
---------------------------------------------------------------------
TRUNCATE agg_events;
WITH c AS MATERIALIZED (
    SELECT user_id FROM raw_events_first
)
INSERT INTO agg_events (user_id)
SELECT t.user_id
FROM raw_events_first t
WHERE EXISTS (SELECT 1 FROM c WHERE c.user_id = t.user_id);
-- Expect one insert per row in raw_events_first (EXISTS always true per user_id)
SELECT 't1_count_matches' AS test,
       (SELECT count(*) FROM agg_events) =
       (SELECT count(*) FROM raw_events_first) AS ok;
test | ok
---------------------------------------------------------------------
t1_count_matches | t
(1 row)

-- Spot-check: how many rows were inserted
SELECT 't1_rows' AS test, count(*) AS rows FROM agg_events;
test | rows
---------------------------------------------------------------------
t1_rows | 9
(1 row)

---------------------------------------------------------------------
-- T2) CTE referenced inside a nested subquery (two levels down)
---------------------------------------------------------------------
TRUNCATE agg_events;
WITH c AS MATERIALIZED (
    SELECT user_id FROM raw_events_first
)
INSERT INTO agg_events (user_id)
SELECT t.user_id
FROM raw_events_first t
WHERE EXISTS (
    SELECT 1
    FROM (SELECT user_id FROM c) c2
    WHERE c2.user_id = t.user_id
);
-- Same cardinality expectation as T1
SELECT 't2_count_matches' AS test,
       (SELECT count(*) FROM agg_events) =
       (SELECT count(*) FROM raw_events_first) AS ok;
test | ok
---------------------------------------------------------------------
t2_count_matches | t
(1 row)

SELECT 't2_rows' AS test, count(*) AS rows FROM agg_events;
test | rows
---------------------------------------------------------------------
t2_rows | 9
(1 row)

---------------------------------------------------------------------
-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
-- (use MAX() to keep scalar subquery single-row)
---------------------------------------------------------------------
TRUNCATE agg_events;
WITH c AS (SELECT user_id, value_1 FROM raw_events_first)
INSERT INTO agg_events (user_id, value_1_agg)
SELECT d.user_id, d.value_1_agg
FROM (
    SELECT t.user_id,
           (SELECT max(c.value_1) FROM c WHERE c.user_id = t.user_id) AS value_1_agg
    FROM raw_events_first t
) AS d
WHERE d.value_1_agg IS NOT NULL;
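-- (Editorial sketch, not part of the test file: the MAX() above is what keeps
-- the sublink scalar. A hypothetical non-aggregated variant such as
--   (SELECT c.value_1 FROM c WHERE c.user_id = t.user_id)
-- would return more than one row for user_ids that appear several times in c,
-- and PostgreSQL would reject it with "more than one row returned by a
-- subquery used as an expression".)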
-- Expect one insert per row in raw_events_first whose user_id has at least one non-NULL value_1
SELECT 't3_count_matches' AS test,
       (SELECT count(*) FROM agg_events) =
       (
           SELECT count(*)
           FROM raw_events_first t
           WHERE EXISTS (
               SELECT 1 FROM raw_events_first c
               WHERE c.user_id = t.user_id AND c.value_1 IS NOT NULL
           )
       ) AS ok;
test | ok
---------------------------------------------------------------------
t3_count_matches | t
(1 row)

-- Also verify no NULLs were inserted into value_1_agg
SELECT 't3_no_null_value_1_agg' AS test,
       NOT EXISTS (SELECT 1 FROM agg_events WHERE value_1_agg IS NULL) AS ok;
test | ok
---------------------------------------------------------------------
t3_no_null_value_1_agg | t
(1 row)

-- Deterministic sample of results
SELECT 't3_sample' AS test, user_id, value_1_agg
FROM agg_events
ORDER BY user_id
LIMIT 5;
test | user_id | value_1_agg
---------------------------------------------------------------------
t3_sample | 1 | 20
t3_sample | 1 | 20
t3_sample | 1 | 20
t3_sample | 3 | 30
t3_sample | 5 | 50
(5 rows)

-- End of PR #8106 — CTE traversal works when following outer Vars
SET client_min_messages TO ERROR;
DROP SCHEMA pr8106_cte_outervar CASCADE;
DROP SCHEMA multi_insert_select CASCADE;

@ -2,6 +2,7 @@ SET citus.shard_count TO 32;
SET citus.next_shard_id TO 750000;
SET citus.next_placement_id TO 750000;
CREATE SCHEMA multi_modifications;
SET search_path TO multi_modifications;
-- some failure messages that come from the worker nodes
-- might change due to parallel executions, so suppress those
-- using \set VERBOSITY terse

@ -31,8 +32,12 @@ SELECT create_distributed_table('limit_orders', 'id', 'hash');

(1 row)

SELECT create_distributed_table('multiple_hash', 'id', 'hash');
ERROR: column "id" of relation "multiple_hash" does not exist
SELECT create_distributed_table('multiple_hash', 'category', 'hash');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('range_partitioned', 'id', 'range');
 create_distributed_table
---------------------------------------------------------------------

@ -338,22 +343,26 @@ ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001
-- Test that shards which miss a modification are marked unhealthy
-- First: Connect to the second worker node
\c - - - :worker_2_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
-- Third: Connect back to master node
\c - - - :master_port
SET search_path TO multi_modifications;
-- Fourth: Perform an INSERT on the remaining node
-- the whole transaction should fail
\set VERBOSITY terse
INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
ERROR: relation "public.limit_orders_750000" does not exist
ERROR: relation "multi_modifications.limit_orders_750000" does not exist
-- set the shard name back
\c - - - :worker_2_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Verify the insert failed and both placements are healthy
-- or the insert succeeded and placement marked unhealthy
\c - - - :worker_1_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
count
---------------------------------------------------------------------

@ -361,6 +370,7 @@ SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
(1 row)

\c - - - :worker_2_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
count
---------------------------------------------------------------------

@ -368,6 +378,7 @@ SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
(1 row)

\c - - - :master_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders WHERE id = 276;
count
---------------------------------------------------------------------

@ -388,14 +399,16 @@ AND s.logicalrelid = 'limit_orders'::regclass;
-- Test that if all shards miss a modification, no state change occurs
-- First: Connect to the first worker node
\c - - - :worker_1_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
-- Third: Connect back to master node
\c - - - :master_port
SET search_path TO multi_modifications;
-- Fourth: Perform an INSERT on the remaining node
\set VERBOSITY terse
INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
ERROR: relation "public.limit_orders_750000" does not exist
ERROR: relation "multi_modifications.limit_orders_750000" does not exist
\set VERBOSITY DEFAULT
-- Last: Verify worker is still healthy
SELECT count(*)

@ -414,10 +427,12 @@ AND s.logicalrelid = 'limit_orders'::regclass;
-- Undo our change...
-- First: Connect to the first worker node
\c - - - :worker_1_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Third: Connect back to master node
\c - - - :master_port
SET search_path TO multi_modifications;
-- attempting to change the partition key is unsupported
UPDATE limit_orders SET id = 0 WHERE id = 246;
ERROR: modifying the partition value of rows is not allowed

@ -427,6 +442,368 @@ ERROR: modifying the partition value of rows is not allowed
UPDATE limit_orders SET id = 246 WHERE id = 246;
UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM';
UPDATE limit_orders SET id = limit_orders.id WHERE id = 246;
CREATE TABLE dist_1 (a int, b int, c int);
CREATE TABLE dist_2 (a int, b int, c int);
CREATE TABLE dist_non_colocated (a int, b int, c int);
CREATE TABLE dist_different_order_1 (b int, a int, c int);
SELECT create_distributed_table('dist_1', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('dist_2', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('dist_non_colocated', 'a', colocate_with=>'none');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('dist_different_order_1', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

--
-- https://github.com/citusdata/citus/issues/8087
--
---- update: should work ----
-- setting shard key to itself --
UPDATE dist_1 SET a = dist_1.a;
UPDATE dist_1 SET a = dist_1.a WHERE dist_1.a > dist_1.b AND dist_1.b > 10;
UPDATE dist_1 SET a = dist_1.a FROM dist_2 WHERE dist_1.a = dist_2.a;
-- setting shard key to another var that's implied to be equal to shard key --
UPDATE dist_1 SET a = b WHERE a = b;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.b = dist_2.c AND (dist_2.c > 5 OR dist_2.c < 0);
with cte as (
    select a, b from dist_1
)
update dist_1 set a = cte.a from cte where dist_1.a = cte.a;
with cte as (
    select a as x, b as y from (select a, b from dist_1 limit 100) dt where b > 100
)
update dist_1 set a = cte.x from cte where dist_1.a = cte.x;
with cte as (
    select d2.a as x, d1.b as y
    from dist_1 d1, dist_different_order_1 d2
    where d1.a=d2.a)
update dist_1 set a = cte.x from cte where y != 0 and dist_1.a = cte.x;
with cte as (
    select * from (select a as x, b as y from dist_2 limit 100) q
)
update dist_1 set a = cte.x from cte where b = cte.y and cte.y = a and a = cte.x;
-- supported although the where clause will certainly eval to false
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.a = 5 AND dist_2.a = 7;
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
-- test with extra quals
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a AND dist_1.b = dist_different_order_1.c AND (dist_different_order_1.c > 5 OR dist_different_order_1.c < 0);
---- update: errors in router planner ----
-- different column of the same relation, which is not implied to be equal to shard key --
UPDATE dist_1 SET a = dist_1.b;
ERROR: modifying the partition value of rows is not allowed
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
UPDATE dist_1 SET a = dist_2.a FROM dist_2;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a != dist_2.a;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a >= dist_2.a;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a OR dist_1.a > dist_2.a;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = dist_different_order_1.b FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = foo.a FROM dist_1 foo;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = foo.a FROM dist_1 foo WHERE dist_1.a != foo.a;
ERROR: modifying the partition value of rows is not allowed
-- (*1) Would normally expect this to not throw an error because
-- dist_1.a = dist_2.b AND dist_2.b = dist_2.a,
-- so dist_1.a = dist_2.a, so we should be able to deduce
-- that (dist_1.)a = dist_2.a, but it seems predicate_implied_by()
-- is not that smart.
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a;
ERROR: modifying the partition value of rows is not allowed
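-- (Editorial sketch, assumption: since predicate_implied_by() only sees the
-- quals as written, a hypothetical workaround for the case above is to spell
-- out the transitive equality explicitly, which matches the supported shapes
-- shown earlier:)
-- UPDATE dist_1 SET a = dist_2.a FROM dist_2
--   WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a AND dist_1.a = dist_2.a;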
-- and same here
with cte as (
    select * from (select a as x, b as y from dist_different_order_1 limit 100) q
)
update dist_1 set a = cte.x from cte where a = cte.y and cte.y = b and b = cte.x;
ERROR: modifying the partition value of rows is not allowed
---- update: errors later (in logical or physical planner) ----
-- setting shard key to itself --
UPDATE dist_1 SET a = dist_1.a FROM dist_1 foo;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
UPDATE dist_1 SET a = dist_1.a FROM dist_2 foo;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
-- setting shard key to another var that's implied to be equal to shard key --
UPDATE dist_1 SET a = dist_non_colocated.a FROM dist_non_colocated WHERE dist_1.a = dist_non_colocated.a;
ERROR: cannot push down this subquery
DETAIL: dist_1 and dist_non_colocated are not colocated
UPDATE dist_1 SET a = dist_2.b FROM dist_2 WHERE dist_1.a = dist_2.b;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
---- update: a more sophisticated example ----
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'int_col');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('dist_target', 'int_col');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[i::text, (i+1)::text, (i+2)::text],
       'source_' || i,
       ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[i::text, (i+1)::text, (i+2)::text],
       'source_' || i,
       ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[(i-1)::text, (i)::text, (i+1)::text],
       'source_' || i,
       ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[(i-1)::text, (i)::text, (i+1)::text],
       'source_' || i-1,
       ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
UPDATE dist_target target_alias
SET int_col = source_alias.int_col,
    tstamp_col = source_alias.tstamp_col + interval '3 day',
    text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
    json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
    text_col = source_alias.json_col->>'a'
FROM dist_source source_alias
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
-- execute the same query on local tables, everything is the same except table names behind the aliases
UPDATE local_target target_alias
SET int_col = source_alias.int_col,
    tstamp_col = source_alias.tstamp_col + interval '3 day',
    text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
    json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
    text_col = source_alias.json_col->>'a'
FROM local_source source_alias
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
-- compare both targets
SELECT COUNT(*) = 0 AS targets_match
FROM (
    SELECT * FROM dist_target
    EXCEPT
    SELECT * FROM local_target
    UNION ALL
    SELECT * FROM local_target
    EXCEPT
    SELECT * FROM dist_target
) q;
targets_match
---------------------------------------------------------------------
t
(1 row)

|
||||
-- setting shard key to itself --
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
|
||||
-- We don't care about action quals when deciding if the update
|
||||
-- could change the shard key, but still add some action quals for
|
||||
-- testing. See the comments written on top of the line we call
|
||||
-- TargetEntryChangesValue() in MergeQualAndTargetListFunctionsSupported().
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED AND dist_1.a > dist_1.b AND dist_1.b > 10 THEN UPDATE SET a = dist_1.a;
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
-- setting shard key to another var that's implied to be equal to shard key --
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = dist_1.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
-- test with extra quals
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = src.b AND (dist_1.b > 1000 OR (dist_1.b < 500)))
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
|
||||
MERGE INTO dist_1
|
||||
USING dist_different_order_1 src
|
||||
ON (dist_1.a = src.a AND dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.b;
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
---- merge: errors in router planner ----
|
||||
-- different column of the same relation, which is not implied to be equal to shard key --
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
|
||||
ERROR: updating the distribution column is not allowed in MERGE actions
|
||||
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.b)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
ERROR: updating the distribution column is not allowed in MERGE actions
|
||||
-- as in (*1), this is not supported
|
||||
MERGE INTO dist_1
|
||||
USING dist_1 src
|
||||
ON (dist_1.a = src.b AND src.b = src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
ERROR: updating the distribution column is not allowed in MERGE actions
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (true)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
ERROR: updating the distribution column is not allowed in MERGE actions
|
||||
MERGE INTO dist_1
|
||||
USING dist_2 src
|
||||
ON (dist_1.a <= src.a)
|
||||
WHEN MATCHED THEN UPDATE SET a = src.a;
|
||||
ERROR: updating the distribution column is not allowed in MERGE actions
|
||||
---- merge: a more sophisticated example ----
|
||||
DROP TABLE dist_source, dist_target, local_source, local_target;
|
||||
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||
SELECT create_distributed_table('dist_source', 'tstamp_col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_target', 'int_col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(1001, 2000) i;
|
||||
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||
FROM generate_series(901, 1000) i;
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1501, 2000) i;
|
||||
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||
i,
|
||||
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||
'source_' || i-1,
|
||||
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||
FROM generate_series(1401, 1500) i;
|
||||
INSERT INTO local_source SELECT * FROM dist_source;
|
||||
INSERT INTO local_target SELECT * FROM dist_target;
|
||||
-- execute the query on distributed tables
|
||||
MERGE INTO dist_target target_alias
|
||||
USING dist_source source_alias
|
||||
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||
WHEN MATCHED THEN UPDATE SET
|
||||
int_col = source_alias.int_col,
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||
-- execute the same query on local tables, everything is the same except table names behind the aliases
|
||||
MERGE INTO local_target target_alias
|
||||
USING local_source source_alias
|
||||
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||
WHEN MATCHED THEN UPDATE SET
|
||||
int_col = source_alias.int_col,
|
||||
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||
text_col = source_alias.json_col->>'a'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||
-- compare both targets
|
||||
SELECT COUNT(*) = 0 AS targets_match
|
||||
FROM (
|
||||
SELECT * FROM dist_target
|
||||
EXCEPT
|
||||
SELECT * FROM local_target
|
||||
UNION ALL
|
||||
SELECT * FROM local_target
|
||||
EXCEPT
|
||||
SELECT * FROM dist_target
|
||||
) q;
|
||||
targets_match
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- UPDATEs with a FROM clause are supported even with local tables
|
||||
UPDATE limit_orders SET limit_price = 0.00 FROM bidders
|
||||
WHERE limit_orders.id = 246 AND
|
||||
|
|
@ -1324,10 +1701,5 @@ CREATE TABLE multi_modifications.local (a int default 1, b int);
INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table));
ERROR: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
DROP TABLE insufficient_shards;
DROP TABLE raw_table;
DROP TABLE summary_table;
DROP TABLE reference_raw_table;
DROP TABLE reference_summary_table;
SET client_min_messages TO WARNING;
DROP SCHEMA multi_modifications CASCADE;
NOTICE: drop cascades to table multi_modifications.local

@ -0,0 +1,45 @@
-- Test for CDC library path adjustment functionality
-- This test verifies that the AdjustDynamicLibraryPathForCdcDecoders function with non-superuser privileges
-- correctly modifies the dynamic_library_path when CDC is enabled
-- Test 1: Non-superuser with read_all_settings can see dynamic_library_path changes
CREATE USER cdc_test_user;
GRANT pg_read_all_settings TO cdc_test_user;
SET ROLE cdc_test_user;
-- Non-superuser should be able to see the current dynamic_library_path
SELECT current_setting('dynamic_library_path') AS user_visible_path;
user_visible_path
---------------------------------------------------------------------
$libdir
(1 row)

SET citus.enable_change_data_capture = true;
SHOW citus.enable_change_data_capture;
citus.enable_change_data_capture
---------------------------------------------------------------------
on
(1 row)

SHOW dynamic_library_path;
dynamic_library_path
---------------------------------------------------------------------
$libdir/citus_decoders:$libdir
(1 row)

-- Reset to superuser and cleanup
RESET ROLE;
DROP USER cdc_test_user;
-- Final cleanup
RESET citus.enable_change_data_capture;
RESET dynamic_library_path;
SHOW citus.enable_change_data_capture;
citus.enable_change_data_capture
---------------------------------------------------------------------
off
(1 row)

SHOW dynamic_library_path;
dynamic_library_path
---------------------------------------------------------------------
$libdir/citus_decoders:$libdir
(1 row)

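-- (Editorial note: the path still contains citus_decoders after RESET because,
-- per the Test 5 comment in the cdc_library_path test further below, Citus
-- overrides the GUC's reset_val, so RESET restores the overridden default
-- rather than plain $libdir.)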
@ -289,7 +289,7 @@ ORDER BY 1;
function run_command_on_placements(regclass,text,boolean)
function run_command_on_shards(regclass,text,boolean)
function run_command_on_workers(text,boolean)
function shard_name(regclass,bigint)
function shard_name(regclass,bigint,boolean)
function start_metadata_sync_to_all_nodes()
function start_metadata_sync_to_node(text,integer)
function stop_metadata_sync_to_node(text,integer,boolean)

@ -380,6 +380,7 @@ ORDER BY 1;
view citus_dist_stat_activity
view citus_lock_waits
view citus_locks
view citus_nodes
view citus_schema.citus_schemas
view citus_schema.citus_tables
view citus_shard_indexes_on_worker

@ -392,6 +393,6 @@ ORDER BY 1;
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(361 rows)
(362 rows)

DROP TABLE extension_basic_types;

@ -137,18 +137,21 @@ class Message:

class SharedMessage(Message, metaclass=MessageMeta):
    "A message which could be sent by either the frontend or the backend"

    _msgtypes = dict()
    _classes = dict()


class FrontendMessage(Message, metaclass=MessageMeta):
    "A message which will only be sent by a frontend"

    _msgtypes = dict()
    _classes = dict()


class BackendMessage(Message, metaclass=MessageMeta):
    "A message which will only be sent by a backend"

    _msgtypes = dict()
    _classes = dict()

@ -5,6 +5,7 @@ test: pg16
test: multi_create_fdw
test: multi_test_catalog_views
test: replicated_table_disable_node
test: cdc_library_path non_super_user_cdc_library_path

# ----------
# The following distributed tests depend on creating a partitioned table and

@ -489,7 +489,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
push(@pgOptions, "citus.stat_statements_track = 'all'");
push(@pgOptions, "citus.enable_change_data_capture=on");
push(@pgOptions, "citus.enable_change_data_capture=off");
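# (Editorial note, assumption: the harness default flips to off here so that
# the new cdc_library_path tests can observe dynamic_library_path being
# adjusted when they enable CDC themselves.)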
push(@pgOptions, "citus.stat_tenants_limit = 2");
|
||||
push(@pgOptions, "citus.stat_tenants_track = 'ALL'");
|
||||
push(@pgOptions, "citus.enable_stat_counters=on");
|
||||
|
|
|
|||
|
|
@ -0,0 +1,46 @@
-- Test for CDC library path adjustment functionality
-- This test verifies that the AdjustDynamicLibraryPathForCdcDecoders function with superuser privileges
-- correctly modifies the dynamic_library_path when CDC is enabled

-- Test 1: Show initial state and reset to ensure clean state
SHOW dynamic_library_path;
SHOW citus.enable_change_data_capture;

-- Test 2: Enable CDC and verify path is set to include citus_decoders
SET citus.enable_change_data_capture = true;
SHOW dynamic_library_path;

-- Verify that the dynamic_library_path has been modified to include citus_decoders
SELECT
    CASE
        WHEN current_setting('dynamic_library_path') LIKE '%citus_decoders%'
        THEN 'CDC path correctly set'
        ELSE 'CDC path incorrectly not set'
    END AS cdc_path_status;

-- Test 3: Disable CDC and verify path remains (CDC doesn't remove the path)
SET citus.enable_change_data_capture = false;
SHOW dynamic_library_path;

-- Test 4: Edge case - function should only work when path is exactly "$libdir"
SET dynamic_library_path = '$libdir/other_path:$libdir';
SET citus.enable_change_data_capture = true;
SHOW dynamic_library_path;

-- Verify that path is unchanged with custom library path
SELECT
    CASE
        WHEN current_setting('dynamic_library_path') LIKE '%citus_decoders%'
        THEN 'CDC path incorrectly set'
        ELSE 'CDC path correctly not set'
    END AS custom_path_test;

-- Reset dynamic_library_path to default
RESET dynamic_library_path;
RESET citus.enable_change_data_capture;

-- Test 5: Verify that dynamic_library_path reset_val is overridden to $libdir/citus_decoders:$libdir
SHOW dynamic_library_path;
SHOW citus.enable_change_data_capture;

@ -13,5 +13,16 @@ INSERT INTO t1 SELECT generate_series(1, 100);
INSERT INTO "t with space" SELECT generate_series(1, 1000);
SELECT * FROM citus_shards;

SET search_path TO public;
CREATE TABLE t3 (i int);
SELECT citus_add_local_table_to_metadata('t3');

SELECT shard_name('t3', shardid) FROM pg_dist_shard WHERE logicalrelid = 't3'::regclass;
SELECT shard_name('t3', shardid, true) FROM pg_dist_shard WHERE logicalrelid = 't3'::regclass;
SELECT shard_name('t3', shardid, false) FROM pg_dist_shard WHERE logicalrelid = 't3'::regclass;

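-- (Editorial note, assumption: the new third argument to shard_name() appears
-- to toggle schema qualification of the returned shard name, which is why the
-- test runs the true/false variants against a table created in public while
-- the rest of the file works in the citus_shards schema.)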
DROP TABLE t3;
SET search_path TO citus_shards;

SET client_min_messages TO WARNING;
DROP SCHEMA citus_shards CASCADE;

@ -126,5 +126,128 @@ WHEN NOT MATCHED THEN

SELECT compare_data();

DROP SCHEMA merge_repartition2_schema CASCADE;
---- https://github.com/citusdata/citus/issues/8180 ----

CREATE TABLE dist_1 (a int, b int, c int);
CREATE TABLE dist_2 (a int, b int, c int);
CREATE TABLE dist_different_order_1 (b int, a int, c int);

SELECT create_distributed_table('dist_1', 'a');
SELECT create_distributed_table('dist_2', 'a');
SELECT create_distributed_table('dist_different_order_1', 'a');

MERGE INTO dist_1
USING dist_2
ON (dist_1.a = dist_2.b)
WHEN MATCHED THEN UPDATE SET b = dist_2.b;

MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET b = src.b;

MERGE INTO dist_different_order_1
USING dist_1
ON (dist_different_order_1.a = dist_1.b)
WHEN MATCHED THEN UPDATE SET b = dist_1.b;

CREATE TABLE dist_1_cast (a int, b int);
CREATE TABLE dist_2_cast (a int, b numeric);

SELECT create_distributed_table('dist_1_cast', 'a');
SELECT create_distributed_table('dist_2_cast', 'a');

MERGE INTO dist_1_cast
USING dist_2_cast
ON (dist_1_cast.a = dist_2_cast.b)
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;

MERGE INTO dist_1_cast
USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast
ON (dist_1_cast.a = dist_2_cast.b)
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;

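-- (Editorial note: dist_2_cast.b is numeric while dist_1_cast.a is int, so the
-- first MERGE joins the distribution column through an implicit cast, while
-- the second pre-casts b to int in the USING subquery so the join qual is a
-- plain int = int equality. Both shapes appear intended to exercise the
-- #8180 fix from different angles.)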
-- a more sophisticated example
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);

CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);

SELECT create_distributed_table('dist_source', 'tstamp_col');
SELECT create_distributed_table('dist_target', 'int_col');

INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[i::text, (i+1)::text, (i+2)::text],
       'source_' || i,
       ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;

INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[i::text, (i+1)::text, (i+2)::text],
       'source_' || i,
       ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;

INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[(i-1)::text, (i)::text, (i+1)::text],
       'source_' || i,
       ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;

INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[(i-1)::text, (i)::text, (i+1)::text],
       'source_' || i-1,
       ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;

INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;

-- execute the query on distributed tables
MERGE INTO dist_target target_alias
USING dist_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
    tstamp_col = source_alias.tstamp_col + interval '3 day',
    text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
    json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
    text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
    INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );

-- execute the same query on local tables, everything is the same except table names behind the aliases
MERGE INTO local_target target_alias
USING local_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
    tstamp_col = source_alias.tstamp_col + interval '3 day',
    text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
    json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
    text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
    INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );

-- compare both targets

SELECT COUNT(*) = 0 AS targets_match
FROM (
    SELECT * FROM dist_target
    EXCEPT
    SELECT * FROM local_target
    UNION ALL
    SELECT * FROM local_target
    EXCEPT
    SELECT * FROM dist_target
) q;

SET client_min_messages TO WARNING;
DROP SCHEMA merge_repartition2_schema CASCADE;

@ -32,6 +32,21 @@ SELECT result FROM run_command_on_workers('SELECT citus_is_primary_node()');
-- get the active nodes
SELECT master_get_active_worker_nodes();

-- get all nodes
SELECT * from citus_nodes;

-- get active nodes
SELECT * from citus_nodes where active = 't';

-- get coordinator nodes
SELECT * from citus_nodes where role = 'coordinator';

-- get worker nodes
SELECT * from citus_nodes where role = 'worker';

-- get nodes with unknown role
SELECT * from citus_nodes where role = 'foo';

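-- (Editorial sketch, assumption about the view's shape: citus_nodes appears
-- to be derived from pg_dist_node. A roughly equivalent hand-written query
-- would be:)
-- SELECT nodename, nodeport,
--        CASE WHEN groupid = 0 THEN 'coordinator' ELSE 'worker' END AS role,
--        isactive AS active
-- FROM pg_dist_node;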
-- try to add a node that is already in the cluster
SELECT * FROM master_add_node('localhost', :worker_1_port);

@ -51,6 +66,20 @@ SELECT citus_disable_node('localhost', :worker_2_port);
SELECT public.wait_until_metadata_sync(20000);
SELECT master_get_active_worker_nodes();

-- get active nodes
SELECT * from citus_nodes where active = 't';

-- get inactive nodes
SELECT * from citus_nodes where active = 'f';

-- make sure non-superusers can access the view
CREATE ROLE normaluser;
SET ROLE normaluser;
SELECT * FROM citus_nodes;

SET ROLE postgres;
DROP ROLE normaluser;

-- add some shard placements to the cluster
SET citus.shard_count TO 16;
SET citus.shard_replication_factor TO 1;

@ -2583,5 +2583,127 @@ SELECT id, val FROM version_dist_union ORDER BY id;

-- End of Issue #7784

-- PR #8106 — CTE traversal works when following outer Vars
-- This script exercises three shapes:
-- T1) CTE referenced inside a correlated subquery (one level down)
-- T2) CTE referenced inside a nested subquery (two levels down)
-- T3) Subquery targetlist uses a scalar sublink into the outer CTE

CREATE SCHEMA pr8106_cte_outervar;
SET search_path = pr8106_cte_outervar, public;

-- Base tables for the tests
DROP TABLE IF EXISTS raw_events_first CASCADE;
DROP TABLE IF EXISTS agg_events CASCADE;

CREATE TABLE raw_events_first(
    user_id int,
    value_1 int
);

CREATE TABLE agg_events(
    user_id int,
    value_1_agg int
);

-- Distribute and colocate (distribution key = user_id)
SELECT create_distributed_table('raw_events_first', 'user_id');
SELECT create_distributed_table('agg_events', 'user_id');

-- Seed data (duplicates on some user_ids; some NULL value_1’s)
INSERT INTO raw_events_first(user_id, value_1) VALUES
    (1, 10), (1, 20), (1, NULL),
    (2, NULL),
    (3, 30),
    (4, NULL),
    (5, 50), (5, NULL),
    (6, NULL);

---------------------------------------------------------------------
-- T1) CTE referenced inside a correlated subquery (one level down)
---------------------------------------------------------------------
TRUNCATE agg_events;

WITH c AS MATERIALIZED (
    SELECT user_id FROM raw_events_first
)
INSERT INTO agg_events (user_id)
SELECT t.user_id
FROM raw_events_first t
WHERE EXISTS (SELECT 1 FROM c WHERE c.user_id = t.user_id);

-- Expect one insert per row in raw_events_first (EXISTS always true per user_id)
SELECT 't1_count_matches' AS test,
       (SELECT count(*) FROM agg_events) =
       (SELECT count(*) FROM raw_events_first) AS ok;

-- Spot-check: how many rows were inserted
SELECT 't1_rows' AS test, count(*) AS rows FROM agg_events;

---------------------------------------------------------------------
-- T2) CTE referenced inside a nested subquery (two levels down)
---------------------------------------------------------------------
TRUNCATE agg_events;

WITH c AS MATERIALIZED (
    SELECT user_id FROM raw_events_first
)
INSERT INTO agg_events (user_id)
SELECT t.user_id
FROM raw_events_first t
WHERE EXISTS (
    SELECT 1
    FROM (SELECT user_id FROM c) c2
    WHERE c2.user_id = t.user_id
);

-- Same cardinality expectation as T1
SELECT 't2_count_matches' AS test,
       (SELECT count(*) FROM agg_events) =
       (SELECT count(*) FROM raw_events_first) AS ok;

SELECT 't2_rows' AS test, count(*) AS rows FROM agg_events;

---------------------------------------------------------------------
-- T3) Subquery targetlist uses a scalar sublink into the outer CTE
-- (use MAX() to keep scalar subquery single-row)
---------------------------------------------------------------------
TRUNCATE agg_events;

WITH c AS (SELECT user_id, value_1 FROM raw_events_first)
INSERT INTO agg_events (user_id, value_1_agg)
SELECT d.user_id, d.value_1_agg
FROM (
    SELECT t.user_id,
           (SELECT max(c.value_1) FROM c WHERE c.user_id = t.user_id) AS value_1_agg
    FROM raw_events_first t
) AS d
WHERE d.value_1_agg IS NOT NULL;

-- Expect one insert per row in raw_events_first whose user_id has at least one non-NULL value_1
SELECT 't3_count_matches' AS test,
       (SELECT count(*) FROM agg_events) =
       (
           SELECT count(*)
           FROM raw_events_first t
           WHERE EXISTS (
               SELECT 1 FROM raw_events_first c
               WHERE c.user_id = t.user_id AND c.value_1 IS NOT NULL
           )
       ) AS ok;

-- Also verify no NULLs were inserted into value_1_agg
SELECT 't3_no_null_value_1_agg' AS test,
       NOT EXISTS (SELECT 1 FROM agg_events WHERE value_1_agg IS NULL) AS ok;

-- Deterministic sample of results
SELECT 't3_sample' AS test, user_id, value_1_agg
FROM agg_events
ORDER BY user_id
LIMIT 5;

-- End of PR #8106 — CTE traversal works when following outer Vars

SET client_min_messages TO ERROR;
DROP SCHEMA pr8106_cte_outervar CASCADE;
DROP SCHEMA multi_insert_select CASCADE;

@ -3,6 +3,7 @@ SET citus.next_shard_id TO 750000;
SET citus.next_placement_id TO 750000;

CREATE SCHEMA multi_modifications;
SET search_path TO multi_modifications;

-- some failure messages that come from the worker nodes
-- might change due to parallel executions, so suppress those

@ -36,7 +37,7 @@ CREATE TABLE append_partitioned ( LIKE limit_orders );
SET citus.shard_count TO 2;

SELECT create_distributed_table('limit_orders', 'id', 'hash');
SELECT create_distributed_table('multiple_hash', 'id', 'hash');
SELECT create_distributed_table('multiple_hash', 'category', 'hash');
SELECT create_distributed_table('range_partitioned', 'id', 'range');
SELECT create_distributed_table('append_partitioned', 'id', 'append');

@ -244,12 +245,14 @@ INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell',

-- First: Connect to the second worker node
\c - - - :worker_2_port
SET search_path TO multi_modifications;

-- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;

-- Third: Connect back to master node
\c - - - :master_port
SET search_path TO multi_modifications;

-- Fourth: Perform an INSERT on the remaining node
-- the whole transaction should fail

@ -258,6 +261,7 @@ INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell',

-- set the shard name back
\c - - - :worker_2_port
SET search_path TO multi_modifications;

-- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;

@ -265,12 +269,15 @@ ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Verify the insert failed and both placements are healthy
-- or the insert succeeded and placement marked unhealthy
\c - - - :worker_1_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders_750000 WHERE id = 276;

\c - - - :worker_2_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders_750000 WHERE id = 276;

\c - - - :master_port
SET search_path TO multi_modifications;

SELECT count(*) FROM limit_orders WHERE id = 276;

@ -285,12 +292,14 @@ AND s.logicalrelid = 'limit_orders'::regclass;

-- First: Connect to the first worker node
\c - - - :worker_1_port
SET search_path TO multi_modifications;

-- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;

-- Third: Connect back to master node
\c - - - :master_port
SET search_path TO multi_modifications;

-- Fourth: Perform an INSERT on the remaining node
\set VERBOSITY terse

@ -311,12 +320,14 @@ AND s.logicalrelid = 'limit_orders'::regclass;

-- First: Connect to the first worker node
\c - - - :worker_1_port
SET search_path TO multi_modifications;

-- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;

-- Third: Connect back to master node
\c - - - :master_port
SET search_path TO multi_modifications;

-- attempting to change the partition key is unsupported
UPDATE limit_orders SET id = 0 WHERE id = 246;

@ -327,6 +338,375 @@ UPDATE limit_orders SET id = 246 WHERE id = 246;
|
|||
UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM';
|
||||
UPDATE limit_orders SET id = limit_orders.id WHERE id = 246;
|
||||
|
||||
CREATE TABLE dist_1 (a int, b int, c int);
|
||||
CREATE TABLE dist_2 (a int, b int, c int);
|
||||
CREATE TABLE dist_non_colocated (a int, b int, c int);
|
||||
CREATE TABLE dist_different_order_1 (b int, a int, c int);
|
||||
|
||||
SELECT create_distributed_table('dist_1', 'a');
|
||||
SELECT create_distributed_table('dist_2', 'a');
|
||||
SELECT create_distributed_table('dist_non_colocated', 'a', colocate_with=>'none');
|
||||
SELECT create_distributed_table('dist_different_order_1', 'a');
|
||||
|
||||
--
|
||||
-- https://github.com/citusdata/citus/issues/8087
|
||||
--
|
||||
|
||||
---- update: should work ----
|
||||
|
||||
-- setting shard key to itself --
|
||||
|
||||
UPDATE dist_1 SET a = dist_1.a;
|
||||
UPDATE dist_1 SET a = dist_1.a WHERE dist_1.a > dist_1.b AND dist_1.b > 10;
|
||||
UPDATE dist_1 SET a = dist_1.a FROM dist_2 WHERE dist_1.a = dist_2.a;
|
||||
|
||||
-- setting shard key to another var that's implied to be equal to shard key --
|
||||
|
||||
UPDATE dist_1 SET a = b WHERE a = b;
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a;
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.b = dist_2.c AND (dist_2.c > 5 OR dist_2.c < 0);
|
||||
|
||||
with cte as (
|
||||
select a, b from dist_1
|
||||
)
|
||||
update dist_1 set a = cte.a from cte where dist_1.a = cte.a;
|
||||
|
||||
with cte as (
|
||||
select a as x, b as y from (select a, b from dist_1 limit 100) dt where b > 100
|
||||
)
|
||||
update dist_1 set a = cte.x from cte where dist_1.a = cte.x;
|
||||
|
||||
with cte as (
|
||||
select d2.a as x, d1.b as y
|
||||
from dist_1 d1, dist_different_order_1 d2
|
||||
where d1.a=d2.a)
|
||||
update dist_1 set a = cte.x from cte where y != 0 and dist_1.a = cte.x;
|
||||
|
||||
with cte as (
|
||||
select * from (select a as x, b as y from dist_2 limit 100) q
|
||||
)
|
||||
update dist_1 set a = cte.x from cte where b = cte.y and cte.y = a and a = cte.x;
|
||||
|
||||
-- supported although the where clause will certainly eval to false
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.a = 5 AND dist_2.a = 7;
|
||||
|
||||
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
|
||||
|
||||
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
|
||||
|
||||
-- test with extra quals
|
||||
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a AND dist_1.b = dist_different_order_1.c AND (dist_different_order_1.c > 5 OR dist_different_order_1.c < 0);
|
||||
|
||||
---- update: errors in router planner ----
|
||||
|
||||
-- different column of the same relation, which is not implied to be equal to shard key --
|
||||
|
||||
UPDATE dist_1 SET a = dist_1.b;
|
||||
|
||||
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
|
||||
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2;
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a != dist_2.a;
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a >= dist_2.a;
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a OR dist_1.a > dist_2.a;
|
||||
UPDATE dist_1 SET a = dist_different_order_1.b FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
|
||||
|
||||
UPDATE dist_1 SET a = foo.a FROM dist_1 foo;
|
||||
UPDATE dist_1 SET a = foo.a FROM dist_1 foo WHERE dist_1.a != foo.a;
|
||||
|
||||
-- (*1) Would normally expect this to not throw an error because
|
||||
-- dist_1.a = dist_2.b AND dist_2.b = dist_2.a,
|
||||
-- so dist_1.a = dist_2.a, so we should be able to deduce
|
||||
-- that (dist_1.)a = dist_2.a, but seems predicate_implied_by()
|
||||
-- is not that smart.
|
||||
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a;
|
||||
|
||||
-- and same here
|
||||
with cte as (
|
||||
select * from (select a as x, b as y from dist_different_order_1 limit 100) q
|
||||
)
|
||||
update dist_1 set a = cte.x from cte where a = cte.y and cte.y = b and b = cte.x;
|
||||
|
||||
---- update: errors later (in logical or physical planner) ----
|
||||
|
||||
-- setting shard key to itself --
|
||||
|
||||
UPDATE dist_1 SET a = dist_1.a FROM dist_1 foo;
|
||||
UPDATE dist_1 SET a = dist_1.a FROM dist_2 foo;
|
||||
|
||||
-- setting shard key to another var that's implied to be equal to shard key --
|
||||
|
||||
UPDATE dist_1 SET a = dist_non_colocated.a FROM dist_non_colocated WHERE dist_1.a = dist_non_colocated.a;
|
||||
UPDATE dist_1 SET a = dist_2.b FROM dist_2 WHERE dist_1.a = dist_2.b;

---- update: a more sophisticated example ----
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);

CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);

SELECT create_distributed_table('dist_source', 'int_col');
SELECT create_distributed_table('dist_target', 'int_col');

INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[i::text, (i+1)::text, (i+2)::text],
       'source_' || i,
       ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;

INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[i::text, (i+1)::text, (i+2)::text],
       'source_' || i,
       ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;

INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[(i-1)::text, (i)::text, (i+1)::text],
       'source_' || i,
       ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;

INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[(i-1)::text, (i)::text, (i+1)::text],
       'source_' || i-1,
       ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;

INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;

-- execute the query on distributed tables
UPDATE dist_target target_alias
SET int_col = source_alias.int_col,
    tstamp_col = source_alias.tstamp_col + interval '3 day',
    text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
    json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
    text_col = source_alias.json_col->>'a'
FROM dist_source source_alias
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;

-- execute the same query on local tables; everything is the same except the table names behind the aliases
UPDATE local_target target_alias
SET int_col = source_alias.int_col,
    tstamp_col = source_alias.tstamp_col + interval '3 day',
    text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
    json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
    text_col = source_alias.json_col->>'a'
FROM local_source source_alias
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;

-- compare both targets
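-- (note: EXCEPT and UNION ALL associate left to right, so as written the query
-- below effectively checks that every row of local_target also appears in
-- dist_target; a fully parenthesized symmetric difference would compare both
-- directions)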

SELECT COUNT(*) = 0 AS targets_match
FROM (
    SELECT * FROM dist_target
    EXCEPT
    SELECT * FROM local_target
    UNION ALL
    SELECT * FROM local_target
    EXCEPT
    SELECT * FROM dist_target
) q;

---- merge: should work ----

-- setting shard key to itself --

MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.a;

-- We don't care about action quals when deciding whether the update
-- could change the shard key, but still add some action quals for
-- testing. See the comments above the call to TargetEntryChangesValue()
-- in MergeQualAndTargetListFunctionsSupported().
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED AND dist_1.a > dist_1.b AND dist_1.b > 10 THEN UPDATE SET a = dist_1.a;

MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.a;

MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;

-- setting shard key to another var that's implied to be equal to shard key --

MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = dist_1.b)
WHEN MATCHED THEN UPDATE SET a = dist_1.b;

MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;

MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;

MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;

MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;

-- test with extra quals
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b AND (dist_1.b > 1000 OR (dist_1.b < 500)))
WHEN MATCHED THEN UPDATE SET a = src.b;

-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --

MERGE INTO dist_1
USING dist_different_order_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;

MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;

---- merge: errors in router planner ----

-- different column of the same relation, which is not implied to be equal to shard key --

MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.b;

-- another range table entry's column with the same attno, which is not implied to be equal to shard key --

MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.a;

-- as in (*1), this is not supported
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b AND src.b = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;

MERGE INTO dist_1
USING dist_2 src
ON (true)
WHEN MATCHED THEN UPDATE SET a = src.a;

MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a <= src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;

---- merge: a more sophisticated example ----
DROP TABLE dist_source, dist_target, local_source, local_target;
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);

CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);

SELECT create_distributed_table('dist_source', 'tstamp_col');
SELECT create_distributed_table('dist_target', 'int_col');
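-- (note: unlike the UPDATE example above, dist_source is now distributed on
-- tstamp_col, so source and target are not colocated; the MERGE below
-- presumably exercises a repartition-based execution path)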

INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[i::text, (i+1)::text, (i+2)::text],
       'source_' || i,
       ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;

INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[i::text, (i+1)::text, (i+2)::text],
       'source_' || i,
       ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;

INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[(i-1)::text, (i)::text, (i+1)::text],
       'source_' || i,
       ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;

INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
       i,
       ARRAY[(i-1)::text, (i)::text, (i+1)::text],
       'source_' || i-1,
       ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;

INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;

-- execute the query on distributed tables
MERGE INTO dist_target target_alias
USING dist_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
    int_col = source_alias.int_col,
    tstamp_col = source_alias.tstamp_col + interval '3 day',
    text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
    json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
    text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
    INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col);

-- execute the same query on local tables; everything is the same except the table names behind the aliases
MERGE INTO local_target target_alias
USING local_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
    int_col = source_alias.int_col,
    tstamp_col = source_alias.tstamp_col + interval '3 day',
    text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
    json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
    text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
    INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col);

-- compare both targets

SELECT COUNT(*) = 0 AS targets_match
FROM (
    SELECT * FROM dist_target
    EXCEPT
    SELECT * FROM local_target
    UNION ALL
    SELECT * FROM local_target
    EXCEPT
    SELECT * FROM dist_target
) q;

-- UPDATEs with a FROM clause are supported even with local tables
UPDATE limit_orders SET limit_price = 0.00 FROM bidders
WHERE limit_orders.id = 246 AND

@@ -897,9 +1277,5 @@ DELETE FROM summary_table WHERE id < (
CREATE TABLE multi_modifications.local (a int default 1, b int);
INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table));

DROP TABLE insufficient_shards;
DROP TABLE raw_table;
DROP TABLE summary_table;
DROP TABLE reference_raw_table;
DROP TABLE reference_summary_table;
SET client_min_messages TO WARNING;
DROP SCHEMA multi_modifications CASCADE;

@@ -0,0 +1,27 @@
-- Test for CDC library path adjustment functionality
-- This test verifies that, even with non-superuser privileges, the
-- AdjustDynamicLibraryPathForCdcDecoders function correctly modifies
-- dynamic_library_path when CDC is enabled

-- Test 1: Non-superuser with pg_read_all_settings can see dynamic_library_path changes
CREATE USER cdc_test_user;
GRANT pg_read_all_settings TO cdc_test_user;
SET ROLE cdc_test_user;

-- Non-superuser should be able to see the current dynamic_library_path
SELECT current_setting('dynamic_library_path') AS user_visible_path;

SET citus.enable_change_data_capture = true;
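-- (assumption: with CDC enabled, the dynamic_library_path shown below should
-- now include the citus_decoders directory, e.g. "$libdir/citus_decoders:$libdir")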

SHOW citus.enable_change_data_capture;
SHOW dynamic_library_path;

-- Reset to superuser and cleanup
RESET ROLE;
DROP USER cdc_test_user;

-- Final cleanup
RESET citus.enable_change_data_capture;
RESET dynamic_library_path;
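-- (after the RESETs, both settings should report their defaults again)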

SHOW citus.enable_change_data_capture;
SHOW dynamic_library_path;