Insert Select Into Local Table (#3870)

* Insert select with master query

* Use relid to set custom_scan_tlist varno

* Reviews

* Fixes null check

Co-authored-by: Marco Slot <marco.slot@gmail.com>
pull/3909/head
Halil Ozan Akgül 2020-06-12 17:06:31 +03:00 committed by GitHub
parent 0e12d045b1
commit 8c5eb6b7ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 768 additions and 60 deletions

View File

@ -87,11 +87,6 @@ static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
boundParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,
Query *query, ParamListInfo boundParams,
bool hasUnresolvedParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
relationId);
@ -193,21 +188,6 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
}
else if (needsDistributedPlanning)
{
/*
* Inserting into a local table needs to go through the regular postgres
* planner/executor, but the SELECT needs to go through Citus. We currently
* don't have a way of doing both things and therefore error out, but do
* have a handy tip for users.
*/
if (InsertSelectIntoLocalTable(parse))
{
ereport(ERROR, (errmsg("cannot INSERT rows from a distributed query into a "
"local table"),
errhint("Consider using CREATE TEMPORARY TABLE tmp AS "
"SELECT ... and inserting from the temporary "
"table.")));
}
/*
* standard_planner scribbles on it's input, but for deparsing we need the
* unmodified form. Note that before copying we call
@ -922,7 +902,7 @@ TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
* - If any, go back to step 1 by calling itself recursively
* 3. Logical planner
*/
static DistributedPlan *
DistributedPlan *
CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo
boundParams, bool hasUnresolvedParams,
PlannerRestrictionContext *plannerRestrictionContext)
@ -952,6 +932,22 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
distributedPlan =
CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext);
}
else if (InsertSelectIntoLocalTable(originalQuery))
{
if (hasUnresolvedParams)
{
/*
* Unresolved parameters can cause performance regressions in
* INSERT...SELECT when the partition column is a parameter
* because we don't perform any additional pruning in the executor.
*/
return NULL;
}
distributedPlan =
CreateInsertSelectIntoLocalTablePlan(planId, originalQuery, boundParams,
hasUnresolvedParams,
plannerRestrictionContext);
}
else
{
/* modifications are always routed through the same planner/executor */

View File

@ -13,11 +13,14 @@
#include "distributed/pg_version_constants.h"
#include "catalog/pg_class.h"
#include "catalog/pg_type.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/errormessage.h"
#include "distributed/listutils.h"
#include "distributed/log_utils.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
@ -60,6 +63,8 @@ static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
uint32 taskIdIndex,
bool allRelationsJoinedOnPartitionKey,
DeferredErrorMessage **routerPlannerError);
static Query * CreateMasterQueryForRouterPlan(DistributedPlan *distPlan);
static List * CreateTargetListForMasterQuery(List *targetList);
static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree,
RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte,
@ -304,6 +309,176 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
}
/*
* CreateInsertSelectIntoLocalTablePlan creates the plan for INSERT .. SELECT queries
* where the selected table is distributed and the inserted table is not.
*
* To create the plan, this function first creates a distributed plan for the SELECT
* part. Then puts it as a subquery to the original (non-distributed) INSERT query as
* a subquery. Finally, it puts this INSERT query, which now has a distributed SELECT
* subquery, in the masterQuery.
*
* If the SELECT query is a router query, whose distributed plan does not have a
* masterQuery, this function also creates a dummy masterQuery for that.
*/
DistributedPlan *
CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamListInfo
boundParams, bool hasUnresolvedParams,
PlannerRestrictionContext *plannerRestrictionContext)
{
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery);
Query *selectQuery = BuildSelectForInsertSelect(originalQuery);
originalQuery->cteList = NIL;
DistributedPlan *distPlan = CreateDistributedPlan(planId, selectQuery,
copyObject(selectQuery),
boundParams, hasUnresolvedParams,
plannerRestrictionContext);
/*
* We don't expect distPlan to be NULL here because hasUnresolvedParams is
* already checked before this function and CreateDistributedPlan only returns
* NULL when there are unresolved parameters.
*/
Assert(distPlan != NULL);
if (distPlan->planningError)
{
return distPlan;
}
if (distPlan->masterQuery == NULL)
{
/*
* For router queries, we construct a synthetic master query that simply passes
* on the results of the remote tasks, which we can then use as the select in
* the INSERT .. SELECT.
*/
distPlan->masterQuery = CreateMasterQueryForRouterPlan(
distPlan);
}
/*
* masterQuery of a distributed select is for combining the results from
* worker nodes on the coordinator node. Putting it as a subquery to the
* INSERT query, causes the INSERT query to insert the combined select value
* from the workers. And making the resulting insert query the masterQuery
* let's us execute this insert command.
*
* So this operation makes the master query insert the result of the
* distributed select instead of returning it.
*/
selectRte->subquery = distPlan->masterQuery;
distPlan->masterQuery = originalQuery;
return distPlan;
}
/*
* CreateMasterQueryForRouterPlan is used for creating a dummy masterQuery
* for a router plan, since router plans normally don't have one.
*/
static Query *
CreateMasterQueryForRouterPlan(DistributedPlan *distPlan)
{
const Index insertTableId = 1;
List *tableIdList = list_make1(makeInteger(insertTableId));
Job *dependentJob = distPlan->workerJob;
List *dependentTargetList = dependentJob->jobQuery->targetList;
/* compute column names for the derived table */
uint32 columnCount = (uint32) list_length(dependentTargetList);
List *columnNameList = DerivedColumnNameList(columnCount,
dependentJob->jobId);
List *funcColumnNames = NIL;
List *funcColumnTypes = NIL;
List *funcColumnTypeMods = NIL;
List *funcCollations = NIL;
TargetEntry *targetEntry = NULL;
foreach_ptr(targetEntry, dependentTargetList)
{
Node *expr = (Node *) targetEntry->expr;
char *name = targetEntry->resname;
if (name == NULL)
{
name = pstrdup("unnamed");
}
funcColumnNames = lappend(funcColumnNames, makeString(name));
funcColumnTypes = lappend_oid(funcColumnTypes, exprType(expr));
funcColumnTypeMods = lappend_int(funcColumnTypeMods, exprTypmod(expr));
funcCollations = lappend_oid(funcCollations, exprCollation(expr));
}
RangeTblEntry *rangeTableEntry = DerivedRangeTableEntry(NULL,
columnNameList,
tableIdList,
funcColumnNames,
funcColumnTypes,
funcColumnTypeMods,
funcCollations);
List *targetList = CreateTargetListForMasterQuery(dependentTargetList);
RangeTblRef *rangeTableRef = makeNode(RangeTblRef);
rangeTableRef->rtindex = 1;
FromExpr *joinTree = makeNode(FromExpr);
joinTree->quals = NULL;
joinTree->fromlist = list_make1(rangeTableRef);
Query *masterQuery = makeNode(Query);
masterQuery->commandType = CMD_SELECT;
masterQuery->querySource = QSRC_ORIGINAL;
masterQuery->canSetTag = true;
masterQuery->rtable = list_make1(rangeTableEntry);
masterQuery->targetList = targetList;
masterQuery->jointree = joinTree;
return masterQuery;
}
/*
* CreateTargetListForMasterQuery is used for creating a target list for
* master query.
*/
static List *
CreateTargetListForMasterQuery(List *targetList)
{
List *newTargetEntryList = NIL;
const uint32 masterTableId = 1;
int columnId = 1;
/* iterate over original target entries */
TargetEntry *originalTargetEntry = NULL;
foreach_ptr(originalTargetEntry, targetList)
{
TargetEntry *newTargetEntry = flatCopyTargetEntry(originalTargetEntry);
Var *column = makeVarFromTargetEntry(masterTableId, originalTargetEntry);
column->varattno = columnId;
column->varoattno = columnId;
columnId++;
if (column->vartype == RECORDOID || column->vartype == RECORDARRAYOID)
{
column->vartypmod = BlessRecordExpression(originalTargetEntry->expr);
}
Expr *newExpression = (Expr *) column;
newTargetEntry->expr = newExpression;
newTargetEntryList = lappend(newTargetEntryList, newTargetEntry);
}
return newTargetEntryList;
}
/*
* DistributedInsertSelectSupported returns NULL if the INSERT ... SELECT query
* is supported, or a description why not.

View File

@ -17,6 +17,7 @@
#include "catalog/pg_type.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/insert_select_planner.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_master_planner.h"
@ -191,6 +192,37 @@ CitusCustomScanPathPlan(PlannerInfo *root,
*/
citusPath->remoteScan->scan.plan.targetlist = tlist;
/*
* The custom_scan_tlist contains target entries for to the "output" of the call
* to citus_extradata_container, which is actually replaced by a CustomScan.
* The target entries are initialized with varno 1 (see MasterTargetList), since
* it's currently the only relation in the join tree of the masterQuery.
*
* If the citus_extradata_container function call is not the first relation to
* appear in the flattened rtable for the entire plan, then varno is now pointing
* to the wrong relation and needs to be updated.
*
* Example:
* When the masterQuery field of the DistributedPlan is
* INSERT INTO local SELECT .. FROM citus_extradata_container.
* In that case the varno of citusdata_extradata_container should be 3, because
* it is preceded range table entries for "local" and the subquery.
*/
if (rel->relid != 1)
{
TargetEntry *targetEntry = NULL;
foreach_ptr(targetEntry, citusPath->remoteScan->custom_scan_tlist)
{
/* we created this list, so we know it only contains Var */
Assert(IsA(targetEntry->expr, Var));
Var *var = (Var *) targetEntry->expr;
var->varno = rel->relid;
}
}
/* clauses might have been added by the planner, need to add them to our scan */
RestrictInfo *restrictInfo = NULL;
List **quals = &citusPath->remoteScan->scan.plan.qual;

View File

@ -135,13 +135,6 @@ static RangeTblEntry * JoinRangeTableEntry(JoinExpr *joinExpr, List *dependentJo
static int ExtractRangeTableId(Node *node);
static void ExtractColumns(RangeTblEntry *rangeTableEntry, int rangeTableId,
List *dependentJobList, List **columnNames, List **columnVars);
static RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *columnNames,
List *tableIdList, List *funcColumnNames,
List *funcColumnTypes,
List *funcColumnTypeMods,
List *funcCollations);
static List * DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId);
static Query * BuildSubqueryJobQuery(MultiNode *multiNode);
static void UpdateAllColumnAttributes(Node *columnContainer, List *rangeTableList,
List *dependentJobList);
@ -900,7 +893,7 @@ BaseRangeTableList(MultiNode *multiNode)
* derived table either represents the output of a repartition job; or the data
* on worker nodes in case of the master node query.
*/
static RangeTblEntry *
RangeTblEntry *
DerivedRangeTableEntry(MultiNode *multiNode, List *columnList, List *tableIdList,
List *funcColumnNames, List *funcColumnTypes,
List *funcColumnTypeMods, List *funcCollations)
@ -923,7 +916,7 @@ DerivedRangeTableEntry(MultiNode *multiNode, List *columnList, List *tableIdList
* tables. These column names are then used when building the create stament
* query string for derived tables.
*/
static List *
List *
DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId)
{
List *columnNameList = NIL;

View File

@ -453,7 +453,7 @@ AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval
RangeTblEntry *
ExtractSelectRangeTableEntry(Query *query)
{
Assert(InsertSelectIntoCitusTable(query));
Assert(InsertSelectIntoCitusTable(query) || InsertSelectIntoLocalTable(query));
/*
* Since we already asserted InsertSelectIntoCitusTable() it is safe to access

View File

@ -209,4 +209,12 @@ extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan);
extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
struct DistributedPlan *distributedPlan);
extern struct DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,
Query *query, ParamListInfo
boundParams, bool
hasUnresolvedParams,
PlannerRestrictionContext *
plannerRestrictionContext);
#endif /* DISTRIBUTED_PLANNER_H */

View File

@ -34,6 +34,13 @@ extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ance
extern DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId,
Query *originalQuery,
ParamListInfo
boundParams, bool
hasUnresolvedParams,
PlannerRestrictionContext *
plannerRestrictionContext);
extern char * InsertSelectResultIdPrefix(uint64 planId);

View File

@ -579,4 +579,13 @@ extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId,
extern uint64 UniqueJobId(void);
extern List * DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId);
extern RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *columnList,
List *tableIdList,
List *funcColumnNames,
List *funcColumnTypes,
List *funcColumnTypeMods,
List *funcCollations);
#endif /* MULTI_PHYSICAL_PLANNER_H */

View File

@ -0,0 +1,359 @@
CREATE SCHEMA insert_select_into_local_table;
SET search_path TO insert_select_into_local_table;
SET citus.shard_count = 4;
SET citus.next_shard_id TO 11235800;
CREATE TABLE dist_table (a INT, b INT, c TEXT);
SELECT create_distributed_table('dist_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_table VALUES (1, 6, 'txt1'), (2, 7, 'txt2'), (3, 8, 'txt3');
CREATE TABLE non_dist_1 (a INT, b INT, c TEXT);
CREATE TABLE non_dist_2 (a INT, c TEXT);
CREATE TABLE non_dist_3 (a INT);
-- test non-router queries
INSERT INTO non_dist_1 SELECT * FROM dist_table;
INSERT INTO non_dist_2 SELECT a, c FROM dist_table;
INSERT INTO non_dist_3 SELECT a FROM dist_table;
SELECT * FROM non_dist_1 ORDER BY 1, 2, 3;
a | b | c
---------------------------------------------------------------------
1 | 6 | txt1
2 | 7 | txt2
3 | 8 | txt3
(3 rows)
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | txt1
2 | txt2
3 | txt3
(3 rows)
SELECT * FROM non_dist_3 ORDER BY 1;
a
---------------------------------------------------------------------
1
2
3
(3 rows)
TRUNCATE non_dist_1, non_dist_2, non_dist_3;
-- test router queries
INSERT INTO non_dist_1 SELECT * FROM dist_table WHERE a = 1;
INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE a = 1;
INSERT INTO non_dist_3 SELECT a FROM dist_table WHERE a = 1;
SELECT * FROM non_dist_1 ORDER BY 1, 2, 3;
a | b | c
---------------------------------------------------------------------
1 | 6 | txt1
(1 row)
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | txt1
(1 row)
SELECT * FROM non_dist_3 ORDER BY 1;
a
---------------------------------------------------------------------
1
(1 row)
TRUNCATE non_dist_1, non_dist_2, non_dist_3;
-- test columns in different order
INSERT INTO non_dist_1(b, a, c) SELECT a, b, c FROM dist_table;
SELECT * FROM non_dist_1 ORDER BY 1, 2, 3;
a | b | c
---------------------------------------------------------------------
6 | 1 | txt1
7 | 2 | txt2
8 | 3 | txt3
(3 rows)
TRUNCATE non_dist_1;
-- test EXPLAIN
EXPLAIN (COSTS FALSE) INSERT INTO non_dist_1 SELECT * FROM dist_table;
QUERY PLAN
---------------------------------------------------------------------
Insert on non_dist_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_11235800 dist_table
(7 rows)
EXPLAIN (COSTS FALSE) INSERT INTO non_dist_1 SELECT * FROM dist_table WHERE a = 1;
QUERY PLAN
---------------------------------------------------------------------
Insert on non_dist_1
-> Custom Scan (Citus Adaptive)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_11235800 dist_table
Filter: (a = 1)
(8 rows)
-- test RETURNING
INSERT INTO non_dist_1 SELECT * FROM dist_table ORDER BY 1, 2, 3 RETURNING *;
a | b | c
---------------------------------------------------------------------
1 | 6 | txt1
2 | 7 | txt2
3 | 8 | txt3
(3 rows)
INSERT INTO non_dist_1 SELECT * FROM dist_table WHERE a = 1 ORDER BY 1, 2, 3 RETURNING *;
a | b | c
---------------------------------------------------------------------
1 | 6 | txt1
(1 row)
-- test INSERT INTO a table with UNIQUE
CREATE TABLE non_dist_unique (a INT UNIQUE, b INT);
INSERT INTO non_dist_unique SELECT a, b FROM dist_table;
SELECT * FROM non_dist_unique ORDER BY 1;
a | b
---------------------------------------------------------------------
1 | 6
2 | 7
3 | 8
(3 rows)
INSERT INTO non_dist_unique SELECT a+1, b FROM dist_table ON CONFLICT (a) DO NOTHING;
SELECT * FROM non_dist_unique ORDER BY 1;
a | b
---------------------------------------------------------------------
1 | 6
2 | 7
3 | 8
4 | 8
(4 rows)
INSERT INTO non_dist_unique SELECT a+2, b FROM dist_table ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + non_dist_unique.b;
SELECT * FROM non_dist_unique ORDER BY 1;
a | b
---------------------------------------------------------------------
1 | 6
2 | 7
3 | 14
4 | 15
5 | 8
(5 rows)
DROP TABLE non_dist_unique;
-- test INSERT INTO a table with DEFAULT
CREATE TABLE non_dist_default (a INT, c TEXT DEFAULT 'def');
INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a = 1;
SELECT * FROM non_dist_default ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | def
(1 row)
INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a > 1;
SELECT * FROM non_dist_default ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | def
2 | def
3 | def
(3 rows)
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1;
SELECT * FROM non_dist_default ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | def
1 | txt1
2 | def
3 | def
(4 rows)
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a > 1;
SELECT * FROM non_dist_default ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | def
1 | txt1
2 | def
2 | txt2
3 | def
3 | txt3
(6 rows)
DROP TABLE non_dist_default;
-- test CTEs
WITH with_table AS (SELECT a, c FROM dist_table ORDER BY a LIMIT 2) INSERT INTO non_dist_2 SELECT * FROM with_table;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | txt1
2 | txt2
(2 rows)
INSERT INTO non_dist_2 WITH with_table AS (SELECT a, c FROM dist_table ORDER BY a LIMIT 2) SELECT * FROM with_table;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | txt1
1 | txt1
2 | txt2
2 | txt2
(4 rows)
TRUNCATE non_dist_2;
WITH deleted_rows AS (DELETE FROM dist_table WHERE a < 3 RETURNING a, c) INSERT INTO non_dist_2 SELECT * FROM deleted_rows;
SELECT * FROM dist_table ORDER BY 1, 2, 3;
a | b | c
---------------------------------------------------------------------
3 | 8 | txt3
(1 row)
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | txt1
2 | txt2
(2 rows)
TRUNCATE non_dist_2;
INSERT INTO dist_table VALUES (1, 6, 'txt1'), (2, 7, 'txt2');
WITH insert_table AS (INSERT INTO non_dist_2 SELECT a, c FROM dist_table RETURNING *) SELECT * FROM insert_table ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | txt1
2 | txt2
3 | txt3
(3 rows)
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | txt1
2 | txt2
3 | txt3
(3 rows)
TRUNCATE non_dist_2;
-- test PREPARE
PREPARE insert_select_into_local AS INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE a = 1;
EXECUTE insert_select_into_local;
EXECUTE insert_select_into_local;
EXECUTE insert_select_into_local;
EXECUTE insert_select_into_local;
EXECUTE insert_select_into_local;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | txt1
1 | txt1
1 | txt1
1 | txt1
1 | txt1
(5 rows)
EXECUTE insert_select_into_local;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | txt1
1 | txt1
1 | txt1
1 | txt1
1 | txt1
1 | txt1
(6 rows)
TRUNCATE non_dist_2;
DEALLOCATE insert_select_into_local;
PREPARE insert_select_into_local(int) AS INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE a = $1;
EXECUTE insert_select_into_local(2);
EXECUTE insert_select_into_local(2);
EXECUTE insert_select_into_local(2);
EXECUTE insert_select_into_local(2);
EXECUTE insert_select_into_local(2);
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
2 | txt2
2 | txt2
2 | txt2
2 | txt2
2 | txt2
(5 rows)
EXECUTE insert_select_into_local(2);
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
2 | txt2
2 | txt2
2 | txt2
2 | txt2
2 | txt2
2 | txt2
(6 rows)
TRUNCATE non_dist_2;
DEALLOCATE insert_select_into_local;
PREPARE insert_select_into_local(int) AS INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE b = $1;
EXECUTE insert_select_into_local(8);
EXECUTE insert_select_into_local(8);
EXECUTE insert_select_into_local(8);
EXECUTE insert_select_into_local(8);
EXECUTE insert_select_into_local(8);
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
3 | txt3
3 | txt3
3 | txt3
3 | txt3
3 | txt3
(5 rows)
EXECUTE insert_select_into_local(8);
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
3 | txt3
3 | txt3
3 | txt3
3 | txt3
3 | txt3
3 | txt3
(6 rows)
TRUNCATE non_dist_2;
DEALLOCATE insert_select_into_local;
-- test reference table
CREATE TABLE ref_table (a INT, b INT, c TEXT);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO ref_table VALUES (1, 6, 'txt1'), (2, 7, 'txt2'), (3, 8, 'txt3');
INSERT INTO non_dist_2 SELECT a, c FROM ref_table;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
a | c
---------------------------------------------------------------------
1 | txt1
2 | txt2
3 | txt3
(3 rows)
TRUNCATE non_dist_2;
\set VERBOSITY terse
DROP SCHEMA insert_select_into_local_table CASCADE;
NOTICE: drop cascades to 5 other objects

View File

@ -332,12 +332,12 @@ step s1-initialize:
step s1-begin: BEGIN;
step s1-insert-select: INSERT INTO insert_of_insert_select_hash SELECT * FROM select_of_insert_select_hash ORDER BY 1, 2 LIMIT 5;;
ERROR: cannot INSERT rows from a distributed query into a local table
step s2-distribute-table-on-inserted: SELECT create_distributed_table('insert_of_insert_select_hash', 'id');
step s2-distribute-table-on-inserted: SELECT create_distributed_table('insert_of_insert_select_hash', 'id'); <waiting ...>
step s1-commit: COMMIT;
step s2-distribute-table-on-inserted: <... completed>
create_distributed_table
step s1-commit: COMMIT;
step s1-select-count: SELECT COUNT(*) FROM select_of_insert_select_hash;
count

View File

@ -91,8 +91,7 @@ SELECT * FROM the_table ORDER BY a;
INSERT INTO local VALUES (1, 1);
ERROR: cannot execute INSERT in a read-only transaction
INSERT INTO local SELECT a, b FROM the_table;
ERROR: cannot INSERT rows from a distributed query into a local table
HINT: Consider using CREATE TEMPORARY TABLE tmp AS SELECT ... and inserting from the temporary table.
ERROR: cannot execute INSERT in a read-only transaction
-- we shouldn't be able to create local tables
CREATE TEMP TABLE local_copy_of_the_table AS SELECT * FROM the_table;
ERROR: cannot execute CREATE TABLE AS in a read-only transaction

View File

@ -1,16 +1,5 @@
---------------------------------------------------------------------
---------------------------------------------------------------------
-- Insert into local table
---------------------------------------------------------------------
---------------------------------------------------------------------
CREATE TABLE test_table_1(id int);
INSERT INTO test_table_1
SELECT user_id FROM users_table;
ERROR: cannot INSERT rows from a distributed query into a local table
HINT: Consider using CREATE TEMPORARY TABLE tmp AS SELECT ... and inserting from the temporary table.
DROP TABLE test_table_1;
---------------------------------------------------------------------
---------------------------------------------------------------------
-- Vanilla funnel query
---------------------------------------------------------------------
---------------------------------------------------------------------

View File

@ -46,7 +46,7 @@ test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_
test: multi_shard_update_delete recursive_dml_with_different_planners_executors
test: insert_select_repartition window_functions dml_recursive multi_insert_select_window
test: multi_insert_select_conflict create_table_triggers
test: multi_row_insert
test: multi_row_insert insert_select_into_local_table
# following should not run in parallel because it relies on connection counts to workers
test: insert_select_connection_leak

View File

@ -0,0 +1,153 @@
CREATE SCHEMA insert_select_into_local_table;
SET search_path TO insert_select_into_local_table;
SET citus.shard_count = 4;
SET citus.next_shard_id TO 11235800;
CREATE TABLE dist_table (a INT, b INT, c TEXT);
SELECT create_distributed_table('dist_table', 'a');
INSERT INTO dist_table VALUES (1, 6, 'txt1'), (2, 7, 'txt2'), (3, 8, 'txt3');
CREATE TABLE non_dist_1 (a INT, b INT, c TEXT);
CREATE TABLE non_dist_2 (a INT, c TEXT);
CREATE TABLE non_dist_3 (a INT);
-- test non-router queries
INSERT INTO non_dist_1 SELECT * FROM dist_table;
INSERT INTO non_dist_2 SELECT a, c FROM dist_table;
INSERT INTO non_dist_3 SELECT a FROM dist_table;
SELECT * FROM non_dist_1 ORDER BY 1, 2, 3;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
SELECT * FROM non_dist_3 ORDER BY 1;
TRUNCATE non_dist_1, non_dist_2, non_dist_3;
-- test router queries
INSERT INTO non_dist_1 SELECT * FROM dist_table WHERE a = 1;
INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE a = 1;
INSERT INTO non_dist_3 SELECT a FROM dist_table WHERE a = 1;
SELECT * FROM non_dist_1 ORDER BY 1, 2, 3;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
SELECT * FROM non_dist_3 ORDER BY 1;
TRUNCATE non_dist_1, non_dist_2, non_dist_3;
-- test columns in different order
INSERT INTO non_dist_1(b, a, c) SELECT a, b, c FROM dist_table;
SELECT * FROM non_dist_1 ORDER BY 1, 2, 3;
TRUNCATE non_dist_1;
-- test EXPLAIN
EXPLAIN (COSTS FALSE) INSERT INTO non_dist_1 SELECT * FROM dist_table;
EXPLAIN (COSTS FALSE) INSERT INTO non_dist_1 SELECT * FROM dist_table WHERE a = 1;
-- test RETURNING
INSERT INTO non_dist_1 SELECT * FROM dist_table ORDER BY 1, 2, 3 RETURNING *;
INSERT INTO non_dist_1 SELECT * FROM dist_table WHERE a = 1 ORDER BY 1, 2, 3 RETURNING *;
-- test INSERT INTO a table with UNIQUE
CREATE TABLE non_dist_unique (a INT UNIQUE, b INT);
INSERT INTO non_dist_unique SELECT a, b FROM dist_table;
SELECT * FROM non_dist_unique ORDER BY 1;
INSERT INTO non_dist_unique SELECT a+1, b FROM dist_table ON CONFLICT (a) DO NOTHING;
SELECT * FROM non_dist_unique ORDER BY 1;
INSERT INTO non_dist_unique SELECT a+2, b FROM dist_table ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + non_dist_unique.b;
SELECT * FROM non_dist_unique ORDER BY 1;
DROP TABLE non_dist_unique;
-- test INSERT INTO a table with DEFAULT
CREATE TABLE non_dist_default (a INT, c TEXT DEFAULT 'def');
INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a = 1;
SELECT * FROM non_dist_default ORDER BY 1, 2;
INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a > 1;
SELECT * FROM non_dist_default ORDER BY 1, 2;
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1;
SELECT * FROM non_dist_default ORDER BY 1, 2;
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a > 1;
SELECT * FROM non_dist_default ORDER BY 1, 2;
DROP TABLE non_dist_default;
-- test CTEs
WITH with_table AS (SELECT a, c FROM dist_table ORDER BY a LIMIT 2) INSERT INTO non_dist_2 SELECT * FROM with_table;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
INSERT INTO non_dist_2 WITH with_table AS (SELECT a, c FROM dist_table ORDER BY a LIMIT 2) SELECT * FROM with_table;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
TRUNCATE non_dist_2;
WITH deleted_rows AS (DELETE FROM dist_table WHERE a < 3 RETURNING a, c) INSERT INTO non_dist_2 SELECT * FROM deleted_rows;
SELECT * FROM dist_table ORDER BY 1, 2, 3;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
TRUNCATE non_dist_2;
INSERT INTO dist_table VALUES (1, 6, 'txt1'), (2, 7, 'txt2');
WITH insert_table AS (INSERT INTO non_dist_2 SELECT a, c FROM dist_table RETURNING *) SELECT * FROM insert_table ORDER BY 1, 2;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
TRUNCATE non_dist_2;
-- test PREPARE
PREPARE insert_select_into_local AS INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE a = 1;
EXECUTE insert_select_into_local;
EXECUTE insert_select_into_local;
EXECUTE insert_select_into_local;
EXECUTE insert_select_into_local;
EXECUTE insert_select_into_local;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
EXECUTE insert_select_into_local;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
TRUNCATE non_dist_2;
DEALLOCATE insert_select_into_local;
PREPARE insert_select_into_local(int) AS INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE a = $1;
EXECUTE insert_select_into_local(2);
EXECUTE insert_select_into_local(2);
EXECUTE insert_select_into_local(2);
EXECUTE insert_select_into_local(2);
EXECUTE insert_select_into_local(2);
SELECT * FROM non_dist_2 ORDER BY 1, 2;
EXECUTE insert_select_into_local(2);
SELECT * FROM non_dist_2 ORDER BY 1, 2;
TRUNCATE non_dist_2;
DEALLOCATE insert_select_into_local;
PREPARE insert_select_into_local(int) AS INSERT INTO non_dist_2 SELECT a, c FROM dist_table WHERE b = $1;
EXECUTE insert_select_into_local(8);
EXECUTE insert_select_into_local(8);
EXECUTE insert_select_into_local(8);
EXECUTE insert_select_into_local(8);
EXECUTE insert_select_into_local(8);
SELECT * FROM non_dist_2 ORDER BY 1, 2;
EXECUTE insert_select_into_local(8);
SELECT * FROM non_dist_2 ORDER BY 1, 2;
TRUNCATE non_dist_2;
DEALLOCATE insert_select_into_local;
-- test reference table
CREATE TABLE ref_table (a INT, b INT, c TEXT);
SELECT create_reference_table('ref_table');
INSERT INTO ref_table VALUES (1, 6, 'txt1'), (2, 7, 'txt2'), (3, 8, 'txt3');
INSERT INTO non_dist_2 SELECT a, c FROM ref_table;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
TRUNCATE non_dist_2;
\set VERBOSITY terse
DROP SCHEMA insert_select_into_local_table CASCADE;

View File

@ -1,15 +1,3 @@
------------------------------------
------------------------------------
-- Insert into local table
------------------------------------
------------------------------------
CREATE TABLE test_table_1(id int);
INSERT INTO test_table_1
SELECT user_id FROM users_table;
DROP TABLE test_table_1;
------------------------------------
------------------------------------
-- Vanilla funnel query