Rename LocalPlannedStatement to FastPathPlanCache

remote_prepared_txes
Onder Kalaci 2022-11-15 16:58:49 +01:00
parent a093ccb2b0
commit e451301750
10 changed files with 65 additions and 65 deletions

View File

@ -274,7 +274,7 @@ CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags)
/* /*
* Create a copy of the generic plan for the current execution, but make a shallow * Create a copy of the generic plan for the current execution, but make a shallow
* copy of the plan cache. That means we'll be able to access the plan cache via * copy of the plan cache. That means we'll be able to access the plan cache via
* currentPlan->workerJob->localPlannedStatements, but it will be preserved across * currentPlan->workerJob->fastPathPlanCacheList, but it will be preserved across
* executions by the prepared statement logic. * executions by the prepared statement logic.
*/ */
DistributedPlan *currentPlan = DistributedPlan *currentPlan =
@ -309,7 +309,7 @@ CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags)
/* parameters are filled in, so we can generate a task for this execution */ /* parameters are filled in, so we can generate a task for this execution */
RegenerateTaskForFasthPathQuery(workerJob); RegenerateTaskForFasthPathQuery(workerJob);
if (IsLocalPlanCachingSupported(workerJob, originalDistributedPlan)) if (IsFastPathPlanCachingSupported(workerJob, originalDistributedPlan))
{ {
Task *task = linitial(workerJob->taskList); Task *task = linitial(workerJob->taskList);
@ -321,7 +321,7 @@ CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags)
* The plan will be cached across executions when originalDistributedPlan * The plan will be cached across executions when originalDistributedPlan
* represents a prepared statement. * represents a prepared statement.
*/ */
CacheLocalPlanForShardQuery(task, originalDistributedPlan, CacheFastPathPlanForShardQuery(task, originalDistributedPlan,
estate->es_param_list_info); estate->es_param_list_info);
} }
} }
@ -432,7 +432,7 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
* Now that we have populated the task placements we can determine whether * Now that we have populated the task placements we can determine whether
* any of them are local to this node and cache a plan if needed. * any of them are local to this node and cache a plan if needed.
*/ */
if (IsLocalPlanCachingSupported(workerJob, originalDistributedPlan)) if (IsFastPathPlanCachingSupported(workerJob, originalDistributedPlan))
{ {
Task *task = linitial(workerJob->taskList); Task *task = linitial(workerJob->taskList);
@ -449,7 +449,7 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
* The plan will be cached across executions when originalDistributedPlan * The plan will be cached across executions when originalDistributedPlan
* represents a prepared statement. * represents a prepared statement.
*/ */
CacheLocalPlanForShardQuery(task, originalDistributedPlan, CacheFastPathPlanForShardQuery(task, originalDistributedPlan,
estate->es_param_list_info); estate->es_param_list_info);
} }
@ -548,21 +548,21 @@ ModifyJobNeedsEvaluation(Job *workerJob)
* executions of a prepared statement. Instead we create a deep copy that we only * executions of a prepared statement. Instead we create a deep copy that we only
* use for the current execution. * use for the current execution.
* *
* We also exclude localPlannedStatements from the copyObject call for performance * We also exclude fastPathPlanCacheList from the copyObject call for performance
* reasons, as they are immutable, so no need to have a deep copy. * reasons, as they are immutable, so no need to have a deep copy.
*/ */
static DistributedPlan * static DistributedPlan *
CopyDistributedPlanWithoutCache(DistributedPlan *originalDistributedPlan) CopyDistributedPlanWithoutCache(DistributedPlan *originalDistributedPlan)
{ {
List *localPlannedStatements = List *fastPathPlanCacheList =
originalDistributedPlan->workerJob->localPlannedStatements; originalDistributedPlan->workerJob->fastPathPlanCacheList;
originalDistributedPlan->workerJob->localPlannedStatements = NIL; originalDistributedPlan->workerJob->fastPathPlanCacheList = NIL;
DistributedPlan *distributedPlan = copyObject(originalDistributedPlan); DistributedPlan *distributedPlan = copyObject(originalDistributedPlan);
/* set back the immutable field */ /* set back the immutable field */
originalDistributedPlan->workerJob->localPlannedStatements = localPlannedStatements; originalDistributedPlan->workerJob->fastPathPlanCacheList = fastPathPlanCacheList;
distributedPlan->workerJob->localPlannedStatements = localPlannedStatements; distributedPlan->workerJob->fastPathPlanCacheList = fastPathPlanCacheList;
return distributedPlan; return distributedPlan;
} }

View File

@ -274,19 +274,19 @@ ExecuteLocalTaskListExtended(List *taskList,
continue; continue;
} }
PlannedStmt *localPlan = GetCachedLocalPlan(task, distributedPlan); PlannedStmt *planCache = GetFastPathLocalPlan(task, distributedPlan);
/* /*
* If the plan is already cached, don't need to re-plan, just * If the plan is already cached, don't need to re-plan, just
* acquire necessary locks. * acquire necessary locks.
*/ */
if (localPlan != NULL) if (planCache != NULL)
{ {
Query *jobQuery = distributedPlan->workerJob->jobQuery; Query *jobQuery = distributedPlan->workerJob->jobQuery;
LOCKMODE lockMode = GetQueryLockMode(jobQuery); LOCKMODE lockMode = GetQueryLockMode(jobQuery);
Oid relationId = InvalidOid; Oid relationId = InvalidOid;
foreach_oid(relationId, localPlan->relationOids) foreach_oid(relationId, planCache->relationOids)
{ {
LockRelationOid(relationId, lockMode); LockRelationOid(relationId, lockMode);
} }
@ -339,7 +339,7 @@ ExecuteLocalTaskListExtended(List *taskList,
* implemented. So, let planner to call distributed_planner() which * implemented. So, let planner to call distributed_planner() which
* eventually calls standard_planner(). * eventually calls standard_planner().
*/ */
localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo); planCache = planner(shardQuery, NULL, cursorOptions, paramListInfo);
} }
char *shardQueryString = NULL; char *shardQueryString = NULL;
@ -354,7 +354,7 @@ ExecuteLocalTaskListExtended(List *taskList,
} }
totalRowsProcessed += totalRowsProcessed +=
LocallyExecuteTaskPlan(localPlan, shardQueryString, LocallyExecuteTaskPlan(planCache, shardQueryString,
tupleDest, task, paramListInfo); tupleDest, task, paramListInfo);
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);

View File

@ -32,16 +32,16 @@ static int ExtractParameterTypesForParamListInfo(ParamListInfo originalParamList
Oid **parameterTypes); Oid **parameterTypes);
/* /*
* CacheLocalPlanForShardQuery replaces the relation OIDs in the job query * CacheFastPathPlanForShardQuery replaces the relation OIDs in the job query
* with shard relation OIDs and then plans the query and caches the result * with shard relation OIDs and then plans the query and caches the result
* in the originalDistributedPlan (which may be preserved across executions). * in the originalDistributedPlan (which may be preserved across executions).
*/ */
void void
CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan, CacheFastPathPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan,
ParamListInfo paramListInfo) ParamListInfo paramListInfo)
{ {
PlannedStmt *localPlan = GetCachedLocalPlan(task, originalDistributedPlan); PlannedStmt *planCache = GetFastPathLocalPlan(task, originalDistributedPlan);
if (localPlan != NULL) if (planCache != NULL)
{ {
/* we already have a local plan */ /* we already have a local plan */
return; return;
@ -87,15 +87,15 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan
LockRelationOid(rangeTableEntry->relid, lockMode); LockRelationOid(rangeTableEntry->relid, lockMode);
LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement); FastPathPlanCache *fastPathPlanCache = CitusMakeNode(FastPathPlanCache);
localPlan = planner(localShardQuery, NULL, 0, NULL); planCache = planner(localShardQuery, NULL, 0, NULL);
localPlannedStatement->localPlan = localPlan; fastPathPlanCache->localPlan = planCache;
localPlannedStatement->shardId = task->anchorShardId; fastPathPlanCache->shardId = task->anchorShardId;
localPlannedStatement->localGroupId = GetLocalGroupId(); fastPathPlanCache->localGroupId = GetLocalGroupId();
originalDistributedPlan->workerJob->localPlannedStatements = originalDistributedPlan->workerJob->fastPathPlanCacheList =
lappend(originalDistributedPlan->workerJob->localPlannedStatements, lappend(originalDistributedPlan->workerJob->fastPathPlanCacheList,
localPlannedStatement); fastPathPlanCache);
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
} }
@ -232,24 +232,24 @@ ExtractParameterTypesForParamListInfo(ParamListInfo originalParamListInfo,
* Otherwise, the function returns NULL. * Otherwise, the function returns NULL.
*/ */
PlannedStmt * PlannedStmt *
GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan) GetFastPathLocalPlan(Task *task, DistributedPlan *distributedPlan)
{ {
if (distributedPlan == NULL || distributedPlan->workerJob == NULL) if (distributedPlan == NULL || distributedPlan->workerJob == NULL)
{ {
return NULL; return NULL;
} }
List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements; List *cachedPlanList = distributedPlan->workerJob->fastPathPlanCacheList;
LocalPlannedStatement *localPlannedStatement = NULL; FastPathPlanCache *fastPathPlanCache = NULL;
int32 localGroupId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
foreach_ptr(localPlannedStatement, cachedPlanList) foreach_ptr(fastPathPlanCache, cachedPlanList)
{ {
if (localPlannedStatement->shardId == task->anchorShardId && if (fastPathPlanCache->shardId == task->anchorShardId &&
localPlannedStatement->localGroupId == localGroupId) fastPathPlanCache->localGroupId == localGroupId)
{ {
/* already have a cached plan, no need to continue */ /* already have a cached plan, no need to continue */
return localPlannedStatement->localPlan; return fastPathPlanCache->localPlan;
} }
} }
@ -263,7 +263,7 @@ GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
* functions). * functions).
*/ */
bool bool
IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistributedPlan) IsFastPathPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistributedPlan)
{ {
if (originalDistributedPlan->numberOfTimesExecuted < 1) if (originalDistributedPlan->numberOfTimesExecuted < 1)
{ {

View File

@ -99,7 +99,7 @@ copyJobInfo(Job *newnode, Job *from)
COPY_SCALAR_FIELD(requiresCoordinatorEvaluation); COPY_SCALAR_FIELD(requiresCoordinatorEvaluation);
COPY_SCALAR_FIELD(deferredPruning); COPY_SCALAR_FIELD(deferredPruning);
COPY_NODE_FIELD(partitionKeyValue); COPY_NODE_FIELD(partitionKeyValue);
COPY_NODE_FIELD(localPlannedStatements); COPY_NODE_FIELD(fastPathPlanCacheList);
COPY_SCALAR_FIELD(parametersInJobQueryResolved); COPY_SCALAR_FIELD(parametersInJobQueryResolved);
} }
@ -333,9 +333,9 @@ CopyNodeTask(COPYFUNC_ARGS)
void void
CopyNodeLocalPlannedStatement(COPYFUNC_ARGS) CopyNodeFastPathPlanCache(COPYFUNC_ARGS)
{ {
DECLARE_FROM_AND_NEW_NODE(LocalPlannedStatement); DECLARE_FROM_AND_NEW_NODE(FastPathPlanCache);
COPY_SCALAR_FIELD(shardId); COPY_SCALAR_FIELD(shardId);
COPY_SCALAR_FIELD(localGroupId); COPY_SCALAR_FIELD(localGroupId);

View File

@ -40,7 +40,7 @@ static const char *CitusNodeTagNamesD[] = {
"DistributedSubPlan", "DistributedSubPlan",
"UsedDistributedSubPlan", "UsedDistributedSubPlan",
"Task", "Task",
"LocalPlannedStatement", "FastPathPlanCache",
"ShardInterval", "ShardInterval",
"ShardPlacement", "ShardPlacement",
"RelationShard", "RelationShard",
@ -397,7 +397,7 @@ const ExtensibleNodeMethods nodeMethods[] =
DEFINE_NODE_METHODS(RelationShard), DEFINE_NODE_METHODS(RelationShard),
DEFINE_NODE_METHODS(RelationRowLock), DEFINE_NODE_METHODS(RelationRowLock),
DEFINE_NODE_METHODS(Task), DEFINE_NODE_METHODS(Task),
DEFINE_NODE_METHODS(LocalPlannedStatement), DEFINE_NODE_METHODS(FastPathPlanCache),
DEFINE_NODE_METHODS(DeferredErrorMessage), DEFINE_NODE_METHODS(DeferredErrorMessage),
DEFINE_NODE_METHODS(GroupShardPlacement), DEFINE_NODE_METHODS(GroupShardPlacement),

View File

@ -345,7 +345,7 @@ OutJobFields(StringInfo str, const Job *node)
WRITE_BOOL_FIELD(requiresCoordinatorEvaluation); WRITE_BOOL_FIELD(requiresCoordinatorEvaluation);
WRITE_BOOL_FIELD(deferredPruning); WRITE_BOOL_FIELD(deferredPruning);
WRITE_NODE_FIELD(partitionKeyValue); WRITE_NODE_FIELD(partitionKeyValue);
WRITE_NODE_FIELD(localPlannedStatements); WRITE_NODE_FIELD(fastPathPlanCacheList);
WRITE_BOOL_FIELD(parametersInJobQueryResolved); WRITE_BOOL_FIELD(parametersInJobQueryResolved);
} }
@ -540,11 +540,11 @@ OutTask(OUTFUNC_ARGS)
void void
OutLocalPlannedStatement(OUTFUNC_ARGS) OutFastPathPlanCache(OUTFUNC_ARGS)
{ {
WRITE_LOCALS(LocalPlannedStatement); WRITE_LOCALS(FastPathPlanCache);
WRITE_NODE_TYPE("LocalPlannedStatement"); WRITE_NODE_TYPE("FastPathPlanCache");
WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardId);
WRITE_UINT_FIELD(localGroupId); WRITE_UINT_FIELD(localGroupId);

View File

@ -48,7 +48,7 @@ extern void OutShardPlacement(OUTFUNC_ARGS);
extern void OutRelationShard(OUTFUNC_ARGS); extern void OutRelationShard(OUTFUNC_ARGS);
extern void OutRelationRowLock(OUTFUNC_ARGS); extern void OutRelationRowLock(OUTFUNC_ARGS);
extern void OutTask(OUTFUNC_ARGS); extern void OutTask(OUTFUNC_ARGS);
extern void OutLocalPlannedStatement(OUTFUNC_ARGS); extern void OutFastPathPlanCache(OUTFUNC_ARGS);
extern void OutDeferredErrorMessage(OUTFUNC_ARGS); extern void OutDeferredErrorMessage(OUTFUNC_ARGS);
extern void OutGroupShardPlacement(OUTFUNC_ARGS); extern void OutGroupShardPlacement(OUTFUNC_ARGS);
@ -75,7 +75,7 @@ extern void CopyNodeGroupShardPlacement(COPYFUNC_ARGS);
extern void CopyNodeRelationShard(COPYFUNC_ARGS); extern void CopyNodeRelationShard(COPYFUNC_ARGS);
extern void CopyNodeRelationRowLock(COPYFUNC_ARGS); extern void CopyNodeRelationRowLock(COPYFUNC_ARGS);
extern void CopyNodeTask(COPYFUNC_ARGS); extern void CopyNodeTask(COPYFUNC_ARGS);
extern void CopyNodeLocalPlannedStatement(COPYFUNC_ARGS); extern void CopyNodeFastPathPlanCache(COPYFUNC_ARGS);
extern void CopyNodeTaskQuery(COPYFUNC_ARGS); extern void CopyNodeTaskQuery(COPYFUNC_ARGS);
extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS); extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS);

View File

@ -59,7 +59,7 @@ typedef enum CitusNodeTag
T_DistributedSubPlan, T_DistributedSubPlan,
T_UsedDistributedSubPlan, T_UsedDistributedSubPlan,
T_Task, T_Task,
T_LocalPlannedStatement, T_FastPathPlanCache,
T_ShardInterval, T_ShardInterval,
T_ShardPlacement, T_ShardPlacement,
T_RelationShard, T_RelationShard,

View File

@ -1,11 +1,11 @@
#ifndef LOCAL_PLAN_CACHE #ifndef FAST_PATH_PLAN_CACHE
#define LOCAL_PLAN_CACHE #define FAST_PATH_PLAN_CACHE
extern bool IsLocalPlanCachingSupported(Job *currentJob, extern bool IsFastPathPlanCachingSupported(Job *currentJob,
DistributedPlan *originalDistributedPlan); DistributedPlan *originalDistributedPlan);
extern PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan); extern PlannedStmt * GetFastPathLocalPlan(Task *task, DistributedPlan *distributedPlan);
extern void CacheLocalPlanForShardQuery(Task *task, extern void CacheFastPathPlanForShardQuery(Task *task,
DistributedPlan *originalDistributedPlan, DistributedPlan *originalDistributedPlan,
ParamListInfo paramListInfo); ParamListInfo paramListInfo);
#endif /* LOCAL_PLAN_CACHE */ #endif /* FAST_PATH_PLAN_CACHE */

View File

@ -111,17 +111,17 @@ typedef enum RowModifyLevel
/* /*
* LocalPlannedStatement represents a local plan of a shard. The scope * FastPathPlanCache represents a plan of a shard. The scope
* for the LocalPlannedStatement is Task. * for the FastPathPlanCache is Task.
*/ */
typedef struct LocalPlannedStatement typedef struct FastPathPlanCache
{ {
CitusNode type; CitusNode type;
uint64 shardId; uint64 shardId;
uint32 localGroupId; uint32 localGroupId;
PlannedStmt *localPlan; PlannedStmt *localPlan;
} LocalPlannedStatement; } FastPathPlanCache;
/* /*
@ -144,7 +144,7 @@ typedef struct Job
Const *partitionKeyValue; Const *partitionKeyValue;
/* for local shard queries, we may save the local plan here */ /* for local shard queries, we may save the local plan here */
List *localPlannedStatements; List *fastPathPlanCacheList;
/* /*
* When we evaluate functions and parameters in jobQuery then we * When we evaluate functions and parameters in jobQuery then we