diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 5b7005354..b96882649 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1006,7 +1006,13 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, EnsureTableNotDistributed(relationId); EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod, viaDeprecatedAPI); - EnsureRelationHasNoTriggers(relationId); + + /* user really wants triggers? */ + if (!EnableUnsafeTriggers) + { + EnsureRelationHasNoTriggers(relationId); + } + /* we assume callers took necessary locks */ Relation relation = relation_open(relationId, NoLock); @@ -1328,11 +1334,11 @@ EnsureRelationHasNoTriggers(Oid relationId) Assert(relationName != NULL); ereport(ERROR, (errmsg("cannot distribute relation \"%s\" because it has " - "triggers ", relationName), - errdetail("Citus does not support distributing tables with " - "triggers."), - errhint("Drop all the triggers on \"%s\" and retry.", - relationName))); + "triggers and \"citus.enable_unsafe_triggers\" is " + "set to \"false\"", relationName), + errhint("Consider setting \"citus.enable_unsafe_triggers\" " + "to \"true\", or drop all the triggers on \"%s\" " + "and retry.", relationName))); } } diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 37758aec3..f1aa1fa1e 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -564,7 +564,7 @@ static DistributeObjectOps TextSearchConfig_Rename = { static DistributeObjectOps Trigger_AlterObjectDepends = { .deparse = NULL, .qualify = NULL, - .preprocess = NULL, + .preprocess = PreprocessAlterTriggerDependsStmt, .postprocess = PostprocessAlterTriggerDependsStmt, .address = NULL, .markDistributed = false, diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 5718e675a..4bf1ff373 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -1082,15 +1082,9 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, } else if (AlterTableCommandTypeIsTrigger(alterTableType)) { - /* - * We already error'ed out for ENABLE/DISABLE trigger commands for - * other citus table types in ErrorIfUnsupportedAlterTableStmt. - */ - Assert(IsCitusTableType(leftRelationId, CITUS_LOCAL_TABLE)); - char *triggerName = command->name; - return CitusLocalTableTriggerCommandDDLJob(leftRelationId, triggerName, - alterTableCommand); + return CitusCreateTriggerCommandDDLJob(leftRelationId, triggerName, + alterTableCommand); } /* @@ -2570,7 +2564,7 @@ ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod, * ALTER TABLE REPLICA IDENTITY * ALTER TABLE SET () * ALTER TABLE RESET () - * ALTER TABLE ENABLE/DISABLE TRIGGER (only for citus local tables) + * ALTER TABLE ENABLE/DISABLE TRIGGER (if enable_unsafe_triggers is not set, we only support triggers for citus local tables) */ static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) @@ -2904,7 +2898,7 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) errhint("You can issue each subcommand separately"))); } - ErrorOutForTriggerIfNotCitusLocalTable(relationId); + ErrorOutForTriggerIfNotSupported(relationId); break; } diff --git a/src/backend/distributed/commands/trigger.c b/src/backend/distributed/commands/trigger.c index 15ef208aa..a277cb372 100644 --- a/src/backend/distributed/commands/trigger.c +++ b/src/backend/distributed/commands/trigger.c @@ -16,13 +16,16 @@ #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_trigger.h" +#include "commands/extension.h" #include "commands/trigger.h" #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" #include "distributed/coordinator_protocol.h" #include "distributed/deparser.h" #include "distributed/listutils.h" +#include "distributed/metadata/distobject.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/namespace_utils.h" #include "distributed/shard_utils.h" #include "distributed/worker_protocol.h" @@ -52,6 +55,10 @@ static void ErrorIfDropStmtDropsMultipleTriggers(DropStmt *dropTriggerStmt); static int16 GetTriggerTypeById(Oid triggerId); +/* GUC that overrides trigger checks for distributed tables and reference tables */ +bool EnableUnsafeTriggers = false; + + /* * GetExplicitTriggerCommandList returns the list of DDL commands to create * triggers that are explicitly created for the table with relationId. See @@ -215,20 +222,14 @@ PostprocessCreateTriggerStmt(Node *node, const char *queryString) } EnsureCoordinator(); + ErrorOutForTriggerIfNotSupported(relationId); - ErrorOutForTriggerIfNotCitusLocalTable(relationId); + ObjectAddress objectAddress = GetObjectAddressFromParseTree(node, missingOk); + EnsureDependenciesExistOnAllNodes(&objectAddress); - if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) - { - ObjectAddress objectAddress = GetObjectAddressFromParseTree(node, missingOk); - EnsureDependenciesExistOnAllNodes(&objectAddress); - - char *triggerName = createTriggerStmt->trigname; - return CitusLocalTableTriggerCommandDDLJob(relationId, triggerName, - queryString); - } - - return NIL; + char *triggerName = createTriggerStmt->trigname; + return CitusCreateTriggerCommandDDLJob(relationId, triggerName, + queryString); } @@ -328,17 +329,12 @@ PostprocessAlterTriggerRenameStmt(Node *node, const char *queryString) } EnsureCoordinator(); - ErrorOutForTriggerIfNotCitusLocalTable(relationId); + ErrorOutForTriggerIfNotSupported(relationId); - if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) - { - /* use newname as standard process utility already renamed it */ - char *triggerName = renameTriggerStmt->newname; - return CitusLocalTableTriggerCommandDDLJob(relationId, triggerName, - queryString); - } - - return NIL; + /* use newname as standard process utility already renamed it */ + char *triggerName = renameTriggerStmt->newname; + return CitusCreateTriggerCommandDDLJob(relationId, triggerName, + queryString); } @@ -368,6 +364,70 @@ AlterTriggerRenameEventExtendNames(RenameStmt *renameTriggerStmt, char *schemaNa } +/* + * PreprocessAlterTriggerDependsStmt is called during the planning phase of an + * ALTER TRIGGER ... DEPENDS ON EXTENSION ... statement. Since triggers depending on + * extensions are assumed to be Owned by an extension we assume the extension to keep + * the trigger in sync. + * + * If we would allow users to create a dependency between a distributed trigger and an + * extension, our pruning logic for which objects to distribute as dependencies of other + * objects will change significantly, which could cause issues adding new workers. Hence + * we don't allow this dependency to be created. + */ +List * +PreprocessAlterTriggerDependsStmt(Node *node, const char *queryString, + ProcessUtilityContext processUtilityContext) +{ + AlterObjectDependsStmt *alterTriggerDependsStmt = + castNode(AlterObjectDependsStmt, node); + Assert(alterTriggerDependsStmt->objectType == OBJECT_TRIGGER); + + if (creating_extension) + { + /* + * extensions should be created separately on the workers, triggers cascading + * from an extension should therefore not be propagated here. + */ + return NIL; + } + + if (!EnableMetadataSync) + { + /* + * we are configured to disable object propagation, should not propagate anything + */ + return NIL; + } + + RangeVar *relation = alterTriggerDependsStmt->relation; + + bool missingOk = false; + Oid relationId = RangeVarGetRelid(relation, ALTER_TRIGGER_LOCK_MODE, missingOk); + + if (!IsCitusTable(relationId)) + { + return NIL; + } + + /* + * Distributed objects should not start depending on an extension, this will break + * the dependency resolving mechanism we use to replicate distributed objects to new + * workers + */ + + Value *triggerNameValue = + GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt); + ereport(ERROR, (errmsg( + "Triggers \"%s\" on distributed tables and local tables added to metadata " + "are not allowed to depend on an extension", strVal( + triggerNameValue)), + errdetail( + "Triggers from extensions are expected to be created on the workers " + "by the extension they depend on."))); +} + + /* * PostprocessAlterTriggerDependsStmt is called after a ALTER TRIGGER DEPENDS ON * command has been executed by standard process utility. This function errors out @@ -392,17 +452,12 @@ PostprocessAlterTriggerDependsStmt(Node *node, const char *queryString) } EnsureCoordinator(); - ErrorOutForTriggerIfNotCitusLocalTable(relationId); + ErrorOutForTriggerIfNotSupported(relationId); - if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) - { - Value *triggerNameValue = - GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt); - return CitusLocalTableTriggerCommandDDLJob(relationId, strVal(triggerNameValue), - queryString); - } - - return NIL; + Value *triggerNameValue = + GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt); + return CitusCreateTriggerCommandDDLJob(relationId, strVal(triggerNameValue), + queryString); } @@ -459,7 +514,7 @@ GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt *alterTriggerDepen * unsupported commands or creates ddl job for supported DROP TRIGGER commands. * The reason we process drop trigger commands before standard process utility * (unlike the other type of trigger commands) is that we act according to trigger - * type in CitusLocalTableTriggerCommandDDLJob but trigger wouldn't exist after + * type in CitusCreateTriggerCommandDDLJob but trigger wouldn't exist after * standard process utility. */ List * @@ -487,15 +542,10 @@ PreprocessDropTriggerStmt(Node *node, const char *queryString, ErrorIfUnsupportedDropTriggerCommand(dropTriggerStmt); - if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) - { - char *triggerName = NULL; - ExtractDropStmtTriggerAndRelationName(dropTriggerStmt, &triggerName, NULL); - return CitusLocalTableTriggerCommandDDLJob(relationId, triggerName, - queryString); - } - - return NIL; + char *triggerName = NULL; + ExtractDropStmtTriggerAndRelationName(dropTriggerStmt, &triggerName, NULL); + return CitusCreateTriggerCommandDDLJob(relationId, triggerName, + queryString); } @@ -517,24 +567,34 @@ ErrorIfUnsupportedDropTriggerCommand(DropStmt *dropTriggerStmt) } EnsureCoordinator(); - ErrorOutForTriggerIfNotCitusLocalTable(relationId); + ErrorOutForTriggerIfNotSupported(relationId); } /* - * ErrorOutForTriggerIfNotCitusLocalTable is a helper function to error + * ErrorOutForTriggerIfNotSupported is a helper function to error * out for unsupported trigger commands depending on the citus table type. */ void -ErrorOutForTriggerIfNotCitusLocalTable(Oid relationId) +ErrorOutForTriggerIfNotSupported(Oid relationId) { - if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) + if (EnableUnsafeTriggers) { + /* user really wants triggers */ return; } + else if (IsCitusTableType(relationId, REFERENCE_TABLE)) + { + ereport(ERROR, (errmsg("triggers are not supported on reference tables"))); + } + else if (IsCitusTableType(relationId, DISTRIBUTED_TABLE)) + { + ereport(ERROR, (errmsg("triggers are not supported on distributed tables " + "when \"citus.enable_unsafe_triggers\" is set to " + "\"false\""))); + } - ereport(ERROR, (errmsg("triggers are only supported for local tables added " - "to metadata"))); + /* we always support triggers on citus local tables */ } @@ -643,13 +703,13 @@ ErrorIfDropStmtDropsMultipleTriggers(DropStmt *dropTriggerStmt) /* - * CitusLocalTableTriggerCommandDDLJob creates a ddl job to execute given + * CitusCreateTriggerCommandDDLJob creates a ddl job to execute given * queryString trigger command on shell relation(s) in mx worker(s) and to * execute necessary ddl task on citus local table shard (if needed). */ List * -CitusLocalTableTriggerCommandDDLJob(Oid relationId, char *triggerName, - const char *queryString) +CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName, + const char *queryString) { DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 521d3e406..a3a6cb3c7 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -979,6 +979,17 @@ RegisterCitusConfigVariables(void) GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_unsafe_triggers", + gettext_noop("Enables arbitrary triggers on distributed tables which may cause " + "visibility and deadlock issues. Use at your own risk."), + NULL, + &EnableUnsafeTriggers, + false, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_version_checks", gettext_noop("Enables version checks during CREATE/ALTER EXTENSION commands"), diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 1b2b92590..c2bf66d5b 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -26,6 +26,9 @@ extern bool AddAllLocalTablesToMetadata; /* controlled via GUC, should be accessed via EnableLocalReferenceForeignKeys() */ extern bool EnableLocalReferenceForeignKeys; +extern bool EnableUnsafeTriggers; + + extern void SwitchToSequentialAndLocalExecutionIfRelationNameTooLong(Oid relationId, char * finalRelationName); @@ -580,16 +583,19 @@ extern List * PostprocessAlterTriggerRenameStmt(Node *node, const char *queryStr extern void AlterTriggerRenameEventExtendNames(RenameStmt *renameTriggerStmt, char *schemaName, uint64 shardId); extern List * PostprocessAlterTriggerDependsStmt(Node *node, const char *queryString); +extern List * PreprocessAlterTriggerDependsStmt(Node *node, const char *queryString, + ProcessUtilityContext + processUtilityContext); extern void AlterTriggerDependsEventExtendNames( AlterObjectDependsStmt *alterTriggerDependsStmt, char *schemaName, uint64 shardId); +extern void ErrorOutForTriggerIfNotSupported(Oid relationId); extern List * PreprocessDropTriggerStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); -extern void ErrorOutForTriggerIfNotCitusLocalTable(Oid relationId); extern void DropTriggerEventExtendNames(DropStmt *dropTriggerStmt, char *schemaName, uint64 shardId); -extern List * CitusLocalTableTriggerCommandDDLJob(Oid relationId, char *triggerName, - const char *queryString); +extern List * CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName, + const char *queryString); extern Oid GetTriggerFunctionId(Oid triggerId); /* cascade_table_operation_for_connected_relations.c */ diff --git a/src/test/regress/citus_tests/config.py b/src/test/regress/citus_tests/config.py index f3091b359..52b0714c8 100644 --- a/src/test/regress/citus_tests/config.py +++ b/src/test/regress/citus_tests/config.py @@ -161,6 +161,7 @@ class CitusDefaultClusterConfig(CitusBaseClusterConfig): new_settings = { "client_min_messages": "WARNING", "citus.sort_returning": True, + "citus.use_citus_managed_tables": True, } self.settings.update(new_settings) self.add_coordinator_to_metadata = True @@ -291,6 +292,7 @@ class CitusUnusualQuerySettingsConfig(CitusMXBaseClusterConfig): def __init__(self, arguments): super().__init__(arguments) self.new_settings = { + "citus.use_citus_managed_tables": False, "citus.task_assignment_policy": "first-replica", "citus.enable_fast_path_router_planner": False, "citus.enable_local_execution": False, diff --git a/src/test/regress/expected/citus_local_table_triggers.out b/src/test/regress/expected/citus_local_table_triggers.out index 007986fab..fa89b78d4 100644 --- a/src/test/regress/expected/citus_local_table_triggers.out +++ b/src/test/regress/expected/citus_local_table_triggers.out @@ -194,29 +194,29 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 BEFORE INSERT ON "interesting!schema"."citus_local!_table" FOR EACH STATEMENT EXECUTE FUNCTION dummy_function();') CREATE EXTENSION seg; +-- ALTER TRIGGER DEPENDS ON +ALTER TRIGGER "trigger\'name" ON "interesting!schema"."citus_local!_table" DEPENDS ON EXTENSION seg; +ERROR: Triggers "trigger\'name" on distributed tables and local tables added to metadata are not allowed to depend on an extension BEGIN; - -- ALTER TRIGGER DEPENDS ON - ALTER TRIGGER "trigger\'name" ON "interesting!schema"."citus_local!_table" DEPENDS ON EXTENSION seg; -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507008, 'interesting!schema', E'ALTER TRIGGER "trigger\\''name" ON "interesting!schema"."citus_local!_table" DEPENDS ON EXTENSION seg;') - -- show that triggers on both shell relation and shard relation are depending on seg + -- show that triggers on both shell relation and shard relation are not depending on seg SELECT tgname FROM pg_depend, pg_trigger, pg_extension WHERE deptype = 'x' and classid='pg_trigger'::regclass and pg_trigger.oid=pg_depend.objid and extname='seg' ORDER BY 1; - tgname + tgname --------------------------------------------------------------------- - trigger\'name - trigger\'name_1507008 -(2 rows) +(0 rows) DROP EXTENSION seg; - -- show that dropping extension drops the triggers automatically + -- show that dropping extension doesn't drop the triggers automatically SELECT * FROM citus_local_table_triggers WHERE tgname NOT LIKE 'RI_ConstraintTrigger%'; - tgname | tgrelid | tgenabled + tgname | tgrelid | tgenabled --------------------------------------------------------------------- - truncate_trigger_xxxxxxx | "interesting!schema"."citus_local!_table" | O -(1 row) + trigger\'name | "interesting!schema"."citus_local!_table" | O + trigger\'name_1507008 | "interesting!schema"."citus_local!_table_1507008" | O + truncate_trigger_xxxxxxx | "interesting!schema"."citus_local!_table" | O +(3 rows) ROLLBACK; -- ALTER TRIGGER RENAME diff --git a/src/test/regress/expected/citus_local_tables_mx.out b/src/test/regress/expected/citus_local_tables_mx.out index 0052c63af..0a50232ba 100644 --- a/src/test/regress/expected/citus_local_tables_mx.out +++ b/src/test/regress/expected/citus_local_tables_mx.out @@ -51,25 +51,22 @@ SELECT 1 FROM citus_activate_node('localhost', :worker_1_port); 1 (1 row) +-- show that the trigger is not allowed to depend(explicitly) on an extension. CREATE EXTENSION seg; ALTER TRIGGER dummy_function_trigger ON citus_local_table DEPENDS ON EXTENSION seg; -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1508000, 'citus_local_tables_mx', 'ALTER TRIGGER dummy_function_trigger ON citus_local_table DEPENDS ON EXTENSION seg;') +ERROR: Triggers "dummy_function_trigger" on distributed tables and local tables added to metadata are not allowed to depend on an extension ALTER TRIGGER dummy_function_trigger ON citus_local_table RENAME TO renamed_trigger; NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1508000, 'citus_local_tables_mx', 'ALTER TRIGGER dummy_function_trigger ON citus_local_table RENAME TO renamed_trigger;') ALTER TABLE citus_local_table DISABLE TRIGGER ALL; NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1508000, 'citus_local_tables_mx', 'ALTER TABLE citus_local_table DISABLE TRIGGER ALL;') --- show that update trigger mx relation are depending on seg, renamed and disabled. +-- show that update trigger mx relation is renamed and disabled. -- both workers should should print 1. SELECT run_command_on_workers( $$ -SELECT COUNT(*) FROM pg_depend, pg_trigger, pg_extension +SELECT COUNT(*) FROM pg_trigger WHERE pg_trigger.tgrelid='citus_local_tables_mx.citus_local_table'::regclass AND pg_trigger.tgname='renamed_trigger' AND - pg_trigger.tgenabled='D' AND - pg_depend.classid='pg_trigger'::regclass AND - pg_depend.deptype='x' AND - pg_trigger.oid=pg_depend.objid AND - pg_extension.extname='seg' + pg_trigger.tgenabled='D' $$); run_command_on_workers --------------------------------------------------------------------- diff --git a/src/test/regress/expected/citus_table_triggers.out b/src/test/regress/expected/citus_table_triggers.out index f9252c166..e5d14b69e 100644 --- a/src/test/regress/expected/citus_table_triggers.out +++ b/src/test/regress/expected/citus_table_triggers.out @@ -34,11 +34,11 @@ SELECT create_reference_table('reference_table'); CREATE TRIGGER update_value_dist AFTER INSERT ON distributed_table FOR EACH ROW EXECUTE FUNCTION update_value(); -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false" CREATE TRIGGER update_value_ref AFTER INSERT ON reference_table FOR EACH ROW EXECUTE FUNCTION update_value(); -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on reference tables --------------------------------------------------------------------- -- show that we error out for trigger commands on distributed & reference tables --------------------------------------------------------------------- @@ -56,42 +56,42 @@ SET citus.enable_ddl_propagation to ON; CREATE EXTENSION seg; -- below all should error out ALTER TRIGGER update_value_dist ON distributed_table RENAME TO update_value_dist1; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false" ALTER TRIGGER update_value_dist ON distributed_table DEPENDS ON EXTENSION seg; -ERROR: triggers are only supported for local tables added to metadata +ERROR: Triggers "update_value_dist" on distributed tables and local tables added to metadata are not allowed to depend on an extension DROP TRIGGER update_value_dist ON distributed_table; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false" ALTER TABLE distributed_table DISABLE TRIGGER ALL; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false" ALTER TABLE distributed_table DISABLE TRIGGER USER; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false" ALTER TABLE distributed_table DISABLE TRIGGER update_value_dist; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false" ALTER TABLE distributed_table ENABLE TRIGGER ALL; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false" ALTER TABLE distributed_table ENABLE TRIGGER USER; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false" ALTER TABLE distributed_table ENABLE TRIGGER update_value_dist; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false" -- below all should error out ALTER TRIGGER update_value_ref ON reference_table RENAME TO update_value_ref1; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on reference tables ALTER TRIGGER update_value_ref ON reference_table DEPENDS ON EXTENSION seg; -ERROR: triggers are only supported for local tables added to metadata +ERROR: Triggers "update_value_ref" on distributed tables and local tables added to metadata are not allowed to depend on an extension DROP TRIGGER update_value_ref ON reference_table; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on reference tables ALTER TABLE reference_table DISABLE TRIGGER ALL; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on reference tables ALTER TABLE reference_table DISABLE TRIGGER USER; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on reference tables ALTER TABLE reference_table DISABLE TRIGGER update_value_ref; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on reference tables ALTER TABLE reference_table ENABLE TRIGGER ALL; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on reference tables ALTER TABLE reference_table ENABLE TRIGGER USER; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on reference tables ALTER TABLE reference_table ENABLE TRIGGER update_value_ref; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on reference tables --------------------------------------------------------------------- -- show that we do not allow creating citus tables if the -- table has already triggers @@ -106,9 +106,9 @@ AFTER INSERT ON reference_table_1 FOR EACH ROW EXECUTE FUNCTION update_value(); -- below two should fail SELECT create_distributed_table('distributed_table_1', 'value'); -ERROR: cannot distribute relation "distributed_table_1" because it has triggers +ERROR: cannot distribute relation "distributed_table_1" because it has triggers and "citus.enable_unsafe_triggers" is set to "false" SELECT create_reference_table('reference_table_1'); -ERROR: cannot distribute relation "reference_table_1" because it has triggers +ERROR: cannot distribute relation "reference_table_1" because it has triggers and "citus.enable_unsafe_triggers" is set to "false" --------------------------------------------------------------------- -- test deparse logic for CREATE TRIGGER commands -- via master_get_table_ddl_events diff --git a/src/test/regress/expected/create_ref_dist_from_citus_local.out b/src/test/regress/expected/create_ref_dist_from_citus_local.out index 7f6821b1f..14fffed22 100644 --- a/src/test/regress/expected/create_ref_dist_from_citus_local.out +++ b/src/test/regress/expected/create_ref_dist_from_citus_local.out @@ -346,7 +346,7 @@ BEGIN; FOR EACH ROW EXECUTE PROCEDURE update_value(); -- show that we error out as we don't supprt triggers on distributed tables SELECT create_distributed_table('citus_local_table_6', 'col_1'); -ERROR: cannot distribute relation "citus_local_table_6" because it has triggers +ERROR: cannot distribute relation "citus_local_table_6" because it has triggers and "citus.enable_unsafe_triggers" is set to "false" ROLLBACK; -- make sure that creating append / range distributed tables is also ok BEGIN; diff --git a/src/test/regress/expected/distributed_triggers.out b/src/test/regress/expected/distributed_triggers.out new file mode 100644 index 000000000..ac038be51 --- /dev/null +++ b/src/test/regress/expected/distributed_triggers.out @@ -0,0 +1,921 @@ +SET citus.log_remote_commands TO OFF; +DROP SCHEMA IF EXISTS distributed_triggers CASCADE; +NOTICE: schema "distributed_triggers" does not exist, skipping +CREATE SCHEMA distributed_triggers; +SET search_path TO 'distributed_triggers'; +SET citus.shard_replication_factor = 1; +SET citus.shard_count = 32; +SET citus.next_shard_id TO 800000; +-- +-- Test citus.enable_unsafe_triggers +-- Enables arbitrary triggers on distributed tables +-- +CREATE TABLE data ( + shard_key_value text not null, + object_id text not null, + value jsonb not null +); +ALTER TABLE data +ADD CONSTRAINT data_pk +PRIMARY KEY (shard_key_value, object_id); +/* table of changes */ +CREATE TABLE data_changes ( + shard_key_value text not null, + object_id text not null, + change_id bigint not null, + change_time timestamptz default now(), + operation_type text not null, + new_value jsonb +); +ALTER TABLE data_changes +ADD CONSTRAINT data_changes_pk +PRIMARY KEY (shard_key_value, object_id, change_id); +SELECT create_distributed_table('data', 'shard_key_value'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('data_changes', 'shard_key_value', colocate_with := 'data'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET citus.enable_unsafe_triggers TO true; +SELECT run_command_on_workers('ALTER SYSTEM SET citus.enable_unsafe_triggers TO true;'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"ALTER SYSTEM") + (localhost,57638,t,"ALTER SYSTEM") +(2 rows) + +SELECT run_command_on_workers('SELECT pg_reload_conf();'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,t) + (localhost,57638,t,t) +(2 rows) + +/* trigger function that is called after any change */ +CREATE OR REPLACE FUNCTION record_change() +RETURNS trigger +AS $$ +DECLARE + last_change_id bigint; +BEGIN + IF (TG_OP = 'DELETE') THEN + /* get the last change ID for object key in OLD via index(-only) scan */ + SELECT change_id INTO last_change_id + FROM distributed_triggers.data_changes + WHERE shard_key_value = OLD.shard_key_value AND object_id = OLD.object_id + ORDER BY change_id DESC LIMIT 1; + + /* insert a change record for the delete */ + INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type) + VALUES (OLD.shard_key_value, OLD.object_id, COALESCE(last_change_id + 1, 1), TG_OP); + ELSE + /* get the last change ID for object key in NEW via index(-only) scan */ + SELECT change_id INTO last_change_id + FROM distributed_triggers.data_changes + WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id + ORDER BY change_id DESC LIMIT 1; + + /* insert a change record for the insert/update */ + INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) + VALUES (NEW.shard_key_value, NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value); + END IF; + + RETURN NULL; +END; +$$ LANGUAGE plpgsql; +SELECT proname from pg_proc WHERE oid='distributed_triggers.record_change'::regproc; + proname +--------------------------------------------------------------------- + record_change +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE oid='distributed_triggers.record_change'::regproc$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,1) + (localhost,57638,t,1) +(2 rows) + +CREATE TRIGGER record_change_trigger +AFTER INSERT OR UPDATE OR DELETE ON data +FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_change(); +-- Trigger function should appear on workers +SELECT proname from pg_proc WHERE oid='distributed_triggers.record_change'::regproc; + proname +--------------------------------------------------------------------- + record_change +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE oid='distributed_triggers.record_change'::regproc$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,1) + (localhost,57638,t,1) +(2 rows) + +INSERT INTO data VALUES ('hello','world','{"hello":"world"}'); +INSERT INTO data VALUES ('hello2','world2','{"hello2":"world2"}'); +INSERT INTO data VALUES ('hello3','world3','{"hello3":"world3"}'); +DELETE FROM data where shard_key_value = 'hello'; +BEGIN; +UPDATE data SET value = '{}' where shard_key_value = 'hello3'; +END; +DELETE FROM data where shard_key_value = 'hello3'; +SELECT * FROM data +ORDER BY shard_key_value, object_id; + shard_key_value | object_id | value +--------------------------------------------------------------------- + hello2 | world2 | {"hello2": "world2"} +(1 row) + +SELECT shard_key_value, object_id, change_id, operation_type, new_value +FROM data_changes +ORDER BY shard_key_value, object_id, change_id; + shard_key_value | object_id | change_id | operation_type | new_value +--------------------------------------------------------------------- + hello | world | 1 | INSERT | {"hello": "world"} + hello | world | 2 | DELETE | + hello2 | world2 | 1 | INSERT | {"hello2": "world2"} + hello3 | world3 | 1 | INSERT | {"hello3": "world3"} + hello3 | world3 | 2 | UPDATE | {} + hello3 | world3 | 3 | DELETE | +(6 rows) + +CREATE FUNCTION insert_delete_document(key text, id text) +RETURNS void LANGUAGE plpgsql AS $fn$ +BEGIN + INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}'); + DELETE FROM distributed_triggers.data where shard_key_value = key; +END; +$fn$; +SELECT create_distributed_function( + 'insert_delete_document(text, text)', 'key', + colocate_with := 'data', + force_delegation := true +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT insert_delete_document('hello4', 'world4'); + insert_delete_document +--------------------------------------------------------------------- + +(1 row) + +BEGIN; +SELECT insert_delete_document('hello4', 'world4'); + insert_delete_document +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +SELECT * FROM data +ORDER BY shard_key_value, object_id; + shard_key_value | object_id | value +--------------------------------------------------------------------- + hello2 | world2 | {"hello2": "world2"} +(1 row) + +SELECT shard_key_value, object_id, change_id, operation_type, new_value +FROM data_changes +ORDER BY shard_key_value, object_id, change_id; + shard_key_value | object_id | change_id | operation_type | new_value +--------------------------------------------------------------------- + hello | world | 1 | INSERT | {"hello": "world"} + hello | world | 2 | DELETE | + hello2 | world2 | 1 | INSERT | {"hello2": "world2"} + hello3 | world3 | 1 | INSERT | {"hello3": "world3"} + hello3 | world3 | 2 | UPDATE | {} + hello3 | world3 | 3 | DELETE | + hello4 | world4 | 1 | INSERT | {"id1": "id2"} + hello4 | world4 | 2 | DELETE | + hello4 | world4 | 3 | INSERT | {"id1": "id2"} + hello4 | world4 | 4 | DELETE | +(10 rows) + +SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'record_change_trigger%' ORDER BY 1,2; + tgrelid | tgname +--------------------------------------------------------------------- + data | record_change_trigger +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'record_change_trigger%';$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,17) + (localhost,57638,t,17) +(2 rows) + +ALTER TRIGGER "record_change_trigger" ON "distributed_triggers"."data" RENAME TO "new_record_change_trigger"; +SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'record_change_trigger%' ORDER BY 1,2; + tgrelid | tgname +--------------------------------------------------------------------- +(0 rows) + +SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'new_record_change_trigger%' ORDER BY 1,2; + tgrelid | tgname +--------------------------------------------------------------------- + data | new_record_change_trigger +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'record_change_trigger%';$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'new_record_change_trigger%';$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,17) + (localhost,57638,t,17) +(2 rows) + +--This should fail +DROP TRIGGER record_change_trigger ON data; +ERROR: trigger "record_change_trigger" for table "data" does not exist +DROP TRIGGER new_record_change_trigger ON data; +--Trigger should go away +SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'new_record_change_trigger%' ORDER BY 1,2; + tgrelid | tgname +--------------------------------------------------------------------- +(0 rows) + +-- +-- Run bad triggers +-- +CREATE OR REPLACE FUNCTION bad_shardkey_record_change() +RETURNS trigger +AS $$ +DECLARE + last_change_id bigint; +BEGIN + INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) + VALUES ('BAD', NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value); + RETURN NULL; +END; +$$ LANGUAGE plpgsql; +CREATE TRIGGER bad_shardkey_record_change_trigger +AFTER INSERT OR UPDATE OR DELETE ON data +FOR EACH ROW EXECUTE FUNCTION distributed_triggers.bad_shardkey_record_change(); +-- Bad trigger fired from an individual SQL +-- Query-on-distributed table exception should catch this +INSERT INTO data VALUES ('hello6','world6','{"hello6":"world6"}'); +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) + VALUES ('BAD', NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value)" +PL/pgSQL function bad_shardkey_record_change() line XX at SQL statement +while executing command on localhost:xxxxx +-- Bad trigger fired from SQL inside a force-delegated function +-- Incorrect distribution key exception should catch this +SELECT insert_delete_document('hello6', 'world6'); +ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown +HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false) +CONTEXT: SQL statement "INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) + VALUES ('BAD', NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value)" +PL/pgSQL function distributed_triggers.bad_shardkey_record_change() line XX at SQL statement +SQL statement "INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}')" +PL/pgSQL function distributed_triggers.insert_delete_document(text,text) line XX at SQL statement +while executing command on localhost:xxxxx +SELECT * FROM data +ORDER BY shard_key_value, object_id; + shard_key_value | object_id | value +--------------------------------------------------------------------- + hello2 | world2 | {"hello2": "world2"} +(1 row) + +SELECT shard_key_value, object_id, change_id, operation_type, new_value +FROM data_changes +ORDER BY shard_key_value, object_id, change_id; + shard_key_value | object_id | change_id | operation_type | new_value +--------------------------------------------------------------------- + hello | world | 1 | INSERT | {"hello": "world"} + hello | world | 2 | DELETE | + hello2 | world2 | 1 | INSERT | {"hello2": "world2"} + hello3 | world3 | 1 | INSERT | {"hello3": "world3"} + hello3 | world3 | 2 | UPDATE | {} + hello3 | world3 | 3 | DELETE | + hello4 | world4 | 1 | INSERT | {"id1": "id2"} + hello4 | world4 | 2 | DELETE | + hello4 | world4 | 3 | INSERT | {"id1": "id2"} + hello4 | world4 | 4 | DELETE | +(10 rows) + +DROP TRIGGER bad_shardkey_record_change_trigger ON data; +CREATE OR REPLACE FUNCTION remote_shardkey_record_change() +RETURNS trigger +AS $$ +DECLARE + last_change_id bigint; +BEGIN + UPDATE distributed_triggers.data_changes SET operation_type = TG_OP; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; +CREATE TRIGGER remote_shardkey_record_change_trigger +AFTER INSERT OR UPDATE OR DELETE ON data +FOR EACH ROW EXECUTE FUNCTION distributed_triggers.remote_shardkey_record_change(); +CREATE FUNCTION insert_document(key text, id text) +RETURNS void LANGUAGE plpgsql AS $fn$ +BEGIN + INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}'); + DELETE FROM distributed_triggers.data where shard_key_value = key; +END; +$fn$; +SELECT create_distributed_function( + 'insert_document(text, text)', 'key', + colocate_with := 'data', + force_delegation := false +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +BEGIN; +SELECT insert_document('hello7', 'world7'); +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "UPDATE distributed_triggers.data_changes SET operation_type = TG_OP" +PL/pgSQL function remote_shardkey_record_change() line XX at SQL statement +while executing command on localhost:xxxxx +SQL statement "INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}')" +PL/pgSQL function insert_document(text,text) line XX at SQL statement +END; +SELECT insert_document('hello7', 'world7'); +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "UPDATE distributed_triggers.data_changes SET operation_type = TG_OP" +PL/pgSQL function remote_shardkey_record_change() line XX at SQL statement +SQL statement "INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}')" +PL/pgSQL function insert_document(text,text) line XX at SQL statement +while executing command on localhost:xxxxx +SELECT * FROM data +ORDER BY shard_key_value, object_id; + shard_key_value | object_id | value +--------------------------------------------------------------------- + hello2 | world2 | {"hello2": "world2"} +(1 row) + +SELECT shard_key_value, object_id, change_id, operation_type, new_value +FROM data_changes +ORDER BY shard_key_value, object_id, change_id; + shard_key_value | object_id | change_id | operation_type | new_value +--------------------------------------------------------------------- + hello | world | 1 | INSERT | {"hello": "world"} + hello | world | 2 | DELETE | + hello2 | world2 | 1 | INSERT | {"hello2": "world2"} + hello3 | world3 | 1 | INSERT | {"hello3": "world3"} + hello3 | world3 | 2 | UPDATE | {} + hello3 | world3 | 3 | DELETE | + hello4 | world4 | 1 | INSERT | {"id1": "id2"} + hello4 | world4 | 2 | DELETE | + hello4 | world4 | 3 | INSERT | {"id1": "id2"} + hello4 | world4 | 4 | DELETE | +(10 rows) + +-- +-- Triggers (tables) which are not colocated +-- +CREATE TABLE emptest ( + empname text NOT NULL, + salary integer +); +CREATE TABLE emptest_audit( + operation char(1) NOT NULL, + stamp timestamp NOT NULL, + userid text NOT NULL, + empname text NOT NULL, + salary integer +); +SELECT create_distributed_table('emptest','empname',colocate_with :='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('emptest_audit','empname',colocate_with :='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION process_emp_audit() RETURNS TRIGGER AS $emp_audit$ + BEGIN + -- + -- Create a row in emp_audit to reflect the operation performed on emp, + -- making use of the special variable TG_OP to work out the operation. + -- + IF (TG_OP = 'DELETE') THEN + INSERT INTO distributed_triggers.emptest_audit SELECT 'D', now(), user, OLD.*; + ELSIF (TG_OP = 'UPDATE') THEN + INSERT INTO distributed_triggers.emptest_audit SELECT 'U', now(), user, NEW.*; + ELSIF (TG_OP = 'INSERT') THEN + INSERT INTO distributed_triggers.emptest_audit SELECT 'I', now(), user, NEW.*; + END IF; + RETURN NULL; -- result is ignored since this is an AFTER trigger + END; +$emp_audit$ LANGUAGE plpgsql; +CREATE TRIGGER emptest_audit +AFTER INSERT OR UPDATE OR DELETE ON emptest + FOR EACH ROW EXECUTE FUNCTION distributed_triggers.process_emp_audit(); +INSERT INTO emptest VALUES ('test1', 1); +INSERT INTO emptest VALUES ('test2', 1); +INSERT INTO emptest VALUES ('test3', 1); +INSERT INTO emptest VALUES ('test4', 1); +SELECT operation, userid, empname, salary +FROM emptest_audit +ORDER BY 3,1; + operation | userid | empname | salary +--------------------------------------------------------------------- + I | postgres | test1 | 1 + I | postgres | test2 | 1 + I | postgres | test3 | 1 + I | postgres | test4 | 1 +(4 rows) + +DELETE from emptest; +SELECT operation, userid, empname, salary +FROM emptest_audit +ORDER BY 3,1; + operation | userid | empname | salary +--------------------------------------------------------------------- + D | postgres | test1 | 1 + I | postgres | test1 | 1 + D | postgres | test2 | 1 + I | postgres | test2 | 1 + D | postgres | test3 | 1 + I | postgres | test3 | 1 + D | postgres | test4 | 1 + I | postgres | test4 | 1 +(8 rows) + +CREATE VIEW emp_triggers AS + SELECT tgname, tgrelid::regclass, tgenabled + FROM pg_trigger + WHERE tgrelid::regclass::text like 'emptest%' + ORDER BY 1, 2; +SELECT * FROM emp_triggers ORDER BY 1,2; + tgname | tgrelid | tgenabled +--------------------------------------------------------------------- + emptest_audit | emptest | O + truncate_trigger_xxxxxxx | emptest | O + truncate_trigger_xxxxxxx | emptest_audit | O +(3 rows) + +-- Triggers "FOR EACH STATEMENT" +CREATE TABLE record_op ( + empname text NOT NULL, + operation_type text not null, + stamp timestamp NOT NULL +); +SELECT create_distributed_table('record_op', 'empname', colocate_with := 'emptest'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION record_emp() RETURNS TRIGGER AS $rec_audit$ + BEGIN + INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now(); + RETURN NULL; -- result is ignored since this is an AFTER trigger + END; +$rec_audit$ LANGUAGE plpgsql; +CREATE TRIGGER record_emp_trig +AFTER INSERT OR UPDATE OR DELETE ON emptest + FOR EACH STATEMENT EXECUTE FUNCTION distributed_triggers.record_emp(); +INSERT INTO emptest VALUES ('test5', 1); +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now()" +PL/pgSQL function record_emp() line XX at SQL statement +while executing command on localhost:xxxxx +DELETE FROM emptest; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now()" +PL/pgSQL function distributed_triggers.record_emp() line XX at SQL statement +while executing command on localhost:xxxxx +SELECT * FROM emptest; + empname | salary +--------------------------------------------------------------------- +(0 rows) + +SELECT operation_type FROM record_op; + operation_type +--------------------------------------------------------------------- +(0 rows) + +-- +-- Triggers on reference tables +-- +CREATE TABLE data_ref_table ( + shard_key_value text not null, + object_id text not null, + value jsonb not null +); +ALTER TABLE data_ref_table +ADD CONSTRAINT data_ref_pk +PRIMARY KEY (shard_key_value, object_id); +SELECT create_reference_table('data_ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- Trigger function record_change operates on data_changes which is *not* colocated with the reference table +CREATE TRIGGER record_change_trigger +AFTER INSERT OR UPDATE OR DELETE ON data_ref_table +FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_change(); +TRUNCATE TABLE data_changes; +INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}'); +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "SELECT change_id FROM distributed_triggers.data_changes + WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id + ORDER BY change_id DESC LIMIT 1" +PL/pgSQL function record_change() line XX at SQL statement +while executing command on localhost:xxxxx +INSERT INTO data_ref_table VALUES ('hello2','world2','{"ref":"table"}'); +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "SELECT change_id FROM distributed_triggers.data_changes + WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id + ORDER BY change_id DESC LIMIT 1" +PL/pgSQL function record_change() line XX at SQL statement +while executing command on localhost:xxxxx +DELETE FROM data_ref_table where shard_key_value = 'hello'; +BEGIN; +UPDATE data_ref_table SET value = '{}' where shard_key_value = 'hello2'; +END; +TABLE data_changes ORDER BY shard_key_value, object_id, change_id; + shard_key_value | object_id | change_id | change_time | operation_type | new_value +--------------------------------------------------------------------- +(0 rows) + +TABLE data_ref_table ORDER BY shard_key_value, object_id; + shard_key_value | object_id | value +--------------------------------------------------------------------- +(0 rows) + +-- Colocate data_changes table with reference table +SELECT undistribute_table('data_changes'); +NOTICE: creating a new table for distributed_triggers.data_changes +NOTICE: moving the data of distributed_triggers.data_changes +NOTICE: dropping the old distributed_triggers.data_changes +NOTICE: renaming the new table to distributed_triggers.data_changes + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('data_changes'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}'); +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) + VALUES (NEW.shard_key_value, NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value)" +PL/pgSQL function record_change() line XX at SQL statement +while executing command on localhost:xxxxx +TABLE data_changes ORDER BY shard_key_value, object_id, change_id; + shard_key_value | object_id | change_id | change_time | operation_type | new_value +--------------------------------------------------------------------- +(0 rows) + +TABLE data_ref_table ORDER BY shard_key_value, object_id; + shard_key_value | object_id | value +--------------------------------------------------------------------- +(0 rows) + +-- Create data_changes table locally with reference table +DROP TABLE data_changes; +/* table of changes local to each placement of the reference table */ +CREATE TABLE data_changes ( + shard_key_value text not null, + object_id text not null, + change_id bigint not null, + change_time timestamptz default now(), + operation_type text not null, + new_value jsonb +); +SELECT run_command_on_workers($$CREATE TABLE distributed_triggers.data_changes( + shard_key_value text not null, + object_id text not null, + change_id bigint not null, + change_time timestamptz default now(), + operation_type text not null, + new_value jsonb); +$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"CREATE TABLE") + (localhost,57638,t,"CREATE TABLE") +(2 rows) + +SELECT run_command_on_workers('SELECT count(*) FROM distributed_triggers.data_changes;'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + +INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}'); +INSERT INTO data_ref_table VALUES ('hello2','world2','{"ref":"table"}'); +BEGIN; +UPDATE data_ref_table SET value = '{}'; +END; +SELECT run_command_on_workers('SELECT count(*) FROM distributed_triggers.data_changes;'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,4) + (localhost,57638,t,4) +(2 rows) + +TABLE data_ref_table ORDER BY shard_key_value, object_id; + shard_key_value | object_id | value +--------------------------------------------------------------------- + hello | world | {} + hello2 | world2 | {} +(2 rows) + +-- +--Triggers on partitioned tables +-- +CREATE TABLE sale(sale_date date not null, state_code text, product_sku text, units integer) +PARTITION BY list (state_code); +ALTER TABLE sale ADD CONSTRAINT sale_pk PRIMARY KEY (state_code, sale_date); +CREATE TABLE sale_newyork PARTITION OF sale FOR VALUES IN ('NY'); +CREATE TABLE sale_california PARTITION OF sale FOR VALUES IN ('CA'); +CREATE TABLE record_sale(operation_type text not null, product_sku text, state_code text); +SELECT create_distributed_table('sale', 'state_code'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('record_sale', 'state_code', colocate_with := 'sale'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION record_sale() +RETURNS trigger +AS $$ +BEGIN + INSERT INTO distributed_triggers.record_sale(operation_type, product_sku, state_code) + VALUES (TG_OP, NEW.product_sku, NEW.state_code); + RETURN NULL; +END; +$$ LANGUAGE plpgsql; +CREATE TRIGGER record_sale_trigger +AFTER INSERT OR UPDATE OR DELETE ON sale +FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_sale(); +INSERT INTO sale(sale_date,state_code,product_sku,units) VALUES +('2019-01-01', 'CA', 'AZ-000A1', 85), +('2019-01-02', 'CA', 'AZ-000A1', 6), +('2019-01-03', 'NY', 'AZ-000A2', 83), +('2019-02-01', 'CA', 'AZ-000A2', 59), +('2019-02-02', 'CA', 'AZ-000A1', 9), +('2019-02-03', 'NY', 'AZ-000A1', 47); +TABLE sale ORDER BY state_code, sale_date; + sale_date | state_code | product_sku | units +--------------------------------------------------------------------- + 01-01-2019 | CA | AZ-000A1 | 85 + 01-02-2019 | CA | AZ-000A1 | 6 + 02-01-2019 | CA | AZ-000A2 | 59 + 02-02-2019 | CA | AZ-000A1 | 9 + 01-03-2019 | NY | AZ-000A2 | 83 + 02-03-2019 | NY | AZ-000A1 | 47 +(6 rows) + +TABLE record_sale ORDER BY 1,2,3; + operation_type | product_sku | state_code +--------------------------------------------------------------------- + INSERT | AZ-000A1 | CA + INSERT | AZ-000A1 | CA + INSERT | AZ-000A1 | CA + INSERT | AZ-000A1 | NY + INSERT | AZ-000A2 | CA + INSERT | AZ-000A2 | NY +(6 rows) + +-- +--Test ALTER TRIGGER +-- +CREATE VIEW sale_triggers AS + SELECT tgname, tgrelid::regclass, tgenabled + FROM pg_trigger + WHERE tgrelid::regclass::text like 'sale%' + ORDER BY 1, 2; +SELECT * FROM sale_triggers ORDER BY 1,2; + tgname | tgrelid | tgenabled +--------------------------------------------------------------------- + record_sale_trigger | sale | O + record_sale_trigger | sale_newyork | O + record_sale_trigger | sale_california | O + truncate_trigger_xxxxxxx | sale | O + truncate_trigger_xxxxxxx | sale_california | O + truncate_trigger_xxxxxxx | sale_newyork | O +(6 rows) + +ALTER TRIGGER "record_sale_trigger" ON "distributed_triggers"."sale" RENAME TO "new_record_sale_trigger"; +SELECT * FROM sale_triggers ORDER BY 1,2; + tgname | tgrelid | tgenabled +--------------------------------------------------------------------- + new_record_sale_trigger | sale | O + record_sale_trigger | sale_newyork | O + record_sale_trigger | sale_california | O + truncate_trigger_xxxxxxx | sale | O + truncate_trigger_xxxxxxx | sale_california | O + truncate_trigger_xxxxxxx | sale_newyork | O +(6 rows) + +CREATE EXTENSION seg; +ALTER TRIGGER "emptest_audit" ON "emptest" DEPENDS ON EXTENSION seg; +ERROR: Triggers "emptest_audit" on distributed tables and local tables added to metadata are not allowed to depend on an extension +DETAIL: Triggers from extensions are expected to be created on the workers by the extension they depend on. +DROP TABLE data_ref_table; +-- +--Triggers with add/remove node +-- +SELECT * FROM master_drain_node('localhost', :worker_2_port); +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... + master_drain_node +--------------------------------------------------------------------- + +(1 row) + +SELECT 1 from master_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +CREATE TABLE distributed_table(value int); +CREATE TABLE distributed_table_change(value int); +SELECT create_distributed_table('distributed_table', 'value', colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('distributed_table_change', 'value', colocate_with => 'distributed_table'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE FUNCTION insert_99() RETURNS trigger AS $insert_99$ +BEGIN + INSERT INTO distributed_triggers.distributed_table_change VALUES (99); + RETURN NEW; +END; +$insert_99$ LANGUAGE plpgsql; +CREATE TRIGGER insert_99_trigger +AFTER DELETE ON distributed_table +FOR EACH ROW EXECUTE FUNCTION distributed_triggers.insert_99(); +SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'insert_99_trigger%' ORDER BY 1,2; + tgrelid | tgname +--------------------------------------------------------------------- + distributed_table | insert_99_trigger +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'insert_99_trigger%'$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,33) +(1 row) + +INSERT INTO distributed_table VALUES (99); +DELETE FROM distributed_table; +SELECT * FROM distributed_table_change; + value +--------------------------------------------------------------------- + 99 +(1 row) + +-- add the node back +SELECT 1 from master_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +INSERT INTO distributed_table VALUES (99); +DELETE FROM distributed_table; +SELECT * FROM distributed_table_change; + value +--------------------------------------------------------------------- + 99 + 99 +(2 rows) + +SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'insert_99_trigger%' ORDER BY 1,2; + tgrelid | tgname +--------------------------------------------------------------------- + distributed_table | insert_99_trigger +(1 row) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'insert_99_trigger%'$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,33) + (localhost,57638,t,1) +(2 rows) + +RESET client_min_messages; +SET citus.enable_unsafe_triggers TO false; +SELECT run_command_on_workers('ALTER SYSTEM SET citus.enable_unsafe_triggers TO false;'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"ALTER SYSTEM") + (localhost,57638,t,"ALTER SYSTEM") +(2 rows) + +SELECT run_command_on_workers('SELECT pg_reload_conf();'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,t) + (localhost,57638,t,t) +(2 rows) + +SET citus.log_remote_commands TO off; +DROP SCHEMA distributed_triggers CASCADE; +NOTICE: drop cascades to 21 other objects +DETAIL: drop cascades to table data +drop cascades to function record_change() +drop cascades to function insert_delete_document(text,text) +drop cascades to function bad_shardkey_record_change() +drop cascades to function remote_shardkey_record_change() +drop cascades to function insert_document(text,text) +drop cascades to table emptest +drop cascades to table emptest_audit +drop cascades to function process_emp_audit() +drop cascades to view emp_triggers +drop cascades to table record_op +drop cascades to function record_emp() +drop cascades to table data_changes +drop cascades to table sale +drop cascades to table record_sale +drop cascades to function record_sale() +drop cascades to view sale_triggers +drop cascades to extension seg +drop cascades to table distributed_table +drop cascades to table distributed_table_change +drop cascades to function insert_99() diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index a95efbac9..da7eb9c79 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -30,6 +30,7 @@ test: propagate_extension_commands test: escape_extension_name test: ref_citus_local_fkeys test: alter_database_owner +test: distributed_triggers test: multi_test_catalog_views test: multi_table_ddl diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source index 2e26ea425..2e01b0518 100644 --- a/src/test/regress/output/multi_alter_table_statements.source +++ b/src/test/regress/output/multi_alter_table_statements.source @@ -1007,7 +1007,7 @@ SELECT value, count(*) FROM trigger_table GROUP BY value ORDER BY value; (1 row) ALTER TABLE trigger_table DISABLE TRIGGER ALL; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false" INSERT INTO trigger_table VALUES (1, 'trigger disabled'); SELECT value, count(*) FROM trigger_table GROUP BY value ORDER BY value; value | count @@ -1016,7 +1016,7 @@ SELECT value, count(*) FROM trigger_table GROUP BY value ORDER BY value; (1 row) ALTER TABLE trigger_table ENABLE TRIGGER ALL; -ERROR: triggers are only supported for local tables added to metadata +ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false" INSERT INTO trigger_table VALUES (1, 'trigger disabled'); SELECT value, count(*) FROM trigger_table GROUP BY value ORDER BY value; value | count diff --git a/src/test/regress/sql/citus_local_table_triggers.sql b/src/test/regress/sql/citus_local_table_triggers.sql index 9bb1ee81d..57394b306 100644 --- a/src/test/regress/sql/citus_local_table_triggers.sql +++ b/src/test/regress/sql/citus_local_table_triggers.sql @@ -161,12 +161,11 @@ BEFORE INSERT ON "interesting!schema"."citus_local!_table" FOR EACH STATEMENT EXECUTE FUNCTION dummy_function(); CREATE EXTENSION seg; +-- ALTER TRIGGER DEPENDS ON +ALTER TRIGGER "trigger\'name" ON "interesting!schema"."citus_local!_table" DEPENDS ON EXTENSION seg; BEGIN; - -- ALTER TRIGGER DEPENDS ON - ALTER TRIGGER "trigger\'name" ON "interesting!schema"."citus_local!_table" DEPENDS ON EXTENSION seg; - - -- show that triggers on both shell relation and shard relation are depending on seg + -- show that triggers on both shell relation and shard relation are not depending on seg SELECT tgname FROM pg_depend, pg_trigger, pg_extension WHERE deptype = 'x' and classid='pg_trigger'::regclass and pg_trigger.oid=pg_depend.objid and extname='seg' @@ -174,7 +173,7 @@ BEGIN; DROP EXTENSION seg; - -- show that dropping extension drops the triggers automatically + -- show that dropping extension doesn't drop the triggers automatically SELECT * FROM citus_local_table_triggers WHERE tgname NOT LIKE 'RI_ConstraintTrigger%'; ROLLBACK; diff --git a/src/test/regress/sql/citus_local_tables_mx.sql b/src/test/regress/sql/citus_local_tables_mx.sql index 58745ef1c..4e72c6c13 100644 --- a/src/test/regress/sql/citus_local_tables_mx.sql +++ b/src/test/regress/sql/citus_local_tables_mx.sql @@ -37,22 +37,19 @@ FOR EACH ROW EXECUTE FUNCTION dummy_function(); -- the function that trigger needs in mx workers too. SELECT 1 FROM citus_activate_node('localhost', :worker_1_port); +-- show that the trigger is not allowed to depend(explicitly) on an extension. CREATE EXTENSION seg; ALTER TRIGGER dummy_function_trigger ON citus_local_table DEPENDS ON EXTENSION seg; ALTER TRIGGER dummy_function_trigger ON citus_local_table RENAME TO renamed_trigger; ALTER TABLE citus_local_table DISABLE TRIGGER ALL; --- show that update trigger mx relation are depending on seg, renamed and disabled. +-- show that update trigger mx relation is renamed and disabled. -- both workers should should print 1. SELECT run_command_on_workers( $$ -SELECT COUNT(*) FROM pg_depend, pg_trigger, pg_extension +SELECT COUNT(*) FROM pg_trigger WHERE pg_trigger.tgrelid='citus_local_tables_mx.citus_local_table'::regclass AND pg_trigger.tgname='renamed_trigger' AND - pg_trigger.tgenabled='D' AND - pg_depend.classid='pg_trigger'::regclass AND - pg_depend.deptype='x' AND - pg_trigger.oid=pg_depend.objid AND - pg_extension.extname='seg' + pg_trigger.tgenabled='D' $$); CREATE FUNCTION another_dummy_function() RETURNS trigger AS $another_dummy_function$ diff --git a/src/test/regress/sql/distributed_triggers.sql b/src/test/regress/sql/distributed_triggers.sql new file mode 100644 index 000000000..9f8a2e3b7 --- /dev/null +++ b/src/test/regress/sql/distributed_triggers.sql @@ -0,0 +1,470 @@ +SET citus.log_remote_commands TO OFF; +DROP SCHEMA IF EXISTS distributed_triggers CASCADE; +CREATE SCHEMA distributed_triggers; +SET search_path TO 'distributed_triggers'; +SET citus.shard_replication_factor = 1; +SET citus.shard_count = 32; +SET citus.next_shard_id TO 800000; + +-- +-- Test citus.enable_unsafe_triggers +-- Enables arbitrary triggers on distributed tables +-- +CREATE TABLE data ( + shard_key_value text not null, + object_id text not null, + value jsonb not null +); +ALTER TABLE data +ADD CONSTRAINT data_pk +PRIMARY KEY (shard_key_value, object_id); + +/* table of changes */ +CREATE TABLE data_changes ( + shard_key_value text not null, + object_id text not null, + change_id bigint not null, + change_time timestamptz default now(), + operation_type text not null, + new_value jsonb +); +ALTER TABLE data_changes +ADD CONSTRAINT data_changes_pk +PRIMARY KEY (shard_key_value, object_id, change_id); + +SELECT create_distributed_table('data', 'shard_key_value'); +SELECT create_distributed_table('data_changes', 'shard_key_value', colocate_with := 'data'); + +SET citus.enable_unsafe_triggers TO true; +SELECT run_command_on_workers('ALTER SYSTEM SET citus.enable_unsafe_triggers TO true;'); +SELECT run_command_on_workers('SELECT pg_reload_conf();'); + +/* trigger function that is called after any change */ +CREATE OR REPLACE FUNCTION record_change() +RETURNS trigger +AS $$ +DECLARE + last_change_id bigint; +BEGIN + IF (TG_OP = 'DELETE') THEN + /* get the last change ID for object key in OLD via index(-only) scan */ + SELECT change_id INTO last_change_id + FROM distributed_triggers.data_changes + WHERE shard_key_value = OLD.shard_key_value AND object_id = OLD.object_id + ORDER BY change_id DESC LIMIT 1; + + /* insert a change record for the delete */ + INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type) + VALUES (OLD.shard_key_value, OLD.object_id, COALESCE(last_change_id + 1, 1), TG_OP); + ELSE + /* get the last change ID for object key in NEW via index(-only) scan */ + SELECT change_id INTO last_change_id + FROM distributed_triggers.data_changes + WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id + ORDER BY change_id DESC LIMIT 1; + + /* insert a change record for the insert/update */ + INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) + VALUES (NEW.shard_key_value, NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value); + END IF; + + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +SELECT proname from pg_proc WHERE oid='distributed_triggers.record_change'::regproc; +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE oid='distributed_triggers.record_change'::regproc$$); + +CREATE TRIGGER record_change_trigger +AFTER INSERT OR UPDATE OR DELETE ON data +FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_change(); + +-- Trigger function should appear on workers +SELECT proname from pg_proc WHERE oid='distributed_triggers.record_change'::regproc; +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE oid='distributed_triggers.record_change'::regproc$$); + +INSERT INTO data VALUES ('hello','world','{"hello":"world"}'); +INSERT INTO data VALUES ('hello2','world2','{"hello2":"world2"}'); +INSERT INTO data VALUES ('hello3','world3','{"hello3":"world3"}'); +DELETE FROM data where shard_key_value = 'hello'; +BEGIN; +UPDATE data SET value = '{}' where shard_key_value = 'hello3'; +END; +DELETE FROM data where shard_key_value = 'hello3'; + +SELECT * FROM data +ORDER BY shard_key_value, object_id; +SELECT shard_key_value, object_id, change_id, operation_type, new_value +FROM data_changes +ORDER BY shard_key_value, object_id, change_id; + +CREATE FUNCTION insert_delete_document(key text, id text) +RETURNS void LANGUAGE plpgsql AS $fn$ +BEGIN + INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}'); + DELETE FROM distributed_triggers.data where shard_key_value = key; +END; +$fn$; + +SELECT create_distributed_function( + 'insert_delete_document(text, text)', 'key', + colocate_with := 'data', + force_delegation := true +); + +SELECT insert_delete_document('hello4', 'world4'); +BEGIN; +SELECT insert_delete_document('hello4', 'world4'); +COMMIT; + +SELECT * FROM data +ORDER BY shard_key_value, object_id; +SELECT shard_key_value, object_id, change_id, operation_type, new_value +FROM data_changes +ORDER BY shard_key_value, object_id, change_id; + +SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'record_change_trigger%' ORDER BY 1,2; +SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'record_change_trigger%';$$); + +ALTER TRIGGER "record_change_trigger" ON "distributed_triggers"."data" RENAME TO "new_record_change_trigger"; + +SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'record_change_trigger%' ORDER BY 1,2; +SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'new_record_change_trigger%' ORDER BY 1,2; +SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'record_change_trigger%';$$); +SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'new_record_change_trigger%';$$); + +--This should fail +DROP TRIGGER record_change_trigger ON data; +DROP TRIGGER new_record_change_trigger ON data; +--Trigger should go away +SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'new_record_change_trigger%' ORDER BY 1,2; + +-- +-- Run bad triggers +-- +CREATE OR REPLACE FUNCTION bad_shardkey_record_change() +RETURNS trigger +AS $$ +DECLARE + last_change_id bigint; +BEGIN + INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value) + VALUES ('BAD', NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value); + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER bad_shardkey_record_change_trigger +AFTER INSERT OR UPDATE OR DELETE ON data +FOR EACH ROW EXECUTE FUNCTION distributed_triggers.bad_shardkey_record_change(); + +-- Bad trigger fired from an individual SQL +-- Query-on-distributed table exception should catch this +INSERT INTO data VALUES ('hello6','world6','{"hello6":"world6"}'); + +-- Bad trigger fired from SQL inside a force-delegated function +-- Incorrect distribution key exception should catch this +SELECT insert_delete_document('hello6', 'world6'); + +SELECT * FROM data +ORDER BY shard_key_value, object_id; +SELECT shard_key_value, object_id, change_id, operation_type, new_value +FROM data_changes +ORDER BY shard_key_value, object_id, change_id; +DROP TRIGGER bad_shardkey_record_change_trigger ON data; + +CREATE OR REPLACE FUNCTION remote_shardkey_record_change() +RETURNS trigger +AS $$ +DECLARE + last_change_id bigint; +BEGIN + UPDATE distributed_triggers.data_changes SET operation_type = TG_OP; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER remote_shardkey_record_change_trigger +AFTER INSERT OR UPDATE OR DELETE ON data +FOR EACH ROW EXECUTE FUNCTION distributed_triggers.remote_shardkey_record_change(); + +CREATE FUNCTION insert_document(key text, id text) +RETURNS void LANGUAGE plpgsql AS $fn$ +BEGIN + INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}'); + DELETE FROM distributed_triggers.data where shard_key_value = key; +END; +$fn$; + +SELECT create_distributed_function( + 'insert_document(text, text)', 'key', + colocate_with := 'data', + force_delegation := false +); + +BEGIN; +SELECT insert_document('hello7', 'world7'); +END; + +SELECT insert_document('hello7', 'world7'); + +SELECT * FROM data +ORDER BY shard_key_value, object_id; +SELECT shard_key_value, object_id, change_id, operation_type, new_value +FROM data_changes +ORDER BY shard_key_value, object_id, change_id; + +-- +-- Triggers (tables) which are not colocated +-- +CREATE TABLE emptest ( + empname text NOT NULL, + salary integer +); + +CREATE TABLE emptest_audit( + operation char(1) NOT NULL, + stamp timestamp NOT NULL, + userid text NOT NULL, + empname text NOT NULL, + salary integer +); + +SELECT create_distributed_table('emptest','empname',colocate_with :='none'); +SELECT create_distributed_table('emptest_audit','empname',colocate_with :='none'); + +CREATE OR REPLACE FUNCTION process_emp_audit() RETURNS TRIGGER AS $emp_audit$ + BEGIN + -- + -- Create a row in emp_audit to reflect the operation performed on emp, + -- making use of the special variable TG_OP to work out the operation. + -- + IF (TG_OP = 'DELETE') THEN + INSERT INTO distributed_triggers.emptest_audit SELECT 'D', now(), user, OLD.*; + ELSIF (TG_OP = 'UPDATE') THEN + INSERT INTO distributed_triggers.emptest_audit SELECT 'U', now(), user, NEW.*; + ELSIF (TG_OP = 'INSERT') THEN + INSERT INTO distributed_triggers.emptest_audit SELECT 'I', now(), user, NEW.*; + END IF; + RETURN NULL; -- result is ignored since this is an AFTER trigger + END; +$emp_audit$ LANGUAGE plpgsql; + +CREATE TRIGGER emptest_audit +AFTER INSERT OR UPDATE OR DELETE ON emptest + FOR EACH ROW EXECUTE FUNCTION distributed_triggers.process_emp_audit(); + +INSERT INTO emptest VALUES ('test1', 1); +INSERT INTO emptest VALUES ('test2', 1); +INSERT INTO emptest VALUES ('test3', 1); +INSERT INTO emptest VALUES ('test4', 1); + +SELECT operation, userid, empname, salary +FROM emptest_audit +ORDER BY 3,1; + +DELETE from emptest; + +SELECT operation, userid, empname, salary +FROM emptest_audit +ORDER BY 3,1; + +CREATE VIEW emp_triggers AS + SELECT tgname, tgrelid::regclass, tgenabled + FROM pg_trigger + WHERE tgrelid::regclass::text like 'emptest%' + ORDER BY 1, 2; +SELECT * FROM emp_triggers ORDER BY 1,2; + +-- Triggers "FOR EACH STATEMENT" +CREATE TABLE record_op ( + empname text NOT NULL, + operation_type text not null, + stamp timestamp NOT NULL +); +SELECT create_distributed_table('record_op', 'empname', colocate_with := 'emptest'); +CREATE OR REPLACE FUNCTION record_emp() RETURNS TRIGGER AS $rec_audit$ + BEGIN + INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now(); + RETURN NULL; -- result is ignored since this is an AFTER trigger + END; +$rec_audit$ LANGUAGE plpgsql; + +CREATE TRIGGER record_emp_trig +AFTER INSERT OR UPDATE OR DELETE ON emptest + FOR EACH STATEMENT EXECUTE FUNCTION distributed_triggers.record_emp(); + +INSERT INTO emptest VALUES ('test5', 1); +DELETE FROM emptest; +SELECT * FROM emptest; +SELECT operation_type FROM record_op; + +-- +-- Triggers on reference tables +-- +CREATE TABLE data_ref_table ( + shard_key_value text not null, + object_id text not null, + value jsonb not null +); +ALTER TABLE data_ref_table +ADD CONSTRAINT data_ref_pk +PRIMARY KEY (shard_key_value, object_id); +SELECT create_reference_table('data_ref_table'); + +-- Trigger function record_change operates on data_changes which is *not* colocated with the reference table +CREATE TRIGGER record_change_trigger +AFTER INSERT OR UPDATE OR DELETE ON data_ref_table +FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_change(); + +TRUNCATE TABLE data_changes; +INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}'); +INSERT INTO data_ref_table VALUES ('hello2','world2','{"ref":"table"}'); +DELETE FROM data_ref_table where shard_key_value = 'hello'; +BEGIN; +UPDATE data_ref_table SET value = '{}' where shard_key_value = 'hello2'; +END; +TABLE data_changes ORDER BY shard_key_value, object_id, change_id; +TABLE data_ref_table ORDER BY shard_key_value, object_id; + +-- Colocate data_changes table with reference table +SELECT undistribute_table('data_changes'); +SELECT create_reference_table('data_changes'); + +INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}'); +TABLE data_changes ORDER BY shard_key_value, object_id, change_id; +TABLE data_ref_table ORDER BY shard_key_value, object_id; + +-- Create data_changes table locally with reference table +DROP TABLE data_changes; + +/* table of changes local to each placement of the reference table */ +CREATE TABLE data_changes ( + shard_key_value text not null, + object_id text not null, + change_id bigint not null, + change_time timestamptz default now(), + operation_type text not null, + new_value jsonb +); +SELECT run_command_on_workers($$CREATE TABLE distributed_triggers.data_changes( + shard_key_value text not null, + object_id text not null, + change_id bigint not null, + change_time timestamptz default now(), + operation_type text not null, + new_value jsonb); +$$); + +SELECT run_command_on_workers('SELECT count(*) FROM distributed_triggers.data_changes;'); + +INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}'); +INSERT INTO data_ref_table VALUES ('hello2','world2','{"ref":"table"}'); +BEGIN; +UPDATE data_ref_table SET value = '{}'; +END; + +SELECT run_command_on_workers('SELECT count(*) FROM distributed_triggers.data_changes;'); +TABLE data_ref_table ORDER BY shard_key_value, object_id; + +-- +--Triggers on partitioned tables +-- +CREATE TABLE sale(sale_date date not null, state_code text, product_sku text, units integer) +PARTITION BY list (state_code); +ALTER TABLE sale ADD CONSTRAINT sale_pk PRIMARY KEY (state_code, sale_date); +CREATE TABLE sale_newyork PARTITION OF sale FOR VALUES IN ('NY'); +CREATE TABLE sale_california PARTITION OF sale FOR VALUES IN ('CA'); + +CREATE TABLE record_sale(operation_type text not null, product_sku text, state_code text); + +SELECT create_distributed_table('sale', 'state_code'); +SELECT create_distributed_table('record_sale', 'state_code', colocate_with := 'sale'); + +CREATE OR REPLACE FUNCTION record_sale() +RETURNS trigger +AS $$ +BEGIN + INSERT INTO distributed_triggers.record_sale(operation_type, product_sku, state_code) + VALUES (TG_OP, NEW.product_sku, NEW.state_code); + RETURN NULL; +END; +$$ LANGUAGE plpgsql; +CREATE TRIGGER record_sale_trigger +AFTER INSERT OR UPDATE OR DELETE ON sale +FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_sale(); + +INSERT INTO sale(sale_date,state_code,product_sku,units) VALUES +('2019-01-01', 'CA', 'AZ-000A1', 85), +('2019-01-02', 'CA', 'AZ-000A1', 6), +('2019-01-03', 'NY', 'AZ-000A2', 83), +('2019-02-01', 'CA', 'AZ-000A2', 59), +('2019-02-02', 'CA', 'AZ-000A1', 9), +('2019-02-03', 'NY', 'AZ-000A1', 47); + +TABLE sale ORDER BY state_code, sale_date; +TABLE record_sale ORDER BY 1,2,3; + +-- +--Test ALTER TRIGGER +-- +CREATE VIEW sale_triggers AS + SELECT tgname, tgrelid::regclass, tgenabled + FROM pg_trigger + WHERE tgrelid::regclass::text like 'sale%' + ORDER BY 1, 2; + +SELECT * FROM sale_triggers ORDER BY 1,2; +ALTER TRIGGER "record_sale_trigger" ON "distributed_triggers"."sale" RENAME TO "new_record_sale_trigger"; +SELECT * FROM sale_triggers ORDER BY 1,2; + +CREATE EXTENSION seg; +ALTER TRIGGER "emptest_audit" ON "emptest" DEPENDS ON EXTENSION seg; + +DROP TABLE data_ref_table; +-- +--Triggers with add/remove node +-- +SELECT * FROM master_drain_node('localhost', :worker_2_port); +SELECT 1 from master_remove_node('localhost', :worker_2_port); + +CREATE TABLE distributed_table(value int); +CREATE TABLE distributed_table_change(value int); + +SELECT create_distributed_table('distributed_table', 'value', colocate_with => 'none'); +SELECT create_distributed_table('distributed_table_change', 'value', colocate_with => 'distributed_table'); + +CREATE FUNCTION insert_99() RETURNS trigger AS $insert_99$ +BEGIN + INSERT INTO distributed_triggers.distributed_table_change VALUES (99); + RETURN NEW; +END; +$insert_99$ LANGUAGE plpgsql; + +CREATE TRIGGER insert_99_trigger +AFTER DELETE ON distributed_table +FOR EACH ROW EXECUTE FUNCTION distributed_triggers.insert_99(); + +SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'insert_99_trigger%' ORDER BY 1,2; +SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'insert_99_trigger%'$$); + +INSERT INTO distributed_table VALUES (99); +DELETE FROM distributed_table; +SELECT * FROM distributed_table_change; + +-- add the node back +SELECT 1 from master_add_node('localhost', :worker_2_port); +INSERT INTO distributed_table VALUES (99); +DELETE FROM distributed_table; +SELECT * FROM distributed_table_change; + +SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'insert_99_trigger%' ORDER BY 1,2; +SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'insert_99_trigger%'$$); + +RESET client_min_messages; +SET citus.enable_unsafe_triggers TO false; +SELECT run_command_on_workers('ALTER SYSTEM SET citus.enable_unsafe_triggers TO false;'); +SELECT run_command_on_workers('SELECT pg_reload_conf();'); +SET citus.log_remote_commands TO off; + +DROP SCHEMA distributed_triggers CASCADE;