diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c
index 18563c763..4f940fcd9 100644
--- a/src/backend/distributed/executor/local_executor.c
+++ b/src/backend/distributed/executor/local_executor.c
@@ -313,6 +313,7 @@ ExecuteLocalTaskListExtended(List *taskList,
 		int taskNumParams = numParams;
 		Oid *taskParameterTypes = parameterTypes;
+		int taskType = GetTaskQueryType(task);
 
 		if (task->parametersInQueryStringResolved)
 		{
@@ -330,7 +331,7 @@ ExecuteLocalTaskListExtended(List *taskList,
 		 * for concatenated strings, we set queryStringList so that we can access
 		 * each query string.
 		 */
-		if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST)
+		if (taskType == TASK_QUERY_TEXT_LIST)
 		{
 			List *queryStringList = task->taskQuery.data.queryStringList;
 			totalRowsProcessed +=
@@ -342,22 +343,29 @@ ExecuteLocalTaskListExtended(List *taskList,
 			continue;
 		}
 
-		Query *shardQuery = ParseQueryString(TaskQueryString(task),
-											 taskParameterTypes,
-											 taskNumParams);
+		if (taskType == TASK_QUERY_LOCAL_PLAN)
+		{
+			localPlan = task->taskQuery.data.localPlan;
+		}
+		else
+		{
+			Query *shardQuery = ParseQueryString(TaskQueryString(task),
+												 taskParameterTypes,
+												 taskNumParams);
 
-		int cursorOptions = CURSOR_OPT_PARALLEL_OK;
+			int cursorOptions = CURSOR_OPT_PARALLEL_OK;
 
-		/*
-		 * Altough the shardQuery is local to this node, we prefer planner()
-		 * over standard_planner(). The primary reason for that is Citus itself
-		 * is not very tolarent standard_planner() calls that doesn't go through
-		 * distributed_planner() because of the way that restriction hooks are
-		 * implemented. So, let planner to call distributed_planner() which
-		 * eventually calls standard_planner().
-		 */
-		localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo);
+			/*
+			 * Although the shardQuery is local to this node, we prefer planner()
+			 * over standard_planner(). The primary reason is that Citus itself is
+			 * not very tolerant of standard_planner() calls that don't go through
+			 * distributed_planner(), because of the way the restriction hooks are
+			 * implemented. So, let planner() call distributed_planner(), which
+			 * eventually calls standard_planner().
+			 */
+			localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo);
+		}
 
 		char *shardQueryString = NULL;
diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c
index 2542d931a..8eef8f297 100644
--- a/src/backend/distributed/planner/deparse_shard_query.c
+++ b/src/backend/distributed/planner/deparse_shard_query.c
@@ -439,6 +439,31 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
 }
 
 
+/*
+ * SetTaskQueryPlan attaches a locally planned statement to the given task
+ * and marks the task query type as TASK_QUERY_LOCAL_PLAN.
+ */
+void
+SetTaskQueryPlan(Task *task, PlannedStmt *localPlan)
+{
+	Assert(localPlan != NULL);
+	task->taskQuery.queryType = TASK_QUERY_LOCAL_PLAN;
+	task->taskQuery.data.localPlan = localPlan;
+	task->queryCount = 1;
+}
+
+
+/*
+ * TaskQueryLocalPlan returns the local plan attached to the given task.
+ */
+PlannedStmt *
+TaskQueryLocalPlan(Task *task)
+{
+	Assert(GetTaskQueryType(task) == TASK_QUERY_LOCAL_PLAN);
+	return task->taskQuery.data.localPlan;
+}
+
+
 /*
  * DeparseTaskQuery is a general way of deparsing a query based on a task.
  */
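A minimal usage sketch of the new task-plan accessors. The caller below is hypothetical and not part of this patch; `planner()` and `CURSOR_OPT_PARALLEL_OK` are the same calls the local executor path above already uses:

```c
/*
 * Hypothetical illustration: attach a locally planned statement to a task
 * and read it back, mirroring what the fast-path planner and
 * ExecuteLocalTaskListExtended() do with TASK_QUERY_LOCAL_PLAN tasks.
 */
static void
AttachAndReadLocalPlan(Task *task, Query *shardQuery, ParamListInfo boundParams)
{
	/* plan on this node, with the same cursor options the local executor uses */
	PlannedStmt *localPlan = planner(shardQuery, NULL, CURSOR_OPT_PARALLEL_OK,
									 boundParams);

	SetTaskQueryPlan(task, localPlan);

	/* the executor dispatches on the task query type, then fetches the plan */
	Assert(GetTaskQueryType(task) == TASK_QUERY_LOCAL_PLAN);
	PlannedStmt *planToExecute = TaskQueryLocalPlan(task);
	(void) planToExecute;
}
```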
diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c
index 193e2f250..07dc4afc2 100644
--- a/src/backend/distributed/planner/distributed_planner.c
+++ b/src/backend/distributed/planner/distributed_planner.c
@@ -97,6 +97,10 @@ static bool ListContainsDistributedTableRTE(List *rangeTableList,
 											bool *maybeHasForeignDistributedTable);
 static PlannedStmt * CreateDistributedPlannedStmt(
 	DistributedPlanningContext *planContext);
+
+static PlannedStmt * CreateFastPathDistributedPlannedStmt(
+	DistributedPlanningContext *planContext);
+
 static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
 															   DistributedPlanningContext *planContext);
@@ -135,13 +139,13 @@ static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
 													  Const *resultFormatConst);
 static List * OuterPlanParamsList(PlannerInfo *root);
 static List * CopyPlanParamList(List *originalPlanParamList);
-static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
+static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(
+	FastPathRestrictionContext *fastPathContext);
 static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
 static void PopPlannerRestrictionContext(void);
 static void ResetPlannerRestrictionContext(
 	PlannerRestrictionContext *plannerRestrictionContext);
-static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
-												 Node *distributionKeyValue);
+static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext);
 static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
 										 int rteIdCounter);
 static RTEListProperties * GetRTEListProperties(List *rangeTableList);
@@ -156,6 +160,8 @@ static bool CheckPostPlanDistribution(bool isDistributedQuery,
 									  Query *origQuery,
 									  List *rangeTableList,
 									  Query *plannedQuery);
+static DistributedPlan * CreateFastPathDistributedPlan(
+	DistributedPlanningContext *planContext);
 
 /* Distributed planner hook */
 PlannedStmt *
@@ -166,7 +172,7 @@ distributed_planner(Query *parse,
 {
 	bool needsDistributedPlanning = false;
 	bool fastPathRouterQuery = false;
-	Node *distributionKeyValue = NULL;
+	FastPathRestrictionContext fastPathContext = { 0 };
 
 	List *rangeTableList = ExtractRangeTableEntryList(parse);
@@ -191,7 +197,8 @@ distributed_planner(Query *parse,
 											 &maybeHasForeignDistributedTable);
 	if (needsDistributedPlanning)
 	{
-		fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
+		fastPathRouterQuery = FastPathRouterQuery(parse, query_string,
+												  &fastPathContext);
 
 		if (maybeHasForeignDistributedTable)
 		{
@@ -248,7 +255,8 @@ distributed_planner(Query *parse,
 	HideCitusDependentObjectsOnQueriesOfPgMetaTables((Node *) parse, NULL);
 
 	/* create a restriction context and put it at the end if context list */
-	planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
+	planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(
+		&fastPathContext);
 
 	/*
 	 * We keep track of how many times we've recursed into the planner, primarily
@@ -264,7 +272,7 @@ distributed_planner(Query *parse,
 	{
 		if (fastPathRouterQuery)
 		{
-			result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue);
+			result = PlanFastPathDistributedStmt(&planContext);
 		}
 		else
 		{
@@ -649,32 +657,21 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan)
  * the FastPathPlanner.
  */
 static PlannedStmt *
-PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
-							Node *distributionKeyValue)
+PlanFastPathDistributedStmt(DistributedPlanningContext *planContext)
 {
 	FastPathRestrictionContext *fastPathContext =
 		planContext->plannerRestrictionContext->fastPathRestrictionContext;
 
-	planContext->plannerRestrictionContext->fastPathRestrictionContext->
-		fastPathRouterQuery = true;
+	if (!fastPathContext->delayFastPathPlanning)
+	{
+		planContext->plan = FastPathPlanner(planContext->originalQuery,
+											planContext->query,
+											planContext->boundParams);
 
-	if (distributionKeyValue == NULL)
-	{
-		/* nothing to record */
-	}
-	else if (IsA(distributionKeyValue, Const))
-	{
-		fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
-	}
-	else if (IsA(distributionKeyValue, Param))
-	{
-		fastPathContext->distributionKeyHasParam = true;
+		return CreateDistributedPlannedStmt(planContext);
 	}
 
-	planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
-										planContext->boundParams);
-
-	return CreateDistributedPlannedStmt(planContext);
+	return CreateFastPathDistributedPlannedStmt(planContext);
 }
 
 
@@ -826,6 +823,80 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
 }
 
 
+/*
+ * CreateFastPathDistributedPlannedStmt is the fast-path (delayed planning)
+ * counterpart of CreateDistributedPlannedStmt.
+ */
+static PlannedStmt *
+CreateFastPathDistributedPlannedStmt(DistributedPlanningContext *planContext)
+{
+	uint64 planId = NextPlanId++;
+	bool hasUnresolvedParams = false; /* todo: need to assign this from CreateFastPathDistributedPlan() output */
+	PlannedStmt *resultPlan = NULL;
+
+	DistributedPlan *distributedPlan = CreateFastPathDistributedPlan(planContext);
+
+	/*
+	 * If no plan was generated, prepare a generic error to be emitted.
+	 * Normally this error message will never be returned to the user, as it's
+	 * usually due to unresolved prepared statement parameters - in that case
+	 * the logic below will force a custom plan (i.e. with parameters bound to
+	 * specific values) to be generated. But sql (not plpgsql) functions
+	 * unfortunately don't go through a codepath supporting custom plans - so
+	 * we still need to have an error prepared.
+	 */
+	if (!distributedPlan)
+	{
+		/* currently always should have a more specific error otherwise */
+		distributedPlan = CitusMakeNode(DistributedPlan);
+		distributedPlan->planningError =
+			DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
+						  "could not create distributed plan",
+						  "Possibly this is caused by the use of parameters in SQL "
+						  "functions, which is not supported in Citus.",
+						  "Consider using PL/pgSQL functions instead.");
+	}
+
+	/*
+	 * Error out if none of the planners resulted in a usable plan, unless the
+	 * error was possibly triggered by missing parameters. In that case we'll
+	 * not error out here, but instead rely on postgres' custom plan logic.
+	 * Postgres re-plans prepared statements the first five executions
+	 * (i.e. it produces custom plans), after that the cost of a generic plan
+	 * is compared with the average custom plan cost. We support otherwise
+	 * unsupported prepared statement parameters by assigning an exorbitant
+	 * cost to the unsupported query. That'll lead to the custom plan being
+	 * chosen. But for that to be possible we can't error out here, as
+	 * otherwise that logic is never reached.
+	 */
+	if (distributedPlan->planningError && !hasUnresolvedParams)
+	{
+		RaiseDeferredError(distributedPlan->planningError, ERROR);
+	}
+
+	/* remember the plan's identifier for identifying subplans */
+	distributedPlan->planId = planId;
+
+	/* create final plan by combining local plan with distributed plan. todo: FIXME */
+	resultPlan = FinalizePlan(planContext->plan, distributedPlan);
+
+	/*
+	 * As explained above, force planning costs to be unrealistically high if
+	 * query planning failed (possibly) due to prepared statement parameters or
+	 * if it is planned as a multi shard modify query.
+	 */
+	if ((distributedPlan->planningError ||
+		 (UpdateOrDeleteOrMergeQuery(planContext->originalQuery) &&
+		  IsMultiTaskPlan(distributedPlan))) &&
+		hasUnresolvedParams)
+	{
+		DissuadePlannerFromUsingPlan(resultPlan);
+	}
+
+	return resultPlan;
+}
+
+
 /*
  * InlineCtesAndCreateDistributedPlannedStmt gets all the parameters required
  * for creating a distributed planned statement. The function is primarily a
@@ -1225,6 +1292,99 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
 }
 
 
+/*
+ * CreateFastPathDistributedPlan is a trimmed-down, router-only variant of
+ * CreateDistributedPlan for queries that qualified for fast-path planning.
+ */
+static DistributedPlan *
+CreateFastPathDistributedPlan(DistributedPlanningContext *planContext)
+{
+	DistributedPlan *distributedPlan = NULL;
+	Query *originalQuery = planContext->originalQuery;
+	Query *query = planContext->query;
+	ParamListInfo boundParams = planContext->boundParams;
+	PlannerRestrictionContext *plannerRestrictionContext =
+		planContext->plannerRestrictionContext;
+
+	bool hasUnresolvedParams = false;
+	if (HasUnresolvedExternParamsWalker((Node *) originalQuery,
+										boundParams))
+	{
+		hasUnresolvedParams = true;
+	}
+
+	/* Router planner only */
+	RouterPlanType routerPlan = GetRouterPlanType(query, originalQuery,
+												  hasUnresolvedParams);
+	switch (routerPlan)
+	{
+		case DML_QUERY:
+		case SELECT_QUERY:
+		{
+			/*
+			 * Modifications and selects are handled via the same fast path routing.
+			 */
+			distributedPlan =
+				CreateFastPathRouterPlan(planContext);
+			break;
+		}
+
+		case INSERT_SELECT_INTO_CITUS_TABLE:
+		case INSERT_SELECT_INTO_LOCAL_TABLE:
+		case MERGE_QUERY:
+		case REPLAN_WITH_BOUND_PARAMETERS:
+		{
+			/*
+			 * Unexpected plan type - error out for now.
+			 */
+			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							errmsg(
+								"unexpected plan type for fast path single shard planning")));
+		}
+	}
+
+	/* the functions above always return a plan, possibly with an error */
+	Assert(distributedPlan);
+
+	if (distributedPlan->planningError == NULL)
+	{
+		return distributedPlan;
+	}
+	else
+	{
+		RaiseDeferredError(distributedPlan->planningError, DEBUG2);
+	}
+
+	if (hasUnresolvedParams)
+	{
+		/*
+		 * There are parameters that don't have a value in boundParams.
+		 *
+		 * The remainder of the planning logic cannot handle unbound
+		 * parameters. We return a NULL plan, which will have an
+		 * extremely high cost, such that postgres will replan with
+		 * bound parameters.
+		 */
+		return NULL;
+	}
+
+	/* force evaluation of bound params */
+	boundParams = copyParamList(boundParams);
+
+	/*
+	 * If there are parameters that do have a value in boundParams, replace
+	 * them in the original query. This allows us to more easily cut the
+	 * query into pieces (during recursive planning) or deparse parts of
+	 * the query (during subquery pushdown planning).
+	 */
+	originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery,
+													boundParams);
+	Assert(originalQuery != NULL);
+
+	return distributedPlan;
+}
+
+
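For context on the dissuasion logic above: Postgres' plancache always builds custom plans for the first five executions of a prepared statement, then compares the generic plan's cost against the average custom-plan cost. A simplified sketch of that decision follows; the real logic is choose_custom_plan() in plancache.c, which also accounts for planning cost and the plan_cache_mode GUC:

```c
/*
 * Simplified sketch of Postgres' generic-vs-custom plan choice; the real
 * implementation is choose_custom_plan() in plancache.c.
 */
static bool
ChooseCustomPlanSketch(double genericCost, double avgCustomCost, int numCustomPlans)
{
	if (numCustomPlans < 5)
	{
		/* the first five executions always get custom plans */
		return true;
	}

	/*
	 * A plan dissuaded via DissuadePlannerFromUsingPlan() carries an
	 * exorbitant cost, so this comparison always favors the custom plan.
	 */
	return genericCost > avgCustomCost;
}
```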
 /*
  * EnsurePartitionTableNotReplicated errors out if the input relation is
  * a partition table and the table has a replication factor greater than
@@ -1377,7 +1533,7 @@ GetDistributedPlan(CustomScan *customScan)
 	Node *node = (Node *) linitial(customScan->custom_private);
 	Assert(CitusIsA(node, DistributedPlan));
 
-	CheckNodeCopyAndSerialization(node);
+	/* CheckNodeCopyAndSerialization(node); commented out for local perf profiling */
 
 	DistributedPlan *distributedPlan = (DistributedPlan *) node;
 
@@ -2413,7 +2569,8 @@ CopyPlanParamList(List *originalPlanParamList)
  * inserted to the beginning of the plannerRestrictionContextList and it is returned.
  */
 static PlannerRestrictionContext *
-CreateAndPushPlannerRestrictionContext(void)
+CreateAndPushPlannerRestrictionContext(
+	FastPathRestrictionContext *fastPathRestrictionContext)
 {
 	PlannerRestrictionContext *plannerRestrictionContext =
 		palloc0(sizeof(PlannerRestrictionContext));
@@ -2427,6 +2584,14 @@ CreateAndPushPlannerRestrictionContext(void)
 	plannerRestrictionContext->fastPathRestrictionContext =
 		palloc0(sizeof(FastPathRestrictionContext));
 
+	if (fastPathRestrictionContext != NULL)
+	{
+		/* copy the caller's fast path restriction context */
+		memcpy(plannerRestrictionContext->fastPathRestrictionContext,
+			   fastPathRestrictionContext,
+			   sizeof(FastPathRestrictionContext));
+	}
+
 	plannerRestrictionContext->memoryContext = CurrentMemoryContext;
 
 	/* we'll apply logical AND as we add tables */
diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c
index 59f80bb40..3bbf7956b 100644
--- a/src/backend/distributed/planner/fast_path_router_planner.c
+++ b/src/backend/distributed/planner/fast_path_router_planner.c
@@ -53,6 +53,7 @@
 #include "distributed/shardinterval_utils.h"
 
 bool EnableFastPathRouterPlanner = true;
+bool EnableSingleShardFastPathPOC = true;
 
 static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
 static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
@@ -112,10 +113,6 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
 	Plan *plan = &scanNode->plan;
 #endif
 
-	Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
-
-	Assert(FastPathRouterQuery(parse, &distKey));
-
 	/* there is only a single relation rte */
 #if PG_VERSION_NUM >= PG_VERSION_16
 	scanNode->scan.scanrelid = 1;
@@ -166,10 +163,14 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
  * don't have any sublinks/CTEs etc
 */
 bool
-FastPathRouterQuery(Query *query, Node **distributionKeyValue)
+FastPathRouterQuery(Query *query, const char *query_string,
+					FastPathRestrictionContext *fastPathContext)
 {
 	FromExpr *joinTree = query->jointree;
 	Node *quals = NULL;
+	bool isFastPath = false;
+	bool isDistributedTable = false;
+	Node *distributionKeyValue = NULL;
 
 	if (!EnableFastPathRouterPlanner)
 	{
@@ -201,7 +202,9 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
 	else if (query->commandType == CMD_INSERT)
 	{
 		/* we don't need to do any further checks, all INSERTs are fast-path */
-		return true;
+		isFastPath = true;
+		isDistributedTable = true;
+		goto returnFastPath;
 	}
 
 	/* make sure that the only range table in FROM clause */
@@ -232,45 +235,75 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
 	Var *distributionKey = PartitionColumn(distributedTableId, 1);
 	if (!distributionKey)
 	{
-		return true;
+		isFastPath = true;
 	}
-	/* WHERE clause should not be empty for distributed tables */
-	if (joinTree == NULL ||
-		(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals ==
-		 NULL))
+	if (!isFastPath)
 	{
-		return false;
+		isDistributedTable = IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE);
+
+		if (joinTree == NULL ||
+			(joinTree->quals == NULL && !isDistributedTable))
+		{
+			/* no quals, not a fast path query */
+			return false;
+		}
+
+		/* convert list of expressions into expression tree for further processing */
+		quals = joinTree->quals;
+		if (quals != NULL && IsA(quals, List))
+		{
+			quals = (Node *) make_ands_explicit((List *) quals);
+		}
+
+		/*
+		 * Distribution column must be used in a simple equality match check and it must be
+		 * placed at the top level conjunction operator. In simple words, we should have
+		 * WHERE dist_key = VALUE [AND ....];
+		 *
+		 * We're also not allowing any other appearances of the distribution key in the quals.
+		 *
+		 * Overall the logic might sound fuzzy since it involves two individual checks:
+		 * (a) Check for top level AND operator with one side being "dist_key = const"
+		 * (b) Only allow single appearance of "dist_key" in the quals
+		 *
+		 * This is to simplify both of the individual checks and omit various edge cases
+		 * that might arise with multiple distribution keys in the quals.
+		 */
+		isFastPath = (ConjunctionContainsColumnFilter(quals, distributionKey,
+													  &distributionKeyValue) &&
+					  !ColumnAppearsMultipleTimes(quals, distributionKey));
 	}
 
-	/* convert list of expressions into expression tree for further processing */
-	quals = joinTree->quals;
-	if (quals != NULL && IsA(quals, List))
+returnFastPath:
+
+	if (isFastPath)
 	{
-		quals = (Node *) make_ands_explicit((List *) quals);
+		fastPathContext->fastPathRouterQuery = true;
+		if (distributionKeyValue == NULL)
+		{
+			/* nothing to record */
+		}
+		else if (IsA(distributionKeyValue, Const))
+		{
+			fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
+		}
+		else if (IsA(distributionKeyValue, Param))
+		{
+			fastPathContext->distributionKeyHasParam = true;
+		}
+
+		if (EnableSingleShardFastPathPOC)
+		{
+			fastPathContext->distTableRte = rangeTableEntry;
+			fastPathContext->delayFastPathPlanning = isDistributedTable; /* && !fastPathContext->distributionKeyHasParam; */
+
+			/* if the dist key is parameterized the query will use the plan cache (todo: verify) */
+			fastPathContext->clientQueryString = query_string;
+		}
 	}
 
-	/*
-	 * Distribution column must be used in a simple equality match check and it must be
-	 * place at top level conjunction operator. In simple words, we should have
-	 * WHERE dist_key = VALUE [AND ....];
-	 *
-	 * We're also not allowing any other appearances of the distribution key in the quals.
-	 *
-	 * Overall the logic might sound fuzzy since it involves two individual checks:
-	 * (a) Check for top level AND operator with one side being "dist_key = const"
-	 * (b) Only allow single appearance of "dist_key" in the quals
-	 *
-	 * This is to simplify both of the individual checks and omit various edge cases
-	 * that might arise with multiple distribution keys in the quals.
-	 */
-	if (ConjunctionContainsColumnFilter(quals, distributionKey, distributionKeyValue) &&
-		!ColumnAppearsMultipleTimes(quals, distributionKey))
-	{
-		return true;
-	}
-
-	return false;
+	return isFastPath;
 }
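As a quick illustration of the new contract: the caller now owns a FastPathRestrictionContext that FastPathRouterQuery() fills in, instead of receiving a bare distribution-key out-parameter. A hypothetical caller sketch (distributed_planner() does essentially this):

```c
/*
 * Hypothetical caller sketch: FastPathRouterQuery() records its findings
 * in a caller-provided context rather than an out-parameter Node.
 */
static void
ClassifyFastPathQuery(Query *parse, const char *query_string)
{
	FastPathRestrictionContext fastPathContext = { 0 };

	if (FastPathRouterQuery(parse, query_string, &fastPathContext))
	{
		if (fastPathContext.delayFastPathPlanning)
		{
			/* distributed table: planning waits until shard locality is known */
		}
		else if (fastPathContext.distributionKeyHasParam)
		{
			/* dist_key = $n: shard pruning is deferred to execution time */
		}
	}
}
```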
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 19d386343..52a8c2fd3 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -20,6 +20,7 @@
 #include "catalog/pg_opfamily.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "distributed/shard_utils.h"
 #include "executor/execdesc.h"
 #include "lib/stringinfo.h"
 #include "nodes/makefuncs.h"
@@ -34,6 +35,7 @@
 #include "optimizer/pathnode.h"
 #include "optimizer/paths.h"
 #include "optimizer/planmain.h"
+#include "optimizer/planner.h"
 #include "optimizer/restrictinfo.h"
 #include "parser/parse_oper.h"
 #include "parser/parsetree.h"
@@ -173,7 +175,9 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
 static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
 static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
 static bool IsLocallyAccessibleCitusLocalTable(Oid relationId);
-
+static Job * RouterJobFastPath(DistributedPlanningContext *planContext,
+							   DistributedPlan *distributedPlan);
+static Query * ReplaceShardRelationId(Query *query, Oid citusTableOid, uint64 shardId);
 
 /*
  * CreateRouterPlan attempts to create a router executor plan for the given
@@ -201,6 +205,69 @@ CreateRouterPlan(Query *originalQuery, Query *query,
 }
 
 
+/*
+ * CreateFastPathRouterPlan creates a distributed plan for a query that
+ * qualified for fast-path planning: SELECT/UPDATE/DELETE queries are routed
+ * via RouterJobFastPath, INSERTs via RouterInsertJob.
+ */
+DistributedPlan *
+CreateFastPathRouterPlan(DistributedPlanningContext *planContext)
+{
+	DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
+	Query *query = planContext->query;
+	bool isSelect = (query->commandType == CMD_SELECT);
+
+	distributedPlan->planningError = NULL; /* todo: relevant parts of DeferErrorIfUnsupportedRouterPlannableSelectQuery(planContext->query); */
+	/* Query targetlist does not have nextval */
+	/* Query->hasForUpdate AND table replication factor > 1 */
+	if (!isSelect)
+	{
+		distributedPlan->planningError = ModifyQuerySupported(query,
+															  planContext->originalQuery,
+															  false,
+															  planContext->
+															  plannerRestrictionContext);
+	}
+
+	if (distributedPlan->planningError == NULL)
+	{
+		distributedPlan->modLevel = RowModifyLevelForQuery(query);
+		Job *job = NULL;
+
+		if (isSelect || UpdateOrDeleteOrMergeQuery(query))
+		{
+			job = RouterJobFastPath(planContext, distributedPlan);
+		}
+		else
+		{
+			job = RouterInsertJob(planContext->originalQuery);
+
+			/* apply fast path planning (todo: place earlier in planning) */
+			planContext->plan = FastPathPlanner(planContext->originalQuery,
+												planContext->query,
+												planContext->boundParams);
+		}
+
+		if (distributedPlan->planningError == NULL)
+		{
+			distributedPlan->workerJob = job;
+			distributedPlan->combineQuery = NULL;
+			distributedPlan->expectResults = isSelect || query->returningList != NIL;
+			distributedPlan->targetRelationId = isSelect ? InvalidOid :
+												ResultRelationOidForQuery(query);
+		}
+
+		/* todo: handle the case where planningError is not NULL */
+	}
+
+	distributedPlan->fastPathRouterPlan =
+		planContext->plannerRestrictionContext->fastPathRestrictionContext->
+		fastPathRouterQuery;
+
+	return distributedPlan;
+}
+
+
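One detail worth calling out from CreateFastPathRouterPlan() above: a plan expects results not only for SELECTs but also for modifications carrying a RETURNING clause. A standalone restatement of that rule (hypothetical helper, not part of this patch):

```c
/*
 * Restates the expectResults rule used in CreateFastPathRouterPlan():
 * SELECTs and any modification with a RETURNING clause produce rows.
 * Hypothetical helper, for illustration only.
 */
static bool
FastPathPlanExpectsResults(Query *query)
{
	return query->commandType == CMD_SELECT || query->returningList != NIL;
}
```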
 /*
  * CreateModifyPlan attempts to create a plan for the given modification
  * statement. If planning fails ->planningError is set to a description of
@@ -1948,6 +2010,202 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
 }
 
 
+/*
+ * RouterJobFastPath creates the Job for a fast-path router query. If the
+ * distribution key is a parameter, shard pruning is deferred; otherwise the
+ * single target shard is resolved here, and queries on local shards are
+ * planned directly with standard_planner() against the shard relation.
+ */
+static Job *
+RouterJobFastPath(DistributedPlanningContext *planContext,
+				  DistributedPlan *distributedPlan)
+{
+	FastPathRestrictionContext *fastPathContext =
+		planContext->plannerRestrictionContext->fastPathRestrictionContext;
+
+	/* DeferredErrorMessage *planningError = NULL; todo - check if we need this */
+	Job *job = NULL;
+	Assert(fastPathContext->fastPathRouterQuery);
+	Assert(fastPathContext->delayFastPathPlanning);
+
+	if (fastPathContext->distributionKeyHasParam)
+	{
+		/*
+		 * Deferred pruning - we don't know which shard to target at this point.
+		 */
+		job = CreateJob(planContext->query);
+		job->deferredPruning = true;
+
+		/* apply fast path planning */
+		planContext->plan = FastPathPlanner(planContext->originalQuery,
+											planContext->query,
+											planContext->boundParams);
+		return job;
+	}
+
+	Query *originalQuery = planContext->originalQuery;
+	bool isMultiShardQuery = false;
+	bool shardsPresent = false;
+	bool distTableWithShardKey PG_USED_FOR_ASSERTS_ONLY = false;
+	bool isLocalExecution = false;
+	Const *partitionKeyValue = NULL;
+	Const *distributionKeyValue = fastPathContext->distributionKeyValue;
+	uint64 shardId = INVALID_SHARD_ID;
+	int32 localGroupId = -1;
+
+	List *shardIntervals =
+		TargetShardIntervalForFastPathQuery(originalQuery, &isMultiShardQuery,
+											distributionKeyValue,
+											&partitionKeyValue);
+	Assert(!isMultiShardQuery);
+	Assert(list_length(shardIntervals) == 1);
+
+	List *relationShards = RelationShardListForShardIntervalList(shardIntervals,
+																 &shardsPresent);
+	Assert(shardsPresent);
+	Assert(list_length(relationShards) == 1);
+
+	RelationShard *shard = (RelationShard *) linitial(relationShards);
+	shardId = shard->shardId;
+
+	CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(
+		fastPathContext->distTableRte->relid);
+	Assert(cacheEntry != NULL);
+	Assert(cacheEntry->relationId == shard->relationId);
+	Assert(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE));
+
+	distTableWithShardKey = HasDistributionKeyCacheEntry(cacheEntry);
+	Assert(distTableWithShardKey);
+
+	List *taskPlacementList = CreateTaskPlacementListForShardIntervals(shardIntervals,
+																	   true, false,
+																	   false);
+	Assert(list_length(taskPlacementList) == 1);
+	ShardPlacement *primaryPlacement =
+		(ShardPlacement *) linitial(taskPlacementList);
+	Assert(primaryPlacement->shardId == shardId);
+	Assert(originalQuery->resultRelation == 0 ||
+		   fastPathContext->distTableRte->rtekind != RTE_SUBQUERY);
+	Assert(shardId != INVALID_SHARD_ID);
+
+	if (originalQuery->hasModifyingCTE)
+	{
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("fast path queries with modifying CTEs "
+							   "are not supported")));
+	}
+
+	job = CreateJob(originalQuery);
+	job->partitionKeyValue = partitionKeyValue;
+	job->requiresCoordinatorEvaluation = false; /* todo: assert this is correct */
+	job->colocationId = TableColocationId(fastPathContext->distTableRte->relid);
+
+	TaskType taskType = READ_TASK;
+	char replicationModel = 0;
+
+	if (originalQuery->commandType != CMD_SELECT)
+	{
+		taskType = MODIFY_TASK;
+		replicationModel = cacheEntry->replicationModel;
+	}
+
+	Task *task = CreateTask(taskType);
+	task->isLocalTableModification = false;
+	List *relationRowLockList = NIL;
+
+	task->taskPlacementList = taskPlacementList;
+	task->partitionKeyValue = partitionKeyValue;
+	task->colocationId = job->colocationId;
+
+	/* determine if the task is local or remote */
+	localGroupId = GetLocalGroupId();
+	isLocalExecution = (primaryPlacement->groupId == localGroupId);
+
+	if (isLocalExecution)
+	{
+		planContext->query = ReplaceShardRelationId(planContext->query,
+													shard->relationId,
+													shardId);
+
+		/* plan the query against the shard relation directly; */
+		/* the resulting plan is saved in planContext->plan */
+		planContext->plan = standard_planner(planContext->query, NULL,
+											 planContext->cursorOptions,
+											 planContext->boundParams);
+		SetTaskQueryPlan(task, planContext->plan);
+	}
+	else
+	{
+		SetTaskQueryString(task, (char *) fastPathContext->clientQueryString);
+
+		/* call the fast path query planner; save the plan in planContext->plan */
+		planContext->plan = FastPathPlanner(planContext->originalQuery,
+											planContext->query,
+											planContext->boundParams);
+	}
+	task->anchorShardId = shardId;
+	task->jobId = job->jobId;
+	task->relationShardList = relationShards;
+	task->relationRowLockList = relationRowLockList;
+	task->replicationModel = replicationModel;
+	task->parametersInQueryStringResolved = job->parametersInJobQueryResolved;
+
+	job->taskList = list_make1(task);
+	return job;
+}
+
+
+/*
+ * ReplaceShardRelationId rewrites the query's single range table entry (and
+ * its permission info and target list origins) to reference the local shard
+ * relation <citus table name>_<shardId> instead of the Citus table itself.
+ */
+static Query *
+ReplaceShardRelationId(Query *query, Oid citusTableOid, uint64 shardId)
+{
+	Oid shardRelationId = InvalidOid;
+
+	Assert(list_length(query->rtable) == 1);
+	Assert(list_length(query->rteperminfos) == 1);
+
+	RangeTblEntry *rte = (RangeTblEntry *) linitial(query->rtable);
+	Assert(rte->relid == citusTableOid);
+	RTEPermissionInfo *rtePermInfo = (RTEPermissionInfo *) linitial(query->rteperminfos);
+
+	const char *citusTableName = get_rel_name(citusTableOid);
+	Assert(citusTableName != NULL);
+
+	/* construct the shard relation name */
+	char *shardRelationName = pstrdup(citusTableName);
+	AppendShardIdToName(&shardRelationName, shardId);
+
+	RangeVar shardRangeVar = {
+		.relname = shardRelationName,
+		.schemaname = NULL, /* todo - should initialize this, e.g. via get_rel_namespace()? */
+		.inh = rte->inh,
+		.relpersistence = RELPERSISTENCE_PERMANENT,
+	};
+
+	shardRelationId = RangeVarGetRelidExtended(&shardRangeVar, rte->rellockmode,
+											   0, NULL, NULL); /* todo - use suitable callback for perms check? */
+	Assert(shardRelationId != InvalidOid);
+	rte->relid = shardRelationId;
+	rtePermInfo->relid = shardRelationId;
+
+	ListCell *lc = NULL;
+	foreach(lc, query->targetList)
+	{
+		TargetEntry *targetEntry = (TargetEntry *) lfirst(lc);
+		if (targetEntry->resorigtbl == citusTableOid)
+		{
+			targetEntry->resorigtbl = shardRelationId;
+		}
+	}
+
+	return query;
+}
+
+
 /*
  * GenerateSingleShardRouterTaskList is a wrapper around other corresponding task
  * list generation functions specific to single shard selects and modifications.
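ReplaceShardRelationId() relies on the standard Citus shard-naming scheme via the existing AppendShardIdToName() helper. A standalone sketch of just the name construction; the table name and shard id below are invented examples:

```c
/*
 * Sketch of the shard-name rewrite performed by ReplaceShardRelationId():
 * a Citus table "orders" with shard id 102008 maps to the local relation
 * "orders_102008".
 */
static char *
BuildShardRelationName(const char *citusTableName, uint64 shardId)
{
	char *shardRelationName = pstrdup(citusTableName);

	/* existing Citus helper; appends "_<shardId>" to the name in place */
	AppendShardIdToName(&shardRelationName, shardId);

	return shardRelationName;
}
```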
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 430eb8555..823646e26 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -1366,6 +1366,17 @@ RegisterCitusConfigVariables(void)
 		GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.enable_single_shard_poc",
+		gettext_noop("Enables execution shortcuts for single-shard queries "
+					 "in proof-of-concept mode."),
+		NULL,
+		&EnableSingleShardFastPathPOC,
+		true,
+		PGC_USERSET,
+		GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
+		NULL, NULL, NULL);
+
 	DefineCustomBoolVariable(
 		"citus.enable_local_execution",
 		gettext_noop("Enables queries on shards that are local to the current node "
diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c
index 4b4a334c8..1c40c5755 100644
--- a/src/backend/distributed/utils/citus_copyfuncs.c
+++ b/src/backend/distributed/utils/citus_copyfuncs.c
@@ -287,6 +287,14 @@ CopyTaskQuery(Task *newnode, Task *from)
 			break;
 		}
 
+		case TASK_QUERY_LOCAL_PLAN:
+		{
+			/* COPY_NODE_FIELD(taskQuery.data.localPlan); */
+			/* this is a locally planned statement, so a shallow copy is enough */
+			COPY_SCALAR_FIELD(taskQuery.data.localPlan);
+			break;
+		}
+
 		default:
 		{
 			break;
diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h
index 8fb012588..7ac49f920 100644
--- a/src/include/distributed/deparse_shard_query.h
+++ b/src/include/distributed/deparse_shard_query.h
@@ -27,7 +27,9 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
 extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
 extern void SetTaskQueryString(Task *task, char *queryString);
 extern void SetTaskQueryStringList(Task *task, List *queryStringList);
+extern void SetTaskQueryPlan(Task *task, PlannedStmt *localPlan);
 extern char * TaskQueryString(Task *task);
+extern PlannedStmt * TaskQueryLocalPlan(Task *task);
 extern char * TaskQueryStringAtIndex(Task *task, int index);
 extern int GetTaskQueryType(Task *task);
 extern void AddInsertAliasIfNeeded(Query *query);
diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h
index 33a9c2fa8..5f343aae2 100644
--- a/src/include/distributed/distributed_planner.h
+++ b/src/include/distributed/distributed_planner.h
@@ -99,6 +99,18 @@ typedef struct FastPathRestrictionContext
 	 * Set to true when distKey = Param; in the queryTree
 	 */
 	bool distributionKeyHasParam;
+
+	/*
+	 * Indicates to hold off calling the fast path planner until it's known
+	 * whether the shard is local.
+	 */
+	bool delayFastPathPlanning;
+
+	/* range table entry of the distributed table we're querying */
+	RangeTblEntry *distTableRte;
+
+	/* the query string as passed in by the client */
+	const char *clientQueryString;
 } FastPathRestrictionContext;
 
 typedef struct PlannerRestrictionContext
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index 475a41b37..c73b78227 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -174,7 +174,8 @@ typedef enum TaskQueryType
 	TASK_QUERY_NULL,
 	TASK_QUERY_TEXT,
 	TASK_QUERY_OBJECT,
-	TASK_QUERY_TEXT_LIST
+	TASK_QUERY_TEXT_LIST,
+	TASK_QUERY_LOCAL_PLAN
 } TaskQueryType;
 
 typedef struct TaskQuery
@@ -219,6 +220,8 @@ typedef struct TaskQuery
 		 * when we want to access each query string.
 		 */
 		List *queryStringList;
+
+		PlannedStmt *localPlan; /* only applies to local tasks */
 	}data;
 }TaskQuery;
 
diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h
index 44be2736e..5cfd5a69b 100644
--- a/src/include/distributed/multi_router_planner.h
+++ b/src/include/distributed/multi_router_planner.h
@@ -28,12 +28,15 @@
 
 extern bool EnableRouterExecution;
 extern bool EnableFastPathRouterPlanner;
+extern bool EnableSingleShardFastPathPOC;
 extern bool EnableNonColocatedRouterQueryPushdown;
 
 extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query,
 										  PlannerRestrictionContext *
 										  plannerRestrictionContext);
+extern DistributedPlan * CreateFastPathRouterPlan(
+	DistributedPlanningContext *planContext);
 extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query,
 										  PlannerRestrictionContext *
 										  plannerRestrictionContext);
@@ -100,7 +103,8 @@ extern void GenerateSingleShardRouterTaskList(Job *job,
 
 extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse,
 									 ParamListInfo boundParams);
-extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue);
+extern bool FastPathRouterQuery(Query *query, const char *query_string,
+								FastPathRestrictionContext *fastPathContext);
 extern bool JoinConditionIsOnFalse(List *relOptInfo);
 extern Oid ResultRelationOidForQuery(Query *query);
 extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId,
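Finally, a small illustration of the tagged-union invariant the header changes introduce: queryType selects exactly one valid member of data, and TASK_QUERY_LOCAL_PLAN selects localPlan. The predicate below is a hypothetical helper; the real accessors are SetTaskQueryPlan()/TaskQueryLocalPlan() in deparse_shard_query.c:

```c
/*
 * Hypothetical predicate illustrating the TaskQuery tagged-union invariant:
 * taskQuery.data.localPlan is only meaningful when queryType says so.
 */
static bool
TaskHasLocalPlan(Task *task)
{
	return GetTaskQueryType(task) == TASK_QUERY_LOCAL_PLAN &&
		   task->taskQuery.data.localPlan != NULL;
}
```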