From 0b5db5d826009d75cb4bd4ee6cd498c67c3e751d Mon Sep 17 00:00:00 2001 From: velioglu Date: Fri, 22 Sep 2017 13:44:31 -0700 Subject: [PATCH] Support multi shard update/delete queries --- .../distributed/executor/multi_executor.c | 12 +- .../executor/multi_server_executor.c | 10 - .../master/master_modify_multiple_shards.c | 10 +- .../distributed/planner/deparse_shard_query.c | 6 +- .../planner/insert_select_planner.c | 15 +- .../planner/multi_physical_planner.c | 6 +- .../distributed/planner/multi_planner.c | 71 ++- .../planner/multi_router_planner.c | 373 ++++++++----- src/backend/distributed/shared_library_init.c | 16 + .../distributed/utils/metadata_cache.c | 2 +- src/include/distributed/multi_executor.h | 8 + src/include/distributed/multi_planner.h | 5 +- .../distributed/multi_router_planner.h | 4 +- .../distributed/multi_server_executor.h | 2 - .../isolation_multi_shard_modify_vs_all.out | 317 +++++++++++ src/test/regress/expected/multi_explain.out | 64 +++ src/test/regress/expected/multi_explain_0.out | 64 +++ .../regress/expected/multi_modifications.out | 28 +- .../expected/multi_mx_modifications.out | 16 +- .../expected/multi_shard_update_delete.out | 499 +++++++++++++++++ .../expected/multi_shard_update_delete_0.out | 522 ++++++++++++++++++ src/test/regress/expected/multi_upsert.out | 4 +- .../input/multi_master_delete_protocol.source | 2 - src/test/regress/isolation_schedule | 1 + src/test/regress/multi_schedule | 1 + .../multi_master_delete_protocol.source | 4 - .../isolation_multi_shard_modify_vs_all.spec | 128 +++++ src/test/regress/sql/multi_explain.sql | 14 + src/test/regress/sql/multi_modifications.sql | 12 - .../regress/sql/multi_mx_modifications.sql | 4 +- .../regress/sql/multi_shard_update_delete.sql | 307 ++++++++++ 31 files changed, 2305 insertions(+), 222 deletions(-) create mode 100644 src/test/regress/expected/isolation_multi_shard_modify_vs_all.out create mode 100644 src/test/regress/expected/multi_shard_update_delete.out create mode 100644 src/test/regress/expected/multi_shard_update_delete_0.out create mode 100644 src/test/regress/specs/isolation_multi_shard_modify_vs_all.spec create mode 100644 src/test/regress/sql/multi_shard_update_delete.sql diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 207c89748..be9053ce6 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -38,6 +38,9 @@ #include "utils/memutils.h" +/* controls the connection type for multi shard update/delete queries */ +int MultiShardConnectionType = PARALLEL_CONNECTION; + /* * Define executor methods for the different executor types. */ @@ -176,11 +179,14 @@ RouterCreateScan(CustomScan *scan) { Assert(isModificationQuery); - if (IsMultiRowInsert(workerJob->jobQuery)) + if (IsMultiRowInsert(workerJob->jobQuery) || + (IsUpdateOrDelete(multiPlan) && + MultiShardConnectionType == SEQUENTIAL_CONNECTION)) { /* - * Multi-row INSERT is executed sequentially instead of using - * parallel connections. + * Multi shard update deletes while multi_shard_modify_mode equals + * to 'sequential' or Multi-row INSERT are executed sequentially + * instead of using parallel connections. 
*/ scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods; } diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 5d248c8a4..3afc6cec7 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -29,7 +29,6 @@ int RemoteTaskCheckInterval = 100; /* per cycle sleep interval in millisecs */ int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */ bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */ -int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */ /* @@ -62,15 +61,6 @@ JobExecutorType(MultiPlan *multiPlan) return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; } - /* if it is not a router executable plan, inform user according to the log level */ - if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF) - { - ereport(MultiTaskQueryLogLevel, (errmsg("multi-task query about to be executed"), - errhint("Queries are split to multiple tasks " - "if they have to be split into several" - " queries on the workers."))); - } - Assert(multiPlan->operation == CMD_SELECT); workerNodeList = ActiveReadableNodeList(); diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 5e918bad5..62f594701 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -55,8 +55,7 @@ #include "utils/memutils.h" -static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, - Oid relationId); +static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList); PG_FUNCTION_INFO_V1(master_modify_multiple_shards); @@ -176,8 +175,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) CHECK_FOR_INTERRUPTS(); - taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, - relationId); + taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList); affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList); PG_RETURN_INT32(affectedTupleCount); @@ -189,14 +187,14 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) * given list of shards. 
*/ static List * -ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relationId) +ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList) { List *taskList = NIL; ListCell *shardIntervalCell = NULL; uint64 jobId = INVALID_JOB_ID; int taskId = 1; - /* lock metadata before getting placment lists */ + /* lock metadata before getting placement lists */ LockShardListMetadata(shardIntervalList, ShareLock); foreach(shardIntervalCell, shardIntervalList) diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 5bb718501..66f81a47a 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -54,7 +54,11 @@ RebuildQueryStrings(Query *originalQuery, List *taskList) Task *task = (Task *) lfirst(taskCell); Query *query = originalQuery; - if (task->insertSelectQuery) + if (UpdateOrDeleteQuery(query) && list_length(taskList)) + { + query = copyObject(originalQuery); + } + else if (task->insertSelectQuery) { /* for INSERT..SELECT, adjust shard names in SELECT part */ RangeTblEntry *copiedInsertRte = NULL; diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 538f0c929..6f98b3710 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -269,15 +269,6 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, ++taskIdIndex; } - if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF && - list_length(sqlTaskList) > 1) - { - ereport(MultiTaskQueryLogLevel, (errmsg("multi-task query about to be executed"), - errhint("Queries are split to multiple tasks " - "if they have to be split into several" - " queries on the workers."))); - } - /* Create the worker job */ workerJob = CitusMakeNode(Job); workerJob->taskList = sqlTaskList; @@ -484,6 +475,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter List *shardOpExpressions = NIL; RestrictInfo *shardRestrictionList = NULL; DeferredErrorMessage *planningError = NULL; + bool multiShardModifyQuery = false; /* grab shared metadata lock to stop concurrent placement additions */ LockShardDistributionMetadata(shardId, ShareLock); @@ -543,7 +535,10 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter */ planningError = PlanRouterQuery(copiedSubquery, copiedRestrictionContext, &selectPlacementList, &selectAnchorShardId, - &relationShardList, replacePrunedQueryWithDummy); + &relationShardList, replacePrunedQueryWithDummy, + &multiShardModifyQuery); + + Assert(!multiShardModifyQuery); if (planningError) { diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 50ec33878..4fc20931a 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2341,6 +2341,7 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval, List *shardOpExpressions = NIL; RestrictInfo *shardRestrictionList = NULL; DeferredErrorMessage *planningError = NULL; + bool multiShardModifQuery = false; /* * Add the restriction qual parameter value in all baserestrictinfos. 
@@ -2379,7 +2380,10 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval, */ planningError = PlanRouterQuery(taskQuery, copiedRestrictionContext, &selectPlacementList, &selectAnchorShardId, - &relationShardList, replacePrunedQueryWithDummy); + &relationShardList, replacePrunedQueryWithDummy, + &multiShardModifQuery); + + Assert(!multiShardModifQuery); /* we don't expect to this this error but keeping it as a precaution for future changes */ if (planningError) diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index d020b479b..52681bcc3 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -37,6 +37,7 @@ static List *plannerRestrictionContextList = NIL; +int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */ /* create custom scan methods for separate executors */ static CustomScanMethods RealTimeCustomScanMethods = { @@ -283,6 +284,56 @@ IsModifyCommand(Query *query) } +/* + * IsMultiShardModifyPlan returns true if the given plan was generated for + * multi shard update or delete query. + */ +bool +IsMultiShardModifyPlan(MultiPlan *multiPlan) +{ + if (IsUpdateOrDelete(multiPlan) && IsMultiTaskPlan(multiPlan)) + { + return true; + } + + return false; +} + + +/* + * IsMultiTaskPlan returns true if job contains multiple tasks. + */ +bool +IsMultiTaskPlan(MultiPlan *multiPlan) +{ + Job *workerJob = multiPlan->workerJob; + + if (workerJob != NULL && list_length(workerJob->taskList) > 1) + { + return true; + } + + return false; +} + + +/* + * IsUpdateOrDelete returns true if the query performs update or delete. + */ +bool +IsUpdateOrDelete(MultiPlan *multiPlan) +{ + CmdType commandType = multiPlan->operation; + + if (commandType == CMD_UPDATE || commandType == CMD_DELETE) + { + return true; + } + + return false; +} + + /* * IsModifyMultiPlan returns true if the multi plan performs modifications, * false otherwise. @@ -436,9 +487,11 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query /* * As explained above, force planning costs to be unrealistically high if - * query planning failed (possibly) due to prepared statement parameters. + * query planning failed (possibly) due to prepared statement parameters or + * if it is planned as a multi shard modify query. 
*/ - if (distributedPlan->planningError && hasUnresolvedParams) + if ((distributedPlan->planningError || IsMultiShardModifyPlan(distributedPlan)) && + hasUnresolvedParams) { /* * Arbitraryly high cost, but low enough that it can be added up @@ -530,6 +583,20 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) } } + if (IsMultiTaskPlan(multiPlan)) + { + /* if it is not a single task executable plan, inform user according to the log level */ + if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF) + { + ereport(MultiTaskQueryLogLevel, (errmsg( + "multi-task query about to be executed"), + errhint( + "Queries are split to multiple tasks " + "if they have to be split into several" + " queries on the workers."))); + } + } + multiPlan->relationIdList = localPlan->relationOids; multiPlanData = (Node *) multiPlan; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 1013a214a..a61627527 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2,8 +2,8 @@ * * multi_router_planner.c * - * This file contains functions to plan single shard queries - * including distributed table modifications. + * This file contains functions to plan multiple shard queries without any + * aggregation step including distributed table modifications. * * Copyright (c) 2014-2016, Citus Data, Inc. * @@ -137,7 +137,6 @@ static List * ExtractInsertValuesList(Query *query, Var *partitionColumn); static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); -static bool UpdateOrDeleteQuery(Query *query); static RangeTblEntry * GetUpdateOrDeleteRTE(List *rangeTableList); static bool UpdateOrDeleteRTE(RangeTblEntry *rangeTableEntry); static bool SelectsFromDistributedTable(List *rangeTableList); @@ -146,7 +145,13 @@ static List * get_all_actual_clauses(List *restrictinfo_list); #endif static int CompareInsertValuesByShardId(const void *leftElement, const void *rightElement); - +static uint64 GetInitialShardId(List *relationShardList); +static List * SingleShardSelectTaskList(Query *query, List *relationShardList, + List *placementList, uint64 shardId); +static List * SingleShardModifyTaskList(Query *query, List *relationShardList, + List *placementList, uint64 shardId); +static List * MultiShardModifyTaskList(Query *originalQuery, List *relationShardList, + bool requiresMasterEvaluation); /* * CreateRouterPlan attempts to create a router executor plan for the given @@ -250,7 +255,9 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, return multiPlan; } + /* we cannot have multi shard update/delete query via this code path */ job = RouterJob(originalQuery, restrictionContext, &multiPlan->planningError); + if (multiPlan->planningError) { /* query cannot be handled by this planner */ @@ -516,11 +523,21 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery) */ if (!UpdateOrDeleteQuery(queryTree) || multiShardQuery) { + StringInfo errorHint = makeStringInfo(); + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry( + distributedTableId); + char *partitionKeyString = cacheEntry->partitionKeyString; + char *partitionColumnName = ColumnNameToColumn(distributedTableId, + partitionKeyString); + + appendStringInfo(errorHint, + "Consider using an equality filter on partition column \"%s\" to target a single shard.", + 
partitionColumnName); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot perform distributed planning for the given " - "modifications", - "Subqueries are not supported in distributed " - "modifications.", NULL); + "subqueries are not supported in modifications across " + "multiple shards", + errorHint->data, NULL); } } @@ -602,8 +619,20 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery) */ if (rangeTableEntry->rtekind == RTE_SUBQUERY) { - rangeTableEntryErrorDetail = "Subqueries are not supported in" - " distributed modifications."; + StringInfo errorHint = makeStringInfo(); + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry( + distributedTableId); + char *partitionKeyString = cacheEntry->partitionKeyString; + char *partitionColumnName = ColumnNameToColumn(distributedTableId, + partitionKeyString); + + appendStringInfo(errorHint, "Consider using an equality filter on " + "partition column \"%s\" to target a single shard.", + partitionColumnName); + + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "subqueries are not " + "supported in modifications across multiple shards", + errorHint->data, NULL); } else if (rangeTableEntry->rtekind == RTE_JOIN) { @@ -741,6 +770,14 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery) "RETURNING clause", NULL, NULL); } + + if (queryTree->jointree->quals != NULL && + nodeTag(queryTree->jointree->quals) == T_CurrentOfExpr) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot run DML queries with cursors", NULL, + NULL); + } } if (commandType == CMD_INSERT && queryTree->onConflict != NULL) @@ -836,7 +873,7 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery) * UpdateOrDeleteQuery checks if the given query is an UPDATE or DELETE command. * If it is, it returns true otherwise it returns false. */ -static bool +bool UpdateOrDeleteQuery(Query *query) { CmdType commandType = query->commandType; @@ -1326,14 +1363,15 @@ ExtractFirstDistributedTableId(Query *query) } -/* RouterJob builds a Job to represent a single shard select/update/delete query */ +/* + * RouterJob builds a Job to represent a single shard select/update/delete and + * multiple shard update/delete queries. 
+ */ static Job * RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, DeferredErrorMessage **planningError) { Job *job = NULL; - Task *task = NULL; - StringInfo queryString = makeStringInfo(); uint64 shardId = INVALID_SHARD_ID; List *placementList = NIL; List *relationShardList = NIL; @@ -1341,6 +1379,7 @@ RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, bool replacePrunedQueryWithDummy = false; bool requiresMasterEvaluation = false; RangeTblEntry *updateOrDeleteRTE = NULL; + bool isMultiShardModifyQuery = false; /* router planner should create task even if it deosn't hit a shard at all */ replacePrunedQueryWithDummy = true; @@ -1350,7 +1389,8 @@ RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, (*planningError) = PlanRouterQuery(originalQuery, restrictionContext, &placementList, &shardId, &relationShardList, - replacePrunedQueryWithDummy); + replacePrunedQueryWithDummy, + &isMultiShardModifyQuery); if (*planningError) { return NULL; @@ -1374,42 +1414,127 @@ RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, return job; } - pg_get_query_def(originalQuery, queryString); - if (originalQuery->commandType == CMD_SELECT) { - task = CreateTask(ROUTER_TASK); + job->taskList = SingleShardSelectTaskList(originalQuery, relationShardList, + placementList, shardId); + } + else if (isMultiShardModifyQuery) + { + job->taskList = MultiShardModifyTaskList(originalQuery, relationShardList, + requiresMasterEvaluation); } else { - DistTableCacheEntry *modificationTableCacheEntry = NULL; - char modificationPartitionMethod = 0; - - modificationTableCacheEntry = DistributedTableCacheEntry( - updateOrDeleteRTE->relid); - modificationPartitionMethod = modificationTableCacheEntry->partitionMethod; - - if (modificationPartitionMethod == DISTRIBUTE_BY_NONE && - SelectsFromDistributedTable(rangeTableList)) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform select on a distributed table " - "and modify a reference table"))); - } - - task = CreateTask(MODIFY_TASK); - task->replicationModel = modificationTableCacheEntry->replicationModel; + job->taskList = SingleShardModifyTaskList(originalQuery, relationShardList, + placementList, shardId); } + job->requiresMasterEvaluation = requiresMasterEvaluation; + return job; +} + + +/* + * SingleShardSelectTaskList generates a task for single shard select query + * and returns it as a list. + */ +static List * +SingleShardSelectTaskList(Query *query, List *relationShardList, List *placementList, + uint64 shardId) +{ + Task *task = CreateTask(ROUTER_TASK); + StringInfo queryString = makeStringInfo(); + + pg_get_query_def(query, queryString); + task->queryString = queryString->data; task->anchorShardId = shardId; task->taskPlacementList = placementList; task->relationShardList = relationShardList; - job->taskList = list_make1(task); - job->requiresMasterEvaluation = requiresMasterEvaluation; + return list_make1(task); +} - return job; + +/* + * MultiShardModifyTaskList generates task list for multi shard update/delete + * queries. 
+ */ +static List * +MultiShardModifyTaskList(Query *originalQuery, List *relationShardList, + bool requiresMasterEvaluation) +{ + List *taskList = NIL; + ListCell *relationShardCell = NULL; + int taskId = 1; + + foreach(relationShardCell, relationShardList) + { + RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell); + List *relationShardList = list_make1(relationShard); + Task *task = CreateTask(MODIFY_TASK); + + if (!requiresMasterEvaluation) + { + Query *copiedQuery = copyObject(originalQuery); + StringInfo shardQueryString = makeStringInfo(); + + UpdateRelationToShardNames((Node *) copiedQuery, relationShardList); + pg_get_query_def(copiedQuery, shardQueryString); + + task->queryString = shardQueryString->data; + } + + task->taskId = taskId++; + task->anchorShardId = relationShard->shardId; + task->relationShardList = relationShardList; + + taskList = lappend(taskList, task); + } + + return taskList; +} + + +/* + * SingleShardModifyTaskList generates a task for single shard update/delete query + * and returns it as a list. + */ +static List * +SingleShardModifyTaskList(Query *query, List *relationShardList, List *placementList, + uint64 shardId) +{ + Task *task = CreateTask(MODIFY_TASK); + StringInfo queryString = makeStringInfo(); + DistTableCacheEntry *modificationTableCacheEntry = NULL; + char modificationPartitionMethod = 0; + List *rangeTableList = NIL; + RangeTblEntry *updateOrDeleteRTE = NULL; + + ExtractRangeTableEntryWalker((Node *) query, &rangeTableList); + updateOrDeleteRTE = GetUpdateOrDeleteRTE(rangeTableList); + + modificationTableCacheEntry = DistributedTableCacheEntry(updateOrDeleteRTE->relid); + modificationPartitionMethod = modificationTableCacheEntry->partitionMethod; + + if (modificationPartitionMethod == DISTRIBUTE_BY_NONE && + SelectsFromDistributedTable(rangeTableList)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform select on a distributed table " + "and modify a reference table"))); + } + + pg_get_query_def(query, queryString); + + task->queryString = queryString->data; + task->anchorShardId = shardId; + task->taskPlacementList = placementList; + task->relationShardList = relationShardList; + task->replicationModel = modificationTableCacheEntry->replicationModel; + + return list_make1(task); } @@ -1501,85 +1626,52 @@ SelectsFromDistributedTable(List *rangeTableList) DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext, List **placementList, uint64 *anchorShardId, List **relationShardList, - bool replacePrunedQueryWithDummy) + bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery) { - bool multiShardQuery = false; + bool isMultiShardQuery = false; List *prunedRelationShardList = NIL; DeferredErrorMessage *planningError = NULL; ListCell *prunedRelationShardListCell = NULL; List *workerList = NIL; bool shardsPresent = false; uint64 shardId = INVALID_SHARD_ID; + CmdType commandType = originalQuery->commandType; + bool isMultiShardModifyQuery = false; *placementList = NIL; prunedRelationShardList = TargetShardIntervalsForRouter(originalQuery, restrictionContext, - &multiShardQuery); + &isMultiShardQuery); - /* - * If multiShardQuery is true then it means a relation has more - * than one shard left after pruning. 
- */ - if (multiShardQuery) + if (isMultiShardQuery) { - StringInfo errorMessage = makeStringInfo(); - StringInfo errorHint = makeStringInfo(); - CmdType commandType = originalQuery->commandType; - const char *commandName = "SELECT"; - - if (commandType == CMD_UPDATE) + /* + * If multiShardQuery is true and it is a type of SELECT query, then + * return deferred error. We do not support multi-shard SELECT queries + * with this code path. + */ + if (commandType == CMD_SELECT) { - commandName = "UPDATE"; - } - else if (commandType == CMD_DELETE) - { - commandName = "DELETE"; + planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + NULL, NULL, NULL); + return planningError; } - if (commandType == CMD_UPDATE || commandType == CMD_DELETE) + Assert(UpdateOrDeleteQuery(originalQuery)); + + planningError = ModifyQuerySupported(originalQuery, isMultiShardQuery); + if (planningError != NULL) { - List *rangeTableList = NIL; - RangeTblEntry *updateOrDeleteRTE = NULL; - DistTableCacheEntry *updateOrDeleteTableCacheEntry = NULL; - char *partitionKeyString = NULL; - char *partitionColumnName = NULL; - - /* extract range table entries */ - ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList); - - updateOrDeleteRTE = GetUpdateOrDeleteRTE(rangeTableList); - updateOrDeleteTableCacheEntry = - DistributedTableCacheEntry(updateOrDeleteRTE->relid); - - partitionKeyString = updateOrDeleteTableCacheEntry->partitionKeyString; - partitionColumnName = ColumnNameToColumn(updateOrDeleteRTE->relid, - partitionKeyString); - - appendStringInfo(errorHint, "Consider using an equality filter on " - "partition column \"%s\" to target a " - "single shard. If you'd like to run a " - "multi-shard operation, use " - "master_modify_multiple_shards().", - partitionColumnName); + return planningError; } - /* note that for SELECT queries, we never print this error message */ - appendStringInfo(errorMessage, - "cannot run %s command which targets multiple shards", - commandName); - - planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - errorMessage->data, NULL, - errorHint->data); - return planningError; + isMultiShardModifyQuery = true; } foreach(prunedRelationShardListCell, prunedRelationShardList) { List *prunedShardList = (List *) lfirst(prunedRelationShardListCell); - - ShardInterval *shardInterval = NULL; - RelationShard *relationShard = NULL; + ListCell *shardIntervalCell = NULL; /* no shard is present or all shards are pruned out case will be handled later */ if (prunedShardList == NIL) @@ -1589,23 +1681,22 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon shardsPresent = true; - /* all relations are now pruned down to 0 or 1 shards */ - Assert(list_length(prunedShardList) <= 1); - - shardInterval = (ShardInterval *) linitial(prunedShardList); - - /* anchor shard id */ - if (shardId == INVALID_SHARD_ID) + foreach(shardIntervalCell, prunedShardList) { - shardId = shardInterval->shardId; + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + RelationShard *relationShard = CitusMakeNode(RelationShard); + + relationShard->relationId = shardInterval->relationId; + relationShard->shardId = shardInterval->shardId; + + *relationShardList = lappend(*relationShardList, relationShard); } + } - /* add relation to shard mapping */ - relationShard = CitusMakeNode(RelationShard); - relationShard->relationId = shardInterval->relationId; - relationShard->shardId = shardInterval->shardId; - - *relationShardList = lappend(*relationShardList, 
relationShard); + if (isMultiShardModifyQuery) + { + *multiShardModifyQuery = true; + return planningError; } /* @@ -1620,6 +1711,9 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon return planningError; } + /* we need anchor shard id for select queries with router planner */ + shardId = GetInitialShardId(prunedRelationShardList); + /* * Determine the worker that has all shard placements if a shard placement found. * If no shard placement exists and replacePrunedQueryWithDummy flag is set, we will @@ -1665,6 +1759,7 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon return planningError; } + /* * If this is an UPDATE or DELETE query which requires master evaluation, * don't try update shard names, and postpone that to execution phase. @@ -1674,6 +1769,7 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon UpdateRelationToShardNames((Node *) originalQuery, *relationShardList); } + *multiShardModifyQuery = false; *placementList = workerList; *anchorShardId = shardId; @@ -1681,15 +1777,43 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon } +/* + * GetInitialShardId returns the initial shard id given relation shard list. If + * there is no relation shard exist in the list returns INAVLID_SHARD_ID. + */ +static uint64 +GetInitialShardId(List *relationShardList) +{ + ListCell *prunedRelationShardListCell = NULL; + + foreach(prunedRelationShardListCell, relationShardList) + { + List *prunedShardList = (List *) lfirst(prunedRelationShardListCell); + ShardInterval *shardInterval = NULL; + + /* no shard is present or all shards are pruned out case will be handled later */ + if (prunedShardList == NIL) + { + continue; + } + + shardInterval = linitial(prunedShardList); + return shardInterval->shardId; + } + + return INVALID_SHARD_ID; +} + + /* * TargetShardIntervalsForRouter performs shard pruning for all referenced relations * in the query and returns list of shards per relation. Shard pruning is done based - * on provided restriction context per relation. The function bails out and returns - * after setting multiShardQuery to true if any of the relations pruned down to - * more than one active shard. It also records pruned shard intervals in relation - * restriction context to be used later on. Some queries may have contradiction - * clauses like 'and false' or 'and 1=0', such queries are treated as if all of - * the shards of joining relations are pruned out. + * on provided restriction context per relation. The function sets multiShardQuery + * to true if any of the relations pruned down to more than one active shard. It + * also records pruned shard intervals in relation restriction context to be used + * later on. Some queries may have contradiction clauses like 'and false' or + * 'and 1=0', such queries are treated as if all of the shards of joining + * relations are pruned out. */ static List * TargetShardIntervalsForRouter(Query *query, @@ -1729,17 +1853,9 @@ TargetShardIntervalsForRouter(Query *query, { prunedShardList = PruneShards(relationId, tableId, restrictClauseList); - /* - * Quick bail out. The query can not be router plannable if one - * relation has more than one shard left after pruning. Having no - * shard left is okay at this point. It will be handled at a later - * stage. 
- */ if (list_length(prunedShardList) > 1) { (*multiShardQuery) = true; - - return NIL; } } @@ -1783,10 +1899,10 @@ RelationPrunesToMultipleShards(List *relationShardList) /* - * WorkersContainingAllShards returns list of shard placements that contain all - * shard intervals provided to the function. It returns NIL if no placement exists. - * The caller should check if there are any shard intervals exist for placement - * check prior to calling this function. + * WorkersContainingSelectShards returns list of shard placements that contain all + * shard intervals provided to the select query. It returns NIL if no placement + * exists. The caller should check if there are any shard intervals exist for + * placement check prior to calling this function. */ static List * WorkersContainingAllShards(List *prunedShardIntervalsList) @@ -2324,10 +2440,9 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) /* * MultiRouterPlannableQuery returns true if given query can be router plannable. * The query is router plannable if it is a modify query, or if its is a select - * query issued on a hash partitioned distributed table, and it has a filter - * to reduce number of shard pairs to one, and all shard pairs are located on - * the same node. Router plannable checks for select queries can be turned off - * by setting citus.enable_router_execution flag to false. + * query issued on a hash partitioned distributed table. Router plannable checks + * for select queries can be turned off by setting citus.enable_router_execution + * flag to false. */ static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 9e5f8f38c..1584ef107 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -120,6 +120,12 @@ static const struct config_enum_entry multi_task_query_log_level_options[] = { { NULL, 0, false } }; +static const struct config_enum_entry multi_shard_modify_connection_options[] = { + { "parallel", PARALLEL_CONNECTION, false }, + { "sequential", SEQUENTIAL_CONNECTION, false }, + { NULL, 0, false } +}; + /* *INDENT-ON* */ @@ -733,6 +739,16 @@ RegisterCitusConfigVariables(void) 0, NULL, NULL, NULL); + DefineCustomEnumVariable( + "citus.multi_shard_modify_mode", + gettext_noop("Sets the connection type for multi shard modify queries"), + NULL, + &MultiShardConnectionType, + PARALLEL_CONNECTION, multi_shard_modify_connection_options, + PGC_USERSET, + 0, + NULL, NULL, NULL); + DefineCustomStringVariable( "citus.version", gettext_noop("Shows the Citus library version"), diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index d8392808d..de745ca65 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -412,7 +412,7 @@ LoadShardPlacement(uint64 shardId, uint64 placementId) /* * FindShardPlacementOnGroup returns the shard placement for the given shard - * on the given group, or returns NULL of no placement for the shard exists + * on the given group, or returns NULL if no placement for the shard exists * on the group. 
*/ ShardPlacement * diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index eac04ab92..775e598c2 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -28,6 +28,14 @@ typedef struct CitusScanState } CitusScanState; +/* managed via guc.c */ +typedef enum +{ + PARALLEL_CONNECTION = 0, + SEQUENTIAL_CONNECTION = 1 +} MultiShardConnectionTypes; +extern int MultiShardConnectionType; + extern Node * RealTimeCreateScan(CustomScan *scan); extern Node * TaskTrackerCreateScan(CustomScan *scan); extern Node * RouterCreateScan(CustomScan *scan); diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h index 25c2ee4c5..71359759f 100644 --- a/src/include/distributed/multi_planner.h +++ b/src/include/distributed/multi_planner.h @@ -19,7 +19,7 @@ /* values used by jobs and tasks which do not require identifiers */ #define INVALID_JOB_ID 0 #define INVALID_TASK_ID 0 - +#define MULTI_TASK_QUERY_INFO_OFF 0 /* do not log multi-task queries */ typedef struct RelationRestrictionContext { @@ -83,7 +83,10 @@ extern void multi_join_restriction_hook(PlannerInfo *root, JoinType jointype, JoinPathExtraData *extra); extern bool IsModifyCommand(Query *query); +extern bool IsUpdateOrDelete(struct MultiPlan *multiPlan); extern bool IsModifyMultiPlan(struct MultiPlan *multiPlan); +extern bool IsMultiTaskPlan(struct MultiPlan *multiPlan); +extern bool IsMultiShardModifyPlan(struct MultiPlan *multiPlan); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index f9b7461d4..789051228 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -36,7 +36,8 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, restrictionContext, List **placementList, uint64 *anchorShardId, List **relationShardList, bool - replacePrunedQueryWithDummy); + replacePrunedQueryWithDummy, + bool *multiShardModifyQuery); extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError); extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, @@ -52,6 +53,7 @@ extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query); extern bool IsMultiRowInsert(Query *query); extern void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval); +extern bool UpdateOrDeleteQuery(Query *query); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index eb9c87e4e..24df83037 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -36,8 +36,6 @@ #define JOB_CLEANUP_QUERY "SELECT task_tracker_cleanup_job("UINT64_FORMAT ")" #define JOB_CLEANUP_TASK_ID INT_MAX -#define MULTI_TASK_QUERY_INFO_OFF 0 /* do not log multi-task queries */ - /* Enumeration to track one task's execution status */ typedef enum diff --git a/src/test/regress/expected/isolation_multi_shard_modify_vs_all.out b/src/test/regress/expected/isolation_multi_shard_modify_vs_all.out new file mode 100644 index 000000000..fb0462e00 --- /dev/null +++ b/src/test/regress/expected/isolation_multi_shard_modify_vs_all.out @@ -0,0 +1,317 @@ +Parsed test spec with 2 sessions + +starting 
permutation: s1-begin s1-update_all_value_1 s2-begin s2-select s1-commit s2-select s2-commit +step s1-begin: + BEGIN; + +step s1-update_all_value_1: + UPDATE users_test_table SET value_1 = 3; + +step s2-begin: + BEGIN; + +step s2-select: + SELECT * FROM users_test_table ORDER BY value_2; + +user_id value_1 value_2 value_3 + +1 5 6 7 +2 12 7 18 +3 23 8 25 +4 42 9 23 +5 35 10 17 +6 21 11 25 +7 27 12 18 +step s1-commit: + COMMIT; + +step s2-select: + SELECT * FROM users_test_table ORDER BY value_2; + +user_id value_1 value_2 value_3 + +1 3 6 7 +2 3 7 18 +3 3 8 25 +4 3 9 23 +5 3 10 17 +6 3 11 25 +7 3 12 18 +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s1-update_all_value_1 s2-begin s2-update_all_value_1 s1-commit s2-commit +step s1-begin: + BEGIN; + +step s1-update_all_value_1: + UPDATE users_test_table SET value_1 = 3; + +step s2-begin: + BEGIN; + +step s2-update_all_value_1: + UPDATE users_test_table SET value_1 = 6; + +step s1-commit: + COMMIT; + +step s2-update_all_value_1: <... completed> +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s1-update_value_1_of_1_or_3 s2-begin s2-update_value_1_of_4_or_6 s1-commit s2-commit s2-select +step s1-begin: + BEGIN; + +step s1-update_value_1_of_1_or_3: + UPDATE users_test_table SET value_1 = 5 WHERE user_id = 1 or user_id = 3; + +step s2-begin: + BEGIN; + +step s2-update_value_1_of_4_or_6: + UPDATE users_test_table SET value_1 = 4 WHERE user_id = 4 or user_id = 6; + +step s1-commit: + COMMIT; + +step s2-commit: + COMMIT; + +step s2-select: + SELECT * FROM users_test_table ORDER BY value_2; + +user_id value_1 value_2 value_3 + +1 5 6 7 +2 12 7 18 +3 5 8 25 +4 4 9 23 +5 35 10 17 +6 4 11 25 +7 27 12 18 + +starting permutation: s1-begin s1-update_value_1_of_1_or_3 s2-begin s2-update_value_1_of_1_or_3 s1-commit s2-commit s2-select +step s1-begin: + BEGIN; + +step s1-update_value_1_of_1_or_3: + UPDATE users_test_table SET value_1 = 5 WHERE user_id = 1 or user_id = 3; + +step s2-begin: + BEGIN; + +step s2-update_value_1_of_1_or_3: + UPDATE users_test_table SET value_1 = 8 WHERE user_id = 1 or user_id = 3; + +step s1-commit: + COMMIT; + +step s2-update_value_1_of_1_or_3: <... completed> +step s2-commit: + COMMIT; + +step s2-select: + SELECT * FROM users_test_table ORDER BY value_2; + +user_id value_1 value_2 value_3 + +1 8 6 7 +2 12 7 18 +3 8 8 25 +4 42 9 23 +5 35 10 17 +6 21 11 25 +7 27 12 18 + +starting permutation: s1-begin s1-update_all_value_1 s2-begin s2-insert-to-table s1-commit s2-commit s2-select +step s1-begin: + BEGIN; + +step s1-update_all_value_1: + UPDATE users_test_table SET value_1 = 3; + +step s2-begin: + BEGIN; + +step s2-insert-to-table: + INSERT INTO users_test_table VALUES (1,2,3,4); + +step s1-commit: + COMMIT; + +step s2-commit: + COMMIT; + +step s2-select: + SELECT * FROM users_test_table ORDER BY value_2; + +user_id value_1 value_2 value_3 + +1 2 3 4 +1 3 6 7 +2 3 7 18 +3 3 8 25 +4 3 9 23 +5 3 10 17 +6 3 11 25 +7 3 12 18 + +starting permutation: s1-begin s1-update_all_value_1 s2-begin s2-insert-into-select s1-commit s2-commit s2-select +step s1-begin: + BEGIN; + +step s1-update_all_value_1: + UPDATE users_test_table SET value_1 = 3; + +step s2-begin: + BEGIN; + +step s2-insert-into-select: + INSERT INTO users_test_table SELECT * FROM events_test_table; + +step s1-commit: + COMMIT; + +step s2-insert-into-select: <... 
completed> +step s2-commit: + COMMIT; + +step s2-select: + SELECT * FROM users_test_table ORDER BY value_2; + +user_id value_1 value_2 value_3 + +1 3 6 7 +1 5 7 7 +2 3 7 18 +3 3 8 25 +4 3 9 23 +5 22 9 25 +5 3 10 17 +7 41 10 23 +6 3 11 25 +1 20 12 25 +7 3 12 18 +3 26 13 18 +5 17 14 4 +3 11 78 18 + +starting permutation: s1-begin s1-change_connection_mode_to_sequential s1-update_all_value_1 s2-begin s2-change_connection_mode_to_sequential s2-update_all_value_1 s1-commit s2-commit s2-select +step s1-begin: + BEGIN; + +step s1-change_connection_mode_to_sequential: + set citus.multi_shard_modify_mode to 'sequential'; + +step s1-update_all_value_1: + UPDATE users_test_table SET value_1 = 3; + +step s2-begin: + BEGIN; + +step s2-change_connection_mode_to_sequential: + set citus.multi_shard_modify_mode to 'sequential'; + +step s2-update_all_value_1: + UPDATE users_test_table SET value_1 = 6; + +step s1-commit: + COMMIT; + +step s2-update_all_value_1: <... completed> +step s2-commit: + COMMIT; + +step s2-select: + SELECT * FROM users_test_table ORDER BY value_2; + +user_id value_1 value_2 value_3 + +1 6 6 7 +2 6 7 18 +3 6 8 25 +4 6 9 23 +5 6 10 17 +6 6 11 25 +7 6 12 18 + +starting permutation: s1-begin s1-change_connection_mode_to_sequential s1-update_value_1_of_1_or_3 s2-begin s2-change_connection_mode_to_sequential s2-update_value_1_of_1_or_3 s1-commit s2-commit s2-select +step s1-begin: + BEGIN; + +step s1-change_connection_mode_to_sequential: + set citus.multi_shard_modify_mode to 'sequential'; + +step s1-update_value_1_of_1_or_3: + UPDATE users_test_table SET value_1 = 5 WHERE user_id = 1 or user_id = 3; + +step s2-begin: + BEGIN; + +step s2-change_connection_mode_to_sequential: + set citus.multi_shard_modify_mode to 'sequential'; + +step s2-update_value_1_of_1_or_3: + UPDATE users_test_table SET value_1 = 8 WHERE user_id = 1 or user_id = 3; + +step s1-commit: + COMMIT; + +step s2-update_value_1_of_1_or_3: <... 
completed> +step s2-commit: + COMMIT; + +step s2-select: + SELECT * FROM users_test_table ORDER BY value_2; + +user_id value_1 value_2 value_3 + +1 8 6 7 +2 12 7 18 +3 8 8 25 +4 42 9 23 +5 35 10 17 +6 21 11 25 +7 27 12 18 + +starting permutation: s1-begin s1-change_connection_mode_to_sequential s1-update_value_1_of_1_or_3 s2-begin s2-change_connection_mode_to_sequential s2-update_value_1_of_4_or_6 s1-commit s2-commit s2-select +step s1-begin: + BEGIN; + +step s1-change_connection_mode_to_sequential: + set citus.multi_shard_modify_mode to 'sequential'; + +step s1-update_value_1_of_1_or_3: + UPDATE users_test_table SET value_1 = 5 WHERE user_id = 1 or user_id = 3; + +step s2-begin: + BEGIN; + +step s2-change_connection_mode_to_sequential: + set citus.multi_shard_modify_mode to 'sequential'; + +step s2-update_value_1_of_4_or_6: + UPDATE users_test_table SET value_1 = 4 WHERE user_id = 4 or user_id = 6; + +step s1-commit: + COMMIT; + +step s2-commit: + COMMIT; + +step s2-select: + SELECT * FROM users_test_table ORDER BY value_2; + +user_id value_1 value_2 value_3 + +1 5 6 7 +2 12 7 18 +3 5 8 25 +4 4 9 23 +5 35 10 17 +6 4 11 25 +7 27 12 18 diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 2c73cde0a..d99e91fca 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -747,6 +747,70 @@ t SELECT true AS valid FROM explain_json($$ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$); t + +-- Test multi shard update +EXPLAIN (COSTS FALSE) + UPDATE lineitem_hash_part + SET l_suppkey = 12; +Custom Scan (Citus Router) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Update on lineitem_hash_part_360290 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part + -> Task + Node: host=localhost port=57638 dbname=regression + -> Update on lineitem_hash_part_360291 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360291 lineitem_hash_part + -> Task + Node: host=localhost port=57637 dbname=regression + -> Update on lineitem_hash_part_360292 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360292 lineitem_hash_part + -> Task + Node: host=localhost port=57638 dbname=regression + -> Update on lineitem_hash_part_360293 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360293 lineitem_hash_part + +EXPLAIN (COSTS FALSE) + UPDATE lineitem_hash_part + SET l_suppkey = 12 + WHERE l_orderkey = 1 OR l_orderkey = 3; +Custom Scan (Citus Router) + Task Count: 2 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Update on lineitem_hash_part_360290 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part + Filter: ((l_orderkey = 1) OR (l_orderkey = 3)) + -> Task + Node: host=localhost port=57638 dbname=regression + -> Update on lineitem_hash_part_360291 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360291 lineitem_hash_part + Filter: ((l_orderkey = 1) OR (l_orderkey = 3)) +-- Test multi shard delete +EXPLAIN (COSTS FALSE) + DELETE FROM lineitem_hash_part; +Custom Scan (Citus Router) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Delete on lineitem_hash_part_360290 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part + -> Task + Node: host=localhost port=57638 dbname=regression + -> Delete on lineitem_hash_part_360291 lineitem_hash_part + -> Seq Scan on 
lineitem_hash_part_360291 lineitem_hash_part + -> Task + Node: host=localhost port=57637 dbname=regression + -> Delete on lineitem_hash_part_360292 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360292 lineitem_hash_part + -> Task + Node: host=localhost port=57638 dbname=regression + -> Delete on lineitem_hash_part_360293 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360293 lineitem_hash_part -- Test track tracker SET citus.task_executor_type TO 'task-tracker'; SET citus.explain_all_tasks TO off; diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index e3f7d231a..2d9f73c63 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -747,6 +747,70 @@ t SELECT true AS valid FROM explain_json($$ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$); t + +-- Test multi shard update +EXPLAIN (COSTS FALSE) + UPDATE lineitem_hash_part + SET l_suppkey = 12; +Custom Scan (Citus Router) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Update on lineitem_hash_part_360290 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part + -> Task + Node: host=localhost port=57638 dbname=regression + -> Update on lineitem_hash_part_360291 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360291 lineitem_hash_part + -> Task + Node: host=localhost port=57637 dbname=regression + -> Update on lineitem_hash_part_360292 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360292 lineitem_hash_part + -> Task + Node: host=localhost port=57638 dbname=regression + -> Update on lineitem_hash_part_360293 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360293 lineitem_hash_part + +EXPLAIN (COSTS FALSE) + UPDATE lineitem_hash_part + SET l_suppkey = 12 + WHERE l_orderkey = 1 OR l_orderkey = 3; +Custom Scan (Citus Router) + Task Count: 2 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Update on lineitem_hash_part_360290 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part + Filter: ((l_orderkey = 1) OR (l_orderkey = 3)) + -> Task + Node: host=localhost port=57638 dbname=regression + -> Update on lineitem_hash_part_360291 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360291 lineitem_hash_part + Filter: ((l_orderkey = 1) OR (l_orderkey = 3)) +-- Test multi shard delete +EXPLAIN (COSTS FALSE) + DELETE FROM lineitem_hash_part; +Custom Scan (Citus Router) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Delete on lineitem_hash_part_360290 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part + -> Task + Node: host=localhost port=57638 dbname=regression + -> Delete on lineitem_hash_part_360291 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360291 lineitem_hash_part + -> Task + Node: host=localhost port=57637 dbname=regression + -> Delete on lineitem_hash_part_360292 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360292 lineitem_hash_part + -> Task + Node: host=localhost port=57638 dbname=regression + -> Delete on lineitem_hash_part_360293 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360293 lineitem_hash_part -- Test track tracker SET citus.task_executor_type TO 'task-tracker'; SET citus.explain_all_tasks TO off; diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out 
index 64ab1bf42..fb3b1e1b6 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -290,10 +290,6 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246; 0 (1 row) --- commands with no constraints on the partition key are not supported -DELETE FROM limit_orders WHERE bidder_id = 162; -ERROR: cannot run DELETE command which targets multiple shards -HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). -- commands with a USING clause are unsupported CREATE TABLE bidders ( name text, id bigint ); DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND @@ -304,10 +300,6 @@ ERROR: cannot plan queries which include both local and distributed relations WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) DELETE FROM limit_orders; ERROR: common table expressions are not supported in distributed modifications --- cursors are not supported -DELETE FROM limit_orders WHERE CURRENT OF cursor_name; -ERROR: cannot run DELETE command which targets multiple shards -HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); -- simple UPDATE UPDATE limit_orders SET symbol = 'GM' WHERE id = 246; @@ -423,10 +415,6 @@ AND s.logicalrelid = 'limit_orders'::regclass; ALTER TABLE renamed_orders RENAME TO limit_orders_750000; -- Third: Connect back to master node \c - - - :master_port --- commands with no constraints on the partition key are not supported -UPDATE limit_orders SET limit_price = 0.00; -ERROR: cannot run UPDATE command which targets multiple shards -HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). -- attempting to change the partition key is unsupported UPDATE limit_orders SET id = 0 WHERE id = 246; ERROR: modifying the partition value of rows is not allowed @@ -522,10 +510,6 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh -- even in RETURNING UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause --- cursors are not supported -UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name; -ERROR: cannot run UPDATE command which targets multiple shards -HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). -- check that multi-row UPDATE/DELETEs with RETURNING work INSERT INTO multiple_hash VALUES ('0', '1'); INSERT INTO multiple_hash VALUES ('0', '2'); @@ -984,12 +968,12 @@ SELECT * FROM summary_table ORDER BY id; -- unsupported multi-shard updates UPDATE summary_table SET average_value = average_query.average FROM ( SELECT avg(value) AS average FROM raw_table) average_query; -ERROR: cannot run UPDATE command which targets multiple shards -HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). 
+ERROR: subqueries are not supported in modifications across multiple shards +DETAIL: Consider using an equality filter on partition column "id" to target a single shard. UPDATE summary_table SET average_value = average_value + 1 WHERE id = (SELECT id FROM raw_table WHERE value > 100); -ERROR: cannot run UPDATE command which targets multiple shards -HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). +ERROR: subqueries are not supported in modifications across multiple shards +DETAIL: Consider using an equality filter on partition column "id" to target a single shard. -- test complex queries UPDATE summary_table SET @@ -1128,8 +1112,8 @@ SELECT master_modify_multiple_shards(' SELECT avg(value) AS average FROM raw_table WHERE id = 1 ) average_query WHERE id = 1'); -ERROR: cannot perform distributed planning for the given modifications -DETAIL: Subqueries are not supported in distributed modifications. +ERROR: subqueries are not supported in modifications across multiple shards +DETAIL: Consider using an equality filter on partition column "id" to target a single shard. -- test connection API via using COPY -- COPY on SELECT part BEGIN; diff --git a/src/test/regress/expected/multi_mx_modifications.out b/src/test/regress/expected/multi_mx_modifications.out index fbd0bba89..660684e9a 100644 --- a/src/test/regress/expected/multi_mx_modifications.out +++ b/src/test/regress/expected/multi_mx_modifications.out @@ -158,10 +158,8 @@ SELECT COUNT(*) FROM limit_orders_mx WHERE id = 246; 0 (1 row) --- commands with no constraints on the partition key are not supported +-- multi shard delete is supported DELETE FROM limit_orders_mx WHERE bidder_id = 162; -ERROR: cannot run DELETE command which targets multiple shards -HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). -- commands with a USING clause are unsupported CREATE TABLE bidders ( name text, id bigint ); DELETE FROM limit_orders_mx USING bidders WHERE limit_orders_mx.id = 246 AND @@ -174,8 +172,7 @@ DELETE FROM limit_orders_mx; ERROR: common table expressions are not supported in distributed modifications -- cursors are not supported DELETE FROM limit_orders_mx WHERE CURRENT OF cursor_name; -ERROR: cannot run DELETE command which targets multiple shards -HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). +ERROR: cannot run DML queries with cursors INSERT INTO limit_orders_mx VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); -- simple UPDATE UPDATE limit_orders_mx SET symbol = 'GM' WHERE id = 246; @@ -228,10 +225,8 @@ INSERT INTO limit_orders_mx VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sel ERROR: duplicate key value violates unique constraint "limit_orders_mx_pkey_1220093" DETAIL: Key (id)=(275) already exists. CONTEXT: while executing command on localhost:57638 --- commands with no constraints on the partition key are not supported +-- multi shard update is supported UPDATE limit_orders_mx SET limit_price = 0.00; -ERROR: cannot run UPDATE command which targets multiple shards -HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). 
-- attempting to change the partition key is unsupported UPDATE limit_orders_mx SET id = 0 WHERE id = 246; ERROR: modifying the partition value of rows is not allowed @@ -304,7 +299,7 @@ CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS 'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE STRICT; UPDATE limit_orders_mx SET bidder_id = temp_strict_func(1, null) WHERE id = 246; ERROR: null value in column "bidder_id" violates not-null constraint -DETAIL: Failing row contains (246, GM, null, 2007-07-02 16:32:15, buy, 999, {1,2}). +DETAIL: Failing row contains (246, GM, null, 2007-07-02 16:32:15, buy, 0.00, {1,2}). CONTEXT: while executing command on localhost:57637 SELECT array_of_values FROM limit_orders_mx WHERE id = 246; array_of_values @@ -324,8 +319,7 @@ UPDATE limit_orders_mx SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause -- cursors are not supported UPDATE limit_orders_mx SET symbol = 'GM' WHERE CURRENT OF cursor_name; -ERROR: cannot run UPDATE command which targets multiple shards -HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). +ERROR: cannot run DML queries with cursors -- check that multi-row UPDATE/DELETEs with RETURNING work INSERT INTO multiple_hash_mx VALUES ('0', '1'); INSERT INTO multiple_hash_mx VALUES ('0', '2'); diff --git a/src/test/regress/expected/multi_shard_update_delete.out b/src/test/regress/expected/multi_shard_update_delete.out new file mode 100644 index 000000000..962079e3e --- /dev/null +++ b/src/test/regress/expected/multi_shard_update_delete.out @@ -0,0 +1,499 @@ +-- +-- multi shard update delete +-- this file is intended to test multi shard update/delete queries +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1440000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1440000; +SET citus.shard_replication_factor to 1; +SET citus.multi_shard_modify_mode to 'parallel'; +CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int); +SELECT create_distributed_table('users_test_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +\COPY users_test_table FROM STDIN DELIMITER AS ','; +CREATE TABLE events_test_table (user_id int, value_1 int, value_2 int, value_3 int); +SELECT create_distributed_table('events_test_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +\COPY events_test_table FROM STDIN DELIMITER AS ','; +CREATE TABLE events_reference_copy_table (like events_test_table); +SELECT create_reference_table('events_reference_copy_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO events_reference_copy_table SELECT * FROM events_test_table; +CREATE TABLE users_reference_copy_table (like users_test_table); +SELECT create_reference_table('users_reference_copy_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO users_reference_copy_table SELECT * FROM users_test_table; +-- Run multi shard updates and deletes without transaction on hash distributed tables +UPDATE users_test_table SET value_1 = 1; +SELECT COUNT(*), SUM(value_1) FROM users_test_table; + count | sum +-------+----- + 15 | 15 +(1 row) + +SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; + count | sum +-------+----- + 4 | 52 +(1 row) + +UPDATE users_test_table SET 
value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3; +SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; + count | sum +-------+----- + 4 | 56 +(1 row) + +UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5; +SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5; + sum +----- + 0 +(1 row) + +SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; + count +------- + 4 +(1 row) + +DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5; +SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; + count +------- + 0 +(1 row) + +-- Run multi shard update delete queries within transactions +BEGIN; +UPDATE users_test_table SET value_3 = 0; +END; +SELECT SUM(value_3) FROM users_test_table; + sum +----- + 0 +(1 row) + +-- Update can also be rollbacked +BEGIN; +UPDATE users_test_table SET value_3 = 1; +ROLLBACK; +SELECT SUM(value_3) FROM users_test_table; + sum +----- + 0 +(1 row) + +-- Run with inserts (we need to set citus.multi_shard_modify_mode to sequential) +BEGIN; +INSERT INTO users_test_table (user_id, value_3) VALUES(20, 15); +INSERT INTO users_test_table (user_id, value_3) VALUES(16,1), (20,16), (7,1), (20,17); +SET citus.multi_shard_modify_mode to sequential; +UPDATE users_test_table SET value_3 = 1; +END; +SELECT COUNT()SUM(value_3) FROM users_test_table; +ERROR: syntax error at or near "(" +LINE 1: SELECT COUNT()SUM(value_3) FROM users_test_table; + ^ +SET citus.multi_shard_modify_mode to 'sequential'; +-- Run multiple multi shard updates (with sequential executor) +BEGIN; +UPDATE users_test_table SET value_3 = 5; +UPDATE users_test_table SET value_3 = 0; +END; +SELECT SUM(value_3) FROM users_copy_table; +ERROR: relation "users_copy_table" does not exist +LINE 1: SELECT SUM(value_3) FROM users_copy_table; + ^ +-- Run multiple multi shard updates (with parallel executor) +SET citus.multi_shard_modify_mode to 'parallel'; +UPDATE users_test_table SET value_3 = 5; +BEGIN; +UPDATE users_test_table SET value_3 = 2; +UPDATE users_test_table SET value_3 = 0; +END; +SELECT SUM(value_3) FROM users_test_table; + sum +----- + 0 +(1 row) + +-- Check with kind of constraints +UPDATE users_test_table SET value_3 = 1 WHERE user_id = 3 or true; +SELECT COUNT(*), SUM(value_3) FROM users_test_table; + count | sum +-------+----- + 16 | 16 +(1 row) + +UPDATE users_test_table SET value_3 = 0 WHERE user_id = 20 and false; +SELECT COUNT(*), SUM(value_3) FROM users_test_table; + count | sum +-------+----- + 16 | 16 +(1 row) + +-- Run multi shard updates with prepared statements +PREPARE foo_plan(int,int) AS UPDATE users_test_table SET value_1 = $1, value_3 = $2; +EXECUTE foo_plan(1,5); +EXECUTE foo_plan(3,15); +EXECUTE foo_plan(5,25); +EXECUTE foo_plan(7,35); +EXECUTE foo_plan(9,45); +EXECUTE foo_plan(0,0); +SELECT SUM(value_1), SUM(value_3) FROM users_test_table; + sum | sum +-----+----- + 0 | 0 +(1 row) + +-- Test on append table (set executor mode to sequential, since with the append +-- distributed tables parallel executor may create tons of connections) +SET citus.multi_shard_modify_mode to sequential; +CREATE TABLE append_stage_table(id int, col_2 int); +INSERT INTO append_stage_table VALUES(1,3); +INSERT INTO append_stage_table VALUES(3,2); +INSERT INTO append_stage_table VALUES(5,4); +CREATE TABLE test_append_table(id int, col_2 int); +SELECT create_distributed_table('test_append_table','id','append'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT 
master_create_empty_shard('test_append_table'); + master_create_empty_shard +--------------------------- + 1440066 +(1 row) + +SELECT * FROM master_append_table_to_shard(1440066, 'append_stage_table', 'localhost', :master_port); + master_append_table_to_shard +------------------------------ + 0.0266667 +(1 row) + +SELECT master_create_empty_shard('test_append_table') AS new_shard_id; + new_shard_id +-------------- + 1440067 +(1 row) + +SELECT * FROM master_append_table_to_shard(1440067, 'append_stage_table', 'localhost', :master_port); + master_append_table_to_shard +------------------------------ + 0.0266667 +(1 row) + +UPDATE test_append_table SET col_2 = 5; +SELECT * FROM test_append_table; + id | col_2 +----+------- + 1 | 5 + 3 | 5 + 5 | 5 + 1 | 5 + 3 | 5 + 5 | 5 +(6 rows) + +DROP TABLE append_stage_table; +DROP TABLE test_append_table; +-- Update multi shard of partitioned distributed table +SET citus.multi_shard_modify_mode to 'parallel'; +SET citus.shard_replication_factor to 1; +CREATE TABLE tt1(id int, col_2 int) partition by range (col_2); +CREATE TABLE tt1_510 partition of tt1 for VALUES FROM (5) to (10); +CREATE TABLE tt1_1120 partition of tt1 for VALUES FROM (11) to (20); +INSERT INTO tt1 VALUES (1,11), (3,15), (5,17), (6,19), (8,17), (2,12); +SELECT create_distributed_table('tt1','id'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +UPDATE tt1 SET col_2 = 13; +DELETE FROM tt1 WHERE id = 1 or id = 3 or id = 5; +SELECT * FROM tt1; + id | col_2 +----+------- + 8 | 13 + 6 | 13 + 2 | 13 +(3 rows) + +-- Partitioned distributed table within transaction +INSERT INTO tt1 VALUES(4,6); +INSERT INTO tt1 VALUES(7,7); +INSERT INTO tt1 VALUES(9,8); +BEGIN; +-- Update rows from partititon tt1_1120 +UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; +-- Update rows from partititon tt1_510 +UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5; +COMMIT; +SELECT * FROM tt1; + id | col_2 +----+------- + 8 | 12 + 4 | 7 + 7 | 7 + 6 | 12 + 2 | 12 + 9 | 7 +(6 rows) + +-- Modify main table and partition table within same transaction +BEGIN; +UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; +UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5; +DELETE FROM tt1_510; +DELETE FROM tt1_1120; +COMMIT; +SELECT * FROM tt1; + id | col_2 +----+------- +(0 rows) + +DROP TABLE tt1; +-- Update and copy in the same transaction +CREATE TABLE tt2(id int, col_2 int); +SELECT create_distributed_table('tt2','id'); + create_distributed_table +-------------------------- + +(1 row) + +BEGIN; +\COPY tt2 FROM STDIN DELIMITER AS ','; +UPDATE tt2 SET col_2 = 1; +COMMIT; +SELECT * FROM tt2; + id | col_2 +----+------- + 1 | 1 + 7 | 1 + 3 | 1 + 2 | 1 + 9 | 1 +(5 rows) + +-- Test returning with both type of executors +UPDATE tt2 SET col_2 = 5 RETURNING id, col_2; + id | col_2 +----+------- + 1 | 5 + 7 | 5 + 3 | 5 + 2 | 5 + 9 | 5 +(5 rows) + +SET citus.multi_shard_modify_mode to sequential; +UPDATE tt2 SET col_2 = 3 RETURNING id, col_2; + id | col_2 +----+------- + 1 | 3 + 7 | 3 + 3 | 3 + 2 | 3 + 9 | 3 +(5 rows) + +DROP TABLE tt2; +-- Multiple RTEs are not supported +SET citus.multi_shard_modify_mode to DEFAULT; +UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. 
+UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table); +ERROR: subqueries are not supported in modifications across multiple shards +DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table); +ERROR: subqueries are not supported in modifications across multiple shards +DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. +DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table); +ERROR: subqueries are not supported in modifications across multiple shards +DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table); +ERROR: subqueries are not supported in modifications across multiple shards +DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +-- Cursors are not supported +BEGIN; +DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; +FETCH test_cursor; + user_id | value_1 | value_2 | value_3 +---------+---------+---------+--------- + 8 | 0 | 13 | 0 +(1 row) + +UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor; +ERROR: cannot run DML queries with cursors +ROLLBACK; +-- Stable functions are supported +CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int); +INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5); +INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7); +INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9); +SELECT create_distributed_table('test_table_1', 'id'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +SELECT * FROM test_table_1; + id | date_col | col_3 +----+------------------------------+------- + 1 | Sat Apr 05 08:32:12 2014 PDT | 5 + 3 | Wed Jan 12 08:35:19 2011 PST | 9 + 2 | Sun Feb 01 08:31:16 2015 PST | 7 +(3 rows) + +UPDATE test_table_1 SET col_3 = 3 WHERE date_col < now(); +SELECT * FROM test_table_1; + id | date_col | col_3 +----+------------------------------+------- + 1 | Sat Apr 05 08:32:12 2014 PDT | 3 + 3 | Wed Jan 12 08:35:19 2011 PST | 3 + 2 | Sun Feb 01 08:31:16 2015 PST | 3 +(3 rows) + +DELETE FROM test_table_1 WHERE date_col < current_timestamp; +SELECT * FROM test_table_1; + id | date_col | col_3 +----+----------+------- +(0 rows) + +DROP TABLE test_table_1; +-- Volatile functions are not supported +CREATE TABLE test_table_2(id int, double_col double precision); +INSERT INTO test_table_2 VALUES(1, random()); +INSERT INTO test_table_2 VALUES(2, random()); +INSERT INTO test_table_2 VALUES(3, random()); +SELECT create_distributed_table('test_table_2', 'id'); +NOTICE: Copying data from local table... 
+ create_distributed_table +-------------------------- + +(1 row) + +UPDATE test_table_2 SET double_col = random(); +ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE +DROP TABLE test_table_2; +-- Run multi shard updates and deletes without transaction on reference tables +SELECT COUNT(*) FROM users_reference_copy_table; + count +------- + 15 +(1 row) + +UPDATE users_reference_copy_table SET value_1 = 1; +SELECT SUM(value_1) FROM users_reference_copy_table; + sum +----- + 15 +(1 row) + +SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; + count | sum +-------+----- + 4 | 52 +(1 row) + +UPDATE users_reference_copy_table SET value_2 = value_2 + 1 WHERE user_id = 3 or user_id = 5; +SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; + count | sum +-------+----- + 4 | 56 +(1 row) + +UPDATE users_reference_copy_table SET value_3 = 0 WHERE user_id <> 3; +SELECT SUM(value_3) FROM users_reference_copy_table WHERE user_id <> 3; + sum +----- + 0 +(1 row) + +DELETE FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; +SELECT COUNT(*) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; + count +------- + 0 +(1 row) + +-- Do some tests by changing shard replication factor +DROP TABLE users_test_table; +SET citus.shard_replication_factor to 2; +CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int); +SELECT create_distributed_table('users_test_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +\COPY users_test_table FROM STDIN DELIMITER AS ','; +-- Run multi shard updates and deletes without transaction on hash distributed tables +UPDATE users_test_table SET value_1 = 1; +SELECT COUNT(*), SUM(value_1) FROM users_test_table; + count | sum +-------+----- + 15 | 15 +(1 row) + +SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; + count | sum +-------+----- + 4 | 52 +(1 row) + +UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3; +SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; + count | sum +-------+----- + 4 | 56 +(1 row) + +UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5; +SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5; + sum +----- + 0 +(1 row) + +SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; + count +------- + 4 +(1 row) + +DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5; +SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; + count +------- + 0 +(1 row) + +DROP TABLE users_test_table; +DROP TABLE events_test_table; +DROP TABLE events_reference_copy_table; +DROP TABLE users_reference_copy_table; diff --git a/src/test/regress/expected/multi_shard_update_delete_0.out b/src/test/regress/expected/multi_shard_update_delete_0.out new file mode 100644 index 000000000..2c5c2a58d --- /dev/null +++ b/src/test/regress/expected/multi_shard_update_delete_0.out @@ -0,0 +1,522 @@ +-- +-- multi shard update delete +-- this file is intended to test multi shard update/delete queries +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1440000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1440000; +SET citus.shard_replication_factor to 1; +SET citus.multi_shard_modify_mode to 'parallel'; +CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int); +SELECT 
create_distributed_table('users_test_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +\COPY users_test_table FROM STDIN DELIMITER AS ','; +CREATE TABLE events_test_table (user_id int, value_1 int, value_2 int, value_3 int); +SELECT create_distributed_table('events_test_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +\COPY events_test_table FROM STDIN DELIMITER AS ','; +CREATE TABLE events_reference_copy_table (like events_test_table); +SELECT create_reference_table('events_reference_copy_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO events_reference_copy_table SELECT * FROM events_test_table; +CREATE TABLE users_reference_copy_table (like users_test_table); +SELECT create_reference_table('users_reference_copy_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO users_reference_copy_table SELECT * FROM users_test_table; +-- Run multi shard updates and deletes without transaction on hash distributed tables +UPDATE users_test_table SET value_1 = 1; +SELECT COUNT(*), SUM(value_1) FROM users_test_table; + count | sum +-------+----- + 15 | 15 +(1 row) + +SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; + count | sum +-------+----- + 4 | 52 +(1 row) + +UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3; +SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; + count | sum +-------+----- + 4 | 56 +(1 row) + +UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5; +SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5; + sum +----- + 0 +(1 row) + +SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; + count +------- + 4 +(1 row) + +DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5; +SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; + count +------- + 0 +(1 row) + +-- Run multi shard update delete queries within transactions +BEGIN; +UPDATE users_test_table SET value_3 = 0; +END; +SELECT SUM(value_3) FROM users_test_table; + sum +----- + 0 +(1 row) + +-- Update can also be rollbacked +BEGIN; +UPDATE users_test_table SET value_3 = 1; +ROLLBACK; +SELECT SUM(value_3) FROM users_test_table; + sum +----- + 0 +(1 row) + +-- Run with inserts (we need to set citus.multi_shard_modify_mode to sequential) +BEGIN; +INSERT INTO users_test_table (user_id, value_3) VALUES(20, 15); +INSERT INTO users_test_table (user_id, value_3) VALUES(16,1), (20,16), (7,1), (20,17); +SET citus.multi_shard_modify_mode to sequential; +UPDATE users_test_table SET value_3 = 1; +END; +SELECT COUNT()SUM(value_3) FROM users_test_table; +ERROR: syntax error at or near "(" +LINE 1: SELECT COUNT()SUM(value_3) FROM users_test_table; + ^ +SET citus.multi_shard_modify_mode to 'sequential'; +-- Run multiple multi shard updates (with sequential executor) +BEGIN; +UPDATE users_test_table SET value_3 = 5; +UPDATE users_test_table SET value_3 = 0; +END; +SELECT SUM(value_3) FROM users_copy_table; +ERROR: relation "users_copy_table" does not exist +LINE 1: SELECT SUM(value_3) FROM users_copy_table; + ^ +-- Run multiple multi shard updates (with parallel executor) +SET citus.multi_shard_modify_mode to 'parallel'; +UPDATE users_test_table SET value_3 = 5; +BEGIN; +UPDATE users_test_table SET value_3 = 2; +UPDATE users_test_table SET value_3 = 0; +END; +SELECT SUM(value_3) FROM users_test_table; + sum +----- + 0 +(1 row) + +-- Check with 
kind of constraints +UPDATE users_test_table SET value_3 = 1 WHERE user_id = 3 or true; +SELECT COUNT(*), SUM(value_3) FROM users_test_table; + count | sum +-------+----- + 16 | 16 +(1 row) + +UPDATE users_test_table SET value_3 = 0 WHERE user_id = 20 and false; +SELECT COUNT(*), SUM(value_3) FROM users_test_table; + count | sum +-------+----- + 16 | 16 +(1 row) + +-- Run multi shard updates with prepared statements +PREPARE foo_plan(int,int) AS UPDATE users_test_table SET value_1 = $1, value_3 = $2; +EXECUTE foo_plan(1,5); +EXECUTE foo_plan(3,15); +EXECUTE foo_plan(5,25); +EXECUTE foo_plan(7,35); +EXECUTE foo_plan(9,45); +EXECUTE foo_plan(0,0); +SELECT SUM(value_1), SUM(value_3) FROM users_test_table; + sum | sum +-----+----- + 0 | 0 +(1 row) + +-- Test on append table (set executor mode to sequential, since with the append +-- distributed tables parallel executor may create tons of connections) +SET citus.multi_shard_modify_mode to sequential; +CREATE TABLE append_stage_table(id int, col_2 int); +INSERT INTO append_stage_table VALUES(1,3); +INSERT INTO append_stage_table VALUES(3,2); +INSERT INTO append_stage_table VALUES(5,4); +CREATE TABLE test_append_table(id int, col_2 int); +SELECT create_distributed_table('test_append_table','id','append'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT master_create_empty_shard('test_append_table'); + master_create_empty_shard +--------------------------- + 1440066 +(1 row) + +SELECT * FROM master_append_table_to_shard(1440066, 'append_stage_table', 'localhost', :master_port); + master_append_table_to_shard +------------------------------ + 0.0266667 +(1 row) + +SELECT master_create_empty_shard('test_append_table') AS new_shard_id; + new_shard_id +-------------- + 1440067 +(1 row) + +SELECT * FROM master_append_table_to_shard(1440067, 'append_stage_table', 'localhost', :master_port); + master_append_table_to_shard +------------------------------ + 0.0266667 +(1 row) + +UPDATE test_append_table SET col_2 = 5; +SELECT * FROM test_append_table; + id | col_2 +----+------- + 1 | 5 + 3 | 5 + 5 | 5 + 1 | 5 + 3 | 5 + 5 | 5 +(6 rows) + +DROP TABLE append_stage_table; +DROP TABLE test_append_table; +-- Update multi shard of partitioned distributed table +SET citus.multi_shard_modify_mode to 'parallel'; +SET citus.shard_replication_factor to 1; +CREATE TABLE tt1(id int, col_2 int) partition by range (col_2); +ERROR: syntax error at or near "partition" +LINE 1: CREATE TABLE tt1(id int, col_2 int) partition by range (col_... + ^ +CREATE TABLE tt1_510 partition of tt1 for VALUES FROM (5) to (10); +ERROR: syntax error at or near "partition" +LINE 1: CREATE TABLE tt1_510 partition of tt1 for VALUES FROM (5) to... + ^ +CREATE TABLE tt1_1120 partition of tt1 for VALUES FROM (11) to (20); +ERROR: syntax error at or near "partition" +LINE 1: CREATE TABLE tt1_1120 partition of tt1 for VALUES FROM (11) ... + ^ +INSERT INTO tt1 VALUES (1,11), (3,15), (5,17), (6,19), (8,17), (2,12); +ERROR: relation "tt1" does not exist +LINE 1: INSERT INTO tt1 VALUES (1,11), (3,15), (5,17), (6,19), (8,17... 
+ ^ +SELECT create_distributed_table('tt1','id'); +ERROR: relation "tt1" does not exist +LINE 1: SELECT create_distributed_table('tt1','id'); + ^ +UPDATE tt1 SET col_2 = 13; +ERROR: relation "tt1" does not exist +LINE 1: UPDATE tt1 SET col_2 = 13; + ^ +DELETE FROM tt1 WHERE id = 1 or id = 3 or id = 5; +ERROR: relation "tt1" does not exist +LINE 1: DELETE FROM tt1 WHERE id = 1 or id = 3 or id = 5; + ^ +SELECT * FROM tt1; +ERROR: relation "tt1" does not exist +LINE 1: SELECT * FROM tt1; + ^ +-- Partitioned distributed table within transaction +INSERT INTO tt1 VALUES(4,6); +ERROR: relation "tt1" does not exist +LINE 1: INSERT INTO tt1 VALUES(4,6); + ^ +INSERT INTO tt1 VALUES(7,7); +ERROR: relation "tt1" does not exist +LINE 1: INSERT INTO tt1 VALUES(7,7); + ^ +INSERT INTO tt1 VALUES(9,8); +ERROR: relation "tt1" does not exist +LINE 1: INSERT INTO tt1 VALUES(9,8); + ^ +BEGIN; +-- Update rows from partititon tt1_1120 +UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; +ERROR: relation "tt1" does not exist +LINE 1: UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; + ^ +-- Update rows from partititon tt1_510 +UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +SELECT * FROM tt1; +ERROR: relation "tt1" does not exist +LINE 1: SELECT * FROM tt1; + ^ +-- Modify main table and partition table within same transaction +BEGIN; +UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; +ERROR: relation "tt1" does not exist +LINE 1: UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; + ^ +UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5; +ERROR: current transaction is aborted, commands ignored until end of transaction block +DELETE FROM tt1_510; +ERROR: current transaction is aborted, commands ignored until end of transaction block +DELETE FROM tt1_1120; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +SELECT * FROM tt1; +ERROR: relation "tt1" does not exist +LINE 1: SELECT * FROM tt1; + ^ +DROP TABLE tt1; +ERROR: table "tt1" does not exist +-- Update and copy in the same transaction +CREATE TABLE tt2(id int, col_2 int); +SELECT create_distributed_table('tt2','id'); + create_distributed_table +-------------------------- + +(1 row) + +BEGIN; +\COPY tt2 FROM STDIN DELIMITER AS ','; +UPDATE tt2 SET col_2 = 1; +COMMIT; +SELECT * FROM tt2; + id | col_2 +----+------- + 1 | 1 + 7 | 1 + 3 | 1 + 2 | 1 + 9 | 1 +(5 rows) + +-- Test returning with both type of executors +UPDATE tt2 SET col_2 = 5 RETURNING id, col_2; + id | col_2 +----+------- + 1 | 5 + 7 | 5 + 3 | 5 + 2 | 5 + 9 | 5 +(5 rows) + +SET citus.multi_shard_modify_mode to sequential; +UPDATE tt2 SET col_2 = 3 RETURNING id, col_2; + id | col_2 +----+------- + 1 | 3 + 7 | 3 + 3 | 3 + 2 | 3 + 9 | 3 +(5 rows) + +DROP TABLE tt2; +-- Multiple RTEs are not supported +SET citus.multi_shard_modify_mode to DEFAULT; +UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. +UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table); +ERROR: subqueries are not supported in modifications across multiple shards +DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. 
+UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table); +ERROR: subqueries are not supported in modifications across multiple shards +DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. +DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table); +ERROR: subqueries are not supported in modifications across multiple shards +DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table); +ERROR: subqueries are not supported in modifications across multiple shards +DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +-- Cursors are not supported +BEGIN; +DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; +FETCH test_cursor; + user_id | value_1 | value_2 | value_3 +---------+---------+---------+--------- + 8 | 0 | 13 | 0 +(1 row) + +UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor; +ERROR: cannot run DML queries with cursors +ROLLBACK; +-- Stable functions are supported +CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int); +INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5); +INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7); +INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9); +SELECT create_distributed_table('test_table_1', 'id'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +SELECT * FROM test_table_1; + id | date_col | col_3 +----+------------------------------+------- + 1 | Sat Apr 05 08:32:12 2014 PDT | 5 + 3 | Wed Jan 12 08:35:19 2011 PST | 9 + 2 | Sun Feb 01 08:31:16 2015 PST | 7 +(3 rows) + +UPDATE test_table_1 SET col_3 = 3 WHERE date_col < now(); +SELECT * FROM test_table_1; + id | date_col | col_3 +----+------------------------------+------- + 1 | Sat Apr 05 08:32:12 2014 PDT | 3 + 3 | Wed Jan 12 08:35:19 2011 PST | 3 + 2 | Sun Feb 01 08:31:16 2015 PST | 3 +(3 rows) + +DELETE FROM test_table_1 WHERE date_col < current_timestamp; +SELECT * FROM test_table_1; + id | date_col | col_3 +----+----------+------- +(0 rows) + +DROP TABLE test_table_1; +-- Volatile functions are not supported +CREATE TABLE test_table_2(id int, double_col double precision); +INSERT INTO test_table_2 VALUES(1, random()); +INSERT INTO test_table_2 VALUES(2, random()); +INSERT INTO test_table_2 VALUES(3, random()); +SELECT create_distributed_table('test_table_2', 'id'); +NOTICE: Copying data from local table... 
+ create_distributed_table +-------------------------- + +(1 row) + +UPDATE test_table_2 SET double_col = random(); +ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE +DROP TABLE test_table_2; +-- Run multi shard updates and deletes without transaction on reference tables +SELECT COUNT(*) FROM users_reference_copy_table; + count +------- + 15 +(1 row) + +UPDATE users_reference_copy_table SET value_1 = 1; +SELECT SUM(value_1) FROM users_reference_copy_table; + sum +----- + 15 +(1 row) + +SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; + count | sum +-------+----- + 4 | 52 +(1 row) + +UPDATE users_reference_copy_table SET value_2 = value_2 + 1 WHERE user_id = 3 or user_id = 5; +SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; + count | sum +-------+----- + 4 | 56 +(1 row) + +UPDATE users_reference_copy_table SET value_3 = 0 WHERE user_id <> 3; +SELECT SUM(value_3) FROM users_reference_copy_table WHERE user_id <> 3; + sum +----- + 0 +(1 row) + +DELETE FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; +SELECT COUNT(*) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; + count +------- + 0 +(1 row) + +-- Do some tests by changing shard replication factor +DROP TABLE users_test_table; +SET citus.shard_replication_factor to 2; +CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int); +SELECT create_distributed_table('users_test_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +\COPY users_test_table FROM STDIN DELIMITER AS ','; +-- Run multi shard updates and deletes without transaction on hash distributed tables +UPDATE users_test_table SET value_1 = 1; +SELECT COUNT(*), SUM(value_1) FROM users_test_table; + count | sum +-------+----- + 15 | 15 +(1 row) + +SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; + count | sum +-------+----- + 4 | 52 +(1 row) + +UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3; +SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; + count | sum +-------+----- + 4 | 56 +(1 row) + +UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5; +SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5; + sum +----- + 0 +(1 row) + +SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; + count +------- + 4 +(1 row) + +DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5; +SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; + count +------- + 0 +(1 row) + +DROP TABLE users_test_table; +DROP TABLE events_test_table; +DROP TABLE events_reference_copy_table; +DROP TABLE users_reference_copy_table; diff --git a/src/test/regress/expected/multi_upsert.out b/src/test/regress/expected/multi_upsert.out index d41abe94a..092401d25 100644 --- a/src/test/regress/expected/multi_upsert.out +++ b/src/test/regress/expected/multi_upsert.out @@ -255,8 +255,8 @@ INSERT INTO dropcol_distributed AS dropcol (key, keep1) VALUES (1, '5') ON CONFL -- subquery in the SET clause INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET other_col = (SELECT count(*) from upsert_test); -ERROR: cannot perform distributed planning for the given modifications -DETAIL: Subqueries are not supported in distributed modifications. 
+ERROR: subqueries are not supported in modifications across multiple shards +DETAIL: Consider using an equality filter on partition column "part_key" to target a single shard. -- non mutable function call in the SET INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET other_col = random()::int; diff --git a/src/test/regress/input/multi_master_delete_protocol.source b/src/test/regress/input/multi_master_delete_protocol.source index 547530469..ce5d8b0a1 100644 --- a/src/test/regress/input/multi_master_delete_protocol.source +++ b/src/test/regress/input/multi_master_delete_protocol.source @@ -27,8 +27,6 @@ SELECT master_create_distributed_table('customer_delete_protocol', 'c_custkey', SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol WHERE c_acctbal > 0.0'); --- Check that free-form deletes are not supported. -DELETE FROM customer_delete_protocol WHERE c_custkey > 100; -- Check that we delete a shard if and only if all rows in the shard satisfy the condition. SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol WHERE c_custkey > 6500'); diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index bf55710d5..b58d38cc8 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -23,6 +23,7 @@ test: isolation_distributed_deadlock_detection # writes, run this test serially. test: isolation_create_restore_point +test: isolation_multi_shard_modify_vs_all test: isolation_hash_copy_vs_all test: isolation_append_copy_vs_all test: isolation_range_copy_vs_all diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index ca9822e6c..ce6b1aa0f 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -37,6 +37,7 @@ test: multi_load_data test: multi_behavioral_analytics_create_table test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries test: multi_insert_select multi_insert_select_window +test: multi_shard_update_delete # --- # Tests for partitioning support diff --git a/src/test/regress/output/multi_master_delete_protocol.source b/src/test/regress/output/multi_master_delete_protocol.source index 24255a337..e2a324465 100644 --- a/src/test/regress/output/multi_master_delete_protocol.source +++ b/src/test/regress/output/multi_master_delete_protocol.source @@ -27,10 +27,6 @@ SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol WHERE c_acctbal > 0.0'); ERROR: cannot delete from distributed table DETAIL: Where clause includes a column other than partition column --- Check that free-form deletes are not supported. -DELETE FROM customer_delete_protocol WHERE c_custkey > 100; -ERROR: cannot run DELETE command which targets multiple shards -HINT: Consider using an equality filter on partition column "c_custkey" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). -- Check that we delete a shard if and only if all rows in the shard satisfy the condition. 
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol WHERE c_custkey > 6500'); diff --git a/src/test/regress/specs/isolation_multi_shard_modify_vs_all.spec b/src/test/regress/specs/isolation_multi_shard_modify_vs_all.spec new file mode 100644 index 000000000..b76220406 --- /dev/null +++ b/src/test/regress/specs/isolation_multi_shard_modify_vs_all.spec @@ -0,0 +1,128 @@ +setup +{ + SELECT citus.replace_isolation_tester_func(); + SELECT citus.refresh_isolation_tester_prepared_statement(); + + SET citus.shard_replication_factor to 1; + SET citus.shard_count to 32; + SET citus.multi_shard_modify_mode to 'parallel'; + + CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int); + SELECT create_distributed_table('users_test_table', 'user_id'); + INSERT INTO users_test_table VALUES + (1, 5, 6, 7), + (2, 12, 7, 18), + (3, 23, 8, 25), + (4, 42, 9, 23), + (5, 35, 10, 17), + (6, 21, 11, 25), + (7, 27, 12, 18); + + CREATE TABLE events_test_table (user_id int, value_1 int, value_2 int, value_3 int); + SELECT create_distributed_table('events_test_table', 'user_id'); + INSERT INTO events_test_table VALUES + (1, 5, 7, 7), + (3, 11, 78, 18), + (5, 22, 9, 25), + (7, 41, 10, 23), + (1, 20, 12, 25), + (3, 26, 13, 18), + (5, 17, 14, 4); +} + +teardown +{ + DROP TABLE users_test_table; + DROP TABLE events_test_table; + SELECT citus.restore_isolation_tester_func(); + SET citus.shard_count to 4; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-change_connection_mode_to_sequential" +{ + set citus.multi_shard_modify_mode to 'sequential'; +} + +step "s1-update_all_value_1" +{ + UPDATE users_test_table SET value_1 = 3; +} + +step "s1-update_value_1_of_1_or_3" +{ + UPDATE users_test_table SET value_1 = 5 WHERE user_id = 1 or user_id = 3; +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-change_connection_mode_to_sequential" +{ + set citus.multi_shard_modify_mode to 'sequential'; +} + +step "s2-select" +{ + SELECT * FROM users_test_table ORDER BY value_2; +} + +step "s2-insert-to-table" +{ + INSERT INTO users_test_table VALUES (1,2,3,4); +} + +step "s2-insert-into-select" +{ + INSERT INTO users_test_table SELECT * FROM events_test_table; +} + +step "s2-update_all_value_1" +{ + UPDATE users_test_table SET value_1 = 6; +} + +step "s2-update_value_1_of_1_or_3" +{ + UPDATE users_test_table SET value_1 = 8 WHERE user_id = 1 or user_id = 3; +} + +step "s2-update_value_1_of_4_or_6" +{ + UPDATE users_test_table SET value_1 = 4 WHERE user_id = 4 or user_id = 6; +} + +step "s2-commit" +{ + COMMIT; +} + +# test with parallel connections +permutation "s1-begin" "s1-update_all_value_1" "s2-begin" "s2-select" "s1-commit" "s2-select" "s2-commit" +permutation "s1-begin" "s1-update_all_value_1" "s2-begin" "s2-update_all_value_1" "s1-commit" "s2-commit" +permutation "s1-begin" "s1-update_value_1_of_1_or_3" "s2-begin" "s2-update_value_1_of_4_or_6" "s1-commit" "s2-commit" "s2-select" +permutation "s1-begin" "s1-update_value_1_of_1_or_3" "s2-begin" "s2-update_value_1_of_1_or_3" "s1-commit" "s2-commit" "s2-select" +permutation "s1-begin" "s1-update_all_value_1" "s2-begin" "s2-insert-to-table" "s1-commit" "s2-commit" "s2-select" +permutation "s1-begin" "s1-update_all_value_1" "s2-begin" "s2-insert-into-select" "s1-commit" "s2-commit" "s2-select" + +# test with sequential connections, sequential tests should not block each other +# if they are targeting different shards. 
If multiple connections updating the same +# row, second one must wait for the first one. +permutation "s1-begin" "s1-change_connection_mode_to_sequential" "s1-update_all_value_1" "s2-begin" "s2-change_connection_mode_to_sequential" "s2-update_all_value_1" "s1-commit" "s2-commit" "s2-select" +permutation "s1-begin" "s1-change_connection_mode_to_sequential" "s1-update_value_1_of_1_or_3" "s2-begin" "s2-change_connection_mode_to_sequential" "s2-update_value_1_of_1_or_3" "s1-commit" "s2-commit" "s2-select" +permutation "s1-begin" "s1-change_connection_mode_to_sequential" "s1-update_value_1_of_1_or_3" "s2-begin" "s2-change_connection_mode_to_sequential" "s2-update_value_1_of_4_or_6" "s1-commit" "s2-commit" "s2-select" diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 1bf135afc..29cfd39df 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -367,6 +367,20 @@ SELECT true AS valid FROM explain_xml($$ SELECT true AS valid FROM explain_json($$ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$); + +-- Test multi shard update +EXPLAIN (COSTS FALSE) + UPDATE lineitem_hash_part + SET l_suppkey = 12; + +EXPLAIN (COSTS FALSE) + UPDATE lineitem_hash_part + SET l_suppkey = 12 + WHERE l_orderkey = 1 OR l_orderkey = 3; + +-- Test multi shard delete +EXPLAIN (COSTS FALSE) + DELETE FROM lineitem_hash_part; -- Test track tracker SET citus.task_executor_type TO 'task-tracker'; diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 42b5a36ae..512404809 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -194,9 +194,6 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246; DELETE FROM limit_orders WHERE id = (2 * 123); SELECT COUNT(*) FROM limit_orders WHERE id = 246; --- commands with no constraints on the partition key are not supported -DELETE FROM limit_orders WHERE bidder_id = 162; - -- commands with a USING clause are unsupported CREATE TABLE bidders ( name text, id bigint ); DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND @@ -207,9 +204,6 @@ DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) DELETE FROM limit_orders; --- cursors are not supported -DELETE FROM limit_orders WHERE CURRENT OF cursor_name; - INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); -- simple UPDATE @@ -298,9 +292,6 @@ ALTER TABLE renamed_orders RENAME TO limit_orders_750000; -- Third: Connect back to master node \c - - - :master_port --- commands with no constraints on the partition key are not supported -UPDATE limit_orders SET limit_price = 0.00; - -- attempting to change the partition key is unsupported UPDATE limit_orders SET id = 0 WHERE id = 246; UPDATE limit_orders SET id = 0 WHERE id = 0 OR id = 246; @@ -382,9 +373,6 @@ ALTER TABLE limit_orders DROP array_of_values; -- even in RETURNING UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); --- cursors are not supported -UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name; - -- check that multi-row UPDATE/DELETEs with RETURNING work INSERT INTO multiple_hash VALUES ('0', '1'); INSERT INTO multiple_hash VALUES ('0', '2'); diff --git a/src/test/regress/sql/multi_mx_modifications.sql b/src/test/regress/sql/multi_mx_modifications.sql index fd3874c02..6b2d52e0b 100644 
--- a/src/test/regress/sql/multi_mx_modifications.sql +++ b/src/test/regress/sql/multi_mx_modifications.sql @@ -106,7 +106,7 @@ SELECT COUNT(*) FROM limit_orders_mx WHERE id = 246; DELETE FROM limit_orders_mx WHERE id = (2 * 123); SELECT COUNT(*) FROM limit_orders_mx WHERE id = 246; --- commands with no constraints on the partition key are not supported +-- multi shard delete is supported DELETE FROM limit_orders_mx WHERE bidder_id = 162; -- commands with a USING clause are unsupported @@ -149,7 +149,7 @@ UPDATE limit_orders_mx SET (kind, limit_price) = ('buy', 999) WHERE id = 246 RET INSERT INTO limit_orders_mx VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); INSERT INTO limit_orders_mx VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); --- commands with no constraints on the partition key are not supported +-- multi shard update is supported UPDATE limit_orders_mx SET limit_price = 0.00; -- attempting to change the partition key is unsupported diff --git a/src/test/regress/sql/multi_shard_update_delete.sql b/src/test/regress/sql/multi_shard_update_delete.sql new file mode 100644 index 000000000..eb98dfd59 --- /dev/null +++ b/src/test/regress/sql/multi_shard_update_delete.sql @@ -0,0 +1,307 @@ +-- +-- multi shard update delete +-- this file is intended to test multi shard update/delete queries +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1440000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1440000; + +SET citus.shard_replication_factor to 1; +SET citus.multi_shard_modify_mode to 'parallel'; + +CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int); +SELECT create_distributed_table('users_test_table', 'user_id'); +\COPY users_test_table FROM STDIN DELIMITER AS ','; +1, 5, 6, 7 +2, 12, 7, 18 +3, 23, 8, 25 +4, 42, 9, 23 +5, 35, 10, 21 +6, 21, 11, 25 +7, 27, 12, 18 +8, 18, 13, 4 +7, 38, 14, 22 +6, 43, 15, 22 +5, 61, 16, 17 +4, 6, 17, 8 +3, 16, 18, 44 +2, 25, 19, 38 +1, 55, 20, 17 +\. + +CREATE TABLE events_test_table (user_id int, value_1 int, value_2 int, value_3 int); +SELECT create_distributed_table('events_test_table', 'user_id'); +\COPY events_test_table FROM STDIN DELIMITER AS ','; +1, 5, 7, 7 +3, 11, 78, 18 +5, 22, 9, 25 +7, 41, 10, 23 +9, 34, 11, 21 +1, 20, 12, 25 +3, 26, 13, 18 +5, 17, 14, 4 +7, 37, 15, 22 +9, 42, 16, 22 +1, 60, 17, 17 +3, 5, 18, 8 +5, 15, 19, 44 +7, 24, 20, 38 +9, 54, 21, 17 +\. 
+ +CREATE TABLE events_reference_copy_table (like events_test_table); +SELECT create_reference_table('events_reference_copy_table'); +INSERT INTO events_reference_copy_table SELECT * FROM events_test_table; + +CREATE TABLE users_reference_copy_table (like users_test_table); +SELECT create_reference_table('users_reference_copy_table'); +INSERT INTO users_reference_copy_table SELECT * FROM users_test_table; + +-- Run multi shard updates and deletes without transaction on hash distributed tables +UPDATE users_test_table SET value_1 = 1; +SELECT COUNT(*), SUM(value_1) FROM users_test_table; + +SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; +UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3; +SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; + +UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5; +SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5; + +SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; +DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5; +SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; + +-- Run multi shard update delete queries within transactions +BEGIN; +UPDATE users_test_table SET value_3 = 0; +END; +SELECT SUM(value_3) FROM users_test_table; + +-- Update can also be rollbacked +BEGIN; +UPDATE users_test_table SET value_3 = 1; +ROLLBACK; +SELECT SUM(value_3) FROM users_test_table; + +-- Run with inserts (we need to set citus.multi_shard_modify_mode to sequential) +BEGIN; +INSERT INTO users_test_table (user_id, value_3) VALUES(20, 15); +INSERT INTO users_test_table (user_id, value_3) VALUES(16,1), (20,16), (7,1), (20,17); +SET citus.multi_shard_modify_mode to sequential; +UPDATE users_test_table SET value_3 = 1; +END; +SELECT COUNT()SUM(value_3) FROM users_test_table; + +SET citus.multi_shard_modify_mode to 'sequential'; +-- Run multiple multi shard updates (with sequential executor) +BEGIN; +UPDATE users_test_table SET value_3 = 5; +UPDATE users_test_table SET value_3 = 0; +END; +SELECT SUM(value_3) FROM users_copy_table; + +-- Run multiple multi shard updates (with parallel executor) +SET citus.multi_shard_modify_mode to 'parallel'; +UPDATE users_test_table SET value_3 = 5; +BEGIN; +UPDATE users_test_table SET value_3 = 2; +UPDATE users_test_table SET value_3 = 0; +END; +SELECT SUM(value_3) FROM users_test_table; + +-- Check with kind of constraints +UPDATE users_test_table SET value_3 = 1 WHERE user_id = 3 or true; +SELECT COUNT(*), SUM(value_3) FROM users_test_table; +UPDATE users_test_table SET value_3 = 0 WHERE user_id = 20 and false; +SELECT COUNT(*), SUM(value_3) FROM users_test_table; + +-- Run multi shard updates with prepared statements +PREPARE foo_plan(int,int) AS UPDATE users_test_table SET value_1 = $1, value_3 = $2; + +EXECUTE foo_plan(1,5); +EXECUTE foo_plan(3,15); +EXECUTE foo_plan(5,25); +EXECUTE foo_plan(7,35); +EXECUTE foo_plan(9,45); +EXECUTE foo_plan(0,0); + +SELECT SUM(value_1), SUM(value_3) FROM users_test_table; + +-- Test on append table (set executor mode to sequential, since with the append +-- distributed tables parallel executor may create tons of connections) +SET citus.multi_shard_modify_mode to sequential; +CREATE TABLE append_stage_table(id int, col_2 int); +INSERT INTO append_stage_table VALUES(1,3); +INSERT INTO append_stage_table VALUES(3,2); +INSERT INTO append_stage_table VALUES(5,4); + +CREATE TABLE test_append_table(id int, col_2 int); +SELECT 
create_distributed_table('test_append_table','id','append'); +SELECT master_create_empty_shard('test_append_table'); +SELECT * FROM master_append_table_to_shard(1440066, 'append_stage_table', 'localhost', :master_port); +SELECT master_create_empty_shard('test_append_table') AS new_shard_id; +SELECT * FROM master_append_table_to_shard(1440067, 'append_stage_table', 'localhost', :master_port); +UPDATE test_append_table SET col_2 = 5; +SELECT * FROM test_append_table; + +DROP TABLE append_stage_table; +DROP TABLE test_append_table; + +-- Update multi shard of partitioned distributed table +SET citus.multi_shard_modify_mode to 'parallel'; +SET citus.shard_replication_factor to 1; +CREATE TABLE tt1(id int, col_2 int) partition by range (col_2); +CREATE TABLE tt1_510 partition of tt1 for VALUES FROM (5) to (10); +CREATE TABLE tt1_1120 partition of tt1 for VALUES FROM (11) to (20); +INSERT INTO tt1 VALUES (1,11), (3,15), (5,17), (6,19), (8,17), (2,12); +SELECT create_distributed_table('tt1','id'); +UPDATE tt1 SET col_2 = 13; +DELETE FROM tt1 WHERE id = 1 or id = 3 or id = 5; +SELECT * FROM tt1; + +-- Partitioned distributed table within transaction +INSERT INTO tt1 VALUES(4,6); +INSERT INTO tt1 VALUES(7,7); +INSERT INTO tt1 VALUES(9,8); +BEGIN; +-- Update rows from partititon tt1_1120 +UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; +-- Update rows from partititon tt1_510 +UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5; +COMMIT; +SELECT * FROM tt1; + +-- Modify main table and partition table within same transaction +BEGIN; +UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; +UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5; +DELETE FROM tt1_510; +DELETE FROM tt1_1120; +COMMIT; +SELECT * FROM tt1; +DROP TABLE tt1; + +-- Update and copy in the same transaction +CREATE TABLE tt2(id int, col_2 int); +SELECT create_distributed_table('tt2','id'); + +BEGIN; +\COPY tt2 FROM STDIN DELIMITER AS ','; +1, 10 +3, 15 +7, 14 +9, 75 +2, 42 +\. 
+UPDATE tt2 SET col_2 = 1; +COMMIT; +SELECT * FROM tt2; + +-- Test returning with both type of executors +UPDATE tt2 SET col_2 = 5 RETURNING id, col_2; +SET citus.multi_shard_modify_mode to sequential; +UPDATE tt2 SET col_2 = 3 RETURNING id, col_2; +DROP TABLE tt2; + +-- Multiple RTEs are not supported +SET citus.multi_shard_modify_mode to DEFAULT; +UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; +UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table); +UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table); + +DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id; +DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table); +DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table); + +-- Cursors are not supported +BEGIN; +DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; +FETCH test_cursor; +UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor; +ROLLBACK; + +-- Stable functions are supported +CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int); +INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5); +INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7); +INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9); +SELECT create_distributed_table('test_table_1', 'id'); + +SELECT * FROM test_table_1; +UPDATE test_table_1 SET col_3 = 3 WHERE date_col < now(); +SELECT * FROM test_table_1; +DELETE FROM test_table_1 WHERE date_col < current_timestamp; +SELECT * FROM test_table_1; + +DROP TABLE test_table_1; + +-- Volatile functions are not supported +CREATE TABLE test_table_2(id int, double_col double precision); +INSERT INTO test_table_2 VALUES(1, random()); +INSERT INTO test_table_2 VALUES(2, random()); +INSERT INTO test_table_2 VALUES(3, random()); +SELECT create_distributed_table('test_table_2', 'id'); + +UPDATE test_table_2 SET double_col = random(); + +DROP TABLE test_table_2; + +-- Run multi shard updates and deletes without transaction on reference tables +SELECT COUNT(*) FROM users_reference_copy_table; +UPDATE users_reference_copy_table SET value_1 = 1; +SELECT SUM(value_1) FROM users_reference_copy_table; + +SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; +UPDATE users_reference_copy_table SET value_2 = value_2 + 1 WHERE user_id = 3 or user_id = 5; +SELECT COUNT(*), SUM(value_2) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; + +UPDATE users_reference_copy_table SET value_3 = 0 WHERE user_id <> 3; +SELECT SUM(value_3) FROM users_reference_copy_table WHERE user_id <> 3; + +DELETE FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; +SELECT COUNT(*) FROM users_reference_copy_table WHERE user_id = 3 or user_id = 5; + +-- Do some tests by changing shard replication factor +DROP TABLE users_test_table; + +SET citus.shard_replication_factor to 2; + +CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int); +SELECT create_distributed_table('users_test_table', 'user_id'); +\COPY users_test_table FROM STDIN DELIMITER AS ','; +1, 5, 6, 7 +2, 12, 7, 18 +3, 23, 8, 25 +4, 42, 9, 23 +5, 35, 10, 21 +6, 21, 11, 25 +7, 27, 12, 18 +8, 18, 13, 4 +7, 38, 14, 22 +6, 43, 15, 22 +5, 61, 16, 17 +4, 6, 17, 8 +3, 16, 18, 44 +2, 25, 19, 38 +1, 55, 20, 17 +\. 
+ +-- Run multi shard updates and deletes without transaction on hash distributed tables +UPDATE users_test_table SET value_1 = 1; +SELECT COUNT(*), SUM(value_1) FROM users_test_table; + +SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; +UPDATE users_test_table SET value_2 = value_2 + 1 WHERE user_id = 1 or user_id = 3; +SELECT COUNT(*), SUM(value_2) FROM users_test_table WHERE user_id = 1 or user_id = 3; + +UPDATE users_test_table SET value_3 = 0 WHERE user_id <> 5; +SELECT SUM(value_3) FROM users_test_table WHERE user_id <> 5; + +SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; +DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5; +SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5; + +DROP TABLE users_test_table; +DROP TABLE events_test_table; +DROP TABLE events_reference_copy_table; +DROP TABLE users_reference_copy_table;
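Below is a minimal usage sketch of the multi-shard modify path that the regression tests above exercise, assuming a small hash-distributed table; the table name sketch_table and its columns are illustrative only and are not part of this patch. It shows the two values of the new citus.multi_shard_modify_mode setting used throughout the tests.

SET citus.shard_replication_factor TO 1;
CREATE TABLE sketch_table(id int, val int);
SELECT create_distributed_table('sketch_table', 'id');
INSERT INTO sketch_table VALUES (1, 10), (2, 20), (3, 30);

-- 'parallel' mode: the multi-shard UPDATE is sent to the shards over parallel connections
SET citus.multi_shard_modify_mode TO 'parallel';
UPDATE sketch_table SET val = val + 1;

-- 'sequential' mode: the per-shard commands run one shard at a time, as the tests above
-- switch to after running single-shard INSERTs in the same transaction
SET citus.multi_shard_modify_mode TO 'sequential';
DELETE FROM sketch_table WHERE val > 25;

DROP TABLE sketch_table;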