diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 7f532b695..c09bd6760 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -16,6 +16,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodes.h" +#include "distributed/insert_select_planner.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" #include "distributed/multi_planner.h" @@ -225,7 +226,7 @@ IsModifyCommand(Query *query) CmdType commandType = query->commandType; if (commandType == CMD_INSERT || commandType == CMD_UPDATE || - commandType == CMD_DELETE || query->hasModifyingCTE) + commandType == CMD_DELETE) { return true; } @@ -273,9 +274,17 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query if (IsModifyCommand(query)) { - /* modifications are always routed through the same planner/executor */ - distributedPlan = - CreateModifyPlan(originalQuery, query, plannerRestrictionContext); + if (InsertSelectQuery(originalQuery)) + { + distributedPlan = CreateDistributedInsertSelectPlan(originalQuery, + plannerRestrictionContext); + } + else + { + /* modifications are always routed through the same planner/executor */ + distributedPlan = + CreateModifyPlan(originalQuery, query, plannerRestrictionContext); + } Assert(distributedPlan); } @@ -506,7 +515,6 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) customScan->custom_private = list_make1(multiPlanData); customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN; - /* check if we have a master query */ if (multiPlan->masterQuery) { finalPlan = FinalizeNonRouterPlan(localPlan, multiPlan, customScan); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 5a053f382..044aa775c 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -86,9 +86,6 @@ static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, RelationRestrictionContext * restrictionContext); -static MultiPlan * CreateInsertSelectRouterPlan(Query *originalQuery, - PlannerRestrictionContext * - plannerRestrictionContext); static bool SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext, Query *originalQuery); static Task * RouterModifyTaskForShardInterval(Query *originalQuery, @@ -170,92 +167,33 @@ MultiPlan * CreateModifyPlan(Query *originalQuery, Query *query, PlannerRestrictionContext *plannerRestrictionContext) { - if (InsertSelectQuery(originalQuery)) - { - return CreateInsertSelectRouterPlan(originalQuery, plannerRestrictionContext); - } - else - { - RelationRestrictionContext *relationRestrictionContext = - plannerRestrictionContext->relationRestrictionContext; - - return CreateSingleTaskRouterPlan(originalQuery, query, - relationRestrictionContext); - } -} - - -/* - * CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is - * either a modify task that changes a single shard, or a router task that returns - * query results from a single worker. Supported modify queries (insert/update/delete) - * are router plannable by default. If query is not router plannable then either NULL is - * returned, or the returned plan has planningError set to a description of the problem. - */ -static MultiPlan * -CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, - RelationRestrictionContext *restrictionContext) -{ - CmdType commandType = query->commandType; - bool modifyTask = false; - Job *job = NULL; + Oid distributedTableId = ExtractFirstDistributedTableId(originalQuery); + ShardInterval *targetShardInterval = NULL; Task *task = NULL; + Job *job = NULL; List *placementList = NIL; MultiPlan *multiPlan = CitusMakeNode(MultiPlan); multiPlan->operation = query->commandType; - if (commandType == CMD_INSERT || commandType == CMD_UPDATE || - commandType == CMD_DELETE) + multiPlan->planningError = ModifyQuerySupported(query); + if (multiPlan->planningError != NULL) { - modifyTask = true; + return multiPlan; } - if (modifyTask) + targetShardInterval = TargetShardIntervalForModify(distributedTableId, query, + &multiPlan->planningError); + if (multiPlan->planningError != NULL) { - Oid distributedTableId = ExtractFirstDistributedTableId(originalQuery); - ShardInterval *targetShardInterval = NULL; - DeferredErrorMessage *planningError = NULL; - - /* FIXME: this should probably rather be inlined into CreateModifyPlan */ - planningError = ModifyQuerySupported(query); - if (planningError != NULL) - { - multiPlan->planningError = planningError; - return multiPlan; - } - - targetShardInterval = TargetShardIntervalForModify(distributedTableId, query, - &planningError); - if (planningError != NULL) - { - multiPlan->planningError = planningError; - return multiPlan; - } - - task = RouterModifyTask(distributedTableId, originalQuery, targetShardInterval); - Assert(task); - } - else - { - /* FIXME: this should probably rather be inlined into CreateSelectPlan */ - multiPlan->planningError = ErrorIfQueryHasModifyingCTE(query); - if (multiPlan->planningError) - { - return multiPlan; - } - task = RouterSelectTask(originalQuery, restrictionContext, &placementList); + return multiPlan; } - if (task == NULL) - { - return NULL; - } + task = RouterModifyTask(distributedTableId, originalQuery, targetShardInterval); ereport(DEBUG2, (errmsg("Creating router plan"))); job = RouterQueryJob(originalQuery, task, placementList); - multiPlan->workerJob = job; multiPlan->masterQuery = NULL; multiPlan->routerExecutable = true; @@ -270,15 +208,59 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, } +/* + * CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is + * either a modify task that changes a single shard, or a router task that returns + * query results from a single worker. Supported modify queries (insert/update/delete) + * are router plannable by default. If query is not router plannable then either NULL is + * returned, or the returned plan has planningError set to a description of the problem. + */ +static MultiPlan * +CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext) +{ + Job *job = NULL; + Task *task = NULL; + List *placementList = NIL; + MultiPlan *multiPlan = CitusMakeNode(MultiPlan); + + multiPlan->operation = query->commandType; + + /* FIXME: this should probably rather be inlined into CreateRouterPlan */ + multiPlan->planningError = ErrorIfQueryHasModifyingCTE(query); + if (multiPlan->planningError) + { + return multiPlan; + } + + task = RouterSelectTask(originalQuery, restrictionContext, &placementList); + if (task == NULL) + { + return NULL; + } + + ereport(DEBUG2, (errmsg("Creating router plan"))); + + job = RouterQueryJob(originalQuery, task, placementList); + + multiPlan->workerJob = job; + multiPlan->masterQuery = NULL; + multiPlan->routerExecutable = true; + multiPlan->hasReturning = false; + + return multiPlan; +} + + /* * Creates a router plan for INSERT ... SELECT queries which could consists of * multiple tasks. * * The function never returns NULL, it errors out if cannot create the multi plan. */ -static MultiPlan * -CreateInsertSelectRouterPlan(Query *originalQuery, - PlannerRestrictionContext *plannerRestrictionContext) +MultiPlan * +CreateDistributedInsertSelectPlan(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext) { int shardOffset = 0; List *sqlTaskList = NIL; diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 0dd0f1e7b..54a8abc7b 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -35,6 +35,9 @@ extern bool RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext, List **placementList, uint64 *anchorShardId, List **relationShardList, bool replacePrunedQueryWithDummy); +extern MultiPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, + PlannerRestrictionContext * + plannerRestrictionContext); extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree); extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index c6ac05c4d..273a6e826 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -426,7 +426,9 @@ WITH new_article AS ( INSERT INTO articles_hash VALUES (1, 1, 'arsenous', 9572) RETURNING * ) SELECT * FROM new_article; -ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries +DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries +ERROR: could not run distributed query with complex table expressions +HINT: Consider using an equality filter on the distributed table's partition column. -- Modifying statement in nested CTE case is covered by PostgreSQL itself WITH new_article AS ( WITH nested_cte AS (