SET citus.log_remote_commands TO OFF; DROP SCHEMA IF EXISTS distributed_triggers CASCADE; NOTICE: schema "distributed_triggers" does not exist, skipping CREATE SCHEMA distributed_triggers; SET search_path TO 'distributed_triggers'; SET citus.shard_replication_factor = 1; SET citus.next_shard_id TO 800000; -- -- Test citus.enable_unsafe_triggers -- Enables arbitrary triggers on distributed tables -- CREATE TABLE data ( shard_key_value text not null, object_id text not null, value jsonb not null ); ALTER TABLE data ADD CONSTRAINT data_pk PRIMARY KEY (shard_key_value, object_id); /* table of changes */ CREATE TABLE data_changes ( shard_key_value text not null, object_id text not null, change_id bigint not null, change_time timestamptz default now(), operation_type text not null, new_value jsonb ); ALTER TABLE data_changes ADD CONSTRAINT data_changes_pk PRIMARY KEY (shard_key_value, object_id, change_id); SELECT create_distributed_table('data', 'shard_key_value'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('data_changes', 'shard_key_value', colocate_with := 'data'); create_distributed_table --------------------------------------------------------------------- (1 row) SET citus.enable_unsafe_triggers TO true; SELECT run_command_on_workers('ALTER SYSTEM SET citus.enable_unsafe_triggers TO true;'); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,"ALTER SYSTEM") (localhost,57638,t,"ALTER SYSTEM") (2 rows) SELECT run_command_on_workers('SELECT pg_reload_conf();'); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,t) (localhost,57638,t,t) (2 rows) /* trigger function that is called after any change */ CREATE OR REPLACE FUNCTION record_change() RETURNS trigger AS $$ DECLARE last_change_id bigint; BEGIN IF (TG_OP = 'DELETE') THEN /* get the last change ID for object key in OLD via index(-only) scan */ SELECT change_id INTO last_change_id FROM distributed_triggers.data_changes WHERE shard_key_value = OLD.shard_key_value AND object_id = OLD.object_id ORDER BY change_id DESC LIMIT 1; /* insert a change record for the delete */ INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type) VALUES (OLD.shard_key_value, OLD.object_id, COALESCE(last_change_id + 1, 1), TG_OP); ELSE /* get the last change ID for object key in NEW via index(-only) scan */ SELECT change_id INTO last_change_id FROM distributed_triggers.data_changes WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id ORDER BY change_id DESC LIMIT 1; /* insert a change record for the insert/update */ INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) VALUES (NEW.shard_key_value, NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value); END IF; RETURN NULL; END; $$ LANGUAGE plpgsql; SELECT proname from pg_proc WHERE oid='distributed_triggers.record_change'::regproc; proname --------------------------------------------------------------------- record_change (1 row) SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE oid='distributed_triggers.record_change'::regproc$$); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,1) (localhost,57638,t,1) (2 rows) CREATE TRIGGER record_change_trigger AFTER INSERT OR UPDATE OR DELETE ON data FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_change(); -- Trigger function should appear on workers SELECT proname from pg_proc WHERE oid='distributed_triggers.record_change'::regproc; proname --------------------------------------------------------------------- record_change (1 row) SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE oid='distributed_triggers.record_change'::regproc$$); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,1) (localhost,57638,t,1) (2 rows) INSERT INTO data VALUES ('hello','world','{"hello":"world"}'); INSERT INTO data VALUES ('hello2','world2','{"hello2":"world2"}'); INSERT INTO data VALUES ('hello3','world3','{"hello3":"world3"}'); DELETE FROM data where shard_key_value = 'hello'; BEGIN; UPDATE data SET value = '{}' where shard_key_value = 'hello3'; END; DELETE FROM data where shard_key_value = 'hello3'; SELECT * FROM data ORDER BY shard_key_value, object_id; shard_key_value | object_id | value --------------------------------------------------------------------- hello2 | world2 | {"hello2": "world2"} (1 row) SELECT shard_key_value, object_id, change_id, operation_type, new_value FROM data_changes ORDER BY shard_key_value, object_id, change_id; shard_key_value | object_id | change_id | operation_type | new_value --------------------------------------------------------------------- hello | world | 1 | INSERT | {"hello": "world"} hello | world | 2 | DELETE | hello2 | world2 | 1 | INSERT | {"hello2": "world2"} hello3 | world3 | 1 | INSERT | {"hello3": "world3"} hello3 | world3 | 2 | UPDATE | {} hello3 | world3 | 3 | DELETE | (6 rows) CREATE FUNCTION insert_delete_document(key text, id text) RETURNS void LANGUAGE plpgsql AS $fn$ BEGIN INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}'); DELETE FROM distributed_triggers.data where shard_key_value = key; END; $fn$; SELECT create_distributed_function( 'insert_delete_document(text, text)', 'key', colocate_with := 'data', force_delegation := true ); create_distributed_function --------------------------------------------------------------------- (1 row) SELECT insert_delete_document('hello4', 'world4'); insert_delete_document --------------------------------------------------------------------- (1 row) BEGIN; SELECT insert_delete_document('hello4', 'world4'); insert_delete_document --------------------------------------------------------------------- (1 row) COMMIT; SELECT * FROM data ORDER BY shard_key_value, object_id; shard_key_value | object_id | value --------------------------------------------------------------------- hello2 | world2 | {"hello2": "world2"} (1 row) SELECT shard_key_value, object_id, change_id, operation_type, new_value FROM data_changes ORDER BY shard_key_value, object_id, change_id; shard_key_value | object_id | change_id | operation_type | new_value --------------------------------------------------------------------- hello | world | 1 | INSERT | {"hello": "world"} hello | world | 2 | DELETE | hello2 | world2 | 1 | INSERT | {"hello2": "world2"} hello3 | world3 | 1 | INSERT | {"hello3": "world3"} hello3 | world3 | 2 | UPDATE | {} hello3 | world3 | 3 | DELETE | hello4 | world4 | 1 | INSERT | {"id1": "id2"} hello4 | world4 | 2 | DELETE | hello4 | world4 | 3 | INSERT | {"id1": "id2"} hello4 | world4 | 4 | DELETE | (10 rows) SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'record_change_trigger%' ORDER BY 1,2; tgrelid | tgname --------------------------------------------------------------------- data | record_change_trigger (1 row) SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'record_change_trigger%';$$); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,3) (localhost,57638,t,3) (2 rows) ALTER TRIGGER "record_change_trigger" ON "distributed_triggers"."data" RENAME TO "new_record_change_trigger"; SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'record_change_trigger%' ORDER BY 1,2; tgrelid | tgname --------------------------------------------------------------------- (0 rows) SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'new_record_change_trigger%' ORDER BY 1,2; tgrelid | tgname --------------------------------------------------------------------- data | new_record_change_trigger (1 row) SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'record_change_trigger%';$$); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,0) (localhost,57638,t,0) (2 rows) SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'new_record_change_trigger%';$$); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,3) (localhost,57638,t,3) (2 rows) --This should fail DROP TRIGGER record_change_trigger ON data; ERROR: trigger "record_change_trigger" for table "data" does not exist DROP TRIGGER new_record_change_trigger ON data; --Trigger should go away SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'new_record_change_trigger%' ORDER BY 1,2; tgrelid | tgname --------------------------------------------------------------------- (0 rows) -- -- Run bad triggers -- CREATE OR REPLACE FUNCTION bad_shardkey_record_change() RETURNS trigger AS $$ DECLARE last_change_id bigint; BEGIN INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) VALUES ('BAD', NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value); RETURN NULL; END; $$ LANGUAGE plpgsql; CREATE TRIGGER bad_shardkey_record_change_trigger AFTER INSERT OR UPDATE OR DELETE ON data FOR EACH ROW EXECUTE FUNCTION distributed_triggers.bad_shardkey_record_change(); -- Bad trigger fired from an individual SQL -- Query-on-distributed table exception should catch this INSERT INTO data VALUES ('hello6','world6','{"hello6":"world6"}'); ERROR: cannot execute a distributed query from a query on a shard DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) VALUES ('BAD', NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value)" PL/pgSQL function bad_shardkey_record_change() line XX at SQL statement while executing command on localhost:xxxxx -- Bad trigger fired from SQL inside a force-delegated function -- Incorrect distribution key exception should catch this SELECT insert_delete_document('hello6', 'world6'); ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false) CONTEXT: SQL statement "INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) VALUES ('BAD', NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value)" PL/pgSQL function distributed_triggers.bad_shardkey_record_change() line XX at SQL statement SQL statement "INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}')" PL/pgSQL function distributed_triggers.insert_delete_document(text,text) line XX at SQL statement while executing command on localhost:xxxxx SELECT * FROM data ORDER BY shard_key_value, object_id; shard_key_value | object_id | value --------------------------------------------------------------------- hello2 | world2 | {"hello2": "world2"} (1 row) SELECT shard_key_value, object_id, change_id, operation_type, new_value FROM data_changes ORDER BY shard_key_value, object_id, change_id; shard_key_value | object_id | change_id | operation_type | new_value --------------------------------------------------------------------- hello | world | 1 | INSERT | {"hello": "world"} hello | world | 2 | DELETE | hello2 | world2 | 1 | INSERT | {"hello2": "world2"} hello3 | world3 | 1 | INSERT | {"hello3": "world3"} hello3 | world3 | 2 | UPDATE | {} hello3 | world3 | 3 | DELETE | hello4 | world4 | 1 | INSERT | {"id1": "id2"} hello4 | world4 | 2 | DELETE | hello4 | world4 | 3 | INSERT | {"id1": "id2"} hello4 | world4 | 4 | DELETE | (10 rows) DROP TRIGGER bad_shardkey_record_change_trigger ON data; CREATE OR REPLACE FUNCTION remote_shardkey_record_change() RETURNS trigger SET search_path = 'distributed_triggers' LANGUAGE plpgsql AS $$ DECLARE last_change_id bigint; BEGIN UPDATE distributed_triggers.data_changes SET operation_type = TG_OP; RETURN NULL; END; $$; CREATE TRIGGER remote_shardkey_record_change_trigger AFTER INSERT OR UPDATE OR DELETE ON data FOR EACH ROW EXECUTE FUNCTION distributed_triggers.remote_shardkey_record_change(); CREATE FUNCTION insert_document(key text, id text) RETURNS void LANGUAGE plpgsql SET search_path = 'distributed_triggers' AS $fn$ BEGIN INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}'); DELETE FROM distributed_triggers.data where shard_key_value = key; END; $fn$; SELECT create_distributed_function( 'insert_document(text, text)', 'key', colocate_with := 'data', force_delegation := false ); create_distributed_function --------------------------------------------------------------------- (1 row) BEGIN; SELECT insert_document('hello7', 'world7'); ERROR: cannot execute a distributed query from a query on a shard DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "UPDATE distributed_triggers.data_changes SET operation_type = TG_OP" PL/pgSQL function remote_shardkey_record_change() line XX at SQL statement while executing command on localhost:xxxxx SQL statement "INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}')" PL/pgSQL function insert_document(text,text) line XX at SQL statement END; SELECT insert_document('hello7', 'world7'); ERROR: cannot execute a distributed query from a query on a shard DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "UPDATE distributed_triggers.data_changes SET operation_type = TG_OP" PL/pgSQL function remote_shardkey_record_change() line XX at SQL statement SQL statement "INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}')" PL/pgSQL function insert_document(text,text) line XX at SQL statement while executing command on localhost:xxxxx SELECT * FROM data ORDER BY shard_key_value, object_id; shard_key_value | object_id | value --------------------------------------------------------------------- hello2 | world2 | {"hello2": "world2"} (1 row) SELECT shard_key_value, object_id, change_id, operation_type, new_value FROM data_changes ORDER BY shard_key_value, object_id, change_id; shard_key_value | object_id | change_id | operation_type | new_value --------------------------------------------------------------------- hello | world | 1 | INSERT | {"hello": "world"} hello | world | 2 | DELETE | hello2 | world2 | 1 | INSERT | {"hello2": "world2"} hello3 | world3 | 1 | INSERT | {"hello3": "world3"} hello3 | world3 | 2 | UPDATE | {} hello3 | world3 | 3 | DELETE | hello4 | world4 | 1 | INSERT | {"id1": "id2"} hello4 | world4 | 2 | DELETE | hello4 | world4 | 3 | INSERT | {"id1": "id2"} hello4 | world4 | 4 | DELETE | (10 rows) -- -- Triggers (tables) which are not colocated -- CREATE TABLE emptest ( empname text NOT NULL PRIMARY KEY, salary integer ); CREATE TABLE emptest_audit( operation char(1) NOT NULL, stamp timestamp NOT NULL, userid text NOT NULL, empname text NOT NULL, salary integer, PRIMARY KEY (empname, userid, stamp, operation, salary) ); SELECT create_distributed_table('emptest','empname',colocate_with :='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('emptest_audit','empname',colocate_with :='none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE OR REPLACE FUNCTION process_emp_audit() RETURNS TRIGGER AS $emp_audit$ BEGIN -- -- Create a row in emp_audit to reflect the operation performed on emp, -- making use of the special variable TG_OP to work out the operation. -- IF (TG_OP = 'DELETE') THEN INSERT INTO distributed_triggers.emptest_audit SELECT 'D', now(), user, OLD.*; ELSIF (TG_OP = 'UPDATE') THEN INSERT INTO distributed_triggers.emptest_audit SELECT 'U', now(), user, NEW.*; ELSIF (TG_OP = 'INSERT') THEN INSERT INTO distributed_triggers.emptest_audit SELECT 'I', now(), user, NEW.*; END IF; RETURN NULL; -- result is ignored since this is an AFTER trigger END; $emp_audit$ LANGUAGE plpgsql; CREATE TRIGGER emptest_audit AFTER INSERT OR UPDATE OR DELETE ON emptest FOR EACH ROW EXECUTE FUNCTION distributed_triggers.process_emp_audit(); INSERT INTO emptest VALUES ('test1', 1); INSERT INTO emptest VALUES ('test2', 1); INSERT INTO emptest VALUES ('test3', 1); INSERT INTO emptest VALUES ('test4', 1); SELECT operation, userid, empname, salary FROM emptest_audit ORDER BY 3,1; operation | userid | empname | salary --------------------------------------------------------------------- I | postgres | test1 | 1 I | postgres | test2 | 1 I | postgres | test3 | 1 I | postgres | test4 | 1 (4 rows) DELETE from emptest; SELECT operation, userid, empname, salary FROM emptest_audit ORDER BY 3,1; operation | userid | empname | salary --------------------------------------------------------------------- D | postgres | test1 | 1 I | postgres | test1 | 1 D | postgres | test2 | 1 I | postgres | test2 | 1 D | postgres | test3 | 1 I | postgres | test3 | 1 D | postgres | test4 | 1 I | postgres | test4 | 1 (8 rows) CREATE VIEW emp_triggers AS SELECT tgname, tgrelid::regclass, tgenabled FROM pg_trigger WHERE tgrelid::regclass::text like 'emptest%' ORDER BY 1, 2; SELECT * FROM emp_triggers ORDER BY 1,2; tgname | tgrelid | tgenabled --------------------------------------------------------------------- emptest_audit | emptest | O truncate_trigger_xxxxxxx | emptest | O truncate_trigger_xxxxxxx | emptest_audit | O (3 rows) -- Triggers "FOR EACH STATEMENT" CREATE TABLE record_op ( empname text NOT NULL, operation_type text not null, stamp timestamp NOT NULL ); ALTER TABLE record_op REPLICA IDENTITY FULL; SELECT create_distributed_table('record_op', 'empname', colocate_with := 'emptest'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE OR REPLACE FUNCTION record_emp() RETURNS TRIGGER AS $rec_audit$ BEGIN INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now(); RETURN NULL; -- result is ignored since this is an AFTER trigger END; $rec_audit$ LANGUAGE plpgsql; CREATE TRIGGER record_emp_trig AFTER INSERT OR UPDATE OR DELETE ON emptest FOR EACH STATEMENT EXECUTE FUNCTION distributed_triggers.record_emp(); INSERT INTO emptest VALUES ('test6', 1); ERROR: cannot execute a distributed query from a query on a shard DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now()" PL/pgSQL function record_emp() line XX at SQL statement while executing command on localhost:xxxxx DELETE FROM emptest; ERROR: cannot execute a distributed query from a query on a shard DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now()" PL/pgSQL function distributed_triggers.record_emp() line XX at SQL statement while executing command on localhost:xxxxx SELECT * FROM emptest; empname | salary --------------------------------------------------------------------- (0 rows) SELECT operation_type FROM record_op; operation_type --------------------------------------------------------------------- (0 rows) -- -- Triggers on reference tables -- CREATE TABLE data_ref_table ( shard_key_value text not null, object_id text not null, value jsonb not null ); ALTER TABLE data_ref_table ADD CONSTRAINT data_ref_pk PRIMARY KEY (shard_key_value, object_id); SELECT create_reference_table('data_ref_table'); create_reference_table --------------------------------------------------------------------- (1 row) -- Trigger function record_change operates on data_changes which is *not* colocated with the reference table CREATE TRIGGER record_change_trigger AFTER INSERT OR UPDATE OR DELETE ON data_ref_table FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_change(); TRUNCATE TABLE data_changes; INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}'); ERROR: cannot execute a distributed query from a query on a shard DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "SELECT change_id FROM distributed_triggers.data_changes WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id ORDER BY change_id DESC LIMIT 1" PL/pgSQL function record_change() line XX at SQL statement while executing command on localhost:xxxxx INSERT INTO data_ref_table VALUES ('hello2','world2','{"ref":"table"}'); ERROR: cannot execute a distributed query from a query on a shard DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "SELECT change_id FROM distributed_triggers.data_changes WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id ORDER BY change_id DESC LIMIT 1" PL/pgSQL function record_change() line XX at SQL statement while executing command on localhost:xxxxx DELETE FROM data_ref_table where shard_key_value = 'hello'; BEGIN; UPDATE data_ref_table SET value = '{}' where shard_key_value = 'hello2'; END; TABLE data_changes ORDER BY shard_key_value, object_id, change_id; shard_key_value | object_id | change_id | change_time | operation_type | new_value --------------------------------------------------------------------- (0 rows) TABLE data_ref_table ORDER BY shard_key_value, object_id; shard_key_value | object_id | value --------------------------------------------------------------------- (0 rows) -- Colocate data_changes table with reference table SELECT undistribute_table('data_changes'); NOTICE: creating a new table for distributed_triggers.data_changes NOTICE: moving the data of distributed_triggers.data_changes NOTICE: dropping the old distributed_triggers.data_changes NOTICE: renaming the new table to distributed_triggers.data_changes undistribute_table --------------------------------------------------------------------- (1 row) SELECT create_reference_table('data_changes'); create_reference_table --------------------------------------------------------------------- (1 row) INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}'); ERROR: cannot execute a distributed query from a query on a shard DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. CONTEXT: SQL statement "INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) VALUES (NEW.shard_key_value, NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value)" PL/pgSQL function record_change() line XX at SQL statement while executing command on localhost:xxxxx TABLE data_changes ORDER BY shard_key_value, object_id, change_id; shard_key_value | object_id | change_id | change_time | operation_type | new_value --------------------------------------------------------------------- (0 rows) TABLE data_ref_table ORDER BY shard_key_value, object_id; shard_key_value | object_id | value --------------------------------------------------------------------- (0 rows) -- Create data_changes table locally with reference table DROP TABLE data_changes; /* table of changes local to each placement of the reference table */ CREATE TABLE data_changes ( shard_key_value text not null, object_id text not null, change_id bigint not null, change_time timestamptz default now(), operation_type text not null, new_value jsonb ); SELECT run_command_on_workers($$CREATE TABLE distributed_triggers.data_changes( shard_key_value text not null, object_id text not null, change_id bigint not null, change_time timestamptz default now(), operation_type text not null, new_value jsonb); $$); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,"CREATE TABLE") (localhost,57638,t,"CREATE TABLE") (2 rows) SELECT run_command_on_workers('SELECT count(*) FROM distributed_triggers.data_changes;'); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,0) (localhost,57638,t,0) (2 rows) INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}'); INSERT INTO data_ref_table VALUES ('hello2','world2','{"ref":"table"}'); BEGIN; UPDATE data_ref_table SET value = '{}'; END; SELECT run_command_on_workers('SELECT count(*) FROM distributed_triggers.data_changes;'); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,4) (localhost,57638,t,4) (2 rows) TABLE data_ref_table ORDER BY shard_key_value, object_id; shard_key_value | object_id | value --------------------------------------------------------------------- hello | world | {} hello2 | world2 | {} (2 rows) -- --Triggers on partitioned tables -- CREATE TABLE sale(sale_date date not null, state_code text, product_sku text, units integer) PARTITION BY list (state_code); ALTER TABLE sale ADD CONSTRAINT sale_pk PRIMARY KEY (state_code, sale_date); CREATE TABLE sale_newyork PARTITION OF sale FOR VALUES IN ('NY'); CREATE TABLE sale_california PARTITION OF sale FOR VALUES IN ('CA'); CREATE TABLE record_sale(operation_type text not null, product_sku text, state_code text, units integer, PRIMARY KEY(state_code, product_sku, operation_type, units)); SELECT create_distributed_table('sale', 'state_code'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('record_sale', 'state_code', colocate_with := 'sale'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE OR REPLACE FUNCTION record_sale() RETURNS trigger AS $$ BEGIN INSERT INTO distributed_triggers.record_sale(operation_type, product_sku, state_code, units) VALUES (TG_OP, NEW.product_sku, NEW.state_code, NEW.units); RETURN NULL; END; $$ LANGUAGE plpgsql; CREATE TRIGGER record_sale_trigger AFTER INSERT OR UPDATE OR DELETE ON sale FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_sale(); INSERT INTO sale(sale_date,state_code,product_sku,units) VALUES ('2019-01-01', 'CA', 'AZ-000A1', 85), ('2019-01-02', 'CA', 'AZ-000A1', 6), ('2019-01-03', 'NY', 'AZ-000A2', 83), ('2019-02-01', 'CA', 'AZ-000A2', 59), ('2019-02-02', 'CA', 'AZ-000A1', 9), ('2019-02-03', 'NY', 'AZ-000A1', 47); TABLE sale ORDER BY state_code, sale_date; sale_date | state_code | product_sku | units --------------------------------------------------------------------- 01-01-2019 | CA | AZ-000A1 | 85 01-02-2019 | CA | AZ-000A1 | 6 02-01-2019 | CA | AZ-000A2 | 59 02-02-2019 | CA | AZ-000A1 | 9 01-03-2019 | NY | AZ-000A2 | 83 02-03-2019 | NY | AZ-000A1 | 47 (6 rows) SELECT operation_type, product_sku, state_code FROM record_sale ORDER BY 1,2,3; operation_type | product_sku | state_code --------------------------------------------------------------------- INSERT | AZ-000A1 | CA INSERT | AZ-000A1 | CA INSERT | AZ-000A1 | CA INSERT | AZ-000A1 | NY INSERT | AZ-000A2 | CA INSERT | AZ-000A2 | NY (6 rows) -- --Test ALTER TRIGGER -- -- Pre PG15, renaming the trigger on the parent table didn't rename the same trigger on -- the children as well. Hence, let's not print the trigger names of the children -- In PG15, rename is consistent for all partitions of the parent -- This is tested in pg15.sql file. CREATE VIEW sale_triggers AS SELECT tgname, tgrelid::regclass, tgenabled FROM pg_trigger WHERE tgrelid::regclass::text = 'sale' ORDER BY 1, 2; SELECT * FROM sale_triggers ORDER BY 1,2; tgname | tgrelid | tgenabled --------------------------------------------------------------------- record_sale_trigger | sale | O truncate_trigger_xxxxxxx | sale | O (2 rows) ALTER TRIGGER "record_sale_trigger" ON "distributed_triggers"."sale" RENAME TO "new_record_sale_trigger"; SELECT * FROM sale_triggers ORDER BY 1,2; tgname | tgrelid | tgenabled --------------------------------------------------------------------- new_record_sale_trigger | sale | O truncate_trigger_xxxxxxx | sale | O (2 rows) CREATE EXTENSION seg; ALTER TRIGGER "emptest_audit" ON "emptest" DEPENDS ON EXTENSION seg; ERROR: Triggers "emptest_audit" on distributed tables and local tables added to metadata are not allowed to depend on an extension DETAIL: Triggers from extensions are expected to be created on the workers by the extension they depend on. DROP TABLE data_ref_table; -- --Triggers with add/remove node -- SELECT * FROM master_drain_node('localhost', :worker_2_port); NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... master_drain_node --------------------------------------------------------------------- (1 row) SELECT 1 from master_remove_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) CREATE TABLE distributed_table(value int); CREATE TABLE distributed_table_change(value int); SELECT create_distributed_table('distributed_table', 'value', colocate_with => 'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('distributed_table_change', 'value', colocate_with => 'distributed_table'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE FUNCTION insert_99() RETURNS trigger AS $insert_99$ BEGIN INSERT INTO distributed_triggers.distributed_table_change VALUES (99); RETURN NEW; END; $insert_99$ LANGUAGE plpgsql; CREATE TRIGGER insert_99_trigger AFTER DELETE ON distributed_table FOR EACH ROW EXECUTE FUNCTION distributed_triggers.insert_99(); SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'insert_99_trigger%' ORDER BY 1,2; tgrelid | tgname --------------------------------------------------------------------- distributed_table | insert_99_trigger (1 row) SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'insert_99_trigger%'$$); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,5) (1 row) INSERT INTO distributed_table VALUES (99); DELETE FROM distributed_table; SELECT * FROM distributed_table_change; value --------------------------------------------------------------------- 99 (1 row) -- add the node back SELECT 1 from master_add_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) INSERT INTO distributed_table VALUES (99); DELETE FROM distributed_table; SELECT * FROM distributed_table_change; value --------------------------------------------------------------------- 99 99 (2 rows) SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'insert_99_trigger%' ORDER BY 1,2; tgrelid | tgname --------------------------------------------------------------------- distributed_table | insert_99_trigger (1 row) SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'insert_99_trigger%'$$); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,5) (localhost,57638,t,1) (2 rows) RESET client_min_messages; RESET citus.enable_unsafe_triggers; SELECT run_command_on_workers('ALTER SYSTEM RESET citus.enable_unsafe_triggers;'); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,"ALTER SYSTEM") (localhost,57638,t,"ALTER SYSTEM") (2 rows) SELECT run_command_on_workers('SELECT pg_reload_conf();'); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,t) (localhost,57638,t,t) (2 rows) SET citus.log_remote_commands TO off; DROP SCHEMA distributed_triggers CASCADE; NOTICE: drop cascades to 21 other objects DETAIL: drop cascades to table data drop cascades to function record_change() drop cascades to function insert_delete_document(text,text) drop cascades to function bad_shardkey_record_change() drop cascades to function remote_shardkey_record_change() drop cascades to function insert_document(text,text) drop cascades to table emptest drop cascades to table emptest_audit drop cascades to function process_emp_audit() drop cascades to view emp_triggers drop cascades to table record_op drop cascades to function record_emp() drop cascades to table data_changes drop cascades to table sale drop cascades to table record_sale drop cascades to function record_sale() drop cascades to view sale_triggers drop cascades to extension seg drop cascades to table distributed_table drop cascades to table distributed_table_change drop cascades to function insert_99()