replan fastpath queries instead of rerouting them for the sake of clean code

reroute-non-fastpath
aykutbozkurt 2022-11-19 19:03:06 +03:00
parent 6b0f64e0b2
commit b3500194fc
1 changed files with 21 additions and 59 deletions

View File

@ -62,7 +62,7 @@ static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int efla
static void CitusPreExecScan(CitusScanState *scanState);
static bool ModifyJobNeedsEvaluation(Job *workerJob);
static void RegenerateTaskForFasthPathQuery(Job *workerJob);
static DistributedPlan * RePlanNonFastPathQuery(DistributedPlan *distributedPlan);
static DistributedPlan * RePlanTopLevelQuery(DistributedPlan *distributedPlan);
static void RegenerateTaskListForInsert(Job *workerJob);
static DistributedPlan * CopyDistributedPlanWithoutCache(
DistributedPlan *originalDistributedPlan);
@ -72,7 +72,6 @@ static void SetJobColocationId(Job *job);
static void EnsureForceDelegationDistributionKey(Job *job);
static void EnsureAnchorShardsInJobExist(Job *job);
static bool AnchorShardsInTaskListExist(List *taskList);
static void TryToRerouteFastPathModifyQuery(Job *job);
/* create custom scan methods for all executors */
@ -413,39 +412,26 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
AcquireMetadataLocks(workerJob->taskList);
/*
* In case of a split, the shard might no longer be available. In that
* case try to reroute. We reroute missing shards for fast-path queries.
* And we have to replan for non-fastpath queries as pruning directly depends
* on postgres planner. (Might be optimized if we have enough info fed from
* planning phase. That way, we can recompute tasks similarly but it is more complex.)
* in case of a split, the shard might no longer be available. In that case,
* we have to replan top level query to prevent unfriendly 'missing shards' message.
*
* we should only replan if we have valid topLevelQueryContext which means our plan
* is top level plan (not a subplan).
*/
if (!AnchorShardsInTaskListExist(workerJob->taskList))
if (!AnchorShardsInTaskListExist(workerJob->taskList) &&
originalDistributedPlan->topLevelQueryContext)
{
if (currentPlan->fastPathRouterPlan)
{
TryToRerouteFastPathModifyQuery(workerJob);
}
else
{
/*
* we should only replan if we have valid topLevelQueryContext which means our plan
* is top level plan (not a subplan)
*/
if (originalDistributedPlan->topLevelQueryContext)
{
DistributedPlan *newDistributedPlan = RePlanNonFastPathQuery(
originalDistributedPlan);
scanState->distributedPlan = newDistributedPlan;
DistributedPlan *newDistributedPlan = RePlanTopLevelQuery(
originalDistributedPlan);
scanState->distributedPlan = newDistributedPlan;
/*
* switch to oldcontext and restart CitusBeginModifyScan (maybe to regenerate tasks
* due to deferredPruning)
*/
MemoryContextSwitchTo(oldContext);
CitusBeginModifyScan((CustomScanState *) scanState, estate, eflags);
return;
}
}
/*
* switch to oldcontext and restart CitusBeginModifyScan (maybe to regenerate tasks
* due to deferredPruning)
*/
MemoryContextSwitchTo(oldContext);
CitusBeginModifyScan((CustomScanState *) scanState, estate, eflags);
return;
}
/* ensure there is no invalid shard */
@ -485,27 +471,6 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
}
/*
* TryToRerouteFastPathModifyQuery tries to reroute non-existent shards in given job if it finds any such shard,
* only for fastpath queries.
*
* Should only be called if the job belongs to a fastpath modify query
*/
static void
TryToRerouteFastPathModifyQuery(Job *job)
{
if (job->jobQuery->commandType == CMD_INSERT)
{
RegenerateTaskListForInsert(job);
}
else
{
RegenerateTaskForFasthPathQuery(job);
RebuildQueryStrings(job);
}
}
/*
* EnsureAnchorShardsInJobExist ensures all shards are valid in job.
* If it finds a non-existent shard in given job, it throws an error.
@ -687,17 +652,14 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
/*
* RePlanNonFastPathQuery replans the initial query, which is stored in the distributed
* RePlanTopLevelQuery replans the initial query, which is stored in the distributed
* plan, at the start of the planning.
*
* That method should only be used when we detect any missing shard at execution
* phase.
* That method is supposed to be used when we detect any missing shard just before execution.
*/
static DistributedPlan *
RePlanNonFastPathQuery(DistributedPlan *oldPlan)
RePlanTopLevelQuery(DistributedPlan *oldPlan)
{
Assert(!oldPlan->fastPathRouterPlan);
/* extract top level query info from the TopLevelQueryContext stored in the old plan */
TopLevelQueryContext *topLevelQueryContext = oldPlan->topLevelQueryContext;