mirror of https://github.com/citusdata/citus.git
Merge pull request #3470 from citusdata/insert_select_issue
Rename discarded target list items in repartitioned INSERT/SELECTpull/3471/head^2
commit
8c972dc614
|
@ -770,6 +770,18 @@ AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
|||
if (selectEntry->ressortgroupref != 0)
|
||||
{
|
||||
selectEntry->resjunk = true;
|
||||
|
||||
/*
|
||||
* This entry might still end up in the SELECT output list, so
|
||||
* rename it to avoid ambiguity.
|
||||
*
|
||||
* See https://github.com/citusdata/citus/pull/3470.
|
||||
*/
|
||||
resnameString = makeStringInfo();
|
||||
appendStringInfo(resnameString, "discarded_target_item_%d",
|
||||
targetEntryIndex);
|
||||
selectEntry->resname = resnameString->data;
|
||||
|
||||
nonProjectedEntries = lappend(nonProjectedEntries, selectEntry);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1046,5 +1046,115 @@ SELECT count(*) FROM test;
|
|||
20
|
||||
(1 row)
|
||||
|
||||
--
|
||||
-- In the following case we coerce some columns and move uncoerced versions to the
|
||||
-- end of SELECT list. The following case verifies that we rename those columns so
|
||||
-- we don't get "column reference is ambiguous" errors.
|
||||
--
|
||||
CREATE TABLE target_table(
|
||||
c1 int,
|
||||
c2 int,
|
||||
c3 timestamp,
|
||||
a int,
|
||||
b int,
|
||||
c int,
|
||||
c4 int,
|
||||
c5 int,
|
||||
c6 int[],
|
||||
cardinality int,
|
||||
sum int,
|
||||
PRIMARY KEY (c1, c2, c3, c4, c5, c6)
|
||||
);
|
||||
SET citus.shard_count TO 5;
|
||||
SELECT create_distributed_table('target_table', 'c1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE source_table(
|
||||
c1 int,
|
||||
c2 int,
|
||||
c3 date,
|
||||
c4 int,
|
||||
cardinality int,
|
||||
sum int
|
||||
);
|
||||
SET citus.shard_count TO 4;
|
||||
SELECT create_distributed_table('source_table', 'c1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE OR REPLACE FUNCTION dist_func(a int, b int) RETURNS int[]
|
||||
AS $$
|
||||
BEGIN
|
||||
RETURN array_fill(a, ARRAY[b]);
|
||||
END;
|
||||
$$
|
||||
LANGUAGE plpgsql STABLE;
|
||||
SELECT create_distributed_function('dist_func(int, int)');
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG;
|
||||
SET citus.enable_unique_job_ids TO off;
|
||||
INSERT INTO source_table VALUES (1,2, '2020-02-02', 3, 4, 5);
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
INSERT INTO source_table VALUES (1,2, '2020-02-02', 3, 4, 5);
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
INSERT INTO source_table VALUES (3,4, '2020-02-02', 3, 4, 5);
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum)
|
||||
SELECT c1, c2, c3, c4, -1::float AS c5,
|
||||
dist_func(c1, 4) c6,
|
||||
sum(cardinality),
|
||||
sum(sum)
|
||||
FROM source_table
|
||||
GROUP BY c1, c2, c3, c4, c5, c6
|
||||
ON CONFLICT(c1, c2, c3, c4, c5, c6)
|
||||
DO UPDATE SET
|
||||
cardinality = enriched.cardinality + excluded.cardinality,
|
||||
sum = enriched.sum + excluded.sum;
|
||||
DEBUG: rehashing catalog cache id 14 for pg_opclass; 17 tups, 8 buckets at character 224
|
||||
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: performing repartitioned INSERT ... SELECT
|
||||
DEBUG: partitioning SELECT query by column index 0 with name 'c1'
|
||||
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213639 AS enriched (c1, c2, c3, c4, c5, c6, cardinality, sum) SELECT c1, c2, c3, c4, c5, c6, cardinality, sum FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213644_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(c1 integer, c2 integer, c3 timestamp without time zone, c4 integer, c5 integer, c6 integer[], cardinality integer, sum integer) ON CONFLICT(c1, c2, c3, c4, c5, c6) DO UPDATE SET cardinality = (enriched.cardinality OPERATOR(pg_catalog.+) excluded.cardinality), sum = (enriched.sum OPERATOR(pg_catalog.+) excluded.sum)
|
||||
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213641 AS enriched (c1, c2, c3, c4, c5, c6, cardinality, sum) SELECT c1, c2, c3, c4, c5, c6, cardinality, sum FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213645_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(c1 integer, c2 integer, c3 timestamp without time zone, c4 integer, c5 integer, c6 integer[], cardinality integer, sum integer) ON CONFLICT(c1, c2, c3, c4, c5, c6) DO UPDATE SET cardinality = (enriched.cardinality OPERATOR(pg_catalog.+) excluded.cardinality), sum = (enriched.sum OPERATOR(pg_catalog.+) excluded.sum)
|
||||
RESET client_min_messages;
|
||||
EXPLAIN (COSTS OFF) INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum)
|
||||
SELECT c1, c2, c3, c4, -1::float AS c5,
|
||||
dist_func(c1, 4) c6,
|
||||
sum(cardinality),
|
||||
sum(sum)
|
||||
FROM source_table
|
||||
GROUP BY c1, c2, c3, c4, c5, c6
|
||||
ON CONFLICT(c1, c2, c3, c4, c5, c6)
|
||||
DO UPDATE SET
|
||||
cardinality = enriched.cardinality + excluded.cardinality,
|
||||
sum = enriched.sum + excluded.sum;
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: repartition
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
Tasks Shown: One of 4
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> HashAggregate
|
||||
Group Key: c1, c2, c3, c4, '-1'::double precision, insert_select_repartition.dist_func(c1, 4)
|
||||
-> Seq Scan on source_table_4213644 source_table
|
||||
(10 rows)
|
||||
|
||||
-- clean-up
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA insert_select_repartition CASCADE;
|
||||
|
|
|
@ -481,5 +481,84 @@ RESET client_min_messages;
|
|||
|
||||
SELECT count(*) FROM test;
|
||||
|
||||
--
|
||||
-- In the following case we coerce some columns and move uncoerced versions to the
|
||||
-- end of SELECT list. The following case verifies that we rename those columns so
|
||||
-- we don't get "column reference is ambiguous" errors.
|
||||
--
|
||||
|
||||
CREATE TABLE target_table(
|
||||
c1 int,
|
||||
c2 int,
|
||||
c3 timestamp,
|
||||
a int,
|
||||
b int,
|
||||
c int,
|
||||
c4 int,
|
||||
c5 int,
|
||||
c6 int[],
|
||||
cardinality int,
|
||||
sum int,
|
||||
PRIMARY KEY (c1, c2, c3, c4, c5, c6)
|
||||
);
|
||||
|
||||
SET citus.shard_count TO 5;
|
||||
SELECT create_distributed_table('target_table', 'c1');
|
||||
|
||||
CREATE TABLE source_table(
|
||||
c1 int,
|
||||
c2 int,
|
||||
c3 date,
|
||||
c4 int,
|
||||
cardinality int,
|
||||
sum int
|
||||
);
|
||||
|
||||
SET citus.shard_count TO 4;
|
||||
SELECT create_distributed_table('source_table', 'c1');
|
||||
|
||||
CREATE OR REPLACE FUNCTION dist_func(a int, b int) RETURNS int[]
|
||||
AS $$
|
||||
BEGIN
|
||||
RETURN array_fill(a, ARRAY[b]);
|
||||
END;
|
||||
$$
|
||||
LANGUAGE plpgsql STABLE;
|
||||
|
||||
SELECT create_distributed_function('dist_func(int, int)');
|
||||
|
||||
SET client_min_messages TO DEBUG;
|
||||
SET citus.enable_unique_job_ids TO off;
|
||||
INSERT INTO source_table VALUES (1,2, '2020-02-02', 3, 4, 5);
|
||||
INSERT INTO source_table VALUES (1,2, '2020-02-02', 3, 4, 5);
|
||||
INSERT INTO source_table VALUES (3,4, '2020-02-02', 3, 4, 5);
|
||||
|
||||
INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum)
|
||||
SELECT c1, c2, c3, c4, -1::float AS c5,
|
||||
dist_func(c1, 4) c6,
|
||||
sum(cardinality),
|
||||
sum(sum)
|
||||
FROM source_table
|
||||
GROUP BY c1, c2, c3, c4, c5, c6
|
||||
ON CONFLICT(c1, c2, c3, c4, c5, c6)
|
||||
DO UPDATE SET
|
||||
cardinality = enriched.cardinality + excluded.cardinality,
|
||||
sum = enriched.sum + excluded.sum;
|
||||
|
||||
RESET client_min_messages;
|
||||
|
||||
EXPLAIN (COSTS OFF) INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum)
|
||||
SELECT c1, c2, c3, c4, -1::float AS c5,
|
||||
dist_func(c1, 4) c6,
|
||||
sum(cardinality),
|
||||
sum(sum)
|
||||
FROM source_table
|
||||
GROUP BY c1, c2, c3, c4, c5, c6
|
||||
ON CONFLICT(c1, c2, c3, c4, c5, c6)
|
||||
DO UPDATE SET
|
||||
cardinality = enriched.cardinality + excluded.cardinality,
|
||||
sum = enriched.sum + excluded.sum;
|
||||
|
||||
-- clean-up
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA insert_select_repartition CASCADE;
|
||||
|
|
Loading…
Reference in New Issue