Properly cache fast-path remote SELECT queries

Use all the necessary infrastructure we built in the previous commits.
remote_prepared_txes
Onder Kalaci 2022-11-16 16:03:47 +01:00
parent ccec45c63a
commit 3ece691b79
9 changed files with 75 additions and 39 deletions

View File

@ -303,26 +303,38 @@ CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags)
*/
ExecuteCoordinatorEvaluableExpressions(jobQuery, planState);
/* job query no longer has parameters, so we should not send any */
workerJob->parametersInJobQueryResolved = true;
/* parameters are filled in, so we can generate a task for this execution */
/* only do the pruning, but do not deparse just yet */
RegenerateTaskForFasthPathQuery(workerJob);
if (IsFastPathPlanCachingSupported(workerJob, originalDistributedPlan))
{
Task *task = linitial(workerJob->taskList);
Task *task = linitial(workerJob->taskList);
/*
* We are going to execute this task locally. If it's not already in
* the cache, create a local plan now and add it to the cache. During
* execution, we will get the plan from the cache.
*
* The plan will be cached across executions when originalDistributedPlan
* represents a prepared statement.
*/
CacheFastPathPlanForShardQuery(task, originalDistributedPlan,
/*
* We are going to execute this task locally. If it's not already in
* the cache, create a local plan now and add it to the cache. During
* execution, we will get the plan from the cache.
*
* The plan will be cached across executions when originalDistributedPlan
* represents a prepared statement.
*/
FastPathPlanCache *fastPathPlanCache =
CacheFastPathPlanForShardQuery(task, workerJob, originalDistributedPlan,
estate->es_param_list_info);
if (fastPathPlanCache == NULL)
{
/* job query no longer has parameters, so we should not send any */
workerJob->parametersInJobQueryResolved = true;
/* do the heavy lifting of deparsing unless we cannot find any cache */
SetTaskQueryString(task, DeparseTaskQuery(task, workerJob->jobQuery));
}
else if (fastPathPlanCache->queryString != NULL)
{
SetTaskQueryString(task, fastPathPlanCache->queryString);
/* TODO: we have this due to MarkUnreferencedExternParams. Can we find another way? */
workerJob->jobQuery = copyObject(originalDistributedPlan->workerJob->jobQuery);
}
}
@ -449,7 +461,7 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
* The plan will be cached across executions when originalDistributedPlan
* represents a prepared statement.
*/
CacheFastPathPlanForShardQuery(task, originalDistributedPlan,
CacheFastPathPlanForShardQuery(task, workerJob, originalDistributedPlan,
estate->es_param_list_info);
}
@ -650,11 +662,13 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
}
bool isLocalTableModification = false;
bool deferredPruning = true;
GenerateSingleShardRouterTaskList(workerJob,
relationShardList,
placementList,
shardId,
isLocalTableModification);
isLocalTableModification,
deferredPruning);
}

View File

@ -44,7 +44,6 @@ static void UpdateTaskQueryString(Query *query, Task *task);
static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList);
static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
static bool ShouldLazyDeparseQuery(Task *task);
static char * DeparseTaskQuery(Task *task, Query *query);
/*
@ -366,7 +365,7 @@ ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte)
static bool
ShouldLazyDeparseQuery(Task *task)
{
return TaskAccessesLocalNode(task);
return task->deferredPruning;
}
@ -429,7 +428,7 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
/*
* DeparseTaskQuery is a general way of deparsing a query based on a task.
*/
static char *
char *
DeparseTaskQuery(Task *task, Query *query)
{
StringInfo queryString = makeStringInfo();

View File

@ -39,22 +39,30 @@ static int ExtractParameterTypesForParamListInfo(ParamListInfo originalParamList
* CacheFastPathPlanForShardQuery replaces the relation OIDs in the job query
* with shard relation OIDs and then plans the query and caches the result
* in the originalDistributedPlan (which may be preserved across executions).
*
* TODO: update comment
*/
void
CacheFastPathPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan,
FastPathPlanCache *
CacheFastPathPlanForShardQuery(Task *task, Job *evaluatedJob,
DistributedPlan *originalDistributedPlan,
ParamListInfo paramListInfo)
{
if (!IsFastPathPlanCachingSupported(evaluatedJob, originalDistributedPlan))
{
return NULL;
}
FastPathPlanCache *planCache = GetFastPathCachedPlan(task, originalDistributedPlan);
if (planCache != NULL)
{
/* we already have a local plan */
return;
return planCache;
}
if (list_length(task->relationShardList) == 0)
{
/* zero shard plan, no need to cache */
return;
return NULL;
}
/*
@ -93,7 +101,7 @@ CacheFastPathPlanForShardQuery(Task *task, DistributedPlan *originalDistributedP
pfree(jobQuery);
pfree(localShardQuery);
MemoryContextSwitchTo(oldContext);
return;
return NULL;
}
LockRelationOid(rangeTableEntry->relid, lockMode);
@ -117,6 +125,8 @@ CacheFastPathPlanForShardQuery(Task *task, DistributedPlan *originalDistributedP
fastPathPlanCache);
MemoryContextSwitchTo(oldContext);
return fastPathPlanCache;
}

View File

@ -170,7 +170,8 @@ static int CompareInsertValuesByShardId(const void *leftElement,
static List * SingleShardTaskList(Query *query, uint64 jobId,
List *relationShardList, List *placementList,
uint64 shardId, bool parametersInQueryResolved,
bool isLocalTableModification);
bool isLocalTableModification,
bool deferredPruning);
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
TaskAssignmentPolicyType
@ -1869,7 +1870,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
{
GenerateSingleShardRouterTaskList(job, relationShardList,
placementList, shardId,
isLocalTableModification);
isLocalTableModification,
false);
}
job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation;
@ -1885,8 +1887,9 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
*/
void
GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
List *placementList, uint64 shardId, bool
isLocalTableModification)
List *placementList, uint64 shardId,
bool isLocalTableModification,
bool deferredPruning)
{
Query *originalQuery = job->jobQuery;
@ -1896,7 +1899,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
relationShardList, placementList,
shardId,
job->parametersInJobQueryResolved,
isLocalTableModification);
isLocalTableModification,
deferredPruning);
/*
* Queries to reference tables, or distributed tables with multiple replica's have
@ -1924,7 +1928,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
relationShardList, placementList,
shardId,
job->parametersInJobQueryResolved,
isLocalTableModification);
isLocalTableModification,
deferredPruning);
}
}
@ -2018,7 +2023,8 @@ static List *
SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
List *placementList, uint64 shardId,
bool parametersInQueryResolved,
bool isLocalTableModification)
bool isLocalTableModification,
bool deferredPruning)
{
TaskType taskType = READ_TASK;
char replicationModel = 0;
@ -2078,6 +2084,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
Task *task = CreateTask(taskType);
task->isLocalTableModification = isLocalTableModification;
task->deferredPruning = deferredPruning;
List *relationRowLockList = NIL;
RowLocksOnRelations((Node *) query, &relationRowLockList);

View File

@ -22,6 +22,7 @@
extern void RebuildQueryStrings(Job *workerJob);
extern char * DeparseTaskQuery(Task *task, Query *query);
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString);

View File

@ -5,8 +5,9 @@ extern bool IsFastPathPlanCachingSupported(Job *currentJob,
DistributedPlan *originalDistributedPlan);
extern PlannedStmt * GetCachedFastPathLocalPlan(Task *task,
DistributedPlan *distributedPlan);
extern void CacheFastPathPlanForShardQuery(Task *task,
DistributedPlan *originalDistributedPlan,
ParamListInfo paramListInfo);
extern FastPathPlanCache * CacheFastPathPlanForShardQuery(Task *task, Job *evaluatedJob,
DistributedPlan *
originalDistributedPlan,
ParamListInfo paramListInfo);
#endif /* FAST_PATH_PLAN_CACHE */

View File

@ -348,6 +348,9 @@ typedef struct Task
* Vacuum, create/drop/reindex concurrently cannot be executed in a transaction.
*/
bool cannotBeExecutedInTransction;
/* TODO: add comment */
bool deferredPruning;
} Task;

View File

@ -87,7 +87,8 @@ extern void GenerateSingleShardRouterTaskList(Job *job,
List *relationShardList,
List *placementList,
uint64 shardId,
bool isLocalTableModification);
bool isLocalTableModification,
bool deferredPruning);
/*
* FastPathPlanner is a subset of router planner, that's why we prefer to

View File

@ -2633,7 +2633,7 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 10)
Filter: (a = $1)
Rows Removed by Filter: 3
EXPLAIN :default_analyze_flags EXECUTE p2(100);
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
@ -2644,7 +2644,7 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570023 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 100)
Filter: (a = $1)
Rows Removed by Filter: 1
prepare p3 AS SELECT * FROM dist_table_rep1 WHERE a = 1;
EXPLAIN :default_analyze_flags EXECUTE p3;