diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 262258d7f..095c7e2f0 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -77,7 +77,7 @@ int PlannerLevel = 0; static bool ListContainsDistributedTableRTE(List *rangeTableList, bool *maybeHasForeignDistributedTable); -static bool IsUpdateOrDelete(Query *query); +static bool IsUpdateOrDeleteOrMerge(Query *query); static PlannedStmt * CreateDistributedPlannedStmt( DistributedPlanningContext *planContext); static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, @@ -631,7 +631,7 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan) * IsUpdateOrDelete returns true if the query performs an update or delete. */ bool -IsUpdateOrDelete(Query *query) +IsUpdateOrDeleteOrMerge(Query *query) { return query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE || @@ -809,7 +809,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) * if it is planned as a multi shard modify query. */ if ((distributedPlan->planningError || - (IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan( + (IsUpdateOrDeleteOrMerge(planContext->originalQuery) && IsMultiTaskPlan( distributedPlan))) && hasUnresolvedParams) { diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index a04d982f6..8f9847de2 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -27,6 +27,9 @@ #if PG_VERSION_NUM >= PG_VERSION_15 +static bool CheckIfRTETypeIsUnsupported(Query *parse, + RangeTblEntry *rangeTableEntry, + DeferredErrorMessage **returnMessage); static DeferredErrorMessage * ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse, List * distTablesList, @@ -231,6 +234,78 @@ ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse, List *distTablesLis } +/* + * ErrorIfRTETypeIsUnsupported Checks for types of tables that are not supported, such + * as, reference tables, append-distributed tables and materialized view as target relation. + * Routine returns true for the supported types, and false for everything else, only for + * unsupported types it fills the appropriate error message in the parameter passed. + */ +static bool +CheckIfRTETypeIsUnsupported(Query *parse, + RangeTblEntry *rangeTableEntry, + DeferredErrorMessage **returnMessage) +{ + /* skip the regular views as they are replaced with subqueries */ + if (rangeTableEntry->relkind == RELKIND_VIEW) + { + return true; + } + + if (rangeTableEntry->relkind == RELKIND_MATVIEW || + rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) + { + /* Materialized view or Foreign table as target is not allowed */ + if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) + { + /* Non target relation is ok */ + return true; + } + else + { + /* Usually we don't reach this exception as the Postgres parser catches it */ + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, "MERGE command is not allowed on " + "relation type(relkind:%c)", rangeTableEntry->relkind); + *returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, NULL); + } + } + + if (rangeTableEntry->relkind != RELKIND_RELATION && + rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) + { + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) " + "in MERGE command", rangeTableEntry->relkind); + *returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, NULL); + } + + Assert(rangeTableEntry->relid != 0); + + /* Reference tables are not supported yet */ + if (IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE)) + { + *returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is not supported on reference " + "tables yet", NULL, NULL); + } + + /* Append/Range tables are not supported */ + if (IsCitusTableType(rangeTableEntry->relid, APPEND_DISTRIBUTED) || + IsCitusTableType(rangeTableEntry->relid, RANGE_DISTRIBUTED)) + { + *returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "For MERGE command, all the distributed tables " + "must be colocated, for append/range distribution, " + "colocation is not supported", NULL, + "Consider using hash distribution instead"); + } + + return false; +} + + /* * ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE * present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus @@ -294,63 +369,17 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, } /* RTE Relation can be of various types, check them now */ - - /* skip the regular views as they are replaced with subqueries */ - if (rangeTableEntry->relkind == RELKIND_VIEW) + DeferredErrorMessage *errorMessage = NULL; + if (CheckIfRTETypeIsUnsupported(parse, rangeTableEntry, &errorMessage)) { continue; } - - if (rangeTableEntry->relkind == RELKIND_MATVIEW || - rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) + else { - /* Materialized view or Foreign table as target is not allowed */ - if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) + if (errorMessage) { - /* Non target relation is ok */ - continue; + return errorMessage; } - else - { - /* Usually we don't reach this exception as the Postgres parser catches it */ - StringInfo errorMessage = makeStringInfo(); - appendStringInfo(errorMessage, - "MERGE command is not allowed on " - "relation type(relkind:%c)", rangeTableEntry->relkind); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, - NULL, NULL); - } - } - - if (rangeTableEntry->relkind != RELKIND_RELATION && - rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) - { - StringInfo errorMessage = makeStringInfo(); - appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) " - "in MERGE command", rangeTableEntry->relkind); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, - NULL, NULL); - } - - Assert(rangeTableEntry->relid != 0); - - /* Reference tables are not supported yet */ - if (IsCitusTableType(relationId, REFERENCE_TABLE)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command is not supported on reference " - "tables yet", NULL, NULL); - } - - /* Append/Range tables are not supported */ - if (IsCitusTableType(relationId, APPEND_DISTRIBUTED) || - IsCitusTableType(relationId, RANGE_DISTRIBUTED)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "For MERGE command, all the distributed tables " - "must be colocated, for append/range distribution, " - "colocation is not supported", NULL, - "Consider using hash distribution instead"); } /* @@ -389,8 +418,8 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, } /* Ensure all distributed tables are indeed co-located and joined on distribution column */ - return ErrorIfTablesNotColocatedAndJoinedOnDistColumn(parse, distTablesList, - restrictionContext); + return ErrorIfTablesNotColocatedAndJoinedOnDistColumn(parse, + distTablesList, restrictionContext); } @@ -461,7 +490,8 @@ IsPartitionColumnInMergeSource(Expr *columnExpression, Query *query, bool skipOu * value into the target which is not from the source table, if so, it * raises an exception. * Note: Inserting random values other than the joined column values will - * result in unexpected behaviour of rows ending up in incorrect shards. + * result in unexpected behaviour of rows ending up in incorrect shards, to + * prevent such mishaps, we disallow such inserts here. */ static DeferredErrorMessage * InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) @@ -475,7 +505,7 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) MergeAction *action = NULL; foreach_ptr(action, query->mergeActionList) { - /* Skip MATCHED clauses */ + /* Skip MATCHED clause as INSERTS are not allowed in it*/ if (action->matched) { continue; @@ -501,11 +531,6 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) TargetEntry *targetEntry = NULL; foreach_ptr(targetEntry, action->targetList) { - if (targetEntry->resjunk) - { - continue; - } - AttrNumber originalAttrNo = targetEntry->resno; /* skip processing of target table non-partition columns */ @@ -516,7 +541,7 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) foundDistributionColumn = true; - if (targetEntry->expr->type == T_Var) + if (IsA(targetEntry->expr, Var)) { if (IsPartitionColumnInMergeSource(targetEntry->expr, query, true)) { diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 901e9de17..7d7fac23f 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4573,7 +4573,8 @@ RowModifyLevelForQuery(Query *query) } if (commandType == CMD_UPDATE || - commandType == CMD_DELETE) + commandType == CMD_DELETE || + commandType == CMD_MERGE) { return ROW_MODIFY_NONCOMMUTATIVE; } diff --git a/src/test/regress/expected/isolation_merge.out b/src/test/regress/expected/isolation_merge.out new file mode 100644 index 000000000..477580aaa --- /dev/null +++ b/src/test/regress/expected/isolation_merge.out @@ -0,0 +1,44 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s2-begin s1-insert s2-delete s1-commit s2-commit s2-result +step s1-begin: BEGIN; +step s2-begin: BEGIN; +step s1-insert: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + +step s2-delete: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED AND prept.t2 = 0 THEN DELETE + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1; + +step s1-commit: COMMIT; +step s2-delete: <... completed> +step s2-commit: COMMIT; +step s2-result: SELECT * FROM prept; + t1|t2 +--------------------------------------------------------------------- +100| 2 +(1 row) + + +starting permutation: s2-begin s2-delete s1-begin s1-insert s2-commit s1-commit s2-result +step s2-begin: BEGIN; +step s2-delete: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED AND prept.t2 = 0 THEN DELETE + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1; + +step s1-begin: BEGIN; +step s1-insert: MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + +step s2-commit: COMMIT; +step s1-insert: <... completed> +step s1-commit: COMMIT; +step s2-result: SELECT * FROM prept; + t1|t2 +--------------------------------------------------------------------- +200| 0 +100| 0 +(2 rows) + diff --git a/src/test/regress/expected/isolation_merge_0.out b/src/test/regress/expected/isolation_merge_0.out new file mode 100644 index 000000000..61d7f2bab --- /dev/null +++ b/src/test/regress/expected/isolation_merge_0.out @@ -0,0 +1,5 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s2-begin s1-insert s2-delete s1-commit s2-commit s2-result +setup failed: ERROR: MERGE is not supported on PG version: 14 +CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE diff --git a/src/test/regress/expected/isolation_merge_1.out b/src/test/regress/expected/isolation_merge_1.out new file mode 100644 index 000000000..69fcfaa53 --- /dev/null +++ b/src/test/regress/expected/isolation_merge_1.out @@ -0,0 +1,5 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s2-begin s1-insert s2-delete s1-commit s2-commit s2-result +setup failed: ERROR: MERGE is not supported on PG version: 13 +CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE diff --git a/src/test/regress/expected/isolation_merge_update.out b/src/test/regress/expected/isolation_merge_update.out new file mode 100644 index 000000000..c31d2844b --- /dev/null +++ b/src/test/regress/expected/isolation_merge_update.out @@ -0,0 +1,314 @@ +Parsed test spec with 2 sessions + +starting permutation: merge1 c1 select2 c2 +step merge1: + MERGE INTO target t + USING (SELECT skey as key, 'merge1' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step c1: COMMIT; +step select2: SELECT key, val FROM target ORDER BY 1,2; +key|val +--------------------------------------------------------------------- + 2|setup1 updated by merge1 +(1 row) + +step c2: COMMIT; + +starting permutation: merge1 c1 merge2a select2 c2 +step merge1: + MERGE INTO target t + USING (SELECT skey as key, 'merge1' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step c1: COMMIT; +step merge2a: + MERGE INTO target t + USING (SELECT skey as key, 'merge2a' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step select2: SELECT key, val FROM target ORDER BY 1,2; +key|val +--------------------------------------------------------------------- + 2|setup1 updated by merge1 + 1|merge2a +(2 rows) + +step c2: COMMIT; + +starting permutation: merge1 merge2a c1 select2 c2 +step merge1: + MERGE INTO target t + USING (SELECT skey as key, 'merge1' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step merge2a: + MERGE INTO target t + USING (SELECT skey as key, 'merge2a' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step c1: COMMIT; +step merge2a: <... completed> +step select2: SELECT key, val FROM target ORDER BY 1,2; +key|val +--------------------------------------------------------------------- + 2|setup1 updated by merge1 + 1|merge2a +(2 rows) + +step c2: COMMIT; + +starting permutation: merge1 merge2a a1 select2 c2 +step merge1: + MERGE INTO target t + USING (SELECT skey as key, 'merge1' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step merge2a: + MERGE INTO target t + USING (SELECT skey as key, 'merge2a' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step a1: ABORT; +step merge2a: <... completed> +step select2: SELECT key, val FROM target ORDER BY 1,2; +key|val +--------------------------------------------------------------------- + 2|setup1 updated by merge2a +(1 row) + +step c2: COMMIT; + +starting permutation: merge1 merge2b c1 select2 c2 +step merge1: + MERGE INTO target t + USING (SELECT skey as key, 'merge1' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step merge2b: + MERGE INTO target t + USING (SELECT skey as key, 'merge2b' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED AND t.key < 2 THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step c1: COMMIT; +step merge2b: <... completed> +step select2: SELECT key, val FROM target ORDER BY 1,2; +key|val +--------------------------------------------------------------------- + 2|setup1 updated by merge1 + 1|merge2b +(2 rows) + +step c2: COMMIT; + +starting permutation: merge1 merge2c c1 select2 c2 +step merge1: + MERGE INTO target t + USING (SELECT skey as key, 'merge1' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step merge2c: + MERGE INTO target t + USING (SELECT skey as key, 'merge2c' as val FROM source) s + ON s.key = t.skey AND t.key < 2 + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step c1: COMMIT; +step merge2c: <... completed> +step select2: SELECT key, val FROM target ORDER BY 1,2; +key|val +--------------------------------------------------------------------- + 2|setup1 updated by merge1 + 1|merge2c +(2 rows) + +step c2: COMMIT; + +starting permutation: pa_merge1 pa_merge2a c1 pa_select2 c2 +step pa_merge1: + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge1' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED THEN + UPDATE set val = t.val || ' updated by ' || s.val; + +step pa_merge2a: + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge2a' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step c1: COMMIT; +step pa_merge2a: <... completed> +step pa_select2: SELECT key, val FROM pa_target ORDER BY 1,2; +key|val +--------------------------------------------------------------------- + 2|initial + 2|initial updated by pa_merge1 updated by pa_merge2a +(2 rows) + +step c2: COMMIT; + +starting permutation: pa_merge2 pa_merge2a c1 pa_select2 c2 +step pa_merge2: + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge2' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step pa_merge2a: + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge2a' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step c1: COMMIT; +step pa_merge2a: <... completed> +ERROR: tuple to be locked was already moved to another partition due to concurrent update +step pa_select2: SELECT key, val FROM pa_target ORDER BY 1,2; +ERROR: current transaction is aborted, commands ignored until end of transaction block +step c2: COMMIT; + +starting permutation: pa_merge2 c1 pa_merge2a pa_select2 c2 +step pa_merge2: + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge2' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step c1: COMMIT; +step pa_merge2a: + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge2a' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step pa_select2: SELECT key, val FROM pa_target ORDER BY 1,2; +key|val +--------------------------------------------------------------------- + 1|pa_merge2a + 2|initial + 2|initial updated by pa_merge2 +(3 rows) + +step c2: COMMIT; + +starting permutation: pa_merge3 pa_merge2b_when c1 pa_select2 c2 +step pa_merge3: + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge2' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED THEN + UPDATE set val = 'prefix ' || t.val; + +step pa_merge2b_when: + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge2b_when' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED AND t.val like 'initial%' THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step c1: COMMIT; +step pa_merge2b_when: <... completed> +step pa_select2: SELECT key, val FROM pa_target ORDER BY 1,2; +key|val +--------------------------------------------------------------------- + 1|prefix initial + 2|initial +(2 rows) + +step c2: COMMIT; + +starting permutation: pa_merge1 pa_merge2b_when c1 pa_select2 c2 +step pa_merge1: + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge1' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED THEN + UPDATE set val = t.val || ' updated by ' || s.val; + +step pa_merge2b_when: + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge2b_when' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED AND t.val like 'initial%' THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; + +step c1: COMMIT; +step pa_merge2b_when: <... completed> +step pa_select2: SELECT key, val FROM pa_target ORDER BY 1,2; +key|val +--------------------------------------------------------------------- + 2|initial + 2|initial updated by pa_merge1 updated by pa_merge2b_when +(2 rows) + +step c2: COMMIT; diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 90d1463ad..23dbc3c7a 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -108,3 +108,6 @@ test: isolation_acquire_distributed_locks test: isolation_concurrent_move_create_table test: isolation_check_mx + +# MERGE +test: isolation_merge diff --git a/src/test/regress/spec/isolation_merge.spec b/src/test/regress/spec/isolation_merge.spec new file mode 100644 index 000000000..3cd32ac21 --- /dev/null +++ b/src/test/regress/spec/isolation_merge.spec @@ -0,0 +1,57 @@ +// +// How we organize this isolation test spec, is explained at README.md file in this directory. +// + +// create distributed tables to test behavior of MERGE in concurrent operations +setup +{ + DO + $do$ + DECLARE ver int; + BEGIN + SELECT substring(version(), '\d+')::int into ver; + IF (ver < 15) + THEN + RAISE EXCEPTION 'MERGE is not supported on PG version: %', ver; + END IF; + END + $do$; + + SET citus.shard_replication_factor TO 1; + CREATE TABLE prept(t1 int, t2 int); + CREATE TABLE preps(s1 int, s2 int); + SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1'); + INSERT INTO prept VALUES(100, 0); + INSERT INTO preps VALUES(100, 0); + INSERT INTO preps VALUES(200, 0); +} + +// drop distributed tables +teardown +{ + DROP TABLE IF EXISTS prept CASCADE; + DROP TABLE IF EXISTS preps CASCADE; +} + +// session 1 +session "s1" +step "s1-begin" { BEGIN; } +step "s1-insert" { MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1 + WHEN NOT MATCHED THEN INSERT VALUES(s1, s2); + } +step "s1-commit" { COMMIT; } + +// session 2 +session "s2" +step "s2-begin" { BEGIN; } +step "s2-delete" { MERGE INTO prept USING preps ON prept.t1 = preps.s1 + WHEN MATCHED AND prept.t2 = 0 THEN DELETE + WHEN MATCHED THEN UPDATE SET t2 = t2 + 1; + } +step "s2-commit" { COMMIT; } +step "s2-result" { SELECT * FROM prept; } + +// permutations - MERGE vs MERGE +permutation "s1-begin" "s2-begin" "s1-insert" "s2-delete" "s1-commit" "s2-commit" "s2-result" +permutation "s2-begin" "s2-delete" "s1-begin" "s1-insert" "s2-commit" "s1-commit" "s2-result" diff --git a/src/test/regress/spec/isolation_merge_update.spec b/src/test/regress/spec/isolation_merge_update.spec new file mode 100644 index 000000000..17d310034 --- /dev/null +++ b/src/test/regress/spec/isolation_merge_update.spec @@ -0,0 +1,164 @@ +// MERGE UPDATE +// +// This test exercises atypical cases +// 1. UPDATEs of PKs that change the join in the ON clause +// 2. UPDATEs with WHEN conditions that would fail after concurrent update +// 3. UPDATEs with extra ON conditions that would fail after concurrent update + +setup +{ + CREATE TABLE target (skey int, key int, val text); + CREATE TABLE source (skey int primary key, val text); + INSERT INTO target VALUES (1, 1, 'setup1'); + INSERT INTO source VALUES (1, 'setup1'); + + CREATE TABLE pa_target (skey integer, key integer, val text) + PARTITION BY LIST (key); + + SELECT create_distributed_table('target', 'skey'); + SELECT create_distributed_table('source', 'skey'); + + CREATE TABLE part1 (skey integer, key integer, val text); + CREATE TABLE part2 (val text, skey integer, key integer); + CREATE TABLE part3 (skey integer, key integer, val text); + + ALTER TABLE pa_target ATTACH PARTITION part1 FOR VALUES IN (1,4); + ALTER TABLE pa_target ATTACH PARTITION part2 FOR VALUES IN (2,5,6); + ALTER TABLE pa_target ATTACH PARTITION part3 DEFAULT; + + SELECT create_distributed_table('pa_target', 'skey'); + INSERT INTO pa_target VALUES (1, 1, 'initial'); + INSERT INTO pa_target VALUES (2, 2, 'initial'); +} + +teardown +{ + DROP TABLE target; + DROP TABLE source; + DROP TABLE pa_target CASCADE; +} + +session "s1" +setup +{ + BEGIN ISOLATION LEVEL READ COMMITTED; +} +step "merge1" +{ + MERGE INTO target t + USING (SELECT skey as key, 'merge1' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; +} +step "pa_merge1" +{ + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge1' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED THEN + UPDATE set val = t.val || ' updated by ' || s.val; +} +step "pa_merge2" +{ + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge2' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; +} +step "pa_merge3" +{ + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge2' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED THEN + UPDATE set val = 'prefix ' || t.val; +} +step "c1" { COMMIT; } +step "a1" { ABORT; } + +session "s2" +setup +{ + BEGIN ISOLATION LEVEL READ COMMITTED; +} +step "merge2a" +{ + MERGE INTO target t + USING (SELECT skey as key, 'merge2a' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED AND t.key = 1 THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; +} +step "merge2b" +{ + MERGE INTO target t + USING (SELECT skey as key, 'merge2b' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED AND t.key < 2 THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; +} +step "merge2c" +{ + MERGE INTO target t + USING (SELECT skey as key, 'merge2c' as val FROM source) s + ON s.key = t.skey AND t.key < 2 + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, 1, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; +} +step "pa_merge2a" +{ + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge2a' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; +} +// MERGE proceeds only if 'val' unchanged +step "pa_merge2b_when" +{ + MERGE INTO pa_target t + USING (SELECT skey as key, 'pa_merge2b_when' as val FROM source) s + ON s.key = t.skey + WHEN NOT MATCHED THEN + INSERT VALUES (s.key, s.key, s.val) + WHEN MATCHED AND t.val like 'initial%' THEN + UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val; +} +step "select2" { SELECT key, val FROM target ORDER BY 1,2; } +step "pa_select2" { SELECT key, val FROM pa_target ORDER BY 1,2; } +step "c2" { COMMIT; } + +// Basic effects +permutation "merge1" "c1" "select2" "c2" + +// One after the other, no concurrency +permutation "merge1" "c1" "merge2a" "select2" "c2" + +// Now with concurrency +permutation "merge1" "merge2a" "c1" "select2" "c2" +permutation "merge1" "merge2a" "a1" "select2" "c2" +permutation "merge1" "merge2b" "c1" "select2" "c2" +permutation "merge1" "merge2c" "c1" "select2" "c2" +permutation "pa_merge1" "pa_merge2a" "c1" "pa_select2" "c2" +permutation "pa_merge2" "pa_merge2a" "c1" "pa_select2" "c2" // fails +permutation "pa_merge2" "c1" "pa_merge2a" "pa_select2" "c2" // succeeds +permutation "pa_merge3" "pa_merge2b_when" "c1" "pa_select2" "c2" // WHEN not satisfied by updated tuple +permutation "pa_merge1" "pa_merge2b_when" "c1" "pa_select2" "c2" // WHEN satisfied by updated tuple