mirror of https://github.com/citusdata/citus.git
Convert some hard coded errors to deferred errors in router planner
parent
69992d58f9
commit
28c5b6a425
|
@ -546,7 +546,8 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
|
|||
|
||||
GenerateSingleShardRouterTaskList(workerJob,
|
||||
relationShardList,
|
||||
placementList, shardId);
|
||||
placementList,
|
||||
shardId);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -171,7 +171,8 @@ static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
|||
RelationRestrictionContext *restrictionContext,
|
||||
uint32 taskId,
|
||||
TaskType taskType,
|
||||
bool modifyRequiresCoordinatorEvaluation);
|
||||
bool modifyRequiresCoordinatorEvaluation,
|
||||
DeferredErrorMessage **planningError);
|
||||
static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
|
||||
Oid collation,
|
||||
ShardInterval *firstInterval,
|
||||
|
@ -2105,11 +2106,16 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
|
|||
relationRestrictionContext,
|
||||
&isMultiShardQuery, NULL);
|
||||
|
||||
DeferredErrorMessage *deferredErrorMessage = NULL;
|
||||
sqlTaskList = QueryPushdownSqlTaskList(job->jobQuery, job->jobId,
|
||||
plannerRestrictionContext->
|
||||
relationRestrictionContext,
|
||||
prunedRelationShardList, READ_TASK,
|
||||
false);
|
||||
false,
|
||||
&deferredErrorMessage);
|
||||
if (deferredErrorMessage != NULL) {
|
||||
RaiseDeferredErrorInternal(deferredErrorMessage, ERROR);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -2187,7 +2193,8 @@ List *
|
|||
QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
||||
RelationRestrictionContext *relationRestrictionContext,
|
||||
List *prunedRelationShardList, TaskType taskType, bool
|
||||
modifyRequiresCoordinatorEvaluation)
|
||||
modifyRequiresCoordinatorEvaluation,
|
||||
DeferredErrorMessage **planningError)
|
||||
{
|
||||
List *sqlTaskList = NIL;
|
||||
ListCell *restrictionCell = NULL;
|
||||
|
@ -2201,8 +2208,11 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
|||
|
||||
if (list_length(relationRestrictionContext->relationRestrictionList) == 0)
|
||||
{
|
||||
ereport(ERROR, (errmsg("cannot handle complex subqueries when the "
|
||||
"router executor is disabled")));
|
||||
*planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"cannot handle complex subqueries when the "
|
||||
"router executor is disabled",
|
||||
NULL, NULL);
|
||||
return NIL;
|
||||
}
|
||||
|
||||
/* defaults to be used if this is a reference table-only query */
|
||||
|
@ -2227,8 +2237,11 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
|||
/* we expect distributed tables to have the same shard count */
|
||||
if (shardCount > 0 && shardCount != cacheEntry->shardIntervalArrayLength)
|
||||
{
|
||||
ereport(ERROR, (errmsg("shard counts of co-located tables do not "
|
||||
"match")));
|
||||
*planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"shard counts of co-located tables do not "
|
||||
"match",
|
||||
NULL, NULL);
|
||||
return NIL;
|
||||
}
|
||||
|
||||
if (taskRequiredForShardIndex == NULL)
|
||||
|
@ -2291,7 +2304,11 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
|||
relationRestrictionContext,
|
||||
taskIdIndex,
|
||||
taskType,
|
||||
modifyRequiresCoordinatorEvaluation);
|
||||
modifyRequiresCoordinatorEvaluation,
|
||||
planningError);
|
||||
if (*planningError != NULL) {
|
||||
return NIL;
|
||||
}
|
||||
subqueryTask->jobId = jobId;
|
||||
sqlTaskList = lappend(sqlTaskList, subqueryTask);
|
||||
|
||||
|
@ -2467,7 +2484,8 @@ ErrorIfUnsupportedShardDistribution(Query *query)
|
|||
static Task *
|
||||
QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||
RelationRestrictionContext *restrictionContext, uint32 taskId,
|
||||
TaskType taskType, bool modifyRequiresCoordinatorEvaluation)
|
||||
TaskType taskType, bool modifyRequiresCoordinatorEvaluation,
|
||||
DeferredErrorMessage **planningError)
|
||||
{
|
||||
Query *taskQuery = copyObject(originalQuery);
|
||||
|
||||
|
@ -2546,8 +2564,12 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
|||
List *taskPlacementList = PlacementsForWorkersContainingAllShards(taskShardList);
|
||||
if (list_length(taskPlacementList) == 0)
|
||||
{
|
||||
ereport(ERROR, (errmsg("cannot find a worker that has active placements for all "
|
||||
"shards in the query")));
|
||||
*planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"cannot find a worker that has active placements for all "
|
||||
"shards in the query",
|
||||
NULL, NULL);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -1781,12 +1781,16 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
|||
relationRestrictionContext,
|
||||
prunedShardIntervalListList,
|
||||
MODIFY_TASK,
|
||||
requiresCoordinatorEvaluation);
|
||||
requiresCoordinatorEvaluation,
|
||||
planningError);
|
||||
if (*planningError) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
GenerateSingleShardRouterTaskList(job, relationShardList,
|
||||
placementList, shardId);
|
||||
placementList, shardId);
|
||||
}
|
||||
|
||||
job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation;
|
||||
|
@ -1806,14 +1810,12 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
|||
{
|
||||
Query *originalQuery = job->jobQuery;
|
||||
|
||||
|
||||
if (originalQuery->commandType == CMD_SELECT)
|
||||
{
|
||||
job->taskList = SingleShardTaskList(originalQuery, job->jobId,
|
||||
relationShardList, placementList,
|
||||
shardId,
|
||||
job->parametersInJobQueryResolved);
|
||||
|
||||
/*
|
||||
* Queries to reference tables, or distributed tables with multiple replica's have
|
||||
* their task placements reordered according to the configured
|
||||
|
|
|
@ -576,7 +576,8 @@ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
|||
RelationRestrictionContext *
|
||||
relationRestrictionContext,
|
||||
List *prunedRelationShardList, TaskType taskType,
|
||||
bool modifyRequiresCoordinatorEvaluation);
|
||||
bool modifyRequiresCoordinatorEvaluation,
|
||||
DeferredErrorMessage **planningError);
|
||||
|
||||
/* function declarations for managing jobs */
|
||||
extern uint64 UniqueJobId(void);
|
||||
|
|
|
@ -83,8 +83,9 @@ extern List * TargetShardIntervalForFastPathQuery(Query *query,
|
|||
Const *inputDistributionKeyValue,
|
||||
Const **outGoingPartitionValueConst);
|
||||
extern void GenerateSingleShardRouterTaskList(Job *job,
|
||||
List *relationShardList,
|
||||
List *placementList, uint64 shardId);
|
||||
List *relationShardList,
|
||||
List *placementList,
|
||||
uint64 shardId);
|
||||
extern bool IsRouterPlannable(Query *query,
|
||||
PlannerRestrictionContext *plannerRestrictionContext);
|
||||
|
||||
|
|
Loading…
Reference in New Issue