mirror of https://github.com/citusdata/citus.git
Fixes a crash in queries with a modifying CTE and a SELECT without FROM
(cherry picked from commit 58f85f55c0
)
pull/5479/head
parent
a875449b60
commit
70fcddc0cb
|
@ -38,8 +38,9 @@
|
|||
#include "utils/rel.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
|
||||
RangeTblEntry *valuesRTE, Task *task);
|
||||
|
||||
static void AddInsertAliasIfNeeded(Query *query);
|
||||
static void UpdateTaskQueryString(Query *query, Task *task);
|
||||
static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList,
|
||||
OnConflictExpr *onConflict);
|
||||
static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList);
|
||||
|
@ -57,27 +58,43 @@ RebuildQueryStrings(Job *workerJob)
|
|||
{
|
||||
Query *originalQuery = workerJob->jobQuery;
|
||||
List *taskList = workerJob->taskList;
|
||||
Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid;
|
||||
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery);
|
||||
|
||||
Task *task = NULL;
|
||||
|
||||
if (originalQuery->commandType == CMD_INSERT)
|
||||
{
|
||||
AddInsertAliasIfNeeded(originalQuery);
|
||||
}
|
||||
|
||||
foreach_ptr(task, taskList)
|
||||
{
|
||||
Query *query = originalQuery;
|
||||
|
||||
if (UpdateOrDeleteQuery(query) && list_length(taskList) > 1)
|
||||
/*
|
||||
* Copy the query if there are multiple tasks. If there is a single
|
||||
* task, we scribble on the original query to avoid the copying
|
||||
* overhead.
|
||||
*/
|
||||
if (list_length(taskList) > 1)
|
||||
{
|
||||
query = copyObject(originalQuery);
|
||||
}
|
||||
|
||||
if (UpdateOrDeleteQuery(query))
|
||||
{
|
||||
/*
|
||||
* For UPDATE and DELETE queries, we may have subqueries and joins, so
|
||||
* we use relation shard list to update shard names and call
|
||||
* pg_get_query_def() directly.
|
||||
*/
|
||||
List *relationShardList = task->relationShardList;
|
||||
UpdateRelationToShardNames((Node *) query, relationShardList);
|
||||
}
|
||||
else if (query->commandType == CMD_INSERT && task->modifyWithSubquery)
|
||||
{
|
||||
/* for INSERT..SELECT, adjust shard names in SELECT part */
|
||||
List *relationShardList = task->relationShardList;
|
||||
ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId);
|
||||
|
||||
query = copyObject(originalQuery);
|
||||
|
||||
RangeTblEntry *copiedInsertRte = ExtractResultRelationRTEOrError(query);
|
||||
RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(query);
|
||||
Query *copiedSubquery = copiedSubqueryRte->subquery;
|
||||
|
@ -90,29 +107,18 @@ RebuildQueryStrings(Job *workerJob)
|
|||
|
||||
ReorderInsertSelectTargetLists(query, copiedInsertRte, copiedSubqueryRte);
|
||||
|
||||
/* setting an alias simplifies deparsing of RETURNING */
|
||||
if (copiedInsertRte->alias == NULL)
|
||||
{
|
||||
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
|
||||
copiedInsertRte->alias = alias;
|
||||
}
|
||||
|
||||
UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList);
|
||||
}
|
||||
else if (query->commandType == CMD_INSERT && (query->onConflict != NULL ||
|
||||
valuesRTE != NULL))
|
||||
|
||||
if (query->commandType == CMD_INSERT)
|
||||
{
|
||||
RangeTblEntry *modifiedRelationRTE = linitial(originalQuery->rtable);
|
||||
|
||||
/*
|
||||
* Always an alias in UPSERTs and multi-row INSERTs to avoid
|
||||
* deparsing issues (e.g. RETURNING might reference the original
|
||||
* table name, which has been replaced by a shard name).
|
||||
* We store the modified relaiton ID in the task so we can lazily call
|
||||
* deparse_shard_query when the string is needed
|
||||
*/
|
||||
RangeTblEntry *rangeTableEntry = linitial(query->rtable);
|
||||
if (rangeTableEntry->alias == NULL)
|
||||
{
|
||||
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
|
||||
rangeTableEntry->alias = alias;
|
||||
}
|
||||
task->anchorDistributedTableId = modifiedRelationRTE->relid;
|
||||
}
|
||||
|
||||
bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT ||
|
||||
|
@ -122,7 +128,7 @@ RebuildQueryStrings(Job *workerJob)
|
|||
? "(null)"
|
||||
: ApplyLogRedaction(TaskQueryString(task)))));
|
||||
|
||||
UpdateTaskQueryString(query, relationId, valuesRTE, task);
|
||||
UpdateTaskQueryString(query, task);
|
||||
|
||||
/*
|
||||
* If parameters were resolved in the job query, then they are now also
|
||||
|
@ -137,53 +143,68 @@ RebuildQueryStrings(Job *workerJob)
|
|||
|
||||
|
||||
/*
|
||||
* UpdateTaskQueryString updates the query string stored within the provided
|
||||
* Task. If the Task has row values from a multi-row INSERT, those are injected
|
||||
* into the provided query (using the provided valuesRTE, which must belong to
|
||||
* the query) before deparse occurs (the query's full VALUES list will be
|
||||
* restored before this function returns).
|
||||
* AddInsertAliasIfNeeded adds an alias in UPSERTs and multi-row INSERTs to avoid
|
||||
* deparsing issues (e.g. RETURNING might reference the original table name,
|
||||
* which has been replaced by a shard name).
|
||||
*/
|
||||
static void
|
||||
UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *valuesRTE,
|
||||
Task *task)
|
||||
AddInsertAliasIfNeeded(Query *query)
|
||||
{
|
||||
Assert(query->commandType == CMD_INSERT);
|
||||
|
||||
if (query->onConflict == NULL &&
|
||||
ExtractDistributedInsertValuesRTE(query) == NULL)
|
||||
{
|
||||
/* simple single-row insert does not need an alias */
|
||||
return;
|
||||
}
|
||||
|
||||
RangeTblEntry *rangeTableEntry = linitial(query->rtable);
|
||||
if (rangeTableEntry->alias != NULL)
|
||||
{
|
||||
/* INSERT already has an alias */
|
||||
return;
|
||||
}
|
||||
|
||||
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
|
||||
rangeTableEntry->alias = alias;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UpdateTaskQueryString updates the query string stored within the provided
|
||||
* Task. If the Task has row values from a multi-row INSERT, those are injected
|
||||
* into the provided query before deparse occurs (the query's full VALUES list
|
||||
* will be restored before this function returns).
|
||||
*/
|
||||
static void
|
||||
UpdateTaskQueryString(Query *query, Task *task)
|
||||
{
|
||||
List *oldValuesLists = NIL;
|
||||
|
||||
if (valuesRTE != NULL)
|
||||
{
|
||||
Assert(valuesRTE->rtekind == RTE_VALUES);
|
||||
Assert(task->rowValuesLists != NULL);
|
||||
|
||||
oldValuesLists = valuesRTE->values_lists;
|
||||
valuesRTE->values_lists = task->rowValuesLists;
|
||||
}
|
||||
|
||||
if (query->commandType != CMD_INSERT)
|
||||
{
|
||||
/*
|
||||
* For UPDATE and DELETE queries, we may have subqueries and joins, so
|
||||
* we use relation shard list to update shard names and call
|
||||
* pg_get_query_def() directly.
|
||||
*/
|
||||
List *relationShardList = task->relationShardList;
|
||||
UpdateRelationToShardNames((Node *) query, relationShardList);
|
||||
}
|
||||
else if (ShouldLazyDeparseQuery(task))
|
||||
{
|
||||
/*
|
||||
* not all insert queries are copied before calling this
|
||||
* function, so we do it here
|
||||
*/
|
||||
query = copyObject(query);
|
||||
}
|
||||
RangeTblEntry *valuesRTE = NULL;
|
||||
|
||||
if (query->commandType == CMD_INSERT)
|
||||
{
|
||||
/*
|
||||
* We store this in the task so we can lazily call
|
||||
* deparse_shard_query when the string is needed
|
||||
*/
|
||||
task->anchorDistributedTableId = distributedTableId;
|
||||
/* extract the VALUES from the INSERT */
|
||||
valuesRTE = ExtractDistributedInsertValuesRTE(query);
|
||||
|
||||
if (valuesRTE != NULL)
|
||||
{
|
||||
Assert(valuesRTE->rtekind == RTE_VALUES);
|
||||
Assert(task->rowValuesLists != NULL);
|
||||
|
||||
oldValuesLists = valuesRTE->values_lists;
|
||||
valuesRTE->values_lists = task->rowValuesLists;
|
||||
}
|
||||
|
||||
if (ShouldLazyDeparseQuery(task))
|
||||
{
|
||||
/*
|
||||
* not all insert queries are copied before calling this
|
||||
* function, so we do it here
|
||||
*/
|
||||
query = copyObject(query);
|
||||
}
|
||||
}
|
||||
|
||||
SetTaskQueryIfShouldLazyDeparse(task, query);
|
||||
|
|
Loading…
Reference in New Issue