diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index e3264311c..f699553b6 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -1267,7 +1267,21 @@ StripesForRelfilelocator(RelFileLocator relfilelocator) { uint64 storageId = LookupStorageId(relfilelocator); - return ReadDataFileStripeList(storageId, GetTransactionSnapshot()); + /* + * PG18 requires snapshot to be active or registered before it's used + * Without this, we hit + * Assert(snapshot->regd_count > 0 || snapshot->active_count > 0); + * when reading columnar stripes. + * Relevant PG18 commit: + * 8076c00592e40e8dbd1fce7a98b20d4bf075e4ba + */ + Snapshot snapshot = RegisterSnapshot(GetTransactionSnapshot()); + + List *readDataFileStripeList = ReadDataFileStripeList(storageId, snapshot); + + UnregisterSnapshot(snapshot); + + return readDataFileStripeList; } diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 1401da0a6..d95c53fb5 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -854,8 +854,11 @@ PostprocessIndexStmt(Node *node, const char *queryString) table_close(relation, NoLock); index_close(indexRelation, NoLock); + PushActiveSnapshot(GetTransactionSnapshot()); + /* mark index as invalid, in-place (cannot be rolled back) */ index_set_state_flags(indexRelationId, INDEX_DROP_CLEAR_VALID); + PopActiveSnapshot(); /* re-open a transaction command from here on out */ CommitTransactionCommand(); @@ -1370,8 +1373,11 @@ MarkIndexValid(IndexStmt *indexStmt) schemaId); Relation indexRelation = index_open(indexRelationId, RowExclusiveLock); + PushActiveSnapshot(GetTransactionSnapshot()); + /* mark index as valid, in-place (cannot be rolled back) */ index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID); + PopActiveSnapshot(); table_close(relation, NoLock); index_close(indexRelation, NoLock); diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index b43f6335e..7a77b6b3d 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -69,7 +69,15 @@ PreprocessCreateStatisticsStmt(Node *node, const char *queryString, { CreateStatsStmt *stmt = castNode(CreateStatsStmt, node); - RangeVar *relation = (RangeVar *) linitial(stmt->relations); + Node *relationNode = (Node *) linitial(stmt->relations); + + if (!IsA(relationNode, RangeVar)) + { + return NIL; + } + + RangeVar *relation = (RangeVar *) relationNode; + Oid relationId = RangeVarGetRelid(relation, ShareUpdateExclusiveLock, false); if (!IsCitusTable(relationId) || !ShouldPropagate()) diff --git a/src/backend/distributed/deparser/ruleutils_18.c b/src/backend/distributed/deparser/ruleutils_18.c index bd044f5e7..44e4c8d38 100644 --- a/src/backend/distributed/deparser/ruleutils_18.c +++ b/src/backend/distributed/deparser/ruleutils_18.c @@ -3804,6 +3804,8 @@ get_update_query_targetlist_def(Query *query, List *targetList, SubLink *cur_ma_sublink; List *ma_sublinks; + targetList = ExpandMergedSubscriptingRefEntries(targetList); + /* * Prepare to deal with MULTIEXPR assignments: collect the source SubLinks * into a list. 
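For illustration, a minimal C sketch (not part of the patch) of the snapshot discipline the hunks above follow on PG18: catalog scans need a registered snapshot, while in-place catalog updates such as index_set_state_flags() expect an active one. The helper names below are hypothetical.

#include "postgres.h"
#include "utils/snapmgr.h"

/* Hypothetical helpers; the real call sites are the hunks above. */
static void
scan_with_registered_snapshot(void)
{
	/* PG18 asserts regd_count > 0 || active_count > 0 before the snapshot is used */
	Snapshot snapshot = RegisterSnapshot(GetTransactionSnapshot());

	/* ... run the catalog scan with `snapshot` here ... */

	UnregisterSnapshot(snapshot);
}

static void
update_with_active_snapshot(void)
{
	/* code paths like index_set_state_flags() expect an *active* snapshot */
	PushActiveSnapshot(GetTransactionSnapshot());

	/* ... perform the in-place catalog update here ... */

	PopActiveSnapshot();
}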
We expect them to appear, in ID order, in resjunk tlist @@ -3827,6 +3829,8 @@ get_update_query_targetlist_def(Query *query, List *targetList, } } } + + ensure_update_targetlist_in_param_order(targetList); } next_ma_cell = list_head(ma_sublinks); cur_ma_sublink = NULL; diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index b1c441f92..c456fa341 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -41,6 +41,7 @@ static int SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, CitusTableCacheEntry *targetRelation); +static int FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno); static Var * ValidateAndReturnVarIfSupported(Node *entryExpr); static DeferredErrorMessage * DeferErrorIfTargetHasFalseClause(Oid targetRelationId, PlannerRestrictionContext * @@ -422,10 +423,13 @@ ErrorIfMergeHasUnsupportedTables(Oid targetRelationId, List *rangeTableList) case RTE_VALUES: case RTE_JOIN: case RTE_CTE: - { - /* Skip them as base table(s) will be checked */ - continue; - } +#if PG_VERSION_NUM >= PG_VERSION_18 + case RTE_GROUP: +#endif + { + /* Skip them as base table(s) will be checked */ + continue; + } /* * RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations, @@ -628,6 +632,22 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, Query *query, } } + /* + * joinTree->quals, retrieved by GetMergeJoinTree() - either from + * mergeJoinCondition (PG >= 17) or jointree->quals (PG < 17), + * only contains the quals that are present in the "ON (..)" clause. Action + * quals that can be specified for each specific action, as in + * "WHEN MATCHED AND <qual> THEN <action>", are + * saved into the "qual" field of the corresponding action's entry in + * mergeActionList, see + * https://github.com/postgres/postgres/blob/e6da68a6e1d60a037b63a9c9ed36e5ef0a996769/src/backend/parser/parse_merge.c#L285-L293. + * + * For this reason, even though TargetEntryChangesValue() could in principle + * prove from an action's quals that the action cannot change the distribution + * key, that never happens here: we don't provide the action quals to + * TargetEntryChangesValue(), only joinTree, which contains just + * the "ON (..)" clause quals. + */ if (targetEntryDistributionColumn && TargetEntryChangesValue(targetEntry, distributionColumn, joinTree)) { @@ -1411,7 +1431,8 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, Assert(sourceRepartitionVar); int sourceResultRepartitionColumnIndex = - DistributionColumnIndex(sourceTargetList, sourceRepartitionVar); + FindTargetListEntryWithVarExprAttno(sourceTargetList, + sourceRepartitionVar->varattno); if (sourceResultRepartitionColumnIndex == -1) { @@ -1562,6 +1583,33 @@ FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query) } + +/* + * FindTargetListEntryWithVarExprAttno finds the index of the target + * entry whose expr is a Var that points to the input varattno. + * + * If no such target entry is found, it returns -1. + */ +static int +FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno) +{ + int targetEntryIndex = 0; + + TargetEntry *targetEntry = NULL; + foreach_declared_ptr(targetEntry, targetList) + { + if (IsA(targetEntry->expr, Var) && + ((Var *) targetEntry->expr)->varattno == varattno) + { + return targetEntryIndex; + } + + targetEntryIndex++; + } + + return -1; +} + + /* + * IsLocalTableModification returns true if the table modified is a Postgres table.
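For illustration, a minimal C sketch (not part of the patch) of where the MERGE quals discussed above live: the ON (..) clause ends up in the join tree, while each WHEN ... AND <qual> THEN ... clause keeps its qual on its own MergeAction node, which is why TargetEntryChangesValue() never sees action quals. CollectMergeActionQuals is a hypothetical helper.

#include "postgres.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"

/* Hypothetical helper: gather the per-action quals of a parsed MERGE query. */
static List *
CollectMergeActionQuals(Query *mergeQuery)
{
	List *actionQuals = NIL;
	ListCell *lc = NULL;

	foreach(lc, mergeQuery->mergeActionList)
	{
		MergeAction *action = (MergeAction *) lfirst(lc);

		if (action->qual != NULL)
		{
			/* this qual is never part of joinTree->quals */
			actionQuals = lappend(actionQuals, action->qual);
		}
	}

	return actionQuals;
}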
* We do not support recursive planning for MERGE yet, so we could have a join diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 2c0a17d16..3584246ef 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -3173,16 +3173,25 @@ BuildBaseConstraint(Var *column) /* - * MakeOpExpression builds an operator expression node. This operator expression - * implements the operator clause as defined by the variable and the strategy - * number. + * MakeOpExpressionExtended builds an operator expression node that's of + * the form "Var <operator> Expr", where Expr must be either a Const or a Var + * (*1). + * + * This operator expression implements the operator clause as defined by + * the variable and the strategy number. */ OpExpr * -MakeOpExpression(Var *variable, int16 strategyNumber) +MakeOpExpressionExtended(Var *leftVar, Expr *rightArg, int16 strategyNumber) { - Oid typeId = variable->vartype; - Oid typeModId = variable->vartypmod; - Oid collationId = variable->varcollid; + /* + * Other types of expressions are probably also fine to be used, but + * none of the callers need support for them for now, so we haven't + * tested them (*1). + */ + Assert(IsA(rightArg, Const) || IsA(rightArg, Var)); + + Oid typeId = leftVar->vartype; + Oid collationId = leftVar->varcollid; Oid accessMethodId = BTREE_AM_OID; @@ -3200,18 +3209,16 @@ MakeOpExpression(Var *variable, int16 strategyNumber) */ if (operatorClassInputType != typeId && typeType != TYPTYPE_PSEUDO) { - variable = (Var *) makeRelabelType((Expr *) variable, operatorClassInputType, - -1, collationId, COERCE_IMPLICIT_CAST); + leftVar = (Var *) makeRelabelType((Expr *) leftVar, operatorClassInputType, + -1, collationId, COERCE_IMPLICIT_CAST); } - Const *constantValue = makeNullConst(operatorClassInputType, typeModId, collationId); - /* Now make the expression with the given variable and a null constant */ OpExpr *expression = (OpExpr *) make_opclause(operatorId, InvalidOid, /* no result type yet */ false, /* no return set */ - (Expr *) variable, - (Expr *) constantValue, + (Expr *) leftVar, + rightArg, InvalidOid, collationId); /* Set implementing function id and result type */ @@ -3222,6 +3229,31 @@ MakeOpExpression(Var *variable, int16 strategyNumber) } + +/* + * MakeOpExpression is a wrapper around MakeOpExpressionExtended + * that creates a null constant of the appropriate type (the operator + * class input type) for the right-hand side. As a result, it builds an + * operator expression node that's of the form "Var <operator> NULL".
+ */ +OpExpr * +MakeOpExpression(Var *leftVar, int16 strategyNumber) +{ + Oid typeId = leftVar->vartype; + Oid typeModId = leftVar->vartypmod; + Oid collationId = leftVar->varcollid; + + Oid accessMethodId = BTREE_AM_OID; + + OperatorCacheEntry *operatorCacheEntry = LookupOperatorByType(typeId, accessMethodId, + strategyNumber); + Oid operatorClassInputType = operatorCacheEntry->operatorClassInputType; + + Const *constantValue = makeNullConst(operatorClassInputType, typeModId, collationId); + + return MakeOpExpressionExtended(leftVar, (Expr *) constantValue, strategyNumber); +} + + /* * LookupOperatorByType is a wrapper around GetOperatorByType(), * operatorClassInputType() and get_typtype() functions that uses a cache to avoid diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 43f79f30b..017dceef6 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -372,6 +372,25 @@ AddPartitionKeyNotNullFilterToSelect(Query *subqery) /* we should have found target partition column */ Assert(targetPartitionColumnVar != NULL); +#if PG_VERSION_NUM >= PG_VERSION_18 + if (subqery->hasGroupRTE) + { + /* if the partition column is a grouped column, we need to flatten it + * to ensure query deparsing works correctly. We choose to do this here + * instead of in ruleutils.c because we want to keep the flattening logic + * close to the NOT NULL filter injection. + */ + RangeTblEntry *partitionRTE = rt_fetch(targetPartitionColumnVar->varno, + subqery->rtable); + if (partitionRTE->rtekind == RTE_GROUP) + { + targetPartitionColumnVar = (Var *) flatten_group_exprs(NULL, subqery, + (Node *) + targetPartitionColumnVar); + } + } +#endif + /* create expression for partition_column IS NOT NULL */ NullTest *nullTest = makeNode(NullTest); nullTest->nulltesttype = IS_NOT_NULL; @@ -1609,10 +1628,19 @@ MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context) /* * TargetEntryChangesValue determines whether the given target entry may - * change the value in a given column, given a join tree. The result is - * true unless the expression refers directly to the column, or the - * expression is a value that is implied by the qualifiers of the join - * tree, or the target entry sets a different column. + * change the value of a given column, given a join tree. + * + * The function assumes that the "targetEntry" references the given "column" + * Var via its "resname" and is used as part of a modify query. This means + * that, for example, for an update query, the input "targetEntry" constructs + * the following assignment operation as part of the SET clause: + * "col_a = expr_a", where "col_a" refers to the input "column" Var (via + * "resname") as per the assumption above. We want to understand whether + * "expr_a" (which is pointed to by targetEntry->expr) refers directly to + * the "column" Var, or "expr_a" is a value that is implied to be equal + * to the "column" Var by the qualifiers of the join tree. If so, we know that + * the value of "col_a" effectively cannot be changed by this assignment + * operation.
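For illustration, a condensed C sketch (not part of the patch) of the proof step described above for "SET col_a = other.col_b": build the equality as an OpExpr with MakeOpExpressionExtended() and ask predicate_implied_by() whether the join tree's quals already imply it. SetClauseCannotChangeColumn is a hypothetical wrapper; the quals are assumed to come from WhereClauseList(joinTree).

#include "postgres.h"
#include "access/stratnum.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "optimizer/optimizer.h"

#include "distributed/multi_physical_planner.h"

/* Hypothetical wrapper around the check performed in TargetEntryChangesValue(). */
static bool
SetClauseCannotChangeColumn(Var *column, Var *newValue, List *whereClauseList)
{
	OpExpr *equalityExpr =
		MakeOpExpressionExtended(column, (Expr *) newValue, BTEqualStrategyNumber);

	/* weak implication suffices: whenever the quals hold, the two values are equal */
	return predicate_implied_by(list_make1(equalityExpr), whereClauseList, false);
}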
*/ bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree) @@ -1623,11 +1651,36 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre if (IsA(setExpr, Var)) { Var *newValue = (Var *) setExpr; - if (newValue->varattno == column->varattno) + if (column->varno == newValue->varno && + column->varattno == newValue->varattno) { - /* target entry of the form SET col = table.col */ + /* + * Target entry is of the form "SET col_a = foo.col_b", + * where foo also points to the same range table entry + * and col_a and col_b are the same. So, effectively + * they're literally referring to the same column. + */ isColumnValueChanged = false; } + else + { + List *restrictClauseList = WhereClauseList(joinTree); + OpExpr *equalityExpr = MakeOpExpressionExtended(column, (Expr *) newValue, + BTEqualStrategyNumber); + + bool predicateIsImplied = predicate_implied_by(list_make1(equalityExpr), + restrictClauseList, false); + if (predicateIsImplied) + { + /* + * Target entry is of the form + * "SET col_a = foo.col_b WHERE col_a = foo.col_b (AND (...))", + * where foo points to a different relation or it points + * to the same relation but col_a is not the same column as col_b. + */ + isColumnValueChanged = false; + } + } } else if (IsA(setExpr, Const)) { @@ -1648,7 +1701,10 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre restrictClauseList, false); if (predicateIsImplied) { - /* target entry of the form SET col = WHERE col = AND ... */ + /* + * Target entry is of the form + * "SET col_a = const_a WHERE col_a = const_a (AND (...))". + */ isColumnValueChanged = false; } } diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 84f94dbb6..f7232e2bb 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -2029,7 +2029,9 @@ SubqueryPushdownMultiNodeTree(Query *originalQuery) pushedDownQuery->setOperations = copyObject(queryTree->setOperations); pushedDownQuery->querySource = queryTree->querySource; pushedDownQuery->hasSubLinks = queryTree->hasSubLinks; - +#if PG_VERSION_NUM >= PG_VERSION_18 + pushedDownQuery->hasGroupRTE = queryTree->hasGroupRTE; +#endif MultiTable *subqueryNode = MultiSubqueryPushdownTable(pushedDownQuery); SetChild((MultiUnaryNode *) subqueryCollectNode, (MultiNode *) subqueryNode); diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 94c99ef20..c657c7c03 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -2431,7 +2431,7 @@ FilterJoinRestrictionContext(JoinRestrictionContext *joinRestrictionContext, Rel /* * RangeTableArrayContainsAnyRTEIdentities returns true if any of the range table entries - * int rangeTableEntries array is an range table relation specified in queryRteIdentities. + * in rangeTableEntries array is a range table relation specified in queryRteIdentities. 
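For illustration, a minimal C sketch (not part of the patch) of the PG18 guard the hunk above adds, assuming a PlannerInfo *root is at hand: simple_rte_array may now contain NULL holes for relations the planner proved dead, so any walk over it (or an array derived from it) needs a NULL check. CountLiveRangeTableEntries is a hypothetical helper.

#include "postgres.h"
#include "nodes/pathnodes.h"

/* Hypothetical helper: count RTEs that survived PG18's dead-relation pruning. */
static int
CountLiveRangeTableEntries(PlannerInfo *root)
{
	int liveCount = 0;

	/* index 0 is unused; valid RTE indexes run from 1 to simple_rel_array_size - 1 */
	for (int rteIndex = 1; rteIndex < root->simple_rel_array_size; rteIndex++)
	{
		RangeTblEntry *rangeTableEntry = root->simple_rte_array[rteIndex];

		if (rangeTableEntry == NULL)
		{
			/* dead relation; see PG commits 5f6f951 and e9a20e4 */
			continue;
		}

		liveCount++;
	}

	return liveCount;
}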
*/ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int @@ -2444,6 +2444,18 @@ RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int List *rangeTableRelationList = NULL; ListCell *rteRelationCell = NULL; +#if PG_VERSION_NUM >= PG_VERSION_18 + + /* + * In PG18+, planner array simple_rte_array may contain NULL entries + * for "dead relations". See PG commits 5f6f951 and e9a20e4 for details. + */ + if (rangeTableEntry == NULL) + { + continue; + } +#endif + /* * Get list of all RTE_RELATIONs in the given range table entry * (i.e.,rangeTableEntry could be a subquery where we're interested diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 1040b4149..25ca24ec7 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -586,7 +586,8 @@ extern DistributedPlan * CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree, plannerRestrictionContext); extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryString); - +extern OpExpr * MakeOpExpressionExtended(Var *leftVar, Expr *rightArg, + int16 strategyNumber); extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber); extern Node * WrapUngroupedVarsInAnyValueAggregate(Node *expression, List *groupClauseList, diff --git a/src/test/regress/expected/merge_repartition2.out b/src/test/regress/expected/merge_repartition2.out index 524ae84f7..99cb8fbba 100644 --- a/src/test/regress/expected/merge_repartition2.out +++ b/src/test/regress/expected/merge_repartition2.out @@ -193,13 +193,148 @@ SQL function "compare_data" statement 2 (1 row) +---- https://github.com/citusdata/citus/issues/8180 ---- +CREATE TABLE dist_1 (a int, b int, c int); +CREATE TABLE dist_2 (a int, b int, c int); +CREATE TABLE dist_different_order_1 (b int, a int, c int); +SELECT create_distributed_table('dist_1', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_2', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_different_order_1', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO dist_1 +USING dist_2 +ON (dist_1.a = dist_2.b) +WHEN MATCHED THEN UPDATE SET b = dist_2.b; +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET b = src.b; +MERGE INTO dist_different_order_1 +USING dist_1 +ON (dist_different_order_1.a = dist_1.b) +WHEN MATCHED THEN UPDATE SET b = dist_1.b; +CREATE TABLE dist_1_cast (a int, b int); +CREATE TABLE dist_2_cast (a int, b numeric); +SELECT create_distributed_table('dist_1_cast', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_2_cast', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO dist_1_cast +USING dist_2_cast +ON (dist_1_cast.a = dist_2_cast.b) +WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b; +ERROR: In the MERGE ON clause, there is a datatype mismatch between target's distribution column and the expression originating from the source. 
+DETAIL: If the types are different, Citus uses different hash functions for the two column types, which might lead to incorrect repartitioning of the result data +MERGE INTO dist_1_cast +USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast +ON (dist_1_cast.a = dist_2_cast.b) +WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b; +-- a more sophisticated example +CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); +CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); +SELECT create_distributed_table('dist_source', 'tstamp_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_target', 'int_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(1001, 2000) i; +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(901, 1000) i; +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1501, 2000) i; +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i-1, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1401, 1500) i; +INSERT INTO local_source SELECT * FROM dist_source; +INSERT INTO local_target SELECT * FROM dist_target; +-- execute the query on distributed tables +MERGE INTO dist_target target_alias +USING dist_source source_alias +ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col) +WHEN MATCHED THEN UPDATE SET + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +WHEN NOT MATCHED THEN + INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col ); +-- execute the same query on local tables, everything is the same except table names behind the aliases +MERGE INTO local_target target_alias +USING local_source source_alias +ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col) +WHEN MATCHED THEN UPDATE SET + tstamp_col = source_alias.tstamp_col + interval '3 
day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +WHEN NOT MATCHED THEN + INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col ); +-- compare both targets +SELECT COUNT(*) = 0 AS targets_match +FROM ( + SELECT * FROM dist_target + EXCEPT + SELECT * FROM local_target + UNION ALL + SELECT * FROM local_target + EXCEPT + SELECT * FROM dist_target +) q; + targets_match +--------------------------------------------------------------------- + t +(1 row) + +SET client_min_messages TO WARNING; DROP SCHEMA merge_repartition2_schema CASCADE; -NOTICE: drop cascades to 8 other objects -DETAIL: drop cascades to table pg_target -drop cascades to table pg_source -drop cascades to function cleanup_data() -drop cascades to function setup_data() -drop cascades to function check_data(text,text,text,text) -drop cascades to function compare_data() -drop cascades to table citus_target -drop cascades to table citus_source diff --git a/src/test/regress/expected/mixed_relkind_tests.out b/src/test/regress/expected/mixed_relkind_tests.out index b168cd7be..b2c30d1e4 100644 --- a/src/test/regress/expected/mixed_relkind_tests.out +++ b/src/test/regress/expected/mixed_relkind_tests.out @@ -394,9 +394,9 @@ DEBUG: Wrapping relation "mat_view_on_part_dist" "foo" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT a FROM mixed_relkind_tests.mat_view_on_part_dist foo WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE mixed_relkind_tests.partitioned_distributed_table SET a = foo.a FROM (SELECT foo_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) foo_1) foo WHERE (foo.a OPERATOR(pg_catalog.=) partitioned_distributed_table.a) UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a; -ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +ERROR: modifying the partition value of rows is not allowed UPDATE partitioned_distributed_table SET a = foo.a FROM distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a; -ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +ERROR: modifying the partition value of rows is not allowed -- should work UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a = partitioned_distributed_table.a; UPDATE partitioned_distributed_table SET a = foo.a FROM view_on_part_dist AS foo WHERE foo.a = partitioned_distributed_table.a; diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index cebef0526..748ce7728 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -2,6 +2,7 @@ SET citus.shard_count TO 32; SET citus.next_shard_id TO 750000; SET citus.next_placement_id TO 750000; CREATE SCHEMA multi_modifications; +SET search_path TO multi_modifications; -- some failure messages that comes from the worker nodes -- might change due to parallel executions, so suppress those -- using \set VERBOSITY 
terse @@ -31,8 +32,12 @@ SELECT create_distributed_table('limit_orders', 'id', 'hash'); (1 row) -SELECT create_distributed_table('multiple_hash', 'id', 'hash'); -ERROR: column "id" of relation "multiple_hash" does not exist +SELECT create_distributed_table('multiple_hash', 'category', 'hash'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + SELECT create_distributed_table('range_partitioned', 'id', 'range'); create_distributed_table --------------------------------------------------------------------- @@ -344,22 +349,26 @@ ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001 -- Test that shards which miss a modification are marked unhealthy -- First: Connect to the second worker node \c - - - :worker_2_port +SET search_path TO multi_modifications; -- Second: Move aside limit_orders shard on the second worker node ALTER TABLE limit_orders_750000 RENAME TO renamed_orders; -- Third: Connect back to master node \c - - - :master_port +SET search_path TO multi_modifications; -- Fourth: Perform an INSERT on the remaining node -- the whole transaction should fail \set VERBOSITY terse INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); -ERROR: relation "public.limit_orders_750000" does not exist +ERROR: relation "multi_modifications.limit_orders_750000" does not exist -- set the shard name back \c - - - :worker_2_port +SET search_path TO multi_modifications; -- Second: Move aside limit_orders shard on the second worker node ALTER TABLE renamed_orders RENAME TO limit_orders_750000; -- Verify the insert failed and both placements are healthy -- or the insert succeeded and placement marked unhealthy \c - - - :worker_1_port +SET search_path TO multi_modifications; SELECT count(*) FROM limit_orders_750000 WHERE id = 276; count --------------------------------------------------------------------- @@ -367,6 +376,7 @@ SELECT count(*) FROM limit_orders_750000 WHERE id = 276; (1 row) \c - - - :worker_2_port +SET search_path TO multi_modifications; SELECT count(*) FROM limit_orders_750000 WHERE id = 276; count --------------------------------------------------------------------- @@ -374,6 +384,7 @@ SELECT count(*) FROM limit_orders_750000 WHERE id = 276; (1 row) \c - - - :master_port +SET search_path TO multi_modifications; SELECT count(*) FROM limit_orders WHERE id = 276; count --------------------------------------------------------------------- @@ -394,14 +405,16 @@ AND s.logicalrelid = 'limit_orders'::regclass; -- Test that if all shards miss a modification, no state change occurs -- First: Connect to the first worker node \c - - - :worker_1_port +SET search_path TO multi_modifications; -- Second: Move aside limit_orders shard on the second worker node ALTER TABLE limit_orders_750000 RENAME TO renamed_orders; -- Third: Connect back to master node \c - - - :master_port +SET search_path TO multi_modifications; -- Fourth: Perform an INSERT on the remaining node \set VERBOSITY terse INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); -ERROR: relation "public.limit_orders_750000" does not exist +ERROR: relation "multi_modifications.limit_orders_750000" does not exist \set VERBOSITY DEFAULT -- Last: Verify worker is still healthy SELECT count(*) @@ -420,10 +433,12 @@ AND s.logicalrelid = 'limit_orders'::regclass; -- Undo our change... 
-- First: Connect to the first worker node \c - - - :worker_1_port +SET search_path TO multi_modifications; -- Second: Move aside limit_orders shard on the second worker node ALTER TABLE renamed_orders RENAME TO limit_orders_750000; -- Third: Connect back to master node \c - - - :master_port +SET search_path TO multi_modifications; -- attempting to change the partition key is unsupported UPDATE limit_orders SET id = 0 WHERE id = 246; ERROR: modifying the partition value of rows is not allowed @@ -433,6 +448,368 @@ ERROR: modifying the partition value of rows is not allowed UPDATE limit_orders SET id = 246 WHERE id = 246; UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM'; UPDATE limit_orders SET id = limit_orders.id WHERE id = 246; +CREATE TABLE dist_1 (a int, b int, c int); +CREATE TABLE dist_2 (a int, b int, c int); +CREATE TABLE dist_non_colocated (a int, b int, c int); +CREATE TABLE dist_different_order_1 (b int, a int, c int); +SELECT create_distributed_table('dist_1', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_2', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_non_colocated', 'a', colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_different_order_1', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- +-- https://github.com/citusdata/citus/issues/8087 +-- +---- update: should work ---- +-- setting shard key to itself -- +UPDATE dist_1 SET a = dist_1.a; +UPDATE dist_1 SET a = dist_1.a WHERE dist_1.a > dist_1.b AND dist_1.b > 10; +UPDATE dist_1 SET a = dist_1.a FROM dist_2 WHERE dist_1.a = dist_2.a; +-- setting shard key to another var that's implied to be equal to shard key -- +UPDATE dist_1 SET a = b WHERE a = b; +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a; +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.b = dist_2.c AND (dist_2.c > 5 OR dist_2.c < 0); +with cte as ( +select a, b from dist_1 +) +update dist_1 set a = cte.a from cte where dist_1.a = cte.a; +with cte as ( +select a as x, b as y from (select a, b from dist_1 limit 100) dt where b > 100 +) +update dist_1 set a = cte.x from cte where dist_1.a = cte.x; +with cte as ( +select d2.a as x, d1.b as y +from dist_1 d1, dist_different_order_1 d2 +where d1.a=d2.a) +update dist_1 set a = cte.x from cte where y != 0 and dist_1.a = cte.x; +with cte as ( +select * from (select a as x, b as y from dist_2 limit 100) q +) +update dist_1 set a = cte.x from cte where b = cte.y and cte.y = a and a = cte.x; +-- supported although the where clause will certainly eval to false +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.a = 5 AND dist_2.a = 7; +-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 -- +UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a; +-- test with extra quals +UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a AND dist_1.b = dist_different_order_1.c AND (dist_different_order_1.c > 5 OR dist_different_order_1.c < 0); +---- 
update: errors in router planner ---- +-- different column of the same relation, which is not implied to be equal to shard key -- +UPDATE dist_1 SET a = dist_1.b; +ERROR: modifying the partition value of rows is not allowed +-- another range table entry's column with the same attno, which is not implied to be equal to shard key -- +UPDATE dist_1 SET a = dist_2.a FROM dist_2; +ERROR: modifying the partition value of rows is not allowed +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a != dist_2.a; +ERROR: modifying the partition value of rows is not allowed +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a >= dist_2.a; +ERROR: modifying the partition value of rows is not allowed +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a OR dist_1.a > dist_2.a; +ERROR: modifying the partition value of rows is not allowed +UPDATE dist_1 SET a = dist_different_order_1.b FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a; +ERROR: modifying the partition value of rows is not allowed +UPDATE dist_1 SET a = foo.a FROM dist_1 foo; +ERROR: modifying the partition value of rows is not allowed +UPDATE dist_1 SET a = foo.a FROM dist_1 foo WHERE dist_1.a != foo.a; +ERROR: modifying the partition value of rows is not allowed +-- (*1) Would normally expect this to not throw an error because +-- dist_1.a = dist_2.b AND dist_2.b = dist_2.a, +-- so dist_1.a = dist_2.a, so we should be able to deduce +-- that (dist_1.)a = dist_2.a, but seems predicate_implied_by() +-- is not that smart. +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a; +ERROR: modifying the partition value of rows is not allowed +-- and same here +with cte as ( +select * from (select a as x, b as y from dist_different_order_1 limit 100) q +) +update dist_1 set a = cte.x from cte where a = cte.y and cte.y = b and b = cte.x; +ERROR: modifying the partition value of rows is not allowed +---- update: errors later (in logical or physical planner) ---- +-- setting shard key to itself -- +UPDATE dist_1 SET a = dist_1.a FROM dist_1 foo; +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +UPDATE dist_1 SET a = dist_1.a FROM dist_2 foo; +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +-- setting shard key to another var that's implied to be equal to shard key -- +UPDATE dist_1 SET a = dist_non_colocated.a FROM dist_non_colocated WHERE dist_1.a = dist_non_colocated.a; +ERROR: cannot push down this subquery +DETAIL: dist_1 and dist_non_colocated are not colocated +UPDATE dist_1 SET a = dist_2.b FROM dist_2 WHERE dist_1.a = dist_2.b; +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +---- update: a more sophisticated example ---- +CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); +CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); +SELECT create_distributed_table('dist_source', 'int_col'); + create_distributed_table +--------------------------------------------------------------------- + 
+(1 row) + +SELECT create_distributed_table('dist_target', 'int_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(1001, 2000) i; +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(901, 1000) i; +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1501, 2000) i; +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i-1, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1401, 1500) i; +INSERT INTO local_source SELECT * FROM dist_source; +INSERT INTO local_target SELECT * FROM dist_target; +-- execute the query on distributed tables +UPDATE dist_target target_alias +SET int_col = source_alias.int_col, + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +FROM dist_source source_alias +WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col; +-- execute the same query on local tables, everything is the same except table names behind the aliases +UPDATE local_target target_alias +SET int_col = source_alias.int_col, + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +FROM local_source source_alias +WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col; +-- compare both targets +SELECT COUNT(*) = 0 AS targets_match +FROM ( + SELECT * FROM dist_target + EXCEPT + SELECT * FROM local_target + UNION ALL + SELECT * FROM local_target + EXCEPT + SELECT * FROM dist_target +) q; + targets_match +--------------------------------------------------------------------- + t +(1 row) + +---- merge: should work ---- +-- setting shard key to itself -- +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a) +WHEN MATCHED THEN UPDATE SET a = dist_1.a; +-- We don't care about action quals when deciding if the update +-- could change the shard key, but still add some action quals for +-- testing. See the comments written on top of the line we call +-- TargetEntryChangesValue() in MergeQualAndTargetListFunctionsSupported(). 
+MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a) +WHEN MATCHED AND dist_1.a > dist_1.b AND dist_1.b > 10 THEN UPDATE SET a = dist_1.a; +MERGE INTO dist_1 +USING dist_2 src +ON (dist_1.a = src.a) +WHEN MATCHED THEN UPDATE SET a = dist_1.a; +MERGE INTO dist_1 +USING dist_2 src +ON (dist_1.a = src.a) +WHEN MATCHED THEN UPDATE SET a = src.a; +-- setting shard key to another var that's implied to be equal to shard key -- +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a AND dist_1.a = dist_1.b) +WHEN MATCHED THEN UPDATE SET a = dist_1.b; +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET a = src.b; +MERGE INTO dist_1 +USING dist_2 src +ON (dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET a = src.b; +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a AND dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET a = src.b; +MERGE INTO dist_1 +USING dist_2 src +ON (dist_1.a = src.a AND dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET a = src.b; +-- test with extra quals +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a AND dist_1.a = src.b AND (dist_1.b > 1000 OR (dist_1.b < 500))) +WHEN MATCHED THEN UPDATE SET a = src.b; +-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 -- +MERGE INTO dist_1 +USING dist_different_order_1 src +ON (dist_1.a = src.a AND dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET a = src.b; +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a) +WHEN MATCHED THEN UPDATE SET a = src.a; +---- merge: errors in router planner ---- +-- different column of the same relation, which is not implied to be equal to shard key -- +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a) +WHEN MATCHED THEN UPDATE SET a = dist_1.b; +ERROR: updating the distribution column is not allowed in MERGE actions +-- another range table entry's column with the same attno, which is not implied to be equal to shard key -- +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET a = src.a; +ERROR: updating the distribution column is not allowed in MERGE actions +-- as in (*1), this is not supported +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.b AND src.b = src.a) +WHEN MATCHED THEN UPDATE SET a = src.a; +ERROR: updating the distribution column is not allowed in MERGE actions +MERGE INTO dist_1 +USING dist_2 src +ON (true) +WHEN MATCHED THEN UPDATE SET a = src.a; +ERROR: updating the distribution column is not allowed in MERGE actions +MERGE INTO dist_1 +USING dist_2 src +ON (dist_1.a <= src.a) +WHEN MATCHED THEN UPDATE SET a = src.a; +ERROR: updating the distribution column is not allowed in MERGE actions +---- merge: a more sophisticated example ---- +DROP TABLE dist_source, dist_target, local_source, local_target; +CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); +CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); +SELECT create_distributed_table('dist_source', 'tstamp_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_target', 'int_col'); + create_distributed_table 
+--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(1001, 2000) i; +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(901, 1000) i; +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1501, 2000) i; +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i-1, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1401, 1500) i; +INSERT INTO local_source SELECT * FROM dist_source; +INSERT INTO local_target SELECT * FROM dist_target; +-- execute the query on distributed tables +MERGE INTO dist_target target_alias +USING dist_source source_alias +ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col) +WHEN MATCHED THEN UPDATE SET + int_col = source_alias.int_col, + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +WHEN NOT MATCHED THEN + INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col ); +-- execute the same query on local tables, everything is the same except table names behind the aliases +MERGE INTO local_target target_alias +USING local_source source_alias +ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col) +WHEN MATCHED THEN UPDATE SET + int_col = source_alias.int_col, + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +WHEN NOT MATCHED THEN + INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col ); +-- compare both targets +SELECT COUNT(*) = 0 AS targets_match +FROM ( + SELECT * FROM dist_target + EXCEPT + SELECT * FROM local_target + UNION ALL + SELECT * FROM local_target + EXCEPT + SELECT * FROM dist_target +) q; + targets_match +--------------------------------------------------------------------- + t +(1 row) + -- UPDATEs with a FROM clause are supported even with local tables UPDATE limit_orders SET limit_price = 0.00 FROM bidders WHERE limit_orders.id = 246 AND @@ -1353,19 +1730,5 @@ CREATE TABLE multi_modifications.local (a int default 1, b int); INSERT INTO multi_modifications.local VALUES (default, (SELECT 
min(id) FROM summary_table)); ERROR: subqueries are not supported within INSERT queries HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. -DROP TABLE insufficient_shards; -DROP TABLE raw_table; -DROP TABLE summary_table; -DROP TABLE reference_raw_table; -DROP TABLE reference_summary_table; -DROP TABLE limit_orders; -DROP TABLE multiple_hash; -DROP TABLE range_partitioned; -DROP TABLE append_partitioned; -DROP TABLE bidders; -DROP FUNCTION stable_append; -DROP FUNCTION immutable_append; -DROP FUNCTION temp_strict_func; -DROP TYPE order_side; +SET client_min_messages TO WARNING; DROP SCHEMA multi_modifications CASCADE; -NOTICE: drop cascades to table multi_modifications.local diff --git a/src/test/regress/expected/multi_sql_function.out b/src/test/regress/expected/multi_sql_function.out index bcc4efcca..87c451f99 100644 --- a/src/test/regress/expected/multi_sql_function.out +++ b/src/test/regress/expected/multi_sql_function.out @@ -323,11 +323,15 @@ HINT: Consider using PL/pgSQL functions instead. SELECT (SELECT 1 FROM test_parameterized_sql limit 1) FROM test_parameterized_sql_function(1); ERROR: cannot perform distributed planning on this query because parameterized queries for SQL functions referencing distributed tables are not supported HINT: Consider using PL/pgSQL functions instead. +-- this fails pg <18 +-- succeeds in pg18 because of pg18 commit 0dca5d68d +-- where sql functions behave as PL/pgSQL functions and use the cache SELECT test_parameterized_sql_function_in_subquery_where(1); -ERROR: could not create distributed plan -DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus. -HINT: Consider using PL/pgSQL functions instead. -CONTEXT: SQL function "test_parameterized_sql_function_in_subquery_where" statement 1 + test_parameterized_sql_function_in_subquery_where +--------------------------------------------------------------------- + 1 +(1 row) + -- postgres behaves slightly differently for the following -- query where the target list is empty SELECT test_parameterized_sql_function(1); diff --git a/src/test/regress/expected/multi_sql_function_0.out b/src/test/regress/expected/multi_sql_function_0.out new file mode 100644 index 000000000..d3e02ced7 --- /dev/null +++ b/src/test/regress/expected/multi_sql_function_0.out @@ -0,0 +1,382 @@ +-- +-- MULTI_SQL_FUNCTION +-- +SET citus.next_shard_id TO 1230000; +CREATE FUNCTION sql_test_no_1() RETURNS bigint AS ' + SELECT + count(*) + FROM + orders; +' LANGUAGE SQL; +CREATE FUNCTION sql_test_no_2() RETURNS bigint AS ' + SELECT + count(*) + FROM + orders, lineitem + WHERE + o_orderkey = l_orderkey; +' LANGUAGE SQL; +CREATE FUNCTION sql_test_no_3() RETURNS bigint AS ' + SELECT + count(*) + FROM + orders, customer + WHERE + o_custkey = c_custkey; +' LANGUAGE SQL; +CREATE FUNCTION sql_test_no_4() RETURNS bigint AS ' + SELECT + count(*) + FROM + orders, customer, lineitem + WHERE + o_custkey = c_custkey AND + o_orderkey = l_orderkey; +' LANGUAGE SQL; +SET client_min_messages TO INFO; +-- now, run plain SQL functions +SELECT sql_test_no_1(); + sql_test_no_1 +--------------------------------------------------------------------- + 2985 +(1 row) + +SELECT sql_test_no_2(); + sql_test_no_2 +--------------------------------------------------------------------- + 12000 +(1 row) + +SELECT sql_test_no_3(); + sql_test_no_3 +--------------------------------------------------------------------- + 1956 +(1 row) + +SELECT sql_test_no_4(); + sql_test_no_4 
+--------------------------------------------------------------------- + 7806 +(1 row) + +-- run the tests which do not require re-partition +-- with real-time executor +-- now, run plain SQL functions +SELECT sql_test_no_1(); + sql_test_no_1 +--------------------------------------------------------------------- + 2985 +(1 row) + +SELECT sql_test_no_2(); + sql_test_no_2 +--------------------------------------------------------------------- + 12000 +(1 row) + +-- test router executor parameterized sql functions +CREATE TABLE temp_table ( + key int, + value int +); +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('temp_table','key','hash'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE FUNCTION no_parameter_insert_sql() RETURNS void AS $$ + INSERT INTO temp_table (key) VALUES (0); +$$ LANGUAGE SQL; +-- execute 6 times +SELECT no_parameter_insert_sql(); + no_parameter_insert_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT no_parameter_insert_sql(); + no_parameter_insert_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT no_parameter_insert_sql(); + no_parameter_insert_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT no_parameter_insert_sql(); + no_parameter_insert_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT no_parameter_insert_sql(); + no_parameter_insert_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT no_parameter_insert_sql(); + no_parameter_insert_sql +--------------------------------------------------------------------- + +(1 row) + +CREATE FUNCTION non_partition_parameter_insert_sql(int) RETURNS void AS $$ + INSERT INTO temp_table (key, value) VALUES (0, $1); +$$ LANGUAGE SQL; +-- execute 6 times +SELECT non_partition_parameter_insert_sql(10); + non_partition_parameter_insert_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_insert_sql(20); + non_partition_parameter_insert_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_insert_sql(30); + non_partition_parameter_insert_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_insert_sql(40); + non_partition_parameter_insert_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_insert_sql(50); + non_partition_parameter_insert_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_insert_sql(60); + non_partition_parameter_insert_sql +--------------------------------------------------------------------- + +(1 row) + +-- check inserted values +SELECT * FROM temp_table ORDER BY key, value; + key | value +--------------------------------------------------------------------- + 0 | 10 + 0 | 20 + 0 | 30 + 0 | 40 + 0 | 50 + 0 | 60 + 0 | + 0 | + 0 | + 0 | + 0 | + 0 | +(12 rows) + +-- check updates +CREATE FUNCTION non_partition_parameter_update_sql(int, int) RETURNS void AS $$ + UPDATE temp_table SET value = $2 WHERE key = 0 AND value = $1; +$$ LANGUAGE SQL; +-- execute 6 times +SELECT non_partition_parameter_update_sql(10, 12); + non_partition_parameter_update_sql 
+--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_update_sql(20, 22); + non_partition_parameter_update_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_update_sql(30, 32); + non_partition_parameter_update_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_update_sql(40, 42); + non_partition_parameter_update_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_update_sql(50, 52); + non_partition_parameter_update_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_update_sql(60, 62); + non_partition_parameter_update_sql +--------------------------------------------------------------------- + +(1 row) + +-- check after updates +SELECT * FROM temp_table ORDER BY key, value; + key | value +--------------------------------------------------------------------- + 0 | 12 + 0 | 22 + 0 | 32 + 0 | 42 + 0 | 52 + 0 | 62 + 0 | + 0 | + 0 | + 0 | + 0 | + 0 | +(12 rows) + +-- check deletes +CREATE FUNCTION non_partition_parameter_delete_sql(int) RETURNS void AS $$ + DELETE FROM temp_table WHERE key = 0 AND value = $1; +$$ LANGUAGE SQL; +-- execute 6 times to trigger prepared statement usage +SELECT non_partition_parameter_delete_sql(12); + non_partition_parameter_delete_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_delete_sql(22); + non_partition_parameter_delete_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_delete_sql(32); + non_partition_parameter_delete_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_delete_sql(42); + non_partition_parameter_delete_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_delete_sql(52); + non_partition_parameter_delete_sql +--------------------------------------------------------------------- + +(1 row) + +SELECT non_partition_parameter_delete_sql(62); + non_partition_parameter_delete_sql +--------------------------------------------------------------------- + +(1 row) + +-- check after deletes +SELECT * FROM temp_table ORDER BY key, value; + key | value +--------------------------------------------------------------------- + 0 | + 0 | + 0 | + 0 | + 0 | + 0 | +(6 rows) + +-- test running parameterized SQL function +CREATE TABLE test_parameterized_sql(id integer, org_id integer); +select create_distributed_table('test_parameterized_sql','org_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION test_parameterized_sql_function(org_id_val integer) +RETURNS TABLE (a bigint) +AS $$ + SELECT count(*) AS count_val from test_parameterized_sql where org_id = org_id_val; +$$ LANGUAGE SQL STABLE; +CREATE OR REPLACE FUNCTION test_parameterized_sql_function_in_subquery_where(org_id_val integer) +RETURNS TABLE (a bigint) +AS $$ + SELECT count(*) AS count_val from test_parameterized_sql as t1 where + org_id IN (SELECT org_id FROM test_parameterized_sql as t2 WHERE t2.org_id = t1.org_id AND org_id = org_id_val); +$$ LANGUAGE SQL STABLE; +INSERT INTO test_parameterized_sql VALUES(1, 1); 
+-- all of them should fail +SELECT * FROM test_parameterized_sql_function(1); +ERROR: cannot perform distributed planning on this query because parameterized queries for SQL functions referencing distributed tables are not supported +HINT: Consider using PL/pgSQL functions instead. +SELECT (SELECT 1 FROM test_parameterized_sql limit 1) FROM test_parameterized_sql_function(1); +ERROR: cannot perform distributed planning on this query because parameterized queries for SQL functions referencing distributed tables are not supported +HINT: Consider using PL/pgSQL functions instead. +-- this fails pg <18 +-- succeeds in pg18 because of pg18 commit 0dca5d68d +-- where sql functions behave as PL/pgSQL functions and use the cache +SELECT test_parameterized_sql_function_in_subquery_where(1); +ERROR: could not create distributed plan +DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus. +HINT: Consider using PL/pgSQL functions instead. +CONTEXT: SQL function "test_parameterized_sql_function_in_subquery_where" statement 1 +-- postgres behaves slightly differently for the following +-- query where the target list is empty +SELECT test_parameterized_sql_function(1); + test_parameterized_sql_function +--------------------------------------------------------------------- + 1 +(1 row) + +-- test that sql function calls are treated as multi-statement transactions +-- and are rolled back properly. Single-row inserts for not-replicated tables +-- don't go over 2PC if they are not part of a bigger transaction. +CREATE TABLE table_with_unique_constraint (a int UNIQUE); +SELECT create_distributed_table('table_with_unique_constraint', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table_with_unique_constraint VALUES (1), (2), (3); +CREATE OR REPLACE FUNCTION insert_twice() RETURNS VOID +AS $$ + INSERT INTO table_with_unique_constraint VALUES (4); + INSERT INTO table_with_unique_constraint VALUES (4); +$$ LANGUAGE SQL; +SELECT insert_twice(); +ERROR: duplicate key value violates unique constraint "table_with_unique_constraint_a_key_1230009" +DETAIL: Key (a)=(4) already exists. 
+CONTEXT: while executing command on localhost:xxxxx +SQL function "insert_twice" statement 2 +SELECT * FROM table_with_unique_constraint ORDER BY a; + a +--------------------------------------------------------------------- + 1 + 2 + 3 +(3 rows) + +DROP TABLE temp_table, test_parameterized_sql, table_with_unique_constraint; +-- clean-up functions +DROP FUNCTION sql_test_no_1(); +DROP FUNCTION sql_test_no_2(); +DROP FUNCTION sql_test_no_3(); +DROP FUNCTION sql_test_no_4(); +DROP FUNCTION no_parameter_insert_sql(); +DROP FUNCTION non_partition_parameter_insert_sql(int); +DROP FUNCTION non_partition_parameter_update_sql(int, int); +DROP FUNCTION non_partition_parameter_delete_sql(int); +DROP FUNCTION test_parameterized_sql_function(int); +DROP FUNCTION test_parameterized_sql_function_in_subquery_where(int); diff --git a/src/test/regress/expected/multi_test_catalog_views.out b/src/test/regress/expected/multi_test_catalog_views.out index 8c255f94e..65ca8f637 100644 --- a/src/test/regress/expected/multi_test_catalog_views.out +++ b/src/test/regress/expected/multi_test_catalog_views.out @@ -70,15 +70,19 @@ SELECT "name" AS "Column", "relid" FROM table_attrs; -CREATE VIEW table_checks AS -SELECT cc.constraint_name AS "Constraint", - ('CHECK ' || regexp_replace(check_clause, '^\((.*)\)$', '\1')) AS "Definition", - format('%I.%I', ccu.table_schema, ccu.table_name)::regclass::oid AS relid -FROM information_schema.check_constraints cc, - information_schema.constraint_column_usage ccu -WHERE cc.constraint_schema = ccu.constraint_schema AND - cc.constraint_name = ccu.constraint_name -ORDER BY cc.constraint_name ASC; +CREATE OR REPLACE VIEW table_checks AS +SELECT + c.conname AS "Constraint", + 'CHECK ' || + -- drop a single pair of outer parens if the deparser adds them + regexp_replace(pg_get_expr(c.conbin, c.conrelid, true), '^\((.*)\)$', '\1') + AS "Definition", + c.conrelid AS relid +FROM pg_catalog.pg_constraint AS c +WHERE c.contype <> 'n' -- drop NOT NULL + AND c.conbin IS NOT NULL -- only things with an expression (i.e., CHECKs) + AND c.conrelid <> 0 -- table-level (exclude domain checks) +ORDER BY "Constraint", "Definition"; CREATE VIEW index_attrs AS WITH indexoid AS ( diff --git a/src/test/regress/expected/pg18.out b/src/test/regress/expected/pg18.out new file mode 100644 index 000000000..fd42f4070 --- /dev/null +++ b/src/test/regress/expected/pg18.out @@ -0,0 +1,169 @@ +-- +-- PG18 +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 18 AS server_version_ge_18 +\gset +-- test invalid statistics +-- behavior is the same among PG versions, only the error message differs +-- relevant PG18 commit: 3eea4dc2c7 +CREATE STATISTICS tst ON a FROM (VALUES (x)) AS foo; +ERROR: cannot create statistics on the specified relation +DETAIL: CREATE STATISTICS only supports tables, foreign tables and materialized views. +\if :server_version_ge_18 +\else +\q +\endif +-- PG18-specific tests go here. +-- +-- Purpose: Verify PG18 behavior that NOT NULL constraints are materialized +-- as pg_constraint rows with contype = 'n' on both coordinator and +-- worker shards. Also confirm our helper view (table_checks) does +-- NOT surface NOT NULL entries.
+-- https://github.com/postgres/postgres/commit/14e87ffa5c543b5f30ead7413084c25f7735039f +CREATE SCHEMA pg18_nn; +SET search_path TO pg18_nn; +-- Local control table +DROP TABLE IF EXISTS nn_local CASCADE; +NOTICE: table "nn_local" does not exist, skipping +CREATE TABLE nn_local( + a int NOT NULL, + b int, + c text NOT NULL +); +-- Distributed table +DROP TABLE IF EXISTS nn_dist CASCADE; +NOTICE: table "nn_dist" does not exist, skipping +CREATE TABLE nn_dist( + a int NOT NULL, + b int, + c text NOT NULL +); +SELECT create_distributed_table('nn_dist', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Coordinator: count NOT NULL constraint rows +SELECT 'local_n_count' AS label, contype, count(*) +FROM pg_constraint +WHERE conrelid = 'pg18_nn.nn_local'::regclass +GROUP BY contype +ORDER BY contype; + label | contype | count +--------------------------------------------------------------------- + local_n_count | n | 2 +(1 row) + +SELECT 'dist_n_count' AS label, contype, count(*) +FROM pg_constraint +WHERE conrelid = 'pg18_nn.nn_dist'::regclass +GROUP BY contype +ORDER BY contype; + label | contype | count +--------------------------------------------------------------------- + dist_n_count | n | 2 +(1 row) + +-- Our helper view should exclude NOT NULL +SELECT 'table_checks_local_count' AS label, count(*) +FROM public.table_checks +WHERE relid = 'pg18_nn.nn_local'::regclass; + label | count +--------------------------------------------------------------------- + table_checks_local_count | 0 +(1 row) + +SELECT 'table_checks_dist_count' AS label, count(*) +FROM public.table_checks +WHERE relid = 'pg18_nn.nn_dist'::regclass; + label | count +--------------------------------------------------------------------- + table_checks_dist_count | 0 +(1 row) + +-- Add a real CHECK to ensure table_checks still reports real checks +ALTER TABLE nn_dist ADD CONSTRAINT nn_dist_check CHECK (b IS DISTINCT FROM 42); +SELECT 'table_checks_dist_with_real_check' AS label, count(*) +FROM public.table_checks +WHERE relid = 'pg18_nn.nn_dist'::regclass; + label | count +--------------------------------------------------------------------- + table_checks_dist_with_real_check | 1 +(1 row) + +-- === Worker checks === +\c - - - :worker_1_port +SET client_min_messages TO WARNING; +SET search_path TO pg18_nn; +-- Pick one heap shard of nn_dist in our schema +SELECT format('%I.%I', n.nspname, c.relname) AS shard_regclass +FROM pg_class c +JOIN pg_namespace n ON n.oid = c.relnamespace +WHERE n.nspname = 'pg18_nn' + AND c.relname LIKE 'nn_dist_%' + AND c.relkind = 'r' +ORDER BY c.relname +LIMIT 1 +\gset +-- Expect: 2 NOT NULL rows (a,c) + 1 CHECK row on the shard +SELECT 'worker_shard_n_count' AS label, contype, count(*) +FROM pg_constraint +WHERE conrelid = :'shard_regclass'::regclass +GROUP BY contype +ORDER BY contype; + label | contype | count +--------------------------------------------------------------------- + worker_shard_n_count | c | 1 + worker_shard_n_count | n | 2 +(2 rows) + +-- table_checks on shard should hide NOT NULL +SELECT 'table_checks_worker_shard_count' AS label, count(*) +FROM public.table_checks +WHERE relid = :'shard_regclass'::regclass; + label | count +--------------------------------------------------------------------- + table_checks_worker_shard_count | 1 +(1 row) + +-- Drop one NOT NULL on coordinator; verify propagation +\c - - - :master_port +SET search_path TO pg18_nn; +ALTER TABLE nn_dist ALTER COLUMN c DROP NOT NULL; +-- 
Re-check on worker: NOT NULL count should drop to 1 +\c - - - :worker_1_port +SET search_path TO pg18_nn; +SELECT 'worker_shard_n_after_drop' AS label, contype, count(*) +FROM pg_constraint +WHERE conrelid = :'shard_regclass'::regclass +GROUP BY contype +ORDER BY contype; + label | contype | count +--------------------------------------------------------------------- + worker_shard_n_after_drop | c | 1 + worker_shard_n_after_drop | n | 1 +(2 rows) + +-- And on coordinator +\c - - - :master_port +SET search_path TO pg18_nn; +SELECT 'dist_n_after_drop' AS label, contype, count(*) +FROM pg_constraint +WHERE conrelid = 'pg18_nn.nn_dist'::regclass +GROUP BY contype +ORDER BY contype; + label | contype | count +--------------------------------------------------------------------- + dist_n_after_drop | c | 1 + dist_n_after_drop | n | 1 +(2 rows) + +-- cleanup +RESET client_min_messages; +RESET search_path; +DROP SCHEMA pg18_nn CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table pg18_nn.nn_local +drop cascades to table pg18_nn.nn_dist diff --git a/src/test/regress/expected/pg18_0.out b/src/test/regress/expected/pg18_0.out new file mode 100644 index 000000000..8d8c55727 --- /dev/null +++ b/src/test/regress/expected/pg18_0.out @@ -0,0 +1,14 @@ +-- +-- PG18 +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 18 AS server_version_ge_18 +\gset +-- test invalid statistics +-- behavior is the same among PG versions, only the error message differs +-- relevant PG18 commit: 3eea4dc2c7 +CREATE STATISTICS tst ON a FROM (VALUES (x)) AS foo; +ERROR: only a single relation is allowed in CREATE STATISTICS +\if :server_version_ge_18 +\else +\q diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 0b1d4ce67..98bc01ac5 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -68,6 +68,7 @@ test: pg14 test: pg15 test: pg15_jsonpath detect_conn_close test: pg17 pg17_json +test: pg18 test: drop_column_partitioned_table test: tableam diff --git a/src/test/regress/sql/merge_repartition2.sql b/src/test/regress/sql/merge_repartition2.sql index 354f0605b..6da816bb5 100644 --- a/src/test/regress/sql/merge_repartition2.sql +++ b/src/test/regress/sql/merge_repartition2.sql @@ -126,5 +126,128 @@ WHEN NOT MATCHED THEN SELECT compare_data(); -DROP SCHEMA merge_repartition2_schema CASCADE; +---- https://github.com/citusdata/citus/issues/8180 ---- +CREATE TABLE dist_1 (a int, b int, c int); +CREATE TABLE dist_2 (a int, b int, c int); +CREATE TABLE dist_different_order_1 (b int, a int, c int); + +SELECT create_distributed_table('dist_1', 'a'); +SELECT create_distributed_table('dist_2', 'a'); +SELECT create_distributed_table('dist_different_order_1', 'a'); + +MERGE INTO dist_1 +USING dist_2 +ON (dist_1.a = dist_2.b) +WHEN MATCHED THEN UPDATE SET b = dist_2.b; + +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET b = src.b; + +MERGE INTO dist_different_order_1 +USING dist_1 +ON (dist_different_order_1.a = dist_1.b) +WHEN MATCHED THEN UPDATE SET b = dist_1.b; + +CREATE TABLE dist_1_cast (a int, b int); +CREATE TABLE dist_2_cast (a int, b numeric); + +SELECT create_distributed_table('dist_1_cast', 'a'); +SELECT create_distributed_table('dist_2_cast', 'a'); + +MERGE INTO dist_1_cast +USING dist_2_cast +ON (dist_1_cast.a = dist_2_cast.b) +WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b; + +MERGE INTO dist_1_cast +USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast +ON (dist_1_cast.a =
dist_2_cast.b) +WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b; + +-- a more sophisticated example +CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); + +CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); + +SELECT create_distributed_table('dist_source', 'tstamp_col'); +SELECT create_distributed_table('dist_target', 'int_col'); + +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(1001, 2000) i; + +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(901, 1000) i; + +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1501, 2000) i; + +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i-1, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1401, 1500) i; + +INSERT INTO local_source SELECT * FROM dist_source; +INSERT INTO local_target SELECT * FROM dist_target; + +-- execute the query on distributed tables +MERGE INTO dist_target target_alias +USING dist_source source_alias +ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col) +WHEN MATCHED THEN UPDATE SET + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +WHEN NOT MATCHED THEN + INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col ); + +-- execute the same query on local tables, everything is the same except table names behind the aliases +MERGE INTO local_target target_alias +USING local_source source_alias +ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col) +WHEN MATCHED THEN UPDATE SET + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +WHEN NOT MATCHED THEN + INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col ); + +-- compare both targets + +SELECT COUNT(*) = 0 AS targets_match +FROM ( + 
SELECT * FROM dist_target + EXCEPT + SELECT * FROM local_target + UNION ALL + SELECT * FROM local_target + EXCEPT + SELECT * FROM dist_target +) q; + +SET client_min_messages TO WARNING; +DROP SCHEMA merge_repartition2_schema CASCADE; diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 958791e44..ada5707bb 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -3,6 +3,7 @@ SET citus.next_shard_id TO 750000; SET citus.next_placement_id TO 750000; CREATE SCHEMA multi_modifications; +SET search_path TO multi_modifications; -- some failure messages that comes from the worker nodes -- might change due to parallel executions, so suppress those @@ -36,7 +37,7 @@ CREATE TABLE append_partitioned ( LIKE limit_orders ); SET citus.shard_count TO 2; SELECT create_distributed_table('limit_orders', 'id', 'hash'); -SELECT create_distributed_table('multiple_hash', 'id', 'hash'); +SELECT create_distributed_table('multiple_hash', 'category', 'hash'); SELECT create_distributed_table('range_partitioned', 'id', 'range'); SELECT create_distributed_table('append_partitioned', 'id', 'append'); @@ -245,12 +246,14 @@ INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', -- First: Connect to the second worker node \c - - - :worker_2_port +SET search_path TO multi_modifications; -- Second: Move aside limit_orders shard on the second worker node ALTER TABLE limit_orders_750000 RENAME TO renamed_orders; -- Third: Connect back to master node \c - - - :master_port +SET search_path TO multi_modifications; -- Fourth: Perform an INSERT on the remaining node -- the whole transaction should fail @@ -259,6 +262,7 @@ INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', -- set the shard name back \c - - - :worker_2_port +SET search_path TO multi_modifications; -- Second: Move aside limit_orders shard on the second worker node ALTER TABLE renamed_orders RENAME TO limit_orders_750000; @@ -266,12 +270,15 @@ ALTER TABLE renamed_orders RENAME TO limit_orders_750000; -- Verify the insert failed and both placements are healthy -- or the insert succeeded and placement marked unhealthy \c - - - :worker_1_port +SET search_path TO multi_modifications; SELECT count(*) FROM limit_orders_750000 WHERE id = 276; \c - - - :worker_2_port +SET search_path TO multi_modifications; SELECT count(*) FROM limit_orders_750000 WHERE id = 276; \c - - - :master_port +SET search_path TO multi_modifications; SELECT count(*) FROM limit_orders WHERE id = 276; @@ -286,12 +293,14 @@ AND s.logicalrelid = 'limit_orders'::regclass; -- First: Connect to the first worker node \c - - - :worker_1_port +SET search_path TO multi_modifications; -- Second: Move aside limit_orders shard on the second worker node ALTER TABLE limit_orders_750000 RENAME TO renamed_orders; -- Third: Connect back to master node \c - - - :master_port +SET search_path TO multi_modifications; -- Fourth: Perform an INSERT on the remaining node \set VERBOSITY terse @@ -312,12 +321,14 @@ AND s.logicalrelid = 'limit_orders'::regclass; -- First: Connect to the first worker node \c - - - :worker_1_port +SET search_path TO multi_modifications; -- Second: Move aside limit_orders shard on the second worker node ALTER TABLE renamed_orders RENAME TO limit_orders_750000; -- Third: Connect back to master node \c - - - :master_port +SET search_path TO multi_modifications; -- attempting to change the partition key is unsupported UPDATE 
limit_orders SET id = 0 WHERE id = 246; @@ -328,6 +339,375 @@ UPDATE limit_orders SET id = 246 WHERE id = 246; UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM'; UPDATE limit_orders SET id = limit_orders.id WHERE id = 246; +CREATE TABLE dist_1 (a int, b int, c int); +CREATE TABLE dist_2 (a int, b int, c int); +CREATE TABLE dist_non_colocated (a int, b int, c int); +CREATE TABLE dist_different_order_1 (b int, a int, c int); + +SELECT create_distributed_table('dist_1', 'a'); +SELECT create_distributed_table('dist_2', 'a'); +SELECT create_distributed_table('dist_non_colocated', 'a', colocate_with=>'none'); +SELECT create_distributed_table('dist_different_order_1', 'a'); + +-- +-- https://github.com/citusdata/citus/issues/8087 +-- + +---- update: should work ---- + +-- setting shard key to itself -- + +UPDATE dist_1 SET a = dist_1.a; +UPDATE dist_1 SET a = dist_1.a WHERE dist_1.a > dist_1.b AND dist_1.b > 10; +UPDATE dist_1 SET a = dist_1.a FROM dist_2 WHERE dist_1.a = dist_2.a; + +-- setting shard key to another var that's implied to be equal to shard key -- + +UPDATE dist_1 SET a = b WHERE a = b; +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a; +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.b = dist_2.c AND (dist_2.c > 5 OR dist_2.c < 0); + +with cte as ( +select a, b from dist_1 +) +update dist_1 set a = cte.a from cte where dist_1.a = cte.a; + +with cte as ( +select a as x, b as y from (select a, b from dist_1 limit 100) dt where b > 100 +) +update dist_1 set a = cte.x from cte where dist_1.a = cte.x; + +with cte as ( +select d2.a as x, d1.b as y +from dist_1 d1, dist_different_order_1 d2 +where d1.a=d2.a) +update dist_1 set a = cte.x from cte where y != 0 and dist_1.a = cte.x; + +with cte as ( +select * from (select a as x, b as y from dist_2 limit 100) q +) +update dist_1 set a = cte.x from cte where b = cte.y and cte.y = a and a = cte.x; + +-- supported although the where clause will certainly eval to false +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a AND dist_1.a = 5 AND dist_2.a = 7; + +-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 -- + +UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a; + +-- test with extra quals +UPDATE dist_1 SET a = dist_different_order_1.a FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a AND dist_1.b = dist_different_order_1.c AND (dist_different_order_1.c > 5 OR dist_different_order_1.c < 0); + +---- update: errors in router planner ---- + +-- different column of the same relation, which is not implied to be equal to shard key -- + +UPDATE dist_1 SET a = dist_1.b; + +-- another range table entry's column with the same attno, which is not implied to be equal to shard key -- + +UPDATE dist_1 SET a = dist_2.a FROM dist_2; +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a != dist_2.a; +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a >= dist_2.a; +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.a OR dist_1.a > dist_2.a; +UPDATE dist_1 SET a = dist_different_order_1.b FROM dist_different_order_1 WHERE dist_1.a = dist_different_order_1.a; + +UPDATE dist_1 SET a = foo.a FROM dist_1 foo; +UPDATE dist_1 SET a = foo.a FROM dist_1 foo WHERE dist_1.a != foo.a; + +-- (*1) Would normally expect this to not throw an error because +-- dist_1.a = dist_2.b AND dist_2.b = dist_2.a, +-- so dist_1.a 
= dist_2.a, so we should be able to deduce +-- that (dist_1.)a = dist_2.a, but seems predicate_implied_by() +-- is not that smart. +UPDATE dist_1 SET a = dist_2.a FROM dist_2 WHERE dist_1.a = dist_2.b AND dist_2.b = dist_2.a; + +-- and same here +with cte as ( +select * from (select a as x, b as y from dist_different_order_1 limit 100) q +) +update dist_1 set a = cte.x from cte where a = cte.y and cte.y = b and b = cte.x; + +---- update: errors later (in logical or physical planner) ---- + +-- setting shard key to itself -- + +UPDATE dist_1 SET a = dist_1.a FROM dist_1 foo; +UPDATE dist_1 SET a = dist_1.a FROM dist_2 foo; + +-- setting shard key to another var that's implied to be equal to shard key -- + +UPDATE dist_1 SET a = dist_non_colocated.a FROM dist_non_colocated WHERE dist_1.a = dist_non_colocated.a; +UPDATE dist_1 SET a = dist_2.b FROM dist_2 WHERE dist_1.a = dist_2.b; + +---- update: a more sophisticated example ---- +CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); + +CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); + +SELECT create_distributed_table('dist_source', 'int_col'); +SELECT create_distributed_table('dist_target', 'int_col'); + +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(1001, 2000) i; + +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(901, 1000) i; + +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1501, 2000) i; + +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i-1, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1401, 1500) i; + +INSERT INTO local_source SELECT * FROM dist_source; +INSERT INTO local_target SELECT * FROM dist_target; + +-- execute the query on distributed tables +UPDATE dist_target target_alias +SET int_col = source_alias.int_col, + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +FROM dist_source source_alias +WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col; + +-- execute the same query on local tables, everything is the same except table names behind the aliases +UPDATE local_target target_alias +SET 
int_col = source_alias.int_col, + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +FROM local_source source_alias +WHERE target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col; + +-- compare both targets + +SELECT COUNT(*) = 0 AS targets_match +FROM ( + SELECT * FROM dist_target + EXCEPT + SELECT * FROM local_target + UNION ALL + SELECT * FROM local_target + EXCEPT + SELECT * FROM dist_target +) q; + +---- merge: should work ---- + +-- setting shard key to itself -- + +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a) +WHEN MATCHED THEN UPDATE SET a = dist_1.a; + +-- We don't care about action quals when deciding if the update +-- could change the shard key, but still add some action quals for +-- testing. See the comments written on top of the line we call +-- TargetEntryChangesValue() in MergeQualAndTargetListFunctionsSupported(). +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a) +WHEN MATCHED AND dist_1.a > dist_1.b AND dist_1.b > 10 THEN UPDATE SET a = dist_1.a; + +MERGE INTO dist_1 +USING dist_2 src +ON (dist_1.a = src.a) +WHEN MATCHED THEN UPDATE SET a = dist_1.a; + +MERGE INTO dist_1 +USING dist_2 src +ON (dist_1.a = src.a) +WHEN MATCHED THEN UPDATE SET a = src.a; + +-- setting shard key to another var that's implied to be equal to shard key -- + +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a AND dist_1.a = dist_1.b) +WHEN MATCHED THEN UPDATE SET a = dist_1.b; + +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET a = src.b; + +MERGE INTO dist_1 +USING dist_2 src +ON (dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET a = src.b; + +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a AND dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET a = src.b; + +MERGE INTO dist_1 +USING dist_2 src +ON (dist_1.a = src.a AND dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET a = src.b; + +-- test with extra quals +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a AND dist_1.a = src.b AND (dist_1.b > 1000 OR (dist_1.b < 500))) +WHEN MATCHED THEN UPDATE SET a = src.b; + +-- setting shard key to another var that's implied to be equal to shard key, repeat with dist_different_order_1 -- + +MERGE INTO dist_1 +USING dist_different_order_1 src +ON (dist_1.a = src.a AND dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET a = src.b; + +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a) +WHEN MATCHED THEN UPDATE SET a = src.a; + +---- merge: errors in router planner ---- + +-- different column of the same relation, which is not implied to be equal to shard key -- + +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.a) +WHEN MATCHED THEN UPDATE SET a = dist_1.b; + +-- another range table entry's column with the same attno, which is not implied to be equal to shard key -- + +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET a = src.a; + +-- as in (*1), this is not supported +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.b AND src.b = src.a) +WHEN MATCHED THEN UPDATE SET a = src.a; + +MERGE INTO dist_1 +USING dist_2 src +ON (true) +WHEN MATCHED THEN UPDATE SET a = src.a; + +MERGE INTO dist_1 +USING dist_2 src +ON (dist_1.a <= src.a) +WHEN MATCHED THEN UPDATE SET a = src.a; + +---- merge: a more sophisticated example 
---- +DROP TABLE dist_source, dist_target, local_source, local_target; +CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); + +CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); + +SELECT create_distributed_table('dist_source', 'tstamp_col'); +SELECT create_distributed_table('dist_target', 'int_col'); + +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(1001, 2000) i; + +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(901, 1000) i; + +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1501, 2000) i; + +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i-1, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1401, 1500) i; + +INSERT INTO local_source SELECT * FROM dist_source; +INSERT INTO local_target SELECT * FROM dist_target; + +-- execute the query on distributed tables +MERGE INTO dist_target target_alias +USING dist_source source_alias +ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col) +WHEN MATCHED THEN UPDATE SET + int_col = source_alias.int_col, + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +WHEN NOT MATCHED THEN + INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col ); + +-- execute the same query on local tables, everything is the same except table names behind the aliases +MERGE INTO local_target target_alias +USING local_source source_alias +ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col) +WHEN MATCHED THEN UPDATE SET + int_col = source_alias.int_col, + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +WHEN NOT MATCHED THEN + INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col ); + +-- compare both targets + +SELECT 
COUNT(*) = 0 AS targets_match +FROM ( + SELECT * FROM dist_target + EXCEPT + SELECT * FROM local_target + UNION ALL + SELECT * FROM local_target + EXCEPT + SELECT * FROM dist_target +) q; + -- UPDATEs with a FROM clause are supported even with local tables UPDATE limit_orders SET limit_price = 0.00 FROM bidders WHERE limit_orders.id = 246 AND @@ -914,20 +1294,5 @@ DELETE FROM summary_table WHERE id < ( CREATE TABLE multi_modifications.local (a int default 1, b int); INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table)); -DROP TABLE insufficient_shards; -DROP TABLE raw_table; -DROP TABLE summary_table; -DROP TABLE reference_raw_table; -DROP TABLE reference_summary_table; -DROP TABLE limit_orders; -DROP TABLE multiple_hash; -DROP TABLE range_partitioned; -DROP TABLE append_partitioned; -DROP TABLE bidders; - -DROP FUNCTION stable_append; -DROP FUNCTION immutable_append; -DROP FUNCTION temp_strict_func; -DROP TYPE order_side; - +SET client_min_messages TO WARNING; DROP SCHEMA multi_modifications CASCADE; diff --git a/src/test/regress/sql/multi_sql_function.sql b/src/test/regress/sql/multi_sql_function.sql index 1ef0ef40a..e1438a893 100644 --- a/src/test/regress/sql/multi_sql_function.sql +++ b/src/test/regress/sql/multi_sql_function.sql @@ -147,6 +147,9 @@ SELECT * FROM test_parameterized_sql_function(1); SELECT (SELECT 1 FROM test_parameterized_sql limit 1) FROM test_parameterized_sql_function(1); +-- this fails pg <18 +-- succeeds in pg18 because of pg18 commit 0dca5d68d +-- where sql functions behave as PL/pgSQL functions and use the cache SELECT test_parameterized_sql_function_in_subquery_where(1); -- postgres behaves slightly differently for the following diff --git a/src/test/regress/sql/multi_test_catalog_views.sql b/src/test/regress/sql/multi_test_catalog_views.sql index bb1442edf..249b2d274 100644 --- a/src/test/regress/sql/multi_test_catalog_views.sql +++ b/src/test/regress/sql/multi_test_catalog_views.sql @@ -71,15 +71,19 @@ SELECT "name" AS "Column", "relid" FROM table_attrs; -CREATE VIEW table_checks AS -SELECT cc.constraint_name AS "Constraint", - ('CHECK ' || regexp_replace(check_clause, '^\((.*)\)$', '\1')) AS "Definition", - format('%I.%I', ccu.table_schema, ccu.table_name)::regclass::oid AS relid -FROM information_schema.check_constraints cc, - information_schema.constraint_column_usage ccu -WHERE cc.constraint_schema = ccu.constraint_schema AND - cc.constraint_name = ccu.constraint_name -ORDER BY cc.constraint_name ASC; +CREATE OR REPLACE VIEW table_checks AS +SELECT + c.conname AS "Constraint", + 'CHECK ' || + -- drop a single pair of outer parens if the deparser adds them + regexp_replace(pg_get_expr(c.conbin, c.conrelid, true), '^\((.*)\)$', '\1') + AS "Definition", + c.conrelid AS relid +FROM pg_catalog.pg_constraint AS c +WHERE c.contype <> 'n' -- drop NOT NULL + AND c.conbin IS NOT NULL -- only things with an expression (i.e., CHECKs) + AND c.conrelid <> 0 -- table-level (exclude domain checks) +ORDER BY "Constraint", "Definition"; CREATE VIEW index_attrs AS WITH indexoid AS ( diff --git a/src/test/regress/sql/pg18.sql b/src/test/regress/sql/pg18.sql new file mode 100644 index 000000000..94c0ad997 --- /dev/null +++ b/src/test/regress/sql/pg18.sql @@ -0,0 +1,134 @@ +-- +-- PG18 +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 18 AS server_version_ge_18 +\gset + +-- test invalid statistics +-- behavior is the same among PG versions, only the error message differs +-- relevant PG18 commit: 3eea4dc2c7 +CREATE
STATISTICS tst ON a FROM (VALUES (x)) AS foo; + +\if :server_version_ge_18 +\else +\q +\endif + +-- PG18-specific tests go here. +-- + +-- Purpose: Verify PG18 behavior that NOT NULL constraints are materialized +-- as pg_constraint rows with contype = 'n' on both coordinator and +-- worker shards. Also confirm our helper view (table_checks) does +-- NOT surface NOT NULL entries. +-- https://github.com/postgres/postgres/commit/14e87ffa5c543b5f30ead7413084c25f7735039f + +CREATE SCHEMA pg18_nn; +SET search_path TO pg18_nn; + +-- Local control table +DROP TABLE IF EXISTS nn_local CASCADE; +CREATE TABLE nn_local( + a int NOT NULL, + b int, + c text NOT NULL +); + +-- Distributed table +DROP TABLE IF EXISTS nn_dist CASCADE; +CREATE TABLE nn_dist( + a int NOT NULL, + b int, + c text NOT NULL +); + +SELECT create_distributed_table('nn_dist', 'a'); + +-- Coordinator: count NOT NULL constraint rows +SELECT 'local_n_count' AS label, contype, count(*) +FROM pg_constraint +WHERE conrelid = 'pg18_nn.nn_local'::regclass +GROUP BY contype +ORDER BY contype; + +SELECT 'dist_n_count' AS label, contype, count(*) +FROM pg_constraint +WHERE conrelid = 'pg18_nn.nn_dist'::regclass +GROUP BY contype +ORDER BY contype; + +-- Our helper view should exclude NOT NULL +SELECT 'table_checks_local_count' AS label, count(*) +FROM public.table_checks +WHERE relid = 'pg18_nn.nn_local'::regclass; + +SELECT 'table_checks_dist_count' AS label, count(*) +FROM public.table_checks +WHERE relid = 'pg18_nn.nn_dist'::regclass; + +-- Add a real CHECK to ensure table_checks still reports real checks +ALTER TABLE nn_dist ADD CONSTRAINT nn_dist_check CHECK (b IS DISTINCT FROM 42); + +SELECT 'table_checks_dist_with_real_check' AS label, count(*) +FROM public.table_checks +WHERE relid = 'pg18_nn.nn_dist'::regclass; + +-- === Worker checks === +\c - - - :worker_1_port +SET client_min_messages TO WARNING; +SET search_path TO pg18_nn; + +-- Pick one heap shard of nn_dist in our schema +SELECT format('%I.%I', n.nspname, c.relname) AS shard_regclass +FROM pg_class c +JOIN pg_namespace n ON n.oid = c.relnamespace +WHERE n.nspname = 'pg18_nn' + AND c.relname LIKE 'nn_dist_%' + AND c.relkind = 'r' +ORDER BY c.relname +LIMIT 1 +\gset + +-- Expect: 2 NOT NULL rows (a,c) + 1 CHECK row on the shard +SELECT 'worker_shard_n_count' AS label, contype, count(*) +FROM pg_constraint +WHERE conrelid = :'shard_regclass'::regclass +GROUP BY contype +ORDER BY contype; + +-- table_checks on shard should hide NOT NULL +SELECT 'table_checks_worker_shard_count' AS label, count(*) +FROM public.table_checks +WHERE relid = :'shard_regclass'::regclass; + +-- Drop one NOT NULL on coordinator; verify propagation +\c - - - :master_port +SET search_path TO pg18_nn; + +ALTER TABLE nn_dist ALTER COLUMN c DROP NOT NULL; + +-- Re-check on worker: NOT NULL count should drop to 1 +\c - - - :worker_1_port +SET search_path TO pg18_nn; + +SELECT 'worker_shard_n_after_drop' AS label, contype, count(*) +FROM pg_constraint +WHERE conrelid = :'shard_regclass'::regclass +GROUP BY contype +ORDER BY contype; + +-- And on coordinator +\c - - - :master_port +SET search_path TO pg18_nn; + +SELECT 'dist_n_after_drop' AS label, contype, count(*) +FROM pg_constraint +WHERE conrelid = 'pg18_nn.nn_dist'::regclass +GROUP BY contype +ORDER BY contype; + +-- cleanup +RESET client_min_messages; +RESET search_path; +DROP SCHEMA pg18_nn CASCADE;