basic geo partitioning planning

moonshot/custom-path
Nils Dijk 2021-01-18 15:14:38 +01:00
parent ec994a3a9c
commit bac16a3785
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
9 changed files with 326 additions and 3 deletions

View File

@ -97,3 +97,6 @@ UPDATE pg_dist_partition t
AND t.logicalrelid = 'belgium_planet_osm_roads_geo'::regclass;
DROP TABLE foo;
COMMIT;
ALTER SYSTEM SET citus.use_custom_path TO on;
SELECT pg_reload_conf();

View File

@ -451,6 +451,11 @@ IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
return true;
}
case GEO_DISTRIBUTED:
{
return partitionMethod == DISTRIBUTE_BY_GEO;
}
default:
{
ereport(ERROR, (errmsg("Unknown table type %d", tableType)));
@ -4308,6 +4313,7 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
case DISTRIBUTE_BY_APPEND:
case DISTRIBUTE_BY_RANGE:
case DISTRIBUTE_BY_HASH:
case DISTRIBUTE_BY_GEO:
{
Node *partitionNode = stringToNode(partitionKeyString);
Var *partitionColumn = (Var *) partitionNode;
@ -4350,6 +4356,7 @@ GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn,
switch (partitionMethod)
{
case DISTRIBUTE_BY_GEO:
case DISTRIBUTE_BY_APPEND:
case DISTRIBUTE_BY_RANGE:
{

View File

@ -74,6 +74,9 @@ static uint64 NextPlanId = 1;
/* keep track of planner call stack levels */
int PlannerLevel = 0;
bool UseCustomPath = false;
/*
 * Backs the citus.use_only_geo_partitioning setting. When true, the relation
 * hook empties the existing path list before adding the geo path, leaving the
 * geo path as the only way to access the relation.
 */
bool OnlyGeoPartitioning = false;
/*
 * Backs the citus.use_geo_partitioning setting. When true, the relation hook
 * adds a geo scan path as an alternative access path.
 */
bool UseGeoPartitioning = true;
/*
 * Backs the citus.use_geo_partitioning_grouping setting. When true, the geo
 * path is wrapped in a hashed aggregate path before it is added.
 */
bool EnableGeoPartitioningGrouping = true;
static bool ListContainsDistributedTableRTE(List *rangeTableList);
static bool IsUpdateOrDelete(Query *query);

View File

@ -3,6 +3,7 @@
//
#include "postgres.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type_d.h"
#include "distributed/citus_custom_scan.h"
@ -23,6 +24,7 @@
#include "nodes/plannodes.h"
#include "optimizer/pathnode.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/tlist.h"
#include "utils/builtins.h"
#include "utils/syscache.h"
@ -46,6 +48,13 @@ typedef struct DistributedUnionPath
Oid sampleRelid;
} DistributedUnionPath;
/*
 * GeoScanPath is the custom path used to represent a scan of a geo partitioned
 * relation. custom_path is the first member so that a GeoScanPath pointer can
 * be cast to a CustomPath or Path pointer and back.
 */
typedef struct GeoScanPath
{
CustomPath custom_path;
/* range table entry of the geo relation; appended to the worker query's rtable */
RangeTblEntry *rte;
} GeoScanPath;
static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans);
static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel);
static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid, List *subPaths);
@ -63,6 +72,14 @@ static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *p
static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra);
static Path * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath);
static bool CanOptimizeAggPath(PlannerInfo *root, AggPath *apath);
static GeoScanPath *makeGeoScanPath(Relation rel, RelOptInfo *parent,
PathTarget *pathtarget, double rows);
static bool IsGeoScanPath(CustomPath *path);
static RangeTblEntry *makeRangeTableEntryForRelation(Relation rel,
int lockmode,
Alias *alias,
bool inh,
bool inFromCl);
/*
@ -81,6 +98,10 @@ static optimizeFn joinOptimizations[] = {
GeoOverlapJoin,
};
/*
 * Path methods assigned to every GeoScanPath. IsGeoScanPath recognises a geo
 * scan by comparing a path's methods pointer against the address of this
 * struct. Only the name is filled in; no PlanCustomPath callback is set here.
 */
const CustomPathMethods geoScanMethods = {
.CustomName = "GeoScan",
};
const CustomPathMethods distributedUnionMethods = {
.CustomName = "Distributed Union",
.PlanCustomPath = CreateDistributedUnionPlan,
@ -332,7 +353,7 @@ PathBasedPlannerRelationHook(PlannerInfo *root,
partitionValue = ExtractPartitionValue(relOptInfo->baserestrictinfo, partitionKey);
}
/* wrap every path with a distirbuted union custom path */
/* wrap every path with a distributed union custom path */
ListCell *pathCell = NULL;
foreach(pathCell, relOptInfo->pathlist)
{
@ -345,6 +366,216 @@ PathBasedPlannerRelationHook(PlannerInfo *root,
NIL);
SetListCellPtr(pathCell, wrappedPath);
}
/* hardcoded hack for adding geo distributed tables as an alternative path */
Relation rel = relation_open(rte->relid, AccessShareLock);
if (UseGeoPartitioning && strcmp(RelationGetRelationName(rel), "belgium_planet_osm_roads_dist") == 0)
{
if (OnlyGeoPartitioning)
{
/* makes the geo path the only path to access the relation */
relOptInfo->pathlist = NIL;
}
Oid geoRelid = RelnameGetRelid("belgium_planet_osm_roads_geo");
Relation georel = relation_open(geoRelid, AccessShareLock);
Path *geoPath = (Path *) makeGeoScanPath(georel,
relOptInfo,
relOptInfo->reltarget,
relOptInfo->rows);
geoPath = (Path *)
WrapTableAccessWithDistributedUnion(geoPath,
TableColocationId(geoRelid),
NULL,
geoRelid,
NIL);
if (EnableGeoPartitioningGrouping)
{
/* very much hardcoded: the grouping column is assumed to be an int4 at the moment */
SortGroupClause *sgc = makeNode(SortGroupClause);
sgc->tleSortGroupRef = 1; /* should be first field */
sgc->eqop = 96;
sgc->sortop = 97;
sgc->nulls_first = false;
sgc->hashable = true; /* ? just assume an in can be hashed */
List *groupClause = list_make1(sgc);
/* creating the target list */
PathTarget *groupPathTarget = create_empty_pathtarget();
add_column_to_pathtarget(groupPathTarget, (Expr *) makeVar(1,1,23,-1, 0,0), 0);
/* any_value on osm_id */
Aggref *aggref = makeNode(Aggref);
aggref->aggfnoid = 18333; /* any_value */
aggref->aggtype = 20;
aggref->aggtranstype = 20;
aggref->aggfilter = NULL;
aggref->aggstar = false;
aggref->aggvariadic = false;
aggref->aggkind = AGGKIND_NORMAL;
aggref->aggsplit = AGGSPLIT_SIMPLE;
aggref->location = 0;
aggref->args = list_make1(
makeTargetEntry(
(Expr *) makeVar(1, 2, 20, -1, 0, 0),
1, NULL, false)); /* osm_id */
struct TargetEntry *argTLE = NULL;
foreach_ptr(argTLE, aggref->args)
{
aggref->aggargtypes = lappend_oid(aggref->aggargtypes,
exprType((Node *) argTLE->expr));
}
add_column_to_pathtarget(groupPathTarget, (Expr *) aggref, 0);
/* ST_Union(way) */
Aggref *aggref2 = makeNode(Aggref);
aggref2->aggfnoid = 16861; /* ST_Union */
aggref2->aggtype = 16390;
aggref2->aggtranstype = 2281;
aggref2->aggfilter = NULL;
aggref2->aggstar = false;
aggref2->aggvariadic = false;
aggref2->aggkind = AGGKIND_NORMAL;
aggref2->aggsplit = AGGSPLIT_SIMPLE;
aggref2->location = 0;
aggref2->args = list_make1(
makeTargetEntry(
(Expr *) makeVar(1, 3, 16390, -1, 0, 0),
1, NULL, false)); /* way */
argTLE = NULL;
foreach_ptr(argTLE, aggref2->args)
{
aggref2->aggargtypes = lappend_oid(aggref2->aggargtypes,
exprType((Node *) argTLE->expr));
}
add_column_to_pathtarget(groupPathTarget, (Expr *) aggref2, 0);
/* TODO figure out costing for our grouping */
AggClauseCosts costs = {
.numAggs = 2,
.numOrderedAggs = 0,
.hasNonPartial = false,
.hasNonSerial = false,
.transCost.startup = 0,
.transCost.per_tuple = 0,
.finalCost.startup = 0,
.finalCost.per_tuple = 0,
.transitionSpace = 0,
};
geoPath = (Path *) create_agg_path(root,
relOptInfo,
geoPath,
groupPathTarget,
AGG_HASHED,
AGGSPLIT_SIMPLE,
groupClause,
NIL, &costs,
2);
}
add_path(relOptInfo, geoPath);
relation_close(georel, AccessShareLock);
}
relation_close(rel, AccessShareLock);
}
/*
 * makeGeoScanPath builds a GeoScanPath for the geo partitioned relation rel.
 * The resulting node is a CustomPath tagged with geoScanMethods and attached
 * to the RelOptInfo passed in as parent.
 *
 * The path target is assembled from the first three expressions of pathtarget.
 * The first expression is given sortgroupref 1 so a grouping clause can refer
 * to it; the other two get no sortgroupref. rows is the row estimate to start
 * from, which is raised by 20% to account for duplication. Startup and total
 * cost are left at zero.
 *
 * An ERROR is raised when pathtarget carries fewer than three expressions,
 * since reading the missing entries would access memory past the end of the
 * expression list.
 */
static GeoScanPath *
makeGeoScanPath(Relation rel, RelOptInfo *parent, PathTarget *pathtarget, double rows)
{
	/* number of leading target expressions the geo scan copies */
	const int requiredColumnCount = 3;
	int availableColumnCount = list_length(pathtarget->exprs);

	if (availableColumnCount < requiredColumnCount)
	{
		ereport(ERROR, (errmsg("cannot create geo scan path"),
						errdetail("path target has %d expressions, expected at "
								  "least %d", availableColumnCount,
								  requiredColumnCount)));
	}

	GeoScanPath *geoPath = (GeoScanPath *) newNode(sizeof(GeoScanPath), T_CustomPath);
	CustomPath *cpath = (CustomPath *) geoPath;
	Path *path = (Path *) geoPath;

	path->pathtype = T_CustomScan;
	path->parent = parent;

	/* only the first column is marked as a sort/group column */
	PathTarget *targetCopy = create_empty_pathtarget();
	add_column_to_pathtarget(targetCopy, list_nth(pathtarget->exprs, 0), 1);
	add_column_to_pathtarget(targetCopy, list_nth(pathtarget->exprs, 1), 0);
	add_column_to_pathtarget(targetCopy, list_nth(pathtarget->exprs, 2), 0);
	path->pathtarget = targetCopy;

	path->param_info = NULL;
	path->rows = rows * 1.2; /* add 20% for the duplication */
	path->startup_cost = 0;
	path->total_cost = 0;

	cpath->methods = &geoScanMethods;

	/* remember how to reference the geo relation when the worker query is built */
	geoPath->rte = makeRangeTableEntryForRelation(rel, AccessShareLock, NULL, false, true);

	return geoPath;
}
/*
 * IsGeoScanPath tells whether the given custom path is a geo scan, which is
 * the case exactly when its methods pointer refers to geoScanMethods.
 */
static bool
IsGeoScanPath(CustomPath *path)
{
	if (path->methods != &geoScanMethods)
	{
		return false;
	}

	return true;
}
/*
 * makeRangeTableEntryForRelation creates a standalone RTE_RELATION range table
 * entry for an already opened and locked relation.
 *
 * rel is the relation to reference and lockmode the lock the caller holds on
 * it. alias is an optional alias; when it is NULL the relation name is used as
 * the reference name. inh and inFromCl are copied into the entry as given.
 *
 * The column names of the entry are hardcoded to k, osm_id and way. The entry
 * asks for SELECT permission only.
 */
static RangeTblEntry *
makeRangeTableEntryForRelation(Relation rel,
							   int lockmode,
							   Alias *alias,
							   bool inh,
							   bool inFromCl)
{
	Assert(lockmode == AccessShareLock ||
		   lockmode == RowShareLock ||
		   lockmode == RowExclusiveLock);
	Assert(CheckRelationLockedByMe(rel, lockmode, true));

	/* an explicit alias takes precedence over the relation's own name */
	char *effectiveName = NULL;
	if (alias)
	{
		effectiveName = alias->aliasname;
	}
	else
	{
		effectiveName = RelationGetRelationName(rel);
	}

	/* effective reference name plus the hardcoded column names */
	Alias *effectiveAlias = makeAlias(effectiveName, NIL);
	effectiveAlias->colnames = list_make3(makeString("k"),
										  makeString("osm_id"),
										  makeString("way"));

	RangeTblEntry *entry = makeNode(RangeTblEntry);

	/* identity of the relation being referenced */
	entry->rtekind = RTE_RELATION;
	entry->relid = RelationGetRelid(rel);
	entry->relkind = rel->rd_rel->relkind;
	entry->rellockmode = lockmode;
	entry->alias = alias;
	entry->eref = effectiveAlias;

	/* flags describing how the entry takes part in the query */
	entry->lateral = false;
	entry->inh = inh;
	entry->inFromCl = inFromCl;

	/* read access only, checked as the current user, no column sets recorded */
	entry->requiredPerms = ACL_SELECT;
	entry->checkAsUser = InvalidOid;
	entry->selectedCols = NULL;
	entry->insertedCols = NULL;
	entry->updatedCols = NULL;
	entry->extraUpdatedCols = NULL;

	return entry;
}
@ -823,6 +1054,13 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf
break;
}
case T_BitmapHeapScan:
{
BitmapHeapPath *bpath = castNode(BitmapHeapPath, path);
ApplyPathToQuery(root, query, bpath->bitmapqual, info);
return;
}
case T_IndexScan:
case T_IndexOnlyScan:
case T_SeqScan:
@ -977,9 +1215,37 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf
break;
}
case T_CustomScan:
{
if (IsGeoScanPath(castNode(CustomPath, path)))
{
GeoScanPath *geoPath = (GeoScanPath *) path;
Index scan_relid = path->parent->relid;
Index rteIndex = info->varno_mapping[scan_relid];
if (rteIndex == 0)
{
RangeTblEntry* rte = geoPath->rte;
query->rtable = lappend(query->rtable, rte);
rteIndex = list_length(query->rtable);
info->varno_mapping[scan_relid] = rteIndex;
}
/* add to from list */
RangeTblRef *rr = makeNode(RangeTblRef);
rr->rtindex = rteIndex;
query->jointree->fromlist = lappend(query->jointree->fromlist, rr);
break;
}
/* fallthrough to error */
}
default:
{
ereport(ERROR, (errmsg("unknow path type in worker query"),
ereport(ERROR, (errmsg("unknown path type in worker query"),
errdetail("cannot turn worker path into query due to unknown "
"path type in plan. pathtype: %d", path->pathtype))
);

View File

@ -1159,6 +1159,36 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
/* toggles whether the planner adds the geo scan as an alternative access path */
DefineCustomBoolVariable(
	"citus.use_geo_partitioning",
	gettext_noop("Enable planning with geo partitions"),
	NULL,
	&UseGeoPartitioning,
	true,
	PGC_USERSET,
	GUC_STANDARD,
	NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.use_only_geo_partitioning",
gettext_noop("Force the planner to use only the geo partitioning"),
NULL,
&OnlyGeoPartitioning,
false,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.use_geo_partitioning_grouping",
gettext_noop("Enable geo partitioning grouping"),
NULL,
&EnableGeoPartitioningGrouping,
true,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.use_custom_path_broadcast_join",
gettext_noop("Allow broadcast joins to be used during path based planning"),

View File

@ -1003,6 +1003,16 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
return colocatedShardList;
}
if (IsCitusTableTypeCacheEntry(cacheEntry, GEO_DISTRIBUTED))
{
/* TODO: actually look up the colocated geo tables; this hack only works because we experiment with self-joins */
ShardInterval *copyShardInterval = CopyShardInterval(shardInterval);
colocatedShardList = lappend(colocatedShardList, copyShardInterval);
return colocatedShardList;
}
int shardIntervalIndex = ShardIndex(shardInterval);
List *colocatedTableList = ColocatedTableList(distributedTableId);

View File

@ -33,6 +33,9 @@
/* level of planner calls */
extern int PlannerLevel;
extern bool UseCustomPath;
/* when true, the geo path replaces every other access path of the relation */
extern bool OnlyGeoPartitioning;
/* when true, a geo scan path is added as an alternative access path */
extern bool UseGeoPartitioning;
/* when true, the geo path is wrapped in a hashed aggregate path */
extern bool EnableGeoPartitioningGrouping;
typedef struct RelationRestrictionContext
{

View File

@ -122,6 +122,7 @@ typedef enum
HASH_DISTRIBUTED,
APPEND_DISTRIBUTED,
RANGE_DISTRIBUTED,
GEO_DISTRIBUTED,
/* hash, range or append distributed table */
DISTRIBUTED_TABLE,

View File

@ -57,7 +57,7 @@ typedef FormData_pg_dist_partition *Form_pg_dist_partition;
#define DISTRIBUTE_BY_NONE 'n'
#define REDISTRIBUTE_BY_HASH 'x'
#define DISTRIBUTE_BY_INVALID '\0'
#define REDISTRIBUTE_BY_GEO 'g'
#define DISTRIBUTE_BY_GEO 'g'
/*
* Valid values for repmodel are 'c' for coordinator, 's' for streaming