diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 57363ccd8..2b64773d3 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -87,11 +87,6 @@ static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan, boundParams, PlannerRestrictionContext * plannerRestrictionContext); -static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery, - Query *query, ParamListInfo boundParams, - bool hasUnresolvedParams, - PlannerRestrictionContext * - plannerRestrictionContext); static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId); @@ -193,21 +188,6 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) } else if (needsDistributedPlanning) { - /* - * Inserting into a local table needs to go through the regular postgres - * planner/executor, but the SELECT needs to go through Citus. We currently - * don't have a way of doing both things and therefore error out, but do - * have a handy tip for users. - */ - if (InsertSelectIntoLocalTable(parse)) - { - ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a " - "local table"), - errhint("Consider using CREATE TEMPORARY TABLE tmp AS " - "SELECT ... and inserting from the temporary " - "table."))); - } - /* * standard_planner scribbles on it's input, but for deparsing we need the * unmodified form. Note that before copying we call @@ -922,7 +902,7 @@ TryCreateDistributedPlannedStmt(PlannedStmt *localPlan, * - If any, go back to step 1 by calling itself recursively * 3. 
Logical planner */ -static DistributedPlan * +DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo boundParams, bool hasUnresolvedParams, PlannerRestrictionContext *plannerRestrictionContext) @@ -952,6 +932,22 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi distributedPlan = CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext); } + else if (InsertSelectIntoLocalTable(originalQuery)) + { + if (hasUnresolvedParams) + { + /* + * Unresolved parameters can cause performance regressions in + * INSERT...SELECT when the partition column is a parameter + * because we don't perform any additional pruning in the executor. + */ + return NULL; + } + distributedPlan = + CreateInsertSelectIntoLocalTablePlan(planId, originalQuery, boundParams, + hasUnresolvedParams, + plannerRestrictionContext); + } else { /* modifications are always routed through the same planner/executor */ diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 303b54d63..99676b4b5 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -13,11 +13,14 @@ #include "distributed/pg_version_constants.h" #include "catalog/pg_class.h" +#include "catalog/pg_type.h" #include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/errormessage.h" +#include "distributed/listutils.h" #include "distributed/log_utils.h" +#include "distributed/insert_select_executor.h" #include "distributed/insert_select_planner.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" @@ -60,6 +63,8 @@ static Task * RouterModifyTaskForShardInterval(Query *originalQuery, uint32 taskIdIndex, bool allRelationsJoinedOnPartitionKey, DeferredErrorMessage **routerPlannerError); +static 
Query * CreateMasterQueryForRouterPlan(DistributedPlan *distPlan); +static List * CreateTargetListForMasterQuery(List *targetList); static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, RangeTblEntry *subqueryRte, @@ -304,6 +309,176 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, } +/* + * CreateInsertSelectIntoLocalTablePlan creates the plan for INSERT .. SELECT queries + * where the selected table is distributed and the inserted table is not. + * + * To create the plan, this function first creates a distributed plan for the SELECT + * part. Then it puts that plan into the original (non-distributed) INSERT query as + * a subquery. Finally, it puts this INSERT query, which now has a distributed SELECT + * subquery, in the masterQuery. + * + * If the SELECT query is a router query, whose distributed plan does not have a + * masterQuery, this function also creates a dummy masterQuery for that. + */ +DistributedPlan * +CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamListInfo + boundParams, bool hasUnresolvedParams, + PlannerRestrictionContext *plannerRestrictionContext) +{ + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery); + + Query *selectQuery = BuildSelectForInsertSelect(originalQuery); + originalQuery->cteList = NIL; + DistributedPlan *distPlan = CreateDistributedPlan(planId, selectQuery, + copyObject(selectQuery), + boundParams, hasUnresolvedParams, + plannerRestrictionContext); + + /* + * We don't expect distPlan to be NULL here because hasUnresolvedParams is + * already checked before this function and CreateDistributedPlan only returns + * NULL when there are unresolved parameters.
+ */ + Assert(distPlan != NULL); + + if (distPlan->planningError) + { + return distPlan; + } + + if (distPlan->masterQuery == NULL) + { + /* + * For router queries, we construct a synthetic master query that simply passes + * on the results of the remote tasks, which we can then use as the select in + * the INSERT .. SELECT. + */ + distPlan->masterQuery = CreateMasterQueryForRouterPlan( + distPlan); + } + + /* + * masterQuery of a distributed select is for combining the results from + * worker nodes on the coordinator node. Putting it as a subquery to the + * INSERT query causes the INSERT query to insert the combined select value + * from the workers. And making the resulting insert query the masterQuery + * lets us execute this insert command. + * + * So this operation makes the master query insert the result of the + * distributed select instead of returning it. + */ + selectRte->subquery = distPlan->masterQuery; + distPlan->masterQuery = originalQuery; + + return distPlan; +} + + +/* + * CreateMasterQueryForRouterPlan is used for creating a dummy masterQuery + * for a router plan, since router plans normally don't have one.
+ */ +static Query * +CreateMasterQueryForRouterPlan(DistributedPlan *distPlan) +{ + const Index insertTableId = 1; + List *tableIdList = list_make1(makeInteger(insertTableId)); + Job *dependentJob = distPlan->workerJob; + List *dependentTargetList = dependentJob->jobQuery->targetList; + + /* compute column names for the derived table */ + uint32 columnCount = (uint32) list_length(dependentTargetList); + List *columnNameList = DerivedColumnNameList(columnCount, + dependentJob->jobId); + + List *funcColumnNames = NIL; + List *funcColumnTypes = NIL; + List *funcColumnTypeMods = NIL; + List *funcCollations = NIL; + + TargetEntry *targetEntry = NULL; + foreach_ptr(targetEntry, dependentTargetList) + { + Node *expr = (Node *) targetEntry->expr; + + char *name = targetEntry->resname; + if (name == NULL) + { + name = pstrdup("unnamed"); + } + + funcColumnNames = lappend(funcColumnNames, makeString(name)); + + funcColumnTypes = lappend_oid(funcColumnTypes, exprType(expr)); + funcColumnTypeMods = lappend_int(funcColumnTypeMods, exprTypmod(expr)); + funcCollations = lappend_oid(funcCollations, exprCollation(expr)); + } + + RangeTblEntry *rangeTableEntry = DerivedRangeTableEntry(NULL, + columnNameList, + tableIdList, + funcColumnNames, + funcColumnTypes, + funcColumnTypeMods, + funcCollations); + + List *targetList = CreateTargetListForMasterQuery(dependentTargetList); + + RangeTblRef *rangeTableRef = makeNode(RangeTblRef); + rangeTableRef->rtindex = 1; + + FromExpr *joinTree = makeNode(FromExpr); + joinTree->quals = NULL; + joinTree->fromlist = list_make1(rangeTableRef); + + Query *masterQuery = makeNode(Query); + masterQuery->commandType = CMD_SELECT; + masterQuery->querySource = QSRC_ORIGINAL; + masterQuery->canSetTag = true; + masterQuery->rtable = list_make1(rangeTableEntry); + masterQuery->targetList = targetList; + masterQuery->jointree = joinTree; + return masterQuery; +} + + +/* + * CreateTargetListForMasterQuery is used for creating a target list for + * master 
query. + */ +static List * +CreateTargetListForMasterQuery(List *targetList) +{ + List *newTargetEntryList = NIL; + const uint32 masterTableId = 1; + int columnId = 1; + + /* iterate over original target entries */ + TargetEntry *originalTargetEntry = NULL; + foreach_ptr(originalTargetEntry, targetList) + { + TargetEntry *newTargetEntry = flatCopyTargetEntry(originalTargetEntry); + + Var *column = makeVarFromTargetEntry(masterTableId, originalTargetEntry); + column->varattno = columnId; + column->varoattno = columnId; + columnId++; + + if (column->vartype == RECORDOID || column->vartype == RECORDARRAYOID) + { + column->vartypmod = BlessRecordExpression(originalTargetEntry->expr); + } + + Expr *newExpression = (Expr *) column; + + newTargetEntry->expr = newExpression; + newTargetEntryList = lappend(newTargetEntryList, newTargetEntry); + } + return newTargetEntryList; +} + + /* * DistributedInsertSelectSupported returns NULL if the INSERT ... SELECT query * is supported, or a description why not. diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index 456a74267..22baee4fd 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -17,6 +17,7 @@ #include "catalog/pg_type.h" #include "distributed/citus_ruleutils.h" +#include "distributed/insert_select_planner.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/multi_master_planner.h" @@ -191,6 +192,37 @@ CitusCustomScanPathPlan(PlannerInfo *root, */ citusPath->remoteScan->scan.plan.targetlist = tlist; + /* + * The custom_scan_tlist contains target entries for the "output" of the call + * to citus_extradata_container, which is actually replaced by a CustomScan. + * The target entries are initialized with varno 1 (see MasterTargetList), since + * it's currently the only relation in the join tree of the masterQuery.
+ * + * If the citus_extradata_container function call is not the first relation to + * appear in the flattened rtable for the entire plan, then varno is now pointing + * to the wrong relation and needs to be updated. + * + * Example: + * When the masterQuery field of the DistributedPlan is + * INSERT INTO local SELECT .. FROM citus_extradata_container. + * In that case the varno of citus_extradata_container should be 3, because + * it is preceded by range table entries for "local" and the subquery. + */ + if (rel->relid != 1) + { + TargetEntry *targetEntry = NULL; + + foreach_ptr(targetEntry, citusPath->remoteScan->custom_scan_tlist) + { + /* we created this list, so we know it only contains Var */ + Assert(IsA(targetEntry->expr, Var)); + + Var *var = (Var *) targetEntry->expr; + + var->varno = rel->relid; + } + } + /* clauses might have been added by the planner, need to add them to our scan */ RestrictInfo *restrictInfo = NULL; List **quals = &citusPath->remoteScan->scan.plan.qual; diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index efaa7039e..1e98892e0 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -135,13 +135,6 @@ static RangeTblEntry * JoinRangeTableEntry(JoinExpr *joinExpr, List *dependentJo static int ExtractRangeTableId(Node *node); static void ExtractColumns(RangeTblEntry *rangeTableEntry, int rangeTableId, List *dependentJobList, List **columnNames, List **columnVars); -static RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *columnNames, - List *tableIdList, List *funcColumnNames, - List *funcColumnTypes, - List *funcColumnTypeMods, - List *funcCollations); - -static List * DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId); static Query * BuildSubqueryJobQuery(MultiNode *multiNode); static void UpdateAllColumnAttributes(Node *columnContainer, List
*rangeTableList, List *dependentJobList); @@ -900,7 +893,7 @@ BaseRangeTableList(MultiNode *multiNode) * derived table either represents the output of a repartition job; or the data * on worker nodes in case of the master node query. */ -static RangeTblEntry * +RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *columnList, List *tableIdList, List *funcColumnNames, List *funcColumnTypes, List *funcColumnTypeMods, List *funcCollations) @@ -923,7 +916,7 @@ DerivedRangeTableEntry(MultiNode *multiNode, List *columnList, List *tableIdList * tables. These column names are then used when building the create stament * query string for derived tables. */ -static List * +List * DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId) { List *columnNameList = NIL; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 877878804..170dc526d 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -453,7 +453,7 @@ AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval RangeTblEntry * ExtractSelectRangeTableEntry(Query *query) { - Assert(InsertSelectIntoCitusTable(query)); + Assert(InsertSelectIntoCitusTable(query) || InsertSelectIntoLocalTable(query)); /* * Since we already asserted InsertSelectIntoCitusTable() it is safe to access diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 80501118e..e25ff0c9b 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -209,4 +209,12 @@ extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan); extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan, struct DistributedPlan *distributedPlan); + +extern struct DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery, + Query *query, ParamListInfo + 
boundParams, bool + hasUnresolvedParams, + PlannerRestrictionContext * + plannerRestrictionContext); + #endif /* DISTRIBUTED_PLANNER_H */ diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index eca337536..84066278b 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -34,6 +34,13 @@ extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ance extern DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); +extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId, + Query *originalQuery, + ParamListInfo + boundParams, bool + hasUnresolvedParams, + PlannerRestrictionContext * + plannerRestrictionContext); extern char * InsertSelectResultIdPrefix(uint64 planId); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 63f5e42e3..b37cb9ef6 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -579,4 +579,13 @@ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId, extern uint64 UniqueJobId(void); +extern List * DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId); +extern RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *columnList, + List *tableIdList, + List *funcColumnNames, + List *funcColumnTypes, + List *funcColumnTypeMods, + List *funcCollations); + + #endif /* MULTI_PHYSICAL_PLANNER_H */ diff --git a/src/test/regress/expected/insert_select_into_local_table.out b/src/test/regress/expected/insert_select_into_local_table.out new file mode 100644 index 000000000..79376f6a4 --- /dev/null +++ b/src/test/regress/expected/insert_select_into_local_table.out @@ -0,0 +1,359 @@ +CREATE SCHEMA insert_select_into_local_table; +SET search_path TO 
insert_select_into_local_table; +SET citus.shard_count = 4; +SET citus.next_shard_id TO 11235800; +CREATE TABLE dist_table (a INT, b INT, c TEXT); +SELECT create_distributed_table('dist_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_table VALUES (1, 6, 'txt1'), (2, 7, 'txt2'), (3, 8, 'txt3'); +CREATE TABLE non_dist_1 (a INT, b INT, c TEXT); +CREATE TABLE non_dist_2 (a INT, c TEXT); +CREATE TABLE non_dist_3 (a INT); +-- test non-router queries +INSERT INTO non_dist_1 SELECT * FROM dist_table; +INSERT INTO non_dist_2 SELECT a, c FROM dist_table; +INSERT INTO non_dist_3 SELECT a FROM dist_table; +SELECT * FROM non_dist_1 ORDER BY 1, 2, 3; + a | b | c +--------------------------------------------------------------------- + 1 | 6 | txt1 + 2 | 7 | txt2 + 3 | 8 | txt3 +(3 rows) + +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | txt1 + 2 | txt2 + 3 | txt3 +(3 rows) + +SELECT * FROM non_dist_3 ORDER BY 1; + a +--------------------------------------------------------------------- + 1 + 2 + 3 +(3 rows) + +TRUNCATE non_dist_1, non_dist_2, non_dist_3; +-- test router queries +INSERT INTO non_dist_1 SELECT * FROM dist_table WHERE a = 1; +INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE a = 1; +INSERT INTO non_dist_3 SELECT a FROM dist_table WHERE a = 1; +SELECT * FROM non_dist_1 ORDER BY 1, 2, 3; + a | b | c +--------------------------------------------------------------------- + 1 | 6 | txt1 +(1 row) + +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | txt1 +(1 row) + +SELECT * FROM non_dist_3 ORDER BY 1; + a +--------------------------------------------------------------------- + 1 +(1 row) + +TRUNCATE non_dist_1, non_dist_2, non_dist_3; +-- test columns in different order +INSERT INTO non_dist_1(b, a, c) SELECT a, b, c FROM 
dist_table; +SELECT * FROM non_dist_1 ORDER BY 1, 2, 3; + a | b | c +--------------------------------------------------------------------- + 6 | 1 | txt1 + 7 | 2 | txt2 + 8 | 3 | txt3 +(3 rows) + +TRUNCATE non_dist_1; +-- test EXPLAIN +EXPLAIN (COSTS FALSE) INSERT INTO non_dist_1 SELECT * FROM dist_table; + QUERY PLAN +--------------------------------------------------------------------- + Insert on non_dist_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_11235800 dist_table +(7 rows) + +EXPLAIN (COSTS FALSE) INSERT INTO non_dist_1 SELECT * FROM dist_table WHERE a = 1; + QUERY PLAN +--------------------------------------------------------------------- + Insert on non_dist_1 + -> Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_11235800 dist_table + Filter: (a = 1) +(8 rows) + +-- test RETURNING +INSERT INTO non_dist_1 SELECT * FROM dist_table ORDER BY 1, 2, 3 RETURNING *; + a | b | c +--------------------------------------------------------------------- + 1 | 6 | txt1 + 2 | 7 | txt2 + 3 | 8 | txt3 +(3 rows) + +INSERT INTO non_dist_1 SELECT * FROM dist_table WHERE a = 1 ORDER BY 1, 2, 3 RETURNING *; + a | b | c +--------------------------------------------------------------------- + 1 | 6 | txt1 +(1 row) + +-- test INSERT INTO a table with UNIQUE +CREATE TABLE non_dist_unique (a INT UNIQUE, b INT); +INSERT INTO non_dist_unique SELECT a, b FROM dist_table; +SELECT * FROM non_dist_unique ORDER BY 1; + a | b +--------------------------------------------------------------------- + 1 | 6 + 2 | 7 + 3 | 8 +(3 rows) + +INSERT INTO non_dist_unique SELECT a+1, b FROM dist_table ON CONFLICT (a) DO NOTHING; +SELECT * FROM non_dist_unique ORDER BY 1; + a | b +--------------------------------------------------------------------- + 1 | 6 + 2 | 7 + 3 | 8 + 4 | 8 
+(4 rows) + +INSERT INTO non_dist_unique SELECT a+2, b FROM dist_table ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + non_dist_unique.b; +SELECT * FROM non_dist_unique ORDER BY 1; + a | b +--------------------------------------------------------------------- + 1 | 6 + 2 | 7 + 3 | 14 + 4 | 15 + 5 | 8 +(5 rows) + +DROP TABLE non_dist_unique; +-- test INSERT INTO a table with DEFAULT +CREATE TABLE non_dist_default (a INT, c TEXT DEFAULT 'def'); +INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a = 1; +SELECT * FROM non_dist_default ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | def +(1 row) + +INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a > 1; +SELECT * FROM non_dist_default ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | def + 2 | def + 3 | def +(3 rows) + +INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1; +SELECT * FROM non_dist_default ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | def + 1 | txt1 + 2 | def + 3 | def +(4 rows) + +INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a > 1; +SELECT * FROM non_dist_default ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | def + 1 | txt1 + 2 | def + 2 | txt2 + 3 | def + 3 | txt3 +(6 rows) + +DROP TABLE non_dist_default; +-- test CTEs +WITH with_table AS (SELECT a, c FROM dist_table ORDER BY a LIMIT 2) INSERT INTO non_dist_2 SELECT * FROM with_table; +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | txt1 + 2 | txt2 +(2 rows) + +INSERT INTO non_dist_2 WITH with_table AS (SELECT a, c FROM dist_table ORDER BY a LIMIT 2) SELECT * FROM with_table; +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | txt1 + 1 | txt1 + 2 
| txt2 + 2 | txt2 +(4 rows) + +TRUNCATE non_dist_2; +WITH deleted_rows AS (DELETE FROM dist_table WHERE a < 3 RETURNING a, c) INSERT INTO non_dist_2 SELECT * FROM deleted_rows; +SELECT * FROM dist_table ORDER BY 1, 2, 3; + a | b | c +--------------------------------------------------------------------- + 3 | 8 | txt3 +(1 row) + +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | txt1 + 2 | txt2 +(2 rows) + +TRUNCATE non_dist_2; +INSERT INTO dist_table VALUES (1, 6, 'txt1'), (2, 7, 'txt2'); +WITH insert_table AS (INSERT INTO non_dist_2 SELECT a, c FROM dist_table RETURNING *) SELECT * FROM insert_table ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | txt1 + 2 | txt2 + 3 | txt3 +(3 rows) + +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | txt1 + 2 | txt2 + 3 | txt3 +(3 rows) + +TRUNCATE non_dist_2; +-- test PREPARE +PREPARE insert_select_into_local AS INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE a = 1; +EXECUTE insert_select_into_local; +EXECUTE insert_select_into_local; +EXECUTE insert_select_into_local; +EXECUTE insert_select_into_local; +EXECUTE insert_select_into_local; +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | txt1 + 1 | txt1 + 1 | txt1 + 1 | txt1 + 1 | txt1 +(5 rows) + +EXECUTE insert_select_into_local; +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | txt1 + 1 | txt1 + 1 | txt1 + 1 | txt1 + 1 | txt1 + 1 | txt1 +(6 rows) + +TRUNCATE non_dist_2; +DEALLOCATE insert_select_into_local; +PREPARE insert_select_into_local(int) AS INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE a = $1; +EXECUTE insert_select_into_local(2); +EXECUTE insert_select_into_local(2); +EXECUTE 
insert_select_into_local(2); +EXECUTE insert_select_into_local(2); +EXECUTE insert_select_into_local(2); +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 2 | txt2 + 2 | txt2 + 2 | txt2 + 2 | txt2 + 2 | txt2 +(5 rows) + +EXECUTE insert_select_into_local(2); +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 2 | txt2 + 2 | txt2 + 2 | txt2 + 2 | txt2 + 2 | txt2 + 2 | txt2 +(6 rows) + +TRUNCATE non_dist_2; +DEALLOCATE insert_select_into_local; +PREPARE insert_select_into_local(int) AS INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE b = $1; +EXECUTE insert_select_into_local(8); +EXECUTE insert_select_into_local(8); +EXECUTE insert_select_into_local(8); +EXECUTE insert_select_into_local(8); +EXECUTE insert_select_into_local(8); +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 3 | txt3 + 3 | txt3 + 3 | txt3 + 3 | txt3 + 3 | txt3 +(5 rows) + +EXECUTE insert_select_into_local(8); +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 3 | txt3 + 3 | txt3 + 3 | txt3 + 3 | txt3 + 3 | txt3 + 3 | txt3 +(6 rows) + +TRUNCATE non_dist_2; +DEALLOCATE insert_select_into_local; +-- test reference table +CREATE TABLE ref_table (a INT, b INT, c TEXT); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO ref_table VALUES (1, 6, 'txt1'), (2, 7, 'txt2'), (3, 8, 'txt3'); +INSERT INTO non_dist_2 SELECT a, c FROM ref_table; +SELECT * FROM non_dist_2 ORDER BY 1, 2; + a | c +--------------------------------------------------------------------- + 1 | txt1 + 2 | txt2 + 3 | txt3 +(3 rows) + +TRUNCATE non_dist_2; +\set VERBOSITY terse +DROP SCHEMA insert_select_into_local_table CASCADE; +NOTICE: 
drop cascades to 5 other objects diff --git a/src/test/regress/expected/isolation_insert_select_vs_all.out b/src/test/regress/expected/isolation_insert_select_vs_all.out index 7987f87c5..18febf157 100644 --- a/src/test/regress/expected/isolation_insert_select_vs_all.out +++ b/src/test/regress/expected/isolation_insert_select_vs_all.out @@ -332,12 +332,12 @@ step s1-initialize: step s1-begin: BEGIN; step s1-insert-select: INSERT INTO insert_of_insert_select_hash SELECT * FROM select_of_insert_select_hash ORDER BY 1, 2 LIMIT 5;; -ERROR: cannot INSERT rows from a distributed query into a local table -step s2-distribute-table-on-inserted: SELECT create_distributed_table('insert_of_insert_select_hash', 'id'); +step s2-distribute-table-on-inserted: SELECT create_distributed_table('insert_of_insert_select_hash', 'id'); +step s1-commit: COMMIT; +step s2-distribute-table-on-inserted: <... completed> create_distributed_table -step s1-commit: COMMIT; step s1-select-count: SELECT COUNT(*) FROM select_of_insert_select_hash; count diff --git a/src/test/regress/expected/multi_follower_dml.out b/src/test/regress/expected/multi_follower_dml.out index 08e84a8b4..fb1ef0098 100644 --- a/src/test/regress/expected/multi_follower_dml.out +++ b/src/test/regress/expected/multi_follower_dml.out @@ -91,8 +91,7 @@ SELECT * FROM the_table ORDER BY a; INSERT INTO local VALUES (1, 1); ERROR: cannot execute INSERT in a read-only transaction INSERT INTO local SELECT a, b FROM the_table; -ERROR: cannot INSERT rows from a distributed query into a local table -HINT: Consider using CREATE TEMPORARY TABLE tmp AS SELECT ... and inserting from the temporary table. 
+ERROR: cannot execute INSERT in a read-only transaction -- we shouldn't be able to create local tables CREATE TEMP TABLE local_copy_of_the_table AS SELECT * FROM the_table; ERROR: cannot execute CREATE TABLE AS in a read-only transaction diff --git a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out index 46c29b801..a4de3c304 100644 --- a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out +++ b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out @@ -1,16 +1,5 @@ --------------------------------------------------------------------- --------------------------------------------------------------------- --- Insert into local table ---------------------------------------------------------------------- ---------------------------------------------------------------------- -CREATE TABLE test_table_1(id int); -INSERT INTO test_table_1 -SELECT user_id FROM users_table; -ERROR: cannot INSERT rows from a distributed query into a local table -HINT: Consider using CREATE TEMPORARY TABLE tmp AS SELECT ... and inserting from the temporary table. 
-DROP TABLE test_table_1; ---------------------------------------------------------------------- ---------------------------------------------------------------------- -- Vanilla funnel query --------------------------------------------------------------------- --------------------------------------------------------------------- diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 858d42da5..3988f0a8b 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -46,7 +46,7 @@ test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_ test: multi_shard_update_delete recursive_dml_with_different_planners_executors test: insert_select_repartition window_functions dml_recursive multi_insert_select_window test: multi_insert_select_conflict create_table_triggers -test: multi_row_insert +test: multi_row_insert insert_select_into_local_table # following should not run in parallel because it relies on connection counts to workers test: insert_select_connection_leak diff --git a/src/test/regress/sql/insert_select_into_local_table.sql b/src/test/regress/sql/insert_select_into_local_table.sql new file mode 100644 index 000000000..21564f1f6 --- /dev/null +++ b/src/test/regress/sql/insert_select_into_local_table.sql @@ -0,0 +1,153 @@ +CREATE SCHEMA insert_select_into_local_table; +SET search_path TO insert_select_into_local_table; + +SET citus.shard_count = 4; +SET citus.next_shard_id TO 11235800; + + +CREATE TABLE dist_table (a INT, b INT, c TEXT); +SELECT create_distributed_table('dist_table', 'a'); + +INSERT INTO dist_table VALUES (1, 6, 'txt1'), (2, 7, 'txt2'), (3, 8, 'txt3'); + +CREATE TABLE non_dist_1 (a INT, b INT, c TEXT); +CREATE TABLE non_dist_2 (a INT, c TEXT); +CREATE TABLE non_dist_3 (a INT); + + +-- test non-router queries +INSERT INTO non_dist_1 SELECT * FROM dist_table; +INSERT INTO non_dist_2 SELECT a, c FROM dist_table; +INSERT INTO non_dist_3 SELECT a FROM dist_table; + +SELECT * 
FROM non_dist_1 ORDER BY 1, 2, 3; +SELECT * FROM non_dist_2 ORDER BY 1, 2; +SELECT * FROM non_dist_3 ORDER BY 1; + +TRUNCATE non_dist_1, non_dist_2, non_dist_3; + + +-- test router queries +INSERT INTO non_dist_1 SELECT * FROM dist_table WHERE a = 1; +INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE a = 1; +INSERT INTO non_dist_3 SELECT a FROM dist_table WHERE a = 1; + +SELECT * FROM non_dist_1 ORDER BY 1, 2, 3; +SELECT * FROM non_dist_2 ORDER BY 1, 2; +SELECT * FROM non_dist_3 ORDER BY 1; + +TRUNCATE non_dist_1, non_dist_2, non_dist_3; + + +-- test columns in different order +INSERT INTO non_dist_1(b, a, c) SELECT a, b, c FROM dist_table; +SELECT * FROM non_dist_1 ORDER BY 1, 2, 3; + +TRUNCATE non_dist_1; + + +-- test EXPLAIN +EXPLAIN (COSTS FALSE) INSERT INTO non_dist_1 SELECT * FROM dist_table; +EXPLAIN (COSTS FALSE) INSERT INTO non_dist_1 SELECT * FROM dist_table WHERE a = 1; + + +-- test RETURNING +INSERT INTO non_dist_1 SELECT * FROM dist_table ORDER BY 1, 2, 3 RETURNING *; +INSERT INTO non_dist_1 SELECT * FROM dist_table WHERE a = 1 ORDER BY 1, 2, 3 RETURNING *; + + +-- test INSERT INTO a table with UNIQUE +CREATE TABLE non_dist_unique (a INT UNIQUE, b INT); +INSERT INTO non_dist_unique SELECT a, b FROM dist_table; +SELECT * FROM non_dist_unique ORDER BY 1; +INSERT INTO non_dist_unique SELECT a+1, b FROM dist_table ON CONFLICT (a) DO NOTHING; +SELECT * FROM non_dist_unique ORDER BY 1; +INSERT INTO non_dist_unique SELECT a+2, b FROM dist_table ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + non_dist_unique.b; +SELECT * FROM non_dist_unique ORDER BY 1; +DROP TABLE non_dist_unique; + + +-- test INSERT INTO a table with DEFAULT +CREATE TABLE non_dist_default (a INT, c TEXT DEFAULT 'def'); +INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a = 1; +SELECT * FROM non_dist_default ORDER BY 1, 2; +INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a > 1; +SELECT * FROM non_dist_default ORDER BY 1, 2; +INSERT INTO non_dist_default SELECT a, c 
FROM dist_table WHERE a = 1; +SELECT * FROM non_dist_default ORDER BY 1, 2; +INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a > 1; +SELECT * FROM non_dist_default ORDER BY 1, 2; +DROP TABLE non_dist_default; + + +-- test CTEs +WITH with_table AS (SELECT a, c FROM dist_table ORDER BY a LIMIT 2) INSERT INTO non_dist_2 SELECT * FROM with_table; +SELECT * FROM non_dist_2 ORDER BY 1, 2; + +INSERT INTO non_dist_2 WITH with_table AS (SELECT a, c FROM dist_table ORDER BY a LIMIT 2) SELECT * FROM with_table; +SELECT * FROM non_dist_2 ORDER BY 1, 2; + +TRUNCATE non_dist_2; + +WITH deleted_rows AS (DELETE FROM dist_table WHERE a < 3 RETURNING a, c) INSERT INTO non_dist_2 SELECT * FROM deleted_rows; +SELECT * FROM dist_table ORDER BY 1, 2, 3; +SELECT * FROM non_dist_2 ORDER BY 1, 2; + +TRUNCATE non_dist_2; +INSERT INTO dist_table VALUES (1, 6, 'txt1'), (2, 7, 'txt2'); + +WITH insert_table AS (INSERT INTO non_dist_2 SELECT a, c FROM dist_table RETURNING *) SELECT * FROM insert_table ORDER BY 1, 2; +SELECT * FROM non_dist_2 ORDER BY 1, 2; + +TRUNCATE non_dist_2; + + +-- test PREPARE +PREPARE insert_select_into_local AS INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE a = 1; +EXECUTE insert_select_into_local; +EXECUTE insert_select_into_local; +EXECUTE insert_select_into_local; +EXECUTE insert_select_into_local; +EXECUTE insert_select_into_local; +SELECT * FROM non_dist_2 ORDER BY 1, 2; +EXECUTE insert_select_into_local; +SELECT * FROM non_dist_2 ORDER BY 1, 2; +TRUNCATE non_dist_2; +DEALLOCATE insert_select_into_local; + +PREPARE insert_select_into_local(int) AS INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE a = $1; +EXECUTE insert_select_into_local(2); +EXECUTE insert_select_into_local(2); +EXECUTE insert_select_into_local(2); +EXECUTE insert_select_into_local(2); +EXECUTE insert_select_into_local(2); +SELECT * FROM non_dist_2 ORDER BY 1, 2; +EXECUTE insert_select_into_local(2); +SELECT * FROM non_dist_2 ORDER BY 1, 2; +TRUNCATE non_dist_2; 
+DEALLOCATE insert_select_into_local; + + +PREPARE insert_select_into_local(int) AS INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE b = $1; +EXECUTE insert_select_into_local(8); +EXECUTE insert_select_into_local(8); +EXECUTE insert_select_into_local(8); +EXECUTE insert_select_into_local(8); +EXECUTE insert_select_into_local(8); +SELECT * FROM non_dist_2 ORDER BY 1, 2; +EXECUTE insert_select_into_local(8); +SELECT * FROM non_dist_2 ORDER BY 1, 2; +TRUNCATE non_dist_2; +DEALLOCATE insert_select_into_local; + + +-- test reference table +CREATE TABLE ref_table (a INT, b INT, c TEXT); +SELECT create_reference_table('ref_table'); +INSERT INTO ref_table VALUES (1, 6, 'txt1'), (2, 7, 'txt2'), (3, 8, 'txt3'); +INSERT INTO non_dist_2 SELECT a, c FROM ref_table; +SELECT * FROM non_dist_2 ORDER BY 1, 2; +TRUNCATE non_dist_2; + +\set VERBOSITY terse +DROP SCHEMA insert_select_into_local_table CASCADE; diff --git a/src/test/regress/sql/multi_insert_select_non_pushable_queries.sql b/src/test/regress/sql/multi_insert_select_non_pushable_queries.sql index 3cc90ca2d..399883ed9 100644 --- a/src/test/regress/sql/multi_insert_select_non_pushable_queries.sql +++ b/src/test/regress/sql/multi_insert_select_non_pushable_queries.sql @@ -1,15 +1,3 @@ ------------------------------------- ------------------------------------- --- Insert into local table ------------------------------------- ------------------------------------- -CREATE TABLE test_table_1(id int); - -INSERT INTO test_table_1 -SELECT user_id FROM users_table; - -DROP TABLE test_table_1; - ------------------------------------ ------------------------------------ -- Vanilla funnel query