From 232ad4735bd54e4dc5c3af2053b6425852218818 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 28 Jul 2022 16:24:17 +0200 Subject: [PATCH] Fix issues with insert..select casts and column ordering --- .../executor/insert_select_executor.c | 95 --- .../planner/insert_select_planner.c | 187 ++++- .../planner/multi_router_planner.c | 39 +- .../distributed/insert_select_executor.h | 1 - .../expected/coordinator_shouldhaveshards.out | 10 +- .../insert_select_into_local_table.out | 758 +++++++++++++++++- .../expected/insert_select_repartition.out | 2 +- src/test/regress/expected/multi_explain.out | 25 +- .../regress/expected/multi_insert_select.out | 45 +- .../mx_coordinator_shouldhaveshards.out | 64 -- .../sql/insert_select_into_local_table.sql | 447 +++++++++++ src/test/regress/sql/multi_insert_select.sql | 34 + 12 files changed, 1487 insertions(+), 220 deletions(-) diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 618dbab86..5c4e298cf 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -55,7 +55,6 @@ bool EnableRepartitionedInsertSelect = true; -static Query * WrapSubquery(Query *subquery); static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, char *resultIdPrefix); static void ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, @@ -299,100 +298,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node) } -/* - * BuildSelectForInsertSelect extracts the SELECT part from an INSERT...SELECT query. - * If the INSERT...SELECT has CTEs then these are added to the resulting SELECT instead. - */ -Query * -BuildSelectForInsertSelect(Query *insertSelectQuery) -{ - RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); - Query *selectQuery = selectRte->subquery; - - /* - * Wrap the SELECT as a subquery if the INSERT...SELECT has CTEs or the SELECT - * has top-level set operations. - * - * We could simply wrap all queries, but that might create a subquery that is - * not supported by the logical planner. Since the logical planner also does - * not support CTEs and top-level set operations, we can wrap queries containing - * those without breaking anything. - */ - if (list_length(insertSelectQuery->cteList) > 0) - { - selectQuery = WrapSubquery(selectRte->subquery); - - /* copy CTEs from the INSERT ... SELECT statement into outer SELECT */ - selectQuery->cteList = copyObject(insertSelectQuery->cteList); - selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE; - } - else if (selectQuery->setOperations != NULL) - { - /* top-level set operations confuse the ReorderInsertSelectTargetLists logic */ - selectQuery = WrapSubquery(selectRte->subquery); - } - - return selectQuery; -} - - -/* - * WrapSubquery wraps the given query as a subquery in a newly constructed - * "SELECT * FROM (...subquery...) citus_insert_select_subquery" query. - */ -static Query * -WrapSubquery(Query *subquery) -{ - ParseState *pstate = make_parsestate(NULL); - List *newTargetList = NIL; - - Query *outerQuery = makeNode(Query); - outerQuery->commandType = CMD_SELECT; - - /* create range table entries */ - Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL); - RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem( - addRangeTableEntryForSubquery( - pstate, subquery, - selectAlias, false, true)); - outerQuery->rtable = list_make1(newRangeTableEntry); - - /* set the FROM expression to the subquery */ - RangeTblRef *newRangeTableRef = makeNode(RangeTblRef); - newRangeTableRef->rtindex = 1; - outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL); - - /* create a target list that matches the SELECT */ - TargetEntry *selectTargetEntry = NULL; - foreach_ptr(selectTargetEntry, subquery->targetList) - { - /* exactly 1 entry in FROM */ - int indexInRangeTable = 1; - - if (selectTargetEntry->resjunk) - { - continue; - } - - Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno, - exprType((Node *) selectTargetEntry->expr), - exprTypmod((Node *) selectTargetEntry->expr), - exprCollation((Node *) selectTargetEntry->expr), 0); - - TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar, - selectTargetEntry->resno, - selectTargetEntry->resname, - selectTargetEntry->resjunk); - - newTargetList = lappend(newTargetList, newSelectTargetEntry); - } - - outerQuery->targetList = newTargetList; - - return outerQuery; -} - - /* * TwoPhaseInsertSelectTaskList generates a list of tasks for a query that * inserts into a target relation and selects from a set of co-located diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 55559ce58..7d4e2e58e 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -48,8 +48,10 @@ #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/rel.h" +#include +static void PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery); static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery, PlannerRestrictionContext * @@ -83,6 +85,7 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams); static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery); +static Query * WrapSubquery(Query *subquery); static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList); static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, Oid targetRelationId); @@ -370,14 +373,17 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, * combineQuery, this function also creates a dummy combineQuery for that. */ DistributedPlan * -CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamListInfo - boundParams, bool hasUnresolvedParams, +CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery, + ParamListInfo boundParams, bool hasUnresolvedParams, PlannerRestrictionContext *plannerRestrictionContext) { - RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery); + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); + + PrepareInsertSelectForCitusPlanner(insertSelectQuery); + + /* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */ + Query *selectQuery = selectRte->subquery; - Query *selectQuery = BuildSelectForInsertSelect(originalQuery); - originalQuery->cteList = NIL; DistributedPlan *distPlan = CreateDistributedPlan(planId, selectQuery, copyObject(selectQuery), boundParams, hasUnresolvedParams, @@ -417,12 +423,84 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamL * distributed select instead of returning it. */ selectRte->subquery = distPlan->combineQuery; - distPlan->combineQuery = originalQuery; + distPlan->combineQuery = insertSelectQuery; return distPlan; } +/* + * PrepareInsertSelectForCitusPlanner prepares an INSERT..SELECT query tree + * that was passed to the planner for use by Citus. + * + * First, it rebuilds the target lists of the INSERT and the SELECT + * to be in the same order, which is not guaranteed in the parse tree. + * + * Second, some of the constants in the target list will have type + * "unknown", which would confuse the Citus planner. To address that, + * we add casts to SELECT target list entries whose type does not correspond + * to the destination. This also helps us feed the output directly into + * a COPY stream for INSERT..SELECT via coordinator. + * + * In case of UNION or other set operations, the SELECT does not have a + * clearly defined target list, so we first wrap the UNION in a subquery. + * UNION queries do not have the "unknown" type problem. + * + * Finally, if the INSERT has CTEs, we move those CTEs into the SELECT, + * such that we can plan the SELECT as an independent query. To ensure + * the ctelevelsup for CTE RTE's remain the same, we wrap the SELECT into + * a subquery, unless we already did so in case of a UNION. + */ +static void +PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery) +{ + RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery); + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); + Oid targetRelationId = insertRte->relid; + + bool isWrapped = false; + + if (selectRte->subquery->setOperations != NULL) + { + /* + * Prepare UNION query for reordering and adding casts by + * wrapping it in a subquery to have a single target list. + */ + selectRte->subquery = WrapSubquery(selectRte->subquery); + isWrapped = true; + } + + /* this is required for correct deparsing of the query */ + ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + + /* + * Cast types of insert target list and select projection list to + * match the column types of the target relation. + */ + selectRte->subquery->targetList = + AddInsertSelectCasts(insertSelectQuery->targetList, + copyObject(selectRte->subquery->targetList), + targetRelationId); + + if (list_length(insertSelectQuery->cteList) > 0) + { + if (!isWrapped) + { + /* + * By wrapping the SELECT in a subquery, we can avoid adjusting + * ctelevelsup in RTE's that point to the CTEs. + */ + selectRte->subquery = WrapSubquery(selectRte->subquery); + } + + /* copy CTEs from the INSERT ... SELECT statement into outer SELECT */ + selectRte->subquery->cteList = copyObject(insertSelectQuery->cteList); + selectRte->subquery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE; + insertSelectQuery->cteList = NIL; + } +} + + /* * CreateCombineQueryForRouterPlan is used for creating a dummy combineQuery * for a router plan, since router plans normally don't have one. @@ -881,12 +959,11 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, ListCell *insertTargetEntryCell; List *newSubqueryTargetlist = NIL; List *newInsertTargetlist = NIL; + List *columnNameList = NIL; int resno = 1; - Index insertTableId = 1; + Index selectTableId = 2; int targetEntryIndex = 0; - AssertArg(InsertSelectIntoCitusTable(originalQuery)); - Query *subquery = subqueryRte->subquery; Oid insertRelationId = insertRte->relid; @@ -954,6 +1031,9 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, newSubqueryTargetEntry); } + Value *columnName = makeString(newSubqueryTargetEntry->resname); + columnNameList = lappend(columnNameList, columnName); + /* * The newly created select target entry cannot be a junk entry since junk * entries are not in the final target list and we're processing the @@ -961,7 +1041,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, */ Assert(!newSubqueryTargetEntry->resjunk); - Var *newInsertVar = makeVar(insertTableId, originalAttrNo, + Var *newInsertVar = makeVar(selectTableId, resno, exprType((Node *) newSubqueryTargetEntry->expr), exprTypmod((Node *) newSubqueryTargetEntry->expr), exprCollation((Node *) newSubqueryTargetEntry->expr), @@ -1005,6 +1085,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, originalQuery->targetList = newInsertTargetlist; subquery->targetList = newSubqueryTargetlist; + subqueryRte->eref->colnames = columnNameList; return NULL; } @@ -1412,19 +1493,10 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou return distributedPlan; } - Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); + PrepareInsertSelectForCitusPlanner(insertSelectQuery); - selectRte->subquery = selectQuery; - ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); - - /* - * Cast types of insert target list and select projection list to - * match the column types of the target relation. - */ - selectQuery->targetList = - AddInsertSelectCasts(insertSelectQuery->targetList, - selectQuery->targetList, - targetRelationId); + /* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */ + Query *selectQuery = selectRte->subquery; /* * Later we might need to call WrapTaskListForProjection(), which requires @@ -1506,6 +1578,63 @@ InsertSelectResultIdPrefix(uint64 planId) } +/* + * WrapSubquery wraps the given query as a subquery in a newly constructed + * "SELECT * FROM (...subquery...) citus_insert_select_subquery" query. + */ +static Query * +WrapSubquery(Query *subquery) +{ + ParseState *pstate = make_parsestate(NULL); + List *newTargetList = NIL; + + Query *outerQuery = makeNode(Query); + outerQuery->commandType = CMD_SELECT; + + /* create range table entries */ + Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL); + RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem( + addRangeTableEntryForSubquery( + pstate, subquery, + selectAlias, false, true)); + outerQuery->rtable = list_make1(newRangeTableEntry); + + /* set the FROM expression to the subquery */ + RangeTblRef *newRangeTableRef = makeNode(RangeTblRef); + newRangeTableRef->rtindex = 1; + outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL); + + /* create a target list that matches the SELECT */ + TargetEntry *selectTargetEntry = NULL; + foreach_ptr(selectTargetEntry, subquery->targetList) + { + /* exactly 1 entry in FROM */ + int indexInRangeTable = 1; + + if (selectTargetEntry->resjunk) + { + continue; + } + + Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno, + exprType((Node *) selectTargetEntry->expr), + exprTypmod((Node *) selectTargetEntry->expr), + exprCollation((Node *) selectTargetEntry->expr), 0); + + TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar, + selectTargetEntry->resno, + selectTargetEntry->resname, + selectTargetEntry->resjunk); + + newTargetList = lappend(newTargetList, newSelectTargetEntry); + } + + outerQuery->targetList = newTargetList; + + return outerQuery; +} + + /* * RelabelTargetEntryList relabels select target list to have matching names with * insert target list. @@ -1557,16 +1686,22 @@ AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, { TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell); TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell); - Var *insertColumn = (Var *) insertEntry->expr; + Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor, insertEntry->resno - 1); - Oid sourceType = insertColumn->vartype; + Oid sourceType = exprType((Node *) selectEntry->expr); Oid targetType = attr->atttypid; if (sourceType != targetType) { - insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType, - attr->attcollation, attr->atttypmod); + /* ReorderInsertSelectTargetLists ensures we only have Vars */ + Assert(IsA(insertEntry->expr, Var)); + + /* we will cast the SELECT expression, so the type changes */ + Var *insertVar = (Var *) insertEntry->expr; + insertVar->vartype = targetType; + insertVar->vartypmod = attr->atttypmod; + insertVar->varcollid = attr->attcollation; /* * We cannot modify the selectEntry in-place, because ORDER BY or diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 9bf8766fd..c7a2d2777 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -3558,19 +3558,9 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query) NULL, NULL); } - if (contain_nextval_expression_walker((Node *) query->targetList, NULL)) - { - /* - * We let queries with nextval in the target list fall through to - * the logical planner, which knows how to handle those queries. - */ - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "Sequences cannot be used in router queries", - NULL, NULL); - } - bool hasPostgresOrCitusLocalTable = false; bool hasDistributedTable = false; + bool hasReferenceTable = false; ExtractRangeTableRelationWalker((Node *) query, &rangeTableRelationList); foreach(rangeTableRelationCell, rangeTableRelationList) @@ -3586,6 +3576,11 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query) hasPostgresOrCitusLocalTable = true; continue; } + else if (IsCitusTableType(distributedTableId, REFERENCE_TABLE)) + { + hasReferenceTable = true; + continue; + } else if (IsCitusTableType(distributedTableId, CITUS_LOCAL_TABLE)) { hasPostgresOrCitusLocalTable = true; @@ -3628,6 +3623,28 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query) } } + /* + * We want to make sure nextval happens on the coordinator / the current + * node, since the user may have certain expectations around the values + * produced by the sequence. We therefore cannot push down the nextval + * call as part of a router query. + * + * We let queries with nextval in the target list fall through to + * the logical planner, which will ensure that the nextval is called + * in the combine query on the coordinator. + * + * If there are no distributed or reference tables in the query, + * then the query will anyway happen on the coordinator, so we can + * allow nextval. + */ + if (contain_nextval_expression_walker((Node *) query->targetList, NULL) && + (hasDistributedTable || hasReferenceTable)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "Sequences cannot be used in router queries", + NULL, NULL); + } + /* local tables are not allowed if there are distributed tables */ if (hasPostgresOrCitusLocalTable && hasDistributedTable) { diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h index bcfe29bfb..6e84b80f2 100644 --- a/src/include/distributed/insert_select_executor.h +++ b/src/include/distributed/insert_select_executor.h @@ -19,7 +19,6 @@ extern bool EnableRepartitionedInsertSelect; extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node); -extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery); extern bool IsSupportedRedistributionTarget(Oid targetRelationId); extern bool IsRedistributablePlan(Plan *selectPlan); diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index dcae09dbe..21738c381 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -847,8 +847,8 @@ HAVING (max(table_2.value) >= (SELECT value FROM a)); DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 DEBUG: push down of limit count: 1 -DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx @@ -856,9 +856,9 @@ DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint DEBUG: Subplan XXX_2 will be written to local file -NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 -NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 -NOTICE: executing the command locally: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +NOTICE: executing the command locally: SELECT count(*) AS auto_coerced_by_citus_0, (worker_column_1)::text AS auto_coerced_by_citus_1, worker_column_1 AS discarded_target_item_1, max(worker_column_2) AS worker_column_4 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT count(*) AS auto_coerced_by_citus_0, (worker_column_1)::text AS auto_coerced_by_citus_1, worker_column_1 AS discarded_target_item_1, max(worker_column_2) AS worker_column_4 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery NOTICE: executing the copy locally for shard xxxxx WITH stats AS ( SELECT count(key) m FROM table_1 diff --git a/src/test/regress/expected/insert_select_into_local_table.out b/src/test/regress/expected/insert_select_into_local_table.out index 79376f6a4..f53348272 100644 --- a/src/test/regress/expected/insert_select_into_local_table.out +++ b/src/test/regress/expected/insert_select_into_local_table.out @@ -149,6 +149,67 @@ SELECT * FROM non_dist_unique ORDER BY 1; 5 | 8 (5 rows) +INSERT INTO non_dist_unique +SELECT a+1, b FROM dist_table +UNION ALL +SELECT a+100, b FROM dist_table +ON CONFLICT (a) DO NOTHING; +SELECT * FROM non_dist_unique ORDER BY 1; + a | b +--------------------------------------------------------------------- + 1 | 6 + 2 | 7 + 3 | 14 + 4 | 15 + 5 | 8 + 101 | 6 + 102 | 7 + 103 | 8 +(8 rows) + +INSERT INTO non_dist_unique +SELECT a+1, b FROM dist_table +UNION ALL +SELECT a+100, b FROM dist_table +ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + 1; +SELECT * FROM non_dist_unique ORDER BY 1; + a | b +--------------------------------------------------------------------- + 1 | 6 + 2 | 7 + 3 | 8 + 4 | 9 + 5 | 8 + 101 | 7 + 102 | 8 + 103 | 9 +(8 rows) + +WITH cte1 AS (SELECT s FROM generate_series(1,10) s) +INSERT INTO non_dist_unique +WITH cte2 AS (SELECT s FROM generate_series(1,10) s) +SELECT a+1, b FROM dist_table WHERE b IN (SELECT s FROM cte1) +UNION ALL +SELECT s, s FROM cte1 +ON CONFLICT (a) DO NOTHING; +SELECT * FROM non_dist_unique ORDER BY 1; + a | b +--------------------------------------------------------------------- + 1 | 6 + 2 | 7 + 3 | 8 + 4 | 9 + 5 | 8 + 6 | 6 + 7 | 7 + 8 | 8 + 9 | 9 + 10 | 10 + 101 | 7 + 102 | 8 + 103 | 9 +(13 rows) + DROP TABLE non_dist_unique; -- test INSERT INTO a table with DEFAULT CREATE TABLE non_dist_default (a INT, c TEXT DEFAULT 'def'); @@ -168,6 +229,16 @@ SELECT * FROM non_dist_default ORDER BY 1, 2; 3 | def (3 rows) +SELECT alter_table_set_access_method('non_dist_default', 'columnar'); +NOTICE: creating a new table for insert_select_into_local_table.non_dist_default +NOTICE: moving the data of insert_select_into_local_table.non_dist_default +NOTICE: dropping the old insert_select_into_local_table.non_dist_default +NOTICE: renaming the new table to insert_select_into_local_table.non_dist_default + alter_table_set_access_method +--------------------------------------------------------------------- + +(1 row) + INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1; SELECT * FROM non_dist_default ORDER BY 1, 2; a | c @@ -354,6 +425,691 @@ SELECT * FROM non_dist_2 ORDER BY 1, 2; (3 rows) TRUNCATE non_dist_2; +-- check issue https://github.com/citusdata/citus/issues/5858 +CREATE TABLE local_dest_table( + col_1 integer, + col_2 integer, + col_3 text, + col_4 text, + drop_col text, + col_5 bigint, + col_6 text, + col_7 text default 'col_7', + col_8 varchar +); +ALTER TABLE local_dest_table DROP COLUMN drop_col; +CREATE TABLE dist_source_table_1( + int_col integer, + drop_col text, + text_col_1 text, + dist_col integer, + text_col_2 text +); +SELECT create_distributed_table('dist_source_table_1', 'dist_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE dist_source_table_1 DROP COLUMN drop_col; +INSERT INTO dist_source_table_1 VALUES (1, 'value', 1, 'value'); +INSERT INTO dist_source_table_1 VALUES (2, 'value2', 1, 'value'); +INSERT INTO dist_source_table_1 VALUES (3, 'value', 3, 'value3'); +CREATE TABLE dist_source_table_2( + dist_col integer, + int_col integer +); +SELECT create_distributed_table('dist_source_table_2', 'dist_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_source_table_2 VALUES (1, 1); +INSERT INTO dist_source_table_2 VALUES (2, 2); +INSERT INTO dist_source_table_2 VALUES (4, 4); +CREATE TABLE local_source_table_1 AS SELECT * FROM dist_source_table_1; +CREATE TABLE local_source_table_2 AS SELECT * FROM dist_source_table_2; +/* + * query_results_equal compares the effect of two queries on local_dest_table. + * We use this to ensure that INSERT INTO local_dest_table SELECT behaves + * the same when selecting from a regular table (postgres handles it) and + * a distributed table (Citus handles it). + * + * The queries are generated by calling format() on query_table twice, + * once for each source_table argument. + */ +CREATE OR REPLACE FUNCTION query_results_equal(query_template text, source_table_1 text, source_table_2 text) +RETURNS bool +AS $$ +DECLARE + l1 local_dest_table[]; + l2 local_dest_table[]; +BEGIN + /* get the results using source_table_1 as source */ + TRUNCATE local_dest_table; + EXECUTE format(query_template, source_table_1); + SELECT array_agg(l) INTO l1 + FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l; + + /* get the results using source_table_2 as source */ + TRUNCATE local_dest_table; + EXECUTE format(query_template, source_table_2); + SELECT array_agg(l) INTO l2 + FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l; + + RAISE NOTICE 'l2=%', l1; + RAISE NOTICE 'l2=%', l2; + RETURN l1 = l2; +END; +$$ LANGUAGE plpgsql; +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table + SELECT + t1.dist_col, + 1, + 'string1', + 'string2', + 2, + 'string3', + t1.text_col_1, + t1.text_col_2 + FROM %1$s_1 t1 + WHERE t1.int_col IN (SELECT int_col FROM %1$s_2) +$$, 'local_source_table', 'dist_source_table'); +NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table + SELECT + t1.dist_col, + 1, + 'string1', + 'string2', + 2, + 'string3', + t1.text_col_1, + t1.text_col_2 + FROM %1$s t1 + returning * +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_3, col_4) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_7, col_4) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_4, col_3) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + WHERE dist_col = 1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s + UNION ALL + SELECT + 'string', + int_col + FROM %1$s; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + WITH cte1 AS (SELECT s FROM generate_series(1,10) s) + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s WHERE int_col IN (SELECT s FROM cte1) + UNION ALL + SELECT + 'string', + int_col + FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1) +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s) + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s WHERE int_col IN (SELECT s FROM cte1) + UNION ALL + SELECT + * + FROM cte1 +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_3) + SELECT t1.text_col_1 + FROM %1$s t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8) + SELECT + max(t1.dist_col), + 3, + 'string_3', + 4, + 44, + t1.text_col_1, + 'string_1000' + FROM %1$s t1 + GROUP BY t1.text_col_2, t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,3,string_3,,4,44,value,string_1000)","(1,3,string_3,,4,44,value2,string_1000)","(3,3,string_3,,4,44,value,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,3,string_3,,4,44,value,string_1000)","(1,3,string_3,,4,44,value2,string_1000)","(3,3,string_3,,4,44,value,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_7, col_8) + SELECT + t1.text_col_1, + 'string_1000' + FROM dist_source_table_1 t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_6, col_7, col_8) + SELECT + 'string_4', + t1.text_col_1, + 'string_1000' + FROM %1$s t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_5, col_3) + SELECT 12, 'string_11' FROM %1$s t1 + UNION + SELECT int_col, 'string' FROM %1$s; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table(col_3, col_2) + SELECT text_col_1, count(*) FROM %1$s GROUP BY 1 +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,1,value2,,,,col_7,)","(,2,value,,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,1,value2,,,,col_7,)","(,2,value,,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table(col_3, col_5) + SELECT text_col_1, count(*)::int FROM %1$s GROUP BY 1 +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,value,,2,,col_7,)","(,,value2,,1,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,value,,2,,col_7,)","(,,value2,,1,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +-- repeat above tests with Citus local table +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table + SELECT + t1.dist_col, + 1, + 'string1', + 'string2', + 2, + 'string3', + t1.text_col_1, + t1.text_col_2 + FROM %1$s_1 t1 + WHERE t1.int_col IN (SELECT int_col FROM %1$s_2) +$$, 'local_source_table', 'dist_source_table'); +NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table + SELECT + t1.dist_col, + 1, + 'string1', + 'string2', + 2, + 'string3', + t1.text_col_1, + t1.text_col_2 + FROM %1$s t1 + returning * +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_3, col_4) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_7, col_4) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_4, col_3) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + WHERE dist_col = 1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s + UNION ALL + SELECT + 'string', + int_col + FROM %1$s; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + WITH cte1 AS (SELECT s FROM generate_series(1,10) s) + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s WHERE int_col IN (SELECT s FROM cte1) + UNION ALL + SELECT + 'string', + int_col + FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1) +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s) + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s WHERE int_col IN (SELECT s FROM cte1) + UNION ALL + SELECT + * + FROM cte1 +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_3) + SELECT t1.text_col_1 + FROM %1$s t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8) + SELECT + max(t1.dist_col), + 3, + 'string_3', + 4, + 44, + t1.text_col_1, + 'string_1000' + FROM %1$s t1 + GROUP BY t1.text_col_2, t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(1,3,string_3,,4,44,value,string_1000)","(1,3,string_3,,4,44,value2,string_1000)","(3,3,string_3,,4,44,value,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(1,3,string_3,,4,44,value,string_1000)","(1,3,string_3,,4,44,value2,string_1000)","(3,3,string_3,,4,44,value,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_7, col_8) + SELECT + t1.text_col_1, + 'string_1000' + FROM dist_source_table_1 t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_6, col_7, col_8) + SELECT + 'string_4', + t1.text_col_1, + 'string_1000' + FROM %1$s t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_5, col_3) + SELECT 12, 'string_11' FROM %1$s t1 + UNION + SELECT int_col, 'string' FROM %1$s; +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table(col_3, col_2) + SELECT text_col_1, count(*) FROM %1$s GROUP BY 1 +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,1,value2,,,,col_7,)","(,2,value,,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,1,value2,,,,col_7,)","(,2,value,,,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table(col_3, col_5) + SELECT text_col_1, count(*)::int FROM %1$s GROUP BY 1 +$$, 'local_source_table_1', 'dist_source_table_1'); +NOTICE: l2={"(,,value,,2,,col_7,)","(,,value2,,1,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE +NOTICE: l2={"(,,value,,2,,col_7,)","(,,value2,,1,,col_7,)"} +CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE + query_results_equal +--------------------------------------------------------------------- + t +(1 row) + +-- go back to proper local table for remaining tests +TRUNCATE local_dest_table; +SELECT undistribute_table('local_source_table_1'); +ERROR: cannot undistribute table because the table is not distributed +-- use a sequence (cannot use query_results_equal, since sequence values would not match) +CREATE SEQUENCE seq; +BEGIN; +INSERT INTO local_dest_table (col_5, col_3) +SELECT 12, 'string_11' FROM dist_source_table_1 +UNION +SELECT nextval('seq'), 'string' FROM dist_source_table_1; +SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8; + col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 +--------------------------------------------------------------------- + | | string | | 1 | | col_7 | + | | string | | 2 | | col_7 | + | | string | | 3 | | col_7 | + | | string_11 | | 12 | | col_7 | +(4 rows) + +ROLLBACK; +-- add a bigserial column +ALTER TABLE local_dest_table ADD COLUMN col_9 bigserial; +-- not supported due to limitations in nextval handling +INSERT INTO local_dest_table (col_5, col_3) +SELECT 12, 'string_11' FROM dist_source_table_1 +UNION +SELECT 11, 'string' FROM dist_source_table_1; +SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8; + col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 | col_9 +--------------------------------------------------------------------- + | | string | | 11 | | col_7 | | 2 + | | string_11 | | 12 | | col_7 | | 1 +(2 rows) + +BEGIN; +INSERT INTO local_dest_table(col_3, col_2) +SELECT text_col_1, count(*) FROM dist_source_table_1 GROUP BY 1; +SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8; + col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 | col_9 +--------------------------------------------------------------------- + | 1 | value2 | | | | col_7 | | 3 + | 2 | value | | | | col_7 | | 4 + | | string | | 11 | | col_7 | | 2 + | | string_11 | | 12 | | col_7 | | 1 +(4 rows) + +ROLLBACK; +BEGIN; +INSERT INTO local_dest_table (col_4, col_3) SELECT + 'string1', + 'string2'::text +FROM dist_source_table_1 t1 +WHERE dist_col = 1 +RETURNING *; + col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 | col_9 +--------------------------------------------------------------------- + | | string2 | string1 | | | col_7 | | 5 + | | string2 | string1 | | | col_7 | | 6 +(2 rows) + +ROLLBACK; \set VERBOSITY terse DROP SCHEMA insert_select_into_local_table CASCADE; -NOTICE: drop cascades to 5 other objects +NOTICE: drop cascades to 12 other objects diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index afa54b7e8..5d8f1981b 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -500,7 +500,7 @@ INSERT INTO target_table SELECT mapped_key, c FROM t NATURAL JOIN source_table; DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for CTE t: SELECT mapped_key, a, c FROM insert_select_repartition.source_table WHERE ((a)::double precision OPERATOR(pg_catalog.>) floor(random())) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, (c)::integer[] AS b FROM (SELECT t.mapped_key, t.c FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, auto_coerced_by_citus_1 AS b FROM (SELECT t.mapped_key, (t.c)::integer[] AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery DEBUG: performing repartitioned INSERT ... SELECT RESET client_min_messages; SELECT * FROM target_table ORDER BY a; diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index f382085bc..c1bf35aa6 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -1347,21 +1347,20 @@ WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5) SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator - -> Subquery Scan on citus_insert_select_subquery + -> Result + One-Time Filter: $3 CTE cte1 -> Function Scan on generate_series s - -> Result - One-Time Filter: $3 - CTE cte1 - -> Limit - InitPlan 2 (returns $1) - -> CTE Scan on cte1 cte1_1 - -> Result - One-Time Filter: $1 - -> CTE Scan on cte1 cte1_2 - InitPlan 4 (returns $3) - -> CTE Scan on cte1 cte1_3 - -> CTE Scan on cte1 + CTE cte1 + -> Limit + InitPlan 2 (returns $1) + -> CTE Scan on cte1 cte1_1 + -> Result + One-Time Filter: $1 + -> CTE Scan on cte1 cte1_2 + InitPlan 4 (returns $3) + -> CTE Scan on cte1 cte1_3 + -> CTE Scan on cte1 EXPLAIN (COSTS OFF) INSERT INTO lineitem_hash_part ( SELECT s FROM generate_series(1,5) s) UNION diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 99c5e8e56..112dd2fc9 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -667,7 +667,7 @@ DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for CTE fist_table_agg: SELECT (max(value_1) OPERATOR(pg_catalog.+) 1) AS v1_agg, user_id FROM public.raw_events_first GROUP BY user_id DEBUG: Router planner cannot handle multi-shard select queries -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, v1_agg AS value_1_agg FROM (SELECT fist_table_agg.v1_agg, fist_table_agg.user_id FROM (SELECT intermediate_result.v1_agg, intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v1_agg integer, user_id integer)) fist_table_agg) citus_insert_select_subquery +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, v1_agg AS value_1_agg FROM (SELECT fist_table_agg.user_id, fist_table_agg.v1_agg FROM (SELECT intermediate_result.v1_agg, intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v1_agg integer, user_id integer)) fist_table_agg) citus_insert_select_subquery DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator ROLLBACK; @@ -2712,7 +2712,6 @@ WITH top10 AS ( ) INSERT INTO dist_table_with_sequence (value_1) SELECT * FROM top10; -ERROR: cannot handle complex subqueries when the router executor is disabled SELECT * FROM dist_table_with_sequence ORDER BY user_id, value_1; user_id | value_1 --------------------------------------------------------------------- @@ -2797,7 +2796,6 @@ WITH top10 AS ( ) INSERT INTO dist_table_with_user_sequence (value_1) SELECT * FROM top10; -ERROR: cannot handle complex subqueries when the router executor is disabled SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1; user_id | value_1 --------------------------------------------------------------------- @@ -3234,6 +3232,47 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_first OFFSET 0 ON CONFLICT DO NOTHING; ABORT; +-- test fix for issue https://github.com/citusdata/citus/issues/5891 +CREATE TABLE dist_table_1( +dist_col integer, +int_col integer, +text_col_1 text, +text_col_2 text +); +SELECT create_distributed_table('dist_table_1', 'dist_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_table_1 VALUES (1, 1, 'string', 'string'); +CREATE TABLE dist_table_2( +dist_col integer, +int_col integer +); +SELECT create_distributed_table('dist_table_2', 'dist_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_table_2 VALUES (1, 1); +with a as (select random()) INSERT INTO dist_table_1 +SELECT +t1.dist_col, +1, +'string', +'string' +FROM a, dist_table_1 t1 +join dist_table_2 t2 using (dist_col) +limit 1 +returning text_col_1; + text_col_1 +--------------------------------------------------------------------- + string +(1 row) + +DROP TABLE dist_table_1, dist_table_2; -- wrap in a transaction to improve performance BEGIN; DROP TABLE coerce_events; diff --git a/src/test/regress/expected/mx_coordinator_shouldhaveshards.out b/src/test/regress/expected/mx_coordinator_shouldhaveshards.out index dd18951fe..fb27766ac 100644 --- a/src/test/regress/expected/mx_coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/mx_coordinator_shouldhaveshards.out @@ -161,70 +161,6 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator 0 (1 row) -<<<<<<< HEAD -======= -WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1) -SELECT count(*), -key -FROM a JOIN table_2_rep USING (key) -GROUP BY key -HAVING (max(table_2_rep.value) >= (SELECT value FROM a)); -DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1 -DEBUG: push down of limit count: 1 -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) -DEBUG: Subplan XXX_1 will be written to local file -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx - count | key ---------------------------------------------------------------------- - 1 | 1 -(1 row) - -WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1) -INSERT INTO table_1_rep SELECT count(*), -key -FROM a JOIN table_2_rep USING (key) -GROUP BY key -HAVING (max(table_2_rep.value) >= (SELECT value FROM a)); -DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries -DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1 -DEBUG: push down of limit count: 1 -DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery -DEBUG: Collecting INSERT ... SELECT results on coordinator -DEBUG: Subplan XXX_1 will be written to local file -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be written to local file -WITH stats AS ( - SELECT count(key) m FROM table_1_rep -), -inserts AS ( - INSERT INTO table_2_rep - SELECT key, count(*) - FROM table_1_rep - WHERE key >= (SELECT m FROM stats) - GROUP BY key - HAVING count(*) <= (SELECT m FROM stats) - LIMIT 1 - RETURNING * -) SELECT count(*) FROM inserts; -DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1_rep -DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2_rep (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1_rep WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2_rep.key, table_2_rep.value -DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries -DEBUG: push down of limit count: 1 -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts -DEBUG: Subplan XXX_1 will be written to local file -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be written to local file -DEBUG: Collecting INSERT ... SELECT results on coordinator - count ---------------------------------------------------------------------- - 0 -(1 row) - ->>>>>>> cff013a05... Fix issues with insert..select casts and column ordering \c - - - :master_port SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? diff --git a/src/test/regress/sql/insert_select_into_local_table.sql b/src/test/regress/sql/insert_select_into_local_table.sql index 21564f1f6..1b2b49a5d 100644 --- a/src/test/regress/sql/insert_select_into_local_table.sql +++ b/src/test/regress/sql/insert_select_into_local_table.sql @@ -64,6 +64,30 @@ INSERT INTO non_dist_unique SELECT a+1, b FROM dist_table ON CONFLICT (a) DO NOT SELECT * FROM non_dist_unique ORDER BY 1; INSERT INTO non_dist_unique SELECT a+2, b FROM dist_table ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + non_dist_unique.b; SELECT * FROM non_dist_unique ORDER BY 1; + +INSERT INTO non_dist_unique +SELECT a+1, b FROM dist_table +UNION ALL +SELECT a+100, b FROM dist_table +ON CONFLICT (a) DO NOTHING; +SELECT * FROM non_dist_unique ORDER BY 1; + +INSERT INTO non_dist_unique +SELECT a+1, b FROM dist_table +UNION ALL +SELECT a+100, b FROM dist_table +ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + 1; +SELECT * FROM non_dist_unique ORDER BY 1; + +WITH cte1 AS (SELECT s FROM generate_series(1,10) s) +INSERT INTO non_dist_unique +WITH cte2 AS (SELECT s FROM generate_series(1,10) s) +SELECT a+1, b FROM dist_table WHERE b IN (SELECT s FROM cte1) +UNION ALL +SELECT s, s FROM cte1 +ON CONFLICT (a) DO NOTHING; +SELECT * FROM non_dist_unique ORDER BY 1; + DROP TABLE non_dist_unique; @@ -73,6 +97,7 @@ INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a = 1; SELECT * FROM non_dist_default ORDER BY 1, 2; INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a > 1; SELECT * FROM non_dist_default ORDER BY 1, 2; +SELECT alter_table_set_access_method('non_dist_default', 'columnar'); INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1; SELECT * FROM non_dist_default ORDER BY 1, 2; INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a > 1; @@ -149,5 +174,427 @@ INSERT INTO non_dist_2 SELECT a, c FROM ref_table; SELECT * FROM non_dist_2 ORDER BY 1, 2; TRUNCATE non_dist_2; +-- check issue https://github.com/citusdata/citus/issues/5858 +CREATE TABLE local_dest_table( + col_1 integer, + col_2 integer, + col_3 text, + col_4 text, + drop_col text, + col_5 bigint, + col_6 text, + col_7 text default 'col_7', + col_8 varchar +); + +ALTER TABLE local_dest_table DROP COLUMN drop_col; + +CREATE TABLE dist_source_table_1( + int_col integer, + drop_col text, + text_col_1 text, + dist_col integer, + text_col_2 text +); +SELECT create_distributed_table('dist_source_table_1', 'dist_col'); + +ALTER TABLE dist_source_table_1 DROP COLUMN drop_col; + +INSERT INTO dist_source_table_1 VALUES (1, 'value', 1, 'value'); +INSERT INTO dist_source_table_1 VALUES (2, 'value2', 1, 'value'); +INSERT INTO dist_source_table_1 VALUES (3, 'value', 3, 'value3'); + +CREATE TABLE dist_source_table_2( + dist_col integer, + int_col integer +); +SELECT create_distributed_table('dist_source_table_2', 'dist_col'); + +INSERT INTO dist_source_table_2 VALUES (1, 1); +INSERT INTO dist_source_table_2 VALUES (2, 2); +INSERT INTO dist_source_table_2 VALUES (4, 4); + +CREATE TABLE local_source_table_1 AS SELECT * FROM dist_source_table_1; +CREATE TABLE local_source_table_2 AS SELECT * FROM dist_source_table_2; + +/* + * query_results_equal compares the effect of two queries on local_dest_table. + * We use this to ensure that INSERT INTO local_dest_table SELECT behaves + * the same when selecting from a regular table (postgres handles it) and + * a distributed table (Citus handles it). + * + * The queries are generated by calling format() on query_table twice, + * once for each source_table argument. + */ +CREATE OR REPLACE FUNCTION query_results_equal(query_template text, source_table_1 text, source_table_2 text) +RETURNS bool +AS $$ +DECLARE + l1 local_dest_table[]; + l2 local_dest_table[]; +BEGIN + /* get the results using source_table_1 as source */ + TRUNCATE local_dest_table; + EXECUTE format(query_template, source_table_1); + SELECT array_agg(l) INTO l1 + FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l; + + /* get the results using source_table_2 as source */ + TRUNCATE local_dest_table; + EXECUTE format(query_template, source_table_2); + SELECT array_agg(l) INTO l2 + FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l; + + RAISE NOTICE 'l2=%', l1; + RAISE NOTICE 'l2=%', l2; + RETURN l1 = l2; +END; +$$ LANGUAGE plpgsql; + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table + SELECT + t1.dist_col, + 1, + 'string1', + 'string2', + 2, + 'string3', + t1.text_col_1, + t1.text_col_2 + FROM %1$s_1 t1 + WHERE t1.int_col IN (SELECT int_col FROM %1$s_2) +$$, 'local_source_table', 'dist_source_table'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table + SELECT + t1.dist_col, + 1, + 'string1', + 'string2', + 2, + 'string3', + t1.text_col_1, + t1.text_col_2 + FROM %1$s t1 + returning * +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_3, col_4) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_7, col_4) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_4, col_3) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + WHERE dist_col = 1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s + UNION ALL + SELECT + 'string', + int_col + FROM %1$s; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + WITH cte1 AS (SELECT s FROM generate_series(1,10) s) + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s WHERE int_col IN (SELECT s FROM cte1) + UNION ALL + SELECT + 'string', + int_col + FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1) +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s) + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s WHERE int_col IN (SELECT s FROM cte1) + UNION ALL + SELECT + * + FROM cte1 +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_3) + SELECT t1.text_col_1 + FROM %1$s t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8) + SELECT + max(t1.dist_col), + 3, + 'string_3', + 4, + 44, + t1.text_col_1, + 'string_1000' + FROM %1$s t1 + GROUP BY t1.text_col_2, t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_7, col_8) + SELECT + t1.text_col_1, + 'string_1000' + FROM dist_source_table_1 t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_6, col_7, col_8) + SELECT + 'string_4', + t1.text_col_1, + 'string_1000' + FROM %1$s t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_5, col_3) + SELECT 12, 'string_11' FROM %1$s t1 + UNION + SELECT int_col, 'string' FROM %1$s; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table(col_3, col_2) + SELECT text_col_1, count(*) FROM %1$s GROUP BY 1 +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table(col_3, col_5) + SELECT text_col_1, count(*)::int FROM %1$s GROUP BY 1 +$$, 'local_source_table_1', 'dist_source_table_1'); + +-- repeat above tests with Citus local table +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table + SELECT + t1.dist_col, + 1, + 'string1', + 'string2', + 2, + 'string3', + t1.text_col_1, + t1.text_col_2 + FROM %1$s_1 t1 + WHERE t1.int_col IN (SELECT int_col FROM %1$s_2) +$$, 'local_source_table', 'dist_source_table'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table + SELECT + t1.dist_col, + 1, + 'string1', + 'string2', + 2, + 'string3', + t1.text_col_1, + t1.text_col_2 + FROM %1$s t1 + returning * +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_3, col_4) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_7, col_4) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_4, col_3) SELECT + 'string1', + 'string2'::text + FROM %1$s t1 + WHERE dist_col = 1 + returning *; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s + UNION ALL + SELECT + 'string', + int_col + FROM %1$s; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + WITH cte1 AS (SELECT s FROM generate_series(1,10) s) + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s WHERE int_col IN (SELECT s FROM cte1) + UNION ALL + SELECT + 'string', + int_col + FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1) +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s) + INSERT INTO local_dest_table (col_4, col_1) + SELECT + 'string1', + dist_col + FROM %1$s WHERE int_col IN (SELECT s FROM cte1) + UNION ALL + SELECT + * + FROM cte1 +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_3) + SELECT t1.text_col_1 + FROM %1$s t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8) + SELECT + max(t1.dist_col), + 3, + 'string_3', + 4, + 44, + t1.text_col_1, + 'string_1000' + FROM %1$s t1 + GROUP BY t1.text_col_2, t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_7, col_8) + SELECT + t1.text_col_1, + 'string_1000' + FROM dist_source_table_1 t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_6, col_7, col_8) + SELECT + 'string_4', + t1.text_col_1, + 'string_1000' + FROM %1$s t1 + GROUP BY t1.text_col_1; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table (col_5, col_3) + SELECT 12, 'string_11' FROM %1$s t1 + UNION + SELECT int_col, 'string' FROM %1$s; +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table(col_3, col_2) + SELECT text_col_1, count(*) FROM %1$s GROUP BY 1 +$$, 'local_source_table_1', 'dist_source_table_1'); + +SELECT * FROM query_results_equal($$ + INSERT INTO local_dest_table(col_3, col_5) + SELECT text_col_1, count(*)::int FROM %1$s GROUP BY 1 +$$, 'local_source_table_1', 'dist_source_table_1'); + +-- go back to proper local table for remaining tests +TRUNCATE local_dest_table; +SELECT undistribute_table('local_source_table_1'); + +-- use a sequence (cannot use query_results_equal, since sequence values would not match) +CREATE SEQUENCE seq; + +BEGIN; +INSERT INTO local_dest_table (col_5, col_3) +SELECT 12, 'string_11' FROM dist_source_table_1 +UNION +SELECT nextval('seq'), 'string' FROM dist_source_table_1; +SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8; +ROLLBACK; + +-- add a bigserial column +ALTER TABLE local_dest_table ADD COLUMN col_9 bigserial; + +-- not supported due to limitations in nextval handling +INSERT INTO local_dest_table (col_5, col_3) +SELECT 12, 'string_11' FROM dist_source_table_1 +UNION +SELECT 11, 'string' FROM dist_source_table_1; +SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8; + +BEGIN; +INSERT INTO local_dest_table(col_3, col_2) +SELECT text_col_1, count(*) FROM dist_source_table_1 GROUP BY 1; +SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8; +ROLLBACK; + +BEGIN; +INSERT INTO local_dest_table (col_4, col_3) SELECT + 'string1', + 'string2'::text +FROM dist_source_table_1 t1 +WHERE dist_col = 1 +RETURNING *; +ROLLBACK; + \set VERBOSITY terse DROP SCHEMA insert_select_into_local_table CASCADE; diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index 7de9f9763..cf87a4084 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -2335,6 +2335,40 @@ SELECT * FROM raw_events_first OFFSET 0 ON CONFLICT DO NOTHING; ABORT; +-- test fix for issue https://github.com/citusdata/citus/issues/5891 +CREATE TABLE dist_table_1( +dist_col integer, +int_col integer, +text_col_1 text, +text_col_2 text +); + +SELECT create_distributed_table('dist_table_1', 'dist_col'); + +INSERT INTO dist_table_1 VALUES (1, 1, 'string', 'string'); + +CREATE TABLE dist_table_2( +dist_col integer, +int_col integer +); + +SELECT create_distributed_table('dist_table_2', 'dist_col'); + +INSERT INTO dist_table_2 VALUES (1, 1); + +with a as (select random()) INSERT INTO dist_table_1 +SELECT +t1.dist_col, +1, +'string', +'string' +FROM a, dist_table_1 t1 +join dist_table_2 t2 using (dist_col) +limit 1 +returning text_col_1; + +DROP TABLE dist_table_1, dist_table_2; + -- wrap in a transaction to improve performance BEGIN; DROP TABLE coerce_events;