recreate query struct for sub paths

moonshot/custom-path
Nils Dijk 2020-01-08 16:01:38 +01:00
parent 8f7037da63
commit 0d55755951
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
1 changed files with 33 additions and 9 deletions

View File

@ -25,6 +25,7 @@
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/cte_inline.h" #include "distributed/cte_inline.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/function_call_delegation.h" #include "distributed/function_call_delegation.h"
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
#include "distributed/intermediate_result_pruning.h" #include "distributed/intermediate_result_pruning.h"
@ -127,6 +128,7 @@ static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex); static List * TranslatedVars(PlannerInfo *root, int relationIndex);
/* Distributed planner hook */ /* Distributed planner hook */
PlannedStmt * PlannedStmt *
distributed_planner(Query *parse, distributed_planner(Query *parse,
@ -1747,6 +1749,21 @@ multi_join_restriction_hook(PlannerInfo *root,
} }
static Query *
GetQueryFromPath(List *tlist, RangeTblEntry *rte)
{
Query *q = makeNode(Query);
q->commandType = CMD_SELECT;
q->rtable = list_make1(rte);
q->targetList = tlist;
q->jointree = makeNode(FromExpr);
RangeTblRef *rr = makeNode(RangeTblRef);
rr->rtindex = 1;
q->jointree->fromlist = list_make1(rr);
return q;
}
static Plan * static Plan *
CreateDistributedUnionPlan(PlannerInfo *root, CreateDistributedUnionPlan(PlannerInfo *root,
RelOptInfo *rel, RelOptInfo *rel,
@ -1762,12 +1779,23 @@ CreateDistributedUnionPlan(PlannerInfo *root,
List* shardIntervalList = LoadShardIntervalList(rte->relid); List* shardIntervalList = LoadShardIntervalList(rte->relid);
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
Query *q = GetQueryFromPath(tlist, rte);
int i = 0; int i = 0;
foreach_ptr(shardInterval, shardIntervalList) foreach_ptr(shardInterval, shardIntervalList)
{ {
StringInfo sqlQueryString = makeStringInfo(); RelationShard *rs = CitusMakeNode(RelationShard);
appendStringInfo(sqlQueryString, "SELECT a,b FROM t1_%lu", shardInterval->shardId); rs->relationId = rte->relid;
Task *sqlTask = CreateBasicTask(workerJob->jobId, i, SELECT_TASK, sqlQueryString->data); rs->shardId = shardInterval->shardId;
Query *qc = copyObject(q);
UpdateRelationToShardNames((Node *) qc, list_make1(rs));
StringInfoData buf;
initStringInfo(&buf);
pg_get_query_def(qc, &buf);
Task *sqlTask = CreateBasicTask(workerJob->jobId, i, SELECT_TASK, buf.data);
sqlTask->anchorShardId = shardInterval->shardId; sqlTask->anchorShardId = shardInterval->shardId;
sqlTask->taskPlacementList = FinalizedShardPlacementList(shardInterval->shardId); sqlTask->taskPlacementList = FinalizedShardPlacementList(shardInterval->shardId);
workerJob->taskList = lappend(workerJob->taskList, sqlTask); workerJob->taskList = lappend(workerJob->taskList, sqlTask);
@ -1786,10 +1814,7 @@ CreateDistributedUnionPlan(PlannerInfo *root,
plan->methods = &AdaptiveExecutorCustomScanMethods; plan->methods = &AdaptiveExecutorCustomScanMethods;
plan->custom_private = list_make1(distributedPlan); plan->custom_private = list_make1(distributedPlan);
plan->scan.plan.targetlist = list_make2( plan->scan.plan.targetlist = tlist;
makeTargetEntry((Expr *) makeVar(1, 1, INT4OID, -1, 0, 0), 1, "a", false),
makeTargetEntry((Expr *) makeVar(1, 2, INT4OID, -1, 0, 0), 2, "b", false)
);
return (Plan *) plan; return (Plan *) plan;
} }
@ -1818,7 +1843,6 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte)
distUnion->path.pathtarget = originalPath->pathtarget; distUnion->path.pathtarget = originalPath->pathtarget;
distUnion->path.param_info = originalPath->param_info; distUnion->path.param_info = originalPath->param_info;
/* TODO use a better cost model */ /* TODO use a better cost model */
distUnion->path.rows = originalPath->rows; distUnion->path.rows = originalPath->rows;
distUnion->path.startup_cost = originalPath->startup_cost; distUnion->path.startup_cost = originalPath->startup_cost;
@ -1826,7 +1850,7 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte)
distUnion->methods = &distributedUnionMethods; distUnion->methods = &distributedUnionMethods;
distUnion->custom_private = list_make1(rte); distUnion->custom_private = list_make2(rte, originalPath);
return distUnion; return distUnion;
} }