Add support for CTEs in distributed queries

pull/1853/head
Marco Slot 2017-12-04 18:50:09 +01:00
parent d0335ec818
commit 2e2b4e81fa
15 changed files with 1128 additions and 166 deletions

View File

@ -18,6 +18,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/subplan_execution.h"
#include "distributed/worker_protocol.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"

View File

@ -539,8 +539,14 @@ CreateIntermediateResultsDirectory(void)
if (!CreatedResultsDirectory)
{
makeOK = mkdir(resultDirectory, S_IRWXU);
if (makeOK != 0 && errno != EEXIST)
if (makeOK != 0)
{
if (errno == EEXIST)
{
/* someone else beat us to it, that's ok */
return resultDirectory;
}
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not create intermediate results directory "
"\"%s\": %m",

View File

@ -33,6 +33,7 @@
#include "distributed/multi_router_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/subplan_execution.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
#include "storage/fd.h"
@ -1046,6 +1047,8 @@ RealTimeExecScan(CustomScanState *node)
LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
PrepareMasterJobDirectory(workerJob);
ExecuteSubPlans(distributedPlan);
MultiRealTimeExecute(workerJob);
LoadTuplesIntoTupleStore(scanState, workerJob);

View File

@ -41,6 +41,7 @@
#include "distributed/multi_router_planner.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/placement_connection.h"
#include "distributed/subplan_execution.h"
#include "distributed/relay_utility.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
@ -543,6 +544,8 @@ RouterSelectExecScan(CustomScanState *node)
/* we are taking locks on partitions of partitioned tables */
LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
ExecuteSubPlans(distributedPlan);
if (list_length(taskList) > 0)
{
Task *task = (Task *) linitial(taskList);

View File

@ -3025,6 +3025,13 @@ TaskTrackerExecScan(CustomScanState *node)
{
DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = distributedPlan->workerJob;
Query *jobQuery = workerJob->jobQuery;
if (ContainsReadIntermediateResultFunction((Node *) jobQuery))
{
ereport(ERROR, (errmsg("Complex subqueries and CTEs are not supported when "
"task_executor_type is set to 'task-tracker'")));
}
/* we are taking locks on partitions of partitioned tables */
LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);

View File

@ -0,0 +1,55 @@
/*-------------------------------------------------------------------------
*
* subplan_execution.c
*
* Functions for executing subplans prior to distributed table execution.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/intermediate_results.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/recursive_planning.h"
#include "distributed/subplan_execution.h"
#include "distributed/worker_manager.h"
#include "executor/executor.h"
/*
 * ExecuteSubPlans executes the subplans of a distributed plan one at a
 * time, in the order in which they appear in distributedPlan->subPlanList.
 *
 * For every subplan a result ID is generated from the plan ID and the
 * subplan ID, and the subplan is executed into a remote file DestReceiver
 * that is created for the list of active readable nodes. writeLocalFile is
 * passed as false to that receiver.
 *
 * When the plan has no subplans the function returns immediately, without
 * looking up the node list.
 */
void
ExecuteSubPlans(DistributedPlan *distributedPlan)
{
	uint64 planId = distributedPlan->planId;
	List *subPlanList = distributedPlan->subPlanList;
	ListCell *subPlanCell = NULL;
	List *nodeList = NIL;
	bool writeLocalFile = false;

	if (subPlanList == NIL)
	{
		/* no subplans to execute, so there is no need to look up the nodes */
		return;
	}

	nodeList = ActiveReadableNodeList();

	foreach(subPlanCell, subPlanList)
	{
		DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell);
		PlannedStmt *plannedStmt = subPlan->plan;
		uint32 subPlanId = subPlan->subPlanId;
		DestReceiver *copyDest = NULL;
		ParamListInfo params = NULL;
		EState *estate = NULL;
		char *resultId = GenerateResultId(planId, subPlanId);

		/* each subplan gets its own executor state, released right after use */
		estate = CreateExecutorState();
		copyDest = (DestReceiver *) CreateRemoteFileDestReceiver(resultId, estate,
																 nodeList,
																 writeLocalFile);

		ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);

		FreeExecutorState(estate);
	}
}

View File

@ -26,26 +26,37 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_master_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/recursive_planning.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "parser/parsetree.h"
#include "optimizer/pathnode.h"
#include "optimizer/planner.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
static List *plannerRestrictionContextList = NIL;
int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */
static uint64 NextPlanId = 1;
/* local function forward declarations */
static bool NeedsDistributedPlanningWalker(Node *node, void *context);
static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery,
Query *query, ParamListInfo boundParams,
static PlannedStmt * CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan,
Query *originalQuery, Query *query,
ParamListInfo boundParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static DistributedPlan * CreateDistributedSelectPlan(uint64 planId, Query *originalQuery,
Query *query,
ParamListInfo boundParams,
bool hasUnresolvedParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams);
static void AssignRTEIdentities(Query *queryTree);
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
@ -76,6 +87,11 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
PlannerRestrictionContext *plannerRestrictionContext = NULL;
bool setPartitionedTablesInherited = false;
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
{
needsDistributedPlanning = true;
}
if (needsDistributedPlanning)
{
/*
@ -121,7 +137,9 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
if (needsDistributedPlanning)
{
result = CreateDistributedPlan(result, originalQuery, parse,
uint64 planId = NextPlanId++;
result = CreateDistributedPlan(planId, result, originalQuery, parse,
boundParams, plannerRestrictionContext);
}
}
@ -336,7 +354,6 @@ static void
AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier)
{
Assert(rangeTableEntry->rtekind == RTE_RELATION);
Assert(rangeTableEntry->values_lists == NIL);
rangeTableEntry->values_lists = list_make1_int(rteIdentifier);
}
@ -447,8 +464,8 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan)
* query into a distributed plan.
*/
static PlannedStmt *
CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query,
ParamListInfo boundParams,
CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, Query *originalQuery,
Query *query, ParamListInfo boundParams,
PlannerRestrictionContext *plannerRestrictionContext)
{
DistributedPlan *distributedPlan = NULL;
@ -480,56 +497,10 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
}
else
{
/*
* For select queries we, if router executor is enabled, first try to
* plan the query as a router query. If not supported, otherwise try
* the full blown plan/optimize/physical planing process needed to
* produce distributed query plans.
*/
if (EnableRouterExecution)
{
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
distributedPlan = CreateRouterPlan(originalQuery, query,
relationRestrictionContext);
/* for debugging it's useful to display why query was not router plannable */
if (distributedPlan && distributedPlan->planningError)
{
RaiseDeferredError(distributedPlan->planningError, DEBUG1);
}
}
/*
* Router didn't yield a plan, try the full distributed planner. As
* real-time/task-tracker don't support prepared statement parameters,
* skip planning in that case (we'll later trigger an error in that
* case if necessary).
*/
if ((!distributedPlan || distributedPlan->planningError) && !hasUnresolvedParams)
{
MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query,
plannerRestrictionContext,
boundParams);
MultiLogicalPlanOptimize(logicalPlan);
/*
* This check is here to make it likely that all node types used in
* Citus are dumpable. Explain can dump logical and physical plans
* using the extended outfuncs infrastructure, but it's infeasible to
* test most plans. MultiQueryContainerNode always serializes the
* physical plan, so there's no need to check that separately.
*/
CheckNodeIsDumpable((Node *) logicalPlan);
/* Create the physical plan */
distributedPlan = CreatePhysicalDistributedPlan(logicalPlan,
plannerRestrictionContext);
/* distributed plan currently should always succeed or error out */
Assert(distributedPlan && distributedPlan->planningError == NULL);
}
distributedPlan =
CreateDistributedSelectPlan(planId, originalQuery, query, boundParams,
hasUnresolvedParams,
plannerRestrictionContext);
}
/*
@ -571,6 +542,9 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
RaiseDeferredError(distributedPlan->planningError, ERROR);
}
/* remember the plan's identifier for identifying subplans */
distributedPlan->planId = planId;
/* create final plan by combining local plan with distributed plan */
resultPlan = FinalizePlan(localPlan, distributedPlan);
@ -593,6 +567,258 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
}
/*
 * CreateDistributedSelectPlan generates a distributed plan for a SELECT query.
 * It goes through 3 steps:
 *
 * 1. Try router planner
 * 2. Generate subplans for CTEs and complex subqueries
 *    - If any, go back to step 1 by calling itself recursively
 * 3. Logical planner
 *
 * Return value:
 * - the router plan, when router planning succeeds;
 * - NULL, when router planning did not succeed and hasUnresolvedParams
 *   is true;
 * - otherwise the plan produced by the recursive call (with subPlanList
 *   attached) or the physical plan built from the logical plan.
 *
 * Errors returned by RecursivelyPlanSubqueriesAndCTEs are raised at ERROR
 * level. Router planning errors are only reported at DEBUG1.
 *
 * Side effects when subplans were generated: the current planner
 * restriction context is popped and a new one is pushed, and the contents
 * of *query are overwritten with the re-planned copy of originalQuery.
 */
static DistributedPlan *
CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query,
ParamListInfo boundParams, bool hasUnresolvedParams,
PlannerRestrictionContext *plannerRestrictionContext)
{
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
DistributedPlan *distributedPlan = NULL;
MultiTreeRoot *logicalPlan = NULL;
DeferredErrorMessage *error = NULL;
List *subPlanList = NIL;
/*
 * For select queries we first try to plan the query as a router query.
 * If that is not possible, we fall through to recursive planning and
 * then to the full blown plan/optimize/physical planning process needed
 * to produce distributed query plans.
 */
distributedPlan = CreateRouterPlan(originalQuery, query,
relationRestrictionContext);
if (distributedPlan != NULL)
{
if (distributedPlan->planningError == NULL)
{
/* successfully created a router plan */
return distributedPlan;
}
else
{
/*
 * For debugging it's useful to display why query was not
 * router plannable.
 */
RaiseDeferredError(distributedPlan->planningError, DEBUG1);
}
}
if (hasUnresolvedParams)
{
/*
 * There are parameters that don't have a value in boundParams.
 *
 * The remainder of the planning logic cannot handle unbound
 * parameters. We return a NULL plan, which will have an
 * extremely high cost, such that postgres will replan with
 * bound parameters.
 */
return NULL;
}
/*
 * If there are parameters that do have a value in boundParams, replace
 * them in the original query. This allows us to more easily cut the
 * query into pieces (during recursive planning) or deparse parts of
 * the query (during subquery pushdown planning).
 */
originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery,
boundParams);
/*
 * Plan subqueries and CTEs that cannot be pushed down by recursively
 * calling the planner and add the resulting plans to subPlanList.
 */
error = RecursivelyPlanSubqueriesAndCTEs(originalQuery, plannerRestrictionContext,
planId, &subPlanList);
if (error != NULL)
{
RaiseDeferredError(error, ERROR);
}
/*
 * If subqueries were recursively planned then we need to replan the query
 * to get the new planner restriction context and apply planner transformations.
 *
 * We could simplify this code if the logical planner was capable of dealing
 * with an original query. In that case, we would only have to filter the
 * planner restriction context.
 */
if (list_length(subPlanList) > 0)
{
/* plan a copy, so that originalQuery itself stays untransformed */
Query *newQuery = copyObject(originalQuery);
bool setPartitionedTablesInherited = false;
/* remove the pre-transformation planner restrictions context */
PopPlannerRestrictionContext();
/* create a fresh new planner context */
plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
/*
 * We force standard_planner to treat partitioned tables as regular tables
 * by clearing the inh flag on RTEs. We already did this at the start of
 * distributed_planner, but on a copy of the original query, so we need
 * to do it again here.
 */
AdjustPartitioningForDistributedPlanning(newQuery, setPartitionedTablesInherited);
/*
 * Some relations may have been removed from the query, but we can skip
 * AssignRTEIdentities since we currently do not rely on RTE identities
 * being contiguous.
 *
 * The plan returned by standard_planner is discarded; the call is made
 * for its effect on newQuery and on the restriction context pushed above.
 */
standard_planner(newQuery, 0, boundParams);
/*
 * Overwrite the old transformed query with the new transformed query.
 * This is a shallow copy of the top-level Query struct, so *query shares
 * its sub-structures with newQuery afterwards.
 */
memcpy(query, newQuery, sizeof(Query));
/*
 * Recurse into CreateDistributedSelectPlan with subqueries/CTEs replaced.
 * boundParams is passed as NULL and hasUnresolvedParams as false
 * (presumably because the bound values were already substituted into
 * originalQuery by ResolveExternalParams above), so the recursive call
 * cannot take the "return NULL" path.
 */
distributedPlan = CreateDistributedSelectPlan(planId, originalQuery, query, NULL,
false, plannerRestrictionContext);
/* attach the subplans generated at this level to the resulting plan */
distributedPlan->subPlanList = subPlanList;
return distributedPlan;
}
/*
 * CTEs are stripped from the original query by RecursivelyPlanSubqueriesAndCTEs.
 * If we get here and there are still CTEs that means that none of the CTEs are
 * referenced. We therefore also strip the CTEs from the rewritten query.
 */
query->cteList = NIL;
Assert(originalQuery->cteList == NIL);
logicalPlan = MultiLogicalPlanCreate(originalQuery, query,
plannerRestrictionContext);
MultiLogicalPlanOptimize(logicalPlan);
/*
 * This check is here to make it likely that all node types used in
 * Citus are dumpable. Explain can dump logical and physical plans
 * using the extended outfuncs infrastructure, but it's infeasible to
 * test most plans. MultiQueryContainerNode always serializes the
 * physical plan, so there's no need to check that separately
 */
CheckNodeIsDumpable((Node *) logicalPlan);
/* Create the physical plan */
distributedPlan = CreatePhysicalDistributedPlan(logicalPlan,
plannerRestrictionContext);
/* distributed plan currently should always succeed or error out */
Assert(distributedPlan && distributedPlan->planningError == NULL);
return distributedPlan;
}
/*
 * ResolveExternalParams replaces the external parameters that appear in
 * the given node tree with Const nodes built from the corresponding
 * entries in boundParams.
 *
 * A Param is left untouched when it is not of kind PARAM_EXTERN, when its
 * ID falls outside of 1..boundParams->numParams, or when the bound value is
 * not flagged PARAM_FLAG_CONST. When boundParams is NULL the input is
 * returned as is.
 *
 * Query nodes are traversed with query_tree_mutator and all other nodes
 * with expression_tree_mutator.
 *
 * Note that this function is inspired by eval_const_expressions() on
 * Postgres. We cannot use that function because it requires access to
 * PlannerInfo.
 */
static Node *
ResolveExternalParams(Node *inputNode, ParamListInfo boundParams)
{
	/* consider resolving external parameters only when boundParams exists */
	if (!boundParams)
	{
		return inputNode;
	}

	if (inputNode == NULL)
	{
		return NULL;
	}

	if (IsA(inputNode, Param))
	{
		Param *paramToProcess = (Param *) inputNode;
		ParamExternData *correspondingParameterData = NULL;
		int numberOfParameters = boundParams->numParams;
		int parameterId = paramToProcess->paramid;
		int16 typeLength = 0;
		bool typeByValue = false;
		Datum constValue = 0;
		bool paramIsNull = false;
		int parameterIndex = 0;

		if (paramToProcess->paramkind != PARAM_EXTERN)
		{
			return inputNode;
		}

		/*
		 * parameterId starts from 1. Anything smaller would turn into a
		 * negative index into boundParams->params below, so leave such
		 * parameters alone.
		 */
		if (parameterId <= 0)
		{
			return inputNode;
		}

		parameterIndex = parameterId - 1;
		if (parameterIndex >= numberOfParameters)
		{
			return inputNode;
		}

		correspondingParameterData = &boundParams->params[parameterIndex];

		/* only values that are marked as constant may be folded into the query */
		if (!(correspondingParameterData->pflags & PARAM_FLAG_CONST))
		{
			return inputNode;
		}

		get_typlenbyval(paramToProcess->paramtype, &typeLength, &typeByValue);

		paramIsNull = correspondingParameterData->isnull;
		if (paramIsNull)
		{
			constValue = 0;
		}
		else if (typeByValue)
		{
			constValue = correspondingParameterData->value;
		}
		else
		{
			/*
			 * Out of paranoia ensure that datum lives long enough,
			 * although bind params currently should always live
			 * long enough.
			 */
			constValue = datumCopy(correspondingParameterData->value, typeByValue,
								   typeLength);
		}

		return (Node *) makeConst(paramToProcess->paramtype, paramToProcess->paramtypmod,
								  paramToProcess->paramcollid, typeLength, constValue,
								  paramIsNull, typeByValue);
	}
	else if (IsA(inputNode, Query))
	{
		return (Node *) query_tree_mutator((Query *) inputNode, ResolveExternalParams,
										   boundParams, 0);
	}

	return expression_tree_mutator(inputNode, ResolveExternalParams, boundParams);
}
/*
* GetDistributedPlan returns the associated DistributedPlan for a CustomScan.
*/

View File

@ -72,6 +72,7 @@ typedef struct RemoteExplainPlan
/* Explain functions for distributed queries */
static void ExplainSubPlans(List *subPlanList, ExplainState *es);
static void ExplainJob(Job *job, ExplainState *es);
static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es);
static void ExplainTaskList(List *taskList, ExplainState *es);
@ -124,6 +125,11 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es
ExplainOpenGroup("Distributed Query", "Distributed Query", true, es);
if (distributedPlan->subPlanList != NIL)
{
ExplainSubPlans(distributedPlan->subPlanList, es);
}
ExplainJob(distributedPlan->workerJob, es);
ExplainCloseGroup("Distributed Query", "Distributed Query", true, es);
@ -166,6 +172,55 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
}
/*
 * ExplainSubPlans generates EXPLAIN output for subplans for CTEs
 * and complex subqueries. Because the planning for these queries
 * is done along with the top-level plan, we cannot determine the
 * planning time and set it to 0.
 *
 * All subplans are emitted inside a single "Subplans" group. In text
 * format a "-> Distributed Subplan" line is printed once before the
 * subplans, and the indentation is increased by 3 while they are
 * explained.
 */
static void
ExplainSubPlans(List *subPlanList, ExplainState *es)
{
	ListCell *subPlanCell = NULL;

	ExplainOpenGroup("Subplans", "Subplans", false, es);

	if (es->format == EXPLAIN_FORMAT_TEXT)
	{
		appendStringInfoSpaces(es->str, es->indent * 2);
		appendStringInfo(es->str, "-> Distributed Subplan\n");
		es->indent += 3;
	}

	foreach(subPlanCell, subPlanList)
	{
		DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell);
		PlannedStmt *plan = subPlan->plan;
		IntoClause *into = NULL;
		ParamListInfo params = NULL;
		char *queryString = NULL;
		instr_time planduration;

		/* set the planning time to 0, without needing to read the clock */
		INSTR_TIME_SET_ZERO(planduration);

#if (PG_VERSION_NUM >= 100000)
		ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration);
#else
		ExplainOnePlan(plan, into, es, queryString, params, &planduration);
#endif
	}

	if (es->format == EXPLAIN_FORMAT_TEXT)
	{
		/* undo the indentation added for the subplans above */
		es->indent -= 3;
	}

	ExplainCloseGroup("Subplans", "Subplans", false, es);
}
/*
* ExplainJob shows the EXPLAIN output for a Job in the physical plan of
* a distributed query by showing the remote EXPLAIN for the first task,

View File

@ -173,7 +173,6 @@ static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNo
static bool ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery);
static bool IsFunctionRTE(Node *node);
static bool FindNodeCheck(Node *node, bool (*check)(Node *));
static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams);
static MultiNode * SubqueryMultiNodeTree(Query *originalQuery,
Query *queryTree,
PlannerRestrictionContext *
@ -194,12 +193,6 @@ static MultiTable * MultiSubqueryPushdownTable(Query *subquery);
* plan and adds a root node to top of it. The original query is only used for subquery
* pushdown planning.
*
* In order to support external parameters for the queries where planning
* is done on the original query, we need to replace the external parameters
* manually. To achieve that for subquery pushdown planning, we pass boundParams
* to this function. We need to do that since Citus is currently unable to send
* parameters to the workers during execution.
*
* We also pass queryTree and plannerRestrictionContext to the planner. They
* are primarily used to decide whether the subquery is safe to pushdown.
* If not, it helps to produce meaningful error messages for subquery
@ -207,16 +200,13 @@ static MultiTable * MultiSubqueryPushdownTable(Query *subquery);
*/
MultiTreeRoot *
MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree,
PlannerRestrictionContext *plannerRestrictionContext,
ParamListInfo boundParams)
PlannerRestrictionContext *plannerRestrictionContext)
{
MultiNode *multiQueryNode = NULL;
MultiTreeRoot *rootNode = NULL;
if (ShouldUseSubqueryPushDown(originalQuery, queryTree))
{
originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery,
boundParams);
multiQueryNode = SubqueryMultiNodeTree(originalQuery, queryTree,
plannerRestrictionContext);
}
@ -328,99 +318,6 @@ FindNodeCheck(Node *node, bool (*check)(Node *))
}
/*
 * ResolveExternalParams replaces the external parameters that appear in
 * the given node tree with Const nodes built from the corresponding
 * entries in boundParams.
 *
 * A Param is left untouched when it is not of kind PARAM_EXTERN, when its
 * ID falls outside of 1..boundParams->numParams, or when the bound value is
 * not flagged PARAM_FLAG_CONST. When boundParams is NULL the input is
 * returned as is.
 *
 * Query nodes are traversed with query_tree_mutator and all other nodes
 * with expression_tree_mutator.
 *
 * Note that this function is inspired by eval_const_expressions() on
 * Postgres. We cannot use that function because it requires access to
 * PlannerInfo.
 */
static Node *
ResolveExternalParams(Node *inputNode, ParamListInfo boundParams)
{
	/* consider resolving external parameters only when boundParams exists */
	if (!boundParams)
	{
		return inputNode;
	}

	if (inputNode == NULL)
	{
		return NULL;
	}

	if (IsA(inputNode, Param))
	{
		Param *paramToProcess = (Param *) inputNode;
		ParamExternData *correspondingParameterData = NULL;
		int numberOfParameters = boundParams->numParams;
		int parameterId = paramToProcess->paramid;
		int16 typeLength = 0;
		bool typeByValue = false;
		Datum constValue = 0;
		bool paramIsNull = false;
		int parameterIndex = 0;

		if (paramToProcess->paramkind != PARAM_EXTERN)
		{
			return inputNode;
		}

		/*
		 * parameterId starts from 1. Anything smaller would turn into a
		 * negative index into boundParams->params below, so leave such
		 * parameters alone.
		 */
		if (parameterId <= 0)
		{
			return inputNode;
		}

		parameterIndex = parameterId - 1;
		if (parameterIndex >= numberOfParameters)
		{
			return inputNode;
		}

		correspondingParameterData = &boundParams->params[parameterIndex];

		/* only values that are marked as constant may be folded into the query */
		if (!(correspondingParameterData->pflags & PARAM_FLAG_CONST))
		{
			return inputNode;
		}

		get_typlenbyval(paramToProcess->paramtype, &typeLength, &typeByValue);

		paramIsNull = correspondingParameterData->isnull;
		if (paramIsNull)
		{
			constValue = 0;
		}
		else if (typeByValue)
		{
			constValue = correspondingParameterData->value;
		}
		else
		{
			/*
			 * Out of paranoia ensure that datum lives long enough,
			 * although bind params currently should always live
			 * long enough.
			 */
			constValue = datumCopy(correspondingParameterData->value, typeByValue,
								   typeLength);
		}

		return (Node *) makeConst(paramToProcess->paramtype, paramToProcess->paramtypmod,
								  paramToProcess->paramcollid, typeLength, constValue,
								  paramIsNull, typeByValue);
	}
	else if (IsA(inputNode, Query))
	{
		return (Node *) query_tree_mutator((Query *) inputNode, ResolveExternalParams,
										   boundParams, 0);
	}

	return expression_tree_mutator(inputNode, ResolveExternalParams, boundParams);
}
/*
* SublinkList finds the subquery nodes in the where clause of the given query. Note
* that the function should be called on the original query given that postgres
@ -1340,7 +1237,7 @@ DeferErrorIfUnsupportedTableCombination(Query *queryTree)
else if (rangeTableEntry->rtekind == RTE_CTE)
{
unsupportedTableCombination = true;
errorDetail = "CTEs in multi-shard queries are currently unsupported";
errorDetail = "CTEs in subqueries are currently unsupported";
break;
}
else if (rangeTableEntry->rtekind == RTE_VALUES)

View File

@ -162,8 +162,6 @@ DistributedPlan *
CreateRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
{
Assert(EnableRouterExecution);
if (MultiRouterPlannableQuery(query, restrictionContext))
{
return CreateSingleTaskRouterPlan(originalQuery, query,
@ -1623,6 +1621,14 @@ SelectsFromDistributedTable(List *rangeTableList)
* If the given query is not routable, it fills planningError with the related
* DeferredErrorMessage. The caller can check this error message to see if query
* is routable or not.
*
* Note: If the query prunes down to 0 shards due to filters (e.g. WHERE false),
* or the query has only read_intermediate_result calls (no relations left after
* recursively planning CTEs and subqueries), then it will be assigned to an
* arbitrary worker node in a round-robin fashion.
*
* Relations that prune down to 0 shards are replaced by subqueries returning
* 0 values in UpdateRelationToShardNames.
*/
DeferredErrorMessage *
PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext,

View File

@ -0,0 +1,650 @@
/*-------------------------------------------------------------------------
*
* recursive_planning.c
*
* Logic for calling the postgres planner recursively for CTEs and
* non-pushdownable subqueries in distributed queries.
*
* PostgreSQL with Citus can execute 4 types of queries:
*
* - Postgres queries on local tables and functions.
*
* These queries can use all SQL features, but they may not reference
* distributed tables.
*
* - Router queries that can be executed on a single node by replacing
* table names with shard names.
*
* These queries can use nearly all SQL features, but only if they have
* a single-valued filter on the distribution column.
*
* - Real-time queries that can be executed by performing a task for each
* shard in a distributed table and performing a merge step.
*
* These queries have limited SQL support. They may only include
* subqueries if the subquery can be executed on each shard by replacing
* table names with shard names and concatenating the result.
*
* - Task-tracker queries that can be executed through a tree of
* re-partitioning operations.
*
* These queries have very limited SQL support and only support basic
* inner joins and subqueries without joins.
*
* To work around the limitations of these planners, we recursively call
* the planner for CTEs and unsupported subqueries to obtain a list of
* subplans.
*
* During execution, each subplan is executed separately through the method
* that is appropriate for that query. The results are written to temporary
* files on the workers. In the original query, the CTEs and subqueries are
* replaced by mini-subqueries that read from the temporary files.
*
* This allows almost all SQL to be directly or indirectly supported,
* because if all subqueries that contain distributed tables have been
* replaced then what remains is a router query which can use nearly all
* SQL features.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/pg_type.h"
#include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/distributed_planner.h"
#include "distributed/errormessage.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/recursive_planning.h"
#include "distributed/relation_restriction_equivalence.h"
#include "lib/stringinfo.h"
#include "optimizer/planner.h"
#include "nodes/nodeFuncs.h"
#include "nodes/nodes.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "utils/builtins.h"
#include "utils/guc.h"
/*
* RecursivePlanningContext is used to recursively plan subqueries
* and CTEs, pull results to the coordinator, and push it back into
* the workers.
*/
typedef struct RecursivePlanningContext
{
/* set to 0 by RecursivelyPlanSubqueriesAndCTEs; presumably the query nesting level -- TODO confirm */
int level;
/* identifier of the distributed plan, combined with a subplan ID to identify each subplan's result */
uint64 planId;
/* DistributedSubPlan entries generated so far; a new subplan's ID is the list length + 1 */
List *subPlanList;
/* planner restriction context passed in by the caller of RecursivelyPlanSubqueriesAndCTEs */
PlannerRestrictionContext *plannerRestrictionContext;
} RecursivePlanningContext;
/*
* CteReferenceWalkerContext is used to collect CTE references in
* CteReferenceListWalker.
*/
typedef struct CteReferenceWalkerContext
{
/* RecursivelyPlanCTEs starts the walk with -1; presumably the current query nesting level -- TODO confirm */
int level;
/* RangeTblEntry pointers collected by the walker; field order matters, the struct is initialized positionally */
List *cteReferenceList;
} CteReferenceWalkerContext;
/*
* VarLevelsUpWalkerContext is used to find Vars in a (sub)query that
* refer to upper levels and therefore cannot be planned separately.
*/
typedef struct VarLevelsUpWalkerContext
{
/* presumably the nesting depth of the query currently being walked, used to judge whether a Var points outside the (sub)query -- TODO confirm against the walker */
int level;
} VarLevelsUpWalkerContext;
/* local function forward declarations */
static DeferredErrorMessage * RecursivelyPlanCTEs(Query *query,
RecursivePlanningContext *context);
static DistributedSubPlan * CreateDistributedSubPlan(uint32 subPlanId,
Query *subPlanQuery);
static bool CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context);
static bool ContainsReferencesToOuterQuery(Query *query);
static bool ContainsReferencesToOuterQueryWalker(Node *node,
VarLevelsUpWalkerContext *context);
static Query * BuildSubPlanResultQuery(Query *subquery, uint64 planId, uint32 subPlanId);
/*
 * RecursivelyPlanSubqueriesAndCTEs is the entry point of recursive planning.
 * It looks for CTEs (and, in the future, subqueries) in the given query that
 * cannot be pushed down to the workers as is, and has them planned
 * separately through a recursive call to the planner.
 *
 * The subplans that come out of this are meant to run before the
 * distributed plan itself, with their results being written to temporary
 * files on the workers. References to a CTE are replaced by a subquery on
 * the read_intermediate_result function, which reads such a file.
 *
 * On success NULL is returned and *subPlanList is set to the generated
 * subplans. On failure the deferred error is returned and *subPlanList is
 * not modified. When subquery_pushdown is enabled nothing is planned:
 * NULL is returned and *subPlanList is not modified either.
 */
DeferredErrorMessage *
RecursivelyPlanSubqueriesAndCTEs(Query *query,
								 PlannerRestrictionContext *plannerRestrictionContext,
								 uint64 planId, List **subPlanList)
{
	RecursivePlanningContext planningContext;
	DeferredErrorMessage *planningError = NULL;

	/*
	 * When the subquery_pushdown flag is enabled we make some hacks
	 * to push down subqueries with LIMIT. Recursive planning would
	 * valiantly do the right thing and try to recursively plan the
	 * inner subqueries, but we don't really want it to because those
	 * subqueries might not be supported and would be much slower.
	 *
	 * Instead, we skip recursive planning altogether when
	 * subquery_pushdown is enabled.
	 */
	if (!SubqueryPushdown)
	{
		planningContext.level = 0;
		planningContext.planId = planId;
		planningContext.subPlanList = NIL;
		planningContext.plannerRestrictionContext = plannerRestrictionContext;

		planningError = RecursivelyPlanCTEs(query, &planningContext);
		if (planningError == NULL)
		{
			/* XXX: plan subqueries */

			/* hand the subplans over to the caller only when planning succeeded */
			*subPlanList = planningContext.subPlanList;
		}
	}

	return planningError;
}
/*
* RecursivelyPlanCTEs plans all CTEs in the query by recursively calling the planner.
* The resulting plan is added to planningContext->subPlanList and CTE references
* are replaced by subqueries that call read_intermediate_result, which reads the
* intermediate result of the CTE after it is executed.
*
* Recursive and modifying CTEs are not yet supported and return an error.
*/
static DeferredErrorMessage *
RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
{
ListCell *cteCell = NULL;
CteReferenceWalkerContext context = { -1, NIL };
if (query->cteList == NIL)
{
/* no CTEs, nothing to do */
return NULL;
}
if (query->hasModifyingCTE)
{
/* we could easily support these, but it's a little scary */
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"data-modifying statements are not supported in "
"the WITH clauses of distributed queries",
NULL, NULL);
}
if (query->hasRecursive)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"recursive CTEs are not supported in distributed "
"queries",
NULL, NULL);
}
/* get all RTE_CTEs that point to CTEs from cteList */
CteReferenceListWalker((Node *) query, &context);
foreach(cteCell, query->cteList)
{
CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell);
char *cteName = cte->ctename;
Query *subquery = (Query *) cte->ctequery;
uint64 planId = planningContext->planId;
uint32 subPlanId = 0;
Query *resultQuery = NULL;
DistributedSubPlan *subPlan = NULL;
ListCell *rteCell = NULL;
int replacedCtesCount = 0;
if (ContainsReferencesToOuterQuery(subquery))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"CTEs that refer to other subqueries are not "
"supported in multi-shard queries",
NULL, NULL);
}
if (cte->cterefcount == 0)
{
/*
* CTEs that aren't referenced aren't executed in postgres. We
* don't need to generate a subplan for it and can take the rest
* of this iteration off.
*/
continue;
}
subPlanId = list_length(planningContext->subPlanList) + 1;
if (log_min_messages >= DEBUG1)
{
StringInfo subPlanString = makeStringInfo();
pg_get_query_def(subquery, subPlanString);
ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT "_%u for "
"CTE %s: %s",
planId, subPlanId, cteName, subPlanString->data)));
}
/* build a sub plan for the CTE */
subPlan = CreateDistributedSubPlan(subPlanId, subquery);
planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan);
/* replace references to the CTE with a subquery that reads results */
resultQuery = BuildSubPlanResultQuery(subquery, planId, subPlanId);
foreach(rteCell, context.cteReferenceList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rteCell);
if (rangeTableEntry->rtekind != RTE_CTE)
{
/*
* This RTE pointed to a preceding CTE that was already replaced
* by a subplan.
*/
continue;
}
if (strncmp(rangeTableEntry->ctename, cteName, NAMEDATALEN) == 0)
{
/* change the RTE_CTE into an RTE_SUBQUERY */
rangeTableEntry->rtekind = RTE_SUBQUERY;
rangeTableEntry->ctename = NULL;
rangeTableEntry->ctelevelsup = 0;
if (replacedCtesCount == 0)
{
/*
* Replace the first CTE reference with the result query directly.
*/
rangeTableEntry->subquery = resultQuery;
}
else
{
/*
* Replace subsequent CTE references with a copy of the result
* query.
*/
rangeTableEntry->subquery = copyObject(resultQuery);
}
replacedCtesCount++;
}
}
Assert(cte->cterefcount == replacedCtesCount);
}
/*
* All CTEs are now executed through subplans and RTE_CTEs pointing
* to the CTE list have been replaced with subqueries. We can now
* clear the cteList.
*/
query->cteList = NIL;
return NULL;
}
/*
 * CreateDistributedSubPlan wraps the plan for the given subplan query in a
 * DistributedSubPlan node that carries the given subplan ID. The plan is
 * obtained by calling the planner from the top, which may generate either
 * a local plan or another distributed plan that itself contains subplans.
 */
static DistributedSubPlan *
CreateDistributedSubPlan(uint32 subPlanId, Query *subPlanQuery)
{
	/*
	 * Queries that call read_intermediate_result have to go through
	 * distributed planning, even when they no longer contain any
	 * distributed tables.
	 *
	 * The planner itself cannot perform this check, since that would
	 * also make the workers attempt distributed planning.
	 */
	bool readsIntermediateResult =
		ContainsReadIntermediateResultFunction((Node *) subPlanQuery);
	int plannerOptions = readsIntermediateResult ? CURSOR_OPT_FORCE_DISTRIBUTED : 0;

	DistributedSubPlan *result = CitusMakeNode(DistributedSubPlan);

	result->subPlanId = subPlanId;
	result->plan = planner(subPlanQuery, plannerOptions, NULL);

	return result;
}
/*
 * CteReferenceListWalker collects the RTE_CTE range table entries whose
 * ctelevelsup equals the current nesting level of the walk and appends them
 * to context->cteReferenceList. The level is incremented each time the walker
 * enters a Query node and restored afterwards, so the caller is expected to
 * start at level -1 when passing in the query itself.
 *
 * The walker always returns false, meaning the whole tree is visited.
 */
static bool
CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context)
{
	if (node == NULL)
	{
		return false;
	}

	if (IsA(node, Query))
	{
		/* one level deeper for everything inside this query */
		context->level += 1;
		(void) query_tree_walker((Query *) node, CteReferenceListWalker, context,
								 QTW_EXAMINE_RTES);
		context->level -= 1;

		return false;
	}

	if (IsA(node, RangeTblEntry))
	{
		RangeTblEntry *rte = (RangeTblEntry *) node;
		bool isCteReference = (rte->rtekind == RTE_CTE);

		if (isCteReference && rte->ctelevelsup == context->level)
		{
			context->cteReferenceList = lappend(context->cteReferenceList, rte);
		}

		/* the contents of the range table entry are visited by the caller */
		return false;
	}

	return expression_tree_walker(node, CteReferenceListWalker, context);
}
/*
 * ContainsReferencesToOuterQuery returns true when the given query contains
 * a reference that points outside of the query itself, as determined by
 * ContainsReferencesToOuterQueryWalker starting at level 0. Such queries
 * cannot be planned recursively.
 */
static bool
ContainsReferencesToOuterQuery(Query *query)
{
	VarLevelsUpWalkerContext walkerContext = { 0 };
	int walkerFlags = 0;
	bool hasOuterReference = false;

	hasOuterReference = query_tree_walker(query,
										  ContainsReferencesToOuterQueryWalker,
										  &walkerContext, walkerFlags);

	return hasOuterReference;
}
/*
 * ContainsReferencesToOuterQueryWalker returns true when the given node
 * contains a Var, Aggref, GroupingFunc or PlaceHolderVar whose levels-up
 * value is greater than context->level.
 *
 * When the walker encounters a subquery, it increases the level by 1 for
 * the duration of the descent into that subquery.
 */
static bool
ContainsReferencesToOuterQueryWalker(Node *node, VarLevelsUpWalkerContext *context)
{
	if (node == NULL)
	{
		return false;
	}

	switch (nodeTag(node))
	{
		case T_Var:
		{
			/* a Var is a leaf, the comparison is the final answer */
			return ((Var *) node)->varlevelsup > context->level;
		}

		case T_Aggref:
		{
			if (((Aggref *) node)->agglevelsup > context->level)
			{
				return true;
			}

			/* otherwise continue into the aggregate below */
			break;
		}

		case T_GroupingFunc:
		{
			/* unlike Aggref, a GroupingFunc is not descended into */
			return ((GroupingFunc *) node)->agglevelsup > context->level;
		}

		case T_PlaceHolderVar:
		{
			if (((PlaceHolderVar *) node)->phlevelsup > context->level)
			{
				return true;
			}

			/* otherwise continue into the placeholder below */
			break;
		}

		case T_Query:
		{
			bool hasOuterReference = false;
			int walkerFlags = 0;

			context->level += 1;
			hasOuterReference = query_tree_walker((Query *) node,
												  ContainsReferencesToOuterQueryWalker,
												  context, walkerFlags);
			context->level -= 1;

			return hasOuterReference;
		}

		default:
		{
			break;
		}
	}

	return expression_tree_walker(node, ContainsReferencesToOuterQueryWalker,
								  context);
}
/*
 * BuildSubPlanResultQuery returns a query of the form:
 *
 * SELECT
 *   <target list>
 * FROM
 *   read_intermediate_result('<planId>_<subPlanId>', '<copy format>')
 *   AS res (<column definition list>);
 *
 * The target list and column definition list are derived from the non-junk
 * entries in the target list of the given subquery.
 *
 * If any of the types in the target list cannot be used in the binary copy format,
 * then the copy format 'text' is used, otherwise 'binary' is used.
 */
static Query *
BuildSubPlanResultQuery(Query *subquery, uint64 planId, uint32 subPlanId)
{
	Query *resultQuery = NULL;
	char *resultIdString = NULL;
	Const *resultIdConst = NULL;
	Const *resultFormatConst = NULL;
	FuncExpr *funcExpr = NULL;
	Alias *funcAlias = NULL;
	List *funcColNames = NIL;
	List *funcColTypes = NIL;
	List *funcColTypMods = NIL;
	List *funcColCollations = NIL;
	RangeTblFunction *rangeTableFunction = NULL;
	RangeTblEntry *rangeTableEntry = NULL;
	RangeTblRef *rangeTableRef = NULL;
	FromExpr *joinTree = NULL;
	ListCell *targetEntryCell = NULL;
	List *targetList = NIL;
	int columnNumber = 1;
	bool useBinaryCopyFormat = true;
	Oid copyFormatId = BinaryCopyFormatId();

	/* build the target list and column definition list */
	foreach(targetEntryCell, subquery->targetList)
	{
		TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
		Node *targetExpr = (Node *) targetEntry->expr;
		char *columnName = targetEntry->resname;
		Oid columnType = exprType(targetExpr);

		/* typmods are signed (-1 means "no typmod"), so don't store them in an Oid */
		int32 columnTypMod = exprTypmod(targetExpr);
		Oid columnCollation = exprCollation(targetExpr);
		Var *functionColumnVar = NULL;
		TargetEntry *newTargetEntry = NULL;

		if (targetEntry->resjunk)
		{
			/* junk columns are not part of the result */
			continue;
		}

		/*
		 * The column types and collations of a RangeTblFunction are OID lists,
		 * only the typmods form an integer list.
		 */
		funcColNames = lappend(funcColNames, makeString(columnName));
		funcColTypes = lappend_oid(funcColTypes, columnType);
		funcColTypMods = lappend_int(funcColTypMods, columnTypMod);
		funcColCollations = lappend_oid(funcColCollations, columnCollation);

		/* the Var points to the column of the only RTE in the result query */
		functionColumnVar = makeNode(Var);
		functionColumnVar->varno = 1;
		functionColumnVar->varattno = columnNumber;
		functionColumnVar->vartype = columnType;
		functionColumnVar->vartypmod = columnTypMod;
		functionColumnVar->varcollid = columnCollation;
		functionColumnVar->varlevelsup = 0;
		functionColumnVar->varnoold = 1;
		functionColumnVar->varoattno = columnNumber;
		functionColumnVar->location = -1;

		newTargetEntry = makeNode(TargetEntry);
		newTargetEntry->expr = (Expr *) functionColumnVar;
		newTargetEntry->resno = columnNumber;
		newTargetEntry->resname = columnName;
		newTargetEntry->resjunk = false;

		targetList = lappend(targetList, newTargetEntry);

		/* a single unsupported type forces the text format for the whole result */
		if (useBinaryCopyFormat && !CanUseBinaryCopyFormatForType(columnType))
		{
			useBinaryCopyFormat = false;
		}

		columnNumber++;
	}

	/* build the result_id parameter for the call to read_intermediate_result */
	resultIdString = GenerateResultId(planId, subPlanId);

	resultIdConst = makeNode(Const);
	resultIdConst->consttype = TEXTOID;
	resultIdConst->consttypmod = -1;
	resultIdConst->constlen = -1;
	resultIdConst->constvalue = CStringGetTextDatum(resultIdString);
	resultIdConst->constbyval = false;
	resultIdConst->constisnull = false;
	resultIdConst->location = -1;

	/* build the citus_copy_format parameter for the call to read_intermediate_result */
	if (!useBinaryCopyFormat)
	{
		copyFormatId = TextCopyFormatId();
	}

	resultFormatConst = makeNode(Const);
	resultFormatConst->consttype = CitusCopyFormatTypeId();
	resultFormatConst->consttypmod = -1;
	resultFormatConst->constlen = 4;
	resultFormatConst->constvalue = ObjectIdGetDatum(copyFormatId);
	resultFormatConst->constbyval = true;
	resultFormatConst->constisnull = false;
	resultFormatConst->location = -1;

	/* build the call to read_intermediate_result */
	funcExpr = makeNode(FuncExpr);
	funcExpr->funcid = CitusReadIntermediateResultFuncId();
	funcExpr->funcretset = true;
	funcExpr->funcvariadic = false;
	funcExpr->funcformat = 0;
	funcExpr->funccollid = 0;
	funcExpr->inputcollid = 0;
	funcExpr->location = -1;
	funcExpr->args = list_make2(resultIdConst, resultFormatConst);

	/* build the RTE for the call to read_intermediate_result */
	rangeTableFunction = makeNode(RangeTblFunction);
	rangeTableFunction->funccolcount = list_length(funcColNames);
	rangeTableFunction->funccolnames = funcColNames;
	rangeTableFunction->funccoltypes = funcColTypes;
	rangeTableFunction->funccoltypmods = funcColTypMods;
	rangeTableFunction->funccolcollations = funcColCollations;
	rangeTableFunction->funcparams = NULL;
	rangeTableFunction->funcexpr = (Node *) funcExpr;

	funcAlias = makeNode(Alias);
	funcAlias->aliasname = "intermediate_result";
	funcAlias->colnames = funcColNames;

	rangeTableEntry = makeNode(RangeTblEntry);
	rangeTableEntry->rtekind = RTE_FUNCTION;
	rangeTableEntry->functions = list_make1(rangeTableFunction);
	rangeTableEntry->inFromCl = true;
	rangeTableEntry->eref = funcAlias;

	/* build the join tree using the read_intermediate_result RTE */
	rangeTableRef = makeNode(RangeTblRef);
	rangeTableRef->rtindex = 1;

	joinTree = makeNode(FromExpr);
	joinTree->fromlist = list_make1(rangeTableRef);

	/* build the SELECT query */
	resultQuery = makeNode(Query);
	resultQuery->commandType = CMD_SELECT;
	resultQuery->rtable = list_make1(rangeTableEntry);
	resultQuery->jointree = joinTree;
	resultQuery->targetList = targetList;

	return resultQuery;
}
/*
 * GenerateResultId builds the identifier of the intermediate result that
 * belongs to the subplan with the given plan ID and subplan ID. The
 * identifier has the form <planId>_<subPlanId>.
 */
char *
GenerateResultId(uint64 planId, uint32 subPlanId)
{
	StringInfoData resultIdBuffer;

	initStringInfo(&resultIdBuffer);
	appendStringInfo(&resultIdBuffer, UINT64_FORMAT "_%u", planId, subPlanId);

	return resultIdBuffer.data;
}

View File

@ -14,6 +14,7 @@
#include "nodes/relation.h"
#include "distributed/citus_nodes.h"
#include "distributed/errormessage.h"
/* values used by jobs and tasks which do not require identifiers */
@ -21,6 +22,8 @@
#define INVALID_TASK_ID 0
#define MULTI_TASK_QUERY_INFO_OFF 0 /* do not log multi-task queries */
#define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000
typedef struct RelationRestrictionContext
{
bool hasDistributedRelation;
@ -89,6 +92,7 @@ extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan);
extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
extern bool IsMultiShardModifyPlan(struct DistributedPlan *distributedPlan);
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
extern char * GenerateResultId(uint64 planId, uint32 subPlanId);
extern int GetRTEIdentity(RangeTblEntry *rte);

View File

@ -185,8 +185,7 @@ extern bool SubqueryPushdown;
/* Function declarations for building logical plans */
extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree,
PlannerRestrictionContext *
plannerRestrictionContext,
ParamListInfo boundParams);
plannerRestrictionContext);
extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
PlannerRestrictionContext *plannerRestrictionContext,
Query *query);

View File

@ -0,0 +1,29 @@
/*-------------------------------------------------------------------------
*
* recursive_planning.h
* Declarations for recursive planning of CTEs and subqueries.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef RECURSIVE_PLANNING_H
#define RECURSIVE_PLANNING_H
#include "distributed/errormessage.h"
#include "distributed/relation_restriction_equivalence.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "nodes/relation.h"
extern DeferredErrorMessage * RecursivelyPlanSubqueriesAndCTEs(Query *query,
PlannerRestrictionContext *
plannerRestrictionContext,
uint64 planId,
List **subPlanList);
extern char * GenerateResultId(uint64 planId, uint32 subPlanId);
#endif /* RECURSIVE_PLANNING_H */

View File

@ -0,0 +1,21 @@
/*-------------------------------------------------------------------------
*
* subplan_execution.h
*
* Functions for executing subplans.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef SUBPLAN_EXECUTION_H
#define SUBPLAN_EXECUTION_H
#include "distributed/multi_physical_planner.h"
extern void ExecuteSubPlans(DistributedPlan *distributedPlan);
#endif /* SUBPLAN_EXECUTION_H */