Recursively plan local table joins

The logical planner cannot handle joins between local and distributed table.
Instead, we can recursively plan one side of the join and let the logical
planner handle the rest.

Our algorithm is a little smart, trying not to recursively plan distributed
tables, but favors local tables.
pull/4358/head
Onder Kalaci 2020-07-09 12:10:43 +02:00 committed by Sait Talha Nisanci
parent 7cc25c9125
commit 8f8390ed6e
9 changed files with 324 additions and 17 deletions

View File

@ -291,7 +291,6 @@ static SortGroupClause * CreateSortGroupClause(Var *column);
/* Local functions forward declarations for count(distinct) approximations */
static const char * CountDistinctHashFunctionName(Oid argumentType);
static int CountDistinctStorageSize(double approximationErrorRate);
static Const * MakeIntegerConst(int32 integerValue);
static Const * MakeIntegerConstInt64(int64 integerValue);
/* Local functions forward declarations for aggregate expression checks */
@ -3790,7 +3789,7 @@ CountDistinctStorageSize(double approximationErrorRate)
/* Makes an integer constant node from the given value, and returns that node. */
static Const *
Const *
MakeIntegerConst(int32 integerValue)
{
const int typeCollationId = get_typcollation(INT4OID);

View File

@ -33,10 +33,10 @@
#include "parser/parse_relation.h"
#include "optimizer/planner.h"
#include "optimizer/prep.h"
#include "utils/rel.h"
static RangeTblEntry * AnchorRte(Query *subquery);
static Query * WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation);
static List * UnionRelationRestrictionLists(List *firstRelationList,
List *secondRelationList);
@ -252,7 +252,7 @@ SubqueryColocated(Query *subquery, ColocatedJoinChecker *checker)
* projections. The returned query should be used cautiosly and it is mostly
* designed for generating a stub query.
*/
static Query *
Query *
WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation)
{
Query *subquery = makeNode(Query);
@ -269,15 +269,26 @@ WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation)
newRangeTableRef->rtindex = 1;
subquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
/* Need the whole row as a junk var */
Var *targetColumn = makeWholeRowVar(newRangeTableEntry, newRangeTableRef->rtindex, 0,
false);
/* create a dummy target entry */
TargetEntry *targetEntry = makeTargetEntry((Expr *) targetColumn, 1, "wholerow",
true);
Relation relation = relation_open(rteRelation->relid, AccessShareLock);
int numberOfAttributes = RelationGetNumberOfAttributes(relation);
subquery->targetList = lappend(subquery->targetList, targetEntry);
int attributeNumber = 1;
for (; attributeNumber <= numberOfAttributes; attributeNumber++)
{
Form_pg_attribute attributeTuple =
TupleDescAttr(relation->rd_att, attributeNumber - 1);
Var *targetColumn =
makeVar(newRangeTableRef->rtindex, attributeNumber, attributeTuple->atttypid,
attributeTuple->atttypmod, attributeTuple->attcollation, 0);
TargetEntry *targetEntry =
makeTargetEntry((Expr *) targetColumn, attributeNumber,
strdup(attributeTuple->attname.data), false);
subquery->targetList = lappend(subquery->targetList, targetEntry);
}
relation_close(relation, NoLock);
return subquery;
}

View File

@ -63,6 +63,7 @@
#include "distributed/log_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
@ -73,12 +74,14 @@
#include "distributed/log_utils.h"
#include "distributed/version_compat.h"
#include "lib/stringinfo.h"
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
#include "optimizer/prep.h"
#include "parser/parsetree.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/nodes.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#if PG_VERSION_NUM >= PG_VERSION_12
@ -88,6 +91,13 @@
#endif
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
/*
* Managed via a GUC
*/
int LocalTableJoinPolicy = LOCAL_JOIN_POLICY_AUTO;
/* track depth of current recursive planner query */
@ -175,6 +185,16 @@ static bool ContainsReferencesToOuterQuery(Query *query);
static bool ContainsReferencesToOuterQueryWalker(Node *node,
VarLevelsUpWalkerContext *context);
static bool NodeContainsSubqueryReferencingOuterQuery(Node *node);
static void ConvertLocalTableJoinsToSubqueries(Query *query,
PlannerRestrictionContext *
plannerRestrictionContext);
static RangeTblEntry * MostFilteredRte(PlannerRestrictionContext *
plannerRestrictionContext,
List *rangeTableList, List **restrictionList,
bool localTable);
static void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
List *restrictionList);
static bool ContainsLocalTableDistributedTableJoin(List *rangeTableList);
static void WrapFunctionsInSubqueries(Query *query);
static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry);
static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry);
@ -287,6 +307,12 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
/* make sure function calls in joins are executed in the coordinator */
WrapFunctionsInSubqueries(query);
/*
* Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", so we
* recursively plan one side of the join so that the logical planner can plan.
*/
ConvertLocalTableJoinsToSubqueries(query, context->plannerRestrictionContext);
/* descend into subqueries */
query_tree_walker(query, RecursivelyPlanSubqueryWalker, context, 0);
@ -1125,6 +1151,7 @@ RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningConte
debugQuery = copyObject(subquery);
}
/*
* Create the subplan and append it to the list in the planning context.
*/
@ -1337,6 +1364,233 @@ NodeContainsSubqueryReferencingOuterQuery(Node *node)
}
/*
* ConvertLocalTableJoinsToSubqueries gets a query and the planner
* restrictions. As long as there is a join between a local table
* and distributed table, the function wraps one table in a
* subquery (by also pushing the filters on the table down
* to the subquery).
*
* Once this function returns, there are no direct joins between
* local and distributed tables.
*/
static void
ConvertLocalTableJoinsToSubqueries(Query *query,
PlannerRestrictionContext *plannerRestrictionContext)
{
List *rangeTableList = query->rtable;
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER)
{
/* user doesn't want Citus to enable local table joins */
return;
}
if (!ContainsLocalTableDistributedTableJoin(rangeTableList))
{
/* nothing to do as there are no relevant joins */
return;
}
{
/* TODO: if all tables are local, skip */
}
while (ContainsLocalTableDistributedTableJoin(rangeTableList))
{
List *localTableRestrictList = NIL;
List *distributedTableRestrictList = NIL;
bool localTable = true;
RangeTblEntry *mostFilteredLocalRte =
MostFilteredRte(plannerRestrictionContext, rangeTableList,
&localTableRestrictList, localTable);
RangeTblEntry *mostFilteredDistributedRte =
MostFilteredRte(plannerRestrictionContext, rangeTableList,
&distributedTableRestrictList, !localTable);
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_LOCAL)
{
ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte,
localTableRestrictList);
}
else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_DISTRIBUTED)
{
ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte,
distributedTableRestrictList);
}
else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO)
{
bool localTableHasFilter = list_length(localTableRestrictList) > 0;
bool distributedTableHasFilter =
list_length(distributedTableRestrictList) > 0;
/* TODO: for modifications, either skip or do not plan target table */
/*
* First, favor recursively planning local table when it has a filter.
* The rationale is that local tables are small, and at least one filter
* they become even smaller. On each iteration, we pick the local table
* with the most filters (e.g., WHERE clause entries). Note that the filters
* don't need to be directly on the table in the query tree, instead we use
* Postgres' filters where filters can be pushed down tables via filters.
*
* Second, if a distributed table doesn't have a filter, we do not ever
* prefer recursively planning that. Instead, we recursively plan the
* local table, assuming that it is smaller.
*
* TODO: If we have better statistics on how many tuples each table returns
* considering the filters on them, we should pick the table with least
* tuples. Today, we do not have such an infrastructure.
*/
if (localTableHasFilter || !distributedTableHasFilter)
{
ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte,
localTableRestrictList);
}
else
{
ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte,
distributedTableRestrictList);
}
}
else
{
elog(ERROR, "unexpected local table join policy: %d", LocalTableJoinPolicy);
}
}
}
/*
* MostFilteredRte returns a range table entry which has the most filters
* on it along with the restrictions (e.g., fills **restrictionList).
*
* The function also gets a boolean localTable parameter, so the caller
* can choose to run the function for only local tables or distributed tables.
*/
static RangeTblEntry *
MostFilteredRte(PlannerRestrictionContext *plannerRestrictionContext,
List *rangeTableList, List **restrictionList,
bool localTable)
{
RangeTblEntry *mostFilteredLocalRte = NULL;
ListCell *rangeTableCell = NULL;
foreach(rangeTableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
/* we're only interested in tables */
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
{
continue;
}
if (IsCitusTable(rangeTableEntry->relid) && localTable)
{
continue;
}
List *currentRestrictionList =
GetRestrictInfoListForRelation(rangeTableEntry,
plannerRestrictionContext, 1);
if (mostFilteredLocalRte == NULL ||
list_length(*restrictionList) < list_length(currentRestrictionList))
{
mostFilteredLocalRte = rangeTableEntry;
*restrictionList = currentRestrictionList;
}
}
return mostFilteredLocalRte;
}
/*
* ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry
* with a subquery. The function also pushes down the filters to the subquery.
*/
static void
ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList)
{
Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry);
Expr *andedBoundExpressions = make_ands_explicit(restrictionList);
subquery->jointree->quals = (Node *) andedBoundExpressions;
/* force recursively planning of the newly created subquery */
subquery->limitOffset = (Node *) MakeIntegerConst(0);
/* replace the function with the constructed subquery */
rangeTableEntry->rtekind = RTE_SUBQUERY;
rangeTableEntry->subquery = subquery;
/*
* If the relation is inherited, it'll still be inherited as
* we've copied it earlier. This is to prevent the newly created
* subquery being treated as inherited.
*/
rangeTableEntry->inh = false;
if (IsLoggableLevel(DEBUG1))
{
StringInfo subqueryString = makeStringInfo();
pg_get_query_def(subquery, subqueryString);
ereport(DEBUG1, (errmsg("Wrapping local relation \"%s\" to a subquery: %s ",
get_rel_name(rangeTableEntry->relid),
ApplyLogRedaction(subqueryString->data))));
}
}
/*
* ContainsLocalTableDistributedTableJoin returns true if the input range table list
* contains a direct join between local and distributed tables.
*/
static bool
ContainsLocalTableDistributedTableJoin(List *rangeTableList)
{
bool containsLocalTable = false;
bool containsDistributedTable = false;
ListCell *rangeTableCell = NULL;
foreach(rangeTableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
/* we're only interested in tables */
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
{
continue;
}
/* TODO: do NOT forget Citus local tables */
if (IsCitusTable(rangeTableEntry->relid))
{
containsDistributedTable = true;
}
else
{
containsLocalTable = true;
}
if (containsLocalTable && containsDistributedTable)
{
return true;
}
}
return false;
}
/*
* WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries
* of a query and wraps the functions inside (SELECT * FROM fnc() f)

View File

@ -488,9 +488,13 @@ FindUnionAllVar(PlannerInfo *root, List *appendRelList, Oid relationOid,
bool
RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *restrictionContext)
{
/* there is a single distributed relation, no need to continue */
if (!ContainsMultipleDistributedRelations(restrictionContext))
if (ContextContainsLocalRelation(restrictionContext->relationRestrictionContext))
{
return false;
}
else if (!ContainsMultipleDistributedRelations(restrictionContext))
{
/* there is a single distributed relation, no need to continue */
return true;
}
@ -1845,7 +1849,8 @@ FilterPlannerRestrictionForQuery(PlannerRestrictionContext *plannerRestrictionCo
*/
List *
GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
PlannerRestrictionContext *plannerRestrictionContext)
PlannerRestrictionContext *plannerRestrictionContext,
int rteIndex)
{
int rteIdentity = GetRTEIdentity(rangeTblEntry);
RelationRestrictionContext *relationRestrictionContext =
@ -1867,6 +1872,7 @@ GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
RelOptInfo *relOptInfo = relationRestriction->relOptInfo;
List *baseRestrictInfo = relOptInfo->baserestrictinfo;
List *restrictExprList = NIL;
ListCell *restrictCell = NULL;
foreach(restrictCell, baseRestrictInfo)
@ -1906,8 +1912,8 @@ GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
{
Var *column = (Var *) lfirst(varClauseCell);
column->varno = 1;
column->varnoold = 1;
column->varno = rteIndex;
column->varnoold = rteIndex;
}
restrictExprList = lappend(restrictExprList, copyOfRestrictClause);

View File

@ -55,6 +55,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/placement_connection.h"
#include "distributed/recursive_planning.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/run_from_same_connection.h"
@ -196,6 +197,16 @@ static const struct config_enum_entry log_level_options[] = {
{ NULL, 0, false}
};
static const struct config_enum_entry local_table_join_policies[] = {
{ "never", LOCAL_JOIN_POLICY_NEVER, false},
{ "pull-local", LOCAL_JOIN_POLICY_PULL_LOCAL, false},
{ "pull-distributed", LOCAL_JOIN_POLICY_PULL_DISTRIBUTED, false},
{ "auto", LOCAL_JOIN_POLICY_AUTO, false},
{ NULL, 0, false}
};
static const struct config_enum_entry multi_shard_modify_connection_options[] = {
{ "parallel", PARALLEL_CONNECTION, false },
{ "sequential", SEQUENTIAL_CONNECTION, false },
@ -708,6 +719,18 @@ RegisterCitusConfigVariables(void)
PGC_SIGHUP,
GUC_SUPERUSER_ONLY,
NULL, NULL, LocalPoolSizeGucShowHook);
DefineCustomEnumVariable(
"citus.local_table_join_policy",
gettext_noop("defines the behaviour when a distributed table "
"is joined with a local table"),
gettext_noop("TODO: fill"),
&LocalTableJoinPolicy,
LOCAL_JOIN_POLICY_AUTO,
local_table_join_policies,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.log_multi_join_order",

View File

@ -177,5 +177,6 @@ extern void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryL
extern char * WorkerColumnName(AttrNumber resno);
extern bool IsGroupBySubsetOfDistinct(List *groupClauses, List *distinctClauses);
extern bool TargetListHasAggregates(List *targetEntryList);
extern Const * MakeIntegerConst(int32 integerValue);
#endif /* MULTI_LOGICAL_OPTIMIZER_H */

View File

@ -34,6 +34,7 @@ extern ColocatedJoinChecker CreateColocatedJoinChecker(Query *subquery,
PlannerRestrictionContext *
restrictionContext);
extern bool SubqueryColocated(Query *subquery, ColocatedJoinChecker *context);
extern Query * WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation);
#endif /* QUERY_COLOCATION_CHECKER_H */

View File

@ -22,6 +22,18 @@
#include "nodes/relation.h"
#endif
/* managed via guc.c */
typedef enum
{
LOCAL_JOIN_POLICY_NEVER = 0,
LOCAL_JOIN_POLICY_PULL_LOCAL = 1,
LOCAL_JOIN_POLICY_PULL_DISTRIBUTED = 2,
LOCAL_JOIN_POLICY_AUTO = 3,
} LocalJoinPolicy;
extern int LocalTableJoinPolicy;
extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);

View File

@ -38,7 +38,7 @@ extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
Query *query);
extern List * GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
PlannerRestrictionContext *
plannerRestrictionContext);
plannerRestrictionContext, int rteIndex);
extern JoinRestrictionContext * RemoveDuplicateJoinRestrictions(JoinRestrictionContext *
joinRestrictionContext);