diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 8eef8f297..d0c830643 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -515,6 +515,8 @@ TaskQueryStringAtIndex(Task *task, int index) } +static char *plan_msg = "Task query optimized away"; + /* * TaskQueryString generates task query string text if missing. * @@ -542,6 +544,10 @@ TaskQueryString(Task *task) { return task->taskQuery.data.queryStringLazy; } + else if (taskQueryType == TASK_QUERY_LOCAL_PLAN) + { + return plan_msg; + } Query *jobQueryReferenceForLazyDeparsing = task->taskQuery.data.jobQueryReferenceForLazyDeparsing; diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 07dc4afc2..14affdb92 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -1299,8 +1299,6 @@ CreateFastPathDistributedPlan(DistributedPlanningContext *planContext) Query *originalQuery = planContext->originalQuery; Query *query = planContext->query; ParamListInfo boundParams = planContext->boundParams; - PlannerRestrictionContext *plannerRestrictionContext = - planContext->plannerRestrictionContext; bool hasUnresolvedParams = false; if (HasUnresolvedExternParamsWalker((Node *) originalQuery, @@ -1533,7 +1531,7 @@ GetDistributedPlan(CustomScan *customScan) Node *node = (Node *) linitial(customScan->custom_private); Assert(CitusIsA(node, DistributedPlan)); - /* CheckNodeCopyAndSerialization(node); commented out for local perf profiling */ + CheckNodeCopyAndSerialization(node); DistributedPlan *distributedPlan = (DistributedPlan *) node; diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 3d9c1cecb..f6a7969af 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -43,6 +43,7 @@ #include "pg_version_constants.h" +#include "distributed/citus_clauses.h" #include "distributed/distributed_planner.h" #include "distributed/insert_select_planner.h" #include "distributed/metadata_cache.h" @@ -171,7 +172,7 @@ FastPathRouterQuery(Query *query, const char *query_string, FromExpr *joinTree = query->jointree; Node *quals = NULL; bool isFastPath = false; - bool isDistributedTable = false; + bool canAvoidDeparse = false; Node *distributionKeyValue = NULL; RangeTblEntry *rangeTableEntry = NULL; @@ -206,7 +207,7 @@ FastPathRouterQuery(Query *query, const char *query_string, { /* we don't need to do any further checks, all INSERTs are fast-path */ isFastPath = true; - isDistributedTable = true; + canAvoidDeparse = true; goto returnFastPath; } @@ -238,18 +239,21 @@ FastPathRouterQuery(Query *query, const char *query_string, Var *distributionKey = PartitionColumn(distributedTableId, 1); if (!distributionKey) { - isDistributedTable = IsCitusTableTypeCacheEntry(cacheEntry, - SINGLE_SHARD_DISTRIBUTED) || - IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE); + canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry, + SINGLE_SHARD_DISTRIBUTED) || + + /*(IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE) */ + /* && (query->commandType == CMD_SELECT)) || */ + false; isFastPath = true; } if (!isFastPath) { - isDistributedTable = IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE); + canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE); if (joinTree == NULL || - (joinTree->quals == NULL && !isDistributedTable)) + (joinTree->quals == NULL && !canAvoidDeparse)) { /* no quals, not a fast path query */ return false; @@ -300,9 +304,12 @@ returnFastPath: if (EnableSingShardFastPathPOC) { - Assert(rangeTableEntry != NULL); + Assert(rangeTableEntry != NULL || query->commandType == CMD_INSERT); fastPathContext->distTableRte = rangeTableEntry; - fastPathContext->delayFastPathPlanning = isDistributedTable; + canAvoidDeparse = canAvoidDeparse && + !FindNodeMatchingCheckFunction((Node *) query, + CitusIsVolatileFunction); + fastPathContext->delayFastPathPlanning = canAvoidDeparse; /* If the dist key is parameterized the query will use the plan cache (todo: verify) */ fastPathContext->clientQueryString = query_string; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index d8fb2c4ea..bfb3682ec 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -243,14 +243,13 @@ CreateFastPathRouterPlan(DistributedPlanningContext *planContext) planContext->boundParams); } - if (distributedPlan->planningError == NULL) - { - distributedPlan->workerJob = job; - distributedPlan->combineQuery = NULL; - distributedPlan->expectResults = is_select || query->returningList != NIL; - distributedPlan->targetRelationId = is_select ? InvalidOid : - ResultRelationOidForQuery(query); - } + distributedPlan->workerJob = job; + distributedPlan->combineQuery = NULL; + distributedPlan->expectResults = is_select || query->returningList != NIL; + distributedPlan->targetRelationId = is_select ? InvalidOid : + ResultRelationOidForQuery(query); + + ereport(DEBUG2, (errmsg("Creating router plan"))); /* todo: handle the case where planningError is not NULL */ } @@ -2052,6 +2051,9 @@ RouterJobFastPath(DistributedPlanningContext *planContext, Assert(!isMultiShardQuery); Assert(list_length(shardIntervals) == 1); + ereport(DEBUG2, (errmsg("Distributed planning for a fast-path router " + "query"))); + List *relationShards = RelationShardListForShardIntervalList(shardIntervals, &shardsPresent); Assert(shardsPresent); @@ -2064,12 +2066,13 @@ RouterJobFastPath(DistributedPlanningContext *planContext, fastPathContext->distTableRte->relid); Assert(cacheEntry != NULL); Assert(cacheEntry->relationId == shard->relationId); - Assert(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE)); List *taskPlacementList = CreateTaskPlacementListForShardIntervals(shardIntervals, - true, false, + shardsPresent, + true, false); - Assert(list_length(taskPlacementList) == 1); + + /* Assert(list_length(taskPlacementList) == 1); // not the case for reference tables */ ShardPlacement *primaryPlacement = (ShardPlacement *) linitial(taskPlacementList); Assert(primaryPlacement->shardId == shardId); diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 1c40c5755..e834c1470 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -289,9 +289,7 @@ CopyTaskQuery(Task *newnode, Task *from) case TASK_QUERY_LOCAL_PLAN: { - /*COPY_NODE_FIELD(taskQuery.data.localPlan); */ - /* This is a local planned statement, so shallow copy is enough */ - COPY_SCALAR_FIELD(taskQuery.data.localPlan); + COPY_NODE_FIELD(taskQuery.data.localPlan); break; }