mirror of https://github.com/citusdata/citus.git
Rename discarded target list items in repartitioned INSERT/SELECT
parent
1aa89d3242
commit
9dd14fa90d
|
@ -770,6 +770,18 @@ AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||||
if (selectEntry->ressortgroupref != 0)
|
if (selectEntry->ressortgroupref != 0)
|
||||||
{
|
{
|
||||||
selectEntry->resjunk = true;
|
selectEntry->resjunk = true;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This entry might still end up in the SELECT output list, so
|
||||||
|
* rename it to avoid ambiguity.
|
||||||
|
*
|
||||||
|
* See https://github.com/citusdata/citus/pull/3470.
|
||||||
|
*/
|
||||||
|
resnameString = makeStringInfo();
|
||||||
|
appendStringInfo(resnameString, "discarded_target_item_%d",
|
||||||
|
targetEntryIndex);
|
||||||
|
selectEntry->resname = resnameString->data;
|
||||||
|
|
||||||
nonProjectedEntries = lappend(nonProjectedEntries, selectEntry);
|
nonProjectedEntries = lappend(nonProjectedEntries, selectEntry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1046,5 +1046,77 @@ SELECT count(*) FROM test;
|
||||||
20
|
20
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- In the following case we coerce some columns and move uncoerced versions to the
|
||||||
|
-- end of SELECT list. The following case verifies that we rename those columns so
|
||||||
|
-- we don't get "column reference is ambiguous" errors.
|
||||||
|
--
|
||||||
|
CREATE TABLE target_table(
|
||||||
|
c1 int,
|
||||||
|
c2 int,
|
||||||
|
c3 timestamp,
|
||||||
|
a int,
|
||||||
|
b int,
|
||||||
|
c int,
|
||||||
|
c4 int,
|
||||||
|
c5 int,
|
||||||
|
c6 int[],
|
||||||
|
cardinality int,
|
||||||
|
sum int,
|
||||||
|
PRIMARY KEY (c1, c2, c3, c4, c5, c6)
|
||||||
|
);
|
||||||
|
SET citus.shard_count TO 5;
|
||||||
|
SELECT create_distributed_table('target_table', 'c1');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE source_table(
|
||||||
|
c1 int,
|
||||||
|
c2 int,
|
||||||
|
c3 date,
|
||||||
|
c4 int,
|
||||||
|
cardinality int,
|
||||||
|
sum int
|
||||||
|
);
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SELECT create_distributed_table('source_table', 'c1');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION dist_func(a int, b int) RETURNS int[]
|
||||||
|
AS $$
|
||||||
|
BEGIN
|
||||||
|
RETURN array_fill(a, ARRAY[b]);
|
||||||
|
END;
|
||||||
|
$$
|
||||||
|
LANGUAGE plpgsql STABLE;
|
||||||
|
SELECT create_distributed_function('dist_func(int, int)');
|
||||||
|
create_distributed_function
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG;
|
||||||
|
INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum)
|
||||||
|
SELECT c1, c2, c3, c4, -1::float AS c5,
|
||||||
|
dist_func(c1, 4) c6,
|
||||||
|
sum(cardinality),
|
||||||
|
sum(sum)
|
||||||
|
FROM source_table
|
||||||
|
GROUP BY c1, c2, c3, c4, c5, c6
|
||||||
|
ON CONFLICT(c1, c2, c3, c4, c5, c6)
|
||||||
|
DO UPDATE SET
|
||||||
|
cardinality = enriched.cardinality + excluded.cardinality,
|
||||||
|
sum = enriched.sum + excluded.sum;
|
||||||
|
DEBUG: rehashing catalog cache id 14 for pg_opclass; 17 tups, 8 buckets at character 224
|
||||||
|
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
DEBUG: partitioning SELECT query by column index 0 with name 'c1'
|
||||||
|
-- clean-up
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA insert_select_repartition CASCADE;
|
DROP SCHEMA insert_select_repartition CASCADE;
|
||||||
|
|
|
@ -481,5 +481,66 @@ RESET client_min_messages;
|
||||||
|
|
||||||
SELECT count(*) FROM test;
|
SELECT count(*) FROM test;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- In the following case we coerce some columns and move uncoerced versions to the
|
||||||
|
-- end of SELECT list. The following case verifies that we rename those columns so
|
||||||
|
-- we don't get "column reference is ambiguous" errors.
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE TABLE target_table(
|
||||||
|
c1 int,
|
||||||
|
c2 int,
|
||||||
|
c3 timestamp,
|
||||||
|
a int,
|
||||||
|
b int,
|
||||||
|
c int,
|
||||||
|
c4 int,
|
||||||
|
c5 int,
|
||||||
|
c6 int[],
|
||||||
|
cardinality int,
|
||||||
|
sum int,
|
||||||
|
PRIMARY KEY (c1, c2, c3, c4, c5, c6)
|
||||||
|
);
|
||||||
|
|
||||||
|
SET citus.shard_count TO 5;
|
||||||
|
SELECT create_distributed_table('target_table', 'c1');
|
||||||
|
|
||||||
|
CREATE TABLE source_table(
|
||||||
|
c1 int,
|
||||||
|
c2 int,
|
||||||
|
c3 date,
|
||||||
|
c4 int,
|
||||||
|
cardinality int,
|
||||||
|
sum int
|
||||||
|
);
|
||||||
|
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SELECT create_distributed_table('source_table', 'c1');
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION dist_func(a int, b int) RETURNS int[]
|
||||||
|
AS $$
|
||||||
|
BEGIN
|
||||||
|
RETURN array_fill(a, ARRAY[b]);
|
||||||
|
END;
|
||||||
|
$$
|
||||||
|
LANGUAGE plpgsql STABLE;
|
||||||
|
|
||||||
|
SELECT create_distributed_function('dist_func(int, int)');
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG;
|
||||||
|
INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum)
|
||||||
|
SELECT c1, c2, c3, c4, -1::float AS c5,
|
||||||
|
dist_func(c1, 4) c6,
|
||||||
|
sum(cardinality),
|
||||||
|
sum(sum)
|
||||||
|
FROM source_table
|
||||||
|
GROUP BY c1, c2, c3, c4, c5, c6
|
||||||
|
ON CONFLICT(c1, c2, c3, c4, c5, c6)
|
||||||
|
DO UPDATE SET
|
||||||
|
cardinality = enriched.cardinality + excluded.cardinality,
|
||||||
|
sum = enriched.sum + excluded.sum;
|
||||||
|
|
||||||
|
-- clean-up
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA insert_select_repartition CASCADE;
|
DROP SCHEMA insert_select_repartition CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue