diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c
index 5ba60b5ad..3b3b4e1d1 100644
--- a/src/backend/distributed/executor/citus_custom_scan.c
+++ b/src/backend/distributed/executor/citus_custom_scan.c
@@ -686,7 +686,8 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
 									 relationShardList,
 									 placementList,
 									 shardId,
-									 isLocalTableModification);
+									 isLocalTableModification,
+									 false);
 }
 
 
diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c
index 18563c763..bc7f737e4 100644
--- a/src/backend/distributed/executor/local_executor.c
+++ b/src/backend/distributed/executor/local_executor.c
@@ -313,6 +313,7 @@ ExecuteLocalTaskListExtended(List *taskList,
 	{
 		int taskNumParams = numParams;
 		Oid *taskParameterTypes = parameterTypes;
+		int taskType = GetTaskQueryType(task);
 
 		if (task->parametersInQueryStringResolved)
 		{
@@ -330,7 +331,7 @@ ExecuteLocalTaskListExtended(List *taskList,
 		 * for concatenated strings, we set queryStringList so that we can access
 		 * each query string.
 		 */
-		if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST)
+		if (taskType == TASK_QUERY_TEXT_LIST)
 		{
 			List *queryStringList = task->taskQuery.data.queryStringList;
 			totalRowsProcessed +=
@@ -342,22 +343,31 @@ ExecuteLocalTaskListExtended(List *taskList,
 			continue;
 		}
 
-		Query *shardQuery = ParseQueryString(TaskQueryString(task),
-											 taskParameterTypes,
-											 taskNumParams);
+		if (taskType != TASK_QUERY_LOCAL_PLAN)
+		{
+			Query *shardQuery = ParseQueryString(TaskQueryString(task),
+												 taskParameterTypes,
+												 taskNumParams);
+			int cursorOptions = CURSOR_OPT_PARALLEL_OK;
 
-		int cursorOptions = CURSOR_OPT_PARALLEL_OK;
-
-		/*
-		 * Altough the shardQuery is local to this node, we prefer planner()
-		 * over standard_planner(). The primary reason for that is Citus itself
-		 * is not very tolarent standard_planner() calls that doesn't go through
-		 * distributed_planner() because of the way that restriction hooks are
-		 * implemented. So, let planner to call distributed_planner() which
-		 * eventually calls standard_planner().
-		 */
-		localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo);
+			/*
+			 * Although the shardQuery is local to this node, we prefer planner()
+			 * over standard_planner(). The primary reason is that Citus itself is
+			 * not very tolerant of standard_planner() calls that don't go through
+			 * distributed_planner(), because of the way that restriction hooks
+			 * are implemented. So, let planner() call distributed_planner(),
+			 * which eventually calls standard_planner().
+ */ + localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo); + } + else + { + ereport(DEBUG2, (errmsg( + "Local executor: Using task's cached local plan for task %u", + task->taskId))); + localPlan = TaskQueryLocalPlan(task); + } } char *shardQueryString = NULL; diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 2542d931a..1aedbac17 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -439,6 +439,27 @@ SetTaskQueryStringList(Task *task, List *queryStringList) } +void +SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan) +{ + Assert(localPlan != NULL); + task->taskQuery.queryType = TASK_QUERY_LOCAL_PLAN; + task->taskQuery.data.localCompiled = (LocalCompilation *) palloc0( + sizeof(LocalCompilation)); + task->taskQuery.data.localCompiled->query = query; + task->taskQuery.data.localCompiled->plan = localPlan; + task->queryCount = 1; +} + + +PlannedStmt * +TaskQueryLocalPlan(Task *task) +{ + Assert(task->taskQuery.queryType == TASK_QUERY_LOCAL_PLAN); + return task->taskQuery.data.localCompiled->plan; +} + + /* * DeparseTaskQuery is a general way of deparsing a query based on a task. */ @@ -524,6 +545,26 @@ TaskQueryString(Task *task) { return task->taskQuery.data.queryStringLazy; } + else if (taskQueryType == TASK_QUERY_LOCAL_PLAN) + { + Query *query = task->taskQuery.data.localCompiled->query; + Assert(query != NULL); + + /* + * Use the query of the local compilation to generate the + * query string. For local compiled tasks, the query is retained + * for this purpose, which may be EXPLAIN ANALYZing the task, or + * command logging. Generating the query string on the fly is + * acceptable because the plan of the local compilation is used + * for query execution. 
+ */ + MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext( + query)); + UpdateRelationToShardNames((Node *) query, task->relationShardList); + MemoryContextSwitchTo(previousContext); + return AnnotateQuery(DeparseTaskQuery(task, query), + task->partitionKeyValue, task->colocationId); + } Query *jobQueryReferenceForLazyDeparsing = task->taskQuery.data.jobQueryReferenceForLazyDeparsing; diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 193e2f250..e22296ec7 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -135,13 +135,13 @@ static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo, Const *resultFormatConst); static List * OuterPlanParamsList(PlannerInfo *root); static List * CopyPlanParamList(List *originalPlanParamList); -static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void); +static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext( + FastPathRestrictionContext *fastPathContext); static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void); static void PopPlannerRestrictionContext(void); static void ResetPlannerRestrictionContext( PlannerRestrictionContext *plannerRestrictionContext); -static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, - Node *distributionKeyValue); +static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext); static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, int rteIdCounter); static RTEListProperties * GetRTEListProperties(List *rangeTableList); @@ -166,7 +166,7 @@ distributed_planner(Query *parse, { bool needsDistributedPlanning = false; bool fastPathRouterQuery = false; - Node *distributionKeyValue = NULL; + FastPathRestrictionContext fastPathContext = { 0 }; List *rangeTableList = ExtractRangeTableEntryList(parse); @@ -191,8 +191,7 @@ distributed_planner(Query *parse, &maybeHasForeignDistributedTable); if (needsDistributedPlanning) { - fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue); - + fastPathRouterQuery = FastPathRouterQuery(parse, &fastPathContext); if (maybeHasForeignDistributedTable) { WarnIfListHasForeignDistributedTable(rangeTableList); @@ -247,8 +246,9 @@ distributed_planner(Query *parse, */ HideCitusDependentObjectsOnQueriesOfPgMetaTables((Node *) parse, NULL); - /* create a restriction context and put it at the end if context list */ - planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(); + /* create a restriction context and put it at the end of context list */ + planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext( + &fastPathContext); /* * We keep track of how many times we've recursed into the planner, primarily @@ -264,7 +264,7 @@ distributed_planner(Query *parse, { if (fastPathRouterQuery) { - result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue); + result = PlanFastPathDistributedStmt(&planContext); } else { @@ -649,30 +649,21 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan) * the FastPathPlanner. 
*/ static PlannedStmt * -PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, - Node *distributionKeyValue) +PlanFastPathDistributedStmt(DistributedPlanningContext *planContext) { FastPathRestrictionContext *fastPathContext = planContext->plannerRestrictionContext->fastPathRestrictionContext; + Assert(fastPathContext != NULL); + Assert(fastPathContext->fastPathRouterQuery); - planContext->plannerRestrictionContext->fastPathRestrictionContext-> - fastPathRouterQuery = true; + FastPathPreprocessParseTree(planContext->query); - if (distributionKeyValue == NULL) + if (!fastPathContext->delayFastPathPlanning) { - /* nothing to record */ + planContext->plan = FastPathPlanner(planContext->originalQuery, + planContext->query, + planContext->boundParams); } - else if (IsA(distributionKeyValue, Const)) - { - fastPathContext->distributionKeyValue = (Const *) distributionKeyValue; - } - else if (IsA(distributionKeyValue, Param)) - { - fastPathContext->distributionKeyHasParam = true; - } - - planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query, - planContext->boundParams); return CreateDistributedPlannedStmt(planContext); } @@ -803,6 +794,8 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) RaiseDeferredError(distributedPlan->planningError, ERROR); } + CheckAndBuildDelayedFastPathPlan(planContext, distributedPlan); + /* remember the plan's identifier for identifying subplans */ distributedPlan->planId = planId; @@ -2407,13 +2400,15 @@ CopyPlanParamList(List *originalPlanParamList) /* - * CreateAndPushPlannerRestrictionContext creates a new relation restriction context - * and a new join context, inserts it to the beginning of the - * plannerRestrictionContextList. Finally, the planner restriction context is - * inserted to the beginning of the plannerRestrictionContextList and it is returned. + * CreateAndPushPlannerRestrictionContext creates a new planner restriction + * context with an empty relation restriction context and an empty join and + * a copy of the given fast path restriction context (if present). Finally, + * the planner restriction context is inserted to the beginning of the + * global plannerRestrictionContextList and it is returned. 
*/ static PlannerRestrictionContext * -CreateAndPushPlannerRestrictionContext(void) +CreateAndPushPlannerRestrictionContext( + FastPathRestrictionContext *fastPathRestrictionContext) { PlannerRestrictionContext *plannerRestrictionContext = palloc0(sizeof(PlannerRestrictionContext)); @@ -2427,6 +2422,21 @@ CreateAndPushPlannerRestrictionContext(void) plannerRestrictionContext->fastPathRestrictionContext = palloc0(sizeof(FastPathRestrictionContext)); + if (fastPathRestrictionContext != NULL) + { + /* copy the given fast path restriction context */ + FastPathRestrictionContext *plannersFastPathCtx = + plannerRestrictionContext->fastPathRestrictionContext; + plannersFastPathCtx->fastPathRouterQuery = + fastPathRestrictionContext->fastPathRouterQuery; + plannersFastPathCtx->distributionKeyValue = + fastPathRestrictionContext->distributionKeyValue; + plannersFastPathCtx->distributionKeyHasParam = + fastPathRestrictionContext->distributionKeyHasParam; + plannersFastPathCtx->delayFastPathPlanning = + fastPathRestrictionContext->delayFastPathPlanning; + } + plannerRestrictionContext->memoryContext = CurrentMemoryContext; /* we'll apply logical AND as we add tables */ diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 59f80bb40..f887e7b24 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -43,6 +43,7 @@ #include "pg_version_constants.h" +#include "distributed/citus_clauses.h" #include "distributed/distributed_planner.h" #include "distributed/insert_select_planner.h" #include "distributed/metadata_cache.h" @@ -53,6 +54,7 @@ #include "distributed/shardinterval_utils.h" bool EnableFastPathRouterPlanner = true; +bool EnableFastPathLocalExecutor = true; static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, @@ -61,6 +63,19 @@ static bool ConjunctionContainsColumnFilter(Node *node, Var *column, Node **distributionKeyValue); +void +FastPathPreprocessParseTree(Query *parse) +{ + /* + * Citus planner relies on some of the transformations on constant + * evaluation on the parse tree. + */ + parse->targetList = + (List *) eval_const_expressions(NULL, (Node *) parse->targetList); + parse->jointree->quals = + (Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals); +} + /* * FastPathPlanner is intended to be used instead of standard_planner() for trivial @@ -73,15 +88,6 @@ static bool ConjunctionContainsColumnFilter(Node *node, PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams) { - /* - * Citus planner relies on some of the transformations on constant - * evaluation on the parse tree. 
- */ - parse->targetList = - (List *) eval_const_expressions(NULL, (Node *) parse->targetList); - parse->jointree->quals = - (Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals); - PlannedStmt *result = GeneratePlaceHolderPlannedStmt(originalQuery); return result; @@ -112,10 +118,6 @@ GeneratePlaceHolderPlannedStmt(Query *parse) Plan *plan = &scanNode->plan; #endif - Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; - - Assert(FastPathRouterQuery(parse, &distKey)); - /* there is only a single relation rte */ #if PG_VERSION_NUM >= PG_VERSION_16 scanNode->scan.scanrelid = 1; @@ -150,26 +152,78 @@ GeneratePlaceHolderPlannedStmt(Query *parse) } +static void +InitializeFastPathContext(FastPathRestrictionContext *fastPathContext, + Node *distributionKeyValue, + bool canAvoidDeparse, + Query *query) +{ + Assert(fastPathContext != NULL); + Assert(!fastPathContext->fastPathRouterQuery); + Assert(!fastPathContext->delayFastPathPlanning); + + /* + * We're looking at a fast path query, so we can fill the + * fastPathContext with relevant details. + */ + fastPathContext->fastPathRouterQuery = true; + if (distributionKeyValue == NULL) + { + /* nothing to record */ + } + else if (IsA(distributionKeyValue, Const)) + { + fastPathContext->distributionKeyValue = (Const *) distributionKeyValue; + } + else if (IsA(distributionKeyValue, Param)) + { + fastPathContext->distributionKeyHasParam = true; + } + + if (EnableFastPathLocalExecutor) + { + /* + * This fast path query may be executed by the local executor. + * We need to delay the fast path planning until we know if the + * shard is local or not. Make a final check for volatile + * functions in the query tree to determine if we should delay + * the fast path planning. + */ + fastPathContext->delayFastPathPlanning = canAvoidDeparse && + !FindNodeMatchingCheckFunction( + (Node *) query, + CitusIsVolatileFunction); + } +} + + /* * FastPathRouterQuery gets a query and returns true if the query is eligible for - * being a fast path router query. + * being a fast path router query. It also fills the given fastPathContext with + * details about the query such as the distribution key value (if available), + * whether the distribution key is a parameter, and the range table entry for the + * table being queried. * The requirements for the fast path query can be listed below: * * - SELECT/UPDATE/DELETE query without CTES, sublinks-subqueries, set operations * - The query should touch only a single hash distributed or reference table * - The distribution with equality operator should be in the WHERE clause * and it should be ANDed with any other filters. Also, the distribution - * key should only exists once in the WHERE clause. So basically, + * key should only exist once in the WHERE clause. So basically, * SELECT ... 
FROM dist_table WHERE dist_key = X * If the filter is a const, distributionKeyValue is set * - All INSERT statements (including multi-row INSERTs) as long as the commands * don't have any sublinks/CTEs etc + * - */ bool -FastPathRouterQuery(Query *query, Node **distributionKeyValue) +FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext) { FromExpr *joinTree = query->jointree; Node *quals = NULL; + bool isFastPath = false; + bool canAvoidDeparse = false; + Node *distributionKeyValue = NULL; if (!EnableFastPathRouterPlanner) { @@ -201,6 +255,7 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue) else if (query->commandType == CMD_INSERT) { /* we don't need to do any further checks, all INSERTs are fast-path */ + InitializeFastPathContext(fastPathContext, NULL, true, query); return true; } @@ -232,45 +287,55 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue) Var *distributionKey = PartitionColumn(distributedTableId, 1); if (!distributionKey) { - return true; + /* Local execution may avoid a deparse on single shard distributed tables */ + canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry, + SINGLE_SHARD_DISTRIBUTED); + isFastPath = true; } - /* WHERE clause should not be empty for distributed tables */ - if (joinTree == NULL || - (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals == - NULL)) + if (!isFastPath) { - return false; + canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE); + + if (joinTree == NULL || + (joinTree->quals == NULL && !canAvoidDeparse)) + { + /* no quals, not a fast path query */ + return false; + } + + quals = joinTree->quals; + if (quals != NULL && IsA(quals, List)) + { + quals = (Node *) make_ands_explicit((List *) quals); + } + + /* + * Distribution column must be used in a simple equality match check and it must be + * place at top level conjunction operator. In simple words, we should have + * WHERE dist_key = VALUE [AND ....]; + * + * We're also not allowing any other appearances of the distribution key in the quals. + * + * Overall the logic might sound fuzzy since it involves two individual checks: + * (a) Check for top level AND operator with one side being "dist_key = const" + * (b) Only allow single appearance of "dist_key" in the quals + * + * This is to simplify both of the individual checks and omit various edge cases + * that might arise with multiple distribution keys in the quals. + */ + isFastPath = (ConjunctionContainsColumnFilter(quals, distributionKey, + &distributionKeyValue) && + !ColumnAppearsMultipleTimes(quals, distributionKey)); } - /* convert list of expressions into expression tree for further processing */ - quals = joinTree->quals; - if (quals != NULL && IsA(quals, List)) + if (isFastPath) { - quals = (Node *) make_ands_explicit((List *) quals); + InitializeFastPathContext(fastPathContext, distributionKeyValue, canAvoidDeparse, + query); } - /* - * Distribution column must be used in a simple equality match check and it must be - * place at top level conjunction operator. In simple words, we should have - * WHERE dist_key = VALUE [AND ....]; - * - * We're also not allowing any other appearances of the distribution key in the quals. 
- * - * Overall the logic might sound fuzzy since it involves two individual checks: - * (a) Check for top level AND operator with one side being "dist_key = const" - * (b) Only allow single appearance of "dist_key" in the quals - * - * This is to simplify both of the individual checks and omit various edge cases - * that might arise with multiple distribution keys in the quals. - */ - if (ConjunctionContainsColumnFilter(quals, distributionKey, distributionKeyValue) && - !ColumnAppearsMultipleTimes(quals, distributionKey)) - { - return true; - } - - return false; + return isFastPath; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 19d386343..ab86070dd 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -16,6 +16,8 @@ #include "postgres.h" #include "access/stratnum.h" +#include "access/tupdesc.h" +#include "access/tupdesc_details.h" #include "access/xact.h" #include "catalog/pg_opfamily.h" #include "catalog/pg_proc.h" @@ -34,6 +36,7 @@ #include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/planmain.h" +#include "optimizer/planner.h" #include "optimizer/restrictinfo.h" #include "parser/parse_oper.h" #include "parser/parsetree.h" @@ -81,6 +84,7 @@ #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include "distributed/shard_pruning.h" +#include "distributed/shard_utils.h" #include "distributed/shardinterval_utils.h" /* intermediate value for INSERT processing */ @@ -164,7 +168,7 @@ static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, bool isLocalTableModification, Const *partitionKeyValue, - int colocationId); + int colocationId, bool delayedFastPath); static bool RowLocksOnRelations(Node *node, List **rtiLockList); static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType @@ -173,7 +177,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList); static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList); static bool IsLocallyAccessibleCitusLocalTable(Oid relationId); - +static bool ConvertToQueryOnShard(Query *query, Oid relationID, Oid shardRelationId); /* * CreateRouterPlan attempts to create a router executor plan for the given @@ -1940,7 +1944,9 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon { GenerateSingleShardRouterTaskList(job, relationShardList, placementList, shardId, - isLocalTableModification); + isLocalTableModification, + fastPathRestrictionContext-> + delayFastPathPlanning); } job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation; @@ -1948,6 +1954,258 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon } +/* + * CheckAttributesMatch checks if the attributes of the Citus table and the shard + * table match. + * + * It is used to ensure that the shard table has the same schema as the Citus + * table before replacing the Citus table OID with the shard table OID in the + * parse tree we (Citus planner) recieved from Postgres. 
+ */
+static
+bool
+CheckAttributesMatch(Oid citusTableId, Oid shardTableId)
+{
+	Relation citusR, shardR;
+	bool same_schema = false;
+
+	citusR = RelationIdGetRelation(citusTableId);
+	shardR = RelationIdGetRelation(shardTableId);
+
+	if (RelationIsValid(citusR) && RelationIsValid(shardR))
+	{
+		TupleDesc citusTupDesc = citusR->rd_att;
+		TupleDesc shardTupDesc = shardR->rd_att;
+
+		if (citusTupDesc->natts == shardTupDesc->natts)
+		{
+			/*
+			 * Do an attribute-by-attribute comparison. This is borrowed from
+			 * the Postgres function equalTupleDescs(), which we cannot use
+			 * because the citus table and shard table have different composite
+			 * types.
+			 */
+			same_schema = true;
+			for (int i = 0; i < citusTupDesc->natts && same_schema; i++)
+			{
+				Form_pg_attribute attr1 = TupleDescAttr(citusTupDesc, i);
+				Form_pg_attribute attr2 = TupleDescAttr(shardTupDesc, i);
+
+				if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0)
+				{
+					same_schema = false;
+				}
+				if (attr1->atttypid != attr2->atttypid)
+				{
+					same_schema = false;
+				}
+				if (attr1->atttypmod != attr2->atttypmod)
+				{
+					same_schema = false;
+				}
+				if (attr1->attcollation != attr2->attcollation)
+				{
+					same_schema = false;
+				}
+
+				/* Record types derived from tables could have dropped fields. */
+				if (attr1->attisdropped != attr2->attisdropped)
+				{
+					same_schema = false;
+				}
+			}
+		}
+	}
+
+	RelationClose(citusR);
+	RelationClose(shardR);
+	return same_schema;
+}
+
+
+/*
+ * CheckAndBuildDelayedFastPathPlan() - if the query being planned is a fast
+ * path query that is not marked for deferred pruning, and the task's placement
+ * is not a dummy placement and is local to this node, then we can take a
+ * shortcut: replace the OID of the citus table with the OID of the shard in
+ * the query tree and plan that directly, instead of deparsing the parse tree
+ * to a SQL query on the shard and then parsing and planning that in the
+ * local executor. The local executor can then simply use the plan created
+ * here.
+ */ +void +CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext, + DistributedPlan *plan) +{ + FastPathRestrictionContext *fastPathContext = + planContext->plannerRestrictionContext->fastPathRestrictionContext; + + if (!fastPathContext->delayFastPathPlanning) + { + return; + } + + Job *job = plan->workerJob; + Assert(job != NULL); + + if (job->deferredPruning) + { + /* Execution time pruning => don't know which shard at this point */ + planContext->plan = FastPathPlanner(planContext->originalQuery, + planContext->query, + planContext->boundParams); + return; + } + + List *tasks = job->taskList; + Assert(list_length(tasks) == 1); + Task *task = (Task *) linitial(tasks); + List *placements = task->taskPlacementList; + Assert(list_length(placements) > 0); + int32 localGroupId = GetLocalGroupId(); + ShardPlacement *primaryPlacement = (ShardPlacement *) linitial(placements); + + bool isLocalExecution = !IsDummyPlacement(primaryPlacement) && + (primaryPlacement->groupId == localGroupId); + bool canBuildLocalPlan = true; + + if (isLocalExecution) + { + List *relationShards = task->relationShardList; + Assert(list_length(relationShards) == 1); + RelationShard *relationShard = (RelationShard *) linitial(relationShards); + Assert(relationShard->shardId == primaryPlacement->shardId); + + canBuildLocalPlan = ConvertToQueryOnShard(planContext->query, + relationShard->relationId, + relationShard->shardId); + if (canBuildLocalPlan) + { + /* Plan the query with the new shard relation id */ + planContext->plan = standard_planner(planContext->query, NULL, + planContext->cursorOptions, + planContext->boundParams); + SetTaskQueryPlan(task, job->jobQuery, planContext->plan); + + ereport(DEBUG2, (errmsg( + "Fast-path router query: created local execution plan " + "to avoid deparse to and compile of shard query"))); + return; + } + } + + /* + * Either the shard is not local to this node, or it was not safe to replace + * the OIDs in the parse tree; in any case we fall back to generating the shard + * query and compiling that. + */ + Assert(!isLocalExecution || (isLocalExecution && !canBuildLocalPlan)); + + /* Fall back to fast path planner and generating SQL query on the shard */ + planContext->plan = FastPathPlanner(planContext->originalQuery, + planContext->query, + planContext->boundParams); + UpdateRelationToShardNames((Node *) job->jobQuery, task->relationShardList); + SetTaskQueryIfShouldLazyDeparse(task, job->jobQuery); +} + + +/* + * ConvertToQueryOnShard() converts the given query on a citus table (identified by + * citusTableOid) to a query on a shard (identified by shardId). + * + * The function assumes that the query is a "fast path" query - it has only one + * RangeTblEntry and one RTEPermissionInfo. + * + * It acquires the same lock on the shard that was acquired on the citus table + * by the Postgres parser. It checks that the attribute numbers and metadata of + * the shard table and citus table are identical - otherwise it is not safe + * to proceed with this shortcut. Assuming the attributes do match, the actual + * conversion involves changing the target list entries that reference the + * citus table's oid to reference the shard's relation id instead. Finally, + * it changes the RangeTblEntry's relid to the shard's relation id and (PG16+) + * changes the RTEPermissionInfo's relid to the shard's relation id also. + * At this point the Query is ready for the postgres planner. 
+ */ +static bool +ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId) +{ + Assert(list_length(query->rtable) == 1); + RangeTblEntry *citusTableRte = (RangeTblEntry *) linitial(query->rtable); + Assert(citusTableRte->relid == citusTableOid); + Assert(list_length(query->rteperminfos) == 1); + + const char *citusTableName = get_rel_name(citusTableOid); + Assert(citusTableName != NULL); + + /* construct shard relation name */ + char *shardRelationName = pstrdup(citusTableName); + AppendShardIdToName(&shardRelationName, shardId); + + /* construct the schema name */ + char *schemaName = get_namespace_name(get_rel_namespace(citusTableOid)); + + /* now construct a range variable for the shard */ + RangeVar shardRangeVar = { + .relname = shardRelationName, + .schemaname = schemaName, + .inh = citusTableRte->inh, + .relpersistence = RELPERSISTENCE_PERMANENT, + }; + + /* Must apply the same lock to the shard that was applied to the citus table */ + Oid shardRelationId = RangeVarGetRelidExtended(&shardRangeVar, + citusTableRte->rellockmode, + 0, NULL, NULL); /* todo - use suitable callback for perms check? */ + + /* Verify that the attributes of citus table and shard table match */ + if (!CheckAttributesMatch(citusTableOid, shardRelationId)) + { + /* There is a difference between the attributes of the citus + * table and the shard table. This can happen if there is a DROP + * COLUMN on the citus table. In this case, we cannot + * convert the query to a shard query, so clean up and return. + */ + UnlockRelationOid(shardRelationId, citusTableRte->rellockmode); + ereport(DEBUG2, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg( + "Router planner fast path cannot modify parse tree for local execution: shard table \"%s.%s\" does not match the " + "distributed table \"%s.%s\"", + schemaName, shardRelationName, schemaName, + citusTableName))); + pfree(shardRelationName); + pfree(schemaName); + + return false; + } + + /* Change the target list entries that reference the original citus table's relation id */ + ListCell *lc = NULL; + foreach(lc, query->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(lc); + if (targetEntry->resorigtbl == citusTableOid) + { + targetEntry->resorigtbl = shardRelationId; + } + } + + /* Change the range table entry's oid to that of the shard's */ + Assert(shardRelationId != InvalidOid); + citusTableRte->relid = shardRelationId; + +#if PG_VERSION_NUM >= PG_VERSION_16 + + /* Change the range table permission oid to that of the shard's (PG16+) */ + Assert(list_length(query->rteperminfos) == 1); + RTEPermissionInfo *rtePermInfo = (RTEPermissionInfo *) linitial(query->rteperminfos); + rtePermInfo->relid = shardRelationId; +#endif + + return true; +} + + /* * GenerateSingleShardRouterTaskList is a wrapper around other corresponding task * list generation functions specific to single shard selects and modifications. 
@@ -1957,7 +2215,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon void GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, List *placementList, uint64 shardId, bool - isLocalTableModification) + isLocalTableModification, bool delayedFastPath) { Query *originalQuery = job->jobQuery; @@ -1970,7 +2228,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, shardId, job->parametersInJobQueryResolved, isLocalTableModification, - job->partitionKeyValue, job->colocationId); + job->partitionKeyValue, job->colocationId, + delayedFastPath); /* * Queries to reference tables, or distributed tables with multiple replica's have @@ -2001,7 +2260,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, shardId, job->parametersInJobQueryResolved, isLocalTableModification, - job->partitionKeyValue, job->colocationId); + job->partitionKeyValue, job->colocationId, + delayedFastPath); } } @@ -2096,7 +2356,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, bool isLocalTableModification, Const *partitionKeyValue, - int colocationId) + int colocationId, bool delayedFastPath) { TaskType taskType = READ_TASK; char replicationModel = 0; @@ -2168,7 +2428,10 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, task->taskPlacementList = placementList; task->partitionKeyValue = partitionKeyValue; task->colocationId = colocationId; - SetTaskQueryIfShouldLazyDeparse(task, query); + if (!delayedFastPath) + { + SetTaskQueryIfShouldLazyDeparse(task, query); + } task->anchorShardId = shardId; task->jobId = jobId; task->relationShardList = relationShardList; @@ -2449,10 +2712,15 @@ PlanRouterQuery(Query *originalQuery, /* * If this is an UPDATE or DELETE query which requires coordinator evaluation, - * don't try update shard names, and postpone that to execution phase. + * don't try update shard names, and postpone that to execution phase. Also, if + * this is a delayed fast path query, we don't update the shard names + * either, as the shard names will be updated in the fast path query planner. 
*/ bool isUpdateOrDelete = UpdateOrDeleteOrMergeQuery(originalQuery); - if (!(isUpdateOrDelete && RequiresCoordinatorEvaluation(originalQuery))) + bool delayedFastPath = + plannerRestrictionContext->fastPathRestrictionContext->delayFastPathPlanning; + if (!(isUpdateOrDelete && RequiresCoordinatorEvaluation(originalQuery)) && + !delayedFastPath) { UpdateRelationToShardNames((Node *) originalQuery, *relationShardList); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 430eb8555..29d7c4d6b 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -212,6 +212,7 @@ static const char * MaxSharedPoolSizeGucShowHook(void); static const char * LocalPoolSizeGucShowHook(void); static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source); +static bool WarnIfLocalExecutionDisabled(bool *newval, void **extra, GucSource source); static void CitusAuthHook(Port *port, int status); static bool IsSuperuser(char *userName); static void AdjustDynamicLibraryPathForCdcDecoders(void); @@ -1377,6 +1378,17 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_local_execution_local_plan", + gettext_noop("Enables the planner to avoid a query deparse and planning if " + "the shard is local to the current node."), + NULL, + &EnableFastPathLocalExecutor, + true, + PGC_USERSET, + GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, + WarnIfLocalExecutionDisabled, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_local_reference_table_foreign_keys", gettext_noop("Enables foreign keys from/to local tables"), @@ -2802,6 +2814,21 @@ WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source) } +static bool +WarnIfLocalExecutionDisabled(bool *newval, void **extra, GucSource source) +{ + if (*newval == true && EnableLocalExecution == false) + { + ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg( + "citus.enable_local_execution must be set in order for " + "citus.enable_local_execution_local_plan to be effective."))); + } + + return true; +} + + /* * NoticeIfSubqueryPushdownEnabled prints a notice when a user sets * citus.subquery_pushdown to ON. 
It doesn't print the notice if the
diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c
index 4b4a334c8..51716cff3 100644
--- a/src/backend/distributed/utils/citus_copyfuncs.c
+++ b/src/backend/distributed/utils/citus_copyfuncs.c
@@ -287,6 +287,15 @@ CopyTaskQuery(Task *newnode, Task *from)
 			break;
 		}
 
+		case TASK_QUERY_LOCAL_PLAN:
+		{
+			newnode->taskQuery.data.localCompiled =
+				(LocalCompilation *) palloc0(sizeof(LocalCompilation));
+			COPY_NODE_FIELD(taskQuery.data.localCompiled->plan);
+			COPY_NODE_FIELD(taskQuery.data.localCompiled->query);
+			break;
+		}
+
 		default:
 		{
 			break;
diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h
index 8fb012588..efcdb3032 100644
--- a/src/include/distributed/deparse_shard_query.h
+++ b/src/include/distributed/deparse_shard_query.h
@@ -27,7 +27,9 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
 extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
 extern void SetTaskQueryString(Task *task, char *queryString);
 extern void SetTaskQueryStringList(Task *task, List *queryStringList);
+extern void SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan);
 extern char * TaskQueryString(Task *task);
+extern PlannedStmt * TaskQueryLocalPlan(Task *task);
 extern char * TaskQueryStringAtIndex(Task *task, int index);
 extern int GetTaskQueryType(Task *task);
 extern void AddInsertAliasIfNeeded(Query *query);
diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h
index 33a9c2fa8..f416aa911 100644
--- a/src/include/distributed/distributed_planner.h
+++ b/src/include/distributed/distributed_planner.h
@@ -99,6 +99,12 @@ typedef struct FastPathRestrictionContext
 	 * Set to true when distKey = Param; in the queryTree
 	 */
 	bool distributionKeyHasParam;
+
+	/*
+	 * Indicates that we should hold off calling the fast path planner
+	 * until it is known whether the shard is local or not.
+	 */
+	bool delayFastPathPlanning;
 } FastPathRestrictionContext;
 
 typedef struct PlannerRestrictionContext
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index 475a41b37..e5ec2205d 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -174,9 +174,16 @@ typedef enum TaskQueryType
 	TASK_QUERY_NULL,
 	TASK_QUERY_TEXT,
 	TASK_QUERY_OBJECT,
-	TASK_QUERY_TEXT_LIST
+	TASK_QUERY_TEXT_LIST,
+	TASK_QUERY_LOCAL_PLAN,
 } TaskQueryType;
 
+typedef struct LocalCompilation
+{
+	PlannedStmt *plan; /* the local plan for this task */
+	Query *query; /* query to deparse for EXPLAIN ANALYZE or local command logging */
+} LocalCompilation;
+
 typedef struct TaskQuery
 {
 	TaskQueryType queryType;
@@ -219,6 +226,15 @@ typedef struct TaskQuery
 		 * when we want to access each query string.
 		 */
 		List *queryStringList;
+
+		/*
+		 * For tasks that can be executed locally, this field contains the
+		 * local plan for the task. This is only set when the shard that the
+		 * task is assigned to is local to the node that executes the task.
+		 * The query field is used to deparse the query for EXPLAIN ANALYZE
+		 * or local command logging.
+ */ + LocalCompilation *localCompiled; /* only applies to local tasks */ }data; }TaskQuery; diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 44be2736e..6f5a22f6f 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -28,6 +28,7 @@ extern bool EnableRouterExecution; extern bool EnableFastPathRouterPlanner; +extern bool EnableFastPathLocalExecutor; extern bool EnableNonColocatedRouterQueryPushdown; @@ -91,16 +92,19 @@ extern void GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, List *placementList, uint64 shardId, - bool isLocalTableModification); + bool isLocalTableModification, + bool delayedFastPath); /* * FastPathPlanner is a subset of router planner, that's why we prefer to * keep the external function here. */extern PlannedStmt * GeneratePlaceHolderPlannedStmt(Query *parse); +extern void FastPathPreprocessParseTree(Query *parse); extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams); -extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue); +extern bool FastPathRouterQuery(Query *query, + FastPathRestrictionContext *fastPathContext); extern bool JoinConditionIsOnFalse(List *relOptInfo); extern Oid ResultRelationOidForQuery(Query *query); extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId, @@ -120,5 +124,7 @@ extern Job * RouterJob(Query *originalQuery, DeferredErrorMessage **planningError); extern bool ContainsOnlyLocalOrReferenceTables(RTEListProperties *rteProperties); extern RangeTblEntry * ExtractSourceResultRangeTableEntry(Query *query); +extern void CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext, + DistributedPlan *plan); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/local_execution_local_plan.out b/src/test/regress/expected/local_execution_local_plan.out new file mode 100644 index 000000000..517548987 --- /dev/null +++ b/src/test/regress/expected/local_execution_local_plan.out @@ -0,0 +1,236 @@ +-- Test local execution with local plan in a sharded environment. +-- This is an enhancement to local execution where instead of deparsing +-- and compiling the shard query, the planner replaces the OID of the +-- distributed table with the OID of the local shard in the parse tree +-- and plans that. 
+-- +-- https://github.com/citusdata/citus/pull/8035 +CREATE SCHEMA local_shard_execution_local_plan; +SET search_path TO local_shard_execution_local_plan; +SET citus.next_shard_id TO 86000000; +-- Test row-based sharding +SET citus.shard_replication_factor TO 1; +CREATE TABLE test_tbl (a int, b int, data_f double precision); +SELECT create_distributed_table('test_tbl', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT setseed(0.42); -- make the random data inserted deterministic + setseed +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO test_tbl +SELECT (random()*20)::int AS a, + (random()*20)::int AS b, + random()*10000.0 AS data_f +FROM generate_series(1, 10000); +-- Put the shard on worker 1 to ensure consistent test output across different schedules +SET client_min_messages to ERROR; -- suppress warning if shard is already on worker 1 +SELECT citus_move_shard_placement(86000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + +RESET client_min_messages; +\c - - - :worker_1_port +SET search_path TO local_shard_execution_local_plan; +SET client_min_messages TO DEBUG2; +SET citus.log_remote_commands TO ON; +SET citus.log_local_commands TO ON; +-- This query resolves to a single shard (aka fast path) +-- which is located on worker_1; with client_min_messages +-- at DEBUG2 we see a message that the planner is avoiding +-- query deparse and plan +SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1) +FROM test_tbl +WHERE a = 8 AND b IN (1,3,5,8,13,21) +GROUP BY b +ORDER BY b; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM local_shard_execution_local_plan.test_tbl_86000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b +DEBUG: Local executor: Using task's cached local plan for task 0 + b | avg | min | max | count +--------------------------------------------------------------------- + 1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21 + 3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23 + 5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25 + 8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27 + 13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18 +(5 rows) + +SET citus.enable_local_execution_local_plan TO OFF; +-- With local execution local plan disabled, the same query +-- does query deparse and planning of the shard query and +-- provides the same results +SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1) +FROM test_tbl +WHERE a = 8 AND b IN (1,3,5,8,13,21) +GROUP BY b +ORDER BY b; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: 
SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM local_shard_execution_local_plan.test_tbl_86000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b + b | avg | min | max | count +--------------------------------------------------------------------- + 1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21 + 3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23 + 5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25 + 8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27 + 13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18 +(5 rows) + +\c - - - :worker_2_port +SET search_path TO local_shard_execution_local_plan; +SET client_min_messages TO DEBUG2; +SET citus.log_remote_commands TO ON; +SET citus.log_local_commands TO ON; +-- Run the same query on the other worker - the local +-- execution path is not taken because the shard is not +-- local to this worker +SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1) +FROM test_tbl +WHERE a = 8 AND b IN (1,3,5,8,13,21) +GROUP BY b +ORDER BY b; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 8 +NOTICE: issuing SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM local_shard_execution_local_plan.test_tbl_86000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + b | avg | min | max | count +--------------------------------------------------------------------- + 1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21 + 3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23 + 5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25 + 8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27 + 13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18 +(5 rows) + +\c - - - :master_port +DROP SCHEMA local_shard_execution_local_plan CASCADE; +NOTICE: drop cascades to table local_shard_execution_local_plan.test_tbl +-- Now test local execution with local plan for a schema sharded table. 
+SET citus.enable_schema_based_sharding to on; +CREATE SCHEMA schema_sharding_test; +SET search_path TO schema_sharding_test; +SET citus.next_shard_id TO 87000000; +SET citus.shard_replication_factor TO 1; +CREATE TABLE test_tbl (a int, b int, data_f double precision); +SELECT setseed(0.42); -- make the random data inserted deterministic + setseed +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO test_tbl +SELECT (random()*20)::int AS a, + (random()*20)::int AS b, + random()*10000.0 AS data_f +FROM generate_series(1, 10000); +-- Put the shard on worker 2 to ensure consistent test output across different schedules +SET client_min_messages to ERROR; -- suppress warning if shard is already on worker 2 +SELECT citus_move_shard_placement(87000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + +RESET client_min_messages; +\c - - - :worker_1_port +SET client_min_messages TO DEBUG2; +SET citus.log_remote_commands TO ON; +SET citus.log_local_commands TO ON; +-- Run the test query on worker_1; with schema based sharding +-- the data is not local to this worker so local execution +-- path is not taken. +SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1) +FROM schema_sharding_test.test_tbl +WHERE a = 8 AND b IN (1,3,5,8,13,21) +GROUP BY b +ORDER BY b; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +NOTICE: issuing SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM schema_sharding_test.test_tbl_87000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + b | avg | min | max | count +--------------------------------------------------------------------- + 1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21 + 3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23 + 5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25 + 8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27 + 13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18 +(5 rows) + +\c - - - :worker_2_port +SET client_min_messages TO DEBUG2; +SET citus.log_remote_commands TO ON; +SET citus.log_local_commands TO ON; +-- Run the test query on worker_2; with schema based sharding +-- the data is local to this worker so local execution +-- path is taken, and the planner avoids query deparse and +-- planning of the shard query. 
+SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1) +FROM schema_sharding_test.test_tbl +WHERE a = 8 AND b IN (1,3,5,8,13,21) +GROUP BY b +ORDER BY b; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query +NOTICE: executing the command locally: SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM schema_sharding_test.test_tbl_87000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b +DEBUG: Local executor: Using task's cached local plan for task 0 + b | avg | min | max | count +--------------------------------------------------------------------- + 1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21 + 3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23 + 5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25 + 8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27 + 13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18 +(5 rows) + +SET citus.enable_local_execution_local_plan TO OFF; +-- Run the test query on worker_2 but with local execution +-- local plan disabled; now the planner does query deparse +-- and planning of the shard query. +SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1) +FROM schema_sharding_test.test_tbl +WHERE a = 8 AND b IN (1,3,5,8,13,21) +GROUP BY b +ORDER BY b; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +NOTICE: executing the command locally: SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM schema_sharding_test.test_tbl_87000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b + b | avg | min | max | count +--------------------------------------------------------------------- + 1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21 + 3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23 + 5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25 + 8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27 + 13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18 +(5 rows) + +\c - - - :master_port +DROP SCHEMA schema_sharding_test CASCADE; +NOTICE: drop cascades to table schema_sharding_test.test_tbl +RESET ALL; diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 2b1fa3c0b..24791715a 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -2410,8 +2410,10 @@ NOTICE: executing the command locally: UPDATE local_shard_execution.event_respo SELECT count(*) FROM event_responses WHERE event_id = 16; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan +DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query DEBUG: query has a single distribution column value: 16 NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16) +DEBUG: Local executor: Using task's cached local plan for task 0 count --------------------------------------------------------------------- 1 @@ -2420,8 +2422,10 @@ NOTICE: executing the command 
locally: SELECT count(*) AS count FROM local_shar SELECT count(*) FROM event_responses WHERE event_id = 16; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan +DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query DEBUG: query has a single distribution column value: 16 NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16) +DEBUG: Local executor: Using task's cached local plan for task 0 count --------------------------------------------------------------------- 1 @@ -2430,8 +2434,10 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar UPDATE event_responses SET response = 'no' WHERE event_id = 16; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan +DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query DEBUG: query has a single distribution column value: 16 NOTICE: executing the command locally: UPDATE local_shard_execution.event_responses_1480001 event_responses SET response = 'no'::local_shard_execution.invite_resp WHERE (event_id OPERATOR(pg_catalog.=) 16) +DEBUG: Local executor: Using task's cached local plan for task 0 INSERT INTO event_responses VALUES (16, 666, 'maybe') ON CONFLICT (event_id, user_id) DO UPDATE SET response = EXCLUDED.response RETURNING *; @@ -2529,8 +2535,10 @@ SET citus.log_remote_commands TO ON; SELECT * FROM event_responses_no_pkey WHERE event_id = 2; DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan +DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query DEBUG: query has a single distribution column value: 2 NOTICE: executing the command locally: SELECT event_id, user_id, response FROM local_shard_execution.event_responses_no_pkey_1480007 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 2) +DEBUG: Local executor: Using task's cached local plan for task 0 event_id | user_id | response --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/local_shard_execution_dropped_column.out b/src/test/regress/expected/local_shard_execution_dropped_column.out index 6a2fe1b0b..271d03455 100644 --- a/src/test/regress/expected/local_shard_execution_dropped_column.out +++ b/src/test/regress/expected/local_shard_execution_dropped_column.out @@ -193,8 +193,102 @@ execute p4(8); NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) execute p4(8); NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8) +-- Test that "Avoid deparse and planning of shard query for local execution" (*) +-- does not take the fast path of modifying the parse tree with the shard OID, as +-- the dropped column means the attribute check between the distributed table and +-- shard fails. With client_min_messages at DEBUG2, we see "cannot modify parse tree +-- for local execution", indicating that router planning has detected the difference. 
+-- +-- (*) https://github.com/citusdata/citus/pull/8035 +SET client_min_messages to DEBUG2; +prepare p5(int) as SELECT count(*) FROM t1 WHERE c = 8 and a = $1 GROUP BY c; +execute p5(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1" +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- +(0 rows) + +execute p5(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1" +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- +(0 rows) + +execute p5(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1" +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- +(0 rows) + +execute p5(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1" +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- +(0 rows) + +execute p5(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1" +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- +(0 rows) + +execute p5(5); 
+DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1" +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- +(0 rows) + +execute p5(5); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- +(0 rows) + +execute p5(5); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- +(0 rows) + +execute p5(5); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- +(0 rows) + +execute p5(5); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- +(0 rows) + +RESET client_min_messages; \c - - - :master_port --- one another combination is that the shell table +-- one other combination is that the shell table -- has a dropped column but not the shard, via rebalance operation SET search_path TO local_shard_execution_dropped_column; ALTER TABLE t1 DROP COLUMN a; @@ -204,6 +298,12 @@ SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port, 'localho (1 row) +SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + \c - - - :worker_2_port SET search_path TO local_shard_execution_dropped_column; -- show the dropped columns @@ -331,6 +431,108 @@ execute p3(8); NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING execute p3(8); NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING +-- Test that "Avoid deparse and planning of shard query for local execution" +-- does not take the fast path of modifying the parse tree with the shard OID +-- for this scenario (rebalance) also. 
+-- +-- (*) https://github.com/citusdata/citus/pull/8035 +SET client_min_messages to DEBUG2; +prepare p4(int) as SELECT count(*) FROM t1 WHERE c = 8 and 5 = $1 GROUP BY c; +execute p4(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1" +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p4(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1" +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p4(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1" +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p4(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1" +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p4(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1" +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + 
+execute p4(5); +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1" +DEBUG: query has a single distribution column value: 8 +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p4(5); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p4(5); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p4(5); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +execute p4(5); +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c + count +--------------------------------------------------------------------- + 1 +(1 row) + +RESET client_min_messages; \c - - - :master_port DROP SCHEMA local_shard_execution_dropped_column CASCADE; NOTICE: drop cascades to table local_shard_execution_dropped_column.t1 diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 6654b4ab0..487552663 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -70,6 +70,7 @@ test: metadata_sync_helpers test: issue_6592 test: executor_local_failure +test: local_execution_local_plan # test that no tests leaked intermediate results. This should always be last test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/sql/local_execution_local_plan.sql b/src/test/regress/sql/local_execution_local_plan.sql new file mode 100644 index 000000000..8d2138337 --- /dev/null +++ b/src/test/regress/sql/local_execution_local_plan.sql @@ -0,0 +1,149 @@ +-- Test local execution with local plan in a sharded environment. +-- This is an enhancement to local execution where instead of deparsing +-- and compiling the shard query, the planner replaces the OID of the +-- distributed table with the OID of the local shard in the parse tree +-- and plans that. 
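(Editorial note, illustrative only, not part of the patch.) The comment above summarizes the mechanism under test: instead of deparsing the shard query to SQL text and then parsing and planning that text again, the planner points the fast-path query tree's single range table entry at the local shard and plans the modified tree directly. A minimal sketch under those assumptions follows; PlanFastPathQueryOnLocalShard is a hypothetical name, not the actual Citus function.

#include "postgres.h"

#include "nodes/params.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "nodes/plannodes.h"
#include "optimizer/planner.h"

/*
 * Illustrative sketch only -- not the actual Citus code path. A fast-path
 * router query has a single range table entry, which references the
 * distributed (shell) table. Pointing that entry at the local shard and
 * planning the modified tree yields a plan that can be executed directly,
 * with no deparse to SQL text and no re-parse of the shard query.
 */
static PlannedStmt *
PlanFastPathQueryOnLocalShard(Query *query, Oid distributedRelationId,
							  Oid shardRelationId, ParamListInfo boundParams)
{
	RangeTblEntry *rte = linitial_node(RangeTblEntry, query->rtable);

	Assert(rte->relid == distributedRelationId);
	rte->relid = shardRelationId;

	/* plan the shard-local query tree directly */
	return planner(query, NULL, CURSOR_OPT_PARALLEL_OK, boundParams);
}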
+-- +-- https://github.com/citusdata/citus/pull/8035 + +CREATE SCHEMA local_shard_execution_local_plan; +SET search_path TO local_shard_execution_local_plan; + +SET citus.next_shard_id TO 86000000; + +-- Test row-based sharding + +SET citus.shard_replication_factor TO 1; +CREATE TABLE test_tbl (a int, b int, data_f double precision); +SELECT create_distributed_table('test_tbl', 'a'); +SELECT setseed(0.42); -- make the random data inserted deterministic +INSERT INTO test_tbl +SELECT (random()*20)::int AS a, + (random()*20)::int AS b, + random()*10000.0 AS data_f +FROM generate_series(1, 10000); + +-- Put the shard on worker 1 to ensure consistent test output across different schedules +SET client_min_messages to ERROR; -- suppress warning if shard is already on worker 1 +SELECT citus_move_shard_placement(86000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); +SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests +RESET client_min_messages; + +\c - - - :worker_1_port +SET search_path TO local_shard_execution_local_plan; + +SET client_min_messages TO DEBUG2; +SET citus.log_remote_commands TO ON; +SET citus.log_local_commands TO ON; + +-- This query resolves to a single shard (aka fast path) +-- which is located on worker_1; with client_min_messages +-- at DEBUG2 we see a message that the planner is avoiding +-- query deparse and plan +SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1) +FROM test_tbl +WHERE a = 8 AND b IN (1,3,5,8,13,21) +GROUP BY b +ORDER BY b; + +SET citus.enable_local_execution_local_plan TO OFF; + +-- With local execution local plan disabled, the same query +-- does query deparse and planning of the shard query and +-- provides the same results +SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1) +FROM test_tbl +WHERE a = 8 AND b IN (1,3,5,8,13,21) +GROUP BY b +ORDER BY b; + +\c - - - :worker_2_port +SET search_path TO local_shard_execution_local_plan; + +SET client_min_messages TO DEBUG2; +SET citus.log_remote_commands TO ON; +SET citus.log_local_commands TO ON; + +-- Run the same query on the other worker - the local +-- execution path is not taken because the shard is not +-- local to this worker +SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1) +FROM test_tbl +WHERE a = 8 AND b IN (1,3,5,8,13,21) +GROUP BY b +ORDER BY b; + +\c - - - :master_port + +DROP SCHEMA local_shard_execution_local_plan CASCADE; + +-- Now test local execution with local plan for a schema sharded table. 
+ +SET citus.enable_schema_based_sharding to on; +CREATE SCHEMA schema_sharding_test; +SET search_path TO schema_sharding_test; + +SET citus.next_shard_id TO 87000000; + +SET citus.shard_replication_factor TO 1; +CREATE TABLE test_tbl (a int, b int, data_f double precision); +SELECT setseed(0.42); -- make the random data inserted deterministic +INSERT INTO test_tbl +SELECT (random()*20)::int AS a, + (random()*20)::int AS b, + random()*10000.0 AS data_f +FROM generate_series(1, 10000); + +-- Put the shard on worker 2 to ensure consistent test output across different schedules +SET client_min_messages to ERROR; -- suppress warning if shard is already on worker 2 +SELECT citus_move_shard_placement(87000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes'); +SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests +RESET client_min_messages; + +\c - - - :worker_1_port + +SET client_min_messages TO DEBUG2; +SET citus.log_remote_commands TO ON; +SET citus.log_local_commands TO ON; + +-- Run the test query on worker_1; with schema based sharding +-- the data is not local to this worker so local execution +-- path is not taken. +SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1) +FROM schema_sharding_test.test_tbl +WHERE a = 8 AND b IN (1,3,5,8,13,21) +GROUP BY b +ORDER BY b; + +\c - - - :worker_2_port + +SET client_min_messages TO DEBUG2; +SET citus.log_remote_commands TO ON; +SET citus.log_local_commands TO ON; + +-- Run the test query on worker_2; with schema based sharding +-- the data is local to this worker so local execution +-- path is taken, and the planner avoids query deparse and +-- planning of the shard query. + +SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1) +FROM schema_sharding_test.test_tbl +WHERE a = 8 AND b IN (1,3,5,8,13,21) +GROUP BY b +ORDER BY b; + +SET citus.enable_local_execution_local_plan TO OFF; + +-- Run the test query on worker_2 but with local execution +-- local plan disabled; now the planner does query deparse +-- and planning of the shard query. +SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1) +FROM schema_sharding_test.test_tbl +WHERE a = 8 AND b IN (1,3,5,8,13,21) +GROUP BY b +ORDER BY b; + +\c - - - :master_port + +DROP SCHEMA schema_sharding_test CASCADE; +RESET ALL; diff --git a/src/test/regress/sql/local_shard_execution_dropped_column.sql b/src/test/regress/sql/local_shard_execution_dropped_column.sql index 1d7dac0b7..921fbf18c 100644 --- a/src/test/regress/sql/local_shard_execution_dropped_column.sql +++ b/src/test/regress/sql/local_shard_execution_dropped_column.sql @@ -78,14 +78,37 @@ execute p4(8); execute p4(8); execute p4(8); +-- Test that "Avoid deparse and planning of shard query for local execution" (*) +-- does not take the fast path of modifying the parse tree with the shard OID, as +-- the dropped column means the attribute check between the distributed table and +-- shard fails. With client_min_messages at DEBUG2, we see "cannot modify parse tree +-- for local execution", indicating that router planning has detected the difference. 
+-- +-- (*) https://github.com/citusdata/citus/pull/8035 + +SET client_min_messages to DEBUG2; +prepare p5(int) as SELECT count(*) FROM t1 WHERE c = 8 and a = $1 GROUP BY c; +execute p5(5); +execute p5(5); +execute p5(5); +execute p5(5); +execute p5(5); +execute p5(5); +execute p5(5); +execute p5(5); +execute p5(5); +execute p5(5); +RESET client_min_messages; + \c - - - :master_port --- one another combination is that the shell table +-- one other combination is that the shell table -- has a dropped column but not the shard, via rebalance operation SET search_path TO local_shard_execution_dropped_column; ALTER TABLE t1 DROP COLUMN a; SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes'); +SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests \c - - - :worker_2_port SET search_path TO local_shard_execution_dropped_column; @@ -132,5 +155,25 @@ execute p3(8); execute p3(8); execute p3(8); +-- Test that "Avoid deparse and planning of shard query for local execution" +-- does not take the fast path of modifying the parse tree with the shard OID +-- for this scenario (rebalance) also. +-- +-- (*) https://github.com/citusdata/citus/pull/8035 + +SET client_min_messages to DEBUG2; +prepare p4(int) as SELECT count(*) FROM t1 WHERE c = 8 and 5 = $1 GROUP BY c; +execute p4(5); +execute p4(5); +execute p4(5); +execute p4(5); +execute p4(5); +execute p4(5); +execute p4(5); +execute p4(5); +execute p4(5); +execute p4(5); +RESET client_min_messages; + \c - - - :master_port DROP SCHEMA local_shard_execution_dropped_column CASCADE;
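(Editorial note, illustrative only, not part of the patch.) Taken together, the tests above exercise when the cached local plan is used: only for fast-path router queries whose single shard placement is local to the executing node, only while citus.enable_local_execution_local_plan is on, and only when the shard's attributes still line up with the distributed table's. The sketch below merely restates those gating conditions; the function name and boolean inputs are hypothetical, and the actual Citus checks live in the planner and are more involved.

#include <stdbool.h>

/*
 * Illustrative restatement of the conditions the regression tests probe:
 * the local plan is only built when the GUC is on, the query is a
 * fast-path router query, the single shard placement is local, and the
 * shard's attributes match the distributed table's (no dropped-column
 * divergence).
 */
static bool
ShouldUseLocalPlanForShardQuery(bool localPlanGucEnabled,
								bool isFastPathRouterQuery,
								bool shardPlacementIsLocal,
								bool shardMatchesDistributedTable)
{
	return localPlanGucEnabled &&
		   isFastPathRouterQuery &&
		   shardPlacementIsLocal &&
		   shardMatchesDistributedTable;
}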