Make the caching generic, not only for local node

Branch: remote_prepared_txes
Author: Onder Kalaci, 2022-11-15 17:30:02 +01:00
Parent: e451301750
Commit: 91530ebe30
5 changed files with 32 additions and 21 deletions

File 1 of 5:

@@ -91,7 +91,7 @@ CacheFastPathPlanForShardQuery(Task *task, DistributedPlan *originalDistributedP
 	planCache = planner(localShardQuery, NULL, 0, NULL);
 	fastPathPlanCache->localPlan = planCache;
 	fastPathPlanCache->shardId = task->anchorShardId;
-	fastPathPlanCache->localGroupId = GetLocalGroupId();
+	fastPathPlanCache->placementGroupIds = TaskGroupIdAccesses(task);
 
 	originalDistributedPlan->workerJob->fastPathPlanCacheList =
 		lappend(originalDistributedPlan->workerJob->fastPathPlanCacheList,
@@ -241,12 +241,13 @@ GetFastPathLocalPlan(Task *task, DistributedPlan *distributedPlan)
 	List *cachedPlanList = distributedPlan->workerJob->fastPathPlanCacheList;
 	FastPathPlanCache *fastPathPlanCache = NULL;
 
-	int32 localGroupId = GetLocalGroupId();
+	List *taskGroupIdAccesses = TaskGroupIdAccesses(task);
 
 	foreach_ptr(fastPathPlanCache, cachedPlanList)
 	{
 		if (fastPathPlanCache->shardId == task->anchorShardId &&
-			fastPathPlanCache->localGroupId == localGroupId)
+			list_difference_int(taskGroupIdAccesses,
+								fastPathPlanCache->placementGroupIds) == NIL)
 		{
 			/* already have a cached plan, no need to continue */
 			return fastPathPlanCache->localPlan;
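
Note: the lookup above generalizes the cache key. An entry no longer matches only the node group it was built on; it matches whenever every group the task accesses appears in the entry's placementGroupIds. Postgres' list_difference_int returns the members of its first list that are missing from the second, so comparing the result to NIL is a subset test. A minimal standalone sketch of that semantics (plain C; IntsAreSubset is a hypothetical stand-in for illustration, not Citus code):

#include <stdbool.h>
#include <stdio.h>

/* Stand-in for "list_difference_int(a, b) == NIL": true iff every
 * element of a[] also occurs in b[]. */
static bool
IntsAreSubset(const int *a, int alen, const int *b, int blen)
{
	for (int i = 0; i < alen; i++)
	{
		bool found = false;
		for (int j = 0; j < blen; j++)
		{
			found = found || (a[i] == b[j]);
		}
		if (!found)
		{
			return false;	/* difference non-empty: cache miss */
		}
	}
	return true;			/* difference is NIL: cache hit */
}

int
main(void)
{
	int cachedGroups[] = { 1, 3 };	/* entry's placementGroupIds */
	int taskA[] = { 3 };			/* accesses group 3 only */
	int taskB[] = { 2 };			/* accesses group 2 */

	printf("taskA hit: %d\n", IntsAreSubset(taskA, 1, cachedGroups, 2));	/* 1 */
	printf("taskB hit: %d\n", IntsAreSubset(taskB, 1, cachedGroups, 2));	/* 0 */
	return 0;
}
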
@@ -295,22 +296,12 @@ IsFastPathPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistrib
 		return false;
 	}
 
-	Task *task = linitial(taskList);
-	if (!TaskAccessesLocalNode(task))
+	if (!TaskAccessesLocalNode(linitial(taskList)))
 	{
 		/* not a local task */
-		return false;
-	}
-
-	if (!EnableLocalExecution)
-	{
-		/* user requested not to use local execution */
-		return false;
-	}
-
-	if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
-	{
-		/* transaction already connected to localhost */
+		/*
+		 * TODO: we'll remove this, but for the tests not to
+		 * break, keep on this temp commit.
+		 */
 		return false;
 	}

File 2 of 5:

@@ -5548,6 +5548,25 @@ AssignDataFetchDependencies(List *taskList)
 }
 
 
+/*
+ * TaskGroupIdAccesses returns the list of unique group IDs of the shard
+ * placements that the given task accesses.
+ */
+List *
+TaskGroupIdAccesses(Task *task)
+{
+	List *taskGroupIdAccesses = NIL;
+	ShardPlacement *taskPlacement = NULL;
+
+	foreach_ptr(taskPlacement, task->taskPlacementList)
+	{
+		taskGroupIdAccesses =
+			list_append_unique_int(taskGroupIdAccesses, taskPlacement->groupId);
+	}
+
+	return taskGroupIdAccesses;
+}
+
+
 /*
  * TaskListHighestTaskId walks over tasks in the given task list, finds the task
  * that has the largest taskId, and returns that taskId.
File 3 of 5:

@@ -338,7 +338,7 @@ CopyNodeFastPathPlanCache(COPYFUNC_ARGS)
 	DECLARE_FROM_AND_NEW_NODE(FastPathPlanCache);
 
 	COPY_SCALAR_FIELD(shardId);
-	COPY_SCALAR_FIELD(localGroupId);
+	COPY_NODE_FIELD(placementGroupIds);
 	COPY_NODE_FIELD(localPlan);
 }
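
Note: the macro switch here follows from the type change: placementGroupIds is now a List, which is itself a node and must be deep-copied rather than assigned. In the Postgres copyfuncs convention that Citus mirrors, the two macros expand roughly as follows (a sketch, not the exact Citus definitions):

/* by-value fields: plain assignment */
#define COPY_SCALAR_FIELD(fldname) \
	(newnode->fldname = from->fldname)

/* node-typed fields (List, PlannedStmt, ...): deep copy, so the new
 * node owns its own list cells instead of aliasing the original */
#define COPY_NODE_FIELD(fldname) \
	(newnode->fldname = copyObject(from->fldname))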

File 4 of 5:

@@ -547,7 +547,7 @@ OutFastPathPlanCache(OUTFUNC_ARGS)
 	WRITE_NODE_TYPE("FastPathPlanCache");
 
 	WRITE_UINT64_FIELD(shardId);
-	WRITE_UINT_FIELD(localGroupId);
+	WRITE_NODE_FIELD(placementGroupIds);
 	WRITE_NODE_FIELD(localPlan);
 }
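
Note: the out function changes for the same reason; WRITE_UINT_FIELD prints a bare integer, while WRITE_NODE_FIELD recurses into the node serializer. Assuming a shard with placements on groups 1 and 3, the serialized entry would look roughly like the following (hypothetical values, localPlan abbreviated; Postgres' outfuncs print integer lists as "(i ...)"):

{FastPathPlanCache :shardId 102008 :placementGroupIds (i 1 3) :localPlan {PLANNEDSTMT ...}}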

File 5 of 5:

@@ -119,7 +119,7 @@ typedef struct FastPathPlanCache
 	CitusNode type;
 
 	uint64 shardId;
-	uint32 localGroupId;
+	List *placementGroupIds;
 	PlannedStmt *localPlan;
 } FastPathPlanCache;
@@ -577,6 +577,7 @@ extern List * RoundRobinAssignTaskList(List *taskList);
 extern List * RoundRobinReorder(List *placementList);
 extern void SetPlacementNodeMetadata(ShardPlacement *placement, WorkerNode *workerNode);
 extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement);
+extern List * TaskGroupIdAccesses(Task *task);
 extern int CompareTasksByExecutionDuration(const void *leftElement, const
 										   void *rightElement);