Merge pull request #4800 from citusdata/marcocitus/fix-mod-cte

pull/4809/head
Marco Slot 2021-03-09 21:12:46 +01:00 committed by GitHub
commit 68a527ba17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 177 additions and 67 deletions

View File

@ -38,8 +38,9 @@
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/syscache.h" #include "utils/syscache.h"
static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
RangeTblEntry *valuesRTE, Task *task); static void AddInsertAliasIfNeeded(Query *query);
static void UpdateTaskQueryString(Query *query, Task *task);
static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList, static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList,
OnConflictExpr *onConflict); OnConflictExpr *onConflict);
static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList); static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList);
@ -57,27 +58,43 @@ RebuildQueryStrings(Job *workerJob)
{ {
Query *originalQuery = workerJob->jobQuery; Query *originalQuery = workerJob->jobQuery;
List *taskList = workerJob->taskList; List *taskList = workerJob->taskList;
Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid;
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery);
Task *task = NULL; Task *task = NULL;
if (originalQuery->commandType == CMD_INSERT)
{
AddInsertAliasIfNeeded(originalQuery);
}
foreach_ptr(task, taskList) foreach_ptr(task, taskList)
{ {
Query *query = originalQuery; Query *query = originalQuery;
if (UpdateOrDeleteQuery(query) && list_length(taskList) > 1) /*
* Copy the query if there are multiple tasks. If there is a single
* task, we scribble on the original query to avoid the copying
* overhead.
*/
if (list_length(taskList) > 1)
{ {
query = copyObject(originalQuery); query = copyObject(originalQuery);
} }
if (UpdateOrDeleteQuery(query))
{
/*
* For UPDATE and DELETE queries, we may have subqueries and joins, so
* we use relation shard list to update shard names and call
* pg_get_query_def() directly.
*/
List *relationShardList = task->relationShardList;
UpdateRelationToShardNames((Node *) query, relationShardList);
}
else if (query->commandType == CMD_INSERT && task->modifyWithSubquery) else if (query->commandType == CMD_INSERT && task->modifyWithSubquery)
{ {
/* for INSERT..SELECT, adjust shard names in SELECT part */ /* for INSERT..SELECT, adjust shard names in SELECT part */
List *relationShardList = task->relationShardList; List *relationShardList = task->relationShardList;
ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId); ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId);
query = copyObject(originalQuery);
RangeTblEntry *copiedInsertRte = ExtractResultRelationRTEOrError(query); RangeTblEntry *copiedInsertRte = ExtractResultRelationRTEOrError(query);
RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(query); RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(query);
Query *copiedSubquery = copiedSubqueryRte->subquery; Query *copiedSubquery = copiedSubqueryRte->subquery;
@ -90,29 +107,18 @@ RebuildQueryStrings(Job *workerJob)
ReorderInsertSelectTargetLists(query, copiedInsertRte, copiedSubqueryRte); ReorderInsertSelectTargetLists(query, copiedInsertRte, copiedSubqueryRte);
/* setting an alias simplifies deparsing of RETURNING */
if (copiedInsertRte->alias == NULL)
{
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
copiedInsertRte->alias = alias;
}
UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList); UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList);
} }
else if (query->commandType == CMD_INSERT && (query->onConflict != NULL ||
valuesRTE != NULL)) if (query->commandType == CMD_INSERT)
{ {
RangeTblEntry *modifiedRelationRTE = linitial(originalQuery->rtable);
/* /*
* Always an alias in UPSERTs and multi-row INSERTs to avoid * We store the modified relaiton ID in the task so we can lazily call
* deparsing issues (e.g. RETURNING might reference the original * deparse_shard_query when the string is needed
* table name, which has been replaced by a shard name).
*/ */
RangeTblEntry *rangeTableEntry = linitial(query->rtable); task->anchorDistributedTableId = modifiedRelationRTE->relid;
if (rangeTableEntry->alias == NULL)
{
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
rangeTableEntry->alias = alias;
}
} }
bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT || bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT ||
@ -122,7 +128,7 @@ RebuildQueryStrings(Job *workerJob)
? "(null)" ? "(null)"
: ApplyLogRedaction(TaskQueryString(task))))); : ApplyLogRedaction(TaskQueryString(task)))));
UpdateTaskQueryString(query, relationId, valuesRTE, task); UpdateTaskQueryString(query, task);
/* /*
* If parameters were resolved in the job query, then they are now also * If parameters were resolved in the job query, then they are now also
@ -137,53 +143,68 @@ RebuildQueryStrings(Job *workerJob)
/* /*
* UpdateTaskQueryString updates the query string stored within the provided * AddInsertAliasIfNeeded adds an alias in UPSERTs and multi-row INSERTs to avoid
* Task. If the Task has row values from a multi-row INSERT, those are injected * deparsing issues (e.g. RETURNING might reference the original table name,
* into the provided query (using the provided valuesRTE, which must belong to * which has been replaced by a shard name).
* the query) before deparse occurs (the query's full VALUES list will be
* restored before this function returns).
*/ */
static void static void
UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *valuesRTE, AddInsertAliasIfNeeded(Query *query)
Task *task) {
Assert(query->commandType == CMD_INSERT);
if (query->onConflict == NULL &&
ExtractDistributedInsertValuesRTE(query) == NULL)
{
/* simple single-row insert does not need an alias */
return;
}
RangeTblEntry *rangeTableEntry = linitial(query->rtable);
if (rangeTableEntry->alias != NULL)
{
/* INSERT already has an alias */
return;
}
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
rangeTableEntry->alias = alias;
}
/*
* UpdateTaskQueryString updates the query string stored within the provided
* Task. If the Task has row values from a multi-row INSERT, those are injected
* into the provided query before deparse occurs (the query's full VALUES list
* will be restored before this function returns).
*/
static void
UpdateTaskQueryString(Query *query, Task *task)
{ {
List *oldValuesLists = NIL; List *oldValuesLists = NIL;
RangeTblEntry *valuesRTE = NULL;
if (valuesRTE != NULL)
{
Assert(valuesRTE->rtekind == RTE_VALUES);
Assert(task->rowValuesLists != NULL);
oldValuesLists = valuesRTE->values_lists;
valuesRTE->values_lists = task->rowValuesLists;
}
if (query->commandType != CMD_INSERT)
{
/*
* For UPDATE and DELETE queries, we may have subqueries and joins, so
* we use relation shard list to update shard names and call
* pg_get_query_def() directly.
*/
List *relationShardList = task->relationShardList;
UpdateRelationToShardNames((Node *) query, relationShardList);
}
else if (ShouldLazyDeparseQuery(task))
{
/*
* not all insert queries are copied before calling this
* function, so we do it here
*/
query = copyObject(query);
}
if (query->commandType == CMD_INSERT) if (query->commandType == CMD_INSERT)
{ {
/* /* extract the VALUES from the INSERT */
* We store this in the task so we can lazily call valuesRTE = ExtractDistributedInsertValuesRTE(query);
* deparse_shard_query when the string is needed
*/ if (valuesRTE != NULL)
task->anchorDistributedTableId = distributedTableId; {
Assert(valuesRTE->rtekind == RTE_VALUES);
Assert(task->rowValuesLists != NULL);
oldValuesLists = valuesRTE->values_lists;
valuesRTE->values_lists = task->rowValuesLists;
}
if (ShouldLazyDeparseQuery(task))
{
/*
* not all insert queries are copied before calling this
* function, so we do it here
*/
query = copyObject(query);
}
} }
SetTaskQueryIfShouldLazyDeparse(task, query); SetTaskQueryIfShouldLazyDeparse(task, query);

View File

@ -288,6 +288,31 @@ DEBUG: query has a single distribution column value: 1
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
WITH update_article AS (
UPDATE articles_hash_mx SET word_count = 11 WHERE id = 1 AND word_count = 10 RETURNING *
)
SELECT coalesce(1,random());
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_1 for CTE update_article: UPDATE public.articles_hash_mx SET word_count = 11 WHERE ((id OPERATOR(pg_catalog.=) 1) AND (word_count OPERATOR(pg_catalog.=) 10)) RETURNING id, author_id, title, word_count
DEBUG: Creating router plan
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT COALESCE((1)::double precision, random()) AS "coalesce"
DEBUG: Creating router plan
coalesce
---------------------------------------------------------------------
1
(1 row)
WITH update_article AS (
UPDATE articles_hash_mx SET word_count = 10 WHERE author_id = 1 AND id = 1 AND word_count = 11 RETURNING *
)
SELECT coalesce(1,random());
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
coalesce
---------------------------------------------------------------------
1
(1 row)
-- recursive CTEs are supported when filtered on partition column -- recursive CTEs are supported when filtered on partition column
INSERT INTO company_employees_mx values(1, 1, 0); INSERT INTO company_employees_mx values(1, 1, 0);
DEBUG: Creating router plan DEBUG: Creating router plan

View File

@ -507,6 +507,45 @@ DEBUG: Creating router plan
1 | 1 | arsenous | 10 1 | 1 | arsenous | 10
(1 row) (1 row)
WITH update_article AS (
UPDATE articles_hash SET word_count = 11 WHERE id = 1 AND word_count = 10 RETURNING *
)
SELECT coalesce(1,random());
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_1 for CTE update_article: UPDATE public.articles_hash SET word_count = 11 WHERE ((id OPERATOR(pg_catalog.=) 1) AND (word_count OPERATOR(pg_catalog.=) 10)) RETURNING id, author_id, title, word_count
DEBUG: Creating router plan
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT COALESCE((1)::double precision, random()) AS "coalesce"
DEBUG: Creating router plan
coalesce
---------------------------------------------------------------------
1
(1 row)
WITH update_article AS (
UPDATE articles_hash SET word_count = 10 WHERE author_id = 1 AND id = 1 AND word_count = 11 RETURNING *
)
SELECT coalesce(1,random());
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
coalesce
---------------------------------------------------------------------
1
(1 row)
WITH update_article AS (
UPDATE authors_reference SET name = '' WHERE id = 0 RETURNING *
)
SELECT coalesce(1,random());
DEBUG: cannot router plan modification of a non-distributed table
DEBUG: generating subplan XXX_1 for CTE update_article: UPDATE public.authors_reference SET name = ''::character varying WHERE (id OPERATOR(pg_catalog.=) 0) RETURNING name, id
DEBUG: Creating router plan
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT COALESCE((1)::double precision, random()) AS "coalesce"
DEBUG: Creating router plan
coalesce
---------------------------------------------------------------------
1
(1 row)
WITH delete_article AS ( WITH delete_article AS (
DELETE FROM articles_hash WHERE id = 1 AND word_count = 10 RETURNING * DELETE FROM articles_hash WHERE id = 1 AND word_count = 10 RETURNING *
) )

View File

@ -147,6 +147,16 @@ WITH id_author AS ( SELECT id, author_id FROM articles_hash_mx WHERE author_id =
id_title AS (SELECT id, title from articles_hash_mx WHERE author_id = 2) id_title AS (SELECT id, title from articles_hash_mx WHERE author_id = 2)
SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id;
WITH update_article AS (
UPDATE articles_hash_mx SET word_count = 11 WHERE id = 1 AND word_count = 10 RETURNING *
)
SELECT coalesce(1,random());
WITH update_article AS (
UPDATE articles_hash_mx SET word_count = 10 WHERE author_id = 1 AND id = 1 AND word_count = 11 RETURNING *
)
SELECT coalesce(1,random());
-- recursive CTEs are supported when filtered on partition column -- recursive CTEs are supported when filtered on partition column
INSERT INTO company_employees_mx values(1, 1, 0); INSERT INTO company_employees_mx values(1, 1, 0);

View File

@ -280,6 +280,21 @@ WITH update_article AS (
) )
SELECT * FROM update_article; SELECT * FROM update_article;
WITH update_article AS (
UPDATE articles_hash SET word_count = 11 WHERE id = 1 AND word_count = 10 RETURNING *
)
SELECT coalesce(1,random());
WITH update_article AS (
UPDATE articles_hash SET word_count = 10 WHERE author_id = 1 AND id = 1 AND word_count = 11 RETURNING *
)
SELECT coalesce(1,random());
WITH update_article AS (
UPDATE authors_reference SET name = '' WHERE id = 0 RETURNING *
)
SELECT coalesce(1,random());
WITH delete_article AS ( WITH delete_article AS (
DELETE FROM articles_hash WHERE id = 1 AND word_count = 10 RETURNING * DELETE FROM articles_hash WHERE id = 1 AND word_count = 10 RETURNING *
) )