mirror of https://github.com/citusdata/citus.git
Add appropriate lock mode for MERGE SQL
parent
5ff880cc6c
commit
d55c4f7521
|
@ -77,7 +77,7 @@ int PlannerLevel = 0;
|
|||
|
||||
static bool ListContainsDistributedTableRTE(List *rangeTableList,
|
||||
bool *maybeHasForeignDistributedTable);
|
||||
static bool IsUpdateOrDelete(Query *query);
|
||||
static bool IsUpdateOrDeleteOrMerge(Query *query);
|
||||
static PlannedStmt * CreateDistributedPlannedStmt(
|
||||
DistributedPlanningContext *planContext);
|
||||
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
|
||||
|
@ -631,7 +631,7 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan)
|
|||
* IsUpdateOrDelete returns true if the query performs an update or delete.
|
||||
*/
|
||||
bool
|
||||
IsUpdateOrDelete(Query *query)
|
||||
IsUpdateOrDeleteOrMerge(Query *query)
|
||||
{
|
||||
return query->commandType == CMD_UPDATE ||
|
||||
query->commandType == CMD_DELETE ||
|
||||
|
@ -809,7 +809,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
|
|||
* if it is planned as a multi shard modify query.
|
||||
*/
|
||||
if ((distributedPlan->planningError ||
|
||||
(IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan(
|
||||
(IsUpdateOrDeleteOrMerge(planContext->originalQuery) && IsMultiTaskPlan(
|
||||
distributedPlan))) &&
|
||||
hasUnresolvedParams)
|
||||
{
|
||||
|
|
|
@ -27,6 +27,9 @@
|
|||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||
|
||||
static bool CheckIfRTETypeIsUnsupported(Query *parse,
|
||||
RangeTblEntry *rangeTableEntry,
|
||||
DeferredErrorMessage **returnMessage);
|
||||
static DeferredErrorMessage * ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse,
|
||||
List *
|
||||
distTablesList,
|
||||
|
@ -231,6 +234,78 @@ ErrorIfTablesNotColocatedAndJoinedOnDistColumn(Query *parse, List *distTablesLis
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfRTETypeIsUnsupported Checks for types of tables that are not supported, such
|
||||
* as, reference tables, append-distributed tables and materialized view as target relation.
|
||||
* Routine returns true for the supported types, and false for everything else, only for
|
||||
* unsupported types it fills the appropriate error message in the parameter passed.
|
||||
*/
|
||||
static bool
|
||||
CheckIfRTETypeIsUnsupported(Query *parse,
|
||||
RangeTblEntry *rangeTableEntry,
|
||||
DeferredErrorMessage **returnMessage)
|
||||
{
|
||||
/* skip the regular views as they are replaced with subqueries */
|
||||
if (rangeTableEntry->relkind == RELKIND_VIEW)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (rangeTableEntry->relkind == RELKIND_MATVIEW ||
|
||||
rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
/* Materialized view or Foreign table as target is not allowed */
|
||||
if (IsMergeAllowedOnRelation(parse, rangeTableEntry))
|
||||
{
|
||||
/* Non target relation is ok */
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Usually we don't reach this exception as the Postgres parser catches it */
|
||||
StringInfo errorMessage = makeStringInfo();
|
||||
appendStringInfo(errorMessage, "MERGE command is not allowed on "
|
||||
"relation type(relkind:%c)", rangeTableEntry->relkind);
|
||||
*returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
errorMessage->data, NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
if (rangeTableEntry->relkind != RELKIND_RELATION &&
|
||||
rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE)
|
||||
{
|
||||
StringInfo errorMessage = makeStringInfo();
|
||||
appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) "
|
||||
"in MERGE command", rangeTableEntry->relkind);
|
||||
*returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
errorMessage->data, NULL, NULL);
|
||||
}
|
||||
|
||||
Assert(rangeTableEntry->relid != 0);
|
||||
|
||||
/* Reference tables are not supported yet */
|
||||
if (IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE))
|
||||
{
|
||||
*returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"MERGE command is not supported on reference "
|
||||
"tables yet", NULL, NULL);
|
||||
}
|
||||
|
||||
/* Append/Range tables are not supported */
|
||||
if (IsCitusTableType(rangeTableEntry->relid, APPEND_DISTRIBUTED) ||
|
||||
IsCitusTableType(rangeTableEntry->relid, RANGE_DISTRIBUTED))
|
||||
{
|
||||
*returnMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"For MERGE command, all the distributed tables "
|
||||
"must be colocated, for append/range distribution, "
|
||||
"colocation is not supported", NULL,
|
||||
"Consider using hash distribution instead");
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE
|
||||
* present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus
|
||||
|
@ -294,63 +369,17 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList,
|
|||
}
|
||||
|
||||
/* RTE Relation can be of various types, check them now */
|
||||
|
||||
/* skip the regular views as they are replaced with subqueries */
|
||||
if (rangeTableEntry->relkind == RELKIND_VIEW)
|
||||
DeferredErrorMessage *errorMessage = NULL;
|
||||
if (CheckIfRTETypeIsUnsupported(parse, rangeTableEntry, &errorMessage))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (rangeTableEntry->relkind == RELKIND_MATVIEW ||
|
||||
rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
/* Materialized view or Foreign table as target is not allowed */
|
||||
if (IsMergeAllowedOnRelation(parse, rangeTableEntry))
|
||||
{
|
||||
/* Non target relation is ok */
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Usually we don't reach this exception as the Postgres parser catches it */
|
||||
StringInfo errorMessage = makeStringInfo();
|
||||
appendStringInfo(errorMessage,
|
||||
"MERGE command is not allowed on "
|
||||
"relation type(relkind:%c)", rangeTableEntry->relkind);
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data,
|
||||
NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
if (rangeTableEntry->relkind != RELKIND_RELATION &&
|
||||
rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE)
|
||||
if (errorMessage)
|
||||
{
|
||||
StringInfo errorMessage = makeStringInfo();
|
||||
appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) "
|
||||
"in MERGE command", rangeTableEntry->relkind);
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data,
|
||||
NULL, NULL);
|
||||
return errorMessage;
|
||||
}
|
||||
|
||||
Assert(rangeTableEntry->relid != 0);
|
||||
|
||||
/* Reference tables are not supported yet */
|
||||
if (IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"MERGE command is not supported on reference "
|
||||
"tables yet", NULL, NULL);
|
||||
}
|
||||
|
||||
/* Append/Range tables are not supported */
|
||||
if (IsCitusTableType(relationId, APPEND_DISTRIBUTED) ||
|
||||
IsCitusTableType(relationId, RANGE_DISTRIBUTED))
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"For MERGE command, all the distributed tables "
|
||||
"must be colocated, for append/range distribution, "
|
||||
"colocation is not supported", NULL,
|
||||
"Consider using hash distribution instead");
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -389,8 +418,8 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList,
|
|||
}
|
||||
|
||||
/* Ensure all distributed tables are indeed co-located and joined on distribution column */
|
||||
return ErrorIfTablesNotColocatedAndJoinedOnDistColumn(parse, distTablesList,
|
||||
restrictionContext);
|
||||
return ErrorIfTablesNotColocatedAndJoinedOnDistColumn(parse,
|
||||
distTablesList, restrictionContext);
|
||||
}
|
||||
|
||||
|
||||
|
@ -461,7 +490,8 @@ IsPartitionColumnInMergeSource(Expr *columnExpression, Query *query, bool skipOu
|
|||
* value into the target which is not from the source table, if so, it
|
||||
* raises an exception.
|
||||
* Note: Inserting random values other than the joined column values will
|
||||
* result in unexpected behaviour of rows ending up in incorrect shards.
|
||||
* result in unexpected behaviour of rows ending up in incorrect shards, to
|
||||
* prevent such mishaps, we disallow such inserts here.
|
||||
*/
|
||||
static DeferredErrorMessage *
|
||||
InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte)
|
||||
|
@ -475,7 +505,7 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte)
|
|||
MergeAction *action = NULL;
|
||||
foreach_ptr(action, query->mergeActionList)
|
||||
{
|
||||
/* Skip MATCHED clauses */
|
||||
/* Skip MATCHED clause as INSERTS are not allowed in it*/
|
||||
if (action->matched)
|
||||
{
|
||||
continue;
|
||||
|
@ -501,11 +531,6 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte)
|
|||
TargetEntry *targetEntry = NULL;
|
||||
foreach_ptr(targetEntry, action->targetList)
|
||||
{
|
||||
if (targetEntry->resjunk)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
AttrNumber originalAttrNo = targetEntry->resno;
|
||||
|
||||
/* skip processing of target table non-partition columns */
|
||||
|
@ -516,7 +541,7 @@ InsertPartitionColumnMatchesSource(Query *query, RangeTblEntry *resultRte)
|
|||
|
||||
foundDistributionColumn = true;
|
||||
|
||||
if (targetEntry->expr->type == T_Var)
|
||||
if (IsA(targetEntry->expr, Var))
|
||||
{
|
||||
if (IsPartitionColumnInMergeSource(targetEntry->expr, query, true))
|
||||
{
|
||||
|
|
|
@ -4573,7 +4573,8 @@ RowModifyLevelForQuery(Query *query)
|
|||
}
|
||||
|
||||
if (commandType == CMD_UPDATE ||
|
||||
commandType == CMD_DELETE)
|
||||
commandType == CMD_DELETE ||
|
||||
commandType == CMD_MERGE)
|
||||
{
|
||||
return ROW_MODIFY_NONCOMMUTATIVE;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert s2-delete s1-commit s2-commit s2-result
|
||||
step s1-begin: BEGIN;
|
||||
step s2-begin: BEGIN;
|
||||
step s1-insert: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
|
||||
step s2-delete: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED AND prept.t2 = 0 THEN DELETE
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1;
|
||||
<waiting ...>
|
||||
step s1-commit: COMMIT;
|
||||
step s2-delete: <... completed>
|
||||
step s2-commit: COMMIT;
|
||||
step s2-result: SELECT * FROM prept;
|
||||
t1|t2
|
||||
---------------------------------------------------------------------
|
||||
100| 2
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s2-begin s2-delete s1-begin s1-insert s2-commit s1-commit s2-result
|
||||
step s2-begin: BEGIN;
|
||||
step s2-delete: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED AND prept.t2 = 0 THEN DELETE
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1;
|
||||
|
||||
step s1-begin: BEGIN;
|
||||
step s1-insert: MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
<waiting ...>
|
||||
step s2-commit: COMMIT;
|
||||
step s1-insert: <... completed>
|
||||
step s1-commit: COMMIT;
|
||||
step s2-result: SELECT * FROM prept;
|
||||
t1|t2
|
||||
---------------------------------------------------------------------
|
||||
200| 0
|
||||
100| 0
|
||||
(2 rows)
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert s2-delete s1-commit s2-commit s2-result
|
||||
setup failed: ERROR: MERGE is not supported on PG version: 14
|
||||
CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE
|
|
@ -0,0 +1,5 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert s2-delete s1-commit s2-commit s2-result
|
||||
setup failed: ERROR: MERGE is not supported on PG version: 13
|
||||
CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE
|
|
@ -0,0 +1,314 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: merge1 c1 select2 c2
|
||||
step merge1:
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge1' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
|
||||
step c1: COMMIT;
|
||||
step select2: SELECT key, val FROM target ORDER BY 1,2;
|
||||
key|val
|
||||
---------------------------------------------------------------------
|
||||
2|setup1 updated by merge1
|
||||
(1 row)
|
||||
|
||||
step c2: COMMIT;
|
||||
|
||||
starting permutation: merge1 c1 merge2a select2 c2
|
||||
step merge1:
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge1' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
|
||||
step c1: COMMIT;
|
||||
step merge2a:
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge2a' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
|
||||
step select2: SELECT key, val FROM target ORDER BY 1,2;
|
||||
key|val
|
||||
---------------------------------------------------------------------
|
||||
2|setup1 updated by merge1
|
||||
1|merge2a
|
||||
(2 rows)
|
||||
|
||||
step c2: COMMIT;
|
||||
|
||||
starting permutation: merge1 merge2a c1 select2 c2
|
||||
step merge1:
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge1' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
|
||||
step merge2a:
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge2a' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
<waiting ...>
|
||||
step c1: COMMIT;
|
||||
step merge2a: <... completed>
|
||||
step select2: SELECT key, val FROM target ORDER BY 1,2;
|
||||
key|val
|
||||
---------------------------------------------------------------------
|
||||
2|setup1 updated by merge1
|
||||
1|merge2a
|
||||
(2 rows)
|
||||
|
||||
step c2: COMMIT;
|
||||
|
||||
starting permutation: merge1 merge2a a1 select2 c2
|
||||
step merge1:
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge1' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
|
||||
step merge2a:
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge2a' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
<waiting ...>
|
||||
step a1: ABORT;
|
||||
step merge2a: <... completed>
|
||||
step select2: SELECT key, val FROM target ORDER BY 1,2;
|
||||
key|val
|
||||
---------------------------------------------------------------------
|
||||
2|setup1 updated by merge2a
|
||||
(1 row)
|
||||
|
||||
step c2: COMMIT;
|
||||
|
||||
starting permutation: merge1 merge2b c1 select2 c2
|
||||
step merge1:
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge1' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
|
||||
step merge2b:
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge2b' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED AND t.key < 2 THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
<waiting ...>
|
||||
step c1: COMMIT;
|
||||
step merge2b: <... completed>
|
||||
step select2: SELECT key, val FROM target ORDER BY 1,2;
|
||||
key|val
|
||||
---------------------------------------------------------------------
|
||||
2|setup1 updated by merge1
|
||||
1|merge2b
|
||||
(2 rows)
|
||||
|
||||
step c2: COMMIT;
|
||||
|
||||
starting permutation: merge1 merge2c c1 select2 c2
|
||||
step merge1:
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge1' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
|
||||
step merge2c:
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge2c' as val FROM source) s
|
||||
ON s.key = t.skey AND t.key < 2
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
<waiting ...>
|
||||
step c1: COMMIT;
|
||||
step merge2c: <... completed>
|
||||
step select2: SELECT key, val FROM target ORDER BY 1,2;
|
||||
key|val
|
||||
---------------------------------------------------------------------
|
||||
2|setup1 updated by merge1
|
||||
1|merge2c
|
||||
(2 rows)
|
||||
|
||||
step c2: COMMIT;
|
||||
|
||||
starting permutation: pa_merge1 pa_merge2a c1 pa_select2 c2
|
||||
step pa_merge1:
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge1' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set val = t.val || ' updated by ' || s.val;
|
||||
|
||||
step pa_merge2a:
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge2a' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
<waiting ...>
|
||||
step c1: COMMIT;
|
||||
step pa_merge2a: <... completed>
|
||||
step pa_select2: SELECT key, val FROM pa_target ORDER BY 1,2;
|
||||
key|val
|
||||
---------------------------------------------------------------------
|
||||
2|initial
|
||||
2|initial updated by pa_merge1 updated by pa_merge2a
|
||||
(2 rows)
|
||||
|
||||
step c2: COMMIT;
|
||||
|
||||
starting permutation: pa_merge2 pa_merge2a c1 pa_select2 c2
|
||||
step pa_merge2:
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge2' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
|
||||
step pa_merge2a:
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge2a' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
<waiting ...>
|
||||
step c1: COMMIT;
|
||||
step pa_merge2a: <... completed>
|
||||
ERROR: tuple to be locked was already moved to another partition due to concurrent update
|
||||
step pa_select2: SELECT key, val FROM pa_target ORDER BY 1,2;
|
||||
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||
step c2: COMMIT;
|
||||
|
||||
starting permutation: pa_merge2 c1 pa_merge2a pa_select2 c2
|
||||
step pa_merge2:
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge2' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
|
||||
step c1: COMMIT;
|
||||
step pa_merge2a:
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge2a' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
|
||||
step pa_select2: SELECT key, val FROM pa_target ORDER BY 1,2;
|
||||
key|val
|
||||
---------------------------------------------------------------------
|
||||
1|pa_merge2a
|
||||
2|initial
|
||||
2|initial updated by pa_merge2
|
||||
(3 rows)
|
||||
|
||||
step c2: COMMIT;
|
||||
|
||||
starting permutation: pa_merge3 pa_merge2b_when c1 pa_select2 c2
|
||||
step pa_merge3:
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge2' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set val = 'prefix ' || t.val;
|
||||
|
||||
step pa_merge2b_when:
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge2b_when' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED AND t.val like 'initial%' THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
<waiting ...>
|
||||
step c1: COMMIT;
|
||||
step pa_merge2b_when: <... completed>
|
||||
step pa_select2: SELECT key, val FROM pa_target ORDER BY 1,2;
|
||||
key|val
|
||||
---------------------------------------------------------------------
|
||||
1|prefix initial
|
||||
2|initial
|
||||
(2 rows)
|
||||
|
||||
step c2: COMMIT;
|
||||
|
||||
starting permutation: pa_merge1 pa_merge2b_when c1 pa_select2 c2
|
||||
step pa_merge1:
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge1' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set val = t.val || ' updated by ' || s.val;
|
||||
|
||||
step pa_merge2b_when:
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge2b_when' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED AND t.val like 'initial%' THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
<waiting ...>
|
||||
step c1: COMMIT;
|
||||
step pa_merge2b_when: <... completed>
|
||||
step pa_select2: SELECT key, val FROM pa_target ORDER BY 1,2;
|
||||
key|val
|
||||
---------------------------------------------------------------------
|
||||
2|initial
|
||||
2|initial updated by pa_merge1 updated by pa_merge2b_when
|
||||
(2 rows)
|
||||
|
||||
step c2: COMMIT;
|
|
@ -108,3 +108,6 @@ test: isolation_acquire_distributed_locks
|
|||
test: isolation_concurrent_move_create_table
|
||||
|
||||
test: isolation_check_mx
|
||||
|
||||
# MERGE
|
||||
test: isolation_merge
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
//
|
||||
// How we organize this isolation test spec, is explained at README.md file in this directory.
|
||||
//
|
||||
|
||||
// create distributed tables to test behavior of MERGE in concurrent operations
|
||||
setup
|
||||
{
|
||||
DO
|
||||
$do$
|
||||
DECLARE ver int;
|
||||
BEGIN
|
||||
SELECT substring(version(), '\d+')::int into ver;
|
||||
IF (ver < 15)
|
||||
THEN
|
||||
RAISE EXCEPTION 'MERGE is not supported on PG version: %', ver;
|
||||
END IF;
|
||||
END
|
||||
$do$;
|
||||
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE prept(t1 int, t2 int);
|
||||
CREATE TABLE preps(s1 int, s2 int);
|
||||
SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1');
|
||||
INSERT INTO prept VALUES(100, 0);
|
||||
INSERT INTO preps VALUES(100, 0);
|
||||
INSERT INTO preps VALUES(200, 0);
|
||||
}
|
||||
|
||||
// drop distributed tables
|
||||
teardown
|
||||
{
|
||||
DROP TABLE IF EXISTS prept CASCADE;
|
||||
DROP TABLE IF EXISTS preps CASCADE;
|
||||
}
|
||||
|
||||
// session 1
|
||||
session "s1"
|
||||
step "s1-begin" { BEGIN; }
|
||||
step "s1-insert" { MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1
|
||||
WHEN NOT MATCHED THEN INSERT VALUES(s1, s2);
|
||||
}
|
||||
step "s1-commit" { COMMIT; }
|
||||
|
||||
// session 2
|
||||
session "s2"
|
||||
step "s2-begin" { BEGIN; }
|
||||
step "s2-delete" { MERGE INTO prept USING preps ON prept.t1 = preps.s1
|
||||
WHEN MATCHED AND prept.t2 = 0 THEN DELETE
|
||||
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1;
|
||||
}
|
||||
step "s2-commit" { COMMIT; }
|
||||
step "s2-result" { SELECT * FROM prept; }
|
||||
|
||||
// permutations - MERGE vs MERGE
|
||||
permutation "s1-begin" "s2-begin" "s1-insert" "s2-delete" "s1-commit" "s2-commit" "s2-result"
|
||||
permutation "s2-begin" "s2-delete" "s1-begin" "s1-insert" "s2-commit" "s1-commit" "s2-result"
|
|
@ -0,0 +1,164 @@
|
|||
// MERGE UPDATE
|
||||
//
|
||||
// This test exercises atypical cases
|
||||
// 1. UPDATEs of PKs that change the join in the ON clause
|
||||
// 2. UPDATEs with WHEN conditions that would fail after concurrent update
|
||||
// 3. UPDATEs with extra ON conditions that would fail after concurrent update
|
||||
|
||||
setup
|
||||
{
|
||||
CREATE TABLE target (skey int, key int, val text);
|
||||
CREATE TABLE source (skey int primary key, val text);
|
||||
INSERT INTO target VALUES (1, 1, 'setup1');
|
||||
INSERT INTO source VALUES (1, 'setup1');
|
||||
|
||||
CREATE TABLE pa_target (skey integer, key integer, val text)
|
||||
PARTITION BY LIST (key);
|
||||
|
||||
SELECT create_distributed_table('target', 'skey');
|
||||
SELECT create_distributed_table('source', 'skey');
|
||||
|
||||
CREATE TABLE part1 (skey integer, key integer, val text);
|
||||
CREATE TABLE part2 (val text, skey integer, key integer);
|
||||
CREATE TABLE part3 (skey integer, key integer, val text);
|
||||
|
||||
ALTER TABLE pa_target ATTACH PARTITION part1 FOR VALUES IN (1,4);
|
||||
ALTER TABLE pa_target ATTACH PARTITION part2 FOR VALUES IN (2,5,6);
|
||||
ALTER TABLE pa_target ATTACH PARTITION part3 DEFAULT;
|
||||
|
||||
SELECT create_distributed_table('pa_target', 'skey');
|
||||
INSERT INTO pa_target VALUES (1, 1, 'initial');
|
||||
INSERT INTO pa_target VALUES (2, 2, 'initial');
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE target;
|
||||
DROP TABLE source;
|
||||
DROP TABLE pa_target CASCADE;
|
||||
}
|
||||
|
||||
session "s1"
|
||||
setup
|
||||
{
|
||||
BEGIN ISOLATION LEVEL READ COMMITTED;
|
||||
}
|
||||
step "merge1"
|
||||
{
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge1' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
}
|
||||
step "pa_merge1"
|
||||
{
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge1' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set val = t.val || ' updated by ' || s.val;
|
||||
}
|
||||
step "pa_merge2"
|
||||
{
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge2' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
}
|
||||
step "pa_merge3"
|
||||
{
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge2' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set val = 'prefix ' || t.val;
|
||||
}
|
||||
step "c1" { COMMIT; }
|
||||
step "a1" { ABORT; }
|
||||
|
||||
session "s2"
|
||||
setup
|
||||
{
|
||||
BEGIN ISOLATION LEVEL READ COMMITTED;
|
||||
}
|
||||
step "merge2a"
|
||||
{
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge2a' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED AND t.key = 1 THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
}
|
||||
step "merge2b"
|
||||
{
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge2b' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED AND t.key < 2 THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
}
|
||||
step "merge2c"
|
||||
{
|
||||
MERGE INTO target t
|
||||
USING (SELECT skey as key, 'merge2c' as val FROM source) s
|
||||
ON s.key = t.skey AND t.key < 2
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, 1, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
}
|
||||
step "pa_merge2a"
|
||||
{
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge2a' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
}
|
||||
// MERGE proceeds only if 'val' unchanged
|
||||
step "pa_merge2b_when"
|
||||
{
|
||||
MERGE INTO pa_target t
|
||||
USING (SELECT skey as key, 'pa_merge2b_when' as val FROM source) s
|
||||
ON s.key = t.skey
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (s.key, s.key, s.val)
|
||||
WHEN MATCHED AND t.val like 'initial%' THEN
|
||||
UPDATE set key = t.key + 1, val = t.val || ' updated by ' || s.val;
|
||||
}
|
||||
step "select2" { SELECT key, val FROM target ORDER BY 1,2; }
|
||||
step "pa_select2" { SELECT key, val FROM pa_target ORDER BY 1,2; }
|
||||
step "c2" { COMMIT; }
|
||||
|
||||
// Basic effects
|
||||
permutation "merge1" "c1" "select2" "c2"
|
||||
|
||||
// One after the other, no concurrency
|
||||
permutation "merge1" "c1" "merge2a" "select2" "c2"
|
||||
|
||||
// Now with concurrency
|
||||
permutation "merge1" "merge2a" "c1" "select2" "c2"
|
||||
permutation "merge1" "merge2a" "a1" "select2" "c2"
|
||||
permutation "merge1" "merge2b" "c1" "select2" "c2"
|
||||
permutation "merge1" "merge2c" "c1" "select2" "c2"
|
||||
permutation "pa_merge1" "pa_merge2a" "c1" "pa_select2" "c2"
|
||||
permutation "pa_merge2" "pa_merge2a" "c1" "pa_select2" "c2" // fails
|
||||
permutation "pa_merge2" "c1" "pa_merge2a" "pa_select2" "c2" // succeeds
|
||||
permutation "pa_merge3" "pa_merge2b_when" "c1" "pa_select2" "c2" // WHEN not satisfied by updated tuple
|
||||
permutation "pa_merge1" "pa_merge2b_when" "c1" "pa_select2" "c2" // WHEN satisfied by updated tuple
|
Loading…
Reference in New Issue