From 5eeb07124fcd1936caa0db5a95afde58c5a1f32c Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 16 Jan 2020 15:05:17 -0800 Subject: [PATCH] Repartitioned INSERT/SELECT: include job id in result id prefix --- .../executor/insert_select_executor.c | 16 ++- src/test/regress/bin/normalize.sed | 4 + .../expected/insert_select_repartition.out | 100 ++++++++++++++---- .../regress/expected/multi_insert_select.out | 24 ++--- .../regress/sql/insert_select_repartition.sql | 14 +++ 5 files changed, 116 insertions(+), 42 deletions(-) diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 179f00287..8547c21c2 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -209,10 +209,14 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) * We have a separate directory for each transaction, so choosing * the same result prefix won't cause filename conflicts. Results * directory name also includes node id and database id, so we don't - * need to include them in the filename. Jobs are executed - * sequentially, so we also don't need to include job id here. + * need to include them in the filename. We include job id here for + * the case "INSERT/SELECTs" are executed recursively. */ - char *distResultPrefix = "repartitioned_results"; + StringInfo distResultPrefixString = makeStringInfo(); + appendStringInfo(distResultPrefixString, + "repartitioned_results_" UINT64_FORMAT, + distSelectJob->jobId); + char *distResultPrefix = distResultPrefixString->data; DistTableCacheEntry *targetRelation = DistributedTableCacheEntry(targetRelationId); @@ -764,12 +768,6 @@ AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, if (selectEntry->ressortgroupref != 0) { - /* make sure that the name doesn't match any insert target list entries */ - resnameString = makeStringInfo(); - appendStringInfo(resnameString, "auto_resjunked_by_citus_%d", - targetEntryIndex); - - selectEntry->resname = resnameString->data; selectEntry->resjunk = true; nonProjectedEntries = lappend(nonProjectedEntries, selectEntry); } diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 0f9985a73..55ad1a72c 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -88,3 +88,7 @@ s/Subplan [0-9]+\_/Subplan XXX\_/g # Plan numbers in insert select s/read_intermediate_result\('insert_select_[0-9]+_/read_intermediate_result('insert_select_XXX_/g + +# ignore job id in repartitioned insert/select +s/repartitioned_results_[0-9]+/repartitioned_results_xxxxx/g + diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 6eb070560..bc50ab93c 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -30,10 +30,10 @@ HINT: Ensure the target table's partition column has a corresponding simple col DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'a' -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213585 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_from_4213583_to_0,repartitioned_results_from_4213584_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213586 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_from_4213582_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213587 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_from_4213581_to_2,repartitioned_results_from_4213582_to_2,repartitioned_results_from_4213584_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213588 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_from_4213581_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213585 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213583_to_0,repartitioned_results_xxxxx_from_4213584_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213586 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213582_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213587 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_2,repartitioned_results_xxxxx_from_4213582_to_2,repartitioned_results_xxxxx_from_4213584_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213588 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) RESET client_min_messages; SELECT * FROM target_table WHERE a=-1 OR a=-3 OR a=-7 ORDER BY a; a @@ -84,8 +84,8 @@ DETAIL: The target table's partition column should correspond to a partition co DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 2 with name 'key' -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_from_4213589_to_0,repartitioned_results_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_from_4213589_to_1,repartitioned_results_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0,repartitioned_results_xxxxx_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) RESET client_min_messages; SELECT * FROM target_table ORDER BY key; f1 | value | key @@ -114,8 +114,8 @@ DETAIL: The target table's partition column should correspond to a partition co DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 2 with name 'key' -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_from_4213589_to_0,repartitioned_results_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_from_4213589_to_1,repartitioned_results_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0,repartitioned_results_xxxxx_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) RESET client_min_messages; SELECT * FROM target_table ORDER BY key; f1 | value | key @@ -138,8 +138,8 @@ DETAIL: The target table's partition column should correspond to a partition co DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 1 with name 'key' -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_from_4213589_to_0,repartitioned_results_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_from_4213589_to_1,repartitioned_results_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0,repartitioned_results_xxxxx_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) RESET client_min_messages; SELECT * FROM target_table ORDER BY key; f1 | value | key @@ -164,8 +164,8 @@ DETAIL: The target table's partition column should correspond to a partition co DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 1 with name 'key' -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_from_4213589_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) ON CONFLICT(key) DO UPDATE SET f1 = 1 -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_from_4213589_to_1,repartitioned_results_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) ON CONFLICT(key) DO UPDATE SET f1 = 1 +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) ON CONFLICT(key) DO UPDATE SET f1 = 1 +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) ON CONFLICT(key) DO UPDATE SET f1 = 1 RESET client_min_messages; SELECT * FROM target_table ORDER BY key; f1 | value | key @@ -214,8 +214,8 @@ DETAIL: The data type of the target table's partition column should exactly mat DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'col_1' -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213593 AS citus_table_alias (col_1, col_2) SELECT col_1, col_2 FROM read_intermediate_results('{repartitioned_results_from_4213597_to_0,repartitioned_results_from_4213600_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213594 AS citus_table_alias (col_1, col_2) SELECT col_1, col_2 FROM read_intermediate_results('{repartitioned_results_from_4213599_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213593 AS citus_table_alias (col_1, col_2) SELECT col_1, col_2 FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213597_to_0,repartitioned_results_xxxxx_from_4213600_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213594 AS citus_table_alias (col_1, col_2) SELECT col_1, col_2 FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213599_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RESET client_min_messages; SELECT * FROM target_table ORDER BY 1; col_1 | col_2 @@ -584,9 +584,9 @@ DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'a' -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_from_4213606_to_0,repartitioned_results_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_from_4213607_to_1,repartitioned_results_from_4213609_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_from_4213606_to_2,repartitioned_results_from_4213607_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213607_to_1,repartitioned_results_xxxxx_from_4213609_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_2,repartitioned_results_xxxxx_from_4213607_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RESET client_min_messages; SELECT * FROM target_table ORDER BY a; a | b @@ -655,6 +655,64 @@ SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY 4 | 6 | 1 (5 rows) +-- +-- INSERT/SELECT in CTE +-- +TRUNCATE target_table; +SET client_min_messages TO DEBUG2; +WITH r AS ( + INSERT INTO target_table SELECT * FROM source_table RETURNING * +) +INSERT INTO target_table SELECT source_table.a, max(source_table.b) FROM source_table NATURAL JOIN r GROUP BY source_table.a; +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries +DEBUG: generating subplan XXX_1 for CTE r: INSERT INTO insert_select_repartition.target_table (a, b) SELECT a, b FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, max AS b FROM (SELECT source_table.a, max(source_table.b) AS max FROM (insert_select_repartition.source_table JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) r USING (a, b)) GROUP BY source_table.a) citus_insert_select_subquery +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'a' +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213607_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213609_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b +DEBUG: partitioning SELECT query by column index 0 with name 'a' +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213607_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213609_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) +RESET client_min_messages; +SELECT * FROM target_table ORDER BY a, b; + a | b +--------------------------------------------------------------------- + 0 | 1 + 0 | 4 + 0 | 9 + 0 | 9 + 1 | 16 + 1 | 25 + 1 | 36 + 1 | 49 + 1 | 49 + 2 | 64 + 2 | 81 + 2 | 100 + 2 | 121 + 2 | 121 + 3 | 144 + 3 | 169 + 3 | 196 + 3 | 225 + 3 | 225 + 4 | 256 + 4 | 289 + 4 | 324 + 4 | 361 + 4 | 361 + 5 | 400 + 5 | 400 +(26 rows) + DROP TABLE source_table, target_table; -- -- Constraint failure and rollback @@ -753,7 +811,7 @@ DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'a' -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213617 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_from_4213613_to_0,repartitioned_results_from_4213614_to_0,repartitioned_results_from_4213615_to_0,repartitioned_results_from_4213616_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213617 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213613_to_0,repartitioned_results_xxxxx_from_4213614_to_0,repartitioned_results_xxxxx_from_4213615_to_0,repartitioned_results_xxxxx_from_4213616_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RESET client_min_messages; SELECT * FROM target_table ORDER BY a, b; a | b @@ -865,9 +923,9 @@ DEBUG: INSERT target table and the source relation of the SELECT partition colu DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'a' -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213633 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_from_4213629_to_0,repartitioned_results_from_4213630_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer) -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213634 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_from_4213630_to_1,repartitioned_results_from_4213631_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer) -DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213635 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_from_4213632_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213633 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213629_to_0,repartitioned_results_xxxxx_from_4213630_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213634 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213630_to_1,repartitioned_results_xxxxx_from_4213631_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213635 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213632_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer) RESET client_min_messages; SELECT count(*) FROM target_table; count diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 49758c713..c8398ccbb 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -726,10 +726,10 @@ DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'user_id' -DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300004_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) -DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300001 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300005_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) -DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300002 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300006_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) -DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300003 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300007_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_xxxxx_from_13300004_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300001 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_xxxxx_from_13300005_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300002 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_xxxxx_from_13300006_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300003 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_xxxxx_from_13300007_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) ROLLBACK; -- We do support set operations through recursive planning BEGIN; @@ -1129,10 +1129,10 @@ HINT: Ensure the target table's partition column has a corresponding simple col DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'user_id' -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300000_to_0,repartitioned_results_from_13300001_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300000_to_1,repartitioned_results_from_13300001_to_1,repartitioned_results_from_13300003_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300001_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300000_to_3,repartitioned_results_from_13300002_to_3,repartitioned_results_from_13300003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_xxxxx_from_13300000_to_0,repartitioned_results_xxxxx_from_13300001_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_xxxxx_from_13300000_to_1,repartitioned_results_xxxxx_from_13300001_to_1,repartitioned_results_xxxxx_from_13300003_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_xxxxx_from_13300001_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_xxxxx_from_13300000_to_3,repartitioned_results_xxxxx_from_13300002_to_3,repartitioned_results_xxxxx_from_13300003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) INSERT INTO raw_events_second (user_id) SELECT user_id :: bigint @@ -1143,10 +1143,10 @@ HINT: Ensure the target table's partition column has a corresponding simple col DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'user_id' -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300001_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300002_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_xxxxx_from_13300000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_xxxxx_from_13300001_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_xxxxx_from_13300002_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_xxxxx_from_13300003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) INSERT INTO agg_events (value_3_agg, value_4_agg, diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index fb80de3b1..127980686 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -311,6 +311,20 @@ RESET client_min_messages; SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; +-- +-- INSERT/SELECT in CTE +-- + +TRUNCATE target_table; + +SET client_min_messages TO DEBUG2; +WITH r AS ( + INSERT INTO target_table SELECT * FROM source_table RETURNING * +) +INSERT INTO target_table SELECT source_table.a, max(source_table.b) FROM source_table NATURAL JOIN r GROUP BY source_table.a; +RESET client_min_messages; + +SELECT * FROM target_table ORDER BY a, b; DROP TABLE source_table, target_table;