diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 25fb34e43..82f96648e 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -199,7 +199,15 @@ RouterCreateScan(CustomScan *scan) /* check whether query has at most one shard */ if (list_length(taskList) <= 1) { - if (isModificationQuery) + List *relationRowLockList = NIL; + if (list_length(taskList) == 1) + { + Task *task = (Task *) linitial(taskList); + relationRowLockList = task->relationRowLockList; + } + + /* if query is SELECT ... FOR UPDATE query, use modify logic */ + if (isModificationQuery || relationRowLockList != NIL) { scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods; } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index f194b703e..c1f978cfd 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -151,7 +151,7 @@ AcquireExecutorShardLock(Task *task, CmdType commandType) LOCKMODE lockMode = NoLock; int64 shardId = task->anchorShardId; - if (commandType == CMD_SELECT || list_length(task->taskPlacementList) == 1) + if (commandType == CMD_SELECT) { /* * The executor shard lock is used to maintain consistency between @@ -161,6 +161,28 @@ AcquireExecutorShardLock(Task *task, CmdType commandType) lockMode = NoLock; } + else if (list_length(task->taskPlacementList) == 1) + { + if (task->replicationModel == REPLICATION_MODEL_2PC) + { + /* + * While we don't need a lock to ensure writes are applied in + * a consistent order when there is a single replica. We also use + * shard resource locks as a crude implementation of SELECT.. + * FOR UPDATE on reference tables, so we should always take + * a lock that conflicts with the FOR UPDATE/SHARE locks. + */ + lockMode = RowExclusiveLock; + } + else + { + /* + * When there is no replication, the worker itself can decide on + * on the order in which writes are applied. + */ + lockMode = NoLock; + } + } else if (AllModificationsCommutative) { /* @@ -224,6 +246,55 @@ AcquireExecutorShardLock(Task *task, CmdType commandType) LockShardResource(shardId, lockMode); } + /* + * If lock clause exists and it effects any reference table, we need to get + * lock on shard resource. Type of lock is determined by the type of row lock + * given in the query. If the type of row lock is either FOR NO KEY UPDATE or + * FOR UPDATE we get ExclusiveLock on shard resource. We get ShareLock if it + * is FOR KEY SHARE or FOR KEY SHARE. + * + * We have selected these lock types according to conflict table given in the + * Postgres documentation. It is given that FOR UPDATE and FOR NO KEY UPDATE + * must be conflict with each other modify command. By getting ExlcusiveLock + * we guarantee that. Note that, getting ExlusiveLock does not mimic the + * behaviour of Postgres exactly. Getting row lock with FOR NO KEY UPDATE and + * FOR KEY SHARE do not conflicts in Postgres, yet they block each other in + * our implementation. Since FOR SHARE and FOR KEY SHARE does not conflict + * with each other but conflicts with modify commands, we get ShareLock for + * them. + */ + if (task->relationRowLockList != NIL) + { + ListCell *rtiLockCell = NULL; + LOCKMODE rowLockMode = NoLock; + + foreach(rtiLockCell, task->relationRowLockList) + { + RelationRowLock *relationRowLock = (RelationRowLock *) lfirst(rtiLockCell); + LockClauseStrength rowLockStrength = relationRowLock->rowLockStrength; + Oid relationId = relationRowLock->relationId; + + if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + { + List *shardIntervalList = LoadShardIntervalList(relationId); + ShardInterval *referenceTableShardInterval = (ShardInterval *) linitial( + shardIntervalList); + + if (rowLockStrength == LCS_FORKEYSHARE || rowLockStrength == LCS_FORSHARE) + { + rowLockMode = ShareLock; + } + else if (rowLockStrength == LCS_FORNOKEYUPDATE || rowLockStrength == + LCS_FORUPDATE) + { + rowLockMode = ExclusiveLock; + } + + LockShardResource(referenceTableShardInterval->shardId, rowLockMode); + } + } + } + /* * If the task has a subselect, then we may need to lock the shards from which * the query selects as well to prevent the subselects from seeing different @@ -462,6 +533,7 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) /* * RouterSequentialModifyExecScan executes 0 or more modifications on a * distributed table sequentially and returns results if there are any. + * Note that we also use this path for SELECT ... FOR UPDATE queries. */ TupleTableSlot * RouterSequentialModifyExecScan(CustomScanState *node) @@ -522,9 +594,13 @@ RouterSequentialModifyExecScan(CustomScanState *node) { Task *task = (Task *) lfirst(taskCell); + /* + * Result is expected for SELECT ... FOR UPDATE queries as well. + */ executorState->es_processed += ExecuteSingleModifyTask(scanState, task, operation, - alwaysThrowErrorOnFailure, hasReturning); + alwaysThrowErrorOnFailure, + hasReturning || task->relationRowLockList != NIL); } scanState->finishedRemoteScan = true; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 41ecb84b0..66fa6606a 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -139,9 +139,8 @@ static List * ExtractInsertValuesList(Query *query, Var *partitionColumn); static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); -static RangeTblEntry * GetUpdateOrDeleteRTE(List *rangeTableList); -static bool UpdateOrDeleteRTE(RangeTblEntry *rangeTableEntry); -static bool SelectsFromDistributedTable(List *rangeTableList); +static RangeTblEntry * GetUpdateOrDeleteRTE(Query *query); +static bool SelectsFromDistributedTable(List *rangeTableList, Query *query); #if (PG_VERSION_NUM >= 100000) static List * get_all_actual_clauses(List *restrictinfo_list); #endif @@ -150,6 +149,7 @@ static int CompareInsertValuesByShardId(const void *leftElement, static uint64 GetInitialShardId(List *relationShardList); static List * SingleShardSelectTaskList(Query *query, List *relationShardList, List *placementList, uint64 shardId); +static bool RowLocksOnRelations(Node *node, List **rtiLockList); static List * SingleShardModifyTaskList(Query *query, List *relationShardList, List *placementList, uint64 shardId); @@ -1490,6 +1490,7 @@ CreateTask(TaskType taskType) task->taskExecution = NULL; task->upsertQuery = false; task->replicationModel = REPLICATION_MODEL_INVALID; + task->relationRowLockList = NIL; task->modifyWithSubquery = false; task->relationShardList = NIL; @@ -1540,7 +1541,6 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon uint64 shardId = INVALID_SHARD_ID; List *placementList = NIL; List *relationShardList = NIL; - List *rangeTableList = NIL; bool replacePrunedQueryWithDummy = false; bool requiresMasterEvaluation = false; RangeTblEntry *updateOrDeleteRTE = NULL; @@ -1566,8 +1566,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon job = CreateJob(originalQuery); job->partitionValueConst = partitionValueConst; - ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList); - updateOrDeleteRTE = GetUpdateOrDeleteRTE(rangeTableList); + updateOrDeleteRTE = GetUpdateOrDeleteRTE(originalQuery); /* * If all of the shards are pruned, we replace the relation RTE into @@ -1616,18 +1615,62 @@ SingleShardSelectTaskList(Query *query, List *relationShardList, List *placement { Task *task = CreateTask(ROUTER_TASK); StringInfo queryString = makeStringInfo(); + List *relationRowLockList = NIL; + RowLocksOnRelations((Node *) query, &relationRowLockList); pg_get_query_def(query, queryString); task->queryString = queryString->data; task->anchorShardId = shardId; task->taskPlacementList = placementList; task->relationShardList = relationShardList; + task->relationRowLockList = relationRowLockList; return list_make1(task); } +/* + * RowLocksOnRelations forms the list for range table IDs and corresponding + * row lock modes. + */ +static bool +RowLocksOnRelations(Node *node, List **relationRowLockList) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, Query)) + { + Query *query = (Query *) node; + ListCell *rowMarkCell = NULL; + + foreach(rowMarkCell, query->rowMarks) + { + RowMarkClause *rowMarkClause = (RowMarkClause *) lfirst(rowMarkCell); + RangeTblEntry *rangeTable = rt_fetch(rowMarkClause->rti, query->rtable); + Oid relationId = rangeTable->relid; + + if (IsDistributedTable(relationId)) + { + RelationRowLock *relationRowLock = CitusMakeNode(RelationRowLock); + relationRowLock->relationId = relationId; + relationRowLock->rowLockStrength = rowMarkClause->strength; + *relationRowLockList = lappend(*relationRowLockList, relationRowLock); + } + } + + return query_tree_walker(query, RowLocksOnRelations, relationRowLockList, 0); + } + else + { + return expression_tree_walker(node, RowLocksOnRelations, relationRowLockList); + } +} + + /* * SingleShardModifyTaskList generates a task for single shard update/delete query * and returns it as a list. @@ -1644,13 +1687,13 @@ SingleShardModifyTaskList(Query *query, List *relationShardList, List *placement RangeTblEntry *updateOrDeleteRTE = NULL; ExtractRangeTableEntryWalker((Node *) query, &rangeTableList); - updateOrDeleteRTE = GetUpdateOrDeleteRTE(rangeTableList); + updateOrDeleteRTE = GetUpdateOrDeleteRTE(query); modificationTableCacheEntry = DistributedTableCacheEntry(updateOrDeleteRTE->relid); modificationPartitionMethod = modificationTableCacheEntry->partitionMethod; if (modificationPartitionMethod == DISTRIBUTE_BY_NONE && - SelectsFromDistributedTable(rangeTableList)) + SelectsFromDistributedTable(rangeTableList, query)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform select on a distributed table " @@ -1670,55 +1713,37 @@ SingleShardModifyTaskList(Query *query, List *relationShardList, List *placement /* - * GetUpdateOrDeleteRTE walks over the given range table list, and checks if - * it has an UPDATE or DELETE RTE. If it finds one, it return it immediately. + * GetUpdateOrDeleteRTE checks query if it has an UPDATE or DELETE RTE. If it finds + * it returns it. */ static RangeTblEntry * -GetUpdateOrDeleteRTE(List *rangeTableList) +GetUpdateOrDeleteRTE(Query *query) { - ListCell *rangeTableCell = NULL; - - foreach(rangeTableCell, rangeTableList) + if (query->resultRelation > 0) { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - - if (UpdateOrDeleteRTE(rangeTableEntry)) - { - return rangeTableEntry; - } + return rt_fetch(query->resultRelation, query->rtable); } return NULL; } -/* - * UpdateOrDeleteRTE checks if the given range table entry is an UPDATE or - * DELETE RTE by checking required permissions on it. - */ -static bool -UpdateOrDeleteRTE(RangeTblEntry *rangeTableEntry) -{ - if ((ACL_UPDATE & rangeTableEntry->requiredPerms) || - (ACL_DELETE & rangeTableEntry->requiredPerms)) - { - return true; - } - else - { - return false; - } -} - - /* * SelectsFromDistributedTable checks if there is a select on a distributed * table by looking into range table entries. */ static bool -SelectsFromDistributedTable(List *rangeTableList) +SelectsFromDistributedTable(List *rangeTableList, Query *query) { ListCell *rangeTableCell = NULL; + int resultRelation = query->resultRelation; + RangeTblEntry *resultRangeTableEntry = NULL; + + if (resultRelation > 0) + { + resultRangeTableEntry = rt_fetch(resultRelation, query->rtable); + } + foreach(rangeTableCell, rangeTableList) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); @@ -1731,7 +1756,8 @@ SelectsFromDistributedTable(List *rangeTableList) cacheEntry = DistributedTableCacheEntry(rangeTableEntry->relid); if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE && - !UpdateOrDeleteRTE(rangeTableEntry)) + (resultRangeTableEntry == NULL || resultRangeTableEntry->relid != + rangeTableEntry->relid)) { return true; } @@ -2740,11 +2766,6 @@ MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionC return false; } - if (query->hasForUpdate) - { - return false; - } - foreach(relationRestrictionContextCell, restrictionContext->relationRestrictionList) { RelationRestriction *relationRestriction = @@ -2768,6 +2789,28 @@ MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionC { return false; } + + /* + * Currently, we don't support tables with replication factor > 1, + * except reference tables with SELECT ... FOR UDPATE queries. It is + * also not supported from MX nodes. + */ + if (query->hasForUpdate) + { + uint32 tableReplicationFactor = TableShardReplicationFactor( + distributedTableId); + + if (partitionMethod == DISTRIBUTE_BY_NONE) + { + EnsureCoordinator(); + } + + + if (tableReplicationFactor > 1 && partitionMethod != DISTRIBUTE_BY_NONE) + { + return false; + } + } } } diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index a4b115086..03e100473 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -227,6 +227,16 @@ CopyNodeRelationShard(COPYFUNC_ARGS) } +void +CopyNodeRelationRowLock(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(RelationRowLock); + + COPY_SCALAR_FIELD(relationId); + COPY_SCALAR_FIELD(rowLockStrength); +} + + void CopyNodeTask(COPYFUNC_ARGS) { @@ -248,6 +258,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(replicationModel); COPY_SCALAR_FIELD(modifyWithSubquery); COPY_NODE_FIELD(relationShardList); + COPY_NODE_FIELD(relationRowLockList); COPY_NODE_FIELD(rowValuesLists); } diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index 4cf380403..f5f65ff12 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -16,6 +16,7 @@ #include "distributed/errormessage.h" #include "distributed/metadata_cache.h" #include "distributed/distributed_planner.h" +#include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" static const char *CitusNodeTagNamesD[] = { @@ -38,6 +39,7 @@ static const char *CitusNodeTagNamesD[] = { "ShardInterval", "ShardPlacement", "RelationShard", + "RelationRowLock", "DeferredErrorMessage", "GroupShardPlacement" }; @@ -393,6 +395,7 @@ const ExtensibleNodeMethods nodeMethods[] = DEFINE_NODE_METHODS(MapMergeJob), DEFINE_NODE_METHODS(ShardPlacement), DEFINE_NODE_METHODS(RelationShard), + DEFINE_NODE_METHODS(RelationRowLock), DEFINE_NODE_METHODS(Task), DEFINE_NODE_METHODS(TaskExecution), DEFINE_NODE_METHODS(DeferredErrorMessage), diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index a5b4aa8f7..c16d43125 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -435,6 +435,17 @@ OutRelationShard(OUTFUNC_ARGS) } +void +OutRelationRowLock(OUTFUNC_ARGS) +{ + WRITE_LOCALS(RelationRowLock); + WRITE_NODE_TYPE("RELATIONROWLOCK"); + + WRITE_OID_FIELD(relationId); + WRITE_ENUM_FIELD(rowLockStrength, LockClauseStrength); +} + + void OutTask(OUTFUNC_ARGS) { @@ -457,6 +468,7 @@ OutTask(OUTFUNC_ARGS) WRITE_CHAR_FIELD(replicationModel); WRITE_BOOL_FIELD(modifyWithSubquery); WRITE_NODE_FIELD(relationShardList); + WRITE_NODE_FIELD(relationRowLockList); WRITE_NODE_FIELD(rowValuesLists); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index b974c0de9..bed80f3cb 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -349,6 +349,18 @@ ReadRelationShard(READFUNC_ARGS) } +READFUNC_RET +ReadRelationRowLock(READFUNC_ARGS) +{ + READ_LOCALS(RelationRowLock); + + READ_OID_FIELD(relationId); + READ_ENUM_FIELD(rowLockStrength, LockClauseStrength); + + READ_DONE(); +} + + READFUNC_RET ReadTask(READFUNC_ARGS) { @@ -370,6 +382,7 @@ ReadTask(READFUNC_ARGS) READ_CHAR_FIELD(replicationModel); READ_BOOL_FIELD(modifyWithSubquery); READ_NODE_FIELD(relationShardList); + READ_NODE_FIELD(relationRowLockList); READ_NODE_FIELD(rowValuesLists); READ_DONE(); diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index 969140abd..1b9d20bff 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -49,6 +49,7 @@ extern READFUNC_RET ReadShardInterval(READFUNC_ARGS); extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS); extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS); extern READFUNC_RET ReadRelationShard(READFUNC_ARGS); +extern READFUNC_RET ReadRelationRowLock(READFUNC_ARGS); extern READFUNC_RET ReadTask(READFUNC_ARGS); extern READFUNC_RET ReadTaskExecution(READFUNC_ARGS); extern READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS); @@ -63,6 +64,7 @@ extern void OutShardInterval(OUTFUNC_ARGS); extern void OutMapMergeJob(OUTFUNC_ARGS); extern void OutShardPlacement(OUTFUNC_ARGS); extern void OutRelationShard(OUTFUNC_ARGS); +extern void OutRelationRowLock(OUTFUNC_ARGS); extern void OutTask(OUTFUNC_ARGS); extern void OutTaskExecution(OUTFUNC_ARGS); extern void OutDeferredErrorMessage(OUTFUNC_ARGS); @@ -87,6 +89,7 @@ extern void CopyNodeMapMergeJob(COPYFUNC_ARGS); extern void CopyNodeShardPlacement(COPYFUNC_ARGS); extern void CopyNodeGroupShardPlacement(COPYFUNC_ARGS); extern void CopyNodeRelationShard(COPYFUNC_ARGS); +extern void CopyNodeRelationRowLock(COPYFUNC_ARGS); extern void CopyNodeTask(COPYFUNC_ARGS); extern void CopyNodeTaskExecution(COPYFUNC_ARGS); extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index 5a579e3aa..49ece0775 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -62,6 +62,7 @@ typedef enum CitusNodeTag T_ShardInterval, T_ShardPlacement, T_RelationShard, + T_RelationRowLock, T_DeferredErrorMessage, T_GroupShardPlacement } CitusNodeTag; diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index cad9d9451..f18934e28 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -73,6 +73,13 @@ typedef struct RelationShard uint64 shardId; } RelationShard; +typedef struct RelationRowLock +{ + CitusNode type; + Oid relationId; + LockClauseStrength rowLockStrength; +} RelationRowLock; + extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 92297d0ed..b01ff563c 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -186,6 +186,7 @@ typedef struct Task bool upsertQuery; /* only applies to modify tasks */ char replicationModel; /* only applies to modify tasks */ + List *relationRowLockList; bool modifyWithSubquery; List *relationShardList; diff --git a/src/test/regress/expected/isolation_select_for_update.out b/src/test/regress/expected/isolation_select_for_update.out new file mode 100644 index 000000000..0587b37ea --- /dev/null +++ b/src/test/regress/expected/isolation_select_for_update.out @@ -0,0 +1,376 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s1-select-from-t1-t2-for-update s2-begin s2-update-t1 s1-finish s2-finish +step s1-begin: + BEGIN; + +step s1-select-from-t1-t2-for-update: + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_2_rf1 as tt2 on tt1.id = tt2.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; + +id val_1 id val_1 + +1 2 1 2 +step s2-begin: + BEGIN; + +step s2-update-t1: + UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1; + +step s1-finish: + COMMIT; + +step s2-update-t1: <... completed> +step s2-finish: + COMMIT; + +restore_isolation_tester_func + + + +starting permutation: s1-begin s1-select-from-t1-t2-for-share s2-begin s2-delete-t1 s1-finish s2-finish +step s1-begin: + BEGIN; + +step s1-select-from-t1-t2-for-share: + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_2_rf1 as tt2 on tt1.id = tt2.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR SHARE; + +id val_1 id val_1 + +1 2 1 2 +step s2-begin: + BEGIN; + +step s2-delete-t1: + DELETE FROM test_table_1_rf1 WHERE id = 1; + +step s1-finish: + COMMIT; + +step s2-delete-t1: <... completed> +step s2-finish: + COMMIT; + +restore_isolation_tester_func + + + +starting permutation: s1-begin s1-select-from-t1-rt-for-update s2-begin s2-update-t1 s1-finish s2-finish +step s1-begin: + BEGIN; + +step s1-select-from-t1-rt-for-update: + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; + +id val_1 id val_1 + +1 2 1 2 +step s2-begin: + BEGIN; + +step s2-update-t1: + UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1; + +step s1-finish: + COMMIT; + +step s2-update-t1: <... completed> +step s2-finish: + COMMIT; + +restore_isolation_tester_func + + + +starting permutation: s1-begin s1-select-from-t1-rt-with-lc-for-update s2-begin s2-update-rt s1-finish s2-finish +step s1-begin: + BEGIN; + +step s1-select-from-t1-rt-with-lc-for-update: + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE + OF rt1; + +id val_1 id val_1 + +1 2 1 2 +step s2-begin: + BEGIN; + +step s2-update-rt: + UPDATE ref_table SET val_1 = 5 WHERE id = 1; + +step s1-finish: + COMMIT; + +step s2-update-rt: <... completed> +step s2-finish: + COMMIT; + +restore_isolation_tester_func + + + +starting permutation: s1-begin s1-select-from-t1-rt-with-lc-for-update s2-begin s2-update-t1 s1-finish s2-finish +step s1-begin: + BEGIN; + +step s1-select-from-t1-rt-with-lc-for-update: + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE + OF rt1; + +id val_1 id val_1 + +1 2 1 2 +step s2-begin: + BEGIN; + +step s2-update-t1: + UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1; + +step s1-finish: + COMMIT; + +step s2-finish: + COMMIT; + +restore_isolation_tester_func + + + +starting permutation: s1-begin s1-select-from-t1-t2-for-share s2-begin s2-select-from-t1-t2-for-share s1-finish s2-finish +step s1-begin: + BEGIN; + +step s1-select-from-t1-t2-for-share: + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_2_rf1 as tt2 on tt1.id = tt2.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR SHARE; + +id val_1 id val_1 + +1 2 1 2 +step s2-begin: + BEGIN; + +step s2-select-from-t1-t2-for-share: + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR SHARE; + +id val_1 id val_1 + +1 2 1 2 +step s1-finish: + COMMIT; + +step s2-finish: + COMMIT; + +restore_isolation_tester_func + + + +starting permutation: s1-begin s1-select-from-t1-rt-for-update s2-begin s2-select-from-t1-t2-for-update s1-finish s2-finish +step s1-begin: + BEGIN; + +step s1-select-from-t1-rt-for-update: + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; + +id val_1 id val_1 + +1 2 1 2 +step s2-begin: + BEGIN; + +step s2-select-from-t1-t2-for-update: + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; + +step s1-finish: + COMMIT; + +step s2-select-from-t1-t2-for-update: <... completed> +id val_1 id val_1 + +1 2 1 2 +step s2-finish: + COMMIT; + +restore_isolation_tester_func + + + +starting permutation: s1-begin s1-select-from-t1-within-cte s2-begin s2-select-from-t1-t2-for-update s1-finish s2-finish +step s1-begin: + BEGIN; + +step s1-select-from-t1-within-cte: + WITH first_value AS ( SELECT val_1 FROM test_table_1_rf1 WHERE id = 1 FOR UPDATE) + SELECT * FROM first_value; + +val_1 + +2 +step s2-begin: + BEGIN; + +step s2-select-from-t1-t2-for-update: + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; + +step s1-finish: + COMMIT; + +step s2-select-from-t1-t2-for-update: <... completed> +id val_1 id val_1 + +1 2 1 2 +step s2-finish: + COMMIT; + +restore_isolation_tester_func + + + +starting permutation: s1-begin s1-select-from-t1-within-cte s2-begin s2-update-t1 s1-finish s2-finish +step s1-begin: + BEGIN; + +step s1-select-from-t1-within-cte: + WITH first_value AS ( SELECT val_1 FROM test_table_1_rf1 WHERE id = 1 FOR UPDATE) + SELECT * FROM first_value; + +val_1 + +2 +step s2-begin: + BEGIN; + +step s2-update-t1: + UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1; + +step s1-finish: + COMMIT; + +step s2-update-t1: <... completed> +step s2-finish: + COMMIT; + +restore_isolation_tester_func + + + +starting permutation: s1-begin s1-select-from-t1-with-subquery s2-begin s2-update-t1 s1-finish s2-finish +step s1-begin: + BEGIN; + +step s1-select-from-t1-with-subquery: + SELECT * FROM (SELECT * FROM test_table_1_rf1 FOR UPDATE) foo WHERE id = 1; + +id val_1 + +1 2 +step s2-begin: + BEGIN; + +step s2-update-t1: + UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1; + +step s1-finish: + COMMIT; + +step s2-update-t1: <... completed> +step s2-finish: + COMMIT; + +restore_isolation_tester_func + + + +starting permutation: s1-begin s1-select-from-rt-with-subquery s2-begin s2-update-rt s1-finish s2-finish +step s1-begin: + BEGIN; + +step s1-select-from-rt-with-subquery: + SELECT * FROM (SELECT * FROM ref_table FOR UPDATE) foo WHERE id = 1; + +id val_1 + +1 2 +step s2-begin: + BEGIN; + +step s2-update-rt: + UPDATE ref_table SET val_1 = 5 WHERE id = 1; + +step s1-finish: + COMMIT; + +step s2-update-rt: <... completed> +step s2-finish: + COMMIT; + +restore_isolation_tester_func + + + +starting permutation: s1-begin s1-select-from-t1-with-view s2-begin s2-update-t1 s1-finish s2-finish +step s1-begin: + BEGIN; + +step s1-select-from-t1-with-view: + SELECT * FROM test_1 WHERE id = 1 FOR UPDATE; + +id val_1 + +1 2 +step s2-begin: + BEGIN; + +step s2-update-t1: + UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1; + +step s1-finish: + COMMIT; + +step s2-update-t1: <... completed> +step s2-finish: + COMMIT; + +restore_isolation_tester_func + + diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index a39cf8b65..e985f05cc 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -1047,6 +1047,11 @@ UPDATE reference_summary_table SET average_value = average_query.average FROM ( SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1 ) average_query WHERE id = 1; +UPDATE reference_summary_table SET average_value = average_query.average_value FROM ( + SELECT average_value FROM summary_table WHERE id = 1 FOR UPDATE + ) average_query +WHERE id = 1; +ERROR: cannot perform select on a distributed table and modify a reference table UPDATE reference_summary_table SET (min_value, average_value) = (SELECT min(value), avg(value) FROM reference_raw_table WHERE id = 2) WHERE id = 2; diff --git a/src/test/regress/expected/multi_modifications_0.out b/src/test/regress/expected/multi_modifications_0.out index ca556dd1f..772c29609 100644 --- a/src/test/regress/expected/multi_modifications_0.out +++ b/src/test/regress/expected/multi_modifications_0.out @@ -1047,6 +1047,11 @@ UPDATE reference_summary_table SET average_value = average_query.average FROM ( SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1 ) average_query WHERE id = 1; +UPDATE reference_summary_table SET average_value = average_query.average_value FROM ( + SELECT average_value FROM summary_table WHERE id = 1 FOR UPDATE + ) average_query +WHERE id = 1; +ERROR: cannot perform select on a distributed table and modify a reference table UPDATE reference_summary_table SET (min_value, average_value) = (SELECT min(value), avg(value) FROM reference_raw_table WHERE id = 2) WHERE id = 2; diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index 5f61b5854..e7ad50b2f 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -930,6 +930,11 @@ SELECT * 41 | 1 | aznavour | 11814 (5 rows) +-- SELECT ... FOR UPDATE does not supported from MX nodes if it contains +-- reference table. +SELECT * FROM customer_mx FOR UPDATE; +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. -- not router plannable due to or SELECT * FROM articles_hash_mx diff --git a/src/test/regress/expected/multi_select_for_update.out b/src/test/regress/expected/multi_select_for_update.out new file mode 100644 index 000000000..6605f1b2c --- /dev/null +++ b/src/test/regress/expected/multi_select_for_update.out @@ -0,0 +1,185 @@ +-- +-- MULTI_SIZE_QUERIES +-- +-- Test checks whether size of distributed tables can be obtained with citus_table_size. +-- To find the relation size and total relation size citus_relation_size and +-- citus_total_relation_size are also tested. +SET citus.next_shard_id TO 1460000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1460000; +SET citus.shard_replication_factor to 1; +CREATE TABLE test_table_1_rf1(id int, val_1 int); +SELECT create_distributed_table('test_table_1_rf1','id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO test_table_1_rf1 values(1,2),(2,3),(3,4),(15,16); +CREATE TABLE test_table_2_rf1(id int, val_1 int); +SELECT create_distributed_table('test_table_2_rf1','id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO test_table_2_rf1 values(1,2),(2,3),(3,4); +CREATE TABLE ref_table(id int, val_1 int); +SELECT create_reference_table('ref_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO ref_table values(1,2),(3,4),(5,6); +CREATE TABLE ref_table_2(id int, val_1 int); +SELECT create_reference_table('ref_table_2'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO ref_table_2 values(3,4),(5,6),(8,9); +SET citus.shard_replication_factor to 2; +CREATE TABLE test_table_3_rf2(id int, val_1 int); +SELECT create_distributed_table('test_table_3_rf2','id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO test_table_3_rf2 values(1,2),(2,3),(3,4); +CREATE TABLE test_table_4_rf2(id int, val_1 int); +SELECT create_distributed_table('test_table_4_rf2','id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO test_table_4_rf2 values(1,2),(2,3),(3,4); +-- Hash tables with RF = 1 is supported for router planner queries +SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; + id | val_1 | id | val_1 +----+-------+----+------- + 1 | 2 | 1 | 2 +(1 row) + +-- May hav two different filters on partition column, if they resulted on the same shard +SELECT * FROM + test_table_1_rf1 as tt1 WHERE tt1.id = 1 OR tt1.id = 15 + ORDER BY 1 + FOR UPDATE; + id | val_1 +----+------- + 1 | 2 + 15 | 16 +(2 rows) + +-- Only router plannable queries are supported right now. +SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id + ORDER BY 1 + FOR UPDATE; +ERROR: could not run distributed query with FOR UPDATE/SHARE commands +HINT: Consider using an equality filter on the distributed table's partition column. +-- RF > 1 is not supported +SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_3_rf2 as tt3 on tt1.id = tt3.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; +ERROR: could not run distributed query with FOR UPDATE/SHARE commands +SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_3_rf2 as tt3 on tt1.id = tt3.id + ORDER BY 1 + FOR UPDATE; +ERROR: could not run distributed query with FOR UPDATE/SHARE commands +SELECT * FROM + test_table_3_rf2 as tt3 INNER JOIN test_table_4_rf2 as tt4 on tt3.id = tt4.id + WHERE tt3.id = 1 + ORDER BY 1 + FOR UPDATE; +ERROR: could not run distributed query with FOR UPDATE/SHARE commands +HINT: Consider using an equality filter on the distributed table's partition column. +-- We take executor shard lock for reference tables +SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; + id | val_1 | id | val_1 +----+-------+----+------- + 1 | 2 | 1 | 2 +(1 row) + +-- Lock type varies according to the strength of row level lock +SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR SHARE; + id | val_1 | id | val_1 +----+-------+----+------- + 1 | 2 | 1 | 2 +(1 row) + +-- You can choose which tables should be affected by locking clause +SELECT * FROM + ref_table as rt1 INNER JOIN ref_table_2 as rt2 on rt1.id = rt2.id + ORDER BY 1 + FOR UPDATE + OF rt1; + id | val_1 | id | val_1 +----+-------+----+------- + 3 | 4 | 3 | 4 + 5 | 6 | 5 | 6 +(2 rows) + +-- You can choose which tables should be affected by locking clause +SELECT * FROM + ref_table as rt1 INNER JOIN ref_table_2 as rt2 on rt1.id = rt2.id + ORDER BY 1 + FOR UPDATE + OF rt1 + NOWAIT; + id | val_1 | id | val_1 +----+-------+----+------- + 3 | 4 | 3 | 4 + 5 | 6 | 5 | 6 +(2 rows) + +-- queries with CTEs are supported +WITH first_value AS ( + SELECT val_1 FROM test_table_1_rf1 WHERE id = 1 FOR UPDATE) +SELECT * FROM first_value; + val_1 +------- + 2 +(1 row) + +-- queries with modifying CTEs are also supported +WITH update_table AS ( + UPDATE test_table_1_rf1 SET val_1 = 10 WHERE id = 1 RETURNING * +) +SELECT * FROM update_table FOR UPDATE; + id | val_1 +----+------- + 1 | 10 +(1 row) + +-- Subqueries also supported +SELECT * FROM (SELECT * FROM test_table_1_rf1 FOR UPDATE) foo WHERE id = 1; + id | val_1 +----+------- + 1 | 10 +(1 row) + +DROP TABLE test_table_1_rf1; +DROP TABLE test_table_2_rf1; +DROP TABLE ref_table; +DROP TABLE ref_table_2; +DROP TABLE test_table_3_rf2; +DROP TABLE test_table_4_rf2; diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 4a5ab1392..b19551a3d 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -38,6 +38,7 @@ test: isolation_insert_vs_all test: isolation_insert_select_vs_all test: isolation_upsert_vs_all test: isolation_update_vs_all +test: isolation_select_for_update test: isolation_delete_vs_all test: isolation_truncate_vs_all test: isolation_drop_vs_all diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 89ee73b47..af0de31f8 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -58,7 +58,7 @@ test: multi_subquery_complex_reference_clause multi_subquery_window_functions mu test: multi_subquery_in_where_reference_clause test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql -test: multi_reference_table +test: multi_reference_table multi_select_for_update test: multi_average_expression multi_working_columns multi_having_pushdown test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg diff --git a/src/test/regress/specs/isolation_select_for_update.spec b/src/test/regress/specs/isolation_select_for_update.spec new file mode 100644 index 000000000..517259cb7 --- /dev/null +++ b/src/test/regress/specs/isolation_select_for_update.spec @@ -0,0 +1,159 @@ + +setup +{ + SELECT citus.replace_isolation_tester_func(); + SELECT citus.refresh_isolation_tester_prepared_statement(); + + SET citus.shard_replication_factor to 1; + + CREATE TABLE test_table_1_rf1(id int, val_1 int); + SELECT create_distributed_table('test_table_1_rf1','id'); + INSERT INTO test_table_1_rf1 values(1,2),(2,3),(3,4); + + CREATE VIEW test_1 AS SELECT * FROM test_table_1_rf1 WHERE val_1 = 2; + + CREATE TABLE test_table_2_rf1(id int, val_1 int); + SELECT create_distributed_table('test_table_2_rf1','id'); + INSERT INTO test_table_2_rf1 values(1,2),(2,3),(3,4); + + CREATE TABLE ref_table(id int, val_1 int); + SELECT create_reference_table('ref_table'); + INSERT INTO ref_table values(1,2),(3,4),(5,6); +} + +teardown +{ + DROP TABLE test_table_1_rf1 CASCADE; + DROP TABLE test_table_2_rf1; + DROP TABLE ref_table; + + SELECT citus.restore_isolation_tester_func(); +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-select-from-t1-t2-for-update" +{ + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_2_rf1 as tt2 on tt1.id = tt2.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; +} + +step "s1-select-from-t1-t2-for-share" +{ + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_2_rf1 as tt2 on tt1.id = tt2.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR SHARE; +} + +step "s1-select-from-t1-rt-for-update" +{ + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; +} + +step "s1-select-from-t1-rt-with-lc-for-update" +{ + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE + OF rt1; +} + +step "s1-select-from-t1-within-cte" +{ + WITH first_value AS ( SELECT val_1 FROM test_table_1_rf1 WHERE id = 1 FOR UPDATE) + SELECT * FROM first_value; +} + +step "s1-select-from-t1-with-subquery" +{ + SELECT * FROM (SELECT * FROM test_table_1_rf1 FOR UPDATE) foo WHERE id = 1; +} + +step "s1-select-from-rt-with-subquery" +{ + SELECT * FROM (SELECT * FROM ref_table FOR UPDATE) foo WHERE id = 1; +} + +step "s1-select-from-t1-with-view" +{ + SELECT * FROM test_1 WHERE id = 1 FOR UPDATE; +} + +step "s1-finish" +{ + COMMIT; +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-update-t1" +{ + UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1; +} + +step "s2-update-rt" +{ + UPDATE ref_table SET val_1 = 5 WHERE id = 1; +} + +step "s2-delete-t1" +{ + DELETE FROM test_table_1_rf1 WHERE id = 1; +} + +step "s2-select-from-t1-t2-for-share" +{ + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR SHARE; +} + +step "s2-select-from-t1-t2-for-update" +{ + SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; +} + +step "s2-finish" +{ + COMMIT; +} + +permutation "s1-begin" "s1-select-from-t1-t2-for-update" "s2-begin" "s2-update-t1" "s1-finish" "s2-finish" +permutation "s1-begin" "s1-select-from-t1-t2-for-share" "s2-begin" "s2-delete-t1" "s1-finish" "s2-finish" +permutation "s1-begin" "s1-select-from-t1-rt-for-update" "s2-begin" "s2-update-t1" "s1-finish" "s2-finish" +permutation "s1-begin" "s1-select-from-t1-rt-with-lc-for-update" "s2-begin" "s2-update-rt" "s1-finish" "s2-finish" +permutation "s1-begin" "s1-select-from-t1-rt-with-lc-for-update" "s2-begin" "s2-update-t1" "s1-finish" "s2-finish" +permutation "s1-begin" "s1-select-from-t1-t2-for-share" "s2-begin" "s2-select-from-t1-t2-for-share" "s1-finish" "s2-finish" +permutation "s1-begin" "s1-select-from-t1-rt-for-update" "s2-begin" "s2-select-from-t1-t2-for-update" "s1-finish" "s2-finish" +permutation "s1-begin" "s1-select-from-t1-within-cte" "s2-begin" "s2-select-from-t1-t2-for-update" "s1-finish" "s2-finish" +permutation "s1-begin" "s1-select-from-t1-within-cte" "s2-begin" "s2-update-t1" "s1-finish" "s2-finish" +permutation "s1-begin" "s1-select-from-t1-with-subquery" "s2-begin" "s2-update-t1" "s1-finish" "s2-finish" +permutation "s1-begin" "s1-select-from-rt-with-subquery" "s2-begin" "s2-update-rt" "s1-finish" "s2-finish" +permutation "s1-begin" "s1-select-from-t1-with-view" "s2-begin" "s2-update-t1" "s1-finish" "s2-finish" diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 99cf36022..e01437b0c 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -653,6 +653,11 @@ UPDATE reference_summary_table SET average_value = average_query.average FROM ( ) average_query WHERE id = 1; +UPDATE reference_summary_table SET average_value = average_query.average_value FROM ( + SELECT average_value FROM summary_table WHERE id = 1 FOR UPDATE + ) average_query +WHERE id = 1; + UPDATE reference_summary_table SET (min_value, average_value) = (SELECT min(value), avg(value) FROM reference_raw_table WHERE id = 2) WHERE id = 2; diff --git a/src/test/regress/sql/multi_mx_router_planner.sql b/src/test/regress/sql/multi_mx_router_planner.sql index 82620d7b1..8da234cc1 100644 --- a/src/test/regress/sql/multi_mx_router_planner.sql +++ b/src/test/regress/sql/multi_mx_router_planner.sql @@ -62,8 +62,6 @@ INSERT INTO articles_hash_mx VALUES (48, 8, 'alkylic', 18610); INSERT INTO articles_hash_mx VALUES (49, 9, 'anyone', 2681); INSERT INTO articles_hash_mx VALUES (50, 10, 'anjanette', 19519); - - SET citus.task_executor_type TO 'real-time'; SET client_min_messages TO 'DEBUG2'; @@ -409,6 +407,10 @@ SELECT * FROM articles_hash_mx WHERE author_id = (random()::int * 0 + 1); +-- SELECT ... FOR UPDATE does not supported from MX nodes if it contains +-- reference table. +SELECT * FROM customer_mx FOR UPDATE; + -- not router plannable due to or SELECT * FROM articles_hash_mx diff --git a/src/test/regress/sql/multi_select_for_update.sql b/src/test/regress/sql/multi_select_for_update.sql new file mode 100644 index 000000000..26fd14bb7 --- /dev/null +++ b/src/test/regress/sql/multi_select_for_update.sql @@ -0,0 +1,125 @@ +-- +-- MULTI_SIZE_QUERIES +-- +-- Test checks whether size of distributed tables can be obtained with citus_table_size. +-- To find the relation size and total relation size citus_relation_size and +-- citus_total_relation_size are also tested. + +SET citus.next_shard_id TO 1460000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1460000; + +SET citus.shard_replication_factor to 1; + +CREATE TABLE test_table_1_rf1(id int, val_1 int); +SELECT create_distributed_table('test_table_1_rf1','id'); +INSERT INTO test_table_1_rf1 values(1,2),(2,3),(3,4),(15,16); + +CREATE TABLE test_table_2_rf1(id int, val_1 int); +SELECT create_distributed_table('test_table_2_rf1','id'); +INSERT INTO test_table_2_rf1 values(1,2),(2,3),(3,4); + +CREATE TABLE ref_table(id int, val_1 int); +SELECT create_reference_table('ref_table'); +INSERT INTO ref_table values(1,2),(3,4),(5,6); + +CREATE TABLE ref_table_2(id int, val_1 int); +SELECT create_reference_table('ref_table_2'); +INSERT INTO ref_table_2 values(3,4),(5,6),(8,9); + +SET citus.shard_replication_factor to 2; + +CREATE TABLE test_table_3_rf2(id int, val_1 int); +SELECT create_distributed_table('test_table_3_rf2','id'); +INSERT INTO test_table_3_rf2 values(1,2),(2,3),(3,4); + +CREATE TABLE test_table_4_rf2(id int, val_1 int); +SELECT create_distributed_table('test_table_4_rf2','id'); +INSERT INTO test_table_4_rf2 values(1,2),(2,3),(3,4); + +-- Hash tables with RF = 1 is supported for router planner queries +SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; + +-- May hav two different filters on partition column, if they resulted on the same shard +SELECT * FROM + test_table_1_rf1 as tt1 WHERE tt1.id = 1 OR tt1.id = 15 + ORDER BY 1 + FOR UPDATE; + +-- Only router plannable queries are supported right now. +SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id + ORDER BY 1 + FOR UPDATE; + +-- RF > 1 is not supported +SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_3_rf2 as tt3 on tt1.id = tt3.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; + +SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN test_table_3_rf2 as tt3 on tt1.id = tt3.id + ORDER BY 1 + FOR UPDATE; + +SELECT * FROM + test_table_3_rf2 as tt3 INNER JOIN test_table_4_rf2 as tt4 on tt3.id = tt4.id + WHERE tt3.id = 1 + ORDER BY 1 + FOR UPDATE; + +-- We take executor shard lock for reference tables +SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR UPDATE; + +-- Lock type varies according to the strength of row level lock +SELECT * FROM + test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id + WHERE tt1.id = 1 + ORDER BY 1 + FOR SHARE; + +-- You can choose which tables should be affected by locking clause +SELECT * FROM + ref_table as rt1 INNER JOIN ref_table_2 as rt2 on rt1.id = rt2.id + ORDER BY 1 + FOR UPDATE + OF rt1; + +-- You can choose which tables should be affected by locking clause +SELECT * FROM + ref_table as rt1 INNER JOIN ref_table_2 as rt2 on rt1.id = rt2.id + ORDER BY 1 + FOR UPDATE + OF rt1 + NOWAIT; + +-- queries with CTEs are supported +WITH first_value AS ( + SELECT val_1 FROM test_table_1_rf1 WHERE id = 1 FOR UPDATE) +SELECT * FROM first_value; + +-- queries with modifying CTEs are also supported +WITH update_table AS ( + UPDATE test_table_1_rf1 SET val_1 = 10 WHERE id = 1 RETURNING * +) +SELECT * FROM update_table FOR UPDATE; + +-- Subqueries also supported +SELECT * FROM (SELECT * FROM test_table_1_rf1 FOR UPDATE) foo WHERE id = 1; + + +DROP TABLE test_table_1_rf1; +DROP TABLE test_table_2_rf1; +DROP TABLE ref_table; +DROP TABLE ref_table_2; +DROP TABLE test_table_3_rf2; +DROP TABLE test_table_4_rf2;