From 3ece691b79c66ca9c7831767c6a70721f7e18af2 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Wed, 16 Nov 2022 16:03:47 +0100
Subject: [PATCH] Properly cache fast-path remote SELECT queries

Use all the necessary infrastructure we built in the previous commits.
---
 .../distributed/executor/citus_custom_scan.c  | 50 ++++++++++++-------
 .../distributed/planner/deparse_shard_query.c |  5 +-
 .../planner/fast_path_plan_cache.c            | 20 ++++++--
 .../planner/multi_router_planner.c            | 21 +++++---
 src/include/distributed/deparse_shard_query.h |  1 +
 .../distributed/fast_path_plan_cache.h        |  7 +--
 .../distributed/multi_physical_planner.h      |  3 ++
 .../distributed/multi_router_planner.h        |  3 +-
 src/test/regress/expected/multi_explain.out   |  4 +-
 9 files changed, 75 insertions(+), 39 deletions(-)

diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c
index da54aa52f..52f3b289f 100644
--- a/src/backend/distributed/executor/citus_custom_scan.c
+++ b/src/backend/distributed/executor/citus_custom_scan.c
@@ -303,26 +303,38 @@ CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags)
 	 */
 	ExecuteCoordinatorEvaluableExpressions(jobQuery, planState);
 
-	/* job query no longer has parameters, so we should not send any */
-	workerJob->parametersInJobQueryResolved = true;
-
-	/* parameters are filled in, so we can generate a task for this execution */
+	/* only do the pruning, but do not deparse just yet */
 	RegenerateTaskForFasthPathQuery(workerJob);
 
-	if (IsFastPathPlanCachingSupported(workerJob, originalDistributedPlan))
-	{
-		Task *task = linitial(workerJob->taskList);
+	Task *task = linitial(workerJob->taskList);
 
-		/*
-		 * We are going to execute this task locally. If it's not already in
-		 * the cache, create a local plan now and add it to the cache. During
-		 * execution, we will get the plan from the cache.
-		 *
-		 * The plan will be cached across executions when originalDistributedPlan
-		 * represents a prepared statement.
-		 */
-		CacheFastPathPlanForShardQuery(task, originalDistributedPlan,
+	/*
+	 * We are going to execute this task locally. If it's not already in
+	 * the cache, create a local plan now and add it to the cache. During
+	 * execution, we will get the plan from the cache.
+	 *
+	 * The plan will be cached across executions when originalDistributedPlan
+	 * represents a prepared statement.
+	 */
+	FastPathPlanCache *fastPathPlanCache =
+		CacheFastPathPlanForShardQuery(task, workerJob, originalDistributedPlan,
 									   estate->es_param_list_info);
+
+	if (fastPathPlanCache == NULL)
+	{
+		/* job query no longer has parameters, so we should not send any */
+		workerJob->parametersInJobQueryResolved = true;
+
+		/* we could not use a cached plan, so do the heavy lifting of deparsing */
+		SetTaskQueryString(task, DeparseTaskQuery(task, workerJob->jobQuery));
+	}
+	else if (fastPathPlanCache->queryString != NULL)
+	{
+		SetTaskQueryString(task, fastPathPlanCache->queryString);
+
+		/* TODO: we have this due to MarkUnreferencedExternParams. Can we find another way? */
+		workerJob->jobQuery = copyObject(originalDistributedPlan->workerJob->jobQuery);
 	}
 }
 
@@ -449,7 +461,7 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
 	 * The plan will be cached across executions when originalDistributedPlan
 	 * represents a prepared statement.
 	 */
-		CacheFastPathPlanForShardQuery(task, originalDistributedPlan,
+		CacheFastPathPlanForShardQuery(task, workerJob, originalDistributedPlan,
 									   estate->es_param_list_info);
 	}
 
@@ -650,11 +662,13 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
 	}
 
 	bool isLocalTableModification = false;
+	bool deferredPruning = true;
 
 	GenerateSingleShardRouterTaskList(workerJob, relationShardList,
 									  placementList, shardId,
-									  isLocalTableModification);
+									  isLocalTableModification,
+									  deferredPruning);
 }
 
 
diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c
index 1c5c62034..edd465f6e 100644
--- a/src/backend/distributed/planner/deparse_shard_query.c
+++ b/src/backend/distributed/planner/deparse_shard_query.c
@@ -44,7 +44,6 @@ static void UpdateTaskQueryString(Query *query, Task *task);
 static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList);
 static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
 static bool ShouldLazyDeparseQuery(Task *task);
-static char * DeparseTaskQuery(Task *task, Query *query);
 
 
 /*
@@ -366,7 +365,7 @@ ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte)
 static bool
 ShouldLazyDeparseQuery(Task *task)
 {
-	return TaskAccessesLocalNode(task);
+	return task->deferredPruning;
 }
 
 
@@ -429,7 +428,7 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
 /*
  * DeparseTaskQuery is a general way of deparsing a query based on a task.
  */
-static char *
+char *
 DeparseTaskQuery(Task *task, Query *query)
 {
 	StringInfo queryString = makeStringInfo();
diff --git a/src/backend/distributed/planner/fast_path_plan_cache.c b/src/backend/distributed/planner/fast_path_plan_cache.c
index f503529a0..2bf5637ac 100644
--- a/src/backend/distributed/planner/fast_path_plan_cache.c
+++ b/src/backend/distributed/planner/fast_path_plan_cache.c
@@ -39,22 +39,30 @@ static int ExtractParameterTypesForParamListInfo(ParamListInfo originalParamList
  * CacheFastPathPlanForShardQuery replaces the relation OIDs in the job query
  * with shard relation OIDs and then plans the query and caches the result
  * in the originalDistributedPlan (which may be preserved across executions).
+ *
+ * Returns the new or existing cache entry, or NULL when the plan cannot be cached.
  */
-void
-CacheFastPathPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan,
+FastPathPlanCache *
+CacheFastPathPlanForShardQuery(Task *task, Job *evaluatedJob,
+							   DistributedPlan *originalDistributedPlan,
 							   ParamListInfo paramListInfo)
 {
+	if (!IsFastPathPlanCachingSupported(evaluatedJob, originalDistributedPlan))
+	{
+		return NULL;
+	}
+
 	FastPathPlanCache *planCache = GetFastPathCachedPlan(task, originalDistributedPlan);
 	if (planCache != NULL)
 	{
 		/* we already have a local plan */
-		return;
+		return planCache;
 	}
 
 	if (list_length(task->relationShardList) == 0)
 	{
 		/* zero shard plan, no need to cache */
-		return;
+		return NULL;
 	}
 
 	/*
@@ -93,7 +101,7 @@ CacheFastPathPlanForShardQuery(Task *task, DistributedPlan *originalDistributedP
 		pfree(jobQuery);
 		pfree(localShardQuery);
 		MemoryContextSwitchTo(oldContext);
-		return;
+		return NULL;
 	}
 
 	LockRelationOid(rangeTableEntry->relid, lockMode);
@@ -117,6 +125,8 @@ CacheFastPathPlanForShardQuery(Task *task, DistributedPlan *originalDistributedP
 							 fastPathPlanCache);
 
 	MemoryContextSwitchTo(oldContext);
+
+	return fastPathPlanCache;
 }
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 325c382c7..beb449526 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -170,7 +170,8 @@ static int CompareInsertValuesByShardId(const void *leftElement,
 static List * SingleShardTaskList(Query *query, uint64 jobId,
 								  List *relationShardList,
 								  List *placementList, uint64 shardId,
 								  bool parametersInQueryResolved,
-								  bool isLocalTableModification);
+								  bool isLocalTableModification,
+								  bool deferredPruning);
 static bool RowLocksOnRelations(Node *node, List **rtiLockList);
 static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
 														TaskAssignmentPolicyType
@@ -1869,7 +1870,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
 	{
 		GenerateSingleShardRouterTaskList(job, relationShardList,
 										  placementList, shardId,
-										  isLocalTableModification);
+										  isLocalTableModification,
+										  false);
 	}
 
 	job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation;
@@ -1885,8 +1887,9 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
  */
 void
 GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
-								  List *placementList, uint64 shardId, bool
-								  isLocalTableModification)
+								  List *placementList, uint64 shardId,
+								  bool isLocalTableModification,
+								  bool deferredPruning)
 {
 	Query *originalQuery = job->jobQuery;
 
@@ -1896,7 +1899,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
 												  relationShardList, placementList,
 												  shardId,
 												  job->parametersInJobQueryResolved,
-												  isLocalTableModification);
+												  isLocalTableModification,
+												  deferredPruning);
 
 		/*
 		 * Queries to reference tables, or distributed tables with multiple replica's have
@@ -1924,7 +1928,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
 												  relationShardList, placementList,
 												  shardId,
 												  job->parametersInJobQueryResolved,
-												  isLocalTableModification);
+												  isLocalTableModification,
+												  deferredPruning);
 	}
 }
 
@@ -2018,7 +2023,8 @@ static List *
 SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
 					List *placementList, uint64 shardId,
 					bool parametersInQueryResolved,
-					bool isLocalTableModification)
+					bool isLocalTableModification,
+					bool deferredPruning)
 {
 	TaskType taskType = READ_TASK;
 	char replicationModel = 0;
@@ -2078,6 +2084,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
 	Task *task = CreateTask(taskType);
 	task->isLocalTableModification = isLocalTableModification;
+	task->deferredPruning = deferredPruning;
 
 	List *relationRowLockList = NIL;
 	RowLocksOnRelations((Node *) query, &relationRowLockList);
diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h
index 9370e51e2..38e7c40a7 100644
--- a/src/include/distributed/deparse_shard_query.h
+++ b/src/include/distributed/deparse_shard_query.h
@@ -22,6 +22,7 @@
 extern void RebuildQueryStrings(Job *workerJob);
+extern char * DeparseTaskQuery(Task *task, Query *query);
 extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
 extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
 extern void SetTaskQueryString(Task *task, char *queryString);
diff --git a/src/include/distributed/fast_path_plan_cache.h b/src/include/distributed/fast_path_plan_cache.h
index 1472416c5..7729bb70a 100644
--- a/src/include/distributed/fast_path_plan_cache.h
+++ b/src/include/distributed/fast_path_plan_cache.h
@@ -5,8 +5,9 @@ extern bool IsFastPathPlanCachingSupported(Job *currentJob,
 										   DistributedPlan *originalDistributedPlan);
 extern PlannedStmt * GetCachedFastPathLocalPlan(Task *task,
 												DistributedPlan *distributedPlan);
-extern void CacheFastPathPlanForShardQuery(Task *task,
-										   DistributedPlan *originalDistributedPlan,
-										   ParamListInfo paramListInfo);
+extern FastPathPlanCache * CacheFastPathPlanForShardQuery(Task *task, Job *evaluatedJob,
+														  DistributedPlan *
+														  originalDistributedPlan,
+														  ParamListInfo paramListInfo);
 
 #endif /* FAST_PATH_PLAN_CACHE */
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index f487f610f..d8416e6e2 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -348,6 +348,9 @@ typedef struct Task
 	 * Vacuum, create/drop/reindex concurrently cannot be executed in a transaction.
 	 */
 	bool cannotBeExecutedInTransction;
+
+	/*
+	 * Set when shard pruning for this task was deferred to execution time
+	 * (fast-path queries); such tasks deparse their query string lazily.
+	 */
+	bool deferredPruning;
 } Task;
 
 
diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h
index 22c334a13..b690c4000 100644
--- a/src/include/distributed/multi_router_planner.h
+++ b/src/include/distributed/multi_router_planner.h
@@ -87,7 +87,8 @@
 extern void GenerateSingleShardRouterTaskList(Job *job,
 											  List *relationShardList,
 											  List *placementList, uint64 shardId,
-											  bool isLocalTableModification);
+											  bool isLocalTableModification,
+											  bool deferredPruning);
 
 /*
  * FastPathPlanner is a subset of router planner, that's why we prefer to
diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out
index eee7a6236..83b5d8424 100644
--- a/src/test/regress/expected/multi_explain.out
+++ b/src/test/regress/expected/multi_explain.out
@@ -2633,7 +2633,7 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
         Tuple data received from node: 4 bytes
         Node: host=localhost port=xxxxx dbname=regression
         ->  Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
-              Filter: (a = 10)
+              Filter: (a = $1)
               Rows Removed by Filter: 3
 EXPLAIN :default_analyze_flags EXECUTE p2(100);
 Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
@@ -2644,7 +2644,7 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
         Tuple data received from node: 4 bytes
         Node: host=localhost port=xxxxx dbname=regression
         ->  Seq Scan on dist_table_rep1_570023 dist_table_rep1 (actual rows=1 loops=1)
-              Filter: (a = 100)
+              Filter: (a = $1)
               Rows Removed by Filter: 1
 prepare p3 AS SELECT * FROM dist_table_rep1 WHERE a = 1;
 EXPLAIN :default_analyze_flags EXECUTE p3;