diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 20be2fcb1..5face47b8 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -487,6 +487,8 @@ RouterSequentialModifyExecScan(CustomScanState *node) BeginOrContinueCoordinatedTransaction(); } + ExecuteSubPlans(distributedPlan); + foreach(taskCell, taskList) { Task *task = (Task *) lfirst(taskCell); @@ -522,6 +524,7 @@ RouterMultiModifyExecScan(CustomScanState *node) bool hasReturning = distributedPlan->hasReturning; bool isModificationQuery = true; + ExecuteSubPlans(distributedPlan); ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning); scanState->finishedRemoteScan = true; diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 371d769a2..ce006ebcd 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -50,17 +50,16 @@ static uint64 NextPlanId = 1; /* local function forward declarations */ static bool NeedsDistributedPlanningWalker(Node *node, void *context); -static PlannedStmt * CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, - Query *originalQuery, Query *query, - ParamListInfo boundParams, - PlannerRestrictionContext * - plannerRestrictionContext); -static DistributedPlan * CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, - Query *query, - ParamListInfo boundParams, - bool hasUnresolvedParams, - PlannerRestrictionContext * - plannerRestrictionContext); +static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, + Query *originalQuery, Query *query, + ParamListInfo boundParams, + PlannerRestrictionContext * + plannerRestrictionContext); +static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery, + 
Query *query, ParamListInfo boundParams, + bool hasUnresolvedParams, + PlannerRestrictionContext * + plannerRestrictionContext); static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); static void AssignRTEIdentities(Query *queryTree); @@ -146,8 +145,8 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { uint64 planId = NextPlanId++; - result = CreateDistributedPlan(planId, result, originalQuery, parse, - boundParams, plannerRestrictionContext); + result = CreateDistributedPlannedStmt(planId, result, originalQuery, parse, + boundParams, plannerRestrictionContext); } } PG_CATCH(); @@ -467,13 +466,13 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan) /* - * CreateDistributedPlan encapsulates the logic needed to transform a particular - * query into a distributed plan. + * CreateDistributedPlannedStmt encapsulates the logic needed to transform a particular + * query into a distributed plan that is encapsulated by a PlannedStmt. 
*/ static PlannedStmt * -CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, Query *originalQuery, - Query *query, ParamListInfo boundParams, - PlannerRestrictionContext *plannerRestrictionContext) +CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *originalQuery, + Query *query, ParamListInfo boundParams, + PlannerRestrictionContext *plannerRestrictionContext) { DistributedPlan *distributedPlan = NULL; PlannedStmt *resultPlan = NULL; @@ -489,31 +488,9 @@ CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, Query *originalQuer plannerRestrictionContext->joinRestrictionContext = RemoveDuplicateJoinRestrictions(joinRestrictionContext); - if (IsModifyCommand(query)) - { - EnsureModificationsCanRun(); - - if (InsertSelectIntoDistributedTable(originalQuery)) - { - distributedPlan = - CreateInsertSelectPlan(originalQuery, plannerRestrictionContext); - } - else - { - /* modifications are always routed through the same planner/executor */ - distributedPlan = - CreateModifyPlan(originalQuery, query, plannerRestrictionContext); - } - - Assert(distributedPlan); - } - else - { - distributedPlan = - CreateDistributedSelectPlan(planId, originalQuery, query, boundParams, - hasUnresolvedParams, - plannerRestrictionContext); - } + distributedPlan = + CreateDistributedPlan(planId, originalQuery, query, boundParams, + hasUnresolvedParams, plannerRestrictionContext); /* * If no plan was generated, prepare a generic error to be emitted. @@ -580,7 +557,7 @@ CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, Query *originalQuer /* - * CreateDistributedSelectPlan generates a distributed plan for a SELECT query. + * CreateDistributedPlan generates a distributed plan for a query. * It goes through 3 steps: * * 1. Try router planner @@ -589,39 +566,71 @@ CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, Query *originalQuer * 3. 
Logical planner */ static DistributedPlan * -CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query, - ParamListInfo boundParams, bool hasUnresolvedParams, - PlannerRestrictionContext *plannerRestrictionContext) +CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo + boundParams, bool hasUnresolvedParams, + PlannerRestrictionContext *plannerRestrictionContext) { DistributedPlan *distributedPlan = NULL; MultiTreeRoot *logicalPlan = NULL; List *subPlanList = NIL; + bool hasCtes = originalQuery->cteList != NIL; - /* - * For select queries we, if router executor is enabled, first try to - * plan the query as a router query. If not supported, otherwise try - * the full blown plan/optimize/physical planing process needed to - * produce distributed query plans. - */ - - distributedPlan = CreateRouterPlan(originalQuery, query, - plannerRestrictionContext); - if (distributedPlan != NULL) + if (IsModifyCommand(originalQuery)) { + EnsureModificationsCanRun(); + + if (InsertSelectIntoDistributedTable(originalQuery)) + { + distributedPlan = + CreateInsertSelectPlan(originalQuery, plannerRestrictionContext); + } + else + { + /* modifications are always routed through the same planner/executor */ + distributedPlan = + CreateModifyPlan(originalQuery, query, plannerRestrictionContext); + } + + /* the functions above always return a plan, possibly with an error */ + Assert(distributedPlan); + if (distributedPlan->planningError == NULL) { - /* successfully created a router plan */ return distributedPlan; } else { - /* - * For debugging it's useful to display why query was not - * router plannable. - */ RaiseDeferredError(distributedPlan->planningError, DEBUG1); } } + else + { + /* + * For select queries, if router executor is enabled, we first try to + * plan the query as a router query. If that is not supported, we try + * the full-blown plan/optimize/physical planning process needed to + * produce distributed query plans.
+ */ + + distributedPlan = CreateRouterPlan(originalQuery, query, + plannerRestrictionContext); + if (distributedPlan != NULL) + { + if (distributedPlan->planningError == NULL) + { + /* successfully created a router plan */ + return distributedPlan; + } + else + { + /* + * For debugging it's useful to display why query was not + * router plannable. + */ + RaiseDeferredError(distributedPlan->planningError, DEBUG1); + } + } + } if (hasUnresolvedParams) { @@ -662,8 +671,13 @@ CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query, * We could simplify this code if the logical planner was capable of dealing * with an original query. In that case, we would only have to filter the * planner restriction context. + * + * Note that we check both for subplans and whether the query had CTEs + * prior to calling GenerateSubplansForSubqueriesAndCTEs. If none of + * the CTEs are referenced then there are no subplans, but we still want + * to retry the router planner. */ - if (list_length(subPlanList) > 0) + if (list_length(subPlanList) > 0 || hasCtes) { Query *newQuery = copyObject(originalQuery); bool setPartitionedTablesInherited = false; @@ -693,14 +707,24 @@ CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query, /* overwrite the old transformed query with the new transformed query */ memcpy(query, newQuery, sizeof(Query)); - /* recurse into CreateDistributedSelectPlan with subqueries/CTEs replaced */ - distributedPlan = CreateDistributedSelectPlan(planId, originalQuery, query, NULL, - false, plannerRestrictionContext); + /* recurse into CreateDistributedPlan with subqueries/CTEs replaced */ + distributedPlan = CreateDistributedPlan(planId, originalQuery, query, NULL, false, + plannerRestrictionContext); distributedPlan->subPlanList = subPlanList; return distributedPlan; } + /* + * DML command returns a planning error, even after recursive planning. 
The + * logical planner cannot handle DML commands so return the plan with the + * error. + */ + if (IsModifyCommand(originalQuery)) + { + return distributedPlan; + } + /* * CTEs are stripped from the original query by RecursivelyPlanSubqueriesAndCTEs. * If we get here and there are still CTEs that means that none of the CTEs are diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index de312022d..46fb12e47 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -332,7 +332,7 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, } } - if (contain_volatile_functions((Node *) queryTree)) + if (FindNodeCheck((Node *) queryTree, CitusIsVolatileFunction)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "volatile functions are not allowed in distributed " diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 9075401cf..5142cde60 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -594,14 +594,21 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer if (rangeTableEntry->rtekind == RTE_RELATION) { - /* - * We are sure that the table should be distributed, therefore no need to - * call IsDistributedTable() here and DistributedTableCacheEntry will - * error out if the table is not distributed - */ - DistTableCacheEntry *distTableEntry = - DistributedTableCacheEntry(rangeTableEntry->relid); + DistTableCacheEntry *distTableEntry = NULL; + if (!IsDistributedTable(rangeTableEntry->relid)) + { + StringInfo errorMessage = makeStringInfo(); + char *relationName = get_rel_name(rangeTableEntry->relid); + + appendStringInfo(errorMessage, "relation %s is not distributed", + relationName); + + return 
DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, NULL); + } + + distTableEntry = DistributedTableCacheEntry(rangeTableEntry->relid); if (distTableEntry->partitionMethod == DISTRIBUTE_BY_NONE) { referenceTable = true; diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index c4d4425ad..cc444c206 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -699,11 +699,11 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext) NULL, NULL); } - if (cte->cterefcount == 0) + if (cte->cterefcount == 0 && subquery->commandType == CMD_SELECT) { /* - * CTEs that aren't referenced aren't executed in postgres. We - * don't need to generate a subplan for it and can take the rest + * SELECT CTEs that aren't referenced aren't executed in postgres. + * We don't need to generate a subplan for it and can take the rest * of this iteration off. */ continue; diff --git a/src/test/regress/expected/cte_nested_modification.out b/src/test/regress/expected/cte_nested_modification.out new file mode 100644 index 000000000..f5848e182 --- /dev/null +++ b/src/test/regress/expected/cte_nested_modification.out @@ -0,0 +1,151 @@ +CREATE SCHEMA cte_nested_modifications; +SET search_path TO cte_nested_modifications, public; +CREATE TABLE tt1(id int, value_1 int); +INSERT INTO tt1 VALUES(1,2),(2,3),(3,4); +SELECT create_distributed_table('tt1','id'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE tt2(id int, value_1 int); +INSERT INTO tt2 VALUES(3,3),(4,4),(5,5); +SELECT create_distributed_table('tt2','id'); +NOTICE: Copying data from local table... 
+ create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE tt3(id int, json_val json); +INSERT INTO tt3 VALUES(1, '{"prod_name":"name_1", "qty":"6"}'), + (2, '{"prod_name":"name_2", "qty":"4"}'), + (3, '{"prod_name":"name_3", "qty":"2"}'); +-- DELETE within CTE and use it from UPDATE +BEGIN; +WITH cte_1 AS ( + WITH cte_2 AS ( + SELECT id as cte2_id + FROM tt1 + WHERE value_1 >= 2 + ) + + DELETE FROM tt2 + USING cte_2 + WHERE tt2.id = cte_2.cte2_id + RETURNING cte2_id - 1 as id +) +UPDATE tt1 +SET value_1 = abs(2 + 3.5) +FROM cte_1 +WHERE cte_1.id = tt1.id; +SELECT * FROM tt1 ORDER BY id; + id | value_1 +----+--------- + 1 | 2 + 2 | 6 + 3 | 4 +(3 rows) + +ROLLBACK; +-- Similar to test above, now use the CTE in the SET part of the UPDATE +BEGIN; +WITH cte_1 AS ( + WITH cte_2 AS ( + SELECT id as cte2_id + FROM tt1 + WHERE value_1 >= 2 + ) + + DELETE FROM tt2 + USING cte_2 + WHERE tt2.id = cte_2.cte2_id + RETURNING cte2_id as id +) +UPDATE tt1 +SET value_1 = (SELECT max(id) + abs(2 + 3.5) FROM cte_1); +SELECT * FROM tt1 ORDER BY id; + id | value_1 +----+--------- + 1 | 9 + 2 | 9 + 3 | 9 +(3 rows) + +ROLLBACK; +-- Use alias in the definition of CTE, instead of in the RETURNING +BEGIN; +WITH cte_1(id) AS ( + WITH cte_2 AS ( + SELECT id as cte2_id + FROM tt1 + WHERE value_1 >= 2 + ) + + DELETE FROM tt2 + USING cte_2 + WHERE tt2.id = cte_2.cte2_id + RETURNING cte2_id +) +UPDATE tt1 +SET value_1 = (SELECT max(id) + abs(2 + 3.5) FROM cte_1); +SELECT * FROM tt1 ORDER BY id; + id | value_1 +----+--------- + 1 | 9 + 2 | 9 + 3 | 9 +(3 rows) + +ROLLBACK; +-- Update within CTE and use it from Delete +BEGIN; +WITH cte_1 AS ( + WITH cte_2 AS ( + SELECT id as cte2_id + FROM tt1 + WHERE value_1 >= 2 + ) + + UPDATE tt2 + SET value_1 = 10 + FROM cte_2 + WHERE id = cte2_id + RETURNING id, value_1 +) +DELETE FROM tt1 +USING cte_1 +WHERE tt1.id < cte_1.id; +SELECT * FROM tt1 ORDER BY id; + id | value_1 +----+--------- + 3 | 4 +(1 row) + +ROLLBACK; +-- Similar to test 
above, but use json column +BEGIN; +WITH cte_1 AS ( + WITH cte_2 AS ( + SELECT * FROM tt3 + ) + + UPDATE tt2 + SET value_1 = (SELECT max((json_val->>'qty')::int) FROM cte_2) + RETURNING id, value_1 +) +DELETE FROM tt1 +USING cte_1 +WHERE tt1.id < cte_1.id; +SELECT * FROM tt1 ORDER BY id; + id | value_1 +----+--------- +(0 rows) + +ROLLBACK; +DROP SCHEMA cte_nested_modifications CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table tt1 +drop cascades to table tt2 +drop cascades to table tt3 diff --git a/src/test/regress/expected/cte_prepared_modify.out b/src/test/regress/expected/cte_prepared_modify.out new file mode 100644 index 000000000..390a8d72d --- /dev/null +++ b/src/test/regress/expected/cte_prepared_modify.out @@ -0,0 +1,67 @@ +CREATE SCHEMA cte_prepared_modify; +SET search_path TO cte_prepared_modify, public; +CREATE TABLE tt1(id int, value_1 int); +INSERT INTO tt1 VALUES(1,2),(2,3),(3,4); +SELECT create_distributed_table('tt1','id'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE tt2(id int, value_1 int); +INSERT INTO tt2 VALUES(3,3),(4,4),(5,5); +SELECT create_distributed_table('tt2','id'); +NOTICE: Copying data from local table... 
+ create_distributed_table +-------------------------- + +(1 row) + +-- Test with prepared statements (parameter used by SET) +PREPARE prepared_test(integer) AS +WITH cte_1 AS( + SELECT * FROM tt1 WHERE id >= 2 +) +UPDATE tt2 +SET value_1 = $1 +FROM cte_1 +WHERE tt2.id = cte_1.id; +-- Test with prepared statements (parameter used by WHERE on partition column) +PREPARE prepared_test_2(integer) AS +WITH cte_1 AS( + SELECT * FROM tt1 WHERE id >= 2 +) +UPDATE tt2 +SET value_1 = (SELECT max(id) FROM cte_1) +WHERE tt2.id = $1; +-- Test with prepared statements (parameter used by WHERE on non-partition column) +PREPARE prepared_test_3(integer) AS +WITH cte_1 AS( + SELECT * FROM tt1 WHERE id >= 2 +) +UPDATE tt2 +SET value_1 = (SELECT max(id) FROM cte_1) +WHERE tt2.value_1 = $1; +EXECUTE prepared_test(1); +EXECUTE prepared_test(2); +EXECUTE prepared_test(3); +EXECUTE prepared_test(4); +EXECUTE prepared_test(5); +EXECUTE prepared_test(6); +EXECUTE prepared_test(1); +EXECUTE prepared_test(2); +EXECUTE prepared_test(3); +EXECUTE prepared_test(4); +EXECUTE prepared_test(5); +EXECUTE prepared_test(6); +EXECUTE prepared_test_3(1); +EXECUTE prepared_test_3(2); +EXECUTE prepared_test_3(3); +EXECUTE prepared_test_3(4); +EXECUTE prepared_test_3(5); +EXECUTE prepared_test_3(6); +DROP SCHEMA cte_prepared_modify CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table tt1 +drop cascades to table tt2 diff --git a/src/test/regress/expected/dml_recursive.out b/src/test/regress/expected/dml_recursive.out new file mode 100644 index 000000000..ca02f9ef5 --- /dev/null +++ b/src/test/regress/expected/dml_recursive.out @@ -0,0 +1,359 @@ +CREATE SCHEMA recursive_dml_queries; +SET search_path TO recursive_dml_queries, public; +SET citus.next_shard_id TO 2370000; +CREATE TABLE recursive_dml_queries.distributed_table (tenant_id text, dept int, info jsonb); +SELECT create_distributed_table('distributed_table', 'tenant_id'); + create_distributed_table 
+-------------------------- + +(1 row) + +CREATE TABLE recursive_dml_queries.second_distributed_table (tenant_id text, dept int, info jsonb); +SELECT create_distributed_table('second_distributed_table', 'tenant_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE recursive_dml_queries.reference_table (id text, name text); +SELECT create_reference_table('reference_table'); + create_reference_table +------------------------ + +(1 row) + +CREATE TABLE recursive_dml_queries.local_table (id text, name text); +INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; +INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; +INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i; +INSERT INTO local_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i; +CREATE VIEW tenant_ids AS + SELECT + tenant_id, name + FROM + distributed_table, reference_table + WHERE + distributed_table.dept::text = reference_table.id + ORDER BY 2 DESC, 1 DESC; +SET client_min_messages TO DEBUG1; +-- the subquery foo is recursively planned +UPDATE + reference_table +SET + name = 'new_' || name +FROM +( + SELECT + avg(second_distributed_table.tenant_id::int) as avg_tenant_id + FROM + second_distributed_table +) as foo +WHERE + foo.avg_tenant_id::int::text = reference_table.id +RETURNING + reference_table.name; +DEBUG: only reference tables may be queried when targeting a reference table with multi shard UPDATE/DELETE queries with multiple tables +DEBUG: generating subplan 4_1 for subquery SELECT avg((tenant_id)::integer) AS avg_tenant_id FROM recursive_dml_queries.second_distributed_table +DEBUG: Plan 4 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.reference_table SET name = ('new_'::text || reference_table.name) FROM (SELECT intermediate_result.avg_tenant_id FROM 
read_intermediate_result('4_1'::text, 'binary'::citus_copy_format) intermediate_result(avg_tenant_id numeric)) foo WHERE (((foo.avg_tenant_id)::integer)::text = reference_table.id) RETURNING reference_table.name + name +------------- + new_user_50 +(1 row) + +-- the subquery foo is recursively planned +-- but note that the subquery foo itself is pushdownable +UPDATE + second_distributed_table +SET + dept = foo.max_dept * 2 +FROM +( + SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) as max_dept FROM + ( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + ) foo_inner + GROUP BY + tenant_id + ORDER BY 1 DESC +) as foo +WHERE + foo.tenant_id != second_distributed_table.tenant_id + AND second_distributed_table.dept IN (2) +RETURNING + second_distributed_table.tenant_id, second_distributed_table.dept; +DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +DEBUG: generating subplan 6_1 for subquery SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) AS max_dept FROM (SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM recursive_dml_queries.second_distributed_table, recursive_dml_queries.distributed_table WHERE (distributed_table.tenant_id = second_distributed_table.tenant_id)) foo_inner GROUP BY tenant_id ORDER BY tenant_id DESC +DEBUG: Plan 6 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.second_distributed_table SET dept = (foo.max_dept * 2) FROM (SELECT intermediate_result.tenant_id, intermediate_result.max_dept FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, max_dept integer)) foo WHERE ((foo.tenant_id <> second_distributed_table.tenant_id) AND (second_distributed_table.dept = 2)) RETURNING second_distributed_table.tenant_id, 
second_distributed_table.dept + tenant_id | dept +-----------+------ + 52 | 18 + 72 | 18 + 82 | 18 + 2 | 18 + 12 | 18 + 22 | 18 + 62 | 18 + 92 | 18 + 32 | 18 + 42 | 18 +(10 rows) + +-- the subquery foo is recursively planned +-- and foo itself is a non colocated subquery and recursively planned +UPDATE + second_distributed_table +SET + dept = foo.tenant_id::int / 4 +FROM +( + SELECT DISTINCT foo_inner_1.tenant_id FROM + ( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (3,4) + ) foo_inner_1, + ( + SELECT + second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (4,5) + )foo_inner_2 + WHERE foo_inner_1.tenant_id != foo_inner_2.tenant_id +) as foo +WHERE + foo.tenant_id != second_distributed_table.tenant_id + AND second_distributed_table.dept IN (3); +DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +DEBUG: generating subplan 8_1 for subquery SELECT second_distributed_table.tenant_id FROM recursive_dml_queries.second_distributed_table, recursive_dml_queries.distributed_table WHERE ((distributed_table.tenant_id = second_distributed_table.tenant_id) AND (second_distributed_table.dept = ANY (ARRAY[4, 5]))) +DEBUG: generating subplan 8_2 for subquery SELECT DISTINCT foo_inner_1.tenant_id FROM (SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM recursive_dml_queries.second_distributed_table, recursive_dml_queries.distributed_table WHERE ((distributed_table.tenant_id = second_distributed_table.tenant_id) AND (second_distributed_table.dept = ANY (ARRAY[3, 4])))) foo_inner_1, (SELECT intermediate_result.tenant_id FROM 
read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) foo_inner_2 WHERE (foo_inner_1.tenant_id <> foo_inner_2.tenant_id) +DEBUG: Plan 8 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.second_distributed_table SET dept = ((foo.tenant_id)::integer / 4) FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('8_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) foo WHERE ((foo.tenant_id <> second_distributed_table.tenant_id) AND (second_distributed_table.dept = 3)) +-- we currently do not allow local tables in modification queries +UPDATE + distributed_table +SET + dept = avg_tenant_id::int +FROM +( + SELECT + avg(local_table.id::int) as avg_tenant_id + FROM + local_table +) as foo +WHERE + foo.avg_tenant_id::int::text = distributed_table.tenant_id +RETURNING + distributed_table.*; +DEBUG: relation local_table is not distributed +DEBUG: generating subplan 11_1 for subquery SELECT avg((id)::integer) AS avg_tenant_id FROM recursive_dml_queries.local_table +DEBUG: Plan 11 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = (foo.avg_tenant_id)::integer FROM (SELECT intermediate_result.avg_tenant_id FROM read_intermediate_result('11_1'::text, 'binary'::citus_copy_format) intermediate_result(avg_tenant_id numeric)) foo WHERE (((foo.avg_tenant_id)::integer)::text = distributed_table.tenant_id) RETURNING distributed_table.tenant_id, distributed_table.dept, distributed_table.info + tenant_id | dept | info +-----------+------+------------------------ + 50 | 50 | {"f1": 50, "f2": 2500} +(1 row) + +-- we currently do not allow views in modification queries +UPDATE + distributed_table +SET + dept = avg_tenant_id::int +FROM +( + SELECT + avg(tenant_id::int) as avg_tenant_id + FROM + tenant_ids +) as foo +WHERE + foo.avg_tenant_id::int::text = distributed_table.tenant_id +RETURNING + distributed_table.*; +DEBUG: relation 
tenant_ids is not distributed +DEBUG: generating subplan 12_1 for subquery SELECT avg((tenant_id)::integer) AS avg_tenant_id FROM (SELECT distributed_table.tenant_id, reference_table.name FROM recursive_dml_queries.distributed_table, recursive_dml_queries.reference_table WHERE ((distributed_table.dept)::text = reference_table.id) ORDER BY reference_table.name DESC, distributed_table.tenant_id DESC) tenant_ids +DEBUG: Plan 12 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = (foo.avg_tenant_id)::integer FROM (SELECT intermediate_result.avg_tenant_id FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(avg_tenant_id numeric)) foo WHERE (((foo.avg_tenant_id)::integer)::text = distributed_table.tenant_id) RETURNING distributed_table.tenant_id, distributed_table.dept, distributed_table.info + tenant_id | dept | info +-----------+------+------------------------ + 50 | 50 | {"f1": 50, "f2": 2500} +(1 row) + +-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be +-- recursively planned +UPDATE + second_distributed_table +SET + dept = foo.tenant_id::int / 4 +FROM +( + SELECT DISTINCT foo_inner_1.tenant_id FROM + ( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (3,4) + ) + foo_inner_1 JOIN LATERAL + ( + SELECT + second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND foo_inner_1.dept = second_distributed_table.dept + AND + second_distributed_table.dept IN (4,5) + ) foo_inner_2 + ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id) + ) as foo +RETURNING *; +DEBUG: complex joins are only supported when all distributed tables are joined on their 
distribution columns with equal operator +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +-- again a corrolated subquery +-- this time distribution key eq. exists +-- however recursive planning is prevented due to correlated subqueries +UPDATE + second_distributed_table +SET + dept = foo.tenant_id::int / 4 +FROM +( + SELECT baz.tenant_id FROM + ( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table as d1 + WHERE + d1.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (3,4) + AND + second_distributed_table.tenant_id IN + ( + SELECT s2.tenant_id + FROM second_distributed_table as s2 + GROUP BY d1.tenant_id, s2.tenant_id + ) + ) as baz + ) as foo WHERE second_distributed_table.tenant_id = foo.tenant_id +RETURNING *; +DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +-- we don't support subquerues/CTEs inside VALUES +INSERT INTO + second_distributed_table (tenant_id, dept) +VALUES ('3', (WITH vals AS (SELECT 3) select * from vals)); +DEBUG: subqueries are not supported within INSERT queries +HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. +DEBUG: generating subplan 18_1 for CTE vals: SELECT 3 +DEBUG: Plan 18 query after replacing subqueries and CTEs: INSERT INTO recursive_dml_queries.second_distributed_table (tenant_id, dept) VALUES ('3'::text, (SELECT vals."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('18_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) vals)) +DEBUG: subqueries are not supported within INSERT queries +HINT: Try rewriting your queries with 'INSERT INTO ... 
SELECT' syntax. +ERROR: subqueries are not supported within INSERT queries +HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. +INSERT INTO + second_distributed_table (tenant_id, dept) +VALUES ('3', (SELECT 3)); +DEBUG: subqueries are not supported within INSERT queries +HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. +ERROR: subqueries are not supported within INSERT queries +HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. +-- DML with an unreferenced SELECT CTE +WITH cte_1 AS ( + WITH cte_2 AS ( + SELECT tenant_id as cte2_id + FROM second_distributed_table + WHERE dept >= 2 + ) + + UPDATE distributed_table + SET dept = 10 + RETURNING * +) +UPDATE distributed_table +SET dept = 5 +FROM cte_1 +WHERE distributed_table.tenant_id < cte_1.tenant_id; +DEBUG: common table expressions are not supported in distributed modifications +DEBUG: generating subplan 20_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept >= 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info +DEBUG: common table expressions are not supported in distributed modifications +DEBUG: Plan 20 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id < cte_1.tenant_id) +EXPLAIN (COSTS FALSE) WITH cte_1 AS ( + WITH cte_2 AS ( + SELECT tenant_id as cte2_id + FROM second_distributed_table + WHERE dept >= 2 + ) + + UPDATE distributed_table + SET dept = 10 + RETURNING * +) +UPDATE distributed_table +SET dept = 5 +FROM cte_1 +WHERE distributed_table.tenant_id < 
cte_1.tenant_id; +DEBUG: common table expressions are not supported in distributed modifications +DEBUG: generating subplan 22_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept >= 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info +DEBUG: common table expressions are not supported in distributed modifications +DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('22_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id < cte_1.tenant_id) + QUERY PLAN +------------------------------------------------------------------------------------------------ + Custom Scan (Citus Router) + -> Distributed Subplan 22_1 + -> Custom Scan (Citus Router) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57638 dbname=regression + -> Update on distributed_table_2370000 distributed_table + -> Seq Scan on distributed_table_2370000 distributed_table + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57638 dbname=regression + -> Update on distributed_table_2370000 distributed_table + -> Nested Loop + Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id) + -> Function Scan on read_intermediate_result intermediate_result + -> Materialize + -> Seq Scan on distributed_table_2370000 distributed_table +(19 rows) + +-- we don't support updating local table with a join with +-- distributed tables +UPDATE + local_table +SET + id = 'citus_test' +FROM + distributed_table +WHERE + distributed_table.tenant_id = local_table.id; +DEBUG: relation local_table is not distributed +ERROR: 
relation local_table is not distributed +RESET client_min_messages; +DROP SCHEMA recursive_dml_queries CASCADE; +NOTICE: drop cascades to 5 other objects +DETAIL: drop cascades to table distributed_table +drop cascades to table second_distributed_table +drop cascades to table reference_table +drop cascades to table local_table +drop cascades to view tenant_ids diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 64c5683a4..6e86f6d51 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1775,6 +1775,25 @@ SELECT count(*) FROM raw_events_second; 38 (1 row) +-- intermediate results (CTEs) should be allowed when doing INSERT...SELECT within a CTE +WITH series AS ( + SELECT s AS val FROM generate_series(60,70) s +), +inserts AS ( + INSERT INTO raw_events_second (user_id) + SELECT + user_id + FROM + raw_events_first JOIN series ON (value_1 = val) + RETURNING + NULL +) +SELECT count(*) FROM inserts; + count +------- + 2 +(1 row) + -- we need this in our next test truncate raw_events_first; SET client_min_messages TO DEBUG2; @@ -2176,12 +2195,14 @@ INSERT INTO raw_events_first (user_id, value_1) SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s ON CONFLICT DO NOTHING; DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: ON CONFLICT is not supported in INSERT ... SELECT via coordinator ERROR: ON CONFLICT is not supported in INSERT ... SELECT via coordinator -- RETURNING is unsupported INSERT INTO raw_events_first (user_id, value_1) SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s RETURNING *; DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: RETURNING is not supported in INSERT ... SELECT via coordinator ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator RESET client_min_messages; -- INSERT ... 
SELECT and multi-shard SELECT in the same transaction is supported diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index a2d79f321..2ba2565c2 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -1,5 +1,6 @@ SET citus.shard_count TO 32; SET citus.next_shard_id TO 750000; +SET citus.next_placement_id TO 750000; -- =================================================================== -- test end-to-end modification functionality -- =================================================================== @@ -241,13 +242,9 @@ SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 70000 AND 90000; 3 (1 row) --- Who says that? :) --- INSERT ... SELECT ... FROM commands are unsupported --- INSERT INTO limit_orders SELECT * FROM limit_orders; --- commands containing a CTE are unsupported -WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *) -INSERT INTO limit_orders DEFAULT VALUES; -ERROR: common table expressions are not supported in distributed modifications +-- commands containing a CTE are supported +WITH deleted_orders AS (DELETE FROM limit_orders WHERE id < 0 RETURNING *) +INSERT INTO limit_orders SELECT * FROM deleted_orders; -- test simple DELETE INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); SELECT COUNT(*) FROM limit_orders WHERE id = 246; @@ -291,16 +288,15 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246; 0 (1 row) --- commands with a USING clause are unsupported +-- commands with a USING clause are supported CREATE TABLE bidders ( name text, id bigint ); DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; ERROR: relation bidders is not distributed --- commands containing a CTE are unsupported -WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) -DELETE FROM 
limit_orders; -ERROR: common table expressions are not supported in distributed modifications +-- commands containing a CTE are supported +WITH new_orders AS (INSERT INTO limit_orders VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66)) +DELETE FROM limit_orders WHERE id < 0; INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); -- simple UPDATE UPDATE limit_orders SET symbol = 'GM' WHERE id = 246; @@ -431,10 +427,10 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; ERROR: relation bidders is not distributed --- commands containing a CTE are unsupported -WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) +-- the connection used for the INSERT is claimed by pull-push, causing the UPDATE to fail +WITH deleted_orders AS (INSERT INTO limit_orders VALUES (399, 'PDR', 14, '2017-07-02 16:32:15', 'sell', 43)) UPDATE limit_orders SET symbol = 'GM'; -ERROR: common table expressions are not supported in distributed modifications +ERROR: cannot establish a new connection for placement 750003, since DML has been executed on a connection that is in use SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; symbol | bidder_id --------+----------- @@ -962,13 +958,15 @@ SELECT * FROM summary_table ORDER BY id; 2 | 400 | 450.0000000000000000 | | (2 rows) --- unsupported multi-shard updates +-- multi-shard updates with recursively planned subqueries +BEGIN; UPDATE summary_table SET average_value = average_query.average FROM ( SELECT avg(value) AS average FROM raw_table) average_query; -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +ROLLBACK; +BEGIN; UPDATE summary_table SET average_value = average_value + 1 WHERE id = - (SELECT id FROM raw_table WHERE value > 100); -ERROR: complex joins are only supported when all distributed tables are joined on their 
distribution columns with equal operator + (SELECT id FROM raw_table WHERE value > 100 LIMIT 1); +ROLLBACK; -- test complex queries UPDATE summary_table SET diff --git a/src/test/regress/expected/multi_mx_modifications.out b/src/test/regress/expected/multi_mx_modifications.out index dbf4c3c1d..f3934e587 100644 --- a/src/test/regress/expected/multi_mx_modifications.out +++ b/src/test/regress/expected/multi_mx_modifications.out @@ -111,10 +111,9 @@ INSERT INTO limit_orders_mx VALUES (2037, 'GOOG', 5634, now(), 'buy', random()), (2039, 'GOOG', 5634, now(), 'buy', random()); -- connect back to the other node \c - - - :worker_1_port --- commands containing a CTE are unsupported -WITH deleted_orders AS (DELETE FROM limit_orders_mx RETURNING *) -INSERT INTO limit_orders_mx DEFAULT VALUES; -ERROR: common table expressions are not supported in distributed modifications +-- commands containing a CTE are supported +WITH deleted_orders AS (DELETE FROM limit_orders_mx WHERE id < 0 RETURNING *) +INSERT INTO limit_orders_mx SELECT * FROM deleted_orders; -- test simple DELETE INSERT INTO limit_orders_mx VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); SELECT COUNT(*) FROM limit_orders_mx WHERE id = 246; @@ -166,10 +165,9 @@ DELETE FROM limit_orders_mx USING bidders WHERE limit_orders_mx.id = 246 AND limit_orders_mx.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; ERROR: relation bidders is not distributed --- commands containing a CTE are unsupported -WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) -DELETE FROM limit_orders_mx; -ERROR: common table expressions are not supported in distributed modifications +-- commands containing a CTE are supported +WITH new_orders AS (INSERT INTO limit_orders_mx VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66)) +DELETE FROM limit_orders_mx WHERE id < 0; -- cursors are not supported DELETE FROM limit_orders_mx WHERE CURRENT OF cursor_name; ERROR: cannot run DML queries with 
cursors @@ -236,10 +234,9 @@ UPDATE limit_orders_mx SET limit_price = 0.00 FROM bidders limit_orders_mx.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; ERROR: relation bidders is not distributed --- commands containing a CTE are unsupported -WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) +-- commands containing a CTE are supported +WITH deleted_orders AS (INSERT INTO limit_orders_mx VALUES (399, 'PDR', 14, '2017-07-02 16:32:15', 'sell', 43)) UPDATE limit_orders_mx SET symbol = 'GM'; -ERROR: common table expressions are not supported in distributed modifications SELECT symbol, bidder_id FROM limit_orders_mx WHERE id = 246; symbol | bidder_id --------+----------- diff --git a/src/test/regress/expected/multi_shard_update_delete.out b/src/test/regress/expected/multi_shard_update_delete.out index 9efcdfe36..0ef6cbf6b 100644 --- a/src/test/regress/expected/multi_shard_update_delete.out +++ b/src/test/regress/expected/multi_shard_update_delete.out @@ -415,20 +415,20 @@ WHERE user_id IN (SELECT user_id FROM users_test_table UNION ALL SELECT user_id - FROM events_test_table) returning *; - user_id | value_1 | value_2 | value_3 ----------+---------+---------+--------- - 8 | 4 | 13 | 0 - 20 | 4 | | 0 - 20 | 4 | | 0 - 20 | 4 | | 0 - 4 | 4 | 9 | 0 - 4 | 4 | 17 | 0 - 16 | 4 | | 0 - 6 | 4 | 11 | 0 - 6 | 4 | 15 | 0 - 2 | 4 | 7 | 0 - 2 | 4 | 19 | 0 + FROM events_test_table) returning value_3; + value_3 +--------- + 0 + 0 + 0 + 0 + 0 + 0 + 0 + 0 + 0 + 0 + 0 (11 rows) UPDATE users_test_table @@ -556,12 +556,10 @@ WHERE user_id IN (SELECT 2); UPDATE users_test_table SET value_2 = 6 WHERE value_1 IN (SELECT 2); --- Can only use immutable functions +-- Function calls in subqueries will be recursively planned UPDATE test_table_1 SET col_3 = 6 WHERE date_col IN (SELECT now()); -ERROR: cannot push down this subquery -DETAIL: Subqueries without a FROM clause can only contain immutable functions -- Test with prepared statements SELECT COUNT(*) FROM 
users_test_table WHERE value_1 = 0; count @@ -630,17 +628,15 @@ FROM users_test_table FULL OUTER JOIN events_test_table e2 USING (user_id) WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2; ERROR: a join with USING causes an internal naming conflict, use ON instead --- We can not pushdown query if there is no partition key equality +-- Non-pushdownable subqueries, but will be handled through recursive planning UPDATE users_test_table SET value_1 = 1 WHERE user_id IN (SELECT Count(value_1) FROM events_test_table GROUP BY user_id); -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator UPDATE users_test_table SET value_1 = (SELECT Count(*) FROM events_test_table); -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator UPDATE users_test_table SET value_1 = 4 WHERE user_id IN (SELECT user_id @@ -648,7 +644,6 @@ WHERE user_id IN (SELECT user_id UNION SELECT value_1 FROM events_test_table); -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator UPDATE users_test_table SET value_1 = 4 WHERE user_id IN (SELECT user_id @@ -657,7 +652,6 @@ WHERE user_id IN (SELECT user_id SELECT Sum(value_1) FROM events_test_table GROUP BY user_id); -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table); @@ -685,8 +679,6 @@ WHERE user_id IN INTERSECT SELECT user_id FROM events_test_table); -ERROR: cannot push down this subquery -DETAIL: Intersect and Except are currently unsupported -- Reference tables can not locate on the outer part of the outer join UPDATE users_test_table SET value_1 = 4 @@ -707,25 +699,25 @@ SET value_2 = 5 * random() FROM events_test_table WHERE 
users_test_table.user_id = events_test_table.user_id; ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE +-- Volatile functions in a subquery are recursively planned UPDATE users_test_table SET value_2 = 5 WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table); -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- Local tables are not supported UPDATE users_test_table SET value_2 = 5 FROM events_test_table_local WHERE users_test_table.user_id = events_test_table_local.user_id; ERROR: relation events_test_table_local is not distributed -UPDATE users_test_table -SET value_2 = 5 -WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local); -ERROR: relation events_test_table_local is not distributed UPDATE events_test_table_local SET value_2 = 5 FROM users_test_table WHERE events_test_table_local.user_id = users_test_table.user_id; ERROR: relation events_test_table_local is not distributed +-- Local tables in a subquery are supported through recursive planning +UPDATE users_test_table +SET value_2 = 5 +WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local); -- Shard counts of tables must be equal to pushdown the query UPDATE users_test_table SET value_2 = 5 @@ -738,7 +730,8 @@ DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table); -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +ERROR: more than one row returned by a subquery used as an expression +CONTEXT: while executing command on localhost:57637 -- Cursors are not supported BEGIN; DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; diff --git a/src/test/regress/expected/multi_shard_update_delete_0.out 
b/src/test/regress/expected/multi_shard_update_delete_0.out index 44fcc535c..457c36f8a 100644 --- a/src/test/regress/expected/multi_shard_update_delete_0.out +++ b/src/test/regress/expected/multi_shard_update_delete_0.out @@ -438,20 +438,20 @@ WHERE user_id IN (SELECT user_id FROM users_test_table UNION ALL SELECT user_id - FROM events_test_table) returning *; - user_id | value_1 | value_2 | value_3 ----------+---------+---------+--------- - 8 | 4 | 13 | 0 - 20 | 4 | | 0 - 20 | 4 | | 0 - 20 | 4 | | 0 - 4 | 4 | 9 | 0 - 4 | 4 | 17 | 0 - 16 | 4 | | 0 - 6 | 4 | 11 | 0 - 6 | 4 | 15 | 0 - 2 | 4 | 7 | 0 - 2 | 4 | 19 | 0 + FROM events_test_table) returning value_3; + value_3 +--------- + 0 + 0 + 0 + 0 + 0 + 0 + 0 + 0 + 0 + 0 + 0 (11 rows) UPDATE users_test_table @@ -579,12 +579,10 @@ WHERE user_id IN (SELECT 2); UPDATE users_test_table SET value_2 = 6 WHERE value_1 IN (SELECT 2); --- Can only use immutable functions +-- Function calls in subqueries will be recursively planned UPDATE test_table_1 SET col_3 = 6 WHERE date_col IN (SELECT now()); -ERROR: cannot push down this subquery -DETAIL: Subqueries without a FROM clause can only contain immutable functions -- Test with prepared statements SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0; count @@ -653,17 +651,15 @@ FROM users_test_table FULL OUTER JOIN events_test_table e2 USING (user_id) WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2; ERROR: a join with USING causes an internal naming conflict, use ON instead --- We can not pushdown query if there is no partition key equality +-- Non-pushdownable subqueries, but will be handled through recursive planning UPDATE users_test_table SET value_1 = 1 WHERE user_id IN (SELECT Count(value_1) FROM events_test_table GROUP BY user_id); -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator UPDATE users_test_table SET value_1 = (SELECT Count(*) FROM events_test_table); 
-ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator UPDATE users_test_table SET value_1 = 4 WHERE user_id IN (SELECT user_id @@ -671,7 +667,6 @@ WHERE user_id IN (SELECT user_id UNION SELECT value_1 FROM events_test_table); -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator UPDATE users_test_table SET value_1 = 4 WHERE user_id IN (SELECT user_id @@ -680,7 +675,6 @@ WHERE user_id IN (SELECT user_id SELECT Sum(value_1) FROM events_test_table GROUP BY user_id); -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table); @@ -708,8 +702,6 @@ WHERE user_id IN INTERSECT SELECT user_id FROM events_test_table); -ERROR: cannot push down this subquery -DETAIL: Intersect and Except are currently unsupported -- Reference tables can not locate on the outer part of the outer join UPDATE users_test_table SET value_1 = 4 @@ -730,25 +722,25 @@ SET value_2 = 5 * random() FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE +-- Volatile functions in a subquery are recursively planned UPDATE users_test_table SET value_2 = 5 WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table); -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- Local tables are not supported UPDATE users_test_table SET value_2 = 5 FROM events_test_table_local WHERE users_test_table.user_id = events_test_table_local.user_id; ERROR: relation events_test_table_local is not distributed -UPDATE users_test_table -SET value_2 = 5 -WHERE users_test_table.user_id IN(SELECT user_id FROM 
events_test_table_local); -ERROR: relation events_test_table_local is not distributed UPDATE events_test_table_local SET value_2 = 5 FROM users_test_table WHERE events_test_table_local.user_id = users_test_table.user_id; ERROR: relation events_test_table_local is not distributed +-- Local tables in a subquery are supported through recursive planning +UPDATE users_test_table +SET value_2 = 5 +WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local); -- Shard counts of tables must be equal to pushdown the query UPDATE users_test_table SET value_2 = 5 @@ -761,7 +753,8 @@ DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table); -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +ERROR: more than one row returned by a subquery used as an expression +CONTEXT: while executing command on localhost:57637 -- Cursors are not supported BEGIN; DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; diff --git a/src/test/regress/expected/recursive_dml_queries_mx.out b/src/test/regress/expected/recursive_dml_queries_mx.out new file mode 100644 index 000000000..6961463e7 --- /dev/null +++ b/src/test/regress/expected/recursive_dml_queries_mx.out @@ -0,0 +1,176 @@ +CREATE SCHEMA recursive_dml_queries_mx; +SET search_path TO recursive_dml_queries_mx, public; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO streaming; +CREATE TABLE recursive_dml_queries_mx.distributed_table (tenant_id text, dept int, info jsonb); +SELECT create_distributed_table('distributed_table', 'tenant_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE recursive_dml_queries_mx.second_distributed_table (tenant_id text, dept int, info jsonb); +SELECT create_distributed_table('second_distributed_table', 'tenant_id'); + 
create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE recursive_dml_queries_mx.reference_table (id text, name text); +SELECT create_reference_table('reference_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; +INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; +INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i; +SET client_min_messages TO DEBUG1; +-- the subquery foo is recursively planned +UPDATE + reference_table +SET + name = 'new_' || name +FROM +( + SELECT + avg(second_distributed_table.tenant_id::int) as avg_tenant_id + FROM + second_distributed_table +) as foo +WHERE + foo.avg_tenant_id::int::text = reference_table.id; +DEBUG: only reference tables may be queried when targeting a reference table with multi shard UPDATE/DELETE queries with multiple tables +DEBUG: generating subplan 4_1 for subquery SELECT avg((tenant_id)::integer) AS avg_tenant_id FROM recursive_dml_queries_mx.second_distributed_table +DEBUG: Plan 4 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries_mx.reference_table SET name = ('new_'::text || reference_table.name) FROM (SELECT intermediate_result.avg_tenant_id FROM read_intermediate_result('4_1'::text, 'binary'::citus_copy_format) intermediate_result(avg_tenant_id numeric)) foo WHERE (((foo.avg_tenant_id)::integer)::text = reference_table.id) +-- the subquery foo is recursively planned +-- but note that the subquery foo itself is pushdownable +UPDATE + second_distributed_table +SET + dept = foo.max_dept * 2 +FROM +( + SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) as max_dept FROM + ( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id 
= second_distributed_table.tenant_id + ) foo_inner + GROUP BY + tenant_id + ORDER BY 1 DESC +) as foo +WHERE + foo.tenant_id != second_distributed_table.tenant_id + AND second_distributed_table.dept IN (2); +DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +DEBUG: generating subplan 6_1 for subquery SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) AS max_dept FROM (SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM recursive_dml_queries_mx.second_distributed_table, recursive_dml_queries_mx.distributed_table WHERE (distributed_table.tenant_id = second_distributed_table.tenant_id)) foo_inner GROUP BY tenant_id ORDER BY tenant_id DESC +DEBUG: Plan 6 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries_mx.second_distributed_table SET dept = (foo.max_dept * 2) FROM (SELECT intermediate_result.tenant_id, intermediate_result.max_dept FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, max_dept integer)) foo WHERE ((foo.tenant_id <> second_distributed_table.tenant_id) AND (second_distributed_table.dept = 2)) +-- run some queries from worker nodes +\c - - - :worker_1_port +SET search_path TO recursive_dml_queries_mx, public; +-- the subquery foo is recursively planned +-- and foo itself is a non colocated subquery and recursively planned +UPDATE + second_distributed_table +SET + dept = foo.tenant_id::int / 4 +FROM +( + SELECT DISTINCT foo_inner_1.tenant_id FROM + ( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (3,4) + ) foo_inner_1, + ( + SELECT + second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = 
second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (4,5) + )foo_inner_2 + WHERE foo_inner_1.tenant_id != foo_inner_2.tenant_id +) as foo +WHERE + foo.tenant_id != second_distributed_table.tenant_id + AND second_distributed_table.dept IN (3); +-- use the second worker +\c - - - :worker_2_port +SET search_path TO recursive_dml_queries_mx, public; +CREATE TABLE recursive_dml_queries_mx.local_table (id text, name text); +INSERT INTO local_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i; +CREATE VIEW tenant_ids AS + SELECT + tenant_id, name + FROM + distributed_table, reference_table + WHERE + distributed_table.dept::text = reference_table.id + ORDER BY 2 DESC, 1 DESC; +-- we currently do not allow local tables in modification queries +UPDATE + distributed_table +SET + dept = avg_tenant_id::int +FROM +( + SELECT + avg(local_table.id::int) as avg_tenant_id + FROM + local_table +) as foo +WHERE + foo.avg_tenant_id::int::text = distributed_table.tenant_id +RETURNING + distributed_table.*; + tenant_id | dept | info +-----------+------+------------------------ + 50 | 50 | {"f1": 50, "f2": 2500} +(1 row) + +-- we currently do not allow views in modification queries +UPDATE + distributed_table +SET + dept = avg_tenant_id::int +FROM +( + SELECT + avg(tenant_id::int) as avg_tenant_id + FROM + tenant_ids +) as foo +WHERE + foo.avg_tenant_id::int::text = distributed_table.tenant_id +RETURNING + distributed_table.*; + tenant_id | dept | info +-----------+------+------------------------ + 50 | 50 | {"f1": 50, "f2": 2500} +(1 row) + +DROP TABLE local_table; +\c - - - :master_port +SET search_path TO recursive_dml_queries_mx, public; +RESET client_min_messages; +DROP SCHEMA recursive_dml_queries_mx CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table distributed_table +drop cascades to table second_distributed_table +drop cascades to table reference_table +RESET citus.shard_replication_factor; +RESET 
citus.replication_model; diff --git a/src/test/regress/expected/recursive_dml_with_different_planners_executors.out b/src/test/regress/expected/recursive_dml_with_different_planners_executors.out new file mode 100644 index 000000000..33d4ab497 --- /dev/null +++ b/src/test/regress/expected/recursive_dml_with_different_planners_executors.out @@ -0,0 +1,87 @@ +CREATE SCHEMA recursive_dml_with_different_planner_executors; +SET search_path TO recursive_dml_with_different_planner_executors, public; +CREATE TABLE distributed_table (tenant_id text, dept int, info jsonb); +SELECT create_distributed_table('distributed_table', 'tenant_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE second_distributed_table (tenant_id text, dept int, info jsonb); +SELECT create_distributed_table('second_distributed_table', 'tenant_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE reference_table (id text, name text); +SELECT create_reference_table('reference_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; +INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; +SET client_min_messages TO DEBUG1; +-- subquery with router planner +-- joined with a real-time query +UPDATE + distributed_table +SET dept = foo.dept FROM + (SELECT tenant_id, dept FROM second_distributed_table WHERE dept = 1 ) as foo, + (SELECT tenant_id FROM second_distributed_table WHERE dept IN (1, 2, 3, 4) OFFSET 0) as bar + WHERE foo.tenant_id = bar.tenant_id + AND distributed_table.tenant_id = bar.tenant_id; +DEBUG: cannot push down this subquery +DETAIL: Offset clause is currently unsupported when a subquery references a column from another query +DEBUG: generating subplan 3_1 for subquery SELECT tenant_id FROM 
recursive_dml_with_different_planner_executors.second_distributed_table WHERE (dept = ANY (ARRAY[1, 2, 3, 4])) OFFSET 0 +DEBUG: Plan 3 query after replacing subqueries and CTEs: UPDATE recursive_dml_with_different_planner_executors.distributed_table SET dept = foo.dept FROM (SELECT second_distributed_table.tenant_id, second_distributed_table.dept FROM recursive_dml_with_different_planner_executors.second_distributed_table WHERE (second_distributed_table.dept = 1)) foo, (SELECT intermediate_result.tenant_id FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) bar WHERE ((foo.tenant_id = bar.tenant_id) AND (distributed_table.tenant_id = bar.tenant_id)) +-- a non colocated subquery inside the UPDATE +UPDATE distributed_table SET dept = foo.max_dept FROM +( + SELECT + max(dept) as max_dept + FROM + (SELECT DISTINCT tenant_id, dept FROM distributed_table) as distributed_table + WHERE tenant_id NOT IN + (SELECT tenant_id FROM second_distributed_table WHERE dept IN (1, 2, 3, 4)) +) as foo WHERE foo.max_dept > dept * 3; +DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +DEBUG: generating subplan 5_1 for subquery SELECT tenant_id FROM recursive_dml_with_different_planner_executors.second_distributed_table WHERE (dept = ANY (ARRAY[1, 2, 3, 4])) +DEBUG: generating subplan 5_2 for subquery SELECT max(dept) AS max_dept FROM (SELECT DISTINCT distributed_table_1.tenant_id, distributed_table_1.dept FROM recursive_dml_with_different_planner_executors.distributed_table distributed_table_1) distributed_table WHERE (NOT (tenant_id IN (SELECT intermediate_result.tenant_id FROM read_intermediate_result('5_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)))) +DEBUG: Plan 5 query after replacing subqueries and CTEs: UPDATE recursive_dml_with_different_planner_executors.distributed_table SET dept = foo.max_dept FROM (SELECT 
intermediate_result.max_dept FROM read_intermediate_result('5_2'::text, 'binary'::citus_copy_format) intermediate_result(max_dept integer)) foo WHERE (foo.max_dept > (distributed_table.dept * 3)) +-- subquery with repartition query +SET citus.enable_repartition_joins to ON; +UPDATE distributed_table SET dept = foo.some_tenants::int FROM +( + SELECT + DISTINCT second_distributed_table.tenant_id as some_tenants + FROM second_distributed_table, distributed_table WHERE second_distributed_table.dept = distributed_table.dept +) as foo; +DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +DEBUG: cannot use real time executor with repartition jobs +HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. +DEBUG: generating subplan 8_1 for subquery SELECT DISTINCT second_distributed_table.tenant_id AS some_tenants FROM recursive_dml_with_different_planner_executors.second_distributed_table, recursive_dml_with_different_planner_executors.distributed_table WHERE (second_distributed_table.dept = distributed_table.dept) +DEBUG: Plan 8 query after replacing subqueries and CTEs: UPDATE recursive_dml_with_different_planner_executors.distributed_table SET dept = (foo.some_tenants)::integer FROM (SELECT intermediate_result.some_tenants FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(some_tenants text)) foo +SET citus.enable_repartition_joins to OFF; +-- final query is router +UPDATE distributed_table SET dept = foo.max_dept FROM +( + SELECT + max(dept) as max_dept + FROM + (SELECT DISTINCT tenant_id, dept FROM distributed_table) as distributed_table + WHERE tenant_id IN + (SELECT tenant_id FROM second_distributed_table WHERE dept IN (1, 2, 3, 4)) +) as foo WHERE foo.max_dept >= dept and tenant_id = '8'; +DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator 
+DEBUG: generating subplan 10_1 for subquery SELECT max(dept) AS max_dept FROM (SELECT DISTINCT distributed_table_1.tenant_id, distributed_table_1.dept FROM recursive_dml_with_different_planner_executors.distributed_table distributed_table_1) distributed_table WHERE (tenant_id IN (SELECT second_distributed_table.tenant_id FROM recursive_dml_with_different_planner_executors.second_distributed_table WHERE (second_distributed_table.dept = ANY (ARRAY[1, 2, 3, 4])))) +DEBUG: Plan 10 query after replacing subqueries and CTEs: UPDATE recursive_dml_with_different_planner_executors.distributed_table SET dept = foo.max_dept FROM (SELECT intermediate_result.max_dept FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(max_dept integer)) foo WHERE ((foo.max_dept >= distributed_table.dept) AND (distributed_table.tenant_id = '8'::text)) +RESET client_min_messages; +DROP SCHEMA recursive_dml_with_different_planner_executors CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table distributed_table +drop cascades to table second_distributed_table +drop cascades to table reference_table +SET search_path TO public; diff --git a/src/test/regress/expected/with_dml.out b/src/test/regress/expected/with_dml.out new file mode 100644 index 000000000..d0fb4e978 --- /dev/null +++ b/src/test/regress/expected/with_dml.out @@ -0,0 +1,180 @@ +CREATE SCHEMA with_dml; +SET search_path TO with_dml, public; +CREATE TABLE with_dml.distributed_table (tenant_id text PRIMARY KEY, dept int); +SELECT create_distributed_table('distributed_table', 'tenant_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE with_dml.second_distributed_table (tenant_id text, dept int); +SELECT create_distributed_table('second_distributed_table', 'tenant_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE with_dml.reference_table (id text, name text); +SELECT 
create_reference_table('reference_table'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO distributed_table SELECT i::text, i % 10 FROM generate_series (0, 100) i; +INSERT INTO second_distributed_table SELECT i::text, i % 10 FROM generate_series (0, 100) i; +INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i; +SET client_min_messages TO DEBUG1; +-- delete all tenants from the reference table whose dept is 1 +WITH ids_to_delete AS ( + SELECT tenant_id FROM distributed_table WHERE dept = 1 +) +DELETE FROM reference_table WHERE id IN (SELECT tenant_id FROM ids_to_delete); +DEBUG: common table expressions are not supported in distributed modifications +DEBUG: generating subplan 4_1 for CTE ids_to_delete: SELECT tenant_id FROM with_dml.distributed_table WHERE (dept = 1) +DEBUG: Plan 4 query after replacing subqueries and CTEs: DELETE FROM with_dml.reference_table WHERE (id IN (SELECT ids_to_delete.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('4_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete)) +-- update the name of the users whose dept is 2 +WITH ids_to_update AS ( + SELECT tenant_id FROM distributed_table WHERE dept = 2 +) +UPDATE reference_table SET name = 'new_' || name WHERE id IN (SELECT tenant_id FROM ids_to_update); +DEBUG: common table expressions are not supported in distributed modifications +DEBUG: generating subplan 6_1 for CTE ids_to_update: SELECT tenant_id FROM with_dml.distributed_table WHERE (dept = 2) +DEBUG: Plan 6 query after replacing subqueries and CTEs: UPDATE with_dml.reference_table SET name = ('new_'::text || name) WHERE (id IN (SELECT ids_to_update.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_update)) +-- now the CTE is also modifying +WITH ids_deleted_3 AS +( + DELETE FROM 
distributed_table WHERE dept = 3 RETURNING tenant_id +), +ids_deleted_4 AS +( + DELETE FROM distributed_table WHERE dept = 4 RETURNING tenant_id +) +DELETE FROM reference_table WHERE id IN (SELECT * FROM ids_deleted_3 UNION SELECT * FROM ids_deleted_4); +DEBUG: common table expressions are not supported in distributed modifications +DEBUG: generating subplan 8_1 for CTE ids_deleted_3: DELETE FROM with_dml.distributed_table WHERE (dept = 3) RETURNING tenant_id +DEBUG: generating subplan 8_2 for CTE ids_deleted_4: DELETE FROM with_dml.distributed_table WHERE (dept = 4) RETURNING tenant_id +DEBUG: generating subplan 8_3 for subquery SELECT ids_deleted_3.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_deleted_3 UNION SELECT ids_deleted_4.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('8_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_deleted_4 +DEBUG: Plan 8 query after replacing subqueries and CTEs: DELETE FROM with_dml.reference_table WHERE (id IN (SELECT intermediate_result.tenant_id FROM read_intermediate_result('8_3'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text))) +-- now the final UPDATE command is pushdownable +WITH ids_to_delete AS +( + SELECT tenant_id FROM distributed_table WHERE dept = 5 +) +UPDATE + distributed_table +SET + dept = dept + 1 +FROM + ids_to_delete, (SELECT tenant_id FROM distributed_table WHERE tenant_id::int < 60) as some_tenants +WHERE + some_tenants.tenant_id = ids_to_delete.tenant_id + AND distributed_table.tenant_id = some_tenants.tenant_id; +DEBUG: common table expressions are not supported in distributed modifications +DEBUG: generating subplan 12_1 for CTE ids_to_delete: SELECT tenant_id FROM with_dml.distributed_table WHERE (dept = 5) +DEBUG: Plan 12 query after replacing subqueries and CTEs: UPDATE with_dml.distributed_table 
SET dept = (distributed_table.dept + 1) FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete, (SELECT distributed_table_1.tenant_id FROM with_dml.distributed_table distributed_table_1 WHERE ((distributed_table_1.tenant_id)::integer < 60)) some_tenants WHERE ((some_tenants.tenant_id = ids_to_delete.tenant_id) AND (distributed_table.tenant_id = some_tenants.tenant_id)) +-- this query errors out since we've some hard +-- errors in the INSERT ... SELECT pushdown +-- which prevents to fallback to recursive planning +WITH ids_to_upsert AS +( + SELECT tenant_id FROM distributed_table WHERE dept > 7 +) +INSERT INTO distributed_table + SELECT distributed_table.tenant_id FROM ids_to_upsert, distributed_table + WHERE distributed_table.tenant_id = ids_to_upsert.tenant_id + ON CONFLICT (tenant_id) DO UPDATE SET dept = 8; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. +-- the following query is very similar to the above one +-- but this time the query is pulled to coordinator since +-- we return before hitting any hard errors +WITH ids_to_insert AS +( + SELECT (tenant_id::int * 100)::text as tenant_id FROM distributed_table WHERE dept > 7 +) +INSERT INTO distributed_table + SELECT DISTINCT ids_to_insert.tenant_id FROM ids_to_insert, distributed_table + WHERE distributed_table.tenant_id < ids_to_insert.tenant_id; +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match +DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. +HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +DEBUG: generating subplan 16_1 for CTE ids_to_insert: SELECT (((tenant_id)::integer * 100))::text AS tenant_id FROM with_dml.distributed_table WHERE (dept > 7) +DEBUG: generating subplan 16_2 for subquery SELECT DISTINCT ids_to_insert.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('16_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_insert, with_dml.distributed_table WHERE (distributed_table.tenant_id < ids_to_insert.tenant_id) +DEBUG: Plan 16 query after replacing subqueries and CTEs: SELECT tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('16_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) citus_insert_select_subquery +-- not a very meaningful query +-- but has two modifying CTEs along with another +-- modify statement +WITH copy_to_other_table AS ( + INSERT INTO distributed_table + SELECT * + FROM second_distributed_table + WHERE dept = 3 + ON CONFLICT (tenant_id) DO UPDATE SET dept = 4 + RETURNING * +), +main_table_deleted AS ( + DELETE + FROM distributed_table + WHERE dept < 10 + AND NOT EXISTS (SELECT 1 FROM second_distributed_table + WHERE second_distributed_table.dept = 1 + AND second_distributed_table.tenant_id = distributed_table.tenant_id) + RETURNING * +) +INSERT INTO second_distributed_table + SELECT * + FROM main_table_deleted + EXCEPT + SELECT * + FROM copy_to_other_table; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries +DEBUG: generating subplan 20_1 for CTE copy_to_other_table: INSERT INTO with_dml.distributed_table (tenant_id, dept) SELECT tenant_id, dept FROM with_dml.second_distributed_table WHERE (dept = 3) ON CONFLICT(tenant_id) DO UPDATE SET dept = 4 RETURNING distributed_table.tenant_id, distributed_table.dept +DEBUG: generating subplan 20_2 for CTE main_table_deleted: DELETE FROM with_dml.distributed_table WHERE ((dept < 10) AND (NOT (EXISTS (SELECT 1 FROM with_dml.second_distributed_table WHERE ((second_distributed_table.dept = 1) AND (second_distributed_table.tenant_id = distributed_table.tenant_id)))))) RETURNING tenant_id, dept +DEBUG: generating subplan 20_3 for subquery SELECT main_table_deleted.tenant_id, main_table_deleted.dept FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept FROM read_intermediate_result('20_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer)) main_table_deleted EXCEPT SELECT copy_to_other_table.tenant_id, copy_to_other_table.dept FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer)) copy_to_other_table +DEBUG: Plan 20 query after replacing subqueries and CTEs: SELECT tenant_id, dept FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept FROM read_intermediate_result('20_3'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer)) citus_insert_select_subquery +-- CTE inside the UPDATE statement +UPDATE + second_distributed_table +SET dept = + (WITH vals AS ( + SELECT DISTINCT tenant_id::int FROM distributed_table + ) select * from vals where tenant_id = 8 ) + WHERE dept = 8; +DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with 
equal operator +DEBUG: generating subplan 24_1 for CTE vals: SELECT DISTINCT (tenant_id)::integer AS tenant_id FROM with_dml.distributed_table +DEBUG: Plan 24 query after replacing subqueries and CTEs: UPDATE with_dml.second_distributed_table SET dept = (SELECT vals.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) vals WHERE (vals.tenant_id = 8)) WHERE (dept = 8) +-- Subquery inside the UPDATE statement +UPDATE + second_distributed_table +SET dept = + + (SELECT DISTINCT tenant_id::int FROM distributed_table WHERE tenant_id = '9') + WHERE dept = 8; +DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +DEBUG: generating subplan 26_1 for subquery SELECT DISTINCT (tenant_id)::integer AS tenant_id FROM with_dml.distributed_table WHERE (tenant_id = '9'::text) +DEBUG: Plan 26 query after replacing subqueries and CTEs: UPDATE with_dml.second_distributed_table SET dept = (SELECT intermediate_result.tenant_id FROM read_intermediate_result('26_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) WHERE (dept = 8) +-- delete all remaining tenants +WITH ids_to_delete AS ( + SELECT tenant_id FROM distributed_table +) +DELETE FROM distributed_table WHERE tenant_id = ANY(SELECT tenant_id FROM ids_to_delete); +DEBUG: common table expressions are not supported in distributed modifications +DEBUG: generating subplan 28_1 for CTE ids_to_delete: SELECT tenant_id FROM with_dml.distributed_table +DEBUG: Plan 28 query after replacing subqueries and CTEs: DELETE FROM with_dml.distributed_table WHERE (tenant_id IN (SELECT ids_to_delete.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('28_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete)) +WITH ids_to_delete AS ( + SELECT id FROM reference_table +) 
+DELETE FROM reference_table WHERE id = ANY(SELECT id FROM ids_to_delete); +DEBUG: common table expressions are not supported in distributed modifications +DEBUG: generating subplan 30_1 for CTE ids_to_delete: SELECT id FROM with_dml.reference_table +DEBUG: Plan 30 query after replacing subqueries and CTEs: DELETE FROM with_dml.reference_table WHERE (id IN (SELECT ids_to_delete.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('30_1'::text, 'binary'::citus_copy_format) intermediate_result(id text)) ids_to_delete)) +RESET client_min_messages; +DROP SCHEMA with_dml CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table distributed_table +drop cascades to table second_distributed_table +drop cascades to table reference_table diff --git a/src/test/regress/expected/with_modifying.out b/src/test/regress/expected/with_modifying.out index 8c7bf4fc0..ad689c1e0 100644 --- a/src/test/regress/expected/with_modifying.out +++ b/src/test/regress/expected/with_modifying.out @@ -201,12 +201,10 @@ WITH cte AS ( SELECT value_2 FROM users_table WHERE user_id IN (1, 2, 3) ) DELETE FROM modify_table WHERE id IN (SELECT value_2 FROM cte); -ERROR: common table expressions are not supported in distributed modifications WITH cte AS ( SELECT value_2 FROM users_table WHERE user_id IN (1, 2, 3) ) UPDATE modify_table SET val=-1 WHERE val IN (SELECT * FROM cte); -ERROR: common table expressions are not supported in distributed modifications WITH cte AS ( WITH basic AS ( SELECT value_2 FROM users_table WHERE user_id IN (1, 2, 3) @@ -214,7 +212,7 @@ WITH cte AS ( INSERT INTO modify_table (SELECT * FROM basic) RETURNING * ) UPDATE modify_table SET val=-2 WHERE id IN (SELECT id FROM cte); -ERROR: common table expressions are not supported in distributed modifications +ERROR: RETURNING is not supported in INSERT ... 
SELECT via coordinator WITH cte AS ( WITH basic AS ( SELECT * FROM events_table WHERE event_type = 5 @@ -237,10 +235,10 @@ INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY SELECT * FROM summary_table ORDER BY id; id | counter ----+--------- - 2 | 153 - 3 | 184 - 4 | 160 - 5 | 170 + 2 | 20 + 3 | 38 + 4 | 24 + 5 | 27 (4 rows) SELECT COUNT(*) FROM modify_table; @@ -259,11 +257,11 @@ SELECT * FROM summary_table ORDER BY id, counter; ----+--------- 1 | 1 2 | 1 - 2 | 153 + 2 | 20 3 | 1 - 3 | 184 - 4 | 160 - 5 | 170 + 3 | 38 + 4 | 24 + 5 | 27 (7 rows) SELECT COUNT(*) FROM modify_table; @@ -310,10 +308,10 @@ SELECT * FROM summary_table ORDER BY id, counter; id | counter ----+--------- 1 | 1 - 2 | 154 - 3 | 185 - 4 | 160 - 5 | 170 + 2 | 21 + 3 | 39 + 4 | 24 + 5 | 27 (5 rows) WITH added_data AS ( @@ -334,10 +332,10 @@ SELECT * FROM summary_table ORDER BY id, counter; ----+--------- 1 | 1 1 | 1 - 2 | 154 - 3 | 185 - 4 | 160 - 5 | 170 + 2 | 21 + 3 | 39 + 4 | 24 + 5 | 27 (6 rows) -- Merge rows in the summary_table @@ -349,10 +347,10 @@ SELECT * FROM summary_table ORDER BY id, counter; id | counter ----+--------- 1 | 2 - 2 | 154 - 3 | 185 - 4 | 160 - 5 | 170 + 2 | 21 + 3 | 39 + 4 | 24 + 5 | 27 (5 rows) SELECT * FROM modify_table ORDER BY id, val; @@ -378,8 +376,6 @@ raw_data AS ( DELETE FROM modify_table WHERE id >= (SELECT min(id) FROM select_data WHERE id > 10) RETURNING * ) INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; -ERROR: cannot push down this subquery -DETAIL: Aggregates without group by are currently unsupported when a subquery references a column from another query INSERT INTO modify_table VALUES (21, 1), (22, 2), (23, 3); -- read ids from the same table WITH distinct_ids AS ( @@ -392,7 +388,7 @@ update_data AS ( SELECT count(*) FROM update_data; count ------- - 6 + 3 (1 row) -- read ids from a different table @@ -420,7 +416,7 @@ WITH update_data AS ( SELECT COUNT(*) FROM update_data; count ------- - 
2 + 1 (1 row) WITH delete_rows AS ( @@ -429,13 +425,10 @@ WITH delete_rows AS ( SELECT * FROM delete_rows ORDER BY id, val; id | val ----+----- - 11 | 100 - 12 | 300 - 13 | 100 21 | 300 22 | 200 23 | 100 -(6 rows) +(3 rows) WITH delete_rows AS ( DELETE FROM summary_table WHERE id > 10 RETURNING * @@ -443,7 +436,10 @@ WITH delete_rows AS ( SELECT * FROM delete_rows ORDER BY id, counter; id | counter ----+--------- -(0 rows) + 11 | 1 + 12 | 1 + 13 | 1 +(3 rows) -- Check modifiying CTEs inside a transaction BEGIN; @@ -467,11 +463,11 @@ SELECT * FROM summary_table ORDER BY id, counter; 1 | 1 1 | 2 2 | 1 - 2 | 154 + 2 | 21 3 | 1 - 3 | 185 - 4 | 160 - 5 | 170 + 3 | 39 + 4 | 24 + 5 | 27 (8 rows) SELECT * FROM modify_table ORDER BY id, val; @@ -493,10 +489,10 @@ SELECT * FROM summary_table ORDER BY id, counter; id | counter ----+--------- 1 | 2 - 2 | 154 - 3 | 185 - 4 | 160 - 5 | 170 + 2 | 21 + 3 | 39 + 4 | 24 + 5 | 27 (5 rows) SELECT * FROM modify_table ORDER BY id, val; @@ -548,7 +544,11 @@ WITH deleted_rows AS ( DELETE FROM modify_table WHERE val IN (SELECT val FROM modify_table WHERE id = 3) RETURNING * ) SELECT * FROM deleted_rows; -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator + id | val +----+----- + 3 | 6 +(1 row) + WITH select_rows AS ( SELECT val FROM modify_table WHERE id = 3 ), @@ -558,14 +558,14 @@ deleted_rows AS ( SELECT * FROM deleted_rows; id | val ----+----- - 3 | 6 -(1 row) +(0 rows) WITH deleted_rows AS ( DELETE FROM modify_table WHERE ctid IN (SELECT ctid FROM modify_table WHERE id = 1) RETURNING * ) SELECT * FROM deleted_rows; -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +ERROR: cannot perform distributed planning for the given modification +DETAIL: Recursively planned distributed modifications with ctid on where clause are not supported. 
WITH select_rows AS ( SELECT ctid FROM modify_table WHERE id = 1 ), @@ -646,7 +646,10 @@ raw_data AS ( RETURNING id, val ) SELECT * FROM raw_data ORDER BY val; -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator + id | val +----+----- +(0 rows) + -- Test with replication factor 2 SET citus.shard_replication_factor to 2; DROP TABLE modify_table; diff --git a/src/test/regress/expected/with_transactions.out b/src/test/regress/expected/with_transactions.out new file mode 100644 index 000000000..1b75f2ab4 --- /dev/null +++ b/src/test/regress/expected/with_transactions.out @@ -0,0 +1,113 @@ +CREATE SCHEMA with_transactions; +SET search_path TO with_transactions, public; +SET citus.shard_count TO 4; +SET citus.next_placement_id TO 800000; +CREATE TABLE with_transactions.raw_table (tenant_id int, income float, created_at timestamptz); +SELECT create_distributed_table('raw_table', 'tenant_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE with_transactions.second_raw_table (tenant_id int, income float, created_at timestamptz); +SELECT create_distributed_table('second_raw_table', 'tenant_id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO + raw_table (tenant_id, income, created_at) +SELECT + i % 10, i * 10.0, timestamp '2014-01-10 20:00:00' + i * interval '1 day' +FROM + generate_series (0, 100) i; +INSERT INTO second_raw_table SELECT * FROM raw_table; +SET client_min_messages TO DEBUG1; +-- run a transaction which DELETE +BEGIN; + WITH ids_to_delete AS + ( + SELECT tenant_id FROM raw_table WHERE income < 250 + ), + deleted_ids AS + ( + DELETE FROM raw_table WHERE created_at < '2014-02-10 20:00:00' AND tenant_id IN (SELECT * from ids_to_delete) RETURNING tenant_id + ) + UPDATE raw_table SET income = income * 2 WHERE tenant_id IN (SELECT tenant_id FROM deleted_ids); +DEBUG: common table expressions are not supported in 
distributed modifications +DEBUG: generating subplan 3_1 for CTE ids_to_delete: SELECT tenant_id FROM with_transactions.raw_table WHERE (income < (250)::double precision) +DEBUG: generating subplan 3_2 for CTE deleted_ids: DELETE FROM with_transactions.raw_table WHERE ((created_at < 'Mon Feb 10 20:00:00 2014 PST'::timestamp with time zone) AND (tenant_id IN (SELECT ids_to_delete.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_to_delete))) RETURNING tenant_id +DEBUG: Plan 3 query after replacing subqueries and CTEs: UPDATE with_transactions.raw_table SET income = (income * (2)::double precision) WHERE (tenant_id IN (SELECT deleted_ids.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('3_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) deleted_ids)) +ROLLBACK; +-- see that both UPDATE and DELETE commands are rollbacked +SELECT count(*) FROM raw_table; + count +------- + 101 +(1 row) + +SELECT max(income) FROM raw_table; + max +------ + 1000 +(1 row) + +-- multi-statement multi shard modifying statements should work +BEGIN; + SELECT count (*) FROM second_raw_table; + count +------- + 101 +(1 row) + + WITH distinct_count AS ( + SELECT count(DISTINCT created_at) FROM raw_table + ), + ids_inserted AS + ( + INSERT INTO raw_table VALUES (11, 1000, now()) RETURNING tenant_id + ) + UPDATE raw_table SET created_at = '2001-02-10 20:00:00' + WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted) AND tenant_id < (SELECT count FROM distinct_count); +DEBUG: common table expressions are not supported in distributed modifications +DEBUG: generating subplan 9_1 for CTE distinct_count: SELECT count(DISTINCT created_at) AS count FROM with_transactions.raw_table +DEBUG: generating subplan 9_2 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id, income, created_at) VALUES (11, 1000, 
now()) RETURNING tenant_id +DEBUG: Plan 9 query after replacing subqueries and CTEs: UPDATE with_transactions.raw_table SET created_at = 'Sat Feb 10 20:00:00 2001 PST'::timestamp with time zone WHERE ((tenant_id IN (SELECT ids_inserted.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('9_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_inserted)) AND (tenant_id < (SELECT distinct_count.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('9_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) distinct_count))) + TRUNCATE second_raw_table; +COMMIT; +-- sequential insert followed by parallel update causes execution issues +WITH ids_inserted AS +( + INSERT INTO raw_table VALUES (11, 1000, now()), (12, 1000, now()), (13, 1000, now()) RETURNING tenant_id +) +UPDATE raw_table SET created_at = '2001-02-10 20:00:00' WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted); +DEBUG: common table expressions are not supported in distributed modifications +DEBUG: generating subplan 12_1 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id, income, created_at) VALUES (11,1000,now()), (12,1000,now()), (13,1000,now()) RETURNING raw_table.tenant_id +DEBUG: Plan 12 query after replacing subqueries and CTEs: UPDATE with_transactions.raw_table SET created_at = 'Sat Feb 10 20:00:00 2001 PST'::timestamp with time zone WHERE (tenant_id IN (SELECT ids_inserted.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_inserted)) +ERROR: cannot establish a new connection for placement 800007, since DML has been executed on a connection that is in use +-- make sure that everything committed +SELECT count(*) FROM raw_table; + count +------- + 102 +(1 row) + +SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00'; + count +------- + 1 +(1 
row) + +SELECT count(*) FROM second_raw_table; + count +------- + 0 +(1 row) + +RESET client_min_messages; +RESET citus.shard_count; +DROP SCHEMA with_transactions CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table raw_table +drop cascades to table second_raw_table diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 8a309bba1..f11d2d4e8 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -23,7 +23,7 @@ test: multi_mx_copy_data multi_mx_router_planner test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10 test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19 test: multi_mx_tpch_query3 multi_mx_tpch_query6 multi_mx_tpch_query7 -test: multi_mx_tpch_query7_nested multi_mx_ddl +test: multi_mx_tpch_query7_nested multi_mx_ddl recursive_dml_queries_mx test: multi_mx_repartition_udt_prepare test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: multi_mx_metadata diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 8156c9e2d..91dc1d9a2 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -32,7 +32,7 @@ test: multi_read_from_secondaries test: multi_create_table test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select -test: multi_insert_select_window multi_shard_update_delete window_functions +test: multi_insert_select_window multi_shard_update_delete window_functions dml_recursive recursive_dml_with_different_planners_executors # ---------- # Tests for partitioning support @@ -99,8 +99,8 @@ test: multi_repartition_join_planning multi_repartition_join_pruning multi_repar # --------- test: 
with_nested with_where with_basics with_prepare with_set_operations -test: with_modifying -test: with_executors with_join with_partitioning +test: with_modifying cte_prepared_modify cte_nested_modification +test: with_executors with_join with_partitioning with_transactions with_dml # ---------- # Tests to check our large record loading and shard deletion behavior diff --git a/src/test/regress/sql/cte_nested_modification.sql b/src/test/regress/sql/cte_nested_modification.sql new file mode 100644 index 000000000..ebe8f02b4 --- /dev/null +++ b/src/test/regress/sql/cte_nested_modification.sql @@ -0,0 +1,105 @@ +CREATE SCHEMA cte_nested_modifications; +SET search_path TO cte_nested_modifications, public; +CREATE TABLE tt1(id int, value_1 int); +INSERT INTO tt1 VALUES(1,2),(2,3),(3,4); +SELECT create_distributed_table('tt1','id'); +CREATE TABLE tt2(id int, value_1 int); +INSERT INTO tt2 VALUES(3,3),(4,4),(5,5); +SELECT create_distributed_table('tt2','id'); +CREATE TABLE tt3(id int, json_val json); +INSERT INTO tt3 VALUES(1, '{"prod_name":"name_1", "qty":"6"}'), + (2, '{"prod_name":"name_2", "qty":"4"}'), + (3, '{"prod_name":"name_3", "qty":"2"}'); +-- DELETE within CTE and use it from UPDATE +BEGIN; +WITH cte_1 AS ( + WITH cte_2 AS ( + SELECT id as cte2_id + FROM tt1 + WHERE value_1 >= 2 + ) + + DELETE FROM tt2 + USING cte_2 + WHERE tt2.id = cte_2.cte2_id + RETURNING cte2_id - 1 as id +) +UPDATE tt1 +SET value_1 = abs(2 + 3.5) +FROM cte_1 +WHERE cte_1.id = tt1.id; +SELECT * FROM tt1 ORDER BY id; +ROLLBACK; +-- Similar to test above, now use the CTE in the SET part of the UPDATE +BEGIN; +WITH cte_1 AS ( + WITH cte_2 AS ( + SELECT id as cte2_id + FROM tt1 + WHERE value_1 >= 2 + ) + + DELETE FROM tt2 + USING cte_2 + WHERE tt2.id = cte_2.cte2_id + RETURNING cte2_id as id +) +UPDATE tt1 +SET value_1 = (SELECT max(id) + abs(2 + 3.5) FROM cte_1); +SELECT * FROM tt1 ORDER BY id; +ROLLBACK; +-- Use alias in the definition of CTE, instead of in the RETURNING +BEGIN; +WITH 
cte_1(id) AS ( + WITH cte_2 AS ( + SELECT id as cte2_id + FROM tt1 + WHERE value_1 >= 2 + ) + + DELETE FROM tt2 + USING cte_2 + WHERE tt2.id = cte_2.cte2_id + RETURNING cte2_id +) +UPDATE tt1 +SET value_1 = (SELECT max(id) + abs(2 + 3.5) FROM cte_1); +SELECT * FROM tt1 ORDER BY id; +ROLLBACK; +-- Update within CTE and use it from Delete +BEGIN; +WITH cte_1 AS ( + WITH cte_2 AS ( + SELECT id as cte2_id + FROM tt1 + WHERE value_1 >= 2 + ) + + UPDATE tt2 + SET value_1 = 10 + FROM cte_2 + WHERE id = cte2_id + RETURNING id, value_1 +) +DELETE FROM tt1 +USING cte_1 +WHERE tt1.id < cte_1.id; +SELECT * FROM tt1 ORDER BY id; +ROLLBACK; +-- Similar to test above, but use json column +BEGIN; +WITH cte_1 AS ( + WITH cte_2 AS ( + SELECT * FROM tt3 + ) + + UPDATE tt2 + SET value_1 = (SELECT max((json_val->>'qty')::int) FROM cte_2) + RETURNING id, value_1 +) +DELETE FROM tt1 +USING cte_1 +WHERE tt1.id < cte_1.id; +SELECT * FROM tt1 ORDER BY id; +ROLLBACK; +DROP SCHEMA cte_nested_modifications CASCADE; diff --git a/src/test/regress/sql/cte_prepared_modify.sql b/src/test/regress/sql/cte_prepared_modify.sql new file mode 100644 index 000000000..73adc70ee --- /dev/null +++ b/src/test/regress/sql/cte_prepared_modify.sql @@ -0,0 +1,52 @@ +CREATE SCHEMA cte_prepared_modify; +SET search_path TO cte_prepared_modify, public; +CREATE TABLE tt1(id int, value_1 int); +INSERT INTO tt1 VALUES(1,2),(2,3),(3,4); +SELECT create_distributed_table('tt1','id'); +CREATE TABLE tt2(id int, value_1 int); +INSERT INTO tt2 VALUES(3,3),(4,4),(5,5); +SELECT create_distributed_table('tt2','id'); +-- Test with prepared statements (parameter used by SET) +PREPARE prepared_test(integer) AS +WITH cte_1 AS( + SELECT * FROM tt1 WHERE id >= 2 +) +UPDATE tt2 +SET value_1 = $1 +FROM cte_1 +WHERE tt2.id = cte_1.id; +-- Test with prepared statements (parameter used by WHERE on partition column) +PREPARE prepared_test_2(integer) AS +WITH cte_1 AS( + SELECT * FROM tt1 WHERE id >= 2 +) +UPDATE tt2 +SET value_1 = (SELECT 
max(id) FROM cte_1) +WHERE tt2.id = $1; +-- Test with prepared statements (parameter used by WHERE on non-partition column) +PREPARE prepared_test_3(integer) AS +WITH cte_1 AS( + SELECT * FROM tt1 WHERE id >= 2 +) +UPDATE tt2 +SET value_1 = (SELECT max(id) FROM cte_1) +WHERE tt2.value_1 = $1; +EXECUTE prepared_test(1); +EXECUTE prepared_test(2); +EXECUTE prepared_test(3); +EXECUTE prepared_test(4); +EXECUTE prepared_test(5); +EXECUTE prepared_test(6); +EXECUTE prepared_test(1); +EXECUTE prepared_test(2); +EXECUTE prepared_test(3); +EXECUTE prepared_test(4); +EXECUTE prepared_test(5); +EXECUTE prepared_test(6); +EXECUTE prepared_test_3(1); +EXECUTE prepared_test_3(2); +EXECUTE prepared_test_3(3); +EXECUTE prepared_test_3(4); +EXECUTE prepared_test_3(5); +EXECUTE prepared_test_3(6); +DROP SCHEMA cte_prepared_modify CASCADE; diff --git a/src/test/regress/sql/dml_recursive.sql b/src/test/regress/sql/dml_recursive.sql new file mode 100644 index 000000000..f44195a84 --- /dev/null +++ b/src/test/regress/sql/dml_recursive.sql @@ -0,0 +1,265 @@ +CREATE SCHEMA recursive_dml_queries; +SET search_path TO recursive_dml_queries, public; +SET citus.next_shard_id TO 2370000; + +CREATE TABLE recursive_dml_queries.distributed_table (tenant_id text, dept int, info jsonb); +SELECT create_distributed_table('distributed_table', 'tenant_id'); + +CREATE TABLE recursive_dml_queries.second_distributed_table (tenant_id text, dept int, info jsonb); +SELECT create_distributed_table('second_distributed_table', 'tenant_id'); + +CREATE TABLE recursive_dml_queries.reference_table (id text, name text); +SELECT create_reference_table('reference_table'); + +CREATE TABLE recursive_dml_queries.local_table (id text, name text); + +INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; +INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; +INSERT INTO reference_table SELECT i::text, 
'user_' || i FROM generate_series (0, 100) i; +INSERT INTO local_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i; + +CREATE VIEW tenant_ids AS + SELECT + tenant_id, name + FROM + distributed_table, reference_table + WHERE + distributed_table.dept::text = reference_table.id + ORDER BY 2 DESC, 1 DESC; + +SET client_min_messages TO DEBUG1; + +-- the subquery foo is recursively planned +UPDATE + reference_table +SET + name = 'new_' || name +FROM +( + SELECT + avg(second_distributed_table.tenant_id::int) as avg_tenant_id + FROM + second_distributed_table +) as foo +WHERE + foo.avg_tenant_id::int::text = reference_table.id +RETURNING + reference_table.name; + +-- the subquery foo is recursively planned +-- but note that the subquery foo itself is pushdownable +UPDATE + second_distributed_table +SET + dept = foo.max_dept * 2 +FROM +( + SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) as max_dept FROM + ( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + ) foo_inner + GROUP BY + tenant_id + ORDER BY 1 DESC +) as foo +WHERE + foo.tenant_id != second_distributed_table.tenant_id + AND second_distributed_table.dept IN (2) +RETURNING + second_distributed_table.tenant_id, second_distributed_table.dept; + +-- the subquery foo is recursively planned +-- and foo itself is a non colocated subquery and recursively planned +UPDATE + second_distributed_table +SET + dept = foo.tenant_id::int / 4 +FROM +( + SELECT DISTINCT foo_inner_1.tenant_id FROM + ( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (3,4) + ) foo_inner_1, + ( + SELECT + second_distributed_table.tenant_id + FROM + second_distributed_table, 
distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (4,5) + )foo_inner_2 + WHERE foo_inner_1.tenant_id != foo_inner_2.tenant_id +) as foo +WHERE + foo.tenant_id != second_distributed_table.tenant_id + AND second_distributed_table.dept IN (3); + +-- we currently do not allow local tables in modification queries +UPDATE + distributed_table +SET + dept = avg_tenant_id::int +FROM +( + SELECT + avg(local_table.id::int) as avg_tenant_id + FROM + local_table +) as foo +WHERE + foo.avg_tenant_id::int::text = distributed_table.tenant_id +RETURNING + distributed_table.*; + +-- we currently do not allow views in modification queries +UPDATE + distributed_table +SET + dept = avg_tenant_id::int +FROM +( + SELECT + avg(tenant_id::int) as avg_tenant_id + FROM + tenant_ids +) as foo +WHERE + foo.avg_tenant_id::int::text = distributed_table.tenant_id +RETURNING + distributed_table.*; + +-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be +-- recursively planned +UPDATE + second_distributed_table +SET + dept = foo.tenant_id::int / 4 +FROM +( + SELECT DISTINCT foo_inner_1.tenant_id FROM + ( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (3,4) + ) + foo_inner_1 JOIN LATERAL + ( + SELECT + second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND foo_inner_1.dept = second_distributed_table.dept + AND + second_distributed_table.dept IN (4,5) + ) foo_inner_2 + ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id) + ) as foo +RETURNING *; + + +-- again a corrolated subquery +-- this time distribution key eq. 
exists +-- however recursive planning is prevented due to correlated subqueries +UPDATE + second_distributed_table +SET + dept = foo.tenant_id::int / 4 +FROM +( + SELECT baz.tenant_id FROM + ( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table as d1 + WHERE + d1.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (3,4) + AND + second_distributed_table.tenant_id IN + ( + SELECT s2.tenant_id + FROM second_distributed_table as s2 + GROUP BY d1.tenant_id, s2.tenant_id + ) + ) as baz + ) as foo WHERE second_distributed_table.tenant_id = foo.tenant_id +RETURNING *; + +-- we don't support subquerues/CTEs inside VALUES +INSERT INTO + second_distributed_table (tenant_id, dept) +VALUES ('3', (WITH vals AS (SELECT 3) select * from vals)); + +INSERT INTO + second_distributed_table (tenant_id, dept) +VALUES ('3', (SELECT 3)); + +-- DML with an unreferenced SELECT CTE +WITH cte_1 AS ( + WITH cte_2 AS ( + SELECT tenant_id as cte2_id + FROM second_distributed_table + WHERE dept >= 2 + ) + + UPDATE distributed_table + SET dept = 10 + RETURNING * +) +UPDATE distributed_table +SET dept = 5 +FROM cte_1 +WHERE distributed_table.tenant_id < cte_1.tenant_id; + +EXPLAIN (COSTS FALSE) WITH cte_1 AS ( + WITH cte_2 AS ( + SELECT tenant_id as cte2_id + FROM second_distributed_table + WHERE dept >= 2 + ) + + UPDATE distributed_table + SET dept = 10 + RETURNING * +) +UPDATE distributed_table +SET dept = 5 +FROM cte_1 +WHERE distributed_table.tenant_id < cte_1.tenant_id; + +-- we don't support updating local table with a join with +-- distributed tables +UPDATE + local_table +SET + id = 'citus_test' +FROM + distributed_table +WHERE + distributed_table.tenant_id = local_table.id; + +RESET client_min_messages; +DROP SCHEMA recursive_dml_queries CASCADE; diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index 6d85c4230..2ac88a69f 
100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -1420,6 +1420,21 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP BY 1,2,3,4,5,6; SELECT count(*) FROM raw_events_second; +-- intermediate results (CTEs) should be allowed when doing INSERT...SELECT within a CTE +WITH series AS ( + SELECT s AS val FROM generate_series(60,70) s +), +inserts AS ( + INSERT INTO raw_events_second (user_id) + SELECT + user_id + FROM + raw_events_first JOIN series ON (value_1 = val) + RETURNING + NULL +) +SELECT count(*) FROM inserts; + -- we need this in our next test truncate raw_events_first; diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index f8f00ccd8..acde8543e 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -1,5 +1,6 @@ SET citus.shard_count TO 32; SET citus.next_shard_id TO 750000; +SET citus.next_placement_id TO 750000; -- =================================================================== @@ -168,13 +169,9 @@ INSERT INTO limit_orders VALUES (random() * 10 + 70000, 'GOOG', 5634, now(), 'bu SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 70000 AND 90000; --- Who says that? :) --- INSERT ... SELECT ... 
FROM commands are unsupported --- INSERT INTO limit_orders SELECT * FROM limit_orders; - --- commands containing a CTE are unsupported -WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *) -INSERT INTO limit_orders DEFAULT VALUES; +-- commands containing a CTE are supported +WITH deleted_orders AS (DELETE FROM limit_orders WHERE id < 0 RETURNING *) +INSERT INTO limit_orders SELECT * FROM deleted_orders; -- test simple DELETE INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); @@ -194,15 +191,15 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246; DELETE FROM limit_orders WHERE id = (2 * 123); SELECT COUNT(*) FROM limit_orders WHERE id = 246; --- commands with a USING clause are unsupported +-- commands with a USING clause are supported CREATE TABLE bidders ( name text, id bigint ); DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; --- commands containing a CTE are unsupported -WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) -DELETE FROM limit_orders; +-- commands containing a CTE are supported +WITH new_orders AS (INSERT INTO limit_orders VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66)) +DELETE FROM limit_orders WHERE id < 0; INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); @@ -307,8 +304,8 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; --- commands containing a CTE are unsupported -WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) +-- the connection used for the INSERT is claimed by pull-push, causing the UPDATE to fail +WITH deleted_orders AS (INSERT INTO limit_orders VALUES (399, 'PDR', 14, '2017-07-02 16:32:15', 'sell', 43)) UPDATE limit_orders SET symbol = 'GM'; SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; @@ -572,12 
+569,16 @@ WHERE id = 1; SELECT * FROM summary_table ORDER BY id; --- unsupported multi-shard updates +-- multi-shard updates with recursively planned subqueries +BEGIN; UPDATE summary_table SET average_value = average_query.average FROM ( SELECT avg(value) AS average FROM raw_table) average_query; +ROLLBACK; +BEGIN; UPDATE summary_table SET average_value = average_value + 1 WHERE id = - (SELECT id FROM raw_table WHERE value > 100); + (SELECT id FROM raw_table WHERE value > 100 LIMIT 1); +ROLLBACK; -- test complex queries UPDATE summary_table diff --git a/src/test/regress/sql/multi_mx_modifications.sql b/src/test/regress/sql/multi_mx_modifications.sql index 05fa444d6..1896376a8 100644 --- a/src/test/regress/sql/multi_mx_modifications.sql +++ b/src/test/regress/sql/multi_mx_modifications.sql @@ -84,9 +84,9 @@ INSERT INTO limit_orders_mx VALUES (2037, 'GOOG', 5634, now(), 'buy', random()), -- connect back to the other node \c - - - :worker_1_port --- commands containing a CTE are unsupported -WITH deleted_orders AS (DELETE FROM limit_orders_mx RETURNING *) -INSERT INTO limit_orders_mx DEFAULT VALUES; +-- commands containing a CTE are supported +WITH deleted_orders AS (DELETE FROM limit_orders_mx WHERE id < 0 RETURNING *) +INSERT INTO limit_orders_mx SELECT * FROM deleted_orders; -- test simple DELETE INSERT INTO limit_orders_mx VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); @@ -115,9 +115,9 @@ DELETE FROM limit_orders_mx USING bidders WHERE limit_orders_mx.id = 246 AND limit_orders_mx.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; --- commands containing a CTE are unsupported -WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) -DELETE FROM limit_orders_mx; +-- commands containing a CTE are supported +WITH new_orders AS (INSERT INTO limit_orders_mx VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66)) +DELETE FROM limit_orders_mx WHERE id < 0; -- cursors are not supported DELETE FROM limit_orders_mx WHERE 
CURRENT OF cursor_name; @@ -161,8 +161,8 @@ UPDATE limit_orders_mx SET limit_price = 0.00 FROM bidders limit_orders_mx.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; --- commands containing a CTE are unsupported -WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) +-- commands containing a CTE are supported +WITH deleted_orders AS (INSERT INTO limit_orders_mx VALUES (399, 'PDR', 14, '2017-07-02 16:32:15', 'sell', 43)) UPDATE limit_orders_mx SET symbol = 'GM'; SELECT symbol, bidder_id FROM limit_orders_mx WHERE id = 246; diff --git a/src/test/regress/sql/multi_shard_update_delete.sql b/src/test/regress/sql/multi_shard_update_delete.sql index 5f829b666..6f493a529 100644 --- a/src/test/regress/sql/multi_shard_update_delete.sql +++ b/src/test/regress/sql/multi_shard_update_delete.sql @@ -314,7 +314,7 @@ WHERE user_id IN (SELECT user_id FROM users_test_table UNION ALL SELECT user_id - FROM events_test_table) returning *; + FROM events_test_table) returning value_3; UPDATE users_test_table SET value_1 = 5 @@ -438,7 +438,7 @@ UPDATE users_test_table SET value_2 = 6 WHERE value_1 IN (SELECT 2); --- Can only use immutable functions +-- Function calls in subqueries will be recursively planned UPDATE test_table_1 SET col_3 = 6 WHERE date_col IN (SELECT now()); @@ -505,7 +505,7 @@ FROM users_test_table FULL OUTER JOIN events_test_table e2 USING (user_id) WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2; --- We can not pushdown query if there is no partition key equality +-- Non-pushdownable subqueries, but will be handled through recursive planning UPDATE users_test_table SET value_1 = 1 WHERE user_id IN (SELECT Count(value_1) @@ -580,6 +580,7 @@ SET value_2 = 5 * random() FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id; +-- Volatile functions in a subquery are recursively planned UPDATE users_test_table SET value_2 = 5 WHERE users_test_table.user_id IN (SELECT user_id * 
random() FROM events_test_table); @@ -590,15 +591,16 @@ SET value_2 = 5 FROM events_test_table_local WHERE users_test_table.user_id = events_test_table_local.user_id; -UPDATE users_test_table -SET value_2 = 5 -WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local); - UPDATE events_test_table_local SET value_2 = 5 FROM users_test_table WHERE events_test_table_local.user_id = users_test_table.user_id; +-- Local tables in a subquery are supported through recursive planning +UPDATE users_test_table +SET value_2 = 5 +WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local); + -- Shard counts of tables must be equal to pushdown the query UPDATE users_test_table SET value_2 = 5 diff --git a/src/test/regress/sql/recursive_dml_queries_mx.sql b/src/test/regress/sql/recursive_dml_queries_mx.sql new file mode 100644 index 000000000..8af3731e4 --- /dev/null +++ b/src/test/regress/sql/recursive_dml_queries_mx.sql @@ -0,0 +1,159 @@ +CREATE SCHEMA recursive_dml_queries_mx; +SET search_path TO recursive_dml_queries_mx, public; + +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO streaming; + +CREATE TABLE recursive_dml_queries_mx.distributed_table (tenant_id text, dept int, info jsonb); +SELECT create_distributed_table('distributed_table', 'tenant_id'); + +CREATE TABLE recursive_dml_queries_mx.second_distributed_table (tenant_id text, dept int, info jsonb); +SELECT create_distributed_table('second_distributed_table', 'tenant_id'); + +CREATE TABLE recursive_dml_queries_mx.reference_table (id text, name text); +SELECT create_reference_table('reference_table'); + +INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; +INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; +INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i; + +SET client_min_messages TO 
DEBUG1; + +-- the subquery foo is recursively planned +UPDATE + reference_table +SET + name = 'new_' || name +FROM +( + SELECT + avg(second_distributed_table.tenant_id::int) as avg_tenant_id + FROM + second_distributed_table +) as foo +WHERE + foo.avg_tenant_id::int::text = reference_table.id; + +-- the subquery foo is recursively planned +-- but note that the subquery foo itself is pushdownable +UPDATE + second_distributed_table +SET + dept = foo.max_dept * 2 +FROM +( + SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) as max_dept FROM + ( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + ) foo_inner + GROUP BY + tenant_id + ORDER BY 1 DESC +) as foo +WHERE + foo.tenant_id != second_distributed_table.tenant_id + AND second_distributed_table.dept IN (2); + +-- run some queries from worker nodes +\c - - - :worker_1_port +SET search_path TO recursive_dml_queries_mx, public; + +-- the subquery foo is recursively planned +-- and foo itself is a non colocated subquery and recursively planned +UPDATE + second_distributed_table +SET + dept = foo.tenant_id::int / 4 +FROM +( + SELECT DISTINCT foo_inner_1.tenant_id FROM + ( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (3,4) + ) foo_inner_1, + ( + SELECT + second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (4,5) + )foo_inner_2 + WHERE foo_inner_1.tenant_id != foo_inner_2.tenant_id +) as foo +WHERE + foo.tenant_id != second_distributed_table.tenant_id + AND second_distributed_table.dept IN (3); + +-- use the second worker 
+\c - - - :worker_2_port +SET search_path TO recursive_dml_queries_mx, public; + +CREATE TABLE recursive_dml_queries_mx.local_table (id text, name text); +INSERT INTO local_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i; + +CREATE VIEW tenant_ids AS + SELECT + tenant_id, name + FROM + distributed_table, reference_table + WHERE + distributed_table.dept::text = reference_table.id + ORDER BY 2 DESC, 1 DESC; + +-- we currently do not allow local tables in modification queries +UPDATE + distributed_table +SET + dept = avg_tenant_id::int +FROM +( + SELECT + avg(local_table.id::int) as avg_tenant_id + FROM + local_table +) as foo +WHERE + foo.avg_tenant_id::int::text = distributed_table.tenant_id +RETURNING + distributed_table.*; + +-- we currently do not allow views in modification queries +UPDATE + distributed_table +SET + dept = avg_tenant_id::int +FROM +( + SELECT + avg(tenant_id::int) as avg_tenant_id + FROM + tenant_ids +) as foo +WHERE + foo.avg_tenant_id::int::text = distributed_table.tenant_id +RETURNING + distributed_table.*; + +DROP TABLE local_table; + +\c - - - :master_port +SET search_path TO recursive_dml_queries_mx, public; + +RESET client_min_messages; +DROP SCHEMA recursive_dml_queries_mx CASCADE; +RESET citus.shard_replication_factor; +RESET citus.replication_model; diff --git a/src/test/regress/sql/recursive_dml_with_different_planners_executors.sql b/src/test/regress/sql/recursive_dml_with_different_planners_executors.sql new file mode 100644 index 000000000..19e86681d --- /dev/null +++ b/src/test/regress/sql/recursive_dml_with_different_planners_executors.sql @@ -0,0 +1,66 @@ +CREATE SCHEMA recursive_dml_with_different_planner_executors; +SET search_path TO recursive_dml_with_different_planner_executors, public; + +CREATE TABLE distributed_table (tenant_id text, dept int, info jsonb); +SELECT create_distributed_table('distributed_table', 'tenant_id'); + +CREATE TABLE second_distributed_table (tenant_id text, dept int, info jsonb); 
+SELECT create_distributed_table('second_distributed_table', 'tenant_id'); + +CREATE TABLE reference_table (id text, name text); +SELECT create_reference_table('reference_table'); + +INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; +INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i; + +SET client_min_messages TO DEBUG1; + +-- subquery with router planner +-- joined with a real-time query +UPDATE + distributed_table +SET dept = foo.dept FROM + (SELECT tenant_id, dept FROM second_distributed_table WHERE dept = 1 ) as foo, + (SELECT tenant_id FROM second_distributed_table WHERE dept IN (1, 2, 3, 4) OFFSET 0) as bar + WHERE foo.tenant_id = bar.tenant_id + AND distributed_table.tenant_id = bar.tenant_id; + +-- a non colocated subquery inside the UPDATE +UPDATE distributed_table SET dept = foo.max_dept FROM +( + SELECT + max(dept) as max_dept + FROM + (SELECT DISTINCT tenant_id, dept FROM distributed_table) as distributed_table + WHERE tenant_id NOT IN + (SELECT tenant_id FROM second_distributed_table WHERE dept IN (1, 2, 3, 4)) +) as foo WHERE foo.max_dept > dept * 3; + + +-- subquery with repartition query +SET citus.enable_repartition_joins to ON; + +UPDATE distributed_table SET dept = foo.some_tenants::int FROM +( + SELECT + DISTINCT second_distributed_table.tenant_id as some_tenants + FROM second_distributed_table, distributed_table WHERE second_distributed_table.dept = distributed_table.dept +) as foo; + +SET citus.enable_repartition_joins to OFF; + +-- final query is router +UPDATE distributed_table SET dept = foo.max_dept FROM +( + SELECT + max(dept) as max_dept + FROM + (SELECT DISTINCT tenant_id, dept FROM distributed_table) as distributed_table + WHERE tenant_id IN + (SELECT tenant_id FROM second_distributed_table WHERE dept IN (1, 2, 3, 4)) +) as foo WHERE foo.max_dept >= dept and tenant_id = '8'; + + +RESET client_min_messages; 
+DROP SCHEMA recursive_dml_with_different_planner_executors CASCADE; +SET search_path TO public; diff --git a/src/test/regress/sql/with_dml.sql b/src/test/regress/sql/with_dml.sql new file mode 100644 index 000000000..4ba278f90 --- /dev/null +++ b/src/test/regress/sql/with_dml.sql @@ -0,0 +1,138 @@ +CREATE SCHEMA with_dml; +SET search_path TO with_dml, public; + +CREATE TABLE with_dml.distributed_table (tenant_id text PRIMARY KEY, dept int); +SELECT create_distributed_table('distributed_table', 'tenant_id'); + + +CREATE TABLE with_dml.second_distributed_table (tenant_id text, dept int); +SELECT create_distributed_table('second_distributed_table', 'tenant_id'); + +CREATE TABLE with_dml.reference_table (id text, name text); +SELECT create_reference_table('reference_table'); + +INSERT INTO distributed_table SELECT i::text, i % 10 FROM generate_series (0, 100) i; +INSERT INTO second_distributed_table SELECT i::text, i % 10 FROM generate_series (0, 100) i; +INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i; + +SET client_min_messages TO DEBUG1; + +-- delete all tenants from the reference table whose dept is 1 +WITH ids_to_delete AS ( + SELECT tenant_id FROM distributed_table WHERE dept = 1 +) +DELETE FROM reference_table WHERE id IN (SELECT tenant_id FROM ids_to_delete); + +-- update the name of the users whose dept is 2 +WITH ids_to_update AS ( + SELECT tenant_id FROM distributed_table WHERE dept = 2 +) +UPDATE reference_table SET name = 'new_' || name WHERE id IN (SELECT tenant_id FROM ids_to_update); + +-- now the CTE is also modifying +WITH ids_deleted_3 AS +( + DELETE FROM distributed_table WHERE dept = 3 RETURNING tenant_id +), +ids_deleted_4 AS +( + DELETE FROM distributed_table WHERE dept = 4 RETURNING tenant_id +) +DELETE FROM reference_table WHERE id IN (SELECT * FROM ids_deleted_3 UNION SELECT * FROM ids_deleted_4); + +-- now the final UPDATE command is pushdownable +WITH ids_to_delete AS +( + SELECT tenant_id FROM 
distributed_table WHERE dept = 5 +) +UPDATE + distributed_table +SET + dept = dept + 1 +FROM + ids_to_delete, (SELECT tenant_id FROM distributed_table WHERE tenant_id::int < 60) as some_tenants +WHERE + some_tenants.tenant_id = ids_to_delete.tenant_id + AND distributed_table.tenant_id = some_tenants.tenant_id; + +-- this query errors out since we've some hard +-- errors in the INSERT ... SELECT pushdown +-- which prevents to fallback to recursive planning +WITH ids_to_upsert AS +( + SELECT tenant_id FROM distributed_table WHERE dept > 7 +) +INSERT INTO distributed_table + SELECT distributed_table.tenant_id FROM ids_to_upsert, distributed_table + WHERE distributed_table.tenant_id = ids_to_upsert.tenant_id + ON CONFLICT (tenant_id) DO UPDATE SET dept = 8; + +-- the following query is very similar to the above one +-- but this time the query is pulled to coordinator since +-- we return before hitting any hard errors +WITH ids_to_insert AS +( + SELECT (tenant_id::int * 100)::text as tenant_id FROM distributed_table WHERE dept > 7 +) +INSERT INTO distributed_table + SELECT DISTINCT ids_to_insert.tenant_id FROM ids_to_insert, distributed_table + WHERE distributed_table.tenant_id < ids_to_insert.tenant_id; + + +-- not a very meaningful query +-- but has two modifying CTEs along with another +-- modify statement +WITH copy_to_other_table AS ( + INSERT INTO distributed_table + SELECT * + FROM second_distributed_table + WHERE dept = 3 + ON CONFLICT (tenant_id) DO UPDATE SET dept = 4 + RETURNING * +), +main_table_deleted AS ( + DELETE + FROM distributed_table + WHERE dept < 10 + AND NOT EXISTS (SELECT 1 FROM second_distributed_table + WHERE second_distributed_table.dept = 1 + AND second_distributed_table.tenant_id = distributed_table.tenant_id) + RETURNING * +) +INSERT INTO second_distributed_table + SELECT * + FROM main_table_deleted + EXCEPT + SELECT * + FROM copy_to_other_table; + +-- CTE inside the UPDATE statement +UPDATE + second_distributed_table +SET dept = + (WITH 
vals AS ( + SELECT DISTINCT tenant_id::int FROM distributed_table + ) select * from vals where tenant_id = 8 ) + WHERE dept = 8; + +-- Subquery inside the UPDATE statement +UPDATE + second_distributed_table +SET dept = + + (SELECT DISTINCT tenant_id::int FROM distributed_table WHERE tenant_id = '9') + WHERE dept = 8; + +-- delete all remaining tenants +WITH ids_to_delete AS ( + SELECT tenant_id FROM distributed_table +) +DELETE FROM distributed_table WHERE tenant_id = ANY(SELECT tenant_id FROM ids_to_delete); + +WITH ids_to_delete AS ( + SELECT id FROM reference_table +) +DELETE FROM reference_table WHERE id = ANY(SELECT id FROM ids_to_delete); + +RESET client_min_messages; +DROP SCHEMA with_dml CASCADE; diff --git a/src/test/regress/sql/with_transactions.sql b/src/test/regress/sql/with_transactions.sql new file mode 100644 index 000000000..d85edd6d0 --- /dev/null +++ b/src/test/regress/sql/with_transactions.sql @@ -0,0 +1,75 @@ +CREATE SCHEMA with_transactions; +SET search_path TO with_transactions, public; + +SET citus.shard_count TO 4; +SET citus.next_placement_id TO 800000; + +CREATE TABLE with_transactions.raw_table (tenant_id int, income float, created_at timestamptz); +SELECT create_distributed_table('raw_table', 'tenant_id'); + +CREATE TABLE with_transactions.second_raw_table (tenant_id int, income float, created_at timestamptz); +SELECT create_distributed_table('second_raw_table', 'tenant_id'); + + +INSERT INTO + raw_table (tenant_id, income, created_at) +SELECT + i % 10, i * 10.0, timestamp '2014-01-10 20:00:00' + i * interval '1 day' +FROM + generate_series (0, 100) i; + +INSERT INTO second_raw_table SELECT * FROM raw_table; + +SET client_min_messages TO DEBUG1; + +-- run a transaction which DELETE +BEGIN; + + WITH ids_to_delete AS + ( + SELECT tenant_id FROM raw_table WHERE income < 250 + ), + deleted_ids AS + ( + DELETE FROM raw_table WHERE created_at < '2014-02-10 20:00:00' AND tenant_id IN (SELECT * from ids_to_delete) RETURNING tenant_id + ) + 
UPDATE raw_table SET income = income * 2 WHERE tenant_id IN (SELECT tenant_id FROM deleted_ids); + +ROLLBACK; + +-- see that both UPDATE and DELETE commands are rollbacked +SELECT count(*) FROM raw_table; +SELECT max(income) FROM raw_table; + +-- multi-statement multi shard modifying statements should work +BEGIN; + SELECT count (*) FROM second_raw_table; + + WITH distinct_count AS ( + SELECT count(DISTINCT created_at) FROM raw_table + ), + ids_inserted AS + ( + INSERT INTO raw_table VALUES (11, 1000, now()) RETURNING tenant_id + ) + UPDATE raw_table SET created_at = '2001-02-10 20:00:00' + WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted) AND tenant_id < (SELECT count FROM distinct_count); + + TRUNCATE second_raw_table; +COMMIT; + +-- sequential insert followed by parallel update causes execution issues +WITH ids_inserted AS +( + INSERT INTO raw_table VALUES (11, 1000, now()), (12, 1000, now()), (13, 1000, now()) RETURNING tenant_id +) +UPDATE raw_table SET created_at = '2001-02-10 20:00:00' WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted); + +-- make sure that everything committed +SELECT count(*) FROM raw_table; +SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00'; +SELECT count(*) FROM second_raw_table; + +RESET client_min_messages; +RESET citus.shard_count; +DROP SCHEMA with_transactions CASCADE;