From 494cc383ccb09600b7cd564e9b1c2448e26417f4 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 13 Jan 2020 17:54:40 -0800 Subject: [PATCH] Repartitioned INSERT/SELECT: Enable RETURNING --- .../executor/insert_select_executor.c | 11 ++----- .../expected/insert_select_repartition.out | 30 +++++++++++++++++++ .../expected/multi_insert_select_conflict.out | 6 ++-- .../regress/sql/insert_select_repartition.sql | 12 ++++++++ 4 files changed, 48 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 97b31c9cf..dc685afa4 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -75,7 +75,7 @@ static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery, List **redistributedResults, bool useBinaryFormat); static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn); -static bool IsRedistributablePlan(Plan *selectPlan, bool hasReturning); +static bool IsRedistributablePlan(Plan *selectPlan); static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, int targetTypeMod); static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries); @@ -180,7 +180,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) LockPartitionRelations(targetRelationId, RowExclusiveLock); } - if (IsRedistributablePlan(selectPlan->planTree, hasReturning) && + if (IsRedistributablePlan(selectPlan->planTree) && IsSupportedRedistributionTarget(targetRelationId)) { ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT"))); @@ -1017,13 +1017,8 @@ PartitionColumnIndex(List *insertTargetList, Var *partitionColumn) * IsRedistributablePlan returns true if the given plan is a redistrituable plan. */ static bool -IsRedistributablePlan(Plan *selectPlan, bool hasReturning) +IsRedistributablePlan(Plan *selectPlan) { - if (hasReturning) - { - return false; - } - /* don't redistribute if query is not distributed or requires merge on coordinator */ if (!IsCitusCustomScan(selectPlan)) { diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 1107aa8a3..ab69137f7 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -478,5 +478,35 @@ SELECT * FROM target_table ORDER BY a; -1 | {1,2,3} (12 rows) +-- +-- repartitioned INSERT/SELECT with RETURNING +-- +TRUNCATE target_table; +SET client_min_messages TO DEBUG2; +WITH c AS ( + INSERT INTO target_table + SELECT mapped_key, c FROM source_table + RETURNING *) +SELECT * FROM c ORDER by a; +DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries +DEBUG: generating subplan XXX_1 for CTE c: INSERT INTO insert_select_repartition.target_table (a, b) SELECT mapped_key, c FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer[])) c ORDER BY a +DEBUG: Creating router plan +DEBUG: Plan is router executable +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'mapped_key' +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213604 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213602_to_0,repartitioned_results_from_4213603_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[]) RETURNING citus_table_alias.a, citus_table_alias.b +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213605 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213601_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[]) RETURNING citus_table_alias.a, citus_table_alias.b + a | b +--------------------------------------------------------------------- + -4 | {3} + -3 | {} + -2 | {4,6} + -1 | {1,2,3} +(4 rows) + +RESET client_min_messages; SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/expected/multi_insert_select_conflict.out b/src/test/regress/expected/multi_insert_select_conflict.out index 4cbed96ce..329763333 100644 --- a/src/test/regress/expected/multi_insert_select_conflict.out +++ b/src/test/regress/expected/multi_insert_select_conflict.out @@ -70,7 +70,7 @@ DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 -DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: performing repartitioned INSERT ... SELECT col_1 | col_2 --------------------------------------------------------------------- 1 | 1 @@ -324,11 +324,11 @@ BEGIN; FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 1 RETURNING *; col_1 | col_2 --------------------------------------------------------------------- - 1 | 1 - 2 | 1 3 | 1 4 | 1 + 1 | 1 5 | 1 + 2 | 1 (5 rows) ROLLBACK; diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index 6d296cf68..7e0bc0cb2 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -217,5 +217,17 @@ INSERT INTO target_table SELECT mapped_key, c FROM source_table; END; SELECT * FROM target_table ORDER BY a; +-- +-- repartitioned INSERT/SELECT with RETURNING +-- +TRUNCATE target_table; +SET client_min_messages TO DEBUG2; +WITH c AS ( + INSERT INTO target_table + SELECT mapped_key, c FROM source_table + RETURNING *) +SELECT * FROM c ORDER by a; +RESET client_min_messages; + SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE;