mirror of https://github.com/citusdata/citus.git
Isolate schema sharding/MERGE tests into a new file, and
use the new GUC parameterpull/7065/head
parent
6498e1eb6c
commit
87dc88f837
|
@ -1184,9 +1184,9 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
|
|||
{
|
||||
if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED))
|
||||
{
|
||||
ereport(ERROR, (errmsg("MERGE operation on non-colocated "
|
||||
"distributed table(s) without a shard "
|
||||
"key is not yet supported")));
|
||||
ereport(ERROR, (errmsg("MERGE operation across distributed schemas "
|
||||
"or with a row-based distributed table is "
|
||||
"not yet supported")));
|
||||
}
|
||||
|
||||
/* Get all the Join conditions from the ON clause */
|
||||
|
|
|
@ -3816,195 +3816,6 @@ UPDATE SET val = dist_source.val
|
|||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(dist_source.id, dist_source.val);
|
||||
ERROR: For MERGE command, append/range distribution table is not supported yet
|
||||
-- test merge with single-shard tables
|
||||
CREATE SCHEMA query_single_shard_table;
|
||||
SET search_path TO query_single_shard_table;
|
||||
CREATE TABLE nullkey_c1_t1(a int, b int);
|
||||
CREATE TABLE nullkey_c1_t2(a int, b int);
|
||||
SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE nullkey_c2_t1(a int, b int);
|
||||
CREATE TABLE nullkey_c2_t2(a int, b int);
|
||||
SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null);
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE reference_table(a int, b int);
|
||||
CREATE TABLE distributed_table(a int, b int);
|
||||
CREATE TABLE citus_local_table(a int, b int);
|
||||
SELECT create_reference_table('reference_table');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('distributed_table', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus_add_local_table_to_metadata('citus_local_table');
|
||||
citus_add_local_table_to_metadata
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG2;
|
||||
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
|
||||
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i;
|
||||
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
|
||||
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
CREATE TABLE postgres_local_table(a int, b int);
|
||||
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
|
||||
-- with a colocated table
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b;
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000185 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b>
|
||||
DEBUG: Creating MERGE router plan
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000185 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE>
|
||||
DEBUG: Creating MERGE router plan
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000185 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
|
||||
DEBUG: Creating MERGE router plan
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000185 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
|
||||
DEBUG: Creating MERGE router plan
|
||||
-- with non-colocated single-shard table
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b;
|
||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b);
|
||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
|
||||
-- with a distributed table
|
||||
MERGE INTO nullkey_c1_t1 USING distributed_table ON (nullkey_c1_t1.a = distributed_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = distributed_table.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (distributed_table.a, distributed_table.b);
|
||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
|
||||
MERGE INTO distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = distributed_table.a)
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
DEBUG: Using column - index:0 from the source list to redistribute
|
||||
DEBUG: Distributed planning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Collect source query results on coordinator
|
||||
DEBUG: Create a MERGE task list that needs to be routed
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000189 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000189'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
||||
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000189 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000189'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000190 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000190'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
||||
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000190 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000190'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000191 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000191'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
||||
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000191 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000191'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000192 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000192'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
||||
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000192 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000192'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
||||
-- with a reference table
|
||||
MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = reference_table.b;
|
||||
DEBUG: A mix of distributed and reference table, try repartitioning
|
||||
DEBUG: A mix of distributed and reference table, routable query is not possible
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
|
||||
MERGE INTO reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = reference_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||
ERROR: Reference table as target is not allowed in MERGE command
|
||||
-- with a citus local table
|
||||
MERGE INTO nullkey_c1_t1 USING citus_local_table ON (nullkey_c1_t1.a = citus_local_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = citus_local_table.b;
|
||||
DEBUG: A mix of distributed and local table, try repartitioning
|
||||
DEBUG: A mix of distributed and citus-local table, routable query is not possible
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
|
||||
MERGE INTO citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = citus_local_table.a)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
DEBUG: A mix of distributed and local table, try repartitioning
|
||||
DEBUG: A mix of distributed and citus-local table, routable query is not possible
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE involving repartition of rows is supported only if the target is distributed
|
||||
-- with a postgres table
|
||||
MERGE INTO nullkey_c1_t1 USING postgres_local_table ON (nullkey_c1_t1.a = postgres_local_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = postgres_local_table.b;
|
||||
DEBUG: There is only one distributed table, merge is not pushable, try repartitioning
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE INTO an distributed table from Postgres table is not yet supported
|
||||
MERGE INTO postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = postgres_local_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||
DEBUG: There is only one distributed table, merge is not pushable, try repartitioning
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE involving repartition of rows is supported only if the target is distributed
|
||||
-- using ctes
|
||||
WITH cte AS (
|
||||
SELECT * FROM nullkey_c1_t1
|
||||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
DEBUG: <Deparsed MERGE query: WITH cte AS (SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1_1) MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
|
||||
DEBUG: Creating MERGE router plan
|
||||
WITH cte AS (
|
||||
SELECT * FROM distributed_table
|
||||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
|
||||
WITH cte AS materialized (
|
||||
SELECT * FROM distributed_table
|
||||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation on non-colocated distributed table(s) without a shard key is not yet supported
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA query_single_shard_table CASCADE;
|
||||
SET search_path TO merge_schema;
|
||||
-- Test Columnar table
|
||||
CREATE TABLE target_columnar(cid int, name text) USING columnar;
|
||||
SELECT create_distributed_table('target_columnar', 'cid');
|
||||
|
|
|
@ -0,0 +1,221 @@
|
|||
SHOW server_version \gset
|
||||
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
|
||||
\gset
|
||||
\if :server_version_ge_15
|
||||
\else
|
||||
\q
|
||||
\endif
|
||||
-- MERGE command performs a join from data_source to target_table_name
|
||||
DROP SCHEMA IF EXISTS schema_shard_table1 CASCADE;
|
||||
NOTICE: schema "schema_shard_table1" does not exist, skipping
|
||||
DROP SCHEMA IF EXISTS schema_shard_table2 CASCADE;
|
||||
NOTICE: schema "schema_shard_table2" does not exist, skipping
|
||||
DROP SCHEMA IF EXISTS schema_shard_table CASCADE;
|
||||
NOTICE: schema "schema_shard_table" does not exist, skipping
|
||||
-- test merge with schema-shard tables
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.max_adaptive_executor_pool_size TO 1;
|
||||
SET citus.next_shard_id TO 4005000;
|
||||
SET citus.enable_repartition_joins TO true;
|
||||
CREATE SCHEMA schema_shard_table;
|
||||
SET search_path TO schema_shard_table;
|
||||
CREATE TABLE reference_table(a int, b int);
|
||||
CREATE TABLE distributed_table(a int, b int);
|
||||
CREATE TABLE citus_local_table(a int, b int);
|
||||
CREATE TABLE postgres_local_table(a int, b int);
|
||||
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
|
||||
INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i;
|
||||
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
|
||||
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
|
||||
SELECT create_reference_table('reference_table');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$schema_shard_table.reference_table$$)
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('distributed_table', 'a');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$schema_shard_table.distributed_table$$)
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus_add_local_table_to_metadata('citus_local_table');
|
||||
citus_add_local_table_to_metadata
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA schema_shard_table1;
|
||||
CREATE SCHEMA schema_shard_table2;
|
||||
SET search_path TO schema_shard_table1;
|
||||
CREATE TABLE nullkey_c1_t1(a int, b int);
|
||||
CREATE TABLE nullkey_c1_t2(a int, b int);
|
||||
INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(0, 5) i;
|
||||
INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(3, 8) i;
|
||||
SET search_path TO schema_shard_table2;
|
||||
CREATE TABLE nullkey_c2_t1(a int, b int);
|
||||
CREATE TABLE nullkey_c2_t2(a int, b int);
|
||||
INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(0, 5) i;
|
||||
INSERT INTO nullkey_c2_t2 SELECT i, i FROM generate_series(3, 8) i;
|
||||
SET search_path TO schema_shard_table1;
|
||||
-- with a colocated table
|
||||
SET client_min_messages TO DEBUG2;
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b;
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b>
|
||||
DEBUG: Creating MERGE router plan
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE>
|
||||
DEBUG: Creating MERGE router plan
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
|
||||
DEBUG: Creating MERGE router plan
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING schema_shard_table1.nullkey_c1_t2_4005007 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
|
||||
DEBUG: Creating MERGE router plan
|
||||
SET search_path TO schema_shard_table2;
|
||||
-- with non-colocated schema-shard table
|
||||
MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b;
|
||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
||||
MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b);
|
||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
||||
-- with a distributed table
|
||||
SET search_path TO schema_shard_table1;
|
||||
MERGE INTO nullkey_c1_t1 USING schema_shard_table.distributed_table ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = schema_shard_table.distributed_table.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (schema_shard_table.distributed_table.a, schema_shard_table.distributed_table.b);
|
||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
||||
MERGE INTO schema_shard_table.distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a)
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
DEBUG: Using column - index:0 from the source list to redistribute
|
||||
DEBUG: Distributed planning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Collect source query results on coordinator
|
||||
DEBUG: Create a MERGE task list that needs to be routed
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table.distributed_table_4005001 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005001'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
||||
DEBUG: distributed statement: MERGE INTO schema_shard_table.distributed_table_4005001 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005001'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table.distributed_table_4005002 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005002'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
||||
DEBUG: distributed statement: MERGE INTO schema_shard_table.distributed_table_4005002 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005002'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table.distributed_table_4005003 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005003'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
||||
DEBUG: distributed statement: MERGE INTO schema_shard_table.distributed_table_4005003 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005003'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table.distributed_table_4005004 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005004'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
||||
DEBUG: distributed statement: MERGE INTO schema_shard_table.distributed_table_4005004 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005004'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
||||
DEBUG: Execute MERGE task list
|
||||
RESET client_min_messages;
|
||||
SELECT count(*) FROM schema_shard_table.distributed_table WHERE a in (0, 1, 2);
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
3
|
||||
(1 row)
|
||||
|
||||
MERGE INTO schema_shard_table.distributed_table
|
||||
USING (SELECT s1.a AS s1a, s2.b AS s2b
|
||||
FROM nullkey_c1_t1 s1 JOIN schema_shard_table2.nullkey_c2_t1 s2
|
||||
ON s1.a = s2.a) src
|
||||
ON (src.s1a = schema_shard_table.distributed_table.a)
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (src.s1a, src.s2b);
|
||||
-- Three matching rows must be deleted
|
||||
SELECT count(*) FROM schema_shard_table.distributed_table WHERE a in (0, 1, 2);
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- with a reference table
|
||||
SET client_min_messages TO DEBUG2;
|
||||
MERGE INTO nullkey_c1_t1 USING schema_shard_table.reference_table ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = schema_shard_table.reference_table.b;
|
||||
DEBUG: A mix of distributed and reference table, try repartitioning
|
||||
DEBUG: A mix of distributed and reference table, routable query is not possible
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
||||
MERGE INTO schema_shard_table.reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||
ERROR: Reference table as target is not allowed in MERGE command
|
||||
-- with a citus local table
|
||||
MERGE INTO nullkey_c1_t1 USING schema_shard_table.citus_local_table ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = schema_shard_table.citus_local_table.b;
|
||||
DEBUG: A mix of distributed and local table, try repartitioning
|
||||
DEBUG: A mix of distributed and citus-local table, routable query is not possible
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
||||
MERGE INTO schema_shard_table.citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
DEBUG: A mix of distributed and local table, try repartitioning
|
||||
DEBUG: A mix of distributed and citus-local table, routable query is not possible
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE involving repartition of rows is supported only if the target is distributed
|
||||
-- with a postgres table
|
||||
MERGE INTO nullkey_c1_t1 USING schema_shard_table.postgres_local_table ON (nullkey_c1_t1.a = schema_shard_table.postgres_local_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = schema_shard_table.postgres_local_table.b;
|
||||
DEBUG: There is only one distributed table, merge is not pushable, try repartitioning
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE INTO an distributed table from Postgres table is not yet supported
|
||||
MERGE INTO schema_shard_table.postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.postgres_local_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||
DEBUG: There is only one distributed table, merge is not pushable, try repartitioning
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE involving repartition of rows is supported only if the target is distributed
|
||||
-- using ctes
|
||||
WITH cte AS (
|
||||
SELECT * FROM nullkey_c1_t1
|
||||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
DEBUG: <Deparsed MERGE query: WITH cte AS (SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1_1) MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
|
||||
DEBUG: Creating MERGE router plan
|
||||
WITH cte AS (
|
||||
SELECT * FROM schema_shard_table.distributed_table
|
||||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
||||
WITH cte AS materialized (
|
||||
SELECT * FROM schema_shard_table.distributed_table
|
||||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||
DEBUG: Creating MERGE repartition plan
|
||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA schema_shard_table1 CASCADE;
|
||||
DROP SCHEMA schema_shard_table2 CASCADE;
|
||||
DROP SCHEMA schema_shard_table CASCADE;
|
|
@ -0,0 +1,6 @@
|
|||
SHOW server_version \gset
|
||||
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
|
||||
\gset
|
||||
\if :server_version_ge_15
|
||||
\else
|
||||
\q
|
|
@ -112,7 +112,7 @@ test: clock
|
|||
|
||||
# MERGE tests
|
||||
test: merge pgmerge merge_repartition2
|
||||
test: merge_repartition1
|
||||
test: merge_repartition1 merge_schema_sharding
|
||||
test: merge_partition_tables
|
||||
|
||||
# ---------
|
||||
|
|
|
@ -2366,118 +2366,6 @@ UPDATE SET val = dist_source.val
|
|||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(dist_source.id, dist_source.val);
|
||||
|
||||
-- test merge with single-shard tables
|
||||
|
||||
CREATE SCHEMA query_single_shard_table;
|
||||
|
||||
SET search_path TO query_single_shard_table;
|
||||
|
||||
CREATE TABLE nullkey_c1_t1(a int, b int);
|
||||
CREATE TABLE nullkey_c1_t2(a int, b int);
|
||||
SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none');
|
||||
SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1');
|
||||
|
||||
CREATE TABLE nullkey_c2_t1(a int, b int);
|
||||
CREATE TABLE nullkey_c2_t2(a int, b int);
|
||||
SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none');
|
||||
SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null);
|
||||
|
||||
CREATE TABLE reference_table(a int, b int);
|
||||
CREATE TABLE distributed_table(a int, b int);
|
||||
CREATE TABLE citus_local_table(a int, b int);
|
||||
SELECT create_reference_table('reference_table');
|
||||
SELECT create_distributed_table('distributed_table', 'a');
|
||||
SELECT citus_add_local_table_to_metadata('citus_local_table');
|
||||
|
||||
SET client_min_messages TO DEBUG2;
|
||||
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
|
||||
|
||||
INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i;
|
||||
|
||||
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
|
||||
|
||||
CREATE TABLE postgres_local_table(a int, b int);
|
||||
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
|
||||
|
||||
-- with a colocated table
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b;
|
||||
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
||||
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
||||
|
||||
-- with non-colocated single-shard table
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b;
|
||||
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b);
|
||||
|
||||
-- with a distributed table
|
||||
MERGE INTO nullkey_c1_t1 USING distributed_table ON (nullkey_c1_t1.a = distributed_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = distributed_table.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (distributed_table.a, distributed_table.b);
|
||||
|
||||
MERGE INTO distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = distributed_table.a)
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||
|
||||
-- with a reference table
|
||||
MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = reference_table.b;
|
||||
|
||||
MERGE INTO reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = reference_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||
|
||||
-- with a citus local table
|
||||
MERGE INTO nullkey_c1_t1 USING citus_local_table ON (nullkey_c1_t1.a = citus_local_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = citus_local_table.b;
|
||||
|
||||
MERGE INTO citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = citus_local_table.a)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
|
||||
-- with a postgres table
|
||||
MERGE INTO nullkey_c1_t1 USING postgres_local_table ON (nullkey_c1_t1.a = postgres_local_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = postgres_local_table.b;
|
||||
|
||||
MERGE INTO postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = postgres_local_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||
|
||||
-- using ctes
|
||||
WITH cte AS (
|
||||
SELECT * FROM nullkey_c1_t1
|
||||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
|
||||
WITH cte AS (
|
||||
SELECT * FROM distributed_table
|
||||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
|
||||
WITH cte AS materialized (
|
||||
SELECT * FROM distributed_table
|
||||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA query_single_shard_table CASCADE;
|
||||
|
||||
SET search_path TO merge_schema;
|
||||
|
||||
-- Test Columnar table
|
||||
CREATE TABLE target_columnar(cid int, name text) USING columnar;
|
||||
SELECT create_distributed_table('target_columnar', 'cid');
|
||||
|
|
|
@ -0,0 +1,148 @@
|
|||
SHOW server_version \gset
|
||||
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
|
||||
\gset
|
||||
\if :server_version_ge_15
|
||||
\else
|
||||
\q
|
||||
\endif
|
||||
|
||||
-- MERGE command performs a join from data_source to target_table_name
|
||||
DROP SCHEMA IF EXISTS schema_shard_table1 CASCADE;
|
||||
DROP SCHEMA IF EXISTS schema_shard_table2 CASCADE;
|
||||
DROP SCHEMA IF EXISTS schema_shard_table CASCADE;
|
||||
|
||||
-- test merge with schema-shard tables
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.max_adaptive_executor_pool_size TO 1;
|
||||
SET citus.next_shard_id TO 4005000;
|
||||
SET citus.enable_repartition_joins TO true;
|
||||
|
||||
CREATE SCHEMA schema_shard_table;
|
||||
SET search_path TO schema_shard_table;
|
||||
CREATE TABLE reference_table(a int, b int);
|
||||
CREATE TABLE distributed_table(a int, b int);
|
||||
CREATE TABLE citus_local_table(a int, b int);
|
||||
CREATE TABLE postgres_local_table(a int, b int);
|
||||
|
||||
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
|
||||
INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i;
|
||||
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
|
||||
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
|
||||
|
||||
SELECT create_reference_table('reference_table');
|
||||
SELECT create_distributed_table('distributed_table', 'a');
|
||||
SELECT citus_add_local_table_to_metadata('citus_local_table');
|
||||
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA schema_shard_table1;
|
||||
CREATE SCHEMA schema_shard_table2;
|
||||
|
||||
SET search_path TO schema_shard_table1;
|
||||
CREATE TABLE nullkey_c1_t1(a int, b int);
|
||||
CREATE TABLE nullkey_c1_t2(a int, b int);
|
||||
INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(0, 5) i;
|
||||
INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(3, 8) i;
|
||||
|
||||
SET search_path TO schema_shard_table2;
|
||||
CREATE TABLE nullkey_c2_t1(a int, b int);
|
||||
CREATE TABLE nullkey_c2_t2(a int, b int);
|
||||
INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(0, 5) i;
|
||||
INSERT INTO nullkey_c2_t2 SELECT i, i FROM generate_series(3, 8) i;
|
||||
|
||||
SET search_path TO schema_shard_table1;
|
||||
|
||||
-- with a colocated table
|
||||
SET client_min_messages TO DEBUG2;
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b;
|
||||
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
||||
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
||||
|
||||
SET search_path TO schema_shard_table2;
|
||||
|
||||
-- with non-colocated schema-shard table
|
||||
MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b;
|
||||
|
||||
MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b);
|
||||
|
||||
-- with a distributed table
|
||||
SET search_path TO schema_shard_table1;
|
||||
MERGE INTO nullkey_c1_t1 USING schema_shard_table.distributed_table ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = schema_shard_table.distributed_table.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (schema_shard_table.distributed_table.a, schema_shard_table.distributed_table.b);
|
||||
|
||||
MERGE INTO schema_shard_table.distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a)
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||
|
||||
RESET client_min_messages;
|
||||
SELECT count(*) FROM schema_shard_table.distributed_table WHERE a in (0, 1, 2);
|
||||
MERGE INTO schema_shard_table.distributed_table
|
||||
USING (SELECT s1.a AS s1a, s2.b AS s2b
|
||||
FROM nullkey_c1_t1 s1 JOIN schema_shard_table2.nullkey_c2_t1 s2
|
||||
ON s1.a = s2.a) src
|
||||
ON (src.s1a = schema_shard_table.distributed_table.a)
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (src.s1a, src.s2b);
|
||||
-- Three matching rows must be deleted
|
||||
SELECT count(*) FROM schema_shard_table.distributed_table WHERE a in (0, 1, 2);
|
||||
|
||||
-- with a reference table
|
||||
SET client_min_messages TO DEBUG2;
|
||||
MERGE INTO nullkey_c1_t1 USING schema_shard_table.reference_table ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = schema_shard_table.reference_table.b;
|
||||
|
||||
MERGE INTO schema_shard_table.reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||
|
||||
-- with a citus local table
|
||||
MERGE INTO nullkey_c1_t1 USING schema_shard_table.citus_local_table ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = schema_shard_table.citus_local_table.b;
|
||||
|
||||
MERGE INTO schema_shard_table.citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
|
||||
-- with a postgres table
|
||||
MERGE INTO nullkey_c1_t1 USING schema_shard_table.postgres_local_table ON (nullkey_c1_t1.a = schema_shard_table.postgres_local_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = schema_shard_table.postgres_local_table.b;
|
||||
|
||||
MERGE INTO schema_shard_table.postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.postgres_local_table.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||
|
||||
-- using ctes
|
||||
WITH cte AS (
|
||||
SELECT * FROM nullkey_c1_t1
|
||||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
|
||||
WITH cte AS (
|
||||
SELECT * FROM schema_shard_table.distributed_table
|
||||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
|
||||
WITH cte AS materialized (
|
||||
SELECT * FROM schema_shard_table.distributed_table
|
||||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA schema_shard_table1 CASCADE;
|
||||
DROP SCHEMA schema_shard_table2 CASCADE;
|
||||
DROP SCHEMA schema_shard_table CASCADE;
|
Loading…
Reference in New Issue