diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index a9c9394bc..2f8eeb9b4 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -1469,11 +1469,10 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, else { columnArray[columnIndex] = PQgetvalue(result, rowIndex, columnIndex); - if (SubPlanLevel > 0) + if (SubPlanLevel > 0 && executionStats) { - executionStats->totalIntermediateResultSize += PQgetlength(result, - rowIndex, - columnIndex); + int rowLength = PQgetlength(result, rowIndex, columnIndex); + executionStats->totalIntermediateResultSize += rowLength; } } } diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 62f594701..077f54827 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -150,7 +150,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) if (modifyQuery->commandType != CMD_UTILITY) { bool multiShardQuery = true; - DeferredErrorMessage *error = ModifyQuerySupported(modifyQuery, multiShardQuery); + DeferredErrorMessage *error = + ModifyQuerySupported(modifyQuery, modifyQuery, multiShardQuery); if (error) { diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 08230c87d..716b7b543 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -110,6 +110,7 @@ static DistributedPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, RelationRestrictionContext * restrictionContext); +static bool IsTidColumn(Node *node); static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce); static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); @@ -191,7 +192,8 @@ CreateModifyPlan(Query *originalQuery, Query *query, distributedPlan->operation = query->commandType; - distributedPlan->planningError = ModifyQuerySupported(query, multiShardQuery); + distributedPlan->planningError = ModifyQuerySupported(query, originalQuery, + multiShardQuery); if (distributedPlan->planningError != NULL) { return distributedPlan; @@ -488,12 +490,37 @@ ExtractInsertRangeTableEntry(Query *query) } +/* + * IsTidColumn gets a node and returns true if the node is a Var type of TID. + */ +static bool +IsTidColumn(Node *node) +{ + if (IsA(node, Var)) + { + Var *column = (Var *) node; + if (column->vartype == TIDOID) + { + return true; + } + } + + return false; +} + + /* * ModifyQuerySupported returns NULL if the query only contains supported * features, otherwise it returns an error description. + * Note that we need both the original query and the modified one because + * different checks need different versions. In particular, we cannot + * perform the ContainsReadIntermediateResultFunction check on the + * rewritten query because it may have been replaced by a subplan, + * while some of the checks for setting the partition column value rely + * on the rewritten query. */ DeferredErrorMessage * -ModifyQuerySupported(Query *queryTree, bool multiShardQuery) +ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuery) { Oid distributedTableId = ExtractFirstDistributedTableId(queryTree); uint32 rangeTableId = 1; @@ -507,9 +534,28 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery) List *onConflictSet = NIL; Node *arbiterWhere = NULL; Node *onConflictWhere = NULL; - CmdType commandType = queryTree->commandType; + /* + * Here, we check if a recursively planned query tries to modify + * rows based on the ctid column. This is a bad idea because ctid of + * the rows could be changed before the modification part of + * the query is executed. + */ + if (ContainsReadIntermediateResultFunction((Node *) originalQuery)) + { + bool hasTidColumn = FindNodeCheck((Node *) originalQuery->jointree, IsTidColumn); + if (hasTidColumn) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed planning for the given " + "modification", + "Recursively planned distributed modifications " + "with ctid on where clause are not supported.", + NULL); + } + } + /* * Reject subqueries which are in SELECT or WHERE clause. * Queries which include subqueries in FROM clauses are rejected below. @@ -520,7 +566,7 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery) * We support UPDATE and DELETE with subqueries unless they are multi * shard queries. */ - if (!UpdateOrDeleteQuery(queryTree) || multiShardQuery) + if (!UpdateOrDeleteQuery(queryTree)) { StringInfo errorHint = makeStringInfo(); DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry( @@ -604,7 +650,7 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery) * We support UPDATE and DELETE with subqueries and joins unless * they are multi shard queries. */ - if (UpdateOrDeleteQuery(queryTree) && !multiShardQuery) + if (UpdateOrDeleteQuery(queryTree)) { continue; } @@ -707,7 +753,7 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery) } if (commandType == CMD_UPDATE && - contain_volatile_functions((Node *) targetEntry->expr)) + FindNodeCheck((Node *) targetEntry->expr, CitusIsVolatileFunction)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "functions used in UPDATE queries on distributed " @@ -732,7 +778,7 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery) if (joinTree != NULL) { - if (contain_volatile_functions(joinTree->quals)) + if (FindNodeCheck((Node *) joinTree->quals, CitusIsVolatileFunction)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "functions used in the WHERE clause of modification " @@ -982,9 +1028,6 @@ MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state) * executor! */ - /* subqueries aren't allowed and should fail before control reaches this point */ - Assert(!IsA(expression, Query)); - hasVolatileFunction = check_functions_in_node(expression, MasterIrreducibleExpressionFunctionChecker, &volatileFlag); @@ -1668,7 +1711,8 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon Assert(UpdateOrDeleteQuery(originalQuery)); - planningError = ModifyQuerySupported(originalQuery, isMultiShardQuery); + planningError = ModifyQuerySupported(originalQuery, originalQuery, + isMultiShardQuery); if (planningError != NULL) { return planningError; diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 583ef3489..2029c8566 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -667,15 +667,6 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext) return NULL; } - if (query->hasModifyingCTE) - { - /* we could easily support these, but it's a little scary */ - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "data-modifying statements are not supported in " - "the WITH clauses of distributed queries", - NULL, NULL); - } - if (query->hasRecursive) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, @@ -1335,8 +1326,18 @@ BuildSubPlanResultQuery(Query *subquery, List *columnAliasList, uint64 planId, Oid copyFormatId = BinaryCopyFormatId(); int columnAliasCount = list_length(columnAliasList); + List *targetEntryList = NIL; + if (subquery->returningList) + { + targetEntryList = subquery->returningList; + } + else + { + targetEntryList = subquery->targetList; + } + /* build the target list and column definition list */ - foreach(targetEntryCell, subquery->targetList) + foreach(targetEntryCell, targetEntryList) { TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); Node *targetExpr = (Node *) targetEntry->expr; diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 79e5268b1..3cc3cc9a8 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -10,8 +10,10 @@ #include "distributed/citus_clauses.h" #include "distributed/insert_select_planner.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_router_planner.h" +#include "catalog/pg_proc.h" #include "catalog/pg_type.h" #include "executor/executor.h" #include "nodes/makefuncs.h" @@ -38,6 +40,8 @@ static Node * PartiallyEvaluateExpressionMutator(Node *expression, FunctionEvaluationContext *context); static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod, Oid result_collation, PlanState *planState); +static bool CitusIsVolatileFunctionIdChecker(Oid func_id, void *context); +static bool CitusIsMutableFunctionIdChecker(Oid func_id, void *context); /* @@ -54,7 +58,7 @@ RequiresMasterEvaluation(Query *query) { TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - if (contain_mutable_functions((Node *) targetEntry->expr)) + if (FindNodeCheck((Node *) targetEntry->expr, CitusIsMutableFunction)) { return true; } @@ -73,7 +77,7 @@ RequiresMasterEvaluation(Query *query) } else if (rte->rtekind == RTE_VALUES) { - if (contain_mutable_functions((Node *) rte->values_lists)) + if (FindNodeCheck((Node *) rte->values_lists, CitusIsMutableFunction)) { return true; } @@ -92,7 +96,10 @@ RequiresMasterEvaluation(Query *query) if (query->jointree && query->jointree->quals) { - return contain_mutable_functions((Node *) query->jointree->quals); + if (FindNodeCheck((Node *) query->jointree->quals, CitusIsMutableFunction)) + { + return true; + } } return false; @@ -392,4 +399,95 @@ citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod, resultTypByVal); } + +/* + * CitusIsVolatileFunctionIdChecker checks if the given function id is + * a volatile function other than read_intermediate_result(). + */ +static bool +CitusIsVolatileFunctionIdChecker(Oid func_id, void *context) +{ + if (func_id == CitusReadIntermediateResultFuncId()) + { + return false; + } + + return (func_volatile(func_id) == PROVOLATILE_VOLATILE); +} + + +/* + * CitusIsVolatileFunction checks if the given node is a volatile function + * other than Citus's internal functions. + */ +bool +CitusIsVolatileFunction(Node *node) +{ + /* Check for volatile functions in node itself */ + if (check_functions_in_node(node, CitusIsVolatileFunctionIdChecker, NULL)) + { + return true; + } + +#if (PG_VERSION_NUM >= 100000) + if (IsA(node, NextValueExpr)) + { + /* NextValueExpr is volatile */ + return true; + } +#endif + + return false; +} + + +/* + * CitusIsMutableFunctionIdChecker checks if the given function id is + * a mutable function other than read_intermediate_result(). + */ +static bool +CitusIsMutableFunctionIdChecker(Oid func_id, void *context) +{ + if (func_id == CitusReadIntermediateResultFuncId()) + { + return false; + } + else + { + return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE); + } +} + + +/* + * CitusIsMutableFunction checks if the given node is a mutable function + * other than Citus's internal functions. + */ +bool +CitusIsMutableFunction(Node *node) +{ + /* Check for mutable functions in node itself */ + if (check_functions_in_node(node, CitusIsMutableFunctionIdChecker, NULL)) + { + return true; + } + +#if (PG_VERSION_NUM >= 100000) + if (IsA(node, SQLValueFunction)) + { + /* all variants of SQLValueFunction are stable */ + return true; + } + + if (IsA(node, NextValueExpr)) + { + /* NextValueExpr is volatile */ + return true; + } +#endif + + return false; +} + + /* *INDENT-ON* */ diff --git a/src/include/distributed/citus_clauses.h b/src/include/distributed/citus_clauses.h index 4bd619000..91c780a38 100644 --- a/src/include/distributed/citus_clauses.h +++ b/src/include/distributed/citus_clauses.h @@ -18,5 +18,7 @@ extern bool RequiresMasterEvaluation(Query *query); extern void ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState); extern Node * PartiallyEvaluateExpression(Node *expression, PlanState *planState); +extern bool CitusIsVolatileFunction(Node *node); +extern bool CitusIsMutableFunction(Node *node); #endif /* CITUS_CLAUSES_H */ diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 9fac464dd..cf5cfa650 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -224,6 +224,7 @@ extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList); extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList); extern List * pull_var_clause_default(Node *node); extern bool OperatorImplementsEquality(Oid opno); +extern bool FindNodeCheck(Node *node, bool (*check)(Node *)); #endif /* MULTI_LOGICAL_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 153fdcaf0..d97cea3d5 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -40,7 +40,7 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, bool *multiShardModifyQuery); extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError); extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); -extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, +extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuery); extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex); extern RelationRestrictionContext * CopyRelationRestrictionContext( diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index f2792ddf7..57518532b 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -964,12 +964,12 @@ SELECT * FROM summary_table ORDER BY id; -- unsupported multi-shard updates UPDATE summary_table SET average_value = average_query.average FROM ( SELECT avg(value) AS average FROM raw_table) average_query; -ERROR: subqueries are not supported in modifications across multiple shards -DETAIL: Consider using an equality filter on partition column "id" to target a single shard. +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. UPDATE summary_table SET average_value = average_value + 1 WHERE id = (SELECT id FROM raw_table WHERE value > 100); -ERROR: subqueries are not supported in modifications across multiple shards -DETAIL: Consider using an equality filter on partition column "id" to target a single shard. +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. -- test complex queries UPDATE summary_table SET @@ -1108,8 +1108,8 @@ SELECT master_modify_multiple_shards(' SELECT avg(value) AS average FROM raw_table WHERE id = 1 ) average_query WHERE id = 1'); -ERROR: subqueries are not supported in modifications across multiple shards -DETAIL: Consider using an equality filter on partition column "id" to target a single shard. +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. -- test connection API via using COPY -- COPY on SELECT part BEGIN; diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index 3f7215f17..7e8b40d8f 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -428,13 +428,55 @@ WITH RECURSIVE hierarchy as ( ce.company_id = 2)) SELECT * FROM hierarchy WHERE LEVEL <= 2; ERROR: recursive CTEs are not supported in distributed queries --- CTE with queries other than SELECT is not supported +-- Test router modifying CTEs WITH new_article AS ( - INSERT INTO articles_hash VALUES (1, 1, 'arsenous', 9572) RETURNING * + INSERT INTO articles_hash VALUES (1, 1, 'arsenous', 9) RETURNING * ) SELECT * FROM new_article; DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries -ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries +DEBUG: generating subplan 81_1 for CTE new_article: INSERT INTO public.articles_hash (id, author_id, title, word_count) VALUES (1, 1, 'arsenous'::character varying, 9) RETURNING id, author_id, title, word_count +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: Plan 81 query after replacing subqueries and CTEs: SELECT id, author_id, title, word_count FROM (SELECT intermediate_result.id, intermediate_result.author_id, intermediate_result.title, intermediate_result.word_count FROM read_intermediate_result('81_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, author_id bigint, title character varying(20), word_count integer)) new_article +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | author_id | title | word_count +----+-----------+----------+------------ + 1 | 1 | arsenous | 9 +(1 row) + +WITH update_article AS ( + UPDATE articles_hash SET word_count = 10 WHERE id = 1 AND word_count = 9 RETURNING * +) +SELECT * FROM update_article; +DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries +DEBUG: generating subplan 83_1 for CTE update_article: UPDATE public.articles_hash SET word_count = 10 WHERE ((id = 1) AND (word_count = 9)) RETURNING id, author_id, title, word_count +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: Plan 83 query after replacing subqueries and CTEs: SELECT id, author_id, title, word_count FROM (SELECT intermediate_result.id, intermediate_result.author_id, intermediate_result.title, intermediate_result.word_count FROM read_intermediate_result('83_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, author_id bigint, title character varying(20), word_count integer)) update_article +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | author_id | title | word_count +----+-----------+----------+------------ + 1 | 1 | arsenous | 10 +(1 row) + +WITH delete_article AS ( + DELETE FROM articles_hash WHERE id = 1 AND word_count = 10 RETURNING * +) +SELECT * FROM delete_article; +DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries +DEBUG: generating subplan 85_1 for CTE delete_article: DELETE FROM public.articles_hash WHERE ((id = 1) AND (word_count = 10)) RETURNING id, author_id, title, word_count +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: Plan 85 query after replacing subqueries and CTEs: SELECT id, author_id, title, word_count FROM (SELECT intermediate_result.id, intermediate_result.author_id, intermediate_result.title, intermediate_result.word_count FROM read_intermediate_result('85_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, author_id bigint, title character varying(20), word_count integer)) delete_article +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | author_id | title | word_count +----+-----------+----------+------------ + 1 | 1 | arsenous | 10 +(1 row) + -- Modifying statement in nested CTE case is covered by PostgreSQL itself WITH new_article AS ( WITH nested_cte AS ( @@ -557,8 +599,8 @@ DEBUG: Plan is router executable SELECT articles_hash.id,test.word_count FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE test.id = articles_hash.id ORDER BY test.word_count DESC, articles_hash.id LIMIT 5; -DEBUG: generating subplan 88_1 for subquery SELECT id, word_count FROM public.articles_hash -DEBUG: Plan 88 query after replacing subqueries and CTEs: SELECT articles_hash.id, test.word_count FROM public.articles_hash, (SELECT intermediate_result.id, intermediate_result.word_count FROM read_intermediate_result('88_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, word_count integer)) test WHERE (test.id = articles_hash.id) ORDER BY test.word_count DESC, articles_hash.id LIMIT 5 +DEBUG: generating subplan 93_1 for subquery SELECT id, word_count FROM public.articles_hash +DEBUG: Plan 93 query after replacing subqueries and CTEs: SELECT articles_hash.id, test.word_count FROM public.articles_hash, (SELECT intermediate_result.id, intermediate_result.word_count FROM read_intermediate_result('93_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, word_count integer)) test WHERE (test.id = articles_hash.id) ORDER BY test.word_count DESC, articles_hash.id LIMIT 5 DEBUG: push down of limit count: 5 id | word_count ----+------------ @@ -573,8 +615,8 @@ SELECT articles_hash.id,test.word_count FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE test.id = articles_hash.id and articles_hash.author_id = 1 ORDER BY articles_hash.id; -DEBUG: generating subplan 90_1 for subquery SELECT id, word_count FROM public.articles_hash -DEBUG: Plan 90 query after replacing subqueries and CTEs: SELECT articles_hash.id, test.word_count FROM public.articles_hash, (SELECT intermediate_result.id, intermediate_result.word_count FROM read_intermediate_result('90_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, word_count integer)) test WHERE ((test.id = articles_hash.id) AND (articles_hash.author_id = 1)) ORDER BY articles_hash.id +DEBUG: generating subplan 95_1 for subquery SELECT id, word_count FROM public.articles_hash +DEBUG: Plan 95 query after replacing subqueries and CTEs: SELECT articles_hash.id, test.word_count FROM public.articles_hash, (SELECT intermediate_result.id, intermediate_result.word_count FROM read_intermediate_result('95_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, word_count integer)) test WHERE ((test.id = articles_hash.id) AND (articles_hash.author_id = 1)) ORDER BY articles_hash.id DEBUG: Creating router plan DEBUG: Plan is router executable id | word_count @@ -690,10 +732,10 @@ SELECT a.author_id as first_author, b.word_count as second_word_count WHERE a.author_id = 2 and a.author_id = b.author_id LIMIT 3; DEBUG: Found no worker with all shard placements -DEBUG: generating subplan 99_1 for CTE single_shard: SELECT id, author_id, title, word_count FROM public.articles_single_shard_hash +DEBUG: generating subplan 104_1 for CTE single_shard: SELECT id, author_id, title, word_count FROM public.articles_single_shard_hash DEBUG: Creating router plan DEBUG: Plan is router executable -DEBUG: Plan 99 query after replacing subqueries and CTEs: SELECT a.author_id AS first_author, b.word_count AS second_word_count FROM public.articles_hash a, (SELECT intermediate_result.id, intermediate_result.author_id, intermediate_result.title, intermediate_result.word_count FROM read_intermediate_result('99_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, author_id bigint, title character varying(20), word_count integer)) b WHERE ((a.author_id = 2) AND (a.author_id = b.author_id)) LIMIT 3 +DEBUG: Plan 104 query after replacing subqueries and CTEs: SELECT a.author_id AS first_author, b.word_count AS second_word_count FROM public.articles_hash a, (SELECT intermediate_result.id, intermediate_result.author_id, intermediate_result.title, intermediate_result.word_count FROM read_intermediate_result('104_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, author_id bigint, title character varying(20), word_count integer)) b WHERE ((a.author_id = 2) AND (a.author_id = b.author_id)) LIMIT 3 DEBUG: Creating router plan DEBUG: Plan is router executable first_author | second_word_count diff --git a/src/test/regress/expected/multi_shard_update_delete.out b/src/test/regress/expected/multi_shard_update_delete.out index 7bea9cf21..033da858e 100644 --- a/src/test/regress/expected/multi_shard_update_delete.out +++ b/src/test/regress/expected/multi_shard_update_delete.out @@ -323,20 +323,20 @@ UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_ ERROR: cannot perform distributed planning for the given modification DETAIL: Joins are not supported in distributed modifications. UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table); -ERROR: subqueries are not supported in modifications across multiple shards -DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table); -ERROR: subqueries are not supported in modifications across multiple shards -DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id; ERROR: cannot perform distributed planning for the given modification DETAIL: Joins are not supported in distributed modifications. DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table); -ERROR: subqueries are not supported in modifications across multiple shards -DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table); -ERROR: subqueries are not supported in modifications across multiple shards -DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. -- Cursors are not supported BEGIN; DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; diff --git a/src/test/regress/expected/multi_shard_update_delete_0.out b/src/test/regress/expected/multi_shard_update_delete_0.out index 0a7892bc5..51491571c 100644 --- a/src/test/regress/expected/multi_shard_update_delete_0.out +++ b/src/test/regress/expected/multi_shard_update_delete_0.out @@ -346,20 +346,20 @@ UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_ ERROR: cannot perform distributed planning for the given modification DETAIL: Joins are not supported in distributed modifications. UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table); -ERROR: subqueries are not supported in modifications across multiple shards -DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table); -ERROR: subqueries are not supported in modifications across multiple shards -DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id; ERROR: cannot perform distributed planning for the given modification DETAIL: Joins are not supported in distributed modifications. DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table); -ERROR: subqueries are not supported in modifications across multiple shards -DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table); -ERROR: subqueries are not supported in modifications across multiple shards -DETAIL: Consider using an equality filter on partition column "user_id" to target a single shard. +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. -- Cursors are not supported BEGIN; DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; diff --git a/src/test/regress/expected/with_join.out b/src/test/regress/expected/with_join.out index 6d691822a..f635ba039 100644 --- a/src/test/regress/expected/with_join.out +++ b/src/test/regress/expected/with_join.out @@ -38,12 +38,12 @@ ORDER BY 2 DESC, 1; user_id | count ---------+------- - 3 | 26792 - 4 | 25024 - 5 | 22724 - 2 | 22554 - 6 | 5720 - 1 | 5593 + 3 | 30168 + 4 | 27768 + 2 | 25327 + 5 | 25083 + 1 | 6776 + 6 | 6710 (6 rows) -- Two non-colocated CTE under a co-located join @@ -74,11 +74,11 @@ ORDER BY 2 DESC, 1; user_id | count ---------+------- - 2 | 60588 - 4 | 21160 - 3 | 13005 - 5 | 10140 - 1 | 4802 + 2 | 67507 + 4 | 23040 + 3 | 14580 + 5 | 10935 + 1 | 6272 (5 rows) diff --git a/src/test/regress/expected/with_modifying.out b/src/test/regress/expected/with_modifying.out index 61a2f6371..39051a912 100644 --- a/src/test/regress/expected/with_modifying.out +++ b/src/test/regress/expected/with_modifying.out @@ -17,6 +17,20 @@ SELECT create_distributed_table('with_modifying.users_table', 'user_id'); (1 row) INSERT INTO with_modifying.users_table SELECT * FROM public.users_table; +CREATE TABLE with_modifying.summary_table (id int, counter int); +SELECT create_distributed_table('summary_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE with_modifying.anchor_table (id int); +SELECT create_reference_table('anchor_table'); + create_reference_table +------------------------ + +(1 row) + -- basic insert query in CTE WITH basic_insert AS ( INSERT INTO users_table VALUES (1), (2), (3) RETURNING * @@ -24,17 +38,41 @@ WITH basic_insert AS ( SELECT * FROM - basic_insert; -ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries + basic_insert +ORDER BY + user_id; + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+------+---------+---------+---------+--------- + 1 | | | | | + 2 | | | | | + 3 | | | | | +(3 rows) + -- single-shard UPDATE in CTE WITH basic_update AS ( - UPDATE users_table SET value_3=42 WHERE user_id=0 RETURNING * + UPDATE users_table SET value_3=41 WHERE user_id=1 RETURNING * ) SELECT * FROM - basic_update; -ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries + basic_update +ORDER BY + user_id, + time +LIMIT 10; + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+---------------------------------+---------+---------+---------+--------- + 1 | Wed Nov 22 22:51:43.132261 2017 | 4 | 0 | 41 | + 1 | Thu Nov 23 03:32:50.803031 2017 | 3 | 2 | 41 | + 1 | Thu Nov 23 09:26:42.145043 2017 | 1 | 3 | 41 | + 1 | Thu Nov 23 11:11:24.40789 2017 | 3 | 4 | 41 | + 1 | Thu Nov 23 11:44:57.515981 2017 | 4 | 3 | 41 | + 1 | Thu Nov 23 17:23:03.441394 2017 | 5 | 4 | 41 | + 1 | Thu Nov 23 17:30:34.635085 2017 | 3 | 4 | 41 | + 1 | | | | 41 | + 1 | | | | 41 | +(9 rows) + -- multi-shard UPDATE in CTE WITH basic_update AS ( UPDATE users_table SET value_3=42 WHERE value_2=1 RETURNING * @@ -42,35 +80,102 @@ WITH basic_update AS ( SELECT * FROM - basic_update; -ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries + basic_update +ORDER BY + user_id, + time +LIMIT 10; + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+---------------------------------+---------+---------+---------+--------- + 2 | Thu Nov 23 06:50:30.797805 2017 | 1 | 1 | 42 | + 2 | Thu Nov 23 06:56:38.46819 2017 | 0 | 1 | 42 | + 2 | Thu Nov 23 13:52:54.83829 2017 | 3 | 1 | 42 | + 3 | Wed Nov 22 18:43:51.450263 2017 | 1 | 1 | 42 | + 3 | Thu Nov 23 00:15:45.610845 2017 | 1 | 1 | 42 | + 4 | Wed Nov 22 23:59:46.493416 2017 | 3 | 1 | 42 | + 4 | Thu Nov 23 01:55:21.824618 2017 | 3 | 1 | 42 | + 4 | Thu Nov 23 06:50:08.101207 2017 | 2 | 1 | 42 | + 4 | Thu Nov 23 07:09:37.382372 2017 | 4 | 1 | 42 | + 4 | Thu Nov 23 08:38:45.877401 2017 | 4 | 1 | 42 | +(10 rows) + -- single-shard DELETE in CTE WITH basic_delete AS ( - DELETE FROM users_table WHERE user_id=42 RETURNING * + DELETE FROM users_table WHERE user_id=6 RETURNING * ) SELECT * FROM - basic_delete; -ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries + basic_delete +ORDER BY + user_id, + time +LIMIT 10; + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+---------------------------------+---------+---------+---------+--------- + 6 | Wed Nov 22 20:15:53.317797 2017 | 1 | 1 | 42 | + 6 | Wed Nov 22 23:01:24.82289 2017 | 2 | 4 | 1 | + 6 | Thu Nov 23 00:07:11.068353 2017 | 1 | 1 | 42 | + 6 | Thu Nov 23 00:09:44.19812 2017 | 5 | 2 | 0 | + 6 | Thu Nov 23 01:13:50.526322 2017 | 2 | 4 | 1 | + 6 | Thu Nov 23 01:14:55.769581 2017 | 0 | 0 | 5 | + 6 | Thu Nov 23 10:22:11.02918 2017 | 5 | 0 | 5 | + 6 | Thu Nov 23 11:08:04.244582 2017 | 2 | 3 | 2 | + 6 | Thu Nov 23 13:51:16.92838 2017 | 0 | 4 | 2 | + 6 | Thu Nov 23 14:43:18.024104 2017 | 3 | 2 | 5 | +(10 rows) + -- multi-shard DELETE in CTE WITH basic_delete AS ( - DELETE FROM users_table WHERE value_2=42 RETURNING * + DELETE FROM users_table WHERE value_3=41 RETURNING * ) SELECT * FROM - basic_delete; -ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries + basic_delete +ORDER BY + user_id, + time +LIMIT 10; + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+---------------------------------+---------+---------+---------+--------- + 1 | Wed Nov 22 22:51:43.132261 2017 | 4 | 0 | 41 | + 1 | Thu Nov 23 03:32:50.803031 2017 | 3 | 2 | 41 | + 1 | Thu Nov 23 09:26:42.145043 2017 | 1 | 3 | 41 | + 1 | Thu Nov 23 11:11:24.40789 2017 | 3 | 4 | 41 | + 1 | Thu Nov 23 11:44:57.515981 2017 | 4 | 3 | 41 | + 1 | Thu Nov 23 17:23:03.441394 2017 | 5 | 4 | 41 | + 1 | Thu Nov 23 17:30:34.635085 2017 | 3 | 4 | 41 | + 1 | | | | 41 | + 1 | | | | 41 | +(9 rows) + -- INSERT...SELECT query in CTE WITH copy_table AS ( - INSERT INTO users_table SELECT * FROM users_table RETURNING * + INSERT INTO users_table SELECT * FROM users_table WHERE user_id = 0 OR user_id = 3 RETURNING * ) SELECT * FROM - copy_table; -ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries + copy_table +ORDER BY + user_id, + time +LIMIT 10; + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+---------------------------------+---------+---------+---------+--------- + 3 | Wed Nov 22 18:43:51.450263 2017 | 1 | 1 | 42 | + 3 | Wed Nov 22 20:43:31.008625 2017 | 1 | 3 | 2 | + 3 | Wed Nov 22 23:24:32.080584 2017 | 3 | 2 | 5 | + 3 | Thu Nov 23 00:15:45.610845 2017 | 1 | 1 | 42 | + 3 | Thu Nov 23 03:23:24.702501 2017 | 1 | 2 | 5 | + 3 | Thu Nov 23 03:52:32.008895 2017 | 4 | 2 | 0 | + 3 | Thu Nov 23 04:01:08.04806 2017 | 5 | 5 | 3 | + 3 | Thu Nov 23 05:01:44.885505 2017 | 3 | 5 | 4 | + 3 | Thu Nov 23 06:20:05.854857 2017 | 1 | 4 | 2 | + 3 | Thu Nov 23 09:57:41.540228 2017 | 2 | 2 | 3 | +(10 rows) + -- CTEs prior to INSERT...SELECT via the coordinator should work WITH cte AS ( SELECT user_id FROM users_table WHERE value_2 IN (1, 2) @@ -120,8 +225,481 @@ WITH cte AS ( INSERT INTO modify_table (SELECT user_id FROM events_table) RETURNING * ) SELECT * FROM cte; -ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries +ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator +WITH user_data AS ( + SELECT user_id, value_2 FROM users_table +) +INSERT INTO modify_table SELECT * FROM user_data; +WITH raw_data AS ( + DELETE FROM modify_table RETURNING * +) +INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; +SELECT * FROM summary_table ORDER BY id; + id | counter +----+--------- + 2 | 153 + 3 | 184 + 4 | 160 + 5 | 170 +(4 rows) + +SELECT COUNT(*) FROM modify_table; + count +------- + 0 +(1 row) + +INSERT INTO modify_table VALUES (1,1), (2, 2), (3,3); +WITH raw_data AS ( + DELETE FROM modify_table RETURNING * +) +INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; +SELECT * FROM summary_table ORDER BY id, counter; + id | counter +----+--------- + 1 | 1 + 2 | 1 + 2 | 153 + 3 | 1 + 3 | 184 + 4 | 160 + 5 | 170 +(7 rows) + +SELECT COUNT(*) FROM modify_table; + count +------- + 0 +(1 row) + +WITH insert_reference AS ( + INSERT INTO anchor_table VALUES (1), (2) RETURNING * +) +SELECT id FROM insert_reference ORDER BY id; + id +---- + 1 + 2 +(2 rows) + +WITH anchor_data AS ( + SELECT * FROM anchor_table +), +raw_data AS ( + DELETE FROM modify_table RETURNING * +), +summary_data AS ( + DELETE FROM summary_table RETURNING * +) +INSERT INTO + summary_table +SELECT id, SUM(counter) FROM ( + (SELECT raw_data.id, COUNT(*) AS counter FROM raw_data, anchor_data + WHERE raw_data.id = anchor_data.id GROUP BY raw_data.id) + UNION ALL + (SELECT * FROM summary_data)) AS all_rows +GROUP BY + id; +SELECT COUNT(*) FROM modify_table; + count +------- + 0 +(1 row) + +SELECT * FROM summary_table ORDER BY id, counter; + id | counter +----+--------- + 1 | 1 + 2 | 154 + 3 | 185 + 4 | 160 + 5 | 170 +(5 rows) + +WITH added_data AS ( + INSERT INTO modify_table VALUES (1,2), (1,6), (2,4), (3,6) RETURNING * +), +raw_data AS ( + DELETE FROM modify_table WHERE id = 1 AND val = (SELECT MAX(val) FROM added_data) RETURNING * +) +INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; +SELECT COUNT(*) FROM modify_table; + count +------- + 3 +(1 row) + +SELECT * FROM summary_table ORDER BY id, counter; + id | counter +----+--------- + 1 | 1 + 1 | 1 + 2 | 154 + 3 | 185 + 4 | 160 + 5 | 170 +(6 rows) + +-- Merge rows in the summary_table +WITH summary_data AS ( + DELETE FROM summary_table RETURNING * +) +INSERT INTO summary_table SELECT id, SUM(counter) AS counter FROM summary_data GROUP BY id; +SELECT * FROM summary_table ORDER BY id, counter; + id | counter +----+--------- + 1 | 2 + 2 | 154 + 3 | 185 + 4 | 160 + 5 | 170 +(5 rows) + +SELECT * FROM modify_table ORDER BY id, val; + id | val +----+----- + 1 | 2 + 2 | 4 + 3 | 6 +(3 rows) + +SELECT * FROM anchor_table ORDER BY id; + id +---- + 1 + 2 +(2 rows) + +INSERT INTO modify_table VALUES (11, 1), (12, 2), (13, 3); +WITH select_data AS ( + SELECT * FROM modify_table +), +raw_data AS ( + DELETE FROM modify_table WHERE id >= (SELECT min(id) FROM select_data WHERE id > 10) RETURNING * +) +INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; +INSERT INTO modify_table VALUES (21, 1), (22, 2), (23, 3); +-- read ids from the same table +WITH distinct_ids AS ( + SELECT DISTINCT id FROM modify_table +), +update_data AS ( + UPDATE modify_table SET val = 100 WHERE id > 10 AND + id IN (SELECT * FROM distinct_ids) RETURNING * +) +SELECT count(*) FROM update_data; + count +------- + 3 +(1 row) + +-- read ids from a different table +WITH distinct_ids AS ( + SELECT DISTINCT id FROM summary_table +), +update_data AS ( + UPDATE modify_table SET val = 100 WHERE id > 10 AND + id IN (SELECT * FROM distinct_ids) RETURNING * +) +SELECT count(*) FROM update_data; + count +------- + 0 +(1 row) + +-- test update with generate series +UPDATE modify_table SET val = 200 WHERE id > 10 AND + id IN (SELECT 2*s FROM generate_series(1,20) s); +-- test update with generate series in CTE +WITH update_data AS ( + UPDATE modify_table SET val = 300 WHERE id > 10 AND + id IN (SELECT 3*s FROM generate_series(1,20) s) RETURNING * +) +SELECT COUNT(*) FROM update_data; + count +------- + 1 +(1 row) + +WITH delete_rows AS ( + DELETE FROM modify_table WHERE id > 10 RETURNING * +) +SELECT * FROM delete_rows ORDER BY id, val; + id | val +----+----- + 21 | 300 + 22 | 200 + 23 | 100 +(3 rows) + +WITH delete_rows AS ( + DELETE FROM summary_table WHERE id > 10 RETURNING * +) +SELECT * FROM delete_rows ORDER BY id, counter; + id | counter +----+--------- + 11 | 1 + 12 | 1 + 13 | 1 +(3 rows) + +-- Check modifiying CTEs inside a transaction +BEGIN; +WITH raw_data AS ( + DELETE FROM modify_table RETURNING * +) +INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; +WITH insert_reference AS ( + INSERT INTO anchor_table VALUES (3), (4) RETURNING * +) +SELECT id FROM insert_reference ORDER BY id; + id +---- + 3 + 4 +(2 rows) + +SELECT * FROM summary_table ORDER BY id, counter; + id | counter +----+--------- + 1 | 1 + 1 | 2 + 2 | 1 + 2 | 154 + 3 | 1 + 3 | 185 + 4 | 160 + 5 | 170 +(8 rows) + +SELECT * FROM modify_table ORDER BY id, val; + id | val +----+----- +(0 rows) + +SELECT * FROM anchor_table ORDER BY id; + id +---- + 1 + 2 + 3 + 4 +(4 rows) + +ROLLBACK; +SELECT * FROM summary_table ORDER BY id, counter; + id | counter +----+--------- + 1 | 2 + 2 | 154 + 3 | 185 + 4 | 160 + 5 | 170 +(5 rows) + +SELECT * FROM modify_table ORDER BY id, val; + id | val +----+----- + 1 | 2 + 2 | 4 + 3 | 6 +(3 rows) + +SELECT * FROM anchor_table ORDER BY id; + id +---- + 1 + 2 +(2 rows) + +-- Test delete with subqueries +WITH deleted_rows AS ( + DELETE FROM modify_table WHERE id IN (SELECT id FROM modify_table WHERE id = 1) RETURNING * +) +SELECT * FROM deleted_rows; + id | val +----+----- + 1 | 2 +(1 row) + +WITH deleted_rows AS ( + DELETE FROM modify_table WHERE id IN (SELECT id FROM modify_table WHERE val = 4) RETURNING * +) +SELECT * FROM deleted_rows; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. +WITH select_rows AS ( + SELECT id FROM modify_table WHERE val = 4 +), +deleted_rows AS ( + DELETE FROM modify_table WHERE id IN (SELECT id FROM select_rows) RETURNING * +) +SELECT * FROM deleted_rows; + id | val +----+----- + 2 | 4 +(1 row) + +WITH deleted_rows AS ( + DELETE FROM modify_table WHERE val IN (SELECT val FROM modify_table WHERE id = 3) RETURNING * +) +SELECT * FROM deleted_rows; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. +WITH select_rows AS ( + SELECT val FROM modify_table WHERE id = 3 +), +deleted_rows AS ( + DELETE FROM modify_table WHERE val IN (SELECT val FROM select_rows) RETURNING * +) +SELECT * FROM deleted_rows; + id | val +----+----- + 3 | 6 +(1 row) + +WITH deleted_rows AS ( + DELETE FROM modify_table WHERE ctid IN (SELECT ctid FROM modify_table WHERE id = 1) RETURNING * +) +SELECT * FROM deleted_rows; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. +WITH select_rows AS ( + SELECT ctid FROM modify_table WHERE id = 1 +), +deleted_rows AS ( + DELETE FROM modify_table WHERE ctid IN (SELECT ctid FROM select_rows) RETURNING * +) +SELECT * FROM deleted_rows; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Recursively planned distributed modifications with ctid on where clause are not supported. +WITH added_data AS ( + INSERT INTO modify_table VALUES (1,2), (1,6) RETURNING * +), +select_data AS ( + SELECT * FROM added_data WHERE id = 1 +), +raw_data AS ( + DELETE FROM modify_table WHERE id = 1 AND ctid IN (SELECT ctid FROM select_data) RETURNING val +) +SELECT * FROM raw_data ORDER BY val; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Recursively planned distributed modifications with ctid on where clause are not supported. +WITH added_data AS ( + INSERT INTO modify_table VALUES (1, trunc(10 * random())), (1, trunc(random())) RETURNING * +), +select_data AS ( + SELECT val, now() FROM added_data WHERE id = 1 +), +raw_data AS ( + DELETE FROM modify_table WHERE id = 1 AND val IN (SELECT val FROM select_data) RETURNING * +) +SELECT COUNT(*) FROM raw_data; + count +------- + 2 +(1 row) + +INSERT INTO modify_table VALUES (1,2), (1,6), (2, 3), (3, 5); +WITH select_data AS ( + SELECT * FROM modify_table +), +raw_data AS ( + DELETE FROM modify_table WHERE id IN (SELECT id FROM select_data WHERE val > 5) RETURNING id, val +) +SELECT * FROM raw_data ORDER BY val; + id | val +----+----- + 1 | 2 + 1 | 6 +(2 rows) + +WITH select_data AS ( + SELECT * FROM modify_table +), +raw_data AS ( + UPDATE modify_table SET val = 0 WHERE id IN (SELECT id FROM select_data WHERE val < 5) RETURNING id, val +) +SELECT * FROM raw_data ORDER BY val; + id | val +----+----- + 2 | 0 +(1 row) + +SELECT * FROM modify_table ORDER BY id, val; + id | val +----+----- + 2 | 0 + 3 | 5 +(2 rows) + +-- Test with joins +WITH select_data AS ( + SELECT * FROM modify_table +), +raw_data AS ( + UPDATE modify_table SET val = 0 WHERE + id IN (SELECT id FROM select_data) AND + val IN (SELECT counter FROM summary_table) + RETURNING id, val +) +SELECT * FROM raw_data ORDER BY val; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Joins are not supported in distributed modifications. +-- Test with replication factor 2 +SET citus.shard_replication_factor to 2; +DROP TABLE modify_table; +CREATE TABLE with_modifying.modify_table (id int, val int); +SELECT create_distributed_table('modify_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO with_modifying.modify_table SELECT user_id, value_1 FROM public.users_table; +DROP TABLE summary_table; +CREATE TABLE with_modifying.summary_table (id int, counter int); +SELECT create_distributed_table('summary_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT COUNT(*) FROM modify_table; + count +------- + 107 +(1 row) + +SELECT * FROM summary_table ORDER BY id, counter; + id | counter +----+--------- +(0 rows) + +WITH raw_data AS ( + DELETE FROM modify_table RETURNING * +) +INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; +SELECT COUNT(*) FROM modify_table; + count +------- + 0 +(1 row) + +SELECT * FROM summary_table ORDER BY id, counter; + id | counter +----+--------- + 1 | 8 + 2 | 19 + 3 | 18 + 4 | 24 + 5 | 27 + 6 | 11 +(6 rows) + DROP SCHEMA with_modifying CASCADE; -NOTICE: drop cascades to 2 other objects -DETAIL: drop cascades to table modify_table -drop cascades to table users_table +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table users_table +drop cascades to table anchor_table +drop cascades to table modify_table +drop cascades to table summary_table diff --git a/src/test/regress/expected/with_prepare.out b/src/test/regress/expected/with_prepare.out index 6efdaf68f..4a1a55129 100644 --- a/src/test/regress/expected/with_prepare.out +++ b/src/test/regress/expected/with_prepare.out @@ -591,5 +591,39 @@ EXECUTE prepared_test_6; (1 row) EXECUTE prepared_partition_column_insert(1); -ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+------+---------+---------+---------+--------- + 1 | | | | | +(1 row) + +EXECUTE prepared_partition_column_insert(2); + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+------+---------+---------+---------+--------- + 2 | | | | | +(1 row) + +EXECUTE prepared_partition_column_insert(3); + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+------+---------+---------+---------+--------- + 3 | | | | | +(1 row) + +EXECUTE prepared_partition_column_insert(4); + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+------+---------+---------+---------+--------- + 4 | | | | | +(1 row) + +EXECUTE prepared_partition_column_insert(5); + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+------+---------+---------+---------+--------- + 5 | | | | | +(1 row) + +EXECUTE prepared_partition_column_insert(6); + user_id | time | value_1 | value_2 | value_3 | value_4 +---------+------+---------+---------+---------+--------- + 6 | | | | | +(1 row) + DEALLOCATE ALL; diff --git a/src/test/regress/output/multi_complex_count_distinct.source b/src/test/regress/output/multi_complex_count_distinct.source index 77d3143fd..4ddf8ff2a 100644 --- a/src/test/regress/output/multi_complex_count_distinct.source +++ b/src/test/regress/output/multi_complex_count_distinct.source @@ -934,11 +934,11 @@ ORDER BY 1 DESC LIMIT 5; user_id | count | count | count ---------+-------+-------+------- - 6 | 10 | 1 | 1 - 5 | 26 | 1 | 1 - 4 | 23 | 1 | 1 - 3 | 17 | 1 | 1 - 2 | 18 | 1 | 1 + 6 | 11 | 1 | 1 + 5 | 27 | 1 | 1 + 4 | 24 | 1 | 1 + 3 | 18 | 1 | 1 + 2 | 19 | 1 | 1 (5 rows) CREATE TYPE test_item AS diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index 5c1f4cc5d..0f0bc9e2a 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -236,12 +236,22 @@ WITH RECURSIVE hierarchy as ( ce.company_id = 2)) SELECT * FROM hierarchy WHERE LEVEL <= 2; --- CTE with queries other than SELECT is not supported +-- Test router modifying CTEs WITH new_article AS ( - INSERT INTO articles_hash VALUES (1, 1, 'arsenous', 9572) RETURNING * + INSERT INTO articles_hash VALUES (1, 1, 'arsenous', 9) RETURNING * ) SELECT * FROM new_article; +WITH update_article AS ( + UPDATE articles_hash SET word_count = 10 WHERE id = 1 AND word_count = 9 RETURNING * +) +SELECT * FROM update_article; + +WITH delete_article AS ( + DELETE FROM articles_hash WHERE id = 1 AND word_count = 10 RETURNING * +) +SELECT * FROM delete_article; + -- Modifying statement in nested CTE case is covered by PostgreSQL itself WITH new_article AS ( WITH nested_cte AS ( diff --git a/src/test/regress/sql/with_modifying.sql b/src/test/regress/sql/with_modifying.sql index 60ced959a..4f06d2d5a 100644 --- a/src/test/regress/sql/with_modifying.sql +++ b/src/test/regress/sql/with_modifying.sql @@ -11,6 +11,12 @@ CREATE TABLE with_modifying.users_table (LIKE public.users_table INCLUDING ALL); SELECT create_distributed_table('with_modifying.users_table', 'user_id'); INSERT INTO with_modifying.users_table SELECT * FROM public.users_table; +CREATE TABLE with_modifying.summary_table (id int, counter int); +SELECT create_distributed_table('summary_table', 'id'); + +CREATE TABLE with_modifying.anchor_table (id int); +SELECT create_reference_table('anchor_table'); + -- basic insert query in CTE WITH basic_insert AS ( INSERT INTO users_table VALUES (1), (2), (3) RETURNING * @@ -18,16 +24,22 @@ WITH basic_insert AS ( SELECT * FROM - basic_insert; + basic_insert +ORDER BY + user_id; -- single-shard UPDATE in CTE WITH basic_update AS ( - UPDATE users_table SET value_3=42 WHERE user_id=0 RETURNING * + UPDATE users_table SET value_3=41 WHERE user_id=1 RETURNING * ) SELECT * FROM - basic_update; + basic_update +ORDER BY + user_id, + time +LIMIT 10; -- multi-shard UPDATE in CTE WITH basic_update AS ( @@ -36,34 +48,50 @@ WITH basic_update AS ( SELECT * FROM - basic_update; + basic_update +ORDER BY + user_id, + time +LIMIT 10; -- single-shard DELETE in CTE WITH basic_delete AS ( - DELETE FROM users_table WHERE user_id=42 RETURNING * + DELETE FROM users_table WHERE user_id=6 RETURNING * ) SELECT * FROM - basic_delete; + basic_delete +ORDER BY + user_id, + time +LIMIT 10; -- multi-shard DELETE in CTE WITH basic_delete AS ( - DELETE FROM users_table WHERE value_2=42 RETURNING * + DELETE FROM users_table WHERE value_3=41 RETURNING * ) SELECT * FROM - basic_delete; + basic_delete +ORDER BY + user_id, + time +LIMIT 10; -- INSERT...SELECT query in CTE WITH copy_table AS ( - INSERT INTO users_table SELECT * FROM users_table RETURNING * + INSERT INTO users_table SELECT * FROM users_table WHERE user_id = 0 OR user_id = 3 RETURNING * ) SELECT * FROM - copy_table; + copy_table +ORDER BY + user_id, + time +LIMIT 10; -- CTEs prior to INSERT...SELECT via the coordinator should work WITH cte AS ( @@ -121,4 +149,271 @@ WITH cte AS ( ) SELECT * FROM cte; +WITH user_data AS ( + SELECT user_id, value_2 FROM users_table +) +INSERT INTO modify_table SELECT * FROM user_data; + +WITH raw_data AS ( + DELETE FROM modify_table RETURNING * +) +INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; + +SELECT * FROM summary_table ORDER BY id; +SELECT COUNT(*) FROM modify_table; + +INSERT INTO modify_table VALUES (1,1), (2, 2), (3,3); + +WITH raw_data AS ( + DELETE FROM modify_table RETURNING * +) +INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; + +SELECT * FROM summary_table ORDER BY id, counter; +SELECT COUNT(*) FROM modify_table; + +WITH insert_reference AS ( + INSERT INTO anchor_table VALUES (1), (2) RETURNING * +) +SELECT id FROM insert_reference ORDER BY id; + +WITH anchor_data AS ( + SELECT * FROM anchor_table +), +raw_data AS ( + DELETE FROM modify_table RETURNING * +), +summary_data AS ( + DELETE FROM summary_table RETURNING * +) +INSERT INTO + summary_table +SELECT id, SUM(counter) FROM ( + (SELECT raw_data.id, COUNT(*) AS counter FROM raw_data, anchor_data + WHERE raw_data.id = anchor_data.id GROUP BY raw_data.id) + UNION ALL + (SELECT * FROM summary_data)) AS all_rows +GROUP BY + id; + +SELECT COUNT(*) FROM modify_table; +SELECT * FROM summary_table ORDER BY id, counter; + +WITH added_data AS ( + INSERT INTO modify_table VALUES (1,2), (1,6), (2,4), (3,6) RETURNING * +), +raw_data AS ( + DELETE FROM modify_table WHERE id = 1 AND val = (SELECT MAX(val) FROM added_data) RETURNING * +) +INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; + +SELECT COUNT(*) FROM modify_table; +SELECT * FROM summary_table ORDER BY id, counter; + +-- Merge rows in the summary_table +WITH summary_data AS ( + DELETE FROM summary_table RETURNING * +) +INSERT INTO summary_table SELECT id, SUM(counter) AS counter FROM summary_data GROUP BY id; + +SELECT * FROM summary_table ORDER BY id, counter; +SELECT * FROM modify_table ORDER BY id, val; +SELECT * FROM anchor_table ORDER BY id; + +INSERT INTO modify_table VALUES (11, 1), (12, 2), (13, 3); +WITH select_data AS ( + SELECT * FROM modify_table +), +raw_data AS ( + DELETE FROM modify_table WHERE id >= (SELECT min(id) FROM select_data WHERE id > 10) RETURNING * +) +INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; + +INSERT INTO modify_table VALUES (21, 1), (22, 2), (23, 3); + +-- read ids from the same table +WITH distinct_ids AS ( + SELECT DISTINCT id FROM modify_table +), +update_data AS ( + UPDATE modify_table SET val = 100 WHERE id > 10 AND + id IN (SELECT * FROM distinct_ids) RETURNING * +) +SELECT count(*) FROM update_data; + +-- read ids from a different table +WITH distinct_ids AS ( + SELECT DISTINCT id FROM summary_table +), +update_data AS ( + UPDATE modify_table SET val = 100 WHERE id > 10 AND + id IN (SELECT * FROM distinct_ids) RETURNING * +) +SELECT count(*) FROM update_data; + +-- test update with generate series +UPDATE modify_table SET val = 200 WHERE id > 10 AND + id IN (SELECT 2*s FROM generate_series(1,20) s); + +-- test update with generate series in CTE +WITH update_data AS ( + UPDATE modify_table SET val = 300 WHERE id > 10 AND + id IN (SELECT 3*s FROM generate_series(1,20) s) RETURNING * +) +SELECT COUNT(*) FROM update_data; + +WITH delete_rows AS ( + DELETE FROM modify_table WHERE id > 10 RETURNING * +) +SELECT * FROM delete_rows ORDER BY id, val; + +WITH delete_rows AS ( + DELETE FROM summary_table WHERE id > 10 RETURNING * +) +SELECT * FROM delete_rows ORDER BY id, counter; + +-- Check modifiying CTEs inside a transaction +BEGIN; + +WITH raw_data AS ( + DELETE FROM modify_table RETURNING * +) +INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; + +WITH insert_reference AS ( + INSERT INTO anchor_table VALUES (3), (4) RETURNING * +) +SELECT id FROM insert_reference ORDER BY id; + +SELECT * FROM summary_table ORDER BY id, counter; +SELECT * FROM modify_table ORDER BY id, val; +SELECT * FROM anchor_table ORDER BY id; + +ROLLBACK; + +SELECT * FROM summary_table ORDER BY id, counter; +SELECT * FROM modify_table ORDER BY id, val; +SELECT * FROM anchor_table ORDER BY id; + +-- Test delete with subqueries +WITH deleted_rows AS ( + DELETE FROM modify_table WHERE id IN (SELECT id FROM modify_table WHERE id = 1) RETURNING * +) +SELECT * FROM deleted_rows; + +WITH deleted_rows AS ( + DELETE FROM modify_table WHERE id IN (SELECT id FROM modify_table WHERE val = 4) RETURNING * +) +SELECT * FROM deleted_rows; + +WITH select_rows AS ( + SELECT id FROM modify_table WHERE val = 4 +), +deleted_rows AS ( + DELETE FROM modify_table WHERE id IN (SELECT id FROM select_rows) RETURNING * +) +SELECT * FROM deleted_rows; + +WITH deleted_rows AS ( + DELETE FROM modify_table WHERE val IN (SELECT val FROM modify_table WHERE id = 3) RETURNING * +) +SELECT * FROM deleted_rows; + +WITH select_rows AS ( + SELECT val FROM modify_table WHERE id = 3 +), +deleted_rows AS ( + DELETE FROM modify_table WHERE val IN (SELECT val FROM select_rows) RETURNING * +) +SELECT * FROM deleted_rows; + +WITH deleted_rows AS ( + DELETE FROM modify_table WHERE ctid IN (SELECT ctid FROM modify_table WHERE id = 1) RETURNING * +) +SELECT * FROM deleted_rows; + +WITH select_rows AS ( + SELECT ctid FROM modify_table WHERE id = 1 +), +deleted_rows AS ( + DELETE FROM modify_table WHERE ctid IN (SELECT ctid FROM select_rows) RETURNING * +) +SELECT * FROM deleted_rows; + +WITH added_data AS ( + INSERT INTO modify_table VALUES (1,2), (1,6) RETURNING * +), +select_data AS ( + SELECT * FROM added_data WHERE id = 1 +), +raw_data AS ( + DELETE FROM modify_table WHERE id = 1 AND ctid IN (SELECT ctid FROM select_data) RETURNING val +) +SELECT * FROM raw_data ORDER BY val; + +WITH added_data AS ( + INSERT INTO modify_table VALUES (1, trunc(10 * random())), (1, trunc(random())) RETURNING * +), +select_data AS ( + SELECT val, now() FROM added_data WHERE id = 1 +), +raw_data AS ( + DELETE FROM modify_table WHERE id = 1 AND val IN (SELECT val FROM select_data) RETURNING * +) +SELECT COUNT(*) FROM raw_data; + +INSERT INTO modify_table VALUES (1,2), (1,6), (2, 3), (3, 5); +WITH select_data AS ( + SELECT * FROM modify_table +), +raw_data AS ( + DELETE FROM modify_table WHERE id IN (SELECT id FROM select_data WHERE val > 5) RETURNING id, val +) +SELECT * FROM raw_data ORDER BY val; + +WITH select_data AS ( + SELECT * FROM modify_table +), +raw_data AS ( + UPDATE modify_table SET val = 0 WHERE id IN (SELECT id FROM select_data WHERE val < 5) RETURNING id, val +) +SELECT * FROM raw_data ORDER BY val; + +SELECT * FROM modify_table ORDER BY id, val; + +-- Test with joins +WITH select_data AS ( + SELECT * FROM modify_table +), +raw_data AS ( + UPDATE modify_table SET val = 0 WHERE + id IN (SELECT id FROM select_data) AND + val IN (SELECT counter FROM summary_table) + RETURNING id, val +) +SELECT * FROM raw_data ORDER BY val; + +-- Test with replication factor 2 +SET citus.shard_replication_factor to 2; + +DROP TABLE modify_table; +CREATE TABLE with_modifying.modify_table (id int, val int); +SELECT create_distributed_table('modify_table', 'id'); +INSERT INTO with_modifying.modify_table SELECT user_id, value_1 FROM public.users_table; + +DROP TABLE summary_table; +CREATE TABLE with_modifying.summary_table (id int, counter int); +SELECT create_distributed_table('summary_table', 'id'); + +SELECT COUNT(*) FROM modify_table; +SELECT * FROM summary_table ORDER BY id, counter; + +WITH raw_data AS ( + DELETE FROM modify_table RETURNING * +) +INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; + +SELECT COUNT(*) FROM modify_table; +SELECT * FROM summary_table ORDER BY id, counter; + DROP SCHEMA with_modifying CASCADE; diff --git a/src/test/regress/sql/with_prepare.sql b/src/test/regress/sql/with_prepare.sql index ed4a72708..573d6a36f 100644 --- a/src/test/regress/sql/with_prepare.sql +++ b/src/test/regress/sql/with_prepare.sql @@ -198,7 +198,6 @@ FROM WHERE events_user_id IN (SELECT user_id FROM users_table); - EXECUTE prepared_test_1; EXECUTE prepared_test_1; EXECUTE prepared_test_1; @@ -242,5 +241,10 @@ EXECUTE prepared_test_6; EXECUTE prepared_test_6; EXECUTE prepared_partition_column_insert(1); +EXECUTE prepared_partition_column_insert(2); +EXECUTE prepared_partition_column_insert(3); +EXECUTE prepared_partition_column_insert(4); +EXECUTE prepared_partition_column_insert(5); +EXECUTE prepared_partition_column_insert(6); DEALLOCATE ALL;