diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 3f46161ae..4510e83ee 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -62,7 +62,7 @@ CustomScanMethods TaskTrackerCustomScanMethods = { }; CustomScanMethods CoordinatorInsertSelectCustomScanMethods = { - "Citus INSERT ... SELECT via coordinator", + "Citus INSERT ... SELECT", CoordinatorInsertSelectCreateScan }; diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index b52847df0..f4b1ac718 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -69,13 +69,11 @@ static List * BuildColumnNameListFromTargetList(Oid targetRelationId, static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList); static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, Oid targetRelationId); -static bool IsSupportedRedistributionTarget(Oid targetRelationId); static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery, DistTableCacheEntry *targetRelation, List **redistributedResults, bool useBinaryFormat); static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn); -static bool IsRedistributablePlan(Plan *selectPlan); static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, int targetTypeMod); static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries); @@ -873,7 +871,7 @@ CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, * IsSupportedRedistributionTarget determines whether re-partitioning into the * given target relation is supported. */ -static bool +bool IsSupportedRedistributionTarget(Oid targetRelationId) { DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(targetRelationId); @@ -966,7 +964,7 @@ RedistributedInsertSelectTaskList(Query *insertSelectQuery, ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); LockShardDistributionMetadata(shardId, ShareLock); - List *insertShardPlacementList = FinalizedShardPlacementList(shardId); + List *insertShardPlacementList = ActiveShardPlacementList(shardId); RelationShard *relationShard = CitusMakeNode(RelationShard); relationShard->relationId = targetShardInterval->relationId; @@ -1015,7 +1013,7 @@ PartitionColumnIndex(List *insertTargetList, Var *partitionColumn) /* * IsRedistributablePlan returns true if the given plan is a redistrituable plan. */ -static bool +bool IsRedistributablePlan(Plan *selectPlan) { /* don't redistribute if query is not distributed or requires merge on coordinator */ diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 7a527269a..ab86b4aa2 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -32,6 +32,7 @@ #include "distributed/multi_logical_planner.h" #include "distributed/multi_master_planner.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_router_planner.h" #include "distributed/distributed_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/remote_commands.h" @@ -139,9 +140,12 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, DistributedPlan *distributedPlan = scanState->distributedPlan; Query *insertSelectQuery = distributedPlan->insertSelectQuery; Query *query = BuildSelectForInsertSelect(insertSelectQuery); + RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); + Oid targetRelationId = insertRte->relid; IntoClause *into = NULL; ParamListInfo params = NULL; char *queryString = NULL; + int cursorOptions = CURSOR_OPT_PARALLEL_OK; if (es->analyze) { @@ -150,6 +154,17 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, "... SELECT commands via the coordinator"))); } + PlannedStmt *selectPlan = pg_plan_query(query, cursorOptions, params); + if (IsRedistributablePlan(selectPlan->planTree) && + IsSupportedRedistributionTarget(targetRelationId)) + { + ExplainPropertyText("INSERT/SELECT method", "repartition", es); + } + else + { + ExplainPropertyText("INSERT/SELECT method", "pull to coordinator", es); + } + ExplainOpenGroup("Select Query", "Select Query", false, es); /* explain the inner SELECT query */ diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h index eda3d6679..a2101ee1a 100644 --- a/src/include/distributed/insert_select_executor.h +++ b/src/include/distributed/insert_select_executor.h @@ -20,6 +20,8 @@ extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node); extern bool ExecutingInsertSelect(void); extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery); +extern bool IsSupportedRedistributionTarget(Oid targetRelationId); +extern bool IsRedistributablePlan(Plan *selectPlan); #endif /* INSERT_SELECT_EXECUTOR_H */ diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 6cae63c01..792f66dbf 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -275,9 +275,9 @@ SET citus.log_remote_commands TO true; SET client_min_messages TO DEBUG; CREATE TABLE results AS SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a; DEBUG: Router planner cannot handle multi-shard select queries DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially -LOG: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213601 source_table WHERE true GROUP BY a -LOG: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213602 source_table WHERE true GROUP BY a -LOG: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213603 source_table WHERE true GROUP BY a +NOTICE: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213601 source_table WHERE true GROUP BY a +NOTICE: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213602 source_table WHERE true GROUP BY a +NOTICE: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213603 source_table WHERE true GROUP BY a RESET citus.log_remote_commands; RESET client_min_messages; DROP TABLE results; -- now verify that we don't write the extra columns to the intermediate result files and @@ -555,6 +555,24 @@ SELECT * FROM target_table ORDER BY a; 5 | 400 (6 rows) +-- +-- EXPLAIN output should specify repartitioned INSERT/SELECT +-- +EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate (cost=43.90..45.90 rows=200 width=8) + Group Key: a + -> Seq Scan on source_table_4213606 source_table (cost=0.00..32.60 rows=2260 width=8) +(10 rows) + DROP TABLE source_table, target_table; SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 9da8e24e0..ad633c38f 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -1188,7 +1188,8 @@ ROLLBACK; EXPLAIN (COSTS OFF) INSERT INTO lineitem_hash_part SELECT o_orderkey FROM orders_hash_part LIMIT 3; -Custom Scan (Citus INSERT ... SELECT via coordinator) +Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator -> Limit -> Custom Scan (Citus Adaptive) Task Count: 4 @@ -1205,7 +1206,8 @@ t EXPLAIN (COSTS OFF) INSERT INTO lineitem_hash_part (l_orderkey, l_quantity) SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3; -Custom Scan (Citus INSERT ... SELECT via coordinator) +Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator -> Limit -> Custom Scan (Citus Adaptive) Task Count: 4 @@ -1217,7 +1219,8 @@ Custom Scan (Citus INSERT ... SELECT via coordinator) EXPLAIN (COSTS OFF) INSERT INTO lineitem_hash_part (l_orderkey) SELECT s FROM generate_series(1,5) s; -Custom Scan (Citus INSERT ... SELECT via coordinator) +Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator -> Function Scan on generate_series s -- WHERE EXISTS forces pg12 to materialize cte EXPLAIN (COSTS OFF) @@ -1225,7 +1228,8 @@ WITH cte1 AS (SELECT s FROM generate_series(1,10) s) INSERT INTO lineitem_hash_part WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5) SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1); -Custom Scan (Citus INSERT ... SELECT via coordinator) +Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator -> Result One-Time Filter: $3 CTE cte1 @@ -1244,7 +1248,8 @@ EXPLAIN (COSTS OFF) INSERT INTO lineitem_hash_part ( SELECT s FROM generate_series(1,5) s) UNION ( SELECT s FROM generate_series(5,10) s); -Custom Scan (Citus INSERT ... SELECT via coordinator) +Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator -> HashAggregate Group Key: s.s -> Append diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index ad432162b..1c1632fb5 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -265,6 +265,13 @@ RESET client_min_messages; SELECT * FROM target_table ORDER BY a; +-- +-- EXPLAIN output should specify repartitioned INSERT/SELECT +-- + +EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; + + DROP TABLE source_table, target_table; SET client_min_messages TO WARNING;