diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c
index c889e00ad..bd1ad8d2b 100644
--- a/src/backend/distributed/planner/deparse_shard_query.c
+++ b/src/backend/distributed/planner/deparse_shard_query.c
@@ -440,11 +440,12 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
 
 
 void
-SetTaskQueryPlan(Task *task, PlannedStmt *localPlan)
+SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan)
 {
 	Assert(localPlan != NULL);
 
 	task->taskQuery.queryType = TASK_QUERY_LOCAL_PLAN;
-	task->taskQuery.data.localPlan = localPlan;
+	task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query;
+	task->localPlan = localPlan;
 	task->queryCount = 1;
 }
 
@@ -453,7 +454,7 @@ PlannedStmt *
 TaskQueryLocalPlan(Task *task)
 {
 	Assert(task->taskQuery.queryType == TASK_QUERY_LOCAL_PLAN);
-	return task->taskQuery.data.localPlan;
+	return task->localPlan;
 }
 
 
@@ -515,8 +516,6 @@ TaskQueryStringAtIndex(Task *task, int index)
 }
 
 
-static char *qry_unavailable_msg = "SELECT 'Task query unavailable - optimized away'";
-
 /*
  * TaskQueryString generates task query string text if missing.
  *
@@ -546,7 +545,11 @@ TaskQueryString(Task *task)
 	}
 	else if (taskQueryType == TASK_QUERY_LOCAL_PLAN)
 	{
-		return qry_unavailable_msg;
+		Query *query = task->taskQuery.data.jobQueryReferenceForLazyDeparsing;
+		Assert(query != NULL);
+		UpdateRelationToShardNames((Node *) query, task->relationShardList);
+		return AnnotateQuery(DeparseTaskQuery(task, query),
+							 task->partitionKeyValue, task->colocationId);
 	}
 
 	Query *jobQueryReferenceForLazyDeparsing =
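Reviewer note: the deparse_shard_query.c hunks above replace the canned "query unavailable" placeholder with lazy deparsing of the Query tree that SetTaskQueryPlan() now stashes on the task. Below is a minimal standalone sketch of that deparse-on-demand pattern, assuming nothing from Citus or Postgres; Task, DeparseOnDemand, and TaskQueryString here are illustrative stand-ins, not the real definitions.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for the Citus Task: keep the tree, render SQL text only on demand. */
typedef struct Task
{
	const char *queryTree; /* stand-in for the saved Query * parse tree */
} Task;

/* Stand-in for UpdateRelationToShardNames() + DeparseTaskQuery(). */
static char *
DeparseOnDemand(const Task *task)
{
	const char *prefix = "SELECT ";
	char *sql = malloc(strlen(prefix) + strlen(task->queryTree) + 1);
	strcpy(sql, prefix);
	strcat(sql, task->queryTree);
	return sql;
}

/* Like the patched TaskQueryString(): no placeholder, deparse when asked. */
static char *
TaskQueryString(const Task *task)
{
	return DeparseOnDemand(task);
}

int
main(void)
{
	Task task = { "x FROM t_102008" };
	char *sql = TaskQueryString(&task); /* only pay the deparse cost here */
	printf("%s\n", sql);
	free(sql);
	return 0;
}

The design point: a locally planned task normally never needs SQL text, so the text is produced only for consumers such as EXPLAIN or logging.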
diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c
index c0ec05e89..0ccdb2980 100644
--- a/src/backend/distributed/planner/fast_path_router_planner.c
+++ b/src/backend/distributed/planner/fast_path_router_planner.c
@@ -148,6 +148,51 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
 }
 
 
+static void
+InitializeFastPathContext(FastPathRestrictionContext *fastPathContext,
+						  Node *distributionKeyValue,
+						  bool canAvoidDeparse,
+						  Query *query)
+{
+	Assert(fastPathContext != NULL);
+	Assert(!fastPathContext->fastPathRouterQuery);
+	Assert(!fastPathContext->delayFastPathPlanning);
+
+	/*
+	 * We're looking at a fast path query, so we can fill the
+	 * fastPathContext with relevant details.
+	 */
+	fastPathContext->fastPathRouterQuery = true;
+	if (distributionKeyValue == NULL)
+	{
+		/* nothing to record */
+	}
+	else if (IsA(distributionKeyValue, Const))
+	{
+		fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
+	}
+	else if (IsA(distributionKeyValue, Param))
+	{
+		fastPathContext->distributionKeyHasParam = true;
+	}
+
+	if (EnableFastPathLocalExecutor)
+	{
+		/*
+		 * This fast path query may be executed by the local executor.
+		 * We need to delay the fast path planning until we know if the
+		 * shard is local or not. Make a final check for volatile
+		 * functions in the query tree to determine if we should delay
+		 * the fast path planning.
+		 */
+		fastPathContext->delayFastPathPlanning = canAvoidDeparse &&
+												 !FindNodeMatchingCheckFunction(
+													 (Node *) query,
+													 CitusIsVolatileFunction);
+	}
+}
+
+
 /*
  * FastPathRouterQuery gets a query and returns true if the query is eligible for
  * being a fast path router query. It also fills the given fastPathContext with
@@ -175,7 +220,6 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
 	bool isFastPath = false;
 	bool canAvoidDeparse = false;
 	Node *distributionKeyValue = NULL;
-	RangeTblEntry *rangeTableEntry = NULL;
 
 	if (!EnableFastPathRouterPlanner)
 	{
@@ -207,8 +251,8 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
 	else if (query->commandType == CMD_INSERT)
 	{
 		/* we don't need to do any further checks, all INSERTs are fast-path */
-		isFastPath = true;
-		goto returnFastPath;
+		InitializeFastPathContext(fastPathContext, NULL, true, query);
+		return true;
 	}
 
 	/* make sure that the only range table in FROM clause */
@@ -217,7 +261,7 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
 		return false;
 	}
 
-	rangeTableEntry = (RangeTblEntry *) linitial(query->rtable);
+	RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(query->rtable);
 	if (rangeTableEntry->rtekind != RTE_RELATION)
 	{
 		return false;
 	}
@@ -281,52 +325,10 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
 				   !ColumnAppearsMultipleTimes(quals, distributionKey));
 	}
 
-returnFastPath:
-
 	if (isFastPath)
 	{
-		Assert(fastPathContext != NULL);
-		Assert(!fastPathContext->fastPathRouterQuery);
-		Assert(!fastPathContext->delayFastPathPlanning);
-
-		/*
-		 * We're looking at a fast path query, so we can fill the
-		 * fastPathContext with relevant details.
-		 */
-		fastPathContext->fastPathRouterQuery = true;
-		if (distributionKeyValue == NULL)
-		{
-			/* nothing to record */
-		}
-		else if (IsA(distributionKeyValue, Const))
-		{
-			fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
-		}
-		else if (IsA(distributionKeyValue, Param))
-		{
-			fastPathContext->distributionKeyHasParam = true;
-		}
-
-		/*
-		 * Note the range table entry for the table we're querying.
-		 */
-		Assert(rangeTableEntry != NULL || query->commandType == CMD_INSERT);
-		fastPathContext->distTableRte = rangeTableEntry;
-
-		if (EnableFastPathLocalExecutor)
-		{
-			/*
-			 * This fast path query may be executed by the local executor.
-			 * We need to delay the fast path planning until we know if the
-			 * shard is local or not. Make a final check for volatile
-			 * functions in the query tree to determine if we should delay
-			 * the fast path planning.
-			 */
-			fastPathContext->delayFastPathPlanning = canAvoidDeparse &&
-													 !FindNodeMatchingCheckFunction(
-														 (Node *) query,
-														 CitusIsVolatileFunction);
-		}
+		InitializeFastPathContext(fastPathContext, distributionKeyValue, canAvoidDeparse,
+								  query);
 	}
 
 	return isFastPath;
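Reviewer note: the key behavioral bit the refactoring preserves is the volatility check gating delayFastPathPlanning — planning is only delayed when the query tree contains no volatile function calls. A standalone sketch of that tree-walk check, assuming plain C with toy types (Node and ContainsVolatileNode are stand-ins for Postgres nodes and FindNodeMatchingCheckFunction with CitusIsVolatileFunction):

#include <stdbool.h>
#include <stdio.h>

typedef struct Node
{
	bool volatileFunc;          /* stand-in for a volatile function call */
	struct Node *left, *right;  /* toy binary expression tree */
} Node;

/* Recursive walk: true as soon as any node in the tree is volatile. */
static bool
ContainsVolatileNode(const Node *node)
{
	if (node == NULL)
	{
		return false;
	}
	if (node->volatileFunc)
	{
		return true;
	}
	return ContainsVolatileNode(node->left) || ContainsVolatileNode(node->right);
}

int
main(void)
{
	Node leaf = { true, NULL, NULL };   /* e.g. a call to random() */
	Node root = { false, &leaf, NULL };

	bool delayFastPathPlanning = !ContainsVolatileNode(&root);
	printf("delay planning: %s\n", delayFastPathPlanning ? "yes" : "no");
	return 0;
}

Volatile functions must be evaluated exactly once per execution, so a query containing them cannot safely be re-planned later against the shard.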
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 03c339566..b5158f309 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -16,6 +16,8 @@
 #include "postgres.h"
 
 #include "access/stratnum.h"
+#include "access/tupdesc.h"
+#include "access/tupdesc_details.h"
 #include "access/xact.h"
 #include "catalog/pg_opfamily.h"
 #include "catalog/pg_proc.h"
@@ -175,7 +177,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
 static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
 static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
 static bool IsLocallyAccessibleCitusLocalTable(Oid relationId);
-static Query * ConvertToQueryOnShard(Query *query, Oid relationID, Oid shardRelationId);
+static bool ConvertToQueryOnShard(Query *query, Oid relationID, Oid shardRelationId);
 
 /*
  * CreateRouterPlan attempts to create a router executor plan for the given
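Reviewer note: the prototype change from Query * to bool reflects that ConvertToQueryOnShard() mutates the query in place, so the only information the caller needs is whether the conversion succeeded — and, with the attribute check added below, it can now fail and fall back to SQL deparsing. A standalone sketch of this mutate-in-place-plus-success-flag design (plain C; Query and ConvertInPlace are hypothetical stand-ins):

#include <stdbool.h>
#include <stdio.h>

typedef struct Query { int relid; } Query;

static bool
ConvertInPlace(Query *query, int shardRelid, bool schemasMatch)
{
	if (!schemasMatch)
	{
		return false; /* leave the query untouched; caller falls back */
	}
	query->relid = shardRelid;
	return true;
}

int
main(void)
{
	Query q = { .relid = 1000 };
	if (ConvertInPlace(&q, 2000, /* schemasMatch */ false))
	{
		printf("planned locally on relid %d\n", q.relid);
	}
	else
	{
		printf("fell back to SQL deparse, relid still %d\n", q.relid);
	}
	return 0;
}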
@@ -1952,6 +1954,75 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
 }
 
 
+/*
+ * CheckAttributesMatch checks if the attributes of the Citus table and the shard
+ * table match.
+ *
+ * It is used to ensure that the shard table has the same schema as the Citus
+ * table before replacing the Citus table OID with the shard table OID in the
+ * parse tree we (Citus planner) received from Postgres.
+ */
+static bool
+CheckAttributesMatch(Oid citusTableId, Oid shardTableId)
+{
+	Relation citusR, shardR;
+	bool same_schema = false;
+
+	citusR = RelationIdGetRelation(citusTableId);
+	shardR = RelationIdGetRelation(shardTableId);
+
+	if (RelationIsValid(citusR) && RelationIsValid(shardR))
+	{
+		TupleDesc citusTupDesc = citusR->rd_att;
+		TupleDesc shardTupDesc = shardR->rd_att;
+
+		if (citusTupDesc->natts == shardTupDesc->natts)
+		{
+			/*
+			 * Do an attribute-by-attribute comparison. This is borrowed from
+			 * the Postgres function equalTupleDescs(), which we cannot use
+			 * because the citus table and shard table have different composite
+			 * types.
+			 */
+			same_schema = true;
+			for (int i = 0; i < citusTupDesc->natts && same_schema; i++)
+			{
+				Form_pg_attribute attr1 = TupleDescAttr(citusTupDesc, i);
+				Form_pg_attribute attr2 = TupleDescAttr(shardTupDesc, i);
+
+				if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0)
+				{
+					same_schema = false;
+				}
+				if (attr1->atttypid != attr2->atttypid)
+				{
+					same_schema = false;
+				}
+				if (attr1->atttypmod != attr2->atttypmod)
+				{
+					same_schema = false;
+				}
+				if (attr1->attcollation != attr2->attcollation)
+				{
+					same_schema = false;
+				}
+
+				/* Record types derived from tables could have dropped fields. */
+				if (attr1->attisdropped != attr2->attisdropped)
+				{
+					same_schema = false;
+				}
+			}
+		}
+	}
+
+	RelationClose(citusR);
+	RelationClose(shardR);
+	return same_schema;
+}
+
+
 void
 CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
 								 DistributedPlan *plan)
@@ -1969,7 +2040,7 @@ CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
 
 	if (job->deferredPruning)
 	{
-		/* Call fast path query planner, Save plan in planContext->plan */
+		/* Execution-time pruning: we don't know which shard to target yet */
 		planContext->plan = FastPathPlanner(planContext->originalQuery,
 											planContext->query,
 											planContext->boundParams);
@@ -1985,27 +2056,26 @@ CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
 	ShardPlacement *primaryPlacement =
 		(ShardPlacement *) linitial(placements);
 	List *relationShards = task->relationShardList;
 	Assert(list_length(relationShards) == 1);
+	RelationShard *relationShard = (RelationShard *) linitial(relationShards);
+	Assert(relationShard->shardId == primaryPlacement->shardId);
 
 	bool isLocalExecution = (primaryPlacement->groupId == localGroupId);
-	if (isLocalExecution)
+	if (isLocalExecution && ConvertToQueryOnShard(planContext->query,
+												  relationShard->relationId,
+												  relationShard->shardId))
 	{
-		ConvertToQueryOnShard(planContext->query,
-							  fastPathContext->distTableRte->relid,
-							  primaryPlacement->shardId);
-
-		/* Plan the query with the new shard relation id */
 		/* Save plan in planContext->plan */
 		planContext->plan = standard_planner(planContext->query, NULL,
 											 planContext->cursorOptions,
 											 planContext->boundParams);
-		SetTaskQueryPlan(task, planContext->plan);
+		SetTaskQueryPlan(task, job->jobQuery, planContext->plan);
 
 		ereport(DEBUG2, (errmsg("Local plan for fast-path router "
 								"query")));
 	}
 	else
 	{
-		/* Call fast path query planner, Save plan in planContext->plan */
+		/* Fall back to the fast path planner and generate SQL for the shard */
 		planContext->plan = FastPathPlanner(planContext->originalQuery,
 											planContext->query,
 											planContext->boundParams);
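Reviewer note: CheckAttributesMatch() is essentially an attribute-by-attribute tuple descriptor comparison. A standalone model of the same check, assuming plain C structs in place of Form_pg_attribute (Attr and AttributesMatch are illustrative stand-ins):

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

typedef struct Attr
{
	char name[64];
	int  typid;
	int  typmod;
	int  collation;
	bool isdropped;
} Attr;

/* Same fields the patch compares: name, type, typmod, collation, dropped. */
static bool
AttributesMatch(const Attr *a, const Attr *b, int nattsA, int nattsB)
{
	if (nattsA != nattsB)
	{
		return false;
	}
	for (int i = 0; i < nattsA; i++)
	{
		if (strcmp(a[i].name, b[i].name) != 0 ||
			a[i].typid != b[i].typid ||
			a[i].typmod != b[i].typmod ||
			a[i].collation != b[i].collation ||
			a[i].isdropped != b[i].isdropped)
		{
			return false;
		}
	}
	return true;
}

int
main(void)
{
	Attr dist[]  = { { "id", 23, -1, 0, false }, { "val", 25, -1, 100, false } };
	Attr shard[] = { { "id", 23, -1, 0, false }, { "val", 25, -1, 100, true } };

	/* a dropped-column mismatch means we cannot swap the shard OID into the tree */
	printf("match: %s\n", AttributesMatch(dist, shard, 2, 2) ? "yes" : "no");
	return 0;
}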
+ */ + UnlockRelationOid(shardRelationId, citusTableRte->rellockmode); + pfree(shardRelationName); + ereport(DEBUG2, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("shard table \"%s\" does not match the " + "distributed table \"%s\"", + shardRelationName, citusTableName))); + return false; + } + /* Change the target list entries that reference the original citus table's relation id */ ListCell *lc = NULL; foreach(lc, query->targetList) @@ -2071,7 +2158,6 @@ ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId) } } - /* Change the range table entry's oid to that of the shard's */ Assert(shardRelationId != InvalidOid); citusTableRte->relid = shardRelationId; @@ -2084,7 +2170,7 @@ ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId) rtePermInfo->relid = shardRelationId; #endif - return query; + return true; } diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index e834c1470..1e47609a0 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -289,7 +289,8 @@ CopyTaskQuery(Task *newnode, Task *from) case TASK_QUERY_LOCAL_PLAN: { - COPY_NODE_FIELD(taskQuery.data.localPlan); + COPY_NODE_FIELD(localPlan); + COPY_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing); break; } diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index 7ac49f920..efcdb3032 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -27,7 +27,7 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query); extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryStringList(Task *task, List *queryStringList); -extern void SetTaskQueryPlan(Task *task, PlannedStmt *localPlan); +extern void SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan); extern char * TaskQueryString(Task *task); extern PlannedStmt * TaskQueryLocalPlan(Task *task); extern char * TaskQueryStringAtIndex(Task *task, int index); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 9c7cb0b90..9571ec84b 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -105,11 +105,6 @@ typedef struct FastPathRestrictionContext * known if the shard is local */ bool delayFastPathPlanning; - - /* - * Range table entry for the table we're querying - */ - RangeTblEntry *distTableRte; } FastPathRestrictionContext; typedef struct PlannerRestrictionContext diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index c73b78227..bb68f1636 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -220,8 +220,6 @@ typedef struct TaskQuery * when we want to access each query string. */ List *queryStringList; - - PlannedStmt *localPlan; /* only applies to local tasks */ }data; }TaskQuery; @@ -337,6 +335,8 @@ typedef struct Task Const *partitionKeyValue; int colocationId; + + PlannedStmt *localPlan; /* only applies to local tasks */ } Task;