diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c
index 5ba60b5ad..3b3b4e1d1 100644
--- a/src/backend/distributed/executor/citus_custom_scan.c
+++ b/src/backend/distributed/executor/citus_custom_scan.c
@@ -686,7 +686,8 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
 									  relationShardList,
 									  placementList,
 									  shardId,
-									  isLocalTableModification);
+									  isLocalTableModification,
+									  false);
 }
diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c
index 18563c763..a20d949eb 100644
--- a/src/backend/distributed/executor/local_executor.c
+++ b/src/backend/distributed/executor/local_executor.c
@@ -313,6 +313,7 @@ ExecuteLocalTaskListExtended(List *taskList,
 	{
 		int taskNumParams = numParams;
 		Oid *taskParameterTypes = parameterTypes;
+		int taskType = GetTaskQueryType(task);
 
 		if (task->parametersInQueryStringResolved)
 		{
@@ -330,7 +331,7 @@ ExecuteLocalTaskListExtended(List *taskList,
 		 * for concatenated strings, we set queryStringList so that we can access
 		 * each query string.
 		 */
-		if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST)
+		if (taskType == TASK_QUERY_TEXT_LIST)
 		{
 			List *queryStringList = task->taskQuery.data.queryStringList;
 			totalRowsProcessed +=
@@ -342,22 +343,28 @@ ExecuteLocalTaskListExtended(List *taskList,
 			continue;
 		}
 
-		Query *shardQuery = ParseQueryString(TaskQueryString(task),
-											 taskParameterTypes,
-											 taskNumParams);
+		if (taskType != TASK_QUERY_LOCAL_PLAN)
+		{
+			Query *shardQuery = ParseQueryString(TaskQueryString(task),
+												 taskParameterTypes,
+												 taskNumParams);
+			int cursorOptions = CURSOR_OPT_PARALLEL_OK;
 
-		int cursorOptions = CURSOR_OPT_PARALLEL_OK;
-
-		/*
-		 * Altough the shardQuery is local to this node, we prefer planner()
-		 * over standard_planner(). The primary reason for that is Citus itself
-		 * is not very tolarent standard_planner() calls that doesn't go through
-		 * distributed_planner() because of the way that restriction hooks are
-		 * implemented. So, let planner to call distributed_planner() which
-		 * eventually calls standard_planner().
-		 */
-		localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo);
+			/*
+			 * Although the shardQuery is local to this node, we prefer planner()
+			 * over standard_planner(). The primary reason is that Citus itself is
+			 * not very tolerant of standard_planner() calls that don't go through
+			 * distributed_planner(), because of the way the restriction hooks are
+			 * implemented. So, let planner() call distributed_planner(), which
+			 * eventually calls standard_planner().
+			 */
+			localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo);
+		}
+		else
+		{
+			localPlan = TaskQueryLocalPlan(task);
+		}
 
 		char *shardQueryString = NULL;
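The executor change above reads the task query type once and, for `TASK_QUERY_LOCAL_PLAN` tasks, reuses the plan the planner already attached instead of re-parsing and re-planning the query text. Below is a minimal, self-contained C sketch of that dispatch; `Task`, `PlannedStmt`, and the helper names are stand-ins for illustration, not the actual Citus API.

```c
#include <stdio.h>

typedef enum { TASK_QUERY_TEXT, TASK_QUERY_TEXT_LIST, TASK_QUERY_LOCAL_PLAN } TaskQueryType;

typedef struct PlannedStmt { const char *description; } PlannedStmt;

typedef struct Task
{
	TaskQueryType queryType;
	const char *queryString;   /* used for TASK_QUERY_TEXT */
	PlannedStmt *localPlan;    /* used for TASK_QUERY_LOCAL_PLAN */
} Task;

static PlannedStmt *
PlanFromQueryText(const char *queryString)
{
	/* stands in for ParseQueryString() followed by planner() */
	static PlannedStmt freshPlan = { "freshly planned from text" };
	printf("parsing and planning: %s\n", queryString);
	return &freshPlan;
}

static PlannedStmt *
PlanForTask(Task *task)
{
	/* cache the type once, as the patch does with GetTaskQueryType(task) */
	TaskQueryType taskType = task->queryType;

	if (taskType != TASK_QUERY_LOCAL_PLAN)
	{
		return PlanFromQueryText(task->queryString);
	}

	/* fast path: the planner already attached a local plan to the task */
	return task->localPlan;
}

int
main(void)
{
	PlannedStmt cached = { "cached local plan" };
	Task textTask = { TASK_QUERY_TEXT, "SELECT 1", NULL };
	Task planTask = { TASK_QUERY_LOCAL_PLAN, NULL, &cached };

	printf("%s\n", PlanForTask(&textTask)->description);
	printf("%s\n", PlanForTask(&planTask)->description);
	return 0;
}
```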
diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c
index 2542d931a..c889e00ad 100644
--- a/src/backend/distributed/planner/deparse_shard_query.c
+++ b/src/backend/distributed/planner/deparse_shard_query.c
@@ -439,6 +439,24 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
 }
 
 
+/*
+ * SetTaskQueryPlan attaches a locally planned statement to the task and
+ * marks the task query type as TASK_QUERY_LOCAL_PLAN.
+ */
+void
+SetTaskQueryPlan(Task *task, PlannedStmt *localPlan)
+{
+	Assert(localPlan != NULL);
+	task->taskQuery.queryType = TASK_QUERY_LOCAL_PLAN;
+	task->taskQuery.data.localPlan = localPlan;
+	task->queryCount = 1;
+}
+
+
+/*
+ * TaskQueryLocalPlan returns the local plan previously attached to the
+ * task with SetTaskQueryPlan.
+ */
+PlannedStmt *
+TaskQueryLocalPlan(Task *task)
+{
+	Assert(task->taskQuery.queryType == TASK_QUERY_LOCAL_PLAN);
+	return task->taskQuery.data.localPlan;
+}
+
+
 /*
  * DeparseTaskQuery is a general way of deparsing a query based on a task.
  */
@@ -497,6 +515,8 @@ TaskQueryStringAtIndex(Task *task, int index)
 }
 
 
+/* placeholder returned when a task carries a local plan instead of query text */
+static char *queryUnavailableMessage = "SELECT 'Task query unavailable - optimized away'";
+
 /*
  * TaskQueryString generates task query string text if missing.
  *
@@ -524,6 +544,10 @@ TaskQueryString(Task *task)
 	{
 		return task->taskQuery.data.queryStringLazy;
 	}
+	else if (taskQueryType == TASK_QUERY_LOCAL_PLAN)
+	{
+		return queryUnavailableMessage;
+	}
 
 	Query *jobQueryReferenceForLazyDeparsing =
 		task->taskQuery.data.jobQueryReferenceForLazyDeparsing;
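`SetTaskQueryPlan()` and `TaskQueryLocalPlan()` follow the tagged-union discipline used throughout `TaskQuery`: the tag is written together with the union member, and the getter asserts the tag before reading. A standalone sketch of the pattern, with stand-in types:

```c
#include <assert.h>
#include <stdio.h>

typedef enum { TASK_QUERY_NULL, TASK_QUERY_TEXT, TASK_QUERY_LOCAL_PLAN } TaskQueryType;
typedef struct PlannedStmt { int planId; } PlannedStmt;

typedef struct TaskQuery
{
	TaskQueryType queryType;
	union
	{
		char *queryStringLazy;
		PlannedStmt *localPlan;
	} data;
} TaskQuery;

static void
SetLocalPlan(TaskQuery *taskQuery, PlannedStmt *localPlan)
{
	assert(localPlan != NULL);
	taskQuery->queryType = TASK_QUERY_LOCAL_PLAN;   /* tag and value change together */
	taskQuery->data.localPlan = localPlan;
}

static PlannedStmt *
GetLocalPlan(TaskQuery *taskQuery)
{
	/* the tag guards every read of the union */
	assert(taskQuery->queryType == TASK_QUERY_LOCAL_PLAN);
	return taskQuery->data.localPlan;
}

int
main(void)
{
	TaskQuery taskQuery = { TASK_QUERY_NULL, { NULL } };
	PlannedStmt plan = { 42 };

	SetLocalPlan(&taskQuery, &plan);
	printf("plan id: %d\n", GetLocalPlan(&taskQuery)->planId);
	return 0;
}
```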
diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c
index 193e2f250..bcc09eede 100644
--- a/src/backend/distributed/planner/distributed_planner.c
+++ b/src/backend/distributed/planner/distributed_planner.c
@@ -135,13 +135,13 @@ static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
 													  Const *resultFormatConst);
 static List * OuterPlanParamsList(PlannerInfo *root);
 static List * CopyPlanParamList(List *originalPlanParamList);
-static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
+static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(
+	FastPathRestrictionContext *fastPathContext);
 static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
 static void PopPlannerRestrictionContext(void);
 static void ResetPlannerRestrictionContext(
 	PlannerRestrictionContext *plannerRestrictionContext);
-static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
-												 Node *distributionKeyValue);
+static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext);
 static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
 										 int rteIdCounter);
 static RTEListProperties * GetRTEListProperties(List *rangeTableList);
@@ -166,7 +166,7 @@ distributed_planner(Query *parse,
 {
 	bool needsDistributedPlanning = false;
 	bool fastPathRouterQuery = false;
-	Node *distributionKeyValue = NULL;
+	FastPathRestrictionContext fastPathContext = { 0 };
 
 	List *rangeTableList = ExtractRangeTableEntryList(parse);
 
@@ -191,8 +191,7 @@ distributed_planner(Query *parse,
 											&maybeHasForeignDistributedTable);
 	if (needsDistributedPlanning)
 	{
-		fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
-
+		fastPathRouterQuery = FastPathRouterQuery(parse, &fastPathContext);
 		if (maybeHasForeignDistributedTable)
 		{
 			WarnIfListHasForeignDistributedTable(rangeTableList);
@@ -247,8 +246,9 @@ distributed_planner(Query *parse,
 	 */
 	HideCitusDependentObjectsOnQueriesOfPgMetaTables((Node *) parse, NULL);
 
-	/* create a restriction context and put it at the end if context list */
-	planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
+	/* create a restriction context and put it at the end of the context list */
+	planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(
+		&fastPathContext);
 
 	/*
 	 * We keep track of how many times we've recursed into the planner, primarily
@@ -264,7 +264,7 @@ distributed_planner(Query *parse,
 	{
 		if (fastPathRouterQuery)
 		{
-			result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue);
+			result = PlanFastPathDistributedStmt(&planContext);
 		}
 		else
 		{
@@ -649,30 +649,17 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan)
 * the FastPathPlanner.
 */
 static PlannedStmt *
-PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
-							Node *distributionKeyValue)
+PlanFastPathDistributedStmt(DistributedPlanningContext *planContext)
 {
 	FastPathRestrictionContext *fastPathContext =
 		planContext->plannerRestrictionContext->fastPathRestrictionContext;
 
-	planContext->plannerRestrictionContext->fastPathRestrictionContext->
-	fastPathRouterQuery = true;
-
-	if (distributionKeyValue == NULL)
+	if (!fastPathContext->delayFastPathPlanning)
 	{
-		/* nothing to record */
+		planContext->plan = FastPathPlanner(planContext->originalQuery,
+											planContext->query,
+											planContext->boundParams);
 	}
-	else if (IsA(distributionKeyValue, Const))
-	{
-		fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
-	}
-	else if (IsA(distributionKeyValue, Param))
-	{
-		fastPathContext->distributionKeyHasParam = true;
-	}
-
-	planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
-										planContext->boundParams);
 
 	return CreateDistributedPlannedStmt(planContext);
 }
@@ -803,6 +790,8 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
 		RaiseDeferredError(distributedPlan->planningError, ERROR);
 	}
 
+	CheckAndBuildDelayedFastPathPlan(planContext, distributedPlan);
+
 	/* remember the plan's identifier for identifying subplans */
 	distributedPlan->planId = planId;
 
@@ -2407,13 +2396,15 @@ CopyPlanParamList(List *originalPlanParamList)
 
 
 /*
- * CreateAndPushPlannerRestrictionContext creates a new relation restriction context
- * and a new join context, inserts it to the beginning of the
- * plannerRestrictionContextList. Finally, the planner restriction context is
- * inserted to the beginning of the plannerRestrictionContextList and it is returned.
+ * CreateAndPushPlannerRestrictionContext creates a new planner restriction
+ * context with an empty relation restriction context, an empty join
+ * restriction context, and a copy of the given fast path restriction context
+ * (if present). Finally, the planner restriction context is inserted at the
+ * beginning of the global plannerRestrictionContextList and returned.
 */
 static PlannerRestrictionContext *
-CreateAndPushPlannerRestrictionContext(void)
+CreateAndPushPlannerRestrictionContext(
+	FastPathRestrictionContext *fastPathRestrictionContext)
 {
 	PlannerRestrictionContext *plannerRestrictionContext =
 		palloc0(sizeof(PlannerRestrictionContext));
@@ -2427,6 +2418,14 @@ CreateAndPushPlannerRestrictionContext(void)
 	plannerRestrictionContext->fastPathRestrictionContext =
 		palloc0(sizeof(FastPathRestrictionContext));
 
+	if (fastPathRestrictionContext != NULL)
+	{
+		/* copy the given fast path restriction context */
+		memcpy(plannerRestrictionContext->fastPathRestrictionContext,
+			   fastPathRestrictionContext,
+			   sizeof(FastPathRestrictionContext));
+	}
+
 	plannerRestrictionContext->memoryContext = CurrentMemoryContext;
 
 	/* we'll apply logical AND as we add tables */
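The planner change keeps the fast path details in a stack-local `FastPathRestrictionContext` and copies it into the freshly allocated context that gets pushed, so the stack value can safely go out of scope. A standalone sketch of that step, with `calloc` standing in for `palloc0` and stand-in struct fields:

```c
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct FastPathRestrictionContext
{
	bool fastPathRouterQuery;
	bool delayFastPathPlanning;
} FastPathRestrictionContext;

typedef struct PlannerRestrictionContext
{
	FastPathRestrictionContext *fastPathRestrictionContext;
} PlannerRestrictionContext;

static PlannerRestrictionContext *
CreateAndPushContext(FastPathRestrictionContext *fastPathContext)
{
	PlannerRestrictionContext *context = calloc(1, sizeof(PlannerRestrictionContext));
	context->fastPathRestrictionContext = calloc(1, sizeof(FastPathRestrictionContext));

	if (fastPathContext != NULL)
	{
		/* a flat struct, so a single memcpy copies every field */
		memcpy(context->fastPathRestrictionContext, fastPathContext,
			   sizeof(FastPathRestrictionContext));
	}

	return context;
}

int
main(void)
{
	FastPathRestrictionContext onStack = { .fastPathRouterQuery = true,
										   .delayFastPathPlanning = true };
	PlannerRestrictionContext *pushed = CreateAndPushContext(&onStack);

	/* the pushed copy lives on independently of the caller's stack variable */
	printf("fastPath=%d delay=%d\n",
		   pushed->fastPathRestrictionContext->fastPathRouterQuery,
		   pushed->fastPathRestrictionContext->delayFastPathPlanning);
	return 0;
}
```

Note that the copy is shallow: pointer members of the real context (`distributionKeyValue`, `distTableRte`) remain shared with the query tree they point into, which is the intent here.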
diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c
index 59f80bb40..c0ec05e89 100644
--- a/src/backend/distributed/planner/fast_path_router_planner.c
+++ b/src/backend/distributed/planner/fast_path_router_planner.c
@@ -43,6 +43,7 @@
 
 #include "pg_version_constants.h"
 
+#include "distributed/citus_clauses.h"
 #include "distributed/distributed_planner.h"
 #include "distributed/insert_select_planner.h"
 #include "distributed/metadata_cache.h"
@@ -53,6 +54,7 @@
 #include "distributed/shardinterval_utils.h"
 
 bool EnableFastPathRouterPlanner = true;
+bool EnableFastPathLocalExecutor = true;
 
 static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
 static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
@@ -112,10 +114,6 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
 	Plan *plan = &scanNode->plan;
 #endif
 
-	Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
-
-	Assert(FastPathRouterQuery(parse, &distKey));
-
 	/* there is only a single relation rte */
 #if PG_VERSION_NUM >= PG_VERSION_16
 	scanNode->scan.scanrelid = 1;
@@ -152,24 +150,32 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
 
 /*
 * FastPathRouterQuery gets a query and returns true if the query is eligible for
- * being a fast path router query.
+ * being a fast path router query. It also fills the given fastPathContext with
+ * details about the query, such as the distribution key value (if available),
+ * whether the distribution key is a parameter, and the range table entry for
+ * the table being queried.
 * The requirements for the fast path query can be listed below:
 *
 *   - SELECT/UPDATE/DELETE query without CTES, sublinks-subqueries, set operations
 *   - The query should touch only a single hash distributed or reference table
 *   - The distribution with equality operator should be in the WHERE clause
 *     and it should be ANDed with any other filters. Also, the distribution
-*     key should only exists once in the WHERE clause. So basically,
+*     key should only exist once in the WHERE clause. So basically,
 *         SELECT ... FROM dist_table WHERE dist_key = X
 *     If the filter is a const, distributionKeyValue is set
 *   - All INSERT statements (including multi-row INSERTs) as long as the commands
 *     don't have any sublinks/CTEs etc
 */
 bool
-FastPathRouterQuery(Query *query, Node **distributionKeyValue)
+FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
 {
 	FromExpr *joinTree = query->jointree;
 	Node *quals = NULL;
+	bool isFastPath = false;
+	bool canAvoidDeparse = false;
+	Node *distributionKeyValue = NULL;
+	RangeTblEntry *rangeTableEntry = NULL;
 
 	if (!EnableFastPathRouterPlanner)
 	{
@@ -201,7 +207,8 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
 	else if (query->commandType == CMD_INSERT)
 	{
 		/* we don't need to do any further checks, all INSERTs are fast-path */
-		return true;
+		isFastPath = true;
+		goto returnFastPath;
 	}
 
 	/* make sure that the only range table in FROM clause */
@@ -210,7 +217,7 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
 		return false;
 	}
 
-	RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(query->rtable);
+	rangeTableEntry = (RangeTblEntry *) linitial(query->rtable);
 	if (rangeTableEntry->rtekind != RTE_RELATION)
 	{
 		return false;
 	}
@@ -232,45 +239,97 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
 	Var *distributionKey = PartitionColumn(distributedTableId, 1);
 	if (!distributionKey)
 	{
-		return true;
+		/* Local execution may avoid a deparse on single shard distributed tables */
+		canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry,
													 SINGLE_SHARD_DISTRIBUTED);
+		isFastPath = true;
 	}
 
-	/* WHERE clause should not be empty for distributed tables */
-	if (joinTree == NULL ||
-		(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals ==
-		 NULL))
+	if (!isFastPath)
 	{
-		return false;
+		canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE);
+
+		if (joinTree == NULL ||
+			(joinTree->quals == NULL && !canAvoidDeparse))
+		{
+			/* no quals, not a fast path query */
+			return false;
+		}
+
+		/* convert list of expressions into expression tree for further processing */
+		quals = joinTree->quals;
+		if (quals != NULL && IsA(quals, List))
+		{
+			quals = (Node *) make_ands_explicit((List *) quals);
+		}
+
+		/*
+		 * Distribution column must be used in a simple equality match check and it
+		 * must be placed at the top level conjunction operator. In simple words, we
+		 * should have
+		 *	    WHERE dist_key = VALUE [AND ....];
+		 *
+		 * We're also not allowing any other appearances of the distribution key in
+		 * the quals.
+		 *
+		 * Overall the logic might sound fuzzy since it involves two individual checks:
+		 *     (a) Check for top level AND operator with one side being "dist_key = const"
+		 *     (b) Only allow single appearance of "dist_key" in the quals
+		 *
+		 * This is to simplify both of the individual checks and omit various edge cases
		 * that might arise with multiple distribution keys in the quals.
+		 */
+		isFastPath = (ConjunctionContainsColumnFilter(quals, distributionKey,
+													  &distributionKeyValue) &&
+					  !ColumnAppearsMultipleTimes(quals, distributionKey));
 	}
 
-	/* convert list of expressions into expression tree for further processing */
-	quals = joinTree->quals;
-	if (quals != NULL && IsA(quals, List))
+returnFastPath:
+
+	if (isFastPath)
 	{
-		quals = (Node *) make_ands_explicit((List *) quals);
+		Assert(fastPathContext != NULL);
+		Assert(!fastPathContext->fastPathRouterQuery);
+		Assert(!fastPathContext->delayFastPathPlanning);
+
+		/*
+		 * We're looking at a fast path query, so we can fill the
+		 * fastPathContext with relevant details.
+		 */
+		fastPathContext->fastPathRouterQuery = true;
+		if (distributionKeyValue == NULL)
+		{
+			/* nothing to record */
+		}
+		else if (IsA(distributionKeyValue, Const))
+		{
+			fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
+		}
+		else if (IsA(distributionKeyValue, Param))
+		{
+			fastPathContext->distributionKeyHasParam = true;
+		}
+
+		/*
+		 * Note the range table entry for the table we're querying.
+		 */
+		Assert(rangeTableEntry != NULL || query->commandType == CMD_INSERT);
+		fastPathContext->distTableRte = rangeTableEntry;
+
+		if (EnableFastPathLocalExecutor)
+		{
+			/*
+			 * This fast path query may be executed by the local executor.
+			 * We need to delay the fast path planning until we know if the
+			 * shard is local or not. Make a final check for volatile
+			 * functions in the query tree to determine if we should delay
+			 * the fast path planning.
+			 */
+			fastPathContext->delayFastPathPlanning = canAvoidDeparse &&
+													 !FindNodeMatchingCheckFunction(
														 (Node *) query,
														 CitusIsVolatileFunction);
+		}
	}
 
-	/*
-	 * Distribution column must be used in a simple equality match check and it must be
-	 * place at top level conjunction operator. In simple words, we should have
-	 *	    WHERE dist_key = VALUE [AND ....];
-	 *
-	 * We're also not allowing any other appearances of the distribution key in the quals.
-	 *
-	 * Overall the logic might sound fuzzy since it involves two individual checks:
-	 *     (a) Check for top level AND operator with one side being "dist_key = const"
-	 *     (b) Only allow single appearance of "dist_key" in the quals
-	 *
-	 * This is to simplify both of the individual checks and omit various edge cases
-	 * that might arise with multiple distribution keys in the quals.
-	 */
-	if (ConjunctionContainsColumnFilter(quals, distributionKey, distributionKeyValue) &&
-		!ColumnAppearsMultipleTimes(quals, distributionKey))
-	{
-		return true;
-	}
-
-	return false;
+	return isFastPath;
 }
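The reworked `FastPathRouterQuery()` now has a single exit that fills the out-context, reached directly via `goto returnFastPath` for the INSERT early-accept. A compact standalone sketch of that control flow; the predicate parameters are stand-ins for the checks the real function performs:

```c
#include <stdbool.h>
#include <stdio.h>

typedef struct FastPathContext
{
	bool fastPathRouterQuery;
	bool delayFastPathPlanning;
} FastPathContext;

static bool
ClassifyQuery(bool isInsert, bool hasDistKeyFilter, bool canAvoidDeparse,
			  bool hasVolatileFunctions, FastPathContext *context)
{
	bool isFastPath = false;

	if (isInsert)
	{
		/* all INSERTs are fast path; skip the filter checks entirely */
		isFastPath = true;
		goto returnFastPath;
	}

	isFastPath = hasDistKeyFilter;

returnFastPath:
	if (isFastPath)
	{
		/* the context is populated in exactly one place */
		context->fastPathRouterQuery = true;

		/* defer planning only when skipping the deparse is known to be safe */
		context->delayFastPathPlanning = canAvoidDeparse && !hasVolatileFunctions;
	}
	return isFastPath;
}

int
main(void)
{
	FastPathContext context = { 0 };
	if (ClassifyQuery(false, true, true, false, &context))
	{
		printf("fast path, delay planning: %d\n", context.delayFastPathPlanning);
	}
	return 0;
}
```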
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 19d386343..6ab9b3601 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -20,6 +20,7 @@
 #include "catalog/pg_opfamily.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "distributed/shard_utils.h"
 #include "executor/execdesc.h"
 #include "lib/stringinfo.h"
 #include "nodes/makefuncs.h"
@@ -34,6 +35,7 @@
 #include "optimizer/pathnode.h"
 #include "optimizer/paths.h"
 #include "optimizer/planmain.h"
+#include "optimizer/planner.h"
 #include "optimizer/restrictinfo.h"
 #include "parser/parse_oper.h"
 #include "parser/parsetree.h"
@@ -164,7 +166,7 @@ static List * SingleShardTaskList(Query *query, uint64 jobId,
 								  List *relationShardList, List *placementList,
 								  uint64 shardId, bool parametersInQueryResolved,
 								  bool isLocalTableModification, Const *partitionKeyValue,
-								  int colocationId);
+								  int colocationId, bool delayedFastPath);
 static bool RowLocksOnRelations(Node *node, List **rtiLockList);
 static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
 														TaskAssignmentPolicyType
@@ -173,7 +175,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
 static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
 static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
 static bool IsLocallyAccessibleCitusLocalTable(Oid relationId);
-
+static Query * ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId);
 
 /*
 * CreateRouterPlan attempts to create a router executor plan for the given
@@ -1940,7 +1942,9 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
 	{
 		GenerateSingleShardRouterTaskList(job, relationShardList,
 										  placementList, shardId,
-										  isLocalTableModification);
+										  isLocalTableModification,
+										  fastPathRestrictionContext->
+										  delayFastPathPlanning);
 	}
 
 	job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation;
@@ -1948,6 +1952,134 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
 }
 
 
+/*
+ * CheckAndBuildDelayedFastPathPlan builds the plan for a fast path query
+ * whose planning was delayed until the shard placement was known. If the
+ * anchor shard is local to this node, the query is converted to a query on
+ * the shard, planned locally, and the plan is attached to the task;
+ * otherwise the query goes through the regular fast path planner and is
+ * deparsed for remote execution.
+ */
+void
+CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
+								 DistributedPlan *plan)
+{
+	FastPathRestrictionContext *fastPathContext =
+		planContext->plannerRestrictionContext->fastPathRestrictionContext;
+
+	if (!fastPathContext->delayFastPathPlanning)
+	{
+		return;
+	}
+
+	Job *job = plan->workerJob;
+	Assert(job != NULL);
+
+	if (job->deferredPruning)
+	{
+		/* call the fast path query planner and save the plan in planContext->plan */
+		planContext->plan = FastPathPlanner(planContext->originalQuery,
+											planContext->query,
+											planContext->boundParams);
+		return;
+	}
+
+	List *tasks = job->taskList;
+	Assert(list_length(tasks) == 1);
+	Task *task = (Task *) linitial(tasks);
+	List *placements = task->taskPlacementList;
+	Assert(list_length(placements) > 0);
+	int32 localGroupId = GetLocalGroupId();
+	ShardPlacement *primaryPlacement = (ShardPlacement *) linitial(placements);
+	List *relationShards = task->relationShardList;
+	Assert(list_length(relationShards) == 1);
+	bool isLocalExecution = (primaryPlacement->groupId == localGroupId);
+
+	if (isLocalExecution)
+	{
+		ConvertToQueryOnShard(planContext->query,
+							  fastPathContext->distTableRte->relid,
+							  primaryPlacement->shardId);
+
+		/* plan the query with the new shard relation id and attach the plan */
+		planContext->plan = standard_planner(planContext->query, NULL,
+											 planContext->cursorOptions,
+											 planContext->boundParams);
+		SetTaskQueryPlan(task, planContext->plan);
+
+		ereport(DEBUG2, (errmsg("Local plan for fast-path router "
+								"query")));
+	}
+	else
+	{
+		/* call the fast path query planner and save the plan in planContext->plan */
+		planContext->plan = FastPathPlanner(planContext->originalQuery,
+											planContext->query,
+											planContext->boundParams);
+		UpdateRelationToShardNames((Node *) job->jobQuery, relationShards);
+		SetTaskQueryIfShouldLazyDeparse(task, job->jobQuery);
+	}
+}
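The heart of `CheckAndBuildDelayedFastPathPlan()` is the placement check: compare the anchor placement's group id with the local group id, then either attach a locally built plan to the task or fall back to the deparse path. A stand-in sketch of that decision (not the Citus API):

```c
#include <stdio.h>

typedef struct Task { const char *attachedPlanOrText; } Task;

static void
BuildDelayedPlan(Task *task, int placementGroupId, int localGroupId)
{
	if (placementGroupId == localGroupId)
	{
		/* shard is local: plan directly, skipping the deparse/parse round trip */
		task->attachedPlanOrText = "PlannedStmt on the local shard relation";
	}
	else
	{
		/* shard is remote: deparse to query text as before */
		task->attachedPlanOrText = "deparsed shard query text";
	}
}

int
main(void)
{
	Task task = { NULL };
	BuildDelayedPlan(&task, 3, 3);
	printf("local:  %s\n", task.attachedPlanOrText);
	BuildDelayedPlan(&task, 7, 3);
	printf("remote: %s\n", task.attachedPlanOrText);
	return 0;
}
```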
+/*
+ * ConvertToQueryOnShard() converts the given query on a citus table (identified
+ * by citusTableOid) to a query on a shard (identified by shardId).
+ *
+ * The function assumes that the query is a "fast path" query - it has only one
+ * RangeTblEntry and one RTEPermissionInfo.
+ *
+ * It resolves the shard's relation id, taking the same lock on the shard that
+ * was taken on the citus table, and then changes the RangeTblEntry's and the
+ * RTEPermissionInfo's relid to the shard's relation id, along with any target
+ * list entries that reference the citus table's relation id.
+ */
+static Query *
+ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
+{
+	Assert(list_length(query->rtable) == 1);
+	Assert(list_length(query->rteperminfos) == 1);
+
+	RangeTblEntry *citusTableRte = (RangeTblEntry *) linitial(query->rtable);
+	Assert(citusTableRte->relid == citusTableOid);
+	RTEPermissionInfo *rtePermInfo = (RTEPermissionInfo *) linitial(query->rteperminfos);
+
+	const char *citusTableName = get_rel_name(citusTableOid);
+	Assert(citusTableName != NULL);
+
+	/* construct shard relation name */
+	char *shardRelationName = pstrdup(citusTableName);
+	AppendShardIdToName(&shardRelationName, shardId);
+
+	/* construct the schema name */
+	char *schemaName = get_namespace_name(get_rel_namespace(citusTableOid));
+
+	/* now construct a range variable for the shard */
+	RangeVar shardRangeVar = {
+		.relname = shardRelationName,
+		.schemaname = schemaName,
+		.inh = citusTableRte->inh,
+		.relpersistence = RELPERSISTENCE_PERMANENT,
+	};
+
+	/*
+	 * Resolve the shard's relation id before rewriting any references to it;
+	 * this must apply the same lock to the shard that was applied to the
+	 * citus table.
+	 */
+	Oid shardRelationId = RangeVarGetRelidExtended(&shardRangeVar,
+												   citusTableRte->rellockmode,
+												   0, NULL, NULL); /* todo - use suitable callback for perms check? */
+	Assert(shardRelationId != InvalidOid);
+
+	/* change the target list entries that reference the original citus table's relation id */
+	ListCell *lc = NULL;
+	foreach(lc, query->targetList)
+	{
+		TargetEntry *targetEntry = (TargetEntry *) lfirst(lc);
+		if (targetEntry->resorigtbl == citusTableOid)
+		{
+			targetEntry->resorigtbl = shardRelationId;
+		}
+	}
+
+	citusTableRte->relid = shardRelationId;
+	rtePermInfo->relid = shardRelationId;
+
+	return query;
+}
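Note that the original patch resolved `shardRelationId` only *after* the target list loop, so the loop assigned `InvalidOid` to `resorigtbl`; the rewrite above resolves the shard's relation id first. `ConvertToQueryOnShard()` also relies on the Citus shard naming scheme, where a shard relation is named `<table>_<shardid>` in the same schema. A standalone sketch of that construction (the real `AppendShardIdToName()` additionally truncates over-long names, which this sketch skips):

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

static void
BuildShardRelationName(char *buffer, size_t bufferSize,
					   const char *schemaName, const char *tableName,
					   uint64_t shardId)
{
	/* e.g. schema "public", table "dist_table", shard 102008
	 * -> "public.dist_table_102008" */
	snprintf(buffer, bufferSize, "%s.%s_%llu",
			 schemaName, tableName, (unsigned long long) shardId);
}

int
main(void)
{
	char qualifiedName[256];
	BuildShardRelationName(qualifiedName, sizeof(qualifiedName),
						   "public", "dist_table", 102008ULL);
	printf("%s\n", qualifiedName);
	return 0;
}
```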
 
 
 /*
 * GenerateSingleShardRouterTaskList is a wrapper around other corresponding task
 * list generation functions specific to single shard selects and modifications.
@@ -1957,7 +2089,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
 void
 GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
								   List *placementList, uint64 shardId, bool
-								  isLocalTableModification)
+								  isLocalTableModification, bool delayedFastPath)
 {
 	Query *originalQuery = job->jobQuery;
 
@@ -1970,7 +2102,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
 										  shardId,
 										  job->parametersInJobQueryResolved,
 										  isLocalTableModification,
-										  job->partitionKeyValue, job->colocationId);
+										  job->partitionKeyValue, job->colocationId,
+										  delayedFastPath);
 
 		/*
 		 * Queries to reference tables, or distributed tables with multiple replica's have
@@ -2001,7 +2134,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
 										  shardId,
 										  job->parametersInJobQueryResolved,
 										  isLocalTableModification,
-										  job->partitionKeyValue, job->colocationId);
+										  job->partitionKeyValue, job->colocationId,
+										  delayedFastPath);
 	}
 }
 
@@ -2096,7 +2230,7 @@ SingleShardTaskList(Query *query, uint64 jobId,
 					List *relationShardList, List *placementList,
 					uint64 shardId, bool parametersInQueryResolved,
 					bool isLocalTableModification, Const *partitionKeyValue,
-					int colocationId)
+					int colocationId, bool delayedFastPath)
 {
 	TaskType taskType = READ_TASK;
 	char replicationModel = 0;
@@ -2168,7 +2302,10 @@ SingleShardTaskList(Query *query, uint64 jobId,
 	task->taskPlacementList = placementList;
 	task->partitionKeyValue = partitionKeyValue;
 	task->colocationId = colocationId;
-	SetTaskQueryIfShouldLazyDeparse(task, query);
+	if (!delayedFastPath)
+	{
+		SetTaskQueryIfShouldLazyDeparse(task, query);
+	}
 	task->anchorShardId = shardId;
 	task->jobId = jobId;
 	task->relationShardList = relationShardList;
@@ -2449,10 +2586,15 @@ PlanRouterQuery(Query *originalQuery,
 
 	/*
 	 * If this is an UPDATE or DELETE query which requires coordinator evaluation,
-	 * don't try update shard names, and postpone that to execution phase.
+	 * don't try to update shard names, and postpone that to the execution phase.
+	 * Also, if this is a delayed fast path query, we don't update the shard names
+	 * either, as they will be updated when the delayed fast path plan is built.
	 */
 	bool isUpdateOrDelete = UpdateOrDeleteOrMergeQuery(originalQuery);
-	if (!(isUpdateOrDelete && RequiresCoordinatorEvaluation(originalQuery)))
+	bool delayedFastPath =
+		plannerRestrictionContext->fastPathRestrictionContext->delayFastPathPlanning;
+	if (!(isUpdateOrDelete && RequiresCoordinatorEvaluation(originalQuery)) &&
+		!delayedFastPath)
 	{
 		UpdateRelationToShardNames((Node *) originalQuery, *relationShardList);
 	}
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 430eb8555..53d6f90e7 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -212,6 +212,7 @@ static const char * MaxSharedPoolSizeGucShowHook(void);
 static const char * LocalPoolSizeGucShowHook(void);
 static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra,
 											 GucSource source);
+static bool ErrorIfLocalExecutionDisabled(bool *newval, void **extra, GucSource source);
 static void CitusAuthHook(Port *port, int status);
 static bool IsSuperuser(char *userName);
 static void AdjustDynamicLibraryPathForCdcDecoders(void);
@@ -1377,6 +1378,17 @@ RegisterCitusConfigVariables(void)
 		GUC_STANDARD,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.enable_fast_path_local_execution",
+		gettext_noop("Enables the planner to avoid deparsing and re-planning "
					 "the query when the shard is local to the current node."),
+		NULL,
+		&EnableFastPathLocalExecutor,
+		true,
+		PGC_USERSET,
+		GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
+		ErrorIfLocalExecutionDisabled, NULL, NULL);
+
 	DefineCustomBoolVariable(
 		"citus.enable_local_reference_table_foreign_keys",
 		gettext_noop("Enables foreign keys from/to local tables"),
@@ -2802,6 +2814,23 @@ WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source)
 }
 
 
+/*
+ * ErrorIfLocalExecutionDisabled is a GUC check hook that rejects enabling
+ * citus.enable_fast_path_local_execution while citus.enable_local_execution
+ * is off, since the fast path local executor depends on local execution.
+ */
+static bool
+ErrorIfLocalExecutionDisabled(bool *newval, void **extra, GucSource source)
+{
+	if (*newval && !EnableLocalExecution)
+	{
+		ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						  errmsg(
							  "citus.enable_local_execution must be set in order for "
							  "citus.enable_fast_path_local_execution to be effective.")));
+
+		return false;
+	}
+
+	return true;
+}
+
+
 /*
 * NoticeIfSubqueryPushdownEnabled prints a notice when a user sets
 * citus.subquery_pushdown to ON. It doesn't print the notice if the
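The new GUC wires its dependency on `citus.enable_local_execution` through the check-hook contract: returning `false` from the hook makes the `SET` fail, and the hook may emit a message explaining why. A standalone sketch of that contract (the PostgreSQL types and `ereport` are replaced by stand-ins here):

```c
#include <stdbool.h>
#include <stdio.h>

static bool EnableLocalExecution = false;   /* the GUC this one depends on */

static bool
CheckFastPathLocalExecution(bool *newval)
{
	if (*newval && !EnableLocalExecution)
	{
		/* stands in for ereport(WARNING, ...) */
		fprintf(stderr, "WARNING: citus.enable_local_execution must be on for "
						"citus.enable_fast_path_local_execution to take effect\n");
		return false;   /* reject: the SET will error out */
	}
	return true;        /* accept the new value */
}

int
main(void)
{
	bool newValue = true;
	printf("accepted: %d\n", CheckFastPathLocalExecution(&newValue));
	EnableLocalExecution = true;
	printf("accepted: %d\n", CheckFastPathLocalExecution(&newValue));
	return 0;
}
```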
diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c
index 4b4a334c8..e834c1470 100644
--- a/src/backend/distributed/utils/citus_copyfuncs.c
+++ b/src/backend/distributed/utils/citus_copyfuncs.c
@@ -287,6 +287,12 @@ CopyTaskQuery(Task *newnode, Task *from)
 			break;
 		}
 
+		case TASK_QUERY_LOCAL_PLAN:
+		{
+			COPY_NODE_FIELD(taskQuery.data.localPlan);
+			break;
+		}
+
 		default:
 		{
 			break;
diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h
index 8fb012588..7ac49f920 100644
--- a/src/include/distributed/deparse_shard_query.h
+++ b/src/include/distributed/deparse_shard_query.h
@@ -27,7 +27,9 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
 extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
 extern void SetTaskQueryString(Task *task, char *queryString);
 extern void SetTaskQueryStringList(Task *task, List *queryStringList);
+extern void SetTaskQueryPlan(Task *task, PlannedStmt *localPlan);
 extern char * TaskQueryString(Task *task);
+extern PlannedStmt * TaskQueryLocalPlan(Task *task);
 extern char * TaskQueryStringAtIndex(Task *task, int index);
 extern int GetTaskQueryType(Task *task);
 extern void AddInsertAliasIfNeeded(Query *query);
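The `citus_copyfuncs.c` change adds the matching case to the task copy function so that deep copies carry the attached plan along; every union member gets its own case so the copy follows the right pointer. A standalone sketch of the pattern, with stand-in types (`COPY_NODE_FIELD` in Citus performs a recursive node copy, which the sketch approximates with a shallow struct copy):

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef enum { TASK_QUERY_TEXT, TASK_QUERY_LOCAL_PLAN } TaskQueryType;
typedef struct PlannedStmt { int planId; } PlannedStmt;

typedef struct TaskQuery
{
	TaskQueryType queryType;
	union
	{
		char *queryString;
		PlannedStmt *localPlan;
	} data;
} TaskQuery;

static void
CopyTaskQuery(TaskQuery *newNode, const TaskQuery *from)
{
	newNode->queryType = from->queryType;

	switch (from->queryType)
	{
		case TASK_QUERY_TEXT:
		{
			newNode->data.queryString = strdup(from->data.queryString);
			break;
		}

		case TASK_QUERY_LOCAL_PLAN:
		{
			/* shallow copy here; COPY_NODE_FIELD copies the node tree recursively */
			newNode->data.localPlan = malloc(sizeof(PlannedStmt));
			*newNode->data.localPlan = *from->data.localPlan;
			break;
		}
	}
}

int
main(void)
{
	PlannedStmt plan = { 7 };
	TaskQuery original = { TASK_QUERY_LOCAL_PLAN, { .localPlan = &plan } };
	TaskQuery copy;

	CopyTaskQuery(&copy, &original);
	printf("copied plan id: %d\n", copy.data.localPlan->planId);
	return 0;
}
```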
diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h
index 33a9c2fa8..9c7cb0b90 100644
--- a/src/include/distributed/distributed_planner.h
+++ b/src/include/distributed/distributed_planner.h
@@ -99,6 +99,17 @@ typedef struct FastPathRestrictionContext
 	 * Set to true when distKey = Param; in the queryTree
 	 */
 	bool distributionKeyHasParam;
+
+	/*
+	 * Indicates that we should hold off on calling the fast path planner
+	 * until it's known whether the shard is local.
+	 */
+	bool delayFastPathPlanning;
+
+	/*
+	 * Range table entry for the table we're querying.
+	 */
+	RangeTblEntry *distTableRte;
 } FastPathRestrictionContext;
 
 typedef struct PlannerRestrictionContext
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index 475a41b37..c73b78227 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -174,7 +174,8 @@ typedef enum TaskQueryType
 	TASK_QUERY_NULL,
 	TASK_QUERY_TEXT,
 	TASK_QUERY_OBJECT,
-	TASK_QUERY_TEXT_LIST
+	TASK_QUERY_TEXT_LIST,
+	TASK_QUERY_LOCAL_PLAN,
 } TaskQueryType;
 
 typedef struct TaskQuery
@@ -219,6 +220,8 @@ typedef struct TaskQuery
		 * when we want to access each query string.
		 */
 		List *queryStringList;
+
+		PlannedStmt *localPlan; /* only applies to local tasks */
 	}data;
 }TaskQuery;
diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h
index 44be2736e..97a961685 100644
--- a/src/include/distributed/multi_router_planner.h
+++ b/src/include/distributed/multi_router_planner.h
@@ -28,6 +28,7 @@
 
 extern bool EnableRouterExecution;
 extern bool EnableFastPathRouterPlanner;
+extern bool EnableFastPathLocalExecutor;
 
 extern bool EnableNonColocatedRouterQueryPushdown;
 
@@ -91,7 +92,8 @@ extern void GenerateSingleShardRouterTaskList(Job *job,
 											  List *relationShardList,
 											  List *placementList,
 											  uint64 shardId,
-											  bool isLocalTableModification);
+											  bool isLocalTableModification,
+											  bool delayedFastPath);
 
 /*
 * FastPathPlanner is a subset of router planner, that's why we prefer to
@@ -100,7 +102,8 @@ extern void GenerateSingleShardRouterTaskList(Job *job,
 extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo
 									 boundParams);
-extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue);
+extern bool FastPathRouterQuery(Query *query,
+								FastPathRestrictionContext *fastPathContext);
 extern bool JoinConditionIsOnFalse(List *relOptInfo);
 extern Oid ResultRelationOidForQuery(Query *query);
 extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId,
@@ -120,5 +123,7 @@ extern Job * RouterJob(Query *originalQuery,
 					   DeferredErrorMessage **planningError);
 extern bool ContainsOnlyLocalOrReferenceTables(RTEListProperties *rteProperties);
 extern RangeTblEntry * ExtractSourceResultRangeTableEntry(Query *query);
+extern void CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
											 DistributedPlan *plan);
 
 #endif /* MULTI_ROUTER_PLANNER_H */