pull/8035/merge
Colm 2025-06-24 17:53:42 +00:00 committed by GitHub
commit aab8cafd59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 389 additions and 101 deletions

View File

@ -686,7 +686,8 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
relationShardList, relationShardList,
placementList, placementList,
shardId, shardId,
isLocalTableModification); isLocalTableModification,
false);
} }

View File

@ -313,6 +313,7 @@ ExecuteLocalTaskListExtended(List *taskList,
{ {
int taskNumParams = numParams; int taskNumParams = numParams;
Oid *taskParameterTypes = parameterTypes; Oid *taskParameterTypes = parameterTypes;
int taskType = GetTaskQueryType(task);
if (task->parametersInQueryStringResolved) if (task->parametersInQueryStringResolved)
{ {
@ -330,7 +331,7 @@ ExecuteLocalTaskListExtended(List *taskList,
* for concatenated strings, we set queryStringList so that we can access * for concatenated strings, we set queryStringList so that we can access
* each query string. * each query string.
*/ */
if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) if (taskType == TASK_QUERY_TEXT_LIST)
{ {
List *queryStringList = task->taskQuery.data.queryStringList; List *queryStringList = task->taskQuery.data.queryStringList;
totalRowsProcessed += totalRowsProcessed +=
@ -342,22 +343,28 @@ ExecuteLocalTaskListExtended(List *taskList,
continue; continue;
} }
Query *shardQuery = ParseQueryString(TaskQueryString(task), if (taskType != TASK_QUERY_LOCAL_PLAN)
taskParameterTypes, {
taskNumParams); Query *shardQuery = ParseQueryString(TaskQueryString(task),
taskParameterTypes,
taskNumParams);
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
int cursorOptions = CURSOR_OPT_PARALLEL_OK; /*
* Altough the shardQuery is local to this node, we prefer planner()
/* * over standard_planner(). The primary reason for that is Citus itself
* Altough the shardQuery is local to this node, we prefer planner() * is not very tolarent standard_planner() calls that doesn't go through
* over standard_planner(). The primary reason for that is Citus itself * distributed_planner() because of the way that restriction hooks are
* is not very tolarent standard_planner() calls that doesn't go through * implemented. So, let planner to call distributed_planner() which
* distributed_planner() because of the way that restriction hooks are * eventually calls standard_planner().
* implemented. So, let planner to call distributed_planner() which */
* eventually calls standard_planner(). localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo);
*/ }
localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo); else
{
localPlan = TaskQueryLocalPlan(task);
}
} }
char *shardQueryString = NULL; char *shardQueryString = NULL;

View File

@ -439,6 +439,24 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
} }
/*
 * SetTaskQueryPlan attaches a ready-made local plan to the given task.
 *
 * The task's query is tagged as TASK_QUERY_LOCAL_PLAN, the plan pointer is
 * stored as-is (no copy is made here), and the task's query count is set to
 * one. The plan must not be NULL (checked by assertion only).
 */
void
SetTaskQueryPlan(Task *task, PlannedStmt *localPlan)
{
	Assert(localPlan != NULL);

	task->queryCount = 1;
	task->taskQuery.data.localPlan = localPlan;
	task->taskQuery.queryType = TASK_QUERY_LOCAL_PLAN;
}
/*
 * TaskQueryLocalPlan returns the local plan stored in the given task.
 *
 * The task's query type must be TASK_QUERY_LOCAL_PLAN; this is checked by
 * assertion only.
 */
PlannedStmt *
TaskQueryLocalPlan(Task *task)
{
	Assert(task->taskQuery.queryType == TASK_QUERY_LOCAL_PLAN);

	PlannedStmt *storedPlan = task->taskQuery.data.localPlan;

	return storedPlan;
}
/* /*
* DeparseTaskQuery is a general way of deparsing a query based on a task. * DeparseTaskQuery is a general way of deparsing a query based on a task.
*/ */
@ -497,6 +515,8 @@ TaskQueryStringAtIndex(Task *task, int index)
} }
static char *qry_unavailable_msg = "SELECT 'Task query unavailable - optimized away'";
/* /*
* TaskQueryString generates task query string text if missing. * TaskQueryString generates task query string text if missing.
* *
@ -524,6 +544,10 @@ TaskQueryString(Task *task)
{ {
return task->taskQuery.data.queryStringLazy; return task->taskQuery.data.queryStringLazy;
} }
else if (taskQueryType == TASK_QUERY_LOCAL_PLAN)
{
return qry_unavailable_msg;
}
Query *jobQueryReferenceForLazyDeparsing = Query *jobQueryReferenceForLazyDeparsing =
task->taskQuery.data.jobQueryReferenceForLazyDeparsing; task->taskQuery.data.jobQueryReferenceForLazyDeparsing;

View File

@ -135,13 +135,13 @@ static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
Const *resultFormatConst); Const *resultFormatConst);
static List * OuterPlanParamsList(PlannerInfo *root); static List * OuterPlanParamsList(PlannerInfo *root);
static List * CopyPlanParamList(List *originalPlanParamList); static List * CopyPlanParamList(List *originalPlanParamList);
static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void); static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(
FastPathRestrictionContext *fastPathContext);
static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void); static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
static void PopPlannerRestrictionContext(void); static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext( static void ResetPlannerRestrictionContext(
PlannerRestrictionContext *plannerRestrictionContext); PlannerRestrictionContext *plannerRestrictionContext);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext);
Node *distributionKeyValue);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
int rteIdCounter); int rteIdCounter);
static RTEListProperties * GetRTEListProperties(List *rangeTableList); static RTEListProperties * GetRTEListProperties(List *rangeTableList);
@ -166,7 +166,7 @@ distributed_planner(Query *parse,
{ {
bool needsDistributedPlanning = false; bool needsDistributedPlanning = false;
bool fastPathRouterQuery = false; bool fastPathRouterQuery = false;
Node *distributionKeyValue = NULL; FastPathRestrictionContext fastPathContext = { 0 };
List *rangeTableList = ExtractRangeTableEntryList(parse); List *rangeTableList = ExtractRangeTableEntryList(parse);
@ -191,8 +191,7 @@ distributed_planner(Query *parse,
&maybeHasForeignDistributedTable); &maybeHasForeignDistributedTable);
if (needsDistributedPlanning) if (needsDistributedPlanning)
{ {
fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue); fastPathRouterQuery = FastPathRouterQuery(parse, &fastPathContext);
if (maybeHasForeignDistributedTable) if (maybeHasForeignDistributedTable)
{ {
WarnIfListHasForeignDistributedTable(rangeTableList); WarnIfListHasForeignDistributedTable(rangeTableList);
@ -247,8 +246,9 @@ distributed_planner(Query *parse,
*/ */
HideCitusDependentObjectsOnQueriesOfPgMetaTables((Node *) parse, NULL); HideCitusDependentObjectsOnQueriesOfPgMetaTables((Node *) parse, NULL);
/* create a restriction context and put it at the end if context list */ /* create a restriction context and put it at the end of context list */
planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(); planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(
&fastPathContext);
/* /*
* We keep track of how many times we've recursed into the planner, primarily * We keep track of how many times we've recursed into the planner, primarily
@ -264,7 +264,7 @@ distributed_planner(Query *parse,
{ {
if (fastPathRouterQuery) if (fastPathRouterQuery)
{ {
result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue); result = PlanFastPathDistributedStmt(&planContext);
} }
else else
{ {
@ -649,30 +649,17 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan)
* the FastPathPlanner. * the FastPathPlanner.
*/ */
static PlannedStmt * static PlannedStmt *
PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, PlanFastPathDistributedStmt(DistributedPlanningContext *planContext)
Node *distributionKeyValue)
{ {
FastPathRestrictionContext *fastPathContext = FastPathRestrictionContext *fastPathContext =
planContext->plannerRestrictionContext->fastPathRestrictionContext; planContext->plannerRestrictionContext->fastPathRestrictionContext;
planContext->plannerRestrictionContext->fastPathRestrictionContext-> if (!fastPathContext->delayFastPathPlanning)
fastPathRouterQuery = true;
if (distributionKeyValue == NULL)
{ {
/* nothing to record */ planContext->plan = FastPathPlanner(planContext->originalQuery,
planContext->query,
planContext->boundParams);
} }
else if (IsA(distributionKeyValue, Const))
{
fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
}
else if (IsA(distributionKeyValue, Param))
{
fastPathContext->distributionKeyHasParam = true;
}
planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
planContext->boundParams);
return CreateDistributedPlannedStmt(planContext); return CreateDistributedPlannedStmt(planContext);
} }
@ -803,6 +790,8 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
RaiseDeferredError(distributedPlan->planningError, ERROR); RaiseDeferredError(distributedPlan->planningError, ERROR);
} }
CheckAndBuildDelayedFastPathPlan(planContext, distributedPlan);
/* remember the plan's identifier for identifying subplans */ /* remember the plan's identifier for identifying subplans */
distributedPlan->planId = planId; distributedPlan->planId = planId;
@ -2407,13 +2396,15 @@ CopyPlanParamList(List *originalPlanParamList)
/* /*
* CreateAndPushPlannerRestrictionContext creates a new relation restriction context * CreateAndPushPlannerRestrictionContext creates a new planner restriction
* and a new join context, inserts it to the beginning of the * context with an empty relation restriction context and an empty join and
* plannerRestrictionContextList. Finally, the planner restriction context is * a copy of the given fast path restriction context (if present). Finally,
* inserted to the beginning of the plannerRestrictionContextList and it is returned. * the planner restriction context is inserted to the beginning of the
* global plannerRestrictionContextList and it is returned.
*/ */
static PlannerRestrictionContext * static PlannerRestrictionContext *
CreateAndPushPlannerRestrictionContext(void) CreateAndPushPlannerRestrictionContext(
FastPathRestrictionContext *fastPathRestrictionContext)
{ {
PlannerRestrictionContext *plannerRestrictionContext = PlannerRestrictionContext *plannerRestrictionContext =
palloc0(sizeof(PlannerRestrictionContext)); palloc0(sizeof(PlannerRestrictionContext));
@ -2427,6 +2418,14 @@ CreateAndPushPlannerRestrictionContext(void)
plannerRestrictionContext->fastPathRestrictionContext = plannerRestrictionContext->fastPathRestrictionContext =
palloc0(sizeof(FastPathRestrictionContext)); palloc0(sizeof(FastPathRestrictionContext));
if (fastPathRestrictionContext != NULL)
{
/* copy the given fast path restriction context */
memcpy(plannerRestrictionContext->fastPathRestrictionContext,
fastPathRestrictionContext,
sizeof(FastPathRestrictionContext));
}
plannerRestrictionContext->memoryContext = CurrentMemoryContext; plannerRestrictionContext->memoryContext = CurrentMemoryContext;
/* we'll apply logical AND as we add tables */ /* we'll apply logical AND as we add tables */

View File

@ -43,6 +43,7 @@
#include "pg_version_constants.h" #include "pg_version_constants.h"
#include "distributed/citus_clauses.h"
#include "distributed/distributed_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -53,6 +54,7 @@
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
bool EnableFastPathRouterPlanner = true; bool EnableFastPathRouterPlanner = true;
bool EnableFastPathLocalExecutor = true;
static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
@ -112,10 +114,6 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
Plan *plan = &scanNode->plan; Plan *plan = &scanNode->plan;
#endif #endif
Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
Assert(FastPathRouterQuery(parse, &distKey));
/* there is only a single relation rte */ /* there is only a single relation rte */
#if PG_VERSION_NUM >= PG_VERSION_16 #if PG_VERSION_NUM >= PG_VERSION_16
scanNode->scan.scanrelid = 1; scanNode->scan.scanrelid = 1;
@ -152,24 +150,32 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
/* /*
* FastPathRouterQuery gets a query and returns true if the query is eligible for * FastPathRouterQuery gets a query and returns true if the query is eligible for
* being a fast path router query. * being a fast path router query. It also fills the given fastPathContext with
* details about the query such as the distribution key value (if available),
* whether the distribution key is a parameter, and the range table entry for the
* table being queried.
* The requirements for the fast path query can be listed below: * The requirements for the fast path query can be listed below:
* *
* - SELECT/UPDATE/DELETE query without CTES, sublinks-subqueries, set operations * - SELECT/UPDATE/DELETE query without CTES, sublinks-subqueries, set operations
* - The query should touch only a single hash distributed or reference table * - The query should touch only a single hash distributed or reference table
* - The distribution with equality operator should be in the WHERE clause * - The distribution with equality operator should be in the WHERE clause
* and it should be ANDed with any other filters. Also, the distribution * and it should be ANDed with any other filters. Also, the distribution
* key should only exists once in the WHERE clause. So basically, * key should only exist once in the WHERE clause. So basically,
* SELECT ... FROM dist_table WHERE dist_key = X * SELECT ... FROM dist_table WHERE dist_key = X
* If the filter is a const, distributionKeyValue is set * If the filter is a const, distributionKeyValue is set
* - All INSERT statements (including multi-row INSERTs) as long as the commands * - All INSERT statements (including multi-row INSERTs) as long as the commands
* don't have any sublinks/CTEs etc * don't have any sublinks/CTEs etc
* -
*/ */
bool bool
FastPathRouterQuery(Query *query, Node **distributionKeyValue) FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
{ {
FromExpr *joinTree = query->jointree; FromExpr *joinTree = query->jointree;
Node *quals = NULL; Node *quals = NULL;
bool isFastPath = false;
bool canAvoidDeparse = false;
Node *distributionKeyValue = NULL;
RangeTblEntry *rangeTableEntry = NULL;
if (!EnableFastPathRouterPlanner) if (!EnableFastPathRouterPlanner)
{ {
@ -201,7 +207,8 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
else if (query->commandType == CMD_INSERT) else if (query->commandType == CMD_INSERT)
{ {
/* we don't need to do any further checks, all INSERTs are fast-path */ /* we don't need to do any further checks, all INSERTs are fast-path */
return true; isFastPath = true;
goto returnFastPath;
} }
/* make sure that the only range table in FROM clause */ /* make sure that the only range table in FROM clause */
@ -210,7 +217,7 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
return false; return false;
} }
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(query->rtable); rangeTableEntry = (RangeTblEntry *) linitial(query->rtable);
if (rangeTableEntry->rtekind != RTE_RELATION) if (rangeTableEntry->rtekind != RTE_RELATION)
{ {
return false; return false;
@ -232,45 +239,97 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
Var *distributionKey = PartitionColumn(distributedTableId, 1); Var *distributionKey = PartitionColumn(distributedTableId, 1);
if (!distributionKey) if (!distributionKey)
{ {
return true; /* Local execution may avoid a deparse on single shard distributed tables */
canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry,
SINGLE_SHARD_DISTRIBUTED);
isFastPath = true;
} }
/* WHERE clause should not be empty for distributed tables */ if (!isFastPath)
if (joinTree == NULL ||
(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals ==
NULL))
{ {
return false; canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE);
if (joinTree == NULL ||
(joinTree->quals == NULL && !canAvoidDeparse))
{
/* no quals, not a fast path query */
return false;
}
quals = joinTree->quals;
if (quals != NULL && IsA(quals, List))
{
quals = (Node *) make_ands_explicit((List *) quals);
}
/*
* Distribution column must be used in a simple equality match check and it must be
* place at top level conjunction operator. In simple words, we should have
* WHERE dist_key = VALUE [AND ....];
*
* We're also not allowing any other appearances of the distribution key in the quals.
*
* Overall the logic might sound fuzzy since it involves two individual checks:
* (a) Check for top level AND operator with one side being "dist_key = const"
* (b) Only allow single appearance of "dist_key" in the quals
*
* This is to simplify both of the individual checks and omit various edge cases
* that might arise with multiple distribution keys in the quals.
*/
isFastPath = (ConjunctionContainsColumnFilter(quals, distributionKey,
&distributionKeyValue) &&
!ColumnAppearsMultipleTimes(quals, distributionKey));
} }
/* convert list of expressions into expression tree for further processing */ returnFastPath:
quals = joinTree->quals;
if (quals != NULL && IsA(quals, List)) if (isFastPath)
{ {
quals = (Node *) make_ands_explicit((List *) quals); Assert(fastPathContext != NULL);
Assert(!fastPathContext->fastPathRouterQuery);
Assert(!fastPathContext->delayFastPathPlanning);
/*
* We're looking at a fast path query, so we can fill the
* fastPathContext with relevant details.
*/
fastPathContext->fastPathRouterQuery = true;
if (distributionKeyValue == NULL)
{
/* nothing to record */
}
else if (IsA(distributionKeyValue, Const))
{
fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
}
else if (IsA(distributionKeyValue, Param))
{
fastPathContext->distributionKeyHasParam = true;
}
/*
* Note the range table entry for the table we're querying.
*/
Assert(rangeTableEntry != NULL || query->commandType == CMD_INSERT);
fastPathContext->distTableRte = rangeTableEntry;
if (EnableFastPathLocalExecutor)
{
/*
* This fast path query may be executed by the local executor.
* We need to delay the fast path planning until we know if the
* shard is local or not. Make a final check for volatile
* functions in the query tree to determine if we should delay
* the fast path planning.
*/
fastPathContext->delayFastPathPlanning = canAvoidDeparse &&
!FindNodeMatchingCheckFunction(
(Node *) query,
CitusIsVolatileFunction);
}
} }
/* return isFastPath;
* Distribution column must be used in a simple equality match check and it must be
* place at top level conjunction operator. In simple words, we should have
* WHERE dist_key = VALUE [AND ....];
*
* We're also not allowing any other appearances of the distribution key in the quals.
*
* Overall the logic might sound fuzzy since it involves two individual checks:
* (a) Check for top level AND operator with one side being "dist_key = const"
* (b) Only allow single appearance of "dist_key" in the quals
*
* This is to simplify both of the individual checks and omit various edge cases
* that might arise with multiple distribution keys in the quals.
*/
if (ConjunctionContainsColumnFilter(quals, distributionKey, distributionKeyValue) &&
!ColumnAppearsMultipleTimes(quals, distributionKey))
{
return true;
}
return false;
} }

View File

@ -20,6 +20,7 @@
#include "catalog/pg_opfamily.h" #include "catalog/pg_opfamily.h"
#include "catalog/pg_proc.h" #include "catalog/pg_proc.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/shard_utils.h"
#include "executor/execdesc.h" #include "executor/execdesc.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
@ -34,6 +35,7 @@
#include "optimizer/pathnode.h" #include "optimizer/pathnode.h"
#include "optimizer/paths.h" #include "optimizer/paths.h"
#include "optimizer/planmain.h" #include "optimizer/planmain.h"
#include "optimizer/planner.h"
#include "optimizer/restrictinfo.h" #include "optimizer/restrictinfo.h"
#include "parser/parse_oper.h" #include "parser/parse_oper.h"
#include "parser/parsetree.h" #include "parser/parsetree.h"
@ -164,7 +166,7 @@ static List * SingleShardTaskList(Query *query, uint64 jobId,
List *relationShardList, List *placementList, List *relationShardList, List *placementList,
uint64 shardId, bool parametersInQueryResolved, uint64 shardId, bool parametersInQueryResolved,
bool isLocalTableModification, Const *partitionKeyValue, bool isLocalTableModification, Const *partitionKeyValue,
int colocationId); int colocationId, bool delayedFastPath);
static bool RowLocksOnRelations(Node *node, List **rtiLockList); static bool RowLocksOnRelations(Node *node, List **rtiLockList);
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
TaskAssignmentPolicyType TaskAssignmentPolicyType
@ -173,7 +175,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList); static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList); static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
static bool IsLocallyAccessibleCitusLocalTable(Oid relationId); static bool IsLocallyAccessibleCitusLocalTable(Oid relationId);
static Query * ConvertToQueryOnShard(Query *query, Oid relationID, Oid shardRelationId);
/* /*
* CreateRouterPlan attempts to create a router executor plan for the given * CreateRouterPlan attempts to create a router executor plan for the given
@ -1940,7 +1942,9 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
{ {
GenerateSingleShardRouterTaskList(job, relationShardList, GenerateSingleShardRouterTaskList(job, relationShardList,
placementList, shardId, placementList, shardId,
isLocalTableModification); isLocalTableModification,
fastPathRestrictionContext->
delayFastPathPlanning);
} }
job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation; job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation;
@ -1948,6 +1952,134 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
} }
/*
 * CheckAndBuildDelayedFastPathPlan finishes planning for a fast path query
 * whose planning was delayed (delayFastPathPlanning is set in the fast path
 * restriction context). It is a no-op when planning was not delayed.
 *
 * - If shard pruning is deferred for the worker job, the regular fast path
 *   planner is used.
 * - Otherwise the job's single task is inspected. When its first placement
 *   belongs to the local group, the query is rewritten to target the shard,
 *   planned with standard_planner() and the resulting plan is stored in the
 *   task. When it does not, the fast path planner is used and the task gets
 *   its query set after relation names are updated to shard names.
 *
 * In every case the resulting plan is saved in planContext->plan.
 */
void
CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
								 DistributedPlan *plan)
{
	FastPathRestrictionContext *fastPathContext =
		planContext->plannerRestrictionContext->fastPathRestrictionContext;

	if (!fastPathContext->delayFastPathPlanning)
	{
		/* planning was not delayed, nothing left to do */
		return;
	}

	Job *workerJob = plan->workerJob;
	Assert(workerJob != NULL);

	if (workerJob->deferredPruning)
	{
		/* call fast path query planner, save plan in planContext->plan */
		planContext->plan = FastPathPlanner(planContext->originalQuery,
											planContext->query,
											planContext->boundParams);
		return;
	}

	List *taskList = workerJob->taskList;
	Assert(list_length(taskList) == 1);
	Task *singleTask = (Task *) linitial(taskList);

	List *placementList = singleTask->taskPlacementList;
	Assert(list_length(placementList) > 0);

	int32 localGroupId = GetLocalGroupId();
	ShardPlacement *firstPlacement = (ShardPlacement *) linitial(placementList);

	List *relationShardList = singleTask->relationShardList;
	Assert(list_length(relationShardList) == 1);

	if (firstPlacement->groupId != localGroupId)
	{
		/* first placement is not in the local group: use the fast path planner */
		planContext->plan = FastPathPlanner(planContext->originalQuery,
											planContext->query,
											planContext->boundParams);
		UpdateRelationToShardNames((Node *) workerJob->jobQuery, relationShardList);
		SetTaskQueryIfShouldLazyDeparse(singleTask, workerJob->jobQuery);
		return;
	}

	/* first placement is local: point the query at the shard relation */
	ConvertToQueryOnShard(planContext->query,
						  fastPathContext->distTableRte->relid,
						  firstPlacement->shardId);

	/* plan the query with the new shard relation id, save it in planContext->plan */
	planContext->plan = standard_planner(planContext->query, NULL,
										 planContext->cursorOptions,
										 planContext->boundParams);
	SetTaskQueryPlan(singleTask, planContext->plan);

	ereport(DEBUG2, (errmsg("Local plan for fast-path router "
							"query")));
}
/*
 * ConvertToQueryOnShard() converts the given query on a citus table (identified by
 * citusTableOid) to a query on a shard (identified by shardId).
 *
 * The function assumes that the query is a "fast path" query - it has only one
 * RangeTblEntry and one RTEPermissionInfo (both checked by assertion only).
 *
 * The shard relation is looked up by name (citus table name with the shard id
 * appended, in the citus table's schema) and is locked with the same lock mode
 * that the range table entry carries for the citus table. After that:
 *  - target list entries whose resorigtbl is the citus table are pointed at
 *    the shard relation,
 *  - the RangeTblEntry's relid is changed to the shard's relation id,
 *  - the RTEPermissionInfo's relid is changed to the shard's relation id.
 *
 * The query is modified in place and the same pointer is returned.
 *
 * NOTE(review): shardId is declared as Oid here, while other functions in this
 * file carry shard ids as uint64 - confirm that no truncation can occur for
 * the values passed by callers.
 */
static Query *
ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
{
	Assert(list_length(query->rtable) == 1);
	Assert(list_length(query->rteperminfos) == 1);

	RangeTblEntry *citusTableRte = (RangeTblEntry *) linitial(query->rtable);
	Assert(citusTableRte->relid == citusTableOid);

	RTEPermissionInfo *rtePermInfo = (RTEPermissionInfo *) linitial(query->rteperminfos);

	const char *citusTableName = get_rel_name(citusTableOid);
	Assert(citusTableName != NULL);

	/* construct shard relation name */
	char *shardRelationName = pstrdup(citusTableName);
	AppendShardIdToName(&shardRelationName, shardId);

	/* construct the schema name */
	char *schemaName = get_namespace_name(get_rel_namespace(citusTableOid));

	/* now construct a range variable for the shard */
	RangeVar shardRangeVar = {
		.relname = shardRelationName,
		.schemaname = schemaName,
		.inh = citusTableRte->inh,
		.relpersistence = RELPERSISTENCE_PERMANENT,
	};

	/*
	 * Resolve the shard relation before touching the query: every rewrite
	 * below needs the real shard relation id, not InvalidOid. The lookup
	 * also applies the same lock to the shard that was applied to the
	 * citus table.
	 */
	Oid shardRelationId =
		RangeVarGetRelidExtended(&shardRangeVar, citusTableRte->rellockmode,
								 0, NULL, NULL); /* todo - use suitable callback for perms check? */
	Assert(shardRelationId != InvalidOid);

	/* change the target list entries that reference the original citus table's relation id */
	ListCell *lc = NULL;
	foreach(lc, query->targetList)
	{
		TargetEntry *targetEntry = (TargetEntry *) lfirst(lc);
		if (targetEntry->resorigtbl == citusTableOid)
		{
			targetEntry->resorigtbl = shardRelationId;
		}
	}

	citusTableRte->relid = shardRelationId;
	rtePermInfo->relid = shardRelationId;

	return query;
}
/* /*
* GenerateSingleShardRouterTaskList is a wrapper around other corresponding task * GenerateSingleShardRouterTaskList is a wrapper around other corresponding task
* list generation functions specific to single shard selects and modifications. * list generation functions specific to single shard selects and modifications.
@ -1957,7 +2089,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
void void
GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
List *placementList, uint64 shardId, bool List *placementList, uint64 shardId, bool
isLocalTableModification) isLocalTableModification, bool delayedFastPath)
{ {
Query *originalQuery = job->jobQuery; Query *originalQuery = job->jobQuery;
@ -1970,7 +2102,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
shardId, shardId,
job->parametersInJobQueryResolved, job->parametersInJobQueryResolved,
isLocalTableModification, isLocalTableModification,
job->partitionKeyValue, job->colocationId); job->partitionKeyValue, job->colocationId,
delayedFastPath);
/* /*
* Queries to reference tables, or distributed tables with multiple replica's have * Queries to reference tables, or distributed tables with multiple replica's have
@ -2001,7 +2134,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
shardId, shardId,
job->parametersInJobQueryResolved, job->parametersInJobQueryResolved,
isLocalTableModification, isLocalTableModification,
job->partitionKeyValue, job->colocationId); job->partitionKeyValue, job->colocationId,
delayedFastPath);
} }
} }
@ -2096,7 +2230,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
List *placementList, uint64 shardId, List *placementList, uint64 shardId,
bool parametersInQueryResolved, bool parametersInQueryResolved,
bool isLocalTableModification, Const *partitionKeyValue, bool isLocalTableModification, Const *partitionKeyValue,
int colocationId) int colocationId, bool delayedFastPath)
{ {
TaskType taskType = READ_TASK; TaskType taskType = READ_TASK;
char replicationModel = 0; char replicationModel = 0;
@ -2168,7 +2302,10 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
task->taskPlacementList = placementList; task->taskPlacementList = placementList;
task->partitionKeyValue = partitionKeyValue; task->partitionKeyValue = partitionKeyValue;
task->colocationId = colocationId; task->colocationId = colocationId;
SetTaskQueryIfShouldLazyDeparse(task, query); if (!delayedFastPath)
{
SetTaskQueryIfShouldLazyDeparse(task, query);
}
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->jobId = jobId; task->jobId = jobId;
task->relationShardList = relationShardList; task->relationShardList = relationShardList;
@ -2449,10 +2586,15 @@ PlanRouterQuery(Query *originalQuery,
/* /*
* If this is an UPDATE or DELETE query which requires coordinator evaluation, * If this is an UPDATE or DELETE query which requires coordinator evaluation,
* don't try update shard names, and postpone that to execution phase. * don't try update shard names, and postpone that to execution phase. Also, if
* this is a delayed fast path query, we don't update the shard names
* either, as the shard names will be updated in the fast path query planner.
*/ */
bool isUpdateOrDelete = UpdateOrDeleteOrMergeQuery(originalQuery); bool isUpdateOrDelete = UpdateOrDeleteOrMergeQuery(originalQuery);
if (!(isUpdateOrDelete && RequiresCoordinatorEvaluation(originalQuery))) bool delayedFastPath =
plannerRestrictionContext->fastPathRestrictionContext->delayFastPathPlanning;
if (!(isUpdateOrDelete && RequiresCoordinatorEvaluation(originalQuery)) &&
!delayedFastPath)
{ {
UpdateRelationToShardNames((Node *) originalQuery, *relationShardList); UpdateRelationToShardNames((Node *) originalQuery, *relationShardList);
} }

View File

@ -212,6 +212,7 @@ static const char * MaxSharedPoolSizeGucShowHook(void);
static const char * LocalPoolSizeGucShowHook(void); static const char * LocalPoolSizeGucShowHook(void);
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
source); source);
static bool ErrorIfLocalExectionDisabled(bool *newval, void **extra, GucSource source);
static void CitusAuthHook(Port *port, int status); static void CitusAuthHook(Port *port, int status);
static bool IsSuperuser(char *userName); static bool IsSuperuser(char *userName);
static void AdjustDynamicLibraryPathForCdcDecoders(void); static void AdjustDynamicLibraryPathForCdcDecoders(void);
@ -1377,6 +1378,17 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_fast_path_local_execution",
gettext_noop("Enables the planner to avoid a query deparse and planning if "
"the shard is local to the current node."),
NULL,
&EnableFastPathLocalExecutor,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
ErrorIfLocalExectionDisabled, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.enable_local_reference_table_foreign_keys", "citus.enable_local_reference_table_foreign_keys",
gettext_noop("Enables foreign keys from/to local tables"), gettext_noop("Enables foreign keys from/to local tables"),
@ -2802,6 +2814,23 @@ WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source)
} }
/*
 * ErrorIfLocalExectionDisabled is the hook registered for the
 * citus.enable_fast_path_local_execution setting. When the incoming value
 * turns the setting on while EnableLocalExecution is off, it emits a WARNING
 * explaining the dependency and returns false; for every other combination
 * it returns true without reporting anything.
 *
 * The extra and source arguments are not consulted.
 */
static bool
ErrorIfLocalExectionDisabled(bool *newval, void **extra, GucSource source)
{
	/* nothing to complain about unless fast path is being enabled without local execution */
	if (*newval != true || EnableLocalExecution != false)
	{
		return true;
	}

	ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					  errmsg("citus.enable_local_execution must be set in order for "
							 "citus.enable_fast_path_local_execution to be effective.")));

	return false;
}
/* /*
* NoticeIfSubqueryPushdownEnabled prints a notice when a user sets * NoticeIfSubqueryPushdownEnabled prints a notice when a user sets
* citus.subquery_pushdown to ON. It doesn't print the notice if the * citus.subquery_pushdown to ON. It doesn't print the notice if the

View File

@ -287,6 +287,12 @@ CopyTaskQuery(Task *newnode, Task *from)
break; break;
} }
case TASK_QUERY_LOCAL_PLAN:
{
COPY_NODE_FIELD(taskQuery.data.localPlan);
break;
}
default: default:
{ {
break; break;

View File

@ -27,7 +27,9 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query); extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryString(Task *task, char *queryString);
extern void SetTaskQueryStringList(Task *task, List *queryStringList); extern void SetTaskQueryStringList(Task *task, List *queryStringList);
extern void SetTaskQueryPlan(Task *task, PlannedStmt *localPlan);
extern char * TaskQueryString(Task *task); extern char * TaskQueryString(Task *task);
extern PlannedStmt * TaskQueryLocalPlan(Task *task);
extern char * TaskQueryStringAtIndex(Task *task, int index); extern char * TaskQueryStringAtIndex(Task *task, int index);
extern int GetTaskQueryType(Task *task); extern int GetTaskQueryType(Task *task);
extern void AddInsertAliasIfNeeded(Query *query); extern void AddInsertAliasIfNeeded(Query *query);

View File

@ -99,6 +99,17 @@ typedef struct FastPathRestrictionContext
* Set to true when distKey = Param; in the queryTree * Set to true when distKey = Param; in the queryTree
*/ */
bool distributionKeyHasParam; bool distributionKeyHasParam;
/*
* Indicates to hold off on calling the fast path planner until it's
* known if the shard is local
*/
bool delayFastPathPlanning;
/*
* Range table entry for the table we're querying
*/
RangeTblEntry *distTableRte;
} FastPathRestrictionContext; } FastPathRestrictionContext;
typedef struct PlannerRestrictionContext typedef struct PlannerRestrictionContext

View File

@ -174,7 +174,8 @@ typedef enum TaskQueryType
TASK_QUERY_NULL, TASK_QUERY_NULL,
TASK_QUERY_TEXT, TASK_QUERY_TEXT,
TASK_QUERY_OBJECT, TASK_QUERY_OBJECT,
TASK_QUERY_TEXT_LIST TASK_QUERY_TEXT_LIST,
TASK_QUERY_LOCAL_PLAN,
} TaskQueryType; } TaskQueryType;
typedef struct TaskQuery typedef struct TaskQuery
@ -219,6 +220,8 @@ typedef struct TaskQuery
* when we want to access each query string. * when we want to access each query string.
*/ */
List *queryStringList; List *queryStringList;
PlannedStmt *localPlan; /* only applies to local tasks */
}data; }data;
}TaskQuery; }TaskQuery;

View File

@ -28,6 +28,7 @@
extern bool EnableRouterExecution; extern bool EnableRouterExecution;
extern bool EnableFastPathRouterPlanner; extern bool EnableFastPathRouterPlanner;
extern bool EnableFastPathLocalExecutor;
extern bool EnableNonColocatedRouterQueryPushdown; extern bool EnableNonColocatedRouterQueryPushdown;
@ -91,7 +92,8 @@ extern void GenerateSingleShardRouterTaskList(Job *job,
List *relationShardList, List *relationShardList,
List *placementList, List *placementList,
uint64 shardId, uint64 shardId,
bool isLocalTableModification); bool isLocalTableModification,
bool delayedFastPath);
/* /*
* FastPathPlanner is a subset of router planner, that's why we prefer to * FastPathPlanner is a subset of router planner, that's why we prefer to
@ -100,7 +102,8 @@ extern void GenerateSingleShardRouterTaskList(Job *job,
extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo
boundParams); boundParams);
extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue); extern bool FastPathRouterQuery(Query *query,
FastPathRestrictionContext *fastPathContext);
extern bool JoinConditionIsOnFalse(List *relOptInfo); extern bool JoinConditionIsOnFalse(List *relOptInfo);
extern Oid ResultRelationOidForQuery(Query *query); extern Oid ResultRelationOidForQuery(Query *query);
extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId, extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId,
@ -120,5 +123,7 @@ extern Job * RouterJob(Query *originalQuery,
DeferredErrorMessage **planningError); DeferredErrorMessage **planningError);
extern bool ContainsOnlyLocalOrReferenceTables(RTEListProperties *rteProperties); extern bool ContainsOnlyLocalOrReferenceTables(RTEListProperties *rteProperties);
extern RangeTblEntry * ExtractSourceResultRangeTableEntry(Query *query); extern RangeTblEntry * ExtractSourceResultRangeTableEntry(Query *query);
extern void CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
DistributedPlan *plan);
#endif /* MULTI_ROUTER_PLANNER_H */ #endif /* MULTI_ROUTER_PLANNER_H */