mirror of https://github.com/citusdata/citus.git
Repartitioned INSERT/SELECT: Test CTEs
parent
494cc383cc
commit
fe548b762f
|
@ -149,9 +149,8 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
||||||
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If the type of insert column and target table's column type is
|
* Cast types of insert target list and select projection list to
|
||||||
* different from each other. Cast insert column't type to target
|
* match the column types of the target relation.
|
||||||
* table's column
|
|
||||||
*/
|
*/
|
||||||
selectQuery->targetList =
|
selectQuery->targetList =
|
||||||
AddInsertSelectCasts(insertSelectQuery->targetList,
|
AddInsertSelectCasts(insertSelectQuery->targetList,
|
||||||
|
@ -1045,8 +1044,8 @@ IsRedistributablePlan(Plan *selectPlan)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* WrapForProjection wraps task->queryString to only select given projected
|
* WrapTaskListForProjection wraps task->queryString to only select given
|
||||||
* columns. It modifies the taskList.
|
* projected columns. It modifies the taskList.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
|
WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
|
||||||
|
|
|
@ -249,15 +249,11 @@ SELECT create_distributed_table('target_table', 'a');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SET client_min_messages TO DEBUG2;
|
SET client_min_messages TO DEBUG1;
|
||||||
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
|
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
|
||||||
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
|
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
DEBUG: partitioning SELECT query by column index 0 with name 'mapped_key'
|
|
||||||
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213604 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213602_to_0,repartitioned_results_from_4213603_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[])
|
|
||||||
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213605 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213601_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[])
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
SELECT * FROM target_table ORDER BY a;
|
SELECT * FROM target_table ORDER BY a;
|
||||||
a | b
|
a | b
|
||||||
|
@ -286,14 +282,10 @@ RESET citus.log_remote_commands; RESET client_min_messages;
|
||||||
DROP TABLE results;
|
DROP TABLE results;
|
||||||
-- now verify that we don't write the extra columns to the intermediate result files and
|
-- now verify that we don't write the extra columns to the intermediate result files and
|
||||||
-- insertion to the target works fine.
|
-- insertion to the target works fine.
|
||||||
SET client_min_messages TO DEBUG2;
|
SET client_min_messages TO DEBUG1;
|
||||||
INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a;
|
INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a;
|
||||||
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
DEBUG: partitioning SELECT query by column index 0 with name 'max'
|
|
||||||
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213604 AS citus_table_alias (a, b) SELECT max, array_agg FROM read_intermediate_results('{repartitioned_results_from_4213602_to_0,repartitioned_results_from_4213603_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(max integer, array_agg integer[])
|
|
||||||
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213605 AS citus_table_alias (a, b) SELECT max, array_agg FROM read_intermediate_results('{repartitioned_results_from_4213601_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(max integer, array_agg integer[])
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
SELECT * FROM target_table ORDER BY a;
|
SELECT * FROM target_table ORDER BY a;
|
||||||
a | b
|
a | b
|
||||||
|
@ -482,23 +474,16 @@ SELECT * FROM target_table ORDER BY a;
|
||||||
-- repartitioned INSERT/SELECT with RETURNING
|
-- repartitioned INSERT/SELECT with RETURNING
|
||||||
--
|
--
|
||||||
TRUNCATE target_table;
|
TRUNCATE target_table;
|
||||||
SET client_min_messages TO DEBUG2;
|
SET client_min_messages TO DEBUG1;
|
||||||
WITH c AS (
|
WITH c AS (
|
||||||
INSERT INTO target_table
|
INSERT INTO target_table
|
||||||
SELECT mapped_key, c FROM source_table
|
SELECT mapped_key, c FROM source_table
|
||||||
RETURNING *)
|
RETURNING *)
|
||||||
SELECT * FROM c ORDER by a;
|
SELECT * FROM c ORDER by a;
|
||||||
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
|
|
||||||
DEBUG: generating subplan XXX_1 for CTE c: INSERT INTO insert_select_repartition.target_table (a, b) SELECT mapped_key, c FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b
|
DEBUG: generating subplan XXX_1 for CTE c: INSERT INTO insert_select_repartition.target_table (a, b) SELECT mapped_key, c FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b
|
||||||
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer[])) c ORDER BY a
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer[])) c ORDER BY a
|
||||||
DEBUG: Creating router plan
|
|
||||||
DEBUG: Plan is router executable
|
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
DEBUG: partitioning SELECT query by column index 0 with name 'mapped_key'
|
|
||||||
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213604 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213602_to_0,repartitioned_results_from_4213603_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[]) RETURNING citus_table_alias.a, citus_table_alias.b
|
|
||||||
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213605 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213601_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[]) RETURNING citus_table_alias.a, citus_table_alias.b
|
|
||||||
a | b
|
a | b
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
-4 | {3}
|
-4 | {3}
|
||||||
|
@ -508,5 +493,33 @@ DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_tabl
|
||||||
(4 rows)
|
(4 rows)
|
||||||
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
--
|
||||||
|
-- in combination with CTEs
|
||||||
|
--
|
||||||
|
TRUNCATE target_table;
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
WITH t AS (
|
||||||
|
SELECT mapped_key, a, c FROM source_table
|
||||||
|
WHERE a > floor(random())
|
||||||
|
)
|
||||||
|
INSERT INTO target_table
|
||||||
|
SELECT mapped_key, c FROM t NATURAL JOIN source_table;
|
||||||
|
DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE t: SELECT mapped_key, a, c FROM insert_select_repartition.source_table WHERE ((a)::double precision OPERATOR(pg_catalog.>) floor(random()))
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key, (c)::integer[] AS auto_coerced_by_citus_1 FROM (SELECT t.mapped_key, t.c FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT * FROM target_table ORDER BY a;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
-4 | {3}
|
||||||
|
-3 | {}
|
||||||
|
-2 | {4,6}
|
||||||
|
-1 | {1,2,3}
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- The case where select query has a GROUP BY ...
|
||||||
|
--
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA insert_select_repartition CASCADE;
|
DROP SCHEMA insert_select_repartition CASCADE;
|
||||||
|
|
|
@ -117,7 +117,7 @@ SET citus.shard_count TO 2;
|
||||||
CREATE TABLE target_table(a int, b int[]);
|
CREATE TABLE target_table(a int, b int[]);
|
||||||
SELECT create_distributed_table('target_table', 'a');
|
SELECT create_distributed_table('target_table', 'a');
|
||||||
|
|
||||||
SET client_min_messages TO DEBUG2;
|
SET client_min_messages TO DEBUG1;
|
||||||
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
|
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
@ -138,7 +138,7 @@ DROP TABLE results;
|
||||||
|
|
||||||
-- now verify that we don't write the extra columns to the intermediate result files and
|
-- now verify that we don't write the extra columns to the intermediate result files and
|
||||||
-- insertion to the target works fine.
|
-- insertion to the target works fine.
|
||||||
SET client_min_messages TO DEBUG2;
|
SET client_min_messages TO DEBUG1;
|
||||||
INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a;
|
INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a;
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
@ -221,7 +221,7 @@ SELECT * FROM target_table ORDER BY a;
|
||||||
-- repartitioned INSERT/SELECT with RETURNING
|
-- repartitioned INSERT/SELECT with RETURNING
|
||||||
--
|
--
|
||||||
TRUNCATE target_table;
|
TRUNCATE target_table;
|
||||||
SET client_min_messages TO DEBUG2;
|
SET client_min_messages TO DEBUG1;
|
||||||
WITH c AS (
|
WITH c AS (
|
||||||
INSERT INTO target_table
|
INSERT INTO target_table
|
||||||
SELECT mapped_key, c FROM source_table
|
SELECT mapped_key, c FROM source_table
|
||||||
|
@ -229,5 +229,23 @@ WITH c AS (
|
||||||
SELECT * FROM c ORDER by a;
|
SELECT * FROM c ORDER by a;
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- in combination with CTEs
|
||||||
|
--
|
||||||
|
TRUNCATE target_table;
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
WITH t AS (
|
||||||
|
SELECT mapped_key, a, c FROM source_table
|
||||||
|
WHERE a > floor(random())
|
||||||
|
)
|
||||||
|
INSERT INTO target_table
|
||||||
|
SELECT mapped_key, c FROM t NATURAL JOIN source_table;
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT * FROM target_table ORDER BY a;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- The case where select query has a GROUP BY ...
|
||||||
|
--
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA insert_select_repartition CASCADE;
|
DROP SCHEMA insert_select_repartition CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue