diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c
index 6e36c0f9f..27509e8e1 100644
--- a/src/backend/distributed/planner/multi_logical_optimizer.c
+++ b/src/backend/distributed/planner/multi_logical_optimizer.c
@@ -291,7 +291,6 @@ static SortGroupClause * CreateSortGroupClause(Var *column);
 /* Local functions forward declarations for count(distinct) approximations */
 static const char * CountDistinctHashFunctionName(Oid argumentType);
 static int CountDistinctStorageSize(double approximationErrorRate);
-static Const * MakeIntegerConst(int32 integerValue);
 static Const * MakeIntegerConstInt64(int64 integerValue);
 
 /* Local functions forward declarations for aggregate expression checks */
@@ -3790,7 +3789,7 @@ CountDistinctStorageSize(double approximationErrorRate)
 
 
 /* Makes an integer constant node from the given value, and returns that node. */
-static Const *
+Const *
 MakeIntegerConst(int32 integerValue)
 {
 	const int typeCollationId = get_typcollation(INT4OID);
diff --git a/src/backend/distributed/planner/query_colocation_checker.c b/src/backend/distributed/planner/query_colocation_checker.c
index 0a84ea49e..5016cf34e 100644
--- a/src/backend/distributed/planner/query_colocation_checker.c
+++ b/src/backend/distributed/planner/query_colocation_checker.c
@@ -33,10 +33,10 @@
 #include "parser/parse_relation.h"
 #include "optimizer/planner.h"
 #include "optimizer/prep.h"
+#include "utils/rel.h"
 
 
 static RangeTblEntry * AnchorRte(Query *subquery);
-static Query * WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation);
 static List * UnionRelationRestrictionLists(List *firstRelationList,
 											List *secondRelationList);
 
@@ -252,7 +252,7 @@ SubqueryColocated(Query *subquery, ColocatedJoinChecker *checker)
  * projections. The returned query should be used cautiosly and it is mostly
  * designed for generating a stub query.
  */
-static Query *
+Query *
 WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation)
 {
 	Query *subquery = makeNode(Query);
@@ -269,15 +269,30 @@ WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation)
 	newRangeTableRef->rtindex = 1;
 	subquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
 
-	/* Need the whole row as a junk var */
-	Var *targetColumn = makeWholeRowVar(newRangeTableEntry, newRangeTableRef->rtindex, 0,
-										false);
 
-	/* create a dummy target entry */
-	TargetEntry *targetEntry = makeTargetEntry((Expr *) targetColumn, 1, "wholerow",
-											   true);
+	Relation relation = relation_open(rteRelation->relid, AccessShareLock);
+	int numberOfAttributes = RelationGetNumberOfAttributes(relation);
 
-	subquery->targetList = lappend(subquery->targetList, targetEntry);
+	int attributeNumber = 1;
+	for (; attributeNumber <= numberOfAttributes; attributeNumber++)
+	{
+		Form_pg_attribute attributeTuple =
+			TupleDescAttr(relation->rd_att, attributeNumber - 1);
+		/*
+		 * NOTE(review): dropped columns (attisdropped) also get a Var here;
+		 * confirm whether they should be skipped or replaced by a placeholder.
+		 */
+		Var *targetColumn =
+			makeVar(newRangeTableRef->rtindex, attributeNumber, attributeTuple->atttypid,
+					attributeTuple->atttypmod, attributeTuple->attcollation, 0);
+		TargetEntry *targetEntry =
+			makeTargetEntry((Expr *) targetColumn, attributeNumber,
+							pstrdup(NameStr(attributeTuple->attname)), false);
+
+		subquery->targetList = lappend(subquery->targetList, targetEntry);
+	}
+
+	relation_close(relation, NoLock);
 
 	return subquery;
 }
diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c
index c6dce836e..e68052a5f 100644
--- a/src/backend/distributed/planner/recursive_planning.c
+++ b/src/backend/distributed/planner/recursive_planning.c
@@ -63,6 +63,7 @@
 #include "distributed/log_utils.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_logical_planner.h"
+#include "distributed/multi_logical_optimizer.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_server_executor.h"
@@ -73,12 +74,13 @@
 #include "distributed/log_utils.h"
 #include "distributed/version_compat.h"
 #include "lib/stringinfo.h"
+#include "optimizer/clauses.h"
 #include "optimizer/planner.h"
 #include "optimizer/prep.h"
 #include "parser/parsetree.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #include "nodes/nodes.h"
 #include "nodes/pg_list.h"
 #include "nodes/primnodes.h"
 #if PG_VERSION_NUM >= PG_VERSION_12
@@ -88,6 +90,13 @@
 #endif
 #include "utils/builtins.h"
 #include "utils/guc.h"
+#include "utils/lsyscache.h"
+
+
+/*
+ * Managed via a GUC
+ */
+int LocalTableJoinPolicy = LOCAL_JOIN_POLICY_AUTO;
 
 
 /* track depth of current recursive planner query */
@@ -175,6 +184,16 @@ static bool ContainsReferencesToOuterQuery(Query *query);
 static bool ContainsReferencesToOuterQueryWalker(Node *node,
 												 VarLevelsUpWalkerContext *context);
 static bool NodeContainsSubqueryReferencingOuterQuery(Node *node);
+static void ConvertLocalTableJoinsToSubqueries(Query *query,
+											   PlannerRestrictionContext *
+											   plannerRestrictionContext);
+static RangeTblEntry * MostFilteredRte(PlannerRestrictionContext *
+									   plannerRestrictionContext,
+									   List *rangeTableList, List **restrictionList,
+									   bool localTable);
+static void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
+											  List *restrictionList);
+static bool ContainsLocalTableDistributedTableJoin(List *rangeTableList);
 static void WrapFunctionsInSubqueries(Query *query);
 static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry);
 static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry);
@@ -287,6 +306,12 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
 	/* make sure function calls in joins are executed in the coordinator */
 	WrapFunctionsInSubqueries(query);
 
+	/*
+	 * Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", so we
+	 * recursively plan one side of the join so that the logical planner can plan.
+	 */
+	ConvertLocalTableJoinsToSubqueries(query, context->plannerRestrictionContext);
+
 	/* descend into subqueries */
 	query_tree_walker(query, RecursivelyPlanSubqueryWalker, context, 0);
 
@@ -1337,6 +1362,232 @@ NodeContainsSubqueryReferencingOuterQuery(Node *node)
 }
 
 
+/*
+ * ConvertLocalTableJoinsToSubqueries gets a query and the planner
+ * restrictions. As long as there is a join between a local table
+ * and distributed table, the function wraps one table in a
+ * subquery (by also pushing the filters on the table down
+ * to the subquery).
+ *
+ * Once this function returns, there are no direct joins between
+ * local and distributed tables.
+ */
+static void
+ConvertLocalTableJoinsToSubqueries(Query *query,
+								   PlannerRestrictionContext *plannerRestrictionContext)
+{
+	List *rangeTableList = query->rtable;
+
+	if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER)
+	{
+		/* user doesn't want Citus to enable local table joins */
+		return;
+	}
+
+	if (!ContainsLocalTableDistributedTableJoin(rangeTableList))
+	{
+		/* nothing to do as there are no relevant joins */
+		return;
+	}
+
+	/* TODO: if all tables are local, skip */
+
+	while (ContainsLocalTableDistributedTableJoin(rangeTableList))
+	{
+		List *localTableRestrictList = NIL;
+		List *distributedTableRestrictList = NIL;
+
+		bool localTable = true;
+
+		RangeTblEntry *mostFilteredLocalRte =
+			MostFilteredRte(plannerRestrictionContext, rangeTableList,
+							&localTableRestrictList, localTable);
+		RangeTblEntry *mostFilteredDistributedRte =
+			MostFilteredRte(plannerRestrictionContext, rangeTableList,
+							&distributedTableRestrictList, !localTable);
+
+		if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_LOCAL)
+		{
+			ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte,
+											  localTableRestrictList);
+		}
+		else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_DISTRIBUTED)
+		{
+			ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte,
+											  distributedTableRestrictList);
+		}
+		else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO)
+		{
+			bool localTableHasFilter = list_length(localTableRestrictList) > 0;
+			bool distributedTableHasFilter =
+				list_length(distributedTableRestrictList) > 0;
+
+			/* TODO: for modifications, either skip or do not plan target table */
+
+			/*
+			 * First, favor recursively planning local table when it has a filter.
+			 * The rationale is that local tables are small, and with at least one
+			 * filter they become even smaller. On each iteration, we pick the local
+			 * table with the most filters (e.g., WHERE clause entries). Note that the
+			 * filters don't need to be directly on the table in the query tree, we use
+			 * the restrictions Postgres derives after pushing filters down to tables.
+			 *
+			 * Second, if a distributed table doesn't have a filter, we do not ever
+			 * prefer recursively planning that. Instead, we recursively plan the
+			 * local table, assuming that it is smaller.
+			 *
+			 * TODO: If we have better statistics on how many tuples each table returns
+			 * considering the filters on them, we should pick the table with least
+			 * tuples. Today, we do not have such an infrastructure.
+			 */
+			if (localTableHasFilter || !distributedTableHasFilter)
+			{
+				ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte,
+												  localTableRestrictList);
+			}
+			else
+			{
+				ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte,
+												  distributedTableRestrictList);
+			}
+		}
+		else
+		{
+			elog(ERROR, "unexpected local table join policy: %d", LocalTableJoinPolicy);
+		}
+	}
+}
+
+
+/*
+ * MostFilteredRte returns a range table entry which has the most filters
+ * on it along with the restrictions (e.g., fills **restrictionList).
+ *
+ * The function also gets a boolean localTable parameter, so the caller
+ * can choose to run the function for only local tables or distributed tables.
+ */
+static RangeTblEntry *
+MostFilteredRte(PlannerRestrictionContext *plannerRestrictionContext,
+				List *rangeTableList, List **restrictionList,
+				bool localTable)
+{
+	RangeTblEntry *mostFilteredLocalRte = NULL;
+
+	ListCell *rangeTableCell = NULL;
+
+	foreach(rangeTableCell, rangeTableList)
+	{
+		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
+
+		/* we're only interested in tables */
+		if (!(rangeTableEntry->rtekind == RTE_RELATION &&
+			  rangeTableEntry->relkind == RELKIND_RELATION))
+		{
+			continue;
+		}
+
+		/* skip the entries that are not of the requested kind (local vs. Citus) */
+		if (IsCitusTable(rangeTableEntry->relid) == localTable)
+		{
+			continue;
+		}
+
+		List *currentRestrictionList =
+			GetRestrictInfoListForRelation(rangeTableEntry,
+										   plannerRestrictionContext, 1);
+
+		if (mostFilteredLocalRte == NULL ||
+			list_length(*restrictionList) < list_length(currentRestrictionList))
+		{
+			mostFilteredLocalRte = rangeTableEntry;
+			*restrictionList = currentRestrictionList;
+		}
+	}
+
+	return mostFilteredLocalRte;
+}
+
+
+/*
+ * ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry
+ * with a subquery. The function also pushes down the filters to the subquery.
+ */
+static void
+ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList)
+{
+	Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry);
+	Expr *andedBoundExpressions = make_ands_explicit(restrictionList);
+	subquery->jointree->quals = (Node *) andedBoundExpressions;
+
+	/* force recursively planning of the newly created subquery */
+	subquery->limitOffset = (Node *) MakeIntegerConst(0);
+
+	/* replace the relation with the constructed subquery */
+	rangeTableEntry->rtekind = RTE_SUBQUERY;
+	rangeTableEntry->subquery = subquery;
+
+	/*
+	 * If the relation is inherited, it'll still be inherited as
+	 * we've copied it earlier. This is to prevent the newly created
+	 * subquery being treated as inherited.
+	 */
+	rangeTableEntry->inh = false;
+
+	if (IsLoggableLevel(DEBUG1))
+	{
+		StringInfo subqueryString = makeStringInfo();
+
+		pg_get_query_def(subquery, subqueryString);
+
+		ereport(DEBUG1, (errmsg("Wrapping local relation \"%s\" to a subquery: %s ",
+								get_rel_name(rangeTableEntry->relid),
+								ApplyLogRedaction(subqueryString->data))));
+	}
+}
+
+
+/*
+ * ContainsLocalTableDistributedTableJoin returns true if the input range table list
+ * contains a direct join between local and distributed tables.
+ */
+static bool
+ContainsLocalTableDistributedTableJoin(List *rangeTableList)
+{
+	bool containsLocalTable = false;
+	bool containsDistributedTable = false;
+
+	ListCell *rangeTableCell = NULL;
+	foreach(rangeTableCell, rangeTableList)
+	{
+		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
+
+		/* we're only interested in tables */
+		if (!(rangeTableEntry->rtekind == RTE_RELATION &&
+			  rangeTableEntry->relkind == RELKIND_RELATION))
+		{
+			continue;
+		}
+
+		/* TODO: do NOT forget Citus local tables */
+		if (IsCitusTable(rangeTableEntry->relid))
+		{
+			containsDistributedTable = true;
+		}
+		else
+		{
+			containsLocalTable = true;
+		}
+
+		if (containsLocalTable && containsDistributedTable)
+		{
+			return true;
+		}
+	}
+
+	return false;
+}
+
+
 /*
  * WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries
  * of a query and wraps the functions inside (SELECT * FROM fnc() f)
diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c
index e9d30ade4..e1c480336 100644
--- a/src/backend/distributed/planner/relation_restriction_equivalence.c
+++ b/src/backend/distributed/planner/relation_restriction_equivalence.c
@@ -488,9 +488,13 @@ FindUnionAllVar(PlannerInfo *root, List *appendRelList, Oid relationOid,
 bool
 RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *restrictionContext)
 {
-	/* there is a single distributed relation, no need to continue */
-	if (!ContainsMultipleDistributedRelations(restrictionContext))
+	if (ContextContainsLocalRelation(restrictionContext->relationRestrictionContext))
 	{
+		return false;
+	}
+	else if (!ContainsMultipleDistributedRelations(restrictionContext))
+	{
+		/* there is a single distributed relation, no need to continue */
 		return true;
 	}
 
@@ -1845,7 +1849,8 @@ FilterPlannerRestrictionForQuery(PlannerRestrictionContext *plannerRestrictionCo
  */
 List *
 GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
-							   PlannerRestrictionContext *plannerRestrictionContext)
+							   PlannerRestrictionContext *plannerRestrictionContext,
+							   int rteIndex)
 {
 	int rteIdentity = GetRTEIdentity(rangeTblEntry);
 	RelationRestrictionContext *relationRestrictionContext =
@@ -1906,8 +1911,8 @@ GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
 		{
 			Var *column = (Var *) lfirst(varClauseCell);
 
-			column->varno = 1;
-			column->varnoold = 1;
+			column->varno = rteIndex;
+			column->varnoold = rteIndex;
 		}
 
 		restrictExprList = lappend(restrictExprList, copyOfRestrictClause);
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 6c4de8fe5..fbf9c14ab 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -55,6 +55,7 @@
 #include "distributed/multi_server_executor.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/placement_connection.h"
+#include "distributed/recursive_planning.h"
 #include "distributed/reference_table_utils.h"
 #include "distributed/relation_access_tracking.h"
 #include "distributed/run_from_same_connection.h"
@@ -196,6 +197,16 @@ static const struct config_enum_entry log_level_options[] = {
 	{ NULL, 0, false}
 };
 
+
+static const struct config_enum_entry local_table_join_policies[] = {
+	{ "never", LOCAL_JOIN_POLICY_NEVER, false},
+	{ "pull-local", LOCAL_JOIN_POLICY_PULL_LOCAL, false},
+	{ "pull-distributed", LOCAL_JOIN_POLICY_PULL_DISTRIBUTED, false},
+	{ "auto", LOCAL_JOIN_POLICY_AUTO, false},
+	{ NULL, 0, false}
+};
+
+
 static const struct config_enum_entry multi_shard_modify_connection_options[] = {
 	{ "parallel", PARALLEL_CONNECTION, false },
 	{ "sequential", SEQUENTIAL_CONNECTION, false },
@@ -708,6 +719,22 @@ RegisterCitusConfigVariables(void)
 		PGC_SIGHUP,
 		GUC_SUPERUSER_ONLY,
 		NULL, NULL, LocalPoolSizeGucShowHook);
+
+	DefineCustomEnumVariable(
+		"citus.local_table_join_policy",
+		gettext_noop("defines the behaviour when a distributed table "
+					 "is joined with a local table"),
+		gettext_noop("'never' disables converting such joins; 'pull-local' and "
+					 "'pull-distributed' always wrap the local or the distributed "
+					 "table into a recursively planned subquery; 'auto' wraps the "
+					 "distributed table only when it has a filter and the local "
+					 "table has none."),
+		&LocalTableJoinPolicy,
+		LOCAL_JOIN_POLICY_AUTO,
+		local_table_join_policies,
+		PGC_USERSET,
+		GUC_STANDARD,
+		NULL, NULL, NULL);
 
 	DefineCustomBoolVariable(
 		"citus.log_multi_join_order",
diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h
index 9e6167959..ddfaae315 100644
--- a/src/include/distributed/multi_logical_optimizer.h
+++ b/src/include/distributed/multi_logical_optimizer.h
@@ -177,5 +177,6 @@ extern void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryL
 extern char * WorkerColumnName(AttrNumber resno);
 extern bool IsGroupBySubsetOfDistinct(List *groupClauses, List *distinctClauses);
 extern bool TargetListHasAggregates(List *targetEntryList);
+extern Const * MakeIntegerConst(int32 integerValue);
 
 #endif /* MULTI_LOGICAL_OPTIMIZER_H */
diff --git a/src/include/distributed/query_colocation_checker.h b/src/include/distributed/query_colocation_checker.h
index 0c8c7292b..2a27fa9f1 100644
--- a/src/include/distributed/query_colocation_checker.h
+++ b/src/include/distributed/query_colocation_checker.h
@@ -34,6 +34,7 @@ extern ColocatedJoinChecker CreateColocatedJoinChecker(Query *subquery,
 															  PlannerRestrictionContext *
 															  restrictionContext);
 extern bool SubqueryColocated(Query *subquery, ColocatedJoinChecker *context);
+extern Query * WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation);
 
 
 #endif /* QUERY_COLOCATION_CHECKER_H */
diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h
index e1017bd70..1da704dff 100644
--- a/src/include/distributed/recursive_planning.h
+++ b/src/include/distributed/recursive_planning.h
@@ -22,6 +22,18 @@
 #include "nodes/relation.h"
 #endif
 
+/* managed via guc.c */
+typedef enum
+{
+	LOCAL_JOIN_POLICY_NEVER = 0,
+	LOCAL_JOIN_POLICY_PULL_LOCAL = 1,
+	LOCAL_JOIN_POLICY_PULL_DISTRIBUTED = 2,
+	LOCAL_JOIN_POLICY_AUTO = 3,
+} LocalJoinPolicy;
+
+extern int LocalTableJoinPolicy;
+
+
 extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
 												   PlannerRestrictionContext *
 												   plannerRestrictionContext);
diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h
index ba89ec972..4c1406f8e 100644
--- a/src/include/distributed/relation_restriction_equivalence.h
+++ b/src/include/distributed/relation_restriction_equivalence.h
@@ -38,7 +38,7 @@ extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
 	Query *query);
 extern List * GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
 											 PlannerRestrictionContext *
-											 plannerRestrictionContext);
+											 plannerRestrictionContext, int rteIndex);
 extern JoinRestrictionContext * RemoveDuplicateJoinRestrictions(JoinRestrictionContext *
 																joinRestrictionContext);
 