Merge pull request #3388 from citusdata/local_prepared_on_top_lazy_deparse

Cache local plans on shards for Citus MX
pull/3399/head
Önder Kalacı 2020-01-17 17:17:41 +01:00 committed by GitHub
commit 5f34399e1f
15 changed files with 643 additions and 79 deletions
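
For context, a minimal SQL sketch of the scenario this change speeds up, assuming the event_responses distributed table and the register_for_event distributed function created in the regression tests below (the call pattern mirrors those tests):

-- run on a Citus MX worker that holds a shard of event_responses locally
CALL register_for_event(16, 1, 'yes');  -- calls 1-5: postgres plans each call with a custom plan
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
-- after the 5th call postgres binds the parameters (generic plan); from the
-- 6th call on, local execution caches the shard's local PostgreSQL plan,
-- keyed by (shardId, localGroupId), and re-uses it instead of re-planning
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');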

View File

@ -15,10 +15,12 @@
#include "distributed/backend_data.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/local_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_router_planner.h"
@ -27,6 +29,12 @@
#include "distributed/worker_protocol.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#if PG_VERSION_NUM >= 120000
#include "optimizer/optimizer.h"
#else
#include "optimizer/planner.h"
#endif
#include "optimizer/clauses.h"
#include "utils/memutils.h"
#include "utils/rel.h"
@ -39,13 +47,16 @@ static Node * DelayedErrorCreateScan(CustomScan *scan);
/* functions that are common to different scans */
static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int
eflags);
static void CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate,
int eflags);
static void HandleDeferredShardPruningForFastPathQueries(
DistributedPlan *distributedPlan);
static void HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan);
static void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan);
static DistributedPlan * CopyDistributedPlanWithoutCache(CitusScanState *scanState);
static void ResetExecutionParameters(EState *executorState);
static void CitusBeginScanWithoutCoordinatorProcessing(CustomScanState *node,
EState *estate, int eflags);
static void CitusEndScan(CustomScanState *node);
static void CitusReScan(CustomScanState *node);
@ -138,16 +149,12 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
if (workerJob &&
(workerJob->requiresMasterEvaluation || workerJob->deferredPruning))
{
CitusGenerateDeferredQueryStrings(node, estate, eflags);
}
CitusBeginScanWithCoordinatorProcessing(node, estate, eflags);
if (distributedPlan->modLevel == ROW_MODIFY_READONLY ||
distributedPlan->insertSelectQuery != NULL)
{
return;
}
CitusModifyBeginScan(node, estate, eflags);
CitusBeginScanWithoutCoordinatorProcessing(node, estate, eflags);
}
@ -176,33 +183,38 @@ CitusExecScan(CustomScanState *node)
/*
* CitusModifyBeginScan first evaluates expressions in the query and then
* performs shard pruning in case the partition column in an insert was
* defined as a function call.
*
* The function also checks the validity of the given custom scan node and
* gets locks on the shards involved in the task list of the distributed plan.
* CitusBeginScanWithoutCoordinatorProcessing is intended to work on all executions
* that do not require any coordinator processing. The function simply acquires the
* necessary locks on the shards involved in the task list of the distributed plan
and does the placement assignments. This implies that the function is a no-op for
SELECT queries, as they do not require any locking or placement assignments.
*/
static void
CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
CitusBeginScanWithoutCoordinatorProcessing(CustomScanState *node, EState *estate, int
eflags)
{
CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = scanState->distributedPlan;
/*
* If we have not already copied the plan into this context, do it now.
* Note that we could have copied the plan during CitusGenerateDeferredQueryStrings.
*/
if (GetMemoryChunkContext(distributedPlan) != CurrentMemoryContext)
if (distributedPlan->modLevel == ROW_MODIFY_READONLY ||
distributedPlan->insertSelectQuery != NULL)
{
distributedPlan = copyObject(distributedPlan);
scanState->distributedPlan = distributedPlan;
return;
}
/* we'll be modifying the distributed plan by assigning taskList, so do it on a copy */
distributedPlan = copyObject(distributedPlan);
scanState->distributedPlan = distributedPlan;
Job *workerJob = distributedPlan->workerJob;
List *taskList = workerJob->taskList;
/*
* These more complex jobs should have been evaluated in
* CitusBeginScanWithCoordinatorProcessing.
*/
Assert(!(workerJob->requiresMasterEvaluation || workerJob->deferredPruning));
/* prevent concurrent placement changes */
AcquireMetadataLocks(taskList);
@ -212,22 +224,17 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
/*
* CitusGenerateDeferredQueryStrings generates query strings at the start of the execution
* CitusBeginScanWithCoordinatorProcessing generates query strings at the start of the execution
* in two cases: when the query requires master evaluation and/or deferred shard pruning.
*
* The function is also smart about caching plans if the plan is local to this node.
*/
static void
CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int eflags)
CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, int eflags)
{
CitusScanState *scanState = (CitusScanState *) node;
/*
* We must not change the distributed plan since it may be reused across multiple
* executions of a prepared statement. Instead we create a deep copy that we only
* use for the current execution.
*/
DistributedPlan *distributedPlan = copyObject(scanState->distributedPlan);
scanState->distributedPlan = distributedPlan;
DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
DistributedPlan *distributedPlan = CopyDistributedPlanWithoutCache(scanState);
Job *workerJob = distributedPlan->workerJob;
Query *jobQuery = workerJob->jobQuery;
@ -235,7 +242,6 @@ CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int efl
Assert(workerJob->requiresMasterEvaluation || workerJob->deferredPruning);
PlanState *planState = &(scanState->customScanState.ss.ps);
EState *executorState = planState->state;
/* citus only evaluates functions for modification queries */
bool modifyQueryRequiresMasterEvaluation =
@ -251,36 +257,32 @@ CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int efl
modifyQueryRequiresMasterEvaluation || workerJob->deferredPruning;
if (shoudEvaluteFunctionsOrParams)
{
distributedPlan = (scanState->distributedPlan);
scanState->distributedPlan = distributedPlan;
workerJob = distributedPlan->workerJob;
jobQuery = workerJob->jobQuery;
/* evaluate functions and parameters */
ExecuteMasterEvaluableFunctions(jobQuery, planState);
/*
* We've processed parameters in ExecuteMasterEvaluableFunctions and
* don't need to send their values to workers, since they will be
* represented as constants in the deparsed query. To avoid sending
* parameter values, we set the parameter list to NULL.
*/
executorState->es_param_list_info = NULL;
}
/*
* After evaluating the function/parameters, we're done unless shard pruning
* is also deferred.
*/
if (!workerJob->deferredPruning)
if (workerJob->requiresMasterEvaluation && !workerJob->deferredPruning)
{
RebuildQueryStrings(workerJob->jobQuery, workerJob->taskList);
/* we'll use the generated strings, so we don't need the parameters anymore */
EState *executorState = planState->state;
ResetExecutionParameters(executorState);
return;
}
/* at this point, we're about to do the shard pruning */
Assert(workerJob->deferredPruning);
/*
* At this point, we're about to do the shard pruning for fast-path queries.
Given that pruning is always deferred for INSERTs, we can also get here when
!EnableFastPathRouterPlanner.
*/
Assert(workerJob->deferredPruning &&
(distributedPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner));
if (jobQuery->commandType == CMD_INSERT)
{
HandleDeferredShardPruningForInserts(distributedPlan);
@ -289,6 +291,191 @@ CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int efl
{
HandleDeferredShardPruningForFastPathQueries(distributedPlan);
}
if (jobQuery->commandType != CMD_SELECT)
{
/* prevent concurrent placement changes */
AcquireMetadataLocks(workerJob->taskList);
/* modify tasks are always assigned using first-replica policy */
workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList);
}
if (list_length(distributedPlan->workerJob->taskList) != 1)
{
/*
We might have zero shard queries or multi-row INSERTs at this point;
we only want to cache single-task queries.
*/
return;
}
/*
As long as the task accesses the local node and the query doesn't have
* any volatile functions, we cache the local Postgres plan on the
* shard for re-use.
*/
Task *task = linitial(distributedPlan->workerJob->taskList);
if (EnableLocalExecution && TaskAccessesLocalNode(task) &&
!contain_volatile_functions(
(Node *) originalDistributedPlan->workerJob->jobQuery))
{
CacheLocalPlanForTask(task, originalDistributedPlan);
}
else
{
/*
If we're not going to use a cached plan, we'll use the already-generated
query string, in which the parameters have been replaced by constants, so we
no longer need the parameters.
*/
EState *executorState = planState->state;
ResetExecutionParameters(executorState);
}
}
/*
* CopyDistributedPlanWithoutCache is a helper function which copies the
* distributedPlan into the current memory context.
*
* We must not change the distributed plan since it may be reused across multiple
* executions of a prepared statement. Instead we create a deep copy that we only
* use for the current execution.
*
* We also exclude localPlannedStatements from the copyObject call for performance
reasons; as they are immutable, there is no need for a deep copy.
*/
static DistributedPlan *
CopyDistributedPlanWithoutCache(CitusScanState *scanState)
{
DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
List *localPlannedStatements =
originalDistributedPlan->workerJob->localPlannedStatements;
originalDistributedPlan->workerJob->localPlannedStatements = NIL;
DistributedPlan *distributedPlan = copyObject(originalDistributedPlan);
scanState->distributedPlan = distributedPlan;
/* set back the immutable field */
originalDistributedPlan->workerJob->localPlannedStatements = localPlannedStatements;
distributedPlan->workerJob->localPlannedStatements = localPlannedStatements;
return distributedPlan;
}
/*
ResetExecutionParameters sets the parameter list to NULL. See the function
body for details.
*/
static void
ResetExecutionParameters(EState *executorState)
{
/*
* We've processed parameters in ExecuteMasterEvaluableFunctions and
* don't need to send their values to workers, since they will be
* represented as constants in the deparsed query. To avoid sending
* parameter values, we set the parameter list to NULL.
*/
executorState->es_param_list_info = NULL;
}
/*
* CacheLocalPlanForTask caches a plan that is local to this node in the
* originalDistributedPlan.
*
* The basic idea is to be able to skip planning on the shards when possible.
*/
static void
CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan)
{
PlannedStmt *localPlan = GetCachedLocalPlan(task, originalDistributedPlan);
if (localPlan != NULL)
{
/* we already have a local plan */
return;
}
if (list_length(task->relationShardList) == 0)
{
/* zero shard plan, no need to cache */
return;
}
/*
* All memory allocations should happen in the plan's context
* since we'll cache the local plan there.
*/
MemoryContext oldContext =
MemoryContextSwitchTo(GetMemoryChunkContext(originalDistributedPlan));
/*
* We prefer to use jobQuery (over task->query) because we don't want any
functions/params to have been evaluated in the cached plan.
*/
Query *shardQuery = copyObject(originalDistributedPlan->workerJob->jobQuery);
UpdateRelationsToLocalShardTables((Node *) shardQuery, task->relationShardList);
LOCKMODE lockMode =
IsModifyCommand(shardQuery) ? RowExclusiveLock : (shardQuery->hasForUpdate ?
RowShareLock : AccessShareLock);
/* fast path queries can only have a single RTE by definition */
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(shardQuery->rtable);
/*
If the shard has been created in this transaction, we wouldn't see the relationId
* for it, so do not cache.
*/
if (rangeTableEntry->relid == InvalidOid)
{
pfree(shardQuery);
return;
}
LockRelationOid(rangeTableEntry->relid, lockMode);
LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement);
localPlan = planner(shardQuery, 0, NULL);
localPlannedStatement->localPlan = localPlan;
localPlannedStatement->shardId = task->anchorShardId;
localPlannedStatement->localGroupId = GetLocalGroupId();
originalDistributedPlan->workerJob->localPlannedStatements =
lappend(originalDistributedPlan->workerJob->localPlannedStatements,
localPlannedStatement);
MemoryContextSwitchTo(oldContext);
}
/*
GetCachedLocalPlan is a helper function that returns the cached
plan in the distributedPlan for the given task, if it exists.
*
* Otherwise, the function returns NULL.
*/
PlannedStmt *
GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
{
ListCell *cachedLocalPlanCell = NULL;
List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements;
foreach(cachedLocalPlanCell, cachedPlanList)
{
LocalPlannedStatement *localPlannedStatement = lfirst(cachedLocalPlanCell);
if (localPlannedStatement->shardId == task->anchorShardId &&
localPlannedStatement->localGroupId == GetLocalGroupId())
{
/* already have a cached plan, no need to continue */
return localPlannedStatement->localPlan;
}
}
return NULL;
}

View File

@ -105,7 +105,7 @@ static void SplitLocalAndRemotePlacements(List *taskPlacementList,
List **remoteTaskPlacementList);
static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan,
char *queryString);
static void LogLocalCommand(const char *command);
static void LogLocalCommand(Task *task);
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
@ -123,6 +123,7 @@ uint64
ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
{
EState *executorState = ScanStateGetExecutorState(scanState);
DistributedPlan *distributedPlan = scanState->distributedPlan;
ParamListInfo paramListInfo = copyParamList(executorState->es_param_list_info);
int numParams = 0;
Oid *parameterTypes = NULL;
@ -143,31 +144,58 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
{
Task *task = (Task *) lfirst(taskCell);
const char *shardQueryString = TaskQueryString(task);
Query *shardQuery = ParseQueryString(shardQueryString, parameterTypes, numParams);
PlannedStmt *localPlan = GetCachedLocalPlan(task, distributedPlan);
/*
* We should not consider using CURSOR_OPT_FORCE_DISTRIBUTED in case of
intermediate results in the query. That'd trigger ExecuteLocalTaskPlan()
to go through the distributed executor, which we do not want since the
* query is already known to be local.
If the plan is already cached, we don't need to re-plan; we just
acquire the necessary locks.
*/
int cursorOptions = 0;
if (localPlan != NULL)
{
Query *jobQuery = distributedPlan->workerJob->jobQuery;
LOCKMODE lockMode =
IsModifyCommand(jobQuery) ? RowExclusiveLock : (jobQuery->hasForUpdate ?
RowShareLock :
AccessShareLock);
/*
Although the shardQuery is local to this node, we prefer planner()
over standard_planner(). The primary reason is that Citus itself is
not very tolerant of standard_planner() calls that don't go through
distributed_planner(), because of the way restriction hooks are
implemented. So, we let planner() call distributed_planner(), which
eventually calls standard_planner().
*/
PlannedStmt *localPlan = planner(shardQuery, cursorOptions, paramListInfo);
ListCell *oidCell = NULL;
foreach(oidCell, localPlan->relationOids)
{
LockRelationOid(lfirst_oid(oidCell), lockMode);
}
}
else
{
Query *shardQuery = ParseQueryString(TaskQueryString(task), parameterTypes,
numParams);
LogLocalCommand(shardQueryString);
/*
* We should not consider using CURSOR_OPT_FORCE_DISTRIBUTED in case of
intermediate results in the query. That'd trigger ExecuteLocalTaskPlan()
to go through the distributed executor, which we do not want since the
query is already known to be local.
*/
int cursorOptions = 0;
/*
Although the shardQuery is local to this node, we prefer planner()
over standard_planner(). The primary reason is that Citus itself is
not very tolerant of standard_planner() calls that don't go through
distributed_planner(), because of the way restriction hooks are
implemented. So, we let planner() call distributed_planner(), which
eventually calls standard_planner().
*/
localPlan = planner(shardQuery, cursorOptions, paramListInfo);
}
LogLocalCommand(task);
char *shardQueryString = task->queryStringLazy
? task->queryStringLazy
: "<optimized out by local execution>";
totalRowsProcessed +=
ExecuteLocalTaskPlan(scanState, localPlan, TaskQueryString(task));
ExecuteLocalTaskPlan(scanState, localPlan, shardQueryString);
}
return totalRowsProcessed;
@ -482,7 +510,7 @@ ErrorIfLocalExecutionHappened(void)
* meaning it is part of distributed execution.
*/
static void
LogLocalCommand(const char *command)
LogLocalCommand(Task *task)
{
if (!(LogRemoteCommands || LogLocalCommands))
{
@ -490,7 +518,7 @@ LogLocalCommand(const char *command)
}
ereport(NOTICE, (errmsg("executing the command locally: %s",
ApplyLogRedaction(command))));
ApplyLogRedaction(TaskQueryString(task)))));
}

View File

@ -264,6 +264,81 @@ UpdateRelationToShardNames(Node *node, List *relationShardList)
}
/*
* UpdateRelationsToLocalShardTables walks over the query tree and appends shard ids to
* relations. The caller is responsible for ensuring that the resulting Query can
* be executed locally.
*/
bool
UpdateRelationsToLocalShardTables(Node *node, List *relationShardList)
{
if (node == NULL)
{
return false;
}
/* want to look at all RTEs, even in subqueries, CTEs and such */
if (IsA(node, Query))
{
return query_tree_walker((Query *) node, UpdateRelationsToLocalShardTables,
relationShardList, QTW_EXAMINE_RTES_BEFORE);
}
if (!IsA(node, RangeTblEntry))
{
return expression_tree_walker(node, UpdateRelationsToLocalShardTables,
relationShardList);
}
RangeTblEntry *newRte = (RangeTblEntry *) node;
if (newRte->rtekind != RTE_RELATION)
{
return false;
}
/*
* Search for the restrictions associated with the RTE. There better be
some, otherwise this query wouldn't be eligible as a router query.
*
* FIXME: We should probably use a hashtable here, to do efficient
* lookup.
*/
ListCell *relationShardCell = NULL;
RelationShard *relationShard = NULL;
foreach(relationShardCell, relationShardList)
{
relationShard = (RelationShard *) lfirst(relationShardCell);
if (newRte->relid == relationShard->relationId)
{
break;
}
relationShard = NULL;
}
/* the function should only be called with local shards */
if (relationShard == NULL)
{
return true;
}
uint64 shardId = relationShard->shardId;
Oid relationId = relationShard->relationId;
char *relationName = get_rel_name(relationId);
AppendShardIdToName(&relationName, shardId);
Oid schemaId = get_rel_namespace(relationId);
newRte->relid = get_relname_relid(relationName, schemaId);
return false;
}
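As a sketch of the net effect of this rewrite (table and shard names borrowed from the regression test output below; the specific shard id is only illustrative):

-- a local shard query arriving as
SELECT count(*) FROM event_responses WHERE event_id = 16;
-- is planned, after UpdateRelationsToLocalShardTables, as if it referenced
-- the local shard table directly
SELECT count(*) FROM public.event_responses_1480001 WHERE event_id = 16;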
/*
* ConvertRteToSubqueryWithEmptyResult converts given relation RTE into
* subquery RTE that returns no results.

View File

@ -85,6 +85,7 @@ copyJobInfo(Job *newnode, Job *from)
COPY_SCALAR_FIELD(requiresMasterEvaluation);
COPY_SCALAR_FIELD(deferredPruning);
COPY_NODE_FIELD(partitionKeyValue);
COPY_NODE_FIELD(localPlannedStatements);
}
@ -266,6 +267,17 @@ CopyNodeTask(COPYFUNC_ARGS)
}
void
CopyNodeLocalPlannedStatement(COPYFUNC_ARGS)
{
DECLARE_FROM_AND_NEW_NODE(LocalPlannedStatement);
COPY_SCALAR_FIELD(shardId);
COPY_SCALAR_FIELD(localGroupId);
COPY_NODE_FIELD(localPlan);
}
void
CopyNodeTaskExecution(COPYFUNC_ARGS)
{

View File

@ -36,6 +36,7 @@ static const char *CitusNodeTagNamesD[] = {
"DistributedPlan",
"DistributedSubPlan",
"Task",
"LocalPlannedStatement",
"TaskExecution",
"ShardInterval",
"ShardPlacement",
@ -388,6 +389,7 @@ const ExtensibleNodeMethods nodeMethods[] =
DEFINE_NODE_METHODS(RelationShard),
DEFINE_NODE_METHODS(RelationRowLock),
DEFINE_NODE_METHODS(Task),
DEFINE_NODE_METHODS(LocalPlannedStatement),
DEFINE_NODE_METHODS(TaskExecution),
DEFINE_NODE_METHODS(DeferredErrorMessage),
DEFINE_NODE_METHODS(GroupShardPlacement),

View File

@ -327,6 +327,7 @@ OutJobFields(StringInfo str, const Job *node)
WRITE_BOOL_FIELD(requiresMasterEvaluation);
WRITE_BOOL_FIELD(deferredPruning);
WRITE_NODE_FIELD(partitionKeyValue);
WRITE_NODE_FIELD(localPlannedStatements);
}
@ -483,6 +484,19 @@ OutTask(OUTFUNC_ARGS)
}
void
OutLocalPlannedStatement(OUTFUNC_ARGS)
{
WRITE_LOCALS(LocalPlannedStatement);
WRITE_NODE_TYPE("LocalPlannedStatement");
WRITE_UINT64_FIELD(shardId);
WRITE_UINT_FIELD(localGroupId);
WRITE_NODE_FIELD(localPlan);
}
void
OutTaskExecution(OUTFUNC_ARGS)
{

View File

@ -189,6 +189,7 @@ readJobInfo(Job *local_node)
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_BOOL_FIELD(deferredPruning);
READ_NODE_FIELD(partitionKeyValue);
READ_NODE_FIELD(localPlannedStatements);
}
@ -399,6 +400,19 @@ ReadTask(READFUNC_ARGS)
}
READFUNC_RET
ReadLocalPlannedStatement(READFUNC_ARGS)
{
READ_LOCALS(LocalPlannedStatement);
READ_UINT64_FIELD(shardId);
READ_UINT_FIELD(localGroupId);
READ_NODE_FIELD(localPlan);
READ_DONE();
}
READFUNC_RET
ReadTaskExecution(READFUNC_ARGS)
{

View File

@ -42,4 +42,6 @@ extern EState * ScanStateGetExecutorState(CitusScanState *scanState);
extern CustomScan * FetchCitusCustomScanIfExists(Plan *plan);
extern bool IsCitusPlan(Plan *plan);
extern bool IsCitusCustomScan(Plan *plan);
extern PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan);
#endif /* CITUS_CUSTOM_SCAN_H */

View File

@ -51,6 +51,7 @@ extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
extern READFUNC_RET ReadRelationShard(READFUNC_ARGS);
extern READFUNC_RET ReadRelationRowLock(READFUNC_ARGS);
extern READFUNC_RET ReadTask(READFUNC_ARGS);
extern READFUNC_RET ReadLocalPlannedStatement(READFUNC_ARGS);
extern READFUNC_RET ReadTaskExecution(READFUNC_ARGS);
extern READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS);
extern READFUNC_RET ReadGroupShardPlacement(READFUNC_ARGS);
@ -66,6 +67,7 @@ extern void OutShardPlacement(OUTFUNC_ARGS);
extern void OutRelationShard(OUTFUNC_ARGS);
extern void OutRelationRowLock(OUTFUNC_ARGS);
extern void OutTask(OUTFUNC_ARGS);
extern void OutLocalPlannedStatement(OUTFUNC_ARGS);
extern void OutTaskExecution(OUTFUNC_ARGS);
extern void OutDeferredErrorMessage(OUTFUNC_ARGS);
extern void OutGroupShardPlacement(OUTFUNC_ARGS);
@ -91,6 +93,7 @@ extern void CopyNodeGroupShardPlacement(COPYFUNC_ARGS);
extern void CopyNodeRelationShard(COPYFUNC_ARGS);
extern void CopyNodeRelationRowLock(COPYFUNC_ARGS);
extern void CopyNodeTask(COPYFUNC_ARGS);
extern void CopyNodeLocalPlannedStatement(COPYFUNC_ARGS);
extern void CopyNodeTaskExecution(COPYFUNC_ARGS);
extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS);

View File

@ -58,6 +58,7 @@ typedef enum CitusNodeTag
T_DistributedPlan,
T_DistributedSubPlan,
T_Task,
T_LocalPlannedStatement,
T_TaskExecution,
T_ShardInterval,
T_ShardPlacement,

View File

@ -26,6 +26,6 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern void SetTaskQuery(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString);
extern char * TaskQueryString(Task *task);
extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList);
#endif /* DEPARSE_SHARD_QUERY_H */

View File

@ -24,6 +24,9 @@ extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
List **localTaskList, List **remoteTaskList);
extern bool ShouldExecuteTasksLocally(List *taskList);
extern void ErrorIfLocalExecutionHappened(void);
extern void SetTaskQueryAndPlacementList(Task *task, Query *query, List *placementList);
extern char * TaskQueryString(Task *task);
extern bool TaskAccessesLocalNode(Task *task);
extern void DisableLocalExecution(void);
extern bool AnyTaskAccessesRemoteNode(List *taskList);
extern bool TaskAccessesLocalNode(Task *task);

View File

@ -117,6 +117,21 @@ typedef enum RowModifyLevel
ROW_MODIFY_NONCOMMUTATIVE = 3
} RowModifyLevel;
/*
LocalPlannedStatement represents a local plan for a shard. The scope
of a LocalPlannedStatement is a Task.
*/
typedef struct LocalPlannedStatement
{
CitusNode type;
uint64 shardId;
uint32 localGroupId;
PlannedStmt *localPlan;
} LocalPlannedStatement;
/*
* Job represents a logical unit of work that contains one set of data transfers
* in our physical plan. The physical planner maps each SQL query into one or
@ -135,6 +150,9 @@ typedef struct Job
bool requiresMasterEvaluation; /* only applies to modify jobs */
bool deferredPruning;
Const *partitionKeyValue;
/* for local shard queries, we may save the local plan here */
List *localPlannedStatements;
} Job;

View File

@ -1509,6 +1509,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM ((SELECT f
RESET client_min_messages;
RESET citus.log_local_commands;
\c - - - :master_port
SET citus.next_shard_id TO 1480000;
-- local execution with custom type
SET citus.replication_model TO "streaming";
SET citus.shard_replication_factor TO 1;
@ -1531,6 +1532,13 @@ BEGIN
INSERT INTO event_responses VALUES (p_event_id, p_user_id, p_choice)
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response;
PERFORM count(*) FROM event_responses WHERE event_id = p_event_id;
PERFORM count(*) FROM event_responses WHERE event_id = p_event_id AND false;
UPDATE event_responses SET response = p_choice WHERE event_id = p_event_id;
END;
$fn$;
SELECT create_distributed_function('register_for_event(int,int,invite_resp)', 'p_event_id', 'event_responses');
@ -1539,13 +1547,140 @@ SELECT create_distributed_function('register_for_event(int,int,invite_resp)', 'p
(1 row)
-- call 6 times to make sure it works after the 5th time(postgres binds values after the 5th time)
-- call 7 times to make sure it works after the 5th time (postgres binds values after the 5th time)
-- after the 6th call, local execution caches the local plan and re-uses it
-- execute it both locally and remotely
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
\c - - - :worker_2_port
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
-- values 16, 17 and 19 hit the same
-- shard, so we're re-using the same cached
-- plans per statement across different distribution
-- key values
CALL register_for_event(17, 1, 'yes');
CALL register_for_event(19, 1, 'yes');
CALL register_for_event(17, 1, 'yes');
CALL register_for_event(19, 1, 'yes');
-- should work fine if the logs are enabled
\set VERBOSITY terse
SET citus.log_local_commands TO ON;
SET client_min_messages TO DEBUG2;
CALL register_for_event(19, 1, 'yes');
NOTICE: executing the command locally: INSERT INTO public.event_responses_1480001 AS citus_table_alias (event_id, user_id, response) VALUES (19, 1, 'yes'::public.invite_resp) ON CONFLICT(event_id, user_id) DO UPDATE SET response = excluded.response
NOTICE: executing the command locally: SELECT count(*) AS count FROM public.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 19)
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS event_id, NULL::integer AS user_id, NULL::public.invite_resp AS response WHERE false) event_responses(event_id, user_id, response) WHERE ((event_id OPERATOR(pg_catalog.=) 19) AND false)
NOTICE: executing the command locally: UPDATE public.event_responses_1480001 event_responses SET response = 'yes'::public.invite_resp WHERE (event_id OPERATOR(pg_catalog.=) 19)
-- should be fine even if no parameters exist in the query
SELECT count(*) FROM event_responses WHERE event_id = 16;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: executing the command locally: SELECT count(*) AS count FROM public.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16)
count
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM event_responses WHERE event_id = 16;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: executing the command locally: SELECT count(*) AS count FROM public.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16)
count
---------------------------------------------------------------------
1
(1 row)
UPDATE event_responses SET response = 'no' WHERE event_id = 16;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: executing the command locally: UPDATE public.event_responses_1480001 event_responses SET response = 'no'::public.invite_resp WHERE (event_id OPERATOR(pg_catalog.=) 16)
INSERT INTO event_responses VALUES (16, 666, 'maybe')
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *;
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: executing the command locally: INSERT INTO public.event_responses_1480001 AS citus_table_alias (event_id, user_id, response) VALUES (16, 666, 'maybe'::public.invite_resp) ON CONFLICT(event_id, user_id) DO UPDATE SET response = excluded.response RETURNING citus_table_alias.event_id, citus_table_alias.user_id, citus_table_alias.response
event_id | user_id | response
---------------------------------------------------------------------
16 | 666 | maybe
(1 row)
-- multi row INSERTs hitting the same shard
INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no')
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *;
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: executing the command locally: INSERT INTO public.event_responses_1480001 AS citus_table_alias (event_id, user_id, response) VALUES (16,666,'maybe'::public.invite_resp), (17,777,'no'::public.invite_resp) ON CONFLICT(event_id, user_id) DO UPDATE SET response = excluded.response RETURNING citus_table_alias.event_id, citus_table_alias.user_id, citus_table_alias.response
event_id | user_id | response
---------------------------------------------------------------------
16 | 666 | maybe
17 | 777 | no
(2 rows)
-- now, similar tests with some settings changed
SET citus.enable_local_execution TO false;
SET citus.enable_fast_path_router_planner TO false;
CALL register_for_event(19, 1, 'yes');
-- should be fine even if no parameters exist in the query
SELECT count(*) FROM event_responses WHERE event_id = 16;
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
---------------------------------------------------------------------
2
(1 row)
SELECT count(*) FROM event_responses WHERE event_id = 16;
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
---------------------------------------------------------------------
2
(1 row)
UPDATE event_responses SET response = 'no' WHERE event_id = 16;
DEBUG: Creating router plan
DEBUG: Plan is router executable
INSERT INTO event_responses VALUES (16, 666, 'maybe')
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *;
DEBUG: Creating router plan
DEBUG: Plan is router executable
event_id | user_id | response
---------------------------------------------------------------------
16 | 666 | maybe
(1 row)
-- multi row INSERTs hitting the same shard
INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no')
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *;
DEBUG: Creating router plan
DEBUG: Plan is router executable
event_id | user_id | response
---------------------------------------------------------------------
16 | 666 | maybe
17 | 777 | no
(2 rows)
\c - - - :master_port
SET client_min_messages TO ERROR;
SET search_path TO public;
DROP SCHEMA local_shard_execution CASCADE;

View File

@ -773,7 +773,7 @@ RESET client_min_messages;
RESET citus.log_local_commands;
\c - - - :master_port
SET citus.next_shard_id TO 1480000;
-- local execution with custom type
SET citus.replication_model TO "streaming";
SET citus.shard_replication_factor TO 1;
@ -794,18 +794,88 @@ BEGIN
INSERT INTO event_responses VALUES (p_event_id, p_user_id, p_choice)
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response;
PERFORM count(*) FROM event_responses WHERE event_id = p_event_id;
PERFORM count(*) FROM event_responses WHERE event_id = p_event_id AND false;
UPDATE event_responses SET response = p_choice WHERE event_id = p_event_id;
END;
$fn$;
SELECT create_distributed_function('register_for_event(int,int,invite_resp)', 'p_event_id', 'event_responses');
-- call 6 times to make sure it works after the 5th time(postgres binds values after the 5th time)
-- call 7 times to make sure it works after the 5th time (postgres binds values after the 5th time)
-- after the 6th call, local execution caches the local plan and re-uses it
-- execute it both locally and remotely
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
\c - - - :worker_2_port
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
-- values 16, 17 and 19 hit the same
-- shard, so we're re-using the same cached
-- plans per statement across different distribution
-- key values
CALL register_for_event(17, 1, 'yes');
CALL register_for_event(19, 1, 'yes');
CALL register_for_event(17, 1, 'yes');
CALL register_for_event(19, 1, 'yes');
-- should work fine if the logs are enabled
\set VERBOSITY terse
SET citus.log_local_commands TO ON;
SET client_min_messages TO DEBUG2;
CALL register_for_event(19, 1, 'yes');
-- should be fine even if no parameters exist in the query
SELECT count(*) FROM event_responses WHERE event_id = 16;
SELECT count(*) FROM event_responses WHERE event_id = 16;
UPDATE event_responses SET response = 'no' WHERE event_id = 16;
INSERT INTO event_responses VALUES (16, 666, 'maybe')
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *;
-- multi row INSERTs hitting the same shard
INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no')
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *;
-- now, similar tests with some settings changed
SET citus.enable_local_execution TO false;
SET citus.enable_fast_path_router_planner TO false;
CALL register_for_event(19, 1, 'yes');
-- should be fine even if no parameters exist in the query
SELECT count(*) FROM event_responses WHERE event_id = 16;
SELECT count(*) FROM event_responses WHERE event_id = 16;
UPDATE event_responses SET response = 'no' WHERE event_id = 16;
INSERT INTO event_responses VALUES (16, 666, 'maybe')
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *;
-- multi row INSERTs hitting the same shard
INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no')
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *;
\c - - - :master_port
SET client_min_messages TO ERROR;
SET search_path TO public;