Save INSERT/SELECT method in DistributedPlan.

This is so we don't need to calculate it twice, once in
insert_select_executor.c and once in multi_explain.c, which
can cause discrepancies if an update to one of them is not
reflected in the other.
pull/3943/head
Hadi Moshayedi 2020-06-23 20:36:02 -07:00
parent 64506143e4
commit 4e8d79998e
11 changed files with 128 additions and 47 deletions

View File

@ -136,11 +136,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
bool hasReturning = distributedPlan->expectResults;
HTAB *shardStateHash = NULL;
/* select query to execute */
Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery);
selectRte->subquery = selectQuery;
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
Query *selectQuery = selectRte->subquery;
/*
* Cast types of insert target list and select projection list to
@ -181,8 +177,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
LockPartitionRelations(targetRelationId, RowExclusiveLock);
}
if (IsRedistributablePlan(selectPlan->planTree) &&
IsSupportedRedistributionTarget(targetRelationId))
if (distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION)
{
ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT")));

View File

@ -913,7 +913,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
}
distributedPlan =
CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext);
CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext,
boundParams);
}
else if (InsertSelectIntoLocalTable(originalQuery))
{

View File

@ -48,6 +48,7 @@
#include "parser/parsetree.h"
#include "parser/parse_coerce.h"
#include "parser/parse_relation.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
@ -77,8 +78,11 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
subqueryRte,
Oid *
selectPartitionColumnTableId);
static DistributedPlan * CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse);
static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery);
static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse,
ParamListInfo boundParams);
static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery);
static InsertSelectMethod GetInsertSelectMethod(Query *selectQuery, Oid targetRelationId,
ParamListInfo boundParams);
/*
@ -185,7 +189,8 @@ CheckInsertSelectQuery(Query *query)
*/
DistributedPlan *
CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
PlannerRestrictionContext *plannerRestrictionContext,
ParamListInfo boundParams)
{
DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(originalQuery);
if (deferredError != NULL)
@ -201,8 +206,12 @@ CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
{
RaiseDeferredError(distributedPlan->planningError, DEBUG1);
/* if INSERT..SELECT cannot be distributed, pull to coordinator */
distributedPlan = CreateCoordinatorInsertSelectPlan(planId, originalQuery);
/*
* If INSERT..SELECT cannot be distributed, pull to coordinator or use
* repartitioning.
*/
distributedPlan = CreateNonPushableInsertSelectPlan(planId, originalQuery,
boundParams);
}
return distributedPlan;
@ -1282,14 +1291,15 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
/*
* CreateCoordinatorInsertSelectPlan creates a query plan for a SELECT into a
* CreateNonPushableInsertSelectPlan creates a query plan for a SELECT into a
* distributed table. The query plan can also be executed on a worker in MX.
*/
static DistributedPlan *
CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse)
CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams)
{
Query *insertSelectQuery = copyObject(parse);
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
Oid targetRelationId = insertRte->relid;
@ -1297,14 +1307,22 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse)
distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery);
distributedPlan->planningError =
CoordinatorInsertSelectSupported(insertSelectQuery);
NonPushableInsertSelectSupported(insertSelectQuery);
if (distributedPlan->planningError != NULL)
{
return distributedPlan;
}
Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery);
selectRte->subquery = selectQuery;
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
distributedPlan->insertSelectQuery = insertSelectQuery;
distributedPlan->insertSelectMethod = GetInsertSelectMethod(selectQuery,
targetRelationId,
boundParams);
distributedPlan->expectResults = insertSelectQuery->returningList != NIL;
distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId);
distributedPlan->targetRelationId = targetRelationId;
@ -1314,13 +1332,13 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse)
/*
* CoordinatorInsertSelectSupported returns an error if executing an
* NonPushableInsertSelectSupported returns an error if executing an
* INSERT ... SELECT command by pulling results of the SELECT to the coordinator
* is unsupported because it needs to generate sequence values or insert into an
* append-distributed table.
* or with repartitioning is unsupported because it needs to generate sequence
* values or insert into an append-distributed table.
*/
static DeferredErrorMessage *
CoordinatorInsertSelectSupported(Query *insertSelectQuery)
NonPushableInsertSelectSupported(Query *insertSelectQuery)
{
DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(
insertSelectQuery);
@ -1355,3 +1373,36 @@ InsertSelectResultIdPrefix(uint64 planId)
return resultIdPrefix->data;
}
/*
 * GetInsertSelectMethod decides which INSERT INTO ... SELECT method should be
 * used for the given select query and target relation.
 *
 * The select query is planned here (on a copy) and the resulting plan tree is
 * inspected: INSERT_SELECT_REPARTITION is returned only when the plan is
 * redistributable and the target relation is a supported redistribution
 * target; otherwise INSERT_SELECT_VIA_COORDINATOR is returned.
 */
static InsertSelectMethod
GetInsertSelectMethod(Query *selectQuery, Oid targetRelationId, ParamListInfo boundParams)
{
	/* plan a copy, so the query tree handed in by the caller is not the one planned */
	Query *queryToPlan = copyObject(selectQuery);

	/*
	 * insert_select_executor plans the query again so that prepared
	 * statements are planned correctly. Logging is silenced during this
	 * first planning pass, otherwise the same messages would show up twice.
	 * The setting is applied with SET LOCAL semantics, so an ERROR raised
	 * while planning reverts it.
	 */
	int previousClientMinMessages = client_min_messages;

	set_config_option("client_min_messages", "ERROR",
					  PGC_USERSET, PGC_S_SESSION,
					  GUC_ACTION_LOCAL, true, 0, false);

	PlannedStmt *plannedSelect = pg_plan_query(queryToPlan, CURSOR_OPT_PARALLEL_OK,
											   boundParams);

	/* planning succeeded, put the previous log level back */
	client_min_messages = previousClientMinMessages;

	if (!IsRedistributablePlan(plannedSelect->planTree))
	{
		return INSERT_SELECT_VIA_COORDINATOR;
	}

	if (!IsSupportedRedistributionTarget(targetRelationId))
	{
		return INSERT_SELECT_VIA_COORDINATOR;
	}

	return INSERT_SELECT_REPARTITION;
}

View File

@ -209,22 +209,10 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = scanState->distributedPlan;
Query *insertSelectQuery = distributedPlan->insertSelectQuery;
Query *query = BuildSelectForInsertSelect(insertSelectQuery);
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
Oid targetRelationId = insertRte->relid;
IntoClause *into = NULL;
ParamListInfo params = NULL;
char *queryString = NULL;
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
Query *query = selectRte->subquery;
/*
* Make a copy of the query, since pg_plan_query may scribble on it and later
* stages of EXPLAIN require it.
*/
Query *queryCopy = copyObject(query);
PlannedStmt *selectPlan = pg_plan_query(queryCopy, cursorOptions, params);
bool repartition = IsRedistributablePlan(selectPlan->planTree) &&
IsSupportedRedistributionTarget(targetRelationId);
bool repartition = distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION;
if (es->analyze)
{
@ -245,6 +233,9 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
ExplainOpenGroup("Select Query", "Select Query", false, es);
/* explain the inner SELECT query */
IntoClause *into = NULL;
ParamListInfo params = NULL;
char *queryString = NULL;
ExplainOneQuery(query, 0, into, es, queryString, params, NULL);
ExplainCloseGroup("Select Query", "Select Query", false, es);

View File

@ -129,6 +129,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
COPY_NODE_FIELD(relationIdList);
COPY_SCALAR_FIELD(targetRelationId);
COPY_NODE_FIELD(insertSelectQuery);
COPY_SCALAR_FIELD(insertSelectMethod);
COPY_STRING_FIELD(intermediateResultIdPrefix);
COPY_NODE_FIELD(subPlanList);

View File

@ -33,7 +33,8 @@ extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ance
struct ExplainState *es);
extern DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
plannerRestrictionContext,
ParamListInfo boundParams);
extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId,
Query *originalQuery,
ParamListInfo

View File

@ -378,6 +378,22 @@ typedef struct JoinSequenceNode
} JoinSequenceNode;
/*
 * InsertSelectMethod enumerates the ways an INSERT INTO ... SELECT query can
 * be executed when it needs special execution code.
 *
 * A third method exists but is intentionally absent from this enum: pushing
 * the INSERT INTO ... SELECT down to the workers. That method is executed
 * like other distributed queries and needs no special execution code, so
 * there is nothing to represent for it here.
 */
typedef enum InsertSelectMethod
{
	/* results of the SELECT are pulled to the coordinator */
	INSERT_SELECT_VIA_COORDINATOR = 0,

	/* repartitioned INSERT ... SELECT */
	INSERT_SELECT_REPARTITION = 1
} InsertSelectMethod;
/*
* DistributedPlan contains all information necessary to execute a
* distributed query.
@ -416,8 +432,9 @@ typedef struct DistributedPlan
/* target relation of a modification */
Oid targetRelationId;
/* INSERT .. SELECT via the coordinator */
/* INSERT .. SELECT via the coordinator or repartition */
Query *insertSelectQuery;
InsertSelectMethod insertSelectMethod;
/*
* If intermediateResultIdPrefix is non-null, an INSERT ... SELECT

View File

@ -577,7 +577,7 @@ EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
-- EXPLAIN ANALYZE is currently not supported
--
EXPLAIN ANALYZE INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning
ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning
--
-- Duplicate names in target list
--
@ -1160,6 +1160,31 @@ DO UPDATE SET
-> Seq Scan on source_table_4213644 source_table
(10 rows)
-- verify that we don't report repartitioned insert/select for tables
-- with sequences. See https://github.com/citusdata/citus/issues/3936
create table table_with_sequences (x int, y int, z bigserial);
insert into table_with_sequences values (1,1);
select create_distributed_table('table_with_sequences','x');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
create_distributed_table
---------------------------------------------------------------------
(1 row)
explain insert into table_with_sequences select y, x from table_with_sequences;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0)
INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive) (cost=0.00..250.00 rows=100000 width=16)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on table_with_sequences_4213648 table_with_sequences (cost=0.00..28.50 rows=1850 width=8)
(8 rows)
-- clean-up
SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_repartition CASCADE;

View File

@ -710,7 +710,6 @@ INSERT INTO agg_events
raw_events_first;
DEBUG: CTE sub_cte is going to be inlined via distributed planning
DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Router planner cannot handle multi-shard select queries
ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses
HINT: Consider using an equality filter on the distributed table's partition column.
-- We support set operations via the coordinator

View File

@ -147,10 +147,6 @@ FROM (
GROUP BY t1.user_id, hasdone_event
) t GROUP BY user_id, hasdone_event;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102])))
DEBUG: generating subplan XXX_2 for subquery SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105])))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone)
DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) UNION SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105])))
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join
RESET client_min_messages;
@ -297,10 +293,6 @@ GROUP BY
ORDER BY
count_pay;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12))
DEBUG: generating subplan XXX_2 for subquery SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone)
DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) UNION SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14))
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join
RESET client_min_messages;

View File

@ -565,6 +565,14 @@ DO UPDATE SET
cardinality = enriched.cardinality + excluded.cardinality,
sum = enriched.sum + excluded.sum;
-- verify that we don't report repartitioned insert/select for tables
-- with sequences. See https://github.com/citusdata/citus/issues/3936
create table table_with_sequences (x int, y int, z bigserial);
insert into table_with_sequences values (1,1);
select create_distributed_table('table_with_sequences','x');
explain insert into table_with_sequences select y, x from table_with_sequences;
-- clean-up
SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_repartition CASCADE;