From bac16a37857f528be9104c3a157295f38987fac9 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Mon, 18 Jan 2021 15:14:38 +0100 Subject: [PATCH] basic geo partitioning planning --- geodist.sql | 3 + .../distributed/metadata/metadata_cache.c | 7 + .../distributed/planner/distributed_planner.c | 3 + .../distributed/planner/path_based_planner.c | 270 +++++++++++++++++- src/backend/distributed/shared_library_init.c | 30 ++ .../distributed/utils/colocation_utils.c | 10 + src/include/distributed/distributed_planner.h | 3 + src/include/distributed/metadata_cache.h | 1 + src/include/distributed/pg_dist_partition.h | 2 +- 9 files changed, 326 insertions(+), 3 deletions(-) diff --git a/geodist.sql b/geodist.sql index c5965bc55..f16b7fd37 100644 --- a/geodist.sql +++ b/geodist.sql @@ -97,3 +97,6 @@ UPDATE pg_dist_partition t AND t.logicalrelid = 'belgium_planet_osm_roads_geo'::regclass; DROP TABLE foo; COMMIT; + +ALTER SYSTEM SET citus.use_custom_path TO on; +SELECT pg_reload_conf(); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index b8c2ffced..fcb458fd7 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -451,6 +451,11 @@ IsCitusTableTypeInternal(char partitionMethod, char replicationModel, return true; } + case GEO_DISTRIBUTED: + { + return partitionMethod == DISTRIBUTE_BY_GEO; + } + default: { ereport(ERROR, (errmsg("Unknown table type %d", tableType))); @@ -4308,6 +4313,7 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, case DISTRIBUTE_BY_APPEND: case DISTRIBUTE_BY_RANGE: case DISTRIBUTE_BY_HASH: + case DISTRIBUTE_BY_GEO: { Node *partitionNode = stringToNode(partitionKeyString); Var *partitionColumn = (Var *) partitionNode; @@ -4350,6 +4356,7 @@ GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn, switch (partitionMethod) { + case DISTRIBUTE_BY_GEO: case DISTRIBUTE_BY_APPEND: case DISTRIBUTE_BY_RANGE: { diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 41529e8ec..2a6de3514 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -74,6 +74,9 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; bool UseCustomPath = false; +bool OnlyGeoPartitioning = false; +bool UseGeoPartitioning = true; +bool EnableGeoPartitioningGrouping = true; static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool IsUpdateOrDelete(Query *query); diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index 6bf6ecc0d..7b61628eb 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -3,6 +3,7 @@ // #include "postgres.h" +#include "catalog/pg_aggregate.h" #include "catalog/pg_proc.h" #include "catalog/pg_type_d.h" #include "distributed/citus_custom_scan.h" @@ -23,6 +24,7 @@ #include "nodes/plannodes.h" #include "optimizer/pathnode.h" #include "optimizer/restrictinfo.h" +#include "optimizer/tlist.h" #include "utils/builtins.h" #include "utils/syscache.h" @@ -46,6 +48,13 @@ typedef struct DistributedUnionPath Oid sampleRelid; } DistributedUnionPath; +typedef struct GeoScanPath +{ + CustomPath custom_path; + + RangeTblEntry *rte; +} GeoScanPath; + static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans); static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel); static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid, List *subPaths); @@ -63,6 +72,14 @@ static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *p static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); static Path * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath); static bool CanOptimizeAggPath(PlannerInfo *root, AggPath *apath); +static GeoScanPath *makeGeoScanPath(Relation rel, RelOptInfo *parent, + PathTarget *pathtarget, double rows); +static bool IsGeoScanPath(CustomPath *path); +static RangeTblEntry *makeRangeTableEntryForRelation(Relation rel, + int lockmode, + Alias *alias, + bool inh, + bool inFromCl); /* @@ -81,6 +98,10 @@ static optimizeFn joinOptimizations[] = { GeoOverlapJoin, }; +const CustomPathMethods geoScanMethods = { + .CustomName = "GeoScan", +}; + const CustomPathMethods distributedUnionMethods = { .CustomName = "Distributed Union", .PlanCustomPath = CreateDistributedUnionPlan, @@ -332,7 +353,7 @@ PathBasedPlannerRelationHook(PlannerInfo *root, partitionValue = ExtractPartitionValue(relOptInfo->baserestrictinfo, partitionKey); } - /* wrap every path with a distirbuted union custom path */ + /* wrap every path with a distributed union custom path */ ListCell *pathCell = NULL; foreach(pathCell, relOptInfo->pathlist) { @@ -345,6 +366,216 @@ PathBasedPlannerRelationHook(PlannerInfo *root, NIL); SetListCellPtr(pathCell, wrappedPath); } + + /* hardcoded hack for adding geo distributed tables as an alternative path */ + Relation rel = relation_open(rte->relid, AccessShareLock); + if (UseGeoPartitioning && strcmp(RelationGetRelationName(rel), "belgium_planet_osm_roads_dist") == 0) + { + if (OnlyGeoPartitioning) + { + /* makes the geo path the only path to access the relation */ + relOptInfo->pathlist = NIL; + } + + + Oid geoRelid = RelnameGetRelid("belgium_planet_osm_roads_geo"); + Relation georel = relation_open(geoRelid, AccessShareLock); + + Path *geoPath = (Path *) makeGeoScanPath(georel, + relOptInfo, + relOptInfo->reltarget, + relOptInfo->rows); + geoPath = (Path *) + WrapTableAccessWithDistributedUnion(geoPath, + TableColocationId(geoRelid), + NULL, + geoRelid, + NIL); + + if (EnableGeoPartitioningGrouping) + { + /* verymuch just an int4 at the moment */ + SortGroupClause *sgc = makeNode(SortGroupClause); + sgc->tleSortGroupRef = 1; /* should be first field */ + sgc->eqop = 96; + sgc->sortop = 97; + sgc->nulls_first = false; + sgc->hashable = true; /* ? just assume an in can be hashed */ + + List *groupClause = list_make1(sgc); + + /* creating the target list */ + PathTarget *groupPathTarget = create_empty_pathtarget(); + add_column_to_pathtarget(groupPathTarget, (Expr *) makeVar(1,1,23,-1, 0,0), 0); + + /* any_value on osm_id */ + Aggref *aggref = makeNode(Aggref); + aggref->aggfnoid = 18333; /* any_value */ + aggref->aggtype = 20; + aggref->aggtranstype = 20; + aggref->aggfilter = NULL; + aggref->aggstar = false; + aggref->aggvariadic = false; + aggref->aggkind = AGGKIND_NORMAL; + aggref->aggsplit = AGGSPLIT_SIMPLE; + aggref->location = 0; + + aggref->args = list_make1( + makeTargetEntry( + (Expr *) makeVar(1, 2, 20, -1, 0, 0), + 1, NULL, false)); /* osm_id */ + struct TargetEntry *argTLE = NULL; + foreach_ptr(argTLE, aggref->args) + { + aggref->aggargtypes = lappend_oid(aggref->aggargtypes, + exprType((Node *) argTLE->expr)); + } + add_column_to_pathtarget(groupPathTarget, (Expr *) aggref, 0); + + /* ST_Union(way) */ + Aggref *aggref2 = makeNode(Aggref); + aggref2->aggfnoid = 16861; /* ST_Union */ + aggref2->aggtype = 16390; + aggref2->aggtranstype = 2281; + aggref2->aggfilter = NULL; + aggref2->aggstar = false; + aggref2->aggvariadic = false; + aggref2->aggkind = AGGKIND_NORMAL; + aggref2->aggsplit = AGGSPLIT_SIMPLE; + aggref2->location = 0; + + aggref2->args = list_make1( + makeTargetEntry( + (Expr *) makeVar(1, 3, 16390, -1, 0, 0), + 1, NULL, false)); /* way */ + argTLE = NULL; + foreach_ptr(argTLE, aggref2->args) + { + aggref2->aggargtypes = lappend_oid(aggref2->aggargtypes, + exprType((Node *) argTLE->expr)); + } + add_column_to_pathtarget(groupPathTarget, (Expr *) aggref2, 0); + + /* TODO figure out costing for our grouping */ + AggClauseCosts costs = { + .numAggs = 2, + .numOrderedAggs = 0, + .hasNonPartial = false, + .hasNonSerial = false, + + .transCost.startup = 0, + .transCost.per_tuple = 0, + .finalCost.startup = 0, + .finalCost.per_tuple = 0, + + .transitionSpace = 0, + }; + + geoPath = (Path *) create_agg_path(root, + relOptInfo, + geoPath, + groupPathTarget, + AGG_HASHED, + AGGSPLIT_SIMPLE, + groupClause, + NIL, &costs, + 2); + } + + + add_path(relOptInfo, geoPath); + + relation_close(georel, AccessShareLock); + } + relation_close(rel, AccessShareLock); +} + + +static GeoScanPath * +makeGeoScanPath(Relation rel, RelOptInfo *parent, PathTarget *pathtarget, double rows) +{ + GeoScanPath *geoPath = (GeoScanPath *) newNode(sizeof(GeoScanPath), T_CustomPath); + CustomPath *cpath = (CustomPath *) geoPath; + Path *path = (Path *) geoPath; + + path->pathtype = T_CustomScan; + path->parent = parent; + + PathTarget *targetCopy = create_empty_pathtarget(); + add_column_to_pathtarget(targetCopy, list_nth(pathtarget->exprs, 0), 1); + add_column_to_pathtarget(targetCopy, list_nth(pathtarget->exprs, 1), 0); + add_column_to_pathtarget(targetCopy, list_nth(pathtarget->exprs, 2), 0); + + path->pathtarget = targetCopy; + path->param_info = NULL; + + path->rows = rows * 1.2; /* add 20% for the duplication */ + path->startup_cost = 0; + path->total_cost = 0; + + cpath->methods = &geoScanMethods; + + geoPath->rte = makeRangeTableEntryForRelation(rel, AccessShareLock, NULL, false, true); + + return geoPath; +} + + +static bool +IsGeoScanPath(CustomPath *path) +{ + return path->methods == &geoScanMethods; +} + + +static RangeTblEntry * +makeRangeTableEntryForRelation(Relation rel, + int lockmode, + Alias *alias, + bool inh, + bool inFromCl) +{ + RangeTblEntry *rte = makeNode(RangeTblEntry); + char *refname = alias ? alias->aliasname : RelationGetRelationName(rel); + + Assert(lockmode == AccessShareLock || + lockmode == RowShareLock || + lockmode == RowExclusiveLock); + Assert(CheckRelationLockedByMe(rel, lockmode, true)); + + rte->rtekind = RTE_RELATION; + rte->alias = alias; + rte->relid = RelationGetRelid(rel); + rte->relkind = rel->rd_rel->relkind; + rte->rellockmode = lockmode; + + /* + * Build the list of effective column names using user-supplied aliases + * and/or actual column names. + */ + rte->eref = makeAlias(refname, NIL); + rte->eref->colnames = list_make3(makeString("k"), + makeString("osm_id"), + makeString("way")); + + /* + * Set flags and access permissions. + * + * The initial default on access checks is always check-for-READ-access, + * which is the right thing for all except target tables. + */ + rte->lateral = false; + rte->inh = inh; + rte->inFromCl = inFromCl; + + rte->requiredPerms = ACL_SELECT; + rte->checkAsUser = InvalidOid; /* not set-uid by default, either */ + rte->selectedCols = NULL; + rte->insertedCols = NULL; + rte->updatedCols = NULL; + rte->extraUpdatedCols = NULL; + + return rte; } @@ -823,6 +1054,13 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf break; } + case T_BitmapHeapScan: + { + BitmapHeapPath *bpath = castNode(BitmapHeapPath, path); + ApplyPathToQuery(root, query, bpath->bitmapqual, info); + return; + } + case T_IndexScan: case T_IndexOnlyScan: case T_SeqScan: @@ -977,9 +1215,37 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf break; } + case T_CustomScan: + { + if (IsGeoScanPath(castNode(CustomPath, path))) + { + GeoScanPath *geoPath = (GeoScanPath *) path; + + Index scan_relid = path->parent->relid; + Index rteIndex = info->varno_mapping[scan_relid]; + + if (rteIndex == 0) + { + RangeTblEntry* rte = geoPath->rte; + query->rtable = lappend(query->rtable, rte); + rteIndex = list_length(query->rtable); + info->varno_mapping[scan_relid] = rteIndex; + } + + /* add to from list */ + RangeTblRef *rr = makeNode(RangeTblRef); + rr->rtindex = rteIndex; + query->jointree->fromlist = lappend(query->jointree->fromlist, rr); + + break; + } + + /* fallthrough to error */ + } + default: { - ereport(ERROR, (errmsg("unknow path type in worker query"), + ereport(ERROR, (errmsg("unknown path type in worker query"), errdetail("cannot turn worker path into query due to unknown " "path type in plan. pathtype: %d", path->pathtype)) ); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 754882b75..258d8ab16 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1159,6 +1159,36 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.use_geo_partitioning", + gettext_noop("Enable planning with geo partitins"), + NULL, + &UseGeoPartitioning, + true, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + + DefineCustomBoolVariable( + "citus.use_only_geo_partitioning", + gettext_noop("Force the planner to use only the geo partitioning"), + NULL, + &OnlyGeoPartitioning, + false, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + + DefineCustomBoolVariable( + "citus.use_geo_partitioning_grouping", + gettext_noop("Enable geo partitioning grouping"), + NULL, + &EnableGeoPartitioningGrouping, + true, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.use_custom_path_broadcast_join", gettext_noop("Allow broadcast joins to be used during path based planning"), diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 4edc9e424..625910495 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -1003,6 +1003,16 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) return colocatedShardList; } + if (IsCitusTableTypeCacheEntry(cacheEntry, GEO_DISTRIBUTED)) + { + /* TODO actual lookup colocated geo tables, since we experiment with selfjoins this hack works */ + ShardInterval *copyShardInterval = CopyShardInterval(shardInterval); + + colocatedShardList = lappend(colocatedShardList, copyShardInterval); + + return colocatedShardList; + } + int shardIntervalIndex = ShardIndex(shardInterval); List *colocatedTableList = ColocatedTableList(distributedTableId); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 0a03b8a5c..5a7a3dfd4 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -33,6 +33,9 @@ /* level of planner calls */ extern int PlannerLevel; extern bool UseCustomPath; +extern bool OnlyGeoPartitioning; +extern bool UseGeoPartitioning; +extern bool EnableGeoPartitioningGrouping; typedef struct RelationRestrictionContext { diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 4461cb1e9..4f524d5f4 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -122,6 +122,7 @@ typedef enum HASH_DISTRIBUTED, APPEND_DISTRIBUTED, RANGE_DISTRIBUTED, + GEO_DISTRIBUTED, /* hash, range or append distributed table */ DISTRIBUTED_TABLE, diff --git a/src/include/distributed/pg_dist_partition.h b/src/include/distributed/pg_dist_partition.h index a4d10880e..3c95829a5 100644 --- a/src/include/distributed/pg_dist_partition.h +++ b/src/include/distributed/pg_dist_partition.h @@ -57,7 +57,7 @@ typedef FormData_pg_dist_partition *Form_pg_dist_partition; #define DISTRIBUTE_BY_NONE 'n' #define REDISTRIBUTE_BY_HASH 'x' #define DISTRIBUTE_BY_INVALID '\0' -#define REDISTRIBUTE_BY_GEO 'g' +#define DISTRIBUTE_BY_GEO 'g' /* * Valid values for repmodel are 'c' for coordinator, 's' for streaming