diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 35dc01cf4..af1563129 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -78,7 +78,6 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, uint64 *tableSize); static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId); static void ErrorIfNotSuitableToGetSize(Oid relationId); -static ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId); /* exports for SQL callable functions */ @@ -1295,7 +1294,7 @@ UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char s * of the shard on the given group. If no such placement exists, the function * return NULL. */ -static ShardPlacement * +ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId) { List *placementList = ShardPlacementList(shardId); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index eaba0ff02..0062e4534 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -512,6 +512,8 @@ IsTidColumn(Node *node) } +#include "distributed/recursive_planning.h" + /* * ModifyPartialQuerySupported implements a subset of what ModifyQuerySupported checks, * that subset being what's necessary to check modifying CTEs for. 
@@ -521,7 +523,15 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, Oid *distributedTableIdOutput) { DeferredErrorMessage *deferredError = DeferErrorIfModifyView(queryTree); - if (deferredError != NULL) + if (deferredError != NULL) { + return deferredError; + } + uint32 rangeTableId = 1; + CmdType commandType = queryTree->commandType; + + Oid distributedTableId = ModifyQueryResultRelationId(queryTree); + *distributedTableIdOutput = distributedTableId; + if (ContainsLocalTableDistributedTableJoin(queryTree->rtable)) { return deferredError; } @@ -531,6 +541,18 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery, { return deferredError; } + Var *partitionColumn = NULL; + + if (IsCitusTable(distributedTableId)) + { + partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + } + + deferredError = DeferErrorIfModifyView(queryTree); + if (deferredError != NULL) + { + return deferredError; + } /* * Reject subqueries which are in SELECT or WHERE clause. 
@@ -939,9 +961,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer /* for other kinds of relations, check if its distributed */ else { - Oid relationId = rangeTableEntry->relid; - - if (!IsCitusTable(relationId)) + if (ContainsLocalTableDistributedTableJoin(queryTree->rtable)) { StringInfo errorMessage = makeStringInfo(); char *relationName = get_rel_name(rangeTableEntry->relid); diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index e68052a5f..898f00280 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -194,7 +194,7 @@ static RangeTblEntry * MostFilteredRte(PlannerRestrictionContext * bool localTable); static void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList); -static bool ContainsLocalTableDistributedTableJoin(List *rangeTableList); +static bool AllDataLocallyAccessible(List *rangeTableList); static void WrapFunctionsInSubqueries(Query *query); static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry); static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry); @@ -1392,10 +1392,14 @@ ConvertLocalTableJoinsToSubqueries(Query *query, return; } + if (AllDataLocallyAccessible(rangeTableList)) { - /* TODO: if all tables are local, skip */ + /* recursively planning is overkill, router planner can already handle this */ + return; } + RangeTblEntry *resultRelation = ExtractResultRelationRTE(query); + while (ContainsLocalTableDistributedTableJoin(rangeTableList)) { List *localTableRestrictList = NIL; @@ -1410,6 +1414,11 @@ ConvertLocalTableJoinsToSubqueries(Query *query, MostFilteredRte(plannerRestrictionContext, rangeTableList, &distributedTableRestrictList, !localTable); + elog(DEBUG4, "Local relation with the most number of filters " + "on it: \"%s\"", get_rel_name(mostFilteredLocalRte->relid)); + elog(DEBUG4, "Distributed relation with 
the most number of filters " + "on it: \"%s\"", get_rel_name(mostFilteredDistributedRte->relid)); + if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PULL_LOCAL) { ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte, @@ -1426,26 +1435,37 @@ ConvertLocalTableJoinsToSubqueries(Query *query, bool distributedTableHasFilter = list_length(distributedTableRestrictList) > 0; - /* TODO: for modifications, either skip or do not plan target table */ - - /* - * First, favor recursively planning local table when it has a filter. - * The rationale is that local tables are small, and at least one filter - * they become even smaller. On each iteration, we pick the local table - * with the most filters (e.g., WHERE clause entries). Note that the filters - * don't need to be directly on the table in the query tree, instead we use - * Postgres' filters where filters can be pushed down tables via filters. - * - * Second, if a distributed table doesn't have a filter, we do not ever - * prefer recursively planning that. Instead, we recursively plan the - * local table, assuming that it is smaller. - * - * TODO: If we have better statistics on how many tuples each table returns - * considering the filters on them, we should pick the table with least - * tuples. Today, we do not have such an infrastructure. - */ - if (localTableHasFilter || !distributedTableHasFilter) + if (resultRelation && resultRelation->relid == mostFilteredLocalRte->relid && + !mostFilteredLocalRte->inFromCl) { + /* + * We cannot recursively plan result relation, we have to + * recursively plan the distributed table. + * + * TODO: A future improvement could be to pick the next most filtered + * local relation, if exists. + */ + ReplaceRTERelationWithRteSubquery(mostFilteredDistributedRte, + distributedTableRestrictList); + } + else if (localTableHasFilter || !distributedTableHasFilter) + { + /* + * First, favor recursively planning local table when it has a filter. 
+ * The rationale is that local tables are small, and at least one filter + * they become even smaller. On each iteration, we pick the local table + * with the most filters (e.g., WHERE clause entries). Note that the filters + * don't need to be directly on the table in the query tree, instead we use + * Postgres' filters where filters can be pushed down tables via filters. + * + * Second, if a distributed table doesn't have a filter, we do not ever + * prefer recursively planning that. Instead, we recursively plan the + * local table, assuming that it is smaller. + * + * TODO: If we have better statistics on how many tuples each table returns + * considering the filters on them, we should pick the table with least + * tuples. Today, we do not have such an infrastructure. + */ ReplaceRTERelationWithRteSubquery(mostFilteredLocalRte, localTableRestrictList); } @@ -1490,7 +1510,12 @@ MostFilteredRte(PlannerRestrictionContext *plannerRestrictionContext, continue; } - if (IsCitusTable(rangeTableEntry->relid) && localTable) + if (localTable && IsCitusTable(rangeTableEntry->relid)) + { + continue; + } + + if (!localTable && !IsCitusTable(rangeTableEntry->relid)) { continue; } @@ -1549,11 +1574,61 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict } +/* + * AllDataLocallyAccessible return true if all data for the relations in the + * rangeTableList is locally accessible. 
+ */
+static bool
+AllDataLocallyAccessible(List *rangeTableList)
+{
+	ListCell *rangeTableCell = NULL;
+	foreach(rangeTableCell, rangeTableList)
+	{
+		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
+
+		/* we're only interested in tables */
+		if (!(rangeTableEntry->rtekind == RTE_RELATION &&
+			  rangeTableEntry->relkind == RELKIND_RELATION))
+		{
+			continue;
+		}
+
+		Oid relationId = rangeTableEntry->relid;
+
+		if (!IsCitusTable(relationId))
+		{
+			/* local tables are locally accessible */
+			continue;
+		}
+
+		List *shardIntervalList = LoadShardIntervalList(relationId);
+		if (list_length(shardIntervalList) != 1)
+		{
+			/* we only consider tables with exactly one shard */
+			return false;
+		}
+
+		/* exactly one entry is guaranteed above, so linitial() is safe */
+		ShardInterval *shardInterval = linitial(shardIntervalList);
+		uint64 shardId = shardInterval->shardId;
+		ShardPlacement *localShardPlacement =
+			ShardPlacementOnGroup(shardId, GetLocalGroupId());
+		if (localShardPlacement == NULL)
+		{
+			/* the table doesn't have a placement on this node */
+			return false;
+		}
+	}
+
+	return true;
+}
+
+
 /*
  * ContainsLocalTableDistributedTableJoin returns true if the input range table list
  * contains a direct join between local and distributed tables.
*/ -static bool +bool ContainsLocalTableDistributedTableJoin(List *rangeTableList) { bool containsLocalTable = false; diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 5db5b0b69..d41a1c53a 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -113,7 +113,12 @@ extern List * AllShardPlacementsOnNodeGroup(int32 groupId); extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId); extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery); +<<<<<<< HEAD extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); +======= +extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId); + +>>>>>>> Handle modifications as well /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index 1da704dff..28f9f6e5d 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -45,5 +45,7 @@ extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList, List *resultIdList, bool useBinaryCopyFormat); extern bool GeneratingSubplans(void); +extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList); + #endif /* RECURSIVE_PLANNING_H */