mirror of https://github.com/citusdata/citus.git
Merge pull request #6059 from citusdata/marcocitus/fix-insert-select
commit
e001ef76cf
|
@ -55,7 +55,6 @@
|
|||
bool EnableRepartitionedInsertSelect = true;
|
||||
|
||||
|
||||
static Query * WrapSubquery(Query *subquery);
|
||||
static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
|
||||
char *resultIdPrefix);
|
||||
static void ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
|
||||
|
@ -299,100 +298,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* BuildSelectForInsertSelect extracts the SELECT part from an INSERT...SELECT query.
|
||||
* If the INSERT...SELECT has CTEs then these are added to the resulting SELECT instead.
|
||||
*/
|
||||
Query *
|
||||
BuildSelectForInsertSelect(Query *insertSelectQuery)
|
||||
{
|
||||
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
||||
Query *selectQuery = selectRte->subquery;
|
||||
|
||||
/*
|
||||
* Wrap the SELECT as a subquery if the INSERT...SELECT has CTEs or the SELECT
|
||||
* has top-level set operations.
|
||||
*
|
||||
* We could simply wrap all queries, but that might create a subquery that is
|
||||
* not supported by the logical planner. Since the logical planner also does
|
||||
* not support CTEs and top-level set operations, we can wrap queries containing
|
||||
* those without breaking anything.
|
||||
*/
|
||||
if (list_length(insertSelectQuery->cteList) > 0)
|
||||
{
|
||||
selectQuery = WrapSubquery(selectRte->subquery);
|
||||
|
||||
/* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
|
||||
selectQuery->cteList = copyObject(insertSelectQuery->cteList);
|
||||
selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE;
|
||||
}
|
||||
else if (selectQuery->setOperations != NULL)
|
||||
{
|
||||
/* top-level set operations confuse the ReorderInsertSelectTargetLists logic */
|
||||
selectQuery = WrapSubquery(selectRte->subquery);
|
||||
}
|
||||
|
||||
return selectQuery;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* WrapSubquery wraps the given query as a subquery in a newly constructed
|
||||
* "SELECT * FROM (...subquery...) citus_insert_select_subquery" query.
|
||||
*/
|
||||
static Query *
|
||||
WrapSubquery(Query *subquery)
|
||||
{
|
||||
ParseState *pstate = make_parsestate(NULL);
|
||||
List *newTargetList = NIL;
|
||||
|
||||
Query *outerQuery = makeNode(Query);
|
||||
outerQuery->commandType = CMD_SELECT;
|
||||
|
||||
/* create range table entries */
|
||||
Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL);
|
||||
RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem(
|
||||
addRangeTableEntryForSubquery(
|
||||
pstate, subquery,
|
||||
selectAlias, false, true));
|
||||
outerQuery->rtable = list_make1(newRangeTableEntry);
|
||||
|
||||
/* set the FROM expression to the subquery */
|
||||
RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
|
||||
newRangeTableRef->rtindex = 1;
|
||||
outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
|
||||
|
||||
/* create a target list that matches the SELECT */
|
||||
TargetEntry *selectTargetEntry = NULL;
|
||||
foreach_ptr(selectTargetEntry, subquery->targetList)
|
||||
{
|
||||
/* exactly 1 entry in FROM */
|
||||
int indexInRangeTable = 1;
|
||||
|
||||
if (selectTargetEntry->resjunk)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno,
|
||||
exprType((Node *) selectTargetEntry->expr),
|
||||
exprTypmod((Node *) selectTargetEntry->expr),
|
||||
exprCollation((Node *) selectTargetEntry->expr), 0);
|
||||
|
||||
TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar,
|
||||
selectTargetEntry->resno,
|
||||
selectTargetEntry->resname,
|
||||
selectTargetEntry->resjunk);
|
||||
|
||||
newTargetList = lappend(newTargetList, newSelectTargetEntry);
|
||||
}
|
||||
|
||||
outerQuery->targetList = newTargetList;
|
||||
|
||||
return outerQuery;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TwoPhaseInsertSelectTaskList generates a list of tasks for a query that
|
||||
* inserts into a target relation and selects from a set of co-located
|
||||
|
|
|
@ -48,8 +48,10 @@
|
|||
#include "utils/builtins.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/rel.h"
|
||||
#include <nodes/print.h>
|
||||
|
||||
|
||||
static void PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery);
|
||||
static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId,
|
||||
Query *originalQuery,
|
||||
PlannerRestrictionContext *
|
||||
|
@ -83,6 +85,7 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
|
|||
static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse,
|
||||
ParamListInfo boundParams);
|
||||
static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery);
|
||||
static Query * WrapSubquery(Query *subquery);
|
||||
static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList);
|
||||
static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||
Oid targetRelationId);
|
||||
|
@ -370,14 +373,17 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
|
|||
* combineQuery, this function also creates a dummy combineQuery for that.
|
||||
*/
|
||||
DistributedPlan *
|
||||
CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamListInfo
|
||||
boundParams, bool hasUnresolvedParams,
|
||||
CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
|
||||
ParamListInfo boundParams, bool hasUnresolvedParams,
|
||||
PlannerRestrictionContext *plannerRestrictionContext)
|
||||
{
|
||||
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery);
|
||||
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
||||
|
||||
PrepareInsertSelectForCitusPlanner(insertSelectQuery);
|
||||
|
||||
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
|
||||
Query *selectQuery = selectRte->subquery;
|
||||
|
||||
Query *selectQuery = BuildSelectForInsertSelect(originalQuery);
|
||||
originalQuery->cteList = NIL;
|
||||
DistributedPlan *distPlan = CreateDistributedPlan(planId, selectQuery,
|
||||
copyObject(selectQuery),
|
||||
boundParams, hasUnresolvedParams,
|
||||
|
@ -417,12 +423,84 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamL
|
|||
* distributed select instead of returning it.
|
||||
*/
|
||||
selectRte->subquery = distPlan->combineQuery;
|
||||
distPlan->combineQuery = originalQuery;
|
||||
distPlan->combineQuery = insertSelectQuery;
|
||||
|
||||
return distPlan;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PrepareInsertSelectForCitusPlanner prepares an INSERT..SELECT query tree
|
||||
* that was passed to the planner for use by Citus.
|
||||
*
|
||||
* First, it rebuilds the target lists of the INSERT and the SELECT
|
||||
* to be in the same order, which is not guaranteed in the parse tree.
|
||||
*
|
||||
* Second, some of the constants in the target list will have type
|
||||
* "unknown", which would confuse the Citus planner. To address that,
|
||||
* we add casts to SELECT target list entries whose type does not correspond
|
||||
* to the destination. This also helps us feed the output directly into
|
||||
* a COPY stream for INSERT..SELECT via coordinator.
|
||||
*
|
||||
* In case of UNION or other set operations, the SELECT does not have a
|
||||
* clearly defined target list, so we first wrap the UNION in a subquery.
|
||||
* UNION queries do not have the "unknown" type problem.
|
||||
*
|
||||
* Finally, if the INSERT has CTEs, we move those CTEs into the SELECT,
|
||||
* such that we can plan the SELECT as an independent query. To ensure
|
||||
* the ctelevelsup for CTE RTE's remain the same, we wrap the SELECT into
|
||||
* a subquery, unless we already did so in case of a UNION.
|
||||
*/
|
||||
static void
|
||||
PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery)
|
||||
{
|
||||
RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
|
||||
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
||||
Oid targetRelationId = insertRte->relid;
|
||||
|
||||
bool isWrapped = false;
|
||||
|
||||
if (selectRte->subquery->setOperations != NULL)
|
||||
{
|
||||
/*
|
||||
* Prepare UNION query for reordering and adding casts by
|
||||
* wrapping it in a subquery to have a single target list.
|
||||
*/
|
||||
selectRte->subquery = WrapSubquery(selectRte->subquery);
|
||||
isWrapped = true;
|
||||
}
|
||||
|
||||
/* this is required for correct deparsing of the query */
|
||||
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
||||
|
||||
/*
|
||||
* Cast types of insert target list and select projection list to
|
||||
* match the column types of the target relation.
|
||||
*/
|
||||
selectRte->subquery->targetList =
|
||||
AddInsertSelectCasts(insertSelectQuery->targetList,
|
||||
copyObject(selectRte->subquery->targetList),
|
||||
targetRelationId);
|
||||
|
||||
if (list_length(insertSelectQuery->cteList) > 0)
|
||||
{
|
||||
if (!isWrapped)
|
||||
{
|
||||
/*
|
||||
* By wrapping the SELECT in a subquery, we can avoid adjusting
|
||||
* ctelevelsup in RTE's that point to the CTEs.
|
||||
*/
|
||||
selectRte->subquery = WrapSubquery(selectRte->subquery);
|
||||
}
|
||||
|
||||
/* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
|
||||
selectRte->subquery->cteList = copyObject(insertSelectQuery->cteList);
|
||||
selectRte->subquery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE;
|
||||
insertSelectQuery->cteList = NIL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateCombineQueryForRouterPlan is used for creating a dummy combineQuery
|
||||
* for a router plan, since router plans normally don't have one.
|
||||
|
@ -881,12 +959,11 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
|
|||
ListCell *insertTargetEntryCell;
|
||||
List *newSubqueryTargetlist = NIL;
|
||||
List *newInsertTargetlist = NIL;
|
||||
List *columnNameList = NIL;
|
||||
int resno = 1;
|
||||
Index insertTableId = 1;
|
||||
Index selectTableId = 2;
|
||||
int targetEntryIndex = 0;
|
||||
|
||||
AssertArg(InsertSelectIntoCitusTable(originalQuery));
|
||||
|
||||
Query *subquery = subqueryRte->subquery;
|
||||
|
||||
Oid insertRelationId = insertRte->relid;
|
||||
|
@ -954,6 +1031,9 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
|
|||
newSubqueryTargetEntry);
|
||||
}
|
||||
|
||||
String *columnName = makeString(newSubqueryTargetEntry->resname);
|
||||
columnNameList = lappend(columnNameList, columnName);
|
||||
|
||||
/*
|
||||
* The newly created select target entry cannot be a junk entry since junk
|
||||
* entries are not in the final target list and we're processing the
|
||||
|
@ -961,7 +1041,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
|
|||
*/
|
||||
Assert(!newSubqueryTargetEntry->resjunk);
|
||||
|
||||
Var *newInsertVar = makeVar(insertTableId, originalAttrNo,
|
||||
Var *newInsertVar = makeVar(selectTableId, resno,
|
||||
exprType((Node *) newSubqueryTargetEntry->expr),
|
||||
exprTypmod((Node *) newSubqueryTargetEntry->expr),
|
||||
exprCollation((Node *) newSubqueryTargetEntry->expr),
|
||||
|
@ -1005,6 +1085,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
|
|||
|
||||
originalQuery->targetList = newInsertTargetlist;
|
||||
subquery->targetList = newSubqueryTargetlist;
|
||||
subqueryRte->eref->colnames = columnNameList;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1412,19 +1493,10 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
|
|||
return distributedPlan;
|
||||
}
|
||||
|
||||
Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery);
|
||||
PrepareInsertSelectForCitusPlanner(insertSelectQuery);
|
||||
|
||||
selectRte->subquery = selectQuery;
|
||||
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
||||
|
||||
/*
|
||||
* Cast types of insert target list and select projection list to
|
||||
* match the column types of the target relation.
|
||||
*/
|
||||
selectQuery->targetList =
|
||||
AddInsertSelectCasts(insertSelectQuery->targetList,
|
||||
selectQuery->targetList,
|
||||
targetRelationId);
|
||||
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
|
||||
Query *selectQuery = selectRte->subquery;
|
||||
|
||||
/*
|
||||
* Later we might need to call WrapTaskListForProjection(), which requires
|
||||
|
@ -1506,6 +1578,63 @@ InsertSelectResultIdPrefix(uint64 planId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* WrapSubquery wraps the given query as a subquery in a newly constructed
|
||||
* "SELECT * FROM (...subquery...) citus_insert_select_subquery" query.
|
||||
*/
|
||||
static Query *
|
||||
WrapSubquery(Query *subquery)
|
||||
{
|
||||
ParseState *pstate = make_parsestate(NULL);
|
||||
List *newTargetList = NIL;
|
||||
|
||||
Query *outerQuery = makeNode(Query);
|
||||
outerQuery->commandType = CMD_SELECT;
|
||||
|
||||
/* create range table entries */
|
||||
Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL);
|
||||
RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem(
|
||||
addRangeTableEntryForSubquery(
|
||||
pstate, subquery,
|
||||
selectAlias, false, true));
|
||||
outerQuery->rtable = list_make1(newRangeTableEntry);
|
||||
|
||||
/* set the FROM expression to the subquery */
|
||||
RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
|
||||
newRangeTableRef->rtindex = 1;
|
||||
outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
|
||||
|
||||
/* create a target list that matches the SELECT */
|
||||
TargetEntry *selectTargetEntry = NULL;
|
||||
foreach_ptr(selectTargetEntry, subquery->targetList)
|
||||
{
|
||||
/* exactly 1 entry in FROM */
|
||||
int indexInRangeTable = 1;
|
||||
|
||||
if (selectTargetEntry->resjunk)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno,
|
||||
exprType((Node *) selectTargetEntry->expr),
|
||||
exprTypmod((Node *) selectTargetEntry->expr),
|
||||
exprCollation((Node *) selectTargetEntry->expr), 0);
|
||||
|
||||
TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar,
|
||||
selectTargetEntry->resno,
|
||||
selectTargetEntry->resname,
|
||||
selectTargetEntry->resjunk);
|
||||
|
||||
newTargetList = lappend(newTargetList, newSelectTargetEntry);
|
||||
}
|
||||
|
||||
outerQuery->targetList = newTargetList;
|
||||
|
||||
return outerQuery;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RelabelTargetEntryList relabels select target list to have matching names with
|
||||
* insert target list.
|
||||
|
@ -1549,18 +1678,24 @@ AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
|||
int targetEntryIndex = 0;
|
||||
TargetEntry *insertEntry = NULL;
|
||||
TargetEntry *selectEntry = NULL;
|
||||
|
||||
forboth_ptr(insertEntry, insertTargetList, selectEntry, selectTargetList)
|
||||
{
|
||||
Var *insertColumn = (Var *) insertEntry->expr;
|
||||
Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor,
|
||||
insertEntry->resno - 1);
|
||||
|
||||
Oid sourceType = insertColumn->vartype;
|
||||
Oid sourceType = exprType((Node *) selectEntry->expr);
|
||||
Oid targetType = attr->atttypid;
|
||||
if (sourceType != targetType)
|
||||
{
|
||||
insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType,
|
||||
attr->attcollation, attr->atttypmod);
|
||||
/* ReorderInsertSelectTargetLists ensures we only have Vars */
|
||||
Assert(IsA(insertEntry->expr, Var));
|
||||
|
||||
/* we will cast the SELECT expression, so the type changes */
|
||||
Var *insertVar = (Var *) insertEntry->expr;
|
||||
insertVar->vartype = targetType;
|
||||
insertVar->vartypmod = attr->atttypmod;
|
||||
insertVar->varcollid = attr->attcollation;
|
||||
|
||||
/*
|
||||
* We cannot modify the selectEntry in-place, because ORDER BY or
|
||||
|
|
|
@ -3558,19 +3558,9 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query)
|
|||
NULL, NULL);
|
||||
}
|
||||
|
||||
if (contain_nextval_expression_walker((Node *) query->targetList, NULL))
|
||||
{
|
||||
/*
|
||||
* We let queries with nextval in the target list fall through to
|
||||
* the logical planner, which knows how to handle those queries.
|
||||
*/
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"Sequences cannot be used in router queries",
|
||||
NULL, NULL);
|
||||
}
|
||||
|
||||
bool hasPostgresOrCitusLocalTable = false;
|
||||
bool hasDistributedTable = false;
|
||||
bool hasReferenceTable = false;
|
||||
|
||||
ExtractRangeTableRelationWalker((Node *) query, &rangeTableRelationList);
|
||||
foreach(rangeTableRelationCell, rangeTableRelationList)
|
||||
|
@ -3586,6 +3576,11 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query)
|
|||
hasPostgresOrCitusLocalTable = true;
|
||||
continue;
|
||||
}
|
||||
else if (IsCitusTableType(distributedTableId, REFERENCE_TABLE))
|
||||
{
|
||||
hasReferenceTable = true;
|
||||
continue;
|
||||
}
|
||||
else if (IsCitusTableType(distributedTableId, CITUS_LOCAL_TABLE))
|
||||
{
|
||||
hasPostgresOrCitusLocalTable = true;
|
||||
|
@ -3628,6 +3623,28 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query)
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* We want to make sure nextval happens on the coordinator / the current
|
||||
* node, since the user may have certain expectations around the values
|
||||
* produced by the sequence. We therefore cannot push down the nextval
|
||||
* call as part of a router query.
|
||||
*
|
||||
* We let queries with nextval in the target list fall through to
|
||||
* the logical planner, which will ensure that the nextval is called
|
||||
* in the combine query on the coordinator.
|
||||
*
|
||||
* If there are no distributed or reference tables in the query,
|
||||
* then the query will anyway happen on the coordinator, so we can
|
||||
* allow nextval.
|
||||
*/
|
||||
if (contain_nextval_expression_walker((Node *) query->targetList, NULL) &&
|
||||
(hasDistributedTable || hasReferenceTable))
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"Sequences cannot be used in router queries",
|
||||
NULL, NULL);
|
||||
}
|
||||
|
||||
/* local tables are not allowed if there are distributed tables */
|
||||
if (hasPostgresOrCitusLocalTable && hasDistributedTable)
|
||||
{
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
extern bool EnableRepartitionedInsertSelect;
|
||||
|
||||
extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node);
|
||||
extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery);
|
||||
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
|
||||
extern bool IsRedistributablePlan(Plan *selectPlan);
|
||||
|
||||
|
|
|
@ -896,8 +896,8 @@ HAVING (max(table_2.value) >= (SELECT value FROM a));
|
|||
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
||||
DEBUG: push down of limit count: 1
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
|
@ -905,9 +905,9 @@ DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
|||
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
|
||||
DEBUG: Subplan XXX_2 will be written to local file
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
||||
NOTICE: executing the command locally: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||
NOTICE: executing the command locally: SELECT count(*) AS auto_coerced_by_citus_0, (worker_column_1)::text AS auto_coerced_by_citus_1, worker_column_1 AS discarded_target_item_1, max(worker_column_2) AS worker_column_4 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
||||
NOTICE: executing the command locally: SELECT count(*) AS auto_coerced_by_citus_0, (worker_column_1)::text AS auto_coerced_by_citus_1, worker_column_1 AS discarded_target_item_1, max(worker_column_2) AS worker_column_4 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
||||
NOTICE: executing the command locally: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
||||
NOTICE: executing the copy locally for shard xxxxx
|
||||
WITH stats AS (
|
||||
SELECT count(key) m FROM table_1
|
||||
|
|
|
@ -149,6 +149,67 @@ SELECT * FROM non_dist_unique ORDER BY 1;
|
|||
5 | 8
|
||||
(5 rows)
|
||||
|
||||
INSERT INTO non_dist_unique
|
||||
SELECT a+1, b FROM dist_table
|
||||
UNION ALL
|
||||
SELECT a+100, b FROM dist_table
|
||||
ON CONFLICT (a) DO NOTHING;
|
||||
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 6
|
||||
2 | 7
|
||||
3 | 14
|
||||
4 | 15
|
||||
5 | 8
|
||||
101 | 6
|
||||
102 | 7
|
||||
103 | 8
|
||||
(8 rows)
|
||||
|
||||
INSERT INTO non_dist_unique
|
||||
SELECT a+1, b FROM dist_table
|
||||
UNION ALL
|
||||
SELECT a+100, b FROM dist_table
|
||||
ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + 1;
|
||||
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 6
|
||||
2 | 7
|
||||
3 | 8
|
||||
4 | 9
|
||||
5 | 8
|
||||
101 | 7
|
||||
102 | 8
|
||||
103 | 9
|
||||
(8 rows)
|
||||
|
||||
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
|
||||
INSERT INTO non_dist_unique
|
||||
WITH cte2 AS (SELECT s FROM generate_series(1,10) s)
|
||||
SELECT a+1, b FROM dist_table WHERE b IN (SELECT s FROM cte1)
|
||||
UNION ALL
|
||||
SELECT s, s FROM cte1
|
||||
ON CONFLICT (a) DO NOTHING;
|
||||
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 6
|
||||
2 | 7
|
||||
3 | 8
|
||||
4 | 9
|
||||
5 | 8
|
||||
6 | 6
|
||||
7 | 7
|
||||
8 | 8
|
||||
9 | 9
|
||||
10 | 10
|
||||
101 | 7
|
||||
102 | 8
|
||||
103 | 9
|
||||
(13 rows)
|
||||
|
||||
DROP TABLE non_dist_unique;
|
||||
-- test INSERT INTO a table with DEFAULT
|
||||
CREATE TABLE non_dist_default (a INT, c TEXT DEFAULT 'def');
|
||||
|
@ -168,6 +229,16 @@ SELECT * FROM non_dist_default ORDER BY 1, 2;
|
|||
3 | def
|
||||
(3 rows)
|
||||
|
||||
SELECT alter_table_set_access_method('non_dist_default', 'columnar');
|
||||
NOTICE: creating a new table for insert_select_into_local_table.non_dist_default
|
||||
NOTICE: moving the data of insert_select_into_local_table.non_dist_default
|
||||
NOTICE: dropping the old insert_select_into_local_table.non_dist_default
|
||||
NOTICE: renaming the new table to insert_select_into_local_table.non_dist_default
|
||||
alter_table_set_access_method
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1;
|
||||
SELECT * FROM non_dist_default ORDER BY 1, 2;
|
||||
a | c
|
||||
|
@ -354,6 +425,691 @@ SELECT * FROM non_dist_2 ORDER BY 1, 2;
|
|||
(3 rows)
|
||||
|
||||
TRUNCATE non_dist_2;
|
||||
-- check issue https://github.com/citusdata/citus/issues/5858
|
||||
CREATE TABLE local_dest_table(
|
||||
col_1 integer,
|
||||
col_2 integer,
|
||||
col_3 text,
|
||||
col_4 text,
|
||||
drop_col text,
|
||||
col_5 bigint,
|
||||
col_6 text,
|
||||
col_7 text default 'col_7',
|
||||
col_8 varchar
|
||||
);
|
||||
ALTER TABLE local_dest_table DROP COLUMN drop_col;
|
||||
CREATE TABLE dist_source_table_1(
|
||||
int_col integer,
|
||||
drop_col text,
|
||||
text_col_1 text,
|
||||
dist_col integer,
|
||||
text_col_2 text
|
||||
);
|
||||
SELECT create_distributed_table('dist_source_table_1', 'dist_col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE dist_source_table_1 DROP COLUMN drop_col;
|
||||
INSERT INTO dist_source_table_1 VALUES (1, 'value', 1, 'value');
|
||||
INSERT INTO dist_source_table_1 VALUES (2, 'value2', 1, 'value');
|
||||
INSERT INTO dist_source_table_1 VALUES (3, 'value', 3, 'value3');
|
||||
CREATE TABLE dist_source_table_2(
|
||||
dist_col integer,
|
||||
int_col integer
|
||||
);
|
||||
SELECT create_distributed_table('dist_source_table_2', 'dist_col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_source_table_2 VALUES (1, 1);
|
||||
INSERT INTO dist_source_table_2 VALUES (2, 2);
|
||||
INSERT INTO dist_source_table_2 VALUES (4, 4);
|
||||
CREATE TABLE local_source_table_1 AS SELECT * FROM dist_source_table_1;
|
||||
CREATE TABLE local_source_table_2 AS SELECT * FROM dist_source_table_2;
|
||||
/*
|
||||
* query_results_equal compares the effect of two queries on local_dest_table.
|
||||
* We use this to ensure that INSERT INTO local_dest_table SELECT behaves
|
||||
* the same when selecting from a regular table (postgres handles it) and
|
||||
* a distributed table (Citus handles it).
|
||||
*
|
||||
* The queries are generated by calling format() on query_table twice,
|
||||
* once for each source_table argument.
|
||||
*/
|
||||
CREATE OR REPLACE FUNCTION query_results_equal(query_template text, source_table_1 text, source_table_2 text)
|
||||
RETURNS bool
|
||||
AS $$
|
||||
DECLARE
|
||||
l1 local_dest_table[];
|
||||
l2 local_dest_table[];
|
||||
BEGIN
|
||||
/* get the results using source_table_1 as source */
|
||||
TRUNCATE local_dest_table;
|
||||
EXECUTE format(query_template, source_table_1);
|
||||
SELECT array_agg(l) INTO l1
|
||||
FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l;
|
||||
|
||||
/* get the results using source_table_2 as source */
|
||||
TRUNCATE local_dest_table;
|
||||
EXECUTE format(query_template, source_table_2);
|
||||
SELECT array_agg(l) INTO l2
|
||||
FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l;
|
||||
|
||||
RAISE NOTICE 'l2=%', l1;
|
||||
RAISE NOTICE 'l2=%', l2;
|
||||
RETURN l1 = l2;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table
|
||||
SELECT
|
||||
t1.dist_col,
|
||||
1,
|
||||
'string1',
|
||||
'string2',
|
||||
2,
|
||||
'string3',
|
||||
t1.text_col_1,
|
||||
t1.text_col_2
|
||||
FROM %1$s_1 t1
|
||||
WHERE t1.int_col IN (SELECT int_col FROM %1$s_2)
|
||||
$$, 'local_source_table', 'dist_source_table');
|
||||
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table
|
||||
SELECT
|
||||
t1.dist_col,
|
||||
1,
|
||||
'string1',
|
||||
'string2',
|
||||
2,
|
||||
'string3',
|
||||
t1.text_col_1,
|
||||
t1.text_col_2
|
||||
FROM %1$s t1
|
||||
returning *
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_3, col_4) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM %1$s t1
|
||||
returning *;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_7, col_4) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM %1$s t1
|
||||
returning *;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_4, col_3) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM %1$s t1
|
||||
WHERE dist_col = 1
|
||||
returning *;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_4, col_1)
|
||||
SELECT
|
||||
'string1',
|
||||
dist_col
|
||||
FROM %1$s
|
||||
UNION ALL
|
||||
SELECT
|
||||
'string',
|
||||
int_col
|
||||
FROM %1$s;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
|
||||
INSERT INTO local_dest_table (col_4, col_1)
|
||||
SELECT
|
||||
'string1',
|
||||
dist_col
|
||||
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
|
||||
UNION ALL
|
||||
SELECT
|
||||
'string',
|
||||
int_col
|
||||
FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1)
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s)
|
||||
INSERT INTO local_dest_table (col_4, col_1)
|
||||
SELECT
|
||||
'string1',
|
||||
dist_col
|
||||
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
|
||||
UNION ALL
|
||||
SELECT
|
||||
*
|
||||
FROM cte1
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_3)
|
||||
SELECT t1.text_col_1
|
||||
FROM %1$s t1
|
||||
GROUP BY t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8)
|
||||
SELECT
|
||||
max(t1.dist_col),
|
||||
3,
|
||||
'string_3',
|
||||
4,
|
||||
44,
|
||||
t1.text_col_1,
|
||||
'string_1000'
|
||||
FROM %1$s t1
|
||||
GROUP BY t1.text_col_2, t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(1,3,string_3,,4,44,value,string_1000)","(1,3,string_3,,4,44,value2,string_1000)","(3,3,string_3,,4,44,value,string_1000)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(1,3,string_3,,4,44,value,string_1000)","(1,3,string_3,,4,44,value2,string_1000)","(3,3,string_3,,4,44,value,string_1000)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_7, col_8)
|
||||
SELECT
|
||||
t1.text_col_1,
|
||||
'string_1000'
|
||||
FROM dist_source_table_1 t1
|
||||
GROUP BY t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_6, col_7, col_8)
|
||||
SELECT
|
||||
'string_4',
|
||||
t1.text_col_1,
|
||||
'string_1000'
|
||||
FROM %1$s t1
|
||||
GROUP BY t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_5, col_3)
|
||||
SELECT 12, 'string_11' FROM %1$s t1
|
||||
UNION
|
||||
SELECT int_col, 'string' FROM %1$s;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table(col_3, col_2)
|
||||
SELECT text_col_1, count(*) FROM %1$s GROUP BY 1
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,1,value2,,,,col_7,)","(,2,value,,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,1,value2,,,,col_7,)","(,2,value,,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table(col_3, col_5)
|
||||
SELECT text_col_1, count(*)::int FROM %1$s GROUP BY 1
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,value,,2,,col_7,)","(,,value2,,1,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,value,,2,,col_7,)","(,,value2,,1,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- repeat above tests with Citus local table
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table
|
||||
SELECT
|
||||
t1.dist_col,
|
||||
1,
|
||||
'string1',
|
||||
'string2',
|
||||
2,
|
||||
'string3',
|
||||
t1.text_col_1,
|
||||
t1.text_col_2
|
||||
FROM %1$s_1 t1
|
||||
WHERE t1.int_col IN (SELECT int_col FROM %1$s_2)
|
||||
$$, 'local_source_table', 'dist_source_table');
|
||||
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table
|
||||
SELECT
|
||||
t1.dist_col,
|
||||
1,
|
||||
'string1',
|
||||
'string2',
|
||||
2,
|
||||
'string3',
|
||||
t1.text_col_1,
|
||||
t1.text_col_2
|
||||
FROM %1$s t1
|
||||
returning *
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(1,1,string1,string2,2,string3,value,value)","(1,1,string1,string2,2,string3,value2,value)","(3,1,string1,string2,2,string3,value,value3)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_3, col_4) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM %1$s t1
|
||||
returning *;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)","(,,string1,string2,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_7, col_4) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM %1$s t1
|
||||
returning *;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,,string2,,,string1,)","(,,,string2,,,string1,)","(,,,string2,,,string1,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_4, col_3) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM %1$s t1
|
||||
WHERE dist_col = 1
|
||||
returning *;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,string2,string1,,,col_7,)","(,,string2,string1,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_4, col_1)
|
||||
SELECT
|
||||
'string1',
|
||||
dist_col
|
||||
FROM %1$s
|
||||
UNION ALL
|
||||
SELECT
|
||||
'string',
|
||||
int_col
|
||||
FROM %1$s;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(1,,,string,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
|
||||
INSERT INTO local_dest_table (col_4, col_1)
|
||||
SELECT
|
||||
'string1',
|
||||
dist_col
|
||||
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
|
||||
UNION ALL
|
||||
SELECT
|
||||
'string',
|
||||
int_col
|
||||
FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1)
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(2,,,string,,,col_7,)","(3,,,string,,,col_7,)","(3,,,string1,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s)
|
||||
INSERT INTO local_dest_table (col_4, col_1)
|
||||
SELECT
|
||||
'string1',
|
||||
dist_col
|
||||
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
|
||||
UNION ALL
|
||||
SELECT
|
||||
*
|
||||
FROM cte1
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(1,,,string1,,,col_7,)","(1,,,string1,,,col_7,)","(1,,,stringcte,,,col_7,)","(2,,,stringcte,,,col_7,)","(3,,,string1,,,col_7,)","(3,,,stringcte,,,col_7,)","(4,,,stringcte,,,col_7,)","(5,,,stringcte,,,col_7,)","(6,,,stringcte,,,col_7,)","(7,,,stringcte,,,col_7,)","(8,,,stringcte,,,col_7,)","(9,,,stringcte,,,col_7,)","(10,,,stringcte,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_3)
|
||||
SELECT t1.text_col_1
|
||||
FROM %1$s t1
|
||||
GROUP BY t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,value,,,,col_7,)","(,,value2,,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8)
|
||||
SELECT
|
||||
max(t1.dist_col),
|
||||
3,
|
||||
'string_3',
|
||||
4,
|
||||
44,
|
||||
t1.text_col_1,
|
||||
'string_1000'
|
||||
FROM %1$s t1
|
||||
GROUP BY t1.text_col_2, t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(1,3,string_3,,4,44,value,string_1000)","(1,3,string_3,,4,44,value2,string_1000)","(3,3,string_3,,4,44,value,string_1000)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(1,3,string_3,,4,44,value,string_1000)","(1,3,string_3,,4,44,value2,string_1000)","(3,3,string_3,,4,44,value,string_1000)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_7, col_8)
|
||||
SELECT
|
||||
t1.text_col_1,
|
||||
'string_1000'
|
||||
FROM dist_source_table_1 t1
|
||||
GROUP BY t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,,,,,value,string_1000)","(,,,,,,value2,string_1000)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_6, col_7, col_8)
|
||||
SELECT
|
||||
'string_4',
|
||||
t1.text_col_1,
|
||||
'string_1000'
|
||||
FROM %1$s t1
|
||||
GROUP BY t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,,,,string_4,value,string_1000)","(,,,,,string_4,value2,string_1000)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_5, col_3)
|
||||
SELECT 12, 'string_11' FROM %1$s t1
|
||||
UNION
|
||||
SELECT int_col, 'string' FROM %1$s;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,string,,1,,col_7,)","(,,string,,2,,col_7,)","(,,string,,3,,col_7,)","(,,string_11,,12,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table(col_3, col_2)
|
||||
SELECT text_col_1, count(*) FROM %1$s GROUP BY 1
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,1,value2,,,,col_7,)","(,2,value,,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,1,value2,,,,col_7,)","(,2,value,,,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table(col_3, col_5)
|
||||
SELECT text_col_1, count(*)::int FROM %1$s GROUP BY 1
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
NOTICE: l2={"(,,value,,2,,col_7,)","(,,value2,,1,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
NOTICE: l2={"(,,value,,2,,col_7,)","(,,value2,,1,,col_7,)"}
|
||||
CONTEXT: PL/pgSQL function query_results_equal(text,text,text) line XX at RAISE
|
||||
query_results_equal
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- go back to proper local table for remaining tests
|
||||
TRUNCATE local_dest_table;
|
||||
SELECT undistribute_table('local_source_table_1');
|
||||
ERROR: cannot undistribute table because the table is not distributed
|
||||
-- use a sequence (cannot use query_results_equal, since sequence values would not match)
|
||||
CREATE SEQUENCE seq;
|
||||
BEGIN;
|
||||
INSERT INTO local_dest_table (col_5, col_3)
|
||||
SELECT 12, 'string_11' FROM dist_source_table_1
|
||||
UNION
|
||||
SELECT nextval('seq'), 'string' FROM dist_source_table_1;
|
||||
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
|
||||
col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8
|
||||
---------------------------------------------------------------------
|
||||
| | string | | 1 | | col_7 |
|
||||
| | string | | 2 | | col_7 |
|
||||
| | string | | 3 | | col_7 |
|
||||
| | string_11 | | 12 | | col_7 |
|
||||
(4 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- add a bigserial column
|
||||
ALTER TABLE local_dest_table ADD COLUMN col_9 bigserial;
|
||||
-- not supported due to limitations in nextval handling
|
||||
INSERT INTO local_dest_table (col_5, col_3)
|
||||
SELECT 12, 'string_11' FROM dist_source_table_1
|
||||
UNION
|
||||
SELECT 11, 'string' FROM dist_source_table_1;
|
||||
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
|
||||
col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 | col_9
|
||||
---------------------------------------------------------------------
|
||||
| | string | | 11 | | col_7 | | 2
|
||||
| | string_11 | | 12 | | col_7 | | 1
|
||||
(2 rows)
|
||||
|
||||
BEGIN;
|
||||
INSERT INTO local_dest_table(col_3, col_2)
|
||||
SELECT text_col_1, count(*) FROM dist_source_table_1 GROUP BY 1;
|
||||
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
|
||||
col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 | col_9
|
||||
---------------------------------------------------------------------
|
||||
| 1 | value2 | | | | col_7 | | 3
|
||||
| 2 | value | | | | col_7 | | 4
|
||||
| | string | | 11 | | col_7 | | 2
|
||||
| | string_11 | | 12 | | col_7 | | 1
|
||||
(4 rows)
|
||||
|
||||
ROLLBACK;
|
||||
BEGIN;
|
||||
INSERT INTO local_dest_table (col_4, col_3) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM dist_source_table_1 t1
|
||||
WHERE dist_col = 1
|
||||
RETURNING *;
|
||||
col_1 | col_2 | col_3 | col_4 | col_5 | col_6 | col_7 | col_8 | col_9
|
||||
---------------------------------------------------------------------
|
||||
| | string2 | string1 | | | col_7 | | 5
|
||||
| | string2 | string1 | | | col_7 | | 6
|
||||
(2 rows)
|
||||
|
||||
ROLLBACK;
|
||||
\set VERBOSITY terse
|
||||
DROP SCHEMA insert_select_into_local_table CASCADE;
|
||||
NOTICE: drop cascades to 5 other objects
|
||||
NOTICE: drop cascades to 12 other objects
|
||||
|
|
|
@ -500,7 +500,7 @@ INSERT INTO target_table
|
|||
SELECT mapped_key, c FROM t NATURAL JOIN source_table;
|
||||
DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for CTE t: SELECT mapped_key, a, c FROM insert_select_repartition.source_table WHERE ((a)::double precision OPERATOR(pg_catalog.>) floor(random()))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, (c)::integer[] AS b FROM (SELECT t.mapped_key, t.c FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, auto_coerced_by_citus_1 AS b FROM (SELECT t.mapped_key, (t.c)::integer[] AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery
|
||||
DEBUG: performing repartitioned INSERT ... SELECT
|
||||
RESET client_min_messages;
|
||||
SELECT * FROM target_table ORDER BY a;
|
||||
|
|
|
@ -1411,21 +1411,20 @@ WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5)
|
|||
SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1);
|
||||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> Subquery Scan on citus_insert_select_subquery
|
||||
-> Result
|
||||
One-Time Filter: $3
|
||||
CTE cte1
|
||||
-> Function Scan on generate_series s
|
||||
-> Result
|
||||
One-Time Filter: $3
|
||||
CTE cte1
|
||||
-> Limit
|
||||
InitPlan 2 (returns $1)
|
||||
-> CTE Scan on cte1 cte1_1
|
||||
-> Result
|
||||
One-Time Filter: $1
|
||||
-> CTE Scan on cte1 cte1_2
|
||||
InitPlan 4 (returns $3)
|
||||
-> CTE Scan on cte1 cte1_3
|
||||
-> CTE Scan on cte1
|
||||
CTE cte1
|
||||
-> Limit
|
||||
InitPlan 2 (returns $1)
|
||||
-> CTE Scan on cte1 cte1_1
|
||||
-> Result
|
||||
One-Time Filter: $1
|
||||
-> CTE Scan on cte1 cte1_2
|
||||
InitPlan 4 (returns $3)
|
||||
-> CTE Scan on cte1 cte1_3
|
||||
-> CTE Scan on cte1
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO lineitem_hash_part
|
||||
( SELECT s FROM generate_series(1,5) s) UNION
|
||||
|
|
|
@ -669,7 +669,7 @@ DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
|||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: generating subplan XXX_1 for CTE fist_table_agg: SELECT (max(value_1) OPERATOR(pg_catalog.+) 1) AS v1_agg, user_id FROM public.raw_events_first GROUP BY user_id
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, v1_agg AS value_1_agg FROM (SELECT fist_table_agg.v1_agg, fist_table_agg.user_id FROM (SELECT intermediate_result.v1_agg, intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v1_agg integer, user_id integer)) fist_table_agg) citus_insert_select_subquery
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, v1_agg AS value_1_agg FROM (SELECT fist_table_agg.user_id, fist_table_agg.v1_agg FROM (SELECT intermediate_result.v1_agg, intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v1_agg integer, user_id integer)) fist_table_agg) citus_insert_select_subquery
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
ROLLBACK;
|
||||
|
@ -2714,7 +2714,6 @@ WITH top10 AS (
|
|||
)
|
||||
INSERT INTO dist_table_with_sequence (value_1)
|
||||
SELECT * FROM top10;
|
||||
ERROR: cannot handle complex subqueries when the router executor is disabled
|
||||
SELECT * FROM dist_table_with_sequence ORDER BY user_id, value_1;
|
||||
user_id | value_1
|
||||
---------------------------------------------------------------------
|
||||
|
@ -2799,7 +2798,6 @@ WITH top10 AS (
|
|||
)
|
||||
INSERT INTO dist_table_with_user_sequence (value_1)
|
||||
SELECT * FROM top10;
|
||||
ERROR: cannot handle complex subqueries when the router executor is disabled
|
||||
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
|
||||
user_id | value_1
|
||||
---------------------------------------------------------------------
|
||||
|
@ -3236,6 +3234,47 @@ INSERT INTO raw_events_first
|
|||
SELECT * FROM raw_events_first OFFSET 0
|
||||
ON CONFLICT DO NOTHING;
|
||||
ABORT;
|
||||
-- test fix for issue https://github.com/citusdata/citus/issues/5891
|
||||
CREATE TABLE dist_table_1(
|
||||
dist_col integer,
|
||||
int_col integer,
|
||||
text_col_1 text,
|
||||
text_col_2 text
|
||||
);
|
||||
SELECT create_distributed_table('dist_table_1', 'dist_col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_table_1 VALUES (1, 1, 'string', 'string');
|
||||
CREATE TABLE dist_table_2(
|
||||
dist_col integer,
|
||||
int_col integer
|
||||
);
|
||||
SELECT create_distributed_table('dist_table_2', 'dist_col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_table_2 VALUES (1, 1);
|
||||
with a as (select random()) INSERT INTO dist_table_1
|
||||
SELECT
|
||||
t1.dist_col,
|
||||
1,
|
||||
'string',
|
||||
'string'
|
||||
FROM a, dist_table_1 t1
|
||||
join dist_table_2 t2 using (dist_col)
|
||||
limit 1
|
||||
returning text_col_1;
|
||||
text_col_1
|
||||
---------------------------------------------------------------------
|
||||
string
|
||||
(1 row)
|
||||
|
||||
DROP TABLE dist_table_1, dist_table_2;
|
||||
-- wrap in a transaction to improve performance
|
||||
BEGIN;
|
||||
DROP TABLE coerce_events;
|
||||
|
|
|
@ -78,8 +78,8 @@ HAVING (max(table_2.value) >= (SELECT value FROM a));
|
|||
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
||||
DEBUG: push down of limit count: 1
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
|
@ -139,8 +139,8 @@ HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
|
|||
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
|
||||
DEBUG: push down of limit count: 1
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
|
@ -204,8 +204,8 @@ HAVING (max(table_2.value) >= (SELECT value FROM a));
|
|||
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
||||
DEBUG: push down of limit count: 1
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
|
@ -265,8 +265,8 @@ HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
|
|||
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
|
||||
DEBUG: push down of limit count: 1
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
|
|
|
@ -64,6 +64,30 @@ INSERT INTO non_dist_unique SELECT a+1, b FROM dist_table ON CONFLICT (a) DO NOT
|
|||
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||
INSERT INTO non_dist_unique SELECT a+2, b FROM dist_table ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + non_dist_unique.b;
|
||||
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||
|
||||
INSERT INTO non_dist_unique
|
||||
SELECT a+1, b FROM dist_table
|
||||
UNION ALL
|
||||
SELECT a+100, b FROM dist_table
|
||||
ON CONFLICT (a) DO NOTHING;
|
||||
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||
|
||||
INSERT INTO non_dist_unique
|
||||
SELECT a+1, b FROM dist_table
|
||||
UNION ALL
|
||||
SELECT a+100, b FROM dist_table
|
||||
ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b + 1;
|
||||
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||
|
||||
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
|
||||
INSERT INTO non_dist_unique
|
||||
WITH cte2 AS (SELECT s FROM generate_series(1,10) s)
|
||||
SELECT a+1, b FROM dist_table WHERE b IN (SELECT s FROM cte1)
|
||||
UNION ALL
|
||||
SELECT s, s FROM cte1
|
||||
ON CONFLICT (a) DO NOTHING;
|
||||
SELECT * FROM non_dist_unique ORDER BY 1;
|
||||
|
||||
DROP TABLE non_dist_unique;
|
||||
|
||||
|
||||
|
@ -73,6 +97,7 @@ INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a = 1;
|
|||
SELECT * FROM non_dist_default ORDER BY 1, 2;
|
||||
INSERT INTO non_dist_default SELECT a FROM dist_table WHERE a > 1;
|
||||
SELECT * FROM non_dist_default ORDER BY 1, 2;
|
||||
SELECT alter_table_set_access_method('non_dist_default', 'columnar');
|
||||
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a = 1;
|
||||
SELECT * FROM non_dist_default ORDER BY 1, 2;
|
||||
INSERT INTO non_dist_default SELECT a, c FROM dist_table WHERE a > 1;
|
||||
|
@ -149,5 +174,427 @@ INSERT INTO non_dist_2 SELECT a, c FROM ref_table;
|
|||
SELECT * FROM non_dist_2 ORDER BY 1, 2;
|
||||
TRUNCATE non_dist_2;
|
||||
|
||||
-- check issue https://github.com/citusdata/citus/issues/5858
|
||||
CREATE TABLE local_dest_table(
|
||||
col_1 integer,
|
||||
col_2 integer,
|
||||
col_3 text,
|
||||
col_4 text,
|
||||
drop_col text,
|
||||
col_5 bigint,
|
||||
col_6 text,
|
||||
col_7 text default 'col_7',
|
||||
col_8 varchar
|
||||
);
|
||||
|
||||
ALTER TABLE local_dest_table DROP COLUMN drop_col;
|
||||
|
||||
CREATE TABLE dist_source_table_1(
|
||||
int_col integer,
|
||||
drop_col text,
|
||||
text_col_1 text,
|
||||
dist_col integer,
|
||||
text_col_2 text
|
||||
);
|
||||
SELECT create_distributed_table('dist_source_table_1', 'dist_col');
|
||||
|
||||
ALTER TABLE dist_source_table_1 DROP COLUMN drop_col;
|
||||
|
||||
INSERT INTO dist_source_table_1 VALUES (1, 'value', 1, 'value');
|
||||
INSERT INTO dist_source_table_1 VALUES (2, 'value2', 1, 'value');
|
||||
INSERT INTO dist_source_table_1 VALUES (3, 'value', 3, 'value3');
|
||||
|
||||
CREATE TABLE dist_source_table_2(
|
||||
dist_col integer,
|
||||
int_col integer
|
||||
);
|
||||
SELECT create_distributed_table('dist_source_table_2', 'dist_col');
|
||||
|
||||
INSERT INTO dist_source_table_2 VALUES (1, 1);
|
||||
INSERT INTO dist_source_table_2 VALUES (2, 2);
|
||||
INSERT INTO dist_source_table_2 VALUES (4, 4);
|
||||
|
||||
CREATE TABLE local_source_table_1 AS SELECT * FROM dist_source_table_1;
|
||||
CREATE TABLE local_source_table_2 AS SELECT * FROM dist_source_table_2;
|
||||
|
||||
/*
|
||||
* query_results_equal compares the effect of two queries on local_dest_table.
|
||||
* We use this to ensure that INSERT INTO local_dest_table SELECT behaves
|
||||
* the same when selecting from a regular table (postgres handles it) and
|
||||
* a distributed table (Citus handles it).
|
||||
*
|
||||
* The queries are generated by calling format() on query_table twice,
|
||||
* once for each source_table argument.
|
||||
*/
|
||||
CREATE OR REPLACE FUNCTION query_results_equal(query_template text, source_table_1 text, source_table_2 text)
|
||||
RETURNS bool
|
||||
AS $$
|
||||
DECLARE
|
||||
l1 local_dest_table[];
|
||||
l2 local_dest_table[];
|
||||
BEGIN
|
||||
/* get the results using source_table_1 as source */
|
||||
TRUNCATE local_dest_table;
|
||||
EXECUTE format(query_template, source_table_1);
|
||||
SELECT array_agg(l) INTO l1
|
||||
FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l;
|
||||
|
||||
/* get the results using source_table_2 as source */
|
||||
TRUNCATE local_dest_table;
|
||||
EXECUTE format(query_template, source_table_2);
|
||||
SELECT array_agg(l) INTO l2
|
||||
FROM (SELECT * FROM local_dest_table ORDER BY 1, 2, 3, 4, 5, 6, 7, 8) l;
|
||||
|
||||
RAISE NOTICE 'l2=%', l1;
|
||||
RAISE NOTICE 'l2=%', l2;
|
||||
RETURN l1 = l2;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table
|
||||
SELECT
|
||||
t1.dist_col,
|
||||
1,
|
||||
'string1',
|
||||
'string2',
|
||||
2,
|
||||
'string3',
|
||||
t1.text_col_1,
|
||||
t1.text_col_2
|
||||
FROM %1$s_1 t1
|
||||
WHERE t1.int_col IN (SELECT int_col FROM %1$s_2)
|
||||
$$, 'local_source_table', 'dist_source_table');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table
|
||||
SELECT
|
||||
t1.dist_col,
|
||||
1,
|
||||
'string1',
|
||||
'string2',
|
||||
2,
|
||||
'string3',
|
||||
t1.text_col_1,
|
||||
t1.text_col_2
|
||||
FROM %1$s t1
|
||||
returning *
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_3, col_4) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM %1$s t1
|
||||
returning *;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_7, col_4) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM %1$s t1
|
||||
returning *;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_4, col_3) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM %1$s t1
|
||||
WHERE dist_col = 1
|
||||
returning *;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_4, col_1)
|
||||
SELECT
|
||||
'string1',
|
||||
dist_col
|
||||
FROM %1$s
|
||||
UNION ALL
|
||||
SELECT
|
||||
'string',
|
||||
int_col
|
||||
FROM %1$s;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
|
||||
INSERT INTO local_dest_table (col_4, col_1)
|
||||
SELECT
|
||||
'string1',
|
||||
dist_col
|
||||
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
|
||||
UNION ALL
|
||||
SELECT
|
||||
'string',
|
||||
int_col
|
||||
FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1)
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s)
|
||||
INSERT INTO local_dest_table (col_4, col_1)
|
||||
SELECT
|
||||
'string1',
|
||||
dist_col
|
||||
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
|
||||
UNION ALL
|
||||
SELECT
|
||||
*
|
||||
FROM cte1
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_3)
|
||||
SELECT t1.text_col_1
|
||||
FROM %1$s t1
|
||||
GROUP BY t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8)
|
||||
SELECT
|
||||
max(t1.dist_col),
|
||||
3,
|
||||
'string_3',
|
||||
4,
|
||||
44,
|
||||
t1.text_col_1,
|
||||
'string_1000'
|
||||
FROM %1$s t1
|
||||
GROUP BY t1.text_col_2, t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_7, col_8)
|
||||
SELECT
|
||||
t1.text_col_1,
|
||||
'string_1000'
|
||||
FROM dist_source_table_1 t1
|
||||
GROUP BY t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_6, col_7, col_8)
|
||||
SELECT
|
||||
'string_4',
|
||||
t1.text_col_1,
|
||||
'string_1000'
|
||||
FROM %1$s t1
|
||||
GROUP BY t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_5, col_3)
|
||||
SELECT 12, 'string_11' FROM %1$s t1
|
||||
UNION
|
||||
SELECT int_col, 'string' FROM %1$s;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table(col_3, col_2)
|
||||
SELECT text_col_1, count(*) FROM %1$s GROUP BY 1
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table(col_3, col_5)
|
||||
SELECT text_col_1, count(*)::int FROM %1$s GROUP BY 1
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
-- repeat above tests with Citus local table
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table
|
||||
SELECT
|
||||
t1.dist_col,
|
||||
1,
|
||||
'string1',
|
||||
'string2',
|
||||
2,
|
||||
'string3',
|
||||
t1.text_col_1,
|
||||
t1.text_col_2
|
||||
FROM %1$s_1 t1
|
||||
WHERE t1.int_col IN (SELECT int_col FROM %1$s_2)
|
||||
$$, 'local_source_table', 'dist_source_table');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table
|
||||
SELECT
|
||||
t1.dist_col,
|
||||
1,
|
||||
'string1',
|
||||
'string2',
|
||||
2,
|
||||
'string3',
|
||||
t1.text_col_1,
|
||||
t1.text_col_2
|
||||
FROM %1$s t1
|
||||
returning *
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_3, col_4) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM %1$s t1
|
||||
returning *;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_7, col_4) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM %1$s t1
|
||||
returning *;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_4, col_3) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM %1$s t1
|
||||
WHERE dist_col = 1
|
||||
returning *;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_4, col_1)
|
||||
SELECT
|
||||
'string1',
|
||||
dist_col
|
||||
FROM %1$s
|
||||
UNION ALL
|
||||
SELECT
|
||||
'string',
|
||||
int_col
|
||||
FROM %1$s;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
|
||||
INSERT INTO local_dest_table (col_4, col_1)
|
||||
SELECT
|
||||
'string1',
|
||||
dist_col
|
||||
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
|
||||
UNION ALL
|
||||
SELECT
|
||||
'string',
|
||||
int_col
|
||||
FROM %1$s WHERE int_col IN (SELECT s + 1 FROM cte1)
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
WITH cte1 AS (SELECT 'stringcte', s FROM generate_series(1,10) s)
|
||||
INSERT INTO local_dest_table (col_4, col_1)
|
||||
SELECT
|
||||
'string1',
|
||||
dist_col
|
||||
FROM %1$s WHERE int_col IN (SELECT s FROM cte1)
|
||||
UNION ALL
|
||||
SELECT
|
||||
*
|
||||
FROM cte1
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_3)
|
||||
SELECT t1.text_col_1
|
||||
FROM %1$s t1
|
||||
GROUP BY t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_1, col_2, col_3, col_5, col_6, col_7, col_8)
|
||||
SELECT
|
||||
max(t1.dist_col),
|
||||
3,
|
||||
'string_3',
|
||||
4,
|
||||
44,
|
||||
t1.text_col_1,
|
||||
'string_1000'
|
||||
FROM %1$s t1
|
||||
GROUP BY t1.text_col_2, t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_7, col_8)
|
||||
SELECT
|
||||
t1.text_col_1,
|
||||
'string_1000'
|
||||
FROM dist_source_table_1 t1
|
||||
GROUP BY t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_6, col_7, col_8)
|
||||
SELECT
|
||||
'string_4',
|
||||
t1.text_col_1,
|
||||
'string_1000'
|
||||
FROM %1$s t1
|
||||
GROUP BY t1.text_col_1;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table (col_5, col_3)
|
||||
SELECT 12, 'string_11' FROM %1$s t1
|
||||
UNION
|
||||
SELECT int_col, 'string' FROM %1$s;
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table(col_3, col_2)
|
||||
SELECT text_col_1, count(*) FROM %1$s GROUP BY 1
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
SELECT * FROM query_results_equal($$
|
||||
INSERT INTO local_dest_table(col_3, col_5)
|
||||
SELECT text_col_1, count(*)::int FROM %1$s GROUP BY 1
|
||||
$$, 'local_source_table_1', 'dist_source_table_1');
|
||||
|
||||
-- go back to proper local table for remaining tests
|
||||
TRUNCATE local_dest_table;
|
||||
SELECT undistribute_table('local_source_table_1');
|
||||
|
||||
-- use a sequence (cannot use query_results_equal, since sequence values would not match)
|
||||
CREATE SEQUENCE seq;
|
||||
|
||||
BEGIN;
|
||||
INSERT INTO local_dest_table (col_5, col_3)
|
||||
SELECT 12, 'string_11' FROM dist_source_table_1
|
||||
UNION
|
||||
SELECT nextval('seq'), 'string' FROM dist_source_table_1;
|
||||
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
|
||||
ROLLBACK;
|
||||
|
||||
-- add a bigserial column
|
||||
ALTER TABLE local_dest_table ADD COLUMN col_9 bigserial;
|
||||
|
||||
-- not supported due to limitations in nextval handling
|
||||
INSERT INTO local_dest_table (col_5, col_3)
|
||||
SELECT 12, 'string_11' FROM dist_source_table_1
|
||||
UNION
|
||||
SELECT 11, 'string' FROM dist_source_table_1;
|
||||
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
|
||||
|
||||
BEGIN;
|
||||
INSERT INTO local_dest_table(col_3, col_2)
|
||||
SELECT text_col_1, count(*) FROM dist_source_table_1 GROUP BY 1;
|
||||
SELECT * FROM local_dest_table ORDER BY 1,2,3,4,5,6,7,8;
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
INSERT INTO local_dest_table (col_4, col_3) SELECT
|
||||
'string1',
|
||||
'string2'::text
|
||||
FROM dist_source_table_1 t1
|
||||
WHERE dist_col = 1
|
||||
RETURNING *;
|
||||
ROLLBACK;
|
||||
|
||||
\set VERBOSITY terse
|
||||
DROP SCHEMA insert_select_into_local_table CASCADE;
|
||||
|
|
|
@ -2337,6 +2337,40 @@ SELECT * FROM raw_events_first OFFSET 0
|
|||
ON CONFLICT DO NOTHING;
|
||||
ABORT;
|
||||
|
||||
-- test fix for issue https://github.com/citusdata/citus/issues/5891
|
||||
CREATE TABLE dist_table_1(
|
||||
dist_col integer,
|
||||
int_col integer,
|
||||
text_col_1 text,
|
||||
text_col_2 text
|
||||
);
|
||||
|
||||
SELECT create_distributed_table('dist_table_1', 'dist_col');
|
||||
|
||||
INSERT INTO dist_table_1 VALUES (1, 1, 'string', 'string');
|
||||
|
||||
CREATE TABLE dist_table_2(
|
||||
dist_col integer,
|
||||
int_col integer
|
||||
);
|
||||
|
||||
SELECT create_distributed_table('dist_table_2', 'dist_col');
|
||||
|
||||
INSERT INTO dist_table_2 VALUES (1, 1);
|
||||
|
||||
with a as (select random()) INSERT INTO dist_table_1
|
||||
SELECT
|
||||
t1.dist_col,
|
||||
1,
|
||||
'string',
|
||||
'string'
|
||||
FROM a, dist_table_1 t1
|
||||
join dist_table_2 t2 using (dist_col)
|
||||
limit 1
|
||||
returning text_col_1;
|
||||
|
||||
DROP TABLE dist_table_1, dist_table_2;
|
||||
|
||||
-- wrap in a transaction to improve performance
|
||||
BEGIN;
|
||||
DROP TABLE coerce_events;
|
||||
|
|
Loading…
Reference in New Issue