From 2892e07e32f534966113997e96c612797ee73ab0 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 21 Jul 2022 21:19:24 +0200 Subject: [PATCH] Fix other insert..select issues --- .../executor/insert_select_executor.c | 95 -------------- .../planner/insert_select_planner.c | 117 ++++++++++++++++-- .../distributed/insert_select_executor.h | 1 - .../expected/coordinator_shouldhaveshards.out | 10 +- .../expected/insert_select_repartition.out | 2 +- src/test/regress/expected/multi_explain.out | 25 ++-- .../regress/expected/multi_insert_select.out | 4 +- .../mx_coordinator_shouldhaveshards.out | 16 +-- 8 files changed, 133 insertions(+), 137 deletions(-) diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 338b03075..9549846d5 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -55,7 +55,6 @@ bool EnableRepartitionedInsertSelect = true; -static Query * WrapSubquery(Query *subquery); static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, char *resultIdPrefix); static void ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, @@ -299,100 +298,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node) } -/* - * BuildSelectForInsertSelect extracts the SELECT part from an INSERT...SELECT query. - * If the INSERT...SELECT has CTEs then these are added to the resulting SELECT instead. - */ -Query * -BuildSelectForInsertSelect(Query *insertSelectQuery) -{ - RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); - Query *selectQuery = selectRte->subquery; - - /* - * Wrap the SELECT as a subquery if the INSERT...SELECT has CTEs or the SELECT - * has top-level set operations. - * - * We could simply wrap all queries, but that might create a subquery that is - * not supported by the logical planner. Since the logical planner also does - * not support CTEs and top-level set operations, we can wrap queries containing - * those without breaking anything. - */ - if (list_length(insertSelectQuery->cteList) > 0) - { - selectQuery = WrapSubquery(selectRte->subquery); - - /* copy CTEs from the INSERT ... SELECT statement into outer SELECT */ - selectQuery->cteList = copyObject(insertSelectQuery->cteList); - selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE; - } - else if (selectQuery->setOperations != NULL) - { - /* top-level set operations confuse the ReorderInsertSelectTargetLists logic */ - selectQuery = WrapSubquery(selectRte->subquery); - } - - return selectQuery; -} - - -/* - * WrapSubquery wraps the given query as a subquery in a newly constructed - * "SELECT * FROM (...subquery...) citus_insert_select_subquery" query. - */ -static Query * -WrapSubquery(Query *subquery) -{ - ParseState *pstate = make_parsestate(NULL); - List *newTargetList = NIL; - - Query *outerQuery = makeNode(Query); - outerQuery->commandType = CMD_SELECT; - - /* create range table entries */ - Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL); - RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem( - addRangeTableEntryForSubquery( - pstate, subquery, - selectAlias, false, true)); - outerQuery->rtable = list_make1(newRangeTableEntry); - - /* set the FROM expression to the subquery */ - RangeTblRef *newRangeTableRef = makeNode(RangeTblRef); - newRangeTableRef->rtindex = 1; - outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL); - - /* create a target list that matches the SELECT */ - TargetEntry *selectTargetEntry = NULL; - foreach_ptr(selectTargetEntry, subquery->targetList) - { - /* exactly 1 entry in FROM */ - int indexInRangeTable = 1; - - if (selectTargetEntry->resjunk) - { - continue; - } - - Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno, - exprType((Node *) selectTargetEntry->expr), - exprTypmod((Node *) selectTargetEntry->expr), - exprCollation((Node *) selectTargetEntry->expr), 0); - - TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar, - selectTargetEntry->resno, - selectTargetEntry->resname, - selectTargetEntry->resjunk); - - newTargetList = lappend(newTargetList, newSelectTargetEntry); - } - - outerQuery->targetList = newTargetList; - - return outerQuery; -} - - /* * TwoPhaseInsertSelectTaskList generates a list of tasks for a query that * inserts into a target relation and selects from a set of co-located diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index b915ddc5b..e8f2288f8 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -83,6 +83,7 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams); static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery); +static Query * WrapSubquery(Query *subquery); static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList); static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, Oid targetRelationId); @@ -377,8 +378,17 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery, RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery); Oid targetRelationId = insertRte->relid; RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); + Query *selectQuery = selectRte->subquery; + bool hasSetOperations = false; - Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); + if (selectQuery->setOperations != NULL) + { + selectQuery = selectRte->subquery = WrapSubquery(selectQuery); + hasSetOperations = true; + } + + /* this is required for correct deparsing of the query */ + ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); /* * Cast types of insert target list and select projection list to @@ -386,10 +396,21 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery, */ selectQuery->targetList = AddInsertSelectCasts(insertSelectQuery->targetList, - selectQuery->targetList, + copyObject(selectQuery->targetList), targetRelationId); - insertSelectQuery->cteList = NIL; + if (list_length(insertSelectQuery->cteList) > 0) + { + if (!hasSetOperations) + { + selectQuery = selectRte->subquery = WrapSubquery(selectQuery); + } + + /* copy CTEs from the INSERT ... SELECT statement into outer SELECT */ + selectQuery->cteList = copyObject(insertSelectQuery->cteList); + selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE; + insertSelectQuery->cteList = NIL; + } DistributedPlan *distPlan = CreateDistributedPlan(planId, selectQuery, copyObject(selectQuery), @@ -895,11 +916,9 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, List *newSubqueryTargetlist = NIL; List *newInsertTargetlist = NIL; int resno = 1; - Index insertTableId = 1; + Index selectTableId = 2; int targetEntryIndex = 0; - AssertArg(InsertSelectIntoCitusTable(originalQuery)); - Query *subquery = subqueryRte->subquery; Oid insertRelationId = insertRte->relid; @@ -974,7 +993,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, */ Assert(!newSubqueryTargetEntry->resjunk); - Var *newInsertVar = makeVar(insertTableId, originalAttrNo, + Var *newInsertVar = makeVar(selectTableId, resno, exprType((Node *) newSubqueryTargetEntry->expr), exprTypmod((Node *) newSubqueryTargetEntry->expr), exprCollation((Node *) newSubqueryTargetEntry->expr), @@ -1425,9 +1444,15 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou return distributedPlan; } - Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); + Query *selectQuery = selectRte->subquery; + bool hasSetOperations = false; + + if (selectQuery->setOperations != NULL) + { + selectQuery = selectRte->subquery = WrapSubquery(selectQuery); + hasSetOperations = true; + } - selectRte->subquery = selectQuery; ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); /* @@ -1439,6 +1464,19 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou selectQuery->targetList, targetRelationId); + if (list_length(insertSelectQuery->cteList) > 0) + { + if (!hasSetOperations) + { + selectQuery = selectRte->subquery = WrapSubquery(selectQuery); + } + + /* copy CTEs from the INSERT ... SELECT statement into outer SELECT */ + selectQuery->cteList = copyObject(insertSelectQuery->cteList); + selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE; + insertSelectQuery->cteList = NIL; + } + /* * Later we might need to call WrapTaskListForProjection(), which requires * that select target list has unique names, otherwise the outer query @@ -1519,6 +1557,63 @@ InsertSelectResultIdPrefix(uint64 planId) } +/* + * WrapSubquery wraps the given query as a subquery in a newly constructed + * "SELECT * FROM (...subquery...) citus_insert_select_subquery" query. + */ +static Query * +WrapSubquery(Query *subquery) +{ + ParseState *pstate = make_parsestate(NULL); + List *newTargetList = NIL; + + Query *outerQuery = makeNode(Query); + outerQuery->commandType = CMD_SELECT; + + /* create range table entries */ + Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL); + RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem( + addRangeTableEntryForSubquery( + pstate, subquery, + selectAlias, false, true)); + outerQuery->rtable = list_make1(newRangeTableEntry); + + /* set the FROM expression to the subquery */ + RangeTblRef *newRangeTableRef = makeNode(RangeTblRef); + newRangeTableRef->rtindex = 1; + outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL); + + /* create a target list that matches the SELECT */ + TargetEntry *selectTargetEntry = NULL; + foreach_ptr(selectTargetEntry, subquery->targetList) + { + /* exactly 1 entry in FROM */ + int indexInRangeTable = 1; + + if (selectTargetEntry->resjunk) + { + continue; + } + + Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno, + exprType((Node *) selectTargetEntry->expr), + exprTypmod((Node *) selectTargetEntry->expr), + exprCollation((Node *) selectTargetEntry->expr), 0); + + TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar, + selectTargetEntry->resno, + selectTargetEntry->resname, + selectTargetEntry->resjunk); + + newTargetList = lappend(newTargetList, newSelectTargetEntry); + } + + outerQuery->targetList = newTargetList; + + return outerQuery; +} + + /* * RelabelTargetEntryList relabels select target list to have matching names with * insert target list. @@ -1565,8 +1660,8 @@ AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, Oid targetType = attr->atttypid; if (sourceType != targetType) { - insertEntry->expr = CastExpr(insertEntry->expr, sourceType, targetType, - attr->attcollation, attr->atttypmod); + insertEntry->expr = CastExpr(copyObject(insertEntry->expr), sourceType, + targetType, attr->attcollation, attr->atttypmod); /* * We cannot modify the selectEntry in-place, because ORDER BY or diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h index bcfe29bfb..6e84b80f2 100644 --- a/src/include/distributed/insert_select_executor.h +++ b/src/include/distributed/insert_select_executor.h @@ -19,7 +19,6 @@ extern bool EnableRepartitionedInsertSelect; extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node); -extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery); extern bool IsSupportedRedistributionTarget(Oid targetRelationId); extern bool IsRedistributablePlan(Plan *selectPlan); diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 485e7f11b..dd93dad39 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -896,8 +896,8 @@ HAVING (max(table_2.value) >= (SELECT value FROM a)); DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 DEBUG: push down of limit count: 1 -DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx @@ -905,9 +905,9 @@ DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint DEBUG: Subplan XXX_2 will be written to local file -NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 -NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 -NOTICE: executing the command locally: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +NOTICE: executing the command locally: SELECT count(*) AS auto_coerced_by_citus_0, (worker_column_1)::text AS auto_coerced_by_citus_1, worker_column_1 AS discarded_target_item_1, max(worker_column_2) AS worker_column_4 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT count(*) AS auto_coerced_by_citus_0, (worker_column_1)::text AS auto_coerced_by_citus_1, worker_column_1 AS discarded_target_item_1, max(worker_column_2) AS worker_column_4 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery NOTICE: executing the copy locally for shard xxxxx WITH stats AS ( SELECT count(key) m FROM table_1 diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 856690159..f6e4f17a5 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -500,7 +500,7 @@ INSERT INTO target_table SELECT mapped_key, c FROM t NATURAL JOIN source_table; DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for CTE t: SELECT mapped_key, a, c FROM insert_select_repartition.source_table WHERE ((a)::double precision OPERATOR(pg_catalog.>) floor(random())) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, (c)::integer[] AS b FROM (SELECT t.mapped_key, t.c FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, auto_coerced_by_citus_1 AS b FROM (SELECT t.mapped_key, (t.c)::integer[] AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery DEBUG: performing repartitioned INSERT ... SELECT RESET client_min_messages; SELECT * FROM target_table ORDER BY a; diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index a7c53e293..8d4d00d36 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -1411,21 +1411,20 @@ WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5) SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator - -> Subquery Scan on citus_insert_select_subquery + -> Result + One-Time Filter: $3 CTE cte1 -> Function Scan on generate_series s - -> Result - One-Time Filter: $3 - CTE cte1 - -> Limit - InitPlan 2 (returns $1) - -> CTE Scan on cte1 cte1_1 - -> Result - One-Time Filter: $1 - -> CTE Scan on cte1 cte1_2 - InitPlan 4 (returns $3) - -> CTE Scan on cte1 cte1_3 - -> CTE Scan on cte1 + CTE cte1 + -> Limit + InitPlan 2 (returns $1) + -> CTE Scan on cte1 cte1_1 + -> Result + One-Time Filter: $1 + -> CTE Scan on cte1 cte1_2 + InitPlan 4 (returns $3) + -> CTE Scan on cte1 cte1_3 + -> CTE Scan on cte1 EXPLAIN (COSTS OFF) INSERT INTO lineitem_hash_part ( SELECT s FROM generate_series(1,5) s) UNION diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index c7679d02e..806d77578 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -669,7 +669,7 @@ DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for CTE fist_table_agg: SELECT (max(value_1) OPERATOR(pg_catalog.+) 1) AS v1_agg, user_id FROM public.raw_events_first GROUP BY user_id DEBUG: Router planner cannot handle multi-shard select queries -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, v1_agg AS value_1_agg FROM (SELECT fist_table_agg.v1_agg, fist_table_agg.user_id FROM (SELECT intermediate_result.v1_agg, intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v1_agg integer, user_id integer)) fist_table_agg) citus_insert_select_subquery +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, v1_agg AS value_1_agg FROM (SELECT fist_table_agg.user_id, fist_table_agg.v1_agg FROM (SELECT intermediate_result.v1_agg, intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v1_agg integer, user_id integer)) fist_table_agg) citus_insert_select_subquery DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator ROLLBACK; @@ -2714,7 +2714,6 @@ WITH top10 AS ( ) INSERT INTO dist_table_with_sequence (value_1) SELECT * FROM top10; -ERROR: cannot handle complex subqueries when the router executor is disabled SELECT * FROM dist_table_with_sequence ORDER BY user_id, value_1; user_id | value_1 --------------------------------------------------------------------- @@ -2799,7 +2798,6 @@ WITH top10 AS ( ) INSERT INTO dist_table_with_user_sequence (value_1) SELECT * FROM top10; -ERROR: cannot handle complex subqueries when the router executor is disabled SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1; user_id | value_1 --------------------------------------------------------------------- diff --git a/src/test/regress/expected/mx_coordinator_shouldhaveshards.out b/src/test/regress/expected/mx_coordinator_shouldhaveshards.out index ba6eb8dba..cbfe25281 100644 --- a/src/test/regress/expected/mx_coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/mx_coordinator_shouldhaveshards.out @@ -78,8 +78,8 @@ HAVING (max(table_2.value) >= (SELECT value FROM a)); DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 DEBUG: push down of limit count: 1 -DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx @@ -139,8 +139,8 @@ HAVING (max(table_2_rep.value) >= (SELECT value FROM a)); DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1 DEBUG: push down of limit count: 1 -DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx @@ -204,8 +204,8 @@ HAVING (max(table_2.value) >= (SELECT value FROM a)); DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 DEBUG: push down of limit count: 1 -DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx @@ -265,8 +265,8 @@ HAVING (max(table_2_rep.value) >= (SELECT value FROM a)); DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1 DEBUG: push down of limit count: 1 -DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx