mirror of https://github.com/citusdata/citus.git
beginning of set_rel_pathlist_hook to plan query on distributed table
parent
ab365a335d
commit
8f7037da63
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include <float.h>
|
#include <float.h>
|
||||||
#include <limits.h>
|
#include <limits.h>
|
||||||
|
#include <nodes/pg_list.h>
|
||||||
|
|
||||||
#include "access/htup_details.h"
|
#include "access/htup_details.h"
|
||||||
#include "catalog/pg_class.h"
|
#include "catalog/pg_class.h"
|
||||||
|
@ -70,6 +71,7 @@ static uint64 NextPlanId = 1;
|
||||||
|
|
||||||
/* keep track of planner call stack levels */
|
/* keep track of planner call stack levels */
|
||||||
int PlannerLevel = 0;
|
int PlannerLevel = 0;
|
||||||
|
bool UseCustomPath = false;
|
||||||
|
|
||||||
static bool ListContainsDistributedTableRTE(List *rangeTableList);
|
static bool ListContainsDistributedTableRTE(List *rangeTableList);
|
||||||
static bool IsUpdateOrDelete(Query *query);
|
static bool IsUpdateOrDelete(Query *query);
|
||||||
|
@ -134,6 +136,11 @@ distributed_planner(Query *parse,
|
||||||
int cursorOptions,
|
int cursorOptions,
|
||||||
ParamListInfo boundParams)
|
ParamListInfo boundParams)
|
||||||
{
|
{
|
||||||
|
if (UseCustomPath)
|
||||||
|
{
|
||||||
|
return standard_planner(parse, cursorOptions, boundParams);
|
||||||
|
}
|
||||||
|
|
||||||
bool needsDistributedPlanning = false;
|
bool needsDistributedPlanning = false;
|
||||||
bool fastPathRouterQuery = false;
|
bool fastPathRouterQuery = false;
|
||||||
Node *distributionKeyValue = NULL;
|
Node *distributionKeyValue = NULL;
|
||||||
|
@ -1680,6 +1687,12 @@ multi_join_restriction_hook(PlannerInfo *root,
|
||||||
JoinType jointype,
|
JoinType jointype,
|
||||||
JoinPathExtraData *extra)
|
JoinPathExtraData *extra)
|
||||||
{
|
{
|
||||||
|
if (UseCustomPath)
|
||||||
|
{
|
||||||
|
// TODO implement the replacement of path nodes with distributed union
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (bms_is_empty(innerrel->relids) || bms_is_empty(outerrel->relids))
|
if (bms_is_empty(innerrel->relids) || bms_is_empty(outerrel->relids))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
|
@ -1734,6 +1747,91 @@ multi_join_restriction_hook(PlannerInfo *root,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static Plan *
|
||||||
|
CreateDistributedUnionPlan(PlannerInfo *root,
|
||||||
|
RelOptInfo *rel,
|
||||||
|
struct CustomPath *best_path,
|
||||||
|
List *tlist,
|
||||||
|
List *clauses,
|
||||||
|
List *custom_plans)
|
||||||
|
{
|
||||||
|
Job *workerJob = CitusMakeNode(Job);
|
||||||
|
workerJob->jobId = UniqueJobId();
|
||||||
|
|
||||||
|
RangeTblEntry *rte = list_nth_node(RangeTblEntry, best_path->custom_private, 0);
|
||||||
|
List* shardIntervalList = LoadShardIntervalList(rte->relid);
|
||||||
|
ShardInterval *shardInterval = NULL;
|
||||||
|
|
||||||
|
int i = 0;
|
||||||
|
foreach_ptr(shardInterval, shardIntervalList)
|
||||||
|
{
|
||||||
|
StringInfo sqlQueryString = makeStringInfo();
|
||||||
|
appendStringInfo(sqlQueryString, "SELECT a,b FROM t1_%lu", shardInterval->shardId);
|
||||||
|
Task *sqlTask = CreateBasicTask(workerJob->jobId, i, SELECT_TASK, sqlQueryString->data);
|
||||||
|
sqlTask->anchorShardId = shardInterval->shardId;
|
||||||
|
sqlTask->taskPlacementList = FinalizedShardPlacementList(shardInterval->shardId);
|
||||||
|
workerJob->taskList = lappend(workerJob->taskList, sqlTask);
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
|
||||||
|
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
|
||||||
|
distributedPlan->workerJob = workerJob;
|
||||||
|
distributedPlan->modLevel = ROW_MODIFY_READONLY;
|
||||||
|
distributedPlan->relationIdList = list_make1_oid(rte->relid);
|
||||||
|
distributedPlan->hasReturning = true;
|
||||||
|
|
||||||
|
CustomScan *plan = makeNode(CustomScan);
|
||||||
|
plan->scan.scanrelid = 1;
|
||||||
|
plan->flags = best_path->flags;
|
||||||
|
plan->methods = &AdaptiveExecutorCustomScanMethods;
|
||||||
|
plan->custom_private = list_make1(distributedPlan);
|
||||||
|
|
||||||
|
plan->scan.plan.targetlist = list_make2(
|
||||||
|
makeTargetEntry((Expr *) makeVar(1, 1, INT4OID, -1, 0, 0), 1, "a", false),
|
||||||
|
makeTargetEntry((Expr *) makeVar(1, 2, INT4OID, -1, 0, 0), 2, "b", false)
|
||||||
|
);
|
||||||
|
|
||||||
|
return (Plan *) plan;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static List *
|
||||||
|
ReparameterizeDistributedUnion(PlannerInfo *root,
|
||||||
|
List *custom_private,
|
||||||
|
RelOptInfo *child_rel)
|
||||||
|
{
|
||||||
|
return NIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
const CustomPathMethods distributedUnionMethods = {
|
||||||
|
.CustomName = "Distributed Union",
|
||||||
|
.PlanCustomPath = CreateDistributedUnionPlan,
|
||||||
|
.ReparameterizeCustomPathByChild = ReparameterizeDistributedUnion
|
||||||
|
};
|
||||||
|
|
||||||
|
static CustomPath *
|
||||||
|
WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte)
|
||||||
|
{
|
||||||
|
CustomPath *distUnion = makeNode(CustomPath);
|
||||||
|
distUnion->path.pathtype = T_CustomScan;
|
||||||
|
distUnion->path.parent = originalPath->parent;
|
||||||
|
distUnion->path.pathtarget = originalPath->pathtarget;
|
||||||
|
distUnion->path.param_info = originalPath->param_info;
|
||||||
|
|
||||||
|
|
||||||
|
/* TODO use a better cost model */
|
||||||
|
distUnion->path.rows = originalPath->rows;
|
||||||
|
distUnion->path.startup_cost = originalPath->startup_cost;
|
||||||
|
distUnion->path.total_cost = originalPath->total_cost;
|
||||||
|
|
||||||
|
distUnion->methods = &distributedUnionMethods;
|
||||||
|
|
||||||
|
distUnion->custom_private = list_make1(rte);
|
||||||
|
|
||||||
|
return distUnion;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* multi_relation_restriction_hook is a hook called by postgresql standard planner
|
* multi_relation_restriction_hook is a hook called by postgresql standard planner
|
||||||
* to notify us about various planning information regarding a relation. We use
|
* to notify us about various planning information regarding a relation. We use
|
||||||
|
@ -1743,6 +1841,25 @@ void
|
||||||
multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
||||||
Index restrictionIndex, RangeTblEntry *rte)
|
Index restrictionIndex, RangeTblEntry *rte)
|
||||||
{
|
{
|
||||||
|
if (UseCustomPath)
|
||||||
|
{
|
||||||
|
if (!IsDistributedTable(rte->relid))
|
||||||
|
{
|
||||||
|
/* table accessed is not distributed, no paths to change */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* wrap every path with a distirbuted union custom path */
|
||||||
|
ListCell *pathCell = NULL;
|
||||||
|
foreach(pathCell, relOptInfo->pathlist)
|
||||||
|
{
|
||||||
|
Path *originalPath = lfirst(pathCell);
|
||||||
|
pathCell->data.ptr_value = WrapTableAccessWithDistributedUnion(originalPath, rte);
|
||||||
|
}
|
||||||
|
// TODO implement the replacement of path nodes with distributed union
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
CitusTableCacheEntry *cacheEntry = NULL;
|
CitusTableCacheEntry *cacheEntry = NULL;
|
||||||
|
|
||||||
if (ReplaceCitusExtraDataContainer && IsCitusExtraDataContainerRelation(rte))
|
if (ReplaceCitusExtraDataContainer && IsCitusExtraDataContainerRelation(rte))
|
||||||
|
|
|
@ -1133,6 +1133,16 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_UNIT_BYTE | GUC_NO_SHOW_ALL,
|
GUC_UNIT_BYTE | GUC_NO_SHOW_ALL,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomBoolVariable(
|
||||||
|
"citus.use_custom_path",
|
||||||
|
gettext_noop("Replaces citus' planner with a custom path in the standard planner"),
|
||||||
|
NULL,
|
||||||
|
&UseCustomPath,
|
||||||
|
false,
|
||||||
|
PGC_USERSET,
|
||||||
|
GUC_STANDARD,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomStringVariable(
|
DefineCustomStringVariable(
|
||||||
"citus.local_hostname",
|
"citus.local_hostname",
|
||||||
gettext_noop("Sets the hostname when connecting back to itself."),
|
gettext_noop("Sets the hostname when connecting back to itself."),
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
|
|
||||||
/* level of planner calls */
|
/* level of planner calls */
|
||||||
extern int PlannerLevel;
|
extern int PlannerLevel;
|
||||||
|
extern bool UseCustomPath;
|
||||||
|
|
||||||
typedef struct RelationRestrictionContext
|
typedef struct RelationRestrictionContext
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue