expand shard model for geo distribution

moonshot/custom-path
Nils Dijk 2021-01-08 13:09:08 +01:00
parent 6bc9209baf
commit ff187ce5e1
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
12 changed files with 388 additions and 59 deletions

73
configure vendored
View File

@ -668,7 +668,6 @@ infodir
docdir docdir
oldincludedir oldincludedir
includedir includedir
runstatedir
localstatedir localstatedir
sharedstatedir sharedstatedir
sysconfdir sysconfdir
@ -748,7 +747,6 @@ datadir='${datarootdir}'
sysconfdir='${prefix}/etc' sysconfdir='${prefix}/etc'
sharedstatedir='${prefix}/com' sharedstatedir='${prefix}/com'
localstatedir='${prefix}/var' localstatedir='${prefix}/var'
runstatedir='${localstatedir}/run'
includedir='${prefix}/include' includedir='${prefix}/include'
oldincludedir='/usr/include' oldincludedir='/usr/include'
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}' docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
@ -1001,15 +999,6 @@ do
| -silent | --silent | --silen | --sile | --sil) | -silent | --silent | --silen | --sile | --sil)
silent=yes ;; silent=yes ;;
-runstatedir | --runstatedir | --runstatedi | --runstated \
| --runstate | --runstat | --runsta | --runst | --runs \
| --run | --ru | --r)
ac_prev=runstatedir ;;
-runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
| --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
| --run=* | --ru=* | --r=*)
runstatedir=$ac_optarg ;;
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb) -sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
ac_prev=sbindir ;; ac_prev=sbindir ;;
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \ -sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
@ -1147,7 +1136,7 @@ fi
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \ for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
datadir sysconfdir sharedstatedir localstatedir includedir \ datadir sysconfdir sharedstatedir localstatedir includedir \
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \ oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
libdir localedir mandir runstatedir libdir localedir mandir
do do
eval ac_val=\$$ac_var eval ac_val=\$$ac_var
# Remove trailing slashes. # Remove trailing slashes.
@ -1300,7 +1289,6 @@ Fine tuning of the installation directories:
--sysconfdir=DIR read-only single-machine data [PREFIX/etc] --sysconfdir=DIR read-only single-machine data [PREFIX/etc]
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com] --sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
--localstatedir=DIR modifiable single-machine data [PREFIX/var] --localstatedir=DIR modifiable single-machine data [PREFIX/var]
--runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
--libdir=DIR object code libraries [EPREFIX/lib] --libdir=DIR object code libraries [EPREFIX/lib]
--includedir=DIR C header files [PREFIX/include] --includedir=DIR C header files [PREFIX/include]
--oldincludedir=DIR C header files for non-gcc [/usr/include] --oldincludedir=DIR C header files for non-gcc [/usr/include]
@ -4502,6 +4490,65 @@ fi
fi fi
#
# liblwgeom
#
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for lwgeom_add_bbox in -llwgeom" >&5
$as_echo_n "checking for lwgeom_add_bbox in -llwgeom... " >&6; }
if ${ac_cv_lib_lwgeom_lwgeom_add_bbox+:} false; then :
$as_echo_n "(cached) " >&6
else
ac_check_lib_save_LIBS=$LIBS
LIBS="-llwgeom $LIBS"
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
/* end confdefs.h. */
/* Override any GCC internal prototype to avoid an error.
Use char because int might match the return type of a GCC
builtin and then its argument prototype would still apply. */
#ifdef __cplusplus
extern "C"
#endif
char lwgeom_add_bbox ();
int
main ()
{
return lwgeom_add_bbox ();
;
return 0;
}
_ACEOF
if ac_fn_c_try_link "$LINENO"; then :
ac_cv_lib_lwgeom_lwgeom_add_bbox=yes
else
ac_cv_lib_lwgeom_lwgeom_add_bbox=no
fi
rm -f core conftest.err conftest.$ac_objext \
conftest$ac_exeext conftest.$ac_ext
LIBS=$ac_check_lib_save_LIBS
fi
{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_lwgeom_lwgeom_add_bbox" >&5
$as_echo "$ac_cv_lib_lwgeom_lwgeom_add_bbox" >&6; }
if test "x$ac_cv_lib_lwgeom_lwgeom_add_bbox" = xyes; then :
cat >>confdefs.h <<_ACEOF
#define HAVE_LIBLWGEOM 1
_ACEOF
LIBS="-llwgeom $LIBS"
else
as_fn_error $? "liblwgeom not found" "$LINENO" 5
fi
ac_fn_c_check_header_mongrel "$LINENO" "liblwgeom.h" "ac_cv_header_liblwgeom_h" "$ac_includes_default"
if test "x$ac_cv_header_liblwgeom_h" = xyes; then :
else
as_fn_error $? "liblwgeom header not found" "$LINENO" 5
fi
# REPORTS_BASE_URL definition # REPORTS_BASE_URL definition

View File

@ -208,6 +208,12 @@ failure. It is possible the compiler isn't looking in the proper directory.
Use --without-libcurl to disable libcurl support.])]) Use --without-libcurl to disable libcurl support.])])
fi fi
#
# liblwgeom
#
AC_CHECK_LIB(lwgeom, lwgeom_add_bbox, [], [AC_MSG_ERROR([liblwgeom not found])])
AC_CHECK_HEADER(liblwgeom.h, [], [AC_MSG_ERROR([liblwgeom header not found])])
# REPORTS_BASE_URL definition # REPORTS_BASE_URL definition
PGAC_ARG_REQ(with, reports-hostname, [HOSTNAME], PGAC_ARG_REQ(with, reports-hostname, [HOSTNAME],
[Use HOSTNAME as hostname for statistics collection and update checks], [Use HOSTNAME as hostname for statistics collection and update checks],

View File

@ -4,3 +4,4 @@ default_version = '11.0-1'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog
requires = postgis

View File

@ -4410,6 +4410,7 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
Oid typeIoParam = InvalidOid; Oid typeIoParam = InvalidOid;
Datum minValue = 0; Datum minValue = 0;
Datum maxValue = 0; Datum maxValue = 0;
Datum containerValue = 0;
bool minValueExists = false; bool minValueExists = false;
bool maxValueExists = false; bool maxValueExists = false;
int16 intervalTypeLen = 0; int16 intervalTypeLen = 0;
@ -4417,15 +4418,24 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
char intervalAlign = '0'; char intervalAlign = '0';
char intervalDelim = '0'; char intervalDelim = '0';
Oid containerTypeId = 0;
int16 containerTypeLen = 0;
bool containerByVal = false;
char containerAlign = '0';
char containerDelim = '0';
int32 containerTypeMod = 0;
Oid relationId = Oid relationId =
DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid - 1]); DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid - 1]);
int64 shardId = DatumGetInt64(datumArray[Anum_pg_dist_shard_shardid - 1]); int64 shardId = DatumGetInt64(datumArray[Anum_pg_dist_shard_shardid - 1]);
char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]); char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]);
Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1]; Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1];
Datum maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1]; Datum maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1];
Datum containerValueTextDatum = datumArray[Anum_pg_dist_shard_shardcontainer - 1];
bool minValueNull = isNullArray[Anum_pg_dist_shard_shardminvalue - 1]; bool minValueNull = isNullArray[Anum_pg_dist_shard_shardminvalue - 1];
bool maxValueNull = isNullArray[Anum_pg_dist_shard_shardmaxvalue - 1]; bool maxValueNull = isNullArray[Anum_pg_dist_shard_shardmaxvalue - 1];
bool containerValueNull = isNullArray[Anum_pg_dist_shard_shardcontainer - 1];
if (!minValueNull && !maxValueNull) if (!minValueNull && !maxValueNull)
{ {
@ -4448,6 +4458,22 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
maxValueExists = true; maxValueExists = true;
} }
if (!containerValueNull)
{
/* geo */
TypeName *geometryTypeName = makeTypeNameFromNameList(
list_make2(makeString("public"), makeString("geometry")));
containerTypeId = typenameTypeId(NULL, geometryTypeName);
get_type_io_data(containerTypeId, IOFunc_input, &containerTypeLen,
&containerByVal,
&containerAlign, &containerDelim, &typeIoParam,
&inputFunctionId);
char *containerValueString = TextDatumGetCString(containerValueTextDatum);
containerValue = OidInputFunctionCall(inputFunctionId, containerValueString,
typeIoParam, containerTypeMod);
}
ShardInterval *shardInterval = CitusMakeNode(ShardInterval); ShardInterval *shardInterval = CitusMakeNode(ShardInterval);
shardInterval->relationId = relationId; shardInterval->relationId = relationId;
shardInterval->storageType = storageType; shardInterval->storageType = storageType;
@ -4460,6 +4486,11 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
shardInterval->maxValue = maxValue; shardInterval->maxValue = maxValue;
shardInterval->shardId = shardId; shardInterval->shardId = shardId;
shardInterval->containerTypeId = containerTypeId;
shardInterval->containerTypeLen = containerTypeLen;
shardInterval->containerByVal = containerByVal;
shardInterval->containerValue = containerValue;
return shardInterval; return shardInterval;
} }

View File

@ -1611,6 +1611,8 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true; isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true;
} }
isNulls[Anum_pg_dist_shard_shardcontainer - 1] = true;
/* open shard relation and insert new tuple */ /* open shard relation and insert new tuple */
Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock); Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);

View File

@ -3,6 +3,7 @@
// //
#include "postgres.h" #include "postgres.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type_d.h" #include "catalog/pg_type_d.h"
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
@ -23,41 +24,9 @@
#include "optimizer/pathnode.h" #include "optimizer/pathnode.h"
#include "optimizer/restrictinfo.h" #include "optimizer/restrictinfo.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/syscache.h"
typedef List * (*optimizeFn)(Path *originalPath); typedef List * (*optimizeFn)(PlannerInfo *root, Path *originalPath);
static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans);
static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel);
static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid, List *subPaths);
static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses);
static List * ShardIntervalListToRelationShardList(List *shardIntervalList);
static List * OptimizeJoinPath(Path *originalPath);
static List * BroadcastOuterJoinPath(Path *originalPath);
static List * BroadcastInnerJoinPath(Path *originalPath);
static Path * CreateReadIntermediateResultPath(const Path *originalPath);
static bool CanOptimizeJoinPath(const JoinPath *jpath);
static bool IsDistributedUnion(Path *path);
static Expr * ExtractPartitionValue(List *restrictionList, Var *partitionKey);
static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue);
static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra);
static Path * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath);
static bool CanOptimizeAggPath(PlannerInfo *root, AggPath *apath);
/*
* TODO some optimizations are useless if others are already provided. This might cause
* excessive path creation causing performance problems. Depending on the amount of
* optimizations to be added we can keep a bitmask indicating for every entry to skip if
* the index of a preceding successful optimization is in the bitmap.
*/
bool EnableBroadcastJoin = true;
/* list of functions that will be called to optimized in the joinhook*/
static optimizeFn joinOptimizations[] = {
OptimizeJoinPath,
BroadcastOuterJoinPath,
BroadcastInnerJoinPath,
};
typedef struct DistributedUnionPath typedef struct DistributedUnionPath
{ {
@ -77,6 +46,41 @@ typedef struct DistributedUnionPath
Oid sampleRelid; Oid sampleRelid;
} DistributedUnionPath; } DistributedUnionPath;
static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans);
static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel);
static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid, List *subPaths);
static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses);
static List * ShardIntervalListToRelationShardList(List *shardIntervalList);
static List * OptimizeJoinPath(PlannerInfo *root, Path *originalPath);
static List * BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath);
static List * BroadcastInnerJoinPath(PlannerInfo *root, Path *originalPath);
static List * GeoOverlapJoin(PlannerInfo *root, Path *originalPath);
static Path * CreateReadIntermediateResultPath(const Path *originalPath);
static bool CanOptimizeJoinPath(const JoinPath *jpath);
static bool IsDistributedUnion(Path *path, bool recurseTransparent, DistributedUnionPath **out);
static Expr * ExtractPartitionValue(List *restrictionList, Var *partitionKey);
static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue);
static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra);
static Path * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath);
static bool CanOptimizeAggPath(PlannerInfo *root, AggPath *apath);
/*
* TODO some optimizations are useless if others are already provided. This might cause
* excessive path creation causing performance problems. Depending on the amount of
* optimizations to be added we can keep a bitmask indicating for every entry to skip if
* the index of a preceding successful optimization is in the bitmap.
*/
bool EnableBroadcastJoin = true;
/* list of functions that will be called to optimized in the joinhook*/
static optimizeFn joinOptimizations[] = {
OptimizeJoinPath,
// BroadcastOuterJoinPath,
// BroadcastInnerJoinPath,
GeoOverlapJoin,
};
const CustomPathMethods distributedUnionMethods = { const CustomPathMethods distributedUnionMethods = {
.CustomName = "Distributed Union", .CustomName = "Distributed Union",
.PlanCustomPath = CreateDistributedUnionPlan, .PlanCustomPath = CreateDistributedUnionPlan,
@ -258,17 +262,48 @@ ReparameterizeDistributedUnion(PlannerInfo *root,
/* /*
* IsDistributedUnion returns if the pathnode is a distributed union * IsDistributedUnion returns if the pathnode is a distributed union
*
* If recurseTransparent is set it will recurse into transparant nodes like Materialize
*
* If out is set to not NULL it will write the pointer to the union at the location
* specified
*/ */
static bool static bool
IsDistributedUnion(Path *path) IsDistributedUnion(Path *path, bool recurseTransparent, DistributedUnionPath **out)
{ {
if (recurseTransparent)
{
switch (nodeTag(path))
{
case T_MaterialPath:
{
MaterialPath *materialPath = castNode(MaterialPath, path);
return IsDistributedUnion(materialPath->subpath, recurseTransparent, out);
}
default:
{
break;
}
}
}
if (!IsA(path, CustomPath)) if (!IsA(path, CustomPath))
{ {
return false; return false;
} }
CustomPath *cpath = castNode(CustomPath, path); CustomPath *cpath = castNode(CustomPath, path);
return cpath->methods == &distributedUnionMethods; if (cpath->methods != &distributedUnionMethods) {
return false;
}
if (out != NULL)
{
*out = (DistributedUnionPath *) cpath;
}
return true;
} }
@ -359,8 +394,8 @@ ExtractPartitionValue(List *restrictionList, Var *partitionKey)
static bool static bool
CanOptimizeJoinPath(const JoinPath *jpath) CanOptimizeJoinPath(const JoinPath *jpath)
{ {
if (!(IsDistributedUnion(jpath->innerjoinpath) && if (!(IsDistributedUnion(jpath->innerjoinpath, false, NULL) &&
IsDistributedUnion(jpath->outerjoinpath))) IsDistributedUnion(jpath->outerjoinpath, false, NULL)))
{ {
/* can only optimize joins when both inner and outer are a distributed union */ /* can only optimize joins when both inner and outer are a distributed union */
return false; return false;
@ -384,9 +419,130 @@ CanOptimizeJoinPath(const JoinPath *jpath)
return true; return true;
} }
static NameData
GetFunctionNameData(Oid funcid)
{
HeapTuple proctup = NULL;
Form_pg_proc procform = NULL;
proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
if (!HeapTupleIsValid(proctup))
elog(ERROR, "cache lookup failed for function %u", funcid);
procform = (Form_pg_proc) GETSTRUCT(proctup);
/* copy name by value */
NameData result = procform->proname;
ReleaseSysCache(proctup);
return result;
}
static bool
IsSTExpandExpression(Expr *expr, float8 *distance)
{
if (!IsA(expr, FuncExpr))
{
return false;
}
FuncExpr *funcexpr = castNode(FuncExpr, expr);
NameData funcNameData = GetFunctionNameData(funcexpr->funcid);
if (strcmp(NameStr(funcNameData), "st_expand") != 0)
{
/* expected an expansion of the geometry */
return false;
}
if (list_length(funcexpr->args) != 2)
{
/* expected 2 arguments */
return false;
}
if (distance)
{
Const *distanceConst = lsecond_node(Const, funcexpr->args);
Assert(distanceConst->consttype == FLOAT8OID);
*distance = DatumGetFloat8(distanceConst->constvalue);
}
return true;
}
static bool
IsGeoOverlapJoin(Expr *expr)
{
/* find a geometry_overlaps expression */
if (!IsA(expr, OpExpr))
{
return false;
}
OpExpr *opexpr = castNode(OpExpr, expr);
if (!OidIsValid(opexpr->opfuncid))
{
return false;
}
/* check the name of the operation */
NameData opname = GetFunctionNameData(opexpr->opfuncid);
if (strcmp(NameStr(opname), "geometry_overlaps") != 0)
{
return false;
}
/* we expect exactly 2 arguments */
if (list_length(opexpr->args) != 2)
{
return false;
}
Expr *leftArg = linitial(opexpr->args);
Expr *rightArg = lsecond(opexpr->args);
if (IsA(rightArg, Var) && IsA(leftArg, FuncExpr))
{
/* swap the args around to work on them in expected fashion */
Expr *tmp = leftArg;
leftArg = rightArg;
rightArg = tmp;
}
float8 distance = 0;
if (IsA(leftArg, Var) && IsSTExpandExpression(rightArg, &distance))
{
return false;
}
return true;
}
static List * static List *
OptimizeJoinPath(Path *originalPath) GeoOverlapJoin(PlannerInfo *root, Path *originalPath)
{
JoinPath *joinPath = (JoinPath *) originalPath;
RestrictInfo *rinfo = NULL;
foreach_ptr(rinfo, joinPath->joinrestrictinfo)
{
if (IsGeoOverlapJoin(rinfo->clause))
{
}
}
return NIL;
}
static List *
OptimizeJoinPath(PlannerInfo *root, Path *originalPath)
{ {
switch (originalPath->pathtype) switch (originalPath->pathtype)
{ {
@ -435,7 +591,7 @@ OptimizeJoinPath(Path *originalPath)
static List * static List *
BroadcastOuterJoinPath(Path *originalPath) BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath)
{ {
if (!EnableBroadcastJoin) if (!EnableBroadcastJoin)
{ {
@ -450,7 +606,7 @@ BroadcastOuterJoinPath(Path *originalPath)
const JoinPath *jpath = (JoinPath *) originalPath; const JoinPath *jpath = (JoinPath *) originalPath;
List *newPaths = NIL; List *newPaths = NIL;
if (IsDistributedUnion(jpath->outerjoinpath)) if (IsDistributedUnion(jpath->outerjoinpath, false, NULL))
{ {
/* broadcast inner join path */ /* broadcast inner join path */
DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->outerjoinpath; DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->outerjoinpath;
@ -493,7 +649,7 @@ BroadcastOuterJoinPath(Path *originalPath)
static List * static List *
BroadcastInnerJoinPath(Path *originalPath) BroadcastInnerJoinPath(PlannerInfo *root, Path *originalPath)
{ {
if (!EnableBroadcastJoin) if (!EnableBroadcastJoin)
{ {
@ -508,7 +664,7 @@ BroadcastInnerJoinPath(Path *originalPath)
const JoinPath *jpath = (JoinPath *) originalPath; const JoinPath *jpath = (JoinPath *) originalPath;
List *newPaths = NIL; List *newPaths = NIL;
if (IsDistributedUnion(jpath->innerjoinpath)) if (IsDistributedUnion(jpath->innerjoinpath, false, NULL))
{ {
/* broadcast inner join path */ /* broadcast inner join path */
DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->innerjoinpath; DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->innerjoinpath;
@ -588,7 +744,7 @@ PathBasedPlannerJoinHook(PlannerInfo *root,
Path *originalPath = lfirst(pathCell); Path *originalPath = lfirst(pathCell);
for (int i=0; i < sizeof(joinOptimizations)/sizeof(joinOptimizations[1]); i++) for (int i=0; i < sizeof(joinOptimizations)/sizeof(joinOptimizations[1]); i++)
{ {
List *alternativePaths = joinOptimizations[i](originalPath); List *alternativePaths = joinOptimizations[i](root, originalPath);
newPaths = list_concat(newPaths, alternativePaths); newPaths = list_concat(newPaths, alternativePaths);
} }
} }
@ -974,7 +1130,7 @@ CanOptimizeAggPath(PlannerInfo *root, AggPath *apath)
return false; return false;
} }
if (!IsDistributedUnion(apath->subpath)) if (!IsDistributedUnion(apath->subpath, false, NULL))
{ {
/* /*
* we only can optimize if the path below is a distributed union that we can pull * we only can optimize if the path below is a distributed union that we can pull

View File

@ -24,3 +24,6 @@ BEGIN
END IF; END IF;
END; END;
$$; $$;
-- add a catchall shardcontainer - eg. geobbox
ALTER TABLE pg_catalog.pg_dist_shard ADD COLUMN shardcontainer text;

View File

@ -0,0 +1,71 @@
-- citus--9.5-1--10.0-1
DROP FUNCTION pg_catalog.upgrade_to_reference_table(regclass);
DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass);
-- add a catchall shardcontainer - eg. geobbox
ALTER TABLE pg_catalog.pg_dist_shard ADD COLUMN shardcontainer text;
#include "udfs/citus_total_relation_size/10.0-1.sql"
#include "udfs/citus_tables/10.0-1.sql"
#include "udfs/citus_finish_pg_upgrade/10.0-1.sql"
#include "udfs/alter_distributed_table/10.0-1.sql"
#include "udfs/alter_table_set_access_method/10.0-1.sql"
#include "udfs/undistribute_table/10.0-1.sql"
#include "udfs/create_citus_local_table/10.0-1.sql"
#include "udfs/citus_set_coordinator_host/10.0-1.sql"
#include "udfs/citus_add_node/10.0-1.sql"
#include "udfs/citus_activate_node/10.0-1.sql"
#include "udfs/citus_add_inactive_node/10.0-1.sql"
#include "udfs/citus_add_secondary_node/10.0-1.sql"
#include "udfs/citus_disable_node/10.0-1.sql"
#include "udfs/citus_drain_node/10.0-1.sql"
#include "udfs/citus_remove_node/10.0-1.sql"
#include "udfs/citus_set_node_property/10.0-1.sql"
#include "udfs/citus_unmark_object_distributed/10.0-1.sql"
#include "udfs/citus_update_node/10.0-1.sql"
#include "udfs/citus_update_shard_statistics/10.0-1.sql"
#include "udfs/citus_update_table_statistics/10.0-1.sql"
#include "udfs/citus_copy_shard_placement/10.0-1.sql"
#include "udfs/citus_move_shard_placement/10.0-1.sql"
#include "udfs/citus_drop_trigger/10.0-1.sql"
#include "udfs/worker_change_sequence_dependency/10.0-1.sql"
#include "../../columnar/sql/columnar--9.5-1--10.0-1.sql"
#include "udfs/time_partition_range/10.0-1.sql"
#include "udfs/time_partitions/10.0-1.sql"
#include "udfs/alter_old_partitions_set_access_method/10.0-1.sql"
ALTER FUNCTION pg_catalog.master_conninfo_cache_invalidate()
RENAME TO citus_conninfo_cache_invalidate;
ALTER FUNCTION pg_catalog.master_dist_local_group_cache_invalidate()
RENAME TO citus_dist_local_group_cache_invalidate;
ALTER FUNCTION pg_catalog.master_dist_node_cache_invalidate()
RENAME TO citus_dist_node_cache_invalidate;
ALTER FUNCTION pg_catalog.master_dist_object_cache_invalidate()
RENAME TO citus_dist_object_cache_invalidate;
ALTER FUNCTION pg_catalog.master_dist_partition_cache_invalidate()
RENAME TO citus_dist_partition_cache_invalidate;
ALTER FUNCTION pg_catalog.master_dist_placement_cache_invalidate()
RENAME TO citus_dist_placement_cache_invalidate;
ALTER FUNCTION pg_catalog.master_dist_shard_cache_invalidate()
RENAME TO citus_dist_shard_cache_invalidate;
#include "udfs/citus_conninfo_cache_invalidate/10.0-1.sql"
#include "udfs/citus_dist_local_group_cache_invalidate/10.0-1.sql"
#include "udfs/citus_dist_node_cache_invalidate/10.0-1.sql"
#include "udfs/citus_dist_object_cache_invalidate/10.0-1.sql"
#include "udfs/citus_dist_partition_cache_invalidate/10.0-1.sql"
#include "udfs/citus_dist_placement_cache_invalidate/10.0-1.sql"
#include "udfs/citus_dist_shard_cache_invalidate/10.0-1.sql"
ALTER FUNCTION pg_catalog.master_drop_all_shards(regclass, text, text)
RENAME TO citus_drop_all_shards;
DROP FUNCTION pg_catalog.master_modify_multiple_shards(text);
DROP FUNCTION pg_catalog.master_create_distributed_table(regclass, text, citus.distribution_type);
DROP FUNCTION pg_catalog.master_create_worker_shards(text, integer, integer);
#include "udfs/citus_shard_sizes/10.0-1.sql"
#include "udfs/citus_shards/10.0-1.sql"

View File

@ -37,6 +37,9 @@
/* Define to 1 if you have the `curl' library (-lcurl). */ /* Define to 1 if you have the `curl' library (-lcurl). */
#undef HAVE_LIBCURL #undef HAVE_LIBCURL
/* Define to 1 if you have the `lwgeom' library (-llwgeom). */
#undef HAVE_LIBLWGEOM
/* Define to 1 if you have the `lz4' library (-llz4). */ /* Define to 1 if you have the `lz4' library (-llz4). */
#undef HAVE_LIBLZ4 #undef HAVE_LIBLZ4

View File

@ -56,6 +56,12 @@ typedef struct ShardInterval
Datum maxValue; /* a shard's typed max value datum */ Datum maxValue; /* a shard's typed max value datum */
uint64 shardId; uint64 shardId;
int shardIndex; int shardIndex;
/* stuff for the shard container thingy, mostly geometry now */
bool containerByVal;
int16 containerTypeLen;
Oid containerTypeId;
Datum containerValue;
} ShardInterval; } ShardInterval;

View File

@ -57,6 +57,7 @@ typedef FormData_pg_dist_partition *Form_pg_dist_partition;
#define DISTRIBUTE_BY_NONE 'n' #define DISTRIBUTE_BY_NONE 'n'
#define REDISTRIBUTE_BY_HASH 'x' #define REDISTRIBUTE_BY_HASH 'x'
#define DISTRIBUTE_BY_INVALID '\0' #define DISTRIBUTE_BY_INVALID '\0'
#define REDISTRIBUTE_BY_GEO 'g'
/* /*
* Valid values for repmodel are 'c' for coordinator, 's' for streaming * Valid values for repmodel are 'c' for coordinator, 's' for streaming

View File

@ -29,6 +29,7 @@ typedef struct FormData_pg_dist_shard
text shardalias_DROPPED; /* dropped column, not in use */ text shardalias_DROPPED; /* dropped column, not in use */
text shardminvalue; /* partition key's minimum value in shard */ text shardminvalue; /* partition key's minimum value in shard */
text shardmaxvalue; /* partition key's maximum value in shard */ text shardmaxvalue; /* partition key's maximum value in shard */
text shardcontainer;
#endif #endif
} FormData_pg_dist_shard; } FormData_pg_dist_shard;
@ -43,13 +44,14 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard;
* compiler constants for pg_dist_shards * compiler constants for pg_dist_shards
* ---------------- * ----------------
*/ */
#define Natts_pg_dist_shard 6 #define Natts_pg_dist_shard 7
#define Anum_pg_dist_shard_logicalrelid 1 #define Anum_pg_dist_shard_logicalrelid 1
#define Anum_pg_dist_shard_shardid 2 #define Anum_pg_dist_shard_shardid 2
#define Anum_pg_dist_shard_shardstorage 3 #define Anum_pg_dist_shard_shardstorage 3
#define Anum_pg_dist_shard_shardalias_DROPPED 4 #define Anum_pg_dist_shard_shardalias_DROPPED 4
#define Anum_pg_dist_shard_shardminvalue 5 #define Anum_pg_dist_shard_shardminvalue 5
#define Anum_pg_dist_shard_shardmaxvalue 6 #define Anum_pg_dist_shard_shardmaxvalue 6
#define Anum_pg_dist_shard_shardcontainer 7
/* /*
* Valid values for shard storage types include foreign table, (standard) table * Valid values for shard storage types include foreign table, (standard) table