mirror of https://github.com/citusdata/citus.git
Merge pull request #2197 from citusdata/select_update_hash
Adds SELECT ... FOR UPDATE support for router plannable queriespull/2222/head
commit
82829dfdc9
|
@ -199,7 +199,15 @@ RouterCreateScan(CustomScan *scan)
|
|||
/* check whether query has at most one shard */
|
||||
if (list_length(taskList) <= 1)
|
||||
{
|
||||
if (isModificationQuery)
|
||||
List *relationRowLockList = NIL;
|
||||
if (list_length(taskList) == 1)
|
||||
{
|
||||
Task *task = (Task *) linitial(taskList);
|
||||
relationRowLockList = task->relationRowLockList;
|
||||
}
|
||||
|
||||
/* if query is SELECT ... FOR UPDATE query, use modify logic */
|
||||
if (isModificationQuery || relationRowLockList != NIL)
|
||||
{
|
||||
scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods;
|
||||
}
|
||||
|
|
|
@ -151,7 +151,7 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
|
|||
LOCKMODE lockMode = NoLock;
|
||||
int64 shardId = task->anchorShardId;
|
||||
|
||||
if (commandType == CMD_SELECT || list_length(task->taskPlacementList) == 1)
|
||||
if (commandType == CMD_SELECT)
|
||||
{
|
||||
/*
|
||||
* The executor shard lock is used to maintain consistency between
|
||||
|
@ -161,6 +161,28 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
|
|||
|
||||
lockMode = NoLock;
|
||||
}
|
||||
else if (list_length(task->taskPlacementList) == 1)
|
||||
{
|
||||
if (task->replicationModel == REPLICATION_MODEL_2PC)
|
||||
{
|
||||
/*
|
||||
* While we don't need a lock to ensure writes are applied in
|
||||
* a consistent order when there is a single replica. We also use
|
||||
* shard resource locks as a crude implementation of SELECT..
|
||||
* FOR UPDATE on reference tables, so we should always take
|
||||
* a lock that conflicts with the FOR UPDATE/SHARE locks.
|
||||
*/
|
||||
lockMode = RowExclusiveLock;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* When there is no replication, the worker itself can decide on
|
||||
* on the order in which writes are applied.
|
||||
*/
|
||||
lockMode = NoLock;
|
||||
}
|
||||
}
|
||||
else if (AllModificationsCommutative)
|
||||
{
|
||||
/*
|
||||
|
@ -224,6 +246,55 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
|
|||
LockShardResource(shardId, lockMode);
|
||||
}
|
||||
|
||||
/*
|
||||
* If lock clause exists and it effects any reference table, we need to get
|
||||
* lock on shard resource. Type of lock is determined by the type of row lock
|
||||
* given in the query. If the type of row lock is either FOR NO KEY UPDATE or
|
||||
* FOR UPDATE we get ExclusiveLock on shard resource. We get ShareLock if it
|
||||
* is FOR KEY SHARE or FOR KEY SHARE.
|
||||
*
|
||||
* We have selected these lock types according to conflict table given in the
|
||||
* Postgres documentation. It is given that FOR UPDATE and FOR NO KEY UPDATE
|
||||
* must be conflict with each other modify command. By getting ExlcusiveLock
|
||||
* we guarantee that. Note that, getting ExlusiveLock does not mimic the
|
||||
* behaviour of Postgres exactly. Getting row lock with FOR NO KEY UPDATE and
|
||||
* FOR KEY SHARE do not conflicts in Postgres, yet they block each other in
|
||||
* our implementation. Since FOR SHARE and FOR KEY SHARE does not conflict
|
||||
* with each other but conflicts with modify commands, we get ShareLock for
|
||||
* them.
|
||||
*/
|
||||
if (task->relationRowLockList != NIL)
|
||||
{
|
||||
ListCell *rtiLockCell = NULL;
|
||||
LOCKMODE rowLockMode = NoLock;
|
||||
|
||||
foreach(rtiLockCell, task->relationRowLockList)
|
||||
{
|
||||
RelationRowLock *relationRowLock = (RelationRowLock *) lfirst(rtiLockCell);
|
||||
LockClauseStrength rowLockStrength = relationRowLock->rowLockStrength;
|
||||
Oid relationId = relationRowLock->relationId;
|
||||
|
||||
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||
ShardInterval *referenceTableShardInterval = (ShardInterval *) linitial(
|
||||
shardIntervalList);
|
||||
|
||||
if (rowLockStrength == LCS_FORKEYSHARE || rowLockStrength == LCS_FORSHARE)
|
||||
{
|
||||
rowLockMode = ShareLock;
|
||||
}
|
||||
else if (rowLockStrength == LCS_FORNOKEYUPDATE || rowLockStrength ==
|
||||
LCS_FORUPDATE)
|
||||
{
|
||||
rowLockMode = ExclusiveLock;
|
||||
}
|
||||
|
||||
LockShardResource(referenceTableShardInterval->shardId, rowLockMode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* If the task has a subselect, then we may need to lock the shards from which
|
||||
* the query selects as well to prevent the subselects from seeing different
|
||||
|
@ -462,6 +533,7 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
|
|||
/*
|
||||
* RouterSequentialModifyExecScan executes 0 or more modifications on a
|
||||
* distributed table sequentially and returns results if there are any.
|
||||
* Note that we also use this path for SELECT ... FOR UPDATE queries.
|
||||
*/
|
||||
TupleTableSlot *
|
||||
RouterSequentialModifyExecScan(CustomScanState *node)
|
||||
|
@ -522,9 +594,13 @@ RouterSequentialModifyExecScan(CustomScanState *node)
|
|||
{
|
||||
Task *task = (Task *) lfirst(taskCell);
|
||||
|
||||
/*
|
||||
* Result is expected for SELECT ... FOR UPDATE queries as well.
|
||||
*/
|
||||
executorState->es_processed +=
|
||||
ExecuteSingleModifyTask(scanState, task, operation,
|
||||
alwaysThrowErrorOnFailure, hasReturning);
|
||||
alwaysThrowErrorOnFailure,
|
||||
hasReturning || task->relationRowLockList != NIL);
|
||||
}
|
||||
|
||||
scanState->finishedRemoteScan = true;
|
||||
|
|
|
@ -139,9 +139,8 @@ static List * ExtractInsertValuesList(Query *query, Var *partitionColumn);
|
|||
static bool MultiRouterPlannableQuery(Query *query,
|
||||
RelationRestrictionContext *restrictionContext);
|
||||
static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);
|
||||
static RangeTblEntry * GetUpdateOrDeleteRTE(List *rangeTableList);
|
||||
static bool UpdateOrDeleteRTE(RangeTblEntry *rangeTableEntry);
|
||||
static bool SelectsFromDistributedTable(List *rangeTableList);
|
||||
static RangeTblEntry * GetUpdateOrDeleteRTE(Query *query);
|
||||
static bool SelectsFromDistributedTable(List *rangeTableList, Query *query);
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
static List * get_all_actual_clauses(List *restrictinfo_list);
|
||||
#endif
|
||||
|
@ -150,6 +149,7 @@ static int CompareInsertValuesByShardId(const void *leftElement,
|
|||
static uint64 GetInitialShardId(List *relationShardList);
|
||||
static List * SingleShardSelectTaskList(Query *query, List *relationShardList,
|
||||
List *placementList, uint64 shardId);
|
||||
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
|
||||
static List * SingleShardModifyTaskList(Query *query, List *relationShardList,
|
||||
List *placementList, uint64 shardId);
|
||||
|
||||
|
@ -1490,6 +1490,7 @@ CreateTask(TaskType taskType)
|
|||
task->taskExecution = NULL;
|
||||
task->upsertQuery = false;
|
||||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||
task->relationRowLockList = NIL;
|
||||
|
||||
task->modifyWithSubquery = false;
|
||||
task->relationShardList = NIL;
|
||||
|
@ -1540,7 +1541,6 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
|||
uint64 shardId = INVALID_SHARD_ID;
|
||||
List *placementList = NIL;
|
||||
List *relationShardList = NIL;
|
||||
List *rangeTableList = NIL;
|
||||
bool replacePrunedQueryWithDummy = false;
|
||||
bool requiresMasterEvaluation = false;
|
||||
RangeTblEntry *updateOrDeleteRTE = NULL;
|
||||
|
@ -1566,8 +1566,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
|||
job = CreateJob(originalQuery);
|
||||
job->partitionValueConst = partitionValueConst;
|
||||
|
||||
ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList);
|
||||
updateOrDeleteRTE = GetUpdateOrDeleteRTE(rangeTableList);
|
||||
updateOrDeleteRTE = GetUpdateOrDeleteRTE(originalQuery);
|
||||
|
||||
/*
|
||||
* If all of the shards are pruned, we replace the relation RTE into
|
||||
|
@ -1616,18 +1615,62 @@ SingleShardSelectTaskList(Query *query, List *relationShardList, List *placement
|
|||
{
|
||||
Task *task = CreateTask(ROUTER_TASK);
|
||||
StringInfo queryString = makeStringInfo();
|
||||
List *relationRowLockList = NIL;
|
||||
|
||||
RowLocksOnRelations((Node *) query, &relationRowLockList);
|
||||
pg_get_query_def(query, queryString);
|
||||
|
||||
task->queryString = queryString->data;
|
||||
task->anchorShardId = shardId;
|
||||
task->taskPlacementList = placementList;
|
||||
task->relationShardList = relationShardList;
|
||||
task->relationRowLockList = relationRowLockList;
|
||||
|
||||
return list_make1(task);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RowLocksOnRelations forms the list for range table IDs and corresponding
|
||||
* row lock modes.
|
||||
*/
|
||||
static bool
|
||||
RowLocksOnRelations(Node *node, List **relationRowLockList)
|
||||
{
|
||||
if (node == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (IsA(node, Query))
|
||||
{
|
||||
Query *query = (Query *) node;
|
||||
ListCell *rowMarkCell = NULL;
|
||||
|
||||
foreach(rowMarkCell, query->rowMarks)
|
||||
{
|
||||
RowMarkClause *rowMarkClause = (RowMarkClause *) lfirst(rowMarkCell);
|
||||
RangeTblEntry *rangeTable = rt_fetch(rowMarkClause->rti, query->rtable);
|
||||
Oid relationId = rangeTable->relid;
|
||||
|
||||
if (IsDistributedTable(relationId))
|
||||
{
|
||||
RelationRowLock *relationRowLock = CitusMakeNode(RelationRowLock);
|
||||
relationRowLock->relationId = relationId;
|
||||
relationRowLock->rowLockStrength = rowMarkClause->strength;
|
||||
*relationRowLockList = lappend(*relationRowLockList, relationRowLock);
|
||||
}
|
||||
}
|
||||
|
||||
return query_tree_walker(query, RowLocksOnRelations, relationRowLockList, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
return expression_tree_walker(node, RowLocksOnRelations, relationRowLockList);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SingleShardModifyTaskList generates a task for single shard update/delete query
|
||||
* and returns it as a list.
|
||||
|
@ -1644,13 +1687,13 @@ SingleShardModifyTaskList(Query *query, List *relationShardList, List *placement
|
|||
RangeTblEntry *updateOrDeleteRTE = NULL;
|
||||
|
||||
ExtractRangeTableEntryWalker((Node *) query, &rangeTableList);
|
||||
updateOrDeleteRTE = GetUpdateOrDeleteRTE(rangeTableList);
|
||||
updateOrDeleteRTE = GetUpdateOrDeleteRTE(query);
|
||||
|
||||
modificationTableCacheEntry = DistributedTableCacheEntry(updateOrDeleteRTE->relid);
|
||||
modificationPartitionMethod = modificationTableCacheEntry->partitionMethod;
|
||||
|
||||
if (modificationPartitionMethod == DISTRIBUTE_BY_NONE &&
|
||||
SelectsFromDistributedTable(rangeTableList))
|
||||
SelectsFromDistributedTable(rangeTableList, query))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot perform select on a distributed table "
|
||||
|
@ -1670,55 +1713,37 @@ SingleShardModifyTaskList(Query *query, List *relationShardList, List *placement
|
|||
|
||||
|
||||
/*
|
||||
* GetUpdateOrDeleteRTE walks over the given range table list, and checks if
|
||||
* it has an UPDATE or DELETE RTE. If it finds one, it return it immediately.
|
||||
* GetUpdateOrDeleteRTE checks query if it has an UPDATE or DELETE RTE. If it finds
|
||||
* it returns it.
|
||||
*/
|
||||
static RangeTblEntry *
|
||||
GetUpdateOrDeleteRTE(List *rangeTableList)
|
||||
GetUpdateOrDeleteRTE(Query *query)
|
||||
{
|
||||
ListCell *rangeTableCell = NULL;
|
||||
|
||||
foreach(rangeTableCell, rangeTableList)
|
||||
if (query->resultRelation > 0)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||
|
||||
if (UpdateOrDeleteRTE(rangeTableEntry))
|
||||
{
|
||||
return rangeTableEntry;
|
||||
}
|
||||
return rt_fetch(query->resultRelation, query->rtable);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UpdateOrDeleteRTE checks if the given range table entry is an UPDATE or
|
||||
* DELETE RTE by checking required permissions on it.
|
||||
*/
|
||||
static bool
|
||||
UpdateOrDeleteRTE(RangeTblEntry *rangeTableEntry)
|
||||
{
|
||||
if ((ACL_UPDATE & rangeTableEntry->requiredPerms) ||
|
||||
(ACL_DELETE & rangeTableEntry->requiredPerms))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SelectsFromDistributedTable checks if there is a select on a distributed
|
||||
* table by looking into range table entries.
|
||||
*/
|
||||
static bool
|
||||
SelectsFromDistributedTable(List *rangeTableList)
|
||||
SelectsFromDistributedTable(List *rangeTableList, Query *query)
|
||||
{
|
||||
ListCell *rangeTableCell = NULL;
|
||||
int resultRelation = query->resultRelation;
|
||||
RangeTblEntry *resultRangeTableEntry = NULL;
|
||||
|
||||
if (resultRelation > 0)
|
||||
{
|
||||
resultRangeTableEntry = rt_fetch(resultRelation, query->rtable);
|
||||
}
|
||||
|
||||
foreach(rangeTableCell, rangeTableList)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||
|
@ -1731,7 +1756,8 @@ SelectsFromDistributedTable(List *rangeTableList)
|
|||
|
||||
cacheEntry = DistributedTableCacheEntry(rangeTableEntry->relid);
|
||||
if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE &&
|
||||
!UpdateOrDeleteRTE(rangeTableEntry))
|
||||
(resultRangeTableEntry == NULL || resultRangeTableEntry->relid !=
|
||||
rangeTableEntry->relid))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
@ -2740,11 +2766,6 @@ MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionC
|
|||
return false;
|
||||
}
|
||||
|
||||
if (query->hasForUpdate)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
foreach(relationRestrictionContextCell, restrictionContext->relationRestrictionList)
|
||||
{
|
||||
RelationRestriction *relationRestriction =
|
||||
|
@ -2768,6 +2789,28 @@ MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionC
|
|||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Currently, we don't support tables with replication factor > 1,
|
||||
* except reference tables with SELECT ... FOR UDPATE queries. It is
|
||||
* also not supported from MX nodes.
|
||||
*/
|
||||
if (query->hasForUpdate)
|
||||
{
|
||||
uint32 tableReplicationFactor = TableShardReplicationFactor(
|
||||
distributedTableId);
|
||||
|
||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
EnsureCoordinator();
|
||||
}
|
||||
|
||||
|
||||
if (tableReplicationFactor > 1 && partitionMethod != DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -227,6 +227,16 @@ CopyNodeRelationShard(COPYFUNC_ARGS)
|
|||
}
|
||||
|
||||
|
||||
void
|
||||
CopyNodeRelationRowLock(COPYFUNC_ARGS)
|
||||
{
|
||||
DECLARE_FROM_AND_NEW_NODE(RelationRowLock);
|
||||
|
||||
COPY_SCALAR_FIELD(relationId);
|
||||
COPY_SCALAR_FIELD(rowLockStrength);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
CopyNodeTask(COPYFUNC_ARGS)
|
||||
{
|
||||
|
@ -248,6 +258,7 @@ CopyNodeTask(COPYFUNC_ARGS)
|
|||
COPY_SCALAR_FIELD(replicationModel);
|
||||
COPY_SCALAR_FIELD(modifyWithSubquery);
|
||||
COPY_NODE_FIELD(relationShardList);
|
||||
COPY_NODE_FIELD(relationRowLockList);
|
||||
COPY_NODE_FIELD(rowValuesLists);
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "distributed/errormessage.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/distributed_planner.h"
|
||||
#include "distributed/multi_router_planner.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
|
||||
static const char *CitusNodeTagNamesD[] = {
|
||||
|
@ -38,6 +39,7 @@ static const char *CitusNodeTagNamesD[] = {
|
|||
"ShardInterval",
|
||||
"ShardPlacement",
|
||||
"RelationShard",
|
||||
"RelationRowLock",
|
||||
"DeferredErrorMessage",
|
||||
"GroupShardPlacement"
|
||||
};
|
||||
|
@ -393,6 +395,7 @@ const ExtensibleNodeMethods nodeMethods[] =
|
|||
DEFINE_NODE_METHODS(MapMergeJob),
|
||||
DEFINE_NODE_METHODS(ShardPlacement),
|
||||
DEFINE_NODE_METHODS(RelationShard),
|
||||
DEFINE_NODE_METHODS(RelationRowLock),
|
||||
DEFINE_NODE_METHODS(Task),
|
||||
DEFINE_NODE_METHODS(TaskExecution),
|
||||
DEFINE_NODE_METHODS(DeferredErrorMessage),
|
||||
|
|
|
@ -435,6 +435,17 @@ OutRelationShard(OUTFUNC_ARGS)
|
|||
}
|
||||
|
||||
|
||||
void
|
||||
OutRelationRowLock(OUTFUNC_ARGS)
|
||||
{
|
||||
WRITE_LOCALS(RelationRowLock);
|
||||
WRITE_NODE_TYPE("RELATIONROWLOCK");
|
||||
|
||||
WRITE_OID_FIELD(relationId);
|
||||
WRITE_ENUM_FIELD(rowLockStrength, LockClauseStrength);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
OutTask(OUTFUNC_ARGS)
|
||||
{
|
||||
|
@ -457,6 +468,7 @@ OutTask(OUTFUNC_ARGS)
|
|||
WRITE_CHAR_FIELD(replicationModel);
|
||||
WRITE_BOOL_FIELD(modifyWithSubquery);
|
||||
WRITE_NODE_FIELD(relationShardList);
|
||||
WRITE_NODE_FIELD(relationRowLockList);
|
||||
WRITE_NODE_FIELD(rowValuesLists);
|
||||
}
|
||||
|
||||
|
|
|
@ -349,6 +349,18 @@ ReadRelationShard(READFUNC_ARGS)
|
|||
}
|
||||
|
||||
|
||||
READFUNC_RET
|
||||
ReadRelationRowLock(READFUNC_ARGS)
|
||||
{
|
||||
READ_LOCALS(RelationRowLock);
|
||||
|
||||
READ_OID_FIELD(relationId);
|
||||
READ_ENUM_FIELD(rowLockStrength, LockClauseStrength);
|
||||
|
||||
READ_DONE();
|
||||
}
|
||||
|
||||
|
||||
READFUNC_RET
|
||||
ReadTask(READFUNC_ARGS)
|
||||
{
|
||||
|
@ -370,6 +382,7 @@ ReadTask(READFUNC_ARGS)
|
|||
READ_CHAR_FIELD(replicationModel);
|
||||
READ_BOOL_FIELD(modifyWithSubquery);
|
||||
READ_NODE_FIELD(relationShardList);
|
||||
READ_NODE_FIELD(relationRowLockList);
|
||||
READ_NODE_FIELD(rowValuesLists);
|
||||
|
||||
READ_DONE();
|
||||
|
|
|
@ -49,6 +49,7 @@ extern READFUNC_RET ReadShardInterval(READFUNC_ARGS);
|
|||
extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS);
|
||||
extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
|
||||
extern READFUNC_RET ReadRelationShard(READFUNC_ARGS);
|
||||
extern READFUNC_RET ReadRelationRowLock(READFUNC_ARGS);
|
||||
extern READFUNC_RET ReadTask(READFUNC_ARGS);
|
||||
extern READFUNC_RET ReadTaskExecution(READFUNC_ARGS);
|
||||
extern READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS);
|
||||
|
@ -63,6 +64,7 @@ extern void OutShardInterval(OUTFUNC_ARGS);
|
|||
extern void OutMapMergeJob(OUTFUNC_ARGS);
|
||||
extern void OutShardPlacement(OUTFUNC_ARGS);
|
||||
extern void OutRelationShard(OUTFUNC_ARGS);
|
||||
extern void OutRelationRowLock(OUTFUNC_ARGS);
|
||||
extern void OutTask(OUTFUNC_ARGS);
|
||||
extern void OutTaskExecution(OUTFUNC_ARGS);
|
||||
extern void OutDeferredErrorMessage(OUTFUNC_ARGS);
|
||||
|
@ -87,6 +89,7 @@ extern void CopyNodeMapMergeJob(COPYFUNC_ARGS);
|
|||
extern void CopyNodeShardPlacement(COPYFUNC_ARGS);
|
||||
extern void CopyNodeGroupShardPlacement(COPYFUNC_ARGS);
|
||||
extern void CopyNodeRelationShard(COPYFUNC_ARGS);
|
||||
extern void CopyNodeRelationRowLock(COPYFUNC_ARGS);
|
||||
extern void CopyNodeTask(COPYFUNC_ARGS);
|
||||
extern void CopyNodeTaskExecution(COPYFUNC_ARGS);
|
||||
extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS);
|
||||
|
|
|
@ -62,6 +62,7 @@ typedef enum CitusNodeTag
|
|||
T_ShardInterval,
|
||||
T_ShardPlacement,
|
||||
T_RelationShard,
|
||||
T_RelationRowLock,
|
||||
T_DeferredErrorMessage,
|
||||
T_GroupShardPlacement
|
||||
} CitusNodeTag;
|
||||
|
|
|
@ -73,6 +73,13 @@ typedef struct RelationShard
|
|||
uint64 shardId;
|
||||
} RelationShard;
|
||||
|
||||
typedef struct RelationRowLock
|
||||
{
|
||||
CitusNode type;
|
||||
Oid relationId;
|
||||
LockClauseStrength rowLockStrength;
|
||||
} RelationRowLock;
|
||||
|
||||
|
||||
extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions,
|
||||
ParamListInfo boundParams);
|
||||
|
|
|
@ -186,6 +186,7 @@ typedef struct Task
|
|||
bool upsertQuery; /* only applies to modify tasks */
|
||||
char replicationModel; /* only applies to modify tasks */
|
||||
|
||||
List *relationRowLockList;
|
||||
bool modifyWithSubquery;
|
||||
List *relationShardList;
|
||||
|
||||
|
|
|
@ -0,0 +1,376 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s1-select-from-t1-t2-for-update s2-begin s2-update-t1 s1-finish s2-finish
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-select-from-t1-t2-for-update:
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_2_rf1 as tt2 on tt1.id = tt2.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
|
||||
id val_1 id val_1
|
||||
|
||||
1 2 1 2
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-update-t1:
|
||||
UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1;
|
||||
<waiting ...>
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-update-t1: <... completed>
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select-from-t1-t2-for-share s2-begin s2-delete-t1 s1-finish s2-finish
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-select-from-t1-t2-for-share:
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_2_rf1 as tt2 on tt1.id = tt2.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR SHARE;
|
||||
|
||||
id val_1 id val_1
|
||||
|
||||
1 2 1 2
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-delete-t1:
|
||||
DELETE FROM test_table_1_rf1 WHERE id = 1;
|
||||
<waiting ...>
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-delete-t1: <... completed>
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select-from-t1-rt-for-update s2-begin s2-update-t1 s1-finish s2-finish
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-select-from-t1-rt-for-update:
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
|
||||
id val_1 id val_1
|
||||
|
||||
1 2 1 2
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-update-t1:
|
||||
UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1;
|
||||
<waiting ...>
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-update-t1: <... completed>
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select-from-t1-rt-with-lc-for-update s2-begin s2-update-rt s1-finish s2-finish
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-select-from-t1-rt-with-lc-for-update:
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE
|
||||
OF rt1;
|
||||
|
||||
id val_1 id val_1
|
||||
|
||||
1 2 1 2
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-update-rt:
|
||||
UPDATE ref_table SET val_1 = 5 WHERE id = 1;
|
||||
<waiting ...>
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-update-rt: <... completed>
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select-from-t1-rt-with-lc-for-update s2-begin s2-update-t1 s1-finish s2-finish
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-select-from-t1-rt-with-lc-for-update:
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE
|
||||
OF rt1;
|
||||
|
||||
id val_1 id val_1
|
||||
|
||||
1 2 1 2
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-update-t1:
|
||||
UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1;
|
||||
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select-from-t1-t2-for-share s2-begin s2-select-from-t1-t2-for-share s1-finish s2-finish
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-select-from-t1-t2-for-share:
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_2_rf1 as tt2 on tt1.id = tt2.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR SHARE;
|
||||
|
||||
id val_1 id val_1
|
||||
|
||||
1 2 1 2
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-select-from-t1-t2-for-share:
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR SHARE;
|
||||
|
||||
id val_1 id val_1
|
||||
|
||||
1 2 1 2
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select-from-t1-rt-for-update s2-begin s2-select-from-t1-t2-for-update s1-finish s2-finish
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-select-from-t1-rt-for-update:
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
|
||||
id val_1 id val_1
|
||||
|
||||
1 2 1 2
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-select-from-t1-t2-for-update:
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
<waiting ...>
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-select-from-t1-t2-for-update: <... completed>
|
||||
id val_1 id val_1
|
||||
|
||||
1 2 1 2
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select-from-t1-within-cte s2-begin s2-select-from-t1-t2-for-update s1-finish s2-finish
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-select-from-t1-within-cte:
|
||||
WITH first_value AS ( SELECT val_1 FROM test_table_1_rf1 WHERE id = 1 FOR UPDATE)
|
||||
SELECT * FROM first_value;
|
||||
|
||||
val_1
|
||||
|
||||
2
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-select-from-t1-t2-for-update:
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
<waiting ...>
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-select-from-t1-t2-for-update: <... completed>
|
||||
id val_1 id val_1
|
||||
|
||||
1 2 1 2
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select-from-t1-within-cte s2-begin s2-update-t1 s1-finish s2-finish
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-select-from-t1-within-cte:
|
||||
WITH first_value AS ( SELECT val_1 FROM test_table_1_rf1 WHERE id = 1 FOR UPDATE)
|
||||
SELECT * FROM first_value;
|
||||
|
||||
val_1
|
||||
|
||||
2
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-update-t1:
|
||||
UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1;
|
||||
<waiting ...>
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-update-t1: <... completed>
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select-from-t1-with-subquery s2-begin s2-update-t1 s1-finish s2-finish
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-select-from-t1-with-subquery:
|
||||
SELECT * FROM (SELECT * FROM test_table_1_rf1 FOR UPDATE) foo WHERE id = 1;
|
||||
|
||||
id val_1
|
||||
|
||||
1 2
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-update-t1:
|
||||
UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1;
|
||||
<waiting ...>
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-update-t1: <... completed>
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select-from-rt-with-subquery s2-begin s2-update-rt s1-finish s2-finish
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-select-from-rt-with-subquery:
|
||||
SELECT * FROM (SELECT * FROM ref_table FOR UPDATE) foo WHERE id = 1;
|
||||
|
||||
id val_1
|
||||
|
||||
1 2
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-update-rt:
|
||||
UPDATE ref_table SET val_1 = 5 WHERE id = 1;
|
||||
<waiting ...>
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-update-rt: <... completed>
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select-from-t1-with-view s2-begin s2-update-t1 s1-finish s2-finish
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-select-from-t1-with-view:
|
||||
SELECT * FROM test_1 WHERE id = 1 FOR UPDATE;
|
||||
|
||||
id val_1
|
||||
|
||||
1 2
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-update-t1:
|
||||
UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1;
|
||||
<waiting ...>
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-update-t1: <... completed>
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
|
@ -1047,6 +1047,11 @@ UPDATE reference_summary_table SET average_value = average_query.average FROM (
|
|||
SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1
|
||||
) average_query
|
||||
WHERE id = 1;
|
||||
UPDATE reference_summary_table SET average_value = average_query.average_value FROM (
|
||||
SELECT average_value FROM summary_table WHERE id = 1 FOR UPDATE
|
||||
) average_query
|
||||
WHERE id = 1;
|
||||
ERROR: cannot perform select on a distributed table and modify a reference table
|
||||
UPDATE reference_summary_table SET (min_value, average_value) =
|
||||
(SELECT min(value), avg(value) FROM reference_raw_table WHERE id = 2)
|
||||
WHERE id = 2;
|
||||
|
|
|
@ -1047,6 +1047,11 @@ UPDATE reference_summary_table SET average_value = average_query.average FROM (
|
|||
SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1
|
||||
) average_query
|
||||
WHERE id = 1;
|
||||
UPDATE reference_summary_table SET average_value = average_query.average_value FROM (
|
||||
SELECT average_value FROM summary_table WHERE id = 1 FOR UPDATE
|
||||
) average_query
|
||||
WHERE id = 1;
|
||||
ERROR: cannot perform select on a distributed table and modify a reference table
|
||||
UPDATE reference_summary_table SET (min_value, average_value) =
|
||||
(SELECT min(value), avg(value) FROM reference_raw_table WHERE id = 2)
|
||||
WHERE id = 2;
|
||||
|
|
|
@ -930,6 +930,11 @@ SELECT *
|
|||
41 | 1 | aznavour | 11814
|
||||
(5 rows)
|
||||
|
||||
-- SELECT ... FOR UPDATE does not supported from MX nodes if it contains
|
||||
-- reference table.
|
||||
SELECT * FROM customer_mx FOR UPDATE;
|
||||
ERROR: operation is not allowed on this node
|
||||
HINT: Connect to the coordinator and run it again.
|
||||
-- not router plannable due to or
|
||||
SELECT *
|
||||
FROM articles_hash_mx
|
||||
|
|
|
@ -0,0 +1,185 @@
|
|||
--
|
||||
-- MULTI_SIZE_QUERIES
|
||||
--
|
||||
-- Test checks whether size of distributed tables can be obtained with citus_table_size.
|
||||
-- To find the relation size and total relation size citus_relation_size and
|
||||
-- citus_total_relation_size are also tested.
|
||||
SET citus.next_shard_id TO 1460000;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1460000;
|
||||
SET citus.shard_replication_factor to 1;
|
||||
CREATE TABLE test_table_1_rf1(id int, val_1 int);
|
||||
SELECT create_distributed_table('test_table_1_rf1','id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test_table_1_rf1 values(1,2),(2,3),(3,4),(15,16);
|
||||
CREATE TABLE test_table_2_rf1(id int, val_1 int);
|
||||
SELECT create_distributed_table('test_table_2_rf1','id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test_table_2_rf1 values(1,2),(2,3),(3,4);
|
||||
CREATE TABLE ref_table(id int, val_1 int);
|
||||
SELECT create_reference_table('ref_table');
|
||||
create_reference_table
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO ref_table values(1,2),(3,4),(5,6);
|
||||
CREATE TABLE ref_table_2(id int, val_1 int);
|
||||
SELECT create_reference_table('ref_table_2');
|
||||
create_reference_table
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO ref_table_2 values(3,4),(5,6),(8,9);
|
||||
SET citus.shard_replication_factor to 2;
|
||||
CREATE TABLE test_table_3_rf2(id int, val_1 int);
|
||||
SELECT create_distributed_table('test_table_3_rf2','id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test_table_3_rf2 values(1,2),(2,3),(3,4);
|
||||
CREATE TABLE test_table_4_rf2(id int, val_1 int);
|
||||
SELECT create_distributed_table('test_table_4_rf2','id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test_table_4_rf2 values(1,2),(2,3),(3,4);
|
||||
-- Hash tables with RF = 1 is supported for router planner queries
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
id | val_1 | id | val_1
|
||||
----+-------+----+-------
|
||||
1 | 2 | 1 | 2
|
||||
(1 row)
|
||||
|
||||
-- May hav two different filters on partition column, if they resulted on the same shard
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 WHERE tt1.id = 1 OR tt1.id = 15
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
id | val_1
|
||||
----+-------
|
||||
1 | 2
|
||||
15 | 16
|
||||
(2 rows)
|
||||
|
||||
-- Only router plannable queries are supported right now.
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
ERROR: could not run distributed query with FOR UPDATE/SHARE commands
|
||||
HINT: Consider using an equality filter on the distributed table's partition column.
|
||||
-- RF > 1 is not supported
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_3_rf2 as tt3 on tt1.id = tt3.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
ERROR: could not run distributed query with FOR UPDATE/SHARE commands
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_3_rf2 as tt3 on tt1.id = tt3.id
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
ERROR: could not run distributed query with FOR UPDATE/SHARE commands
|
||||
SELECT * FROM
|
||||
test_table_3_rf2 as tt3 INNER JOIN test_table_4_rf2 as tt4 on tt3.id = tt4.id
|
||||
WHERE tt3.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
ERROR: could not run distributed query with FOR UPDATE/SHARE commands
|
||||
HINT: Consider using an equality filter on the distributed table's partition column.
|
||||
-- We take executor shard lock for reference tables
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
id | val_1 | id | val_1
|
||||
----+-------+----+-------
|
||||
1 | 2 | 1 | 2
|
||||
(1 row)
|
||||
|
||||
-- Lock type varies according to the strength of row level lock
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR SHARE;
|
||||
id | val_1 | id | val_1
|
||||
----+-------+----+-------
|
||||
1 | 2 | 1 | 2
|
||||
(1 row)
|
||||
|
||||
-- You can choose which tables should be affected by locking clause
|
||||
SELECT * FROM
|
||||
ref_table as rt1 INNER JOIN ref_table_2 as rt2 on rt1.id = rt2.id
|
||||
ORDER BY 1
|
||||
FOR UPDATE
|
||||
OF rt1;
|
||||
id | val_1 | id | val_1
|
||||
----+-------+----+-------
|
||||
3 | 4 | 3 | 4
|
||||
5 | 6 | 5 | 6
|
||||
(2 rows)
|
||||
|
||||
-- You can choose which tables should be affected by locking clause
|
||||
SELECT * FROM
|
||||
ref_table as rt1 INNER JOIN ref_table_2 as rt2 on rt1.id = rt2.id
|
||||
ORDER BY 1
|
||||
FOR UPDATE
|
||||
OF rt1
|
||||
NOWAIT;
|
||||
id | val_1 | id | val_1
|
||||
----+-------+----+-------
|
||||
3 | 4 | 3 | 4
|
||||
5 | 6 | 5 | 6
|
||||
(2 rows)
|
||||
|
||||
-- queries with CTEs are supported
|
||||
WITH first_value AS (
|
||||
SELECT val_1 FROM test_table_1_rf1 WHERE id = 1 FOR UPDATE)
|
||||
SELECT * FROM first_value;
|
||||
val_1
|
||||
-------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
-- queries with modifying CTEs are also supported
|
||||
WITH update_table AS (
|
||||
UPDATE test_table_1_rf1 SET val_1 = 10 WHERE id = 1 RETURNING *
|
||||
)
|
||||
SELECT * FROM update_table FOR UPDATE;
|
||||
id | val_1
|
||||
----+-------
|
||||
1 | 10
|
||||
(1 row)
|
||||
|
||||
-- Subqueries also supported
|
||||
SELECT * FROM (SELECT * FROM test_table_1_rf1 FOR UPDATE) foo WHERE id = 1;
|
||||
id | val_1
|
||||
----+-------
|
||||
1 | 10
|
||||
(1 row)
|
||||
|
||||
DROP TABLE test_table_1_rf1;
|
||||
DROP TABLE test_table_2_rf1;
|
||||
DROP TABLE ref_table;
|
||||
DROP TABLE ref_table_2;
|
||||
DROP TABLE test_table_3_rf2;
|
||||
DROP TABLE test_table_4_rf2;
|
|
@ -38,6 +38,7 @@ test: isolation_insert_vs_all
|
|||
test: isolation_insert_select_vs_all
|
||||
test: isolation_upsert_vs_all
|
||||
test: isolation_update_vs_all
|
||||
test: isolation_select_for_update
|
||||
test: isolation_delete_vs_all
|
||||
test: isolation_truncate_vs_all
|
||||
test: isolation_drop_vs_all
|
||||
|
|
|
@ -58,7 +58,7 @@ test: multi_subquery_complex_reference_clause multi_subquery_window_functions mu
|
|||
test: multi_subquery_in_where_reference_clause
|
||||
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
|
||||
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql
|
||||
test: multi_reference_table
|
||||
test: multi_reference_table multi_select_for_update
|
||||
test: multi_average_expression multi_working_columns multi_having_pushdown
|
||||
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
|
||||
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
|
||||
setup
|
||||
{
|
||||
SELECT citus.replace_isolation_tester_func();
|
||||
SELECT citus.refresh_isolation_tester_prepared_statement();
|
||||
|
||||
SET citus.shard_replication_factor to 1;
|
||||
|
||||
CREATE TABLE test_table_1_rf1(id int, val_1 int);
|
||||
SELECT create_distributed_table('test_table_1_rf1','id');
|
||||
INSERT INTO test_table_1_rf1 values(1,2),(2,3),(3,4);
|
||||
|
||||
CREATE VIEW test_1 AS SELECT * FROM test_table_1_rf1 WHERE val_1 = 2;
|
||||
|
||||
CREATE TABLE test_table_2_rf1(id int, val_1 int);
|
||||
SELECT create_distributed_table('test_table_2_rf1','id');
|
||||
INSERT INTO test_table_2_rf1 values(1,2),(2,3),(3,4);
|
||||
|
||||
CREATE TABLE ref_table(id int, val_1 int);
|
||||
SELECT create_reference_table('ref_table');
|
||||
INSERT INTO ref_table values(1,2),(3,4),(5,6);
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE test_table_1_rf1 CASCADE;
|
||||
DROP TABLE test_table_2_rf1;
|
||||
DROP TABLE ref_table;
|
||||
|
||||
SELECT citus.restore_isolation_tester_func();
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
step "s1-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s1-select-from-t1-t2-for-update"
|
||||
{
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_2_rf1 as tt2 on tt1.id = tt2.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
}
|
||||
|
||||
step "s1-select-from-t1-t2-for-share"
|
||||
{
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_2_rf1 as tt2 on tt1.id = tt2.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR SHARE;
|
||||
}
|
||||
|
||||
step "s1-select-from-t1-rt-for-update"
|
||||
{
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
}
|
||||
|
||||
step "s1-select-from-t1-rt-with-lc-for-update"
|
||||
{
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE
|
||||
OF rt1;
|
||||
}
|
||||
|
||||
step "s1-select-from-t1-within-cte"
|
||||
{
|
||||
WITH first_value AS ( SELECT val_1 FROM test_table_1_rf1 WHERE id = 1 FOR UPDATE)
|
||||
SELECT * FROM first_value;
|
||||
}
|
||||
|
||||
step "s1-select-from-t1-with-subquery"
|
||||
{
|
||||
SELECT * FROM (SELECT * FROM test_table_1_rf1 FOR UPDATE) foo WHERE id = 1;
|
||||
}
|
||||
|
||||
step "s1-select-from-rt-with-subquery"
|
||||
{
|
||||
SELECT * FROM (SELECT * FROM ref_table FOR UPDATE) foo WHERE id = 1;
|
||||
}
|
||||
|
||||
step "s1-select-from-t1-with-view"
|
||||
{
|
||||
SELECT * FROM test_1 WHERE id = 1 FOR UPDATE;
|
||||
}
|
||||
|
||||
step "s1-finish"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
step "s2-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s2-update-t1"
|
||||
{
|
||||
UPDATE test_table_1_rf1 SET val_1 = 5 WHERE id = 1;
|
||||
}
|
||||
|
||||
step "s2-update-rt"
|
||||
{
|
||||
UPDATE ref_table SET val_1 = 5 WHERE id = 1;
|
||||
}
|
||||
|
||||
step "s2-delete-t1"
|
||||
{
|
||||
DELETE FROM test_table_1_rf1 WHERE id = 1;
|
||||
}
|
||||
|
||||
step "s2-select-from-t1-t2-for-share"
|
||||
{
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR SHARE;
|
||||
}
|
||||
|
||||
step "s2-select-from-t1-t2-for-update"
|
||||
{
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
}
|
||||
|
||||
step "s2-finish"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
permutation "s1-begin" "s1-select-from-t1-t2-for-update" "s2-begin" "s2-update-t1" "s1-finish" "s2-finish"
|
||||
permutation "s1-begin" "s1-select-from-t1-t2-for-share" "s2-begin" "s2-delete-t1" "s1-finish" "s2-finish"
|
||||
permutation "s1-begin" "s1-select-from-t1-rt-for-update" "s2-begin" "s2-update-t1" "s1-finish" "s2-finish"
|
||||
permutation "s1-begin" "s1-select-from-t1-rt-with-lc-for-update" "s2-begin" "s2-update-rt" "s1-finish" "s2-finish"
|
||||
permutation "s1-begin" "s1-select-from-t1-rt-with-lc-for-update" "s2-begin" "s2-update-t1" "s1-finish" "s2-finish"
|
||||
permutation "s1-begin" "s1-select-from-t1-t2-for-share" "s2-begin" "s2-select-from-t1-t2-for-share" "s1-finish" "s2-finish"
|
||||
permutation "s1-begin" "s1-select-from-t1-rt-for-update" "s2-begin" "s2-select-from-t1-t2-for-update" "s1-finish" "s2-finish"
|
||||
permutation "s1-begin" "s1-select-from-t1-within-cte" "s2-begin" "s2-select-from-t1-t2-for-update" "s1-finish" "s2-finish"
|
||||
permutation "s1-begin" "s1-select-from-t1-within-cte" "s2-begin" "s2-update-t1" "s1-finish" "s2-finish"
|
||||
permutation "s1-begin" "s1-select-from-t1-with-subquery" "s2-begin" "s2-update-t1" "s1-finish" "s2-finish"
|
||||
permutation "s1-begin" "s1-select-from-rt-with-subquery" "s2-begin" "s2-update-rt" "s1-finish" "s2-finish"
|
||||
permutation "s1-begin" "s1-select-from-t1-with-view" "s2-begin" "s2-update-t1" "s1-finish" "s2-finish"
|
|
@ -653,6 +653,11 @@ UPDATE reference_summary_table SET average_value = average_query.average FROM (
|
|||
) average_query
|
||||
WHERE id = 1;
|
||||
|
||||
UPDATE reference_summary_table SET average_value = average_query.average_value FROM (
|
||||
SELECT average_value FROM summary_table WHERE id = 1 FOR UPDATE
|
||||
) average_query
|
||||
WHERE id = 1;
|
||||
|
||||
UPDATE reference_summary_table SET (min_value, average_value) =
|
||||
(SELECT min(value), avg(value) FROM reference_raw_table WHERE id = 2)
|
||||
WHERE id = 2;
|
||||
|
|
|
@ -62,8 +62,6 @@ INSERT INTO articles_hash_mx VALUES (48, 8, 'alkylic', 18610);
|
|||
INSERT INTO articles_hash_mx VALUES (49, 9, 'anyone', 2681);
|
||||
INSERT INTO articles_hash_mx VALUES (50, 10, 'anjanette', 19519);
|
||||
|
||||
|
||||
|
||||
SET citus.task_executor_type TO 'real-time';
|
||||
SET client_min_messages TO 'DEBUG2';
|
||||
|
||||
|
@ -409,6 +407,10 @@ SELECT *
|
|||
FROM articles_hash_mx
|
||||
WHERE author_id = (random()::int * 0 + 1);
|
||||
|
||||
-- SELECT ... FOR UPDATE does not supported from MX nodes if it contains
|
||||
-- reference table.
|
||||
SELECT * FROM customer_mx FOR UPDATE;
|
||||
|
||||
-- not router plannable due to or
|
||||
SELECT *
|
||||
FROM articles_hash_mx
|
||||
|
|
|
@ -0,0 +1,125 @@
|
|||
--
|
||||
-- MULTI_SIZE_QUERIES
|
||||
--
|
||||
-- Test checks whether size of distributed tables can be obtained with citus_table_size.
|
||||
-- To find the relation size and total relation size citus_relation_size and
|
||||
-- citus_total_relation_size are also tested.
|
||||
|
||||
SET citus.next_shard_id TO 1460000;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1460000;
|
||||
|
||||
SET citus.shard_replication_factor to 1;
|
||||
|
||||
CREATE TABLE test_table_1_rf1(id int, val_1 int);
|
||||
SELECT create_distributed_table('test_table_1_rf1','id');
|
||||
INSERT INTO test_table_1_rf1 values(1,2),(2,3),(3,4),(15,16);
|
||||
|
||||
CREATE TABLE test_table_2_rf1(id int, val_1 int);
|
||||
SELECT create_distributed_table('test_table_2_rf1','id');
|
||||
INSERT INTO test_table_2_rf1 values(1,2),(2,3),(3,4);
|
||||
|
||||
CREATE TABLE ref_table(id int, val_1 int);
|
||||
SELECT create_reference_table('ref_table');
|
||||
INSERT INTO ref_table values(1,2),(3,4),(5,6);
|
||||
|
||||
CREATE TABLE ref_table_2(id int, val_1 int);
|
||||
SELECT create_reference_table('ref_table_2');
|
||||
INSERT INTO ref_table_2 values(3,4),(5,6),(8,9);
|
||||
|
||||
SET citus.shard_replication_factor to 2;
|
||||
|
||||
CREATE TABLE test_table_3_rf2(id int, val_1 int);
|
||||
SELECT create_distributed_table('test_table_3_rf2','id');
|
||||
INSERT INTO test_table_3_rf2 values(1,2),(2,3),(3,4);
|
||||
|
||||
CREATE TABLE test_table_4_rf2(id int, val_1 int);
|
||||
SELECT create_distributed_table('test_table_4_rf2','id');
|
||||
INSERT INTO test_table_4_rf2 values(1,2),(2,3),(3,4);
|
||||
|
||||
-- Hash tables with RF = 1 is supported for router planner queries
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
|
||||
-- May hav two different filters on partition column, if they resulted on the same shard
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 WHERE tt1.id = 1 OR tt1.id = 15
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
|
||||
-- Only router plannable queries are supported right now.
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_1_rf1 as tt2 on tt1.id = tt2.id
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
|
||||
-- RF > 1 is not supported
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_3_rf2 as tt3 on tt1.id = tt3.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN test_table_3_rf2 as tt3 on tt1.id = tt3.id
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
|
||||
SELECT * FROM
|
||||
test_table_3_rf2 as tt3 INNER JOIN test_table_4_rf2 as tt4 on tt3.id = tt4.id
|
||||
WHERE tt3.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
|
||||
-- We take executor shard lock for reference tables
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR UPDATE;
|
||||
|
||||
-- Lock type varies according to the strength of row level lock
|
||||
SELECT * FROM
|
||||
test_table_1_rf1 as tt1 INNER JOIN ref_table as rt1 on tt1.id = rt1.id
|
||||
WHERE tt1.id = 1
|
||||
ORDER BY 1
|
||||
FOR SHARE;
|
||||
|
||||
-- You can choose which tables should be affected by locking clause
|
||||
SELECT * FROM
|
||||
ref_table as rt1 INNER JOIN ref_table_2 as rt2 on rt1.id = rt2.id
|
||||
ORDER BY 1
|
||||
FOR UPDATE
|
||||
OF rt1;
|
||||
|
||||
-- You can choose which tables should be affected by locking clause
|
||||
SELECT * FROM
|
||||
ref_table as rt1 INNER JOIN ref_table_2 as rt2 on rt1.id = rt2.id
|
||||
ORDER BY 1
|
||||
FOR UPDATE
|
||||
OF rt1
|
||||
NOWAIT;
|
||||
|
||||
-- queries with CTEs are supported
|
||||
WITH first_value AS (
|
||||
SELECT val_1 FROM test_table_1_rf1 WHERE id = 1 FOR UPDATE)
|
||||
SELECT * FROM first_value;
|
||||
|
||||
-- queries with modifying CTEs are also supported
|
||||
WITH update_table AS (
|
||||
UPDATE test_table_1_rf1 SET val_1 = 10 WHERE id = 1 RETURNING *
|
||||
)
|
||||
SELECT * FROM update_table FOR UPDATE;
|
||||
|
||||
-- Subqueries also supported
|
||||
SELECT * FROM (SELECT * FROM test_table_1_rf1 FOR UPDATE) foo WHERE id = 1;
|
||||
|
||||
|
||||
DROP TABLE test_table_1_rf1;
|
||||
DROP TABLE test_table_2_rf1;
|
||||
DROP TABLE ref_table;
|
||||
DROP TABLE ref_table_2;
|
||||
DROP TABLE test_table_3_rf2;
|
||||
DROP TABLE test_table_4_rf2;
|
Loading…
Reference in New Issue