From 2e2b4e81fac48849da50cca66a0e798b327ac455 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 4 Dec 2017 18:50:09 +0100 Subject: [PATCH] Add support for CTEs in distributed queries --- .../distributed/executor/citus_custom_scan.c | 1 + .../executor/intermediate_results.c | 8 +- .../executor/multi_real_time_executor.c | 3 + .../executor/multi_router_executor.c | 3 + .../executor/multi_task_tracker_executor.c | 7 + .../distributed/executor/subplan_execution.c | 55 ++ .../distributed/planner/distributed_planner.c | 338 +++++++-- .../distributed/planner/multi_explain.c | 55 ++ .../planner/multi_logical_planner.c | 107 +-- .../planner/multi_router_planner.c | 10 +- .../distributed/planner/recursive_planning.c | 650 ++++++++++++++++++ src/include/distributed/distributed_planner.h | 4 + .../distributed/multi_logical_planner.h | 3 +- src/include/distributed/recursive_planning.h | 29 + src/include/distributed/subplan_execution.h | 21 + 15 files changed, 1128 insertions(+), 166 deletions(-) create mode 100644 src/backend/distributed/executor/subplan_execution.c create mode 100644 src/backend/distributed/planner/recursive_planning.c create mode 100644 src/include/distributed/recursive_planning.h create mode 100644 src/include/distributed/subplan_execution.h diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 055ac476b..62379fc3a 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -18,6 +18,7 @@ #include "distributed/multi_server_executor.h" #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" +#include "distributed/subplan_execution.h" #include "distributed/worker_protocol.h" #include "executor/executor.h" #include "nodes/makefuncs.h" diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 
8c9e17b43..e9ccb849f 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -539,8 +539,14 @@ CreateIntermediateResultsDirectory(void) if (!CreatedResultsDirectory) { makeOK = mkdir(resultDirectory, S_IRWXU); - if (makeOK != 0 && errno != EEXIST) + if (makeOK != 0) { + if (errno == EEXIST) + { + /* someone else beat us to it, that's ok */ + return resultDirectory; + } + ereport(ERROR, (errcode_for_file_access(), errmsg("could not create intermediate results directory " "\"%s\": %m", diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 4fd4c932e..735f75a34 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -33,6 +33,7 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_server_executor.h" #include "distributed/resource_lock.h" +#include "distributed/subplan_execution.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" #include "storage/fd.h" @@ -1046,6 +1047,8 @@ RealTimeExecScan(CustomScanState *node) LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); PrepareMasterJobDirectory(workerJob); + + ExecuteSubPlans(distributedPlan); MultiRealTimeExecute(workerJob); LoadTuplesIntoTupleStore(scanState, workerJob); diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index f29ea279b..3963b2b93 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -41,6 +41,7 @@ #include "distributed/multi_router_planner.h" #include "distributed/multi_shard_transaction.h" #include "distributed/placement_connection.h" +#include "distributed/subplan_execution.h" #include "distributed/relay_utility.h" 
#include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" @@ -543,6 +544,8 @@ RouterSelectExecScan(CustomScanState *node) /* we are taking locks on partitions of partitioned tables */ LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); + ExecuteSubPlans(distributedPlan); + if (list_length(taskList) > 0) { Task *task = (Task *) linitial(taskList); diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 5d4d7ee0f..3dc4f3111 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -3025,6 +3025,13 @@ TaskTrackerExecScan(CustomScanState *node) { DistributedPlan *distributedPlan = scanState->distributedPlan; Job *workerJob = distributedPlan->workerJob; + Query *jobQuery = workerJob->jobQuery; + + if (ContainsReadIntermediateResultFunction((Node *) jobQuery)) + { + ereport(ERROR, (errmsg("Complex subqueries and CTEs are not supported when " + "task_executor_type is set to 'task-tracker'"))); + } /* we are taking locks on partitions of partitioned tables */ LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c new file mode 100644 index 000000000..e61820d46 --- /dev/null +++ b/src/backend/distributed/executor/subplan_execution.c @@ -0,0 +1,55 @@ +/*------------------------------------------------------------------------- + * + * subplan_execution.c + * + * Functions for executing subplans prior to distributed table execution. + * + * Copyright (c) 2017, Citus Data, Inc.
+ *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/intermediate_results.h" +#include "distributed/multi_executor.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/recursive_planning.h" +#include "distributed/subplan_execution.h" +#include "distributed/worker_manager.h" +#include "executor/executor.h" + + +/* + * ExecuteSubPlans executes a list of subplans from a distributed plan + * by sequentially executing each plan from the top. + */ +void +ExecuteSubPlans(DistributedPlan *distributedPlan) +{ + uint64 planId = distributedPlan->planId; + List *subPlanList = distributedPlan->subPlanList; + ListCell *subPlanCell = NULL; + List *nodeList = ActiveReadableNodeList(); + bool writeLocalFile = false; + + foreach(subPlanCell, subPlanList) + { + DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell); + PlannedStmt *plannedStmt = subPlan->plan; + uint32 subPlanId = subPlan->subPlanId; + DestReceiver *copyDest = NULL; + ParamListInfo params = NULL; + EState *estate = NULL; + + char *resultId = GenerateResultId(planId, subPlanId); + + estate = CreateExecutorState(); + copyDest = (DestReceiver *) CreateRemoteFileDestReceiver(resultId, estate, + nodeList, + writeLocalFile); + + ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest); + + FreeExecutorState(estate); + } +} diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index b5e694918..a4c2f4034 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -26,26 +26,37 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_master_planner.h" #include "distributed/multi_router_planner.h" +#include "distributed/recursive_planning.h" #include "executor/executor.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include 
"parser/parsetree.h" #include "optimizer/pathnode.h" #include "optimizer/planner.h" +#include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/memutils.h" static List *plannerRestrictionContextList = NIL; int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */ +static uint64 NextPlanId = 1; /* local function forward declarations */ static bool NeedsDistributedPlanningWalker(Node *node, void *context); -static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, - Query *query, ParamListInfo boundParams, +static PlannedStmt * CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, + Query *originalQuery, Query *query, + ParamListInfo boundParams, PlannerRestrictionContext * plannerRestrictionContext); +static DistributedPlan * CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, + Query *query, + ParamListInfo boundParams, + bool hasUnresolvedParams, + PlannerRestrictionContext * + plannerRestrictionContext); +static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); static void AssignRTEIdentities(Query *queryTree); static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier); @@ -76,6 +87,11 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) PlannerRestrictionContext *plannerRestrictionContext = NULL; bool setPartitionedTablesInherited = false; + if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) + { + needsDistributedPlanning = true; + } + if (needsDistributedPlanning) { /* @@ -121,7 +137,9 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) if (needsDistributedPlanning) { - result = CreateDistributedPlan(result, originalQuery, parse, + uint64 planId = NextPlanId++; + + result = CreateDistributedPlan(planId, result, originalQuery, parse, boundParams, plannerRestrictionContext); } } @@ -336,7 +354,6 @@ static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int 
rteIdentifier) { Assert(rangeTableEntry->rtekind == RTE_RELATION); - Assert(rangeTableEntry->values_lists == NIL); rangeTableEntry->values_lists = list_make1_int(rteIdentifier); } @@ -447,8 +464,8 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan) * query into a distributed plan. */ static PlannedStmt * -CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query, - ParamListInfo boundParams, +CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, Query *originalQuery, + Query *query, ParamListInfo boundParams, PlannerRestrictionContext *plannerRestrictionContext) { DistributedPlan *distributedPlan = NULL; @@ -480,56 +497,10 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query } else { - /* - * For select queries we, if router executor is enabled, first try to - * plan the query as a router query. If not supported, otherwise try - * the full blown plan/optimize/physical planing process needed to - * produce distributed query plans. - */ - if (EnableRouterExecution) - { - RelationRestrictionContext *relationRestrictionContext = - plannerRestrictionContext->relationRestrictionContext; - - distributedPlan = CreateRouterPlan(originalQuery, query, - relationRestrictionContext); - - /* for debugging it's useful to display why query was not router plannable */ - if (distributedPlan && distributedPlan->planningError) - { - RaiseDeferredError(distributedPlan->planningError, DEBUG1); - } - } - - /* - * Router didn't yield a plan, try the full distributed planner. As - * real-time/task-tracker don't support prepared statement parameters, - * skip planning in that case (we'll later trigger an error in that - * case if necessary). 
- */ - if ((!distributedPlan || distributedPlan->planningError) && !hasUnresolvedParams) - { - MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query, - plannerRestrictionContext, - boundParams); - MultiLogicalPlanOptimize(logicalPlan); - - /* - * This check is here to make it likely that all node types used in - * Citus are dumpable. Explain can dump logical and physical plans - * using the extended outfuncs infrastructure, but it's infeasible to - * test most plans. MultiQueryContainerNode always serializes the - * physical plan, so there's no need to check that separately. - */ - CheckNodeIsDumpable((Node *) logicalPlan); - - /* Create the physical plan */ - distributedPlan = CreatePhysicalDistributedPlan(logicalPlan, - plannerRestrictionContext); - - /* distributed plan currently should always succeed or error out */ - Assert(distributedPlan && distributedPlan->planningError == NULL); - } + distributedPlan = + CreateDistributedSelectPlan(planId, originalQuery, query, boundParams, + hasUnresolvedParams, + plannerRestrictionContext); } /* @@ -571,6 +542,9 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query RaiseDeferredError(distributedPlan->planningError, ERROR); } + /* remember the plan's identifier for identifying subplans */ + distributedPlan->planId = planId; + /* create final plan by combining local plan with distributed plan */ resultPlan = FinalizePlan(localPlan, distributedPlan); @@ -593,6 +567,258 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query } +/* + * CreateDistributedSelectPlan generates a distributed plan for a SELECT query. + * It goes through 3 steps: + * + * 1. Try router planner + * 2. Generate subplans for CTEs and complex subqueries + * - If any, go back to step 1 by calling itself recursively + * 3. 
Logical planner + */ +static DistributedPlan * +CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query, + ParamListInfo boundParams, bool hasUnresolvedParams, + PlannerRestrictionContext *plannerRestrictionContext) +{ + RelationRestrictionContext *relationRestrictionContext = + plannerRestrictionContext->relationRestrictionContext; + + DistributedPlan *distributedPlan = NULL; + MultiTreeRoot *logicalPlan = NULL; + DeferredErrorMessage *error = NULL; + List *subPlanList = NIL; + + /* + * For select queries we, if router executor is enabled, first try to + * plan the query as a router query. If that is not supported, try + * the full blown plan/optimize/physical planning process needed to + * produce distributed query plans. + */ + + distributedPlan = CreateRouterPlan(originalQuery, query, + relationRestrictionContext); + if (distributedPlan != NULL) + { + if (distributedPlan->planningError == NULL) + { + /* successfully created a router plan */ + return distributedPlan; + } + else + { + /* + * For debugging it's useful to display why query was not + * router plannable. + */ + RaiseDeferredError(distributedPlan->planningError, DEBUG1); + } + } + + if (hasUnresolvedParams) + { + /* + * There are parameters that don't have a value in boundParams. + * + * The remainder of the planning logic cannot handle unbound + * parameters. We return a NULL plan, which will have an + * extremely high cost, such that postgres will replan with + * bound parameters. + */ + return NULL; + } + + /* + * If there are parameters that do have a value in boundParams, replace + * them in the original query. This allows us to more easily cut the + * query into pieces (during recursive planning) or deparse parts of + * the query (during subquery pushdown planning).
+ */ + originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery, + boundParams); + + /* + * Plan subqueries and CTEs that cannot be pushed down by recursively + * calling the planner and add the resulting plans to subPlanList. + */ + error = RecursivelyPlanSubqueriesAndCTEs(originalQuery, plannerRestrictionContext, + planId, &subPlanList); + if (error != NULL) + { + RaiseDeferredError(error, ERROR); + } + + /* + * If subqueries were recursively planned then we need to replan the query + * to get the new planner restriction context and apply planner transformations. + * + * We could simplify this code if the logical planner was capable of dealing + * with an original query. In that case, we would only have to filter the + * planner restriction context. + */ + if (list_length(subPlanList) > 0) + { + Query *newQuery = copyObject(originalQuery); + bool setPartitionedTablesInherited = false; + + /* remove the pre-transformation planner restrictions context */ + PopPlannerRestrictionContext(); + + /* create a fresh new planner context */ + plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(); + + /* + * We force standard_planner to treat partitioned tables as regular tables + * by clearing the inh flag on RTEs. We already did this at the start of + * distributed_planner, but on a copy of the original query, so we need + * to do it again here. + */ + AdjustPartitioningForDistributedPlanning(newQuery, setPartitionedTablesInherited); + + /* + * Some relations may have been removed from the query, but we can skip + * AssignRTEIdentities since we currently do not rely on RTE identities + * being contiguous. 
+ */ + + standard_planner(newQuery, 0, boundParams); + + /* overwrite the old transformed query with the new transformed query */ + memcpy(query, newQuery, sizeof(Query)); + + /* recurse into CreateDistributedSelectPlan with subqueries/CTEs replaced */ + distributedPlan = CreateDistributedSelectPlan(planId, originalQuery, query, NULL, + false, plannerRestrictionContext); + distributedPlan->subPlanList = subPlanList; + + return distributedPlan; + } + + /* + * CTEs are stripped from the original query by RecursivelyPlanSubqueriesAndCTEs. + * If we get here and there are still CTEs that means that none of the CTEs are + * referenced. We therefore also strip the CTEs from the rewritten query. + */ + query->cteList = NIL; + Assert(originalQuery->cteList == NIL); + + logicalPlan = MultiLogicalPlanCreate(originalQuery, query, + plannerRestrictionContext); + MultiLogicalPlanOptimize(logicalPlan); + + /* + * This check is here to make it likely that all node types used in + * Citus are dumpable. Explain can dump logical and physical plans + * using the extended outfuncs infrastructure, but it's infeasible to + * test most plans. MultiQueryContainerNode always serializes the + * physical plan, so there's no need to check that separately. + */ + CheckNodeIsDumpable((Node *) logicalPlan); + + /* Create the physical plan */ + distributedPlan = CreatePhysicalDistributedPlan(logicalPlan, + plannerRestrictionContext); + + /* distributed plan currently should always succeed or error out */ + Assert(distributedPlan && distributedPlan->planningError == NULL); + + return distributedPlan; +} + + +/* + * ResolveExternalParams replaces the external parameters that appear + * in the query with the corresponding entries in the boundParams. + * + * Note that this function is inspired by eval_const_expressions() in Postgres. + * We cannot use that function because it requires access to PlannerInfo.
+ */ +static Node * +ResolveExternalParams(Node *inputNode, ParamListInfo boundParams) +{ + /* consider resolving external parameters only when boundParams exists */ + if (!boundParams) + { + return inputNode; + } + + if (inputNode == NULL) + { + return NULL; + } + + if (IsA(inputNode, Param)) + { + Param *paramToProcess = (Param *) inputNode; + ParamExternData *correspondingParameterData = NULL; + int numberOfParameters = boundParams->numParams; + int parameterId = paramToProcess->paramid; + int16 typeLength = 0; + bool typeByValue = false; + Datum constValue = 0; + bool paramIsNull = false; + int parameterIndex = 0; + + if (paramToProcess->paramkind != PARAM_EXTERN) + { + return inputNode; + } + + if (parameterId < 0) + { + return inputNode; + } + + /* parameterId starts from 1 */ + parameterIndex = parameterId - 1; + if (parameterIndex >= numberOfParameters) + { + return inputNode; + } + + correspondingParameterData = &boundParams->params[parameterIndex]; + + if (!(correspondingParameterData->pflags & PARAM_FLAG_CONST)) + { + return inputNode; + } + + get_typlenbyval(paramToProcess->paramtype, &typeLength, &typeByValue); + + paramIsNull = correspondingParameterData->isnull; + if (paramIsNull) + { + constValue = 0; + } + else if (typeByValue) + { + constValue = correspondingParameterData->value; + } + else + { + /* + * Out of paranoia ensure that datum lives long enough, + * although bind params currently should always live + * long enough. 
+ */ + constValue = datumCopy(correspondingParameterData->value, typeByValue, + typeLength); + } + + return (Node *) makeConst(paramToProcess->paramtype, paramToProcess->paramtypmod, + paramToProcess->paramcollid, typeLength, constValue, + paramIsNull, typeByValue); + } + else if (IsA(inputNode, Query)) + { + return (Node *) query_tree_mutator((Query *) inputNode, ResolveExternalParams, + boundParams, 0); + } + + return expression_tree_mutator(inputNode, ResolveExternalParams, boundParams); +} + + /* * GetDistributedPlan returns the associated DistributedPlan for a CustomScan. */ diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index c5bc8d49f..25523b5eb 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -72,6 +72,7 @@ typedef struct RemoteExplainPlan /* Explain functions for distributed queries */ +static void ExplainSubPlans(List *subPlanList, ExplainState *es); static void ExplainJob(Job *job, ExplainState *es); static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es); static void ExplainTaskList(List *taskList, ExplainState *es); @@ -124,6 +125,11 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es ExplainOpenGroup("Distributed Query", "Distributed Query", true, es); + if (distributedPlan->subPlanList != NIL) + { + ExplainSubPlans(distributedPlan->subPlanList, es); + } + ExplainJob(distributedPlan->workerJob, es); ExplainCloseGroup("Distributed Query", "Distributed Query", true, es); @@ -166,6 +172,55 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, } +/* + * ExplainSubPlans generates EXPLAIN output for subplans for CTEs + * and complex subqueries. Because the planning for these queries + * is done along with the top-level plan, we cannot determine the + * planning time and set it to 0. 
+ */ +static void +ExplainSubPlans(List *subPlanList, ExplainState *es) +{ + ListCell *subPlanCell = NULL; + + ExplainOpenGroup("Subplans", "Subplans", false, es); + + if (es->format == EXPLAIN_FORMAT_TEXT) + { + appendStringInfoSpaces(es->str, es->indent * 2); + appendStringInfo(es->str, "-> Distributed Subplan\n"); + es->indent += 3; + } + + foreach(subPlanCell, subPlanList) + { + DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell); + PlannedStmt *plan = subPlan->plan; + IntoClause *into = NULL; + ParamListInfo params = NULL; + char *queryString = NULL; + instr_time planduration; + + /* set the planning time to 0 */ + INSTR_TIME_SET_CURRENT(planduration); + INSTR_TIME_SUBTRACT(planduration, planduration); + +#if (PG_VERSION_NUM >= 100000) + ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration); +#else + ExplainOnePlan(plan, into, es, queryString, params, &planduration); +#endif + } + + if (es->format == EXPLAIN_FORMAT_TEXT) + { + es->indent -= 3; + } + + ExplainCloseGroup("Subplans", "Subplans", false, es); +} + + /* * ExplainJob shows the EXPLAIN output for a Job in the physical plan of * a distributed query by showing the remote EXPLAIN for the first task, diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index f149d1e53..e4a8054f0 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -173,7 +173,6 @@ static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNo static bool ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery); static bool IsFunctionRTE(Node *node); static bool FindNodeCheck(Node *node, bool (*check)(Node *)); -static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); static MultiNode * SubqueryMultiNodeTree(Query *originalQuery, Query *queryTree, PlannerRestrictionContext * @@ -194,12 +193,6 
@@ static MultiTable * MultiSubqueryPushdownTable(Query *subquery); * plan and adds a root node to top of it. The original query is only used for subquery * pushdown planning. * - * In order to support external parameters for the queries where planning - * is done on the original query, we need to replace the external parameters - * manually. To achive that for subquery pushdown planning, we pass boundParams - * to this function. We need to do that since Citus currently unable to send - * parameters to the workers on the execution. - * * We also pass queryTree and plannerRestrictionContext to the planner. They * are primarily used to decide whether the subquery is safe to pushdown. * If not, it helps to produce meaningful error messages for subquery @@ -207,16 +200,13 @@ static MultiTable * MultiSubqueryPushdownTable(Query *subquery); */ MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree, - PlannerRestrictionContext *plannerRestrictionContext, - ParamListInfo boundParams) + PlannerRestrictionContext *plannerRestrictionContext) { MultiNode *multiQueryNode = NULL; MultiTreeRoot *rootNode = NULL; if (ShouldUseSubqueryPushDown(originalQuery, queryTree)) { - originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery, - boundParams); multiQueryNode = SubqueryMultiNodeTree(originalQuery, queryTree, plannerRestrictionContext); } @@ -328,99 +318,6 @@ FindNodeCheck(Node *node, bool (*check)(Node *)) } -/* - * ResolveExternalParams replaces the external parameters that appears - * in the query with the corresponding entries in the boundParams. - * - * Note that this function is inspired by eval_const_expr() on Postgres. - * We cannot use that function because it requires access to PlannerInfo. 
- */ -static Node * -ResolveExternalParams(Node *inputNode, ParamListInfo boundParams) -{ - /* consider resolving external parameters only when boundParams exists */ - if (!boundParams) - { - return inputNode; - } - - if (inputNode == NULL) - { - return NULL; - } - - if (IsA(inputNode, Param)) - { - Param *paramToProcess = (Param *) inputNode; - ParamExternData *correspondingParameterData = NULL; - int numberOfParameters = boundParams->numParams; - int parameterId = paramToProcess->paramid; - int16 typeLength = 0; - bool typeByValue = false; - Datum constValue = 0; - bool paramIsNull = false; - int parameterIndex = 0; - - if (paramToProcess->paramkind != PARAM_EXTERN) - { - return inputNode; - } - - if (parameterId < 0) - { - return inputNode; - } - - /* parameterId starts from 1 */ - parameterIndex = parameterId - 1; - if (parameterIndex >= numberOfParameters) - { - return inputNode; - } - - correspondingParameterData = &boundParams->params[parameterIndex]; - - if (!(correspondingParameterData->pflags & PARAM_FLAG_CONST)) - { - return inputNode; - } - - get_typlenbyval(paramToProcess->paramtype, &typeLength, &typeByValue); - - paramIsNull = correspondingParameterData->isnull; - if (paramIsNull) - { - constValue = 0; - } - else if (typeByValue) - { - constValue = correspondingParameterData->value; - } - else - { - /* - * Out of paranoia ensure that datum lives long enough, - * although bind params currently should always live - * long enough. 
- */ - constValue = datumCopy(correspondingParameterData->value, typeByValue, - typeLength); - } - - return (Node *) makeConst(paramToProcess->paramtype, paramToProcess->paramtypmod, - paramToProcess->paramcollid, typeLength, constValue, - paramIsNull, typeByValue); - } - else if (IsA(inputNode, Query)) - { - return (Node *) query_tree_mutator((Query *) inputNode, ResolveExternalParams, - boundParams, 0); - } - - return expression_tree_mutator(inputNode, ResolveExternalParams, boundParams); -} - - /* * SublinkList finds the subquery nodes in the where clause of the given query. Note * that the function should be called on the original query given that postgres @@ -1340,7 +1237,7 @@ DeferErrorIfUnsupportedTableCombination(Query *queryTree) else if (rangeTableEntry->rtekind == RTE_CTE) { unsupportedTableCombination = true; - errorDetail = "CTEs in multi-shard queries are currently unsupported"; + errorDetail = "CTEs in subqueries are currently unsupported"; break; } else if (rangeTableEntry->rtekind == RTE_VALUES) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 932b4b6ec..08230c87d 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -162,8 +162,6 @@ DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query, RelationRestrictionContext *restrictionContext) { - Assert(EnableRouterExecution); - if (MultiRouterPlannableQuery(query, restrictionContext)) { return CreateSingleTaskRouterPlan(originalQuery, query, @@ -1623,6 +1621,14 @@ SelectsFromDistributedTable(List *rangeTableList) * If the given query is not routable, it fills planningError with the related * DeferredErrorMessage. The caller can check this error message to see if query * is routable or not. + * + * Note: If the query prunes down to 0 shards due to filters (e.g. 
WHERE false), + * or the query has only read_intermediate_result calls (no relations left after + * recursively planning CTEs and subqueries), then it will be assigned to an + * arbitrary worker node in a round-robin fashion. + * + * Relations that prune down to 0 shards are replaced by subqueries returning + * 0 values in UpdateRelationToShardNames. */ DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext, diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c new file mode 100644 index 000000000..b0f765e71 --- /dev/null +++ b/src/backend/distributed/planner/recursive_planning.c @@ -0,0 +1,650 @@ +/*------------------------------------------------------------------------- + * + * recursive_planning.c + * + * Logic for calling the postgres planner recursively for CTEs and + * non-pushdownable subqueries in distributed queries. + * + * PostgreSQL with Citus can execute 4 types of queries: + * + * - Postgres queries on local tables and functions. + * + * These queries can use all SQL features, but they may not reference + * distributed tables. + * + * - Router queries that can be executed on a single node by replacing + * table names with shard names. + * + * These queries can use nearly all SQL features, but only if they have + * a single-valued filter on the distribution column. + * + * - Real-time queries that can be executed by performing a task for each + * shard in a distributed table and performing a merge step. + * + * These queries have limited SQL support. They may only include + * subqueries if the subquery can be executed on each shard by replacing + * table names with shard names and concatenating the result. + * + * - Task-tracker queries that can be executed through a tree of + * re-partitioning operations. + * + * These queries have very limited SQL support and only support basic + * inner joins and subqueries without joins.
+ * + * To work around the limitations of these planners, we recursively call + * the planner for CTEs and unsupported subqueries to obtain a list of + * subplans. + * + * During execution, each subplan is executed separately through the method + * that is appropriate for that query. The results are written to temporary + * files on the workers. In the original query, the CTEs and subqueries are + * replaced by mini-subqueries that read from the temporary files. + * + * This allows almost all SQL to be directly or indirectly supported, + * because if all subqueries that contain distributed tables have been + * replaced then what remains is a router query which can use nearly all + * SQL features. + * + * Copyright (c) 2017, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "catalog/pg_type.h" +#include "distributed/citus_nodes.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/distributed_planner.h" +#include "distributed/errormessage.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_copy.h" +#include "distributed/multi_logical_planner.h" +#include "distributed/multi_router_planner.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/recursive_planning.h" +#include "distributed/relation_restriction_equivalence.h" +#include "lib/stringinfo.h" +#include "optimizer/planner.h" +#include "nodes/nodeFuncs.h" +#include "nodes/nodes.h" +#include "nodes/pg_list.h" +#include "nodes/primnodes.h" +#include "utils/builtins.h" +#include "utils/guc.h" + + +/* + * RecursivePlanningContext is used to recursively plan subqueries + * and CTEs, pull results to the coordinator, and push it back into + * the workers. 
+ */
+typedef struct RecursivePlanningContext
+{
+	/* set to 0 when planning starts; NOTE(review): not read anywhere in this file yet */
+	int level;
+
+	/* ID of the distributed plan, used as the prefix of every result ID */
+	uint64 planId;
+
+	/* DistributedSubPlan nodes generated so far, in the order they were planned */
+	List *subPlanList;
+
+	/* restriction context handed in by the caller; stored but not yet consulted here */
+	PlannerRestrictionContext *plannerRestrictionContext;
+} RecursivePlanningContext;
+
+/*
+ * CteReferenceWalkerContext is used to collect CTE references in
+ * CteReferenceListWalker.
+ */
+typedef struct CteReferenceWalkerContext
+{
+	/*
+	 * Depth of the query currently being walked. Callers start at -1 because
+	 * the walker adds 1 for every Query it enters, including the top one.
+	 */
+	int level;
+
+	/* RTE_CTE range table entries whose ctelevelsup equals the current depth */
+	List *cteReferenceList;
+} CteReferenceWalkerContext;
+
+/*
+ * VarLevelsUpWalkerContext is used to find Vars in a (sub)query that
+ * refer to upper levels and therefore cannot be planned separately.
+ */
+typedef struct VarLevelsUpWalkerContext
+{
+	/* number of subquery levels the walker has descended below the start query */
+	int level;
+} VarLevelsUpWalkerContext;
+
+
+/* local function forward declarations */
+static DeferredErrorMessage * RecursivelyPlanCTEs(Query *query,
+												  RecursivePlanningContext *context);
+static DistributedSubPlan * CreateDistributedSubPlan(uint32 subPlanId,
+													 Query *subPlanQuery);
+static bool CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context);
+static bool ContainsReferencesToOuterQuery(Query *query);
+static bool ContainsReferencesToOuterQueryWalker(Node *node,
+												 VarLevelsUpWalkerContext *context);
+static Query * BuildSubPlanResultQuery(Query *subquery, uint64 planId, uint32 subPlanId);
+
+
+/*
+ * RecursivelyPlanSubqueriesAndCTEs finds subqueries and CTEs that cannot be pushed down to
+ * workers directly and instead plans them by recursively calling the planner and
+ * adding the subplan to subPlanList.
+ *
+ * Subplans are executed prior to the distributed plan and the results are written
+ * to temporary files on workers.
+ *
+ * CTE references are replaced by a subquery on the read_intermediate_result
+ * function, which reads from the temporary file.
+ *
+ * If recursive planning results in an error then the error is returned. Otherwise, the
+ * subplans will be added to subPlanList.
+ */
+DeferredErrorMessage *
+RecursivelyPlanSubqueriesAndCTEs(Query *query,
+								 PlannerRestrictionContext *plannerRestrictionContext,
+								 uint64 planId, List **subPlanList)
+{
+	RecursivePlanningContext planningContext;
+	DeferredErrorMessage *cteError = NULL;
+
+	/*
+	 * With the subquery_pushdown flag on, subqueries with LIMIT are pushed
+	 * down through some hacks. Recursive planning would try to plan those
+	 * inner subqueries separately, which may not be supported and would be
+	 * much slower, so recursive planning is skipped entirely in that mode.
+	 */
+	if (SubqueryPushdown)
+	{
+		return NULL;
+	}
+
+	planningContext.plannerRestrictionContext = plannerRestrictionContext;
+	planningContext.subPlanList = NIL;
+	planningContext.planId = planId;
+	planningContext.level = 0;
+
+	cteError = RecursivelyPlanCTEs(query, &planningContext);
+	if (cteError == NULL)
+	{
+		/* XXX: plan subqueries */
+
+		/* hand the generated subplans to the caller only when planning succeeded */
+		*subPlanList = planningContext.subPlanList;
+	}
+
+	return cteError;
+}
+
+
+/*
+ * RecursivelyPlanCTEs plans all CTEs in the query by recursively calling the planner.
+ * The resulting plan is added to planningContext->subPlanList and CTE references
+ * are replaced by subqueries that call read_intermediate_result, which reads the
+ * intermediate result of the CTE after it is executed.
+ *
+ * Recursive and modifying CTEs are not yet supported and return an error.
+ */
+static DeferredErrorMessage *
+RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
+{
+	ListCell *cteCell = NULL;
+
+	/* level starts at -1 because the walker adds 1 when it enters the top query */
+	CteReferenceWalkerContext context = { -1, NIL };
+
+	if (query->cteList == NIL)
+	{
+		/* no CTEs, nothing to do */
+		return NULL;
+	}
+
+	if (query->hasModifyingCTE)
+	{
+		/* we could easily support these, but it's a little scary */
+		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
+							 "data-modifying statements are not supported in "
+							 "the WITH clauses of distributed queries",
+							 NULL, NULL);
+	}
+
+	if (query->hasRecursive)
+	{
+		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
+							 "recursive CTEs are not supported in distributed "
+							 "queries",
+							 NULL, NULL);
+	}
+
+	/* get all RTE_CTEs that point to CTEs from cteList */
+	CteReferenceListWalker((Node *) query, &context);
+
+	foreach(cteCell, query->cteList)
+	{
+		CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell);
+		char *cteName = cte->ctename;
+		Query *subquery = (Query *) cte->ctequery;
+		uint64 planId = planningContext->planId;
+		uint32 subPlanId = 0;
+		Query *resultQuery = NULL;
+		DistributedSubPlan *subPlan = NULL;
+		ListCell *rteCell = NULL;
+		int replacedCtesCount = 0;
+
+		if (ContainsReferencesToOuterQuery(subquery))
+		{
+			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
+								 "CTEs that refer to other subqueries are not "
+								 "supported in multi-shard queries",
+								 NULL, NULL);
+		}
+
+		if (cte->cterefcount == 0)
+		{
+			/*
+			 * CTEs that aren't referenced aren't executed in postgres. We
+			 * don't need to generate a subplan for it and can take the rest
+			 * of this iteration off.
+			 */
+			continue;
+		}
+
+		/* subplan IDs are 1-based positions in the subplan list */
+		subPlanId = list_length(planningContext->subPlanList) + 1;
+
+		/*
+		 * Only deparse the CTE when a DEBUG1 message can actually be emitted.
+		 * Message levels grow with severity, so DEBUG1 is wanted when either
+		 * the server log or the client threshold is at or below DEBUG1.
+		 */
+		if (log_min_messages <= DEBUG1 || client_min_messages <= DEBUG1)
+		{
+			StringInfo subPlanString = makeStringInfo();
+			pg_get_query_def(subquery, subPlanString);
+			ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT "_%u for "
+									"CTE %s: %s",
+									planId, subPlanId, cteName, subPlanString->data)));
+		}
+
+		/* build a sub plan for the CTE */
+		subPlan = CreateDistributedSubPlan(subPlanId, subquery);
+		planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan);
+
+		/* replace references to the CTE with a subquery that reads results */
+		resultQuery = BuildSubPlanResultQuery(subquery, planId, subPlanId);
+
+		foreach(rteCell, context.cteReferenceList)
+		{
+			RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rteCell);
+
+			if (rangeTableEntry->rtekind != RTE_CTE)
+			{
+				/*
+				 * This RTE pointed to a preceding CTE that was already replaced
+				 * by a subplan.
+				 */
+				continue;
+			}
+
+			if (strncmp(rangeTableEntry->ctename, cteName, NAMEDATALEN) == 0)
+			{
+				/* change the RTE_CTE into an RTE_SUBQUERY */
+				rangeTableEntry->rtekind = RTE_SUBQUERY;
+				rangeTableEntry->ctename = NULL;
+				rangeTableEntry->ctelevelsup = 0;
+
+				if (replacedCtesCount == 0)
+				{
+					/*
+					 * Replace the first CTE reference with the result query directly.
+					 */
+					rangeTableEntry->subquery = resultQuery;
+				}
+				else
+				{
+					/*
+					 * Replace subsequent CTE references with a copy of the result
+					 * query.
+					 */
+					rangeTableEntry->subquery = copyObject(resultQuery);
+				}
+
+				replacedCtesCount++;
+			}
+		}
+
+		/* every reference counted by the parser must have been rewritten */
+		Assert(cte->cterefcount == replacedCtesCount);
+	}
+
+	/*
+	 * All CTEs are now executed through subplans and RTE_CTEs pointing
+	 * to the CTE list have been replaced with subqueries. We can now
+	 * clear the cteList.
+	 */
+	query->cteList = NIL;
+
+	return NULL;
+}
+
+
+/*
+ * CreateDistributedSubPlan creates a distributed subplan by recursively calling
+ * the planner from the top, which may either generate a local plan or another
+ * distributed plan, which can itself contain subplans.
+ */
+static DistributedSubPlan *
+CreateDistributedSubPlan(uint32 subPlanId, Query *subPlanQuery)
+{
+	DistributedSubPlan *subPlan = NULL;
+	int cursorOptions = 0;
+
+	if (ContainsReadIntermediateResultFunction((Node *) subPlanQuery))
+	{
+		/*
+		 * Make sure we go through distributed planning if there are
+		 * read_intermediate_result calls, even if there are no distributed
+		 * tables in the query anymore.
+		 *
+		 * We cannot perform this check in the planner itself, since that
+		 * would also cause the workers to attempt distributed planning.
+		 */
+		cursorOptions |= CURSOR_OPT_FORCE_DISTRIBUTED;
+	}
+
+	subPlan = CitusMakeNode(DistributedSubPlan);
+	subPlan->plan = planner(subPlanQuery, cursorOptions, NULL);
+	subPlan->subPlanId = subPlanId;
+
+	return subPlan;
+}
+
+
+/*
+ * CteReferenceListWalker finds all references to CTEs in the top level of a query
+ * and adds them to context->cteReferenceList.
+ */
+static bool
+CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context)
+{
+	RangeTblEntry *candidateEntry = NULL;
+
+	if (node == NULL)
+	{
+		return false;
+	}
+
+	if (IsA(node, Query))
+	{
+		/* everything below this point lives one query level further down */
+		context->level += 1;
+		query_tree_walker((Query *) node, CteReferenceListWalker, context,
+						  QTW_EXAMINE_RTES);
+		context->level -= 1;
+
+		return false;
+	}
+
+	if (!IsA(node, RangeTblEntry))
+	{
+		/* ordinary expression node, keep searching underneath it */
+		return expression_tree_walker(node, CteReferenceListWalker, context);
+	}
+
+	/* remember RTE_CTEs that point at the CTE list of the starting query */
+	candidateEntry = (RangeTblEntry *) node;
+	if (candidateEntry->rtekind == RTE_CTE &&
+		candidateEntry->ctelevelsup == context->level)
+	{
+		context->cteReferenceList = lappend(context->cteReferenceList,
+											candidateEntry);
+	}
+
+	/* caller will descend into range table entry */
+	return false;
+}
+
+
+/*
+ * ContainsReferencesToOuterQuery reports whether anything inside the given
+ * query points outside of the query itself. A query like that cannot be
+ * planned on its own, so it is not eligible for recursive planning.
+ */
+static bool
+ContainsReferencesToOuterQuery(Query *query)
+{
+	/* level 0 means "directly inside the given query" */
+	VarLevelsUpWalkerContext walkerContext = { 0 };
+	bool hasOuterReference = false;
+
+	hasOuterReference = query_tree_walker(query, ContainsReferencesToOuterQueryWalker,
+										  &walkerContext, 0);
+
+	return hasOuterReference;
+}
+
+
+/*
+ * ContainsReferencesToOuterQueryWalker determines whether the given query
+ * contains any Vars that point more than context->level levels up.
+ *
+ * ContainsReferencesToOuterQueryWalker recursively descends into subqueries
+ * and increases the level by 1 before recursing.
+ */
+static bool
+ContainsReferencesToOuterQueryWalker(Node *node, VarLevelsUpWalkerContext *context)
+{
+	if (node == NULL)
+	{
+		return false;
+	}
+
+	switch (nodeTag(node))
+	{
+		case T_Var:
+		{
+			/* the answer for a Var is decided right here, no further descent */
+			return ((Var *) node)->varlevelsup > context->level;
+		}
+
+		case T_GroupingFunc:
+		{
+			/* likewise decided on the spot, arguments are not walked */
+			return ((GroupingFunc *) node)->agglevelsup > context->level;
+		}
+
+		case T_Aggref:
+		{
+			if (((Aggref *) node)->agglevelsup > context->level)
+			{
+				return true;
+			}
+
+			/* the aggregate itself is local, but its arguments may not be */
+			break;
+		}
+
+		case T_PlaceHolderVar:
+		{
+			if (((PlaceHolderVar *) node)->phlevelsup > context->level)
+			{
+				return true;
+			}
+
+			/* keep looking inside the placeholder */
+			break;
+		}
+
+		case T_Query:
+		{
+			bool foundOuterReference = false;
+
+			/* one level deeper for the duration of this subquery */
+			context->level += 1;
+			foundOuterReference = query_tree_walker((Query *) node,
+													ContainsReferencesToOuterQueryWalker,
+													context, 0);
+			context->level -= 1;
+
+			return foundOuterReference;
+		}
+
+		default:
+		{
+			break;
+		}
+	}
+
+	return expression_tree_walker(node, ContainsReferencesToOuterQueryWalker,
+								  context);
+}
+
+
+/*
+ * BuildSubPlanResultQuery returns a query of the form:
+ *
+ * SELECT
+ *   <target list>
+ * FROM
+ *   read_intermediate_result('<planId>_<subPlanId>', '<copy format>')
+ *   AS res (<column definition list>);
+ *
+ * The target list and column definition list are derived from the given subquery.
+ *
+ * If any of the types in the target list cannot be used in the binary copy format,
+ * then the copy format 'text' is used, otherwise 'binary' is used.
+ */
+static Query *
+BuildSubPlanResultQuery(Query *subquery, uint64 planId, uint32 subPlanId)
+{
+	Query *resultQuery = NULL;
+	char *resultIdString = NULL;
+	Const *resultIdConst = NULL;
+	Const *resultFormatConst = NULL;
+	FuncExpr *funcExpr = NULL;
+	Alias *funcAlias = NULL;
+	List *funcColNames = NIL;
+	List *funcColTypes = NIL;
+	List *funcColTypMods = NIL;
+	List *funcColCollations = NIL;
+	RangeTblFunction *rangeTableFunction = NULL;
+	RangeTblEntry *rangeTableEntry = NULL;
+	RangeTblRef *rangeTableRef = NULL;
+	FromExpr *joinTree = NULL;
+	ListCell *targetEntryCell = NULL;
+	List *targetList = NIL;
+	int columnNumber = 1;
+	bool useBinaryCopyFormat = true;
+	Oid copyFormatId = BinaryCopyFormatId();
+
+	/* build the target list and column definition list */
+	foreach(targetEntryCell, subquery->targetList)
+	{
+		TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
+		Node *targetExpr = (Node *) targetEntry->expr;
+		char *columnName = targetEntry->resname;
+		Oid columnType = exprType(targetExpr);
+		int32 columnTypMod = exprTypmod(targetExpr);
+		Oid columnCollation = exprCollation(targetExpr);
+		Var *functionColumnVar = NULL;
+		TargetEntry *newTargetEntry = NULL;
+
+		if (targetEntry->resjunk)
+		{
+			/* junk columns are not part of the subquery's visible output */
+			continue;
+		}
+
+		/* funccoltypes and funccolcollations are OID lists, funccoltypmods an int list */
+		funcColNames = lappend(funcColNames, makeString(columnName));
+		funcColTypes = lappend_oid(funcColTypes, columnType);
+		funcColTypMods = lappend_int(funcColTypMods, columnTypMod);
+		funcColCollations = lappend_oid(funcColCollations, columnCollation);
+
+		/* the Var refers to column columnNumber of the only RTE (index 1) */
+		functionColumnVar = makeNode(Var);
+		functionColumnVar->varno = 1;
+		functionColumnVar->varattno = columnNumber;
+		functionColumnVar->vartype = columnType;
+		functionColumnVar->vartypmod = columnTypMod;
+		functionColumnVar->varcollid = columnCollation;
+		functionColumnVar->varlevelsup = 0;
+		functionColumnVar->varnoold = 1;
+		functionColumnVar->varoattno = columnNumber;
+		functionColumnVar->location = -1;
+
+		newTargetEntry = makeNode(TargetEntry);
+		newTargetEntry->expr = (Expr *) functionColumnVar;
+		newTargetEntry->resno = columnNumber;
+		newTargetEntry->resname = columnName;
+		newTargetEntry->resjunk = false;
+
+		targetList = lappend(targetList, newTargetEntry);
+
+		/* a single column that cannot use binary copy switches the result to text */
+		if (useBinaryCopyFormat && !CanUseBinaryCopyFormatForType(columnType))
+		{
+			useBinaryCopyFormat = false;
+		}
+
+		columnNumber++;
+	}
+
+	/* build the result_id parameter for the call to read_intermediate_result */
+	resultIdString = GenerateResultId(planId, subPlanId);
+
+	resultIdConst = makeNode(Const);
+	resultIdConst->consttype = TEXTOID;
+	resultIdConst->consttypmod = -1;
+	resultIdConst->constlen = -1;
+	resultIdConst->constvalue = CStringGetTextDatum(resultIdString);
+	resultIdConst->constbyval = false;
+	resultIdConst->constisnull = false;
+	resultIdConst->location = -1;
+
+	/* build the citus_copy_format parameter for the call to read_intermediate_result */
+	if (!useBinaryCopyFormat)
+	{
+		copyFormatId = TextCopyFormatId();
+	}
+
+	resultFormatConst = makeNode(Const);
+	resultFormatConst->consttype = CitusCopyFormatTypeId();
+	resultFormatConst->consttypmod = -1;
+	resultFormatConst->constlen = 4;
+	resultFormatConst->constvalue = ObjectIdGetDatum(copyFormatId);
+	resultFormatConst->constbyval = true;
+	resultFormatConst->constisnull = false;
+	resultFormatConst->location = -1;
+
+	/* build the call to read_intermediate_result */
+	funcExpr = makeNode(FuncExpr);
+	funcExpr->funcid = CitusReadIntermediateResultFuncId();
+	funcExpr->funcretset = true;
+	funcExpr->funcvariadic = false;
+	funcExpr->funcformat = 0;
+	funcExpr->funccollid = 0;
+	funcExpr->inputcollid = 0;
+	funcExpr->location = -1;
+	funcExpr->args = list_make2(resultIdConst, resultFormatConst);
+
+	/* build the RTE for the call to read_intermediate_result */
+	rangeTableFunction = makeNode(RangeTblFunction);
+	rangeTableFunction->funccolcount = list_length(funcColNames);
+	rangeTableFunction->funccolnames = funcColNames;
+	rangeTableFunction->funccoltypes = funcColTypes;
+	rangeTableFunction->funccoltypmods = funcColTypMods;
+	rangeTableFunction->funccolcollations = funcColCollations;
+	rangeTableFunction->funcparams = NULL;
+	rangeTableFunction->funcexpr = (Node *) funcExpr;
+
+	funcAlias = makeNode(Alias);
+	funcAlias->aliasname = "intermediate_result";
+	funcAlias->colnames = funcColNames;
+
+	rangeTableEntry = makeNode(RangeTblEntry);
+	rangeTableEntry->rtekind = RTE_FUNCTION;
+	rangeTableEntry->functions = list_make1(rangeTableFunction);
+	rangeTableEntry->inFromCl = true;
+	rangeTableEntry->eref = funcAlias;
+
+	/* build the join tree using the read_intermediate_result RTE */
+	rangeTableRef = makeNode(RangeTblRef);
+	rangeTableRef->rtindex = 1;
+
+	joinTree = makeNode(FromExpr);
+	joinTree->fromlist = list_make1(rangeTableRef);
+
+	/* build the SELECT query */
+	resultQuery = makeNode(Query);
+	resultQuery->commandType = CMD_SELECT;
+	resultQuery->rtable = list_make1(rangeTableEntry);
+	resultQuery->jointree = joinTree;
+	resultQuery->targetList = targetList;
+
+	return resultQuery;
+}
+
+
+/*
+ * GenerateResultId generates the result ID that is used to identify an intermediate
+ * result of the subplan with the given plan ID and subplan ID.
+ */
+char *
+GenerateResultId(uint64 planId, uint32 subPlanId)
+{
+	StringInfoData resultIdBuffer;
+
+	/* the ID has the form <planId>_<subPlanId> */
+	initStringInfo(&resultIdBuffer);
+	appendStringInfo(&resultIdBuffer, UINT64_FORMAT "_%u", planId, subPlanId);
+
+	return resultIdBuffer.data;
+}
diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 07d0aad18..996b3dd52 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -14,6 +14,7 @@ #include "nodes/relation.h" #include "distributed/citus_nodes.h" +#include "distributed/errormessage.h" /* values used by jobs and tasks which do not require identifiers */ @@ -21,6 +22,8 @@ #define INVALID_TASK_ID 0 #define MULTI_TASK_QUERY_INFO_OFF 0 /* do not log multi-task queries */ +#define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000 + typedef struct RelationRestrictionContext { bool hasDistributedRelation; @@ -89,6 +92,7 @@ extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan); extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); extern bool IsMultiShardModifyPlan(struct DistributedPlan *distributedPlan); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); +extern char * GenerateResultId(uint64 planId, uint32 subPlanId); extern int GetRTEIdentity(RangeTblEntry *rte); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 708355e58..bb6e25309 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -185,8 +185,7 @@ extern bool SubqueryPushdown; /* Function declarations for building logical plans */ extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree, PlannerRestrictionContext * - plannerRestrictionContext, - ParamListInfo boundParams); + plannerRestrictionContext); extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery( PlannerRestrictionContext *plannerRestrictionContext,
Query *query); diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h new file mode 100644 index 000000000..689729db1 --- /dev/null +++ b/src/include/distributed/recursive_planning.h @@ -0,0 +1,29 @@ +/*------------------------------------------------------------------------- + * + * recursive_planning.h + * Functions for recursively planning CTEs and subqueries. + * + * Copyright (c) 2017, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#ifndef RECURSIVE_PLANNING_H +#define RECURSIVE_PLANNING_H + + +#include "distributed/errormessage.h" +#include "distributed/relation_restriction_equivalence.h" +#include "nodes/pg_list.h" +#include "nodes/primnodes.h" +#include "nodes/relation.h" + + +extern DeferredErrorMessage * RecursivelyPlanSubqueriesAndCTEs(Query *query, + PlannerRestrictionContext * + plannerRestrictionContext, + uint64 planId, + List **subPlanList); +extern char * GenerateResultId(uint64 planId, uint32 subPlanId); + + +#endif /* RECURSIVE_PLANNING_H */ diff --git a/src/include/distributed/subplan_execution.h b/src/include/distributed/subplan_execution.h new file mode 100644 index 000000000..5f5b45114 --- /dev/null +++ b/src/include/distributed/subplan_execution.h @@ -0,0 +1,21 @@ +/*------------------------------------------------------------------------- + * + * subplan_execution.h + * + * Functions for executing subplans. + * + * Copyright (c) 2017, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#ifndef SUBPLAN_EXECUTION_H +#define SUBPLAN_EXECUTION_H + + +#include "distributed/multi_physical_planner.h" + + +extern void ExecuteSubPlans(DistributedPlan *distributedPlan); + + +#endif /* SUBPLAN_EXECUTION_H */