From b8a9e7c1bf2c65f166f32a682606b98c17ceb3b7 Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Tue, 4 Jul 2017 12:18:28 +0300 Subject: [PATCH] Add support for UPDATE/DELETE with subqueries --- .../executor/multi_router_executor.c | 2 +- .../master/master_modify_multiple_shards.c | 4 +- .../distributed/planner/deparse_shard_query.c | 22 +- .../planner/insert_select_planner.c | 16 +- .../planner/multi_physical_planner.c | 10 +- .../planner/multi_router_planner.c | 591 ++++++++++-------- src/backend/distributed/utils/ruleutils_10.c | 83 ++- src/backend/distributed/utils/ruleutils_96.c | 83 ++- .../distributed/multi_router_planner.h | 15 +- src/test/regress/expected/multi_explain.out | 8 +- src/test/regress/expected/multi_explain_0.out | 8 +- .../regress/expected/multi_modifications.out | 427 +++++++++++++ .../regress/expected/multi_mx_explain.out | 8 +- src/test/regress/expected/multi_upsert.out | 3 +- .../multi_master_delete_protocol.source | 2 +- src/test/regress/sql/multi_modifications.sql | 332 ++++++++++ 16 files changed, 1276 insertions(+), 338 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index f35627eed..7ba5b864d 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -412,7 +412,7 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) DeferredErrorMessage *planningError = NULL; /* need to perform shard pruning, rebuild the task list from scratch */ - taskList = RouterModifyTaskList(jobQuery, &planningError); + taskList = RouterInsertTaskList(jobQuery, &planningError); if (planningError != NULL) { diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 0224759d3..5e918bad5 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ 
b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -150,7 +150,9 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) if (modifyQuery->commandType != CMD_UTILITY) { - DeferredErrorMessage *error = ModifyQuerySupported(modifyQuery); + bool multiShardQuery = true; + DeferredErrorMessage *error = ModifyQuerySupported(modifyQuery, multiShardQuery); + if (error) { RaiseDeferredError(error, ERROR); diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index caf24bde8..9ddcc0be1 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -90,11 +90,25 @@ RebuildQueryStrings(Query *originalQuery, List *taskList) } } - deparse_shard_query(query, relationId, task->anchorShardId, - newQueryString); + /* + * For INSERT queries, we only have one relation to update, so we can + * use deparse_shard_query(). For UPDATE and DELETE queries, we may have + * subqueries and joins, so we use relation shard list to update shard + * names and call pg_get_query_def() directly. 
+ */ + if (query->commandType == CMD_INSERT) + { + deparse_shard_query(query, relationId, task->anchorShardId, newQueryString); + } + else + { + List *relationShardList = task->relationShardList; + UpdateRelationToShardNames((Node *) query, relationShardList); - ereport(DEBUG4, (errmsg("distributed statement: %s", - newQueryString->data))); + pg_get_query_def(query, newQueryString); + } + + ereport(DEBUG4, (errmsg("distributed statement: %s", newQueryString->data))); task->queryString = newQueryString->data; } diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 405c082df..c7cc50fb8 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -437,12 +437,12 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter uint64 jobId = INVALID_JOB_ID; List *insertShardPlacementList = NULL; List *intersectedPlacementList = NULL; - bool routerPlannable = false; bool upsertQuery = false; bool replacePrunedQueryWithDummy = false; bool allReferenceTables = restrictionContext->allReferenceTables; List *shardOpExpressions = NIL; RestrictInfo *shardRestrictionList = NULL; + DeferredErrorMessage *planningError = NULL; /* grab shared metadata lock to stop concurrent placement additions */ LockShardDistributionMetadata(shardId, ShareLock); @@ -489,15 +489,15 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter replacePrunedQueryWithDummy = false; /* - * Use router select planner to decide on whether we can push down the query - * or not. If we can, we also rely on the side-effects that all RTEs have been - * updated to point to the relevant nodes and selectPlacementList is determined. + * Use router planner to decide on whether we can push down the query or not. 
+ * If we can, we also rely on the side-effects that all RTEs have been updated + * to point to the relevant nodes and selectPlacementList is determined. */ - routerPlannable = RouterSelectQuery(copiedSubquery, copiedRestrictionContext, - &selectPlacementList, &selectAnchorShardId, - &relationShardList, replacePrunedQueryWithDummy); + planningError = PlanRouterQuery(copiedSubquery, copiedRestrictionContext, + &selectPlacementList, &selectAnchorShardId, + &relationShardList, replacePrunedQueryWithDummy); - if (!routerPlannable) + if (planningError) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform distributed planning for the given " diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 4b8ed9468..c1f25d80f 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2291,12 +2291,12 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval, uint64 selectAnchorShardId = INVALID_SHARD_ID; List *relationShardList = NIL; uint64 jobId = INVALID_JOB_ID; - bool routerPlannable = false; bool replacePrunedQueryWithDummy = false; RelationRestrictionContext *copiedRestrictionContext = CopyRelationRestrictionContext(restrictionContext); List *shardOpExpressions = NIL; RestrictInfo *shardRestrictionList = NULL; + DeferredErrorMessage *planningError = NULL; /* such queries should go through router planner */ Assert(!restrictionContext->allReferenceTables); @@ -2330,12 +2330,12 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval, * or not. If we can, we also rely on the side-effects that all RTEs have been * updated to point to the relevant nodes and selectPlacementList is determined. 
*/ - routerPlannable = RouterSelectQuery(taskQuery, copiedRestrictionContext, - &selectPlacementList, &selectAnchorShardId, - &relationShardList, replacePrunedQueryWithDummy); + planningError = PlanRouterQuery(taskQuery, copiedRestrictionContext, + &selectPlacementList, &selectAnchorShardId, + &relationShardList, replacePrunedQueryWithDummy); /* we don't expect to this this error but keeping it as a precaution for future changes */ - if (!routerPlannable) + if (planningError) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform distributed planning for the given " diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 46e411863..2021379c3 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -93,33 +93,30 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context); static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree); -static Job * RouterModifyJob(Query *originalQuery, Query *query, +static Job * RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError); static void ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry); static bool CanShardPrune(Oid distributedTableId, Query *query); -static List * RouterInsertTaskList(Query *query, DistTableCacheEntry *cacheEntry, - DeferredErrorMessage **planningError); -static List * RouterUpdateOrDeleteTaskList(Query *query, DistTableCacheEntry *cacheEntry, - DeferredErrorMessage **planningError); static Job * CreateJob(Query *query); static Task * CreateTask(TaskType taskType); static ShardInterval * FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry, DeferredErrorMessage **planningError); -static ShardInterval * FindShardForUpdateOrDelete(Query *query, - 
DistTableCacheEntry *cacheEntry, - DeferredErrorMessage **planningError); -static List * QueryRestrictList(Query *query, char partitionMethod); static Expr * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); -static Job * RouterSelectJob(Query *originalQuery, - RelationRestrictionContext *restrictionContext, - bool *queryRoutable); +static Job * RouterJob(Query *originalQuery, + RelationRestrictionContext *restrictionContext, + DeferredErrorMessage **planningError); static bool RelationPrunesToMultipleShards(List *relationShardList); -static List * TargetShardIntervalsForSelect(Query *query, - RelationRestrictionContext *restrictionContext); +static List * TargetShardIntervalsForRouter(Query *query, + RelationRestrictionContext *restrictionContext, + bool *multiShardQuery); static List * WorkersContainingAllShards(List *prunedShardIntervalsList); static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); +static bool UpdateOrDeleteQuery(Query *query); +static RangeTblEntry * GetUpdateOrDeleteRTE(List *rangeTableList); +static bool UpdateOrDeleteRTE(RangeTblEntry *rangeTableEntry); +static bool SelectsFromDistributedTable(List *rangeTableList); #if (PG_VERSION_NUM >= 100000) static List * get_all_actual_clauses(List *restrictinfo_list); #endif @@ -161,16 +158,28 @@ CreateModifyPlan(Query *originalQuery, Query *query, { Job *job = NULL; MultiPlan *multiPlan = CitusMakeNode(MultiPlan); + bool multiShardQuery = false; multiPlan->operation = query->commandType; - multiPlan->planningError = ModifyQuerySupported(query); + multiPlan->planningError = ModifyQuerySupported(query, multiShardQuery); if (multiPlan->planningError != NULL) { return multiPlan; } - job = RouterModifyJob(originalQuery, query, &multiPlan->planningError); + if (UpdateOrDeleteQuery(query)) + { + RelationRestrictionContext *restrictionContext = + 
plannerRestrictionContext->relationRestrictionContext; + + job = RouterJob(originalQuery, restrictionContext, &multiPlan->planningError); + } + else + { + job = RouterInsertJob(originalQuery, query, &multiPlan->planningError); + } + if (multiPlan->planningError != NULL) { return multiPlan; @@ -204,7 +213,6 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, RelationRestrictionContext *restrictionContext) { Job *job = NULL; - bool queryRoutable = false; MultiPlan *multiPlan = CitusMakeNode(MultiPlan); multiPlan->operation = query->commandType; @@ -216,8 +224,8 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, return multiPlan; } - job = RouterSelectJob(originalQuery, restrictionContext, &queryRoutable); - if (!queryRoutable) + job = RouterJob(originalQuery, restrictionContext, &multiPlan->planningError); + if (multiPlan->planningError) { /* query cannot be handled by this planner */ return NULL; @@ -455,7 +463,7 @@ ExtractInsertRangeTableEntry(Query *query) * features, otherwise it returns an error description. */ DeferredErrorMessage * -ModifyQuerySupported(Query *queryTree) +ModifyQuerySupported(Query *queryTree, bool multiShardQuery) { Oid distributedTableId = ExtractFirstDistributedTableId(queryTree); uint32 rangeTableId = 1; @@ -479,9 +487,18 @@ ModifyQuerySupported(Query *queryTree) */ if (queryTree->hasSubLinks == true) { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "subqueries are not supported in distributed modifications", - NULL, NULL); + /* + * We support UPDATE and DELETE with subqueries unless they are multi + * shard queries. 
+ */ + if (!UpdateOrDeleteQuery(queryTree) || multiShardQuery) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed planning for the given " + "modifications", + "Subqueries are not supported in distributed " + "modifications.", NULL); + } } /* reject queries which include CommonTableExpr */ @@ -542,6 +559,17 @@ ModifyQuerySupported(Query *queryTree) } else { + char *rangeTableEntryErrorDetail = NULL; + + /* + * We support UPDATE and DELETE with subqueries and joins unless + * they are multi shard queries. + */ + if (UpdateOrDeleteQuery(queryTree) && !multiShardQuery) + { + continue; + } + /* * Error out for rangeTableEntries that we do not support. * We do not explicitly specify "in FROM clause" in the error detail @@ -549,7 +577,6 @@ ModifyQuerySupported(Query *queryTree) * We do not need to check for RTE_CTE because all common table expressions * are rejected above with queryTree->cteList check. */ - char *rangeTableEntryErrorDetail = NULL; if (rangeTableEntry->rtekind == RTE_SUBQUERY) { rangeTableEntryErrorDetail = "Subqueries are not supported in" @@ -585,12 +612,18 @@ ModifyQuerySupported(Query *queryTree) */ if (commandType != CMD_INSERT && queryTableCount != 1) { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot perform distributed planning for the given" - " modification", - "Joins are not supported in distributed " - "modifications.", - NULL); + /* + * We support UPDATE and DELETE with joins unless they are multi shard + * queries. + */ + if (!UpdateOrDeleteQuery(queryTree) || multiShardQuery) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed planning for the given " + "modification", + "Joins are not supported in distributed " + "modifications.", NULL); + } } /* reject queries which involve multi-row inserts */ @@ -794,6 +827,24 @@ ModifyQuerySupported(Query *queryTree) } +/* + * UpdateOrDeleteQuery checks if the given query is an UPDATE or DELETE command. 
+ * If it is, it returns true otherwise it returns false. + */ +static bool +UpdateOrDeleteQuery(Query *query) +{ + CmdType commandType = query->commandType; + + if (commandType == CMD_UPDATE || commandType == CMD_DELETE) + { + return true; + } + + return false; +} + + /* * If the expression contains STABLE functions which accept any parameters derived from a * Var returns true and sets varArgument. @@ -1013,12 +1064,12 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre /* - * RouterModifyJob builds a Job to represent a modification performed by + * RouterInsertJob builds a Job to represent an insertion performed by * the provided query against the provided shard interval. This task contains * shard-extended deparsed SQL to be run during execution. */ static Job * -RouterModifyJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError) +RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError) { Oid distributedTableId = ExtractFirstDistributedTableId(query); List *taskList = NIL; @@ -1037,7 +1088,7 @@ RouterModifyJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann } else { - taskList = RouterModifyTaskList(query, planningError); + taskList = RouterInsertTaskList(query, planningError); if (*planningError) { return NULL; @@ -1118,36 +1169,6 @@ CanShardPrune(Oid distributedTableId, Query *query) } -/* - * RouterModifyTaskList builds a list of tasks for a given query. 
- */ -List * -RouterModifyTaskList(Query *query, DeferredErrorMessage **planningError) -{ - Oid distributedTableId = ExtractFirstDistributedTableId(query); - DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); - CmdType commandType = query->commandType; - List *taskList = NIL; - - ErrorIfNoShardsExist(cacheEntry); - - if (commandType == CMD_INSERT) - { - taskList = RouterInsertTaskList(query, cacheEntry, planningError); - } - else - { - taskList = RouterUpdateOrDeleteTaskList(query, cacheEntry, planningError); - if (*planningError) - { - return NIL; - } - } - - return taskList; -} - - /* * ErrorIfNoShardsExist throws an error if the given table has no shards. */ @@ -1174,13 +1195,17 @@ ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry) * RouterInsertTaskList generates a list of tasks for performing an INSERT on * a distributed table via the router executor. */ -static List * -RouterInsertTaskList(Query *query, DistTableCacheEntry *cacheEntry, - DeferredErrorMessage **planningError) +List * +RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError) { ShardInterval *shardInterval = NULL; Task *modifyTask = NULL; + Oid distributedTableId = ExtractFirstDistributedTableId(query); + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + + ErrorIfNoShardsExist(cacheEntry); + Assert(query->commandType == CMD_INSERT); shardInterval = FindShardForInsert(query, cacheEntry, planningError); @@ -1206,41 +1231,6 @@ RouterInsertTaskList(Query *query, DistTableCacheEntry *cacheEntry, } -/* - * RouterUpdateOrDeleteTaskList returns a list of tasks for executing an UPDATE - * or DELETE command on a distributed table via the router executor. 
- */ -static List * -RouterUpdateOrDeleteTaskList(Query *query, DistTableCacheEntry *cacheEntry, - DeferredErrorMessage **planningError) -{ - ShardInterval *shardInterval = NULL; - List *taskList = NIL; - - Assert(query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE); - - shardInterval = FindShardForUpdateOrDelete(query, cacheEntry, planningError); - - if (*planningError != NULL) - { - return NIL; - } - - if (shardInterval != NULL) - { - Task *modifyTask = NULL; - - modifyTask = CreateTask(MODIFY_TASK); - modifyTask->anchorShardId = shardInterval->shardId; - modifyTask->replicationModel = cacheEntry->replicationModel; - - taskList = lappend(taskList, modifyTask); - } - - return taskList; -} - - /* * CreateTask returns a new Task with the given type. */ @@ -1400,109 +1390,6 @@ FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry, } -/* - * FindShardForUpdateOrDelete finds the shard interval in which an UPDATE or - * DELETE command should be applied, or sets planningError when the query - * needs to be applied to multiple or no shards. 
- */ -static ShardInterval * -FindShardForUpdateOrDelete(Query *query, DistTableCacheEntry *cacheEntry, - DeferredErrorMessage **planningError) -{ - Oid distributedTableId = cacheEntry->relationId; - char partitionMethod = cacheEntry->partitionMethod; - CmdType commandType = query->commandType; - List *restrictClauseList = NIL; - Index tableId = 1; - List *prunedShardList = NIL; - int prunedShardCount = 0; - - Assert(commandType == CMD_UPDATE || commandType == CMD_DELETE); - - restrictClauseList = QueryRestrictList(query, partitionMethod); - prunedShardList = PruneShards(distributedTableId, tableId, restrictClauseList); - - prunedShardCount = list_length(prunedShardList); - if (prunedShardCount > 1) - { - char *partitionKeyString = cacheEntry->partitionKeyString; - char *partitionColumnName = ColumnNameToColumn(distributedTableId, - partitionKeyString); - StringInfo errorMessage = makeStringInfo(); - StringInfo errorHint = makeStringInfo(); - const char *commandName = NULL; - - if (commandType == CMD_UPDATE) - { - commandName = "UPDATE"; - } - else - { - commandName = "DELETE"; - } - - appendStringInfo(errorHint, "Consider using an equality filter on " - "partition column \"%s\" to target a " - "single shard. 
If you'd like to run a " - "multi-shard operation, use " - "master_modify_multiple_shards().", - partitionColumnName); - - if (commandType == CMD_DELETE && partitionMethod == DISTRIBUTE_BY_APPEND) - { - appendStringInfo(errorHint, " You can also use " - "master_apply_delete_command() to drop " - "all shards satisfying delete criteria."); - } - - appendStringInfo(errorMessage, - "cannot run %s command which targets multiple shards", - commandName); - - (*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - errorMessage->data, NULL, - errorHint->data); - - return NULL; - } - else if (prunedShardCount == 0) - { - return NULL; - } - - return (ShardInterval *) linitial(prunedShardList); -} - - -/* - * QueryRestrictList returns the restriction clauses for the query. For a SELECT - * statement these are the where-clause expressions. For INSERT statements we - * build an equality clause based on the partition-column and its supplied - * insert value. - * - * Since reference tables do not have partition columns, the function returns - * NIL for reference tables. - */ -static List * -QueryRestrictList(Query *query, char partitionMethod) -{ - List *queryRestrictList = NIL; - - /* - * Reference tables do not have the notion of partition column. Thus, - * there are no restrictions on the partition column. - */ - if (partitionMethod == DISTRIBUTE_BY_NONE) - { - return queryRestrictList; - } - - queryRestrictList = WhereClauseList(query->jointree); - - return queryRestrictList; -} - - /* * ExtractFirstDistributedTableId takes a given query, and finds the relationId * for the first distributed table in that query. 
If the function cannot find a @@ -1554,84 +1441,253 @@ ExtractInsertPartitionValue(Query *query, Var *partitionColumn) } -/* RouterSelectJob builds a Job to represent a single shard select query */ +/* RouterJob builds a Job to represent a single shard select/update/delete query */ static Job * -RouterSelectJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, - bool *returnQueryRoutable) +RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, + DeferredErrorMessage **planningError) { Job *job = NULL; Task *task = NULL; - bool queryRoutable = false; StringInfo queryString = makeStringInfo(); uint64 shardId = INVALID_SHARD_ID; List *placementList = NIL; List *relationShardList = NIL; + List *rangeTableList = NIL; bool replacePrunedQueryWithDummy = false; + bool requiresMasterEvaluation = false; + RangeTblEntry *updateOrDeleteRTE = NULL; /* router planner should create task even if it deosn't hit a shard at all */ replacePrunedQueryWithDummy = true; - queryRoutable = RouterSelectQuery(originalQuery, restrictionContext, - &placementList, &shardId, &relationShardList, - replacePrunedQueryWithDummy); + /* check if this query requires master evaluation */ + requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); - - if (!queryRoutable) + (*planningError) = PlanRouterQuery(originalQuery, restrictionContext, + &placementList, &shardId, &relationShardList, + replacePrunedQueryWithDummy); + if (*planningError) { - *returnQueryRoutable = false; return NULL; } job = CreateJob(originalQuery); + ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList); + updateOrDeleteRTE = GetUpdateOrDeleteRTE(rangeTableList); + + /* + * If all of the shards are pruned, we replace the relation RTE into + * subquery RTE that returns no results. However, this is not useful + * for UPDATE and DELETE queries. 
Therefore, if we detect a UPDATE or + * DELETE RTE with subquery type, we just set task list to empty and return + * the job. + */ + if (updateOrDeleteRTE != NULL && updateOrDeleteRTE->rtekind == RTE_SUBQUERY) + { + job->taskList = NIL; + return job; + } + pg_get_query_def(originalQuery, queryString); - task = CreateTask(ROUTER_TASK); + if (originalQuery->commandType == CMD_SELECT) + { + task = CreateTask(ROUTER_TASK); + } + else + { + DistTableCacheEntry *modificationTableCacheEntry = NULL; + char modificationPartitionMethod = 0; + + modificationTableCacheEntry = DistributedTableCacheEntry( + updateOrDeleteRTE->relid); + modificationPartitionMethod = modificationTableCacheEntry->partitionMethod; + + if (modificationPartitionMethod == DISTRIBUTE_BY_NONE && + SelectsFromDistributedTable(rangeTableList)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform select on a distributed table " + "and modify a reference table"))); + } + + task = CreateTask(MODIFY_TASK); + task->replicationModel = modificationTableCacheEntry->replicationModel; + } + task->queryString = queryString->data; task->anchorShardId = shardId; task->taskPlacementList = placementList; task->relationShardList = relationShardList; job->taskList = list_make1(task); - - *returnQueryRoutable = true; + job->requiresMasterEvaluation = requiresMasterEvaluation; return job; } /* - * RouterSelectQuery returns true if the input query can be pushed down to the - * worker node as it is. Otherwise, the function returns false. - * - * On return true, all RTEs have been updated to point to the relevant shards in - * the originalQuery. Also, placementList is filled with the list of worker nodes - * that has all the required shard placements for the query execution. - * anchorShardId is set to the first pruned shardId of the given query. Finally, - * relationShardList is filled with the list of relation-to-shard mappings for - * the query. 
+ * GetUpdateOrDeleteRTE walks over the given range table list, and checks if + * it has an UPDATE or DELETE RTE. If it finds one, it returns it immediately. */ -bool -RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext, - List **placementList, uint64 *anchorShardId, List **relationShardList, - bool replacePrunedQueryWithDummy) +static RangeTblEntry * +GetUpdateOrDeleteRTE(List *rangeTableList) { - List *prunedRelationShardList = TargetShardIntervalsForSelect(originalQuery, - restrictionContext); - uint64 shardId = INVALID_SHARD_ID; - CmdType commandType PG_USED_FOR_ASSERTS_ONLY = originalQuery->commandType; - ListCell *prunedRelationShardListCell = NULL; - List *workerList = NIL; - bool shardsPresent = false; + ListCell *rangeTableCell = NULL; - *placementList = NIL; + foreach(rangeTableCell, rangeTableList) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - if (prunedRelationShardList == NULL) + if (UpdateOrDeleteRTE(rangeTableEntry)) + { + return rangeTableEntry; + } + } + + return NULL; +} + + +/* + * UpdateOrDeleteRTE checks if the given range table entry is an UPDATE or + * DELETE RTE by checking required permissions on it. + */ +static bool +UpdateOrDeleteRTE(RangeTblEntry *rangeTableEntry) +{ + if ((ACL_UPDATE & rangeTableEntry->requiredPerms) || + (ACL_DELETE & rangeTableEntry->requiredPerms)) + { + return true; + } + else { return false; } +} - Assert(commandType == CMD_SELECT); + +/* + * SelectsFromDistributedTable checks if there is a select on a distributed + * table by looking into range table entries. 
+ */ +static bool +SelectsFromDistributedTable(List *rangeTableList) +{ + ListCell *rangeTableCell = NULL; + foreach(rangeTableCell, rangeTableList) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); + DistTableCacheEntry *cacheEntry = NULL; + + if (rangeTableEntry->relid == InvalidOid) + { + continue; + } + + cacheEntry = DistributedTableCacheEntry(rangeTableEntry->relid); + if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE && + !UpdateOrDeleteRTE(rangeTableEntry)) + { + return true; + } + } + + return false; +} + + +/* + * PlanRouterQuery runs router pruning logic for SELECT, UPDATE and DELETE queries. + * If there are shards present and query is routable, all RTEs have been updated + * to point to the relevant shards in the originalQuery. Also, placementList is + * filled with the list of worker nodes that have all the required shard placements + * for the query execution. anchorShardId is set to the first pruned shardId of + * the given query. Finally, relationShardList is filled with the list of + * relation-to-shard mappings for the query. + * + * If the given query is not routable, it returns the related + * DeferredErrorMessage. The caller can check this error message to see if query + * is routable or not. 
+ */ +DeferredErrorMessage * +PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext, + List **placementList, uint64 *anchorShardId, List **relationShardList, + bool replacePrunedQueryWithDummy) +{ + bool multiShardQuery = false; + List *prunedRelationShardList = NIL; + DeferredErrorMessage *planningError = NULL; + ListCell *prunedRelationShardListCell = NULL; + List *workerList = NIL; + bool shardsPresent = false; + uint64 shardId = INVALID_SHARD_ID; + + *placementList = NIL; + prunedRelationShardList = TargetShardIntervalsForRouter(originalQuery, + restrictionContext, + &multiShardQuery); + + /* + * If multiShardQuery is true then it means a relation has more + * than one shard left after pruning. + */ + if (multiShardQuery) + { + StringInfo errorMessage = makeStringInfo(); + StringInfo errorHint = makeStringInfo(); + CmdType commandType = originalQuery->commandType; + const char *commandName = "SELECT"; + + if (commandType == CMD_UPDATE) + { + commandName = "UPDATE"; + } + else if (commandType == CMD_DELETE) + { + commandName = "DELETE"; + } + + if (commandType == CMD_UPDATE || commandType == CMD_DELETE) + { + List *rangeTableList = NIL; + RangeTblEntry *updateOrDeleteRTE = NULL; + DistTableCacheEntry *updateOrDeleteTableCacheEntry = NULL; + char *partitionKeyString = NULL; + char *partitionColumnName = NULL; + + /* extract range table entries */ + ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList); + + updateOrDeleteRTE = GetUpdateOrDeleteRTE(rangeTableList); + updateOrDeleteTableCacheEntry = + DistributedTableCacheEntry(updateOrDeleteRTE->relid); + + partitionKeyString = updateOrDeleteTableCacheEntry->partitionKeyString; + partitionColumnName = ColumnNameToColumn(updateOrDeleteRTE->relid, + partitionKeyString); + + appendStringInfo(errorHint, "Consider using an equality filter on " + "partition column \"%s\" to target a " + "single shard. 
If you'd like to run a " + "multi-shard operation, use " + "master_modify_multiple_shards().", + partitionColumnName); + } + + /* note that for SELECT queries, we never print this error message */ + appendStringInfo(errorMessage, + "cannot run %s command which targets multiple shards", + commandName); + + planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, + errorHint->data); + return planningError; + } foreach(prunedRelationShardListCell, prunedRelationShardList) { @@ -1673,7 +1729,10 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC */ if (RelationPrunesToMultipleShards(*relationShardList)) { - return false; + planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot run command which targets " + "multiple shards", NULL, NULL); + return planningError; } /* @@ -1708,44 +1767,53 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC * shard intervals. Thus, we should return empty list if there aren't any matching * workers, so that the caller can decide what to do with this task. */ - workerList = NIL; - - return true; + return NULL; } if (workerList == NIL) { ereport(DEBUG2, (errmsg("Found no worker with all shard placements"))); - return false; + planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "found no worker with all shard placements", + NULL, NULL); + return planningError; } - UpdateRelationToShardNames((Node *) originalQuery, *relationShardList); + /* + * If this is an UPDATE or DELETE query which requires master evaluation, + * don't try to update shard names, and postpone that to the execution phase. 
+ */ + if (!(UpdateOrDeleteQuery(originalQuery) && RequiresMasterEvaluation(originalQuery))) + { + UpdateRelationToShardNames((Node *) originalQuery, *relationShardList); + } *placementList = workerList; *anchorShardId = shardId; - return true; + return planningError; } /* - * TargetShardIntervalsForSelect performs shard pruning for all referenced relations + * TargetShardIntervalsForRouter performs shard pruning for all referenced relations * in the query and returns list of shards per relation. Shard pruning is done based - * on provided restriction context per relation. The function bails out and returns NULL - * if any of the relations pruned down to more than one active shard. It also records - * pruned shard intervals in relation restriction context to be used later on. Some - * queries may have contradiction clauses like 'and false' or 'and 1=0', such queries - * are treated as if all of the shards of joining relations are pruned out. + * on provided restriction context per relation. The function bails out and returns + * after setting multiShardQuery to true if any of the relations pruned down to + * more than one active shard. It also records pruned shard intervals in relation + * restriction context to be used later on. Some queries may have contradiction + * clauses like 'and false' or 'and 1=0', such queries are treated as if all of + * the shards of joining relations are pruned out. 
*/ static List * -TargetShardIntervalsForSelect(Query *query, - RelationRestrictionContext *restrictionContext) +TargetShardIntervalsForRouter(Query *query, + RelationRestrictionContext *restrictionContext, + bool *multiShardQuery) { List *prunedRelationShardList = NIL; ListCell *restrictionCell = NULL; - Assert(query->commandType == CMD_SELECT); Assert(restrictionContext != NULL); foreach(restrictionCell, restrictionContext->relationRestrictionList) @@ -1774,8 +1842,7 @@ TargetShardIntervalsForSelect(Query *query, whereFalseQuery = ContainsFalseClause(pseudoRestrictionList); if (!whereFalseQuery && shardCount > 0) { - prunedShardList = PruneShards(relationId, tableId, - restrictClauseList); + prunedShardList = PruneShards(relationId, tableId, restrictClauseList); /* * Quick bail out. The query can not be router plannable if one @@ -1785,7 +1852,9 @@ TargetShardIntervalsForSelect(Query *query, */ if (list_length(prunedShardList) > 1) { - return NULL; + (*multiShardQuery) = true; + + return NIL; } } @@ -1992,8 +2061,8 @@ MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionC RelationRestrictionContext * CopyRelationRestrictionContext(RelationRestrictionContext *oldContext) { - RelationRestrictionContext *newContext = (RelationRestrictionContext *) - palloc(sizeof(RelationRestrictionContext)); + RelationRestrictionContext *newContext = + (RelationRestrictionContext *) palloc(sizeof(RelationRestrictionContext)); ListCell *relationRestrictionCell = NULL; newContext->hasDistributedRelation = oldContext->hasDistributedRelation; diff --git a/src/backend/distributed/utils/ruleutils_10.c b/src/backend/distributed/utils/ruleutils_10.c index 95f8a3d7f..6dfdad67c 100644 --- a/src/backend/distributed/utils/ruleutils_10.c +++ b/src/backend/distributed/utils/ruleutils_10.c @@ -3169,20 +3169,43 @@ get_update_query_def(Query *query, deparse_context *context) * Start the query with UPDATE relname SET */ rte = rt_fetch(query->resultRelation, query->rtable); 
- Assert(rte->rtekind == RTE_RELATION); + if (PRETTY_INDENT(context)) { appendStringInfoChar(buf, ' '); context->indentLevel += PRETTYINDENT_STD; } - appendStringInfo(buf, "UPDATE %s%s", - only_marker(rte), - generate_relation_or_shard_name(rte->relid, - context->distrelid, - context->shardid, NIL)); - if (rte->alias != NULL) - appendStringInfo(buf, " %s", - quote_identifier(rte->alias->aliasname)); + + /* if it's a shard, do differently */ + if (GetRangeTblKind(rte) == CITUS_RTE_SHARD) + { + char *fragmentSchemaName = NULL; + char *fragmentTableName = NULL; + + ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL); + + /* use schema and table name from the remote alias */ + appendStringInfo(buf, "UPDATE %s%s", + only_marker(rte), + generate_fragment_name(fragmentSchemaName, fragmentTableName)); + + if(rte->eref != NULL) + appendStringInfo(buf, " %s", + quote_identifier(rte->eref->aliasname)); + } + else + { + appendStringInfo(buf, "UPDATE %s%s", + only_marker(rte), + generate_relation_or_shard_name(rte->relid, + context->distrelid, + context->shardid, NIL)); + + if (rte->alias != NULL) + appendStringInfo(buf, " %s", + quote_identifier(rte->alias->aliasname)); + } + appendStringInfoString(buf, " SET "); /* Deparse targetlist */ @@ -3366,20 +3389,42 @@ get_delete_query_def(Query *query, deparse_context *context) * Start the query with DELETE FROM relname */ rte = rt_fetch(query->resultRelation, query->rtable); - Assert(rte->rtekind == RTE_RELATION); + if (PRETTY_INDENT(context)) { appendStringInfoChar(buf, ' '); context->indentLevel += PRETTYINDENT_STD; } - appendStringInfo(buf, "DELETE FROM %s%s", - only_marker(rte), - generate_relation_or_shard_name(rte->relid, - context->distrelid, - context->shardid, NIL)); - if (rte->alias != NULL) - appendStringInfo(buf, " %s", - quote_identifier(rte->alias->aliasname)); + + /* if it's a shard, do differently */ + if (GetRangeTblKind(rte) == CITUS_RTE_SHARD) + { + char *fragmentSchemaName = NULL; + 
char *fragmentTableName = NULL; + + ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL); + + /* use schema and table name from the remote alias */ + appendStringInfo(buf, "DELETE FROM %s%s", + only_marker(rte), + generate_fragment_name(fragmentSchemaName, fragmentTableName)); + + if(rte->eref != NULL) + appendStringInfo(buf, " %s", + quote_identifier(rte->eref->aliasname)); + } + else + { + appendStringInfo(buf, "DELETE FROM %s%s", + only_marker(rte), + generate_relation_or_shard_name(rte->relid, + context->distrelid, + context->shardid, NIL)); + + if (rte->alias != NULL) + appendStringInfo(buf, " %s", + quote_identifier(rte->alias->aliasname)); + } /* Add the USING clause if given */ get_from_clause(query, " USING ", context); @@ -6857,7 +6902,7 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context) ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL); - /* Use schema and table name from the remote alias */ + /* use schema and table name from the remote alias */ appendStringInfoString(buf, generate_fragment_name(fragmentSchemaName, fragmentTableName)); diff --git a/src/backend/distributed/utils/ruleutils_96.c b/src/backend/distributed/utils/ruleutils_96.c index dbd976341..0f19fe73e 100644 --- a/src/backend/distributed/utils/ruleutils_96.c +++ b/src/backend/distributed/utils/ruleutils_96.c @@ -3152,20 +3152,43 @@ get_update_query_def(Query *query, deparse_context *context) * Start the query with UPDATE relname SET */ rte = rt_fetch(query->resultRelation, query->rtable); - Assert(rte->rtekind == RTE_RELATION); + if (PRETTY_INDENT(context)) { appendStringInfoChar(buf, ' '); context->indentLevel += PRETTYINDENT_STD; } - appendStringInfo(buf, "UPDATE %s%s", - only_marker(rte), - generate_relation_or_shard_name(rte->relid, - context->distrelid, - context->shardid, NIL)); - if (rte->alias != NULL) - appendStringInfo(buf, " %s", - quote_identifier(rte->alias->aliasname)); + + /* if it's a 
shard, do differently */ + if (GetRangeTblKind(rte) == CITUS_RTE_SHARD) + { + char *fragmentSchemaName = NULL; + char *fragmentTableName = NULL; + + ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL); + + /* use schema and table name from the remote alias */ + appendStringInfo(buf, "UPDATE %s%s", + only_marker(rte), + generate_fragment_name(fragmentSchemaName, fragmentTableName)); + + if(rte->eref != NULL) + appendStringInfo(buf, " %s", + quote_identifier(rte->eref->aliasname)); + } + else + { + appendStringInfo(buf, "UPDATE %s%s", + only_marker(rte), + generate_relation_or_shard_name(rte->relid, + context->distrelid, + context->shardid, NIL)); + + if (rte->alias != NULL) + appendStringInfo(buf, " %s", + quote_identifier(rte->alias->aliasname)); + } + appendStringInfoString(buf, " SET "); /* Deparse targetlist */ @@ -3349,20 +3372,42 @@ get_delete_query_def(Query *query, deparse_context *context) * Start the query with DELETE FROM relname */ rte = rt_fetch(query->resultRelation, query->rtable); - Assert(rte->rtekind == RTE_RELATION); + if (PRETTY_INDENT(context)) { appendStringInfoChar(buf, ' '); context->indentLevel += PRETTYINDENT_STD; } - appendStringInfo(buf, "DELETE FROM %s%s", - only_marker(rte), - generate_relation_or_shard_name(rte->relid, - context->distrelid, - context->shardid, NIL)); - if (rte->alias != NULL) - appendStringInfo(buf, " %s", - quote_identifier(rte->alias->aliasname)); + + /* if it's a shard, do differently */ + if (GetRangeTblKind(rte) == CITUS_RTE_SHARD) + { + char *fragmentSchemaName = NULL; + char *fragmentTableName = NULL; + + ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL); + + /* use schema and table name from the remote alias */ + appendStringInfo(buf, "DELETE FROM %s%s", + only_marker(rte), + generate_fragment_name(fragmentSchemaName, fragmentTableName)); + + if(rte->eref != NULL) + appendStringInfo(buf, " %s", + quote_identifier(rte->eref->aliasname)); + } + else 
+ { + appendStringInfo(buf, "DELETE FROM %s%s", + only_marker(rte), + generate_relation_or_shard_name(rte->relid, + context->distrelid, + context->shardid, NIL)); + + if (rte->alias != NULL) + appendStringInfo(buf, " %s", + quote_identifier(rte->alias->aliasname)); + } /* Add the USING clause if given */ get_from_clause(query, " USING ", context); @@ -6577,7 +6622,7 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context) ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL); - /* Use schema and table name from the remote alias */ + /* use schema and table name from the remote alias */ appendStringInfoString(buf, generate_fragment_name(fragmentSchemaName, fragmentTableName)); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index a297e8f5c..169d271e7 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -31,12 +31,16 @@ extern MultiPlan * CreateRouterPlan(Query *originalQuery, Query *query, extern MultiPlan * CreateModifyPlan(Query *originalQuery, Query *query, PlannerRestrictionContext * plannerRestrictionContext); -extern bool RouterSelectQuery(Query *originalQuery, - RelationRestrictionContext *restrictionContext, - List **placementList, uint64 *anchorShardId, - List **relationShardList, bool replacePrunedQueryWithDummy); +extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, + RelationRestrictionContext * + restrictionContext, + List **placementList, uint64 *anchorShardId, + List **relationShardList, bool + replacePrunedQueryWithDummy); +extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError); extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); -extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree); +extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, + bool multiShardQuery); extern 
List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex); extern RelationRestrictionContext * CopyRelationRestrictionContext( RelationRestrictionContext *oldContext); @@ -46,7 +50,6 @@ extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval); -extern List * RouterModifyTaskList(Query *query, DeferredErrorMessage **planningError); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 54b1584c9..9670abbf4 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -317,8 +317,8 @@ Custom Scan (Citus Router) Tasks Shown: All -> Task Node: host=localhost port=57638 dbname=regression - -> Update on lineitem_290000 - -> Index Scan using lineitem_pkey_290000 on lineitem_290000 + -> Update on lineitem_290000 lineitem + -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem Index Cond: (l_orderkey = 1) Filter: (l_partkey = 0) -- Test delete @@ -330,8 +330,8 @@ Custom Scan (Citus Router) Tasks Shown: All -> Task Node: host=localhost port=57638 dbname=regression - -> Delete on lineitem_290000 - -> Index Scan using lineitem_pkey_290000 on lineitem_290000 + -> Delete on lineitem_290000 lineitem + -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem Index Cond: (l_orderkey = 1) Filter: (l_partkey = 0) -- Test zero-shard update diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index d5765679e..7e0d6e490 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -317,8 +317,8 @@ Custom Scan (Citus Router) Tasks Shown: All -> Task Node: host=localhost port=57638 dbname=regression - -> Update on lineitem_290000 - 
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 + -> Update on lineitem_290000 lineitem + -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem Index Cond: (l_orderkey = 1) Filter: (l_partkey = 0) -- Test delete @@ -330,8 +330,8 @@ Custom Scan (Citus Router) Tasks Shown: All -> Task Node: host=localhost port=57638 dbname=regression - -> Delete on lineitem_290000 - -> Index Scan using lineitem_pkey_290000 on lineitem_290000 + -> Delete on lineitem_290000 lineitem + -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem Index Cond: (l_orderkey = 1) Filter: (l_partkey = 0) -- Test zero-shard update diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index b8b304a89..c53e090fa 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -657,3 +657,430 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING * (1 row) DROP TABLE app_analytics_events; +-- test UPDATE with subqueries +CREATE TABLE raw_table (id bigint, value bigint); +CREATE TABLE summary_table ( + id bigint, + min_value numeric, + average_value numeric, + count int, + uniques int); +SELECT create_distributed_table('raw_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('summary_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO raw_table VALUES (1, 100); +INSERT INTO raw_table VALUES (1, 200); +INSERT INTO raw_table VALUES (1, 200); +INSERT INTO raw_table VALUES (1, 300); +INSERT INTO raw_table VALUES (2, 400); +INSERT INTO raw_table VALUES (2, 500); +INSERT INTO summary_table VALUES (1); +INSERT INTO summary_table VALUES (2); +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+---------------+-------+--------- + 1 | | | | + 2 | | | | +(2 rows) + 
+UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 + ) average_query +WHERE id = 1; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | | 200.0000000000000000 | | + 2 | | | | +(2 rows) + +-- try different syntax +UPDATE summary_table SET (min_value, average_value) = + (SELECT min(value), avg(value) FROM raw_table WHERE id = 2) +WHERE id = 2; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | | 200.0000000000000000 | | + 2 | 400 | 450.0000000000000000 | | +(2 rows) + +UPDATE summary_table SET min_value = 100 + WHERE id IN (SELECT id FROM raw_table WHERE id = 1 and value > 100) AND id = 1; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | 100 | 200.0000000000000000 | | + 2 | 400 | 450.0000000000000000 | | +(2 rows) + +-- indeed, we don't need filter on UPDATE explicitly if SELECT already prunes to one shard +UPDATE summary_table SET uniques = 2 + WHERE id IN (SELECT id FROM raw_table WHERE id = 1 and value IN (100, 200)); +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | 100 | 200.0000000000000000 | | 2 + 2 | 400 | 450.0000000000000000 | | +(2 rows) + +-- use inner results for non-partition column +UPDATE summary_table SET uniques = NULL + WHERE min_value IN (SELECT value FROM raw_table WHERE id = 1) AND id = 1; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | 100 | 200.0000000000000000 | | + 2 | 400 | 450.0000000000000000 | | +(2 rows) + +-- these should not 
update anything +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 4 + ) average_query +WHERE id = 1 AND id = 4; +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 + ) average_query +WHERE id = 1 AND id = 4; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | 100 | 200.0000000000000000 | | + 2 | 400 | 450.0000000000000000 | | +(2 rows) + +-- update with NULL value +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 4 + ) average_query +WHERE id = 1; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | 100 | | | + 2 | 400 | 450.0000000000000000 | | +(2 rows) + +-- unsupported multi-shard updates +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table) average_query; +ERROR: cannot run UPDATE command which targets multiple shards +HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). +UPDATE summary_table SET average_value = average_value + 1 WHERE id = + (SELECT id FROM raw_table WHERE value > 100); +ERROR: cannot run UPDATE command which targets multiple shards +HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). 
+-- test complex queries +UPDATE summary_table +SET + uniques = metrics.expensive_uniques, + count = metrics.total_count +FROM + (SELECT + id, + count(DISTINCT (CASE WHEN value > 100 then value end)) AS expensive_uniques, + count(value) AS total_count + FROM raw_table + WHERE id = 1 + GROUP BY id) metrics +WHERE + summary_table.id = metrics.id AND + summary_table.id = 1; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | 100 | | 4 | 2 + 2 | 400 | 450.0000000000000000 | | +(2 rows) + +-- test joins +UPDATE summary_table SET count = count + 1 FROM raw_table + WHERE raw_table.id = summary_table.id AND summary_table.id = 1; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | 100 | | 5 | 2 + 2 | 400 | 450.0000000000000000 | | +(2 rows) + +-- test with prepared statements +PREPARE prepared_update_with_subquery(int, int) AS + UPDATE summary_table SET count = count + $1 FROM raw_table + WHERE raw_table.id = summary_table.id AND summary_table.id = $2; +-- execute 6 times to trigger prepared statement usage +EXECUTE prepared_update_with_subquery(10, 1); +EXECUTE prepared_update_with_subquery(10, 1); +EXECUTE prepared_update_with_subquery(10, 1); +EXECUTE prepared_update_with_subquery(10, 1); +EXECUTE prepared_update_with_subquery(10, 1); +EXECUTE prepared_update_with_subquery(10, 1); +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | 100 | | 65 | 2 + 2 | 400 | 450.0000000000000000 | | +(2 rows) + +-- test with reference tables +CREATE TABLE reference_raw_table (id bigint, value bigint); +CREATE TABLE reference_summary_table ( + id bigint, + min_value numeric, + average_value numeric, + count int, + uniques int); +SELECT 
create_reference_table('reference_raw_table'); + create_reference_table +------------------------ + +(1 row) + +SELECT create_reference_table('reference_summary_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO reference_raw_table VALUES (1, 100); +INSERT INTO reference_raw_table VALUES (1, 200); +INSERT INTO reference_raw_table VALUES (1, 200); +INSERT INTO reference_raw_table VALUES (1, 300); +INSERT INTO reference_raw_table VALUES (2, 400); +INSERT INTO reference_raw_table VALUES (2, 500); +INSERT INTO reference_summary_table VALUES (1); +INSERT INTO reference_summary_table VALUES (2); +SELECT * FROM reference_summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+---------------+-------+--------- + 1 | | | | + 2 | | | | +(2 rows) + +UPDATE reference_summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1 + ) average_query +WHERE id = 1; +UPDATE reference_summary_table SET (min_value, average_value) = + (SELECT min(value), avg(value) FROM reference_raw_table WHERE id = 2) +WHERE id = 2; +SELECT * FROM reference_summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | | 200.0000000000000000 | | + 2 | 400 | 450.0000000000000000 | | +(2 rows) + +-- no need partition colum equalities on reference tables +UPDATE reference_summary_table SET (count) = + (SELECT id AS inner_id FROM reference_raw_table WHERE value = 500) +WHERE min_value = 400; +SELECT * FROM reference_summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | | 200.0000000000000000 | | + 2 | 400 | 450.0000000000000000 | 2 | +(2 rows) + +-- can read from a reference table and update a distributed table +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT 
avg(value) AS average FROM reference_raw_table WHERE id = 1 + ) average_query +WHERE id = 1; +-- cannot read from a distributed table and update a reference table +UPDATE reference_summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 + ) average_query +WHERE id = 1; +ERROR: cannot perform select on a distributed table and modify a reference table +UPDATE reference_summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 2 + ) average_query +WHERE id = 1; +ERROR: cannot perform select on a distributed table and modify a reference table +-- test master_modify_multiple_shards() with subqueries and expect to fail +SELECT master_modify_multiple_shards(' + UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 + ) average_query + WHERE id = 1'); +ERROR: cannot perform distributed planning for the given modifications +DETAIL: Subqueries are not supported in distributed modifications. 
+-- test connection API via using COPY +-- COPY on SELECT part +BEGIN; +\COPY raw_table FROM STDIN WITH CSV +INSERT INTO summary_table VALUES (3); +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 3 + ) average_query +WHERE id = 3; +COMMIT; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | 100 | 200.0000000000000000 | 65 | 2 + 2 | 400 | 450.0000000000000000 | | + 3 | | 150.0000000000000000 | | +(3 rows) + +-- COPY on UPDATE part +BEGIN; +INSERT INTO raw_table VALUES (4, 100); +INSERT INTO raw_table VALUES (4, 200); +\COPY summary_table FROM STDIN WITH CSV +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 4 + ) average_query +WHERE id = 4; +COMMIT; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | 100 | 200.0000000000000000 | 65 | 2 + 2 | 400 | 450.0000000000000000 | | + 3 | | 150.0000000000000000 | | + 4 | | 150.0000000000000000 | | +(4 rows) + +-- COPY on both part +BEGIN; +\COPY raw_table FROM STDIN WITH CSV +\COPY summary_table FROM STDIN WITH CSV +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 5 + ) average_query +WHERE id = 5; +COMMIT; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | 100 | 200.0000000000000000 | 65 | 2 + 2 | 400 | 450.0000000000000000 | | + 3 | | 150.0000000000000000 | | + 4 | | 150.0000000000000000 | | + 5 | | 150.0000000000000000 | | +(5 rows) + +-- COPY on reference tables +BEGIN; +\COPY reference_raw_table FROM STDIN WITH CSV +\COPY summary_table FROM STDIN WITH CSV +UPDATE summary_table 
SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM reference_raw_table WHERE id = 6 + ) average_query +WHERE id = 6; +COMMIT; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 1 | 100 | 200.0000000000000000 | 65 | 2 + 2 | 400 | 450.0000000000000000 | | + 3 | | 150.0000000000000000 | | + 4 | | 150.0000000000000000 | | + 5 | | 150.0000000000000000 | | + 6 | | 150.0000000000000000 | | +(6 rows) + +-- test DELETE queries +SELECT * FROM raw_table ORDER BY id, value; + id | value +----+------- + 1 | 100 + 1 | 200 + 1 | 200 + 1 | 300 + 2 | 400 + 2 | 500 + 3 | 100 + 3 | 200 + 4 | 100 + 4 | 200 + 5 | 100 + 5 | 200 +(12 rows) + +DELETE FROM summary_table + WHERE min_value IN (SELECT value FROM raw_table WHERE id = 1) AND id = 1; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 2 | 400 | 450.0000000000000000 | | + 3 | | 150.0000000000000000 | | + 4 | | 150.0000000000000000 | | + 5 | | 150.0000000000000000 | | + 6 | | 150.0000000000000000 | | +(5 rows) + +-- test with different syntax +DELETE FROM summary_table USING raw_table + WHERE summary_table.id = raw_table.id AND raw_table.id = 2; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 3 | | 150.0000000000000000 | | + 4 | | 150.0000000000000000 | | + 5 | | 150.0000000000000000 | | + 6 | | 150.0000000000000000 | | +(4 rows) + +-- cannot read from a distributed table and delete from a reference table +DELETE FROM reference_summary_table USING raw_table + WHERE reference_summary_table.id = raw_table.id AND raw_table.id = 3; +ERROR: cannot perform select on a distributed table and modify a reference table +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | 
uniques +----+-----------+----------------------+-------+--------- + 3 | | 150.0000000000000000 | | + 4 | | 150.0000000000000000 | | + 5 | | 150.0000000000000000 | | + 6 | | 150.0000000000000000 | | +(4 rows) + +-- test connection API via using COPY with DELETEs +BEGIN; +\COPY summary_table FROM STDIN WITH CSV +DELETE FROM summary_table USING raw_table + WHERE summary_table.id = raw_table.id AND raw_table.id = 1; +DELETE FROM summary_table USING reference_raw_table + WHERE summary_table.id = reference_raw_table.id AND reference_raw_table.id = 2; +COMMIT; +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+----------------------+-------+--------- + 3 | | 150.0000000000000000 | | + 4 | | 150.0000000000000000 | | + 5 | | 150.0000000000000000 | | + 6 | | 150.0000000000000000 | | +(4 rows) + +-- test DELETEs with prepared statements +PREPARE prepared_delete_with_join(int) AS + DELETE FROM summary_table USING raw_table + WHERE summary_table.id = raw_table.id AND raw_table.id = $1; +INSERT INTO raw_table VALUES (6, 100); +-- execute 6 times to trigger prepared statement usage +EXECUTE prepared_delete_with_join(1); +EXECUTE prepared_delete_with_join(2); +EXECUTE prepared_delete_with_join(3); +EXECUTE prepared_delete_with_join(4); +EXECUTE prepared_delete_with_join(5); +EXECUTE prepared_delete_with_join(6); +SELECT * FROM summary_table ORDER BY id; + id | min_value | average_value | count | uniques +----+-----------+---------------+-------+--------- +(0 rows) + +DROP TABLE raw_table; +DROP TABLE summary_table; +DROP TABLE reference_raw_table; +DROP TABLE reference_summary_table; diff --git a/src/test/regress/expected/multi_mx_explain.out b/src/test/regress/expected/multi_mx_explain.out index 16b93b645..51e42f86c 100644 --- a/src/test/regress/expected/multi_mx_explain.out +++ b/src/test/regress/expected/multi_mx_explain.out @@ -335,8 +335,8 @@ Custom Scan (Citus Router) Tasks Shown: All -> Task Node: host=localhost 
port=57637 dbname=regression - -> Update on lineitem_mx_1220052 - -> Index Scan using lineitem_mx_pkey_1220052 on lineitem_mx_1220052 + -> Update on lineitem_mx_1220052 lineitem_mx + -> Index Scan using lineitem_mx_pkey_1220052 on lineitem_mx_1220052 lineitem_mx Index Cond: (l_orderkey = 1) Filter: (l_partkey = 0) -- Test delete @@ -348,8 +348,8 @@ Custom Scan (Citus Router) Tasks Shown: All -> Task Node: host=localhost port=57637 dbname=regression - -> Delete on lineitem_mx_1220052 - -> Index Scan using lineitem_mx_pkey_1220052 on lineitem_mx_1220052 + -> Delete on lineitem_mx_1220052 lineitem_mx + -> Index Scan using lineitem_mx_pkey_1220052 on lineitem_mx_1220052 lineitem_mx Index Cond: (l_orderkey = 1) Filter: (l_partkey = 0) -- Test single-shard SELECT diff --git a/src/test/regress/expected/multi_upsert.out b/src/test/regress/expected/multi_upsert.out index 089d9fe4c..62cd90bd2 100644 --- a/src/test/regress/expected/multi_upsert.out +++ b/src/test/regress/expected/multi_upsert.out @@ -236,7 +236,8 @@ INSERT INTO dropcol_distributed AS dropcol (key, keep1) VALUES (1, '5') ON CONFL -- subquery in the SET clause INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET other_col = (SELECT count(*) from upsert_test); -ERROR: subqueries are not supported in distributed modifications +ERROR: cannot perform distributed planning for the given modifications +DETAIL: Subqueries are not supported in distributed modifications. 
-- non mutable function call in the SET INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET other_col = random()::int; diff --git a/src/test/regress/output/multi_master_delete_protocol.source b/src/test/regress/output/multi_master_delete_protocol.source index 05107af81..24255a337 100644 --- a/src/test/regress/output/multi_master_delete_protocol.source +++ b/src/test/regress/output/multi_master_delete_protocol.source @@ -30,7 +30,7 @@ DETAIL: Where clause includes a column other than partition column -- Check that free-form deletes are not supported. DELETE FROM customer_delete_protocol WHERE c_custkey > 100; ERROR: cannot run DELETE command which targets multiple shards -HINT: Consider using an equality filter on partition column "c_custkey" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). You can also use master_apply_delete_command() to drop all shards satisfying delete criteria. +HINT: Consider using an equality filter on partition column "c_custkey" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). -- Check that we delete a shard if and only if all rows in the shard satisfy the condition. 
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol WHERE c_custkey > 6500'); diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 583301fe8..518c4f37c 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -429,3 +429,335 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING i INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *; DROP TABLE app_analytics_events; + +-- test UPDATE with subqueries +CREATE TABLE raw_table (id bigint, value bigint); +CREATE TABLE summary_table ( + id bigint, + min_value numeric, + average_value numeric, + count int, + uniques int); + +SELECT create_distributed_table('raw_table', 'id'); +SELECT create_distributed_table('summary_table', 'id'); + +INSERT INTO raw_table VALUES (1, 100); +INSERT INTO raw_table VALUES (1, 200); +INSERT INTO raw_table VALUES (1, 200); +INSERT INTO raw_table VALUES (1, 300); +INSERT INTO raw_table VALUES (2, 400); +INSERT INTO raw_table VALUES (2, 500); + +INSERT INTO summary_table VALUES (1); +INSERT INTO summary_table VALUES (2); + +SELECT * FROM summary_table ORDER BY id; + +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 + ) average_query +WHERE id = 1; + +SELECT * FROM summary_table ORDER BY id; + +-- try different syntax +UPDATE summary_table SET (min_value, average_value) = + (SELECT min(value), avg(value) FROM raw_table WHERE id = 2) +WHERE id = 2; + +SELECT * FROM summary_table ORDER BY id; + +UPDATE summary_table SET min_value = 100 + WHERE id IN (SELECT id FROM raw_table WHERE id = 1 and value > 100) AND id = 1; + +SELECT * FROM summary_table ORDER BY id; + +-- indeed, we don't need filter on UPDATE explicitly if SELECT already prunes to one shard +UPDATE summary_table SET uniques = 2 + WHERE id IN (SELECT id FROM raw_table WHERE 
id = 1 and value IN (100, 200)); + +SELECT * FROM summary_table ORDER BY id; + +-- use inner results for non-partition column +UPDATE summary_table SET uniques = NULL + WHERE min_value IN (SELECT value FROM raw_table WHERE id = 1) AND id = 1; + +SELECT * FROM summary_table ORDER BY id; + +-- these should not update anything +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 4 + ) average_query +WHERE id = 1 AND id = 4; + +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 + ) average_query +WHERE id = 1 AND id = 4; + +SELECT * FROM summary_table ORDER BY id; + +-- update with NULL value +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 4 + ) average_query +WHERE id = 1; + +SELECT * FROM summary_table ORDER BY id; + +-- unsupported multi-shard updates +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table) average_query; + +UPDATE summary_table SET average_value = average_value + 1 WHERE id = + (SELECT id FROM raw_table WHERE value > 100); + +-- test complex queries +UPDATE summary_table +SET + uniques = metrics.expensive_uniques, + count = metrics.total_count +FROM + (SELECT + id, + count(DISTINCT (CASE WHEN value > 100 then value end)) AS expensive_uniques, + count(value) AS total_count + FROM raw_table + WHERE id = 1 + GROUP BY id) metrics +WHERE + summary_table.id = metrics.id AND + summary_table.id = 1; + +SELECT * FROM summary_table ORDER BY id; + +-- test joins +UPDATE summary_table SET count = count + 1 FROM raw_table + WHERE raw_table.id = summary_table.id AND summary_table.id = 1; + +SELECT * FROM summary_table ORDER BY id; + +-- test with prepared statements +PREPARE prepared_update_with_subquery(int, int) AS + UPDATE summary_table SET count = 
count + $1 FROM raw_table + WHERE raw_table.id = summary_table.id AND summary_table.id = $2; + +-- execute 6 times to trigger prepared statement usage +EXECUTE prepared_update_with_subquery(10, 1); +EXECUTE prepared_update_with_subquery(10, 1); +EXECUTE prepared_update_with_subquery(10, 1); +EXECUTE prepared_update_with_subquery(10, 1); +EXECUTE prepared_update_with_subquery(10, 1); +EXECUTE prepared_update_with_subquery(10, 1); + +SELECT * FROM summary_table ORDER BY id; + +-- test with reference tables + +CREATE TABLE reference_raw_table (id bigint, value bigint); +CREATE TABLE reference_summary_table ( + id bigint, + min_value numeric, + average_value numeric, + count int, + uniques int); + +SELECT create_reference_table('reference_raw_table'); +SELECT create_reference_table('reference_summary_table'); + +INSERT INTO reference_raw_table VALUES (1, 100); +INSERT INTO reference_raw_table VALUES (1, 200); +INSERT INTO reference_raw_table VALUES (1, 200); +INSERT INTO reference_raw_table VALUES (1, 300); +INSERT INTO reference_raw_table VALUES (2, 400); +INSERT INTO reference_raw_table VALUES (2, 500); + +INSERT INTO reference_summary_table VALUES (1); +INSERT INTO reference_summary_table VALUES (2); + +SELECT * FROM reference_summary_table ORDER BY id; + +UPDATE reference_summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1 + ) average_query +WHERE id = 1; + +UPDATE reference_summary_table SET (min_value, average_value) = + (SELECT min(value), avg(value) FROM reference_raw_table WHERE id = 2) +WHERE id = 2; + +SELECT * FROM reference_summary_table ORDER BY id; + +-- no need partition colum equalities on reference tables +UPDATE reference_summary_table SET (count) = + (SELECT id AS inner_id FROM reference_raw_table WHERE value = 500) +WHERE min_value = 400; + +SELECT * FROM reference_summary_table ORDER BY id; + +-- can read from a reference table and update a distributed table +UPDATE 
summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1 + ) average_query +WHERE id = 1; + +-- cannot read from a distributed table and update a reference table +UPDATE reference_summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 + ) average_query +WHERE id = 1; + +UPDATE reference_summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 2 + ) average_query +WHERE id = 1; + +-- test master_modify_multiple_shards() with subqueries and expect to fail +SELECT master_modify_multiple_shards(' + UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 1 + ) average_query + WHERE id = 1'); + +-- test connection API via using COPY + +-- COPY on SELECT part +BEGIN; + +\COPY raw_table FROM STDIN WITH CSV +3, 100 +3, 200 +\. + +INSERT INTO summary_table VALUES (3); + +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 3 + ) average_query +WHERE id = 3; + +COMMIT; + +SELECT * FROM summary_table ORDER BY id; + +-- COPY on UPDATE part +BEGIN; + +INSERT INTO raw_table VALUES (4, 100); +INSERT INTO raw_table VALUES (4, 200); + +\COPY summary_table FROM STDIN WITH CSV +4,,,, +\. + +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 4 + ) average_query +WHERE id = 4; + +COMMIT; + +SELECT * FROM summary_table ORDER BY id; + +-- COPY on both part +BEGIN; + +\COPY raw_table FROM STDIN WITH CSV +5, 100 +5, 200 +\. + +\COPY summary_table FROM STDIN WITH CSV +5,,,, +\. 
+ +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM raw_table WHERE id = 5 + ) average_query +WHERE id = 5; + +COMMIT; + +SELECT * FROM summary_table ORDER BY id; + +-- COPY on reference tables +BEGIN; + +\COPY reference_raw_table FROM STDIN WITH CSV +6, 100 +6, 200 +\. + +\COPY summary_table FROM STDIN WITH CSV +6,,,, +\. + +UPDATE summary_table SET average_value = average_query.average FROM ( + SELECT avg(value) AS average FROM reference_raw_table WHERE id = 6 + ) average_query +WHERE id = 6; + +COMMIT; + +SELECT * FROM summary_table ORDER BY id; + +-- test DELETE queries +SELECT * FROM raw_table ORDER BY id, value; + +DELETE FROM summary_table + WHERE min_value IN (SELECT value FROM raw_table WHERE id = 1) AND id = 1; + +SELECT * FROM summary_table ORDER BY id; + +-- test with different syntax +DELETE FROM summary_table USING raw_table + WHERE summary_table.id = raw_table.id AND raw_table.id = 2; + +SELECT * FROM summary_table ORDER BY id; + +-- cannot read from a distributed table and delete from a reference table +DELETE FROM reference_summary_table USING raw_table + WHERE reference_summary_table.id = raw_table.id AND raw_table.id = 3; + +SELECT * FROM summary_table ORDER BY id; + +-- test connection API via using COPY with DELETEs +BEGIN; + +\COPY summary_table FROM STDIN WITH CSV +1,,,, +2,,,, +\. 
+ +DELETE FROM summary_table USING raw_table + WHERE summary_table.id = raw_table.id AND raw_table.id = 1; + +DELETE FROM summary_table USING reference_raw_table + WHERE summary_table.id = reference_raw_table.id AND reference_raw_table.id = 2; + +COMMIT; + +SELECT * FROM summary_table ORDER BY id; + +-- test DELETEs with prepared statements +PREPARE prepared_delete_with_join(int) AS + DELETE FROM summary_table USING raw_table + WHERE summary_table.id = raw_table.id AND raw_table.id = $1; + +INSERT INTO raw_table VALUES (6, 100); + +-- execute 6 times to trigger prepared statement usage +EXECUTE prepared_delete_with_join(1); +EXECUTE prepared_delete_with_join(2); +EXECUTE prepared_delete_with_join(3); +EXECUTE prepared_delete_with_join(4); +EXECUTE prepared_delete_with_join(5); +EXECUTE prepared_delete_with_join(6); + +SELECT * FROM summary_table ORDER BY id; + +DROP TABLE raw_table; +DROP TABLE summary_table; +DROP TABLE reference_raw_table; +DROP TABLE reference_summary_table;