Merge pull request #1690 from citusdata/update_delete_multiple_shard

Support multi shard update/delete queries
pull/1719/head
Burak Velioglu 2017-10-25 16:06:12 +03:00 committed by GitHub
commit fade7c1667
31 changed files with 2305 additions and 222 deletions

View File

@ -38,6 +38,9 @@
#include "utils/memutils.h"
/* controls the connection type for multi shard update/delete queries */
int MultiShardConnectionType = PARALLEL_CONNECTION;
/*
* Define executor methods for the different executor types.
*/
@ -176,11 +179,14 @@ RouterCreateScan(CustomScan *scan)
{
Assert(isModificationQuery);
if (IsMultiRowInsert(workerJob->jobQuery))
if (IsMultiRowInsert(workerJob->jobQuery) ||
(IsUpdateOrDelete(multiPlan) &&
MultiShardConnectionType == SEQUENTIAL_CONNECTION))
{
/*
* Multi-row INSERT is executed sequentially instead of using
* parallel connections.
* Multi-shard UPDATE/DELETE queries are executed sequentially instead
* of using parallel connections when multi_shard_modify_mode is set
* to 'sequential', as are multi-row INSERTs.
*/
scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods;
}
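For illustration (not part of the diff), a minimal SQL sketch of when the sequential path above is chosen; it uses the users_test_table from the regression tests further below:

BEGIN;
-- earlier single-shard writes in this transaction already opened per-shard connections
INSERT INTO users_test_table (user_id, value_3) VALUES (20, 15);
-- the new GUC makes RouterCreateScan pick RouterSequentialModifyCustomExecMethods
SET citus.multi_shard_modify_mode TO 'sequential';
UPDATE users_test_table SET value_3 = 1;  -- executed one shard at a time
COMMIT;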

View File

@ -29,7 +29,6 @@
int RemoteTaskCheckInterval = 100; /* per cycle sleep interval in millisecs */
int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */
bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */
int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */
/*
@ -62,15 +61,6 @@ JobExecutorType(MultiPlan *multiPlan)
return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT;
}
/* if it is not a router executable plan, inform user according to the log level */
if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF)
{
ereport(MultiTaskQueryLogLevel, (errmsg("multi-task query about to be executed"),
errhint("Queries are split to multiple tasks "
"if they have to be split into several"
" queries on the workers.")));
}
Assert(multiPlan->operation == CMD_SELECT);
workerNodeList = ActiveReadableNodeList();

View File

@ -55,8 +55,7 @@
#include "utils/memutils.h"
static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList,
Oid relationId);
static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList);
PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
@ -176,8 +175,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
CHECK_FOR_INTERRUPTS();
taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList,
relationId);
taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList);
affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList);
PG_RETURN_INT32(affectedTupleCount);
@ -189,14 +187,14 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
* given list of shards.
*/
static List *
ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relationId)
ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList)
{
List *taskList = NIL;
ListCell *shardIntervalCell = NULL;
uint64 jobId = INVALID_JOB_ID;
int taskId = 1;
/* lock metadata before getting placment lists */
/* lock metadata before getting placement lists */
LockShardListMetadata(shardIntervalList, ShareLock);
foreach(shardIntervalCell, shardIntervalList)

View File

@ -54,7 +54,11 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
Task *task = (Task *) lfirst(taskCell);
Query *query = originalQuery;
if (task->insertSelectQuery)
if (UpdateOrDeleteQuery(query) && list_length(taskList))
{
query = copyObject(originalQuery);
}
else if (task->insertSelectQuery)
{
/* for INSERT..SELECT, adjust shard names in SELECT part */
RangeTblEntry *copiedInsertRte = NULL;

View File

@ -269,15 +269,6 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
++taskIdIndex;
}
if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF &&
list_length(sqlTaskList) > 1)
{
ereport(MultiTaskQueryLogLevel, (errmsg("multi-task query about to be executed"),
errhint("Queries are split to multiple tasks "
"if they have to be split into several"
" queries on the workers.")));
}
/* Create the worker job */
workerJob = CitusMakeNode(Job);
workerJob->taskList = sqlTaskList;
@ -484,6 +475,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
List *shardOpExpressions = NIL;
RestrictInfo *shardRestrictionList = NULL;
DeferredErrorMessage *planningError = NULL;
bool multiShardModifyQuery = false;
/* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock);
@ -543,7 +535,10 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
*/
planningError = PlanRouterQuery(copiedSubquery, copiedRestrictionContext,
&selectPlacementList, &selectAnchorShardId,
&relationShardList, replacePrunedQueryWithDummy);
&relationShardList, replacePrunedQueryWithDummy,
&multiShardModifyQuery);
Assert(!multiShardModifyQuery);
if (planningError)
{

View File

@ -2341,6 +2341,7 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
List *shardOpExpressions = NIL;
RestrictInfo *shardRestrictionList = NULL;
DeferredErrorMessage *planningError = NULL;
bool multiShardModifyQuery = false;
/*
* Add the restriction qual parameter value in all baserestrictinfos.
@ -2379,7 +2380,10 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
*/
planningError = PlanRouterQuery(taskQuery, copiedRestrictionContext,
&selectPlacementList, &selectAnchorShardId,
&relationShardList, replacePrunedQueryWithDummy);
&relationShardList, replacePrunedQueryWithDummy,
&multiShardModifyQuery);
Assert(!multiShardModifyQuery);
/* we don't expect to see this error, but keep the check as a precaution for future changes */
if (planningError)

View File

@ -37,6 +37,7 @@
static List *plannerRestrictionContextList = NIL;
int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */
/* create custom scan methods for separate executors */
static CustomScanMethods RealTimeCustomScanMethods = {
@ -283,6 +284,56 @@ IsModifyCommand(Query *query)
}
/*
* IsMultiShardModifyPlan returns true if the given plan was generated for
* multi shard update or delete query.
*/
bool
IsMultiShardModifyPlan(MultiPlan *multiPlan)
{
if (IsUpdateOrDelete(multiPlan) && IsMultiTaskPlan(multiPlan))
{
return true;
}
return false;
}
/*
* IsMultiTaskPlan returns true if job contains multiple tasks.
*/
bool
IsMultiTaskPlan(MultiPlan *multiPlan)
{
Job *workerJob = multiPlan->workerJob;
if (workerJob != NULL && list_length(workerJob->taskList) > 1)
{
return true;
}
return false;
}
/*
* IsUpdateOrDelete returns true if the query performs update or delete.
*/
bool
IsUpdateOrDelete(MultiPlan *multiPlan)
{
CmdType commandType = multiPlan->operation;
if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
{
return true;
}
return false;
}
/*
* IsModifyMultiPlan returns true if the multi plan performs modifications,
* false otherwise.
@ -436,9 +487,11 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
/*
* As explained above, force planning costs to be unrealistically high if
* query planning failed (possibly) due to prepared statement parameters.
* query planning failed (possibly) due to prepared statement parameters or
* if it is planned as a multi shard modify query.
*/
if (distributedPlan->planningError && hasUnresolvedParams)
if ((distributedPlan->planningError || IsMultiShardModifyPlan(distributedPlan)) &&
hasUnresolvedParams)
{
/*
* Arbitrarily high cost, but low enough that it can be added up
@ -530,6 +583,20 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
}
}
if (IsMultiTaskPlan(multiPlan))
{
/* if it is not a single-task executable plan, inform the user according to the log level */
if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF)
{
ereport(MultiTaskQueryLogLevel, (errmsg(
"multi-task query about to be executed"),
errhint(
"Queries are split to multiple tasks "
"if they have to be split into several"
" queries on the workers.")));
}
}
multiPlan->relationIdList = localPlan->relationOids;
multiPlanData = (Node *) multiPlan;
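For illustration (not part of the diff): with the check moved here, any multi-task plan, including the new multi-shard modifications, can emit the notice. A sketch using the existing citus.multi_task_query_log_level GUC:

SET citus.multi_task_query_log_level TO 'notice';
UPDATE users_test_table SET value_1 = 0;
-- NOTICE:  multi-task query about to be executed
-- HINT:  Queries are split to multiple tasks if they have to be split into several queries on the workers.
RESET citus.multi_task_query_log_level;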

View File

@ -2,8 +2,8 @@
*
* multi_router_planner.c
*
* This file contains functions to plan single shard queries
* including distributed table modifications.
* This file contains functions to plan multi-shard queries without any
* aggregation step, including distributed table modifications.
*
* Copyright (c) 2014-2016, Citus Data, Inc.
*
@ -137,7 +137,6 @@ static List * ExtractInsertValuesList(Query *query, Var *partitionColumn);
static bool MultiRouterPlannableQuery(Query *query,
RelationRestrictionContext *restrictionContext);
static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);
static bool UpdateOrDeleteQuery(Query *query);
static RangeTblEntry * GetUpdateOrDeleteRTE(List *rangeTableList);
static bool UpdateOrDeleteRTE(RangeTblEntry *rangeTableEntry);
static bool SelectsFromDistributedTable(List *rangeTableList);
@ -146,7 +145,13 @@ static List * get_all_actual_clauses(List *restrictinfo_list);
#endif
static int CompareInsertValuesByShardId(const void *leftElement,
const void *rightElement);
static uint64 GetInitialShardId(List *relationShardList);
static List * SingleShardSelectTaskList(Query *query, List *relationShardList,
List *placementList, uint64 shardId);
static List * SingleShardModifyTaskList(Query *query, List *relationShardList,
List *placementList, uint64 shardId);
static List * MultiShardModifyTaskList(Query *originalQuery, List *relationShardList,
bool requiresMasterEvaluation);
/*
* CreateRouterPlan attempts to create a router executor plan for the given
@ -250,7 +255,9 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
return multiPlan;
}
/* we cannot have a multi-shard update/delete query via this code path */
job = RouterJob(originalQuery, restrictionContext, &multiPlan->planningError);
if (multiPlan->planningError)
{
/* query cannot be handled by this planner */
@ -516,11 +523,21 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery)
*/
if (!UpdateOrDeleteQuery(queryTree) || multiShardQuery)
{
StringInfo errorHint = makeStringInfo();
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(
distributedTableId);
char *partitionKeyString = cacheEntry->partitionKeyString;
char *partitionColumnName = ColumnNameToColumn(distributedTableId,
partitionKeyString);
appendStringInfo(errorHint,
"Consider using an equality filter on partition column \"%s\" to target a single shard.",
partitionColumnName);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot perform distributed planning for the given "
"modifications",
"Subqueries are not supported in distributed "
"modifications.", NULL);
"subqueries are not supported in modifications across "
"multiple shards",
errorHint->data, NULL);
}
}
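For illustration (not part of the diff): under this check a plain multi-shard UPDATE proceeds, while the same UPDATE with a subquery is rejected with the reworded message and the new partition-column hint (mirroring the regression output further below):

-- no subquery: planned as a multi-shard modification
UPDATE users_test_table SET value_1 = 1;
-- subquery in a multi-shard modification: rejected
UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table);
-- ERROR:  subqueries are not supported in modifications across multiple shards
-- DETAIL:  Consider using an equality filter on partition column "user_id" to target a single shard.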
@ -602,8 +619,20 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery)
*/
if (rangeTableEntry->rtekind == RTE_SUBQUERY)
{
rangeTableEntryErrorDetail = "Subqueries are not supported in"
" distributed modifications.";
StringInfo errorHint = makeStringInfo();
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(
distributedTableId);
char *partitionKeyString = cacheEntry->partitionKeyString;
char *partitionColumnName = ColumnNameToColumn(distributedTableId,
partitionKeyString);
appendStringInfo(errorHint, "Consider using an equality filter on "
"partition column \"%s\" to target a single shard.",
partitionColumnName);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "subqueries are not "
"supported in modifications across multiple shards",
errorHint->data, NULL);
}
else if (rangeTableEntry->rtekind == RTE_JOIN)
{
@ -741,6 +770,14 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery)
"RETURNING clause",
NULL, NULL);
}
if (queryTree->jointree->quals != NULL &&
nodeTag(queryTree->jointree->quals) == T_CurrentOfExpr)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot run DML queries with cursors", NULL,
NULL);
}
}
if (commandType == CMD_INSERT && queryTree->onConflict != NULL)
@ -836,7 +873,7 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery)
* UpdateOrDeleteQuery checks if the given query is an UPDATE or DELETE command.
* If it is, it returns true; otherwise it returns false.
*/
static bool
bool
UpdateOrDeleteQuery(Query *query)
{
CmdType commandType = query->commandType;
@ -1326,14 +1363,15 @@ ExtractFirstDistributedTableId(Query *query)
}
/* RouterJob builds a Job to represent a single shard select/update/delete query */
/*
* RouterJob builds a Job to represent single shard select/update/delete queries
* and multi-shard update/delete queries.
*/
static Job *
RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
DeferredErrorMessage **planningError)
{
Job *job = NULL;
Task *task = NULL;
StringInfo queryString = makeStringInfo();
uint64 shardId = INVALID_SHARD_ID;
List *placementList = NIL;
List *relationShardList = NIL;
@ -1341,6 +1379,7 @@ RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
bool replacePrunedQueryWithDummy = false;
bool requiresMasterEvaluation = false;
RangeTblEntry *updateOrDeleteRTE = NULL;
bool isMultiShardModifyQuery = false;
/* the router planner should create a task even if it doesn't hit a shard at all */
replacePrunedQueryWithDummy = true;
@ -1350,7 +1389,8 @@ RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
(*planningError) = PlanRouterQuery(originalQuery, restrictionContext,
&placementList, &shardId, &relationShardList,
replacePrunedQueryWithDummy);
replacePrunedQueryWithDummy,
&isMultiShardModifyQuery);
if (*planningError)
{
return NULL;
@ -1374,19 +1414,108 @@ RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
return job;
}
pg_get_query_def(originalQuery, queryString);
if (originalQuery->commandType == CMD_SELECT)
{
task = CreateTask(ROUTER_TASK);
job->taskList = SingleShardSelectTaskList(originalQuery, relationShardList,
placementList, shardId);
}
else if (isMultiShardModifyQuery)
{
job->taskList = MultiShardModifyTaskList(originalQuery, relationShardList,
requiresMasterEvaluation);
}
else
{
job->taskList = SingleShardModifyTaskList(originalQuery, relationShardList,
placementList, shardId);
}
job->requiresMasterEvaluation = requiresMasterEvaluation;
return job;
}
/*
* SingleShardSelectTaskList generates a task for a single shard select query
* and returns it as a single-element list.
*/
static List *
SingleShardSelectTaskList(Query *query, List *relationShardList, List *placementList,
uint64 shardId)
{
Task *task = CreateTask(ROUTER_TASK);
StringInfo queryString = makeStringInfo();
pg_get_query_def(query, queryString);
task->queryString = queryString->data;
task->anchorShardId = shardId;
task->taskPlacementList = placementList;
task->relationShardList = relationShardList;
return list_make1(task);
}
/*
* MultiShardModifyTaskList generates a task list for multi-shard update/delete
* queries, creating one task per relation shard.
*/
static List *
MultiShardModifyTaskList(Query *originalQuery, List *relationShardList,
bool requiresMasterEvaluation)
{
List *taskList = NIL;
ListCell *relationShardCell = NULL;
int taskId = 1;
foreach(relationShardCell, relationShardList)
{
RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell);
List *relationShardList = list_make1(relationShard);
Task *task = CreateTask(MODIFY_TASK);
if (!requiresMasterEvaluation)
{
Query *copiedQuery = copyObject(originalQuery);
StringInfo shardQueryString = makeStringInfo();
UpdateRelationToShardNames((Node *) copiedQuery, relationShardList);
pg_get_query_def(copiedQuery, shardQueryString);
task->queryString = shardQueryString->data;
}
task->taskId = taskId++;
task->anchorShardId = relationShard->shardId;
task->relationShardList = relationShardList;
taskList = lappend(taskList, task);
}
return taskList;
}
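For illustration (not part of the diff): the requiresMasterEvaluation branch above intentionally leaves task->queryString unset, so the per-shard strings are deparsed at execution time after the coordinator evaluates mutable functions. A sketch using the test_table_1 from the regression tests further below:

-- now() is stable, so the query requires master evaluation; the shard
-- query strings are generated at execution time rather than at planning time
UPDATE test_table_1 SET col_3 = 3 WHERE date_col < now();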
/*
* SingleShardModifyTaskList generates a task for a single shard update/delete
* query and returns it as a single-element list.
*/
static List *
SingleShardModifyTaskList(Query *query, List *relationShardList, List *placementList,
uint64 shardId)
{
Task *task = CreateTask(MODIFY_TASK);
StringInfo queryString = makeStringInfo();
DistTableCacheEntry *modificationTableCacheEntry = NULL;
char modificationPartitionMethod = 0;
List *rangeTableList = NIL;
RangeTblEntry *updateOrDeleteRTE = NULL;
modificationTableCacheEntry = DistributedTableCacheEntry(
updateOrDeleteRTE->relid);
ExtractRangeTableEntryWalker((Node *) query, &rangeTableList);
updateOrDeleteRTE = GetUpdateOrDeleteRTE(rangeTableList);
modificationTableCacheEntry = DistributedTableCacheEntry(updateOrDeleteRTE->relid);
modificationPartitionMethod = modificationTableCacheEntry->partitionMethod;
if (modificationPartitionMethod == DISTRIBUTE_BY_NONE &&
@ -1397,19 +1526,15 @@ RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
"and modify a reference table")));
}
task = CreateTask(MODIFY_TASK);
task->replicationModel = modificationTableCacheEntry->replicationModel;
}
pg_get_query_def(query, queryString);
task->queryString = queryString->data;
task->anchorShardId = shardId;
task->taskPlacementList = placementList;
task->relationShardList = relationShardList;
task->replicationModel = modificationTableCacheEntry->replicationModel;
job->taskList = list_make1(task);
job->requiresMasterEvaluation = requiresMasterEvaluation;
return job;
return list_make1(task);
}
@ -1501,85 +1626,52 @@ SelectsFromDistributedTable(List *rangeTableList)
DeferredErrorMessage *
PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext,
List **placementList, uint64 *anchorShardId, List **relationShardList,
bool replacePrunedQueryWithDummy)
bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery)
{
bool multiShardQuery = false;
bool isMultiShardQuery = false;
List *prunedRelationShardList = NIL;
DeferredErrorMessage *planningError = NULL;
ListCell *prunedRelationShardListCell = NULL;
List *workerList = NIL;
bool shardsPresent = false;
uint64 shardId = INVALID_SHARD_ID;
CmdType commandType = originalQuery->commandType;
bool isMultiShardModifyQuery = false;
*placementList = NIL;
prunedRelationShardList = TargetShardIntervalsForRouter(originalQuery,
restrictionContext,
&multiShardQuery);
&isMultiShardQuery);
if (isMultiShardQuery)
{
/*
* If multiShardQuery is true then it means a relation has more
* than one shard left after pruning.
* If isMultiShardQuery is true and this is a SELECT query, return a
* deferred error. We do not support multi-shard SELECT queries with
* this code path.
*/
if (multiShardQuery)
if (commandType == CMD_SELECT)
{
StringInfo errorMessage = makeStringInfo();
StringInfo errorHint = makeStringInfo();
CmdType commandType = originalQuery->commandType;
const char *commandName = "SELECT";
if (commandType == CMD_UPDATE)
{
commandName = "UPDATE";
}
else if (commandType == CMD_DELETE)
{
commandName = "DELETE";
}
if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
{
List *rangeTableList = NIL;
RangeTblEntry *updateOrDeleteRTE = NULL;
DistTableCacheEntry *updateOrDeleteTableCacheEntry = NULL;
char *partitionKeyString = NULL;
char *partitionColumnName = NULL;
/* extract range table entries */
ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList);
updateOrDeleteRTE = GetUpdateOrDeleteRTE(rangeTableList);
updateOrDeleteTableCacheEntry =
DistributedTableCacheEntry(updateOrDeleteRTE->relid);
partitionKeyString = updateOrDeleteTableCacheEntry->partitionKeyString;
partitionColumnName = ColumnNameToColumn(updateOrDeleteRTE->relid,
partitionKeyString);
appendStringInfo(errorHint, "Consider using an equality filter on "
"partition column \"%s\" to target a "
"single shard. If you'd like to run a "
"multi-shard operation, use "
"master_modify_multiple_shards().",
partitionColumnName);
}
/* note that for SELECT queries, we never print this error message */
appendStringInfo(errorMessage,
"cannot run %s command which targets multiple shards",
commandName);
planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL,
errorHint->data);
NULL, NULL, NULL);
return planningError;
}
Assert(UpdateOrDeleteQuery(originalQuery));
planningError = ModifyQuerySupported(originalQuery, isMultiShardQuery);
if (planningError != NULL)
{
return planningError;
}
isMultiShardModifyQuery = true;
}
foreach(prunedRelationShardListCell, prunedRelationShardList)
{
List *prunedShardList = (List *) lfirst(prunedRelationShardListCell);
ShardInterval *shardInterval = NULL;
RelationShard *relationShard = NULL;
ListCell *shardIntervalCell = NULL;
/* the case where no shard is present, or all shards are pruned out, is handled later */
if (prunedShardList == NIL)
@ -1589,24 +1681,23 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon
shardsPresent = true;
/* all relations are now pruned down to 0 or 1 shards */
Assert(list_length(prunedShardList) <= 1);
shardInterval = (ShardInterval *) linitial(prunedShardList);
/* anchor shard id */
if (shardId == INVALID_SHARD_ID)
foreach(shardIntervalCell, prunedShardList)
{
shardId = shardInterval->shardId;
}
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
RelationShard *relationShard = CitusMakeNode(RelationShard);
/* add relation to shard mapping */
relationShard = CitusMakeNode(RelationShard);
relationShard->relationId = shardInterval->relationId;
relationShard->shardId = shardInterval->shardId;
*relationShardList = lappend(*relationShardList, relationShard);
}
}
if (isMultiShardModifyQuery)
{
*multiShardModifyQuery = true;
return planningError;
}
/*
* We bail out if there are RTEs that prune multiple shards above, but
@ -1620,6 +1711,9 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon
return planningError;
}
/* we need an anchor shard id for select queries with the router planner */
shardId = GetInitialShardId(prunedRelationShardList);
/*
* Determine the worker that has all shard placements if a shard placement found.
* If no shard placement exists and replacePrunedQueryWithDummy flag is set, we will
@ -1665,6 +1759,7 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon
return planningError;
}
/*
* If this is an UPDATE or DELETE query which requires master evaluation,
* don't try update shard names, and postpone that to execution phase.
@ -1674,6 +1769,7 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon
UpdateRelationToShardNames((Node *) originalQuery, *relationShardList);
}
*multiShardModifyQuery = false;
*placementList = workerList;
*anchorShardId = shardId;
@ -1681,15 +1777,43 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon
}
/*
* GetInitialShardId returns the initial shard id for the given relation shard
* list. If no relation shard exists in the list, it returns INVALID_SHARD_ID.
*/
static uint64
GetInitialShardId(List *relationShardList)
{
ListCell *prunedRelationShardListCell = NULL;
foreach(prunedRelationShardListCell, relationShardList)
{
List *prunedShardList = (List *) lfirst(prunedRelationShardListCell);
ShardInterval *shardInterval = NULL;
/* the case where no shard is present, or all shards are pruned out, is handled later */
if (prunedShardList == NIL)
{
continue;
}
shardInterval = linitial(prunedShardList);
return shardInterval->shardId;
}
return INVALID_SHARD_ID;
}
/*
* TargetShardIntervalsForRouter performs shard pruning for all referenced relations
* in the query and returns list of shards per relation. Shard pruning is done based
* on provided restriction context per relation. The function bails out and returns
* after setting multiShardQuery to true if any of the relations pruned down to
* more than one active shard. It also records pruned shard intervals in relation
* restriction context to be used later on. Some queries may have contradiction
* clauses like 'and false' or 'and 1=0', such queries are treated as if all of
* the shards of joining relations are pruned out.
on the provided restriction context per relation. The function sets multiShardQuery
to true if any of the relations prunes down to more than one active shard. It
* also records pruned shard intervals in relation restriction context to be used
* later on. Some queries may have contradiction clauses like 'and false' or
* 'and 1=0', such queries are treated as if all of the shards of joining
* relations are pruned out.
*/
static List *
TargetShardIntervalsForRouter(Query *query,
@ -1729,17 +1853,9 @@ TargetShardIntervalsForRouter(Query *query,
{
prunedShardList = PruneShards(relationId, tableId, restrictClauseList);
/*
* Quick bail out. The query can not be router plannable if one
* relation has more than one shard left after pruning. Having no
* shard left is okay at this point. It will be handled at a later
* stage.
*/
if (list_length(prunedShardList) > 1)
{
(*multiShardQuery) = true;
return NIL;
}
}
@ -1783,10 +1899,10 @@ RelationPrunesToMultipleShards(List *relationShardList)
/*
* WorkersContainingAllShards returns list of shard placements that contain all
* shard intervals provided to the function. It returns NIL if no placement exists.
* The caller should check if there are any shard intervals exist for placement
* check prior to calling this function.
* WorkersContainingSelectShards returns the list of shard placements that contain
* all shard intervals provided to the select query. It returns NIL if no placement
* exists. The caller should check whether any shard intervals exist prior to
* calling this function.
*/
static List *
WorkersContainingAllShards(List *prunedShardIntervalsList)
@ -2324,10 +2440,9 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn)
/*
* MultiRouterPlannableQuery returns true if the given query is router plannable.
* The query is router plannable if it is a modify query, or if it is a select
* query issued on a hash partitioned distributed table, and it has a filter
* to reduce number of shard pairs to one, and all shard pairs are located on
* the same node. Router plannable checks for select queries can be turned off
* by setting citus.enable_router_execution flag to false.
* query issued on a hash partitioned distributed table. Router plannable checks
* for select queries can be turned off by setting citus.enable_router_execution
* flag to false.
*/
static bool
MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext)

View File

@ -120,6 +120,12 @@ static const struct config_enum_entry multi_task_query_log_level_options[] = {
{ NULL, 0, false }
};
static const struct config_enum_entry multi_shard_modify_connection_options[] = {
{ "parallel", PARALLEL_CONNECTION, false },
{ "sequential", SEQUENTIAL_CONNECTION, false },
{ NULL, 0, false }
};
/* *INDENT-ON* */
@ -733,6 +739,16 @@ RegisterCitusConfigVariables(void)
0,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.multi_shard_modify_mode",
gettext_noop("Sets the connection type for multi shard modify queries"),
NULL,
&MultiShardConnectionType,
PARALLEL_CONNECTION, multi_shard_modify_connection_options,
PGC_USERSET,
0,
NULL, NULL, NULL);
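For illustration (not part of the diff): the GUC registered above is a PGC_USERSET enum, so it can be toggled per session or per transaction:

SHOW citus.multi_shard_modify_mode;                 -- parallel (the default)
SET citus.multi_shard_modify_mode TO 'sequential';  -- values come from multi_shard_modify_connection_options
RESET citus.multi_shard_modify_mode;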
DefineCustomStringVariable(
"citus.version",
gettext_noop("Shows the Citus library version"),

View File

@ -412,7 +412,7 @@ LoadShardPlacement(uint64 shardId, uint64 placementId)
/*
* FindShardPlacementOnGroup returns the shard placement for the given shard
* on the given group, or returns NULL of no placement for the shard exists
* on the given group, or returns NULL if no placement for the shard exists
* on the group.
*/
ShardPlacement *

View File

@ -28,6 +28,14 @@ typedef struct CitusScanState
} CitusScanState;
/* managed via guc.c */
typedef enum
{
PARALLEL_CONNECTION = 0,
SEQUENTIAL_CONNECTION = 1
} MultiShardConnectionTypes;
extern int MultiShardConnectionType;
extern Node * RealTimeCreateScan(CustomScan *scan);
extern Node * TaskTrackerCreateScan(CustomScan *scan);
extern Node * RouterCreateScan(CustomScan *scan);

View File

@ -19,7 +19,7 @@
/* values used by jobs and tasks which do not require identifiers */
#define INVALID_JOB_ID 0
#define INVALID_TASK_ID 0
#define MULTI_TASK_QUERY_INFO_OFF 0 /* do not log multi-task queries */
typedef struct RelationRestrictionContext
{
@ -83,7 +83,10 @@ extern void multi_join_restriction_hook(PlannerInfo *root,
JoinType jointype,
JoinPathExtraData *extra);
extern bool IsModifyCommand(Query *query);
extern bool IsUpdateOrDelete(struct MultiPlan *multiPlan);
extern bool IsModifyMultiPlan(struct MultiPlan *multiPlan);
extern bool IsMultiTaskPlan(struct MultiPlan *multiPlan);
extern bool IsMultiShardModifyPlan(struct MultiPlan *multiPlan);
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);

View File

@ -36,7 +36,8 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
restrictionContext,
List **placementList, uint64 *anchorShardId,
List **relationShardList, bool
replacePrunedQueryWithDummy);
replacePrunedQueryWithDummy,
bool *multiShardModifyQuery);
extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList);
extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree,
@ -52,6 +53,7 @@ extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query);
extern bool IsMultiRowInsert(Query *query);
extern void AddShardIntervalRestrictionToSelect(Query *subqery,
ShardInterval *shardInterval);
extern bool UpdateOrDeleteQuery(Query *query);
#endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -36,8 +36,6 @@
#define JOB_CLEANUP_QUERY "SELECT task_tracker_cleanup_job("UINT64_FORMAT ")"
#define JOB_CLEANUP_TASK_ID INT_MAX
#define MULTI_TASK_QUERY_INFO_OFF 0 /* do not log multi-task queries */
/* Enumeration to track one task's execution status */
typedef enum

View File

@ -0,0 +1,317 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-update_all_value_1 s2-begin s2-select s1-commit s2-select s2-commit
step s1-begin:
BEGIN;
step s1-update_all_value_1:
UPDATE users_test_table SET value_1 = 3;
step s2-begin:
BEGIN;
step s2-select:
SELECT * FROM users_test_table ORDER BY value_2;
user_id value_1 value_2 value_3
1 5 6 7
2 12 7 18
3 23 8 25
4 42 9 23
5 35 10 17
6 21 11 25
7 27 12 18
step s1-commit:
COMMIT;
step s2-select:
SELECT * FROM users_test_table ORDER BY value_2;
user_id value_1 value_2 value_3
1 3 6 7
2 3 7 18
3 3 8 25
4 3 9 23
5 3 10 17
6 3 11 25
7 3 12 18
step s2-commit:
COMMIT;
starting permutation: s1-begin s1-update_all_value_1 s2-begin s2-update_all_value_1 s1-commit s2-commit
step s1-begin:
BEGIN;
step s1-update_all_value_1:
UPDATE users_test_table SET value_1 = 3;
step s2-begin:
BEGIN;
step s2-update_all_value_1:
UPDATE users_test_table SET value_1 = 6;
<waiting ...>
step s1-commit:
COMMIT;
step s2-update_all_value_1: <... completed>
step s2-commit:
COMMIT;
starting permutation: s1-begin s1-update_value_1_of_1_or_3 s2-begin s2-update_value_1_of_4_or_6 s1-commit s2-commit s2-select
step s1-begin:
BEGIN;
step s1-update_value_1_of_1_or_3:
UPDATE users_test_table SET value_1 = 5 WHERE user_id = 1 or user_id = 3;
step s2-begin:
BEGIN;
step s2-update_value_1_of_4_or_6:
UPDATE users_test_table SET value_1 = 4 WHERE user_id = 4 or user_id = 6;
step s1-commit:
COMMIT;
step s2-commit:
COMMIT;
step s2-select:
SELECT * FROM users_test_table ORDER BY value_2;
user_id value_1 value_2 value_3
1 5 6 7
2 12 7 18
3 5 8 25
4 4 9 23
5 35 10 17
6 4 11 25
7 27 12 18
starting permutation: s1-begin s1-update_value_1_of_1_or_3 s2-begin s2-update_value_1_of_1_or_3 s1-commit s2-commit s2-select
step s1-begin:
BEGIN;
step s1-update_value_1_of_1_or_3:
UPDATE users_test_table SET value_1 = 5 WHERE user_id = 1 or user_id = 3;
step s2-begin:
BEGIN;
step s2-update_value_1_of_1_or_3:
UPDATE users_test_table SET value_1 = 8 WHERE user_id = 1 or user_id = 3;
<waiting ...>
step s1-commit:
COMMIT;
step s2-update_value_1_of_1_or_3: <... completed>
step s2-commit:
COMMIT;
step s2-select:
SELECT * FROM users_test_table ORDER BY value_2;
user_id value_1 value_2 value_3
1 8 6 7
2 12 7 18
3 8 8 25
4 42 9 23
5 35 10 17
6 21 11 25
7 27 12 18
starting permutation: s1-begin s1-update_all_value_1 s2-begin s2-insert-to-table s1-commit s2-commit s2-select
step s1-begin:
BEGIN;
step s1-update_all_value_1:
UPDATE users_test_table SET value_1 = 3;
step s2-begin:
BEGIN;
step s2-insert-to-table:
INSERT INTO users_test_table VALUES (1,2,3,4);
step s1-commit:
COMMIT;
step s2-commit:
COMMIT;
step s2-select:
SELECT * FROM users_test_table ORDER BY value_2;
user_id value_1 value_2 value_3
1 2 3 4
1 3 6 7
2 3 7 18
3 3 8 25
4 3 9 23
5 3 10 17
6 3 11 25
7 3 12 18
starting permutation: s1-begin s1-update_all_value_1 s2-begin s2-insert-into-select s1-commit s2-commit s2-select
step s1-begin:
BEGIN;
step s1-update_all_value_1:
UPDATE users_test_table SET value_1 = 3;
step s2-begin:
BEGIN;
step s2-insert-into-select:
INSERT INTO users_test_table SELECT * FROM events_test_table;
<waiting ...>
step s1-commit:
COMMIT;
step s2-insert-into-select: <... completed>
step s2-commit:
COMMIT;
step s2-select:
SELECT * FROM users_test_table ORDER BY value_2;
user_id value_1 value_2 value_3
1 3 6 7
1 5 7 7
2 3 7 18
3 3 8 25
4 3 9 23
5 22 9 25
5 3 10 17
7 41 10 23
6 3 11 25
1 20 12 25
7 3 12 18
3 26 13 18
5 17 14 4
3 11 78 18
starting permutation: s1-begin s1-change_connection_mode_to_sequential s1-update_all_value_1 s2-begin s2-change_connection_mode_to_sequential s2-update_all_value_1 s1-commit s2-commit s2-select
step s1-begin:
BEGIN;
step s1-change_connection_mode_to_sequential:
set citus.multi_shard_modify_mode to 'sequential';
step s1-update_all_value_1:
UPDATE users_test_table SET value_1 = 3;
step s2-begin:
BEGIN;
step s2-change_connection_mode_to_sequential:
set citus.multi_shard_modify_mode to 'sequential';
step s2-update_all_value_1:
UPDATE users_test_table SET value_1 = 6;
<waiting ...>
step s1-commit:
COMMIT;
step s2-update_all_value_1: <... completed>
step s2-commit:
COMMIT;
step s2-select:
SELECT * FROM users_test_table ORDER BY value_2;
user_id value_1 value_2 value_3
1 6 6 7
2 6 7 18
3 6 8 25
4 6 9 23
5 6 10 17
6 6 11 25
7 6 12 18
starting permutation: s1-begin s1-change_connection_mode_to_sequential s1-update_value_1_of_1_or_3 s2-begin s2-change_connection_mode_to_sequential s2-update_value_1_of_1_or_3 s1-commit s2-commit s2-select
step s1-begin:
BEGIN;
step s1-change_connection_mode_to_sequential:
set citus.multi_shard_modify_mode to 'sequential';
step s1-update_value_1_of_1_or_3:
UPDATE users_test_table SET value_1 = 5 WHERE user_id = 1 or user_id = 3;
step s2-begin:
BEGIN;
step s2-change_connection_mode_to_sequential:
set citus.multi_shard_modify_mode to 'sequential';
step s2-update_value_1_of_1_or_3:
UPDATE users_test_table SET value_1 = 8 WHERE user_id = 1 or user_id = 3;
<waiting ...>
step s1-commit:
COMMIT;
step s2-update_value_1_of_1_or_3: <... completed>
step s2-commit:
COMMIT;
step s2-select:
SELECT * FROM users_test_table ORDER BY value_2;
user_id value_1 value_2 value_3
1 8 6 7
2 12 7 18
3 8 8 25
4 42 9 23
5 35 10 17
6 21 11 25
7 27 12 18
starting permutation: s1-begin s1-change_connection_mode_to_sequential s1-update_value_1_of_1_or_3 s2-begin s2-change_connection_mode_to_sequential s2-update_value_1_of_4_or_6 s1-commit s2-commit s2-select
step s1-begin:
BEGIN;
step s1-change_connection_mode_to_sequential:
set citus.multi_shard_modify_mode to 'sequential';
step s1-update_value_1_of_1_or_3:
UPDATE users_test_table SET value_1 = 5 WHERE user_id = 1 or user_id = 3;
step s2-begin:
BEGIN;
step s2-change_connection_mode_to_sequential:
set citus.multi_shard_modify_mode to 'sequential';
step s2-update_value_1_of_4_or_6:
UPDATE users_test_table SET value_1 = 4 WHERE user_id = 4 or user_id = 6;
step s1-commit:
COMMIT;
step s2-commit:
COMMIT;
step s2-select:
SELECT * FROM users_test_table ORDER BY value_2;
user_id value_1 value_2 value_3
1 5 6 7
2 12 7 18
3 5 8 25
4 4 9 23
5 35 10 17
6 4 11 25
7 27 12 18

View File

@ -747,6 +747,70 @@ t
SELECT true AS valid FROM explain_json($$
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
t
-- Test multi shard update
EXPLAIN (COSTS FALSE)
UPDATE lineitem_hash_part
SET l_suppkey = 12;
Custom Scan (Citus Router)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Update on lineitem_hash_part_360290 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on lineitem_hash_part_360291 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360291 lineitem_hash_part
-> Task
Node: host=localhost port=57637 dbname=regression
-> Update on lineitem_hash_part_360292 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360292 lineitem_hash_part
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on lineitem_hash_part_360293 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360293 lineitem_hash_part
EXPLAIN (COSTS FALSE)
UPDATE lineitem_hash_part
SET l_suppkey = 12
WHERE l_orderkey = 1 OR l_orderkey = 3;
Custom Scan (Citus Router)
Task Count: 2
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Update on lineitem_hash_part_360290 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
Filter: ((l_orderkey = 1) OR (l_orderkey = 3))
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on lineitem_hash_part_360291 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360291 lineitem_hash_part
Filter: ((l_orderkey = 1) OR (l_orderkey = 3))
-- Test multi shard delete
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem_hash_part;
Custom Scan (Citus Router)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Delete on lineitem_hash_part_360290 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
-> Task
Node: host=localhost port=57638 dbname=regression
-> Delete on lineitem_hash_part_360291 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360291 lineitem_hash_part
-> Task
Node: host=localhost port=57637 dbname=regression
-> Delete on lineitem_hash_part_360292 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360292 lineitem_hash_part
-> Task
Node: host=localhost port=57638 dbname=regression
-> Delete on lineitem_hash_part_360293 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360293 lineitem_hash_part
-- Test task tracker
SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;

View File

@ -747,6 +747,70 @@ t
SELECT true AS valid FROM explain_json($$
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
t
-- Test multi shard update
EXPLAIN (COSTS FALSE)
UPDATE lineitem_hash_part
SET l_suppkey = 12;
Custom Scan (Citus Router)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Update on lineitem_hash_part_360290 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on lineitem_hash_part_360291 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360291 lineitem_hash_part
-> Task
Node: host=localhost port=57637 dbname=regression
-> Update on lineitem_hash_part_360292 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360292 lineitem_hash_part
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on lineitem_hash_part_360293 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360293 lineitem_hash_part
EXPLAIN (COSTS FALSE)
UPDATE lineitem_hash_part
SET l_suppkey = 12
WHERE l_orderkey = 1 OR l_orderkey = 3;
Custom Scan (Citus Router)
Task Count: 2
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Update on lineitem_hash_part_360290 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
Filter: ((l_orderkey = 1) OR (l_orderkey = 3))
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on lineitem_hash_part_360291 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360291 lineitem_hash_part
Filter: ((l_orderkey = 1) OR (l_orderkey = 3))
-- Test multi shard delete
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem_hash_part;
Custom Scan (Citus Router)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Delete on lineitem_hash_part_360290 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
-> Task
Node: host=localhost port=57638 dbname=regression
-> Delete on lineitem_hash_part_360291 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360291 lineitem_hash_part
-> Task
Node: host=localhost port=57637 dbname=regression
-> Delete on lineitem_hash_part_360292 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360292 lineitem_hash_part
-> Task
Node: host=localhost port=57638 dbname=regression
-> Delete on lineitem_hash_part_360293 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360293 lineitem_hash_part
-- Test task tracker
SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;

View File

@ -290,10 +290,6 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246;
0
(1 row)
-- commands with no constraints on the partition key are not supported
DELETE FROM limit_orders WHERE bidder_id = 162;
ERROR: cannot run DELETE command which targets multiple shards
HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
-- commands with a USING clause are unsupported
CREATE TABLE bidders ( name text, id bigint );
DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND
@ -304,10 +300,6 @@ ERROR: cannot plan queries which include both local and distributed relations
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *)
DELETE FROM limit_orders;
ERROR: common table expressions are not supported in distributed modifications
-- cursors are not supported
DELETE FROM limit_orders WHERE CURRENT OF cursor_name;
ERROR: cannot run DELETE command which targets multiple shards
HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
-- simple UPDATE
UPDATE limit_orders SET symbol = 'GM' WHERE id = 246;
@ -423,10 +415,6 @@ AND s.logicalrelid = 'limit_orders'::regclass;
ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Third: Connect back to master node
\c - - - :master_port
-- commands with no constraints on the partition key are not supported
UPDATE limit_orders SET limit_price = 0.00;
ERROR: cannot run UPDATE command which targets multiple shards
HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
-- attempting to change the partition key is unsupported
UPDATE limit_orders SET id = 0 WHERE id = 246;
ERROR: modifying the partition value of rows is not allowed
@ -522,10 +510,6 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh
-- even in RETURNING
UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW();
ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause
-- cursors are not supported
UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name;
ERROR: cannot run UPDATE command which targets multiple shards
HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
-- check that multi-row UPDATE/DELETEs with RETURNING work
INSERT INTO multiple_hash VALUES ('0', '1');
INSERT INTO multiple_hash VALUES ('0', '2');
@ -984,12 +968,12 @@ SELECT * FROM summary_table ORDER BY id;
-- unsupported multi-shard updates
UPDATE summary_table SET average_value = average_query.average FROM (
SELECT avg(value) AS average FROM raw_table) average_query;
ERROR: cannot run UPDATE command which targets multiple shards
HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
ERROR: subqueries are not supported in modifications across multiple shards
DETAIL: Consider using an equality filter on partition column "id" to target a single shard.
UPDATE summary_table SET average_value = average_value + 1 WHERE id =
(SELECT id FROM raw_table WHERE value > 100);
ERROR: cannot run UPDATE command which targets multiple shards
HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
ERROR: subqueries are not supported in modifications across multiple shards
DETAIL: Consider using an equality filter on partition column "id" to target a single shard.
-- test complex queries
UPDATE summary_table
SET
@ -1128,8 +1112,8 @@ SELECT master_modify_multiple_shards('
SELECT avg(value) AS average FROM raw_table WHERE id = 1
) average_query
WHERE id = 1');
ERROR: cannot perform distributed planning for the given modifications
DETAIL: Subqueries are not supported in distributed modifications.
ERROR: subqueries are not supported in modifications across multiple shards
DETAIL: Consider using an equality filter on partition column "id" to target a single shard.
-- test connection API via using COPY
-- COPY on SELECT part
BEGIN;

View File

@ -158,10 +158,8 @@ SELECT COUNT(*) FROM limit_orders_mx WHERE id = 246;
0
(1 row)
-- commands with no constraints on the partition key are not supported
-- multi shard delete is supported
DELETE FROM limit_orders_mx WHERE bidder_id = 162;
ERROR: cannot run DELETE command which targets multiple shards
HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
-- commands with a USING clause are unsupported
CREATE TABLE bidders ( name text, id bigint );
DELETE FROM limit_orders_mx USING bidders WHERE limit_orders_mx.id = 246 AND
@ -174,8 +172,7 @@ DELETE FROM limit_orders_mx;
ERROR: common table expressions are not supported in distributed modifications
-- cursors are not supported
DELETE FROM limit_orders_mx WHERE CURRENT OF cursor_name;
ERROR: cannot run DELETE command which targets multiple shards
HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
ERROR: cannot run DML queries with cursors
INSERT INTO limit_orders_mx VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
-- simple UPDATE
UPDATE limit_orders_mx SET symbol = 'GM' WHERE id = 246;
@ -228,10 +225,8 @@ INSERT INTO limit_orders_mx VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sel
ERROR: duplicate key value violates unique constraint "limit_orders_mx_pkey_1220093"
DETAIL: Key (id)=(275) already exists.
CONTEXT: while executing command on localhost:57638
-- commands with no constraints on the partition key are not supported
-- multi shard update is supported
UPDATE limit_orders_mx SET limit_price = 0.00;
ERROR: cannot run UPDATE command which targets multiple shards
HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
-- attempting to change the partition key is unsupported
UPDATE limit_orders_mx SET id = 0 WHERE id = 246;
ERROR: modifying the partition value of rows is not allowed
@ -304,7 +299,7 @@ CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS
'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE STRICT;
UPDATE limit_orders_mx SET bidder_id = temp_strict_func(1, null) WHERE id = 246;
ERROR: null value in column "bidder_id" violates not-null constraint
DETAIL: Failing row contains (246, GM, null, 2007-07-02 16:32:15, buy, 999, {1,2}).
DETAIL: Failing row contains (246, GM, null, 2007-07-02 16:32:15, buy, 0.00, {1,2}).
CONTEXT: while executing command on localhost:57637
SELECT array_of_values FROM limit_orders_mx WHERE id = 246;
array_of_values
@ -324,8 +319,7 @@ UPDATE limit_orders_mx SET placed_at = placed_at WHERE id = 246 RETURNING NOW();
ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause
-- cursors are not supported
UPDATE limit_orders_mx SET symbol = 'GM' WHERE CURRENT OF cursor_name;
ERROR: cannot run UPDATE command which targets multiple shards
HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
ERROR: cannot run DML queries with cursors
-- check that multi-row UPDATE/DELETEs with RETURNING work
INSERT INTO multiple_hash_mx VALUES ('0', '1');
INSERT INTO multiple_hash_mx VALUES ('0', '2');

View File

@ -0,0 +1,499 @@
--
-- multi shard update delete
-- this file is intended to test multi shard update/delete queries
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1440000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1440000;
SET citus.shard_replication_factor to 1;
SET citus.multi_shard_modify_mode to 'parallel';
CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('users_test_table', 'user_id');
create_distributed_table
--------------------------
(1 row)
\COPY users_test_table FROM STDIN DELIMITER AS ',';
CREATE TABLE events_test_table (user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('events_test_table', 'user_id');
create_distributed_table
--------------------------
(1 row)
\COPY events_test_table FROM STDIN DELIMITER AS ',';
CREATE TABLE events_reference_copy_table (like events_test_table);
SELECT create_reference_table('events_reference_copy_table');
create_reference_table
------------------------
(1 row)
INSERT INTO events_reference_copy_table SELECT * FROM events_test_table;
CREATE TABLE users_reference_copy_table (like users_test_table);
SELECT create_reference_table('users_reference_copy_table');
create_reference_table
------------------------
(1 row)
INSERT INTO users_reference_copy_table SELECT * FROM users_test_table;
-- Run multi shard updates and deletes without a transaction on hash distributed tables
UPDATE users_test_table SET value_1 = 1;
SELECT COUNT(*), SUM(value_1) FROM users_test_table;
count | sum
-------+-----
15 | 15
(1 row)
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
count | sum
-------+-----
4 | 52
(1 row)
UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3;
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
count | sum
-------+-----
4 | 56
(1 row)
UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5;
SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5;
sum
-----
0
(1 row)
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
count
-------
4
(1 row)
DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
count
-------
0
(1 row)
-- Run multi shard update delete queries within transactions
BEGIN;
UPDATE users_test_table SET value_3 = 0;
END;
SELECT SUM(value_3) FROM users_test_table;
sum
-----
0
(1 row)
-- Updates can also be rolled back
BEGIN;
UPDATE users_test_table SET value_3 = 1;
ROLLBACK;
SELECT SUM(value_3) FROM users_test_table;
sum
-----
0
(1 row)
-- Run with inserts (we need to set citus.multi_shard_modify_mode to sequential)
BEGIN;
INSERT INTO users_test_table (user_id, value_3) VALUES(20, 15);
INSERT INTO users_test_table (user_id, value_3) VALUES(16,1), (20,16), (7,1), (20,17);
SET citus.multi_shard_modify_mode to sequential;
UPDATE users_test_table SET value_3 = 1;
END;
SELECT COUNT()SUM(value_3) FROM users_test_table;
ERROR: syntax error at or near "("
LINE 1: SELECT COUNT()SUM(value_3) FROM users_test_table;
^
SET citus.multi_shard_modify_mode to 'sequential';
-- Run multiple multi shard updates (with sequential executor)
BEGIN;
UPDATE users_test_table SET value_3 = 5;
UPDATE users_test_table SET value_3 = 0;
END;
SELECT SUM(value_3) FROM users_copy_table;
ERROR: relation "users_copy_table" does not exist
LINE 1: SELECT SUM(value_3) FROM users_copy_table;
^
-- Run multiple multi shard updates (with parallel executor)
SET citus.multi_shard_modify_mode to 'parallel';
UPDATE users_test_table SET value_3 = 5;
BEGIN;
UPDATE users_test_table SET value_3 = 2;
UPDATE users_test_table SET value_3 = 0;
END;
SELECT SUM(value_3) FROM users_test_table;
sum
-----
0
(1 row)
-- Check with different kinds of constraints
UPDATE users_test_table SET value_3 = 1 WHERE user_id = 3 or true;
SELECT COUNT(*), SUM(value_3) FROM users_test_table;
count | sum
-------+-----
16 | 16
(1 row)
UPDATE users_test_table SET value_3 = 0 WHERE user_id = 20 and false;
SELECT COUNT(*), SUM(value_3) FROM users_test_table;
count | sum
-------+-----
16 | 16
(1 row)
-- Run multi shard updates with prepared statements
PREPARE foo_plan(int,int) AS UPDATE users_test_table SET value_1 = $1, value_3 = $2;
EXECUTE foo_plan(1,5);
EXECUTE foo_plan(3,15);
EXECUTE foo_plan(5,25);
EXECUTE foo_plan(7,35);
EXECUTE foo_plan(9,45);
EXECUTE foo_plan(0,0);
SELECT SUM(value_1), SUM(value_3) FROM users_test_table;
sum | sum
-----+-----
0 | 0
(1 row)
-- Test on an append table (set executor mode to sequential, since with append
-- distributed tables the parallel executor may create many connections)
SET citus.multi_shard_modify_mode to sequential;
CREATE TABLE append_stage_table(id int, col_2 int);
INSERT INTO append_stage_table VALUES(1,3);
INSERT INTO append_stage_table VALUES(3,2);
INSERT INTO append_stage_table VALUES(5,4);
CREATE TABLE test_append_table(id int, col_2 int);
SELECT create_distributed_table('test_append_table','id','append');
create_distributed_table
--------------------------
(1 row)
SELECT master_create_empty_shard('test_append_table');
master_create_empty_shard
---------------------------
1440066
(1 row)
SELECT * FROM master_append_table_to_shard(1440066, 'append_stage_table', 'localhost', :master_port);
master_append_table_to_shard
------------------------------
0.0266667
(1 row)
SELECT master_create_empty_shard('test_append_table') AS new_shard_id;
new_shard_id
--------------
1440067
(1 row)
SELECT * FROM master_append_table_to_shard(1440067, 'append_stage_table', 'localhost', :master_port);
master_append_table_to_shard
------------------------------
0.0266667
(1 row)
UPDATE test_append_table SET col_2 = 5;
SELECT * FROM test_append_table;
id | col_2
----+-------
1 | 5
3 | 5
5 | 5
1 | 5
3 | 5
5 | 5
(6 rows)
DROP TABLE append_stage_table;
DROP TABLE test_append_table;
-- Update multiple shards of a partitioned distributed table
SET citus.multi_shard_modify_mode to 'parallel';
SET citus.shard_replication_factor to 1;
CREATE TABLE tt1(id int, col_2 int) partition by range (col_2);
CREATE TABLE tt1_510 partition of tt1 for VALUES FROM (5) to (10);
CREATE TABLE tt1_1120 partition of tt1 for VALUES FROM (11) to (20);
INSERT INTO tt1 VALUES (1,11), (3,15), (5,17), (6,19), (8,17), (2,12);
SELECT create_distributed_table('tt1','id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
UPDATE tt1 SET col_2 = 13;
DELETE FROM tt1 WHERE id = 1 or id = 3 or id = 5;
SELECT * FROM tt1;
id | col_2
----+-------
8 | 13
6 | 13
2 | 13
(3 rows)
-- Partitioned distributed table within transaction
INSERT INTO tt1 VALUES(4,6);
INSERT INTO tt1 VALUES(7,7);
INSERT INTO tt1 VALUES(9,8);
BEGIN;
-- Update rows from partition tt1_1120
UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
-- Update rows from partition tt1_510
UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5;
COMMIT;
SELECT * FROM tt1;
id | col_2
----+-------
8 | 12
4 | 7
7 | 7
6 | 12
2 | 12
9 | 7
(6 rows)
-- Modify main table and partition table within same transaction
BEGIN;
UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5;
DELETE FROM tt1_510;
DELETE FROM tt1_1120;
COMMIT;
SELECT * FROM tt1;
id | col_2
----+-------
(0 rows)
DROP TABLE tt1;
-- Update and copy in the same transaction
CREATE TABLE tt2(id int, col_2 int);
SELECT create_distributed_table('tt2','id');
create_distributed_table
--------------------------
(1 row)
BEGIN;
\COPY tt2 FROM STDIN DELIMITER AS ',';
UPDATE tt2 SET col_2 = 1;
COMMIT;
SELECT * FROM tt2;
id | col_2
----+-------
1 | 1
7 | 1
3 | 1
2 | 1
9 | 1
(5 rows)
-- Test RETURNING with both types of executors
UPDATE tt2 SET col_2 = 5 RETURNING id, col_2;
id | col_2
----+-------
1 | 5
7 | 5
3 | 5
2 | 5
9 | 5
(5 rows)
SET citus.multi_shard_modify_mode to sequential;
UPDATE tt2 SET col_2 = 3 RETURNING id, col_2;
id | col_2
----+-------
1 | 3
7 | 3
3 | 3
2 | 3
9 | 3
(5 rows)
DROP TABLE tt2;
-- Multiple RTEs are not supported
SET citus.multi_shard_modify_mode to DEFAULT;
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table);
ERROR: subqueries are not supported in modifications across multiple shards
DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard.
UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table);
ERROR: subqueries are not supported in modifications across multiple shards
DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard.
DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table);
ERROR: subqueries are not supported in modifications across multiple shards
DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard.
DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table);
ERROR: subqueries are not supported in modifications across multiple shards
DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard.
-- Cursors are not supported
BEGIN;
DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table;
FETCH test_cursor;
user_id | value_1 | value_2 | value_3
---------+---------+---------+---------
8 | 0 | 13 | 0
(1 row)
UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor;
ERROR: cannot run DML queries with cursors
ROLLBACK;
-- Stable functions are supported
CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int);
INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5);
INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7);
INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9);
SELECT create_distributed_table('test_table_1', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
SELECT * FROM test_table_1;
id | date_col | col_3
----+------------------------------+-------
1 | Sat Apr 05 08:32:12 2014 PDT | 5
3 | Wed Jan 12 08:35:19 2011 PST | 9
2 | Sun Feb 01 08:31:16 2015 PST | 7
(3 rows)
UPDATE test_table_1 SET col_3 = 3 WHERE date_col < now();
SELECT * FROM test_table_1;
id | date_col | col_3
----+------------------------------+-------
1 | Sat Apr 05 08:32:12 2014 PDT | 3
3 | Wed Jan 12 08:35:19 2011 PST | 3
2 | Sun Feb 01 08:31:16 2015 PST | 3
(3 rows)
DELETE FROM test_table_1 WHERE date_col < current_timestamp;
SELECT * FROM test_table_1;
id | date_col | col_3
----+----------+-------
(0 rows)
DROP TABLE test_table_1;
-- Volatile functions are not supported
CREATE TABLE test_table_2(id int, double_col double precision);
INSERT INTO test_table_2 VALUES(1, random());
INSERT INTO test_table_2 VALUES(2, random());
INSERT INTO test_table_2 VALUES(3, random());
SELECT create_distributed_table('test_table_2', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
UPDATE test_table_2 SET double_col = random();
ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE
DROP TABLE test_table_2;
-- Run multi shard updates and deletes outside a transaction on reference tables
SELECT COUNT(*) FROM users_reference_copy_table;
count
-------
15
(1 row)
UPDATE users_reference_copy_table SET value_1 = 1;
SELECT SUM(value_1) FROM users_reference_copy_table;
sum
-----
15
(1 row)
SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
count | sum
-------+-----
4 | 52
(1 row)
UPDATE users_reference_copy_table SET value_2 = value_2 + 1 WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
count | sum
-------+-----
4 | 56
(1 row)
UPDATE users_reference_copy_table SET value_3 = 0 WHERE user_id <> 3;
SELECT SUM(value_3) FROM users_reference_copy_table WHERE user_id <> 3;
sum
-----
0
(1 row)
DELETE FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
count
-------
0
(1 row)
-- Run some of the tests again with a different shard replication factor
DROP TABLE users_test_table;
SET citus.shard_replication_factor to 2;
CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('users_test_table', 'user_id');
create_distributed_table
--------------------------
(1 row)
\COPY users_test_table FROM STDIN DELIMITER AS ',';
-- Run multi shard updates and deletes outside a transaction on hash distributed tables
UPDATE users_test_table SET value_1 = 1;
SELECT COUNT(*), SUM(value_1) FROM users_test_table;
count | sum
-------+-----
15 | 15
(1 row)
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
count | sum
-------+-----
4 | 52
(1 row)
UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3;
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
count | sum
-------+-----
4 | 56
(1 row)
UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5;
SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5;
sum
-----
0
(1 row)
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
count
-------
4
(1 row)
DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
count
-------
0
(1 row)
DROP TABLE users_test_table;
DROP TABLE events_test_table;
DROP TABLE events_reference_copy_table;
DROP TABLE users_reference_copy_table;


@ -0,0 +1,522 @@
--
-- multi shard update delete
-- this file is intended to test multi shard update/delete queries
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1440000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1440000;
SET citus.shard_replication_factor to 1;
SET citus.multi_shard_modify_mode to 'parallel';
CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('users_test_table', 'user_id');
create_distributed_table
--------------------------
(1 row)
\COPY users_test_table FROM STDIN DELIMITER AS ',';
CREATE TABLE events_test_table (user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('events_test_table', 'user_id');
create_distributed_table
--------------------------
(1 row)
\COPY events_test_table FROM STDIN DELIMITER AS ',';
CREATE TABLE events_reference_copy_table (like events_test_table);
SELECT create_reference_table('events_reference_copy_table');
create_reference_table
------------------------
(1 row)
INSERT INTO events_reference_copy_table SELECT * FROM events_test_table;
CREATE TABLE users_reference_copy_table (like users_test_table);
SELECT create_reference_table('users_reference_copy_table');
create_reference_table
------------------------
(1 row)
INSERT INTO users_reference_copy_table SELECT * FROM users_test_table;
-- Run multi shard updates and deletes outside a transaction on hash distributed tables
UPDATE users_test_table SET value_1 = 1;
SELECT COUNT(*), SUM(value_1) FROM users_test_table;
count | sum
-------+-----
15 | 15
(1 row)
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
count | sum
-------+-----
4 | 52
(1 row)
UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3;
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
count | sum
-------+-----
4 | 56
(1 row)
UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5;
SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5;
sum
-----
0
(1 row)
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
count
-------
4
(1 row)
DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
count
-------
0
(1 row)
-- Run multi shard update/delete queries within transactions
BEGIN;
UPDATE users_test_table SET value_3 = 0;
END;
SELECT SUM(value_3) FROM users_test_table;
sum
-----
0
(1 row)
-- Updates can also be rolled back
BEGIN;
UPDATE users_test_table SET value_3 = 1;
ROLLBACK;
SELECT SUM(value_3) FROM users_test_table;
sum
-----
0
(1 row)
-- Run with inserts (we need to set citus.multi_shard_modify_mode to sequential)
BEGIN;
INSERT INTO users_test_table (user_id, value_3) VALUES(20, 15);
INSERT INTO users_test_table (user_id, value_3) VALUES(16,1), (20,16), (7,1), (20,17);
SET citus.multi_shard_modify_mode to sequential;
UPDATE users_test_table SET value_3 = 1;
END;
SELECT COUNT(*), SUM(value_3) FROM users_test_table;
 count | sum 
-------+-----
    16 |  16
(1 row)
SET citus.multi_shard_modify_mode to 'sequential';
-- Run multiple multi shard updates (with sequential executor)
BEGIN;
UPDATE users_test_table SET value_3 = 5;
UPDATE users_test_table SET value_3 = 0;
END;
SELECT SUM(value_3) FROM users_test_table;
 sum 
-----
   0
(1 row)
-- Run multiple multi shard updates (with parallel executor)
SET citus.multi_shard_modify_mode to 'parallel';
UPDATE users_test_table SET value_3 = 5;
BEGIN;
UPDATE users_test_table SET value_3 = 2;
UPDATE users_test_table SET value_3 = 0;
END;
SELECT SUM(value_3) FROM users_test_table;
sum
-----
0
(1 row)
-- Check WHERE clauses with constant true/false predicates
UPDATE users_test_table SET value_3 = 1 WHERE user_id = 3 or true;
SELECT COUNT(*), SUM(value_3) FROM users_test_table;
count | sum
-------+-----
16 | 16
(1 row)
UPDATE users_test_table SET value_3 = 0 WHERE user_id = 20 and false;
SELECT COUNT(*), SUM(value_3) FROM users_test_table;
count | sum
-------+-----
16 | 16
(1 row)
-- Run multi shard updates with prepared statements
PREPARE foo_plan(int,int) AS UPDATE users_test_table SET value_1 = $1, value_3 = $2;
EXECUTE foo_plan(1,5);
EXECUTE foo_plan(3,15);
EXECUTE foo_plan(5,25);
EXECUTE foo_plan(7,35);
EXECUTE foo_plan(9,45);
EXECUTE foo_plan(0,0);
SELECT SUM(value_1), SUM(value_3) FROM users_test_table;
sum | sum
-----+-----
0 | 0
(1 row)
-- Test on append table (set executor mode to sequential, since with append
-- distributed tables the parallel executor may open a large number of connections)
SET citus.multi_shard_modify_mode to sequential;
CREATE TABLE append_stage_table(id int, col_2 int);
INSERT INTO append_stage_table VALUES(1,3);
INSERT INTO append_stage_table VALUES(3,2);
INSERT INTO append_stage_table VALUES(5,4);
CREATE TABLE test_append_table(id int, col_2 int);
SELECT create_distributed_table('test_append_table','id','append');
create_distributed_table
--------------------------
(1 row)
SELECT master_create_empty_shard('test_append_table');
master_create_empty_shard
---------------------------
1440066
(1 row)
SELECT * FROM master_append_table_to_shard(1440066, 'append_stage_table', 'localhost', :master_port);
master_append_table_to_shard
------------------------------
0.0266667
(1 row)
SELECT master_create_empty_shard('test_append_table') AS new_shard_id;
new_shard_id
--------------
1440067
(1 row)
SELECT * FROM master_append_table_to_shard(1440067, 'append_stage_table', 'localhost', :master_port);
master_append_table_to_shard
------------------------------
0.0266667
(1 row)
UPDATE test_append_table SET col_2 = 5;
SELECT * FROM test_append_table;
id | col_2
----+-------
1 | 5
3 | 5
5 | 5
1 | 5
3 | 5
5 | 5
(6 rows)
DROP TABLE append_stage_table;
DROP TABLE test_append_table;
-- Multi shard update on a partitioned distributed table
SET citus.multi_shard_modify_mode to 'parallel';
SET citus.shard_replication_factor to 1;
CREATE TABLE tt1(id int, col_2 int) partition by range (col_2);
ERROR: syntax error at or near "partition"
LINE 1: CREATE TABLE tt1(id int, col_2 int) partition by range (col_...
^
CREATE TABLE tt1_510 partition of tt1 for VALUES FROM (5) to (10);
ERROR: syntax error at or near "partition"
LINE 1: CREATE TABLE tt1_510 partition of tt1 for VALUES FROM (5) to...
^
CREATE TABLE tt1_1120 partition of tt1 for VALUES FROM (11) to (20);
ERROR: syntax error at or near "partition"
LINE 1: CREATE TABLE tt1_1120 partition of tt1 for VALUES FROM (11) ...
^
INSERT INTO tt1 VALUES (1,11), (3,15), (5,17), (6,19), (8,17), (2,12);
ERROR: relation "tt1" does not exist
LINE 1: INSERT INTO tt1 VALUES (1,11), (3,15), (5,17), (6,19), (8,17...
^
SELECT create_distributed_table('tt1','id');
ERROR: relation "tt1" does not exist
LINE 1: SELECT create_distributed_table('tt1','id');
^
UPDATE tt1 SET col_2 = 13;
ERROR: relation "tt1" does not exist
LINE 1: UPDATE tt1 SET col_2 = 13;
^
DELETE FROM tt1 WHERE id = 1 or id = 3 or id = 5;
ERROR: relation "tt1" does not exist
LINE 1: DELETE FROM tt1 WHERE id = 1 or id = 3 or id = 5;
^
SELECT * FROM tt1;
ERROR: relation "tt1" does not exist
LINE 1: SELECT * FROM tt1;
^
-- Partitioned distributed table within transaction
INSERT INTO tt1 VALUES(4,6);
ERROR: relation "tt1" does not exist
LINE 1: INSERT INTO tt1 VALUES(4,6);
^
INSERT INTO tt1 VALUES(7,7);
ERROR: relation "tt1" does not exist
LINE 1: INSERT INTO tt1 VALUES(7,7);
^
INSERT INTO tt1 VALUES(9,8);
ERROR: relation "tt1" does not exist
LINE 1: INSERT INTO tt1 VALUES(9,8);
^
BEGIN;
-- Update rows from partition tt1_1120
UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
ERROR: relation "tt1" does not exist
LINE 1: UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
^
-- Update rows from partition tt1_510
UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
SELECT * FROM tt1;
ERROR: relation "tt1" does not exist
LINE 1: SELECT * FROM tt1;
^
-- Modify the main table and a partition within the same transaction
BEGIN;
UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
ERROR: relation "tt1" does not exist
LINE 1: UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
^
UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5;
ERROR: current transaction is aborted, commands ignored until end of transaction block
DELETE FROM tt1_510;
ERROR: current transaction is aborted, commands ignored until end of transaction block
DELETE FROM tt1_1120;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
SELECT * FROM tt1;
ERROR: relation "tt1" does not exist
LINE 1: SELECT * FROM tt1;
^
DROP TABLE tt1;
ERROR: table "tt1" does not exist
-- Update and copy in the same transaction
CREATE TABLE tt2(id int, col_2 int);
SELECT create_distributed_table('tt2','id');
create_distributed_table
--------------------------
(1 row)
BEGIN;
\COPY tt2 FROM STDIN DELIMITER AS ',';
UPDATE tt2 SET col_2 = 1;
COMMIT;
SELECT * FROM tt2;
id | col_2
----+-------
1 | 1
7 | 1
3 | 1
2 | 1
9 | 1
(5 rows)
-- Test RETURNING with both types of executors
UPDATE tt2 SET col_2 = 5 RETURNING id, col_2;
id | col_2
----+-------
1 | 5
7 | 5
3 | 5
2 | 5
9 | 5
(5 rows)
SET citus.multi_shard_modify_mode to sequential;
UPDATE tt2 SET col_2 = 3 RETURNING id, col_2;
id | col_2
----+-------
1 | 3
7 | 3
3 | 3
2 | 3
9 | 3
(5 rows)
DROP TABLE tt2;
-- Multiple RTEs are not supported
SET citus.multi_shard_modify_mode to DEFAULT;
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table);
ERROR: subqueries are not supported in modifications across multiple shards
DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard.
UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table);
ERROR: subqueries are not supported in modifications across multiple shards
DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard.
DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table);
ERROR: subqueries are not supported in modifications across multiple shards
DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard.
DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table);
ERROR: subqueries are not supported in modifications across multiple shards
DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard.
-- Cursors are not supported
BEGIN;
DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table;
FETCH test_cursor;
user_id | value_1 | value_2 | value_3
---------+---------+---------+---------
8 | 0 | 13 | 0
(1 row)
UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor;
ERROR: cannot run DML queries with cursors
ROLLBACK;
-- Stable functions are supported
CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int);
INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5);
INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7);
INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9);
SELECT create_distributed_table('test_table_1', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
SELECT * FROM test_table_1;
id | date_col | col_3
----+------------------------------+-------
1 | Sat Apr 05 08:32:12 2014 PDT | 5
3 | Wed Jan 12 08:35:19 2011 PST | 9
2 | Sun Feb 01 08:31:16 2015 PST | 7
(3 rows)
UPDATE test_table_1 SET col_3 = 3 WHERE date_col < now();
SELECT * FROM test_table_1;
id | date_col | col_3
----+------------------------------+-------
1 | Sat Apr 05 08:32:12 2014 PDT | 3
3 | Wed Jan 12 08:35:19 2011 PST | 3
2 | Sun Feb 01 08:31:16 2015 PST | 3
(3 rows)
DELETE FROM test_table_1 WHERE date_col < current_timestamp;
SELECT * FROM test_table_1;
id | date_col | col_3
----+----------+-------
(0 rows)
DROP TABLE test_table_1;
-- Volatile functions are not supported
CREATE TABLE test_table_2(id int, double_col double precision);
INSERT INTO test_table_2 VALUES(1, random());
INSERT INTO test_table_2 VALUES(2, random());
INSERT INTO test_table_2 VALUES(3, random());
SELECT create_distributed_table('test_table_2', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
UPDATE test_table_2 SET double_col = random();
ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE
DROP TABLE test_table_2;
-- Run multi shard updates and deletes outside a transaction on reference tables
SELECT COUNT(*) FROM users_reference_copy_table;
count
-------
15
(1 row)
UPDATE users_reference_copy_table SET value_1 = 1;
SELECT SUM(value_1) FROM users_reference_copy_table;
sum
-----
15
(1 row)
SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
count | sum
-------+-----
4 | 52
(1 row)
UPDATE users_reference_copy_table SET value_2 = value_2 + 1 WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
count | sum
-------+-----
4 | 56
(1 row)
UPDATE users_reference_copy_table SET value_3 = 0 WHERE user_id <> 3;
SELECT SUM(value_3) FROM users_reference_copy_table WHERE user_id <> 3;
sum
-----
0
(1 row)
DELETE FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
count
-------
0
(1 row)
-- Run some of the tests again with a different shard replication factor
DROP TABLE users_test_table;
SET citus.shard_replication_factor to 2;
CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('users_test_table', 'user_id');
create_distributed_table
--------------------------
(1 row)
\COPY users_test_table FROM STDIN DELIMITER AS ',';
-- Run multi shard updates and deletes outside a transaction on hash distributed tables
UPDATE users_test_table SET value_1 = 1;
SELECT COUNT(*), SUM(value_1) FROM users_test_table;
count | sum
-------+-----
15 | 15
(1 row)
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
count | sum
-------+-----
4 | 52
(1 row)
UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3;
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
count | sum
-------+-----
4 | 56
(1 row)
UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5;
SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5;
sum
-----
0
(1 row)
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
count
-------
4
(1 row)
DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
count
-------
0
(1 row)
DROP TABLE users_test_table;
DROP TABLE events_test_table;
DROP TABLE events_reference_copy_table;
DROP TABLE users_reference_copy_table;


@ -255,8 +255,8 @@ INSERT INTO dropcol_distributed AS dropcol (key, keep1) VALUES (1, '5') ON CONFL
-- subquery in the SET clause
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
UPDATE SET other_col = (SELECT count(*) from upsert_test);
ERROR: cannot perform distributed planning for the given modifications
DETAIL: Subqueries are not supported in distributed modifications.
ERROR: subqueries are not supported in modifications across multiple shards
DETAIL: Consider using an equality filter on partition column "part_key" to target a single shard.
-- non mutable function call in the SET
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
UPDATE SET other_col = random()::int;


@ -27,8 +27,6 @@ SELECT master_create_distributed_table('customer_delete_protocol', 'c_custkey',
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol
WHERE c_acctbal > 0.0');
-- Check that free-form deletes are not supported.
DELETE FROM customer_delete_protocol WHERE c_custkey > 100;
-- Check that we delete a shard if and only if all rows in the shard satisfy the condition.
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol
WHERE c_custkey > 6500');


@ -23,6 +23,7 @@ test: isolation_distributed_deadlock_detection
# writes, run this test serially.
test: isolation_create_restore_point
test: isolation_multi_shard_modify_vs_all
test: isolation_hash_copy_vs_all
test: isolation_append_copy_vs_all
test: isolation_range_copy_vs_all


@ -37,6 +37,7 @@ test: multi_load_data
test: multi_behavioral_analytics_create_table
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries
test: multi_insert_select multi_insert_select_window
test: multi_shard_update_delete
# ---
# Tests for partitioning support


@ -27,10 +27,6 @@ SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol
WHERE c_acctbal > 0.0');
ERROR: cannot delete from distributed table
DETAIL: Where clause includes a column other than partition column
-- Check that free-form deletes are not supported.
DELETE FROM customer_delete_protocol WHERE c_custkey > 100;
ERROR: cannot run DELETE command which targets multiple shards
HINT: Consider using an equality filter on partition column "c_custkey" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
-- Check that we delete a shard if and only if all rows in the shard satisfy the condition.
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol
WHERE c_custkey > 6500');


@ -0,0 +1,128 @@
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SET citus.shard_replication_factor to 1;
SET citus.shard_count to 32;
SET citus.multi_shard_modify_mode to 'parallel';
CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('users_test_table', 'user_id');
INSERT INTO users_test_table VALUES
(1, 5, 6, 7),
(2, 12, 7, 18),
(3, 23, 8, 25),
(4, 42, 9, 23),
(5, 35, 10, 17),
(6, 21, 11, 25),
(7, 27, 12, 18);
CREATE TABLE events_test_table (user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('events_test_table', 'user_id');
INSERT INTO events_test_table VALUES
(1, 5, 7, 7),
(3, 11, 78, 18),
(5, 22, 9, 25),
(7, 41, 10, 23),
(1, 20, 12, 25),
(3, 26, 13, 18),
(5, 17, 14, 4);
}
teardown
{
DROP TABLE users_test_table;
DROP TABLE events_test_table;
SELECT citus.restore_isolation_tester_func();
SET citus.shard_count to 4;
}
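# With 32 shards and replication factor 1, each multi shard update spans many
# placements, which lets the permutations below observe whether the two
# sessions block each other.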
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-change_connection_mode_to_sequential"
{
set citus.multi_shard_modify_mode to 'sequential';
}
step "s1-update_all_value_1"
{
UPDATE users_test_table SET value_1 = 3;
}
step "s1-update_value_1_of_1_or_3"
{
UPDATE users_test_table SET value_1 = 5 WHERE user_id = 1 or user_id = 3;
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-change_connection_mode_to_sequential"
{
set citus.multi_shard_modify_mode to 'sequential';
}
step "s2-select"
{
SELECT * FROM users_test_table ORDER BY value_2;
}
step "s2-insert-to-table"
{
INSERT INTO users_test_table VALUES (1,2,3,4);
}
step "s2-insert-into-select"
{
INSERT INTO users_test_table SELECT * FROM events_test_table;
}
step "s2-update_all_value_1"
{
UPDATE users_test_table SET value_1 = 6;
}
step "s2-update_value_1_of_1_or_3"
{
UPDATE users_test_table SET value_1 = 8 WHERE user_id = 1 or user_id = 3;
}
step "s2-update_value_1_of_4_or_6"
{
UPDATE users_test_table SET value_1 = 4 WHERE user_id = 4 or user_id = 6;
}
step "s2-commit"
{
COMMIT;
}
# test with parallel connections
permutation "s1-begin" "s1-update_all_value_1" "s2-begin" "s2-select" "s1-commit" "s2-select" "s2-commit"
permutation "s1-begin" "s1-update_all_value_1" "s2-begin" "s2-update_all_value_1" "s1-commit" "s2-commit"
permutation "s1-begin" "s1-update_value_1_of_1_or_3" "s2-begin" "s2-update_value_1_of_4_or_6" "s1-commit" "s2-commit" "s2-select"
permutation "s1-begin" "s1-update_value_1_of_1_or_3" "s2-begin" "s2-update_value_1_of_1_or_3" "s1-commit" "s2-commit" "s2-select"
permutation "s1-begin" "s1-update_all_value_1" "s2-begin" "s2-insert-to-table" "s1-commit" "s2-commit" "s2-select"
permutation "s1-begin" "s1-update_all_value_1" "s2-begin" "s2-insert-into-select" "s1-commit" "s2-commit" "s2-select"
# test with sequential connections; sequential sessions should not block each
# other if they target different shards. If multiple connections update the
# same row, the second one must wait for the first.
permutation "s1-begin" "s1-change_connection_mode_to_sequential" "s1-update_all_value_1" "s2-begin" "s2-change_connection_mode_to_sequential" "s2-update_all_value_1" "s1-commit" "s2-commit" "s2-select"
permutation "s1-begin" "s1-change_connection_mode_to_sequential" "s1-update_value_1_of_1_or_3" "s2-begin" "s2-change_connection_mode_to_sequential" "s2-update_value_1_of_1_or_3" "s1-commit" "s2-commit" "s2-select"
permutation "s1-begin" "s1-change_connection_mode_to_sequential" "s1-update_value_1_of_1_or_3" "s2-begin" "s2-change_connection_mode_to_sequential" "s2-update_value_1_of_4_or_6" "s1-commit" "s2-commit" "s2-select"


@ -368,6 +368,20 @@ SELECT true AS valid FROM explain_xml($$
SELECT true AS valid FROM explain_json($$
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
-- Test multi shard update
EXPLAIN (COSTS FALSE)
UPDATE lineitem_hash_part
SET l_suppkey = 12;
EXPLAIN (COSTS FALSE)
UPDATE lineitem_hash_part
SET l_suppkey = 12
WHERE l_orderkey = 1 OR l_orderkey = 3;
-- Test multi shard delete
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem_hash_part;
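-- Illustrative addition (not in the original change set): with a filter on the
-- distribution column the planner should prune shards first, so the EXPLAIN
-- output is expected to list only the matching tasks.
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem_hash_part
WHERE l_orderkey = 1 OR l_orderkey = 3;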
-- Test task tracker
SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;


@ -194,9 +194,6 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246;
DELETE FROM limit_orders WHERE id = (2 * 123);
SELECT COUNT(*) FROM limit_orders WHERE id = 246;
-- commands with no constraints on the partition key are not supported
DELETE FROM limit_orders WHERE bidder_id = 162;
-- commands with a USING clause are unsupported
CREATE TABLE bidders ( name text, id bigint );
DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND
@ -207,9 +204,6 @@ DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *)
DELETE FROM limit_orders;
-- cursors are not supported
DELETE FROM limit_orders WHERE CURRENT OF cursor_name;
INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
-- simple UPDATE
@ -298,9 +292,6 @@ ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
-- Third: Connect back to master node
\c - - - :master_port
-- commands with no constraints on the partition key are not supported
UPDATE limit_orders SET limit_price = 0.00;
-- attempting to change the partition key is unsupported
UPDATE limit_orders SET id = 0 WHERE id = 246;
UPDATE limit_orders SET id = 0 WHERE id = 0 OR id = 246;
@ -382,9 +373,6 @@ ALTER TABLE limit_orders DROP array_of_values;
-- even in RETURNING
UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW();
-- cursors are not supported
UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name;
-- check that multi-row UPDATE/DELETEs with RETURNING work
INSERT INTO multiple_hash VALUES ('0', '1');
INSERT INTO multiple_hash VALUES ('0', '2');


@ -106,7 +106,7 @@ SELECT COUNT(*) FROM limit_orders_mx WHERE id = 246;
DELETE FROM limit_orders_mx WHERE id = (2 * 123);
SELECT COUNT(*) FROM limit_orders_mx WHERE id = 246;
-- commands with no constraints on the partition key are not supported
-- multi shard delete is supported
DELETE FROM limit_orders_mx WHERE bidder_id = 162;
-- commands with a USING clause are unsupported
@ -149,7 +149,7 @@ UPDATE limit_orders_mx SET (kind, limit_price) = ('buy', 999) WHERE id = 246 RET
INSERT INTO limit_orders_mx VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
INSERT INTO limit_orders_mx VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
-- commands with no constraints on the partition key are not supported
-- multi shard update is supported
UPDATE limit_orders_mx SET limit_price = 0.00;
-- attempting to change the partition key is unsupported


@ -0,0 +1,307 @@
--
-- multi shard update delete
-- this file is intended to test multi shard update/delete queries
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1440000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1440000;
SET citus.shard_replication_factor to 1;
SET citus.multi_shard_modify_mode to 'parallel';
CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('users_test_table', 'user_id');
\COPY users_test_table FROM STDIN DELIMITER AS ',';
1, 5, 6, 7
2, 12, 7, 18
3, 23, 8, 25
4, 42, 9, 23
5, 35, 10, 21
6, 21, 11, 25
7, 27, 12, 18
8, 18, 13, 4
7, 38, 14, 22
6, 43, 15, 22
5, 61, 16, 17
4, 6, 17, 8
3, 16, 18, 44
2, 25, 19, 38
1, 55, 20, 17
\.
CREATE TABLE events_test_table (user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('events_test_table', 'user_id');
\COPY events_test_table FROM STDIN DELIMITER AS ',';
1, 5, 7, 7
3, 11, 78, 18
5, 22, 9, 25
7, 41, 10, 23
9, 34, 11, 21
1, 20, 12, 25
3, 26, 13, 18
5, 17, 14, 4
7, 37, 15, 22
9, 42, 16, 22
1, 60, 17, 17
3, 5, 18, 8
5, 15, 19, 44
7, 24, 20, 38
9, 54, 21, 17
\.
CREATE TABLE events_reference_copy_table (like events_test_table);
SELECT create_reference_table('events_reference_copy_table');
INSERT INTO events_reference_copy_table SELECT * FROM events_test_table;
CREATE TABLE users_reference_copy_table (like users_test_table);
SELECT create_reference_table('users_reference_copy_table');
INSERT INTO users_reference_copy_table SELECT * FROM users_test_table;
-- Run multi shard updates and deletes outside a transaction on hash distributed tables
UPDATE users_test_table SET value_1 = 1;
SELECT COUNT(*), SUM(value_1) FROM users_test_table;
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3;
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5;
SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5;
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
-- Run multi shard update/delete queries within transactions
BEGIN;
UPDATE users_test_table SET value_3 = 0;
END;
SELECT SUM(value_3) FROM users_test_table;
-- Updates can also be rolled back
BEGIN;
UPDATE users_test_table SET value_3 = 1;
ROLLBACK;
SELECT SUM(value_3) FROM users_test_table;
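-- The ROLLBACK above aborts the remote transactions on the workers as well,
-- which is why value_3 still sums to the value it had before the BEGIN.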
-- Run with inserts (we need to set citus.multi_shard_modify_mode to sequential)
BEGIN;
INSERT INTO users_test_table (user_id, value_3) VALUES(20, 15);
INSERT INTO users_test_table (user_id, value_3) VALUES(16,1), (20,16), (7,1), (20,17);
SET citus.multi_shard_modify_mode to sequential;
UPDATE users_test_table SET value_3 = 1;
END;
SELECT COUNT(*), SUM(value_3) FROM users_test_table;
SET citus.multi_shard_modify_mode to 'sequential';
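-- Illustrative check (not part of the original test): the GUC accepts
-- 'parallel' and 'sequential', and SHOW confirms the active mode.
SHOW citus.multi_shard_modify_mode;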
-- Run multiple multi shard updates (with sequential executor)
BEGIN;
UPDATE users_test_table SET value_3 = 5;
UPDATE users_test_table SET value_3 = 0;
END;
SELECT SUM(value_3) FROM users_test_table;
-- Run multiple multi shard updates (with parallel executor)
SET citus.multi_shard_modify_mode to 'parallel';
UPDATE users_test_table SET value_3 = 5;
BEGIN;
UPDATE users_test_table SET value_3 = 2;
UPDATE users_test_table SET value_3 = 0;
END;
SELECT SUM(value_3) FROM users_test_table;
-- Check WHERE clauses with constant true/false predicates
UPDATE users_test_table SET value_3 = 1 WHERE user_id = 3 or true;
SELECT COUNT(*), SUM(value_3) FROM users_test_table;
UPDATE users_test_table SET value_3 = 0 WHERE user_id = 20 and false;
SELECT COUNT(*), SUM(value_3) FROM users_test_table;
-- Run multi shard updates with prepared statements
PREPARE foo_plan(int,int) AS UPDATE users_test_table SET value_1 = $1, value_3 = $2;
EXECUTE foo_plan(1,5);
EXECUTE foo_plan(3,15);
EXECUTE foo_plan(5,25);
EXECUTE foo_plan(7,35);
EXECUTE foo_plan(9,45);
EXECUTE foo_plan(0,0);
SELECT SUM(value_1), SUM(value_3) FROM users_test_table;
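-- For contrast, a sketch (not in the original test): a prepared statement that
-- filters on the partition column should be router planned and touch a single
-- shard per execution. foo_plan_single is a hypothetical name.
PREPARE foo_plan_single(int, int) AS
    UPDATE users_test_table SET value_1 = $2 WHERE user_id = $1;
EXECUTE foo_plan_single(1, 2);
DEALLOCATE foo_plan_single;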
-- Test on append table (set executor mode to sequential, since with append
-- distributed tables the parallel executor may open a large number of connections)
SET citus.multi_shard_modify_mode to sequential;
CREATE TABLE append_stage_table(id int, col_2 int);
INSERT INTO append_stage_table VALUES(1,3);
INSERT INTO append_stage_table VALUES(3,2);
INSERT INTO append_stage_table VALUES(5,4);
CREATE TABLE test_append_table(id int, col_2 int);
SELECT create_distributed_table('test_append_table','id','append');
SELECT master_create_empty_shard('test_append_table');
SELECT * FROM master_append_table_to_shard(1440066, 'append_stage_table', 'localhost', :master_port);
SELECT master_create_empty_shard('test_append_table') AS new_shard_id;
SELECT * FROM master_append_table_to_shard(1440067, 'append_stage_table', 'localhost', :master_port);
UPDATE test_append_table SET col_2 = 5;
SELECT * FROM test_append_table;
DROP TABLE append_stage_table;
DROP TABLE test_append_table;
-- Multi shard update on a partitioned distributed table
SET citus.multi_shard_modify_mode to 'parallel';
SET citus.shard_replication_factor to 1;
CREATE TABLE tt1(id int, col_2 int) partition by range (col_2);
CREATE TABLE tt1_510 partition of tt1 for VALUES FROM (5) to (10);
CREATE TABLE tt1_1120 partition of tt1 for VALUES FROM (11) to (20);
INSERT INTO tt1 VALUES (1,11), (3,15), (5,17), (6,19), (8,17), (2,12);
SELECT create_distributed_table('tt1','id');
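-- Distributing the parent table also distributes its partitions, so the
-- UPDATE and DELETE below fan out to the shards of every partition.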
UPDATE tt1 SET col_2 = 13;
DELETE FROM tt1 WHERE id = 1 or id = 3 or id = 5;
SELECT * FROM tt1;
-- Partitioned distributed table within transaction
INSERT INTO tt1 VALUES(4,6);
INSERT INTO tt1 VALUES(7,7);
INSERT INTO tt1 VALUES(9,8);
BEGIN;
-- Update rows from partition tt1_1120
UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
-- Update rows from partition tt1_510
UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5;
COMMIT;
SELECT * FROM tt1;
-- Modify the main table and a partition within the same transaction
BEGIN;
UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5;
DELETE FROM tt1_510;
DELETE FROM tt1_1120;
COMMIT;
SELECT * FROM tt1;
DROP TABLE tt1;
-- Update and copy in the same transaction
CREATE TABLE tt2(id int, col_2 int);
SELECT create_distributed_table('tt2','id');
BEGIN;
\COPY tt2 FROM STDIN DELIMITER AS ',';
1, 10
3, 15
7, 14
9, 75
2, 42
\.
UPDATE tt2 SET col_2 = 1;
COMMIT;
SELECT * FROM tt2;
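-- Note: for the UPDATE above to see the rows written by \COPY in the same
-- transaction, both are expected to run over the same per placement
-- connections.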
-- Test RETURNING with both types of executors
UPDATE tt2 SET col_2 = 5 RETURNING id, col_2;
SET citus.multi_shard_modify_mode to sequential;
UPDATE tt2 SET col_2 = 3 RETURNING id, col_2;
DROP TABLE tt2;
-- Multiple RTEs are not supported
SET citus.multi_shard_modify_mode to DEFAULT;
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table);
UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table);
DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table);
DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table);
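-- The single shard forms of the statements above are supported, since an
-- equality filter on the partition column routes them to one shard
-- (illustrative sketch; both statements match no remaining rows):
UPDATE users_test_table SET value_2 = 5 WHERE user_id = 3;
DELETE FROM users_test_table WHERE user_id = 5;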
-- Cursors are not supported
BEGIN;
DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table;
FETCH test_cursor;
UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor;
ROLLBACK;
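-- A supported alternative (sketch): address the row with a regular WHERE
-- clause on the partition column instead of WHERE CURRENT OF.
UPDATE users_test_table SET value_2 = 5 WHERE user_id = 8;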
-- Stable functions are supported
CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int);
INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5);
INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7);
INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9);
SELECT create_distributed_table('test_table_1', 'id');
SELECT * FROM test_table_1;
UPDATE test_table_1 SET col_3 = 3 WHERE date_col < now();
SELECT * FROM test_table_1;
DELETE FROM test_table_1 WHERE date_col < current_timestamp;
SELECT * FROM test_table_1;
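-- now() and current_timestamp are STABLE, so the coordinator can evaluate them
-- once and ship the resulting constant to all shards; that is what makes the
-- statements above safe across multiple shards.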
DROP TABLE test_table_1;
-- Volatile functions are not supported
CREATE TABLE test_table_2(id int, double_col double precision);
INSERT INTO test_table_2 VALUES(1, random());
INSERT INTO test_table_2 VALUES(2, random());
INSERT INTO test_table_2 VALUES(3, random());
SELECT create_distributed_table('test_table_2', 'id');
UPDATE test_table_2 SET double_col = random();
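-- A deterministic expression is fine, since each worker can evaluate it per
-- row without coordination (illustrative sketch):
UPDATE test_table_2 SET double_col = id * 0.5;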
DROP TABLE test_table_2;
-- Run multi shard updates and deletes outside a transaction on reference tables
SELECT COUNT(*) FROM users_reference_copy_table;
UPDATE users_reference_copy_table SET value_1 = 1;
SELECT SUM(value_1) FROM users_reference_copy_table;
SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
UPDATE users_reference_copy_table SET value_2 = value_2 + 1 WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
UPDATE users_reference_copy_table SET value_3 = 0 WHERE user_id <> 3;
SELECT SUM(value_3) FROM users_reference_copy_table WHERE user_id <> 3;
DELETE FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5;
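-- Reference tables consist of a single shard replicated to every node, so the
-- statements above modify all placements of that one shard rather than many
-- shards.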
-- Run some of the tests again with a different shard replication factor
DROP TABLE users_test_table;
SET citus.shard_replication_factor to 2;
CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('users_test_table', 'user_id');
\COPY users_test_table FROM STDIN DELIMITER AS ',';
1, 5, 6, 7
2, 12, 7, 18
3, 23, 8, 25
4, 42, 9, 23
5, 35, 10, 21
6, 21, 11, 25
7, 27, 12, 18
8, 18, 13, 4
7, 38, 14, 22
6, 43, 15, 22
5, 61, 16, 17
4, 6, 17, 8
3, 16, 18, 44
2, 25, 19, 38
1, 55, 20, 17
\.
-- Run multi shard updates and deletes outside a transaction on hash distributed tables
UPDATE users_test_table SET value_1 = 1;
SELECT COUNT(*), SUM(value_1) FROM users_test_table;
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3;
SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3;
UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5;
SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5;
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
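-- With replication factor 2 every statement above is applied to two placements
-- per shard; the results are expected to match the replication factor 1 runs.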
DROP TABLE users_test_table;
DROP TABLE events_test_table;
DROP TABLE events_reference_copy_table;
DROP TABLE users_reference_copy_table;