diff --git a/src/backend/distributed/executor/merge_executor.c b/src/backend/distributed/executor/merge_executor.c index f501497c0..bcacbcd1e 100644 --- a/src/backend/distributed/executor/merge_executor.c +++ b/src/backend/distributed/executor/merge_executor.c @@ -100,7 +100,7 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState) Query *mergeQuery = copyObject(distributedPlan->modifyQueryViaCoordinatorOrRepartition); RangeTblEntry *targetRte = ExtractResultRelationRTE(mergeQuery); - RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery); + RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false); Oid targetRelationId = targetRte->relid; bool hasReturning = distributedPlan->expectResults; Query *sourceQuery = sourceRte->subquery; @@ -211,7 +211,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState) Query *mergeQuery = copyObject(distributedPlan->modifyQueryViaCoordinatorOrRepartition); RangeTblEntry *targetRte = ExtractResultRelationRTE(mergeQuery); - RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery); + RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false); Query *sourceQuery = sourceRte->subquery; Oid targetRelationId = targetRte->relid; PlannedStmt *sourcePlan = diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index f6af545cb..06126cdf3 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -57,6 +57,9 @@ static DeferredErrorMessage * DeferErrorIfRoutableMergeNotSupported(Query *query * plannerRestrictionContext, Oid targetRelationId); +static bool MergeSourceHasRouterSelect(Query *query, + PlannerRestrictionContext * + plannerRestrictionContext); static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, Query *query, @@ -234,7 +237,7 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ ParamListInfo boundParams) { Query *mergeQuery = copyObject(originalQuery); - RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery); + RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); ereport(DEBUG1, (errmsg("Creating MERGE repartition plan"))); @@ -959,7 +962,8 @@ DeferErrorIfTargetHasFalseClause(Oid targetRelationId, List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo; List *restrictClauseList = get_all_actual_clauses(baseRestrictionList); - if (ContainsFalseClause(restrictClauseList)) + if (ContainsFalseClause(restrictClauseList) || + JoinConditionIsOnFalse(relationRestriction->relOptInfo->joininfo)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "Routing query is not possible with " @@ -1047,22 +1051,41 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, "must be colocated", NULL, NULL); } - DeferredErrorMessage *deferredError = - DeferErrorIfUnsupportedSubqueryPushdown(query, - plannerRestrictionContext); - if (deferredError) - { - ereport(DEBUG1, (errmsg("Sub-query is not pushable, try repartitioning"))); - return deferredError; - } + DeferredErrorMessage *deferredError = NULL; - if (HasDangerousJoinUsing(query->rtable, (Node *) query->jointree)) + + /* + * if the query goes to a single node ("router" in Citus' parlance), + * we don't need to go through certain SQL support and colocation checks. + * + * For PG16+, this is required as some of the outer JOINs are converted to + * "ON(true)" and filters are pushed down to the table scans. As + * DeferErrorIfUnsupportedSubqueryPushdown rely on JOIN filters, it will fail to + * detect the router case. However, we can still detect it by checking if + * the query is a router query as the router query checks the filters on + * the tables. + */ + + + if (!MergeSourceHasRouterSelect(query, plannerRestrictionContext)) { - ereport(DEBUG1, (errmsg( - "Query has ambigious joins, merge is not pushable, try repartitioning"))); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "a join with USING causes an internal naming " - "conflict, use ON instead", NULL, NULL); + deferredError = + DeferErrorIfUnsupportedSubqueryPushdown(query, + plannerRestrictionContext); + if (deferredError) + { + ereport(DEBUG1, (errmsg("Sub-query is not pushable, try repartitioning"))); + return deferredError; + } + + if (HasDangerousJoinUsing(query->rtable, (Node *) query->jointree)) + { + ereport(DEBUG1, (errmsg( + "Query has ambigious joins, merge is not pushable, try repartitioning"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "a join with USING causes an internal naming " + "conflict, use ON instead", NULL, NULL); + } } deferredError = DeferErrorIfTargetHasFalseClause(targetRelationId, @@ -1080,6 +1103,36 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, } +/* + * MergeSourceHasRouterSelect is a helper function that returns true of the source + * part of the merge query is a router query. + */ +static bool +MergeSourceHasRouterSelect(Query *query, + PlannerRestrictionContext *plannerRestrictionContext) +{ + Query *copiedQuery = copyObject(query); + RangeTblEntry *mergeSourceRte = ExtractMergeSourceRangeTableEntry(copiedQuery, true); + + if (mergeSourceRte == NULL) + { + /* + * We might potentially support this case in the future, but for now, + * we don't support MERGE with JOIN in the source. + */ + return false; + } + + ConvertSourceRTEIntoSubquery(copiedQuery, mergeSourceRte, plannerRestrictionContext); + Query *sourceQuery = mergeSourceRte->subquery; + + DistributedPlan *distributedPlan = CreateRouterPlan(sourceQuery, sourceQuery, + plannerRestrictionContext); + + return distributedPlan->planningError == NULL; +} + + /* * ErrorIfMergeQueryQualAndTargetListNotSupported does check for a MERGE command in the query, if it finds * one, it will verify the below criteria @@ -1288,7 +1341,7 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, * table or source query in USING clause. */ RangeTblEntry * -ExtractMergeSourceRangeTableEntry(Query *query) +ExtractMergeSourceRangeTableEntry(Query *query, bool joinSourceOk) { /* function is void for pre-15 versions of Postgres */ #if PG_VERSION_NUM < PG_VERSION_15 @@ -1301,7 +1354,10 @@ ExtractMergeSourceRangeTableEntry(Query *query) List *fromList = query->jointree->fromlist; - /* We should have only one RTE(MergeStmt->sourceRelation) in the from-list */ + /* + * We should have only one RTE(MergeStmt->sourceRelation) in the from-list + * unless Postgres community changes the representation of merge. + */ if (list_length(fromList) != 1) { ereport(ERROR, (errmsg("Unexpected source list in MERGE sql USING clause"))); @@ -1316,12 +1372,18 @@ ExtractMergeSourceRangeTableEntry(Query *query) */ if (reference->rtindex == 0) { - ereport(ERROR, (errmsg("Source is not an explicit query"), - errhint("Source query is a Join expression, " - "try converting into a query as SELECT * " - "FROM (..Join..)"))); + if (!joinSourceOk) + { + ereport(ERROR, (errmsg("Source is not an explicit query"), + errhint("Source query is a Join expression, " + "try converting into a query as SELECT * " + "FROM (..Join..)"))); + } + + return NULL; } + Assert(reference->rtindex >= 1); RangeTblEntry *subqueryRte = rt_fetch(reference->rtindex, query->rtable); diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 674077b46..ec0282840 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -295,7 +295,7 @@ NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors, CitusScanState *scanState = (CitusScanState *) node; DistributedPlan *distributedPlan = scanState->distributedPlan; Query *mergeQuery = distributedPlan->modifyQueryViaCoordinatorOrRepartition; - RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery); + RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false); /* * Create a copy because ExplainOneQuery can modify the query, and later diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 27027e064..0d7a0de78 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -396,7 +396,7 @@ ExtractSourceResultRangeTableEntry(Query *query) { if (IsMergeQuery(query)) { - return ExtractMergeSourceRangeTableEntry(query); + return ExtractMergeSourceRangeTableEntry(query, false); } else if (CheckInsertSelectQuery(query)) { diff --git a/src/include/distributed/merge_planner.h b/src/include/distributed/merge_planner.h index 1548dae6a..898292603 100644 --- a/src/include/distributed/merge_planner.h +++ b/src/include/distributed/merge_planner.h @@ -30,7 +30,7 @@ extern bool IsLocalTableModification(Oid targetRelationId, Query *query, extern void NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es); extern Var * FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query); -extern RangeTblEntry * ExtractMergeSourceRangeTableEntry(Query *query); +extern RangeTblEntry * ExtractMergeSourceRangeTableEntry(Query *query, bool joinSourceOk); #endif /* MERGE_PLANNER_H */ diff --git a/src/test/regress/expected/merge_schema_sharding.out b/src/test/regress/expected/merge_schema_sharding.out index fb4c0b235..8a9ba89dd 100644 --- a/src/test/regress/expected/merge_schema_sharding.out +++ b/src/test/regress/expected/merge_schema_sharding.out @@ -71,20 +71,24 @@ SET search_path TO schema_shard_table1; SET client_min_messages TO DEBUG2; MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b; +DEBUG: Creating router plan DEBUG: DEBUG: Creating MERGE router plan MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) WHEN MATCHED THEN DELETE; +DEBUG: Creating router plan DEBUG: DEBUG: Creating MERGE router plan MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); +DEBUG: Creating router plan DEBUG: DEBUG: Creating MERGE router plan MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); +DEBUG: Creating router plan DEBUG: DEBUG: Creating MERGE router plan SET search_path TO schema_shard_table2; @@ -195,6 +199,7 @@ WITH cte AS ( ) MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b; +DEBUG: Creating router plan DEBUG: DEBUG: Creating MERGE router plan WITH cte AS (