adding some changes

pull/7643/head
paragjain 2024-07-04 10:15:08 +00:00
parent 7ad8d711cd
commit a9dcd729e7
1 changed files with 18 additions and 26 deletions

View File

@ -243,14 +243,19 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ
CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId); CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId);
/* bool repartitioned = IsSupportedRedistributionTarget(targetRelationId);
* Get the index of the column in the source query that will be utilized
* to repartition the source rows, ensuring colocation with the target if(repartitioned)
*/ {
distributedPlan->sourceResultRepartitionColumnIndex = /*
SourceResultPartitionColumnIndex(mergeQuery, * Get the index of the column in the source query that will be utilized
sourceQuery->targetList, * to repartition the source rows, ensuring colocation with the target
targetRelation); */
distributedPlan->sourceResultRepartitionColumnIndex =
SourceResultPartitionColumnIndex(mergeQuery,
sourceQuery->targetList,
targetRelation);
}
/* /*
* Make a copy of the source query, since following code scribbles it * Make a copy of the source query, since following code scribbles it
@ -262,9 +267,7 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ
int cursorOptions = CURSOR_OPT_PARALLEL_OK; int cursorOptions = CURSOR_OPT_PARALLEL_OK;
PlannedStmt *sourceRowsPlan = pg_plan_query(sourceQueryCopy, NULL, cursorOptions, PlannedStmt *sourceRowsPlan = pg_plan_query(sourceQueryCopy, NULL, cursorOptions,
boundParams); boundParams);
bool repartitioned = IsRedistributablePlan(sourceRowsPlan->planTree) && repartitioned = repartitioned && IsRedistributablePlan(sourceRowsPlan->planTree);
IsSupportedRedistributionTarget(targetRelationId);
/* If plan is distributed, no work at the coordinator */ /* If plan is distributed, no work at the coordinator */
if (repartitioned) if (repartitioned)
{ {
@ -1005,18 +1008,12 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
List *localTablesList = NIL; List *localTablesList = NIL;
RangeTblEntry *rangeTableEntry = NULL; RangeTblEntry *rangeTableEntry = NULL;
bool areAllDistTablesSingleSharded = true;
foreach_ptr(rangeTableEntry, rangeTableList) foreach_ptr(rangeTableEntry, rangeTableList)
{ {
Oid relationId = rangeTableEntry->relid; Oid relationId = rangeTableEntry->relid;
if (IsCitusTableType(relationId, DISTRIBUTED_TABLE)) if (IsCitusTableType(relationId, DISTRIBUTED_TABLE))
{ {
if (!IsCitusTableType(relationId, SINGLE_SHARD_DISTRIBUTED))
{
areAllDistTablesSingleSharded = false;
}
distTablesList = lappend(distTablesList, rangeTableEntry); distTablesList = lappend(distTablesList, rangeTableEntry);
} }
else if (IsCitusTableType(relationId, REFERENCE_TABLE)) else if (IsCitusTableType(relationId, REFERENCE_TABLE))
@ -1067,12 +1064,6 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
/* Ensure all distributed tables are indeed co-located */ /* Ensure all distributed tables are indeed co-located */
if (!AllDistributedRelationsInRTEListColocated(distTablesList)) if (!AllDistributedRelationsInRTEListColocated(distTablesList))
{ {
/* All distributed tables are colocated and single sharded so we can push down to workers */
if (areAllDistTablesSingleSharded)
{
return NULL;
}
ereport(DEBUG1, (errmsg("Distributed tables are not co-located, try " ereport(DEBUG1, (errmsg("Distributed tables are not co-located, try "
"repartitioning"))); "repartitioning")));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
@ -1280,6 +1271,9 @@ ValidateAndReturnVarIfSupported(Node *entryExpr)
* ON clause and verifies if there is a join, either left or right, with * ON clause and verifies if there is a join, either left or right, with
* the distribution column of the given target. Once a match is found, it * the distribution column of the given target. Once a match is found, it
* returns the index of that match in the source's target list. * returns the index of that match in the source's target list.
*
* In the case of SINGLE_SHARD_DISTRIBUTED, there is no distribution key available for the target, which prevents repartitioning.
* Therefore, return -1 to signal the caller to utilize the Pull to coordinator approach.
*/ */
static int static int
SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
@ -1287,9 +1281,7 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
{ {
if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED)) if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED))
{ {
ereport(ERROR, (errmsg("MERGE operation across distributed schemas " return -1;
"or with a row-based distributed table is "
"not yet supported")));
} }
/* Get all the Join conditions from the ON clause */ /* Get all the Join conditions from the ON clause */