diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index a44cde0cc..7a9c5c9ad 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -249,6 +249,41 @@ CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement) return namespaceId; } +List * +ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor pgIndexProcessor) +{ + List *result = NIL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + + PushOverrideEmptySearchPath(CurrentMemoryContext); + + /* open system catalog and scan all indexes that belong to this table */ + Relation pgIndex = table_open(IndexRelationId, AccessShareLock); + + ScanKeyInit(&scanKey[0], Anum_pg_index_indrelid, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); + + SysScanDesc scanDescriptor = systable_beginscan(pgIndex, + IndexIndrelidIndexId, true, /* indexOK */ + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + while (HeapTupleIsValid(heapTuple)) + { + Form_pg_index indexForm = (Form_pg_index) GETSTRUCT(heapTuple); + pgIndexProcessor(indexForm, &result); + + heapTuple = systable_getnext(scanDescriptor); + } + + /* clean up scan and close system catalog */ + systable_endscan(scanDescriptor); + table_close(pgIndex, AccessShareLock); + + /* revert back to original search_path */ + PopOverrideSearchPath(); + + return result; +} /* * ExecuteFunctionOnEachTableIndex executes the given pgIndexProcessor function on each diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index 4a23a5052..0eb23f6a5 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -729,7 +729,6 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE } } - /* * IndexImpliedByAConstraint is a helper function to be used while scanning * pg_index.
It returns true if the index identified by the given indexForm is diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c new file mode 100644 index 000000000..4a31aa543 --- /dev/null +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -0,0 +1,361 @@ +#include "postgres.h" + +#include "distributed/pg_version_constants.h" + +#include "funcapi.h" + +#include "catalog/pg_type.h" +#include "catalog/pg_class.h" +#include "catalog/pg_index.h" +#include "distributed/citus_nodes.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/commands.h" +#include "distributed/commands/multi_copy.h" +#include "distributed/distributed_planner.h" +#include "distributed/errormessage.h" +#include "distributed/local_distributed_join_planner.h" +#include "distributed/listutils.h" +#include "distributed/log_utils.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_logical_planner.h" +#include "distributed/multi_logical_optimizer.h" +#include "distributed/multi_router_planner.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/multi_server_executor.h" +#include "distributed/query_colocation_checker.h" +#include "distributed/query_pushdown_planning.h" +#include "distributed/recursive_planning.h" +#include "distributed/relation_restriction_equivalence.h" +#include "distributed/log_utils.h" +#include "distributed/shard_pruning.h" +#include "distributed/version_compat.h" +#include "lib/stringinfo.h" +#include "optimizer/clauses.h" +#if PG_VERSION_NUM >= PG_VERSION_12 +#include "optimizer/optimizer.h" +#else +#include "optimizer/var.h" +#endif +#include "optimizer/planner.h" +#include "optimizer/prep.h" +#include "parser/parsetree.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "nodes/nodes.h" +#include "nodes/nodeFuncs.h" +#include "nodes/pg_list.h" +#include "nodes/primnodes.h" +#if PG_VERSION_NUM >= PG_VERSION_12 +#include "nodes/pathnodes.h" +#else +#include "nodes/relation.h" +#endif +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/lsyscache.h" + +static bool ShouldConvertLocalTableJoinsToSubqueries(List* rangeTableList); +static bool HasUniqueFilter(RangeTblEntry* distRTE, List* distRTERestrictionList, List* requiredAttrNumbersForDistRTE); +static void AutoConvertLocalTableJoinToSubquery(RangeTblEntry* localRTE, RangeTblEntry* distRTE, + List* localRTERestrictionList, List* distRTERestrictionList, + List *requiredAttrNumbersForLocalRTE, List *requiredAttrNumbersForDistRTE); +static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, + RecursivePlanningContext *planningContext); +static RangeTblEntry * FindNextRTECandidate(PlannerRestrictionContext * + plannerRestrictionContext, + List *rangeTableList, List **restrictionList, + bool localTable); +static bool AllDataLocallyAccessible(List *rangeTableList); +static void GetAllUniqueIndexes(Form_pg_index indexForm, List** uniqueIndexes); + +/* + * ConvertLocalTableJoinsToSubqueries gets a query and the planner + * restrictions. As long as there is a join between a local table + * and distributed table, the function wraps one table in a + * subquery (by also pushing the filters on the table down + * to the subquery). + * + * Once this function returns, there are no direct joins between + * local and distributed tables. 
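+ * + * For example, under the default 'auto' policy the regression tests below show a query like + * SELECT count(*) FROM distributed_table JOIN postgres_table USING (key) + * being planned by wrapping the local side as + * SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM postgres_table WHERE true OFFSET 0 + * and reading it back as an intermediate result.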
+ */ +void +ConvertLocalTableJoinsToSubqueries(Query *query, + RecursivePlanningContext *context) +{ + List *rangeTableList = query->rtable; + if(!ShouldConvertLocalTableJoinsToSubqueries(rangeTableList)) { + return; + } + + RangeTblEntry *resultRelation = ExtractResultRelationRTE(query); + + while (ContainsLocalTableDistributedTableJoin(rangeTableList)) + { + List *localTableRestrictList = NIL; + List *distributedTableRestrictList = NIL; + + bool localTable = true; + + PlannerRestrictionContext *plannerRestrictionContext = + context->plannerRestrictionContext; + RangeTblEntry *localRTECandidate = + FindNextRTECandidate(plannerRestrictionContext, rangeTableList, + &localTableRestrictList, localTable); + RangeTblEntry *distributedRTECandidate = + FindNextRTECandidate(plannerRestrictionContext, rangeTableList, + &distributedTableRestrictList, !localTable); + + List *requiredAttrNumbersForLocalRte = + RequiredAttrNumbersForRelation(localRTECandidate, context); + List *requiredAttrNumbersForDistributedRte = + RequiredAttrNumbersForRelation(distributedRTECandidate, context); + + if (resultRelation) { + + if (resultRelation->relid == localRTECandidate->relid) { + ReplaceRTERelationWithRteSubquery(distributedRTECandidate, + distributedTableRestrictList, + requiredAttrNumbersForDistributedRte); + continue; + }else if (resultRelation->relid == distributedRTECandidate->relid) { + ReplaceRTERelationWithRteSubquery(localRTECandidate, + localTableRestrictList, + requiredAttrNumbersForLocalRte); + continue; + } + } + + if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) + { + ReplaceRTERelationWithRteSubquery(localRTECandidate, + localTableRestrictList, + requiredAttrNumbersForLocalRte); + } + else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) + { + ReplaceRTERelationWithRteSubquery(distributedRTECandidate, + distributedTableRestrictList, + requiredAttrNumbersForDistributedRte); + } + else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO) + { + AutoConvertLocalTableJoinToSubquery(localRTECandidate, distributedRTECandidate, + localTableRestrictList, distributedTableRestrictList, + requiredAttrNumbersForLocalRte, requiredAttrNumbersForDistributedRte); + } + else + { + elog(ERROR, "unexpected local table join policy: %d", LocalTableJoinPolicy); + } + } +} + +/* + * ShouldConvertLocalTableJoinsToSubqueries returns true if we should + * convert local-dist table joins to subqueries. 
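+ * + * That is the case when the policy is not 'never', the query contains at least one direct + * join between a local and a distributed table, and the data is not all locally accessible + * (in which case the router planner can already handle the query by itself).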
+ */ +static bool +ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList) +{ + if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER) + { + /* user doesn't want Citus to enable local table joins */ + return false; + } + + if (!ContainsLocalTableDistributedTableJoin(rangeTableList)) + { + /* nothing to do as there are no relevant joins */ + return false; + } + + if (AllDataLocallyAccessible(rangeTableList)) + { + /* recursive planning is overkill, the router planner can already handle this */ + return false; + } + + return true; +} + + +/* + * AutoConvertLocalTableJoinToSubquery implements the LOCAL_JOIN_POLICY_AUTO policy: + * it wraps the distributed table in a subquery when HasUniqueFilter returns true for + * it, and wraps the (presumably smaller) local table otherwise. + */ +static void +AutoConvertLocalTableJoinToSubquery(RangeTblEntry *localRTE, RangeTblEntry *distRTE, + List *localRTERestrictionList, List *distRTERestrictionList, + List *requiredAttrNumbersForLocalRTE, List *requiredAttrNumbersForDistRTE) +{ + bool hasUniqueFilter = HasUniqueFilter(distRTE, distRTERestrictionList, requiredAttrNumbersForDistRTE); + if (hasUniqueFilter) + { + ReplaceRTERelationWithRteSubquery(distRTE, + distRTERestrictionList, + requiredAttrNumbersForDistRTE); + } + else + { + ReplaceRTERelationWithRteSubquery(localRTE, + localRTERestrictionList, + requiredAttrNumbersForLocalRTE); + } +} + + +/* + * HasUniqueFilter returns true if one of the attribute numbers required from the + * distributed table is covered by a unique index or the primary key of that table. + * + * TODO: This function should only consider equality filters; currently it also + * returns true for a filter such as dist.a > 5. We should check this from + * join->quals. + */ +static bool +HasUniqueFilter(RangeTblEntry *distRTE, List *distRTERestrictionList, List *requiredAttrNumbersForDistRTE) +{ + List *uniqueIndexes = ExecuteFunctionOnEachTableIndex(distRTE->relid, GetAllUniqueIndexes); + int columnNumber = 0; + foreach_int(columnNumber, uniqueIndexes) + { + if (list_member_int(requiredAttrNumbersForDistRTE, columnNumber)) + { + return true; + } + } + return false; +} + + +/* + * GetAllUniqueIndexes is a PGIndexProcessor that appends the column numbers covered + * by unique and primary key indexes to the given list. + */ +static void +GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes) +{ + if (indexForm->indisunique || indexForm->indisprimary) + { + for (int i = 0; i < indexForm->indkey.dim1; i++) + { + *uniqueIndexes = list_append_unique_int(*uniqueIndexes, indexForm->indkey.values[i]); + } + } +} + + +/* + * RequiredAttrNumbersForRelation returns the required attribute numbers for + * the input RTE relation in order for the planning to succeed. + * + * The function could be optimized by not adding the columns that only appear + * in the WHERE clause as a filter (e.g., not a join clause).
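+ * + * For example, when the relation is joined on "key" and filtered on "value", the returned + * list contains the attribute numbers of both columns; the wrapped subquery then keeps those + * columns and replaces the remaining ones with NULLs, as the DEBUG output of the regression + * tests below shows.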
+ */ +static List * +RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, + RecursivePlanningContext *planningContext) +{ + PlannerRestrictionContext *plannerRestrictionContext = + planningContext->plannerRestrictionContext; + + /* TODO: Get rid of this hack, find relation restriction information directly */ + PlannerRestrictionContext *filteredPlannerRestrictionContext = + FilterPlannerRestrictionForQuery(plannerRestrictionContext, + WrapRteRelationIntoSubquery(relationRte, NIL)); + + RelationRestrictionContext *relationRestrictionContext = + filteredPlannerRestrictionContext->relationRestrictionContext; + List *filteredRelationRestrictionList = + relationRestrictionContext->relationRestrictionList; + RelationRestriction *relationRestriction = + (RelationRestriction *) linitial(filteredRelationRestrictionList); + + PlannerInfo *plannerInfo = relationRestriction->plannerInfo; + Query *queryToProcess = plannerInfo->parse; + int rteIndex = relationRestriction->index; + + List *allVarsInQuery = pull_vars_of_level((Node *) queryToProcess, 0); + ListCell *varCell = NULL; + + List *requiredAttrNumbers = NIL; + + foreach(varCell, allVarsInQuery) + { + Var *var = (Var *) lfirst(varCell); + + if (var->varno == rteIndex) + { + requiredAttrNumbers = list_append_unique_int(requiredAttrNumbers, + var->varattno); + } + } + + return requiredAttrNumbers; +} + + +/* + * FindNextRTECandidate returns the next local or distributed table RTE in the given + * range table list and fills **restrictionList with the restrictions on it. + * + * The function also gets a boolean localTable parameter, so the caller + * can choose to run the function for only local tables or distributed tables. + */ +static RangeTblEntry * +FindNextRTECandidate(PlannerRestrictionContext *plannerRestrictionContext, + List *rangeTableList, List **restrictionList, + bool localTable) +{ + ListCell *rangeTableCell = NULL; + + foreach(rangeTableCell, rangeTableList) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); + + /* we're only interested in tables */ + if (!(rangeTableEntry->rtekind == RTE_RELATION && + rangeTableEntry->relkind == RELKIND_RELATION)) + { + continue; + } + + if (localTable && IsCitusTable(rangeTableEntry->relid)) + { + continue; + } + + if (!localTable && !IsCitusTable(rangeTableEntry->relid)) + { + continue; + } + + List *currentRestrictionList = + GetRestrictInfoListForRelation(rangeTableEntry, + plannerRestrictionContext, 1); + + *restrictionList = currentRestrictionList; + return rangeTableEntry; + } + + /* TODO: use a more specific "illegal state" error code */ + ereport(ERROR, (errmsg("unexpected state: could not find any RTE to convert to subquery in range table list"))); + return NULL; +} + +/* + * AllDataLocallyAccessible returns true if all data for the relations in the + * rangeTableList is locally accessible.
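+ * + * Plain local tables always qualify; a Citus table only qualifies if it consists of a + * single shard that has a placement on the local node.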
+ */ +static bool +AllDataLocallyAccessible(List *rangeTableList) +{ + ListCell *rangeTableCell = NULL; + foreach(rangeTableCell, rangeTableList) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); + + /* we're only interested in tables */ + if (!(rangeTableEntry->rtekind == RTE_RELATION && + rangeTableEntry->relkind == RELKIND_RELATION)) + { + continue; + } + + + Oid relationId = rangeTableEntry->relid; + + if (!IsCitusTable(relationId)) + { + /* local tables are locally accessible */ + continue; + } + + List *shardIntervalList = LoadShardIntervalList(relationId); + if (list_length(shardIntervalList) > 1) + { + /* we currently only consider single placement tables */ + return false; + } + + ShardInterval *shardInterval = linitial(shardIntervalList); + uint64 shardId = shardInterval->shardId; + ShardPlacement *localShardPlacement = + ShardPlacementOnGroup(shardId, GetLocalGroupId()); + if (localShardPlacement == NULL) + { + /* the table doesn't have a placement on this node */ + return false; + } + } + + return true; +} \ No newline at end of file diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 1a23fd090..24414fa90 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -534,12 +534,7 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, { return deferredError; } - - deferredError = DeferErrorIfUnsupportedModifyQueryWithLocalTable(queryTree); - if (deferredError != NULL) - { - return deferredError; - } + Var *partitionColumn = NULL; if (IsCitusTable(distributedTableId)) @@ -638,11 +633,14 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, } } - Oid distributedTableId = ModifyQueryResultRelationId(queryTree); - uint32 rangeTableId = 1; - Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + distributedTableId = ModifyQueryResultRelationId(queryTree); + rangeTableId = 1; - CmdType commandType = queryTree->commandType; + if (IsCitusTable(distributedTableId)) + { + partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + } + commandType = queryTree->commandType; if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) { diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index d71e0dab8..96cd55845 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -59,6 +59,7 @@ #include "distributed/commands/multi_copy.h" #include "distributed/distributed_planner.h" #include "distributed/errormessage.h" +#include "distributed/local_distributed_join_planner.h" #include "distributed/listutils.h" #include "distributed/log_utils.h" #include "distributed/metadata_cache.h" @@ -109,21 +110,6 @@ int LocalTableJoinPolicy = LOCAL_JOIN_POLICY_AUTO; /* track depth of current recursive planner query */ static int recursivePlanningDepth = 0; -/* - * RecursivePlanningContext is used to recursively plan subqueries - * and CTEs, pull results to the coordinator, and push it back into - * the workers. 
- */ -typedef struct RecursivePlanningContext -{ - int level; - uint64 planId; - bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */ - List *subPlanList; - PlannerRestrictionContext *plannerRestrictionContext; -} RecursivePlanningContext; - - /* * CteReferenceWalkerContext is used to collect CTE references in * CteReferenceListWalker. @@ -191,18 +177,6 @@ static bool ContainsReferencesToOuterQuery(Query *query); static bool ContainsReferencesToOuterQueryWalker(Node *node, VarLevelsUpWalkerContext *context); static bool NodeContainsSubqueryReferencingOuterQuery(Node *node); -static void ConvertLocalTableJoinsToSubqueries(Query *query, - RecursivePlanningContext *planningContext); -static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, - RecursivePlanningContext *planningContext); -static RangeTblEntry * MostFilteredRte(PlannerRestrictionContext * - plannerRestrictionContext, - List *rangeTableList, List **restrictionList, - bool localTable); -static void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, - List *restrictionList, - List *requiredAttrNumbers); -static bool AllDataLocallyAccessible(List *rangeTableList); static void WrapFunctionsInSubqueries(Query *query); static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry); static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry); @@ -210,7 +184,6 @@ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList, List *columnAliasList, Const *resultIdConst, Oid functionOid, bool useBinaryCopyFormat); - /* * GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs. * The function returns the subplans if necessary. For the details of when/how subplans are @@ -1371,248 +1344,11 @@ NodeContainsSubqueryReferencingOuterQuery(Node *node) return false; } - -/* - * ConvertLocalTableJoinsToSubqueries gets a query and the planner - * restrictions. As long as there is a join between a local table - * and distributed table, the function wraps one table in a - * subquery (by also pushing the filters on the table down - * to the subquery). - * - * Once this function returns, there are no direct joins between - * local and distributed tables. 
- */ -static void -ConvertLocalTableJoinsToSubqueries(Query *query, - RecursivePlanningContext *context) -{ - List *rangeTableList = query->rtable; - - if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER) - { - /* user doesn't want Citus to enable local table joins */ - return; - } - - if (!ContainsLocalTableDistributedTableJoin(rangeTableList)) - { - /* nothing to do as there are no relevant joins */ - return; - } - - if (AllDataLocallyAccessible(rangeTableList)) - { - /* recursively planning is overkill, router planner can already handle this */ - return; - } - - RangeTblEntry *resultRelation = ExtractResultRelationRTE(query); - - while (ContainsLocalTableDistributedTableJoin(rangeTableList)) - { - List *localTableRestrictList = NIL; - List *distributedTableRestrictList = NIL; - - bool localTable = true; - - PlannerRestrictionContext *plannerRestrictionContext = - context->plannerRestrictionContext; - RangeTblEntry *mostFilteredLocalRte = - MostFilteredRte(plannerRestrictionContext, rangeTableList, - &localTableRestrictList, localTable); - RangeTblEntry *mostFilteredDistributedRte = - MostFilteredRte(plannerRestrictionContext, rangeTableList, - &distributedTableRestrictList, !localTable); - - List *requiredAttrNumbersForLocalRte = - RequiredAttrNumbersForRelation(mostFilteredLocalRte, context); - List *requiredAttrNumbersForDistriutedRte = - RequiredAttrNumbersForRelation(mostFilteredDistributedRte, context); - - - elog(DEBUG4, "Local relation with the most number of filters " - "on it: \"%s\"", get_rel_name(mostFilteredLocalRte->relid)); - elog(DEBUG4, "Distributed relation with the most number of filters " - "on it: \"%s\"", get_rel_name(mostFilteredDistributedRte->relid)); - - if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_LOCAL) - { - ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte, - localTableRestrictList, - requiredAttrNumbersForLocalRte); - } - else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_DISTRIBUTED) - { - ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte, - distributedTableRestrictList, - requiredAttrNumbersForDistriutedRte); - } - else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_AUTO) - { - bool localTableHasFilter = list_length(localTableRestrictList) > 0; - bool distributedTableHasFilter = - list_length(distributedTableRestrictList) > 0; - - if (resultRelation && resultRelation->relid == mostFilteredLocalRte->relid && - !mostFilteredLocalRte->inFromCl) - { - /* - * We cannot recursively plan result relation, we have to - * recursively plan the distributed table. - * - * TODO: A future improvement could be to pick the next most filtered - * local relation, if exists. - */ - ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte, - distributedTableRestrictList, - requiredAttrNumbersForDistriutedRte); - } - else if (localTableHasFilter || !distributedTableHasFilter) - { - /* - * First, favor recursively planning local table when it has a filter. - * The rationale is that local tables are small, and at least one filter - * they become even smaller. On each iteration, we pick the local table - * with the most filters (e.g., WHERE clause entries). Note that the filters - * don't need to be directly on the table in the query tree, instead we use - * Postgres' filters where filters can be pushed down tables via filters. - * - * Second, if a distributed table doesn't have a filter, we do not ever - * prefer recursively planning that. Instead, we recursively plan the - * local table, assuming that it is smaller. 
- * - * TODO: If we have better statistics on how many tuples each table returns - * considering the filters on them, we should pick the table with least - * tuples. Today, we do not have such an infrastructure. - */ - ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte, - localTableRestrictList, - requiredAttrNumbersForLocalRte); - } - else - { - ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte, - distributedTableRestrictList, - requiredAttrNumbersForDistriutedRte); - } - } - else - { - elog(ERROR, "unexpected local table join policy: %d", LocalTableJoinPolicy); - } - } -} - - -/* - * RequiredAttrNumbersForRelation returns the required attribute numbers for - * the input RTE relation in order for the planning to succeed. - * - * The function could be optimized by not adding the columns that only appear - * WHERE clause as a filter (e.g., not a join clause). - */ -static List * -RequiredAttrNumbersForRelation(RangeTblEntry *relationRte, - RecursivePlanningContext *planningContext) -{ - PlannerRestrictionContext *plannerRestrictionContext = - planningContext->plannerRestrictionContext; - - /* TODO: Get rid of this hack, find relation restriction information directly */ - PlannerRestrictionContext *filteredPlannerRestrictionContext = - FilterPlannerRestrictionForQuery(plannerRestrictionContext, - WrapRteRelationIntoSubquery(relationRte, NIL)); - - RelationRestrictionContext *relationRestrictionContext = - filteredPlannerRestrictionContext->relationRestrictionContext; - List *filteredRelationRestrictionList = - relationRestrictionContext->relationRestrictionList; - RelationRestriction *relationRestriction = - (RelationRestriction *) linitial(filteredRelationRestrictionList); - - PlannerInfo *plannerInfo = relationRestriction->plannerInfo; - Query *queryToProcess = plannerInfo->parse; - int rteIndex = relationRestriction->index; - - List *allVarsInQuery = pull_vars_of_level((Node *) queryToProcess, 0); - ListCell *varCell = NULL; - - List *requiredAttrNumbers = NIL; - - foreach(varCell, allVarsInQuery) - { - Var *var = (Var *) lfirst(varCell); - - if (var->varno == rteIndex) - { - requiredAttrNumbers = list_append_unique_int(requiredAttrNumbers, - var->varattno); - } - } - - return requiredAttrNumbers; -} - - -/* - * MostFilteredRte returns a range table entry which has the most filters - * on it along with the restrictions (e.g., fills **restrictionList). - * - * The function also gets a boolean localTable parameter, so the caller - * can choose to run the function for only local tables or distributed tables. 
- */ -static RangeTblEntry * -MostFilteredRte(PlannerRestrictionContext *plannerRestrictionContext, - List *rangeTableList, List **restrictionList, - bool localTable) -{ - RangeTblEntry *mostFilteredLocalRte = NULL; - - ListCell *rangeTableCell = NULL; - - foreach(rangeTableCell, rangeTableList) - { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - - /* we're only interested in tables */ - if (!(rangeTableEntry->rtekind == RTE_RELATION && - rangeTableEntry->relkind == RELKIND_RELATION)) - { - continue; - } - - if (localTable && IsCitusTable(rangeTableEntry->relid)) - { - continue; - } - - if (!localTable && !IsCitusTable(rangeTableEntry->relid)) - { - continue; - } - - List *currentRestrictionList = - GetRestrictInfoListForRelation(rangeTableEntry, - plannerRestrictionContext, 1); - - if (mostFilteredLocalRte == NULL || - list_length(*restrictionList) < list_length(currentRestrictionList) || - ContainsFalseClause(currentRestrictionList)) - { - mostFilteredLocalRte = rangeTableEntry; - *restrictionList = currentRestrictionList; - } - } - - return mostFilteredLocalRte; -} - - /* * ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry * with a subquery. The function also pushes down the filters to the subquery. */ -static void +void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList, List *requiredAttrNumbers) { @@ -1648,56 +1384,6 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict } -/* - * AllDataLocallyAccessible return true if all data for the relations in the - * rangeTableList is locally accessible. - */ -static bool -AllDataLocallyAccessible(List *rangeTableList) -{ - ListCell *rangeTableCell = NULL; - foreach(rangeTableCell, rangeTableList) - { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - - /* we're only interested in tables */ - if (!(rangeTableEntry->rtekind == RTE_RELATION && - rangeTableEntry->relkind == RELKIND_RELATION)) - { - continue; - } - - - Oid relationId = rangeTableEntry->relid; - - if (!IsCitusTable(relationId)) - { - /* local tables are locally accessible */ - continue; - } - - List *shardIntervalList = LoadShardIntervalList(relationId); - if (list_length(shardIntervalList) > 1) - { - /* we currently only consider single placement tables */ - return false; - } - - ShardInterval *shardInterval = linitial(shardIntervalList); - uint64 shardId = shardInterval->shardId; - ShardPlacement *localShardPlacement = - ShardPlacementOnGroup(shardId, GetLocalGroupId()); - if (localShardPlacement == NULL) - { - /* the table doesn't have a placement on this node */ - return false; - } - } - - return true; -} - - /* * ContainsLocalTableDistributedTableJoin returns true if the input range table list * contains a direct join between local and distributed tables. 
@@ -1714,8 +1400,8 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList) RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); /* we're only interested in tables */ - if (!(rangeTableEntry->rtekind == RTE_RELATION && - rangeTableEntry->relkind == RELKIND_RELATION)) + if (rangeTableEntry->rtekind != RTE_RELATION || + rangeTableEntry->relkind != RELKIND_RELATION) { continue; } @@ -1729,14 +1415,9 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList) { containsLocalTable = true; } - - if (containsLocalTable && containsDistributedTable) - { - return true; - } } - return false; + return containsLocalTable && containsDistributedTable; } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index fbf9c14ab..d93c77962 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -200,8 +200,8 @@ static const struct config_enum_entry log_level_options[] = { static const struct config_enum_entry local_table_join_policies[] = { { "never", LOCAL_JOIN_POLICY_NEVER, false}, - { "pull-local", LOCAL_JOIN_POLICY_PULL_LOCAL, false}, - { "pull-distributed", LOCAL_JOIN_POLICY_PULL_DISTRIBUTED, false}, + { "prefer-local", LOCAL_JOIN_POLICY_PREFER_LOCAL, false}, + { "prefer-distributed", LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED, false}, + { "auto", LOCAL_JOIN_POLICY_AUTO, false}, { NULL, 0, false} }; diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 603b68b81..3461e2246 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -70,6 +70,7 @@ extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand); /* index.c */ typedef void (*PGIndexProcessor)(Form_pg_index, List **); + /* call.c */ extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest); diff --git a/src/include/distributed/local_distributed_join_planner.h b/src/include/distributed/local_distributed_join_planner.h new file mode 100644 index 000000000..85d15b4ea --- /dev/null +++ b/src/include/distributed/local_distributed_join_planner.h @@ -0,0 +1,24 @@ + +/*------------------------------------------------------------------------- + * + * local_distributed_join_planner.h + * + * Functions for planning joins between local and distributed tables. + * + * Copyright (c) Citus Data, Inc.
+ * + *------------------------------------------------------------------------- + */ + +#ifndef LOCAL_DISTRIBUTED_JOIN_PLANNER_H +#define LOCAL_DISTRIBUTED_JOIN_PLANNER_H + +#include "postgres.h" +#include "distributed/recursive_planning.h" + + +extern void +ConvertLocalTableJoinsToSubqueries(Query *query, + RecursivePlanningContext *context); + +#endif /* LOCAL_DISTRIBUTED_JOIN_PLANNER_H */ diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index d41a1c53a..a34a6a9ff 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -113,12 +113,9 @@ extern List * AllShardPlacementsOnNodeGroup(int32 groupId); extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId); extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery); -<<<<<<< HEAD extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); -======= extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId); ->>>>>>> Handle modifications as well /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index 28f9f6e5d..d6fed178e 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -26,13 +26,27 @@ typedef enum { LOCAL_JOIN_POLICY_NEVER = 0, - LOCAL_JOIN_POLICY_PULL_LOCAL = 1, - LOCAL_JOIN_POLICY_PULL_DISTRIBUTED = 2, + LOCAL_JOIN_POLICY_PREFER_LOCAL = 1, + LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED = 2, LOCAL_JOIN_POLICY_AUTO = 3, } LocalJoinPolicy; extern int LocalTableJoinPolicy; +/* + * RecursivePlanningContext is used to recursively plan subqueries + * and CTEs, pull results to the coordinator, and push it back into + * the workers. 
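+ * + * The struct is exposed in this header so that local_distributed_join_planner.c can + * reuse the planner restriction information collected during recursive planning.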
+ */ +typedef struct RecursivePlanningContext +{ + int level; + uint64 planId; + bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */ + List *subPlanList; + PlannerRestrictionContext *plannerRestrictionContext; +} RecursivePlanningContext; + extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, PlannerRestrictionContext * @@ -46,6 +60,9 @@ extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList, bool useBinaryCopyFormat); extern bool GeneratingSubplans(void); extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList); +extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, + List *restrictionList, + List *requiredAttrNumbers); #endif /* RECURSIVE_PLANNING_H */ diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index 489d4be0f..be290b200 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -15,15 +15,32 @@ SELECT create_distributed_table('distributed_table', 'key'); (1 row) +CREATE TABLE distributed_table_pkey (key int primary key, value text, value_2 jsonb); +SELECT create_distributed_table('distributed_table_pkey', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE distributed_table_windex (key int, value text, value_2 jsonb); +SELECT create_distributed_table('distributed_table_windex', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE UNIQUE INDEX key_index ON distributed_table_windex (key); SET client_min_messages TO DEBUG1; -- the user doesn't allow local / distributed table joinn SET citus.local_table_join_policy TO 'never'; SELECT count(*) FROM postgres_table JOIN distributed_table USING(key); -ERROR: relation postgres_table is not distributed +ERROR: direct joins between distributed and local tables are not supported +HINT: Use CTE's or subqueries to select from local tables and use them in joins SELECT count(*) FROM postgres_table JOIN reference_table USING(key); -ERROR: relation postgres_table is not distributed +ERROR: direct joins between distributed and local tables are not supported +HINT: Use CTE's or subqueries to select from local tables and use them in joins -- the user prefers local table recursively planned -SET citus.local_table_join_policy TO 'pull-local'; +SET citus.local_table_join_policy TO 'prefer-local'; SELECT count(*) FROM postgres_table JOIN distributed_table USING(key); DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 @@ -43,7 +60,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) -- the user prefers distributed table recursively planned -SET citus.local_table_join_policy TO 'pull-distributed'; +SET citus.local_table_join_policy TO 'prefer-distributed'; SELECT count(*) FROM postgres_table JOIN distributed_table USING(key); DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT 
key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 @@ -65,8 +82,9 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c -- update/delete -- auto tests -- switch back to the default policy, which is auto -RESET citus.local_table_join_policy; --- on the default mode, the local tables should be recursively planned +SET citus.local_table_join_policy to 'auto'; +-- on the auto mode, the local tables should be recursively planned +-- unless a unique index exists in a column for distributed table SELECT count(*) FROM distributed_table JOIN postgres_table USING(key); DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 @@ -94,20 +112,90 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) --- this is a contreversial part that we should discuss further --- if the distributed table has at least one filter, we prefer --- recursively planning of the distributed table -SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test'; -DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0 -DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0 -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table.value OPERATOR(pg_catalog.=) 'test'::text) +-- a unique index on key so dist table should be recursively planned +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key); +DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(value); +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, 
value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey USING (value)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = distributed_table_pkey.key; +DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON ((postgres_table.key OPERATOR(pg_catalog.=) distributed_table_pkey.key))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10; +DEBUG: Wrapping local relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey ON ((distributed_table_pkey.key OPERATOR(pg_catalog.=) 10))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- a unique index on key so dist table should be recursively planned +SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(key); +DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex USING (key)) + count 
+--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(value); +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT NULL::integer AS key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_windex USING (value)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_windex ON postgres_table.key = distributed_table_windex.key; +DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex ON ((postgres_table.key OPERATOR(pg_catalog.=) distributed_table_windex.key))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM postgres_table JOIN distributed_table_windex ON distributed_table_windex.key = 10; +DEBUG: Wrapping local relation "distributed_table_windex" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_windex WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_windex ON ((distributed_table_windex.key OPERATOR(pg_catalog.=) 10))) + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- no unique index on value so local table should be recursively planned. 
+SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test'; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (distributed_table.value OPERATOR(pg_catalog.=) 'test'::text) count --------------------------------------------------------------------- 0 (1 row) --- but if the filters can be pushed downn to the local table via the join --- we are smart about recursively planning the local table SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.key = 1; DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 @@ -117,7 +205,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) --- if both local and distributed tables have a filter, we prefer local +-- if both local and distributed tables have a filter, we prefer local unless distributed table has unique indexes on any equality filter SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test' AND postgres_table.value = 'test'; DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (value OPERATOR(pg_catalog.=) 'test'::text) OFFSET 0 @@ -149,29 +237,24 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) --- if one of the distributed tables have a filter, we'll prefer recursive planning of it as well --- it actually leads to a poor plan as we need to recursively plan local tables anyway as it is --- joined with another distributed table SELECT count(*) FROM distributed_table d1 JOIN postgres_table p1 USING(key) JOIN distributed_table d2 USING(key) JOIN postgres_table p2 USING(key) WHERE d1.value = '1'; -DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d1 WHERE (value OPERATOR(pg_catalog.=) '1'::text) OFFSET 0 DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 -DEBUG: 
generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table d1 WHERE (value OPERATOR(pg_catalog.=) '1'::text) OFFSET 0 -DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 -DEBUG: generating subplan XXX_3 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) WHERE (d1.value OPERATOR(pg_catalog.=) '1'::text) +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p1 WHERE true OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table p2 WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((local_table_join.distributed_table d1 JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p1 USING (key)) JOIN local_table_join.distributed_table d2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) p2 USING (key)) WHERE (d1.value OPERATOR(pg_catalog.=) '1'::text) count --------------------------------------------------------------------- 0 (1 row) -- if the filter is on the JOIN key, we can recursively plan the local --- tables as filters are pushded down to the local tables +-- tables as filters are pushed down to the local tables SELECT count(*) FROM @@ -188,6 +271,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) +SET citus.local_table_join_policy to 'auto'; -- we can support modification queries as well UPDATE postgres_table @@ -211,6 +295,120 @@ WHERE DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, 
intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table.key OPERATOR(pg_catalog.=) postgres_table.key) +UPDATE + distributed_table_pkey +SET + value = 'test' +FROM + postgres_table +WHERE + distributed_table_pkey.key = postgres_table.key; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_pkey SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) postgres_table.key) +UPDATE + distributed_table_windex +SET + value = 'test' +FROM + postgres_table +WHERE + distributed_table_windex.key = postgres_table.key; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_windex SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) postgres_table.key) +-- in case of update/delete we always recursively plan +-- the tables other than target table no matter what the policy is +SET citus.local_table_join_policy TO 'prefer-local'; +UPDATE + postgres_table +SET + value = 'test' +FROM + distributed_table +WHERE + distributed_table.key = postgres_table.key; +DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table WHERE (distributed_table.key OPERATOR(pg_catalog.=) postgres_table.key) +UPDATE + distributed_table +SET + value = 'test' +FROM + postgres_table +WHERE + distributed_table.key = postgres_table.key; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT 
key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
+DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table.key OPERATOR(pg_catalog.=) postgres_table.key)
+UPDATE
+	distributed_table_pkey
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table_pkey.key = postgres_table.key;
+DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
+DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
+DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_pkey SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) postgres_table.key)
+UPDATE
+	distributed_table_windex
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table_windex.key = postgres_table.key;
+DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
+DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
+DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_windex SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) postgres_table.key)
+SET citus.local_table_join_policy TO 'prefer-distributed';
+UPDATE
+	postgres_table
+SET
+	value = 'test'
+FROM
+	distributed_table
+WHERE
+	distributed_table.key = postgres_table.key;
+DEBUG: Wrapping local relation "distributed_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
+DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table WHERE true OFFSET 0
+DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table WHERE (distributed_table.key OPERATOR(pg_catalog.=) postgres_table.key)
+UPDATE
+	distributed_table
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table.key = postgres_table.key;
+DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
+DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
+DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table.key OPERATOR(pg_catalog.=) postgres_table.key)
+UPDATE
+	distributed_table_pkey
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table_pkey.key = postgres_table.key;
+DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
+DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
+DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_pkey SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) postgres_table.key)
+UPDATE
+	distributed_table_windex
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table_windex.key = postgres_table.key;
+DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
+DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
+DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.distributed_table_windex SET value = 'test'::text FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table WHERE (distributed_table_windex.key OPERATOR(pg_catalog.=) postgres_table.key)
 -- modifications with multiple tables
 UPDATE
 	distributed_table
@@ -254,4 +452,4 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_j
 \set VERBOSITY terse
 RESET client_min_messages;
 DROP SCHEMA local_table_join CASCADE;
-NOTICE: drop cascades to 3 other objects
+NOTICE: drop cascades to 5 other objects
diff --git a/src/test/regress/expected/recursive_relation_planning_restirction_pushdown.out b/src/test/regress/expected/recursive_relation_planning_restirction_pushdown.out
index 2df77f1af..87dee53e0 100644
--- a/src/test/regress/expected/recursive_relation_planning_restirction_pushdown.out
+++ b/src/test/regress/expected/recursive_relation_planning_restirction_pushdown.out
@@ -25,7 +25,7 @@ SELECT create_distributed_table('distributed_table', 'key');
 SET client_min_messages TO DEBUG1;
 -- for the purposes of these tests, we always want to recursively
 -- plan local tables.
-SET citus.local_table_join_policy TO "pull-local";
+SET citus.local_table_join_policy TO "prefer-local";
 -- there are no filters, hence cannot pushdown any filters
 SELECT count(*)
 FROM
 	distributed_table u1
diff --git a/src/test/regress/sql/local_table_join.sql b/src/test/regress/sql/local_table_join.sql
index 0b0285af9..be1946533 100644
--- a/src/test/regress/sql/local_table_join.sql
+++ b/src/test/regress/sql/local_table_join.sql
@@ -8,6 +8,13 @@ SELECT create_reference_table('reference_table');
 CREATE TABLE distributed_table (key int, value text, value_2 jsonb);
 SELECT create_distributed_table('distributed_table', 'key');
 
+CREATE TABLE distributed_table_pkey (key int primary key, value text, value_2 jsonb);
+SELECT create_distributed_table('distributed_table_pkey', 'key');
+
+CREATE TABLE distributed_table_windex (key int, value text, value_2 jsonb);
+SELECT create_distributed_table('distributed_table_windex', 'key');
+CREATE UNIQUE INDEX key_index ON distributed_table_windex (key);
+
 SET client_min_messages TO DEBUG1;
 
@@ -17,13 +24,13 @@ SELECT count(*) FROM postgres_table JOIN distributed_table USING(key);
 SELECT count(*) FROM postgres_table JOIN reference_table USING(key);
 
 -- the user prefers local table recursively planned
-SET citus.local_table_join_policy TO 'pull-local';
+SET citus.local_table_join_policy TO 'prefer-local';
 
 SELECT count(*) FROM postgres_table JOIN distributed_table USING(key);
 SELECT count(*) FROM postgres_table JOIN reference_table USING(key);
 
 -- the user prefers distributed table recursively planned
-SET citus.local_table_join_policy TO 'pull-distributed';
+SET citus.local_table_join_policy TO 'prefer-distributed';
 
 SELECT count(*) FROM postgres_table JOIN distributed_table USING(key);
 SELECT count(*) FROM postgres_table JOIN reference_table USING(key);
@@ -32,26 +39,34 @@ SELECT count(*) FROM postgres_table JOIN reference_table USING(key);
 -- auto tests
 
 -- switch back to the default policy, which is auto
-RESET citus.local_table_join_policy;
+SET citus.local_table_join_policy to 'auto';
 
--- on the default mode, the local tables should be recursively planned
+-- on the auto mode, the local tables should be recursively planned
+-- unless a unique index exists in a column for distributed table
 SELECT count(*) FROM distributed_table JOIN postgres_table USING(key);
 SELECT count(*) FROM reference_table JOIN postgres_table USING(key);
 SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) JOIN reference_table USING (key);
 
+-- a unique index on key so dist table should be recursively planned
+SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);
+SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(value);
+SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON postgres_table.key = distributed_table_pkey.key;
+SELECT count(*) FROM postgres_table JOIN distributed_table_pkey ON distributed_table_pkey.key = 10;
 
--- this is a contreversial part that we should discuss further
--- if the distributed table has at least one filter, we prefer
--- recursively planning of the distributed table
+-- a unique index on key so dist table should be recursively planned
+SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(key);
+SELECT count(*) FROM postgres_table JOIN distributed_table_windex USING(value);
+SELECT count(*) FROM postgres_table JOIN distributed_table_windex ON postgres_table.key = distributed_table_windex.key;
+SELECT count(*) FROM postgres_table JOIN distributed_table_windex ON distributed_table_windex.key = 10;
+
+-- no unique index on value so local table should be recursively planned.
 SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test';
 
--- but if the filters can be pushed downn to the local table via the join
--- we are smart about recursively planning the local table
 SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.key = 1;
 
--- if both local and distributed tables have a filter, we prefer local
+-- if both local and distributed tables have a filter, we prefer local unless distributed table has unique indexes on any equality filter
 SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test' AND postgres_table.value = 'test';
 SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE distributed_table.value = 'test' OR postgres_table.value = 'test';
@@ -61,9 +76,6 @@ SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) WHERE dist
 SELECT count(*) FROM distributed_table d1 JOIN postgres_table p1 USING(key) JOIN distributed_table d2 USING(key) JOIN postgres_table p2 USING(key);
 
--- if one of the distributed tables have a filter, we'll prefer recursive planning of it as well
--- it actually leads to a poor plan as we need to recursively plan local tables anyway as it is
--- joined with another distributed table
 SELECT
 	count(*)
 FROM
@@ -72,7 +84,7 @@ WHERE
 	d1.value = '1';
 
 -- if the filter is on the JOIN key, we can recursively plan the local
--- tables as filters are pushded down to the local tables
+-- tables as filters are pushed down to the local tables
 SELECT
 	count(*)
 FROM
@@ -81,6 +93,8 @@ WHERE
 	d1.key = 1;
 
+SET citus.local_table_join_policy to 'auto';
+
 -- we can support modification queries as well
 UPDATE
 	postgres_table
@@ -101,6 +115,106 @@ FROM
 WHERE
 	distributed_table.key = postgres_table.key;
 
+UPDATE
+	distributed_table_pkey
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table_pkey.key = postgres_table.key;
+
+UPDATE
+	distributed_table_windex
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table_windex.key = postgres_table.key;
+
+-- in case of update/delete we always recursively plan
+-- the tables other than target table no matter what the policy is
+
+SET citus.local_table_join_policy TO 'prefer-local';
+
+UPDATE
+	postgres_table
+SET
+	value = 'test'
+FROM
+	distributed_table
+WHERE
+	distributed_table.key = postgres_table.key;
+
+
+UPDATE
+	distributed_table
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table.key = postgres_table.key;
+
+UPDATE
+	distributed_table_pkey
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table_pkey.key = postgres_table.key;
+
+UPDATE
+	distributed_table_windex
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table_windex.key = postgres_table.key;
+
+
+SET citus.local_table_join_policy TO 'prefer-distributed';
+
+UPDATE
+	postgres_table
+SET
+	value = 'test'
+FROM
+	distributed_table
+WHERE
+	distributed_table.key = postgres_table.key;
+
+
+UPDATE
+	distributed_table
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table.key = postgres_table.key;
+
+UPDATE
+	distributed_table_pkey
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table_pkey.key = postgres_table.key;
+
+UPDATE
+	distributed_table_windex
+SET
+	value = 'test'
+FROM
+	postgres_table
+WHERE
+	distributed_table_windex.key = postgres_table.key;
+
 -- modifications with multiple tables
 UPDATE
 	distributed_table
diff --git a/src/test/regress/sql/recursive_relation_planning_restirction_pushdown.sql b/src/test/regress/sql/recursive_relation_planning_restirction_pushdown.sql
index ecd148b91..7a1e9923d 100644
--- a/src/test/regress/sql/recursive_relation_planning_restirction_pushdown.sql
+++ b/src/test/regress/sql/recursive_relation_planning_restirction_pushdown.sql
@@ -26,7 +26,7 @@ SET client_min_messages TO DEBUG1;
 
 -- for the purposes of these tests, we always want to recursively
 -- plan local tables.
-SET citus.local_table_join_policy TO "pull-local";
+SET citus.local_table_join_policy TO "prefer-local";
 
 -- there are no filters, hence cannot pushdown any filters