diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 106f47cbe..b5617d1c6 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -546,7 +546,8 @@ RegenerateTaskForFasthPathQuery(Job *workerJob) GenerateSingleShardRouterTaskList(workerJob, relationShardList, - placementList, shardId); + placementList, + shardId); } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 0bc31a9f5..01802d8ec 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -171,7 +171,8 @@ static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, uint32 taskId, TaskType taskType, - bool modifyRequiresCoordinatorEvaluation); + bool modifyRequiresCoordinatorEvaluation, + DeferredErrorMessage **planningError); static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction, Oid collation, ShardInterval *firstInterval, @@ -2105,11 +2106,16 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction relationRestrictionContext, &isMultiShardQuery, NULL); + DeferredErrorMessage *deferredErrorMessage = NULL; sqlTaskList = QueryPushdownSqlTaskList(job->jobQuery, job->jobId, plannerRestrictionContext-> relationRestrictionContext, prunedRelationShardList, READ_TASK, - false); + false, + &deferredErrorMessage); + if (deferredErrorMessage != NULL) { + RaiseDeferredErrorInternal(deferredErrorMessage, ERROR); + } } else { @@ -2187,7 +2193,8 @@ List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, RelationRestrictionContext *relationRestrictionContext, List *prunedRelationShardList, TaskType taskType, bool - modifyRequiresCoordinatorEvaluation) + modifyRequiresCoordinatorEvaluation, + DeferredErrorMessage **planningError) { List *sqlTaskList = NIL; ListCell *restrictionCell = NULL; @@ -2201,8 +2208,11 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, if (list_length(relationRestrictionContext->relationRestrictionList) == 0) { - ereport(ERROR, (errmsg("cannot handle complex subqueries when the " - "router executor is disabled"))); + *planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot handle complex subqueries when the " + "router executor is disabled", + NULL, NULL); + return NIL; } /* defaults to be used if this is a reference table-only query */ @@ -2227,8 +2237,11 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, /* we expect distributed tables to have the same shard count */ if (shardCount > 0 && shardCount != cacheEntry->shardIntervalArrayLength) { - ereport(ERROR, (errmsg("shard counts of co-located tables do not " - "match"))); + *planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "shard counts of co-located tables do not " + "match", + NULL, NULL); + return NIL; } if (taskRequiredForShardIndex == NULL) @@ -2291,7 +2304,11 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, relationRestrictionContext, taskIdIndex, taskType, - modifyRequiresCoordinatorEvaluation); + modifyRequiresCoordinatorEvaluation, + planningError); + if (*planningError != NULL) { + return NIL; + } subqueryTask->jobId = jobId; sqlTaskList = lappend(sqlTaskList, subqueryTask); @@ -2467,7 +2484,8 @@ ErrorIfUnsupportedShardDistribution(Query *query) static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, RelationRestrictionContext *restrictionContext, uint32 taskId, - TaskType taskType, bool modifyRequiresCoordinatorEvaluation) + TaskType taskType, bool modifyRequiresCoordinatorEvaluation, + DeferredErrorMessage **planningError) { Query *taskQuery = copyObject(originalQuery); @@ -2546,8 +2564,12 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, List *taskPlacementList = PlacementsForWorkersContainingAllShards(taskShardList); if (list_length(taskPlacementList) == 0) { - ereport(ERROR, (errmsg("cannot find a worker that has active placements for all " - "shards in the query"))); + *planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot find a worker that has active placements for all " + "shards in the query", + NULL, NULL); + + return NULL; } /* diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 537091359..01ad53a6e 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1781,12 +1781,16 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon relationRestrictionContext, prunedShardIntervalListList, MODIFY_TASK, - requiresCoordinatorEvaluation); + requiresCoordinatorEvaluation, + planningError); + if (*planningError) { + return NULL; + } } else { GenerateSingleShardRouterTaskList(job, relationShardList, - placementList, shardId); + placementList, shardId); } job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation; @@ -1806,14 +1810,12 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, { Query *originalQuery = job->jobQuery; - if (originalQuery->commandType == CMD_SELECT) { job->taskList = SingleShardTaskList(originalQuery, job->jobId, relationShardList, placementList, shardId, job->parametersInJobQueryResolved); - /* * Queries to reference tables, or distributed tables with multiple replica's have * their task placements reordered according to the configured diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index c15c6bcf9..bdd38cb54 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -576,7 +576,8 @@ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, RelationRestrictionContext * relationRestrictionContext, List *prunedRelationShardList, TaskType taskType, - bool modifyRequiresCoordinatorEvaluation); + bool modifyRequiresCoordinatorEvaluation, + DeferredErrorMessage **planningError); /* function declarations for managing jobs */ extern uint64 UniqueJobId(void); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 7dff1015c..a6b6b6152 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -83,8 +83,9 @@ extern List * TargetShardIntervalForFastPathQuery(Query *query, Const *inputDistributionKeyValue, Const **outGoingPartitionValueConst); extern void GenerateSingleShardRouterTaskList(Job *job, - List *relationShardList, - List *placementList, uint64 shardId); + List *relationShardList, + List *placementList, + uint64 shardId); extern bool IsRouterPlannable(Query *query, PlannerRestrictionContext *plannerRestrictionContext);