From 9dd14fa90d6190586780854bec76a80b032787f6 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Tue, 4 Feb 2020 23:15:22 -0800 Subject: [PATCH 1/2] Rename discarded target list items in repartitioned INSERT/SELECT --- .../executor/insert_select_executor.c | 12 ++++ .../expected/insert_select_repartition.out | 72 +++++++++++++++++++ .../regress/sql/insert_select_repartition.sql | 61 ++++++++++++++++ 3 files changed, 145 insertions(+) diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index b1bb6ab2d..a1b3928f0 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -770,6 +770,18 @@ AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, if (selectEntry->ressortgroupref != 0) { selectEntry->resjunk = true; + + /* + * This entry might still end up in the SELECT output list, so + * rename it to avoid ambiguity. + * + * See https://github.com/citusdata/citus/pull/3470. + */ + resnameString = makeStringInfo(); + appendStringInfo(resnameString, "discarded_target_item_%d", + targetEntryIndex); + selectEntry->resname = resnameString->data; + nonProjectedEntries = lappend(nonProjectedEntries, selectEntry); } } diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 9aa0a5181..9cf213ff6 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -1046,5 +1046,77 @@ SELECT count(*) FROM test; 20 (1 row) +-- +-- In the following case we coerce some columns and move uncoerced versions to the +-- end of SELECT list. The following case verifies that we rename those columns so +-- we don't get "column reference is ambiguous" errors. +-- +CREATE TABLE target_table( + c1 int, + c2 int, + c3 timestamp, + a int, + b int, + c int, + c4 int, + c5 int, + c6 int[], + cardinality int, + sum int, + PRIMARY KEY (c1, c2, c3, c4, c5, c6) +); +SET citus.shard_count TO 5; +SELECT create_distributed_table('target_table', 'c1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE source_table( + c1 int, + c2 int, + c3 date, + c4 int, + cardinality int, + sum int +); +SET citus.shard_count TO 4; +SELECT create_distributed_table('source_table', 'c1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION dist_func(a int, b int) RETURNS int[] +AS $$ +BEGIN + RETURN array_fill(a, ARRAY[b]); +END; +$$ +LANGUAGE plpgsql STABLE; +SELECT create_distributed_function('dist_func(int, int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG; +INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum) +SELECT c1, c2, c3, c4, -1::float AS c5, + dist_func(c1, 4) c6, + sum(cardinality), + sum(sum) +FROM source_table +GROUP BY c1, c2, c3, c4, c5, c6 +ON CONFLICT(c1, c2, c3, c4, c5, c6) +DO UPDATE SET + cardinality = enriched.cardinality + excluded.cardinality, + sum = enriched.sum + excluded.sum; +DEBUG: rehashing catalog cache id 14 for pg_opclass; 17 tups, 8 buckets at character 224 +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'c1' +-- clean-up SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index ea2f3190a..2d0b3dbd6 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -481,5 +481,66 @@ RESET client_min_messages; SELECT count(*) FROM test; +-- +-- In the following case we coerce some columns and move uncoerced versions to the +-- end of SELECT list. The following case verifies that we rename those columns so +-- we don't get "column reference is ambiguous" errors. +-- + +CREATE TABLE target_table( + c1 int, + c2 int, + c3 timestamp, + a int, + b int, + c int, + c4 int, + c5 int, + c6 int[], + cardinality int, + sum int, + PRIMARY KEY (c1, c2, c3, c4, c5, c6) +); + +SET citus.shard_count TO 5; +SELECT create_distributed_table('target_table', 'c1'); + +CREATE TABLE source_table( + c1 int, + c2 int, + c3 date, + c4 int, + cardinality int, + sum int +); + +SET citus.shard_count TO 4; +SELECT create_distributed_table('source_table', 'c1'); + +CREATE OR REPLACE FUNCTION dist_func(a int, b int) RETURNS int[] +AS $$ +BEGIN + RETURN array_fill(a, ARRAY[b]); +END; +$$ +LANGUAGE plpgsql STABLE; + +SELECT create_distributed_function('dist_func(int, int)'); + +SET client_min_messages TO DEBUG; +INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum) +SELECT c1, c2, c3, c4, -1::float AS c5, + dist_func(c1, 4) c6, + sum(cardinality), + sum(sum) +FROM source_table +GROUP BY c1, c2, c3, c4, c5, c6 +ON CONFLICT(c1, c2, c3, c4, c5, c6) +DO UPDATE SET + cardinality = enriched.cardinality + excluded.cardinality, + sum = enriched.sum + excluded.sum; + +-- clean-up + SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; From 64ca5c9acbb2c3f8ae575a1c1a1ec409867dec52 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 5 Feb 2020 11:06:03 +0100 Subject: [PATCH 2/2] Add additional INSERT..SELECT repartition tests --- .../expected/insert_select_repartition.out | 38 +++++++++++++++++++ .../regress/sql/insert_select_repartition.sql | 20 +++++++++- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 9cf213ff6..b483b3d7f 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -1101,6 +1101,16 @@ SELECT create_distributed_function('dist_func(int, int)'); (1 row) SET client_min_messages TO DEBUG; +SET citus.enable_unique_job_ids TO off; +INSERT INTO source_table VALUES (1,2, '2020-02-02', 3, 4, 5); +DEBUG: Creating router plan +DEBUG: Plan is router executable +INSERT INTO source_table VALUES (1,2, '2020-02-02', 3, 4, 5); +DEBUG: Creating router plan +DEBUG: Plan is router executable +INSERT INTO source_table VALUES (3,4, '2020-02-02', 3, 4, 5); +DEBUG: Creating router plan +DEBUG: Plan is router executable INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum) SELECT c1, c2, c3, c4, -1::float AS c5, dist_func(c1, 4) c6, @@ -1117,6 +1127,34 @@ DEBUG: INSERT target table and the source relation of the SELECT partition colu DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT DEBUG: partitioning SELECT query by column index 0 with name 'c1' +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213639 AS enriched (c1, c2, c3, c4, c5, c6, cardinality, sum) SELECT c1, c2, c3, c4, c5, c6, cardinality, sum FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213644_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(c1 integer, c2 integer, c3 timestamp without time zone, c4 integer, c5 integer, c6 integer[], cardinality integer, sum integer) ON CONFLICT(c1, c2, c3, c4, c5, c6) DO UPDATE SET cardinality = (enriched.cardinality OPERATOR(pg_catalog.+) excluded.cardinality), sum = (enriched.sum OPERATOR(pg_catalog.+) excluded.sum) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213641 AS enriched (c1, c2, c3, c4, c5, c6, cardinality, sum) SELECT c1, c2, c3, c4, c5, c6, cardinality, sum FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213645_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(c1 integer, c2 integer, c3 timestamp without time zone, c4 integer, c5 integer, c6 integer[], cardinality integer, sum integer) ON CONFLICT(c1, c2, c3, c4, c5, c6) DO UPDATE SET cardinality = (enriched.cardinality OPERATOR(pg_catalog.+) excluded.cardinality), sum = (enriched.sum OPERATOR(pg_catalog.+) excluded.sum) +RESET client_min_messages; +EXPLAIN (COSTS OFF) INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum) +SELECT c1, c2, c3, c4, -1::float AS c5, + dist_func(c1, 4) c6, + sum(cardinality), + sum(sum) +FROM source_table +GROUP BY c1, c2, c3, c4, c5, c6 +ON CONFLICT(c1, c2, c3, c4, c5, c6) +DO UPDATE SET + cardinality = enriched.cardinality + excluded.cardinality, + sum = enriched.sum + excluded.sum; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate + Group Key: c1, c2, c3, c4, '-1'::double precision, insert_select_repartition.dist_func(c1, 4) + -> Seq Scan on source_table_4213644 source_table +(10 rows) + -- clean-up SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index 2d0b3dbd6..8d8b625ea 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -528,6 +528,11 @@ LANGUAGE plpgsql STABLE; SELECT create_distributed_function('dist_func(int, int)'); SET client_min_messages TO DEBUG; +SET citus.enable_unique_job_ids TO off; +INSERT INTO source_table VALUES (1,2, '2020-02-02', 3, 4, 5); +INSERT INTO source_table VALUES (1,2, '2020-02-02', 3, 4, 5); +INSERT INTO source_table VALUES (3,4, '2020-02-02', 3, 4, 5); + INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum) SELECT c1, c2, c3, c4, -1::float AS c5, dist_func(c1, 4) c6, @@ -540,7 +545,20 @@ DO UPDATE SET cardinality = enriched.cardinality + excluded.cardinality, sum = enriched.sum + excluded.sum; --- clean-up +RESET client_min_messages; +EXPLAIN (COSTS OFF) INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum) +SELECT c1, c2, c3, c4, -1::float AS c5, + dist_func(c1, 4) c6, + sum(cardinality), + sum(sum) +FROM source_table +GROUP BY c1, c2, c3, c4, c5, c6 +ON CONFLICT(c1, c2, c3, c4, c5, c6) +DO UPDATE SET + cardinality = enriched.cardinality + excluded.cardinality, + sum = enriched.sum + excluded.sum; + +-- clean-up SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE;