Add test with repartition, fix output from before

pull/7807/head
naisila 2025-01-07 23:47:10 +03:00 committed by Naisila Puka
parent 1c07cdb926
commit 47bb0d504a
2 changed files with 281 additions and 14 deletions

View File

@@ -2198,7 +2198,7 @@ RESET citus.log_remote_commands;
-- Add support for MERGE ... WHEN NOT MATCHED BY SOURCE.
-- Relevant PG commit:
-- https://github.com/postgres/postgres/commit/0294df2f1
SET citus.next_shard_id TO 25122024;
SET citus.next_shard_id TO 1072025;
-- Regular Postgres tables
CREATE TABLE postgres_target_1 (tid integer, balance float, val text);
CREATE TABLE postgres_target_2 (tid integer, balance float, val text);
@@ -2210,33 +2210,51 @@ INSERT INTO postgres_source SELECT id, id * 10 FROM generate_series(1,14) AS id
CREATE TABLE citus_local_target (tid integer, balance float, val text);
CREATE TABLE citus_local_source (sid integer, delta float);
SELECT citus_add_local_table_to_metadata('citus_local_target');
ERROR: duplicate key value violates unique constraint "pg_dist_shard_shardid_index"
DETAIL: Key (shardid)=(25122024) already exists.
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('citus_local_source');
ERROR: duplicate key value violates unique constraint "pg_dist_shard_shardid_index"
DETAIL: Key (shardid)=(25122025) already exists.
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
INSERT INTO citus_local_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
-- Citus distributed tables
CREATE TABLE citus_distributed_target (tid integer, balance float, val text);
CREATE TABLE citus_distributed_source (sid integer, delta float);
SELECT create_distributed_table('citus_distributed_target', 'tid');
ERROR: duplicate key value violates unique constraint "pg_dist_shard_shardid_index"
DETAIL: Key (shardid)=(25122026) already exists.
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('citus_distributed_source', 'sid');
ERROR: duplicate key value violates unique constraint "pg_dist_shard_shardid_index"
DETAIL: Key (shardid)=(25122027) already exists.
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO citus_distributed_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
INSERT INTO citus_distributed_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
-- Citus reference tables
CREATE TABLE citus_reference_target (tid integer, balance float, val text);
CREATE TABLE citus_reference_source (sid integer, delta float);
SELECT create_reference_table('citus_reference_target');
ERROR: duplicate key value violates unique constraint "pg_dist_shard_shardid_index"
DETAIL: Key (shardid)=(25122028) already exists.
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('citus_reference_source');
ERROR: duplicate key value violates unique constraint "pg_dist_shard_shardid_index"
DETAIL: Key (shardid)=(25122029) already exists.
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO citus_reference_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
-- Try all combinations of tables with two queries:
@@ -2413,6 +2431,7 @@ MERGE INTO citus_local_target t
INSERT VALUES (sid, delta, 'inserted by merge')
WHEN NOT MATCHED BY SOURCE THEN
UPDATE SET val = val || ' not matched by source';
ERROR: MERGE involving repartition of rows is supported only if the target is distributed
-- Distributed-Local
-- try simple MERGE
BEGIN;
@@ -2535,6 +2554,141 @@ MERGE INTO citus_reference_target t
INSERT VALUES (sid, delta, 'inserted by merge')
WHEN NOT MATCHED BY SOURCE THEN
UPDATE SET val = val || ' not matched by source';
ERROR: Reference table as target is not allowed in MERGE command
-- Complex repartition query example with a mix of tables
-- Example from blog post
-- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge
-- Contains information about the machines in the manufacturing facility
CREATE TABLE machines (
machine_id NUMERIC PRIMARY KEY,
machine_name VARCHAR(100),
location VARCHAR(50),
status VARCHAR(20)
);
SELECT create_reference_table('machines');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- Holds data on the various sensors installed on each machine
CREATE TABLE sensors (
sensor_id NUMERIC PRIMARY KEY,
sensor_name VARCHAR(100),
machine_id NUMERIC,
sensor_type VARCHAR(50)
);
SELECT create_distributed_table('sensors', 'sensor_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Stores real-time readings from the sensors
CREATE TABLE sensor_readings (
reading_id NUMERIC,
sensor_id NUMERIC,
reading_value NUMERIC,
reading_timestamp TIMESTAMP
);
SELECT create_distributed_table('sensor_readings', 'sensor_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Holds real-time sensor readings for machines on 'Production Floor 1'
CREATE TABLE real_sensor_readings (
real_reading_id NUMERIC,
sensor_id NUMERIC,
reading_value NUMERIC,
reading_timestamp TIMESTAMP
);
SELECT create_distributed_table('real_sensor_readings', 'sensor_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Insert data into the machines table
INSERT INTO machines (machine_id, machine_name, location, status)
VALUES
(1, 'Machine A', 'Production Floor 1', 'Active'),
(2, 'Machine B', 'Production Floor 2', 'Active'),
(3, 'Machine C', 'Production Floor 1', 'Inactive');
-- Insert data into the sensors table
INSERT INTO sensors (sensor_id, sensor_name, machine_id, sensor_type)
VALUES
(1, 'Temperature Sensor 1', 1, 'Temperature'),
(2, 'Pressure Sensor 1', 1, 'Pressure'),
(3, 'Temperature Sensor 2', 2, 'Temperature'),
(4, 'Vibration Sensor 1', 3, 'Vibration');
-- Insert data into the real_sensor_readings table
INSERT INTO real_sensor_readings (real_reading_id, sensor_id, reading_value, reading_timestamp)
VALUES
(1, 1, 35.6, TIMESTAMP '2023-07-20 10:15:00'),
(2, 1, 36.8, TIMESTAMP '2023-07-20 10:30:00'),
(3, 2, 100.5, TIMESTAMP '2023-07-20 10:15:00'),
(4, 2, 101.2, TIMESTAMP '2023-07-20 10:30:00'),
(5, 3, 36.2, TIMESTAMP '2023-07-20 10:15:00'),
(6, 3, 36.5, TIMESTAMP '2023-07-20 10:30:00'),
(7, 4, 0.02, TIMESTAMP '2023-07-20 10:15:00'),
(8, 4, 0.03, TIMESTAMP '2023-07-20 10:30:00');
-- Insert DUMMY data to use for WHEN NOT MATCHED BY SOURCE
INSERT INTO sensor_readings VALUES (0, 0, 0, TIMESTAMP '2023-07-20 10:15:00');
SET client_min_messages TO DEBUG1;
-- Complex merge query which needs repartitioning
MERGE INTO sensor_readings SR
USING (SELECT
rsr.sensor_id,
AVG(rsr.reading_value) AS average_reading,
MAX(rsr.reading_timestamp) AS last_reading_timestamp,
MAX(rsr.real_reading_id) AS rid
FROM sensors s
INNER JOIN machines m ON s.machine_id = m.machine_id
INNER JOIN real_sensor_readings rsr ON s.sensor_id = rsr.sensor_id
WHERE m.location = 'Production Floor 1'
GROUP BY rsr.sensor_id
) NEW_READINGS
ON (SR.sensor_id = NEW_READINGS.sensor_id)
-- Existing reading, update it
WHEN MATCHED THEN
UPDATE SET reading_value = NEW_READINGS.average_reading, reading_timestamp = NEW_READINGS.last_reading_timestamp
-- New reading, record it
WHEN NOT MATCHED BY TARGET THEN
INSERT (reading_id, sensor_id, reading_value, reading_timestamp)
VALUES (NEW_READINGS.rid, NEW_READINGS.sensor_id,
NEW_READINGS.average_reading, NEW_READINGS.last_reading_timestamp)
-- Target has dummy entry not matched by source
-- for the dummy row, change reading_value to 100 to make the change noticeable
WHEN NOT MATCHED BY SOURCE THEN
UPDATE SET reading_value = 100;
DEBUG: A mix of distributed and reference table, try repartitioning
DEBUG: A mix of distributed and reference table, routable query is not possible
DEBUG: Creating MERGE repartition plan
DEBUG: Using column - index:0 from the source list to redistribute
DEBUG: Executing subplans of the source query and storing the results at the respective node(s)
DEBUG: Redistributing source result rows across nodes
DEBUG: Executing final MERGE on workers using intermediate results
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072043 sr USING (SELECT intermediate_result.sensor_id, intermediate_result.average_reading, intermediate_result.last_reading_timestamp, intermediate_result.rid FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1072047_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(sensor_id numeric, average_reading numeric, last_reading_timestamp timestamp without time zone, rid numeric)) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072045 sr USING (SELECT intermediate_result.sensor_id, intermediate_result.average_reading, intermediate_result.last_reading_timestamp, intermediate_result.rid FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1072049_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(sensor_id numeric, average_reading numeric, last_reading_timestamp timestamp without time zone, rid numeric)) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
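-- The deparsed queries above show the final per-shard step of the repartition plan: each participating
-- target shard of sensor_readings merges against the repartitioned source rows, which are read back on
-- the workers via read_intermediate_results().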
RESET client_min_messages;
-- Expected output is:
-- reading_id | sensor_id | reading_value | reading_timestamp
-- ------------+-----------+------------------------+---------------------
-- 0 | 0 | 100 | 2023-07-20 10:15:00
-- 2 | 1 | 36.2000000000000000 | 2023-07-20 10:30:00
-- 4 | 2 | 100.8500000000000000 | 2023-07-20 10:30:00
-- 8 | 4 | 0.02500000000000000000 | 2023-07-20 10:30:00
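-- Derivation of the expected rows from the data inserted above:
-- the source query keeps only sensors on 'Production Floor 1' machines (machines 1 and 3),
-- i.e. sensors 1, 2 and 4; sensor 3 belongs to machine 2 on Floor 2 and is filtered out.
-- None of these sensors exist in the target yet, so WHEN NOT MATCHED BY TARGET inserts them
-- with reading_id = max(real_reading_id) and the averaged reading:
-- sensor 1: avg(35.6, 36.8) = 36.2, rid 2; sensor 2: avg(100.5, 101.2) = 100.85, rid 4;
-- sensor 4: avg(0.02, 0.03) = 0.025, rid 8.
-- The dummy target row (reading_id 0) matches no source row, so WHEN NOT MATCHED BY SOURCE
-- updates its reading_value to 100.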
SELECT * FROM sensor_readings ORDER BY 1;
reading_id | sensor_id | reading_value | reading_timestamp
---------------------------------------------------------------------
0 | 0 | 100 | Thu Jul 20 10:15:00 2023
2 | 1 | 36.2000000000000000 | Thu Jul 20 10:30:00 2023
4 | 2 | 100.8500000000000000 | Thu Jul 20 10:30:00 2023
8 | 4 | 0.02500000000000000000 | Thu Jul 20 10:30:00 2023
(4 rows)
-- End of MERGE ... WHEN NOT MATCHED BY SOURCE tests
\set VERBOSITY terse
SET client_min_messages TO WARNING;

View File

@@ -1084,7 +1084,7 @@ RESET citus.log_remote_commands;
-- Relevant PG commit:
-- https://github.com/postgres/postgres/commit/0294df2f1
SET citus.next_shard_id TO 25122024;
SET citus.next_shard_id TO 1072025;
-- Regular Postgres tables
CREATE TABLE postgres_target_1 (tid integer, balance float, val text);
@@ -1336,6 +1336,119 @@ MERGE INTO citus_reference_target t
WHEN NOT MATCHED BY SOURCE THEN
UPDATE SET val = val || ' not matched by source';
-- Complex repartition query example with a mix of tables
-- Example from blog post
-- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge
-- Contains information about the machines in the manufacturing facility
CREATE TABLE machines (
machine_id NUMERIC PRIMARY KEY,
machine_name VARCHAR(100),
location VARCHAR(50),
status VARCHAR(20)
);
SELECT create_reference_table('machines');
-- Holds data on the various sensors installed on each machine
CREATE TABLE sensors (
sensor_id NUMERIC PRIMARY KEY,
sensor_name VARCHAR(100),
machine_id NUMERIC,
sensor_type VARCHAR(50)
);
SELECT create_distributed_table('sensors', 'sensor_id');
-- Stores real-time readings from the sensors
CREATE TABLE sensor_readings (
reading_id NUMERIC,
sensor_id NUMERIC,
reading_value NUMERIC,
reading_timestamp TIMESTAMP
);
SELECT create_distributed_table('sensor_readings', 'sensor_id');
-- Holds real-time sensor readings for machines on 'Production Floor 1'
CREATE TABLE real_sensor_readings (
real_reading_id NUMERIC,
sensor_id NUMERIC,
reading_value NUMERIC,
reading_timestamp TIMESTAMP
);
SELECT create_distributed_table('real_sensor_readings', 'sensor_id');
-- Insert data into the machines table
INSERT INTO machines (machine_id, machine_name, location, status)
VALUES
(1, 'Machine A', 'Production Floor 1', 'Active'),
(2, 'Machine B', 'Production Floor 2', 'Active'),
(3, 'Machine C', 'Production Floor 1', 'Inactive');
-- Insert data into the sensors table
INSERT INTO sensors (sensor_id, sensor_name, machine_id, sensor_type)
VALUES
(1, 'Temperature Sensor 1', 1, 'Temperature'),
(2, 'Pressure Sensor 1', 1, 'Pressure'),
(3, 'Temperature Sensor 2', 2, 'Temperature'),
(4, 'Vibration Sensor 1', 3, 'Vibration');
-- Insert data into the real_sensor_readings table
INSERT INTO real_sensor_readings (real_reading_id, sensor_id, reading_value, reading_timestamp)
VALUES
(1, 1, 35.6, TIMESTAMP '2023-07-20 10:15:00'),
(2, 1, 36.8, TIMESTAMP '2023-07-20 10:30:00'),
(3, 2, 100.5, TIMESTAMP '2023-07-20 10:15:00'),
(4, 2, 101.2, TIMESTAMP '2023-07-20 10:30:00'),
(5, 3, 36.2, TIMESTAMP '2023-07-20 10:15:00'),
(6, 3, 36.5, TIMESTAMP '2023-07-20 10:30:00'),
(7, 4, 0.02, TIMESTAMP '2023-07-20 10:15:00'),
(8, 4, 0.03, TIMESTAMP '2023-07-20 10:30:00');
-- Insert DUMMY data to use for WHEN NOT MATCHED BY SOURCE
INSERT INTO sensor_readings VALUES (0, 0, 0, TIMESTAMP '2023-07-20 10:15:00');
SET client_min_messages TO DEBUG1;
-- Complex merge query which needs repartitioning
MERGE INTO sensor_readings SR
USING (SELECT
rsr.sensor_id,
AVG(rsr.reading_value) AS average_reading,
MAX(rsr.reading_timestamp) AS last_reading_timestamp,
MAX(rsr.real_reading_id) AS rid
FROM sensors s
INNER JOIN machines m ON s.machine_id = m.machine_id
INNER JOIN real_sensor_readings rsr ON s.sensor_id = rsr.sensor_id
WHERE m.location = 'Production Floor 1'
GROUP BY rsr.sensor_id
) NEW_READINGS
ON (SR.sensor_id = NEW_READINGS.sensor_id)
-- Existing reading, update it
WHEN MATCHED THEN
UPDATE SET reading_value = NEW_READINGS.average_reading, reading_timestamp = NEW_READINGS.last_reading_timestamp
-- New reading, record it
WHEN NOT MATCHED BY TARGET THEN
INSERT (reading_id, sensor_id, reading_value, reading_timestamp)
VALUES (NEW_READINGS.rid, NEW_READINGS.sensor_id,
NEW_READINGS.average_reading, NEW_READINGS.last_reading_timestamp)
-- Target has dummy entry not matched by source
-- for the dummy row, change reading_value to 100 to make the change noticeable
WHEN NOT MATCHED BY SOURCE THEN
UPDATE SET reading_value = 100;
RESET client_min_messages;
-- Expected output is:
-- reading_id | sensor_id | reading_value | reading_timestamp
-- ------------+-----------+------------------------+---------------------
-- 0 | 0 | 100 | 2023-07-20 10:15:00
-- 2 | 1 | 36.2000000000000000 | 2023-07-20 10:30:00
-- 4 | 2 | 100.8500000000000000 | 2023-07-20 10:30:00
-- 8 | 4 | 0.02500000000000000000 | 2023-07-20 10:30:00
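-- Derivation of the expected rows from the data inserted above:
-- the source query keeps only sensors on 'Production Floor 1' machines (machines 1 and 3),
-- i.e. sensors 1, 2 and 4; sensor 3 belongs to machine 2 on Floor 2 and is filtered out.
-- None of these sensors exist in the target yet, so WHEN NOT MATCHED BY TARGET inserts them
-- with reading_id = max(real_reading_id) and the averaged reading:
-- sensor 1: avg(35.6, 36.8) = 36.2, rid 2; sensor 2: avg(100.5, 101.2) = 100.85, rid 4;
-- sensor 4: avg(0.02, 0.03) = 0.025, rid 8.
-- The dummy target row (reading_id 0) matches no source row, so WHEN NOT MATCHED BY SOURCE
-- updates its reading_value to 100.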
SELECT * FROM sensor_readings ORDER BY 1;
-- End of MERGE ... WHEN NOT MATCHED BY SOURCE tests
\set VERBOSITY terse