Fix issues with insert..select casts and column ordering

pull/6106/head
Marco Slot 2022-07-28 16:24:17 +02:00
parent 4fae4db6e0
commit 232ad4735b
12 changed files with 1487 additions and 220 deletions

View File

@ -55,7 +55,6 @@
bool EnableRepartitionedInsertSelect = true;
static Query * WrapSubquery(Query *subquery);
static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
char *resultIdPrefix);
static void ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
@ -299,100 +298,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
}
/*
* BuildSelectForInsertSelect extracts the SELECT part from an INSERT...SELECT query.
* If the INSERT...SELECT has CTEs then these are added to the resulting SELECT instead.
*/
Query *
BuildSelectForInsertSelect(Query *insertSelectQuery)
{
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
Query *selectQuery = selectRte->subquery;
/*
* Wrap the SELECT as a subquery if the INSERT...SELECT has CTEs or the SELECT
* has top-level set operations.
*
* We could simply wrap all queries, but that might create a subquery that is
* not supported by the logical planner. Since the logical planner also does
* not support CTEs and top-level set operations, we can wrap queries containing
* those without breaking anything.
*/
if (list_length(insertSelectQuery->cteList) > 0)
{
selectQuery = WrapSubquery(selectRte->subquery);
/* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
selectQuery->cteList = copyObject(insertSelectQuery->cteList);
selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE;
}
else if (selectQuery->setOperations != NULL)
{
/* top-level set operations confuse the ReorderInsertSelectTargetLists logic */
selectQuery = WrapSubquery(selectRte->subquery);
}
return selectQuery;
}
/*
* WrapSubquery wraps the given query as a subquery in a newly constructed
* "SELECT * FROM (...subquery...) citus_insert_select_subquery" query.
*/
static Query *
WrapSubquery(Query *subquery)
{
ParseState *pstate = make_parsestate(NULL);
List *newTargetList = NIL;
Query *outerQuery = makeNode(Query);
outerQuery->commandType = CMD_SELECT;
/* create range table entries */
Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL);
RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem(
addRangeTableEntryForSubquery(
pstate, subquery,
selectAlias, false, true));
outerQuery->rtable = list_make1(newRangeTableEntry);
/* set the FROM expression to the subquery */
RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
newRangeTableRef->rtindex = 1;
outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
/* create a target list that matches the SELECT */
TargetEntry *selectTargetEntry = NULL;
foreach_ptr(selectTargetEntry, subquery->targetList)
{
/* exactly 1 entry in FROM */
int indexInRangeTable = 1;
if (selectTargetEntry->resjunk)
{
continue;
}
Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno,
exprType((Node *) selectTargetEntry->expr),
exprTypmod((Node *) selectTargetEntry->expr),
exprCollation((Node *) selectTargetEntry->expr), 0);
TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar,
selectTargetEntry->resno,
selectTargetEntry->resname,
selectTargetEntry->resjunk);
newTargetList = lappend(newTargetList, newSelectTargetEntry);
}
outerQuery->targetList = newTargetList;
return outerQuery;
}
/*
* TwoPhaseInsertSelectTaskList generates a list of tasks for a query that
* inserts into a target relation and selects from a set of co-located

View File

@ -48,8 +48,10 @@
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include <nodes/print.h>
static void PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery);
static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId,
Query *originalQuery,
PlannerRestrictionContext *
@ -83,6 +85,7 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse,
ParamListInfo boundParams);
static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery);
static Query * WrapSubquery(Query *subquery);
static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList);
static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
Oid targetRelationId);
@ -370,14 +373,17 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
* combineQuery, this function also creates a dummy combineQuery for that.
*/
DistributedPlan *
CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamListInfo
boundParams, bool hasUnresolvedParams,
CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
ParamListInfo boundParams, bool hasUnresolvedParams,
PlannerRestrictionContext *plannerRestrictionContext)
{
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery);
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
PrepareInsertSelectForCitusPlanner(insertSelectQuery);
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
Query *selectQuery = selectRte->subquery;
Query *selectQuery = BuildSelectForInsertSelect(originalQuery);
originalQuery->cteList = NIL;
DistributedPlan *distPlan = CreateDistributedPlan(planId, selectQuery,
copyObject(selectQuery),
boundParams, hasUnresolvedParams,
@ -417,12 +423,84 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamL
* distributed select instead of returning it.
*/
selectRte->subquery = distPlan->combineQuery;
distPlan->combineQuery = originalQuery;
distPlan->combineQuery = insertSelectQuery;
return distPlan;
}
/*
* PrepareInsertSelectForCitusPlanner prepares an INSERT..SELECT query tree
* that was passed to the planner for use by Citus.
*
* First, it rebuilds the target lists of the INSERT and the SELECT
* to be in the same order, which is not guaranteed in the parse tree.
*
* Second, some of the constants in the target list will have type
* "unknown", which would confuse the Citus planner. To address that,
* we add casts to SELECT target list entries whose type does not correspond
* to the destination. This also helps us feed the output directly into
* a COPY stream for INSERT..SELECT via coordinator.
*
* In case of UNION or other set operations, the SELECT does not have a
* clearly defined target list, so we first wrap the UNION in a subquery.
* UNION queries do not have the "unknown" type problem.
*
* Finally, if the INSERT has CTEs, we move those CTEs into the SELECT,
* such that we can plan the SELECT as an independent query. To ensure
* the ctelevelsup for CTE RTE's remain the same, we wrap the SELECT into
* a subquery, unless we already did so in case of a UNION.
*/
static void
PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery)
{
RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
Oid targetRelationId = insertRte->relid;
bool isWrapped = false;
if (selectRte->subquery->setOperations != NULL)
{
/*
* Prepare UNION query for reordering and adding casts by
* wrapping it in a subquery to have a single target list.
*/
selectRte->subquery = WrapSubquery(selectRte->subquery);
isWrapped = true;
}
/* this is required for correct deparsing of the query */
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
/*
* Cast types of insert target list and select projection list to
* match the column types of the target relation.
*/
selectRte->subquery->targetList =
AddInsertSelectCasts(insertSelectQuery->targetList,
copyObject(selectRte->subquery->targetList),
targetRelationId);
if (list_length(insertSelectQuery->cteList) > 0)
{
if (!isWrapped)
{
/*
* By wrapping the SELECT in a subquery, we can avoid adjusting
* ctelevelsup in RTE's that point to the CTEs.
*/
selectRte->subquery = WrapSubquery(selectRte->subquery);
}
/* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
selectRte->subquery->cteList = copyObject(insertSelectQuery->cteList);
selectRte->subquery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE;
insertSelectQuery->cteList = NIL;
}
}
/*
* CreateCombineQueryForRouterPlan is used for creating a dummy combineQuery
* for a router plan, since router plans normally don't have one.
@ -881,12 +959,11 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
ListCell *insertTargetEntryCell;
List *newSubqueryTargetlist = NIL;
List *newInsertTargetlist = NIL;
List *columnNameList = NIL;
int resno = 1;
Index insertTableId = 1;
Index selectTableId = 2;
int targetEntryIndex = 0;
AssertArg(InsertSelectIntoCitusTable(originalQuery));
Query *subquery = subqueryRte->subquery;
Oid insertRelationId = insertRte->relid;
@ -954,6 +1031,9 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
newSubqueryTargetEntry);
}
Value *columnName = makeString(newSubqueryTargetEntry->resname);
columnNameList = lappend(columnNameList, columnName);
/*
* The newly created select target entry cannot be a junk entry since junk
* entries are not in the final target list and we're processing the
@ -961,7 +1041,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
*/
Assert(!newSubqueryTargetEntry->resjunk);
Var *newInsertVar = makeVar(insertTableId, originalAttrNo,
Var *newInsertVar = makeVar(selectTableId, resno,
exprType((Node *) newSubqueryTargetEntry->expr),
exprTypmod((Node *) newSubqueryTargetEntry->expr),
exprCollation((Node *) newSubqueryTargetEntry->expr),
@ -1005,6 +1085,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
originalQuery->targetList = newInsertTargetlist;
subquery->targetList = newSubqueryTargetlist;
subqueryRte->eref->colnames = columnNameList;
return NULL;
}
@ -1412,19 +1493,10 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
return distributedPlan;
}
Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery);
PrepareInsertSelectForCitusPlanner(insertSelectQuery);
selectRte->subquery = selectQuery;
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
/*
* Cast types of insert target list and select projection list to
* match the column types of the target relation.
*/
selectQuery->targetList =
AddInsertSelectCasts(insertSelectQuery->targetList,
selectQuery->targetList,
targetRelationId);
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
Query *selectQuery = selectRte->subquery;
/*
* Later we might need to call WrapTaskListForProjection(), which requires
@ -1506,6 +1578,63 @@ InsertSelectResultIdPrefix(uint64 planId)
}
/*
* WrapSubquery wraps the given query as a subquery in a newly constructed
* "SELECT * FROM (...subquery...) citus_insert_select_subquery" query.
*/
static Query *
WrapSubquery(Query *subquery)
{
ParseState *pstate = make_parsestate(NULL);
List *newTargetList = NIL;
Query *outerQuery = makeNode(Query);
outerQuery->commandType = CMD_SELECT;
/* create range table entries */
Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL);
RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem(
addRangeTableEntryForSubquery(
pstate, subquery,
selectAlias, false, true));
outerQuery->rtable = list_make1(newRangeTableEntry);
/* set the FROM expression to the subquery */
RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
newRangeTableRef->rtindex = 1;
outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
/* create a target list that matches the SELECT */
TargetEntry *selectTargetEntry = NULL;
foreach_ptr(selectTargetEntry, subquery->targetList)
{
/* exactly 1 entry in FROM */
int indexInRangeTable = 1;
if (selectTargetEntry->resjunk)
{
continue;
}
Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno,
exprType((Node *) selectTargetEntry->expr),
exprTypmod((Node *) selectTargetEntry->expr),
exprCollation((Node *) selectTargetEntry->expr), 0);
TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar,
selectTargetEntry->resno,
selectTargetEntry->resname,
selectTargetEntry->resjunk);
newTargetList = lappend(newTargetList, newSelectTargetEntry);
}
outerQuery->targetList = newTargetList;
return outerQuery;
}
/*
* RelabelTargetEntryList relabels select target list to have matching names with
* insert target list.
@ -1557,16 +1686,22 @@ AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
{
TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell);
TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell);
Var *insertColumn = (Var *) insertEntry->expr;
Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor,
insertEntry->resno - 1);
Oid sourceType = insertColumn->vartype;
Oid sourceType = exprType((Node *) selectEntry->expr);
Oid targetType = attr->atttypid;
if (sourceType != targetType)
{
insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType,
attr->attcollation, attr->atttypmod);
/* ReorderInsertSelectTargetLists ensures we only have Vars */
Assert(IsA(insertEntry->expr, Var));
/* we will cast the SELECT expression, so the type changes */
Var *insertVar = (Var *) insertEntry->expr;
insertVar->vartype = targetType;
insertVar->vartypmod = attr->atttypmod;
insertVar->varcollid = attr->attcollation;
/*
* We cannot modify the selectEntry in-place, because ORDER BY or

View File

@ -3558,19 +3558,9 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query)
NULL, NULL);
}
if (contain_nextval_expression_walker((Node *) query->targetList, NULL))
{
/*
* We let queries with nextval in the target list fall through to
* the logical planner, which knows how to handle those queries.
*/
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Sequences cannot be used in router queries",
NULL, NULL);
}
bool hasPostgresOrCitusLocalTable = false;
bool hasDistributedTable = false;
bool hasReferenceTable = false;
ExtractRangeTableRelationWalker((Node *) query, &rangeTableRelationList);
foreach(rangeTableRelationCell, rangeTableRelationList)
@ -3586,6 +3576,11 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query)
hasPostgresOrCitusLocalTable = true;
continue;
}
else if (IsCitusTableType(distributedTableId, REFERENCE_TABLE))
{
hasReferenceTable = true;
continue;
}
else if (IsCitusTableType(distributedTableId, CITUS_LOCAL_TABLE))
{
hasPostgresOrCitusLocalTable = true;
@ -3628,6 +3623,28 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query)
}
}
/*
* We want to make sure nextval happens on the coordinator / the current
* node, since the user may have certain expectations around the values
* produced by the sequence. We therefore cannot push down the nextval
* call as part of a router query.
*
* We let queries with nextval in the target list fall through to
* the logical planner, which will ensure that the nextval is called
* in the combine query on the coordinator.
*
* If there are no distributed or reference tables in the query,
* then the query will anyway happen on the coordinator, so we can
* allow nextval.
*/
if (contain_nextval_expression_walker((Node *) query->targetList, NULL) &&
(hasDistributedTable || hasReferenceTable))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Sequences cannot be used in router queries",
NULL, NULL);
}
/* local tables are not allowed if there are distributed tables */
if (hasPostgresOrCitusLocalTable && hasDistributedTable)
{

View File

@ -19,7 +19,6 @@
extern bool EnableRepartitionedInsertSelect;
extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node);
extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery);
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
extern bool IsRedistributablePlan(Plan *selectPlan);

View File

@ -847,8 +847,8 @@ HAVING (max(table_2.value) >= (SELECT value FROM a));
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
DEBUG: push down of limit count: 1
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
@ -856,9 +856,9 @@ DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
DEBUG: Subplan XXX_2 will be written to local file
NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
NOTICE: executing the command locally: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
NOTICE: executing the command locally: SELECT count(*) AS auto_coerced_by_citus_0, (worker_column_1)::text AS auto_coerced_by_citus_1, worker_column_1 AS discarded_target_item_1, max(worker_column_2) AS worker_column_4 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
NOTICE: executing the command locally: SELECT count(*) AS auto_coerced_by_citus_0, (worker_column_1)::text AS auto_coerced_by_citus_1, worker_column_1 AS discarded_target_item_1, max(worker_column_2) AS worker_column_4 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
NOTICE: executing the command locally: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
NOTICE: executing the copy locally for shard xxxxx
WITH stats AS (
SELECT count(key) m FROM table_1

View File

@ -149,6 +149,67 @@ SELECT * FROM non_dist_unique ORDER BY 1;
5 | 8
(5 rows)
INSERT INTO non_dist_unique
SELECT a+1, b FROM dist_table
UNION ALL
SELECT a+100, b FROM dist_table
ON CONFLICT (a) DO NOTHING;
SELECT * FROM non_dist_unique ORDER BY 1;
a | b
---------------------------------------------------------------------
1 | 6
2 | 7
3 | 14
4 | 15
5 | 8
101 | 6
102 | 7
103 | 8
(8 rows)
INSERT INTO non_dist_unique
SELECT a+1, b FROM dist_table
UNION ALL
SELECT a+100, b FROM dist_table
ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + 1;
SELECT * FROM non_dist_unique ORDER BY 1;
a | b
---------------------------------------------------------------------
1 | 6
2 | 7
3 | 8
4 | 9
5 | 8
101 | 7
102 | 8
103 | 9
(8 rows)
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
INSERT INTO non_dist_unique
WITH cte2 AS (SELECT s FROM generate_series(1,10) s)
SELECT a+1, b FROM dist_table WHERE b IN (SELECT s FROM cte1)
UNION ALL
SELECT s, s FROM cte1
ON CONFLICT (a) DO NOTHING;
SELECT * FROM non_dist_unique ORDER BY 1;
a | b
---------------------------------------------------------------------
1 | 6
2 | 7
3 | 8
4 | 9
5 | 8
6 | 6
7 | 7
8 | 8
9 | 9
10 | 10
101 | 7
102 | 8
103 | 9
(13 rows)
DROP TABLE non_dist_unique;
-- test INSERT INTO a table with DEFAULT
CREATE TABLE non_dist_default (a INT, c TEXT DEFAULT 'def');
@ -168,6 +229,16 @@ SELECT * FROM non_dist_default ORDER BY 1, 2;
3 | def
(3 rows)
SELECT alter_table_set_access_method('non_dist_default', 'columnar');
NOTICE: creating a new table for insert_select_into_local_table.non_dist_default
NOTICE: moving the data of insert_select_into_local_table.non_dist_default
NOTICE: dropping the old insert_select_into_local_table.non_dist_default
NOTICE: renaming the new table to insert_select_into_local_table.non_dist_default
alter_table_set_access_method
---------------------------------------------------------------------
(1 row)
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1;
SELECT * FROM non_dist_default ORDER BY 1, 2;
a | c
@ -354,6 +425,691 @@ SELECT * FROM non_dist_2 ORDER BY 1, 2;
(3 rows)
TRUNCATE non_dist_2;
-- check issue https://github.com/citusdata/citus/issues/5858
CREATE TABLE local_dest_table(
col_1 integer,
col_2 integer,
col_3 text,
col_4 text,
drop_col text,
col_5 bigint,
col_6 text,
col_7 text default 'col_7',
col_8 varchar
);
ALTER TABLE local_dest_table DROP COLUMN drop_col;
CREATE TABLE dist_source_table_1(
int_col integer,
drop_col text,
text_col_1 text,
dist_col integer,
text_col_2 text
);
SELECT create_distributed_table('dist_source_table_1', 'dist_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE dist_source_table_1 DROP COLUMN drop_col;
INSERT INTO dist_source_table_1 VALUES (1, 'value', 1, 'value');
INSERT INTO dist_source_table_1 VALUES (2, 'value2', 1, 'value');
INSERT INTO dist_source_table_1 VALUES (3, 'value', 3, 'value3');
CREATE TABLE dist_source_table_2(
dist_col integer,
int_col integer
);
SELECT create_distributed_table('dist_source_table_2', 'dist_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_source_table_2 VALUES (1, 1);
INSERT INTO dist_source_table_2 VALUES (2, 2);
INSERT INTO dist_source_table_2 VALUES (4, 4);
CREATE TABLE local_source_table_1 AS SELECT * FROM dist_source_table_1;
CREATE TABLE local_source_table_2 AS SELECT * FROM dist_source_table_2;
/*
* query_results_equal compares the effect of two queries on local_dest_table.
* We use this to ensure that INSERT INTO local_dest_table SELECT behaves
* the same when selecting from a regular table (postgres handles it) and
* a distributed table (Citus handles it).
*
* The queries are generated by calling format() on query_table twice,
* once for each source_table argument.
*/
CREATE OR REPLACE FUNCTION query_results_equal(query_template text, source_table_1 text, source_table_2 text)
RETURNS bool
AS $$
DECLARE
l1 local_dest_table[];
l2 local_dest_table[];
BEGIN
/* get the results using source_table_1 as source */
TRUNCATE local_dest_table;
EXECUTE format(query_template, source_table_1);
SELECT array_agg(l) INTO l1
FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l;
/* get the results using source_table_2 as source */
TRUNCATE local_dest_table;
EXECUTE format(query_template, source_table_2);
SELECT array_agg(l) INTO l2
FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l;
RAISE NOTICE 'l2=%', l1;
RAISE NOTICE 'l2=%', l2;
RETURN l1 = l2;
END;
$$ LANGUAGE plpgsql;
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table
SELECT
t1.dist_col,
1,
'string1',
'string2',
2,
'string3',
t1.text_col_1,
t1.text_col_2
FROM %1$s_1 t1
WHERE t1.int_col IN (SELECT int_col FROM %1$s_2)
$$, 'local_source_table', 'dist_source_table');
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table
SELECT
t1.dist_col,
1,
'string1',
'string2',
2,
'string3',
t1.text_col_1,
t1.text_col_2
FROM %1$s t1
returning *
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_3, col_4) SELECT
'string1',
'string2'::text
FROM %1$s t1
returning *;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_7, col_4) SELECT
'string1',
'string2'::text
FROM %1$s t1
returning *;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_4, col_3) SELECT
'string1',
'string2'::text
FROM %1$s t1
WHERE dist_col = 1
returning *;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_4, col_1)
SELECT
'string1',
dist_col
FROM %1$s
UNION ALL
SELECT
'string',
int_col
FROM %1$s;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
INSERT INTO local_dest_table (col_4, col_1)
SELECT
'string1',
dist_col
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
UNION ALL
SELECT
'string',
int_col
FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1)
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s)
INSERT INTO local_dest_table (col_4, col_1)
SELECT
'string1',
dist_col
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
UNION ALL
SELECT
*
FROM cte1
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_3)
SELECT t1.text_col_1
FROM %1$s t1
GROUP BY t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8)
SELECT
max(t1.dist_col),
3,
'string_3',
4,
44,
t1.text_col_1,
'string_1000'
FROM %1$s t1
GROUP BY t1.text_col_2, t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(1,3,string_3,,4,44,value,string_1000)","(1,3,string_3,,4,44,value2,string_1000)","(3,3,string_3,,4,44,value,string_1000)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(1,3,string_3,,4,44,value,string_1000)","(1,3,string_3,,4,44,value2,string_1000)","(3,3,string_3,,4,44,value,string_1000)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_7, col_8)
SELECT
t1.text_col_1,
'string_1000'
FROM dist_source_table_1 t1
GROUP BY t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_6, col_7, col_8)
SELECT
'string_4',
t1.text_col_1,
'string_1000'
FROM %1$s t1
GROUP BY t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_5, col_3)
SELECT 12, 'string_11' FROM %1$s t1
UNION
SELECT int_col, 'string' FROM %1$s;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table(col_3, col_2)
SELECT text_col_1, count(*) FROM %1$s GROUP BY 1
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,1,value2,,,,col_7,)","(,2,value,,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,1,value2,,,,col_7,)","(,2,value,,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table(col_3, col_5)
SELECT text_col_1, count(*)::int FROM %1$s GROUP BY 1
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,value,,2,,col_7,)","(,,value2,,1,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,value,,2,,col_7,)","(,,value2,,1,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
-- repeat above tests with Citus local table
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table
SELECT
t1.dist_col,
1,
'string1',
'string2',
2,
'string3',
t1.text_col_1,
t1.text_col_2
FROM %1$s_1 t1
WHERE t1.int_col IN (SELECT int_col FROM %1$s_2)
$$, 'local_source_table', 'dist_source_table');
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table
SELECT
t1.dist_col,
1,
'string1',
'string2',
2,
'string3',
t1.text_col_1,
t1.text_col_2
FROM %1$s t1
returning *
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_3, col_4) SELECT
'string1',
'string2'::text
FROM %1$s t1
returning *;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_7, col_4) SELECT
'string1',
'string2'::text
FROM %1$s t1
returning *;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_4, col_3) SELECT
'string1',
'string2'::text
FROM %1$s t1
WHERE dist_col = 1
returning *;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_4, col_1)
SELECT
'string1',
dist_col
FROM %1$s
UNION ALL
SELECT
'string',
int_col
FROM %1$s;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
INSERT INTO local_dest_table (col_4, col_1)
SELECT
'string1',
dist_col
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
UNION ALL
SELECT
'string',
int_col
FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1)
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s)
INSERT INTO local_dest_table (col_4, col_1)
SELECT
'string1',
dist_col
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
UNION ALL
SELECT
*
FROM cte1
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_3)
SELECT t1.text_col_1
FROM %1$s t1
GROUP BY t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8)
SELECT
max(t1.dist_col),
3,
'string_3',
4,
44,
t1.text_col_1,
'string_1000'
FROM %1$s t1
GROUP BY t1.text_col_2, t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(1,3,string_3,,4,44,value,string_1000)","(1,3,string_3,,4,44,value2,string_1000)","(3,3,string_3,,4,44,value,string_1000)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(1,3,string_3,,4,44,value,string_1000)","(1,3,string_3,,4,44,value2,string_1000)","(3,3,string_3,,4,44,value,string_1000)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_7, col_8)
SELECT
t1.text_col_1,
'string_1000'
FROM dist_source_table_1 t1
GROUP BY t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_6, col_7, col_8)
SELECT
'string_4',
t1.text_col_1,
'string_1000'
FROM %1$s t1
GROUP BY t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_5, col_3)
SELECT 12, 'string_11' FROM %1$s t1
UNION
SELECT int_col, 'string' FROM %1$s;
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table(col_3, col_2)
SELECT text_col_1, count(*) FROM %1$s GROUP BY 1
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,1,value2,,,,col_7,)","(,2,value,,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,1,value2,,,,col_7,)","(,2,value,,,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table(col_3, col_5)
SELECT text_col_1, count(*)::int FROM %1$s GROUP BY 1
$$, 'local_source_table_1', 'dist_source_table_1');
NOTICE: l2={"(,,value,,2,,col_7,)","(,,value2,,1,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
NOTICE: l2={"(,,value,,2,,col_7,)","(,,value2,,1,,col_7,)"}
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
query_results_equal
---------------------------------------------------------------------
t
(1 row)
-- go back to proper local table for remaining tests
TRUNCATE local_dest_table;
SELECT undistribute_table('local_source_table_1');
ERROR: cannot undistribute table because the table is not distributed
-- use a sequence (cannot use query_results_equal, since sequence values would not match)
CREATE SEQUENCE seq;
BEGIN;
INSERT INTO local_dest_table (col_5, col_3)
SELECT 12, 'string_11' FROM dist_source_table_1
UNION
SELECT nextval('seq'), 'string' FROM dist_source_table_1;
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8
---------------------------------------------------------------------
| | string | | 1 | | col_7 |
| | string | | 2 | | col_7 |
| | string | | 3 | | col_7 |
| | string_11 | | 12 | | col_7 |
(4 rows)
ROLLBACK;
-- add a bigserial column
ALTER TABLE local_dest_table ADD COLUMN col_9 bigserial;
-- not supported due to limitations in nextval handling
INSERT INTO local_dest_table (col_5, col_3)
SELECT 12, 'string_11' FROM dist_source_table_1
UNION
SELECT 11, 'string' FROM dist_source_table_1;
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 | col_9
---------------------------------------------------------------------
| | string | | 11 | | col_7 | | 2
| | string_11 | | 12 | | col_7 | | 1
(2 rows)
BEGIN;
INSERT INTO local_dest_table(col_3, col_2)
SELECT text_col_1, count(*) FROM dist_source_table_1 GROUP BY 1;
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 | col_9
---------------------------------------------------------------------
| 1 | value2 | | | | col_7 | | 3
| 2 | value | | | | col_7 | | 4
| | string | | 11 | | col_7 | | 2
| | string_11 | | 12 | | col_7 | | 1
(4 rows)
ROLLBACK;
BEGIN;
INSERT INTO local_dest_table (col_4, col_3) SELECT
'string1',
'string2'::text
FROM dist_source_table_1 t1
WHERE dist_col = 1
RETURNING *;
col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 | col_9
---------------------------------------------------------------------
| | string2 | string1 | | | col_7 | | 5
| | string2 | string1 | | | col_7 | | 6
(2 rows)
ROLLBACK;
\set VERBOSITY terse
DROP SCHEMA insert_select_into_local_table CASCADE;
NOTICE: drop cascades to 5 other objects
NOTICE: drop cascades to 12 other objects

View File

@ -500,7 +500,7 @@ INSERT INTO target_table
SELECT mapped_key, c FROM t NATURAL JOIN source_table;
DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries
DEBUG: generating subplan XXX_1 for CTE t: SELECT mapped_key, a, c FROM insert_select_repartition.source_table WHERE ((a)::double precision OPERATOR(pg_catalog.>) floor(random()))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, (c)::integer[] AS b FROM (SELECT t.mapped_key, t.c FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, auto_coerced_by_citus_1 AS b FROM (SELECT t.mapped_key, (t.c)::integer[] AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery
DEBUG: performing repartitioned INSERT ... SELECT
RESET client_min_messages;
SELECT * FROM target_table ORDER BY a;

View File

@ -1347,21 +1347,20 @@ WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5)
SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1);
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> Subquery Scan on citus_insert_select_subquery
-> Result
One-Time Filter: $3
CTE cte1
-> Function Scan on generate_series s
-> Result
One-Time Filter: $3
CTE cte1
-> Limit
InitPlan 2 (returns $1)
-> CTE Scan on cte1 cte1_1
-> Result
One-Time Filter: $1
-> CTE Scan on cte1 cte1_2
InitPlan 4 (returns $3)
-> CTE Scan on cte1 cte1_3
-> CTE Scan on cte1
CTE cte1
-> Limit
InitPlan 2 (returns $1)
-> CTE Scan on cte1 cte1_1
-> Result
One-Time Filter: $1
-> CTE Scan on cte1 cte1_2
InitPlan 4 (returns $3)
-> CTE Scan on cte1 cte1_3
-> CTE Scan on cte1
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part
( SELECT s FROM generate_series(1,5) s) UNION

View File

@ -667,7 +667,7 @@ DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_1 for CTE fist_table_agg: SELECT (max(value_1) OPERATOR(pg_catalog.+) 1) AS v1_agg, user_id FROM public.raw_events_first GROUP BY user_id
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, v1_agg AS value_1_agg FROM (SELECT fist_table_agg.v1_agg, fist_table_agg.user_id FROM (SELECT intermediate_result.v1_agg, intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v1_agg integer, user_id integer)) fist_table_agg) citus_insert_select_subquery
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, v1_agg AS value_1_agg FROM (SELECT fist_table_agg.user_id, fist_table_agg.v1_agg FROM (SELECT intermediate_result.v1_agg, intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v1_agg integer, user_id integer)) fist_table_agg) citus_insert_select_subquery
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
ROLLBACK;
@ -2712,7 +2712,6 @@ WITH top10 AS (
)
INSERT INTO dist_table_with_sequence (value_1)
SELECT * FROM top10;
ERROR: cannot handle complex subqueries when the router executor is disabled
SELECT * FROM dist_table_with_sequence ORDER BY user_id, value_1;
user_id | value_1
---------------------------------------------------------------------
@ -2797,7 +2796,6 @@ WITH top10 AS (
)
INSERT INTO dist_table_with_user_sequence (value_1)
SELECT * FROM top10;
ERROR: cannot handle complex subqueries when the router executor is disabled
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
user_id | value_1
---------------------------------------------------------------------
@ -3234,6 +3232,47 @@ INSERT INTO raw_events_first
SELECT * FROM raw_events_first OFFSET 0
ON CONFLICT DO NOTHING;
ABORT;
-- test fix for issue https://github.com/citusdata/citus/issues/5891
CREATE TABLE dist_table_1(
dist_col integer,
int_col integer,
text_col_1 text,
text_col_2 text
);
SELECT create_distributed_table('dist_table_1', 'dist_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_table_1 VALUES (1, 1, 'string', 'string');
CREATE TABLE dist_table_2(
dist_col integer,
int_col integer
);
SELECT create_distributed_table('dist_table_2', 'dist_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_table_2 VALUES (1, 1);
with a as (select random()) INSERT INTO dist_table_1
SELECT
t1.dist_col,
1,
'string',
'string'
FROM a, dist_table_1 t1
join dist_table_2 t2 using (dist_col)
limit 1
returning text_col_1;
text_col_1
---------------------------------------------------------------------
string
(1 row)
DROP TABLE dist_table_1, dist_table_2;
-- wrap in a transaction to improve performance
BEGIN;
DROP TABLE coerce_events;

View File

@ -161,70 +161,6 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
0
(1 row)
<<<<<<< HEAD
=======
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
DEBUG: push down of limit count: 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
count | key
---------------------------------------------------------------------
1 | 1
(1 row)
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
INSERT INTO table_1_rep SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
DEBUG: push down of limit count: 1
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
WITH stats AS (
SELECT count(key) m FROM table_1_rep
),
inserts AS (
INSERT INTO table_2_rep
SELECT key, count(*)
FROM table_1_rep
WHERE key >= (SELECT m FROM stats)
GROUP BY key
HAVING count(*) <= (SELECT m FROM stats)
LIMIT 1
RETURNING *
) SELECT count(*) FROM inserts;
DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1_rep
DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2_rep (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1_rep WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2_rep.key, table_2_rep.value
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: push down of limit count: 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Collecting INSERT ... SELECT results on coordinator
count
---------------------------------------------------------------------
0
(1 row)
>>>>>>> cff013a05... Fix issues with insert..select casts and column ordering
\c - - - :master_port
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
?column?

View File

@ -64,6 +64,30 @@ INSERT INTO non_dist_unique SELECT a+1, b FROM dist_table ON CONFLICT (a) DO NOT
SELECT * FROM non_dist_unique ORDER BY 1;
INSERT INTO non_dist_unique SELECT a+2, b FROM dist_table ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + non_dist_unique.b;
SELECT * FROM non_dist_unique ORDER BY 1;
INSERT INTO non_dist_unique
SELECT a+1, b FROM dist_table
UNION ALL
SELECT a+100, b FROM dist_table
ON CONFLICT (a) DO NOTHING;
SELECT * FROM non_dist_unique ORDER BY 1;
INSERT INTO non_dist_unique
SELECT a+1, b FROM dist_table
UNION ALL
SELECT a+100, b FROM dist_table
ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + 1;
SELECT * FROM non_dist_unique ORDER BY 1;
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
INSERT INTO non_dist_unique
WITH cte2 AS (SELECT s FROM generate_series(1,10) s)
SELECT a+1, b FROM dist_table WHERE b IN (SELECT s FROM cte1)
UNION ALL
SELECT s, s FROM cte1
ON CONFLICT (a) DO NOTHING;
SELECT * FROM non_dist_unique ORDER BY 1;
DROP TABLE non_dist_unique;
@ -73,6 +97,7 @@ INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a = 1;
SELECT * FROM non_dist_default ORDER BY 1, 2;
INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a > 1;
SELECT * FROM non_dist_default ORDER BY 1, 2;
SELECT alter_table_set_access_method('non_dist_default', 'columnar');
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1;
SELECT * FROM non_dist_default ORDER BY 1, 2;
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a > 1;
@ -149,5 +174,427 @@ INSERT INTO non_dist_2 SELECT a, c FROM ref_table;
SELECT * FROM non_dist_2 ORDER BY 1, 2;
TRUNCATE non_dist_2;
-- check issue https://github.com/citusdata/citus/issues/5858
CREATE TABLE local_dest_table(
col_1 integer,
col_2 integer,
col_3 text,
col_4 text,
drop_col text,
col_5 bigint,
col_6 text,
col_7 text default 'col_7',
col_8 varchar
);
ALTER TABLE local_dest_table DROP COLUMN drop_col;
CREATE TABLE dist_source_table_1(
int_col integer,
drop_col text,
text_col_1 text,
dist_col integer,
text_col_2 text
);
SELECT create_distributed_table('dist_source_table_1', 'dist_col');
ALTER TABLE dist_source_table_1 DROP COLUMN drop_col;
INSERT INTO dist_source_table_1 VALUES (1, 'value', 1, 'value');
INSERT INTO dist_source_table_1 VALUES (2, 'value2', 1, 'value');
INSERT INTO dist_source_table_1 VALUES (3, 'value', 3, 'value3');
CREATE TABLE dist_source_table_2(
dist_col integer,
int_col integer
);
SELECT create_distributed_table('dist_source_table_2', 'dist_col');
INSERT INTO dist_source_table_2 VALUES (1, 1);
INSERT INTO dist_source_table_2 VALUES (2, 2);
INSERT INTO dist_source_table_2 VALUES (4, 4);
CREATE TABLE local_source_table_1 AS SELECT * FROM dist_source_table_1;
CREATE TABLE local_source_table_2 AS SELECT * FROM dist_source_table_2;
/*
* query_results_equal compares the effect of two queries on local_dest_table.
* We use this to ensure that INSERT INTO local_dest_table SELECT behaves
* the same when selecting from a regular table (postgres handles it) and
* a distributed table (Citus handles it).
*
* The queries are generated by calling format() on query_table twice,
* once for each source_table argument.
*/
CREATE OR REPLACE FUNCTION query_results_equal(query_template text, source_table_1 text, source_table_2 text)
RETURNS bool
AS $$
DECLARE
l1 local_dest_table[];
l2 local_dest_table[];
BEGIN
/* get the results using source_table_1 as source */
TRUNCATE local_dest_table;
EXECUTE format(query_template, source_table_1);
SELECT array_agg(l) INTO l1
FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l;
/* get the results using source_table_2 as source */
TRUNCATE local_dest_table;
EXECUTE format(query_template, source_table_2);
SELECT array_agg(l) INTO l2
FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l;
RAISE NOTICE 'l2=%', l1;
RAISE NOTICE 'l2=%', l2;
RETURN l1 = l2;
END;
$$ LANGUAGE plpgsql;
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table
SELECT
t1.dist_col,
1,
'string1',
'string2',
2,
'string3',
t1.text_col_1,
t1.text_col_2
FROM %1$s_1 t1
WHERE t1.int_col IN (SELECT int_col FROM %1$s_2)
$$, 'local_source_table', 'dist_source_table');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table
SELECT
t1.dist_col,
1,
'string1',
'string2',
2,
'string3',
t1.text_col_1,
t1.text_col_2
FROM %1$s t1
returning *
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_3, col_4) SELECT
'string1',
'string2'::text
FROM %1$s t1
returning *;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_7, col_4) SELECT
'string1',
'string2'::text
FROM %1$s t1
returning *;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_4, col_3) SELECT
'string1',
'string2'::text
FROM %1$s t1
WHERE dist_col = 1
returning *;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_4, col_1)
SELECT
'string1',
dist_col
FROM %1$s
UNION ALL
SELECT
'string',
int_col
FROM %1$s;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
INSERT INTO local_dest_table (col_4, col_1)
SELECT
'string1',
dist_col
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
UNION ALL
SELECT
'string',
int_col
FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1)
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s)
INSERT INTO local_dest_table (col_4, col_1)
SELECT
'string1',
dist_col
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
UNION ALL
SELECT
*
FROM cte1
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_3)
SELECT t1.text_col_1
FROM %1$s t1
GROUP BY t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8)
SELECT
max(t1.dist_col),
3,
'string_3',
4,
44,
t1.text_col_1,
'string_1000'
FROM %1$s t1
GROUP BY t1.text_col_2, t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_7, col_8)
SELECT
t1.text_col_1,
'string_1000'
FROM dist_source_table_1 t1
GROUP BY t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_6, col_7, col_8)
SELECT
'string_4',
t1.text_col_1,
'string_1000'
FROM %1$s t1
GROUP BY t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_5, col_3)
SELECT 12, 'string_11' FROM %1$s t1
UNION
SELECT int_col, 'string' FROM %1$s;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table(col_3, col_2)
SELECT text_col_1, count(*) FROM %1$s GROUP BY 1
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table(col_3, col_5)
SELECT text_col_1, count(*)::int FROM %1$s GROUP BY 1
$$, 'local_source_table_1', 'dist_source_table_1');
-- repeat above tests with Citus local table
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table
SELECT
t1.dist_col,
1,
'string1',
'string2',
2,
'string3',
t1.text_col_1,
t1.text_col_2
FROM %1$s_1 t1
WHERE t1.int_col IN (SELECT int_col FROM %1$s_2)
$$, 'local_source_table', 'dist_source_table');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table
SELECT
t1.dist_col,
1,
'string1',
'string2',
2,
'string3',
t1.text_col_1,
t1.text_col_2
FROM %1$s t1
returning *
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_3, col_4) SELECT
'string1',
'string2'::text
FROM %1$s t1
returning *;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_7, col_4) SELECT
'string1',
'string2'::text
FROM %1$s t1
returning *;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_4, col_3) SELECT
'string1',
'string2'::text
FROM %1$s t1
WHERE dist_col = 1
returning *;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_4, col_1)
SELECT
'string1',
dist_col
FROM %1$s
UNION ALL
SELECT
'string',
int_col
FROM %1$s;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
INSERT INTO local_dest_table (col_4, col_1)
SELECT
'string1',
dist_col
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
UNION ALL
SELECT
'string',
int_col
FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1)
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s)
INSERT INTO local_dest_table (col_4, col_1)
SELECT
'string1',
dist_col
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
UNION ALL
SELECT
*
FROM cte1
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_3)
SELECT t1.text_col_1
FROM %1$s t1
GROUP BY t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8)
SELECT
max(t1.dist_col),
3,
'string_3',
4,
44,
t1.text_col_1,
'string_1000'
FROM %1$s t1
GROUP BY t1.text_col_2, t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_7, col_8)
SELECT
t1.text_col_1,
'string_1000'
FROM dist_source_table_1 t1
GROUP BY t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_6, col_7, col_8)
SELECT
'string_4',
t1.text_col_1,
'string_1000'
FROM %1$s t1
GROUP BY t1.text_col_1;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table (col_5, col_3)
SELECT 12, 'string_11' FROM %1$s t1
UNION
SELECT int_col, 'string' FROM %1$s;
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table(col_3, col_2)
SELECT text_col_1, count(*) FROM %1$s GROUP BY 1
$$, 'local_source_table_1', 'dist_source_table_1');
SELECT * FROM query_results_equal($$
INSERT INTO local_dest_table(col_3, col_5)
SELECT text_col_1, count(*)::int FROM %1$s GROUP BY 1
$$, 'local_source_table_1', 'dist_source_table_1');
-- go back to proper local table for remaining tests
TRUNCATE local_dest_table;
SELECT undistribute_table('local_source_table_1');
-- use a sequence (cannot use query_results_equal, since sequence values would not match)
CREATE SEQUENCE seq;
BEGIN;
INSERT INTO local_dest_table (col_5, col_3)
SELECT 12, 'string_11' FROM dist_source_table_1
UNION
SELECT nextval('seq'), 'string' FROM dist_source_table_1;
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
ROLLBACK;
-- add a bigserial column
ALTER TABLE local_dest_table ADD COLUMN col_9 bigserial;
-- not supported due to limitations in nextval handling
INSERT INTO local_dest_table (col_5, col_3)
SELECT 12, 'string_11' FROM dist_source_table_1
UNION
SELECT 11, 'string' FROM dist_source_table_1;
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
BEGIN;
INSERT INTO local_dest_table(col_3, col_2)
SELECT text_col_1, count(*) FROM dist_source_table_1 GROUP BY 1;
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
ROLLBACK;
BEGIN;
INSERT INTO local_dest_table (col_4, col_3) SELECT
'string1',
'string2'::text
FROM dist_source_table_1 t1
WHERE dist_col = 1
RETURNING *;
ROLLBACK;
\set VERBOSITY terse
DROP SCHEMA insert_select_into_local_table CASCADE;

View File

@ -2335,6 +2335,40 @@ SELECT * FROM raw_events_first OFFSET 0
ON CONFLICT DO NOTHING;
ABORT;
-- test fix for issue https://github.com/citusdata/citus/issues/5891
CREATE TABLE dist_table_1(
dist_col integer,
int_col integer,
text_col_1 text,
text_col_2 text
);
SELECT create_distributed_table('dist_table_1', 'dist_col');
INSERT INTO dist_table_1 VALUES (1, 1, 'string', 'string');
CREATE TABLE dist_table_2(
dist_col integer,
int_col integer
);
SELECT create_distributed_table('dist_table_2', 'dist_col');
INSERT INTO dist_table_2 VALUES (1, 1);
with a as (select random()) INSERT INTO dist_table_1
SELECT
t1.dist_col,
1,
'string',
'string'
FROM a, dist_table_1 t1
join dist_table_2 t2 using (dist_col)
limit 1
returning text_col_1;
DROP TABLE dist_table_1, dist_table_2;
-- wrap in a transaction to improve performance
BEGIN;
DROP TABLE coerce_events;