From 87dc88f8375b71f5d2c909adfe33a4c94ff28d0a Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Tue, 18 Jul 2023 16:23:53 -0700 Subject: [PATCH] Isolate schema sharding/MERGE tests into a new file, and use the new GUC parameter --- .../distributed/planner/merge_planner.c | 6 +- src/test/regress/expected/merge.out | 189 --------------- .../expected/merge_schema_sharding.out | 221 ++++++++++++++++++ .../expected/merge_schema_sharding_0.out | 6 + src/test/regress/multi_schedule | 2 +- src/test/regress/sql/merge.sql | 112 --------- .../regress/sql/merge_schema_sharding.sql | 148 ++++++++++++ 7 files changed, 379 insertions(+), 305 deletions(-) create mode 100644 src/test/regress/expected/merge_schema_sharding.out create mode 100644 src/test/regress/expected/merge_schema_sharding_0.out create mode 100644 src/test/regress/sql/merge_schema_sharding.sql diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index 6a80a7c33..213e914d9 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -1184,9 +1184,9 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, { if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED)) { - ereport(ERROR, (errmsg("MERGE operation on non-colocated " - "distributed table(s) without a shard " - "key is not yet supported"))); + ereport(ERROR, (errmsg("MERGE operation across distributed schemas " + "or with a row-based distributed table is " + "not yet supported"))); } /* Get all the Join conditions from the ON clause */ diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 3cb69936c..0fd5ba3c3 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -3816,195 +3816,6 @@ UPDATE SET val = dist_source.val WHEN NOT MATCHED THEN INSERT VALUES(dist_source.id, dist_source.val); ERROR: For MERGE command, append/range distribution table is not supported yet --- test merge with single-shard tables -CREATE SCHEMA query_single_shard_table; -SET search_path TO query_single_shard_table; -CREATE TABLE nullkey_c1_t1(a int, b int); -CREATE TABLE nullkey_c1_t2(a int, b int); -SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -CREATE TABLE nullkey_c2_t1(a int, b int); -CREATE TABLE nullkey_c2_t2(a int, b int); -SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -CREATE TABLE reference_table(a int, b int); -CREATE TABLE distributed_table(a int, b int); -CREATE TABLE citus_local_table(a int, b int); -SELECT create_reference_table('reference_table'); - create_reference_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_table('distributed_table', 'a'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT citus_add_local_table_to_metadata('citus_local_table'); - citus_add_local_table_to_metadata ---------------------------------------------------------------------- - -(1 row) - -SET client_min_messages TO DEBUG2; -INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; -DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... SELECT results on coordinator -INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i; -DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... SELECT results on coordinator -INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; -DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... SELECT results on coordinator -CREATE TABLE postgres_local_table(a int, b int); -INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; --- with a colocated table -MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) -WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b; -DEBUG: -DEBUG: Creating MERGE router plan -MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) -WHEN MATCHED THEN DELETE; -DEBUG: -DEBUG: Creating MERGE router plan -MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) -WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b -WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); -DEBUG: -DEBUG: Creating MERGE router plan -MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) -WHEN MATCHED THEN DELETE -WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); -DEBUG: -DEBUG: Creating MERGE router plan --- with non-colocated single-shard table -MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) -WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b; -DEBUG: Distributed tables are not co-located, try repartitioning -DEBUG: For MERGE command, all the distributed tables must be colocated -DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported -MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) -WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b -WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b); -DEBUG: Distributed tables are not co-located, try repartitioning -DEBUG: For MERGE command, all the distributed tables must be colocated -DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported --- with a distributed table -MERGE INTO nullkey_c1_t1 USING distributed_table ON (nullkey_c1_t1.a = distributed_table.a) -WHEN MATCHED THEN UPDATE SET b = distributed_table.b -WHEN NOT MATCHED THEN INSERT VALUES (distributed_table.a, distributed_table.b); -DEBUG: Distributed tables are not co-located, try repartitioning -DEBUG: For MERGE command, all the distributed tables must be colocated -DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported -MERGE INTO distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = distributed_table.a) -WHEN MATCHED THEN DELETE -WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); -DEBUG: Distributed tables are not co-located, try repartitioning -DEBUG: For MERGE command, all the distributed tables must be colocated -DEBUG: Creating MERGE repartition plan -DEBUG: Using column - index:0 from the source list to redistribute -DEBUG: Distributed planning for a fast-path router query -DEBUG: Creating router plan -DEBUG: Collect source query results on coordinator -DEBUG: Create a MERGE task list that needs to be routed -DEBUG: -DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000189 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000189'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) -DEBUG: -DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000190 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000190'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) -DEBUG: -DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000191 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000191'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) -DEBUG: -DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000192 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000192'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) --- with a reference table -MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a) -WHEN MATCHED THEN UPDATE SET b = reference_table.b; -DEBUG: A mix of distributed and reference table, try repartitioning -DEBUG: A mix of distributed and reference table, routable query is not possible -DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported -MERGE INTO reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = reference_table.a) -WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b -WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); -ERROR: Reference table as target is not allowed in MERGE command --- with a citus local table -MERGE INTO nullkey_c1_t1 USING citus_local_table ON (nullkey_c1_t1.a = citus_local_table.a) -WHEN MATCHED THEN UPDATE SET b = citus_local_table.b; -DEBUG: A mix of distributed and local table, try repartitioning -DEBUG: A mix of distributed and citus-local table, routable query is not possible -DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported -MERGE INTO citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = citus_local_table.a) -WHEN MATCHED THEN DELETE; -DEBUG: A mix of distributed and local table, try repartitioning -DEBUG: A mix of distributed and citus-local table, routable query is not possible -DEBUG: Creating MERGE repartition plan -ERROR: MERGE involving repartition of rows is supported only if the target is distributed --- with a postgres table -MERGE INTO nullkey_c1_t1 USING postgres_local_table ON (nullkey_c1_t1.a = postgres_local_table.a) -WHEN MATCHED THEN UPDATE SET b = postgres_local_table.b; -DEBUG: There is only one distributed table, merge is not pushable, try repartitioning -DEBUG: Creating MERGE repartition plan -ERROR: MERGE INTO an distributed table from Postgres table is not yet supported -MERGE INTO postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = postgres_local_table.a) -WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b -WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); -DEBUG: There is only one distributed table, merge is not pushable, try repartitioning -DEBUG: Creating MERGE repartition plan -ERROR: MERGE involving repartition of rows is supported only if the target is distributed --- using ctes -WITH cte AS ( - SELECT * FROM nullkey_c1_t1 -) -MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) -WHEN MATCHED THEN UPDATE SET b = cte.b; -DEBUG: -DEBUG: Creating MERGE router plan -WITH cte AS ( - SELECT * FROM distributed_table -) -MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) -WHEN MATCHED THEN UPDATE SET b = cte.b; -DEBUG: Distributed tables are not co-located, try repartitioning -DEBUG: For MERGE command, all the distributed tables must be colocated -DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported -WITH cte AS materialized ( - SELECT * FROM distributed_table -) -MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) -WHEN MATCHED THEN UPDATE SET b = cte.b; -DEBUG: Distributed tables are not co-located, try repartitioning -DEBUG: For MERGE command, all the distributed tables must be colocated -DEBUG: Creating MERGE repartition plan -ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported -SET client_min_messages TO WARNING; -DROP SCHEMA query_single_shard_table CASCADE; -SET search_path TO merge_schema; -- Test Columnar table CREATE TABLE target_columnar(cid int, name text) USING columnar; SELECT create_distributed_table('target_columnar', 'cid'); diff --git a/src/test/regress/expected/merge_schema_sharding.out b/src/test/regress/expected/merge_schema_sharding.out new file mode 100644 index 000000000..fb4c0b235 --- /dev/null +++ b/src/test/regress/expected/merge_schema_sharding.out @@ -0,0 +1,221 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif +-- MERGE command performs a join from data_source to target_table_name +DROP SCHEMA IF EXISTS schema_shard_table1 CASCADE; +NOTICE: schema "schema_shard_table1" does not exist, skipping +DROP SCHEMA IF EXISTS schema_shard_table2 CASCADE; +NOTICE: schema "schema_shard_table2" does not exist, skipping +DROP SCHEMA IF EXISTS schema_shard_table CASCADE; +NOTICE: schema "schema_shard_table" does not exist, skipping +-- test merge with schema-shard tables +SET citus.shard_replication_factor TO 1; +SET citus.max_adaptive_executor_pool_size TO 1; +SET citus.next_shard_id TO 4005000; +SET citus.enable_repartition_joins TO true; +CREATE SCHEMA schema_shard_table; +SET search_path TO schema_shard_table; +CREATE TABLE reference_table(a int, b int); +CREATE TABLE distributed_table(a int, b int); +CREATE TABLE citus_local_table(a int, b int); +CREATE TABLE postgres_local_table(a int, b int); +INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; +INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i; +INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; +INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; +SELECT create_reference_table('reference_table'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$schema_shard_table.reference_table$$) + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('distributed_table', 'a'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$schema_shard_table.distributed_table$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('citus_local_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +SET citus.enable_schema_based_sharding TO ON; +CREATE SCHEMA schema_shard_table1; +CREATE SCHEMA schema_shard_table2; +SET search_path TO schema_shard_table1; +CREATE TABLE nullkey_c1_t1(a int, b int); +CREATE TABLE nullkey_c1_t2(a int, b int); +INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(0, 5) i; +INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(3, 8) i; +SET search_path TO schema_shard_table2; +CREATE TABLE nullkey_c2_t1(a int, b int); +CREATE TABLE nullkey_c2_t2(a int, b int); +INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(0, 5) i; +INSERT INTO nullkey_c2_t2 SELECT i, i FROM generate_series(3, 8) i; +SET search_path TO schema_shard_table1; +-- with a colocated table +SET client_min_messages TO DEBUG2; +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b; +DEBUG: +DEBUG: Creating MERGE router plan +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN DELETE; +DEBUG: +DEBUG: Creating MERGE router plan +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); +DEBUG: +DEBUG: Creating MERGE router plan +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); +DEBUG: +DEBUG: Creating MERGE router plan +SET search_path TO schema_shard_table2; +-- with non-colocated schema-shard table +MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b; +DEBUG: Distributed tables are not co-located, try repartitioning +DEBUG: For MERGE command, all the distributed tables must be colocated +DEBUG: Creating MERGE repartition plan +ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b); +DEBUG: Distributed tables are not co-located, try repartitioning +DEBUG: For MERGE command, all the distributed tables must be colocated +DEBUG: Creating MERGE repartition plan +ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +-- with a distributed table +SET search_path TO schema_shard_table1; +MERGE INTO nullkey_c1_t1 USING schema_shard_table.distributed_table ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a) +WHEN MATCHED THEN UPDATE SET b = schema_shard_table.distributed_table.b +WHEN NOT MATCHED THEN INSERT VALUES (schema_shard_table.distributed_table.a, schema_shard_table.distributed_table.b); +DEBUG: Distributed tables are not co-located, try repartitioning +DEBUG: For MERGE command, all the distributed tables must be colocated +DEBUG: Creating MERGE repartition plan +ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +MERGE INTO schema_shard_table.distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); +DEBUG: Distributed tables are not co-located, try repartitioning +DEBUG: For MERGE command, all the distributed tables must be colocated +DEBUG: Creating MERGE repartition plan +DEBUG: Using column - index:0 from the source list to redistribute +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Collect source query results on coordinator +DEBUG: Create a MERGE task list that needs to be routed +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table.distributed_table_4005001 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005001'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table.distributed_table_4005002 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005002'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table.distributed_table_4005003 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005003'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) +DEBUG: +DEBUG: distributed statement: MERGE INTO schema_shard_table.distributed_table_4005004 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005004'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b) +DEBUG: Execute MERGE task list +RESET client_min_messages; +SELECT count(*) FROM schema_shard_table.distributed_table WHERE a in (0, 1, 2); + count +--------------------------------------------------------------------- + 3 +(1 row) + +MERGE INTO schema_shard_table.distributed_table +USING (SELECT s1.a AS s1a, s2.b AS s2b + FROM nullkey_c1_t1 s1 JOIN schema_shard_table2.nullkey_c2_t1 s2 + ON s1.a = s2.a) src +ON (src.s1a = schema_shard_table.distributed_table.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (src.s1a, src.s2b); +-- Three matching rows must be deleted +SELECT count(*) FROM schema_shard_table.distributed_table WHERE a in (0, 1, 2); + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- with a reference table +SET client_min_messages TO DEBUG2; +MERGE INTO nullkey_c1_t1 USING schema_shard_table.reference_table ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a) +WHEN MATCHED THEN UPDATE SET b = schema_shard_table.reference_table.b; +DEBUG: A mix of distributed and reference table, try repartitioning +DEBUG: A mix of distributed and reference table, routable query is not possible +DEBUG: Creating MERGE repartition plan +ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +MERGE INTO schema_shard_table.reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); +ERROR: Reference table as target is not allowed in MERGE command +-- with a citus local table +MERGE INTO nullkey_c1_t1 USING schema_shard_table.citus_local_table ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a) +WHEN MATCHED THEN UPDATE SET b = schema_shard_table.citus_local_table.b; +DEBUG: A mix of distributed and local table, try repartitioning +DEBUG: A mix of distributed and citus-local table, routable query is not possible +DEBUG: Creating MERGE repartition plan +ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +MERGE INTO schema_shard_table.citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a) +WHEN MATCHED THEN DELETE; +DEBUG: A mix of distributed and local table, try repartitioning +DEBUG: A mix of distributed and citus-local table, routable query is not possible +DEBUG: Creating MERGE repartition plan +ERROR: MERGE involving repartition of rows is supported only if the target is distributed +-- with a postgres table +MERGE INTO nullkey_c1_t1 USING schema_shard_table.postgres_local_table ON (nullkey_c1_t1.a = schema_shard_table.postgres_local_table.a) +WHEN MATCHED THEN UPDATE SET b = schema_shard_table.postgres_local_table.b; +DEBUG: There is only one distributed table, merge is not pushable, try repartitioning +DEBUG: Creating MERGE repartition plan +ERROR: MERGE INTO an distributed table from Postgres table is not yet supported +MERGE INTO schema_shard_table.postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.postgres_local_table.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); +DEBUG: There is only one distributed table, merge is not pushable, try repartitioning +DEBUG: Creating MERGE repartition plan +ERROR: MERGE involving repartition of rows is supported only if the target is distributed +-- using ctes +WITH cte AS ( + SELECT * FROM nullkey_c1_t1 +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; +DEBUG: +DEBUG: Creating MERGE router plan +WITH cte AS ( + SELECT * FROM schema_shard_table.distributed_table +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; +DEBUG: Distributed tables are not co-located, try repartitioning +DEBUG: For MERGE command, all the distributed tables must be colocated +DEBUG: Creating MERGE repartition plan +ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +WITH cte AS materialized ( + SELECT * FROM schema_shard_table.distributed_table +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; +DEBUG: Distributed tables are not co-located, try repartitioning +DEBUG: For MERGE command, all the distributed tables must be colocated +DEBUG: Creating MERGE repartition plan +ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported +SET client_min_messages TO WARNING; +DROP SCHEMA schema_shard_table1 CASCADE; +DROP SCHEMA schema_shard_table2 CASCADE; +DROP SCHEMA schema_shard_table CASCADE; diff --git a/src/test/regress/expected/merge_schema_sharding_0.out b/src/test/regress/expected/merge_schema_sharding_0.out new file mode 100644 index 000000000..a7e3fbf20 --- /dev/null +++ b/src/test/regress/expected/merge_schema_sharding_0.out @@ -0,0 +1,6 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 4d42dbc78..19040f1e0 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -112,7 +112,7 @@ test: clock # MERGE tests test: merge pgmerge merge_repartition2 -test: merge_repartition1 +test: merge_repartition1 merge_schema_sharding test: merge_partition_tables # --------- diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 4fb911736..62722c6c8 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -2366,118 +2366,6 @@ UPDATE SET val = dist_source.val WHEN NOT MATCHED THEN INSERT VALUES(dist_source.id, dist_source.val); --- test merge with single-shard tables - -CREATE SCHEMA query_single_shard_table; - -SET search_path TO query_single_shard_table; - -CREATE TABLE nullkey_c1_t1(a int, b int); -CREATE TABLE nullkey_c1_t2(a int, b int); -SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none'); -SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1'); - -CREATE TABLE nullkey_c2_t1(a int, b int); -CREATE TABLE nullkey_c2_t2(a int, b int); -SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none'); -SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null); - -CREATE TABLE reference_table(a int, b int); -CREATE TABLE distributed_table(a int, b int); -CREATE TABLE citus_local_table(a int, b int); -SELECT create_reference_table('reference_table'); -SELECT create_distributed_table('distributed_table', 'a'); -SELECT citus_add_local_table_to_metadata('citus_local_table'); - -SET client_min_messages TO DEBUG2; -INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; - -INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i; - -INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; - -CREATE TABLE postgres_local_table(a int, b int); -INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; - --- with a colocated table -MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) -WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b; - -MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) -WHEN MATCHED THEN DELETE; - -MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) -WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b -WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); - -MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) -WHEN MATCHED THEN DELETE -WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); - --- with non-colocated single-shard table -MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) -WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b; - -MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a) -WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b -WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b); - --- with a distributed table -MERGE INTO nullkey_c1_t1 USING distributed_table ON (nullkey_c1_t1.a = distributed_table.a) -WHEN MATCHED THEN UPDATE SET b = distributed_table.b -WHEN NOT MATCHED THEN INSERT VALUES (distributed_table.a, distributed_table.b); - -MERGE INTO distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = distributed_table.a) -WHEN MATCHED THEN DELETE -WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); - --- with a reference table -MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a) -WHEN MATCHED THEN UPDATE SET b = reference_table.b; - -MERGE INTO reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = reference_table.a) -WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b -WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); - --- with a citus local table -MERGE INTO nullkey_c1_t1 USING citus_local_table ON (nullkey_c1_t1.a = citus_local_table.a) -WHEN MATCHED THEN UPDATE SET b = citus_local_table.b; - -MERGE INTO citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = citus_local_table.a) -WHEN MATCHED THEN DELETE; - --- with a postgres table -MERGE INTO nullkey_c1_t1 USING postgres_local_table ON (nullkey_c1_t1.a = postgres_local_table.a) -WHEN MATCHED THEN UPDATE SET b = postgres_local_table.b; - -MERGE INTO postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = postgres_local_table.a) -WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b -WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); - --- using ctes -WITH cte AS ( - SELECT * FROM nullkey_c1_t1 -) -MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) -WHEN MATCHED THEN UPDATE SET b = cte.b; - -WITH cte AS ( - SELECT * FROM distributed_table -) -MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) -WHEN MATCHED THEN UPDATE SET b = cte.b; - -WITH cte AS materialized ( - SELECT * FROM distributed_table -) -MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) -WHEN MATCHED THEN UPDATE SET b = cte.b; - -SET client_min_messages TO WARNING; -DROP SCHEMA query_single_shard_table CASCADE; - -SET search_path TO merge_schema; - -- Test Columnar table CREATE TABLE target_columnar(cid int, name text) USING columnar; SELECT create_distributed_table('target_columnar', 'cid'); diff --git a/src/test/regress/sql/merge_schema_sharding.sql b/src/test/regress/sql/merge_schema_sharding.sql new file mode 100644 index 000000000..8ea947c1c --- /dev/null +++ b/src/test/regress/sql/merge_schema_sharding.sql @@ -0,0 +1,148 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif + +-- MERGE command performs a join from data_source to target_table_name +DROP SCHEMA IF EXISTS schema_shard_table1 CASCADE; +DROP SCHEMA IF EXISTS schema_shard_table2 CASCADE; +DROP SCHEMA IF EXISTS schema_shard_table CASCADE; + +-- test merge with schema-shard tables +SET citus.shard_replication_factor TO 1; +SET citus.max_adaptive_executor_pool_size TO 1; +SET citus.next_shard_id TO 4005000; +SET citus.enable_repartition_joins TO true; + +CREATE SCHEMA schema_shard_table; +SET search_path TO schema_shard_table; +CREATE TABLE reference_table(a int, b int); +CREATE TABLE distributed_table(a int, b int); +CREATE TABLE citus_local_table(a int, b int); +CREATE TABLE postgres_local_table(a int, b int); + +INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i; +INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i; +INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i; +INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i; + +SELECT create_reference_table('reference_table'); +SELECT create_distributed_table('distributed_table', 'a'); +SELECT citus_add_local_table_to_metadata('citus_local_table'); + +SET citus.enable_schema_based_sharding TO ON; +CREATE SCHEMA schema_shard_table1; +CREATE SCHEMA schema_shard_table2; + +SET search_path TO schema_shard_table1; +CREATE TABLE nullkey_c1_t1(a int, b int); +CREATE TABLE nullkey_c1_t2(a int, b int); +INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(0, 5) i; +INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(3, 8) i; + +SET search_path TO schema_shard_table2; +CREATE TABLE nullkey_c2_t1(a int, b int); +CREATE TABLE nullkey_c2_t2(a int, b int); +INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(0, 5) i; +INSERT INTO nullkey_c2_t2 SELECT i, i FROM generate_series(3, 8) i; + +SET search_path TO schema_shard_table1; + +-- with a colocated table +SET client_min_messages TO DEBUG2; +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b; + +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN DELETE; + +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); + +MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b); + +SET search_path TO schema_shard_table2; + +-- with non-colocated schema-shard table +MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b; + +MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b); + +-- with a distributed table +SET search_path TO schema_shard_table1; +MERGE INTO nullkey_c1_t1 USING schema_shard_table.distributed_table ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a) +WHEN MATCHED THEN UPDATE SET b = schema_shard_table.distributed_table.b +WHEN NOT MATCHED THEN INSERT VALUES (schema_shard_table.distributed_table.a, schema_shard_table.distributed_table.b); + +MERGE INTO schema_shard_table.distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); + +RESET client_min_messages; +SELECT count(*) FROM schema_shard_table.distributed_table WHERE a in (0, 1, 2); +MERGE INTO schema_shard_table.distributed_table +USING (SELECT s1.a AS s1a, s2.b AS s2b + FROM nullkey_c1_t1 s1 JOIN schema_shard_table2.nullkey_c2_t1 s2 + ON s1.a = s2.a) src +ON (src.s1a = schema_shard_table.distributed_table.a) +WHEN MATCHED THEN DELETE +WHEN NOT MATCHED THEN INSERT VALUES (src.s1a, src.s2b); +-- Three matching rows must be deleted +SELECT count(*) FROM schema_shard_table.distributed_table WHERE a in (0, 1, 2); + +-- with a reference table +SET client_min_messages TO DEBUG2; +MERGE INTO nullkey_c1_t1 USING schema_shard_table.reference_table ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a) +WHEN MATCHED THEN UPDATE SET b = schema_shard_table.reference_table.b; + +MERGE INTO schema_shard_table.reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); + +-- with a citus local table +MERGE INTO nullkey_c1_t1 USING schema_shard_table.citus_local_table ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a) +WHEN MATCHED THEN UPDATE SET b = schema_shard_table.citus_local_table.b; + +MERGE INTO schema_shard_table.citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a) +WHEN MATCHED THEN DELETE; + +-- with a postgres table +MERGE INTO nullkey_c1_t1 USING schema_shard_table.postgres_local_table ON (nullkey_c1_t1.a = schema_shard_table.postgres_local_table.a) +WHEN MATCHED THEN UPDATE SET b = schema_shard_table.postgres_local_table.b; + +MERGE INTO schema_shard_table.postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.postgres_local_table.a) +WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b +WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b); + +-- using ctes +WITH cte AS ( + SELECT * FROM nullkey_c1_t1 +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; + +WITH cte AS ( + SELECT * FROM schema_shard_table.distributed_table +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; + +WITH cte AS materialized ( + SELECT * FROM schema_shard_table.distributed_table +) +MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a) +WHEN MATCHED THEN UPDATE SET b = cte.b; + +SET client_min_messages TO WARNING; +DROP SCHEMA schema_shard_table1 CASCADE; +DROP SCHEMA schema_shard_table2 CASCADE; +DROP SCHEMA schema_shard_table CASCADE;