diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 89a3f7f6a..a3019e0ec 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -243,14 +243,19 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId); - /* - * Get the index of the column in the source query that will be utilized - * to repartition the source rows, ensuring colocation with the target - */ - distributedPlan->sourceResultRepartitionColumnIndex = - SourceResultPartitionColumnIndex(mergeQuery, - sourceQuery->targetList, - targetRelation); + bool repartitioned = IsSupportedRedistributionTarget(targetRelationId); + + if(repartitioned) + { + /* + * Get the index of the column in the source query that will be utilized + * to repartition the source rows, ensuring colocation with the target + */ + distributedPlan->sourceResultRepartitionColumnIndex = + SourceResultPartitionColumnIndex(mergeQuery, + sourceQuery->targetList, + targetRelation); + } /* * Make a copy of the source query, since following code scribbles it @@ -262,9 +267,7 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ int cursorOptions = CURSOR_OPT_PARALLEL_OK; PlannedStmt *sourceRowsPlan = pg_plan_query(sourceQueryCopy, NULL, cursorOptions, boundParams); - bool repartitioned = IsRedistributablePlan(sourceRowsPlan->planTree) && - IsSupportedRedistributionTarget(targetRelationId); - + repartitioned = repartitioned && IsRedistributablePlan(sourceRowsPlan->planTree); /* If plan is distributed, no work at the coordinator */ if (repartitioned) { @@ -1005,18 +1008,12 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, List *localTablesList = NIL; RangeTblEntry *rangeTableEntry = NULL; - bool areAllDistTablesSingleSharded = true; foreach_ptr(rangeTableEntry, 
rangeTableList) { Oid relationId = rangeTableEntry->relid; if (IsCitusTableType(relationId, DISTRIBUTED_TABLE)) { - if (!IsCitusTableType(relationId, SINGLE_SHARD_DISTRIBUTED)) - { - areAllDistTablesSingleSharded = false; - } - distTablesList = lappend(distTablesList, rangeTableEntry); } else if (IsCitusTableType(relationId, REFERENCE_TABLE)) @@ -1067,12 +1064,6 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, /* Ensure all distributed tables are indeed co-located */ if (!AllDistributedRelationsInRTEListColocated(distTablesList)) { - /* All distributed tables are colocated and single sharded so we can push down to workers */ - if (areAllDistTablesSingleSharded) - { - return NULL; - } - ereport(DEBUG1, (errmsg("Distributed tables are not co-located, try " "repartitioning"))); return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, @@ -1280,6 +1271,9 @@ ValidateAndReturnVarIfSupported(Node *entryExpr) * ON clause and verifies if there is a join, either left or right, with * the distribution column of the given target. Once a match is found, it * returns the index of that match in the source's target list. + * + * In the case of SINGLE_SHARD_DISTRIBUTED, there is no distribution key available for the target, which prevents repartitioning. + * Therefore, return -1 to signal the caller to use the pull-to-coordinator approach. */ static int SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, @@ -1287,9 +1281,7 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, { if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED)) { - ereport(ERROR, (errmsg("MERGE operation across distributed schemas " - "or with a row-based distributed table is " - "not yet supported"))); + return -1; } /* Get all the Join conditions from the ON clause */