From 4e8d79998ec6dc2ef10492a7ceb1a9e9a19b3d1d Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Tue, 23 Jun 2020 20:36:02 -0700 Subject: [PATCH 1/4] Save INSERT/SELECT method in DistributedPlan. This is so we don't need to calculate it twice in insert_select_executor.c and multi_explain.c, which can cause discrepancy if an update in one of them is not reflected in the other site. --- .../executor/insert_select_executor.c | 9 +-- .../distributed/planner/distributed_planner.c | 3 +- .../planner/insert_select_planner.c | 75 ++++++++++++++++--- .../distributed/planner/multi_explain.c | 21 ++---- .../distributed/utils/citus_copyfuncs.c | 1 + .../distributed/insert_select_planner.h | 3 +- .../distributed/multi_physical_planner.h | 19 ++++- .../expected/insert_select_repartition.out | 27 ++++++- .../regress/expected/multi_insert_select.out | 1 - ...lti_insert_select_non_pushable_queries.out | 8 -- .../regress/sql/insert_select_repartition.sql | 8 ++ 11 files changed, 128 insertions(+), 47 deletions(-) diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 1228f723c..276d98c92 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -136,11 +136,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) bool hasReturning = distributedPlan->expectResults; HTAB *shardStateHash = NULL; - /* select query to execute */ - Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); - - selectRte->subquery = selectQuery; - ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + Query *selectQuery = selectRte->subquery; /* * Cast types of insert target list and select projection list to @@ -181,8 +177,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) LockPartitionRelations(targetRelationId, RowExclusiveLock); } - if (IsRedistributablePlan(selectPlan->planTree) && - 
IsSupportedRedistributionTarget(targetRelationId)) + if (distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION) { ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT"))); diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 8fc89ba6b..072624025 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -913,7 +913,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi } distributedPlan = - CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext); + CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext, + boundParams); } else if (InsertSelectIntoLocalTable(originalQuery)) { diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 6dfe0dafc..c7b9f720d 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -48,6 +48,7 @@ #include "parser/parsetree.h" #include "parser/parse_coerce.h" #include "parser/parse_relation.h" +#include "tcop/tcopprot.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -77,8 +78,11 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, subqueryRte, Oid * selectPartitionColumnTableId); -static DistributedPlan * CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse); -static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery); +static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, + ParamListInfo boundParams); +static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery); +static InsertSelectMethod GetInsertSelectMethod(Query *selectQuery, Oid targetRelationId, + ParamListInfo boundParams); /* @@ -185,7 +189,8 @@ 
CheckInsertSelectQuery(Query *query) */ DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery, - PlannerRestrictionContext *plannerRestrictionContext) + PlannerRestrictionContext *plannerRestrictionContext, + ParamListInfo boundParams) { DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(originalQuery); if (deferredError != NULL) @@ -201,8 +206,12 @@ CreateInsertSelectPlan(uint64 planId, Query *originalQuery, { RaiseDeferredError(distributedPlan->planningError, DEBUG1); - /* if INSERT..SELECT cannot be distributed, pull to coordinator */ - distributedPlan = CreateCoordinatorInsertSelectPlan(planId, originalQuery); + /* + * If INSERT..SELECT cannot be distributed, pull to coordinator or use + * repartitioning. + */ + distributedPlan = CreateNonPushableInsertSelectPlan(planId, originalQuery, + boundParams); } return distributedPlan; @@ -1282,14 +1291,15 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, /* - * CreateCoordinatorInsertSelectPlan creates a query plan for a SELECT into a + * CreateNonPushableInsertSelectPlan creates a query plan for a SELECT into a * distributed table. The query plan can also be executed on a worker in MX. 
*/ static DistributedPlan * -CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) +CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams) { Query *insertSelectQuery = copyObject(parse); + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); Oid targetRelationId = insertRte->relid; @@ -1297,14 +1307,22 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery); distributedPlan->planningError = - CoordinatorInsertSelectSupported(insertSelectQuery); + NonPushableInsertSelectSupported(insertSelectQuery); if (distributedPlan->planningError != NULL) { return distributedPlan; } + Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); + + selectRte->subquery = selectQuery; + ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + distributedPlan->insertSelectQuery = insertSelectQuery; + distributedPlan->insertSelectMethod = GetInsertSelectMethod(selectQuery, + targetRelationId, + boundParams); distributedPlan->expectResults = insertSelectQuery->returningList != NIL; distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId); distributedPlan->targetRelationId = targetRelationId; @@ -1314,13 +1332,13 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) /* - * CoordinatorInsertSelectSupported returns an error if executing an + * NonPushableInsertSelectSupported returns an error if executing an * INSERT ... SELECT command by pulling results of the SELECT to the coordinator - * is unsupported because it needs to generate sequence values or insert into an - * append-distributed table. + * or with repartitioning is unsupported because it needs to generate sequence + * values or insert into an append-distributed table. 
*/ static DeferredErrorMessage * -CoordinatorInsertSelectSupported(Query *insertSelectQuery) +NonPushableInsertSelectSupported(Query *insertSelectQuery) { DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported( insertSelectQuery); @@ -1355,3 +1373,36 @@ InsertSelectResultIdPrefix(uint64 planId) return resultIdPrefix->data; } + + +/* + * GetInsertSelectMethod returns the preferred INSERT INTO ... SELECT method + * based on its select query. + */ +static InsertSelectMethod +GetInsertSelectMethod(Query *selectQuery, Oid targetRelationId, ParamListInfo boundParams) +{ + Query *selectQueryCopy = copyObject(selectQuery); + + /* + * Query will be replanned in insert_select_executor to plan correctly + * for prepared statements. So turn off logging here to avoid repeated + * log messages. We use SET LOCAL here so the change is reverted on ERROR. + */ + int savedClientMinMessages = client_min_messages; + set_config_option("client_min_messages", "ERROR", + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + + int cursorOptions = CURSOR_OPT_PARALLEL_OK; + PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, cursorOptions, + boundParams); + + client_min_messages = savedClientMinMessages; + + bool repartitioned = IsRedistributablePlan(selectPlan->planTree) && + IsSupportedRedistributionTarget(targetRelationId); + return repartitioned ? 
+ INSERT_SELECT_REPARTITION : + INSERT_SELECT_VIA_COORDINATOR; +} diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 1f875f51f..e8012f49f 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -209,22 +209,10 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, CitusScanState *scanState = (CitusScanState *) node; DistributedPlan *distributedPlan = scanState->distributedPlan; Query *insertSelectQuery = distributedPlan->insertSelectQuery; - Query *query = BuildSelectForInsertSelect(insertSelectQuery); - RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); - Oid targetRelationId = insertRte->relid; - IntoClause *into = NULL; - ParamListInfo params = NULL; - char *queryString = NULL; - int cursorOptions = CURSOR_OPT_PARALLEL_OK; + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); + Query *query = selectRte->subquery; - /* - * Make a copy of the query, since pg_plan_query may scribble on it and later - * stages of EXPLAIN require it. 
- */ - Query *queryCopy = copyObject(query); - PlannedStmt *selectPlan = pg_plan_query(queryCopy, cursorOptions, params); - bool repartition = IsRedistributablePlan(selectPlan->planTree) && - IsSupportedRedistributionTarget(targetRelationId); + bool repartition = distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION; if (es->analyze) { @@ -245,6 +233,9 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, ExplainOpenGroup("Select Query", "Select Query", false, es); /* explain the inner SELECT query */ + IntoClause *into = NULL; + ParamListInfo params = NULL; + char *queryString = NULL; ExplainOneQuery(query, 0, into, es, queryString, params, NULL); ExplainCloseGroup("Select Query", "Select Query", false, es); diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index ddb45be11..0e95f3587 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -129,6 +129,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) COPY_NODE_FIELD(relationIdList); COPY_SCALAR_FIELD(targetRelationId); COPY_NODE_FIELD(insertSelectQuery); + COPY_SCALAR_FIELD(insertSelectMethod); COPY_STRING_FIELD(intermediateResultIdPrefix); COPY_NODE_FIELD(subPlanList); diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index 84066278b..80a61b9aa 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -33,7 +33,8 @@ extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ance struct ExplainState *es); extern DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery, PlannerRestrictionContext * - plannerRestrictionContext); + plannerRestrictionContext, + ParamListInfo boundParams); extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamListInfo diff --git 
a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 19bfc6abe..87381f531 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -378,6 +378,22 @@ typedef struct JoinSequenceNode } JoinSequenceNode; +/* + * InsertSelectMethod represents the method to use for INSERT INTO ... SELECT + * queries. + * + * Note that there is a third method which is not represented here, which is + * pushing down the INSERT INTO ... SELECT to workers. This method is executed + * similar to other distributed queries and doesn't need a special execution + * code, so we don't need to represent it here. + */ +typedef enum InsertSelectMethod +{ + INSERT_SELECT_VIA_COORDINATOR, + INSERT_SELECT_REPARTITION +} InsertSelectMethod; + + /* * DistributedPlan contains all information necessary to execute a * distribute query. @@ -416,8 +432,9 @@ typedef struct DistributedPlan /* target relation of a modification */ Oid targetRelationId; - /* INSERT .. SELECT via the coordinator */ + /* INSERT .. SELECT via the coordinator or repartition */ Query *insertSelectQuery; + InsertSelectMethod insertSelectMethod; /* * If intermediateResultIdPrefix is non-null, an INSERT ... SELECT diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 298f691de..cd76772c9 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -577,7 +577,7 @@ EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; -- EXPLAIN ANALYZE is currently not supported -- EXPLAIN ANALYZE INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; -ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning +ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... 
SELECT commands with repartitioning -- -- Duplicate names in target list -- @@ -1160,6 +1160,31 @@ DO UPDATE SET -> Seq Scan on source_table_4213644 source_table (10 rows) +-- verify that we don't report repartitioned insert/select for tables +-- with sequences. See https://github.com/citusdata/citus/issues/3936 +create table table_with_sequences (x int, y int, z bigserial); +insert into table_with_sequences values (1,1); +select create_distributed_table('table_with_sequences','x'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +explain insert into table_with_sequences select y, x from table_with_sequences; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) (cost=0.00..250.00 rows=100000 width=16) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on table_with_sequences_4213648 table_with_sequences (cost=0.00..28.50 rows=1850 width=8) +(8 rows) + -- clean-up SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index d1e00a728..766aaf47a 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -710,7 +710,6 @@ INSERT INTO agg_events raw_events_first; DEBUG: CTE sub_cte is going to be inlined via distributed planning DEBUG: Subqueries without relations are not allowed in distributed INSERT ... 
SELECT queries -DEBUG: Router planner cannot handle multi-shard select queries ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses HINT: Consider using an equality filter on the distributed table's partition column. -- We support set operations via the coordinator diff --git a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out index a4de3c304..e84991da9 100644 --- a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out +++ b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out @@ -147,10 +147,6 @@ FROM ( GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries -DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) -DEBUG: generating subplan XXX_2 for subquery SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105]))) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 
'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) -DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) UNION SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105]))) ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join RESET client_min_messages; @@ -297,10 +293,6 @@ GROUP BY ORDER BY count_pay; DEBUG: Set operations are not allowed in distributed INSERT ... 
SELECT queries -DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) -DEBUG: generating subplan XXX_2 for subquery SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) -DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) UNION SELECT users_table.user_id, 
'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14)) ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join RESET client_min_messages; diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index 790bcbd73..c3bfc1f4c 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -565,6 +565,14 @@ DO UPDATE SET cardinality = enriched.cardinality + excluded.cardinality, sum = enriched.sum + excluded.sum; + +-- verify that we don't report repartitioned insert/select for tables +-- with sequences. See https://github.com/citusdata/citus/issues/3936 +create table table_with_sequences (x int, y int, z bigserial); +insert into table_with_sequences values (1,1); +select create_distributed_table('table_with_sequences','x'); +explain insert into table_with_sequences select y, x from table_with_sequences; + -- clean-up SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; From cd25a271746a1ebcacb4314a053747a2f72c0db7 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Wed, 24 Jun 2020 11:31:49 -0700 Subject: [PATCH 2/4] Fix crash caused by EXPLAIN EXECUTE INSERT ... 
SELECT --- .../distributed/planner/multi_explain.c | 10 ++- .../expected/insert_select_repartition.out | 90 +++++++++++++++++++ .../regress/sql/insert_select_repartition.sql | 54 +++++++++++ 3 files changed, 152 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index e8012f49f..cf4f089f9 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -210,7 +210,13 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, DistributedPlan *distributedPlan = scanState->distributedPlan; Query *insertSelectQuery = distributedPlan->insertSelectQuery; RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); - Query *query = selectRte->subquery; + + /* + * Create a copy because ExplainOneQuery can modify the query, and later + * executions of prepared statements might require it. See + * https://github.com/citusdata/citus/issues/3947 for what can happen. 
+ */ + Query *queryCopy = copyObject(selectRte->subquery); bool repartition = distributedPlan->insertSelectMethod == INSERT_SELECT_REPARTITION; @@ -236,7 +242,7 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, IntoClause *into = NULL; ParamListInfo params = NULL; char *queryString = NULL; - ExplainOneQuery(query, 0, into, es, queryString, params, NULL); + ExplainOneQuery(queryCopy, 0, into, es, queryString, params, NULL); ExplainCloseGroup("Select Query", "Select Query", false, es); } diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index cd76772c9..34bf0b457 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -660,6 +660,96 @@ SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY 4 | 6 | 1 (5 rows) +DEALLOCATE insert_plan; +-- +-- Prepared router INSERT/SELECT. We currently use pull to coordinator when the +-- distributed query has a single task. +-- +TRUNCATE target_table; +PREPARE insert_plan(int) AS +INSERT INTO target_table + SELECT a, max(b) FROM source_table + WHERE a=$1 GROUP BY a; +SET client_min_messages TO DEBUG1; +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +EXECUTE insert_plan(0); +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator +RESET client_min_messages; +SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; + a | count | distinct_values +--------------------------------------------------------------------- + 0 | 7 | 1 +(1 row) + +DEALLOCATE insert_plan; +-- +-- Prepared INSERT/SELECT with no parameters. +-- +TRUNCATE target_table; +PREPARE insert_plan AS +INSERT INTO target_table + SELECT a, max(b) FROM source_table + WHERE a BETWEEN 1 AND 2 GROUP BY a; +EXPLAIN EXECUTE insert_plan; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... 
SELECT) (cost=0.00..0.00 rows=0 width=0) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=8) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> GroupAggregate (cost=44.09..44.28 rows=11 width=8) + Group Key: a + -> Sort (cost=44.09..44.12 rows=11 width=8) + Sort Key: a + -> Seq Scan on source_table_4213606 source_table (cost=0.00..43.90 rows=11 width=8) + Filter: ((a >= 1) AND (a <= 2)) +(13 rows) + +SET client_min_messages TO DEBUG1; +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +EXECUTE insert_plan; +DEBUG: performing repartitioned INSERT ... SELECT +RESET client_min_messages; +SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; + a | count | distinct_values +--------------------------------------------------------------------- + 1 | 7 | 1 + 2 | 7 | 1 +(2 rows) + +DEALLOCATE insert_plan; -- -- INSERT/SELECT in CTE -- diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index c3bfc1f4c..0d66e2d77 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -317,6 +317,60 @@ RESET client_min_messages; SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; +DEALLOCATE insert_plan; + +-- +-- Prepared router INSERT/SELECT. We currently use pull to coordinator when the +-- distributed query has a single task. 
+-- +TRUNCATE target_table; + +PREPARE insert_plan(int) AS +INSERT INTO target_table + SELECT a, max(b) FROM source_table + WHERE a=$1 GROUP BY a; + +SET client_min_messages TO DEBUG1; +EXECUTE insert_plan(0); +EXECUTE insert_plan(0); +EXECUTE insert_plan(0); +EXECUTE insert_plan(0); +EXECUTE insert_plan(0); +EXECUTE insert_plan(0); +EXECUTE insert_plan(0); +RESET client_min_messages; + +SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; + +DEALLOCATE insert_plan; + +-- +-- Prepared INSERT/SELECT with no parameters. +-- + +TRUNCATE target_table; + +PREPARE insert_plan AS +INSERT INTO target_table + SELECT a, max(b) FROM source_table + WHERE a BETWEEN 1 AND 2 GROUP BY a; + +EXPLAIN EXECUTE insert_plan; + +SET client_min_messages TO DEBUG1; +EXECUTE insert_plan; +EXECUTE insert_plan; +EXECUTE insert_plan; +EXECUTE insert_plan; +EXECUTE insert_plan; +EXECUTE insert_plan; +EXECUTE insert_plan; +RESET client_min_messages; + +SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; + +DEALLOCATE insert_plan; + -- -- INSERT/SELECT in CTE -- From d34c21890fe86776d9b79bd2ce6ca789306823f7 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Wed, 24 Jun 2020 23:50:21 -0700 Subject: [PATCH 3/4] Rename CoordinatorInsertSelect... 
to NonPushableInsertSelect --- .../distributed/executor/citus_custom_scan.c | 26 +++++++++---------- .../executor/insert_select_executor.c | 19 +++++++------- .../executor/multi_server_executor.c | 2 +- .../distributed/executor/query_stats.c | 2 +- .../distributed/planner/distributed_planner.c | 4 +-- .../distributed/planner/multi_explain.c | 7 ++--- src/include/distributed/citus_custom_scan.h | 2 +- .../distributed/insert_select_executor.h | 2 +- .../distributed/insert_select_planner.h | 2 +- .../distributed/multi_server_executor.h | 2 +- 10 files changed, 34 insertions(+), 34 deletions(-) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 537f1a6bd..81d6cef87 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -49,7 +49,7 @@ /* functions for creating custom scan nodes */ static Node * AdaptiveExecutorCreateScan(CustomScan *scan); static Node * TaskTrackerCreateScan(CustomScan *scan); -static Node * CoordinatorInsertSelectCreateScan(CustomScan *scan); +static Node * NonPushableInsertSelectCreateScan(CustomScan *scan); static Node * DelayedErrorCreateScan(CustomScan *scan); /* functions that are common to different scans */ @@ -77,9 +77,9 @@ CustomScanMethods TaskTrackerCustomScanMethods = { TaskTrackerCreateScan }; -CustomScanMethods CoordinatorInsertSelectCustomScanMethods = { +CustomScanMethods NonPushableInsertSelectCustomScanMethods = { "Citus INSERT ... 
SELECT", - CoordinatorInsertSelectCreateScan + NonPushableInsertSelectCreateScan }; CustomScanMethods DelayedErrorCustomScanMethods = { @@ -109,13 +109,13 @@ static CustomExecMethods TaskTrackerCustomExecMethods = { .ExplainCustomScan = CitusExplainScan }; -static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = { - .CustomName = "CoordinatorInsertSelectScan", +static CustomExecMethods NonPushableInsertSelectCustomExecMethods = { + .CustomName = "NonPushableInsertSelectScan", .BeginCustomScan = CitusBeginScan, - .ExecCustomScan = CoordinatorInsertSelectExecScan, + .ExecCustomScan = NonPushableInsertSelectExecScan, .EndCustomScan = CitusEndScan, .ReScanCustomScan = CitusReScan, - .ExplainCustomScan = CoordinatorInsertSelectExplainScan + .ExplainCustomScan = NonPushableInsertSelectExplainScan }; @@ -133,7 +133,7 @@ IsCitusCustomState(PlanState *planState) CustomScanState *css = castNode(CustomScanState, planState); if (css->methods == &AdaptiveExecutorCustomExecMethods || css->methods == &TaskTrackerCustomExecMethods || - css->methods == &CoordinatorInsertSelectCustomExecMethods) + css->methods == &NonPushableInsertSelectCustomExecMethods) { return true; } @@ -150,7 +150,7 @@ RegisterCitusCustomScanMethods(void) { RegisterCustomScanMethods(&AdaptiveExecutorCustomScanMethods); RegisterCustomScanMethods(&TaskTrackerCustomScanMethods); - RegisterCustomScanMethods(&CoordinatorInsertSelectCustomScanMethods); + RegisterCustomScanMethods(&NonPushableInsertSelectCustomScanMethods); RegisterCustomScanMethods(&DelayedErrorCustomScanMethods); } @@ -598,20 +598,20 @@ TaskTrackerCreateScan(CustomScan *scan) /* - * CoordinatorInsertSelectCrateScan creates the scan state for executing + * NonPushableInsertSelectCreateScan creates the scan state for executing * INSERT..SELECT into a distributed table via the coordinator. 
*/ static Node * -CoordinatorInsertSelectCreateScan(CustomScan *scan) +NonPushableInsertSelectCreateScan(CustomScan *scan) { CitusScanState *scanState = palloc0(sizeof(CitusScanState)); - scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; + scanState->executorType = MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT; scanState->customScanState.ss.ps.type = T_CustomScanState; scanState->distributedPlan = GetDistributedPlan(scan); scanState->customScanState.methods = - &CoordinatorInsertSelectCustomExecMethods; + &NonPushableInsertSelectCustomExecMethods; return (Node *) scanState; } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 276d98c92..e0e0d1d19 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -57,7 +57,7 @@ bool EnableRepartitionedInsertSelect = true; static int insertSelectExecutorLevel = 0; -static TupleTableSlot * CoordinatorInsertSelectExecScanInternal(CustomScanState *node); +static TupleTableSlot * NonPushableInsertSelectExecScanInternal(CustomScanState *node); static Query * WrapSubquery(Query *subquery); static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, char *resultIdPrefix); @@ -85,19 +85,19 @@ static void RelableTargetEntryList(List *selectTargetList, List *insertTargetLis /* - * CoordinatorInsertSelectExecScan is a wrapper around - * CoordinatorInsertSelectExecScanInternal which also properly increments + * NonPushableInsertSelectExecScan is a wrapper around + * NonPushableInsertSelectExecScanInternal which also properly increments * or decrements insertSelectExecutorLevel. 
*/ TupleTableSlot * -CoordinatorInsertSelectExecScan(CustomScanState *node) +NonPushableInsertSelectExecScan(CustomScanState *node) { TupleTableSlot *result = NULL; insertSelectExecutorLevel++; PG_TRY(); { - result = CoordinatorInsertSelectExecScanInternal(node); + result = NonPushableInsertSelectExecScanInternal(node); } PG_CATCH(); { @@ -112,13 +112,12 @@ CoordinatorInsertSelectExecScan(CustomScanState *node) /* - * CoordinatorInsertSelectExecScan executes an INSERT INTO distributed_table - * SELECT .. query by setting up a DestReceiver that copies tuples into the - * distributed table and then executing the SELECT query using that DestReceiver - * as the tuple destination. + * NonPushableInsertSelectExecScan executes an INSERT INTO distributed_table + * SELECT .. query either by routing via coordinator or by repartitioning + * task results and moving data directly between nodes. */ static TupleTableSlot * -CoordinatorInsertSelectExecScanInternal(CustomScanState *node) +NonPushableInsertSelectExecScanInternal(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index d7b003677..fec24d191 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -88,7 +88,7 @@ JobExecutorType(DistributedPlan *distributedPlan) * the executor already knows how to handle adaptive * executor when necessary. 
*/ - return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; + return MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT; } Assert(distributedPlan->modLevel == ROW_MODIFY_READONLY); diff --git a/src/backend/distributed/executor/query_stats.c b/src/backend/distributed/executor/query_stats.c index 7f9493c72..04e6002c2 100644 --- a/src/backend/distributed/executor/query_stats.c +++ b/src/backend/distributed/executor/query_stats.c @@ -99,7 +99,7 @@ CitusExecutorName(MultiExecutorType executorType) return "task-tracker"; } - case MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT: + case MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT: { return "insert-select"; } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 072624025..25a4b95f4 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -1295,9 +1295,9 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) break; } - case MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT: + case MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT: { - customScan->methods = &CoordinatorInsertSelectCustomScanMethods; + customScan->methods = &NonPushableInsertSelectCustomScanMethods; break; } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index cf4f089f9..9b20f154f 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -198,12 +198,13 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es /* - * CoordinatorInsertSelectExplainScan is a custom scan explain callback function + * NonPushableInsertSelectExplainScan is a custom scan explain callback function * which is used to print explain information of a Citus plan for an INSERT INTO - * distributed_table SELECT ... query that is evaluated on the coordinator. + * distributed_table SELECT ... 
query that is evaluated on the coordinator or + * uses repartitioning. */ void -CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, +NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es) { CitusScanState *scanState = (CitusScanState *) node; diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index 497e5db7d..dcddb27a3 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -32,7 +32,7 @@ typedef struct CitusScanState /* custom scan methods for all executors */ extern CustomScanMethods AdaptiveExecutorCustomScanMethods; extern CustomScanMethods TaskTrackerCustomScanMethods; -extern CustomScanMethods CoordinatorInsertSelectCustomScanMethods; +extern CustomScanMethods NonPushableInsertSelectCustomScanMethods; extern CustomScanMethods DelayedErrorCustomScanMethods; diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h index e94c7fe9c..b8151712d 100644 --- a/src/include/distributed/insert_select_executor.h +++ b/src/include/distributed/insert_select_executor.h @@ -18,7 +18,7 @@ extern bool EnableRepartitionedInsertSelect; -extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node); +extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node); extern bool ExecutingInsertSelect(void); extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery); extern bool IsSupportedRedistributionTarget(Oid targetRelationId); diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index 80a61b9aa..bdbcbd16b 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -29,7 +29,7 @@ extern bool InsertSelectIntoLocalTable(Query *query); extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, 
RangeTblEntry *insertRte, RangeTblEntry *subqueryRte); -extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, +extern void NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es); extern DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery, PlannerRestrictionContext * diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index d49f09d0c..077adb4d3 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -88,7 +88,7 @@ typedef enum MULTI_EXECUTOR_INVALID_FIRST = 0, MULTI_EXECUTOR_ADAPTIVE = 1, MULTI_EXECUTOR_TASK_TRACKER = 2, - MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT = 3 + MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT = 3 } MultiExecutorType; From 4ed59d2db36449e360b42a01e8b1d3fe1c03b346 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 25 Jun 2020 10:26:26 -0700 Subject: [PATCH 4/4] Move more from insert_select_executor to insert_select_planner --- .../executor/insert_select_executor.c | 292 +--------------- .../executor/multi_server_executor.c | 2 +- .../planner/function_call_delegation.c | 8 +- .../planner/insert_select_planner.c | 324 ++++++++++++++++-- .../distributed/utils/citus_copyfuncs.c | 1 + .../distributed/insert_select_executor.h | 1 - .../distributed/insert_select_planner.h | 1 + .../distributed/multi_physical_planner.h | 4 +- .../expected/insert_select_repartition.out | 2 +- src/test/regress/expected/multi_explain.out | 36 +- .../expected/multi_insert_conflict.out | 0 .../regress/expected/multi_insert_select.out | 18 +- .../expected/multi_insert_select_conflict.out | 8 +- ...lti_insert_select_non_pushable_queries.out | 14 +- 14 files changed, 354 insertions(+), 357 deletions(-) create mode 100644 src/test/regress/expected/multi_insert_conflict.out diff --git a/src/backend/distributed/executor/insert_select_executor.c 
b/src/backend/distributed/executor/insert_select_executor.c index e0e0d1d19..ac47f0109 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -53,11 +53,7 @@ /* Config variables managed via guc.c */ bool EnableRepartitionedInsertSelect = true; -/* depth of current insert/select executor. */ -static int insertSelectExecutorLevel = 0; - -static TupleTableSlot * NonPushableInsertSelectExecScanInternal(CustomScanState *node); static Query * WrapSubquery(Query *subquery); static List * TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, char *resultIdPrefix); @@ -71,44 +67,12 @@ static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, static List * BuildColumnNameListFromTargetList(Oid targetRelationId, List *insertTargetList); static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList); -static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, - Oid targetRelationId); static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery, CitusTableCacheEntry *targetRelation, List **redistributedResults, bool useBinaryFormat); static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn); -static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, - int targetTypeMod); static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries); -static void RelableTargetEntryList(List *selectTargetList, List *insertTargetList); - - -/* - * NonPushableInsertSelectExecScan is a wrapper around - * NonPushableInsertSelectExecScanInternal which also properly increments - * or decrements insertSelectExecutorLevel. 
- */ -TupleTableSlot * -NonPushableInsertSelectExecScan(CustomScanState *node) -{ - TupleTableSlot *result = NULL; - insertSelectExecutorLevel++; - - PG_TRY(); - { - result = NonPushableInsertSelectExecScanInternal(node); - } - PG_CATCH(); - { - insertSelectExecutorLevel--; - PG_RE_THROW(); - } - PG_END_TRY(); - - insertSelectExecutorLevel--; - return result; -} /* @@ -116,15 +80,14 @@ NonPushableInsertSelectExecScan(CustomScanState *node) * SELECT .. query either by routing via coordinator or by repartitioning * task results and moving data directly between nodes. */ -static TupleTableSlot * -NonPushableInsertSelectExecScanInternal(CustomScanState *node) +TupleTableSlot * +NonPushableInsertSelectExecScan(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; if (!scanState->finishedRemoteScan) { EState *executorState = ScanStateGetExecutorState(scanState); - ParamListInfo paramListInfo = executorState->es_param_list_info; DistributedPlan *distributedPlan = scanState->distributedPlan; Query *insertSelectQuery = copyObject(distributedPlan->insertSelectQuery); List *insertTargetList = insertSelectQuery->targetList; @@ -136,35 +99,7 @@ NonPushableInsertSelectExecScanInternal(CustomScanState *node) HTAB *shardStateHash = NULL; Query *selectQuery = selectRte->subquery; - - /* - * Cast types of insert target list and select projection list to - * match the column types of the target relation. - */ - selectQuery->targetList = - AddInsertSelectCasts(insertSelectQuery->targetList, - selectQuery->targetList, - targetRelationId); - - /* - * Later we might need to call WrapTaskListForProjection(), which requires - * that select target list has unique names, otherwise the outer query - * cannot select columns unambiguously. So we relabel select columns to - * match target columns. 
- */ - RelableTargetEntryList(selectQuery->targetList, insertTargetList); - - /* - * Make a copy of the query, since pg_plan_query may scribble on it and we - * want it to be replanned every time if it is stored in a prepared - * statement. - */ - selectQuery = copyObject(selectQuery); - - /* plan the subquery, this may be another distributed query */ - int cursorOptions = CURSOR_OPT_PARALLEL_OK; - PlannedStmt *selectPlan = pg_plan_query(selectQuery, cursorOptions, - paramListInfo); + PlannedStmt *selectPlan = copyObject(distributedPlan->selectPlanForInsertSelect); /* * If we are dealing with partitioned table, we also need to lock its @@ -671,207 +606,6 @@ PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList) } -/* ExecutingInsertSelect returns true if we are executing an INSERT ...SELECT query */ -bool -ExecutingInsertSelect(void) -{ - return insertSelectExecutorLevel > 0; -} - - -/* - * AddInsertSelectCasts makes sure that the types in columns in the given - * target lists have the same type as the columns of the given relation. - * It might add casts to ensure that. - * - * It returns the updated selectTargetList. - */ -static List * -AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, - Oid targetRelationId) -{ - ListCell *insertEntryCell = NULL; - ListCell *selectEntryCell = NULL; - List *projectedEntries = NIL; - List *nonProjectedEntries = NIL; - - /* - * ReorderInsertSelectTargetLists() makes sure that first few columns of - * the SELECT query match the insert targets. It might contain additional - * items for GROUP BY, etc. 
- */ - Assert(list_length(insertTargetList) <= list_length(selectTargetList)); - - Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock); - TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); - - int targetEntryIndex = 0; - forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList) - { - TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell); - TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell); - Var *insertColumn = (Var *) insertEntry->expr; - Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor, - insertEntry->resno - 1); - - Oid sourceType = insertColumn->vartype; - Oid targetType = attr->atttypid; - if (sourceType != targetType) - { - insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType, - attr->attcollation, attr->atttypmod); - - /* - * We cannot modify the selectEntry in-place, because ORDER BY or - * GROUP BY clauses might be pointing to it with comparison types - * of the source type. So instead we keep the original one as a - * non-projected entry, so GROUP BY and ORDER BY are happy, and - * create a duplicated projected entry with the coerced expression. - */ - TargetEntry *coercedEntry = copyObject(selectEntry); - coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType, - targetType, attr->attcollation, - attr->atttypmod); - coercedEntry->ressortgroupref = 0; - - /* - * The only requirement is that users don't use this name in ORDER BY - * or GROUP BY, and it should be unique across the same query. - */ - StringInfo resnameString = makeStringInfo(); - appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex); - coercedEntry->resname = resnameString->data; - - projectedEntries = lappend(projectedEntries, coercedEntry); - - if (selectEntry->ressortgroupref != 0) - { - selectEntry->resjunk = true; - - /* - * This entry might still end up in the SELECT output list, so - * rename it to avoid ambiguity. 
- * - * See https://github.com/citusdata/citus/pull/3470. - */ - resnameString = makeStringInfo(); - appendStringInfo(resnameString, "discarded_target_item_%d", - targetEntryIndex); - selectEntry->resname = resnameString->data; - - nonProjectedEntries = lappend(nonProjectedEntries, selectEntry); - } - } - else - { - projectedEntries = lappend(projectedEntries, selectEntry); - } - - targetEntryIndex++; - } - - for (int entryIndex = list_length(insertTargetList); - entryIndex < list_length(selectTargetList); - entryIndex++) - { - nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList, - entryIndex)); - } - - /* selectEntry->resno must be the ordinal number of the entry */ - selectTargetList = list_concat(projectedEntries, nonProjectedEntries); - int entryResNo = 1; - TargetEntry *selectTargetEntry = NULL; - foreach_ptr(selectTargetEntry, selectTargetList) - { - selectTargetEntry->resno = entryResNo++; - } - - heap_close(distributedRelation, NoLock); - - return selectTargetList; -} - - -/* - * CastExpr returns an expression which casts the given expr from sourceType to - * the given targetType. 
- */ -static Expr * -CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, - int targetTypeMod) -{ - Oid coercionFuncId = InvalidOid; - CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType, - COERCION_EXPLICIT, - &coercionFuncId); - - if (coercionType == COERCION_PATH_FUNC) - { - FuncExpr *coerceExpr = makeNode(FuncExpr); - coerceExpr->funcid = coercionFuncId; - coerceExpr->args = list_make1(copyObject(expr)); - coerceExpr->funccollid = targetCollation; - coerceExpr->funcresulttype = targetType; - - return (Expr *) coerceExpr; - } - else if (coercionType == COERCION_PATH_RELABELTYPE) - { - RelabelType *coerceExpr = makeNode(RelabelType); - coerceExpr->arg = copyObject(expr); - coerceExpr->resulttype = targetType; - coerceExpr->resulttypmod = targetTypeMod; - coerceExpr->resultcollid = targetCollation; - coerceExpr->relabelformat = COERCE_IMPLICIT_CAST; - coerceExpr->location = -1; - - return (Expr *) coerceExpr; - } - else if (coercionType == COERCION_PATH_ARRAYCOERCE) - { - Oid sourceBaseType = get_base_element_type(sourceType); - Oid targetBaseType = get_base_element_type(targetType); - - CaseTestExpr *elemExpr = makeNode(CaseTestExpr); - elemExpr->collation = targetCollation; - elemExpr->typeId = sourceBaseType; - elemExpr->typeMod = -1; - - Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType, - targetBaseType, targetCollation, - targetTypeMod); - - ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr); - coerceExpr->arg = copyObject(expr); - coerceExpr->elemexpr = elemCastExpr; - coerceExpr->resultcollid = targetCollation; - coerceExpr->resulttype = targetType; - coerceExpr->resulttypmod = targetTypeMod; - coerceExpr->location = -1; - coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; - - return (Expr *) coerceExpr; - } - else if (coercionType == COERCION_PATH_COERCEVIAIO) - { - CoerceViaIO *coerceExpr = makeNode(CoerceViaIO); - coerceExpr->arg = (Expr *) copyObject(expr); - coerceExpr->resulttype = 
targetType; - coerceExpr->resultcollid = targetCollation; - coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; - coerceExpr->location = -1; - - return (Expr *) coerceExpr; - } - else - { - ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d", - sourceType, targetType))); - } -} - - /* * IsSupportedRedistributionTarget determines whether re-partitioning into the * given target relation is supported. @@ -1102,23 +836,3 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries) SetTaskQueryString(task, wrappedQuery->data); } } - - -/* - * RelableTargetEntryList relabels select target list to have matching names with - * insert target list. - */ -static void -RelableTargetEntryList(List *selectTargetList, List *insertTargetList) -{ - ListCell *selectTargetCell = NULL; - ListCell *insertTargetCell = NULL; - - forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList) - { - TargetEntry *selectTargetEntry = lfirst(selectTargetCell); - TargetEntry *insertTargetEntry = lfirst(insertTargetCell); - - selectTargetEntry->resname = insertTargetEntry->resname; - } -} diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index fec24d191..a13c39ca3 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -84,7 +84,7 @@ JobExecutorType(DistributedPlan *distributedPlan) { /* * Even if adaptiveExecutorEnabled, we go through - * MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT because + * MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT because * the executor already knows how to handle adaptive * executor when necessary. 
*/ diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 6c5601ead..caa4034b9 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -26,7 +26,6 @@ #include "distributed/deparse_shard_query.h" #include "distributed/function_call_delegation.h" #include "distributed/insert_select_planner.h" -#include "distributed/insert_select_executor.h" #include "distributed/metadata_utility.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" @@ -230,11 +229,10 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) } /* - * This can be called while executing INSERT ... SELECT func(). insert_select_executor - * doesn't get the planned subquery and gets the actual struct Query, so the planning - * for these kinds of queries happens at the execution time. + * Cannot delegate functions for INSERT ... SELECT func(), since they require + * coordinated transactions. */ - if (ExecutingInsertSelect()) + if (PlanningInsertSelect()) { ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... 
SELECT"))); return NULL; diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index c7b9f720d..22f882aeb 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -49,10 +49,16 @@ #include "parser/parse_coerce.h" #include "parser/parse_relation.h" #include "tcop/tcopprot.h" +#include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/rel.h" +static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId, + Query *originalQuery, + PlannerRestrictionContext * + plannerRestrictionContext, + ParamListInfo boundParams); static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); @@ -81,8 +87,15 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams); static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery); -static InsertSelectMethod GetInsertSelectMethod(Query *selectQuery, Oid targetRelationId, - ParamListInfo boundParams); +static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList); +static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, + Oid targetRelationId); +static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, + int targetTypeMod); + + +/* depth of current insert/select planner. */ +static int insertSelectPlannerLevel = 0; /* @@ -182,15 +195,45 @@ CheckInsertSelectQuery(Query *query) /* - * CreateInsertSelectPlan tries to create a distributed plan for an - * INSERT INTO distributed_table SELECT ... query by push down the - * command to the workers and if that is not possible it creates a - * plan for evaluating the SELECT on the coordinator. 
+ * CreateInsertSelectPlan is a wrapper around + * CreateInsertSelectPlanInternal which also properly increments + * or decrements insertSelectPlannerLevel. */ DistributedPlan * CreateInsertSelectPlan(uint64 planId, Query *originalQuery, PlannerRestrictionContext *plannerRestrictionContext, ParamListInfo boundParams) +{ + DistributedPlan *result = NULL; + insertSelectPlannerLevel++; + + PG_TRY(); + { + result = CreateInsertSelectPlanInternal(planId, originalQuery, + plannerRestrictionContext, boundParams); + } + PG_CATCH(); + { + insertSelectPlannerLevel--; + PG_RE_THROW(); + } + PG_END_TRY(); + + insertSelectPlannerLevel--; + return result; +} + + +/* + * CreateInsertSelectPlanInternal tries to create a distributed plan for an + * INSERT INTO distributed_table SELECT ... query by pushing down the + * command to the workers and if that is not possible it creates a + * plan for evaluating the SELECT on the coordinator. + */ +static DistributedPlan * +CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext, + ParamListInfo boundParams) { DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(originalQuery); if (deferredError != NULL) @@ -1319,10 +1362,43 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou selectRte->subquery = selectQuery; ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + /* + * Cast types of insert target list and select projection list to + * match the column types of the target relation. + */ + selectQuery->targetList = + AddInsertSelectCasts(insertSelectQuery->targetList, + selectQuery->targetList, + targetRelationId); + + /* + * Later we might need to call WrapTaskListForProjection(), which requires + * that select target list has unique names, otherwise the outer query + * cannot select columns unambiguously. So we relabel select columns to + * match target columns. 
+ */ + List *insertTargetList = insertSelectQuery->targetList; + RelabelTargetEntryList(selectQuery->targetList, insertTargetList); + + /* + * Make a copy of the select query, since following code scribbles it + * but we need to keep the original for EXPLAIN. + */ + Query *selectQueryCopy = copyObject(selectQuery); + + /* plan the subquery, this may be another distributed query */ + int cursorOptions = CURSOR_OPT_PARALLEL_OK; + PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, cursorOptions, + boundParams); + + bool repartitioned = IsRedistributablePlan(selectPlan->planTree) && + IsSupportedRedistributionTarget(targetRelationId); + distributedPlan->insertSelectQuery = insertSelectQuery; - distributedPlan->insertSelectMethod = GetInsertSelectMethod(selectQuery, - targetRelationId, - boundParams); + distributedPlan->selectPlanForInsertSelect = selectPlan; + distributedPlan->insertSelectMethod = repartitioned ? + INSERT_SELECT_REPARTITION : + INSERT_SELECT_VIA_COORDINATOR; distributedPlan->expectResults = insertSelectQuery->returningList != NIL; distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId); distributedPlan->targetRelationId = targetRelationId; @@ -1376,33 +1452,221 @@ InsertSelectResultIdPrefix(uint64 planId) /* - * GetInsertSelectMethod returns the preferred INSERT INTO ... SELECT method - * based on its select query. + * RelabelTargetEntryList relabels select target list to have matching names with + * insert target list. 
*/ -static InsertSelectMethod -GetInsertSelectMethod(Query *selectQuery, Oid targetRelationId, ParamListInfo boundParams) +static void +RelabelTargetEntryList(List *selectTargetList, List *insertTargetList) { - Query *selectQueryCopy = copyObject(selectQuery); + ListCell *selectTargetCell = NULL; + ListCell *insertTargetCell = NULL; + + forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList) + { + TargetEntry *selectTargetEntry = lfirst(selectTargetCell); + TargetEntry *insertTargetEntry = lfirst(insertTargetCell); + + selectTargetEntry->resname = insertTargetEntry->resname; + } +} + + +/* + * AddInsertSelectCasts makes sure that the types in columns in the given + * target lists have the same type as the columns of the given relation. + * It might add casts to ensure that. + * + * It returns the updated selectTargetList. + */ +static List * +AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, + Oid targetRelationId) +{ + ListCell *insertEntryCell = NULL; + ListCell *selectEntryCell = NULL; + List *projectedEntries = NIL; + List *nonProjectedEntries = NIL; /* - * Query will be replanned in insert_select_executor to plan correctly - * for prepared statements. So turn off logging here to avoid repeated - * log messages. We use SET LOCAL here so the change is reverted on ERROR. + * ReorderInsertSelectTargetLists() makes sure that first few columns of + * the SELECT query match the insert targets. It might contain additional + * items for GROUP BY, etc. 
*/ - int savedClientMinMessages = client_min_messages; - set_config_option("client_min_messages", "ERROR", - PGC_USERSET, PGC_S_SESSION, - GUC_ACTION_LOCAL, true, 0, false); + Assert(list_length(insertTargetList) <= list_length(selectTargetList)); - int cursorOptions = CURSOR_OPT_PARALLEL_OK; - PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, cursorOptions, - boundParams); + Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock); + TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); - client_min_messages = savedClientMinMessages; + int targetEntryIndex = 0; + forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList) + { + TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell); + TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell); + Var *insertColumn = (Var *) insertEntry->expr; + Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor, + insertEntry->resno - 1); - bool repartitioned = IsRedistributablePlan(selectPlan->planTree) && - IsSupportedRedistributionTarget(targetRelationId); - return repartitioned ? - INSERT_SELECT_REPARTITION : - INSERT_SELECT_VIA_COORDINATOR; + Oid sourceType = insertColumn->vartype; + Oid targetType = attr->atttypid; + if (sourceType != targetType) + { + insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType, + attr->attcollation, attr->atttypmod); + + /* + * We cannot modify the selectEntry in-place, because ORDER BY or + * GROUP BY clauses might be pointing to it with comparison types + * of the source type. So instead we keep the original one as a + * non-projected entry, so GROUP BY and ORDER BY are happy, and + * create a duplicated projected entry with the coerced expression. 
+ */ + TargetEntry *coercedEntry = copyObject(selectEntry); + coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType, + targetType, attr->attcollation, + attr->atttypmod); + coercedEntry->ressortgroupref = 0; + + /* + * The only requirement is that users don't use this name in ORDER BY + * or GROUP BY, and it should be unique across the same query. + */ + StringInfo resnameString = makeStringInfo(); + appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex); + coercedEntry->resname = resnameString->data; + + projectedEntries = lappend(projectedEntries, coercedEntry); + + if (selectEntry->ressortgroupref != 0) + { + selectEntry->resjunk = true; + + /* + * This entry might still end up in the SELECT output list, so + * rename it to avoid ambiguity. + * + * See https://github.com/citusdata/citus/pull/3470. + */ + resnameString = makeStringInfo(); + appendStringInfo(resnameString, "discarded_target_item_%d", + targetEntryIndex); + selectEntry->resname = resnameString->data; + + nonProjectedEntries = lappend(nonProjectedEntries, selectEntry); + } + } + else + { + projectedEntries = lappend(projectedEntries, selectEntry); + } + + targetEntryIndex++; + } + + for (int entryIndex = list_length(insertTargetList); + entryIndex < list_length(selectTargetList); + entryIndex++) + { + nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList, + entryIndex)); + } + + /* selectEntry->resno must be the ordinal number of the entry */ + selectTargetList = list_concat(projectedEntries, nonProjectedEntries); + int entryResNo = 1; + TargetEntry *selectTargetEntry = NULL; + foreach_ptr(selectTargetEntry, selectTargetList) + { + selectTargetEntry->resno = entryResNo++; + } + + heap_close(distributedRelation, NoLock); + + return selectTargetList; +} + + +/* + * CastExpr returns an expression which casts the given expr from sourceType to + * the given targetType. 
+ */ +static Expr * +CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, + int targetTypeMod) +{ + Oid coercionFuncId = InvalidOid; + CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType, + COERCION_EXPLICIT, + &coercionFuncId); + + if (coercionType == COERCION_PATH_FUNC) + { + FuncExpr *coerceExpr = makeNode(FuncExpr); + coerceExpr->funcid = coercionFuncId; + coerceExpr->args = list_make1(copyObject(expr)); + coerceExpr->funccollid = targetCollation; + coerceExpr->funcresulttype = targetType; + + return (Expr *) coerceExpr; + } + else if (coercionType == COERCION_PATH_RELABELTYPE) + { + RelabelType *coerceExpr = makeNode(RelabelType); + coerceExpr->arg = copyObject(expr); + coerceExpr->resulttype = targetType; + coerceExpr->resulttypmod = targetTypeMod; + coerceExpr->resultcollid = targetCollation; + coerceExpr->relabelformat = COERCE_IMPLICIT_CAST; + coerceExpr->location = -1; + + return (Expr *) coerceExpr; + } + else if (coercionType == COERCION_PATH_ARRAYCOERCE) + { + Oid sourceBaseType = get_base_element_type(sourceType); + Oid targetBaseType = get_base_element_type(targetType); + + CaseTestExpr *elemExpr = makeNode(CaseTestExpr); + elemExpr->collation = targetCollation; + elemExpr->typeId = sourceBaseType; + elemExpr->typeMod = -1; + + Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType, + targetBaseType, targetCollation, + targetTypeMod); + + ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr); + coerceExpr->arg = copyObject(expr); + coerceExpr->elemexpr = elemCastExpr; + coerceExpr->resultcollid = targetCollation; + coerceExpr->resulttype = targetType; + coerceExpr->resulttypmod = targetTypeMod; + coerceExpr->location = -1; + coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; + + return (Expr *) coerceExpr; + } + else if (coercionType == COERCION_PATH_COERCEVIAIO) + { + CoerceViaIO *coerceExpr = makeNode(CoerceViaIO); + coerceExpr->arg = (Expr *) copyObject(expr); + coerceExpr->resulttype = 
targetType; + coerceExpr->resultcollid = targetCollation; + coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; + coerceExpr->location = -1; + + return (Expr *) coerceExpr; + } + else + { + ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d", + sourceType, targetType))); + } +} + + +/* PlanningInsertSelect returns true if we are planning an INSERT ...SELECT query */ +bool +PlanningInsertSelect(void) +{ + return insertSelectPlannerLevel > 0; } diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 0e95f3587..d3f64e792 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -129,6 +129,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) COPY_NODE_FIELD(relationIdList); COPY_SCALAR_FIELD(targetRelationId); COPY_NODE_FIELD(insertSelectQuery); + COPY_NODE_FIELD(selectPlanForInsertSelect); COPY_SCALAR_FIELD(insertSelectMethod); COPY_STRING_FIELD(intermediateResultIdPrefix); diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h index b8151712d..bcfe29bfb 100644 --- a/src/include/distributed/insert_select_executor.h +++ b/src/include/distributed/insert_select_executor.h @@ -19,7 +19,6 @@ extern bool EnableRepartitionedInsertSelect; extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node); -extern bool ExecutingInsertSelect(void); extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery); extern bool IsSupportedRedistributionTarget(Oid targetRelationId); extern bool IsRedistributablePlan(Plan *selectPlan); diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index bdbcbd16b..74b8a0708 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -43,6 +43,7 @@ extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId, 
PlannerRestrictionContext * plannerRestrictionContext); extern char * InsertSelectResultIdPrefix(uint64 planId); +extern bool PlanningInsertSelect(void); #endif /* INSERT_SELECT_PLANNER_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 87381f531..2d6d9ef8f 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -432,8 +432,10 @@ typedef struct DistributedPlan /* target relation of a modification */ Oid targetRelationId; - /* INSERT .. SELECT via the coordinator or repartition */ + /* + * INSERT .. SELECT via the coordinator or repartition */ Query *insertSelectQuery; + PlannedStmt *selectPlanForInsertSelect; InsertSelectMethod insertSelectMethod; /* diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 34bf0b457..5c4af0bc5 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -763,10 +763,10 @@ DEBUG: INSERT target table and the source relation of the SELECT partition colu DEBUG: only SELECT, UPDATE, or DELETE common table expressions may be router planned DEBUG: generating subplan XXX_1 for CTE r: INSERT INTO insert_select_repartition.target_table (a, b) SELECT a, b FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... 
SELECT +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, max AS b FROM (SELECT source_table.a, max(source_table.b) AS max FROM (insert_select_repartition.source_table JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) r USING (a, b)) GROUP BY source_table.a) citus_insert_select_subquery DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT -DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'a' DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index ad1a18401..1612446c7 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -1357,31 +1357,33 @@ WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5) SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1); Custom Scan (Citus INSERT ... 
SELECT) INSERT/SELECT method: pull to coordinator - -> Result - One-Time Filter: $3 + -> Subquery Scan on citus_insert_select_subquery CTE cte1 -> Function Scan on generate_series s - CTE cte1 - -> Limit - InitPlan 2 (returns $1) - -> CTE Scan on cte1 cte1_1 - -> Result - One-Time Filter: $1 - -> CTE Scan on cte1 cte1_2 - InitPlan 4 (returns $3) - -> CTE Scan on cte1 cte1_3 - -> CTE Scan on cte1 + -> Result + One-Time Filter: $3 + CTE cte1 + -> Limit + InitPlan 2 (returns $1) + -> CTE Scan on cte1 cte1_1 + -> Result + One-Time Filter: $1 + -> CTE Scan on cte1 cte1_2 + InitPlan 4 (returns $3) + -> CTE Scan on cte1 cte1_3 + -> CTE Scan on cte1 EXPLAIN (COSTS OFF) INSERT INTO lineitem_hash_part ( SELECT s FROM generate_series(1,5) s) UNION ( SELECT s FROM generate_series(5,10) s); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator - -> HashAggregate - Group Key: s.s - -> Append - -> Function Scan on generate_series s - -> Function Scan on generate_series s_1 + -> Subquery Scan on citus_insert_select_subquery + -> HashAggregate + Group Key: s.s + -> Append + -> Function Scan on generate_series s + -> Function Scan on generate_series s_1 -- explain with recursive planning -- prevent PG 11 - PG 12 outputs to diverge SET citus.enable_cte_inlining TO false; diff --git a/src/test/regress/expected/multi_insert_conflict.out b/src/test/regress/expected/multi_insert_conflict.out new file mode 100644 index 000000000..e69de29bb diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 766aaf47a..d50595468 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -710,6 +710,14 @@ INSERT INTO agg_events raw_events_first; DEBUG: CTE sub_cte is going to be inlined via distributed planning DEBUG: Subqueries without relations are not allowed in distributed INSERT ... 
SELECT queries +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries +DEBUG: CTE sub_cte is going to be inlined via distributed planning +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: generating subplan XXX_1 for CTE sub_cte: SELECT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, (SELECT sub_cte."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) sub_cte) AS value_1_agg FROM public.raw_events_first +DEBUG: Router planner cannot handle multi-shard select queries ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses HINT: Consider using an equality filter on the distributed table's partition column. -- We support set operations via the coordinator @@ -1044,7 +1052,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -1109,7 +1117,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -1408,7 +1416,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -1453,7 +1461,7 @@ $Q$); Custom Scan (Citus INSERT ... 
SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> HashAggregate @@ -1501,7 +1509,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.id + Group Key: remote_scan.user_id -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) diff --git a/src/test/regress/expected/multi_insert_select_conflict.out b/src/test/regress/expected/multi_insert_select_conflict.out index d069cf7d3..f9145b0a6 100644 --- a/src/test/regress/expected/multi_insert_select_conflict.out +++ b/src/test/regress/expected/multi_insert_select_conflict.out @@ -114,10 +114,10 @@ WITH inserted_table AS ( ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RETURNING target_table.col_1, target_table.col_2 DEBUG: LIMIT clauses are not allowed in distributed INSERT ... 
SELECT queries -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: Collecting INSERT ... SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- @@ -150,13 +150,13 @@ WITH inserted_table AS ( ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM ((SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) UNION (SELECT source_table_2.col_1, source_table_2.col_2, source_table_2.col_3 FROM on_conflict.source_table_2 LIMIT 5)) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = 0 RETURNING target_table.col_1, target_table.col_2 DEBUG: Set operations are not allowed in distributed INSERT ... 
SELECT queries -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_2 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_2 LIMIT 5 DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) UNION SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: Collecting INSERT ... 
SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- @@ -246,10 +246,10 @@ WITH inserted_table AS ( ) SELECT * FROM inserted_table ORDER BY 1; DEBUG: generating subplan XXX_1 for CTE inserted_table: WITH cte AS (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1), cte_2 AS (SELECT cte.col_1, cte.col_2 FROM cte) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2 DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) cte DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2) citus_insert_select_subquery +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) 
intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 DEBUG: Collecting INSERT ... SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- @@ -269,9 +269,9 @@ WITH cte AS ( UPDATE target_table SET col_2 = 4 WHERE col_1 IN (SELECT col_1 FROM cte); DEBUG: generating subplan XXX_1 for CTE cte: WITH basic AS (SELECT source_table_1.col_1, source_table_1.col_2 FROM on_conflict.source_table_1) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM basic ON CONFLICT DO NOTHING RETURNING target_table.col_1, target_table.col_2 DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte)) DEBUG: generating subplan XXX_1 for CTE basic: SELECT col_1, col_2 FROM on_conflict.source_table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT basic.col_1, basic.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) basic) citus_insert_select_subquery +DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte)) DEBUG: Collecting INSERT ... 
SELECT results on coordinator RESET client_min_messages; -- Following query is supported by using repartition join for the insert/select diff --git a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out index e84991da9..d6c8cc02f 100644 --- a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out +++ b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out @@ -147,6 +147,10 @@ FROM ( GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) +DEBUG: generating subplan XXX_2 for subquery SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105]))) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) +DEBUG: generating subplan XXX_1 for subquery SELECT 
u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) UNION SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105]))) ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join RESET client_min_messages; @@ -293,6 +297,10 @@ GROUP BY ORDER BY count_pay; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) +DEBUG: generating subplan XXX_2 for subquery SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM 
read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) +DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) UNION SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14)) ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join RESET client_min_messages; @@ -676,7 +684,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.user_id, remote_scan.event_type + Group Key: remote_scan.user_id, remote_scan.value_1_agg -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -696,7 +704,7 @@ $Q$); Custom Scan (Citus INSERT ... 
SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.user_id, remote_scan.event_type + Group Key: remote_scan.user_id, remote_scan.value_1_agg -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) @@ -717,7 +725,7 @@ $Q$); Custom Scan (Citus INSERT ... SELECT) INSERT/SELECT method: pull to coordinator -> HashAggregate - Group Key: remote_scan.user_id, remote_scan.event_type + Group Key: remote_scan.user_id, remote_scan.value_1_agg -> Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive)