From 0d5575595125deb45562cb5aa3551042f3c04bf8 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Wed, 8 Jan 2020 16:01:38 +0100 Subject: [PATCH] recreate query struct for sub paths --- .../distributed/planner/distributed_planner.c | 42 +++++++++++++++---- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 261381175..682b56cd1 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -25,6 +25,7 @@ #include "distributed/citus_nodes.h" #include "distributed/citus_ruleutils.h" #include "distributed/cte_inline.h" +#include "distributed/deparse_shard_query.h" #include "distributed/function_call_delegation.h" #include "distributed/insert_select_planner.h" #include "distributed/intermediate_result_pruning.h" @@ -127,6 +128,7 @@ static RTEListProperties * GetRTEListProperties(List *rangeTableList); static List * TranslatedVars(PlannerInfo *root, int relationIndex); + /* Distributed planner hook */ PlannedStmt * distributed_planner(Query *parse, @@ -1747,6 +1749,21 @@ multi_join_restriction_hook(PlannerInfo *root, } +static Query * +GetQueryFromPath(List *tlist, RangeTblEntry *rte) +{ + Query *q = makeNode(Query); + q->commandType = CMD_SELECT; + q->rtable = list_make1(rte); + q->targetList = tlist; + q->jointree = makeNode(FromExpr); + RangeTblRef *rr = makeNode(RangeTblRef); + rr->rtindex = 1; + q->jointree->fromlist = list_make1(rr); + return q; +} + + static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, @@ -1762,12 +1779,23 @@ CreateDistributedUnionPlan(PlannerInfo *root, List* shardIntervalList = LoadShardIntervalList(rte->relid); ShardInterval *shardInterval = NULL; + Query *q = GetQueryFromPath(tlist, rte); + int i = 0; foreach_ptr(shardInterval, shardIntervalList) { - StringInfo sqlQueryString = makeStringInfo(); - appendStringInfo(sqlQueryString, "SELECT a,b FROM t1_%lu", shardInterval->shardId); - Task *sqlTask = CreateBasicTask(workerJob->jobId, i, SELECT_TASK, sqlQueryString->data); + RelationShard *rs = CitusMakeNode(RelationShard); + rs->relationId = rte->relid; + rs->shardId = shardInterval->shardId; + + Query *qc = copyObject(q); + UpdateRelationToShardNames((Node *) qc, list_make1(rs)); + + StringInfoData buf; + initStringInfo(&buf); + pg_get_query_def(qc, &buf); + + Task *sqlTask = CreateBasicTask(workerJob->jobId, i, SELECT_TASK, buf.data); sqlTask->anchorShardId = shardInterval->shardId; sqlTask->taskPlacementList = FinalizedShardPlacementList(shardInterval->shardId); workerJob->taskList = lappend(workerJob->taskList, sqlTask); @@ -1786,10 +1814,7 @@ CreateDistributedUnionPlan(PlannerInfo *root, plan->methods = &AdaptiveExecutorCustomScanMethods; plan->custom_private = list_make1(distributedPlan); - plan->scan.plan.targetlist = list_make2( - makeTargetEntry((Expr *) makeVar(1, 1, INT4OID, -1, 0, 0), 1, "a", false), - makeTargetEntry((Expr *) makeVar(1, 2, INT4OID, -1, 0, 0), 2, "b", false) - ); + plan->scan.plan.targetlist = tlist; return (Plan *) plan; } @@ -1818,7 +1843,6 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte) distUnion->path.pathtarget = originalPath->pathtarget; distUnion->path.param_info = originalPath->param_info; - /* TODO use a better cost model */ distUnion->path.rows = originalPath->rows; distUnion->path.startup_cost = originalPath->startup_cost; @@ -1826,7 +1850,7 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte) distUnion->methods = &distributedUnionMethods; - distUnion->custom_private = list_make1(rte); + distUnion->custom_private = list_make2(rte, originalPath); return distUnion; }