From 32bcd610c10e2ad354e5e9457e3f8227bff4ac8e Mon Sep 17 00:00:00 2001 From: velioglu Date: Wed, 25 Apr 2018 00:06:53 +0300 Subject: [PATCH] Support modify queries with multiple tables With this commit we begin to support modify queries with multiple tables if these queries are pushdownable. --- .../executor/multi_router_executor.c | 10 +- .../master/master_modify_multiple_shards.c | 2 +- .../distributed/planner/deparse_shard_query.c | 2 +- .../distributed/planner/distributed_planner.c | 5 +- .../planner/insert_select_planner.c | 31 +- .../planner/multi_physical_planner.c | 109 +++- .../planner/multi_router_planner.c | 262 ++++++---- src/backend/distributed/utils/citus_clauses.c | 19 + .../distributed/utils/citus_copyfuncs.c | 2 +- .../distributed/utils/citus_outfuncs.c | 2 +- .../distributed/utils/citus_readfuncs.c | 2 +- .../distributed/multi_physical_planner.h | 13 +- .../distributed/multi_router_planner.h | 12 +- .../isolation_modify_with_subquery_vs_dml.out | 127 +++++ src/test/regress/expected/multi_explain.out | 43 +- src/test/regress/expected/multi_explain_0.out | 43 +- .../regress/expected/multi_modifications.out | 10 +- .../regress/expected/multi_shard_modify.out | 8 +- .../expected/multi_shard_update_delete.out | 472 ++++++++++++++++-- .../expected/multi_shard_update_delete_0.out | 472 ++++++++++++++++-- src/test/regress/expected/with_modifying.out | 35 +- src/test/regress/isolation_schedule | 1 + ...isolation_modify_with_subquery_vs_dml.spec | 91 ++++ src/test/regress/sql/multi_explain.sql | 16 +- .../regress/sql/multi_shard_update_delete.sql | 420 +++++++++++++++- src/test/regress/sql/with_modifying.sql | 1 + 26 files changed, 1940 insertions(+), 270 deletions(-) create mode 100644 src/test/regress/expected/isolation_modify_with_subquery_vs_dml.out create mode 100644 src/test/regress/specs/isolation_modify_with_subquery_vs_dml.spec diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 9cddba5c3..20be2fcb1 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -310,8 +310,7 @@ AcquireExecutorMultiShardLocks(List *taskList) /* * If the task has a subselect, then we may need to lock the shards from which * the query selects as well to prevent the subselects from seeing different - * results on different replicas. In particular this prevents INSERT..SELECT - * commands from having different effects on different placements. + * results on different replicas. */ if (RequiresConsistentSnapshot(task)) @@ -330,18 +329,17 @@ AcquireExecutorMultiShardLocks(List *taskList) /* * RequiresConsistentSnapshot returns true if the given task need to take - * the necessary locks to ensure that a subquery in the INSERT ... SELECT - * query returns the same output for all task placements. + * the necessary locks to ensure that a subquery in the modify query + * returns the same output for all task placements. */ static bool RequiresConsistentSnapshot(Task *task) { bool requiresIsolation = false; - if (!task->insertSelectQuery) + if (!task->modifyWithSubquery) { /* - * Only INSERT/SELECT commands currently require SELECT isolation. * Other commands do not read from other shards. */ diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 077f54827..3907c0cdf 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -151,7 +151,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) { bool multiShardQuery = true; DeferredErrorMessage *error = - ModifyQuerySupported(modifyQuery, modifyQuery, multiShardQuery); + ModifyQuerySupported(modifyQuery, modifyQuery, multiShardQuery, NULL); if (error) { diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 3192cb80b..660c4a0ac 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -58,7 +58,7 @@ RebuildQueryStrings(Query *originalQuery, List *taskList) { query = copyObject(originalQuery); } - else if (task->insertSelectQuery) + else if (query->commandType == CMD_INSERT && task->modifyWithSubquery) { /* for INSERT..SELECT, adjust shard names in SELECT part */ RangeTblEntry *copiedInsertRte = NULL; diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index ee51376bb..371d769a2 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -593,9 +593,6 @@ CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo boundParams, bool hasUnresolvedParams, PlannerRestrictionContext *plannerRestrictionContext) { - RelationRestrictionContext *relationRestrictionContext = - plannerRestrictionContext->relationRestrictionContext; - DistributedPlan *distributedPlan = NULL; MultiTreeRoot *logicalPlan = NULL; List *subPlanList = NIL; @@ -608,7 +605,7 @@ CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query, */ distributedPlan = CreateRouterPlan(originalQuery, query, - relationRestrictionContext); + plannerRestrictionContext); if (distributedPlan != NULL) { if (distributedPlan->planningError == NULL) diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index a885d2f0c..de312022d 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -44,8 +44,8 @@ static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, plannerRestrictionContext); static Task * RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval, - RelationRestrictionContext * - restrictionContext, + PlannerRestrictionContext * + plannerRestrictionContext, uint32 taskIdIndex, bool allRelationsJoinedOnPartitionKey); static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree, @@ -252,14 +252,14 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, Task *modifyTask = NULL; modifyTask = RouterModifyTaskForShardInterval(originalQuery, targetShardInterval, - relationRestrictionContext, + plannerRestrictionContext, taskIdIndex, allDistributionKeysInQueryAreEqual); /* add the task if it could be created */ if (modifyTask != NULL) { - modifyTask->insertSelectQuery = true; + modifyTask->modifyWithSubquery = true; sqlTaskList = lappend(sqlTaskList, modifyTask); } @@ -404,7 +404,7 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, */ static Task * RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval, - RelationRestrictionContext *restrictionContext, + PlannerRestrictionContext *plannerRestrictionContext, uint32 taskIdIndex, bool safeToPushdownSubquery) { @@ -417,8 +417,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter Oid distributedTableId = shardInterval->relationId; DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); - RelationRestrictionContext *copiedRestrictionContext = - CopyRelationRestrictionContext(restrictionContext); + PlannerRestrictionContext *copyOfPlannerRestrictionContext = palloc0( + sizeof(PlannerRestrictionContext)); StringInfo queryString = makeStringInfo(); ListCell *restrictionCell = NULL; @@ -431,11 +431,22 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter List *intersectedPlacementList = NULL; bool upsertQuery = false; bool replacePrunedQueryWithDummy = false; - bool allReferenceTables = restrictionContext->allReferenceTables; + bool allReferenceTables = + plannerRestrictionContext->relationRestrictionContext->allReferenceTables; List *shardOpExpressions = NIL; RestrictInfo *shardRestrictionList = NULL; DeferredErrorMessage *planningError = NULL; bool multiShardModifyQuery = false; + List *relationRestrictionList = NIL; + + copyOfPlannerRestrictionContext->relationRestrictionContext = + CopyRelationRestrictionContext( + plannerRestrictionContext->relationRestrictionContext); + copyOfPlannerRestrictionContext->joinRestrictionContext = + plannerRestrictionContext->joinRestrictionContext; + relationRestrictionList = + copyOfPlannerRestrictionContext->relationRestrictionContext-> + relationRestrictionList; /* grab shared metadata lock to stop concurrent placement additions */ LockShardDistributionMetadata(shardId, ShareLock); @@ -444,7 +455,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter * Replace the partitioning qual parameter value in all baserestrictinfos. * Note that this has to be done on a copy, as the walker modifies in place. */ - foreach(restrictionCell, copiedRestrictionContext->relationRestrictionList) + foreach(restrictionCell, relationRestrictionList) { RelationRestriction *restriction = lfirst(restrictionCell); List *originalBaseRestrictInfo = restriction->relOptInfo->baserestrictinfo; @@ -493,7 +504,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter * If we can, we also rely on the side-effects that all RTEs have been updated * to point to the relevant nodes and selectPlacementList is determined. */ - planningError = PlanRouterQuery(copiedSubquery, copiedRestrictionContext, + planningError = PlanRouterQuery(copiedSubquery, copyOfPlannerRestrictionContext, &selectPlacementList, &selectAnchorShardId, &relationShardList, replacePrunedQueryWithDummy, &multiShardModifyQuery); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 77f49ca20..a0e7803ad 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -127,15 +127,12 @@ static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray, static bool DistributedPlanRouterExecutable(DistributedPlan *distributedPlan); static Job * BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestrictionContext); -static List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, - RelationRestrictionContext * - relationRestrictionContext, - List *prunedRelationShardList, TaskType taskType); static void ErrorIfUnsupportedShardDistribution(Query *query); static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, uint32 taskId, - TaskType taskType); + TaskType taskType, + bool modifyRequiresMasterEvaluation); static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval, ShardInterval *secondInterval); @@ -202,6 +199,7 @@ static uint32 FinalTargetEntryCount(List *targetEntryList); static bool CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval); + /* * CreatePhysicalDistributedPlan is the entry point for physical plan generation. The * function builds the physical plan; this plan includes the list of tasks to be @@ -2008,7 +2006,8 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction sqlTaskList = QueryPushdownSqlTaskList(job->jobQuery, job->jobId, plannerRestrictionContext-> relationRestrictionContext, - prunedRelationShardList, SQL_TASK); + prunedRelationShardList, SQL_TASK, + false); } else { @@ -2068,7 +2067,8 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, RelationRestrictionContext *relationRestrictionContext, - List *prunedRelationShardList, TaskType taskType) + List *prunedRelationShardList, TaskType taskType, bool + modifyRequiresMasterEvaluation) { List *sqlTaskList = NIL; ListCell *restrictionCell = NULL; @@ -2168,13 +2168,25 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, subqueryTask = QueryPushdownTaskCreate(query, shardOffset, relationRestrictionContext, taskIdIndex, - taskType); + taskType, modifyRequiresMasterEvaluation); subqueryTask->jobId = jobId; sqlTaskList = lappend(sqlTaskList, subqueryTask); ++taskIdIndex; } + /* If it is a modify task with multiple tables */ + if (taskType == MODIFY_TASK && list_length( + relationRestrictionContext->relationRestrictionList) > 1) + { + ListCell *taskCell = NULL; + foreach(taskCell, sqlTaskList) + { + Task *task = (Task *) lfirst(taskCell); + task->modifyWithSubquery = true; + } + } + return sqlTaskList; } @@ -2198,6 +2210,7 @@ ErrorIfUnsupportedShardDistribution(Query *query) uint32 relationIndex = 0; uint32 rangeDistributedRelationCount = 0; uint32 hashDistributedRelationCount = 0; + uint32 appendDistributedRelationCount = 0; foreach(relationIdCell, relationIdList) { @@ -2222,10 +2235,17 @@ ErrorIfUnsupportedShardDistribution(Query *query) } else { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("Currently append partitioned relations " - "are not supported"))); + DistTableCacheEntry *distTableEntry = DistributedTableCacheEntry(relationId); + if (distTableEntry->hasOverlappingShardInterval) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot push down this subquery"), + errdetail("Currently append partitioned relations " + "with overlapping shard intervals are " + "not supported"))); + } + + appendDistributedRelationCount++; } } @@ -2236,6 +2256,20 @@ ErrorIfUnsupportedShardDistribution(Query *query) errdetail("A query including both range and hash " "partitioned relations are unsupported"))); } + else if ((rangeDistributedRelationCount > 0) && (appendDistributedRelationCount > 0)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot push down this subquery"), + errdetail("A query including both range and append " + "partitioned relations are unsupported"))); + } + else if ((appendDistributedRelationCount > 0) && (hashDistributedRelationCount > 0)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot push down this subquery"), + errdetail("A query including both append and hash " + "partitioned relations are unsupported"))); + } foreach(relationIdCell, nonReferenceRelations) { @@ -2273,7 +2307,7 @@ ErrorIfUnsupportedShardDistribution(Query *query) static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, uint32 taskId, - TaskType taskType) + TaskType taskType, bool modifyRequiresMasterEvaluation) { Query *taskQuery = copyObject(originalQuery); @@ -2285,6 +2319,20 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, List *selectPlacementList = NIL; uint64 jobId = INVALID_JOB_ID; uint64 anchorShardId = INVALID_SHARD_ID; + bool modifyWithSubselect = false; + RangeTblEntry *resultRangeTable = NULL; + Oid resultRelationOid = InvalidOid; + + /* + * If it is a modify query with sub-select, we need to set result relation shard's id + * as anchor shard id. + */ + if (UpdateOrDeleteQuery(originalQuery)) + { + resultRangeTable = rt_fetch(originalQuery->resultRelation, originalQuery->rtable); + resultRelationOid = resultRangeTable->relid; + modifyWithSubselect = true; + } /* * Find the relevant shard out of each relation for this task. @@ -2310,12 +2358,19 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, anchorShardId = shardInterval->shardId; } } + else if (UpdateOrDeleteQuery(originalQuery)) + { + shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex]; + if (!modifyWithSubselect || relationId == resultRelationOid) + { + /* for UPDATE/DELETE the shard in the result relation becomes the anchor shard */ + anchorShardId = shardInterval->shardId; + } + } else { - /* use the shard from a specific index */ + /* for SELECT we pick an arbitrary shard as the anchor shard */ shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex]; - - /* use a shard from a distributed table as the anchor shard */ anchorShardId = shardInterval->shardId; } @@ -2328,6 +2383,8 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, relationShardList = lappend(relationShardList, relationShard); } + Assert(anchorShardId != INVALID_SHARD_ID); + selectPlacementList = WorkersContainingAllShards(taskShardList); if (list_length(selectPlacementList) == 0) { @@ -2346,14 +2403,22 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, * that the query string is generated as (...) AND (...) as opposed to * (...), (...). */ - taskQuery->jointree->quals = - (Node *) make_ands_explicit((List *) taskQuery->jointree->quals); + if (taskQuery->jointree->quals != NULL && IsA(taskQuery->jointree->quals, List)) + { + taskQuery->jointree->quals = (Node *) make_ands_explicit( + (List *) taskQuery->jointree->quals); + } - /* and generate the full query string */ - pg_get_query_def(taskQuery, queryString); - ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); + subqueryTask = CreateBasicTask(jobId, taskId, taskType, NULL); + + if ((taskType == MODIFY_TASK && !modifyRequiresMasterEvaluation) || + taskType == SQL_TASK) + { + pg_get_query_def(taskQuery, queryString); + ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); + subqueryTask->queryString = queryString->data; + } - subqueryTask = CreateBasicTask(jobId, taskId, taskType, queryString->data); subqueryTask->dependedTaskList = NULL; subqueryTask->anchorShardId = anchorShardId; subqueryTask->taskPlacementList = selectPlacementList; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index a9a7f5c9d..9075401cf 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -38,6 +38,7 @@ #include "distributed/multi_server_executor.h" #include "distributed/listutils.h" #include "distributed/citus_ruleutils.h" +#include "distributed/query_pushdown_planning.h" #include "distributed/relation_restriction_equivalence.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" @@ -108,9 +109,13 @@ bool EnableRouterExecution = true; /* planner functions forward declarations */ static DistributedPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, - RelationRestrictionContext * - restrictionContext); + PlannerRestrictionContext * + plannerRestrictionContext); static bool IsTidColumn(Node *node); +static DeferredErrorMessage * MultiShardModifyQuerySupported(Query *originalQuery, + PlannerRestrictionContext * + plannerRestrictionContext); +static bool HasDangerousJoinUsing(List *rtableList, Node *jtnode); static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce); static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); @@ -124,7 +129,7 @@ static bool CanShardPrune(Oid distributedTableId, Query *query); static Job * CreateJob(Query *query); static Task * CreateTask(TaskType taskType); static Job * RouterJob(Query *originalQuery, - RelationRestrictionContext *restrictionContext, + PlannerRestrictionContext *plannerRestrictionContext, DeferredErrorMessage **planningError); static bool RelationPrunesToMultipleShards(List *relationShardList); static void NormalizeMultiRowInsertTargetList(Query *query); @@ -147,8 +152,7 @@ static List * SingleShardSelectTaskList(Query *query, List *relationShardList, List *placementList, uint64 shardId); static List * SingleShardModifyTaskList(Query *query, List *relationShardList, List *placementList, uint64 shardId); -static List * MultiShardModifyTaskList(Query *originalQuery, List *relationShardList, - bool requiresMasterEvaluation); + /* * CreateRouterPlan attempts to create a router executor plan for the given @@ -157,12 +161,13 @@ static List * MultiShardModifyTaskList(Query *originalQuery, List *relationShard */ DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query, - RelationRestrictionContext *restrictionContext) + PlannerRestrictionContext *plannerRestrictionContext) { - if (MultiRouterPlannableQuery(query, restrictionContext)) + if (MultiRouterPlannableQuery(query, + plannerRestrictionContext->relationRestrictionContext)) { return CreateSingleTaskRouterPlan(originalQuery, query, - restrictionContext); + plannerRestrictionContext); } /* @@ -189,7 +194,8 @@ CreateModifyPlan(Query *originalQuery, Query *query, distributedPlan->operation = query->commandType; distributedPlan->planningError = ModifyQuerySupported(query, originalQuery, - multiShardQuery); + multiShardQuery, + plannerRestrictionContext); if (distributedPlan->planningError != NULL) { return distributedPlan; @@ -197,10 +203,7 @@ CreateModifyPlan(Query *originalQuery, Query *query, if (UpdateOrDeleteQuery(query)) { - RelationRestrictionContext *restrictionContext = - plannerRestrictionContext->relationRestrictionContext; - - job = RouterJob(originalQuery, restrictionContext, + job = RouterJob(originalQuery, plannerRestrictionContext, &distributedPlan->planningError); } else @@ -238,7 +241,7 @@ CreateModifyPlan(Query *originalQuery, Query *query, */ static DistributedPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, - RelationRestrictionContext *restrictionContext) + PlannerRestrictionContext *plannerRestrictionContext) { Job *job = NULL; DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); @@ -253,7 +256,8 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, } /* we cannot have multi shard update/delete query via this code path */ - job = RouterJob(originalQuery, restrictionContext, &distributedPlan->planningError); + job = RouterJob(originalQuery, plannerRestrictionContext, + &distributedPlan->planningError); if (distributedPlan->planningError) { @@ -516,7 +520,8 @@ IsTidColumn(Node *node) * on the rewritten query. */ DeferredErrorMessage * -ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuery) +ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuery, + PlannerRestrictionContext *plannerRestrictionContext) { Oid distributedTableId = ExtractFirstDistributedTableId(queryTree); uint32 rangeTableId = 1; @@ -687,23 +692,31 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer } /* - * Reject queries which involve joins. Note that UPSERTs are exceptional for this case. - * Queries like "INSERT INTO table_name ON CONFLICT DO UPDATE (col) SET other_col = ''" - * contains two range table entries, and we have to allow them. + * We have to allow modify queries with two range table entries, if it is pushdownable. */ - if (commandType != CMD_INSERT && queryTableCount != 1) + if (commandType != CMD_INSERT) { - /* - * We support UPDATE and DELETE with joins unless they are multi shard - * queries. - */ - if (!UpdateOrDeleteQuery(queryTree) || multiShardQuery) + /* We can not get restriction context via master_modify_multiple_shards path */ + if (plannerRestrictionContext == NULL) { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot perform distributed planning for the given " - "modification", - "Joins are not supported in distributed " - "modifications.", NULL); + if (queryTableCount != 1) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot run multi shard modify query with master_modify_multiple_shards when the query involves subquery or join", + "Execute the query without using master_modify_multiple_shards()", + NULL); + } + } + /* If it is a multi-shard modify query with multiple tables */ + else if (multiShardQuery) + { + DeferredErrorMessage *errorMessage = MultiShardModifyQuerySupported( + originalQuery, plannerRestrictionContext); + + if (errorMessage != NULL) + { + return errorMessage; + } } } @@ -898,6 +911,114 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer } +/* + * MultiShardModifyQuerySupported returns the error message if the modify query is + * not pushdownable, otherwise it returns NULL. + */ +static DeferredErrorMessage * +MultiShardModifyQuerySupported(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext) +{ + DeferredErrorMessage *errorMessage = NULL; + RangeTblEntry *resultRangeTable = rt_fetch(originalQuery->resultRelation, + originalQuery->rtable); + Oid resultRelationOid = resultRangeTable->relid; + char resultPartitionMethod = PartitionMethod(resultRelationOid); + + if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree)) + { + errorMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "a join with USING causes an internal naming conflict, use " + "ON instead", + NULL, NULL); + } + else if (resultPartitionMethod == DISTRIBUTE_BY_NONE) + { + errorMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "only reference tables may be queried when targeting " + "a reference table with multi shard UPDATE/DELETE queries " + "with multiple tables ", + NULL, NULL); + } + else + { + errorMessage = DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, + plannerRestrictionContext); + } + + return errorMessage; +} + + +/* + * HasDangerousJoinUsing search jointree for unnamed JOIN USING. Check the + * implementation of has_dangerous_join_using in ruleutils. + */ +static bool +HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode) +{ + if (IsA(joinTreeNode, RangeTblRef)) + { + /* nothing to do here */ + } + else if (IsA(joinTreeNode, FromExpr)) + { + FromExpr *fromExpr = (FromExpr *) joinTreeNode; + ListCell *listCell; + + foreach(listCell, fromExpr->fromlist) + { + if (HasDangerousJoinUsing(rtableList, (Node *) lfirst(listCell))) + { + return true; + } + } + } + else if (IsA(joinTreeNode, JoinExpr)) + { + JoinExpr *joinExpr = (JoinExpr *) joinTreeNode; + + /* Is it an unnamed JOIN with USING? */ + if (joinExpr->alias == NULL && joinExpr->usingClause) + { + /* + * Yes, so check each join alias var to see if any of them are not + * simple references to underlying columns. If so, we have a + * dangerous situation and must pick unique aliases. + */ + RangeTblEntry *joinRTE = rt_fetch(joinExpr->rtindex, rtableList); + ListCell *listCell; + + foreach(listCell, joinRTE->joinaliasvars) + { + Var *aliasVar = (Var *) lfirst(listCell); + + if (aliasVar != NULL && !IsA(aliasVar, Var)) + { + return true; + } + } + } + + /* Nope, but inspect children */ + if (HasDangerousJoinUsing(rtableList, joinExpr->larg)) + { + return true; + } + if (HasDangerousJoinUsing(rtableList, joinExpr->rarg)) + { + return true; + } + } + else + { + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(joinTreeNode)); + } + return false; +} + + /* * UpdateOrDeleteQuery checks if the given query is an UPDATE or DELETE command. * If it is, it returns true otherwise it returns false. @@ -1351,7 +1472,7 @@ CreateTask(TaskType taskType) task->upsertQuery = false; task->replicationModel = REPLICATION_MODEL_INVALID; - task->insertSelectQuery = false; + task->modifyWithSubquery = false; task->relationShardList = NIL; return task; @@ -1393,7 +1514,7 @@ ExtractFirstDistributedTableId(Query *query) * multiple shard update/delete queries. */ static Job * -RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, +RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionContext, DeferredErrorMessage **planningError) { Job *job = NULL; @@ -1412,7 +1533,7 @@ RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, /* check if this query requires master evaluation */ requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); - (*planningError) = PlanRouterQuery(originalQuery, restrictionContext, + (*planningError) = PlanRouterQuery(originalQuery, plannerRestrictionContext, &placementList, &shardId, &relationShardList, replacePrunedQueryWithDummy, &isMultiShardModifyQuery); @@ -1446,7 +1567,10 @@ RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, } else if (isMultiShardModifyQuery) { - job->taskList = MultiShardModifyTaskList(originalQuery, relationShardList, + job->taskList = QueryPushdownSqlTaskList(originalQuery, 0, + plannerRestrictionContext-> + relationRestrictionContext, + relationShardList, MODIFY_TASK, requiresMasterEvaluation); } else @@ -1482,46 +1606,6 @@ SingleShardSelectTaskList(Query *query, List *relationShardList, List *placement } -/* - * MultiShardModifyTaskList generates task list for multi shard update/delete - * queries. - */ -static List * -MultiShardModifyTaskList(Query *originalQuery, List *relationShardList, - bool requiresMasterEvaluation) -{ - List *taskList = NIL; - ListCell *relationShardCell = NULL; - int taskId = 1; - - foreach(relationShardCell, relationShardList) - { - RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell); - List *relationShardList = list_make1(relationShard); - Task *task = CreateTask(MODIFY_TASK); - - if (!requiresMasterEvaluation) - { - Query *copiedQuery = copyObject(originalQuery); - StringInfo shardQueryString = makeStringInfo(); - - UpdateRelationToShardNames((Node *) copiedQuery, relationShardList); - pg_get_query_def(copiedQuery, shardQueryString); - - task->queryString = shardQueryString->data; - } - - task->taskId = taskId++; - task->anchorShardId = relationShard->shardId; - task->relationShardList = relationShardList; - - taskList = lappend(taskList, task); - } - - return taskList; -} - - /* * SingleShardModifyTaskList generates a task for single shard update/delete query * and returns it as a list. @@ -1657,7 +1741,8 @@ SelectsFromDistributedTable(List *rangeTableList) * 0 values in UpdateRelationToShardNames. */ DeferredErrorMessage * -PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext, +PlanRouterQuery(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext, List **placementList, uint64 *anchorShardId, List **relationShardList, bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery) { @@ -1671,11 +1756,12 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon bool shardsPresent = false; uint64 shardId = INVALID_SHARD_ID; CmdType commandType = originalQuery->commandType; - bool isMultiShardModifyQuery = false; *placementList = NIL; + prunedRelationShardList = TargetShardIntervalsForQuery(originalQuery, - restrictionContext, + plannerRestrictionContext-> + relationRestrictionContext, &isMultiShardQuery); if (isMultiShardQuery) @@ -1695,13 +1781,23 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon Assert(UpdateOrDeleteQuery(originalQuery)); planningError = ModifyQuerySupported(originalQuery, originalQuery, - isMultiShardQuery); + isMultiShardQuery, + plannerRestrictionContext); if (planningError != NULL) { return planningError; } - - isMultiShardModifyQuery = true; + else + { + /* + * If the modify query uses multiple shards and update/delete query, relation + * shard list should be returned as list of shard list for each table. Check + * the implementation of QueryPushdownSqlTaskList. + */ + *relationShardList = prunedRelationShardList; + *multiShardModifyQuery = true; + return planningError; + } } foreach(prunedRelationShardListCell, prunedRelationShardList) @@ -1729,12 +1825,6 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon } } - if (isMultiShardModifyQuery) - { - *multiShardModifyQuery = true; - return planningError; - } - /* * We bail out if there are RTEs that prune multiple shards above, but * there can also be multiple RTEs that reference the same relation. diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 3cc3cc9a8..9f4569bfd 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -233,6 +233,16 @@ PartiallyEvaluateExpressionMutator(Node *expression, FunctionEvaluationContext * context); } + /* ExecInitExpr cannot handle PARAM_SUBLINK */ + if (IsA(expression, Param)) + { + Param *param = (Param *) expression; + if (param->paramkind == PARAM_SUBLINK) + { + return expression; + } + } + if (IsA(expression, Var)) { context->containsVar = true; @@ -243,6 +253,15 @@ PartiallyEvaluateExpressionMutator(Node *expression, FunctionEvaluationContext * context); } + /* expression_tree_mutator does not descend into Query trees */ + if (IsA(expression, Query)) + { + Query *query = (Query *) expression; + + return (Node *) query_tree_mutator(query, PartiallyEvaluateExpressionMutator, + context, 0); + } + copy = expression_tree_mutator(expression, PartiallyEvaluateExpressionMutator, &localContext); diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 31976b406..b7fa63c27 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -245,7 +245,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_NODE_FIELD(taskExecution); COPY_SCALAR_FIELD(upsertQuery); COPY_SCALAR_FIELD(replicationModel); - COPY_SCALAR_FIELD(insertSelectQuery); + COPY_SCALAR_FIELD(modifyWithSubquery); COPY_NODE_FIELD(relationShardList); COPY_NODE_FIELD(rowValuesLists); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 5e46bf63b..27adb1f2d 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -454,7 +454,7 @@ OutTask(OUTFUNC_ARGS) WRITE_NODE_FIELD(taskExecution); WRITE_BOOL_FIELD(upsertQuery); WRITE_CHAR_FIELD(replicationModel); - WRITE_BOOL_FIELD(insertSelectQuery); + WRITE_BOOL_FIELD(modifyWithSubquery); WRITE_NODE_FIELD(relationShardList); WRITE_NODE_FIELD(rowValuesLists); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index b4847d5d5..1d8ce0e88 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -367,7 +367,7 @@ ReadTask(READFUNC_ARGS) READ_NODE_FIELD(taskExecution); READ_BOOL_FIELD(upsertQuery); READ_CHAR_FIELD(replicationModel); - READ_BOOL_FIELD(insertSelectQuery); + READ_BOOL_FIELD(modifyWithSubquery); READ_NODE_FIELD(relationShardList); READ_NODE_FIELD(rowValuesLists); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index e7ee201da..ce2dbc3d7 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -158,6 +158,10 @@ typedef struct MapMergeJob * * NB: Changing this requires also changing _outTask in citus_outfuncs and _readTask * in citus_readfuncs to correctly (de)serialize this struct. + * + * INSERT ... SELECT queries and modify queries with subqueries or multiple tables + * set modifyWithSubquery to true. We need to use it to take the necessary locks + * to get consistent results for subqueries. */ typedef struct TaskExecution TaskExecution; @@ -180,7 +184,7 @@ typedef struct Task bool upsertQuery; /* only applies to modify tasks */ char replicationModel; /* only applies to modify tasks */ - bool insertSelectQuery; + bool modifyWithSubquery; List *relationShardList; List *rowValuesLists; /* rows to use when building multi-row INSERT */ @@ -329,5 +333,12 @@ extern List * TaskListDifference(const List *list1, const List *list2); extern List * AssignAnchorShardTaskList(List *taskList); extern List * FirstReplicaAssignTaskList(List *taskList); +/* function declaration for creating Task */ +extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, + RelationRestrictionContext * + relationRestrictionContext, + List *prunedRelationShardList, TaskType taskType, + bool modifyRequiresMasterEvaluation); + #endif /* MULTI_PHYSICAL_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 7c7fbd8bc..050a5cebc 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -27,13 +27,14 @@ extern bool EnableRouterExecution; extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query, - RelationRestrictionContext *restrictionContext); + PlannerRestrictionContext * + plannerRestrictionContext); extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query, PlannerRestrictionContext * plannerRestrictionContext); extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, - RelationRestrictionContext * - restrictionContext, + PlannerRestrictionContext * + plannerRestrictionContext, List **placementList, uint64 *anchorShardId, List **relationShardList, bool replacePrunedQueryWithDummy, @@ -45,7 +46,9 @@ extern List * TargetShardIntervalsForQuery(Query *query, extern List * WorkersContainingAllShards(List *prunedShardIntervalsList); extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *originalQuery, - bool multiShardQuery); + bool multiShardQuery, + PlannerRestrictionContext * + plannerRestrictionContext); extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex); extern RelationRestrictionContext * CopyRelationRestrictionContext( RelationRestrictionContext *oldContext); @@ -58,6 +61,7 @@ extern bool IsMultiRowInsert(Query *query); extern void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval); extern bool UpdateOrDeleteQuery(Query *query); +extern List * WorkersContainingAllShards(List *prunedShardIntervalsList); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/isolation_modify_with_subquery_vs_dml.out b/src/test/regress/expected/isolation_modify_with_subquery_vs_dml.out new file mode 100644 index 000000000..67a1e5dcb --- /dev/null +++ b/src/test/regress/expected/isolation_modify_with_subquery_vs_dml.out @@ -0,0 +1,127 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s2-begin s2-modify_with_subquery_v1 s1-insert_to_events_test_table s2-commit s1-commit +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s2-modify_with_subquery_v1: + UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; + +step s1-insert_to_events_test_table: + INSERT INTO events_test_table VALUES(4,6,8,10); + +step s2-commit: + COMMIT; + +step s1-insert_to_events_test_table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-modify_with_subquery_v1 s1-update_events_test_table s2-commit s1-commit +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s2-modify_with_subquery_v1: + UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; + +step s1-update_events_test_table: + UPDATE users_test_table SET value_1 = 3; + +step s2-commit: + COMMIT; + +step s1-update_events_test_table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-modify_with_subquery_v1 s1-delete_events_test_table s2-commit s1-commit +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s2-modify_with_subquery_v1: + UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; + +step s1-delete_events_test_table: + DELETE FROM events_test_table WHERE user_id = 1 or user_id = 3; + +step s2-commit: + COMMIT; + +step s1-delete_events_test_table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-insert_to_events_test_table s2-modify_with_subquery_v1 s1-commit s2-commit +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-insert_to_events_test_table: + INSERT INTO events_test_table VALUES(4,6,8,10); + +step s2-modify_with_subquery_v1: + UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; + +step s1-commit: + COMMIT; + +step s2-modify_with_subquery_v1: <... completed> +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-update_events_test_table s2-modify_with_subquery_v1 s1-commit s2-commit +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-update_events_test_table: + UPDATE users_test_table SET value_1 = 3; + +step s2-modify_with_subquery_v1: + UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; + +step s1-commit: + COMMIT; + +step s2-modify_with_subquery_v1: <... completed> +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-delete_events_test_table s2-modify_with_subquery_v1 s1-commit s2-commit +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-delete_events_test_table: + DELETE FROM events_test_table WHERE user_id = 1 or user_id = 3; + +step s2-modify_with_subquery_v1: + UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; + +step s1-commit: + COMMIT; + +step s2-modify_with_subquery_v1: <... completed> +step s2-commit: + COMMIT; + diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 7c3fb4666..1b59c1b08 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -816,9 +816,42 @@ Custom Scan (Citus Router) Node: host=localhost port=57638 dbname=regression -> Delete on lineitem_hash_part_360044 lineitem_hash_part -> Seq Scan on lineitem_hash_part_360044 lineitem_hash_part +SET citus.explain_all_tasks TO off; +-- Test update with subquery +EXPLAIN (COSTS FALSE) + UPDATE lineitem_hash_part + SET l_suppkey = 12 + FROM orders_hash_part + WHERE orders_hash_part.o_orderkey = lineitem_hash_part.l_orderkey; +Custom Scan (Citus Router) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Update on lineitem_hash_part_360041 lineitem_hash_part + -> Hash Join + Hash Cond: (lineitem_hash_part.l_orderkey = orders_hash_part.o_orderkey) + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part + -> Hash + -> Seq Scan on orders_hash_part_360045 orders_hash_part +-- Test delete with subquery +EXPLAIN (COSTS FALSE) + DELETE FROM lineitem_hash_part + USING orders_hash_part + WHERE orders_hash_part.o_orderkey = lineitem_hash_part.l_orderkey; +Custom Scan (Citus Router) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Delete on lineitem_hash_part_360041 lineitem_hash_part + -> Hash Join + Hash Cond: (lineitem_hash_part.l_orderkey = orders_hash_part.o_orderkey) + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part + -> Hash + -> Seq Scan on orders_hash_part_360045 orders_hash_part -- Test track tracker SET citus.task_executor_type TO 'task-tracker'; -SET citus.explain_all_tasks TO off; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; Aggregate @@ -1128,7 +1161,7 @@ SELECT l_orderkey FROM series JOIN keys ON (s = l_orderkey) ORDER BY s; Custom Scan (Citus Router) Output: remote_scan.l_orderkey - -> Distributed Subplan 55_1 + -> Distributed Subplan 57_1 -> HashAggregate Output: remote_scan.l_orderkey Group Key: remote_scan.l_orderkey @@ -1143,7 +1176,7 @@ Custom Scan (Citus Router) Group Key: lineitem_hash_part.l_orderkey -> Seq Scan on public.lineitem_hash_part_360041 lineitem_hash_part Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment - -> Distributed Subplan 55_2 + -> Distributed Subplan 57_2 -> Function Scan on pg_catalog.generate_series s Output: s Function Call: generate_series(1, 10) @@ -1159,13 +1192,13 @@ Custom Scan (Citus Router) Sort Key: intermediate_result.s -> Function Scan on pg_catalog.read_intermediate_result intermediate_result Output: intermediate_result.s - Function Call: read_intermediate_result('55_2'::text, 'binary'::citus_copy_format) + Function Call: read_intermediate_result('57_2'::text, 'binary'::citus_copy_format) -> Sort Output: intermediate_result_1.l_orderkey Sort Key: intermediate_result_1.l_orderkey -> Function Scan on pg_catalog.read_intermediate_result intermediate_result_1 Output: intermediate_result_1.l_orderkey - Function Call: read_intermediate_result('55_1'::text, 'binary'::citus_copy_format) + Function Call: read_intermediate_result('57_1'::text, 'binary'::citus_copy_format) SELECT true AS valid FROM explain_json($$ WITH result AS ( SELECT l_quantity, count(*) count_quantity FROM lineitem diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index 0eef98c0a..645c20ac1 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -816,9 +816,42 @@ Custom Scan (Citus Router) Node: host=localhost port=57638 dbname=regression -> Delete on lineitem_hash_part_360044 lineitem_hash_part -> Seq Scan on lineitem_hash_part_360044 lineitem_hash_part +SET citus.explain_all_tasks TO off; +-- Test update with subquery +EXPLAIN (COSTS FALSE) + UPDATE lineitem_hash_part + SET l_suppkey = 12 + FROM orders_hash_part + WHERE orders_hash_part.o_orderkey = lineitem_hash_part.l_orderkey; +Custom Scan (Citus Router) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Update on lineitem_hash_part_360041 lineitem_hash_part + -> Hash Join + Hash Cond: (lineitem_hash_part.l_orderkey = orders_hash_part.o_orderkey) + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part + -> Hash + -> Seq Scan on orders_hash_part_360045 orders_hash_part +-- Test delete with subquery +EXPLAIN (COSTS FALSE) + DELETE FROM lineitem_hash_part + USING orders_hash_part + WHERE orders_hash_part.o_orderkey = lineitem_hash_part.l_orderkey; +Custom Scan (Citus Router) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Delete on lineitem_hash_part_360041 lineitem_hash_part + -> Hash Join + Hash Cond: (lineitem_hash_part.l_orderkey = orders_hash_part.o_orderkey) + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part + -> Hash + -> Seq Scan on orders_hash_part_360045 orders_hash_part -- Test track tracker SET citus.task_executor_type TO 'task-tracker'; -SET citus.explain_all_tasks TO off; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; Aggregate @@ -1128,7 +1161,7 @@ SELECT l_orderkey FROM series JOIN keys ON (s = l_orderkey) ORDER BY s; Custom Scan (Citus Router) Output: remote_scan.l_orderkey - -> Distributed Subplan 55_1 + -> Distributed Subplan 57_1 -> HashAggregate Output: remote_scan.l_orderkey Group Key: remote_scan.l_orderkey @@ -1143,7 +1176,7 @@ Custom Scan (Citus Router) Group Key: lineitem_hash_part.l_orderkey -> Seq Scan on public.lineitem_hash_part_360041 lineitem_hash_part Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment - -> Distributed Subplan 55_2 + -> Distributed Subplan 57_2 -> Function Scan on pg_catalog.generate_series s Output: s Function Call: generate_series(1, 10) @@ -1159,13 +1192,13 @@ Custom Scan (Citus Router) Sort Key: intermediate_result.s -> Function Scan on pg_catalog.read_intermediate_result intermediate_result Output: intermediate_result.s - Function Call: read_intermediate_result('55_2'::text, 'binary'::citus_copy_format) + Function Call: read_intermediate_result('57_2'::text, 'binary'::citus_copy_format) -> Sort Output: intermediate_result_1.l_orderkey Sort Key: intermediate_result_1.l_orderkey -> Function Scan on pg_catalog.read_intermediate_result intermediate_result_1 Output: intermediate_result_1.l_orderkey - Function Call: read_intermediate_result('55_1'::text, 'binary'::citus_copy_format) + Function Call: read_intermediate_result('57_1'::text, 'binary'::citus_copy_format) SELECT true AS valid FROM explain_json($$ WITH result AS ( SELECT l_quantity, count(*) count_quantity FROM lineitem diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index d18cc51f7..1b80c719e 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -964,12 +964,10 @@ SELECT * FROM summary_table ORDER BY id; -- unsupported multi-shard updates UPDATE summary_table SET average_value = average_query.average FROM ( SELECT avg(value) AS average FROM raw_table) average_query; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator UPDATE summary_table SET average_value = average_value + 1 WHERE id = (SELECT id FROM raw_table WHERE value > 100); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- test complex queries UPDATE summary_table SET @@ -1108,8 +1106,8 @@ SELECT master_modify_multiple_shards(' SELECT avg(value) AS average FROM raw_table WHERE id = 1 ) average_query WHERE id = 1'); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. +ERROR: cannot run multi shard modify query with master_modify_multiple_shards when the query involves subquery or join +DETAIL: Execute the query without using master_modify_multiple_shards() -- test connection API via using COPY -- COPY on SELECT part BEGIN; diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out index 78eb43666..2c0a32c5b 100644 --- a/src/test/regress/expected/multi_shard_modify.out +++ b/src/test/regress/expected/multi_shard_modify.out @@ -84,8 +84,8 @@ SELECT master_create_worker_shards('temp_nations', 4, 2); (1 row) SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test USING temp_nations WHERE multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''foobar'' '); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. +ERROR: cannot run multi shard modify query with master_modify_multiple_shards when the query involves subquery or join +DETAIL: Execute the query without using master_modify_multiple_shards() -- commands with a RETURNING clause are unsupported SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 3 RETURNING *'); ERROR: master_modify_multiple_shards() does not support RETURNING @@ -222,8 +222,8 @@ SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_key=3 ERROR: modifying the partition value of rows is not allowed -- UPDATEs with a FROM clause are unsupported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL'' FROM temp_nations WHERE multi_shard_modify_test.t_key = 3 AND multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''dummy'' '); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. +ERROR: cannot run multi shard modify query with master_modify_multiple_shards when the query involves subquery or join +DETAIL: Execute the query without using master_modify_multiple_shards() -- commands with a RETURNING clause are unsupported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''FAIL'' WHERE t_key=4 RETURNING *'); ERROR: master_modify_multiple_shards() does not support RETURNING diff --git a/src/test/regress/expected/multi_shard_update_delete.out b/src/test/regress/expected/multi_shard_update_delete.out index 925d1e2f0..9efcdfe36 100644 --- a/src/test/regress/expected/multi_shard_update_delete.out +++ b/src/test/regress/expected/multi_shard_update_delete.out @@ -169,6 +169,10 @@ CREATE TABLE append_stage_table(id int, col_2 int); INSERT INTO append_stage_table VALUES(1,3); INSERT INTO append_stage_table VALUES(3,2); INSERT INTO append_stage_table VALUES(5,4); +CREATE TABLE append_stage_table_2(id int, col_2 int); +INSERT INTO append_stage_table_2 VALUES(8,3); +INSERT INTO append_stage_table_2 VALUES(9,2); +INSERT INTO append_stage_table_2 VALUES(10,4); CREATE TABLE test_append_table(id int, col_2 int); SELECT create_distributed_table('test_append_table','id','append'); create_distributed_table @@ -194,7 +198,7 @@ SELECT master_create_empty_shard('test_append_table') AS new_shard_id; 1440011 (1 row) -SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table', 'localhost', :master_port); +SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table_2', 'localhost', :master_port); master_append_table_to_shard ------------------------------ 0.00533333 @@ -204,15 +208,16 @@ UPDATE test_append_table SET col_2 = 5; SELECT * FROM test_append_table; id | col_2 ----+------- - 1 | 5 - 3 | 5 - 5 | 5 + 8 | 5 + 9 | 5 + 10 | 5 1 | 5 3 | 5 5 | 5 (6 rows) DROP TABLE append_stage_table; +DROP TABLE append_stage_table_2; DROP TABLE test_append_table; -- Update multi shard of partitioned distributed table SET citus.multi_shard_modify_mode to 'parallel'; @@ -317,43 +322,24 @@ UPDATE tt2 SET col_2 = 3 RETURNING id, col_2; (5 rows) DROP TABLE tt2; --- Multiple RTEs are not supported +-- Multiple RTEs are only supported if subquery is pushdownable SET citus.multi_shard_modify_mode to DEFAULT; -UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. -UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. -UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. -DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. -DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. -DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. --- Cursors are not supported -BEGIN; -DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; -FETCH test_cursor; - user_id | value_1 | value_2 | value_3 ----------+---------+---------+--------- - 8 | 0 | 13 | 0 +-- To test colocation between tables in modify query +SET citus.shard_count to 6; +CREATE TABLE events_test_table_2 (user_id int, value_1 int, value_2 int, value_3 int); +SELECT create_distributed_table('events_test_table_2', 'user_id'); + create_distributed_table +-------------------------- + (1 row) -UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor; -ERROR: cannot run DML queries with cursors -ROLLBACK; --- Stable functions are supported +\COPY events_test_table_2 FROM STDIN DELIMITER AS ','; +CREATE TABLE events_test_table_local (user_id int, value_1 int, value_2 int, value_3 int); +\COPY events_test_table_local FROM STDIN DELIMITER AS ','; CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int); INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5); INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7); -INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9); +INSERT INTO test_table_1 VALUES(3, '2111-01-12 08:35:19', 9); SELECT create_distributed_table('test_table_1', 'id'); NOTICE: Copying data from local table... create_distributed_table @@ -361,11 +347,416 @@ NOTICE: Copying data from local table... (1 row) +-- We can pushdown query if there is partition key equality +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; +DELETE FROM users_test_table +USING events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; +UPDATE users_test_table +SET value_1 = 3 +WHERE user_id IN (SELECT user_id + FROM events_test_table); +DELETE FROM users_test_table +WHERE user_id IN (SELECT user_id + FROM events_test_table); +DELETE FROM events_test_table_2 +WHERE now() > (SELECT max(date_col) + FROM test_table_1 + WHERE test_table_1.id = events_test_table_2.user_id + GROUP BY id) +RETURNING *; + user_id | value_1 | value_2 | value_3 +---------+---------+---------+--------- + 1 | 5 | 7 | 7 + 1 | 20 | 12 | 25 + 1 | 60 | 17 | 17 +(3 rows) + +UPDATE users_test_table +SET value_1 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id + AND events_test_table.user_id > 5; +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + UNION + SELECT user_id + FROM events_test_table); +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + UNION + SELECT user_id + FROM events_test_table) returning *; + user_id | value_1 | value_2 | value_3 +---------+---------+---------+--------- + 8 | 4 | 13 | 0 + 20 | 4 | | 0 + 20 | 4 | | 0 + 20 | 4 | | 0 + 4 | 4 | 9 | 0 + 4 | 4 | 17 | 0 + 16 | 4 | | 0 + 6 | 4 | 11 | 0 + 6 | 4 | 15 | 0 + 2 | 4 | 7 | 0 + 2 | 4 | 19 | 0 +(11 rows) + +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + UNION ALL + SELECT user_id + FROM events_test_table) returning *; + user_id | value_1 | value_2 | value_3 +---------+---------+---------+--------- + 8 | 4 | 13 | 0 + 20 | 4 | | 0 + 20 | 4 | | 0 + 20 | 4 | | 0 + 4 | 4 | 9 | 0 + 4 | 4 | 17 | 0 + 16 | 4 | | 0 + 6 | 4 | 11 | 0 + 6 | 4 | 15 | 0 + 2 | 4 | 7 | 0 + 2 | 4 | 19 | 0 +(11 rows) + +UPDATE users_test_table +SET value_1 = 5 +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_test_table + WHERE + users_test_table.user_id = events_test_table.user_id + GROUP BY + user_id + ); +UPDATE users_test_table +SET value_3 = 1 +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_test_table + WHERE + users_test_table.user_id = events_test_table.user_id AND + users_test_table.value_2 > events_test_table.value_2 + GROUP BY + user_id + ); +UPDATE users_test_table +SET value_2 = 4 +WHERE + value_1 > 1 AND value_1 < 3 + AND value_2 >= 1 + AND user_id IN + ( + SELECT + e1.user_id + FROM ( + SELECT + user_id, + 1 AS view_homepage + FROM events_test_table + WHERE + value_1 IN (0, 1) + ) e1 LEFT JOIN LATERAL ( + SELECT + user_id, + 1 AS use_demo + FROM events_test_table + WHERE + user_id = e1.user_id + ) e2 ON true +); +UPDATE users_test_table +SET value_3 = 5 +WHERE value_2 IN (SELECT AVG(value_1) OVER (PARTITION BY user_id) FROM events_test_table WHERE events_test_table.user_id = users_test_table.user_id); +-- Test it within transaction +BEGIN; +INSERT INTO users_test_table +SELECT * FROM events_test_table +WHERE events_test_table.user_id = 1 OR events_test_table.user_id = 5; +SELECT SUM(value_2) FROM users_test_table; + sum +----- + 169 +(1 row) + +UPDATE users_test_table +SET value_2 = 1 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; +SELECT SUM(value_2) FROM users_test_table; + sum +----- + 97 +(1 row) + +COMMIT; +-- Test with schema +CREATE SCHEMA sec_schema; +CREATE TABLE sec_schema.tt1(id int, value_1 int); +SELECT create_distributed_table('sec_schema.tt1','id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO sec_schema.tt1 values(1,1),(2,2),(7,7),(9,9); +UPDATE sec_schema.tt1 +SET value_1 = 11 +WHERE id < (SELECT max(value_2) FROM events_test_table_2 + WHERE sec_schema.tt1.id = events_test_table_2.user_id + GROUP BY user_id) +RETURNING *; + id | value_1 +----+--------- + 7 | 11 + 9 | 11 +(2 rows) + +DROP SCHEMA sec_schema CASCADE; +NOTICE: drop cascades to table sec_schema.tt1 +-- We don't need partition key equality with reference tables +UPDATE events_test_table +SET value_2 = 5 +FROM users_reference_copy_table +WHERE users_reference_copy_table.user_id = events_test_table.value_1; +-- Both reference tables and hash distributed tables can be used in subquery +UPDATE events_test_table as ett +SET value_2 = 6 +WHERE ett.value_3 IN (SELECT utt.value_3 + FROM users_test_table as utt, users_reference_copy_table as uct + WHERE utt.user_id = uct.user_id AND utt.user_id = ett.user_id); +-- We don't need equality check with constant values in sub-select +UPDATE users_reference_copy_table +SET value_2 = 6 +WHERE user_id IN (SELECT 2); +UPDATE users_reference_copy_table +SET value_2 = 6 +WHERE value_1 IN (SELECT 2); +UPDATE users_test_table +SET value_2 = 6 +WHERE user_id IN (SELECT 2); +UPDATE users_test_table +SET value_2 = 6 +WHERE value_1 IN (SELECT 2); +-- Can only use immutable functions +UPDATE test_table_1 +SET col_3 = 6 +WHERE date_col IN (SELECT now()); +ERROR: cannot push down this subquery +DETAIL: Subqueries without a FROM clause can only contain immutable functions +-- Test with prepared statements +SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0; + count +------- + 0 +(1 row) + +PREPARE foo_plan_2(int,int) AS UPDATE users_test_table + SET value_1 = $1, value_3 = $2 + FROM events_test_table + WHERE users_test_table.user_id = events_test_table.user_id; +EXECUTE foo_plan_2(1,5); +EXECUTE foo_plan_2(3,15); +EXECUTE foo_plan_2(5,25); +EXECUTE foo_plan_2(7,35); +EXECUTE foo_plan_2(9,45); +EXECUTE foo_plan_2(0,0); +SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0; + count +------- + 6 +(1 row) + +-- Test with varying WHERE expressions +UPDATE users_test_table +SET value_1 = 7 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id OR FALSE; +UPDATE users_test_table +SET value_1 = 7 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id AND TRUE; +-- Test with inactive shard-placement +-- manually set shardstate of one placement of users_test_table as inactive +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440000; +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; +ERROR: cannot find a worker that has active placements for all shards in the query +-- manually set shardstate of one placement of events_test_table as inactive +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440004; +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; +ERROR: cannot find a worker that has active placements for all shards in the query +UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440000; +UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440004; +-- Subquery must return single value to use it with comparison operators +UPDATE users_test_table as utt +SET value_1 = 3 +WHERE value_2 > (SELECT value_3 FROM events_test_table as ett WHERE utt.user_id = ett.user_id); +ERROR: more than one row returned by a subquery used as an expression +CONTEXT: while executing command on localhost:57637 +-- We can not pushdown a query if the target relation is reference table +UPDATE users_reference_copy_table +SET value_2 = 5 +FROM events_test_table +WHERE users_reference_copy_table.user_id = events_test_table.user_id; +ERROR: only reference tables may be queried when targeting a reference table with multi shard UPDATE/DELETE queries with multiple tables +-- We cannot push down it if the query has outer join and using +UPDATE events_test_table +SET value_2 = users_test_table.user_id +FROM users_test_table +FULL OUTER JOIN events_test_table e2 USING (user_id) +WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2; +ERROR: a join with USING causes an internal naming conflict, use ON instead +-- We can not pushdown query if there is no partition key equality +UPDATE users_test_table +SET value_1 = 1 +WHERE user_id IN (SELECT Count(value_1) + FROM events_test_table + GROUP BY user_id); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +UPDATE users_test_table +SET value_1 = (SELECT Count(*) + FROM events_test_table); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + UNION + SELECT value_1 + FROM events_test_table); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + INTERSECT + SELECT Sum(value_1) + FROM events_test_table + GROUP BY user_id); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +UPDATE users_test_table +SET value_2 = (SELECT value_3 + FROM users_test_table); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +UPDATE users_test_table +SET value_2 = 2 +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_test_table + WHERE + users_test_table.user_id > events_test_table.user_id AND + users_test_table.value_1 = events_test_table.value_1 + GROUP BY + user_id + ); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +UPDATE users_test_table +SET (value_1, value_2) = (2,1) +WHERE user_id IN + (SELECT user_id + FROM users_test_table + INTERSECT + SELECT user_id + FROM events_test_table); +ERROR: cannot push down this subquery +DETAIL: Intersect and Except are currently unsupported +-- Reference tables can not locate on the outer part of the outer join +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN + (SELECT DISTINCT e2.user_id + FROM users_reference_copy_table + LEFT JOIN users_test_table e2 ON (e2.user_id = users_reference_copy_table.value_1)) RETURNING *; +ERROR: cannot pushdown the subquery +DETAIL: There exist a reference table in the outer part of the outer join +-- Volatile functions are also not supported +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id * random(); +ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE +UPDATE users_test_table +SET value_2 = 5 * random() +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; +ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE +UPDATE users_test_table +SET value_2 = 5 +WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +-- Local tables are not supported +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table_local +WHERE users_test_table.user_id = events_test_table_local.user_id; +ERROR: relation events_test_table_local is not distributed +UPDATE users_test_table +SET value_2 = 5 +WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local); +ERROR: relation events_test_table_local is not distributed +UPDATE events_test_table_local +SET value_2 = 5 +FROM users_test_table +WHERE events_test_table_local.user_id = users_test_table.user_id; +ERROR: relation events_test_table_local is not distributed +-- Shard counts of tables must be equal to pushdown the query +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table_2 +WHERE users_test_table.user_id = events_test_table_2.user_id; +ERROR: cannot push down this subquery +DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning +-- Should error out due to multiple row return from subquery, but we can not get this information within +-- subquery pushdown planner. This query will be sent to worker with recursive planner. +DELETE FROM users_test_table +WHERE users_test_table.user_id = (SELECT user_id + FROM events_test_table); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +-- Cursors are not supported +BEGIN; +DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; +FETCH test_cursor; + user_id | value_1 | value_2 | value_3 +---------+---------+---------+--------- + 8 | 4 | 13 | 0 +(1 row) + +UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor; +ERROR: cannot run DML queries with cursors +ROLLBACK; +-- Stable functions are supported SELECT * FROM test_table_1; id | date_col | col_3 ----+------------------------------+------- 1 | Sat Apr 05 08:32:12 2014 PDT | 5 - 3 | Wed Jan 12 08:35:19 2011 PST | 9 + 3 | Mon Jan 12 08:35:19 2111 PST | 9 2 | Sun Feb 01 08:31:16 2015 PST | 7 (3 rows) @@ -374,15 +765,16 @@ SELECT * FROM test_table_1; id | date_col | col_3 ----+------------------------------+------- 1 | Sat Apr 05 08:32:12 2014 PDT | 3 - 3 | Wed Jan 12 08:35:19 2011 PST | 3 + 3 | Mon Jan 12 08:35:19 2111 PST | 9 2 | Sun Feb 01 08:31:16 2015 PST | 3 (3 rows) DELETE FROM test_table_1 WHERE date_col < current_timestamp; SELECT * FROM test_table_1; - id | date_col | col_3 -----+----------+------- -(0 rows) + id | date_col | col_3 +----+------------------------------+------- + 3 | Mon Jan 12 08:35:19 2111 PST | 9 +(1 row) DROP TABLE test_table_1; -- Volatile functions are not supported diff --git a/src/test/regress/expected/multi_shard_update_delete_0.out b/src/test/regress/expected/multi_shard_update_delete_0.out index 26cc435eb..44fcc535c 100644 --- a/src/test/regress/expected/multi_shard_update_delete_0.out +++ b/src/test/regress/expected/multi_shard_update_delete_0.out @@ -169,6 +169,10 @@ CREATE TABLE append_stage_table(id int, col_2 int); INSERT INTO append_stage_table VALUES(1,3); INSERT INTO append_stage_table VALUES(3,2); INSERT INTO append_stage_table VALUES(5,4); +CREATE TABLE append_stage_table_2(id int, col_2 int); +INSERT INTO append_stage_table_2 VALUES(8,3); +INSERT INTO append_stage_table_2 VALUES(9,2); +INSERT INTO append_stage_table_2 VALUES(10,4); CREATE TABLE test_append_table(id int, col_2 int); SELECT create_distributed_table('test_append_table','id','append'); create_distributed_table @@ -194,7 +198,7 @@ SELECT master_create_empty_shard('test_append_table') AS new_shard_id; 1440011 (1 row) -SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table', 'localhost', :master_port); +SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table_2', 'localhost', :master_port); master_append_table_to_shard ------------------------------ 0.00533333 @@ -204,15 +208,16 @@ UPDATE test_append_table SET col_2 = 5; SELECT * FROM test_append_table; id | col_2 ----+------- - 1 | 5 - 3 | 5 - 5 | 5 + 8 | 5 + 9 | 5 + 10 | 5 1 | 5 3 | 5 5 | 5 (6 rows) DROP TABLE append_stage_table; +DROP TABLE append_stage_table_2; DROP TABLE test_append_table; -- Update multi shard of partitioned distributed table SET citus.multi_shard_modify_mode to 'parallel'; @@ -340,43 +345,24 @@ UPDATE tt2 SET col_2 = 3 RETURNING id, col_2; (5 rows) DROP TABLE tt2; --- Multiple RTEs are not supported +-- Multiple RTEs are only supported if subquery is pushdownable SET citus.multi_shard_modify_mode to DEFAULT; -UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. -UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. -UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. -DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. -DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. -DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. --- Cursors are not supported -BEGIN; -DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; -FETCH test_cursor; - user_id | value_1 | value_2 | value_3 ----------+---------+---------+--------- - 8 | 0 | 13 | 0 +-- To test colocation between tables in modify query +SET citus.shard_count to 6; +CREATE TABLE events_test_table_2 (user_id int, value_1 int, value_2 int, value_3 int); +SELECT create_distributed_table('events_test_table_2', 'user_id'); + create_distributed_table +-------------------------- + (1 row) -UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor; -ERROR: cannot run DML queries with cursors -ROLLBACK; --- Stable functions are supported +\COPY events_test_table_2 FROM STDIN DELIMITER AS ','; +CREATE TABLE events_test_table_local (user_id int, value_1 int, value_2 int, value_3 int); +\COPY events_test_table_local FROM STDIN DELIMITER AS ','; CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int); INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5); INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7); -INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9); +INSERT INTO test_table_1 VALUES(3, '2111-01-12 08:35:19', 9); SELECT create_distributed_table('test_table_1', 'id'); NOTICE: Copying data from local table... create_distributed_table @@ -384,11 +370,416 @@ NOTICE: Copying data from local table... (1 row) +-- We can pushdown query if there is partition key equality +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; +DELETE FROM users_test_table +USING events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; +UPDATE users_test_table +SET value_1 = 3 +WHERE user_id IN (SELECT user_id + FROM events_test_table); +DELETE FROM users_test_table +WHERE user_id IN (SELECT user_id + FROM events_test_table); +DELETE FROM events_test_table_2 +WHERE now() > (SELECT max(date_col) + FROM test_table_1 + WHERE test_table_1.id = events_test_table_2.user_id + GROUP BY id) +RETURNING *; + user_id | value_1 | value_2 | value_3 +---------+---------+---------+--------- + 1 | 5 | 7 | 7 + 1 | 20 | 12 | 25 + 1 | 60 | 17 | 17 +(3 rows) + +UPDATE users_test_table +SET value_1 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id + AND events_test_table.user_id > 5; +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + UNION + SELECT user_id + FROM events_test_table); +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + UNION + SELECT user_id + FROM events_test_table) returning *; + user_id | value_1 | value_2 | value_3 +---------+---------+---------+--------- + 8 | 4 | 13 | 0 + 20 | 4 | | 0 + 20 | 4 | | 0 + 20 | 4 | | 0 + 4 | 4 | 9 | 0 + 4 | 4 | 17 | 0 + 16 | 4 | | 0 + 6 | 4 | 11 | 0 + 6 | 4 | 15 | 0 + 2 | 4 | 7 | 0 + 2 | 4 | 19 | 0 +(11 rows) + +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + UNION ALL + SELECT user_id + FROM events_test_table) returning *; + user_id | value_1 | value_2 | value_3 +---------+---------+---------+--------- + 8 | 4 | 13 | 0 + 20 | 4 | | 0 + 20 | 4 | | 0 + 20 | 4 | | 0 + 4 | 4 | 9 | 0 + 4 | 4 | 17 | 0 + 16 | 4 | | 0 + 6 | 4 | 11 | 0 + 6 | 4 | 15 | 0 + 2 | 4 | 7 | 0 + 2 | 4 | 19 | 0 +(11 rows) + +UPDATE users_test_table +SET value_1 = 5 +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_test_table + WHERE + users_test_table.user_id = events_test_table.user_id + GROUP BY + user_id + ); +UPDATE users_test_table +SET value_3 = 1 +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_test_table + WHERE + users_test_table.user_id = events_test_table.user_id AND + users_test_table.value_2 > events_test_table.value_2 + GROUP BY + user_id + ); +UPDATE users_test_table +SET value_2 = 4 +WHERE + value_1 > 1 AND value_1 < 3 + AND value_2 >= 1 + AND user_id IN + ( + SELECT + e1.user_id + FROM ( + SELECT + user_id, + 1 AS view_homepage + FROM events_test_table + WHERE + value_1 IN (0, 1) + ) e1 LEFT JOIN LATERAL ( + SELECT + user_id, + 1 AS use_demo + FROM events_test_table + WHERE + user_id = e1.user_id + ) e2 ON true +); +UPDATE users_test_table +SET value_3 = 5 +WHERE value_2 IN (SELECT AVG(value_1) OVER (PARTITION BY user_id) FROM events_test_table WHERE events_test_table.user_id = users_test_table.user_id); +-- Test it within transaction +BEGIN; +INSERT INTO users_test_table +SELECT * FROM events_test_table +WHERE events_test_table.user_id = 1 OR events_test_table.user_id = 5; +SELECT SUM(value_2) FROM users_test_table; + sum +----- + 169 +(1 row) + +UPDATE users_test_table +SET value_2 = 1 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; +SELECT SUM(value_2) FROM users_test_table; + sum +----- + 97 +(1 row) + +COMMIT; +-- Test with schema +CREATE SCHEMA sec_schema; +CREATE TABLE sec_schema.tt1(id int, value_1 int); +SELECT create_distributed_table('sec_schema.tt1','id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO sec_schema.tt1 values(1,1),(2,2),(7,7),(9,9); +UPDATE sec_schema.tt1 +SET value_1 = 11 +WHERE id < (SELECT max(value_2) FROM events_test_table_2 + WHERE sec_schema.tt1.id = events_test_table_2.user_id + GROUP BY user_id) +RETURNING *; + id | value_1 +----+--------- + 7 | 11 + 9 | 11 +(2 rows) + +DROP SCHEMA sec_schema CASCADE; +NOTICE: drop cascades to table sec_schema.tt1 +-- We don't need partition key equality with reference tables +UPDATE events_test_table +SET value_2 = 5 +FROM users_reference_copy_table +WHERE users_reference_copy_table.user_id = events_test_table.value_1; +-- Both reference tables and hash distributed tables can be used in subquery +UPDATE events_test_table as ett +SET value_2 = 6 +WHERE ett.value_3 IN (SELECT utt.value_3 + FROM users_test_table as utt, users_reference_copy_table as uct + WHERE utt.user_id = uct.user_id AND utt.user_id = ett.user_id); +-- We don't need equality check with constant values in sub-select +UPDATE users_reference_copy_table +SET value_2 = 6 +WHERE user_id IN (SELECT 2); +UPDATE users_reference_copy_table +SET value_2 = 6 +WHERE value_1 IN (SELECT 2); +UPDATE users_test_table +SET value_2 = 6 +WHERE user_id IN (SELECT 2); +UPDATE users_test_table +SET value_2 = 6 +WHERE value_1 IN (SELECT 2); +-- Can only use immutable functions +UPDATE test_table_1 +SET col_3 = 6 +WHERE date_col IN (SELECT now()); +ERROR: cannot push down this subquery +DETAIL: Subqueries without a FROM clause can only contain immutable functions +-- Test with prepared statements +SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0; + count +------- + 0 +(1 row) + +PREPARE foo_plan_2(int,int) AS UPDATE users_test_table + SET value_1 = $1, value_3 = $2 + FROM events_test_table + WHERE users_test_table.user_id = events_test_table.user_id; +EXECUTE foo_plan_2(1,5); +EXECUTE foo_plan_2(3,15); +EXECUTE foo_plan_2(5,25); +EXECUTE foo_plan_2(7,35); +EXECUTE foo_plan_2(9,45); +EXECUTE foo_plan_2(0,0); +SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0; + count +------- + 6 +(1 row) + +-- Test with varying WHERE expressions +UPDATE users_test_table +SET value_1 = 7 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id OR FALSE; +UPDATE users_test_table +SET value_1 = 7 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id AND TRUE; +-- Test with inactive shard-placement +-- manually set shardstate of one placement of users_test_table as inactive +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440000; +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; +ERROR: cannot find a worker that has active placements for all shards in the query +-- manually set shardstate of one placement of events_test_table as inactive +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440004; +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; +ERROR: cannot find a worker that has active placements for all shards in the query +UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440000; +UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440004; +-- Subquery must return single value to use it with comparison operators +UPDATE users_test_table as utt +SET value_1 = 3 +WHERE value_2 > (SELECT value_3 FROM events_test_table as ett WHERE utt.user_id = ett.user_id); +ERROR: more than one row returned by a subquery used as an expression +CONTEXT: while executing command on localhost:57637 +-- We can not pushdown a query if the target relation is reference table +UPDATE users_reference_copy_table +SET value_2 = 5 +FROM events_test_table +WHERE users_reference_copy_table.user_id = events_test_table.user_id; +ERROR: only reference tables may be queried when targeting a reference table with multi shard UPDATE/DELETE queries with multiple tables +-- We cannot push down it if the query has outer join and using +UPDATE events_test_table +SET value_2 = users_test_table.user_id +FROM users_test_table +FULL OUTER JOIN events_test_table e2 USING (user_id) +WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2; +ERROR: a join with USING causes an internal naming conflict, use ON instead +-- We can not pushdown query if there is no partition key equality +UPDATE users_test_table +SET value_1 = 1 +WHERE user_id IN (SELECT Count(value_1) + FROM events_test_table + GROUP BY user_id); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +UPDATE users_test_table +SET value_1 = (SELECT Count(*) + FROM events_test_table); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + UNION + SELECT value_1 + FROM events_test_table); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + INTERSECT + SELECT Sum(value_1) + FROM events_test_table + GROUP BY user_id); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +UPDATE users_test_table +SET value_2 = (SELECT value_3 + FROM users_test_table); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +UPDATE users_test_table +SET value_2 = 2 +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_test_table + WHERE + users_test_table.user_id > events_test_table.user_id AND + users_test_table.value_1 = events_test_table.value_1 + GROUP BY + user_id + ); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +UPDATE users_test_table +SET (value_1, value_2) = (2,1) +WHERE user_id IN + (SELECT user_id + FROM users_test_table + INTERSECT + SELECT user_id + FROM events_test_table); +ERROR: cannot push down this subquery +DETAIL: Intersect and Except are currently unsupported +-- Reference tables can not locate on the outer part of the outer join +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN + (SELECT DISTINCT e2.user_id + FROM users_reference_copy_table + LEFT JOIN users_test_table e2 ON (e2.user_id = users_reference_copy_table.value_1)) RETURNING *; +ERROR: cannot pushdown the subquery +DETAIL: There exist a reference table in the outer part of the outer join +-- Volatile functions are also not supported +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id * random(); +ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE +UPDATE users_test_table +SET value_2 = 5 * random() +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; +ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE +UPDATE users_test_table +SET value_2 = 5 +WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +-- Local tables are not supported +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table_local +WHERE users_test_table.user_id = events_test_table_local.user_id; +ERROR: relation events_test_table_local is not distributed +UPDATE users_test_table +SET value_2 = 5 +WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local); +ERROR: relation events_test_table_local is not distributed +UPDATE events_test_table_local +SET value_2 = 5 +FROM users_test_table +WHERE events_test_table_local.user_id = users_test_table.user_id; +ERROR: relation events_test_table_local is not distributed +-- Shard counts of tables must be equal to pushdown the query +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table_2 +WHERE users_test_table.user_id = events_test_table_2.user_id; +ERROR: cannot push down this subquery +DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning +-- Should error out due to multiple row return from subquery, but we can not get this information within +-- subquery pushdown planner. This query will be sent to worker with recursive planner. +DELETE FROM users_test_table +WHERE users_test_table.user_id = (SELECT user_id + FROM events_test_table); +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +-- Cursors are not supported +BEGIN; +DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; +FETCH test_cursor; + user_id | value_1 | value_2 | value_3 +---------+---------+---------+--------- + 8 | 4 | 13 | 0 +(1 row) + +UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor; +ERROR: cannot run DML queries with cursors +ROLLBACK; +-- Stable functions are supported SELECT * FROM test_table_1; id | date_col | col_3 ----+------------------------------+------- 1 | Sat Apr 05 08:32:12 2014 PDT | 5 - 3 | Wed Jan 12 08:35:19 2011 PST | 9 + 3 | Mon Jan 12 08:35:19 2111 PST | 9 2 | Sun Feb 01 08:31:16 2015 PST | 7 (3 rows) @@ -397,15 +788,16 @@ SELECT * FROM test_table_1; id | date_col | col_3 ----+------------------------------+------- 1 | Sat Apr 05 08:32:12 2014 PDT | 3 - 3 | Wed Jan 12 08:35:19 2011 PST | 3 + 3 | Mon Jan 12 08:35:19 2111 PST | 9 2 | Sun Feb 01 08:31:16 2015 PST | 3 (3 rows) DELETE FROM test_table_1 WHERE date_col < current_timestamp; SELECT * FROM test_table_1; - id | date_col | col_3 -----+----------+------- -(0 rows) + id | date_col | col_3 +----+------------------------------+------- + 3 | Mon Jan 12 08:35:19 2111 PST | 9 +(1 row) DROP TABLE test_table_1; -- Volatile functions are not supported diff --git a/src/test/regress/expected/with_modifying.out b/src/test/regress/expected/with_modifying.out index 39051a912..8c7bf4fc0 100644 --- a/src/test/regress/expected/with_modifying.out +++ b/src/test/regress/expected/with_modifying.out @@ -378,6 +378,8 @@ raw_data AS ( DELETE FROM modify_table WHERE id >= (SELECT min(id) FROM select_data WHERE id > 10) RETURNING * ) INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; +ERROR: cannot push down this subquery +DETAIL: Aggregates without group by are currently unsupported when a subquery references a column from another query INSERT INTO modify_table VALUES (21, 1), (22, 2), (23, 3); -- read ids from the same table WITH distinct_ids AS ( @@ -390,7 +392,7 @@ update_data AS ( SELECT count(*) FROM update_data; count ------- - 3 + 6 (1 row) -- read ids from a different table @@ -418,7 +420,7 @@ WITH update_data AS ( SELECT COUNT(*) FROM update_data; count ------- - 1 + 2 (1 row) WITH delete_rows AS ( @@ -427,10 +429,13 @@ WITH delete_rows AS ( SELECT * FROM delete_rows ORDER BY id, val; id | val ----+----- + 11 | 100 + 12 | 300 + 13 | 100 21 | 300 22 | 200 23 | 100 -(3 rows) +(6 rows) WITH delete_rows AS ( DELETE FROM summary_table WHERE id > 10 RETURNING * @@ -438,10 +443,7 @@ WITH delete_rows AS ( SELECT * FROM delete_rows ORDER BY id, counter; id | counter ----+--------- - 11 | 1 - 12 | 1 - 13 | 1 -(3 rows) +(0 rows) -- Check modifiying CTEs inside a transaction BEGIN; @@ -526,8 +528,11 @@ WITH deleted_rows AS ( DELETE FROM modify_table WHERE id IN (SELECT id FROM modify_table WHERE val = 4) RETURNING * ) SELECT * FROM deleted_rows; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. + id | val +----+----- + 2 | 4 +(1 row) + WITH select_rows AS ( SELECT id FROM modify_table WHERE val = 4 ), @@ -537,15 +542,13 @@ deleted_rows AS ( SELECT * FROM deleted_rows; id | val ----+----- - 2 | 4 -(1 row) +(0 rows) WITH deleted_rows AS ( DELETE FROM modify_table WHERE val IN (SELECT val FROM modify_table WHERE id = 3) RETURNING * ) SELECT * FROM deleted_rows; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator WITH select_rows AS ( SELECT val FROM modify_table WHERE id = 3 ), @@ -562,8 +565,7 @@ WITH deleted_rows AS ( DELETE FROM modify_table WHERE ctid IN (SELECT ctid FROM modify_table WHERE id = 1) RETURNING * ) SELECT * FROM deleted_rows; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator WITH select_rows AS ( SELECT ctid FROM modify_table WHERE id = 1 ), @@ -644,8 +646,7 @@ raw_data AS ( RETURNING id, val ) SELECT * FROM raw_data ORDER BY val; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Joins are not supported in distributed modifications. +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- Test with replication factor 2 SET citus.shard_replication_factor to 2; DROP TABLE modify_table; diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 220f24db6..51d9bee3b 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -25,6 +25,7 @@ test: isolation_create_restore_point test: isolation_create_distributed_table isolation_master_append_table isolation_master_apply_delete test: isolation_multi_shard_modify_vs_all +test: isolation_modify_with_subquery_vs_dml test: isolation_hash_copy_vs_all test: isolation_append_copy_vs_all test: isolation_range_copy_vs_all diff --git a/src/test/regress/specs/isolation_modify_with_subquery_vs_dml.spec b/src/test/regress/specs/isolation_modify_with_subquery_vs_dml.spec new file mode 100644 index 000000000..dbef78248 --- /dev/null +++ b/src/test/regress/specs/isolation_modify_with_subquery_vs_dml.spec @@ -0,0 +1,91 @@ +setup +{ + SET citus.shard_replication_factor to 2; + + CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int); + SELECT create_distributed_table('users_test_table', 'user_id'); + INSERT INTO users_test_table VALUES + (1, 5, 6, 7), + (2, 12, 7, 18), + (3, 23, 8, 25), + (4, 42, 9, 23), + (5, 35, 10, 17), + (6, 21, 11, 25), + (7, 27, 12, 18); + + CREATE TABLE events_test_table (user_id int, value_1 int, value_2 int, value_3 int); + SELECT create_distributed_table('events_test_table', 'user_id'); + INSERT INTO events_test_table VALUES + (1, 5, 7, 7), + (3, 11, 78, 18), + (5, 22, 9, 25), + (7, 41, 10, 23), + (1, 20, 12, 25), + (3, 26, 13, 18), + (5, 17, 14, 4); +} + +teardown +{ + DROP TABLE users_test_table; + DROP TABLE events_test_table; + SET citus.shard_replication_factor to 1; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-insert_to_events_test_table" +{ + INSERT INTO events_test_table VALUES(4,6,8,10); +} + +step "s1-update_events_test_table" +{ + UPDATE users_test_table SET value_1 = 3; +} + +step "s1-delete_events_test_table" +{ + DELETE FROM events_test_table WHERE user_id = 1 or user_id = 3; +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-modify_with_subquery_v1" +{ + UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; +} + +step "s2-modify_with_subquery_v2" +{ + UPDATE users_test_table SET value_1 = 3 WHERE user_id IN (SELECT user_id FROM events_test_table); +} + +step "s2-commit" +{ + COMMIT; +} + +# tests to check locks on subqueries are taken +permutation "s1-begin" "s2-begin" "s2-modify_with_subquery_v1" "s1-insert_to_events_test_table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-modify_with_subquery_v1" "s1-update_events_test_table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-modify_with_subquery_v1" "s1-delete_events_test_table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s1-insert_to_events_test_table" "s2-modify_with_subquery_v1" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-update_events_test_table" "s2-modify_with_subquery_v1" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-delete_events_test_table" "s2-modify_with_subquery_v1" "s1-commit" "s2-commit" + diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 01e82c56b..e526ec2de 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -392,9 +392,23 @@ EXPLAIN (COSTS FALSE) EXPLAIN (COSTS FALSE) DELETE FROM lineitem_hash_part; +SET citus.explain_all_tasks TO off; + +-- Test update with subquery +EXPLAIN (COSTS FALSE) + UPDATE lineitem_hash_part + SET l_suppkey = 12 + FROM orders_hash_part + WHERE orders_hash_part.o_orderkey = lineitem_hash_part.l_orderkey; + +-- Test delete with subquery +EXPLAIN (COSTS FALSE) + DELETE FROM lineitem_hash_part + USING orders_hash_part + WHERE orders_hash_part.o_orderkey = lineitem_hash_part.l_orderkey; + -- Test track tracker SET citus.task_executor_type TO 'task-tracker'; -SET citus.explain_all_tasks TO off; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; diff --git a/src/test/regress/sql/multi_shard_update_delete.sql b/src/test/regress/sql/multi_shard_update_delete.sql index a358f2120..5f829b666 100644 --- a/src/test/regress/sql/multi_shard_update_delete.sql +++ b/src/test/regress/sql/multi_shard_update_delete.sql @@ -135,16 +135,22 @@ INSERT INTO append_stage_table VALUES(1,3); INSERT INTO append_stage_table VALUES(3,2); INSERT INTO append_stage_table VALUES(5,4); +CREATE TABLE append_stage_table_2(id int, col_2 int); +INSERT INTO append_stage_table_2 VALUES(8,3); +INSERT INTO append_stage_table_2 VALUES(9,2); +INSERT INTO append_stage_table_2 VALUES(10,4); + CREATE TABLE test_append_table(id int, col_2 int); SELECT create_distributed_table('test_append_table','id','append'); SELECT master_create_empty_shard('test_append_table'); SELECT * FROM master_append_table_to_shard(1440010, 'append_stage_table', 'localhost', :master_port); SELECT master_create_empty_shard('test_append_table') AS new_shard_id; -SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table', 'localhost', :master_port); +SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table_2', 'localhost', :master_port); UPDATE test_append_table SET col_2 = 5; SELECT * FROM test_append_table; DROP TABLE append_stage_table; +DROP TABLE append_stage_table_2; DROP TABLE test_append_table; -- Update multi shard of partitioned distributed table @@ -203,15 +209,407 @@ SET citus.multi_shard_modify_mode to sequential; UPDATE tt2 SET col_2 = 3 RETURNING id, col_2; DROP TABLE tt2; --- Multiple RTEs are not supported +-- Multiple RTEs are only supported if subquery is pushdownable SET citus.multi_shard_modify_mode to DEFAULT; -UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; -UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table); -UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table); -DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id; -DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table); -DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table); +-- To test colocation between tables in modify query +SET citus.shard_count to 6; + +CREATE TABLE events_test_table_2 (user_id int, value_1 int, value_2 int, value_3 int); +SELECT create_distributed_table('events_test_table_2', 'user_id'); +\COPY events_test_table_2 FROM STDIN DELIMITER AS ','; +1, 5, 7, 7 +3, 11, 78, 18 +5, 22, 9, 25 +7, 41, 10, 23 +9, 34, 11, 21 +1, 20, 12, 25 +3, 26, 13, 18 +5, 17, 14, 4 +7, 37, 15, 22 +9, 42, 16, 22 +1, 60, 17, 17 +3, 5, 18, 8 +5, 15, 19, 44 +7, 24, 20, 38 +9, 54, 21, 17 +\. + +CREATE TABLE events_test_table_local (user_id int, value_1 int, value_2 int, value_3 int); +\COPY events_test_table_local FROM STDIN DELIMITER AS ','; +1, 5, 7, 7 +3, 11, 78, 18 +5, 22, 9, 25 +7, 41, 10, 23 +9, 34, 11, 21 +1, 20, 12, 25 +3, 26, 13, 18 +5, 17, 14, 4 +7, 37, 15, 22 +9, 42, 16, 22 +1, 60, 17, 17 +3, 5, 18, 8 +5, 15, 19, 44 +7, 24, 20, 38 +9, 54, 21, 17 +\. + +CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int); +INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5); +INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7); +INSERT INTO test_table_1 VALUES(3, '2111-01-12 08:35:19', 9); +SELECT create_distributed_table('test_table_1', 'id'); + +-- We can pushdown query if there is partition key equality +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; + +DELETE FROM users_test_table +USING events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; + +UPDATE users_test_table +SET value_1 = 3 +WHERE user_id IN (SELECT user_id + FROM events_test_table); + +DELETE FROM users_test_table +WHERE user_id IN (SELECT user_id + FROM events_test_table); + +DELETE FROM events_test_table_2 +WHERE now() > (SELECT max(date_col) + FROM test_table_1 + WHERE test_table_1.id = events_test_table_2.user_id + GROUP BY id) +RETURNING *; + +UPDATE users_test_table +SET value_1 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id + AND events_test_table.user_id > 5; + +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + UNION + SELECT user_id + FROM events_test_table); + +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + UNION + SELECT user_id + FROM events_test_table) returning *; + +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + UNION ALL + SELECT user_id + FROM events_test_table) returning *; + +UPDATE users_test_table +SET value_1 = 5 +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_test_table + WHERE + users_test_table.user_id = events_test_table.user_id + GROUP BY + user_id + ); + +UPDATE users_test_table +SET value_3 = 1 +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_test_table + WHERE + users_test_table.user_id = events_test_table.user_id AND + users_test_table.value_2 > events_test_table.value_2 + GROUP BY + user_id + ); + +UPDATE users_test_table +SET value_2 = 4 +WHERE + value_1 > 1 AND value_1 < 3 + AND value_2 >= 1 + AND user_id IN + ( + SELECT + e1.user_id + FROM ( + SELECT + user_id, + 1 AS view_homepage + FROM events_test_table + WHERE + value_1 IN (0, 1) + ) e1 LEFT JOIN LATERAL ( + SELECT + user_id, + 1 AS use_demo + FROM events_test_table + WHERE + user_id = e1.user_id + ) e2 ON true +); + +UPDATE users_test_table +SET value_3 = 5 +WHERE value_2 IN (SELECT AVG(value_1) OVER (PARTITION BY user_id) FROM events_test_table WHERE events_test_table.user_id = users_test_table.user_id); + +-- Test it within transaction +BEGIN; + +INSERT INTO users_test_table +SELECT * FROM events_test_table +WHERE events_test_table.user_id = 1 OR events_test_table.user_id = 5; + +SELECT SUM(value_2) FROM users_test_table; + +UPDATE users_test_table +SET value_2 = 1 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; + +SELECT SUM(value_2) FROM users_test_table; + +COMMIT; + +-- Test with schema +CREATE SCHEMA sec_schema; +CREATE TABLE sec_schema.tt1(id int, value_1 int); +SELECT create_distributed_table('sec_schema.tt1','id'); +INSERT INTO sec_schema.tt1 values(1,1),(2,2),(7,7),(9,9); + +UPDATE sec_schema.tt1 +SET value_1 = 11 +WHERE id < (SELECT max(value_2) FROM events_test_table_2 + WHERE sec_schema.tt1.id = events_test_table_2.user_id + GROUP BY user_id) +RETURNING *; + +DROP SCHEMA sec_schema CASCADE; + +-- We don't need partition key equality with reference tables +UPDATE events_test_table +SET value_2 = 5 +FROM users_reference_copy_table +WHERE users_reference_copy_table.user_id = events_test_table.value_1; + +-- Both reference tables and hash distributed tables can be used in subquery +UPDATE events_test_table as ett +SET value_2 = 6 +WHERE ett.value_3 IN (SELECT utt.value_3 + FROM users_test_table as utt, users_reference_copy_table as uct + WHERE utt.user_id = uct.user_id AND utt.user_id = ett.user_id); + +-- We don't need equality check with constant values in sub-select +UPDATE users_reference_copy_table +SET value_2 = 6 +WHERE user_id IN (SELECT 2); + +UPDATE users_reference_copy_table +SET value_2 = 6 +WHERE value_1 IN (SELECT 2); + +UPDATE users_test_table +SET value_2 = 6 +WHERE user_id IN (SELECT 2); + +UPDATE users_test_table +SET value_2 = 6 +WHERE value_1 IN (SELECT 2); + +-- Can only use immutable functions +UPDATE test_table_1 +SET col_3 = 6 +WHERE date_col IN (SELECT now()); + +-- Test with prepared statements +SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0; +PREPARE foo_plan_2(int,int) AS UPDATE users_test_table + SET value_1 = $1, value_3 = $2 + FROM events_test_table + WHERE users_test_table.user_id = events_test_table.user_id; + +EXECUTE foo_plan_2(1,5); +EXECUTE foo_plan_2(3,15); +EXECUTE foo_plan_2(5,25); +EXECUTE foo_plan_2(7,35); +EXECUTE foo_plan_2(9,45); +EXECUTE foo_plan_2(0,0); +SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0; + +-- Test with varying WHERE expressions +UPDATE users_test_table +SET value_1 = 7 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id OR FALSE; + +UPDATE users_test_table +SET value_1 = 7 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id AND TRUE; + +-- Test with inactive shard-placement +-- manually set shardstate of one placement of users_test_table as inactive +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440000; +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; + +-- manually set shardstate of one placement of events_test_table as inactive +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440004; +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; + +UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440000; +UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440004; + +-- Subquery must return single value to use it with comparison operators +UPDATE users_test_table as utt +SET value_1 = 3 +WHERE value_2 > (SELECT value_3 FROM events_test_table as ett WHERE utt.user_id = ett.user_id); + +-- We can not pushdown a query if the target relation is reference table +UPDATE users_reference_copy_table +SET value_2 = 5 +FROM events_test_table +WHERE users_reference_copy_table.user_id = events_test_table.user_id; + +-- We cannot push down it if the query has outer join and using +UPDATE events_test_table +SET value_2 = users_test_table.user_id +FROM users_test_table +FULL OUTER JOIN events_test_table e2 USING (user_id) +WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2; + +-- We can not pushdown query if there is no partition key equality +UPDATE users_test_table +SET value_1 = 1 +WHERE user_id IN (SELECT Count(value_1) + FROM events_test_table + GROUP BY user_id); + +UPDATE users_test_table +SET value_1 = (SELECT Count(*) + FROM events_test_table); + +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + UNION + SELECT value_1 + FROM events_test_table); + +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN (SELECT user_id + FROM users_test_table + INTERSECT + SELECT Sum(value_1) + FROM events_test_table + GROUP BY user_id); + +UPDATE users_test_table +SET value_2 = (SELECT value_3 + FROM users_test_table); + +UPDATE users_test_table +SET value_2 = 2 +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_test_table + WHERE + users_test_table.user_id > events_test_table.user_id AND + users_test_table.value_1 = events_test_table.value_1 + GROUP BY + user_id + ); + +UPDATE users_test_table +SET (value_1, value_2) = (2,1) +WHERE user_id IN + (SELECT user_id + FROM users_test_table + INTERSECT + SELECT user_id + FROM events_test_table); + +-- Reference tables can not locate on the outer part of the outer join +UPDATE users_test_table +SET value_1 = 4 +WHERE user_id IN + (SELECT DISTINCT e2.user_id + FROM users_reference_copy_table + LEFT JOIN users_test_table e2 ON (e2.user_id = users_reference_copy_table.value_1)) RETURNING *; + +-- Volatile functions are also not supported +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id * random(); + +UPDATE users_test_table +SET value_2 = 5 * random() +FROM events_test_table +WHERE users_test_table.user_id = events_test_table.user_id; + +UPDATE users_test_table +SET value_2 = 5 +WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table); + +-- Local tables are not supported +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table_local +WHERE users_test_table.user_id = events_test_table_local.user_id; + +UPDATE users_test_table +SET value_2 = 5 +WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local); + +UPDATE events_test_table_local +SET value_2 = 5 +FROM users_test_table +WHERE events_test_table_local.user_id = users_test_table.user_id; + +-- Shard counts of tables must be equal to pushdown the query +UPDATE users_test_table +SET value_2 = 5 +FROM events_test_table_2 +WHERE users_test_table.user_id = events_test_table_2.user_id; + +-- Should error out due to multiple row return from subquery, but we can not get this information within +-- subquery pushdown planner. This query will be sent to worker with recursive planner. +DELETE FROM users_test_table +WHERE users_test_table.user_id = (SELECT user_id + FROM events_test_table); -- Cursors are not supported BEGIN; @@ -221,12 +619,6 @@ UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor; ROLLBACK; -- Stable functions are supported -CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int); -INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5); -INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7); -INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9); -SELECT create_distributed_table('test_table_1', 'id'); - SELECT * FROM test_table_1; UPDATE test_table_1 SET col_3 = 3 WHERE date_col < now(); SELECT * FROM test_table_1; diff --git a/src/test/regress/sql/with_modifying.sql b/src/test/regress/sql/with_modifying.sql index 4f06d2d5a..f7fb58ef2 100644 --- a/src/test/regress/sql/with_modifying.sql +++ b/src/test/regress/sql/with_modifying.sql @@ -221,6 +221,7 @@ SELECT * FROM modify_table ORDER BY id, val; SELECT * FROM anchor_table ORDER BY id; INSERT INTO modify_table VALUES (11, 1), (12, 2), (13, 3); + WITH select_data AS ( SELECT * FROM modify_table ),