mirror of https://github.com/citusdata/citus.git
Fix other insert..select issues
parent
deb9e54c61
commit
2892e07e32
|
@ -55,7 +55,6 @@
|
|||
bool EnableRepartitionedInsertSelect = true;
|
||||
|
||||
|
||||
static Query * WrapSubquery(Query *subquery);
|
||||
static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
|
||||
char *resultIdPrefix);
|
||||
static void ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
|
||||
|
@ -299,100 +298,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* BuildSelectForInsertSelect extracts the SELECT part from an INSERT...SELECT query.
|
||||
* If the INSERT...SELECT has CTEs then these are added to the resulting SELECT instead.
|
||||
*/
|
||||
Query *
|
||||
BuildSelectForInsertSelect(Query *insertSelectQuery)
|
||||
{
|
||||
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
||||
Query *selectQuery = selectRte->subquery;
|
||||
|
||||
/*
|
||||
* Wrap the SELECT as a subquery if the INSERT...SELECT has CTEs or the SELECT
|
||||
* has top-level set operations.
|
||||
*
|
||||
* We could simply wrap all queries, but that might create a subquery that is
|
||||
* not supported by the logical planner. Since the logical planner also does
|
||||
* not support CTEs and top-level set operations, we can wrap queries containing
|
||||
* those without breaking anything.
|
||||
*/
|
||||
if (list_length(insertSelectQuery->cteList) > 0)
|
||||
{
|
||||
selectQuery = WrapSubquery(selectRte->subquery);
|
||||
|
||||
/* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
|
||||
selectQuery->cteList = copyObject(insertSelectQuery->cteList);
|
||||
selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE;
|
||||
}
|
||||
else if (selectQuery->setOperations != NULL)
|
||||
{
|
||||
/* top-level set operations confuse the ReorderInsertSelectTargetLists logic */
|
||||
selectQuery = WrapSubquery(selectRte->subquery);
|
||||
}
|
||||
|
||||
return selectQuery;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* WrapSubquery wraps the given query as a subquery in a newly constructed
|
||||
* "SELECT * FROM (...subquery...) citus_insert_select_subquery" query.
|
||||
*/
|
||||
static Query *
|
||||
WrapSubquery(Query *subquery)
|
||||
{
|
||||
ParseState *pstate = make_parsestate(NULL);
|
||||
List *newTargetList = NIL;
|
||||
|
||||
Query *outerQuery = makeNode(Query);
|
||||
outerQuery->commandType = CMD_SELECT;
|
||||
|
||||
/* create range table entries */
|
||||
Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL);
|
||||
RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem(
|
||||
addRangeTableEntryForSubquery(
|
||||
pstate, subquery,
|
||||
selectAlias, false, true));
|
||||
outerQuery->rtable = list_make1(newRangeTableEntry);
|
||||
|
||||
/* set the FROM expression to the subquery */
|
||||
RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
|
||||
newRangeTableRef->rtindex = 1;
|
||||
outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
|
||||
|
||||
/* create a target list that matches the SELECT */
|
||||
TargetEntry *selectTargetEntry = NULL;
|
||||
foreach_ptr(selectTargetEntry, subquery->targetList)
|
||||
{
|
||||
/* exactly 1 entry in FROM */
|
||||
int indexInRangeTable = 1;
|
||||
|
||||
if (selectTargetEntry->resjunk)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno,
|
||||
exprType((Node *) selectTargetEntry->expr),
|
||||
exprTypmod((Node *) selectTargetEntry->expr),
|
||||
exprCollation((Node *) selectTargetEntry->expr), 0);
|
||||
|
||||
TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar,
|
||||
selectTargetEntry->resno,
|
||||
selectTargetEntry->resname,
|
||||
selectTargetEntry->resjunk);
|
||||
|
||||
newTargetList = lappend(newTargetList, newSelectTargetEntry);
|
||||
}
|
||||
|
||||
outerQuery->targetList = newTargetList;
|
||||
|
||||
return outerQuery;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TwoPhaseInsertSelectTaskList generates a list of tasks for a query that
|
||||
* inserts into a target relation and selects from a set of co-located
|
||||
|
|
|
@ -83,6 +83,7 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
|
|||
static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse,
|
||||
ParamListInfo boundParams);
|
||||
static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery);
|
||||
static Query * WrapSubquery(Query *subquery);
|
||||
static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList);
|
||||
static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||
Oid targetRelationId);
|
||||
|
@ -377,8 +378,17 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
|
|||
RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
|
||||
Oid targetRelationId = insertRte->relid;
|
||||
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
||||
Query *selectQuery = selectRte->subquery;
|
||||
bool hasSetOperations = false;
|
||||
|
||||
Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery);
|
||||
if (selectQuery->setOperations != NULL)
|
||||
{
|
||||
selectQuery = selectRte->subquery = WrapSubquery(selectQuery);
|
||||
hasSetOperations = true;
|
||||
}
|
||||
|
||||
/* this is required for correct deparsing of the query */
|
||||
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
||||
|
||||
/*
|
||||
* Cast types of insert target list and select projection list to
|
||||
|
@ -386,10 +396,21 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
|
|||
*/
|
||||
selectQuery->targetList =
|
||||
AddInsertSelectCasts(insertSelectQuery->targetList,
|
||||
selectQuery->targetList,
|
||||
copyObject(selectQuery->targetList),
|
||||
targetRelationId);
|
||||
|
||||
insertSelectQuery->cteList = NIL;
|
||||
if (list_length(insertSelectQuery->cteList) > 0)
|
||||
{
|
||||
if (!hasSetOperations)
|
||||
{
|
||||
selectQuery = selectRte->subquery = WrapSubquery(selectQuery);
|
||||
}
|
||||
|
||||
/* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
|
||||
selectQuery->cteList = copyObject(insertSelectQuery->cteList);
|
||||
selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE;
|
||||
insertSelectQuery->cteList = NIL;
|
||||
}
|
||||
|
||||
DistributedPlan *distPlan = CreateDistributedPlan(planId, selectQuery,
|
||||
copyObject(selectQuery),
|
||||
|
@ -895,11 +916,9 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
|
|||
List *newSubqueryTargetlist = NIL;
|
||||
List *newInsertTargetlist = NIL;
|
||||
int resno = 1;
|
||||
Index insertTableId = 1;
|
||||
Index selectTableId = 2;
|
||||
int targetEntryIndex = 0;
|
||||
|
||||
AssertArg(InsertSelectIntoCitusTable(originalQuery));
|
||||
|
||||
Query *subquery = subqueryRte->subquery;
|
||||
|
||||
Oid insertRelationId = insertRte->relid;
|
||||
|
@ -974,7 +993,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
|
|||
*/
|
||||
Assert(!newSubqueryTargetEntry->resjunk);
|
||||
|
||||
Var *newInsertVar = makeVar(insertTableId, originalAttrNo,
|
||||
Var *newInsertVar = makeVar(selectTableId, resno,
|
||||
exprType((Node *) newSubqueryTargetEntry->expr),
|
||||
exprTypmod((Node *) newSubqueryTargetEntry->expr),
|
||||
exprCollation((Node *) newSubqueryTargetEntry->expr),
|
||||
|
@ -1425,9 +1444,15 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
|
|||
return distributedPlan;
|
||||
}
|
||||
|
||||
Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery);
|
||||
Query *selectQuery = selectRte->subquery;
|
||||
bool hasSetOperations = false;
|
||||
|
||||
if (selectQuery->setOperations != NULL)
|
||||
{
|
||||
selectQuery = selectRte->subquery = WrapSubquery(selectQuery);
|
||||
hasSetOperations = true;
|
||||
}
|
||||
|
||||
selectRte->subquery = selectQuery;
|
||||
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
||||
|
||||
/*
|
||||
|
@ -1439,6 +1464,19 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
|
|||
selectQuery->targetList,
|
||||
targetRelationId);
|
||||
|
||||
if (list_length(insertSelectQuery->cteList) > 0)
|
||||
{
|
||||
if (!hasSetOperations)
|
||||
{
|
||||
selectQuery = selectRte->subquery = WrapSubquery(selectQuery);
|
||||
}
|
||||
|
||||
/* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
|
||||
selectQuery->cteList = copyObject(insertSelectQuery->cteList);
|
||||
selectQuery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE;
|
||||
insertSelectQuery->cteList = NIL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Later we might need to call WrapTaskListForProjection(), which requires
|
||||
* that select target list has unique names, otherwise the outer query
|
||||
|
@ -1519,6 +1557,63 @@ InsertSelectResultIdPrefix(uint64 planId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* WrapSubquery wraps the given query as a subquery in a newly constructed
|
||||
* "SELECT * FROM (...subquery...) citus_insert_select_subquery" query.
|
||||
*/
|
||||
static Query *
|
||||
WrapSubquery(Query *subquery)
|
||||
{
|
||||
ParseState *pstate = make_parsestate(NULL);
|
||||
List *newTargetList = NIL;
|
||||
|
||||
Query *outerQuery = makeNode(Query);
|
||||
outerQuery->commandType = CMD_SELECT;
|
||||
|
||||
/* create range table entries */
|
||||
Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL);
|
||||
RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem(
|
||||
addRangeTableEntryForSubquery(
|
||||
pstate, subquery,
|
||||
selectAlias, false, true));
|
||||
outerQuery->rtable = list_make1(newRangeTableEntry);
|
||||
|
||||
/* set the FROM expression to the subquery */
|
||||
RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
|
||||
newRangeTableRef->rtindex = 1;
|
||||
outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
|
||||
|
||||
/* create a target list that matches the SELECT */
|
||||
TargetEntry *selectTargetEntry = NULL;
|
||||
foreach_ptr(selectTargetEntry, subquery->targetList)
|
||||
{
|
||||
/* exactly 1 entry in FROM */
|
||||
int indexInRangeTable = 1;
|
||||
|
||||
if (selectTargetEntry->resjunk)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno,
|
||||
exprType((Node *) selectTargetEntry->expr),
|
||||
exprTypmod((Node *) selectTargetEntry->expr),
|
||||
exprCollation((Node *) selectTargetEntry->expr), 0);
|
||||
|
||||
TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar,
|
||||
selectTargetEntry->resno,
|
||||
selectTargetEntry->resname,
|
||||
selectTargetEntry->resjunk);
|
||||
|
||||
newTargetList = lappend(newTargetList, newSelectTargetEntry);
|
||||
}
|
||||
|
||||
outerQuery->targetList = newTargetList;
|
||||
|
||||
return outerQuery;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RelabelTargetEntryList relabels select target list to have matching names with
|
||||
* insert target list.
|
||||
|
@ -1565,8 +1660,8 @@ AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
|||
Oid targetType = attr->atttypid;
|
||||
if (sourceType != targetType)
|
||||
{
|
||||
insertEntry->expr = CastExpr(insertEntry->expr, sourceType, targetType,
|
||||
attr->attcollation, attr->atttypmod);
|
||||
insertEntry->expr = CastExpr(copyObject(insertEntry->expr), sourceType,
|
||||
targetType, attr->attcollation, attr->atttypmod);
|
||||
|
||||
/*
|
||||
* We cannot modify the selectEntry in-place, because ORDER BY or
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
extern bool EnableRepartitionedInsertSelect;
|
||||
|
||||
extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node);
|
||||
extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery);
|
||||
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
|
||||
extern bool IsRedistributablePlan(Plan *selectPlan);
|
||||
|
||||
|
|
|
@ -896,8 +896,8 @@ HAVING (max(table_2.value) >= (SELECT value FROM a));
|
|||
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
||||
DEBUG: push down of limit count: 1
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
|
@ -905,9 +905,9 @@ DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
|||
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
|
||||
DEBUG: Subplan XXX_2 will be written to local file
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
||||
NOTICE: executing the command locally: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||
NOTICE: executing the command locally: SELECT count(*) AS auto_coerced_by_citus_0, (worker_column_1)::text AS auto_coerced_by_citus_1, worker_column_1 AS discarded_target_item_1, max(worker_column_2) AS worker_column_4 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
||||
NOTICE: executing the command locally: SELECT count(*) AS auto_coerced_by_citus_0, (worker_column_1)::text AS auto_coerced_by_citus_1, worker_column_1 AS discarded_target_item_1, max(worker_column_2) AS worker_column_4 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
||||
NOTICE: executing the command locally: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
||||
NOTICE: executing the copy locally for shard xxxxx
|
||||
WITH stats AS (
|
||||
SELECT count(key) m FROM table_1
|
||||
|
|
|
@ -500,7 +500,7 @@ INSERT INTO target_table
|
|||
SELECT mapped_key, c FROM t NATURAL JOIN source_table;
|
||||
DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for CTE t: SELECT mapped_key, a, c FROM insert_select_repartition.source_table WHERE ((a)::double precision OPERATOR(pg_catalog.>) floor(random()))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, (c)::integer[] AS b FROM (SELECT t.mapped_key, t.c FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, auto_coerced_by_citus_1 AS b FROM (SELECT t.mapped_key, (t.c)::integer[] AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery
|
||||
DEBUG: performing repartitioned INSERT ... SELECT
|
||||
RESET client_min_messages;
|
||||
SELECT * FROM target_table ORDER BY a;
|
||||
|
|
|
@ -1411,21 +1411,20 @@ WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5)
|
|||
SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1);
|
||||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> Subquery Scan on citus_insert_select_subquery
|
||||
-> Result
|
||||
One-Time Filter: $3
|
||||
CTE cte1
|
||||
-> Function Scan on generate_series s
|
||||
-> Result
|
||||
One-Time Filter: $3
|
||||
CTE cte1
|
||||
-> Limit
|
||||
InitPlan 2 (returns $1)
|
||||
-> CTE Scan on cte1 cte1_1
|
||||
-> Result
|
||||
One-Time Filter: $1
|
||||
-> CTE Scan on cte1 cte1_2
|
||||
InitPlan 4 (returns $3)
|
||||
-> CTE Scan on cte1 cte1_3
|
||||
-> CTE Scan on cte1
|
||||
CTE cte1
|
||||
-> Limit
|
||||
InitPlan 2 (returns $1)
|
||||
-> CTE Scan on cte1 cte1_1
|
||||
-> Result
|
||||
One-Time Filter: $1
|
||||
-> CTE Scan on cte1 cte1_2
|
||||
InitPlan 4 (returns $3)
|
||||
-> CTE Scan on cte1 cte1_3
|
||||
-> CTE Scan on cte1
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO lineitem_hash_part
|
||||
( SELECT s FROM generate_series(1,5) s) UNION
|
||||
|
|
|
@ -669,7 +669,7 @@ DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
|||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: generating subplan XXX_1 for CTE fist_table_agg: SELECT (max(value_1) OPERATOR(pg_catalog.+) 1) AS v1_agg, user_id FROM public.raw_events_first GROUP BY user_id
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, v1_agg AS value_1_agg FROM (SELECT fist_table_agg.v1_agg, fist_table_agg.user_id FROM (SELECT intermediate_result.v1_agg, intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v1_agg integer, user_id integer)) fist_table_agg) citus_insert_select_subquery
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, v1_agg AS value_1_agg FROM (SELECT fist_table_agg.user_id, fist_table_agg.v1_agg FROM (SELECT intermediate_result.v1_agg, intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v1_agg integer, user_id integer)) fist_table_agg) citus_insert_select_subquery
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
ROLLBACK;
|
||||
|
@ -2714,7 +2714,6 @@ WITH top10 AS (
|
|||
)
|
||||
INSERT INTO dist_table_with_sequence (value_1)
|
||||
SELECT * FROM top10;
|
||||
ERROR: cannot handle complex subqueries when the router executor is disabled
|
||||
SELECT * FROM dist_table_with_sequence ORDER BY user_id, value_1;
|
||||
user_id | value_1
|
||||
---------------------------------------------------------------------
|
||||
|
@ -2799,7 +2798,6 @@ WITH top10 AS (
|
|||
)
|
||||
INSERT INTO dist_table_with_user_sequence (value_1)
|
||||
SELECT * FROM top10;
|
||||
ERROR: cannot handle complex subqueries when the router executor is disabled
|
||||
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
|
||||
user_id | value_1
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -78,8 +78,8 @@ HAVING (max(table_2.value) >= (SELECT value FROM a));
|
|||
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
||||
DEBUG: push down of limit count: 1
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
|
@ -139,8 +139,8 @@ HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
|
|||
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
|
||||
DEBUG: push down of limit count: 1
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
|
@ -204,8 +204,8 @@ HAVING (max(table_2.value) >= (SELECT value FROM a));
|
|||
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
||||
DEBUG: push down of limit count: 1
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
|
@ -265,8 +265,8 @@ HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
|
|||
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
|
||||
DEBUG: push down of limit count: 1
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
|
|
Loading…
Reference in New Issue