diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 052a89ba6..73666935e 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -38,8 +38,9 @@ #include "utils/rel.h" #include "utils/syscache.h" -static void UpdateTaskQueryString(Query *query, Oid distributedTableId, - RangeTblEntry *valuesRTE, Task *task); + +static void AddInsertAliasIfNeeded(Query *query); +static void UpdateTaskQueryString(Query *query, Task *task); static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList, OnConflictExpr *onConflict); static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList); @@ -57,27 +58,43 @@ RebuildQueryStrings(Job *workerJob) { Query *originalQuery = workerJob->jobQuery; List *taskList = workerJob->taskList; - Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid; - RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery); - Task *task = NULL; + if (originalQuery->commandType == CMD_INSERT) + { + AddInsertAliasIfNeeded(originalQuery); + } + foreach_ptr(task, taskList) { Query *query = originalQuery; - if (UpdateOrDeleteQuery(query) && list_length(taskList) > 1) + /* + * Copy the query if there are multiple tasks. If there is a single + * task, we scribble on the original query to avoid the copying + * overhead. + */ + if (list_length(taskList) > 1) { query = copyObject(originalQuery); } + + if (UpdateOrDeleteQuery(query)) + { + /* + * For UPDATE and DELETE queries, we may have subqueries and joins, so + * we use relation shard list to update shard names and call + * pg_get_query_def() directly. 
+ */ + List *relationShardList = task->relationShardList; + UpdateRelationToShardNames((Node *) query, relationShardList); + } else if (query->commandType == CMD_INSERT && task->modifyWithSubquery) { /* for INSERT..SELECT, adjust shard names in SELECT part */ List *relationShardList = task->relationShardList; ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId); - query = copyObject(originalQuery); - RangeTblEntry *copiedInsertRte = ExtractResultRelationRTEOrError(query); RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(query); Query *copiedSubquery = copiedSubqueryRte->subquery; @@ -90,29 +107,18 @@ RebuildQueryStrings(Job *workerJob) ReorderInsertSelectTargetLists(query, copiedInsertRte, copiedSubqueryRte); - /* setting an alias simplifies deparsing of RETURNING */ - if (copiedInsertRte->alias == NULL) - { - Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); - copiedInsertRte->alias = alias; - } - UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList); } - else if (query->commandType == CMD_INSERT && (query->onConflict != NULL || - valuesRTE != NULL)) + + if (query->commandType == CMD_INSERT) { + RangeTblEntry *modifiedRelationRTE = linitial(originalQuery->rtable); + /* - * Always an alias in UPSERTs and multi-row INSERTs to avoid - * deparsing issues (e.g. RETURNING might reference the original - * table name, which has been replaced by a shard name). + * We store the modified relaiton ID in the task so we can lazily call + * deparse_shard_query when the string is needed */ - RangeTblEntry *rangeTableEntry = linitial(query->rtable); - if (rangeTableEntry->alias == NULL) - { - Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); - rangeTableEntry->alias = alias; - } + task->anchorDistributedTableId = modifiedRelationRTE->relid; } bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT || @@ -122,7 +128,7 @@ RebuildQueryStrings(Job *workerJob) ? 
"(null)" : ApplyLogRedaction(TaskQueryString(task))))); - UpdateTaskQueryString(query, relationId, valuesRTE, task); + UpdateTaskQueryString(query, task); /* * If parameters were resolved in the job query, then they are now also @@ -137,53 +143,68 @@ RebuildQueryStrings(Job *workerJob) /* - * UpdateTaskQueryString updates the query string stored within the provided - * Task. If the Task has row values from a multi-row INSERT, those are injected - * into the provided query (using the provided valuesRTE, which must belong to - * the query) before deparse occurs (the query's full VALUES list will be - * restored before this function returns). + * AddInsertAliasIfNeeded adds an alias in UPSERTs and multi-row INSERTs to avoid + * deparsing issues (e.g. RETURNING might reference the original table name, + * which has been replaced by a shard name). */ static void -UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *valuesRTE, - Task *task) +AddInsertAliasIfNeeded(Query *query) +{ + Assert(query->commandType == CMD_INSERT); + + if (query->onConflict == NULL && + ExtractDistributedInsertValuesRTE(query) == NULL) + { + /* simple single-row insert does not need an alias */ + return; + } + + RangeTblEntry *rangeTableEntry = linitial(query->rtable); + if (rangeTableEntry->alias != NULL) + { + /* INSERT already has an alias */ + return; + } + + Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); + rangeTableEntry->alias = alias; +} + + +/* + * UpdateTaskQueryString updates the query string stored within the provided + * Task. If the Task has row values from a multi-row INSERT, those are injected + * into the provided query before deparse occurs (the query's full VALUES list + * will be restored before this function returns). 
+ */ +static void +UpdateTaskQueryString(Query *query, Task *task) { List *oldValuesLists = NIL; - - if (valuesRTE != NULL) - { - Assert(valuesRTE->rtekind == RTE_VALUES); - Assert(task->rowValuesLists != NULL); - - oldValuesLists = valuesRTE->values_lists; - valuesRTE->values_lists = task->rowValuesLists; - } - - if (query->commandType != CMD_INSERT) - { - /* - * For UPDATE and DELETE queries, we may have subqueries and joins, so - * we use relation shard list to update shard names and call - * pg_get_query_def() directly. - */ - List *relationShardList = task->relationShardList; - UpdateRelationToShardNames((Node *) query, relationShardList); - } - else if (ShouldLazyDeparseQuery(task)) - { - /* - * not all insert queries are copied before calling this - * function, so we do it here - */ - query = copyObject(query); - } + RangeTblEntry *valuesRTE = NULL; if (query->commandType == CMD_INSERT) { - /* - * We store this in the task so we can lazily call - * deparse_shard_query when the string is needed - */ - task->anchorDistributedTableId = distributedTableId; + /* extract the VALUES from the INSERT */ + valuesRTE = ExtractDistributedInsertValuesRTE(query); + + if (valuesRTE != NULL) + { + Assert(valuesRTE->rtekind == RTE_VALUES); + Assert(task->rowValuesLists != NULL); + + oldValuesLists = valuesRTE->values_lists; + valuesRTE->values_lists = task->rowValuesLists; + } + + if (ShouldLazyDeparseQuery(task)) + { + /* + * not all insert queries are copied before calling this + * function, so we do it here + */ + query = copyObject(query); + } } SetTaskQueryIfShouldLazyDeparse(task, query);