diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 4510e83ee..74fc1f11a 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -15,10 +15,12 @@ #include "distributed/backend_data.h" #include "distributed/citus_clauses.h" #include "distributed/citus_custom_scan.h" +#include "distributed/citus_nodefuncs.h" #include "distributed/deparse_shard_query.h" #include "distributed/distributed_execution_locks.h" #include "distributed/insert_select_executor.h" #include "distributed/insert_select_planner.h" +#include "distributed/local_executor.h" #include "distributed/multi_executor.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_router_planner.h" @@ -27,6 +29,12 @@ #include "distributed/worker_protocol.h" #include "executor/executor.h" #include "nodes/makefuncs.h" +#if PG_VERSION_NUM >= 120000 +#include "optimizer/optimizer.h" +#else +#include "optimizer/planner.h" +#endif +#include "optimizer/clauses.h" #include "utils/memutils.h" #include "utils/rel.h" @@ -39,13 +47,16 @@ static Node * DelayedErrorCreateScan(CustomScan *scan); /* functions that are common to different scans */ static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags); -static void CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int - eflags); +static void CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, + int eflags); static void HandleDeferredShardPruningForFastPathQueries( DistributedPlan *distributedPlan); static void HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan); - -static void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags); +static void CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan); +static DistributedPlan * CopyDistributedPlanWithoutCache(CitusScanState *scanState); +static void ResetExecutionParameters(EState *executorState); +static void CitusBeginScanWithoutCoordinatorProcessing(CustomScanState *node, + EState *estate, int eflags); static void CitusEndScan(CustomScanState *node); static void CitusReScan(CustomScanState *node); @@ -138,16 +149,12 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags) if (workerJob && (workerJob->requiresMasterEvaluation || workerJob->deferredPruning)) { - CitusGenerateDeferredQueryStrings(node, estate, eflags); - } + CitusBeginScanWithCoordinatorProcessing(node, estate, eflags); - if (distributedPlan->modLevel == ROW_MODIFY_READONLY || - distributedPlan->insertSelectQuery != NULL) - { return; } - CitusModifyBeginScan(node, estate, eflags); + CitusBeginScanWithoutCoordinatorProcessing(node, estate, eflags); } @@ -176,33 +183,38 @@ CitusExecScan(CustomScanState *node) /* - * CitusModifyBeginScan first evaluates expressions in the query and then - * performs shard pruning in case the partition column in an insert was - * defined as a function call. - * - * The function also checks the validity of the given custom scan node and - * gets locks on the shards involved in the task list of the distributed plan. + * CitusBeginScanWithoutCoordinatorProcessing is intended to work on all executions + * that do not require any coordinator processing. The function simply acquires the + * necessary locks on the shards involved in the task list of the distributed plan + * and assigns the task placements.
This implies that the function is a no-op for + SELECT queries as they do not require any locking or placement assignments. */ static void -CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) +CitusBeginScanWithoutCoordinatorProcessing(CustomScanState *node, EState *estate, int + eflags) { CitusScanState *scanState = (CitusScanState *) node; DistributedPlan *distributedPlan = scanState->distributedPlan; - /* - * If we have not already copied the plan into this context, do it now. - * Note that we could have copied the plan during CitusGenerateDeferredQueryStrings. - */ - if (GetMemoryChunkContext(distributedPlan) != CurrentMemoryContext) + if (distributedPlan->modLevel == ROW_MODIFY_READONLY || + distributedPlan->insertSelectQuery != NULL) { - distributedPlan = copyObject(distributedPlan); - - scanState->distributedPlan = distributedPlan; + return; } + /* we'll be modifying the distributed plan by assigning taskList, do it on a copy */ + distributedPlan = copyObject(distributedPlan); + scanState->distributedPlan = distributedPlan; + Job *workerJob = distributedPlan->workerJob; List *taskList = workerJob->taskList; + /* + * These more complex jobs should have been handled in + * CitusBeginScanWithCoordinatorProcessing. + */ + Assert(!(workerJob->requiresMasterEvaluation || workerJob->deferredPruning)); + /* prevent concurrent placement changes */ AcquireMetadataLocks(taskList); @@ -212,22 +224,17 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) /* - * CitusGenerateDeferredQueryStrings generates query strings at the start of the execution + * CitusBeginScanWithCoordinatorProcessing generates query strings at the start of the execution * in two cases: when the query requires master evaluation and/or deferred shard pruning. + * + * The function is also smart about caching plans if the plan is local to this node. */ static void -CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int eflags) +CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, int eflags) { CitusScanState *scanState = (CitusScanState *) node; - - /* - * We must not change the distributed plan since it may be reused across multiple - * executions of a prepared statement. Instead we create a deep copy that we only - * use for the current execution.
- */ - DistributedPlan *distributedPlan = copyObject(scanState->distributedPlan); - scanState->distributedPlan = distributedPlan; - + DistributedPlan *originalDistributedPlan = scanState->distributedPlan; + DistributedPlan *distributedPlan = CopyDistributedPlanWithoutCache(scanState); Job *workerJob = distributedPlan->workerJob; Query *jobQuery = workerJob->jobQuery; @@ -235,7 +242,6 @@ CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int efl Assert(workerJob->requiresMasterEvaluation || workerJob->deferredPruning); PlanState *planState = &(scanState->customScanState.ss.ps); - EState *executorState = planState->state; /* citus only evaluates functions for modification queries */ bool modifyQueryRequiresMasterEvaluation = @@ -251,36 +257,32 @@ CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int efl modifyQueryRequiresMasterEvaluation || workerJob->deferredPruning; if (shoudEvaluteFunctionsOrParams) { - distributedPlan = (scanState->distributedPlan); - scanState->distributedPlan = distributedPlan; - - workerJob = distributedPlan->workerJob; - jobQuery = workerJob->jobQuery; - + /* evaluate functions and parameters */ ExecuteMasterEvaluableFunctions(jobQuery, planState); - - /* - * We've processed parameters in ExecuteMasterEvaluableFunctions and - * don't need to send their values to workers, since they will be - * represented as constants in the deparsed query. To avoid sending - * parameter values, we set the parameter list to NULL. - */ - executorState->es_param_list_info = NULL; } /* * After evaluating the function/parameters, we're done unless shard pruning * is also deferred. */ - if (!workerJob->deferredPruning) + if (workerJob->requiresMasterEvaluation && !workerJob->deferredPruning) { RebuildQueryStrings(workerJob->jobQuery, workerJob->taskList); + /* we'll use the generated strings, so we no longer need the parameters */ + EState *executorState = planState->state; + ResetExecutionParameters(executorState); + return; } - /* at this point, we're about to do the shard pruning */ - Assert(workerJob->deferredPruning); + /* + * At this point, we're about to do the deferred shard pruning for fast-path + * queries. Given that pruning is always deferred for INSERTs, we can also get + * here when !EnableFastPathRouterPlanner. + */ + Assert(workerJob->deferredPruning && + (distributedPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner)); if (jobQuery->commandType == CMD_INSERT) { HandleDeferredShardPruningForInserts(distributedPlan); @@ -289,6 +291,191 @@ CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int efl { HandleDeferredShardPruningForFastPathQueries(distributedPlan); } + + if (jobQuery->commandType != CMD_SELECT) + { + /* prevent concurrent placement changes */ + AcquireMetadataLocks(workerJob->taskList); + + /* modify tasks are always assigned using first-replica policy */ + workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList); + } + + if (list_length(distributedPlan->workerJob->taskList) != 1) + { + /* + * We might have zero shard queries or multi-row INSERTs at this point; + * we only want to cache single-task queries. + */ + return; + } + + /* + * As long as the task accesses the local node and the query doesn't have + * any volatile functions, we cache the local Postgres plan on the + * shard for re-use.
+ */ + Task *task = linitial(distributedPlan->workerJob->taskList); + if (EnableLocalExecution && TaskAccessesLocalNode(task) && + !contain_volatile_functions( + (Node *) originalDistributedPlan->workerJob->jobQuery)) + { + CacheLocalPlanForTask(task, originalDistributedPlan); + } + else + { + /* + * If we're not going to use a cached plan, we'll use the already generated + * query string, in which the parameters are replaced with constants, so we + * no longer need the parameters. + */ + EState *executorState = planState->state; + ResetExecutionParameters(executorState); + } +} + + +/* + * CopyDistributedPlanWithoutCache is a helper function that copies the + * distributedPlan into the current memory context. + * + * We must not change the distributed plan since it may be reused across multiple + * executions of a prepared statement. Instead we create a deep copy that we only + * use for the current execution. + * + * We also exclude localPlannedStatements from the copyObject call for performance + * reasons; they are immutable, so there is no need for a deep copy. + */ +static DistributedPlan * +CopyDistributedPlanWithoutCache(CitusScanState *scanState) +{ + DistributedPlan *originalDistributedPlan = scanState->distributedPlan; + List *localPlannedStatements = + originalDistributedPlan->workerJob->localPlannedStatements; + originalDistributedPlan->workerJob->localPlannedStatements = NIL; + + DistributedPlan *distributedPlan = copyObject(originalDistributedPlan); + scanState->distributedPlan = distributedPlan; + + /* set back the immutable field */ + originalDistributedPlan->workerJob->localPlannedStatements = localPlannedStatements; + distributedPlan->workerJob->localPlannedStatements = localPlannedStatements; + + return distributedPlan; +} + + +/* + * ResetExecutionParameters sets the executor's parameter list to NULL. See the + * comment in the function body for the rationale. + */ +static void +ResetExecutionParameters(EState *executorState) +{ + /* + * We've processed parameters in ExecuteMasterEvaluableFunctions and + * don't need to send their values to workers, since they will be + * represented as constants in the deparsed query. To avoid sending + * parameter values, we set the parameter list to NULL. + */ + executorState->es_param_list_info = NULL; +} + + +/* + * CacheLocalPlanForTask caches a plan that is local to this node in the + * originalDistributedPlan. + * + * The basic idea is to be able to skip planning on the shards when possible. + */ +static void +CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan) +{ + PlannedStmt *localPlan = GetCachedLocalPlan(task, originalDistributedPlan); + if (localPlan != NULL) + { + /* we already have a local plan */ + return; + } + + if (list_length(task->relationShardList) == 0) + { + /* zero shard plan, no need to cache */ + return; + } + + /* + * All memory allocations should happen in the plan's context + * since we'll cache the local plan there. + */ + MemoryContext oldContext = + MemoryContextSwitchTo(GetMemoryChunkContext(originalDistributedPlan)); + + /* + * We prefer to use jobQuery (over task->query) because we don't want any + * functions/params to have been evaluated in the cached plan. + */ + Query *shardQuery = copyObject(originalDistributedPlan->workerJob->jobQuery); + + UpdateRelationsToLocalShardTables((Node *) shardQuery, task->relationShardList); + + LOCKMODE lockMode = + IsModifyCommand(shardQuery) ? RowExclusiveLock : (shardQuery->hasForUpdate ?
+ RowShareLock : AccessShareLock); + + /* fast path queries can only have a single RTE by definition */ + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(shardQuery->rtable); + + /* + * If the shard has been created in this transaction, we wouldn't see the relationId + * for it, so do not cache. + */ + if (rangeTableEntry->relid == InvalidOid) + { + pfree(shardQuery); + return; + } + + LockRelationOid(rangeTableEntry->relid, lockMode); + + LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement); + localPlan = planner(shardQuery, 0, NULL); + localPlannedStatement->localPlan = localPlan; + localPlannedStatement->shardId = task->anchorShardId; + localPlannedStatement->localGroupId = GetLocalGroupId(); + + originalDistributedPlan->workerJob->localPlannedStatements = + lappend(originalDistributedPlan->workerJob->localPlannedStatements, + localPlannedStatement); + + MemoryContextSwitchTo(oldContext); +} + + +/* + * GetCachedLocalPlan is a helper function which returns the cached + * plan in the distributedPlan for the given task, if one exists. + * + * Otherwise, the function returns NULL. + */ +PlannedStmt * +GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan) +{ + ListCell *cachedLocalPlanCell = NULL; + List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements; + foreach(cachedLocalPlanCell, cachedPlanList) + { + LocalPlannedStatement *localPlannedStatement = lfirst(cachedLocalPlanCell); + + if (localPlannedStatement->shardId == task->anchorShardId && + localPlannedStatement->localGroupId == GetLocalGroupId()) + { + /* already have a cached plan, no need to continue */ + return localPlannedStatement->localPlan; + } + } + + return NULL; } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 41df95f62..9e1cab457 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -105,7 +105,7 @@ static void SplitLocalAndRemotePlacements(List *taskPlacementList, List **remoteTaskPlacementList); static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *queryString); -static void LogLocalCommand(const char *command); +static void LogLocalCommand(Task *task); static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); @@ -123,6 +123,7 @@ uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) { EState *executorState = ScanStateGetExecutorState(scanState); + DistributedPlan *distributedPlan = scanState->distributedPlan; ParamListInfo paramListInfo = copyParamList(executorState->es_param_list_info); int numParams = 0; Oid *parameterTypes = NULL; @@ -143,31 +144,58 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) { Task *task = (Task *) lfirst(taskCell); - const char *shardQueryString = TaskQueryString(task); - Query *shardQuery = ParseQueryString(shardQueryString, parameterTypes, numParams); + PlannedStmt *localPlan = GetCachedLocalPlan(task, distributedPlan); /* - * We should not consider using CURSOR_OPT_FORCE_DISTRIBUTED in case of - * intermediate results in the query. That'd trigger ExecuteLocalTaskPlan() - * go through the distributed executor, which we do not want since the - * query is already known to be local. + * If the plan is already cached, we don't need to re-plan; we just + acquire the necessary locks.
*/ - int cursorOptions = 0; + if (localPlan != NULL) + { + Query *jobQuery = distributedPlan->workerJob->jobQuery; + LOCKMODE lockMode = + IsModifyCommand(jobQuery) ? RowExclusiveLock : (jobQuery->hasForUpdate ? + RowShareLock : + AccessShareLock); - /* - * Altough the shardQuery is local to this node, we prefer planner() - * over standard_planner(). The primary reason for that is Citus itself - * is not very tolarent standard_planner() calls that doesn't go through - * distributed_planner() because of the way that restriction hooks are - * implemented. So, let planner to call distributed_planner() which - * eventually calls standard_planner(). - */ - PlannedStmt *localPlan = planner(shardQuery, cursorOptions, paramListInfo); + ListCell *oidCell = NULL; + foreach(oidCell, localPlan->relationOids) + { + LockRelationOid(lfirst_oid(oidCell), lockMode); + } + } + else + { + Query *shardQuery = ParseQueryString(TaskQueryString(task), parameterTypes, + numParams); - LogLocalCommand(shardQueryString); + /* + * We should not consider using CURSOR_OPT_FORCE_DISTRIBUTED in case of + * intermediate results in the query. That'd trigger ExecuteLocalTaskPlan() + * to go through the distributed executor, which we do not want since the + * query is already known to be local. + */ + int cursorOptions = 0; + + /* + * Although the shardQuery is local to this node, we prefer planner() + * over standard_planner(). The primary reason is that Citus itself is + * not very tolerant of standard_planner() calls that don't go through + * distributed_planner(), because of the way the restriction hooks are + * implemented. So, we let planner() call distributed_planner(), which + * eventually calls standard_planner(). + */ + localPlan = planner(shardQuery, cursorOptions, paramListInfo); + } + + LogLocalCommand(task); + + char *shardQueryString = task->queryStringLazy + ? task->queryStringLazy + : ""; totalRowsProcessed += - ExecuteLocalTaskPlan(scanState, localPlan, TaskQueryString(task)); + ExecuteLocalTaskPlan(scanState, localPlan, shardQueryString); } return totalRowsProcessed; @@ -482,7 +510,7 @@ ErrorIfLocalExecutionHappened(void) * meaning it is part of distributed execution. */ static void -LogLocalCommand(const char *command) +LogLocalCommand(Task *task) { if (!(LogRemoteCommands || LogLocalCommands)) { @@ -490,7 +518,7 @@ LogLocalCommand(const char *command) } ereport(NOTICE, (errmsg("executing the command locally: %s", - ApplyLogRedaction(command)))); + ApplyLogRedaction(TaskQueryString(task))))); } diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 8befa16a7..951b0b832 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -264,6 +264,81 @@ UpdateRelationToShardNames(Node *node, List *relationShardList) } +/* + * UpdateRelationsToLocalShardTables walks over the query tree and appends shard ids to + * relations. The caller is responsible for ensuring that the resulting Query can + * be executed locally.
*/ +bool +UpdateRelationsToLocalShardTables(Node *node, List *relationShardList) +{ + if (node == NULL) + { + return false; + } + + /* want to look at all RTEs, even in subqueries, CTEs and such */ + if (IsA(node, Query)) + { + return query_tree_walker((Query *) node, UpdateRelationsToLocalShardTables, + relationShardList, QTW_EXAMINE_RTES_BEFORE); + } + + if (!IsA(node, RangeTblEntry)) + { + return expression_tree_walker(node, UpdateRelationsToLocalShardTables, + relationShardList); + } + + RangeTblEntry *newRte = (RangeTblEntry *) node; + + if (newRte->rtekind != RTE_RELATION) + { + return false; + } + + /* + * Search for the restrictions associated with the RTE. There better be + * some, otherwise this query wouldn't be eligible as a router query. + * + * FIXME: We should probably use a hashtable here, to do efficient + * lookup. + */ + ListCell *relationShardCell = NULL; + RelationShard *relationShard = NULL; + + foreach(relationShardCell, relationShardList) + { + relationShard = (RelationShard *) lfirst(relationShardCell); + + if (newRte->relid == relationShard->relationId) + { + break; + } + + relationShard = NULL; + } + + /* the function should only be called with local shards */ + if (relationShard == NULL) + { + return true; + } + + uint64 shardId = relationShard->shardId; + Oid relationId = relationShard->relationId; + + char *relationName = get_rel_name(relationId); + AppendShardIdToName(&relationName, shardId); + + Oid schemaId = get_rel_namespace(relationId); + + newRte->relid = get_relname_relid(relationName, schemaId); + + return false; +} + + /* * ConvertRteToSubqueryWithEmptyResult converts given relation RTE into * subquery RTE that returns no results. diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index a087f14b6..b5243252f 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -85,6 +85,7 @@ copyJobInfo(Job *newnode, Job *from) COPY_SCALAR_FIELD(requiresMasterEvaluation); COPY_SCALAR_FIELD(deferredPruning); COPY_NODE_FIELD(partitionKeyValue); + COPY_NODE_FIELD(localPlannedStatements); } @@ -266,6 +267,17 @@ CopyNodeTask(COPYFUNC_ARGS) } +void +CopyNodeLocalPlannedStatement(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(LocalPlannedStatement); + + COPY_SCALAR_FIELD(shardId); + COPY_SCALAR_FIELD(localGroupId); + COPY_NODE_FIELD(localPlan); +} + + void CopyNodeTaskExecution(COPYFUNC_ARGS) { diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index e86fcff9f..1209874a2 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -36,6 +36,7 @@ static const char *CitusNodeTagNamesD[] = { "DistributedPlan", "DistributedSubPlan", "Task", + "LocalPlannedStatement", "TaskExecution", "ShardInterval", "ShardPlacement", @@ -388,6 +389,7 @@ const ExtensibleNodeMethods nodeMethods[] = DEFINE_NODE_METHODS(RelationShard), DEFINE_NODE_METHODS(RelationRowLock), DEFINE_NODE_METHODS(Task), + DEFINE_NODE_METHODS(LocalPlannedStatement), DEFINE_NODE_METHODS(TaskExecution), DEFINE_NODE_METHODS(DeferredErrorMessage), DEFINE_NODE_METHODS(GroupShardPlacement), diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 73bbc4bae..2c55fc08c 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -327,6 +327,7 @@ OutJobFields(StringInfo str,
const Job *node) WRITE_BOOL_FIELD(requiresMasterEvaluation); WRITE_BOOL_FIELD(deferredPruning); WRITE_NODE_FIELD(partitionKeyValue); + WRITE_NODE_FIELD(localPlannedStatements); } @@ -483,6 +484,19 @@ OutTask(OUTFUNC_ARGS) } +void +OutLocalPlannedStatement(OUTFUNC_ARGS) +{ + WRITE_LOCALS(LocalPlannedStatement); + + WRITE_NODE_TYPE("LocalPlannedStatement"); + + WRITE_UINT64_FIELD(shardId); + WRITE_UINT_FIELD(localGroupId); + WRITE_NODE_FIELD(localPlan); +} + + void OutTaskExecution(OUTFUNC_ARGS) { diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 34065217f..810fe5a93 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -189,6 +189,7 @@ readJobInfo(Job *local_node) READ_BOOL_FIELD(requiresMasterEvaluation); READ_BOOL_FIELD(deferredPruning); READ_NODE_FIELD(partitionKeyValue); + READ_NODE_FIELD(localPlannedStatements); } @@ -399,6 +400,19 @@ ReadTask(READFUNC_ARGS) } +READFUNC_RET +ReadLocalPlannedStatement(READFUNC_ARGS) +{ + READ_LOCALS(LocalPlannedStatement); + + READ_UINT64_FIELD(shardId); + READ_UINT_FIELD(localGroupId); + READ_NODE_FIELD(localPlan); + + READ_DONE(); +} + + READFUNC_RET ReadTaskExecution(READFUNC_ARGS) { diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index efd93147d..d9d6a49e8 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -42,4 +42,6 @@ extern EState * ScanStateGetExecutorState(CitusScanState *scanState); extern CustomScan * FetchCitusCustomScanIfExists(Plan *plan); extern bool IsCitusPlan(Plan *plan); extern bool IsCitusCustomScan(Plan *plan); + +extern PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan); #endif /* CITUS_CUSTOM_SCAN_H */ diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index 426abe4fb..e9507bfa8 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -51,6 +51,7 @@ extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS); extern READFUNC_RET ReadRelationShard(READFUNC_ARGS); extern READFUNC_RET ReadRelationRowLock(READFUNC_ARGS); extern READFUNC_RET ReadTask(READFUNC_ARGS); +extern READFUNC_RET ReadLocalPlannedStatement(READFUNC_ARGS); extern READFUNC_RET ReadTaskExecution(READFUNC_ARGS); extern READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS); extern READFUNC_RET ReadGroupShardPlacement(READFUNC_ARGS); @@ -66,6 +67,7 @@ extern void OutShardPlacement(OUTFUNC_ARGS); extern void OutRelationShard(OUTFUNC_ARGS); extern void OutRelationRowLock(OUTFUNC_ARGS); extern void OutTask(OUTFUNC_ARGS); +extern void OutLocalPlannedStatement(OUTFUNC_ARGS); extern void OutTaskExecution(OUTFUNC_ARGS); extern void OutDeferredErrorMessage(OUTFUNC_ARGS); extern void OutGroupShardPlacement(OUTFUNC_ARGS); @@ -91,6 +93,7 @@ extern void CopyNodeGroupShardPlacement(COPYFUNC_ARGS); extern void CopyNodeRelationShard(COPYFUNC_ARGS); extern void CopyNodeRelationRowLock(COPYFUNC_ARGS); extern void CopyNodeTask(COPYFUNC_ARGS); +extern void CopyNodeLocalPlannedStatement(COPYFUNC_ARGS); extern void CopyNodeTaskExecution(COPYFUNC_ARGS); extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index b6f10d751..f1816d7c5 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -58,6 +58,7 
@@ typedef enum CitusNodeTag T_DistributedPlan, T_DistributedSubPlan, T_Task, + T_LocalPlannedStatement, T_TaskExecution, T_ShardInterval, T_ShardPlacement, diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index 971907099..54c176ca0 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -26,6 +26,6 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); extern void SetTaskQuery(Task *task, Query *query); extern void SetTaskQueryString(Task *task, char *queryString); extern char * TaskQueryString(Task *task); - +extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList); #endif /* DEPARSE_SHARD_QUERY_H */ diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 4629132ca..464d87e0e 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -24,6 +24,9 @@ extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, List **localTaskList, List **remoteTaskList); extern bool ShouldExecuteTasksLocally(List *taskList); extern void ErrorIfLocalExecutionHappened(void); +extern void SetTaskQueryAndPlacementList(Task *task, Query *query, List *placementList); +extern char * TaskQueryString(Task *task); +extern bool TaskAccessesLocalNode(Task *task); extern void DisableLocalExecution(void); extern bool AnyTaskAccessesRemoteNode(List *taskList); extern bool TaskAccessesLocalNode(Task *task); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 1b2fb894b..fef1fe60f 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -117,6 +117,21 @@ typedef enum RowModifyLevel ROW_MODIFY_NONCOMMUTATIVE = 3 } RowModifyLevel; + +/* + * LocalPlannedStatement represents a local plan of a shard. The scope + * for the LocalPlannedStatement is Task. + */ +typedef struct LocalPlannedStatement +{ + CitusNode type; + + uint64 shardId; + uint32 localGroupId; + PlannedStmt *localPlan; +} LocalPlannedStatement; + + /* * Job represents a logical unit of work that contains one set of data transfers * in our physical plan. 
The physical planner maps each SQL query into one or @@ -135,6 +150,9 @@ typedef struct Job bool requiresMasterEvaluation; /* only applies to modify jobs */ bool deferredPruning; Const *partitionKeyValue; + + /* for local shard queries, we may save the local plan here */ + List *localPlannedStatements; } Job; diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 5d8f755ad..2dd3247e0 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -1509,6 +1509,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM ((SELECT f RESET client_min_messages; RESET citus.log_local_commands; \c - - - :master_port +SET citus.next_shard_id TO 1480000; -- local execution with custom type SET citus.replication_model TO "streaming"; SET citus.shard_replication_factor TO 1; @@ -1531,6 +1532,13 @@ BEGIN INSERT INTO event_responses VALUES (p_event_id, p_user_id, p_choice) ON CONFLICT (event_id, user_id) DO UPDATE SET response = EXCLUDED.response; + + PERFORM count(*) FROM event_responses WHERE event_id = p_event_id; + + PERFORM count(*) FROM event_responses WHERE event_id = p_event_id AND false; + + UPDATE event_responses SET response = p_choice WHERE event_id = p_event_id; + END; $fn$; SELECT create_distributed_function('register_for_event(int,int,invite_resp)', 'p_event_id', 'event_responses'); @@ -1539,13 +1547,140 @@ SELECT create_distributed_function('register_for_event(int,int,invite_resp)', 'p (1 row) --- call 6 times to make sure it works after the 5th time(postgres binds values after the 5th time) +-- call 7 times to make sure it works after the 5th time(postgres binds values after the 5th time) +-- after 6th, the local execution caches the local plans and uses it +-- execute it both locally and remotely CALL register_for_event(16, 1, 'yes'); CALL register_for_event(16, 1, 'yes'); CALL register_for_event(16, 1, 'yes'); CALL register_for_event(16, 1, 'yes'); CALL register_for_event(16, 1, 'yes'); CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +\c - - - :worker_2_port +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +-- values 16, 17 and 19 hits the same +-- shard, so we're re-using the same cached +-- plans per statement across different distribution +-- key values +CALL register_for_event(17, 1, 'yes'); +CALL register_for_event(19, 1, 'yes'); +CALL register_for_event(17, 1, 'yes'); +CALL register_for_event(19, 1, 'yes'); +-- should work fine if the logs are enabled +\set VERBOSITY terse +SET citus.log_local_commands TO ON; +SET client_min_messages TO DEBUG2; +CALL register_for_event(19, 1, 'yes'); +NOTICE: executing the command locally: INSERT INTO public.event_responses_1480001 AS citus_table_alias (event_id, user_id, response) VALUES (19, 1, 'yes'::public.invite_resp) ON CONFLICT(event_id, user_id) DO UPDATE SET response = excluded.response +NOTICE: executing the command locally: SELECT count(*) AS count FROM public.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 19) +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS event_id, 
NULL::integer AS user_id, NULL::public.invite_resp AS response WHERE false) event_responses(event_id, user_id, response) WHERE ((event_id OPERATOR(pg_catalog.=) 19) AND false) +NOTICE: executing the command locally: UPDATE public.event_responses_1480001 event_responses SET response = 'yes'::public.invite_resp WHERE (event_id OPERATOR(pg_catalog.=) 19) +-- should be fine even if no parameters exists in the query +SELECT count(*) FROM event_responses WHERE event_id = 16; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: executing the command locally: SELECT count(*) AS count FROM public.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16) + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT count(*) FROM event_responses WHERE event_id = 16; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: executing the command locally: SELECT count(*) AS count FROM public.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16) + count +--------------------------------------------------------------------- + 1 +(1 row) + +UPDATE event_responses SET response = 'no' WHERE event_id = 16; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: executing the command locally: UPDATE public.event_responses_1480001 event_responses SET response = 'no'::public.invite_resp WHERE (event_id OPERATOR(pg_catalog.=) 16) +INSERT INTO event_responses VALUES (16, 666, 'maybe') +ON CONFLICT (event_id, user_id) +DO UPDATE SET response = EXCLUDED.response RETURNING *; +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: executing the command locally: INSERT INTO public.event_responses_1480001 AS citus_table_alias (event_id, user_id, response) VALUES (16, 666, 'maybe'::public.invite_resp) ON CONFLICT(event_id, user_id) DO UPDATE SET response = excluded.response RETURNING citus_table_alias.event_id, citus_table_alias.user_id, citus_table_alias.response + event_id | user_id | response +--------------------------------------------------------------------- + 16 | 666 | maybe +(1 row) + +-- multi row INSERTs hitting the same shard +INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no') +ON CONFLICT (event_id, user_id) +DO UPDATE SET response = EXCLUDED.response RETURNING *; +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: executing the command locally: INSERT INTO public.event_responses_1480001 AS citus_table_alias (event_id, user_id, response) VALUES (16,666,'maybe'::public.invite_resp), (17,777,'no'::public.invite_resp) ON CONFLICT(event_id, user_id) DO UPDATE SET response = excluded.response RETURNING citus_table_alias.event_id, citus_table_alias.user_id, citus_table_alias.response + event_id | user_id | response +--------------------------------------------------------------------- + 16 | 666 | maybe + 17 | 777 | no +(2 rows) + +-- now, similar tests with some settings changed +SET citus.enable_local_execution TO false; +SET citus.enable_fast_path_router_planner TO false; +CALL register_for_event(19, 1, 'yes'); +-- should be fine even if no parameters exists in the query +SELECT count(*) FROM event_responses WHERE event_id = 16; +DEBUG: Creating router plan +DEBUG: Plan is router executable + count 
+--------------------------------------------------------------------- + 2 +(1 row) + +SELECT count(*) FROM event_responses WHERE event_id = 16; +DEBUG: Creating router plan +DEBUG: Plan is router executable + count +--------------------------------------------------------------------- + 2 +(1 row) + +UPDATE event_responses SET response = 'no' WHERE event_id = 16; +DEBUG: Creating router plan +DEBUG: Plan is router executable +INSERT INTO event_responses VALUES (16, 666, 'maybe') +ON CONFLICT (event_id, user_id) +DO UPDATE SET response = EXCLUDED.response RETURNING *; +DEBUG: Creating router plan +DEBUG: Plan is router executable + event_id | user_id | response +--------------------------------------------------------------------- + 16 | 666 | maybe +(1 row) + +-- multi row INSERTs hitting the same shard +INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no') +ON CONFLICT (event_id, user_id) +DO UPDATE SET response = EXCLUDED.response RETURNING *; +DEBUG: Creating router plan +DEBUG: Plan is router executable + event_id | user_id | response +--------------------------------------------------------------------- + 16 | 666 | maybe + 17 | 777 | no +(2 rows) + +\c - - - :master_port SET client_min_messages TO ERROR; SET search_path TO public; DROP SCHEMA local_shard_execution CASCADE; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index f829733c1..7e0a06e17 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -773,7 +773,7 @@ RESET client_min_messages; RESET citus.log_local_commands; \c - - - :master_port - +SET citus.next_shard_id TO 1480000; -- local execution with custom type SET citus.replication_model TO "streaming"; SET citus.shard_replication_factor TO 1; @@ -794,18 +794,88 @@ BEGIN INSERT INTO event_responses VALUES (p_event_id, p_user_id, p_choice) ON CONFLICT (event_id, user_id) DO UPDATE SET response = EXCLUDED.response; + + PERFORM count(*) FROM event_responses WHERE event_id = p_event_id; + + PERFORM count(*) FROM event_responses WHERE event_id = p_event_id AND false; + + UPDATE event_responses SET response = p_choice WHERE event_id = p_event_id; + END; $fn$; SELECT create_distributed_function('register_for_event(int,int,invite_resp)', 'p_event_id', 'event_responses'); --- call 6 times to make sure it works after the 5th time(postgres binds values after the 5th time) +-- call 7 times to make sure it works after the 5th time(postgres binds values after the 5th time) +-- after 6th, the local execution caches the local plans and uses it +-- execute it both locally and remotely CALL register_for_event(16, 1, 'yes'); CALL register_for_event(16, 1, 'yes'); CALL register_for_event(16, 1, 'yes'); CALL register_for_event(16, 1, 'yes'); CALL register_for_event(16, 1, 'yes'); CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); + +\c - - - :worker_2_port +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); + +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); + +-- values 16, 17 and 19 hits the same +-- shard, so we're re-using the same cached +-- plans per statement across different distribution +-- key values +CALL register_for_event(17, 1, 'yes'); +CALL 
register_for_event(19, 1, 'yes'); +CALL register_for_event(17, 1, 'yes'); +CALL register_for_event(19, 1, 'yes'); + +-- should work fine if the logs are enabled +\set VERBOSITY terse +SET citus.log_local_commands TO ON; +SET client_min_messages TO DEBUG2; +CALL register_for_event(19, 1, 'yes'); + +-- should be fine even if no parameters exists in the query +SELECT count(*) FROM event_responses WHERE event_id = 16; +SELECT count(*) FROM event_responses WHERE event_id = 16; +UPDATE event_responses SET response = 'no' WHERE event_id = 16; +INSERT INTO event_responses VALUES (16, 666, 'maybe') +ON CONFLICT (event_id, user_id) +DO UPDATE SET response = EXCLUDED.response RETURNING *; + +-- multi row INSERTs hitting the same shard +INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no') +ON CONFLICT (event_id, user_id) +DO UPDATE SET response = EXCLUDED.response RETURNING *; + +-- now, similar tests with some settings changed +SET citus.enable_local_execution TO false; +SET citus.enable_fast_path_router_planner TO false; +CALL register_for_event(19, 1, 'yes'); + +-- should be fine even if no parameters exists in the query +SELECT count(*) FROM event_responses WHERE event_id = 16; +SELECT count(*) FROM event_responses WHERE event_id = 16; +UPDATE event_responses SET response = 'no' WHERE event_id = 16; +INSERT INTO event_responses VALUES (16, 666, 'maybe') +ON CONFLICT (event_id, user_id) +DO UPDATE SET response = EXCLUDED.response RETURNING *; + +-- multi row INSERTs hitting the same shard +INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no') +ON CONFLICT (event_id, user_id) +DO UPDATE SET response = EXCLUDED.response RETURNING *; + +\c - - - :master_port SET client_min_messages TO ERROR; SET search_path TO public;
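Editor's note (not part of the patch): the regression test above exercises the new local-plan cache through register_for_event() and repeated CALLs. The snippet below is a minimal, illustrative psql sketch of how the cached local shard plan can be observed interactively; it only relies on objects and GUCs that already appear in this diff (the event_responses table, citus.enable_local_execution, citus.log_local_commands, client_min_messages), and the prepared-statement name is arbitrary. It assumes it is run on a node that stores the shard being hit (for example the worker_2 connection used above); the exact NOTICE/DEBUG output depends on shard placement and on PostgreSQL switching to a generic plan after the fifth execution, so treat it as indicative rather than exact.

-- illustrative sketch only: observe local execution and plan re-use for a fast-path query
SET citus.enable_local_execution TO on;
SET citus.log_local_commands TO on;
SET client_min_messages TO debug2;
PREPARE count_responses(int) AS
  SELECT count(*) FROM event_responses WHERE event_id = $1;
-- the first five executions use custom plans; from the sixth one onwards PostgreSQL
-- can switch to a generic plan, and the shard plan cached by CacheLocalPlanForTask()
-- may be re-used instead of re-planning the shard query
EXECUTE count_responses(16);
EXECUTE count_responses(16);
EXECUTE count_responses(16);
EXECUTE count_responses(16);
EXECUTE count_responses(16);
EXECUTE count_responses(16);
EXECUTE count_responses(16);
DEALLOCATE count_responses;
RESET client_min_messages;
RESET citus.log_local_commands;
RESET citus.enable_local_execution;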