From 58f85f55c02d2034b24f5902340d9caec69da7f4 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 9 Mar 2021 01:08:15 +0100 Subject: [PATCH 1/2] Fixes a crash in queries with a modifying CTE and a SELECT without FROM --- .../distributed/planner/deparse_shard_query.c | 155 ++++++++++-------- 1 file changed, 88 insertions(+), 67 deletions(-) diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 052a89ba6..73666935e 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -38,8 +38,9 @@ #include "utils/rel.h" #include "utils/syscache.h" -static void UpdateTaskQueryString(Query *query, Oid distributedTableId, - RangeTblEntry *valuesRTE, Task *task); + +static void AddInsertAliasIfNeeded(Query *query); +static void UpdateTaskQueryString(Query *query, Task *task); static bool ReplaceRelationConstraintByShardConstraint(List *relationShardList, OnConflictExpr *onConflict); static RelationShard * FindRelationShard(Oid inputRelationId, List *relationShardList); @@ -57,27 +58,43 @@ RebuildQueryStrings(Job *workerJob) { Query *originalQuery = workerJob->jobQuery; List *taskList = workerJob->taskList; - Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid; - RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery); - Task *task = NULL; + if (originalQuery->commandType == CMD_INSERT) + { + AddInsertAliasIfNeeded(originalQuery); + } + foreach_ptr(task, taskList) { Query *query = originalQuery; - if (UpdateOrDeleteQuery(query) && list_length(taskList) > 1) + /* + * Copy the query if there are multiple tasks. If there is a single + * task, we scribble on the original query to avoid the copying + * overhead. 
+ */ + if (list_length(taskList) > 1) { query = copyObject(originalQuery); } + + if (UpdateOrDeleteQuery(query)) + { + /* + * For UPDATE and DELETE queries, we may have subqueries and joins, so + * we use relation shard list to update shard names and call + * pg_get_query_def() directly. + */ + List *relationShardList = task->relationShardList; + UpdateRelationToShardNames((Node *) query, relationShardList); + } else if (query->commandType == CMD_INSERT && task->modifyWithSubquery) { /* for INSERT..SELECT, adjust shard names in SELECT part */ List *relationShardList = task->relationShardList; ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId); - query = copyObject(originalQuery); - RangeTblEntry *copiedInsertRte = ExtractResultRelationRTEOrError(query); RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(query); Query *copiedSubquery = copiedSubqueryRte->subquery; @@ -90,29 +107,18 @@ RebuildQueryStrings(Job *workerJob) ReorderInsertSelectTargetLists(query, copiedInsertRte, copiedSubqueryRte); - /* setting an alias simplifies deparsing of RETURNING */ - if (copiedInsertRte->alias == NULL) - { - Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); - copiedInsertRte->alias = alias; - } - UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList); } - else if (query->commandType == CMD_INSERT && (query->onConflict != NULL || - valuesRTE != NULL)) + + if (query->commandType == CMD_INSERT) { + RangeTblEntry *modifiedRelationRTE = linitial(originalQuery->rtable); + /* - * Always an alias in UPSERTs and multi-row INSERTs to avoid - * deparsing issues (e.g. RETURNING might reference the original - * table name, which has been replaced by a shard name). 
+ * We store the modified relation ID in the task so we can lazily call + * deparse_shard_query when the string is needed */ - RangeTblEntry *rangeTableEntry = linitial(query->rtable); - if (rangeTableEntry->alias == NULL) - { - Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); - rangeTableEntry->alias = alias; - } + task->anchorDistributedTableId = modifiedRelationRTE->relid; } bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT || @@ -122,7 +128,7 @@ RebuildQueryStrings(Job *workerJob) ? "(null)" : ApplyLogRedaction(TaskQueryString(task))))); - UpdateTaskQueryString(query, relationId, valuesRTE, task); + UpdateTaskQueryString(query, task); /* * If parameters were resolved in the job query, then they are now also @@ -137,53 +143,68 @@ RebuildQueryStrings(Job *workerJob) /* - * UpdateTaskQueryString updates the query string stored within the provided - * Task. If the Task has row values from a multi-row INSERT, those are injected - * into the provided query (using the provided valuesRTE, which must belong to - * the query) before deparse occurs (the query's full VALUES list will be - * restored before this function returns). 
*/ static void -UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *valuesRTE, - Task *task) +AddInsertAliasIfNeeded(Query *query) +{ + Assert(query->commandType == CMD_INSERT); + + if (query->onConflict == NULL && + ExtractDistributedInsertValuesRTE(query) == NULL) + { + /* simple single-row insert does not need an alias */ + return; + } + + RangeTblEntry *rangeTableEntry = linitial(query->rtable); + if (rangeTableEntry->alias != NULL) + { + /* INSERT already has an alias */ + return; + } + + Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); + rangeTableEntry->alias = alias; +} + + +/* + * UpdateTaskQueryString updates the query string stored within the provided + * Task. If the Task has row values from a multi-row INSERT, those are injected + * into the provided query before deparse occurs (the query's full VALUES list + * will be restored before this function returns). + */ +static void +UpdateTaskQueryString(Query *query, Task *task) { List *oldValuesLists = NIL; - - if (valuesRTE != NULL) - { - Assert(valuesRTE->rtekind == RTE_VALUES); - Assert(task->rowValuesLists != NULL); - - oldValuesLists = valuesRTE->values_lists; - valuesRTE->values_lists = task->rowValuesLists; - } - - if (query->commandType != CMD_INSERT) - { - /* - * For UPDATE and DELETE queries, we may have subqueries and joins, so - * we use relation shard list to update shard names and call - * pg_get_query_def() directly. 
- */ - List *relationShardList = task->relationShardList; - UpdateRelationToShardNames((Node *) query, relationShardList); - } - else if (ShouldLazyDeparseQuery(task)) - { - /* - * not all insert queries are copied before calling this - * function, so we do it here - */ - query = copyObject(query); - } + RangeTblEntry *valuesRTE = NULL; if (query->commandType == CMD_INSERT) { - /* - * We store this in the task so we can lazily call - * deparse_shard_query when the string is needed - */ - task->anchorDistributedTableId = distributedTableId; + /* extract the VALUES from the INSERT */ + valuesRTE = ExtractDistributedInsertValuesRTE(query); + + if (valuesRTE != NULL) + { + Assert(valuesRTE->rtekind == RTE_VALUES); + Assert(task->rowValuesLists != NULL); + + oldValuesLists = valuesRTE->values_lists; + valuesRTE->values_lists = task->rowValuesLists; + } + + if (ShouldLazyDeparseQuery(task)) + { + /* + * not all insert queries are copied before calling this + * function, so we do it here + */ + query = copyObject(query); + } } SetTaskQueryIfShouldLazyDeparse(task, query); From 9c0d7f5c26108f5f9735bc08bfdd1275bf7b52bf Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 9 Mar 2021 01:18:41 +0100 Subject: [PATCH 2/2] Add tests for modifying CTE and SELECT without FROM --- .../expected/multi_mx_router_planner.out | 25 ++++++++++++ .../regress/expected/multi_router_planner.out | 39 +++++++++++++++++++ .../regress/sql/multi_mx_router_planner.sql | 10 +++++ src/test/regress/sql/multi_router_planner.sql | 15 +++++++ 4 files changed, 89 insertions(+) diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index bd82b4dd1..ee4c670bb 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -288,6 +288,31 @@ DEBUG: query has a single distribution column value: 1 --------------------------------------------------------------------- (0 rows) +WITH 
update_article AS ( + UPDATE articles_hash_mx SET word_count = 11 WHERE id = 1 AND word_count = 10 RETURNING * +) +SELECT coalesce(1,random()); +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: generating subplan XXX_1 for CTE update_article: UPDATE public.articles_hash_mx SET word_count = 11 WHERE ((id OPERATOR(pg_catalog.=) 1) AND (word_count OPERATOR(pg_catalog.=) 10)) RETURNING id, author_id, title, word_count +DEBUG: Creating router plan +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT COALESCE((1)::double precision, random()) AS "coalesce" +DEBUG: Creating router plan + coalesce +--------------------------------------------------------------------- + 1 +(1 row) + +WITH update_article AS ( + UPDATE articles_hash_mx SET word_count = 10 WHERE author_id = 1 AND id = 1 AND word_count = 11 RETURNING * +) +SELECT coalesce(1,random()); +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 + coalesce +--------------------------------------------------------------------- + 1 +(1 row) + -- recursive CTEs are supported when filtered on partition column INSERT INTO company_employees_mx values(1, 1, 0); DEBUG: Creating router plan diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index c14aa68a5..11c52c29b 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -507,6 +507,45 @@ DEBUG: Creating router plan 1 | 1 | arsenous | 10 (1 row) +WITH update_article AS ( + UPDATE articles_hash SET word_count = 11 WHERE id = 1 AND word_count = 10 RETURNING * +) +SELECT coalesce(1,random()); +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: generating subplan XXX_1 for CTE update_article: UPDATE public.articles_hash SET word_count = 11 WHERE ((id OPERATOR(pg_catalog.=) 1) AND (word_count OPERATOR(pg_catalog.=) 10)) RETURNING id, author_id, title, word_count 
+DEBUG: Creating router plan +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT COALESCE((1)::double precision, random()) AS "coalesce" +DEBUG: Creating router plan + coalesce +--------------------------------------------------------------------- + 1 +(1 row) + +WITH update_article AS ( + UPDATE articles_hash SET word_count = 10 WHERE author_id = 1 AND id = 1 AND word_count = 11 RETURNING * +) +SELECT coalesce(1,random()); +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 + coalesce +--------------------------------------------------------------------- + 1 +(1 row) + +WITH update_article AS ( + UPDATE authors_reference SET name = '' WHERE id = 0 RETURNING * +) +SELECT coalesce(1,random()); +DEBUG: cannot router plan modification of a non-distributed table +DEBUG: generating subplan XXX_1 for CTE update_article: UPDATE public.authors_reference SET name = ''::character varying WHERE (id OPERATOR(pg_catalog.=) 0) RETURNING name, id +DEBUG: Creating router plan +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT COALESCE((1)::double precision, random()) AS "coalesce" +DEBUG: Creating router plan + coalesce +--------------------------------------------------------------------- + 1 +(1 row) + WITH delete_article AS ( DELETE FROM articles_hash WHERE id = 1 AND word_count = 10 RETURNING * ) diff --git a/src/test/regress/sql/multi_mx_router_planner.sql b/src/test/regress/sql/multi_mx_router_planner.sql index b241439b0..504c0d602 100644 --- a/src/test/regress/sql/multi_mx_router_planner.sql +++ b/src/test/regress/sql/multi_mx_router_planner.sql @@ -147,6 +147,16 @@ WITH id_author AS ( SELECT id, author_id FROM articles_hash_mx WHERE author_id = id_title AS (SELECT id, title from articles_hash_mx WHERE author_id = 2) SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; +WITH update_article AS ( + UPDATE articles_hash_mx SET word_count = 11 WHERE id = 1 AND word_count = 10 RETURNING * +) +SELECT 
coalesce(1,random()); + +WITH update_article AS ( + UPDATE articles_hash_mx SET word_count = 10 WHERE author_id = 1 AND id = 1 AND word_count = 11 RETURNING * +) +SELECT coalesce(1,random()); + -- recursive CTEs are supported when filtered on partition column INSERT INTO company_employees_mx values(1, 1, 0); diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index 084525418..d70388b95 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -280,6 +280,21 @@ WITH update_article AS ( ) SELECT * FROM update_article; +WITH update_article AS ( + UPDATE articles_hash SET word_count = 11 WHERE id = 1 AND word_count = 10 RETURNING * +) +SELECT coalesce(1,random()); + +WITH update_article AS ( + UPDATE articles_hash SET word_count = 10 WHERE author_id = 1 AND id = 1 AND word_count = 11 RETURNING * +) +SELECT coalesce(1,random()); + +WITH update_article AS ( + UPDATE authors_reference SET name = '' WHERE id = 0 RETURNING * +) +SELECT coalesce(1,random()); + WITH delete_article AS ( DELETE FROM articles_hash WHERE id = 1 AND word_count = 10 RETURNING * )