mirror of https://github.com/citusdata/citus.git
Fixes a crash in queries with a modifying CTE and a SELECT without FROM
(cherry picked from commit 58f85f55c0)
pull/5009/head
parent
18ab327c6c
commit
46e316881b
|
@ -38,8 +38,9 @@
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
|
|
||||||
RangeTblEntry *valuesRTE, Task *task);
|
static void AddInsertAliasIfNeeded(Query *query);
|
||||||
|
static void UpdateTaskQueryString(Query *query, Task *task);
|
||||||
static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList,
|
static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList,
|
||||||
OnConflictExpr *onConflict);
|
OnConflictExpr *onConflict);
|
||||||
static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList);
|
static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList);
|
||||||
|
@ -57,27 +58,43 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
{
|
{
|
||||||
Query *originalQuery = workerJob->jobQuery;
|
Query *originalQuery = workerJob->jobQuery;
|
||||||
List *taskList = workerJob->taskList;
|
List *taskList = workerJob->taskList;
|
||||||
Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid;
|
|
||||||
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery);
|
|
||||||
|
|
||||||
Task *task = NULL;
|
Task *task = NULL;
|
||||||
|
|
||||||
|
if (originalQuery->commandType == CMD_INSERT)
|
||||||
|
{
|
||||||
|
AddInsertAliasIfNeeded(originalQuery);
|
||||||
|
}
|
||||||
|
|
||||||
foreach_ptr(task, taskList)
|
foreach_ptr(task, taskList)
|
||||||
{
|
{
|
||||||
Query *query = originalQuery;
|
Query *query = originalQuery;
|
||||||
|
|
||||||
if (UpdateOrDeleteQuery(query) && list_length(taskList) > 1)
|
/*
|
||||||
|
* Copy the query if there are multiple tasks. If there is a single
|
||||||
|
* task, we scribble on the original query to avoid the copying
|
||||||
|
* overhead.
|
||||||
|
*/
|
||||||
|
if (list_length(taskList) > 1)
|
||||||
{
|
{
|
||||||
query = copyObject(originalQuery);
|
query = copyObject(originalQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (UpdateOrDeleteQuery(query))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* For UPDATE and DELETE queries, we may have subqueries and joins, so
|
||||||
|
* we use relation shard list to update shard names and call
|
||||||
|
* pg_get_query_def() directly.
|
||||||
|
*/
|
||||||
|
List *relationShardList = task->relationShardList;
|
||||||
|
UpdateRelationToShardNames((Node *) query, relationShardList);
|
||||||
|
}
|
||||||
else if (query->commandType == CMD_INSERT && task->modifyWithSubquery)
|
else if (query->commandType == CMD_INSERT && task->modifyWithSubquery)
|
||||||
{
|
{
|
||||||
/* for INSERT..SELECT, adjust shard names in SELECT part */
|
/* for INSERT..SELECT, adjust shard names in SELECT part */
|
||||||
List *relationShardList = task->relationShardList;
|
List *relationShardList = task->relationShardList;
|
||||||
ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId);
|
ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId);
|
||||||
|
|
||||||
query = copyObject(originalQuery);
|
|
||||||
|
|
||||||
RangeTblEntry *copiedInsertRte = ExtractResultRelationRTEOrError(query);
|
RangeTblEntry *copiedInsertRte = ExtractResultRelationRTEOrError(query);
|
||||||
RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(query);
|
RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(query);
|
||||||
Query *copiedSubquery = copiedSubqueryRte->subquery;
|
Query *copiedSubquery = copiedSubqueryRte->subquery;
|
||||||
|
@ -90,29 +107,18 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
|
|
||||||
ReorderInsertSelectTargetLists(query, copiedInsertRte, copiedSubqueryRte);
|
ReorderInsertSelectTargetLists(query, copiedInsertRte, copiedSubqueryRte);
|
||||||
|
|
||||||
/* setting an alias simplifies deparsing of RETURNING */
|
|
||||||
if (copiedInsertRte->alias == NULL)
|
|
||||||
{
|
|
||||||
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
|
|
||||||
copiedInsertRte->alias = alias;
|
|
||||||
}
|
|
||||||
|
|
||||||
UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList);
|
UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList);
|
||||||
}
|
}
|
||||||
else if (query->commandType == CMD_INSERT && (query->onConflict != NULL ||
|
|
||||||
valuesRTE != NULL))
|
if (query->commandType == CMD_INSERT)
|
||||||
{
|
{
|
||||||
|
RangeTblEntry *modifiedRelationRTE = linitial(originalQuery->rtable);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Always set an alias in UPSERTs and multi-row INSERTs to avoid
|
* We store the modified relation ID in the task so we can lazily call
|
||||||
* deparsing issues (e.g. RETURNING might reference the original
|
* deparse_shard_query when the string is needed
|
||||||
* table name, which has been replaced by a shard name).
|
|
||||||
*/
|
*/
|
||||||
RangeTblEntry *rangeTableEntry = linitial(query->rtable);
|
task->anchorDistributedTableId = modifiedRelationRTE->relid;
|
||||||
if (rangeTableEntry->alias == NULL)
|
|
||||||
{
|
|
||||||
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
|
|
||||||
rangeTableEntry->alias = alias;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT ||
|
bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT ||
|
||||||
|
@ -122,7 +128,7 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
? "(null)"
|
? "(null)"
|
||||||
: ApplyLogRedaction(TaskQueryString(task)))));
|
: ApplyLogRedaction(TaskQueryString(task)))));
|
||||||
|
|
||||||
UpdateTaskQueryString(query, relationId, valuesRTE, task);
|
UpdateTaskQueryString(query, task);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If parameters were resolved in the job query, then they are now also
|
* If parameters were resolved in the job query, then they are now also
|
||||||
|
@ -137,17 +143,50 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* UpdateTaskQueryString updates the query string stored within the provided
|
* AddInsertAliasIfNeeded adds an alias in UPSERTs and multi-row INSERTs to avoid
|
||||||
* Task. If the Task has row values from a multi-row INSERT, those are injected
|
* deparsing issues (e.g. RETURNING might reference the original table name,
|
||||||
* into the provided query (using the provided valuesRTE, which must belong to
|
* which has been replaced by a shard name).
|
||||||
* the query) before deparse occurs (the query's full VALUES list will be
|
|
||||||
* restored before this function returns).
|
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *valuesRTE,
|
AddInsertAliasIfNeeded(Query *query)
|
||||||
Task *task)
|
{
|
||||||
|
Assert(query->commandType == CMD_INSERT);
|
||||||
|
|
||||||
|
if (query->onConflict == NULL &&
|
||||||
|
ExtractDistributedInsertValuesRTE(query) == NULL)
|
||||||
|
{
|
||||||
|
/* simple single-row insert does not need an alias */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
RangeTblEntry *rangeTableEntry = linitial(query->rtable);
|
||||||
|
if (rangeTableEntry->alias != NULL)
|
||||||
|
{
|
||||||
|
/* INSERT already has an alias */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
|
||||||
|
rangeTableEntry->alias = alias;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* UpdateTaskQueryString updates the query string stored within the provided
|
||||||
|
* Task. If the Task has row values from a multi-row INSERT, those are injected
|
||||||
|
* into the provided query before deparse occurs (the query's full VALUES list
|
||||||
|
* will be restored before this function returns).
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
UpdateTaskQueryString(Query *query, Task *task)
|
||||||
{
|
{
|
||||||
List *oldValuesLists = NIL;
|
List *oldValuesLists = NIL;
|
||||||
|
RangeTblEntry *valuesRTE = NULL;
|
||||||
|
|
||||||
|
if (query->commandType == CMD_INSERT)
|
||||||
|
{
|
||||||
|
/* extract the VALUES from the INSERT */
|
||||||
|
valuesRTE = ExtractDistributedInsertValuesRTE(query);
|
||||||
|
|
||||||
if (valuesRTE != NULL)
|
if (valuesRTE != NULL)
|
||||||
{
|
{
|
||||||
|
@ -158,17 +197,7 @@ UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *value
|
||||||
valuesRTE->values_lists = task->rowValuesLists;
|
valuesRTE->values_lists = task->rowValuesLists;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (query->commandType != CMD_INSERT)
|
if (ShouldLazyDeparseQuery(task))
|
||||||
{
|
|
||||||
/*
|
|
||||||
* For UPDATE and DELETE queries, we may have subqueries and joins, so
|
|
||||||
* we use relation shard list to update shard names and call
|
|
||||||
* pg_get_query_def() directly.
|
|
||||||
*/
|
|
||||||
List *relationShardList = task->relationShardList;
|
|
||||||
UpdateRelationToShardNames((Node *) query, relationShardList);
|
|
||||||
}
|
|
||||||
else if (ShouldLazyDeparseQuery(task))
|
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* not all insert queries are copied before calling this
|
* not all insert queries are copied before calling this
|
||||||
|
@ -176,14 +205,6 @@ UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *value
|
||||||
*/
|
*/
|
||||||
query = copyObject(query);
|
query = copyObject(query);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (query->commandType == CMD_INSERT)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* We store this in the task so we can lazily call
|
|
||||||
* deparse_shard_query when the string is needed
|
|
||||||
*/
|
|
||||||
task->anchorDistributedTableId = distributedTableId;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SetTaskQueryIfShouldLazyDeparse(task, query);
|
SetTaskQueryIfShouldLazyDeparse(task, query);
|
||||||
|
|
Loading…
Reference in New Issue