extract local plan cache related methods into a file (#3667)

pull/3668/head
SaitTalhaNisanci 2020-03-31 11:11:34 +03:00 committed by GitHub
parent 8dfc2cb122
commit e1802c5c00
5 changed files with 213 additions and 183 deletions


@@ -23,6 +23,7 @@
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/local_executor.h" #include "distributed/local_executor.h"
#include "distributed/local_plan_cache.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/multi_router_planner.h" #include "distributed/multi_router_planner.h"
@@ -55,10 +56,6 @@ static void CitusPreExecScan(CitusScanState *scanState);
static bool ModifyJobNeedsEvaluation(Job *workerJob);
static void RegenerateTaskForFasthPathQuery(Job *workerJob);
static void RegenerateTaskListForInsert(Job *workerJob);
static void CacheLocalPlanForShardQuery(Task *task,
DistributedPlan *originalDistributedPlan);
static bool IsLocalPlanCachingSupported(Job *workerJob,
DistributedPlan *originalDistributedPlan);
static DistributedPlan * CopyDistributedPlanWithoutCache(
	DistributedPlan *originalDistributedPlan);
static void CitusEndScan(CustomScanState *node);
@@ -465,184 +462,6 @@ CopyDistributedPlanWithoutCache(DistributedPlan *originalDistributedPlan)
}
/*
* CacheLocalPlanForShardQuery replaces the relation OIDs in the job query
* with shard relation OIDs and then plans the query and caches the result
* in the originalDistributedPlan (which may be preserved across executions).
*/
static void
CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan)
{
PlannedStmt *localPlan = GetCachedLocalPlan(task, originalDistributedPlan);
if (localPlan != NULL)
{
/* we already have a local plan */
return;
}
if (list_length(task->relationShardList) == 0)
{
/* zero shard plan, no need to cache */
return;
}
/*
* All memory allocations should happen in the plan's context
* since we'll cache the local plan there.
*/
MemoryContext oldContext =
MemoryContextSwitchTo(GetMemoryChunkContext(originalDistributedPlan));
/*
* We prefer to use jobQuery (over task->query) because we don't want any
* functions/params to have been evaluated in the cached plan.
*/
Query *shardQuery = copyObject(originalDistributedPlan->workerJob->jobQuery);
UpdateRelationsToLocalShardTables((Node *) shardQuery, task->relationShardList);
LOCKMODE lockMode =
IsModifyCommand(shardQuery) ? RowExclusiveLock : (shardQuery->hasForUpdate ?
RowShareLock : AccessShareLock);
/* fast path queries can only have a single RTE by definition */
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(shardQuery->rtable);
/*
* If the shard has been created in this transaction, we wouldn't see the relationId
* for it, so do not cache.
*/
if (rangeTableEntry->relid == InvalidOid)
{
pfree(shardQuery);
MemoryContextSwitchTo(oldContext);
return;
}
if (IsLoggableLevel(DEBUG5))
{
StringInfo queryString = makeStringInfo();
pg_get_query_def(shardQuery, queryString);
ereport(DEBUG5, (errmsg("caching plan for query: %s",
queryString->data)));
}
LockRelationOid(rangeTableEntry->relid, lockMode);
LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement);
localPlan = planner(shardQuery, 0, NULL);
localPlannedStatement->localPlan = localPlan;
localPlannedStatement->shardId = task->anchorShardId;
localPlannedStatement->localGroupId = GetLocalGroupId();
originalDistributedPlan->workerJob->localPlannedStatements =
lappend(originalDistributedPlan->workerJob->localPlannedStatements,
localPlannedStatement);
MemoryContextSwitchTo(oldContext);
}
/*
* GetCachedLocalPlan is a helper function which returns the cached
* plan in the distributedPlan for the given task, if it exists.
*
* Otherwise, the function returns NULL.
*/
PlannedStmt *
GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
{
if (distributedPlan->workerJob == NULL)
{
return NULL;
}
List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements;
LocalPlannedStatement *localPlannedStatement = NULL;
int32 localGroupId = GetLocalGroupId();
foreach_ptr(localPlannedStatement, cachedPlanList)
{
if (localPlannedStatement->shardId == task->anchorShardId &&
localPlannedStatement->localGroupId == localGroupId)
{
/* already have a cached plan, no need to continue */
return localPlannedStatement->localPlan;
}
}
return NULL;
}
/*
* IsLocalPlanCachingSupported returns whether (part of) the task can be planned
* and executed locally and whether caching is supported (single shard, no volatile
* functions).
*/
static bool
IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistributedPlan)
{
if (!currentJob->deferredPruning)
{
/*
* When not using deferred pruning we may have already replaced distributed
* table RTEs with citus_extradata_container RTEs to pass the shard ID to the
* deparser. In that case, we cannot pass the query tree directly to the
* planner.
*
* If desired, we can relax this check by improving the implementation of
* CacheLocalPlanForShardQuery to translate citus_extradata_container
* to a shard relation OID.
*/
return false;
}
List *taskList = currentJob->taskList;
if (list_length(taskList) != 1)
{
/* we only support plan caching for single shard queries */
return false;
}
Task *task = linitial(taskList);
if (!TaskAccessesLocalNode(task))
{
/* not a local task */
return false;
}
if (!EnableLocalExecution)
{
/* user requested not to use local execution */
return false;
}
if (TransactionConnectedToLocalGroup)
{
/* transaction already connected to localhost */
return false;
}
Query *originalJobQuery = originalDistributedPlan->workerJob->jobQuery;
if (contain_volatile_functions((Node *) originalJobQuery))
{
/*
* We do not cache plans with volatile functions in the query.
*
* The reason we care about volatile functions is primarily that we
* already executed them in ExecuteMasterEvaluableFunctionsAndParameters
* and since we're falling back to the original query tree here we would
* execute them again if we execute the plan.
*/
return false;
}
return true;
}
/*
 * RegenerateTaskListForInsert does the shard pruning for an INSERT query
 * and rebuilds the query strings.
 */


@@ -82,6 +82,7 @@
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/local_executor.h" #include "distributed/local_executor.h"
#include "distributed/local_plan_cache.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"


@@ -0,0 +1,201 @@
/*-------------------------------------------------------------------------
*
* local_plan_cache.c
*
* Local plan cache related functions
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/local_plan_cache.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/metadata_cache.h"
#if PG_VERSION_NUM >= 120000
#include "optimizer/optimizer.h"
#else
#include "optimizer/planner.h"
#endif
#include "optimizer/clauses.h"
/*
* CacheLocalPlanForShardQuery replaces the relation OIDs in the job query
* with shard relation OIDs and then plans the query and caches the result
* in the originalDistributedPlan (which may be preserved across executions).
*/
void
CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan)
{
PlannedStmt *localPlan = GetCachedLocalPlan(task, originalDistributedPlan);
if (localPlan != NULL)
{
/* we already have a local plan */
return;
}
if (list_length(task->relationShardList) == 0)
{
/* zero shard plan, no need to cache */
return;
}
/*
* All memory allocations should happen in the plan's context
* since we'll cache the local plan there.
*/
MemoryContext oldContext =
MemoryContextSwitchTo(GetMemoryChunkContext(originalDistributedPlan));
/*
* We prefer to use jobQuery (over task->query) because we don't want any
* functions/params to have been evaluated in the cached plan.
*/
Query *shardQuery = copyObject(originalDistributedPlan->workerJob->jobQuery);
UpdateRelationsToLocalShardTables((Node *) shardQuery, task->relationShardList);
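/*
 * Pick the lock level the executor would use for this query: RowExclusiveLock
 * for modifications, RowShareLock for SELECT ... FOR UPDATE/SHARE, and
 * AccessShareLock for plain reads. The shard relation is locked with this
 * level below, before planning.
 */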
LOCKMODE lockMode =
IsModifyCommand(shardQuery) ? RowExclusiveLock : (shardQuery->hasForUpdate ?
RowShareLock : AccessShareLock);
/* fast path queries can only have a single RTE by definition */
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(shardQuery->rtable);
/*
* If the shard has been created in this transaction, we wouldn't see the relationId
* for it, so do not cache.
*/
if (rangeTableEntry->relid == InvalidOid)
{
pfree(shardQuery);
MemoryContextSwitchTo(oldContext);
return;
}
if (IsLoggableLevel(DEBUG5))
{
StringInfo queryString = makeStringInfo();
pg_get_query_def(shardQuery, queryString);
ereport(DEBUG5, (errmsg("caching plan for query: %s",
queryString->data)));
}
LockRelationOid(rangeTableEntry->relid, lockMode);
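/*
 * Build the cache entry: it is keyed by (shardId, localGroupId) and appended
 * to workerJob->localPlannedStatements on the original distributed plan, so
 * later executions of the same plan can reuse it via GetCachedLocalPlan().
 */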
LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement);
localPlan = planner(shardQuery, 0, NULL);
localPlannedStatement->localPlan = localPlan;
localPlannedStatement->shardId = task->anchorShardId;
localPlannedStatement->localGroupId = GetLocalGroupId();
originalDistributedPlan->workerJob->localPlannedStatements =
lappend(originalDistributedPlan->workerJob->localPlannedStatements,
localPlannedStatement);
MemoryContextSwitchTo(oldContext);
}
/*
* GetCachedLocalPlan is a helper function which returns the cached
* plan in the distributedPlan for the given task, if it exists.
*
* Otherwise, the function returns NULL.
*/
PlannedStmt *
GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
{
if (distributedPlan->workerJob == NULL)
{
return NULL;
}
List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements;
LocalPlannedStatement *localPlannedStatement = NULL;
int32 localGroupId = GetLocalGroupId();
foreach_ptr(localPlannedStatement, cachedPlanList)
{
if (localPlannedStatement->shardId == task->anchorShardId &&
localPlannedStatement->localGroupId == localGroupId)
{
/* already have a cached plan, no need to continue */
return localPlannedStatement->localPlan;
}
}
return NULL;
}
/*
* IsLocalPlanCachingSupported returns whether (part of) the task can be planned
* and executed locally and whether caching is supported (single shard, no volatile
* functions).
*/
bool
IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistributedPlan)
{
if (!currentJob->deferredPruning)
{
/*
* When not using deferred pruning we may have already replaced distributed
* table RTEs with citus_extradata_container RTEs to pass the shard ID to the
* deparser. In that case, we cannot pass the query tree directly to the
* planner.
*
* If desired, we can relax this check by improving the implementation of
* CacheLocalPlanForShardQuery to translate citus_extradata_container
* to a shard relation OID.
*/
return false;
}
List *taskList = currentJob->taskList;
if (list_length(taskList) != 1)
{
/* we only support plan caching for single shard queries */
return false;
}
Task *task = linitial(taskList);
if (!TaskAccessesLocalNode(task))
{
/* not a local task */
return false;
}
if (!EnableLocalExecution)
{
/* user requested not to use local execution */
return false;
}
if (TransactionConnectedToLocalGroup)
{
/* transaction already connected to localhost */
return false;
}
Query *originalJobQuery = originalDistributedPlan->workerJob->jobQuery;
if (contain_volatile_functions((Node *) originalJobQuery))
{
/*
* We do not cache plans with volatile functions in the query.
*
* The reason we care about volatile functions is primarily that we
* already executed them in ExecuteMasterEvaluableFunctionsAndParameters
* and since we're falling back to the original query tree here we would
* execute them again if we execute the plan.
*/
return false;
}
return true;
}


@@ -46,5 +46,4 @@ extern CustomScan * FetchCitusCustomScanIfExists(Plan *plan);
extern bool IsCitusPlan(Plan *plan);
extern bool IsCitusCustomScan(Plan *plan);
extern PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan);
#endif /* CITUS_CUSTOM_SCAN_H */


@@ -0,0 +1,10 @@
#ifndef LOCAL_PLAN_CACHE
#define LOCAL_PLAN_CACHE
extern bool IsLocalPlanCachingSupported(Job *currentJob,
DistributedPlan *originalDistributedPlan);
extern PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan);
extern void CacheLocalPlanForShardQuery(Task *task,
DistributedPlan *originalDistributedPlan);
#endif /* LOCAL_PLAN_CACHE */
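
As an illustration (not part of this commit), the three entry points declared in the new header are used together from the local-execution code paths that now include local_plan_cache.h: check IsLocalPlanCachingSupported() first, populate the cache with CacheLocalPlanForShardQuery(), and read it back with GetCachedLocalPlan(). A minimal caller sketch, assuming a Task and its DistributedPlan are in scope; GetOrCreateLocalShardPlan is a hypothetical wrapper name, not a function added by this commit:

static PlannedStmt *
GetOrCreateLocalShardPlan(Task *task, DistributedPlan *distributedPlan)
{
	if (IsLocalPlanCachingSupported(distributedPlan->workerJob, distributedPlan))
	{
		/* plans the shard query and stores it on workerJob->localPlannedStatements */
		CacheLocalPlanForShardQuery(task, distributedPlan);
	}

	/* NULL when nothing was cached; the caller then deparses and plans the task as usual */
	return GetCachedLocalPlan(task, distributedPlan);
}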