POC - avoid deparse - parse - plan for local execution on single shard

colm/sshard-poc
Colm McHugh 2025-05-29 10:55:07 +00:00
parent 55a0d1f730
commit 97cbfb33f0
11 changed files with 583 additions and 82 deletions

View File

@ -313,6 +313,7 @@ ExecuteLocalTaskListExtended(List *taskList,
{ {
int taskNumParams = numParams; int taskNumParams = numParams;
Oid *taskParameterTypes = parameterTypes; Oid *taskParameterTypes = parameterTypes;
int taskType = GetTaskQueryType(task);
if (task->parametersInQueryStringResolved) if (task->parametersInQueryStringResolved)
{ {
@ -330,7 +331,7 @@ ExecuteLocalTaskListExtended(List *taskList,
* for concatenated strings, we set queryStringList so that we can access * for concatenated strings, we set queryStringList so that we can access
* each query string. * each query string.
*/ */
if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) if (taskType == TASK_QUERY_TEXT_LIST)
{ {
List *queryStringList = task->taskQuery.data.queryStringList; List *queryStringList = task->taskQuery.data.queryStringList;
totalRowsProcessed += totalRowsProcessed +=
@ -342,6 +343,12 @@ ExecuteLocalTaskListExtended(List *taskList,
continue; continue;
} }
if (taskType == TASK_QUERY_LOCAL_PLAN)
{
localPlan = task->taskQuery.data.localPlan;
}
else
{
Query *shardQuery = ParseQueryString(TaskQueryString(task), Query *shardQuery = ParseQueryString(TaskQueryString(task),
taskParameterTypes, taskParameterTypes,
taskNumParams); taskNumParams);
@ -359,6 +366,7 @@ ExecuteLocalTaskListExtended(List *taskList,
*/ */
localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo); localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo);
} }
}
char *shardQueryString = NULL; char *shardQueryString = NULL;
if (GetTaskQueryType(task) == TASK_QUERY_TEXT) if (GetTaskQueryType(task) == TASK_QUERY_TEXT)

View File

@ -439,6 +439,24 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
} }
void
SetTaskQueryPlan(Task *task, PlannedStmt *localPlan)
{
Assert(localPlan != NULL);
task->taskQuery.queryType = TASK_QUERY_LOCAL_PLAN;
task->taskQuery.data.localPlan = localPlan;
task->queryCount = 1;
}
PlannedStmt *
TaskQueryLocalPlan(Task *task)
{
Assert(GetTaskQueryType(task) == TASK_QUERY_LOCAL_PLAN);
return task->taskQuery.data.localPlan;
}
/* /*
* DeparseTaskQuery is a general way of deparsing a query based on a task. * DeparseTaskQuery is a general way of deparsing a query based on a task.
*/ */

View File

@ -97,6 +97,10 @@ static bool ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable); bool *maybeHasForeignDistributedTable);
static PlannedStmt * CreateDistributedPlannedStmt( static PlannedStmt * CreateDistributedPlannedStmt(
DistributedPlanningContext *planContext); DistributedPlanningContext *planContext);
static PlannedStmt * CreateFastPathDistributedPlannedStmt(
DistributedPlanningContext *planContext);
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
DistributedPlanningContext DistributedPlanningContext
*planContext); *planContext);
@ -135,13 +139,13 @@ static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
Const *resultFormatConst); Const *resultFormatConst);
static List * OuterPlanParamsList(PlannerInfo *root); static List * OuterPlanParamsList(PlannerInfo *root);
static List * CopyPlanParamList(List *originalPlanParamList); static List * CopyPlanParamList(List *originalPlanParamList);
static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void); static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(
FastPathRestrictionContext *fastPathContext);
static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void); static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
static void PopPlannerRestrictionContext(void); static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext( static void ResetPlannerRestrictionContext(
PlannerRestrictionContext *plannerRestrictionContext); PlannerRestrictionContext *plannerRestrictionContext);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext);
Node *distributionKeyValue);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
int rteIdCounter); int rteIdCounter);
static RTEListProperties * GetRTEListProperties(List *rangeTableList); static RTEListProperties * GetRTEListProperties(List *rangeTableList);
@ -156,6 +160,8 @@ static bool CheckPostPlanDistribution(bool isDistributedQuery,
Query *origQuery, Query *origQuery,
List *rangeTableList, List *rangeTableList,
Query *plannedQuery); Query *plannedQuery);
static DistributedPlan * CreateFastPathDistributedPlan(
DistributedPlanningContext *planContext);
/* Distributed planner hook */ /* Distributed planner hook */
PlannedStmt * PlannedStmt *
@ -166,7 +172,7 @@ distributed_planner(Query *parse,
{ {
bool needsDistributedPlanning = false; bool needsDistributedPlanning = false;
bool fastPathRouterQuery = false; bool fastPathRouterQuery = false;
Node *distributionKeyValue = NULL; FastPathRestrictionContext fastPathContext = { 0 };
List *rangeTableList = ExtractRangeTableEntryList(parse); List *rangeTableList = ExtractRangeTableEntryList(parse);
@ -191,7 +197,8 @@ distributed_planner(Query *parse,
&maybeHasForeignDistributedTable); &maybeHasForeignDistributedTable);
if (needsDistributedPlanning) if (needsDistributedPlanning)
{ {
fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue); fastPathRouterQuery = FastPathRouterQuery(parse, query_string,
&fastPathContext);
if (maybeHasForeignDistributedTable) if (maybeHasForeignDistributedTable)
{ {
@ -248,7 +255,8 @@ distributed_planner(Query *parse,
HideCitusDependentObjectsOnQueriesOfPgMetaTables((Node *) parse, NULL); HideCitusDependentObjectsOnQueriesOfPgMetaTables((Node *) parse, NULL);
/* create a restriction context and put it at the end if context list */ /* create a restriction context and put it at the end if context list */
planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(); planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(
&fastPathContext);
/* /*
* We keep track of how many times we've recursed into the planner, primarily * We keep track of how many times we've recursed into the planner, primarily
@ -264,7 +272,7 @@ distributed_planner(Query *parse,
{ {
if (fastPathRouterQuery) if (fastPathRouterQuery)
{ {
result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue); result = PlanFastPathDistributedStmt(&planContext);
} }
else else
{ {
@ -649,34 +657,23 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan)
* the FastPathPlanner. * the FastPathPlanner.
*/ */
static PlannedStmt * static PlannedStmt *
PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, PlanFastPathDistributedStmt(DistributedPlanningContext *planContext)
Node *distributionKeyValue)
{ {
FastPathRestrictionContext *fastPathContext = FastPathRestrictionContext *fastPathContext =
planContext->plannerRestrictionContext->fastPathRestrictionContext; planContext->plannerRestrictionContext->fastPathRestrictionContext;
planContext->plannerRestrictionContext->fastPathRestrictionContext-> if (!fastPathContext->delayFastPathPlanning)
fastPathRouterQuery = true;
if (distributionKeyValue == NULL)
{ {
/* nothing to record */ planContext->plan = FastPathPlanner(planContext->originalQuery,
} planContext->query,
else if (IsA(distributionKeyValue, Const))
{
fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
}
else if (IsA(distributionKeyValue, Param))
{
fastPathContext->distributionKeyHasParam = true;
}
planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
planContext->boundParams); planContext->boundParams);
return CreateDistributedPlannedStmt(planContext); return CreateDistributedPlannedStmt(planContext);
} }
return CreateFastPathDistributedPlannedStmt(planContext);
}
/* /*
* PlanDistributedStmt creates a distributed planned statement using the PG * PlanDistributedStmt creates a distributed planned statement using the PG
@ -826,6 +823,76 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
} }
static PlannedStmt *
CreateFastPathDistributedPlannedStmt(DistributedPlanningContext *planContext)
{
uint64 planId = NextPlanId++;
bool hasUnresolvedParams = false; /* todo: need to assign this from CreateFastPathDistributedPlan() output */
PlannedStmt *resultPlan = NULL;
DistributedPlan *distributedPlan = CreateFastPathDistributedPlan(planContext);
/*
* If no plan was generated, prepare a generic error to be emitted.
* Normally this error message will never returned to the user, as it's
* usually due to unresolved prepared statement parameters - in that case
* the logic below will force a custom plan (i.e. with parameters bound to
* specific values) to be generated. But sql (not plpgsql) functions
* unfortunately don't go through a codepath supporting custom plans - so
* we still need to have an error prepared.
*/
if (!distributedPlan)
{
/* currently always should have a more specific error otherwise */
distributedPlan = CitusMakeNode(DistributedPlan);
distributedPlan->planningError =
DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"could not create distributed plan",
"Possibly this is caused by the use of parameters in SQL "
"functions, which is not supported in Citus.",
"Consider using PL/pgSQL functions instead.");
}
/*
* Error out if none of the planners resulted in a usable plan, unless the
* error was possibly triggered by missing parameters. In that case we'll
* not error out here, but instead rely on postgres' custom plan logic.
* Postgres re-plans prepared statements the first five executions
* (i.e. it produces custom plans), after that the cost of a generic plan
* is compared with the average custom plan cost. We support otherwise
* unsupported prepared statement parameters by assigning an exorbitant
* cost to the unsupported query. That'll lead to the custom plan being
* chosen. But for that to be possible we can't error out here, as
* otherwise that logic is never reached.
*/
if (distributedPlan->planningError && !hasUnresolvedParams)
{
RaiseDeferredError(distributedPlan->planningError, ERROR);
}
/* remember the plan's identifier for identifying subplans */
distributedPlan->planId = planId;
/* create final plan by combining local plan with distributed plan. todo: FIXME */
resultPlan = FinalizePlan(planContext->plan, distributedPlan);
/*
* As explained above, force planning costs to be unrealistically high if
* query planning failed (possibly) due to prepared statement parameters or
* if it is planned as a multi shard modify query.
*/
if ((distributedPlan->planningError ||
(UpdateOrDeleteOrMergeQuery(planContext->originalQuery) && IsMultiTaskPlan(
distributedPlan))) &&
hasUnresolvedParams)
{
DissuadePlannerFromUsingPlan(resultPlan);
}
return resultPlan;
}
/* /*
* InlineCtesAndCreateDistributedPlannedStmt gets all the parameters required * InlineCtesAndCreateDistributedPlannedStmt gets all the parameters required
* for creating a distributed planned statement. The function is primarily a * for creating a distributed planned statement. The function is primarily a
@ -1225,6 +1292,95 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
} }
static DistributedPlan *
CreateFastPathDistributedPlan(DistributedPlanningContext *planContext)
{
DistributedPlan *distributedPlan = NULL;
Query *originalQuery = planContext->originalQuery;
Query *query = planContext->query;
ParamListInfo boundParams = planContext->boundParams;
PlannerRestrictionContext *plannerRestrictionContext =
planContext->plannerRestrictionContext;
bool hasUnresolvedParams = false;
if (HasUnresolvedExternParamsWalker((Node *) originalQuery,
boundParams))
{
hasUnresolvedParams = true;
}
/* Router planner only */
RouterPlanType routerPlan = GetRouterPlanType(query, originalQuery,
hasUnresolvedParams);
switch (routerPlan)
{
case DML_QUERY:
case SELECT_QUERY:
{
/*
* modifications and selects are handled via the same fast path routing.
*/
distributedPlan =
CreateFastPathRouterPlan(planContext);
break;
}
case INSERT_SELECT_INTO_CITUS_TABLE:
case INSERT_SELECT_INTO_LOCAL_TABLE:
case MERGE_QUERY:
case REPLAN_WITH_BOUND_PARAMETERS:
{
/*
* Unexpected plan type - error out for now.
*/
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"unexpected plan type for fast path single shard planning")));
}
}
/* the functions above always return a plan, possibly with an error */
Assert(distributedPlan);
if (distributedPlan->planningError == NULL)
{
return distributedPlan;
}
else
{
RaiseDeferredError(distributedPlan->planningError, DEBUG2);
}
if (hasUnresolvedParams)
{
/*
* There are parameters that don't have a value in boundParams.
*
* The remainder of the planning logic cannot handle unbound
* parameters. We return a NULL plan, which will have an
* extremely high cost, such that postgres will replan with
* bound parameters.
*/
return NULL;
}
/* force evaluation of bound params */
boundParams = copyParamList(boundParams);
/*
* If there are parameters that do have a value in boundParams, replace
* them in the original query. This allows us to more easily cut the
* query into pieces (during recursive planning) or deparse parts of
* the query (during subquery pushdown planning).
*/
originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery,
boundParams);
Assert(originalQuery != NULL);
return distributedPlan;
}
/* /*
* EnsurePartitionTableNotReplicated errors out if the input relation is * EnsurePartitionTableNotReplicated errors out if the input relation is
* a partition table and the table has a replication factor greater than * a partition table and the table has a replication factor greater than
@ -1377,7 +1533,7 @@ GetDistributedPlan(CustomScan *customScan)
Node *node = (Node *) linitial(customScan->custom_private); Node *node = (Node *) linitial(customScan->custom_private);
Assert(CitusIsA(node, DistributedPlan)); Assert(CitusIsA(node, DistributedPlan));
CheckNodeCopyAndSerialization(node); /* CheckNodeCopyAndSerialization(node); commented out for local perf profiling */
DistributedPlan *distributedPlan = (DistributedPlan *) node; DistributedPlan *distributedPlan = (DistributedPlan *) node;
@ -2413,7 +2569,8 @@ CopyPlanParamList(List *originalPlanParamList)
* inserted to the beginning of the plannerRestrictionContextList and it is returned. * inserted to the beginning of the plannerRestrictionContextList and it is returned.
*/ */
static PlannerRestrictionContext * static PlannerRestrictionContext *
CreateAndPushPlannerRestrictionContext(void) CreateAndPushPlannerRestrictionContext(
FastPathRestrictionContext *fastPathRestrictionContext)
{ {
PlannerRestrictionContext *plannerRestrictionContext = PlannerRestrictionContext *plannerRestrictionContext =
palloc0(sizeof(PlannerRestrictionContext)); palloc0(sizeof(PlannerRestrictionContext));
@ -2427,6 +2584,14 @@ CreateAndPushPlannerRestrictionContext(void)
plannerRestrictionContext->fastPathRestrictionContext = plannerRestrictionContext->fastPathRestrictionContext =
palloc0(sizeof(FastPathRestrictionContext)); palloc0(sizeof(FastPathRestrictionContext));
if (fastPathRestrictionContext != NULL)
{
/* copy the fast path restriction context */
memcpy(plannerRestrictionContext->fastPathRestrictionContext,
fastPathRestrictionContext,
sizeof(FastPathRestrictionContext));
}
plannerRestrictionContext->memoryContext = CurrentMemoryContext; plannerRestrictionContext->memoryContext = CurrentMemoryContext;
/* we'll apply logical AND as we add tables */ /* we'll apply logical AND as we add tables */

View File

@ -53,6 +53,7 @@
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
bool EnableFastPathRouterPlanner = true; bool EnableFastPathRouterPlanner = true;
bool EnableSingShardFastPathPOC = true;
static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
@ -112,10 +113,6 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
Plan *plan = &scanNode->plan; Plan *plan = &scanNode->plan;
#endif #endif
Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
Assert(FastPathRouterQuery(parse, &distKey));
/* there is only a single relation rte */ /* there is only a single relation rte */
#if PG_VERSION_NUM >= PG_VERSION_16 #if PG_VERSION_NUM >= PG_VERSION_16
scanNode->scan.scanrelid = 1; scanNode->scan.scanrelid = 1;
@ -166,10 +163,14 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
* don't have any sublinks/CTEs etc * don't have any sublinks/CTEs etc
*/ */
bool bool
FastPathRouterQuery(Query *query, Node **distributionKeyValue) FastPathRouterQuery(Query *query, const char *query_string,
FastPathRestrictionContext *fastPathContext)
{ {
FromExpr *joinTree = query->jointree; FromExpr *joinTree = query->jointree;
Node *quals = NULL; Node *quals = NULL;
bool isFastPath = false;
bool isDistributedTable = false;
Node *distributionKeyValue = NULL;
if (!EnableFastPathRouterPlanner) if (!EnableFastPathRouterPlanner)
{ {
@ -201,7 +202,9 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
else if (query->commandType == CMD_INSERT) else if (query->commandType == CMD_INSERT)
{ {
/* we don't need to do any further checks, all INSERTs are fast-path */ /* we don't need to do any further checks, all INSERTs are fast-path */
return true; isFastPath = true;
isDistributedTable = true;
goto returnFastPath;
} }
/* make sure that the only range table in FROM clause */ /* make sure that the only range table in FROM clause */
@ -232,18 +235,20 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
Var *distributionKey = PartitionColumn(distributedTableId, 1); Var *distributionKey = PartitionColumn(distributedTableId, 1);
if (!distributionKey) if (!distributionKey)
{ {
return true; isFastPath = true;
} }
/* WHERE clause should not be empty for distributed tables */ if (!isFastPath)
if (joinTree == NULL ||
(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals ==
NULL))
{ {
isDistributedTable = IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE);
if (joinTree == NULL ||
(joinTree->quals == NULL && !isDistributedTable))
{
/* no quals, not a fast path query */
return false; return false;
} }
/* convert list of expressions into expression tree for further processing */
quals = joinTree->quals; quals = joinTree->quals;
if (quals != NULL && IsA(quals, List)) if (quals != NULL && IsA(quals, List))
{ {
@ -264,13 +269,40 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
* This is to simplify both of the individual checks and omit various edge cases * This is to simplify both of the individual checks and omit various edge cases
* that might arise with multiple distribution keys in the quals. * that might arise with multiple distribution keys in the quals.
*/ */
if (ConjunctionContainsColumnFilter(quals, distributionKey, distributionKeyValue) && isFastPath = (ConjunctionContainsColumnFilter(quals, distributionKey,
!ColumnAppearsMultipleTimes(quals, distributionKey)) &distributionKeyValue) &&
{ !ColumnAppearsMultipleTimes(quals, distributionKey));
return true;
} }
return false; returnFastPath:
if (isFastPath)
{
fastPathContext->fastPathRouterQuery = true;
if (distributionKeyValue == NULL)
{
/* nothing to record */
}
else if (IsA(distributionKeyValue, Const))
{
fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
}
else if (IsA(distributionKeyValue, Param))
{
fastPathContext->distributionKeyHasParam = true;
}
if (EnableSingShardFastPathPOC)
{
fastPathContext->distTableRte = rangeTableEntry;
fastPathContext->delayFastPathPlanning = isDistributedTable; /*&& !fastPathContext->distributionKeyHasParam; */
/* If the dist key is parameterized the query will use the plan cache (todo: verify) */
fastPathContext->clientQueryString = query_string;
}
}
return isFastPath;
} }

View File

@ -20,6 +20,7 @@
#include "catalog/pg_opfamily.h" #include "catalog/pg_opfamily.h"
#include "catalog/pg_proc.h" #include "catalog/pg_proc.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/shard_utils.h"
#include "executor/execdesc.h" #include "executor/execdesc.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
@ -34,6 +35,7 @@
#include "optimizer/pathnode.h" #include "optimizer/pathnode.h"
#include "optimizer/paths.h" #include "optimizer/paths.h"
#include "optimizer/planmain.h" #include "optimizer/planmain.h"
#include "optimizer/planner.h"
#include "optimizer/restrictinfo.h" #include "optimizer/restrictinfo.h"
#include "parser/parse_oper.h" #include "parser/parse_oper.h"
#include "parser/parsetree.h" #include "parser/parsetree.h"
@ -173,7 +175,9 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList); static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList); static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
static bool IsLocallyAccessibleCitusLocalTable(Oid relationId); static bool IsLocallyAccessibleCitusLocalTable(Oid relationId);
static Job * RouterJobFastPath(DistributedPlanningContext *planContext,
DistributedPlan *distributedPlan);
static Query * ReplaceShardRelationId(Query *query, Oid relationID, Oid shardRelationId);
/* /*
* CreateRouterPlan attempts to create a router executor plan for the given * CreateRouterPlan attempts to create a router executor plan for the given
@ -201,6 +205,64 @@ CreateRouterPlan(Query *originalQuery, Query *query,
} }
DistributedPlan *
CreateFastPathRouterPlan(DistributedPlanningContext *planContext)
{
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
Query *query = planContext->query;
bool is_select = (query->commandType == CMD_SELECT);
distributedPlan->planningError = NULL; /* todo: relevant parts of DeferErrorIfUnsupportedRouterPlannableSelectQuery(planContext->query); */
/* Query targetlist does not have nextval */
/* Query -> hasForUpdate AND table replication factor > 1 */
if (!is_select)
{
distributedPlan->planningError = ModifyQuerySupported(query,
planContext->originalQuery,
false,
planContext->
plannerRestrictionContext);
}
if (distributedPlan->planningError == NULL)
{
distributedPlan->modLevel = RowModifyLevelForQuery(query);
Job *job = NULL;
if (is_select || UpdateOrDeleteOrMergeQuery(query))
{
job = RouterJobFastPath(planContext, distributedPlan);
}
else
{
job = RouterInsertJob(planContext->originalQuery);
/* Apply fast path planning :: todo - place earlier in planning */
planContext->plan = FastPathPlanner(planContext->originalQuery,
planContext->query,
planContext->boundParams);
}
if (distributedPlan->planningError == NULL)
{
distributedPlan->workerJob = job;
distributedPlan->combineQuery = NULL;
distributedPlan->expectResults = is_select || query->returningList != NIL;
distributedPlan->targetRelationId = is_select ? InvalidOid :
ResultRelationOidForQuery(query);
}
/* todo: handle the case where planningError is not NULL */
}
distributedPlan->fastPathRouterPlan =
planContext->plannerRestrictionContext->fastPathRestrictionContext->
fastPathRouterQuery;
return distributedPlan;
}
/* /*
* CreateModifyPlan attempts to create a plan for the given modification * CreateModifyPlan attempts to create a plan for the given modification
* statement. If planning fails ->planningError is set to a description of * statement. If planning fails ->planningError is set to a description of
@ -1948,6 +2010,188 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
} }
static Job *
RouterJobFastPath(DistributedPlanningContext *planContext,
DistributedPlan *distributedPlan)
{
FastPathRestrictionContext *fastPathContext =
planContext->plannerRestrictionContext->fastPathRestrictionContext;
/* DeferredErrorMessage *planningError = NULL; todo - check if we need this */
Job *job = NULL;
Assert(fastPathContext->fastPathRouterQuery);
Assert(fastPathContext->delayFastPathPlanning);
if (fastPathContext->distributionKeyHasParam)
{
/*
* Deferred pruning - don't know which shard at this point.
*/
job = CreateJob(planContext->query);
job->deferredPruning = true;
/* Apply fast path planning */
planContext->plan = FastPathPlanner(planContext->originalQuery,
planContext->query,
planContext->boundParams);
return job;
}
Query *originalQuery = planContext->originalQuery;
bool isMultiShardQuery = false, shardsPresent = false,
distTableWithShardKey = false, isLocalExecution = false;
Const *partitionKeyValue = NULL;
Const *distributionKeyValue = fastPathContext->distributionKeyValue;
uint64 shardId = INVALID_SHARD_ID;
int32 localGroupId = -1;
List *shardIntervals =
TargetShardIntervalForFastPathQuery(originalQuery, &isMultiShardQuery,
distributionKeyValue,
&partitionKeyValue);
Assert(!isMultiShardQuery);
Assert(list_length(shardIntervals) == 1);
List *relationShards = RelationShardListForShardIntervalList(shardIntervals,
&shardsPresent);
Assert(shardsPresent);
Assert(list_length(relationShards) == 1);
RelationShard *shard = (RelationShard *) linitial(relationShards);
shardId = shard->shardId;
CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(
fastPathContext->distTableRte->relid);
Assert(cacheEntry != NULL);
Assert(cacheEntry->relationId == shard->relationId);
Assert(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE));
distTableWithShardKey = HasDistributionKeyCacheEntry(cacheEntry);
Assert(distTableWithShardKey);
List *taskPlacementList = CreateTaskPlacementListForShardIntervals(shardIntervals,
true, false,
false);
Assert(list_length(taskPlacementList) == 1);
ShardPlacement *primaryPlacement =
(ShardPlacement *) linitial(taskPlacementList);
Assert(primaryPlacement->shardId == shardId);
Assert(originalQuery->resultRelation == 0 ||
fastPathContext->distTableRte->rtekind != RTE_SUBQUERY);
Assert(shardId != INVALID_SHARD_ID);
if (originalQuery->hasModifyingCTE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"Fast path queries with modifying CTEs are not supported")));
}
job = CreateJob(originalQuery);
job->partitionKeyValue = partitionKeyValue;
job->requiresCoordinatorEvaluation = false; /* todo: assert this is correct */
job->colocationId = TableColocationId(fastPathContext->distTableRte->relid);
TaskType taskType = READ_TASK;
char replicationModel = 0;
if (originalQuery->commandType != CMD_SELECT)
{
taskType = MODIFY_TASK;
replicationModel = cacheEntry->replicationModel;
}
Task *task = CreateTask(taskType);
task->isLocalTableModification = false;
List *relationRowLockList = NIL;
task->taskPlacementList = taskPlacementList;
task->partitionKeyValue = partitionKeyValue;
task->colocationId = job->colocationId;
/* Determine if task is local or remote */
localGroupId = GetLocalGroupId();
isLocalExecution = (primaryPlacement->groupId == localGroupId);
if (isLocalExecution)
{
planContext->query = ReplaceShardRelationId(planContext->query,
shard->relationId,
shardId);
/* Plan the query with the new shard relation id */
/* Save plan in planContext->plan */
planContext->plan = standard_planner(planContext->query, NULL,
planContext->cursorOptions,
planContext->boundParams);
SetTaskQueryPlan(task, planContext->plan);
}
else
{
SetTaskQueryString(task, (char *) fastPathContext->clientQueryString);
/* Call fast path query planner, Save plan in planContext->plan */
planContext->plan = FastPathPlanner(planContext->originalQuery,
planContext->query,
planContext->boundParams);
}
task->anchorShardId = shardId;
task->jobId = job->jobId;
task->relationShardList = relationShards;
task->relationRowLockList = relationRowLockList;
task->replicationModel = replicationModel;
task->parametersInQueryStringResolved = job->parametersInJobQueryResolved;
job->taskList = list_make1(task);
return job;
}
static Query *
ReplaceShardRelationId(Query *query, Oid citusTableOid, Oid shardId)
{
Oid shardRelationId = InvalidOid;
Assert(list_length(query->rtable) == 1);
Assert(list_length(query->rteperminfos) == 1);
RangeTblEntry *rte = (RangeTblEntry *) linitial(query->rtable);
Assert(rte->relid == citusTableOid);
RTEPermissionInfo *rtePermInfo = (RTEPermissionInfo *) linitial(query->rteperminfos);
const char *citusTableName = get_rel_name(citusTableOid);
Assert(citusTableName != NULL);
/* construct shard relation name */
char *shardRelationName = pstrdup(citusTableName);
AppendShardIdToName(&shardRelationName, shardId);
RangeVar shardRangeVar = {
.relname = shardRelationName,
.schemaname = NULL, /* todo - should initialize this ? get_rel_namespace(shardRelationId), */
.inh = rte->inh,
.relpersistence = RELPERSISTENCE_PERMANENT,
};
shardRelationId = RangeVarGetRelidExtended(&shardRangeVar, rte->rellockmode,
0, NULL, NULL); /* todo - use suitable callback for perms check? */
Assert(shardRelationId != InvalidOid);
rte->relid = shardRelationId;
rtePermInfo->relid = shardRelationId;
ListCell *lc = NULL;
foreach(lc, query->targetList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(lc);
if (targetEntry->resorigtbl == citusTableOid)
{
targetEntry->resorigtbl = shardRelationId;
}
}
return query;
}
/* /*
* GenerateSingleShardRouterTaskList is a wrapper around other corresponding task * GenerateSingleShardRouterTaskList is a wrapper around other corresponding task
* list generation functions specific to single shard selects and modifications. * list generation functions specific to single shard selects and modifications.

View File

@ -1366,6 +1366,17 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_single_shard_poc",
gettext_noop("Enables execution shortcuts for single shard "
"queries in the proof of concept mode."),
NULL,
&EnableSingShardFastPathPOC,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.enable_local_execution", "citus.enable_local_execution",
gettext_noop("Enables queries on shards that are local to the current node " gettext_noop("Enables queries on shards that are local to the current node "

View File

@ -287,6 +287,14 @@ CopyTaskQuery(Task *newnode, Task *from)
break; break;
} }
case TASK_QUERY_LOCAL_PLAN:
{
/*COPY_NODE_FIELD(taskQuery.data.localPlan); */
/* This is a local planned statement, so shallow copy is enough */
COPY_SCALAR_FIELD(taskQuery.data.localPlan);
break;
}
default: default:
{ {
break; break;

View File

@ -27,7 +27,9 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query); extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryString(Task *task, char *queryString);
extern void SetTaskQueryStringList(Task *task, List *queryStringList); extern void SetTaskQueryStringList(Task *task, List *queryStringList);
extern void SetTaskQueryPlan(Task *task, PlannedStmt *localPlan);
extern char * TaskQueryString(Task *task); extern char * TaskQueryString(Task *task);
extern PlannedStmt * TaskQueryLocalPlan(Task *task);
extern char * TaskQueryStringAtIndex(Task *task, int index); extern char * TaskQueryStringAtIndex(Task *task, int index);
extern int GetTaskQueryType(Task *task); extern int GetTaskQueryType(Task *task);
extern void AddInsertAliasIfNeeded(Query *query); extern void AddInsertAliasIfNeeded(Query *query);

View File

@ -99,6 +99,12 @@ typedef struct FastPathRestrictionContext
* Set to true when distKey = Param; in the queryTree * Set to true when distKey = Param; in the queryTree
*/ */
bool distributionKeyHasParam; bool distributionKeyHasParam;
bool delayFastPathPlanning; /* Indicates to hold off on callning the fast path planner until its known if the shard is local */
RangeTblEntry *distTableRte; /* Range table entry for the table we're querying */
const char *clientQueryString; /* As passed in by the client */
} FastPathRestrictionContext; } FastPathRestrictionContext;
typedef struct PlannerRestrictionContext typedef struct PlannerRestrictionContext

View File

@ -174,7 +174,8 @@ typedef enum TaskQueryType
TASK_QUERY_NULL, TASK_QUERY_NULL,
TASK_QUERY_TEXT, TASK_QUERY_TEXT,
TASK_QUERY_OBJECT, TASK_QUERY_OBJECT,
TASK_QUERY_TEXT_LIST TASK_QUERY_TEXT_LIST,
TASK_QUERY_LOCAL_PLAN,
} TaskQueryType; } TaskQueryType;
typedef struct TaskQuery typedef struct TaskQuery
@ -219,6 +220,8 @@ typedef struct TaskQuery
* when we want to access each query string. * when we want to access each query string.
*/ */
List *queryStringList; List *queryStringList;
PlannedStmt *localPlan; /* only applies to local tasks */
}data; }data;
}TaskQuery; }TaskQuery;

View File

@ -28,12 +28,15 @@
extern bool EnableRouterExecution; extern bool EnableRouterExecution;
extern bool EnableFastPathRouterPlanner; extern bool EnableFastPathRouterPlanner;
extern bool EnableSingShardFastPathPOC;
extern bool EnableNonColocatedRouterQueryPushdown; extern bool EnableNonColocatedRouterQueryPushdown;
extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query, extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
extern DistributedPlan * CreateFastPathRouterPlan(
DistributedPlanningContext *planContext);
extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query, extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
@ -100,7 +103,8 @@ extern void GenerateSingleShardRouterTaskList(Job *job,
extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo
boundParams); boundParams);
extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue); extern bool FastPathRouterQuery(Query *query, const char *query_string,
FastPathRestrictionContext *fastPathContext);
extern bool JoinConditionIsOnFalse(List *relOptInfo); extern bool JoinConditionIsOnFalse(List *relOptInfo);
extern Oid ResultRelationOidForQuery(Query *query); extern Oid ResultRelationOidForQuery(Query *query);
extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId, extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId,