From 4e8d79998ec6dc2ef10492a7ceb1a9e9a19b3d1d Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Tue, 23 Jun 2020 20:36:02 -0700 Subject: [PATCH] Save INSERT/SELECT method in DistributedPlan. This is so we don't need to calculate it twice in insert_select_executor.c and multi_explain.c, which can cause discrepancy if an update in one of them is not reflected in the other site. --- .../executor/insert_select_executor.c | 9 +-- .../distributed/planner/distributed_planner.c | 3 +- .../planner/insert_select_planner.c | 75 ++++++++++++++++--- .../distributed/planner/multi_explain.c | 21 ++---- .../distributed/utils/citus_copyfuncs.c | 1 + .../distributed/insert_select_planner.h | 3 +- .../distributed/multi_physical_planner.h | 19 ++++- .../expected/insert_select_repartition.out | 27 ++++++- .../regress/expected/multi_insert_select.out | 1 - ...lti_insert_select_non_pushable_queries.out | 8 -- .../regress/sql/insert_select_repartition.sql | 8 ++ 11 files changed, 128 insertions(+), 47 deletions(-) diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 1228f723c..276d98c92 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -136,11 +136,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) bool hasReturning = distributedPlan->expectResults; HTAB *shardStateHash = NULL; - /* select query to execute */ - Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); - - selectRte->subquery = selectQuery; - ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + Query *selectQuery = selectRte->subquery; /* * Cast types of insert target list and select projection list to @@ -181,8 +177,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) LockPartitionRelations(targetRelationId, RowExclusiveLock); } - if (IsRedistributablePlan(selectPlan->planTree) && - 
IsSupportedRedistributionTarget(targetRelationId)) + if (distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION) { ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT"))); diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 8fc89ba6b..072624025 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -913,7 +913,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi } distributedPlan = - CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext); + CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext, + boundParams); } else if (InsertSelectIntoLocalTable(originalQuery)) { diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 6dfe0dafc..c7b9f720d 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -48,6 +48,7 @@ #include "parser/parsetree.h" #include "parser/parse_coerce.h" #include "parser/parse_relation.h" +#include "tcop/tcopprot.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -77,8 +78,11 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, subqueryRte, Oid * selectPartitionColumnTableId); -static DistributedPlan * CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse); -static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery); +static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, + ParamListInfo boundParams); +static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery); +static InsertSelectMethod GetInsertSelectMethod(Query *selectQuery, Oid targetRelationId, + ParamListInfo boundParams); /* @@ -185,7 +189,8 @@ 
CheckInsertSelectQuery(Query *query) */ DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery, - PlannerRestrictionContext *plannerRestrictionContext) + PlannerRestrictionContext *plannerRestrictionContext, + ParamListInfo boundParams) { DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(originalQuery); if (deferredError != NULL) @@ -201,8 +206,12 @@ CreateInsertSelectPlan(uint64 planId, Query *originalQuery, { RaiseDeferredError(distributedPlan->planningError, DEBUG1); - /* if INSERT..SELECT cannot be distributed, pull to coordinator */ - distributedPlan = CreateCoordinatorInsertSelectPlan(planId, originalQuery); + /* + * If INSERT..SELECT cannot be distributed, pull to coordinator or use + * repartitioning. + */ + distributedPlan = CreateNonPushableInsertSelectPlan(planId, originalQuery, + boundParams); } return distributedPlan; @@ -1282,14 +1291,15 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, /* - * CreateCoordinatorInsertSelectPlan creates a query plan for a SELECT into a + * CreateNonPushableInsertSelectPlan creates a query plan for a SELECT into a * distributed table. The query plan can also be executed on a worker in MX. 
*/ static DistributedPlan * -CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) +CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams) { Query *insertSelectQuery = copyObject(parse); + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); Oid targetRelationId = insertRte->relid; @@ -1297,14 +1307,22 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery); distributedPlan->planningError = - CoordinatorInsertSelectSupported(insertSelectQuery); + NonPushableInsertSelectSupported(insertSelectQuery); if (distributedPlan->planningError != NULL) { return distributedPlan; } + Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); + + selectRte->subquery = selectQuery; + ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + distributedPlan->insertSelectQuery = insertSelectQuery; + distributedPlan->insertSelectMethod = GetInsertSelectMethod(selectQuery, + targetRelationId, + boundParams); distributedPlan->expectResults = insertSelectQuery->returningList != NIL; distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId); distributedPlan->targetRelationId = targetRelationId; @@ -1314,13 +1332,13 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) /* - * CoordinatorInsertSelectSupported returns an error if executing an + * NonPushableInsertSelectSupported returns an error if executing an * INSERT ... SELECT command by pulling results of the SELECT to the coordinator - * is unsupported because it needs to generate sequence values or insert into an - * append-distributed table. + * or with repartitioning is unsupported because it needs to generate sequence + * values or insert into an append-distributed table. 
*/ static DeferredErrorMessage * -CoordinatorInsertSelectSupported(Query *insertSelectQuery) +NonPushableInsertSelectSupported(Query *insertSelectQuery) { DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported( insertSelectQuery); @@ -1355,3 +1373,36 @@ InsertSelectResultIdPrefix(uint64 planId) return resultIdPrefix->data; } + + +/* + * GetInsertSelectMethod returns the preferred INSERT INTO ... SELECT method + * based on its select query. + */ +static InsertSelectMethod +GetInsertSelectMethod(Query *selectQuery, Oid targetRelationId, ParamListInfo boundParams) +{ + Query *selectQueryCopy = copyObject(selectQuery); + + /* + * Query will be replanned in insert_select_executor to plan correctly + * for prepared statements. So turn off logging here to avoid repeated + * log messages. We use SET LOCAL here so the change is reverted on ERROR. + */ + int savedClientMinMessages = client_min_messages; + set_config_option("client_min_messages", "ERROR", + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + + int cursorOptions = CURSOR_OPT_PARALLEL_OK; + PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, cursorOptions, + boundParams); + + client_min_messages = savedClientMinMessages; + + bool repartitioned = IsRedistributablePlan(selectPlan->planTree) && + IsSupportedRedistributionTarget(targetRelationId); + return repartitioned ? 
+ INSERT_SELECT_REPARTITION : + INSERT_SELECT_VIA_COORDINATOR; +} diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 1f875f51f..e8012f49f 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -209,22 +209,10 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, CitusScanState *scanState = (CitusScanState *) node; DistributedPlan *distributedPlan = scanState->distributedPlan; Query *insertSelectQuery = distributedPlan->insertSelectQuery; - Query *query = BuildSelectForInsertSelect(insertSelectQuery); - RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); - Oid targetRelationId = insertRte->relid; - IntoClause *into = NULL; - ParamListInfo params = NULL; - char *queryString = NULL; - int cursorOptions = CURSOR_OPT_PARALLEL_OK; + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); + Query *query = selectRte->subquery; - /* - * Make a copy of the query, since pg_plan_query may scribble on it and later - * stages of EXPLAIN require it. 
- */ - Query *queryCopy = copyObject(query); - PlannedStmt *selectPlan = pg_plan_query(queryCopy, cursorOptions, params); - bool repartition = IsRedistributablePlan(selectPlan->planTree) && - IsSupportedRedistributionTarget(targetRelationId); + bool repartition = distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION; if (es->analyze) { @@ -245,6 +233,9 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, ExplainOpenGroup("Select Query", "Select Query", false, es); /* explain the inner SELECT query */ + IntoClause *into = NULL; + ParamListInfo params = NULL; + char *queryString = NULL; ExplainOneQuery(query, 0, into, es, queryString, params, NULL); ExplainCloseGroup("Select Query", "Select Query", false, es); diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index ddb45be11..0e95f3587 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -129,6 +129,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) COPY_NODE_FIELD(relationIdList); COPY_SCALAR_FIELD(targetRelationId); COPY_NODE_FIELD(insertSelectQuery); + COPY_SCALAR_FIELD(insertSelectMethod); COPY_STRING_FIELD(intermediateResultIdPrefix); COPY_NODE_FIELD(subPlanList); diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index 84066278b..80a61b9aa 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -33,7 +33,8 @@ extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ance struct ExplainState *es); extern DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery, PlannerRestrictionContext * - plannerRestrictionContext); + plannerRestrictionContext, + ParamListInfo boundParams); extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamListInfo diff --git 
a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 19bfc6abe..87381f531 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -378,6 +378,22 @@ typedef struct JoinSequenceNode } JoinSequenceNode; +/* + * InsertSelectMethod represents the method to use for INSERT INTO ... SELECT + * queries. + * + * Note that there is a third method which is not represented here, which is + * pushing down the INSERT INTO ... SELECT to workers. This method is executed + * similarly to other distributed queries and doesn't need special execution + * code, so we don't need to represent it here. + */ +typedef enum InsertSelectMethod +{ + INSERT_SELECT_VIA_COORDINATOR, + INSERT_SELECT_REPARTITION +} InsertSelectMethod; + + /* * DistributedPlan contains all information necessary to execute a * distribute query. @@ -416,8 +432,9 @@ typedef struct DistributedPlan /* target relation of a modification */ Oid targetRelationId; - /* INSERT .. SELECT via the coordinator */ + /* INSERT .. SELECT via the coordinator or repartition */ Query *insertSelectQuery; + InsertSelectMethod insertSelectMethod; /* * If intermediateResultIdPrefix is non-null, an INSERT ... SELECT diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 298f691de..cd76772c9 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -577,7 +577,7 @@ EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; -- EXPLAIN ANALYZE is currently not supported -- EXPLAIN ANALYZE INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; -ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning +ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... 
SELECT commands with repartitioning -- -- Duplicate names in target list -- @@ -1160,6 +1160,31 @@ DO UPDATE SET -> Seq Scan on source_table_4213644 source_table (10 rows) +-- verify that we don't report repartitioned insert/select for tables +-- with sequences. See https://github.com/citusdata/citus/issues/3936 +create table table_with_sequences (x int, y int, z bigserial); +insert into table_with_sequences values (1,1); +select create_distributed_table('table_with_sequences','x'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +explain insert into table_with_sequences select y, x from table_with_sequences; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) (cost=0.00..250.00 rows=100000 width=16) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on table_with_sequences_4213648 table_with_sequences (cost=0.00..28.50 rows=1850 width=8) +(8 rows) + -- clean-up SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index d1e00a728..766aaf47a 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -710,7 +710,6 @@ INSERT INTO agg_events raw_events_first; DEBUG: CTE sub_cte is going to be inlined via distributed planning DEBUG: Subqueries without relations are not allowed in distributed INSERT ... 
SELECT queries -DEBUG: Router planner cannot handle multi-shard select queries ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses HINT: Consider using an equality filter on the distributed table's partition column. -- We support set operations via the coordinator diff --git a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out index a4de3c304..e84991da9 100644 --- a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out +++ b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out @@ -147,10 +147,6 @@ FROM ( GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries -DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) -DEBUG: generating subplan XXX_2 for subquery SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105]))) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 
'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) -DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) UNION SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105]))) ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join RESET client_min_messages; @@ -297,10 +293,6 @@ GROUP BY ORDER BY count_pay; DEBUG: Set operations are not allowed in distributed INSERT ... 
SELECT queries -DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) -DEBUG: generating subplan XXX_2 for subquery SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) -DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) UNION SELECT users_table.user_id, 
'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14)) ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join RESET client_min_messages; diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index 790bcbd73..c3bfc1f4c 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -565,6 +565,14 @@ DO UPDATE SET cardinality = enriched.cardinality + excluded.cardinality, sum = enriched.sum + excluded.sum; + +-- verify that we don't report repartitioned insert/select for tables +-- with sequences. See https://github.com/citusdata/citus/issues/3936 +create table table_with_sequences (x int, y int, z bigserial); +insert into table_with_sequences values (1,1); +select create_distributed_table('table_with_sequences','x'); +explain insert into table_with_sequences select y, x from table_with_sequences; + -- clean-up SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE;