diff --git a/src/backend/distributed/planner/fast_path_plan_cache.c b/src/backend/distributed/planner/fast_path_plan_cache.c index 0a897a29b..b341be25e 100644 --- a/src/backend/distributed/planner/fast_path_plan_cache.c +++ b/src/backend/distributed/planner/fast_path_plan_cache.c @@ -91,7 +91,7 @@ CacheFastPathPlanForShardQuery(Task *task, DistributedPlan *originalDistributedP planCache = planner(localShardQuery, NULL, 0, NULL); fastPathPlanCache->localPlan = planCache; fastPathPlanCache->shardId = task->anchorShardId; - fastPathPlanCache->localGroupId = GetLocalGroupId(); + fastPathPlanCache->placementGroupIds = TaskGroupIdAccesses(task); originalDistributedPlan->workerJob->fastPathPlanCacheList = lappend(originalDistributedPlan->workerJob->fastPathPlanCacheList, @@ -241,12 +241,13 @@ GetFastPathLocalPlan(Task *task, DistributedPlan *distributedPlan) List *cachedPlanList = distributedPlan->workerJob->fastPathPlanCacheList; FastPathPlanCache *fastPathPlanCache = NULL; - int32 localGroupId = GetLocalGroupId(); + List *taskGroupIdAccesses = TaskGroupIdAccesses(task); foreach_ptr(fastPathPlanCache, cachedPlanList) { if (fastPathPlanCache->shardId == task->anchorShardId && - fastPathPlanCache->localGroupId == localGroupId) + list_difference_int(taskGroupIdAccesses, + fastPathPlanCache->placementGroupIds) == NIL) { /* already have a cached plan, no need to continue */ return fastPathPlanCache->localPlan; @@ -295,22 +296,12 @@ IsFastPathPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistrib return false; } - Task *task = linitial(taskList); - if (!TaskAccessesLocalNode(task)) + if (!TaskAccessesLocalNode(linitial(taskList))) { - /* not a local task */ - return false; - } - - if (!EnableLocalExecution) - { - /* user requested not to use local execution */ - return false; - } - - if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED) - { - /* transaction already connected to localhost */ + /* + * TODO: we'll remove this, but for the tests not to + * break, keep on this temp commit. + */ return false; } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index f909b48ce..78c083542 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -5548,6 +5548,25 @@ AssignDataFetchDependencies(List *taskList) } +/* + * TaskAccessesLocalNode returns true if any placements of the task reside on + * the node that we're executing the query. + */ +List * +TaskGroupIdAccesses(Task *task) +{ + List *taskGroupIdAccesses = NIL; + ShardPlacement *taskPlacement = NULL; + foreach_ptr(taskPlacement, task->taskPlacementList) + { + taskGroupIdAccesses = + list_append_unique_int(taskGroupIdAccesses, taskPlacement->groupId); + } + + return taskGroupIdAccesses; +} + + /* * TaskListHighestTaskId walks over tasks in the given task list, finds the task * that has the largest taskId, and returns that taskId. diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 1129e554f..0faadb2fa 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -338,7 +338,7 @@ CopyNodeFastPathPlanCache(COPYFUNC_ARGS) DECLARE_FROM_AND_NEW_NODE(FastPathPlanCache); COPY_SCALAR_FIELD(shardId); - COPY_SCALAR_FIELD(localGroupId); + COPY_NODE_FIELD(placementGroupIds); COPY_NODE_FIELD(localPlan); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index b47e95c3a..863097706 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -547,7 +547,7 @@ OutFastPathPlanCache(OUTFUNC_ARGS) WRITE_NODE_TYPE("FastPathPlanCache"); WRITE_UINT64_FIELD(shardId); - WRITE_UINT_FIELD(localGroupId); + WRITE_NODE_FIELD(placementGroupIds); WRITE_NODE_FIELD(localPlan); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 4ddee6376..ecee58fde 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -119,7 +119,7 @@ typedef struct FastPathPlanCache CitusNode type; uint64 shardId; - uint32 localGroupId; + List *placementGroupIds; PlannedStmt *localPlan; } FastPathPlanCache; @@ -577,6 +577,7 @@ extern List * RoundRobinAssignTaskList(List *taskList); extern List * RoundRobinReorder(List *placementList); extern void SetPlacementNodeMetadata(ShardPlacement *placement, WorkerNode *workerNode); extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement); +extern List * TaskGroupIdAccesses(Task *task); extern int CompareTasksByExecutionDuration(const void *leftElement, const void *rightElement);