diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index dc685afa4..b52847df0 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -149,9 +149,8 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); /* - * If the type of insert column and target table's column type is - * different from each other. Cast insert column't type to target - * table's column + * Cast types of insert target list and select projection list to + * match the column types of the target relation. */ selectQuery->targetList = AddInsertSelectCasts(insertSelectQuery->targetList, @@ -1045,8 +1044,8 @@ IsRedistributablePlan(Plan *selectPlan) /* - * WrapForProjection wraps task->queryString to only select given projected - * columns. It modifies the taskList. + * WrapTaskListForProjection wraps task->queryString to only select given + * projected columns. It modifies the taskList. */ static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries) diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index ab69137f7..9a2fd0396 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -249,15 +249,11 @@ SELECT create_distributed_table('target_table', 'a'); (1 row) -SET client_min_messages TO DEBUG2; +SET client_min_messages TO DEBUG1; INSERT INTO target_table SELECT mapped_key, c FROM source_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. -DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT -DEBUG: partitioning SELECT query by column index 0 with name 'mapped_key' -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213604 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213602_to_0,repartitioned_results_from_4213603_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[]) -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213605 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213601_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[]) RESET client_min_messages; SELECT * FROM target_table ORDER BY a; a | b @@ -286,14 +282,10 @@ RESET citus.log_remote_commands; RESET client_min_messages; DROP TABLE results; -- now verify that we don't write the extra columns to the intermediate result files and -- insertion to the target works fine. -SET client_min_messages TO DEBUG2; +SET client_min_messages TO DEBUG1; INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match -DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT -DEBUG: partitioning SELECT query by column index 0 with name 'max' -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213604 AS citus_table_alias (a, b) SELECT max, array_agg FROM read_intermediate_results('{repartitioned_results_from_4213602_to_0,repartitioned_results_from_4213603_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(max integer, array_agg integer[]) -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213605 AS citus_table_alias (a, b) SELECT max, array_agg FROM read_intermediate_results('{repartitioned_results_from_4213601_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(max integer, array_agg integer[]) RESET client_min_messages; SELECT * FROM target_table ORDER BY a; a | b @@ -482,23 +474,16 @@ SELECT * FROM target_table ORDER BY a; -- repartitioned INSERT/SELECT with RETURNING -- TRUNCATE target_table; -SET client_min_messages TO DEBUG2; +SET client_min_messages TO DEBUG1; WITH c AS ( INSERT INTO target_table SELECT mapped_key, c FROM source_table RETURNING *) SELECT * FROM c ORDER by a; -DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries DEBUG: generating subplan XXX_1 for CTE c: INSERT INTO insert_select_repartition.target_table (a, b) SELECT mapped_key, c FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer[])) c ORDER BY a -DEBUG: Creating router plan -DEBUG: Plan is router executable -DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT -DEBUG: partitioning SELECT query by column index 0 with name 'mapped_key' -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213604 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213602_to_0,repartitioned_results_from_4213603_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[]) RETURNING citus_table_alias.a, citus_table_alias.b -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213605 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213601_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[]) RETURNING citus_table_alias.a, citus_table_alias.b a | b --------------------------------------------------------------------- -4 | {3} @@ -508,5 +493,33 @@ DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_tabl (4 rows) RESET client_min_messages; +-- +-- in combination with CTEs +-- +TRUNCATE target_table; +SET client_min_messages TO DEBUG1; +WITH t AS ( + SELECT mapped_key, a, c FROM source_table + WHERE a > floor(random()) +) +INSERT INTO target_table +SELECT mapped_key, c FROM t NATURAL JOIN source_table; +DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries +DEBUG: generating subplan XXX_1 for CTE t: SELECT mapped_key, a, c FROM insert_select_repartition.source_table WHERE ((a)::double precision OPERATOR(pg_catalog.>) floor(random())) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key, (c)::integer[] AS auto_coerced_by_citus_1 FROM (SELECT t.mapped_key, t.c FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery +DEBUG: performing repartitioned INSERT ... SELECT +RESET client_min_messages; +SELECT * FROM target_table ORDER BY a; + a | b +--------------------------------------------------------------------- + -4 | {3} + -3 | {} + -2 | {4,6} + -1 | {1,2,3} +(4 rows) + +-- +-- The case where select query has a GROUP BY ... +-- SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index 7e0bc0cb2..7dbfd0f76 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -117,7 +117,7 @@ SET citus.shard_count TO 2; CREATE TABLE target_table(a int, b int[]); SELECT create_distributed_table('target_table', 'a'); -SET client_min_messages TO DEBUG2; +SET client_min_messages TO DEBUG1; INSERT INTO target_table SELECT mapped_key, c FROM source_table; RESET client_min_messages; @@ -138,7 +138,7 @@ DROP TABLE results; -- now verify that we don't write the extra columns to the intermediate result files and -- insertion to the target works fine. -SET client_min_messages TO DEBUG2; +SET client_min_messages TO DEBUG1; INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a; RESET client_min_messages; @@ -221,7 +221,7 @@ SELECT * FROM target_table ORDER BY a; -- repartitioned INSERT/SELECT with RETURNING -- TRUNCATE target_table; -SET client_min_messages TO DEBUG2; +SET client_min_messages TO DEBUG1; WITH c AS ( INSERT INTO target_table SELECT mapped_key, c FROM source_table @@ -229,5 +229,23 @@ WITH c AS ( SELECT * FROM c ORDER by a; RESET client_min_messages; +-- +-- in combination with CTEs +-- +TRUNCATE target_table; +SET client_min_messages TO DEBUG1; +WITH t AS ( + SELECT mapped_key, a, c FROM source_table + WHERE a > floor(random()) +) +INSERT INTO target_table +SELECT mapped_key, c FROM t NATURAL JOIN source_table; +RESET client_min_messages; +SELECT * FROM target_table ORDER BY a; + +-- +-- The case where select query has a GROUP BY ... +-- + SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE;