mirror of https://github.com/citusdata/citus.git
341 lines
14 KiB
Plaintext
341 lines
14 KiB
Plaintext
-- We create two sets of source and target tables, one set in Postgres and
|
|
-- the other in Citus distributed. We run the _exact_ MERGE SQL on both sets
|
|
-- and compare the final results of the target tables in Postgres and Citus.
|
|
-- The results should match. This process is repeated for various combinations
|
|
-- of MERGE SQL.
|
|
DROP SCHEMA IF EXISTS merge_repartition2_schema CASCADE;
|
|
NOTICE: schema "merge_repartition2_schema" does not exist, skipping
|
|
CREATE SCHEMA merge_repartition2_schema;
|
|
SET search_path TO merge_repartition2_schema;
|
|
SET citus.shard_count TO 4;
|
|
SET citus.next_shard_id TO 6000000;
|
|
SET citus.explain_all_tasks TO true;
|
|
SET citus.shard_replication_factor TO 1;
|
|
SET citus.max_adaptive_executor_pool_size TO 1;
|
|
SET client_min_messages = warning;
|
|
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
RESET client_min_messages;
|
|
CREATE TABLE pg_target(id int, val int);
|
|
CREATE TABLE pg_source(id int, val int, const int);
|
|
CREATE TABLE citus_target(id int, val int);
|
|
CREATE TABLE citus_source(id int, val int, const int);
|
|
SELECT citus_add_local_table_to_metadata('citus_target');
|
|
citus_add_local_table_to_metadata
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT citus_add_local_table_to_metadata('citus_source');
|
|
citus_add_local_table_to_metadata
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE OR REPLACE FUNCTION cleanup_data() RETURNS VOID SET search_path TO merge_repartition2_schema AS $$
|
|
TRUNCATE pg_target;
|
|
TRUNCATE pg_source;
|
|
TRUNCATE citus_target;
|
|
TRUNCATE citus_source;
|
|
SELECT undistribute_table('citus_target');
|
|
SELECT undistribute_table('citus_source');
|
|
$$
|
|
LANGUAGE SQL;
|
|
--
|
|
-- Load same set of data to both Postgres and Citus tables
|
|
--
|
|
CREATE OR REPLACE FUNCTION setup_data() RETURNS VOID SET search_path TO merge_repartition2_schema AS $$
|
|
INSERT INTO pg_source SELECT i, i+1, 1 FROM generate_series(1, 100000) i;
|
|
INSERT INTO pg_target SELECT i, 1 FROM generate_series(50001, 100000) i;
|
|
INSERT INTO citus_source SELECT i, i+1, 1 FROM generate_series(1, 100000) i;
|
|
INSERT INTO citus_target SELECT i, 1 FROM generate_series(50001, 100000) i;
|
|
$$
|
|
LANGUAGE SQL;
|
|
--
|
|
-- Compares the final target tables, merge-modified data, of both Postgres and Citus tables
|
|
--
|
|
CREATE OR REPLACE FUNCTION check_data(table1_name text, column1_name text, table2_name text, column2_name text)
|
|
RETURNS VOID SET search_path TO merge_repartition2_schema AS $$
|
|
DECLARE
|
|
table1_avg numeric;
|
|
table2_avg numeric;
|
|
BEGIN
|
|
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column1_name, table1_name) INTO table1_avg;
|
|
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column2_name, table2_name) INTO table2_avg;
|
|
|
|
IF table1_avg > table2_avg THEN
|
|
RAISE EXCEPTION 'The average of %.% is greater than %.%', table1_name, column1_name, table2_name, column2_name;
|
|
ELSIF table1_avg < table2_avg THEN
|
|
RAISE EXCEPTION 'The average of %.% is less than %.%', table1_name, column1_name, table2_name, column2_name;
|
|
ELSE
|
|
RAISE NOTICE 'The average of %.% is equal to %.%', table1_name, column1_name, table2_name, column2_name;
|
|
END IF;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
CREATE OR REPLACE FUNCTION compare_data() RETURNS VOID SET search_path TO merge_repartition2_schema AS $$
|
|
SELECT check_data('pg_target', 'id', 'citus_target', 'id');
|
|
SELECT check_data('pg_target', 'val', 'citus_target', 'val');
|
|
$$
|
|
LANGUAGE SQL;
|
|
-- Test nested cte
|
|
SELECT cleanup_data();
|
|
NOTICE: creating a new table for merge_repartition2_schema.citus_target
|
|
CONTEXT: SQL function "cleanup_data" statement 5
|
|
NOTICE: moving the data of merge_repartition2_schema.citus_target
|
|
CONTEXT: SQL function "cleanup_data" statement 5
|
|
NOTICE: dropping the old merge_repartition2_schema.citus_target
|
|
CONTEXT: SQL function "cleanup_data" statement 5
|
|
NOTICE: renaming the new table to merge_repartition2_schema.citus_target
|
|
CONTEXT: SQL function "cleanup_data" statement 5
|
|
NOTICE: creating a new table for merge_repartition2_schema.citus_source
|
|
CONTEXT: SQL function "cleanup_data" statement 6
|
|
NOTICE: moving the data of merge_repartition2_schema.citus_source
|
|
CONTEXT: SQL function "cleanup_data" statement 6
|
|
NOTICE: dropping the old merge_repartition2_schema.citus_source
|
|
CONTEXT: SQL function "cleanup_data" statement 6
|
|
NOTICE: renaming the new table to merge_repartition2_schema.citus_source
|
|
CONTEXT: SQL function "cleanup_data" statement 6
|
|
cleanup_data
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT setup_data();
|
|
setup_data
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT create_distributed_table('citus_target', 'id');
|
|
NOTICE: Copying data from local table...
|
|
NOTICE: copying the data has completed
|
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition2_schema.citus_target$$)
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
|
|
NOTICE: Copying data from local table...
|
|
NOTICE: copying the data has completed
|
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition2_schema.citus_source$$)
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
WITH cte_top AS(WITH cte_1 AS (WITH cte_2 AS (SELECT id, val FROM pg_source) SELECT * FROM cte_2) SELECT * FROM cte_1)
|
|
MERGE INTO pg_target t
|
|
USING (SELECT const, val, id FROM pg_source WHERE id IN (SELECT id FROM cte_top)) as s
|
|
ON (s.id = t.id)
|
|
WHEN MATCHED AND t.id <= 75000 THEN
|
|
UPDATE SET val = (s.val::int8+1)
|
|
WHEN MATCHED THEN
|
|
DELETE
|
|
WHEN NOT MATCHED THEN
|
|
INSERT VALUES (s.id, s.val);
|
|
WITH cte_top AS(WITH cte_1 AS (WITH cte_2 AS (SELECT id, val FROM citus_source) SELECT * FROM cte_2) SELECT * FROM cte_1)
|
|
MERGE INTO citus_target t
|
|
USING (SELECT const, val, id FROM citus_source WHERE id IN (SELECT id FROM cte_top)) as s
|
|
ON (s.id = t.id)
|
|
WHEN MATCHED AND t.id <= 75000 THEN
|
|
UPDATE SET val = (s.val::int8+1)
|
|
WHEN MATCHED THEN
|
|
DELETE
|
|
WHEN NOT MATCHED THEN
|
|
INSERT VALUES (s.id, s.val);
|
|
SELECT compare_data();
|
|
NOTICE: The average of pg_target.id is equal to citus_target.id
|
|
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
|
|
SQL function "compare_data" statement 1
|
|
NOTICE: The average of pg_target.val is equal to citus_target.val
|
|
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
|
|
SQL function "compare_data" statement 2
|
|
compare_data
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Test aggregate function in source query
|
|
MERGE INTO pg_target t
|
|
USING (SELECT count(id+1)::text as value, val as key FROM pg_source group by key) s
|
|
ON t.id = s.key
|
|
WHEN MATCHED AND t.id <= 75000 THEN
|
|
UPDATE SET val = (s.value::int8+1)
|
|
WHEN MATCHED THEN
|
|
DELETE
|
|
WHEN NOT MATCHED THEN
|
|
INSERT VALUES(s.key, value::int4+10);
|
|
MERGE INTO citus_target t
|
|
USING (SELECT count(id+1)::text as value, val as key FROM citus_source group by key) s
|
|
ON t.id = s.key
|
|
WHEN MATCHED AND t.id <= 75000 THEN
|
|
UPDATE SET val = (s.value::int8+1)
|
|
WHEN MATCHED THEN
|
|
DELETE
|
|
WHEN NOT MATCHED THEN
|
|
INSERT VALUES(s.key, value::int4+10);
|
|
SELECT compare_data();
|
|
NOTICE: The average of pg_target.id is equal to citus_target.id
|
|
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
|
|
SQL function "compare_data" statement 1
|
|
NOTICE: The average of pg_target.val is equal to citus_target.val
|
|
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
|
|
SQL function "compare_data" statement 2
|
|
compare_data
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
---- https://github.com/citusdata/citus/issues/8180 ----
|
|
CREATE TABLE dist_1 (a int, b int, c int);
|
|
CREATE TABLE dist_2 (a int, b int, c int);
|
|
CREATE TABLE dist_different_order_1 (b int, a int, c int);
|
|
SELECT create_distributed_table('dist_1', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT create_distributed_table('dist_2', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT create_distributed_table('dist_different_order_1', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
MERGE INTO dist_1
|
|
USING dist_2
|
|
ON (dist_1.a = dist_2.b)
|
|
WHEN MATCHED THEN UPDATE SET b = dist_2.b;
|
|
MERGE INTO dist_1
|
|
USING dist_1 src
|
|
ON (dist_1.a = src.b)
|
|
WHEN MATCHED THEN UPDATE SET b = src.b;
|
|
MERGE INTO dist_different_order_1
|
|
USING dist_1
|
|
ON (dist_different_order_1.a = dist_1.b)
|
|
WHEN MATCHED THEN UPDATE SET b = dist_1.b;
|
|
CREATE TABLE dist_1_cast (a int, b int);
|
|
CREATE TABLE dist_2_cast (a int, b numeric);
|
|
SELECT create_distributed_table('dist_1_cast', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT create_distributed_table('dist_2_cast', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
MERGE INTO dist_1_cast
|
|
USING dist_2_cast
|
|
ON (dist_1_cast.a = dist_2_cast.b)
|
|
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
|
|
ERROR: In the MERGE ON clause, there is a datatype mismatch between target's distribution column and the expression originating from the source.
|
|
DETAIL: If the types are different, Citus uses different hash functions for the two column types, which might lead to incorrect repartitioning of the result data
|
|
MERGE INTO dist_1_cast
|
|
USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast
|
|
ON (dist_1_cast.a = dist_2_cast.b)
|
|
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
|
|
-- a more sophisticated example
|
|
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
|
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
|
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
|
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
|
SELECT create_distributed_table('dist_source', 'tstamp_col');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT create_distributed_table('dist_target', 'int_col');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
|
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
|
i,
|
|
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
|
'source_' || i,
|
|
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
|
FROM generate_series(1001, 2000) i;
|
|
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
|
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
|
i,
|
|
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
|
'source_' || i,
|
|
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
|
FROM generate_series(901, 1000) i;
|
|
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
|
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
|
i,
|
|
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
|
'source_' || i,
|
|
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
|
FROM generate_series(1501, 2000) i;
|
|
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
|
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
|
i,
|
|
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
|
'source_' || i-1,
|
|
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
|
FROM generate_series(1401, 1500) i;
|
|
INSERT INTO local_source SELECT * FROM dist_source;
|
|
INSERT INTO local_target SELECT * FROM dist_target;
|
|
-- execute the query on distributed tables
|
|
MERGE INTO dist_target target_alias
|
|
USING dist_source source_alias
|
|
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
|
WHEN MATCHED THEN UPDATE SET
|
|
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
|
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
|
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
|
text_col = source_alias.json_col->>'a'
|
|
WHEN NOT MATCHED THEN
|
|
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
|
-- execute the same query on local tables, everything is the same except table names behind the aliases
|
|
MERGE INTO local_target target_alias
|
|
USING local_source source_alias
|
|
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
|
WHEN MATCHED THEN UPDATE SET
|
|
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
|
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
|
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
|
text_col = source_alias.json_col->>'a'
|
|
WHEN NOT MATCHED THEN
|
|
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
|
-- compare both targets
|
|
SELECT COUNT(*) = 0 AS targets_match
|
|
FROM (
|
|
SELECT * FROM dist_target
|
|
EXCEPT
|
|
SELECT * FROM local_target
|
|
UNION ALL
|
|
SELECT * FROM local_target
|
|
EXCEPT
|
|
SELECT * FROM dist_target
|
|
) q;
|
|
targets_match
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
SET client_min_messages TO WARNING;
|
|
DROP SCHEMA merge_repartition2_schema CASCADE;
|