Properly detect no-op shard-key updates via UPDATE / MERGE (#8214)

DESCRIPTION: Fixes a bug that causes allowing UPDATE / MERGE queries
that may change the distribution column value.

Fixes: #8087.

Probably as of #769, we were not properly checking if UPDATE
may change the distribution column.

In #769, we had these checks:
```c
	if (targetEntry->resno != column->varattno)
	{
		/* target entry of the form SET some_other_col = <x> */
		isColumnValueChanged = false;
	}
	else if (IsA(setExpr, Var))
	{
		Var *newValue = (Var *) setExpr;
		if (newValue->varattno == column->varattno)
		{
			/* target entry of the form SET col = table.col */
			isColumnValueChanged = false;
		}
	}
```

However, what we check in "if" and in the "else if" are not so
different in the sense they both attempt to verify if SET expr
of the target entry points to the attno of given column. So, in
#5220, we even removed the first check because it was redundant.
Also see this PR comment from #5220:
https://github.com/citusdata/citus/pull/5220#discussion_r699230597.
In #769, probably we actually wanted to first check whether both
SET expr of the target entry and given variable are pointing to the
same range var entry, but this wasn't what the "if" was checking,
so removed.

As a result, in the cases that are mentioned in the linked issue,
we were incorrectly concluding that the SET expr of the target
entry won't change given column just because it's pointing to the
same attno as given variable, regardless of what range var entries
the column and the SET expr are pointing to. Then we also started
using the same function to check for such cases for update action
of MERGE, so we have the same bug there as well.

So with this PR, we properly check for such cases by comparing
varno as well in TargetEntryChangesValue(). However, then some of
the existing tests started failing where the SET expr doesn't
directly assign the column to itself but the "where" clause could
actually imply that the distribution column won't change. Even before
we were not attempting to verify if "where" cluse quals could imply a
no-op assignment for the SET expr in such cases but that was not a
problem. This is because, for the most cases, we were always qualifying
such SET expressions as a no-op update as long as the SET expr's
attno is the same as given column's. For this reason, to prevent
regressions, this PR also adds some extra logic as well to understand
if the "where" clause quals could imply that SET expr for the
distribution key is a no-op.

Ideally, we should instead use "relation restriction equivalence"
mechanism to understand if the "where" clause implies a no-op
update. This is because, for instance, right now we're not able to
deduce that the update is a no-op when the "where" clause transitively
implies a no-op update, as in the case where we're setting "column a"
to "column c" and where clause looks like:
  "column a = column b AND column b = column c".
If this means a regression for some users, we can consider doing it
that way. Until then, as a workaround, we can suggest adding additional
quals to "where" clause that would directly imply equivalence.

Also, after fixing TargetEntryChangesValue(), we started successfully
deducing that the update action is a no-op for such MERGE queries:
```sql
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
```
However, we then started seeing below error for above query even
though now the update is qualified as a no-op update:
```
ERROR:  Unexpected column index of the source list
```
This was because of #8180 and #8201 fixed that.

In summary, with this PR:

* We disallow such queries,
  ```sql
  -- attno for dist_1.a, dist_1.b: 1, 2
  -- attno for dist_different_order_1.a, dist_different_order_1.b: 2, 1
  UPDATE dist_1 SET a = dist_different_order_1.b
  FROM dist_different_order_1
  WHERE dist_1.a dist_different_order_1.a;

  -- attno for dist_1.a, dist_1.b: 1, 2
  -- but ON (..) doesn't imply a no-op update for SET expr
  MERGE INTO dist_1
  USING dist_1 src
  ON (dist_1.a = src.b)
  WHEN MATCHED THEN UPDATE SET a = src.a;
  ```

* .. and allow such queries,
  ```sql
  MERGE INTO dist_1
  USING dist_1 src
  ON (dist_1.a = src.b)
  WHEN MATCHED THEN UPDATE SET a = src.b;
  ```

(cherry picked from commit 5eb1d93be1)
release-13.2-ihalatci-8302
Onur Tirtir 2025-09-30 13:13:47 +03:00
parent 490884176a
commit 2dddf43383
7 changed files with 873 additions and 59 deletions

View File

@ -629,6 +629,22 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, Query *query,
} }
} }
/*
* joinTree->quals, retrieved by GetMergeJoinTree() - either from
* mergeJoinCondition (PG >= 17) or jointree->quals (PG < 17),
* only contains the quals that present in "ON (..)" clause. Action
* quals that can be specified for each specific action, as in
* "WHEN <match condition> AND <action quals> THEN <action>"", are
* saved into "qual" field of the corresponding action's entry in
* mergeActionList, see
* https://github.com/postgres/postgres/blob/e6da68a6e1d60a037b63a9c9ed36e5ef0a996769/src/backend/parser/parse_merge.c#L285-L293.
*
* For this reason, even if TargetEntryChangesValue() could prove that
* an action's quals ensure that the action cannot change the distribution
* key, this is not the case as we don't provide action quals to
* TargetEntryChangesValue(), but just joinTree, which only contains
* the "ON (..)" clause quals.
*/
if (targetEntryDistributionColumn && if (targetEntryDistributionColumn &&
TargetEntryChangesValue(targetEntry, distributionColumn, joinTree)) TargetEntryChangesValue(targetEntry, distributionColumn, joinTree))
{ {

View File

@ -3173,16 +3173,25 @@ BuildBaseConstraint(Var *column)
/* /*
* MakeOpExpression builds an operator expression node. This operator expression * MakeOpExpressionExtended builds an operator expression node that's of
* implements the operator clause as defined by the variable and the strategy * the form "Var <op> Expr", where, Expr must either be a Const or a Var
* number. * (*1).
*
* This operator expression implements the operator clause as defined by
* the variable and the strategy number.
*/ */
OpExpr * OpExpr *
MakeOpExpression(Var *variable, int16 strategyNumber) MakeOpExpressionExtended(Var *leftVar, Expr *rightArg, int16 strategyNumber)
{ {
Oid typeId = variable->vartype; /*
Oid typeModId = variable->vartypmod; * Other types of expressions are probably also fine to be used, but
Oid collationId = variable->varcollid; * none of the callers need support for them for now, so we haven't
* tested them (*1).
*/
Assert(IsA(rightArg, Const) || IsA(rightArg, Var));
Oid typeId = leftVar->vartype;
Oid collationId = leftVar->varcollid;
Oid accessMethodId = BTREE_AM_OID; Oid accessMethodId = BTREE_AM_OID;
@ -3200,18 +3209,16 @@ MakeOpExpression(Var *variable, int16 strategyNumber)
*/ */
if (operatorClassInputType != typeId && typeType != TYPTYPE_PSEUDO) if (operatorClassInputType != typeId && typeType != TYPTYPE_PSEUDO)
{ {
variable = (Var *) makeRelabelType((Expr *) variable, operatorClassInputType, leftVar = (Var *) makeRelabelType((Expr *) leftVar, operatorClassInputType,
-1, collationId, COERCE_IMPLICIT_CAST); -1, collationId, COERCE_IMPLICIT_CAST);
} }
Const *constantValue = makeNullConst(operatorClassInputType, typeModId, collationId);
/* Now make the expression with the given variable and a null constant */ /* Now make the expression with the given variable and a null constant */
OpExpr *expression = (OpExpr *) make_opclause(operatorId, OpExpr *expression = (OpExpr *) make_opclause(operatorId,
InvalidOid, /* no result type yet */ InvalidOid, /* no result type yet */
false, /* no return set */ false, /* no return set */
(Expr *) variable, (Expr *) leftVar,
(Expr *) constantValue, rightArg,
InvalidOid, collationId); InvalidOid, collationId);
/* Set implementing function id and result type */ /* Set implementing function id and result type */
@ -3222,6 +3229,31 @@ MakeOpExpression(Var *variable, int16 strategyNumber)
} }
/*
* MakeOpExpression is a wrapper around MakeOpExpressionExtended
* that creates a null constant of the appropriate type for right
* hand side operator class input type. As a result, it builds an
* operator expression node that's of the form "Var <op> NULL".
*/
OpExpr *
MakeOpExpression(Var *leftVar, int16 strategyNumber)
{
Oid typeId = leftVar->vartype;
Oid typeModId = leftVar->vartypmod;
Oid collationId = leftVar->varcollid;
Oid accessMethodId = BTREE_AM_OID;
OperatorCacheEntry *operatorCacheEntry = LookupOperatorByType(typeId, accessMethodId,
strategyNumber);
Oid operatorClassInputType = operatorCacheEntry->operatorClassInputType;
Const *constantValue = makeNullConst(operatorClassInputType, typeModId, collationId);
return MakeOpExpressionExtended(leftVar, (Expr *) constantValue, strategyNumber);
}
/* /*
* LookupOperatorByType is a wrapper around GetOperatorByType(), * LookupOperatorByType is a wrapper around GetOperatorByType(),
* operatorClassInputType() and get_typtype() functions that uses a cache to avoid * operatorClassInputType() and get_typtype() functions that uses a cache to avoid

View File

@ -1609,10 +1609,19 @@ MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context)
/* /*
* TargetEntryChangesValue determines whether the given target entry may * TargetEntryChangesValue determines whether the given target entry may
* change the value in a given column, given a join tree. The result is * change the value given a column and a join tree.
* true unless the expression refers directly to the column, or the *
* expression is a value that is implied by the qualifiers of the join * The function assumes that the "targetEntry" references given "column"
* tree, or the target entry sets a different column. * Var via its "resname" and is used as part of a modify query. This means
* that, for example, for an update query, the input "targetEntry" constructs
* the following assignment operation as part of the SET clause:
* "col_a = expr_a ", where, "col_a" refers to input "column" Var (via
* "resname") as per the assumption written above. And we want to understand
* if "expr_a" (which is pointed to by targetEntry->expr) refers directly to
* the "column" Var, or "expr_a" is a value that is implied to be equal
* to "column" Var by the qualifiers of the join tree. If so, we know that
* the value of "col_a" effectively cannot be changed by this assignment
* operation.
*/ */
bool bool
TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree) TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree)
@ -1623,11 +1632,36 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
if (IsA(setExpr, Var)) if (IsA(setExpr, Var))
{ {
Var *newValue = (Var *) setExpr; Var *newValue = (Var *) setExpr;
if (newValue->varattno == column->varattno) if (column->varno == newValue->varno &&
column->varattno == newValue->varattno)
{ {
/* target entry of the form SET col = table.col */ /*
* Target entry is of the form "SET col_a = foo.col_b",
* where foo also points to the same range table entry
* and col_a and col_b are the same. So, effectively
* they're literally referring to the same column.
*/
isColumnValueChanged = false; isColumnValueChanged = false;
} }
else
{
List *restrictClauseList = WhereClauseList(joinTree);
OpExpr *equalityExpr = MakeOpExpressionExtended(column, (Expr *) newValue,
BTEqualStrategyNumber);
bool predicateIsImplied = predicate_implied_by(list_make1(equalityExpr),
restrictClauseList, false);
if (predicateIsImplied)
{
/*
* Target entry is of the form
* "SET col_a = foo.col_b WHERE col_a = foo.col_b (AND (...))",
* where foo points to a different relation or it points
* to the same relation but col_a is not the same column as col_b.
*/
isColumnValueChanged = false;
}
}
} }
else if (IsA(setExpr, Const)) else if (IsA(setExpr, Const))
{ {
@ -1648,7 +1682,10 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
restrictClauseList, false); restrictClauseList, false);
if (predicateIsImplied) if (predicateIsImplied)
{ {
/* target entry of the form SET col = <x> WHERE col = <x> AND ... */ /*
* Target entry is of the form
* "SET col_a = const_a WHERE col_a = const_a (AND (...))".
*/
isColumnValueChanged = false; isColumnValueChanged = false;
} }
} }

View File

@ -586,7 +586,8 @@ extern DistributedPlan * CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
plannerRestrictionContext); plannerRestrictionContext);
extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
char *queryString); char *queryString);
extern OpExpr * MakeOpExpressionExtended(Var *leftVar, Expr *rightArg,
int16 strategyNumber);
extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber); extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
extern Node * WrapUngroupedVarsInAnyValueAggregate(Node *expression, extern Node * WrapUngroupedVarsInAnyValueAggregate(Node *expression,
List *groupClauseList, List *groupClauseList,

View File

@ -394,9 +394,9 @@ DEBUG: Wrapping relation "mat_view_on_part_dist" "foo" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM mixed_relkind_tests.mat_view_on_part_dist foo WHERE true DEBUG: generating subplan XXX_1 for subquery SELECT a FROM mixed_relkind_tests.mat_view_on_part_dist foo WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE mixed_relkind_tests.partitioned_distributed_table SET a = foo.a FROM (SELECT foo_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) foo_1) foo WHERE (foo.a OPERATOR(pg_catalog.=) partitioned_distributed_table.a) DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE mixed_relkind_tests.partitioned_distributed_table SET a = foo.a FROM (SELECT foo_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) foo_1) foo WHERE (foo.a OPERATOR(pg_catalog.=) partitioned_distributed_table.a)
UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a; UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns ERROR: modifying the partition value of rows is not allowed
UPDATE partitioned_distributed_table SET a = foo.a FROM distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a; UPDATE partitioned_distributed_table SET a = foo.a FROM distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns ERROR: modifying the partition value of rows is not allowed
-- should work -- should work
UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a = partitioned_distributed_table.a; UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a = partitioned_distributed_table.a;
UPDATE partitioned_distributed_table SET a = foo.a FROM view_on_part_dist AS foo WHERE foo.a = partitioned_distributed_table.a; UPDATE partitioned_distributed_table SET a = foo.a FROM view_on_part_dist AS foo WHERE foo.a = partitioned_distributed_table.a;

View File

@ -2,6 +2,7 @@ SET citus.shard_count TO 32;
SET citus.next_shard_id TO 750000; SET citus.next_shard_id TO 750000;
SET citus.next_placement_id TO 750000; SET citus.next_placement_id TO 750000;
CREATE SCHEMA multi_modifications; CREATE SCHEMA multi_modifications;
SET search_path TO multi_modifications;
-- some failure messages that comes from the worker nodes -- some failure messages that comes from the worker nodes
-- might change due to parallel executions, so suppress those -- might change due to parallel executions, so suppress those
-- using \set VERBOSITY terse -- using \set VERBOSITY terse
@ -31,8 +32,12 @@ SELECT create_distributed_table('limit_orders', 'id', 'hash');
(1 row) (1 row)
SELECT create_distributed_table('multiple_hash', 'id', 'hash'); SELECT create_distributed_table('multiple_hash', 'category', 'hash');
ERROR: column "id" of relation "multiple_hash" does not exist create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('range_partitioned', 'id', 'range'); SELECT create_distributed_table('range_partitioned', 'id', 'range');
create_distributed_table create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -344,22 +349,26 @@ ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001
-- Test that shards which miss a modification are marked unhealthy -- Test that shards which miss a modification are marked unhealthy
-- First: Connect to the second worker node -- First: Connect to the second worker node
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node -- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders; ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
-- Third: Connect back to master node -- Third: Connect back to master node
\c - - - :master_port \c - - - :master_port
SET search_path TO multi_modifications;
-- Fourth: Perform an INSERT on the remaining node -- Fourth: Perform an INSERT on the remaining node
-- the whole transaction should fail -- the whole transaction should fail
\set VERBOSITY terse \set VERBOSITY terse
INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
ERROR: relation "public.limit_orders_750000" does not exist ERROR: relation "multi_modifications.limit_orders_750000" does not exist
-- set the shard name back -- set the shard name back
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node -- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE renamed_orders RENAME TO limit_orders_750000; ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Verify the insert failed and both placements are healthy -- Verify the insert failed and both placements are healthy
-- or the insert succeeded and placement marked unhealthy -- or the insert succeeded and placement marked unhealthy
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders_750000 WHERE id = 276; SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -367,6 +376,7 @@ SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
(1 row) (1 row)
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders_750000 WHERE id = 276; SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -374,6 +384,7 @@ SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
(1 row) (1 row)
\c - - - :master_port \c - - - :master_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders WHERE id = 276; SELECT count(*) FROM limit_orders WHERE id = 276;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -394,14 +405,16 @@ AND s.logicalrelid = 'limit_orders'::regclass;
-- Test that if all shards miss a modification, no state change occurs -- Test that if all shards miss a modification, no state change occurs
-- First: Connect to the first worker node -- First: Connect to the first worker node
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node -- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders; ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
-- Third: Connect back to master node -- Third: Connect back to master node
\c - - - :master_port \c - - - :master_port
SET search_path TO multi_modifications;
-- Fourth: Perform an INSERT on the remaining node -- Fourth: Perform an INSERT on the remaining node
\set VERBOSITY terse \set VERBOSITY terse
INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
ERROR: relation "public.limit_orders_750000" does not exist ERROR: relation "multi_modifications.limit_orders_750000" does not exist
\set VERBOSITY DEFAULT \set VERBOSITY DEFAULT
-- Last: Verify worker is still healthy -- Last: Verify worker is still healthy
SELECT count(*) SELECT count(*)
@ -420,10 +433,12 @@ AND s.logicalrelid = 'limit_orders'::regclass;
-- Undo our change... -- Undo our change...
-- First: Connect to the first worker node -- First: Connect to the first worker node
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node -- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE renamed_orders RENAME TO limit_orders_750000; ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Third: Connect back to master node -- Third: Connect back to master node
\c - - - :master_port \c - - - :master_port
SET search_path TO multi_modifications;
-- attempting to change the partition key is unsupported -- attempting to change the partition key is unsupported
UPDATE limit_orders SET id = 0 WHERE id = 246; UPDATE limit_orders SET id = 0 WHERE id = 246;
ERROR: modifying the partition value of rows is not allowed ERROR: modifying the partition value of rows is not allowed
@ -433,6 +448,368 @@ ERROR: modifying the partition value of rows is not allowed
UPDATE limit_orders SET id = 246 WHERE id = 246; UPDATE limit_orders SET id = 246 WHERE id = 246;
UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM'; UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM';
UPDATE limit_orders SET id = limit_orders.id WHERE id = 246; UPDATE limit_orders SET id = limit_orders.id WHERE id = 246;
CREATE TABLE dist_1 (a int, b int, c int);
CREATE TABLE dist_2 (a int, b int, c int);
CREATE TABLE dist_non_colocated (a int, b int, c int);
CREATE TABLE dist_different_order_1 (b int, a int, c int);
SELECT create_distributed_table('dist_1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_2', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_non_colocated', 'a', colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_different_order_1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
--
-- https://github.com/citusdata/citus/issues/8087
--
---- update: should work ----
-- setting shard key to itself --
UPDATE dist_1 SET a = dist_1.a;
UPDATE dist_1 SET a = dist_1.a WHERE dist_1.a > dist_1.b AND dist_1.b > 10;
UPDATE dist_1 SET a = dist_1.a FROM dist_2 WHERE dist_1.a = dist_2.a;
-- setting shard key to another var that's implied to be equal to shard key --
UPDATE dist_1 SET a = b WHERE a = b;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.b = dist_2.c AND (dist_2.c > 5 OR dist_2.c < 0);
with cte as (
select a, b from dist_1
)
update dist_1 set a = cte.a from cte where dist_1.a = cte.a;
with cte as (
select a as x, b as y from (select a, b from dist_1 limit 100) dt where b > 100
)
update dist_1 set a = cte.x from cte where dist_1.a = cte.x;
with cte as (
select d2.a as x, d1.b as y
from dist_1 d1, dist_different_order_1 d2
where d1.a=d2.a)
update dist_1 set a = cte.x from cte where y != 0 and dist_1.a = cte.x;
with cte as (
select * from (select a as x, b as y from dist_2 limit 100) q
)
update dist_1 set a = cte.x from cte where b = cte.y and cte.y = a and a = cte.x;
-- supported although the where clause will certainly eval to false
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.a = 5 AND dist_2.a = 7;
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
-- test with extra quals
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a AND dist_1.b = dist_different_order_1.c AND (dist_different_order_1.c > 5 OR dist_different_order_1.c < 0);
---- update: errors in router planner ----
-- different column of the same relation, which is not implied to be equal to shard key --
UPDATE dist_1 SET a = dist_1.b;
ERROR: modifying the partition value of rows is not allowed
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
UPDATE dist_1 SET a = dist_2.a FROM dist_2;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a != dist_2.a;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a >= dist_2.a;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a OR dist_1.a > dist_2.a;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = dist_different_order_1.b FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = foo.a FROM dist_1 foo;
ERROR: modifying the partition value of rows is not allowed
UPDATE dist_1 SET a = foo.a FROM dist_1 foo WHERE dist_1.a != foo.a;
ERROR: modifying the partition value of rows is not allowed
-- (*1) Would normally expect this to not throw an error because
-- dist_1.a = dist_2.b AND dist_2.b = dist_2.a,
-- so dist_1.a = dist_2.a, so we should be able to deduce
-- that (dist_1.)a = dist_2.a, but seems predicate_implied_by()
-- is not that smart.
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a;
ERROR: modifying the partition value of rows is not allowed
-- and same here
with cte as (
select * from (select a as x, b as y from dist_different_order_1 limit 100) q
)
update dist_1 set a = cte.x from cte where a = cte.y and cte.y = b and b = cte.x;
ERROR: modifying the partition value of rows is not allowed
---- update: errors later (in logical or physical planner) ----
-- setting shard key to itself --
UPDATE dist_1 SET a = dist_1.a FROM dist_1 foo;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
UPDATE dist_1 SET a = dist_1.a FROM dist_2 foo;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
-- setting shard key to another var that's implied to be equal to shard key --
UPDATE dist_1 SET a = dist_non_colocated.a FROM dist_non_colocated WHERE dist_1.a = dist_non_colocated.a;
ERROR: cannot push down this subquery
DETAIL: dist_1 and dist_non_colocated are not colocated
UPDATE dist_1 SET a = dist_2.b FROM dist_2 WHERE dist_1.a = dist_2.b;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
---- update: a more sophisticated example ----
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'int_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_target', 'int_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i-1,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
UPDATE dist_target target_alias
SET int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
FROM dist_source source_alias
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
-- execute the same query on local tables, everything is the same except table names behind the aliases
UPDATE local_target target_alias
SET int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
FROM local_source source_alias
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
-- compare both targets
SELECT COUNT(*) = 0 AS targets_match
FROM (
SELECT * FROM dist_target
EXCEPT
SELECT * FROM local_target
UNION ALL
SELECT * FROM local_target
EXCEPT
SELECT * FROM dist_target
) q;
targets_match
---------------------------------------------------------------------
t
(1 row)
---- merge: should work ----
-- setting shard key to itself --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
-- We don't care about action quals when deciding if the update
-- could change the shard key, but still add some action quals for
-- testing. See the comments written on top of the line we call
-- TargetEntryChangesValue() in MergeQualAndTargetListFunctionsSupported().
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED AND dist_1.a > dist_1.b AND dist_1.b > 10 THEN UPDATE SET a = dist_1.a;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
-- setting shard key to another var that's implied to be equal to shard key --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = dist_1.b)
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
-- test with extra quals
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b AND (dist_1.b > 1000 OR (dist_1.b < 500)))
WHEN MATCHED THEN UPDATE SET a = src.b;
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
MERGE INTO dist_1
USING dist_different_order_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
---- merge: errors in router planner ----
-- different column of the same relation, which is not implied to be equal to shard key --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
ERROR: updating the distribution column is not allowed in MERGE actions
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.a;
ERROR: updating the distribution column is not allowed in MERGE actions
-- as in (*1), this is not supported
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b AND src.b = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
ERROR: updating the distribution column is not allowed in MERGE actions
MERGE INTO dist_1
USING dist_2 src
ON (true)
WHEN MATCHED THEN UPDATE SET a = src.a;
ERROR: updating the distribution column is not allowed in MERGE actions
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a <= src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
ERROR: updating the distribution column is not allowed in MERGE actions
---- merge: a more sophisticated example ----
DROP TABLE dist_source, dist_target, local_source, local_target;
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'tstamp_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_target', 'int_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i-1,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
MERGE INTO dist_target target_alias
USING dist_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- execute the same query on local tables, everything is the same except table names behind the aliases
MERGE INTO local_target target_alias
USING local_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- compare both targets
SELECT COUNT(*) = 0 AS targets_match
FROM (
SELECT * FROM dist_target
EXCEPT
SELECT * FROM local_target
UNION ALL
SELECT * FROM local_target
EXCEPT
SELECT * FROM dist_target
) q;
targets_match
---------------------------------------------------------------------
t
(1 row)
-- UPDATEs with a FROM clause are supported even with local tables -- UPDATEs with a FROM clause are supported even with local tables
UPDATE limit_orders SET limit_price = 0.00 FROM bidders UPDATE limit_orders SET limit_price = 0.00 FROM bidders
WHERE limit_orders.id = 246 AND WHERE limit_orders.id = 246 AND
@ -1353,19 +1730,5 @@ CREATE TABLE multi_modifications.local (a int default 1, b int);
INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table)); INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table));
ERROR: subqueries are not supported within INSERT queries ERROR: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
DROP TABLE insufficient_shards; SET client_min_messages TO WARNING;
DROP TABLE raw_table;
DROP TABLE summary_table;
DROP TABLE reference_raw_table;
DROP TABLE reference_summary_table;
DROP TABLE limit_orders;
DROP TABLE multiple_hash;
DROP TABLE range_partitioned;
DROP TABLE append_partitioned;
DROP TABLE bidders;
DROP FUNCTION stable_append;
DROP FUNCTION immutable_append;
DROP FUNCTION temp_strict_func;
DROP TYPE order_side;
DROP SCHEMA multi_modifications CASCADE; DROP SCHEMA multi_modifications CASCADE;
NOTICE: drop cascades to table multi_modifications.local

View File

@ -3,6 +3,7 @@ SET citus.next_shard_id TO 750000;
SET citus.next_placement_id TO 750000; SET citus.next_placement_id TO 750000;
CREATE SCHEMA multi_modifications; CREATE SCHEMA multi_modifications;
SET search_path TO multi_modifications;
-- some failure messages that comes from the worker nodes -- some failure messages that comes from the worker nodes
-- might change due to parallel executions, so suppress those -- might change due to parallel executions, so suppress those
@ -36,7 +37,7 @@ CREATE TABLE append_partitioned ( LIKE limit_orders );
SET citus.shard_count TO 2; SET citus.shard_count TO 2;
SELECT create_distributed_table('limit_orders', 'id', 'hash'); SELECT create_distributed_table('limit_orders', 'id', 'hash');
SELECT create_distributed_table('multiple_hash', 'id', 'hash'); SELECT create_distributed_table('multiple_hash', 'category', 'hash');
SELECT create_distributed_table('range_partitioned', 'id', 'range'); SELECT create_distributed_table('range_partitioned', 'id', 'range');
SELECT create_distributed_table('append_partitioned', 'id', 'append'); SELECT create_distributed_table('append_partitioned', 'id', 'append');
@ -245,12 +246,14 @@ INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell',
-- First: Connect to the second worker node -- First: Connect to the second worker node
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node -- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders; ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
-- Third: Connect back to master node -- Third: Connect back to master node
\c - - - :master_port \c - - - :master_port
SET search_path TO multi_modifications;
-- Fourth: Perform an INSERT on the remaining node -- Fourth: Perform an INSERT on the remaining node
-- the whole transaction should fail -- the whole transaction should fail
@ -259,6 +262,7 @@ INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell',
-- set the shard name back -- set the shard name back
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node -- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE renamed_orders RENAME TO limit_orders_750000; ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
@ -266,12 +270,15 @@ ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Verify the insert failed and both placements are healthy -- Verify the insert failed and both placements are healthy
-- or the insert succeeded and placement marked unhealthy -- or the insert succeeded and placement marked unhealthy
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders_750000 WHERE id = 276; SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders_750000 WHERE id = 276; SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
\c - - - :master_port \c - - - :master_port
SET search_path TO multi_modifications;
SELECT count(*) FROM limit_orders WHERE id = 276; SELECT count(*) FROM limit_orders WHERE id = 276;
@ -286,12 +293,14 @@ AND s.logicalrelid = 'limit_orders'::regclass;
-- First: Connect to the first worker node -- First: Connect to the first worker node
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node -- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE limit_orders_750000 RENAME TO renamed_orders; ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
-- Third: Connect back to master node -- Third: Connect back to master node
\c - - - :master_port \c - - - :master_port
SET search_path TO multi_modifications;
-- Fourth: Perform an INSERT on the remaining node -- Fourth: Perform an INSERT on the remaining node
\set VERBOSITY terse \set VERBOSITY terse
@ -312,12 +321,14 @@ AND s.logicalrelid = 'limit_orders'::regclass;
-- First: Connect to the first worker node -- First: Connect to the first worker node
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO multi_modifications;
-- Second: Move aside limit_orders shard on the second worker node -- Second: Move aside limit_orders shard on the second worker node
ALTER TABLE renamed_orders RENAME TO limit_orders_750000; ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Third: Connect back to master node -- Third: Connect back to master node
\c - - - :master_port \c - - - :master_port
SET search_path TO multi_modifications;
-- attempting to change the partition key is unsupported -- attempting to change the partition key is unsupported
UPDATE limit_orders SET id = 0 WHERE id = 246; UPDATE limit_orders SET id = 0 WHERE id = 246;
@ -328,6 +339,375 @@ UPDATE limit_orders SET id = 246 WHERE id = 246;
UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM'; UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM';
UPDATE limit_orders SET id = limit_orders.id WHERE id = 246; UPDATE limit_orders SET id = limit_orders.id WHERE id = 246;
CREATE TABLE dist_1 (a int, b int, c int);
CREATE TABLE dist_2 (a int, b int, c int);
CREATE TABLE dist_non_colocated (a int, b int, c int);
CREATE TABLE dist_different_order_1 (b int, a int, c int);
SELECT create_distributed_table('dist_1', 'a');
SELECT create_distributed_table('dist_2', 'a');
SELECT create_distributed_table('dist_non_colocated', 'a', colocate_with=>'none');
SELECT create_distributed_table('dist_different_order_1', 'a');
--
-- https://github.com/citusdata/citus/issues/8087
--
---- update: should work ----
-- setting shard key to itself --
UPDATE dist_1 SET a = dist_1.a;
UPDATE dist_1 SET a = dist_1.a WHERE dist_1.a > dist_1.b AND dist_1.b > 10;
UPDATE dist_1 SET a = dist_1.a FROM dist_2 WHERE dist_1.a = dist_2.a;
-- setting shard key to another var that's implied to be equal to shard key --
UPDATE dist_1 SET a = b WHERE a = b;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.b = dist_2.c AND (dist_2.c > 5 OR dist_2.c < 0);
with cte as (
select a, b from dist_1
)
update dist_1 set a = cte.a from cte where dist_1.a = cte.a;
with cte as (
select a as x, b as y from (select a, b from dist_1 limit 100) dt where b > 100
)
update dist_1 set a = cte.x from cte where dist_1.a = cte.x;
with cte as (
select d2.a as x, d1.b as y
from dist_1 d1, dist_different_order_1 d2
where d1.a=d2.a)
update dist_1 set a = cte.x from cte where y != 0 and dist_1.a = cte.x;
with cte as (
select * from (select a as x, b as y from dist_2 limit 100) q
)
update dist_1 set a = cte.x from cte where b = cte.y and cte.y = a and a = cte.x;
-- supported although the where clause will certainly eval to false
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.a = 5 AND dist_2.a = 7;
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
-- test with extra quals
UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a AND dist_1.b = dist_different_order_1.c AND (dist_different_order_1.c > 5 OR dist_different_order_1.c < 0);
---- update: errors in router planner ----
-- different column of the same relation, which is not implied to be equal to shard key --
UPDATE dist_1 SET a = dist_1.b;
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
UPDATE dist_1 SET a = dist_2.a FROM dist_2;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a != dist_2.a;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a >= dist_2.a;
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a OR dist_1.a > dist_2.a;
UPDATE dist_1 SET a = dist_different_order_1.b FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a;
UPDATE dist_1 SET a = foo.a FROM dist_1 foo;
UPDATE dist_1 SET a = foo.a FROM dist_1 foo WHERE dist_1.a != foo.a;
-- (*1) Would normally expect this to not throw an error because
-- dist_1.a = dist_2.b AND dist_2.b = dist_2.a,
-- so dist_1.a = dist_2.a, so we should be able to deduce
-- that (dist_1.)a = dist_2.a, but seems predicate_implied_by()
-- is not that smart.
UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a;
-- and same here
with cte as (
select * from (select a as x, b as y from dist_different_order_1 limit 100) q
)
update dist_1 set a = cte.x from cte where a = cte.y and cte.y = b and b = cte.x;
---- update: errors later (in logical or physical planner) ----
-- setting shard key to itself --
UPDATE dist_1 SET a = dist_1.a FROM dist_1 foo;
UPDATE dist_1 SET a = dist_1.a FROM dist_2 foo;
-- setting shard key to another var that's implied to be equal to shard key --
UPDATE dist_1 SET a = dist_non_colocated.a FROM dist_non_colocated WHERE dist_1.a = dist_non_colocated.a;
UPDATE dist_1 SET a = dist_2.b FROM dist_2 WHERE dist_1.a = dist_2.b;
---- update: a more sophisticated example ----
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'int_col');
SELECT create_distributed_table('dist_target', 'int_col');
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i-1,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
UPDATE dist_target target_alias
SET int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
FROM dist_source source_alias
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
-- execute the same query on local tables, everything is the same except table names behind the aliases
UPDATE local_target target_alias
SET int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
FROM local_source source_alias
WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col;
-- compare both targets
SELECT COUNT(*) = 0 AS targets_match
FROM (
SELECT * FROM dist_target
EXCEPT
SELECT * FROM local_target
UNION ALL
SELECT * FROM local_target
EXCEPT
SELECT * FROM dist_target
) q;
---- merge: should work ----
-- setting shard key to itself --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
-- We don't care about action quals when deciding if the update
-- could change the shard key, but still add some action quals for
-- testing. See the comments written on top of the line we call
-- TargetEntryChangesValue() in MergeQualAndTargetListFunctionsSupported().
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED AND dist_1.a > dist_1.b AND dist_1.b > 10 THEN UPDATE SET a = dist_1.a;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.a;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
-- setting shard key to another var that's implied to be equal to shard key --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = dist_1.b)
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
-- test with extra quals
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b AND (dist_1.b > 1000 OR (dist_1.b < 500)))
WHEN MATCHED THEN UPDATE SET a = src.b;
-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 --
MERGE INTO dist_1
USING dist_different_order_1 src
ON (dist_1.a = src.a AND dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
---- merge: errors in router planner ----
-- different column of the same relation, which is not implied to be equal to shard key --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.a)
WHEN MATCHED THEN UPDATE SET a = dist_1.b;
-- another range table entry's column with the same attno, which is not implied to be equal to shard key --
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET a = src.a;
-- as in (*1), this is not supported
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b AND src.b = src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
MERGE INTO dist_1
USING dist_2 src
ON (true)
WHEN MATCHED THEN UPDATE SET a = src.a;
MERGE INTO dist_1
USING dist_2 src
ON (dist_1.a <= src.a)
WHEN MATCHED THEN UPDATE SET a = src.a;
---- merge: a more sophisticated example ----
DROP TABLE dist_source, dist_target, local_source, local_target;
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'tstamp_col');
SELECT create_distributed_table('dist_target', 'int_col');
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i-1,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
MERGE INTO dist_target target_alias
USING dist_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- execute the same query on local tables, everything is the same except table names behind the aliases
MERGE INTO local_target target_alias
USING local_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
int_col = source_alias.int_col,
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- compare both targets
SELECT COUNT(*) = 0 AS targets_match
FROM (
SELECT * FROM dist_target
EXCEPT
SELECT * FROM local_target
UNION ALL
SELECT * FROM local_target
EXCEPT
SELECT * FROM dist_target
) q;
-- UPDATEs with a FROM clause are supported even with local tables -- UPDATEs with a FROM clause are supported even with local tables
UPDATE limit_orders SET limit_price = 0.00 FROM bidders UPDATE limit_orders SET limit_price = 0.00 FROM bidders
WHERE limit_orders.id = 246 AND WHERE limit_orders.id = 246 AND
@ -914,20 +1294,5 @@ DELETE FROM summary_table WHERE id < (
CREATE TABLE multi_modifications.local (a int default 1, b int); CREATE TABLE multi_modifications.local (a int default 1, b int);
INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table)); INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table));
DROP TABLE insufficient_shards; SET client_min_messages TO WARNING;
DROP TABLE raw_table;
DROP TABLE summary_table;
DROP TABLE reference_raw_table;
DROP TABLE reference_summary_table;
DROP TABLE limit_orders;
DROP TABLE multiple_hash;
DROP TABLE range_partitioned;
DROP TABLE append_partitioned;
DROP TABLE bidders;
DROP FUNCTION stable_append;
DROP FUNCTION immutable_append;
DROP FUNCTION temp_strict_func;
DROP TYPE order_side;
DROP SCHEMA multi_modifications CASCADE; DROP SCHEMA multi_modifications CASCADE;