Simplify router planner call path

pull/1402/head
Marco Slot 2017-05-08 11:18:33 +02:00
parent 0c4bf2d943
commit 155db4d913
4 changed files with 77 additions and 82 deletions

View File

@ -16,6 +16,7 @@
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/insert_select_planner.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_planner.h" #include "distributed/multi_planner.h"
@ -225,7 +226,7 @@ IsModifyCommand(Query *query)
CmdType commandType = query->commandType; CmdType commandType = query->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE || if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE || query->hasModifyingCTE) commandType == CMD_DELETE)
{ {
return true; return true;
} }
@ -273,9 +274,17 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
if (IsModifyCommand(query)) if (IsModifyCommand(query))
{ {
/* modifications are always routed through the same planner/executor */ if (InsertSelectQuery(originalQuery))
distributedPlan = {
CreateModifyPlan(originalQuery, query, plannerRestrictionContext); distributedPlan = CreateDistributedInsertSelectPlan(originalQuery,
plannerRestrictionContext);
}
else
{
/* modifications are always routed through the same planner/executor */
distributedPlan =
CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
}
Assert(distributedPlan); Assert(distributedPlan);
} }
@ -506,7 +515,6 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
customScan->custom_private = list_make1(multiPlanData); customScan->custom_private = list_make1(multiPlanData);
customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN; customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
/* check if we have a master query */
if (multiPlan->masterQuery) if (multiPlan->masterQuery)
{ {
finalPlan = FinalizeNonRouterPlan(localPlan, multiPlan, customScan); finalPlan = FinalizeNonRouterPlan(localPlan, multiPlan, customScan);

View File

@ -86,9 +86,6 @@ static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery,
Query *query, Query *query,
RelationRestrictionContext * RelationRestrictionContext *
restrictionContext); restrictionContext);
static MultiPlan * CreateInsertSelectRouterPlan(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
static bool SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext, static bool SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery); Query *originalQuery);
static Task * RouterModifyTaskForShardInterval(Query *originalQuery, static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
@ -170,92 +167,33 @@ MultiPlan *
CreateModifyPlan(Query *originalQuery, Query *query, CreateModifyPlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
if (InsertSelectQuery(originalQuery)) Oid distributedTableId = ExtractFirstDistributedTableId(originalQuery);
{ ShardInterval *targetShardInterval = NULL;
return CreateInsertSelectRouterPlan(originalQuery, plannerRestrictionContext);
}
else
{
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
return CreateSingleTaskRouterPlan(originalQuery, query,
relationRestrictionContext);
}
}
/*
* CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is
* either a modify task that changes a single shard, or a router task that returns
* query results from a single worker. Supported modify queries (insert/update/delete)
* are router plannable by default. If query is not router plannable then either NULL is
* returned, or the returned plan has planningError set to a description of the problem.
*/
static MultiPlan *
CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
{
CmdType commandType = query->commandType;
bool modifyTask = false;
Job *job = NULL;
Task *task = NULL; Task *task = NULL;
Job *job = NULL;
List *placementList = NIL; List *placementList = NIL;
MultiPlan *multiPlan = CitusMakeNode(MultiPlan); MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
multiPlan->operation = query->commandType; multiPlan->operation = query->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE || multiPlan->planningError = ModifyQuerySupported(query);
commandType == CMD_DELETE) if (multiPlan->planningError != NULL)
{ {
modifyTask = true; return multiPlan;
} }
if (modifyTask) targetShardInterval = TargetShardIntervalForModify(distributedTableId, query,
&multiPlan->planningError);
if (multiPlan->planningError != NULL)
{ {
Oid distributedTableId = ExtractFirstDistributedTableId(originalQuery); return multiPlan;
ShardInterval *targetShardInterval = NULL;
DeferredErrorMessage *planningError = NULL;
/* FIXME: this should probably rather be inlined into CreateModifyPlan */
planningError = ModifyQuerySupported(query);
if (planningError != NULL)
{
multiPlan->planningError = planningError;
return multiPlan;
}
targetShardInterval = TargetShardIntervalForModify(distributedTableId, query,
&planningError);
if (planningError != NULL)
{
multiPlan->planningError = planningError;
return multiPlan;
}
task = RouterModifyTask(distributedTableId, originalQuery, targetShardInterval);
Assert(task);
}
else
{
/* FIXME: this should probably rather be inlined into CreateSelectPlan */
multiPlan->planningError = ErrorIfQueryHasModifyingCTE(query);
if (multiPlan->planningError)
{
return multiPlan;
}
task = RouterSelectTask(originalQuery, restrictionContext, &placementList);
} }
if (task == NULL) task = RouterModifyTask(distributedTableId, originalQuery, targetShardInterval);
{
return NULL;
}
ereport(DEBUG2, (errmsg("Creating router plan"))); ereport(DEBUG2, (errmsg("Creating router plan")));
job = RouterQueryJob(originalQuery, task, placementList); job = RouterQueryJob(originalQuery, task, placementList);
multiPlan->workerJob = job; multiPlan->workerJob = job;
multiPlan->masterQuery = NULL; multiPlan->masterQuery = NULL;
multiPlan->routerExecutable = true; multiPlan->routerExecutable = true;
@ -270,15 +208,59 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
} }
/*
* CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is
* either a modify task that changes a single shard, or a router task that returns
* query results from a single worker. Supported modify queries (insert/update/delete)
* are router plannable by default. If query is not router plannable then either NULL is
* returned, or the returned plan has planningError set to a description of the problem.
*/
static MultiPlan *
CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
{
Job *job = NULL;
Task *task = NULL;
List *placementList = NIL;
MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
multiPlan->operation = query->commandType;
/* FIXME: this should probably rather be inlined into CreateRouterPlan */
multiPlan->planningError = ErrorIfQueryHasModifyingCTE(query);
if (multiPlan->planningError)
{
return multiPlan;
}
task = RouterSelectTask(originalQuery, restrictionContext, &placementList);
if (task == NULL)
{
return NULL;
}
ereport(DEBUG2, (errmsg("Creating router plan")));
job = RouterQueryJob(originalQuery, task, placementList);
multiPlan->workerJob = job;
multiPlan->masterQuery = NULL;
multiPlan->routerExecutable = true;
multiPlan->hasReturning = false;
return multiPlan;
}
/* /*
* Creates a router plan for INSERT ... SELECT queries which could consists of * Creates a router plan for INSERT ... SELECT queries which could consists of
* multiple tasks. * multiple tasks.
* *
* The function never returns NULL, it errors out if cannot create the multi plan. * The function never returns NULL, it errors out if cannot create the multi plan.
*/ */
static MultiPlan * MultiPlan *
CreateInsertSelectRouterPlan(Query *originalQuery, CreateDistributedInsertSelectPlan(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
int shardOffset = 0; int shardOffset = 0;
List *sqlTaskList = NIL; List *sqlTaskList = NIL;

View File

@ -35,6 +35,9 @@ extern bool RouterSelectQuery(Query *originalQuery,
RelationRestrictionContext *restrictionContext, RelationRestrictionContext *restrictionContext,
List **placementList, uint64 *anchorShardId, List **placementList, uint64 *anchorShardId,
List **relationShardList, bool replacePrunedQueryWithDummy); List **relationShardList, bool replacePrunedQueryWithDummy);
extern MultiPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree); extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree);
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
RangeTblEntry *insertRte, RangeTblEntry *insertRte,

View File

@ -426,7 +426,9 @@ WITH new_article AS (
INSERT INTO articles_hash VALUES (1, 1, 'arsenous', 9572) RETURNING * INSERT INTO articles_hash VALUES (1, 1, 'arsenous', 9572) RETURNING *
) )
SELECT * FROM new_article; SELECT * FROM new_article;
ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
ERROR: could not run distributed query with complex table expressions
HINT: Consider using an equality filter on the distributed table's partition column.
-- Modifying statement in nested CTE case is covered by PostgreSQL itself -- Modifying statement in nested CTE case is covered by PostgreSQL itself
WITH new_article AS ( WITH new_article AS (
WITH nested_cte AS ( WITH nested_cte AS (