mirror of https://github.com/citusdata/citus.git
Propagate MERGE ... WHEN NOT MATCHED BY SOURCE (#7807)
DESCRIPTION: Propagates MERGE ... WHEN NOT MATCHED BY SOURCE It seems like there is not much needed to be done here. `get_merge_query_def` from `ruleutils_17` is updated with "WHEN NOT MATCHED BY SOURCE" therefore `deparse_shard_query` parses the merge query for execution on the shard correctly. Relevant PG commit: https://github.com/postgres/postgres/commit/0294df2f1pull/7922/head
parent
74d945f5ae
commit
0642a4dc08
|
@ -1546,8 +1546,8 @@ FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* NOT MATCHED can have either INSERT or DO NOTHING */
|
/* NOT MATCHED can have either INSERT, DO NOTHING or UPDATE(PG17) */
|
||||||
if (action->commandType == CMD_NOTHING)
|
if (action->commandType == CMD_NOTHING || action->commandType == CMD_UPDATE)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2195,6 +2195,501 @@ CONTEXT: PL/pgSQL function public.explain_filter(text) line XX at FOR over EXEC
|
||||||
|
|
||||||
RESET citus.log_remote_commands;
|
RESET citus.log_remote_commands;
|
||||||
-- End of EXPLAIN MEMORY SERIALIZE tests
|
-- End of EXPLAIN MEMORY SERIALIZE tests
|
||||||
|
-- Add support for MERGE ... WHEN NOT MATCHED BY SOURCE.
|
||||||
|
-- Relevant PG commit:
|
||||||
|
-- https://github.com/postgres/postgres/commit/0294df2f1
|
||||||
|
SET citus.next_shard_id TO 1072025;
|
||||||
|
-- Regular Postgres tables
|
||||||
|
CREATE TABLE postgres_target_1 (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE postgres_target_2 (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE postgres_source (sid integer, delta float);
|
||||||
|
INSERT INTO postgres_target_1 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO postgres_target_2 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO postgres_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
-- Citus local tables
|
||||||
|
CREATE TABLE citus_local_target (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE citus_local_source (sid integer, delta float);
|
||||||
|
SELECT citus_add_local_table_to_metadata('citus_local_target');
|
||||||
|
citus_add_local_table_to_metadata
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_add_local_table_to_metadata('citus_local_source');
|
||||||
|
citus_add_local_table_to_metadata
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO citus_local_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
-- Citus distributed tables
|
||||||
|
CREATE TABLE citus_distributed_target (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE citus_distributed_source (sid integer, delta float);
|
||||||
|
SELECT create_distributed_table('citus_distributed_target', 'tid');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('citus_distributed_source', 'sid');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO citus_distributed_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO citus_distributed_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
-- Citus reference tables
|
||||||
|
CREATE TABLE citus_reference_target (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE citus_reference_source (sid integer, delta float);
|
||||||
|
SELECT create_reference_table('citus_reference_target');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_reference_table('citus_reference_source');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO citus_reference_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
-- Try all combinations of tables with two queries:
|
||||||
|
-- 1: Simple Merge
|
||||||
|
-- 2: Merge with a constant qual
|
||||||
|
-- Run the merge queries with the postgres tables
|
||||||
|
-- to save the expected output
|
||||||
|
-- try simple MERGE
|
||||||
|
MERGE INTO postgres_target_1 t
|
||||||
|
USING postgres_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM postgres_target_1 ORDER BY tid, val;
|
||||||
|
tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 110 | initial updated by merge
|
||||||
|
2 | 20 | inserted by merge
|
||||||
|
3 | 330 | initial updated by merge
|
||||||
|
4 | 40 | inserted by merge
|
||||||
|
5 | 550 | initial updated by merge
|
||||||
|
6 | 60 | inserted by merge
|
||||||
|
7 | 770 | initial updated by merge
|
||||||
|
8 | 80 | inserted by merge
|
||||||
|
9 | 990 | initial updated by merge
|
||||||
|
10 | 100 | inserted by merge
|
||||||
|
11 | 1210 | initial updated by merge
|
||||||
|
12 | 120 | inserted by merge
|
||||||
|
13 | 1430 | initial updated by merge
|
||||||
|
14 | 140 | inserted by merge
|
||||||
|
15 | 1500 | initial not matched by source
|
||||||
|
(15 rows)
|
||||||
|
|
||||||
|
-- same with a constant qual
|
||||||
|
MERGE INTO postgres_target_2 t
|
||||||
|
USING postgres_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM postgres_target_2 ORDER BY tid, val;
|
||||||
|
tid | balance | val
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 110 | initial updated by merge
|
||||||
|
2 | 20 | inserted by merge
|
||||||
|
3 | 300 | initial not matched by source
|
||||||
|
3 | 30 | inserted by merge
|
||||||
|
4 | 40 | inserted by merge
|
||||||
|
5 | 500 | initial not matched by source
|
||||||
|
5 | 50 | inserted by merge
|
||||||
|
6 | 60 | inserted by merge
|
||||||
|
7 | 700 | initial not matched by source
|
||||||
|
7 | 70 | inserted by merge
|
||||||
|
8 | 80 | inserted by merge
|
||||||
|
9 | 900 | initial not matched by source
|
||||||
|
9 | 90 | inserted by merge
|
||||||
|
10 | 100 | inserted by merge
|
||||||
|
11 | 1100 | initial not matched by source
|
||||||
|
11 | 110 | inserted by merge
|
||||||
|
12 | 120 | inserted by merge
|
||||||
|
13 | 1300 | initial not matched by source
|
||||||
|
13 | 130 | inserted by merge
|
||||||
|
14 | 140 | inserted by merge
|
||||||
|
15 | 1500 | initial not matched by source
|
||||||
|
(21 rows)
|
||||||
|
|
||||||
|
-- function to compare the output from Citus tables
|
||||||
|
-- with the expected output from Postgres tables
|
||||||
|
CREATE OR REPLACE FUNCTION compare_tables(table1 TEXT, table2 TEXT) RETURNS BOOLEAN AS $$
|
||||||
|
DECLARE ret BOOL;
|
||||||
|
BEGIN
|
||||||
|
EXECUTE 'select count(*) = 0 from ((
|
||||||
|
SELECT * FROM ' || table1 ||
|
||||||
|
' EXCEPT
|
||||||
|
SELECT * FROM ' || table2 || ' )
|
||||||
|
UNION ALL (
|
||||||
|
SELECT * FROM ' || table2 ||
|
||||||
|
' EXCEPT
|
||||||
|
SELECT * FROM ' || table1 || ' ))' INTO ret;
|
||||||
|
RETURN ret;
|
||||||
|
END
|
||||||
|
$$ LANGUAGE PLPGSQL;
|
||||||
|
-- Local-Local
|
||||||
|
-- Let's also print the command here
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
SET citus.log_local_commands TO on;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_local_target', 'postgres_target_1');
|
||||||
|
compare_tables
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED BY TARGET THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_local_target', 'postgres_target_2');
|
||||||
|
compare_tables
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Local-Reference
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED BY TARGET THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_local_target', 'postgres_target_1');
|
||||||
|
compare_tables
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_local_target', 'postgres_target_2');
|
||||||
|
compare_tables
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Local-Distributed - Merge currently not supported, Feature in development.
|
||||||
|
-- try simple MERGE
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
ERROR: MERGE involving repartition of rows is supported only if the target is distributed
|
||||||
|
-- Distributed-Local
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_distributed_target', 'postgres_target_1');
|
||||||
|
compare_tables
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED BY TARGET THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_distributed_target', 'postgres_target_2');
|
||||||
|
compare_tables
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Distributed-Distributed
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_distributed_target', 'postgres_target_1');
|
||||||
|
compare_tables
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_distributed_target', 'postgres_target_2');
|
||||||
|
compare_tables
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Distributed-Reference
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_distributed_target', 'postgres_target_1');
|
||||||
|
compare_tables
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_distributed_target', 'postgres_target_2');
|
||||||
|
compare_tables
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- Reference-N/A - Reference table as target is not allowed in Merge
|
||||||
|
-- try simple MERGE
|
||||||
|
MERGE INTO citus_reference_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
ERROR: Reference table as target is not allowed in MERGE command
|
||||||
|
-- Complex repartition query example with a mix of tables
|
||||||
|
-- Example from blog post
|
||||||
|
-- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge
|
||||||
|
-- Contains information about the machines in the manufacturing facility
|
||||||
|
CREATE TABLE machines (
|
||||||
|
machine_id NUMERIC PRIMARY KEY,
|
||||||
|
machine_name VARCHAR(100),
|
||||||
|
location VARCHAR(50),
|
||||||
|
status VARCHAR(20)
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('machines');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Holds data on the various sensors installed on each machine
|
||||||
|
CREATE TABLE sensors (
|
||||||
|
sensor_id NUMERIC PRIMARY KEY,
|
||||||
|
sensor_name VARCHAR(100),
|
||||||
|
machine_id NUMERIC,
|
||||||
|
sensor_type VARCHAR(50)
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('sensors', 'sensor_id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Stores real-time readings from the sensors
|
||||||
|
CREATE TABLE sensor_readings (
|
||||||
|
reading_id NUMERIC ,
|
||||||
|
sensor_id NUMERIC,
|
||||||
|
reading_value NUMERIC,
|
||||||
|
reading_timestamp TIMESTAMP
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('sensor_readings', 'sensor_id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Holds real-time sensor readings for machines on 'Production Floor 1'
|
||||||
|
CREATE TABLE real_sensor_readings (
|
||||||
|
real_reading_id NUMERIC ,
|
||||||
|
sensor_id NUMERIC,
|
||||||
|
reading_value NUMERIC,
|
||||||
|
reading_timestamp TIMESTAMP
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('real_sensor_readings', 'sensor_id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Insert data into the machines table
|
||||||
|
INSERT INTO machines (machine_id, machine_name, location, status)
|
||||||
|
VALUES
|
||||||
|
(1, 'Machine A', 'Production Floor 1', 'Active'),
|
||||||
|
(2, 'Machine B', 'Production Floor 2', 'Active'),
|
||||||
|
(3, 'Machine C', 'Production Floor 1', 'Inactive');
|
||||||
|
-- Insert data into the sensors table
|
||||||
|
INSERT INTO sensors (sensor_id, sensor_name, machine_id, sensor_type)
|
||||||
|
VALUES
|
||||||
|
(1, 'Temperature Sensor 1', 1, 'Temperature'),
|
||||||
|
(2, 'Pressure Sensor 1', 1, 'Pressure'),
|
||||||
|
(3, 'Temperature Sensor 2', 2, 'Temperature'),
|
||||||
|
(4, 'Vibration Sensor 1', 3, 'Vibration');
|
||||||
|
-- Insert data into the real_sensor_readings table
|
||||||
|
INSERT INTO real_sensor_readings (real_reading_id, sensor_id, reading_value, reading_timestamp)
|
||||||
|
VALUES
|
||||||
|
(1, 1, 35.6, TIMESTAMP '2023-07-20 10:15:00'),
|
||||||
|
(2, 1, 36.8, TIMESTAMP '2023-07-20 10:30:00'),
|
||||||
|
(3, 2, 100.5, TIMESTAMP '2023-07-20 10:15:00'),
|
||||||
|
(4, 2, 101.2, TIMESTAMP '2023-07-20 10:30:00'),
|
||||||
|
(5, 3, 36.2, TIMESTAMP '2023-07-20 10:15:00'),
|
||||||
|
(6, 3, 36.5, TIMESTAMP '2023-07-20 10:30:00'),
|
||||||
|
(7, 4, 0.02, TIMESTAMP '2023-07-20 10:15:00'),
|
||||||
|
(8, 4, 0.03, TIMESTAMP '2023-07-20 10:30:00');
|
||||||
|
-- Insert DUMMY data to use for WHEN NOT MATCHED BY SOURCE
|
||||||
|
INSERT INTO sensor_readings VALUES (0, 0, 0, TIMESTAMP '2023-07-20 10:15:00');
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
-- Complex merge query which needs repartitioning
|
||||||
|
MERGE INTO sensor_readings SR
|
||||||
|
USING (SELECT
|
||||||
|
rsr.sensor_id,
|
||||||
|
AVG(rsr.reading_value) AS average_reading,
|
||||||
|
MAX(rsr.reading_timestamp) AS last_reading_timestamp,
|
||||||
|
MAX(rsr.real_reading_id) AS rid
|
||||||
|
FROM sensors s
|
||||||
|
INNER JOIN machines m ON s.machine_id = m.machine_id
|
||||||
|
INNER JOIN real_sensor_readings rsr ON s.sensor_id = rsr.sensor_id
|
||||||
|
WHERE m.location = 'Production Floor 1'
|
||||||
|
GROUP BY rsr.sensor_id
|
||||||
|
) NEW_READINGS
|
||||||
|
ON (SR.sensor_id = NEW_READINGS.sensor_id)
|
||||||
|
-- Existing reading, update it
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET reading_value = NEW_READINGS.average_reading, reading_timestamp = NEW_READINGS.last_reading_timestamp
|
||||||
|
-- New reading, record it
|
||||||
|
WHEN NOT MATCHED BY TARGET THEN
|
||||||
|
INSERT (reading_id, sensor_id, reading_value, reading_timestamp)
|
||||||
|
VALUES (NEW_READINGS.rid, NEW_READINGS.sensor_id,
|
||||||
|
NEW_READINGS.average_reading, NEW_READINGS.last_reading_timestamp)
|
||||||
|
-- Target has dummy entry not matched by source
|
||||||
|
-- dummy move change reading_value to 100 to notice the change
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET reading_value = 100;
|
||||||
|
DEBUG: A mix of distributed and reference table, try repartitioning
|
||||||
|
DEBUG: A mix of distributed and reference table, routable query is not possible
|
||||||
|
DEBUG: Creating MERGE repartition plan
|
||||||
|
DEBUG: Using column - index:0 from the source list to redistribute
|
||||||
|
DEBUG: Executing subplans of the source query and storing the results at the respective node(s)
|
||||||
|
DEBUG: Redistributing source result rows across nodes
|
||||||
|
DEBUG: Executing final MERGE on workers using intermediate results
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072043 sr USING (SELECT intermediate_result.sensor_id, intermediate_result.average_reading, intermediate_result.last_reading_timestamp, intermediate_result.rid FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1072047_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(sensor_id numeric, average_reading numeric, last_reading_timestamp timestamp without time zone, rid numeric)) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072045 sr USING (SELECT intermediate_result.sensor_id, intermediate_result.average_reading, intermediate_result.last_reading_timestamp, intermediate_result.rid FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1072049_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(sensor_id numeric, average_reading numeric, last_reading_timestamp timestamp without time zone, rid numeric)) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
|
||||||
|
RESET client_min_messages;
|
||||||
|
-- Expected output is:
|
||||||
|
-- reading_id | sensor_id | reading_value | reading_timestamp
|
||||||
|
-- ------------+-----------+------------------------+---------------------
|
||||||
|
-- 0 | 0 | 100 | 2023-07-20 10:15:00
|
||||||
|
-- 2 | 1 | 36.2000000000000000 | 2023-07-20 10:30:00
|
||||||
|
-- 4 | 2 | 100.8500000000000000 | 2023-07-20 10:30:00
|
||||||
|
-- 8 | 4 | 0.02500000000000000000 | 2023-07-20 10:30:00
|
||||||
|
SELECT * FROM sensor_readings ORDER BY 1;
|
||||||
|
reading_id | sensor_id | reading_value | reading_timestamp
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0 | 100 | Thu Jul 20 10:15:00 2023
|
||||||
|
2 | 1 | 36.2000000000000000 | Thu Jul 20 10:30:00 2023
|
||||||
|
4 | 2 | 100.8500000000000000 | Thu Jul 20 10:30:00 2023
|
||||||
|
8 | 4 | 0.02500000000000000000 | Thu Jul 20 10:30:00 2023
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- End of MERGE ... WHEN NOT MATCHED BY SOURCE tests
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA pg17 CASCADE;
|
DROP SCHEMA pg17 CASCADE;
|
||||||
|
|
|
@ -1080,6 +1080,376 @@ select public.explain_filter('explain (analyze,serialize) create temp table expl
|
||||||
|
|
||||||
RESET citus.log_remote_commands;
|
RESET citus.log_remote_commands;
|
||||||
-- End of EXPLAIN MEMORY SERIALIZE tests
|
-- End of EXPLAIN MEMORY SERIALIZE tests
|
||||||
|
-- Add support for MERGE ... WHEN NOT MATCHED BY SOURCE.
|
||||||
|
-- Relevant PG commit:
|
||||||
|
-- https://github.com/postgres/postgres/commit/0294df2f1
|
||||||
|
|
||||||
|
SET citus.next_shard_id TO 1072025;
|
||||||
|
|
||||||
|
-- Regular Postgres tables
|
||||||
|
CREATE TABLE postgres_target_1 (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE postgres_target_2 (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE postgres_source (sid integer, delta float);
|
||||||
|
INSERT INTO postgres_target_1 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO postgres_target_2 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO postgres_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
|
||||||
|
-- Citus local tables
|
||||||
|
CREATE TABLE citus_local_target (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE citus_local_source (sid integer, delta float);
|
||||||
|
SELECT citus_add_local_table_to_metadata('citus_local_target');
|
||||||
|
SELECT citus_add_local_table_to_metadata('citus_local_source');
|
||||||
|
INSERT INTO citus_local_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
-- Citus distributed tables
|
||||||
|
CREATE TABLE citus_distributed_target (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE citus_distributed_source (sid integer, delta float);
|
||||||
|
SELECT create_distributed_table('citus_distributed_target', 'tid');
|
||||||
|
SELECT create_distributed_table('citus_distributed_source', 'sid');
|
||||||
|
INSERT INTO citus_distributed_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO citus_distributed_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
-- Citus reference tables
|
||||||
|
CREATE TABLE citus_reference_target (tid integer, balance float, val text);
|
||||||
|
CREATE TABLE citus_reference_source (sid integer, delta float);
|
||||||
|
SELECT create_reference_table('citus_reference_target');
|
||||||
|
SELECT create_reference_table('citus_reference_source');
|
||||||
|
INSERT INTO citus_reference_target SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||||
|
INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,14) AS id;
|
||||||
|
|
||||||
|
-- Try all combinations of tables with two queries:
|
||||||
|
-- 1: Simple Merge
|
||||||
|
-- 2: Merge with a constant qual
|
||||||
|
|
||||||
|
-- Run the merge queries with the postgres tables
|
||||||
|
-- to save the expected output
|
||||||
|
|
||||||
|
-- try simple MERGE
|
||||||
|
MERGE INTO postgres_target_1 t
|
||||||
|
USING postgres_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM postgres_target_1 ORDER BY tid, val;
|
||||||
|
|
||||||
|
-- same with a constant qual
|
||||||
|
MERGE INTO postgres_target_2 t
|
||||||
|
USING postgres_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT * FROM postgres_target_2 ORDER BY tid, val;
|
||||||
|
|
||||||
|
-- function to compare the output from Citus tables
|
||||||
|
-- with the expected output from Postgres tables
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION compare_tables(table1 TEXT, table2 TEXT) RETURNS BOOLEAN AS $$
|
||||||
|
DECLARE ret BOOL;
|
||||||
|
BEGIN
|
||||||
|
EXECUTE 'select count(*) = 0 from ((
|
||||||
|
SELECT * FROM ' || table1 ||
|
||||||
|
' EXCEPT
|
||||||
|
SELECT * FROM ' || table2 || ' )
|
||||||
|
UNION ALL (
|
||||||
|
SELECT * FROM ' || table2 ||
|
||||||
|
' EXCEPT
|
||||||
|
SELECT * FROM ' || table1 || ' ))' INTO ret;
|
||||||
|
RETURN ret;
|
||||||
|
END
|
||||||
|
$$ LANGUAGE PLPGSQL;
|
||||||
|
|
||||||
|
-- Local-Local
|
||||||
|
-- Let's also print the command here
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
SET citus.log_local_commands TO on;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_local_target', 'postgres_target_1');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED BY TARGET THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_local_target', 'postgres_target_2');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Local-Reference
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED BY TARGET THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_local_target', 'postgres_target_1');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_local_target', 'postgres_target_2');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Local-Distributed - Merge currently not supported, Feature in development.
|
||||||
|
-- try simple MERGE
|
||||||
|
MERGE INTO citus_local_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
|
||||||
|
-- Distributed-Local
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_distributed_target', 'postgres_target_1');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_local_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED BY TARGET THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_distributed_target', 'postgres_target_2');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Distributed-Distributed
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_distributed_target', 'postgres_target_1');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_distributed_target', 'postgres_target_2');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Distributed-Reference
|
||||||
|
-- try simple MERGE
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_distributed_target', 'postgres_target_1');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- same with a constant qual
|
||||||
|
BEGIN;
|
||||||
|
MERGE INTO citus_distributed_target t
|
||||||
|
USING citus_reference_source s
|
||||||
|
ON t.tid = s.sid AND tid = 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
SELECT compare_tables('citus_distributed_target', 'postgres_target_2');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Reference-N/A - Reference table as target is not allowed in Merge
|
||||||
|
-- try simple MERGE
|
||||||
|
MERGE INTO citus_reference_target t
|
||||||
|
USING citus_distributed_source s
|
||||||
|
ON t.tid = s.sid
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET val = val || ' not matched by source';
|
||||||
|
|
||||||
|
-- Complex repartition query example with a mix of tables
|
||||||
|
-- Example from blog post
|
||||||
|
-- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge
|
||||||
|
|
||||||
|
-- Contains information about the machines in the manufacturing facility
|
||||||
|
CREATE TABLE machines (
|
||||||
|
machine_id NUMERIC PRIMARY KEY,
|
||||||
|
machine_name VARCHAR(100),
|
||||||
|
location VARCHAR(50),
|
||||||
|
status VARCHAR(20)
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('machines');
|
||||||
|
|
||||||
|
-- Holds data on the various sensors installed on each machine
|
||||||
|
CREATE TABLE sensors (
|
||||||
|
sensor_id NUMERIC PRIMARY KEY,
|
||||||
|
sensor_name VARCHAR(100),
|
||||||
|
machine_id NUMERIC,
|
||||||
|
sensor_type VARCHAR(50)
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('sensors', 'sensor_id');
|
||||||
|
|
||||||
|
-- Stores real-time readings from the sensors
|
||||||
|
CREATE TABLE sensor_readings (
|
||||||
|
reading_id NUMERIC ,
|
||||||
|
sensor_id NUMERIC,
|
||||||
|
reading_value NUMERIC,
|
||||||
|
reading_timestamp TIMESTAMP
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('sensor_readings', 'sensor_id');
|
||||||
|
|
||||||
|
-- Holds real-time sensor readings for machines on 'Production Floor 1'
|
||||||
|
CREATE TABLE real_sensor_readings (
|
||||||
|
real_reading_id NUMERIC ,
|
||||||
|
sensor_id NUMERIC,
|
||||||
|
reading_value NUMERIC,
|
||||||
|
reading_timestamp TIMESTAMP
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('real_sensor_readings', 'sensor_id');
|
||||||
|
|
||||||
|
-- Insert data into the machines table
|
||||||
|
INSERT INTO machines (machine_id, machine_name, location, status)
|
||||||
|
VALUES
|
||||||
|
(1, 'Machine A', 'Production Floor 1', 'Active'),
|
||||||
|
(2, 'Machine B', 'Production Floor 2', 'Active'),
|
||||||
|
(3, 'Machine C', 'Production Floor 1', 'Inactive');
|
||||||
|
|
||||||
|
-- Insert data into the sensors table
|
||||||
|
INSERT INTO sensors (sensor_id, sensor_name, machine_id, sensor_type)
|
||||||
|
VALUES
|
||||||
|
(1, 'Temperature Sensor 1', 1, 'Temperature'),
|
||||||
|
(2, 'Pressure Sensor 1', 1, 'Pressure'),
|
||||||
|
(3, 'Temperature Sensor 2', 2, 'Temperature'),
|
||||||
|
(4, 'Vibration Sensor 1', 3, 'Vibration');
|
||||||
|
|
||||||
|
-- Insert data into the real_sensor_readings table
|
||||||
|
INSERT INTO real_sensor_readings (real_reading_id, sensor_id, reading_value, reading_timestamp)
|
||||||
|
VALUES
|
||||||
|
(1, 1, 35.6, TIMESTAMP '2023-07-20 10:15:00'),
|
||||||
|
(2, 1, 36.8, TIMESTAMP '2023-07-20 10:30:00'),
|
||||||
|
(3, 2, 100.5, TIMESTAMP '2023-07-20 10:15:00'),
|
||||||
|
(4, 2, 101.2, TIMESTAMP '2023-07-20 10:30:00'),
|
||||||
|
(5, 3, 36.2, TIMESTAMP '2023-07-20 10:15:00'),
|
||||||
|
(6, 3, 36.5, TIMESTAMP '2023-07-20 10:30:00'),
|
||||||
|
(7, 4, 0.02, TIMESTAMP '2023-07-20 10:15:00'),
|
||||||
|
(8, 4, 0.03, TIMESTAMP '2023-07-20 10:30:00');
|
||||||
|
|
||||||
|
-- Insert DUMMY data to use for WHEN NOT MATCHED BY SOURCE
|
||||||
|
INSERT INTO sensor_readings VALUES (0, 0, 0, TIMESTAMP '2023-07-20 10:15:00');
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
-- Complex merge query which needs repartitioning
|
||||||
|
MERGE INTO sensor_readings SR
|
||||||
|
USING (SELECT
|
||||||
|
rsr.sensor_id,
|
||||||
|
AVG(rsr.reading_value) AS average_reading,
|
||||||
|
MAX(rsr.reading_timestamp) AS last_reading_timestamp,
|
||||||
|
MAX(rsr.real_reading_id) AS rid
|
||||||
|
FROM sensors s
|
||||||
|
INNER JOIN machines m ON s.machine_id = m.machine_id
|
||||||
|
INNER JOIN real_sensor_readings rsr ON s.sensor_id = rsr.sensor_id
|
||||||
|
WHERE m.location = 'Production Floor 1'
|
||||||
|
GROUP BY rsr.sensor_id
|
||||||
|
) NEW_READINGS
|
||||||
|
|
||||||
|
ON (SR.sensor_id = NEW_READINGS.sensor_id)
|
||||||
|
|
||||||
|
-- Existing reading, update it
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET reading_value = NEW_READINGS.average_reading, reading_timestamp = NEW_READINGS.last_reading_timestamp
|
||||||
|
|
||||||
|
-- New reading, record it
|
||||||
|
WHEN NOT MATCHED BY TARGET THEN
|
||||||
|
INSERT (reading_id, sensor_id, reading_value, reading_timestamp)
|
||||||
|
VALUES (NEW_READINGS.rid, NEW_READINGS.sensor_id,
|
||||||
|
NEW_READINGS.average_reading, NEW_READINGS.last_reading_timestamp)
|
||||||
|
|
||||||
|
-- Target has dummy entry not matched by source
|
||||||
|
-- dummy move change reading_value to 100 to notice the change
|
||||||
|
WHEN NOT MATCHED BY SOURCE THEN
|
||||||
|
UPDATE SET reading_value = 100;
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
-- Expected output is:
|
||||||
|
-- reading_id | sensor_id | reading_value | reading_timestamp
|
||||||
|
-- ------------+-----------+------------------------+---------------------
|
||||||
|
-- 0 | 0 | 100 | 2023-07-20 10:15:00
|
||||||
|
-- 2 | 1 | 36.2000000000000000 | 2023-07-20 10:30:00
|
||||||
|
-- 4 | 2 | 100.8500000000000000 | 2023-07-20 10:30:00
|
||||||
|
-- 8 | 4 | 0.02500000000000000000 | 2023-07-20 10:30:00
|
||||||
|
SELECT * FROM sensor_readings ORDER BY 1;
|
||||||
|
|
||||||
|
-- End of MERGE ... WHEN NOT MATCHED BY SOURCE tests
|
||||||
|
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
|
|
Loading…
Reference in New Issue