mirror of https://github.com/citusdata/citus.git
Merge branch 'master' into fix/subscript-jsonb
commit
108f5370fe
|
@ -1006,7 +1006,13 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
|
|||
|
||||
EnsureTableNotDistributed(relationId);
|
||||
EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod, viaDeprecatedAPI);
|
||||
EnsureRelationHasNoTriggers(relationId);
|
||||
|
||||
/* user really wants triggers? */
|
||||
if (!EnableUnsafeTriggers)
|
||||
{
|
||||
EnsureRelationHasNoTriggers(relationId);
|
||||
}
|
||||
|
||||
|
||||
/* we assume callers took necessary locks */
|
||||
Relation relation = relation_open(relationId, NoLock);
|
||||
|
@ -1328,11 +1334,11 @@ EnsureRelationHasNoTriggers(Oid relationId)
|
|||
|
||||
Assert(relationName != NULL);
|
||||
ereport(ERROR, (errmsg("cannot distribute relation \"%s\" because it has "
|
||||
"triggers ", relationName),
|
||||
errdetail("Citus does not support distributing tables with "
|
||||
"triggers."),
|
||||
errhint("Drop all the triggers on \"%s\" and retry.",
|
||||
relationName)));
|
||||
"triggers and \"citus.enable_unsafe_triggers\" is "
|
||||
"set to \"false\"", relationName),
|
||||
errhint("Consider setting \"citus.enable_unsafe_triggers\" "
|
||||
"to \"true\", or drop all the triggers on \"%s\" "
|
||||
"and retry.", relationName)));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -564,7 +564,7 @@ static DistributeObjectOps TextSearchConfig_Rename = {
|
|||
static DistributeObjectOps Trigger_AlterObjectDepends = {
|
||||
.deparse = NULL,
|
||||
.qualify = NULL,
|
||||
.preprocess = NULL,
|
||||
.preprocess = PreprocessAlterTriggerDependsStmt,
|
||||
.postprocess = PostprocessAlterTriggerDependsStmt,
|
||||
.address = NULL,
|
||||
.markDistributed = false,
|
||||
|
|
|
@ -1082,15 +1082,9 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
|||
}
|
||||
else if (AlterTableCommandTypeIsTrigger(alterTableType))
|
||||
{
|
||||
/*
|
||||
* We already error'ed out for ENABLE/DISABLE trigger commands for
|
||||
* other citus table types in ErrorIfUnsupportedAlterTableStmt.
|
||||
*/
|
||||
Assert(IsCitusTableType(leftRelationId, CITUS_LOCAL_TABLE));
|
||||
|
||||
char *triggerName = command->name;
|
||||
return CitusLocalTableTriggerCommandDDLJob(leftRelationId, triggerName,
|
||||
alterTableCommand);
|
||||
return CitusCreateTriggerCommandDDLJob(leftRelationId, triggerName,
|
||||
alterTableCommand);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -2570,7 +2564,7 @@ ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod,
|
|||
* ALTER TABLE REPLICA IDENTITY
|
||||
* ALTER TABLE SET ()
|
||||
* ALTER TABLE RESET ()
|
||||
* ALTER TABLE ENABLE/DISABLE TRIGGER (only for citus local tables)
|
||||
* ALTER TABLE ENABLE/DISABLE TRIGGER (if enable_unsafe_triggers is not set, we only support triggers for citus local tables)
|
||||
*/
|
||||
static void
|
||||
ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
|
||||
|
@ -2904,7 +2898,7 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
errhint("You can issue each subcommand separately")));
|
||||
}
|
||||
|
||||
ErrorOutForTriggerIfNotCitusLocalTable(relationId);
|
||||
ErrorOutForTriggerIfNotSupported(relationId);
|
||||
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -16,13 +16,16 @@
|
|||
#include "catalog/indexing.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_trigger.h"
|
||||
#include "commands/extension.h"
|
||||
#include "commands/trigger.h"
|
||||
#include "distributed/commands.h"
|
||||
#include "distributed/commands/utility_hook.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
#include "distributed/deparser.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata/distobject.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/namespace_utils.h"
|
||||
#include "distributed/shard_utils.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
|
@ -52,6 +55,10 @@ static void ErrorIfDropStmtDropsMultipleTriggers(DropStmt *dropTriggerStmt);
|
|||
static int16 GetTriggerTypeById(Oid triggerId);
|
||||
|
||||
|
||||
/* GUC that overrides trigger checks for distributed tables and reference tables */
|
||||
bool EnableUnsafeTriggers = false;
|
||||
|
||||
|
||||
/*
|
||||
* GetExplicitTriggerCommandList returns the list of DDL commands to create
|
||||
* triggers that are explicitly created for the table with relationId. See
|
||||
|
@ -215,20 +222,14 @@ PostprocessCreateTriggerStmt(Node *node, const char *queryString)
|
|||
}
|
||||
|
||||
EnsureCoordinator();
|
||||
ErrorOutForTriggerIfNotSupported(relationId);
|
||||
|
||||
ErrorOutForTriggerIfNotCitusLocalTable(relationId);
|
||||
ObjectAddress objectAddress = GetObjectAddressFromParseTree(node, missingOk);
|
||||
EnsureDependenciesExistOnAllNodes(&objectAddress);
|
||||
|
||||
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
|
||||
{
|
||||
ObjectAddress objectAddress = GetObjectAddressFromParseTree(node, missingOk);
|
||||
EnsureDependenciesExistOnAllNodes(&objectAddress);
|
||||
|
||||
char *triggerName = createTriggerStmt->trigname;
|
||||
return CitusLocalTableTriggerCommandDDLJob(relationId, triggerName,
|
||||
queryString);
|
||||
}
|
||||
|
||||
return NIL;
|
||||
char *triggerName = createTriggerStmt->trigname;
|
||||
return CitusCreateTriggerCommandDDLJob(relationId, triggerName,
|
||||
queryString);
|
||||
}
|
||||
|
||||
|
||||
|
@ -328,17 +329,12 @@ PostprocessAlterTriggerRenameStmt(Node *node, const char *queryString)
|
|||
}
|
||||
|
||||
EnsureCoordinator();
|
||||
ErrorOutForTriggerIfNotCitusLocalTable(relationId);
|
||||
ErrorOutForTriggerIfNotSupported(relationId);
|
||||
|
||||
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
|
||||
{
|
||||
/* use newname as standard process utility already renamed it */
|
||||
char *triggerName = renameTriggerStmt->newname;
|
||||
return CitusLocalTableTriggerCommandDDLJob(relationId, triggerName,
|
||||
queryString);
|
||||
}
|
||||
|
||||
return NIL;
|
||||
/* use newname as standard process utility already renamed it */
|
||||
char *triggerName = renameTriggerStmt->newname;
|
||||
return CitusCreateTriggerCommandDDLJob(relationId, triggerName,
|
||||
queryString);
|
||||
}
|
||||
|
||||
|
||||
|
@ -368,6 +364,70 @@ AlterTriggerRenameEventExtendNames(RenameStmt *renameTriggerStmt, char *schemaNa
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* PreprocessAlterTriggerDependsStmt is called during the planning phase of an
|
||||
* ALTER TRIGGER ... DEPENDS ON EXTENSION ... statement. Since triggers depending on
|
||||
* extensions are assumed to be Owned by an extension we assume the extension to keep
|
||||
* the trigger in sync.
|
||||
*
|
||||
* If we would allow users to create a dependency between a distributed trigger and an
|
||||
* extension, our pruning logic for which objects to distribute as dependencies of other
|
||||
* objects will change significantly, which could cause issues adding new workers. Hence
|
||||
* we don't allow this dependency to be created.
|
||||
*/
|
||||
List *
|
||||
PreprocessAlterTriggerDependsStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext)
|
||||
{
|
||||
AlterObjectDependsStmt *alterTriggerDependsStmt =
|
||||
castNode(AlterObjectDependsStmt, node);
|
||||
Assert(alterTriggerDependsStmt->objectType == OBJECT_TRIGGER);
|
||||
|
||||
if (creating_extension)
|
||||
{
|
||||
/*
|
||||
* extensions should be created separately on the workers, triggers cascading
|
||||
* from an extension should therefore not be propagated here.
|
||||
*/
|
||||
return NIL;
|
||||
}
|
||||
|
||||
if (!EnableMetadataSync)
|
||||
{
|
||||
/*
|
||||
* we are configured to disable object propagation, should not propagate anything
|
||||
*/
|
||||
return NIL;
|
||||
}
|
||||
|
||||
RangeVar *relation = alterTriggerDependsStmt->relation;
|
||||
|
||||
bool missingOk = false;
|
||||
Oid relationId = RangeVarGetRelid(relation, ALTER_TRIGGER_LOCK_MODE, missingOk);
|
||||
|
||||
if (!IsCitusTable(relationId))
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Distributed objects should not start depending on an extension, this will break
|
||||
* the dependency resolving mechanism we use to replicate distributed objects to new
|
||||
* workers
|
||||
*/
|
||||
|
||||
Value *triggerNameValue =
|
||||
GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt);
|
||||
ereport(ERROR, (errmsg(
|
||||
"Triggers \"%s\" on distributed tables and local tables added to metadata "
|
||||
"are not allowed to depend on an extension", strVal(
|
||||
triggerNameValue)),
|
||||
errdetail(
|
||||
"Triggers from extensions are expected to be created on the workers "
|
||||
"by the extension they depend on.")));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PostprocessAlterTriggerDependsStmt is called after a ALTER TRIGGER DEPENDS ON
|
||||
* command has been executed by standard process utility. This function errors out
|
||||
|
@ -392,17 +452,12 @@ PostprocessAlterTriggerDependsStmt(Node *node, const char *queryString)
|
|||
}
|
||||
|
||||
EnsureCoordinator();
|
||||
ErrorOutForTriggerIfNotCitusLocalTable(relationId);
|
||||
ErrorOutForTriggerIfNotSupported(relationId);
|
||||
|
||||
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
|
||||
{
|
||||
Value *triggerNameValue =
|
||||
GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt);
|
||||
return CitusLocalTableTriggerCommandDDLJob(relationId, strVal(triggerNameValue),
|
||||
queryString);
|
||||
}
|
||||
|
||||
return NIL;
|
||||
Value *triggerNameValue =
|
||||
GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt);
|
||||
return CitusCreateTriggerCommandDDLJob(relationId, strVal(triggerNameValue),
|
||||
queryString);
|
||||
}
|
||||
|
||||
|
||||
|
@ -459,7 +514,7 @@ GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt *alterTriggerDepen
|
|||
* unsupported commands or creates ddl job for supported DROP TRIGGER commands.
|
||||
* The reason we process drop trigger commands before standard process utility
|
||||
* (unlike the other type of trigger commands) is that we act according to trigger
|
||||
* type in CitusLocalTableTriggerCommandDDLJob but trigger wouldn't exist after
|
||||
* type in CitusCreateTriggerCommandDDLJob but trigger wouldn't exist after
|
||||
* standard process utility.
|
||||
*/
|
||||
List *
|
||||
|
@ -487,15 +542,10 @@ PreprocessDropTriggerStmt(Node *node, const char *queryString,
|
|||
|
||||
ErrorIfUnsupportedDropTriggerCommand(dropTriggerStmt);
|
||||
|
||||
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
|
||||
{
|
||||
char *triggerName = NULL;
|
||||
ExtractDropStmtTriggerAndRelationName(dropTriggerStmt, &triggerName, NULL);
|
||||
return CitusLocalTableTriggerCommandDDLJob(relationId, triggerName,
|
||||
queryString);
|
||||
}
|
||||
|
||||
return NIL;
|
||||
char *triggerName = NULL;
|
||||
ExtractDropStmtTriggerAndRelationName(dropTriggerStmt, &triggerName, NULL);
|
||||
return CitusCreateTriggerCommandDDLJob(relationId, triggerName,
|
||||
queryString);
|
||||
}
|
||||
|
||||
|
||||
|
@ -517,24 +567,34 @@ ErrorIfUnsupportedDropTriggerCommand(DropStmt *dropTriggerStmt)
|
|||
}
|
||||
|
||||
EnsureCoordinator();
|
||||
ErrorOutForTriggerIfNotCitusLocalTable(relationId);
|
||||
ErrorOutForTriggerIfNotSupported(relationId);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorOutForTriggerIfNotCitusLocalTable is a helper function to error
|
||||
* ErrorOutForTriggerIfNotSupported is a helper function to error
|
||||
* out for unsupported trigger commands depending on the citus table type.
|
||||
*/
|
||||
void
|
||||
ErrorOutForTriggerIfNotCitusLocalTable(Oid relationId)
|
||||
ErrorOutForTriggerIfNotSupported(Oid relationId)
|
||||
{
|
||||
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
|
||||
if (EnableUnsafeTriggers)
|
||||
{
|
||||
/* user really wants triggers */
|
||||
return;
|
||||
}
|
||||
else if (IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||
{
|
||||
ereport(ERROR, (errmsg("triggers are not supported on reference tables")));
|
||||
}
|
||||
else if (IsCitusTableType(relationId, DISTRIBUTED_TABLE))
|
||||
{
|
||||
ereport(ERROR, (errmsg("triggers are not supported on distributed tables "
|
||||
"when \"citus.enable_unsafe_triggers\" is set to "
|
||||
"\"false\"")));
|
||||
}
|
||||
|
||||
ereport(ERROR, (errmsg("triggers are only supported for local tables added "
|
||||
"to metadata")));
|
||||
/* we always support triggers on citus local tables */
|
||||
}
|
||||
|
||||
|
||||
|
@ -643,13 +703,13 @@ ErrorIfDropStmtDropsMultipleTriggers(DropStmt *dropTriggerStmt)
|
|||
|
||||
|
||||
/*
|
||||
* CitusLocalTableTriggerCommandDDLJob creates a ddl job to execute given
|
||||
* CitusCreateTriggerCommandDDLJob creates a ddl job to execute given
|
||||
* queryString trigger command on shell relation(s) in mx worker(s) and to
|
||||
* execute necessary ddl task on citus local table shard (if needed).
|
||||
*/
|
||||
List *
|
||||
CitusLocalTableTriggerCommandDDLJob(Oid relationId, char *triggerName,
|
||||
const char *queryString)
|
||||
CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName,
|
||||
const char *queryString)
|
||||
{
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
ddlJob->targetRelationId = relationId;
|
||||
|
|
|
@ -979,6 +979,17 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"citus.enable_unsafe_triggers",
|
||||
gettext_noop("Enables arbitrary triggers on distributed tables which may cause "
|
||||
"visibility and deadlock issues. Use at your own risk."),
|
||||
NULL,
|
||||
&EnableUnsafeTriggers,
|
||||
false,
|
||||
PGC_USERSET,
|
||||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"citus.enable_version_checks",
|
||||
gettext_noop("Enables version checks during CREATE/ALTER EXTENSION commands"),
|
||||
|
|
|
@ -26,6 +26,9 @@ extern bool AddAllLocalTablesToMetadata;
|
|||
/* controlled via GUC, should be accessed via EnableLocalReferenceForeignKeys() */
|
||||
extern bool EnableLocalReferenceForeignKeys;
|
||||
|
||||
extern bool EnableUnsafeTriggers;
|
||||
|
||||
|
||||
extern void SwitchToSequentialAndLocalExecutionIfRelationNameTooLong(Oid relationId,
|
||||
char *
|
||||
finalRelationName);
|
||||
|
@ -580,16 +583,19 @@ extern List * PostprocessAlterTriggerRenameStmt(Node *node, const char *queryStr
|
|||
extern void AlterTriggerRenameEventExtendNames(RenameStmt *renameTriggerStmt,
|
||||
char *schemaName, uint64 shardId);
|
||||
extern List * PostprocessAlterTriggerDependsStmt(Node *node, const char *queryString);
|
||||
extern List * PreprocessAlterTriggerDependsStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext
|
||||
processUtilityContext);
|
||||
extern void AlterTriggerDependsEventExtendNames(
|
||||
AlterObjectDependsStmt *alterTriggerDependsStmt,
|
||||
char *schemaName, uint64 shardId);
|
||||
extern void ErrorOutForTriggerIfNotSupported(Oid relationId);
|
||||
extern List * PreprocessDropTriggerStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
extern void ErrorOutForTriggerIfNotCitusLocalTable(Oid relationId);
|
||||
extern void DropTriggerEventExtendNames(DropStmt *dropTriggerStmt, char *schemaName,
|
||||
uint64 shardId);
|
||||
extern List * CitusLocalTableTriggerCommandDDLJob(Oid relationId, char *triggerName,
|
||||
const char *queryString);
|
||||
extern List * CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName,
|
||||
const char *queryString);
|
||||
extern Oid GetTriggerFunctionId(Oid triggerId);
|
||||
|
||||
/* cascade_table_operation_for_connected_relations.c */
|
||||
|
|
|
@ -161,6 +161,7 @@ class CitusDefaultClusterConfig(CitusBaseClusterConfig):
|
|||
new_settings = {
|
||||
"client_min_messages": "WARNING",
|
||||
"citus.sort_returning": True,
|
||||
"citus.use_citus_managed_tables": True,
|
||||
}
|
||||
self.settings.update(new_settings)
|
||||
self.add_coordinator_to_metadata = True
|
||||
|
@ -291,6 +292,7 @@ class CitusUnusualQuerySettingsConfig(CitusMXBaseClusterConfig):
|
|||
def __init__(self, arguments):
|
||||
super().__init__(arguments)
|
||||
self.new_settings = {
|
||||
"citus.use_citus_managed_tables": False,
|
||||
"citus.task_assignment_policy": "first-replica",
|
||||
"citus.enable_fast_path_router_planner": False,
|
||||
"citus.enable_local_execution": False,
|
||||
|
|
|
@ -194,29 +194,29 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
|
|||
BEFORE INSERT ON "interesting!schema"."citus_local!_table"
|
||||
FOR EACH STATEMENT EXECUTE FUNCTION dummy_function();')
|
||||
CREATE EXTENSION seg;
|
||||
-- ALTER TRIGGER DEPENDS ON
|
||||
ALTER TRIGGER "trigger\'name" ON "interesting!schema"."citus_local!_table" DEPENDS ON EXTENSION seg;
|
||||
ERROR: Triggers "trigger\'name" on distributed tables and local tables added to metadata are not allowed to depend on an extension
|
||||
BEGIN;
|
||||
-- ALTER TRIGGER DEPENDS ON
|
||||
ALTER TRIGGER "trigger\'name" ON "interesting!schema"."citus_local!_table" DEPENDS ON EXTENSION seg;
|
||||
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507008, 'interesting!schema', E'ALTER TRIGGER "trigger\\''name" ON "interesting!schema"."citus_local!_table" DEPENDS ON EXTENSION seg;')
|
||||
-- show that triggers on both shell relation and shard relation are depending on seg
|
||||
-- show that triggers on both shell relation and shard relation are not depending on seg
|
||||
SELECT tgname FROM pg_depend, pg_trigger, pg_extension
|
||||
WHERE deptype = 'x' and classid='pg_trigger'::regclass and
|
||||
pg_trigger.oid=pg_depend.objid and extname='seg'
|
||||
ORDER BY 1;
|
||||
tgname
|
||||
tgname
|
||||
---------------------------------------------------------------------
|
||||
trigger\'name
|
||||
trigger\'name_1507008
|
||||
(2 rows)
|
||||
(0 rows)
|
||||
|
||||
DROP EXTENSION seg;
|
||||
-- show that dropping extension drops the triggers automatically
|
||||
-- show that dropping extension doesn't drop the triggers automatically
|
||||
SELECT * FROM citus_local_table_triggers
|
||||
WHERE tgname NOT LIKE 'RI_ConstraintTrigger%';
|
||||
tgname | tgrelid | tgenabled
|
||||
tgname | tgrelid | tgenabled
|
||||
---------------------------------------------------------------------
|
||||
truncate_trigger_xxxxxxx | "interesting!schema"."citus_local!_table" | O
|
||||
(1 row)
|
||||
trigger\'name | "interesting!schema"."citus_local!_table" | O
|
||||
trigger\'name_1507008 | "interesting!schema"."citus_local!_table_1507008" | O
|
||||
truncate_trigger_xxxxxxx | "interesting!schema"."citus_local!_table" | O
|
||||
(3 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- ALTER TRIGGER RENAME
|
||||
|
|
|
@ -51,25 +51,22 @@ SELECT 1 FROM citus_activate_node('localhost', :worker_1_port);
|
|||
1
|
||||
(1 row)
|
||||
|
||||
-- show that the trigger is not allowed to depend(explicitly) on an extension.
|
||||
CREATE EXTENSION seg;
|
||||
ALTER TRIGGER dummy_function_trigger ON citus_local_table DEPENDS ON EXTENSION seg;
|
||||
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1508000, 'citus_local_tables_mx', 'ALTER TRIGGER dummy_function_trigger ON citus_local_table DEPENDS ON EXTENSION seg;')
|
||||
ERROR: Triggers "dummy_function_trigger" on distributed tables and local tables added to metadata are not allowed to depend on an extension
|
||||
ALTER TRIGGER dummy_function_trigger ON citus_local_table RENAME TO renamed_trigger;
|
||||
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1508000, 'citus_local_tables_mx', 'ALTER TRIGGER dummy_function_trigger ON citus_local_table RENAME TO renamed_trigger;')
|
||||
ALTER TABLE citus_local_table DISABLE TRIGGER ALL;
|
||||
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1508000, 'citus_local_tables_mx', 'ALTER TABLE citus_local_table DISABLE TRIGGER ALL;')
|
||||
-- show that update trigger mx relation are depending on seg, renamed and disabled.
|
||||
-- show that update trigger mx relation is renamed and disabled.
|
||||
-- both workers should should print 1.
|
||||
SELECT run_command_on_workers(
|
||||
$$
|
||||
SELECT COUNT(*) FROM pg_depend, pg_trigger, pg_extension
|
||||
SELECT COUNT(*) FROM pg_trigger
|
||||
WHERE pg_trigger.tgrelid='citus_local_tables_mx.citus_local_table'::regclass AND
|
||||
pg_trigger.tgname='renamed_trigger' AND
|
||||
pg_trigger.tgenabled='D' AND
|
||||
pg_depend.classid='pg_trigger'::regclass AND
|
||||
pg_depend.deptype='x' AND
|
||||
pg_trigger.oid=pg_depend.objid AND
|
||||
pg_extension.extname='seg'
|
||||
pg_trigger.tgenabled='D'
|
||||
$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -34,11 +34,11 @@ SELECT create_reference_table('reference_table');
|
|||
CREATE TRIGGER update_value_dist
|
||||
AFTER INSERT ON distributed_table
|
||||
FOR EACH ROW EXECUTE FUNCTION update_value();
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false"
|
||||
CREATE TRIGGER update_value_ref
|
||||
AFTER INSERT ON reference_table
|
||||
FOR EACH ROW EXECUTE FUNCTION update_value();
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on reference tables
|
||||
---------------------------------------------------------------------
|
||||
-- show that we error out for trigger commands on distributed & reference tables
|
||||
---------------------------------------------------------------------
|
||||
|
@ -56,42 +56,42 @@ SET citus.enable_ddl_propagation to ON;
|
|||
CREATE EXTENSION seg;
|
||||
-- below all should error out
|
||||
ALTER TRIGGER update_value_dist ON distributed_table RENAME TO update_value_dist1;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false"
|
||||
ALTER TRIGGER update_value_dist ON distributed_table DEPENDS ON EXTENSION seg;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: Triggers "update_value_dist" on distributed tables and local tables added to metadata are not allowed to depend on an extension
|
||||
DROP TRIGGER update_value_dist ON distributed_table;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false"
|
||||
ALTER TABLE distributed_table DISABLE TRIGGER ALL;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false"
|
||||
ALTER TABLE distributed_table DISABLE TRIGGER USER;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false"
|
||||
ALTER TABLE distributed_table DISABLE TRIGGER update_value_dist;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false"
|
||||
ALTER TABLE distributed_table ENABLE TRIGGER ALL;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false"
|
||||
ALTER TABLE distributed_table ENABLE TRIGGER USER;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false"
|
||||
ALTER TABLE distributed_table ENABLE TRIGGER update_value_dist;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false"
|
||||
-- below all should error out
|
||||
ALTER TRIGGER update_value_ref ON reference_table RENAME TO update_value_ref1;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on reference tables
|
||||
ALTER TRIGGER update_value_ref ON reference_table DEPENDS ON EXTENSION seg;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: Triggers "update_value_ref" on distributed tables and local tables added to metadata are not allowed to depend on an extension
|
||||
DROP TRIGGER update_value_ref ON reference_table;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on reference tables
|
||||
ALTER TABLE reference_table DISABLE TRIGGER ALL;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on reference tables
|
||||
ALTER TABLE reference_table DISABLE TRIGGER USER;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on reference tables
|
||||
ALTER TABLE reference_table DISABLE TRIGGER update_value_ref;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on reference tables
|
||||
ALTER TABLE reference_table ENABLE TRIGGER ALL;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on reference tables
|
||||
ALTER TABLE reference_table ENABLE TRIGGER USER;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on reference tables
|
||||
ALTER TABLE reference_table ENABLE TRIGGER update_value_ref;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on reference tables
|
||||
---------------------------------------------------------------------
|
||||
-- show that we do not allow creating citus tables if the
|
||||
-- table has already triggers
|
||||
|
@ -106,9 +106,9 @@ AFTER INSERT ON reference_table_1
|
|||
FOR EACH ROW EXECUTE FUNCTION update_value();
|
||||
-- below two should fail
|
||||
SELECT create_distributed_table('distributed_table_1', 'value');
|
||||
ERROR: cannot distribute relation "distributed_table_1" because it has triggers
|
||||
ERROR: cannot distribute relation "distributed_table_1" because it has triggers and "citus.enable_unsafe_triggers" is set to "false"
|
||||
SELECT create_reference_table('reference_table_1');
|
||||
ERROR: cannot distribute relation "reference_table_1" because it has triggers
|
||||
ERROR: cannot distribute relation "reference_table_1" because it has triggers and "citus.enable_unsafe_triggers" is set to "false"
|
||||
---------------------------------------------------------------------
|
||||
-- test deparse logic for CREATE TRIGGER commands
|
||||
-- via master_get_table_ddl_events
|
||||
|
|
|
@ -346,7 +346,7 @@ BEGIN;
|
|||
FOR EACH ROW EXECUTE PROCEDURE update_value();
|
||||
-- show that we error out as we don't supprt triggers on distributed tables
|
||||
SELECT create_distributed_table('citus_local_table_6', 'col_1');
|
||||
ERROR: cannot distribute relation "citus_local_table_6" because it has triggers
|
||||
ERROR: cannot distribute relation "citus_local_table_6" because it has triggers and "citus.enable_unsafe_triggers" is set to "false"
|
||||
ROLLBACK;
|
||||
-- make sure that creating append / range distributed tables is also ok
|
||||
BEGIN;
|
||||
|
|
|
@ -0,0 +1,921 @@
|
|||
SET citus.log_remote_commands TO OFF;
|
||||
DROP SCHEMA IF EXISTS distributed_triggers CASCADE;
|
||||
NOTICE: schema "distributed_triggers" does not exist, skipping
|
||||
CREATE SCHEMA distributed_triggers;
|
||||
SET search_path TO 'distributed_triggers';
|
||||
SET citus.shard_replication_factor = 1;
|
||||
SET citus.shard_count = 32;
|
||||
SET citus.next_shard_id TO 800000;
|
||||
--
|
||||
-- Test citus.enable_unsafe_triggers
|
||||
-- Enables arbitrary triggers on distributed tables
|
||||
--
|
||||
CREATE TABLE data (
|
||||
shard_key_value text not null,
|
||||
object_id text not null,
|
||||
value jsonb not null
|
||||
);
|
||||
ALTER TABLE data
|
||||
ADD CONSTRAINT data_pk
|
||||
PRIMARY KEY (shard_key_value, object_id);
|
||||
/* table of changes */
|
||||
CREATE TABLE data_changes (
|
||||
shard_key_value text not null,
|
||||
object_id text not null,
|
||||
change_id bigint not null,
|
||||
change_time timestamptz default now(),
|
||||
operation_type text not null,
|
||||
new_value jsonb
|
||||
);
|
||||
ALTER TABLE data_changes
|
||||
ADD CONSTRAINT data_changes_pk
|
||||
PRIMARY KEY (shard_key_value, object_id, change_id);
|
||||
SELECT create_distributed_table('data', 'shard_key_value');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('data_changes', 'shard_key_value', colocate_with := 'data');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET citus.enable_unsafe_triggers TO true;
|
||||
SELECT run_command_on_workers('ALTER SYSTEM SET citus.enable_unsafe_triggers TO true;');
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,"ALTER SYSTEM")
|
||||
(localhost,57638,t,"ALTER SYSTEM")
|
||||
(2 rows)
|
||||
|
||||
SELECT run_command_on_workers('SELECT pg_reload_conf();');
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,t)
|
||||
(localhost,57638,t,t)
|
||||
(2 rows)
|
||||
|
||||
/* trigger function that is called after any change */
|
||||
CREATE OR REPLACE FUNCTION record_change()
|
||||
RETURNS trigger
|
||||
AS $$
|
||||
DECLARE
|
||||
last_change_id bigint;
|
||||
BEGIN
|
||||
IF (TG_OP = 'DELETE') THEN
|
||||
/* get the last change ID for object key in OLD via index(-only) scan */
|
||||
SELECT change_id INTO last_change_id
|
||||
FROM distributed_triggers.data_changes
|
||||
WHERE shard_key_value = OLD.shard_key_value AND object_id = OLD.object_id
|
||||
ORDER BY change_id DESC LIMIT 1;
|
||||
|
||||
/* insert a change record for the delete */
|
||||
INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type)
|
||||
VALUES (OLD.shard_key_value, OLD.object_id, COALESCE(last_change_id + 1, 1), TG_OP);
|
||||
ELSE
|
||||
/* get the last change ID for object key in NEW via index(-only) scan */
|
||||
SELECT change_id INTO last_change_id
|
||||
FROM distributed_triggers.data_changes
|
||||
WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id
|
||||
ORDER BY change_id DESC LIMIT 1;
|
||||
|
||||
/* insert a change record for the insert/update */
|
||||
INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value)
|
||||
VALUES (NEW.shard_key_value, NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value);
|
||||
END IF;
|
||||
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
SELECT proname from pg_proc WHERE oid='distributed_triggers.record_change'::regproc;
|
||||
proname
|
||||
---------------------------------------------------------------------
|
||||
record_change
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE oid='distributed_triggers.record_change'::regproc$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,1)
|
||||
(localhost,57638,t,1)
|
||||
(2 rows)
|
||||
|
||||
CREATE TRIGGER record_change_trigger
|
||||
AFTER INSERT OR UPDATE OR DELETE ON data
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_change();
|
||||
-- Trigger function should appear on workers
|
||||
SELECT proname from pg_proc WHERE oid='distributed_triggers.record_change'::regproc;
|
||||
proname
|
||||
---------------------------------------------------------------------
|
||||
record_change
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE oid='distributed_triggers.record_change'::regproc$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,1)
|
||||
(localhost,57638,t,1)
|
||||
(2 rows)
|
||||
|
||||
INSERT INTO data VALUES ('hello','world','{"hello":"world"}');
|
||||
INSERT INTO data VALUES ('hello2','world2','{"hello2":"world2"}');
|
||||
INSERT INTO data VALUES ('hello3','world3','{"hello3":"world3"}');
|
||||
DELETE FROM data where shard_key_value = 'hello';
|
||||
BEGIN;
|
||||
UPDATE data SET value = '{}' where shard_key_value = 'hello3';
|
||||
END;
|
||||
DELETE FROM data where shard_key_value = 'hello3';
|
||||
SELECT * FROM data
|
||||
ORDER BY shard_key_value, object_id;
|
||||
shard_key_value | object_id | value
|
||||
---------------------------------------------------------------------
|
||||
hello2 | world2 | {"hello2": "world2"}
|
||||
(1 row)
|
||||
|
||||
SELECT shard_key_value, object_id, change_id, operation_type, new_value
|
||||
FROM data_changes
|
||||
ORDER BY shard_key_value, object_id, change_id;
|
||||
shard_key_value | object_id | change_id | operation_type | new_value
|
||||
---------------------------------------------------------------------
|
||||
hello | world | 1 | INSERT | {"hello": "world"}
|
||||
hello | world | 2 | DELETE |
|
||||
hello2 | world2 | 1 | INSERT | {"hello2": "world2"}
|
||||
hello3 | world3 | 1 | INSERT | {"hello3": "world3"}
|
||||
hello3 | world3 | 2 | UPDATE | {}
|
||||
hello3 | world3 | 3 | DELETE |
|
||||
(6 rows)
|
||||
|
||||
CREATE FUNCTION insert_delete_document(key text, id text)
|
||||
RETURNS void LANGUAGE plpgsql AS $fn$
|
||||
BEGIN
|
||||
INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}');
|
||||
DELETE FROM distributed_triggers.data where shard_key_value = key;
|
||||
END;
|
||||
$fn$;
|
||||
SELECT create_distributed_function(
|
||||
'insert_delete_document(text, text)', 'key',
|
||||
colocate_with := 'data',
|
||||
force_delegation := true
|
||||
);
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT insert_delete_document('hello4', 'world4');
|
||||
insert_delete_document
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT insert_delete_document('hello4', 'world4');
|
||||
insert_delete_document
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
SELECT * FROM data
|
||||
ORDER BY shard_key_value, object_id;
|
||||
shard_key_value | object_id | value
|
||||
---------------------------------------------------------------------
|
||||
hello2 | world2 | {"hello2": "world2"}
|
||||
(1 row)
|
||||
|
||||
SELECT shard_key_value, object_id, change_id, operation_type, new_value
|
||||
FROM data_changes
|
||||
ORDER BY shard_key_value, object_id, change_id;
|
||||
shard_key_value | object_id | change_id | operation_type | new_value
|
||||
---------------------------------------------------------------------
|
||||
hello | world | 1 | INSERT | {"hello": "world"}
|
||||
hello | world | 2 | DELETE |
|
||||
hello2 | world2 | 1 | INSERT | {"hello2": "world2"}
|
||||
hello3 | world3 | 1 | INSERT | {"hello3": "world3"}
|
||||
hello3 | world3 | 2 | UPDATE | {}
|
||||
hello3 | world3 | 3 | DELETE |
|
||||
hello4 | world4 | 1 | INSERT | {"id1": "id2"}
|
||||
hello4 | world4 | 2 | DELETE |
|
||||
hello4 | world4 | 3 | INSERT | {"id1": "id2"}
|
||||
hello4 | world4 | 4 | DELETE |
|
||||
(10 rows)
|
||||
|
||||
SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'record_change_trigger%' ORDER BY 1,2;
|
||||
tgrelid | tgname
|
||||
---------------------------------------------------------------------
|
||||
data | record_change_trigger
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'record_change_trigger%';$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,17)
|
||||
(localhost,57638,t,17)
|
||||
(2 rows)
|
||||
|
||||
ALTER TRIGGER "record_change_trigger" ON "distributed_triggers"."data" RENAME TO "new_record_change_trigger";
|
||||
SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'record_change_trigger%' ORDER BY 1,2;
|
||||
tgrelid | tgname
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'new_record_change_trigger%' ORDER BY 1,2;
|
||||
tgrelid | tgname
|
||||
---------------------------------------------------------------------
|
||||
data | new_record_change_trigger
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'record_change_trigger%';$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,0)
|
||||
(localhost,57638,t,0)
|
||||
(2 rows)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'new_record_change_trigger%';$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,17)
|
||||
(localhost,57638,t,17)
|
||||
(2 rows)
|
||||
|
||||
--This should fail
|
||||
DROP TRIGGER record_change_trigger ON data;
|
||||
ERROR: trigger "record_change_trigger" for table "data" does not exist
|
||||
DROP TRIGGER new_record_change_trigger ON data;
|
||||
--Trigger should go away
|
||||
SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'new_record_change_trigger%' ORDER BY 1,2;
|
||||
tgrelid | tgname
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
--
|
||||
-- Run bad triggers
|
||||
--
|
||||
CREATE OR REPLACE FUNCTION bad_shardkey_record_change()
|
||||
RETURNS trigger
|
||||
AS $$
|
||||
DECLARE
|
||||
last_change_id bigint;
|
||||
BEGIN
|
||||
INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value)
|
||||
VALUES ('BAD', NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value);
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE TRIGGER bad_shardkey_record_change_trigger
|
||||
AFTER INSERT OR UPDATE OR DELETE ON data
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.bad_shardkey_record_change();
|
||||
-- Bad trigger fired from an individual SQL
|
||||
-- Query-on-distributed table exception should catch this
|
||||
INSERT INTO data VALUES ('hello6','world6','{"hello6":"world6"}');
|
||||
ERROR: cannot execute a distributed query from a query on a shard
|
||||
CONTEXT: SQL statement "INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value)
|
||||
VALUES ('BAD', NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value)"
|
||||
PL/pgSQL function bad_shardkey_record_change() line XX at SQL statement
|
||||
while executing command on localhost:xxxxx
|
||||
-- Bad trigger fired from SQL inside a force-delegated function
|
||||
-- Incorrect distribution key exception should catch this
|
||||
SELECT insert_delete_document('hello6', 'world6');
|
||||
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
|
||||
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
|
||||
CONTEXT: SQL statement "INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value)
|
||||
VALUES ('BAD', NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value)"
|
||||
PL/pgSQL function distributed_triggers.bad_shardkey_record_change() line XX at SQL statement
|
||||
SQL statement "INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}')"
|
||||
PL/pgSQL function distributed_triggers.insert_delete_document(text,text) line XX at SQL statement
|
||||
while executing command on localhost:xxxxx
|
||||
SELECT * FROM data
|
||||
ORDER BY shard_key_value, object_id;
|
||||
shard_key_value | object_id | value
|
||||
---------------------------------------------------------------------
|
||||
hello2 | world2 | {"hello2": "world2"}
|
||||
(1 row)
|
||||
|
||||
SELECT shard_key_value, object_id, change_id, operation_type, new_value
|
||||
FROM data_changes
|
||||
ORDER BY shard_key_value, object_id, change_id;
|
||||
shard_key_value | object_id | change_id | operation_type | new_value
|
||||
---------------------------------------------------------------------
|
||||
hello | world | 1 | INSERT | {"hello": "world"}
|
||||
hello | world | 2 | DELETE |
|
||||
hello2 | world2 | 1 | INSERT | {"hello2": "world2"}
|
||||
hello3 | world3 | 1 | INSERT | {"hello3": "world3"}
|
||||
hello3 | world3 | 2 | UPDATE | {}
|
||||
hello3 | world3 | 3 | DELETE |
|
||||
hello4 | world4 | 1 | INSERT | {"id1": "id2"}
|
||||
hello4 | world4 | 2 | DELETE |
|
||||
hello4 | world4 | 3 | INSERT | {"id1": "id2"}
|
||||
hello4 | world4 | 4 | DELETE |
|
||||
(10 rows)
|
||||
|
||||
DROP TRIGGER bad_shardkey_record_change_trigger ON data;
|
||||
CREATE OR REPLACE FUNCTION remote_shardkey_record_change()
|
||||
RETURNS trigger
|
||||
AS $$
|
||||
DECLARE
|
||||
last_change_id bigint;
|
||||
BEGIN
|
||||
UPDATE distributed_triggers.data_changes SET operation_type = TG_OP;
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE TRIGGER remote_shardkey_record_change_trigger
|
||||
AFTER INSERT OR UPDATE OR DELETE ON data
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.remote_shardkey_record_change();
|
||||
CREATE FUNCTION insert_document(key text, id text)
|
||||
RETURNS void LANGUAGE plpgsql AS $fn$
|
||||
BEGIN
|
||||
INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}');
|
||||
DELETE FROM distributed_triggers.data where shard_key_value = key;
|
||||
END;
|
||||
$fn$;
|
||||
SELECT create_distributed_function(
|
||||
'insert_document(text, text)', 'key',
|
||||
colocate_with := 'data',
|
||||
force_delegation := false
|
||||
);
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT insert_document('hello7', 'world7');
|
||||
ERROR: cannot execute a distributed query from a query on a shard
|
||||
CONTEXT: SQL statement "UPDATE distributed_triggers.data_changes SET operation_type = TG_OP"
|
||||
PL/pgSQL function remote_shardkey_record_change() line XX at SQL statement
|
||||
while executing command on localhost:xxxxx
|
||||
SQL statement "INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}')"
|
||||
PL/pgSQL function insert_document(text,text) line XX at SQL statement
|
||||
END;
|
||||
SELECT insert_document('hello7', 'world7');
|
||||
ERROR: cannot execute a distributed query from a query on a shard
|
||||
CONTEXT: SQL statement "UPDATE distributed_triggers.data_changes SET operation_type = TG_OP"
|
||||
PL/pgSQL function remote_shardkey_record_change() line XX at SQL statement
|
||||
SQL statement "INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}')"
|
||||
PL/pgSQL function insert_document(text,text) line XX at SQL statement
|
||||
while executing command on localhost:xxxxx
|
||||
SELECT * FROM data
|
||||
ORDER BY shard_key_value, object_id;
|
||||
shard_key_value | object_id | value
|
||||
---------------------------------------------------------------------
|
||||
hello2 | world2 | {"hello2": "world2"}
|
||||
(1 row)
|
||||
|
||||
SELECT shard_key_value, object_id, change_id, operation_type, new_value
|
||||
FROM data_changes
|
||||
ORDER BY shard_key_value, object_id, change_id;
|
||||
shard_key_value | object_id | change_id | operation_type | new_value
|
||||
---------------------------------------------------------------------
|
||||
hello | world | 1 | INSERT | {"hello": "world"}
|
||||
hello | world | 2 | DELETE |
|
||||
hello2 | world2 | 1 | INSERT | {"hello2": "world2"}
|
||||
hello3 | world3 | 1 | INSERT | {"hello3": "world3"}
|
||||
hello3 | world3 | 2 | UPDATE | {}
|
||||
hello3 | world3 | 3 | DELETE |
|
||||
hello4 | world4 | 1 | INSERT | {"id1": "id2"}
|
||||
hello4 | world4 | 2 | DELETE |
|
||||
hello4 | world4 | 3 | INSERT | {"id1": "id2"}
|
||||
hello4 | world4 | 4 | DELETE |
|
||||
(10 rows)
|
||||
|
||||
--
|
||||
-- Triggers (tables) which are not colocated
|
||||
--
|
||||
CREATE TABLE emptest (
|
||||
empname text NOT NULL,
|
||||
salary integer
|
||||
);
|
||||
CREATE TABLE emptest_audit(
|
||||
operation char(1) NOT NULL,
|
||||
stamp timestamp NOT NULL,
|
||||
userid text NOT NULL,
|
||||
empname text NOT NULL,
|
||||
salary integer
|
||||
);
|
||||
SELECT create_distributed_table('emptest','empname',colocate_with :='none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('emptest_audit','empname',colocate_with :='none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE OR REPLACE FUNCTION process_emp_audit() RETURNS TRIGGER AS $emp_audit$
|
||||
BEGIN
|
||||
--
|
||||
-- Create a row in emp_audit to reflect the operation performed on emp,
|
||||
-- making use of the special variable TG_OP to work out the operation.
|
||||
--
|
||||
IF (TG_OP = 'DELETE') THEN
|
||||
INSERT INTO distributed_triggers.emptest_audit SELECT 'D', now(), user, OLD.*;
|
||||
ELSIF (TG_OP = 'UPDATE') THEN
|
||||
INSERT INTO distributed_triggers.emptest_audit SELECT 'U', now(), user, NEW.*;
|
||||
ELSIF (TG_OP = 'INSERT') THEN
|
||||
INSERT INTO distributed_triggers.emptest_audit SELECT 'I', now(), user, NEW.*;
|
||||
END IF;
|
||||
RETURN NULL; -- result is ignored since this is an AFTER trigger
|
||||
END;
|
||||
$emp_audit$ LANGUAGE plpgsql;
|
||||
CREATE TRIGGER emptest_audit
|
||||
AFTER INSERT OR UPDATE OR DELETE ON emptest
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.process_emp_audit();
|
||||
INSERT INTO emptest VALUES ('test1', 1);
|
||||
INSERT INTO emptest VALUES ('test2', 1);
|
||||
INSERT INTO emptest VALUES ('test3', 1);
|
||||
INSERT INTO emptest VALUES ('test4', 1);
|
||||
SELECT operation, userid, empname, salary
|
||||
FROM emptest_audit
|
||||
ORDER BY 3,1;
|
||||
operation | userid | empname | salary
|
||||
---------------------------------------------------------------------
|
||||
I | postgres | test1 | 1
|
||||
I | postgres | test2 | 1
|
||||
I | postgres | test3 | 1
|
||||
I | postgres | test4 | 1
|
||||
(4 rows)
|
||||
|
||||
DELETE from emptest;
|
||||
SELECT operation, userid, empname, salary
|
||||
FROM emptest_audit
|
||||
ORDER BY 3,1;
|
||||
operation | userid | empname | salary
|
||||
---------------------------------------------------------------------
|
||||
D | postgres | test1 | 1
|
||||
I | postgres | test1 | 1
|
||||
D | postgres | test2 | 1
|
||||
I | postgres | test2 | 1
|
||||
D | postgres | test3 | 1
|
||||
I | postgres | test3 | 1
|
||||
D | postgres | test4 | 1
|
||||
I | postgres | test4 | 1
|
||||
(8 rows)
|
||||
|
||||
CREATE VIEW emp_triggers AS
|
||||
SELECT tgname, tgrelid::regclass, tgenabled
|
||||
FROM pg_trigger
|
||||
WHERE tgrelid::regclass::text like 'emptest%'
|
||||
ORDER BY 1, 2;
|
||||
SELECT * FROM emp_triggers ORDER BY 1,2;
|
||||
tgname | tgrelid | tgenabled
|
||||
---------------------------------------------------------------------
|
||||
emptest_audit | emptest | O
|
||||
truncate_trigger_xxxxxxx | emptest | O
|
||||
truncate_trigger_xxxxxxx | emptest_audit | O
|
||||
(3 rows)
|
||||
|
||||
-- Triggers "FOR EACH STATEMENT"
|
||||
CREATE TABLE record_op (
|
||||
empname text NOT NULL,
|
||||
operation_type text not null,
|
||||
stamp timestamp NOT NULL
|
||||
);
|
||||
SELECT create_distributed_table('record_op', 'empname', colocate_with := 'emptest');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE OR REPLACE FUNCTION record_emp() RETURNS TRIGGER AS $rec_audit$
|
||||
BEGIN
|
||||
INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now();
|
||||
RETURN NULL; -- result is ignored since this is an AFTER trigger
|
||||
END;
|
||||
$rec_audit$ LANGUAGE plpgsql;
|
||||
CREATE TRIGGER record_emp_trig
|
||||
AFTER INSERT OR UPDATE OR DELETE ON emptest
|
||||
FOR EACH STATEMENT EXECUTE FUNCTION distributed_triggers.record_emp();
|
||||
INSERT INTO emptest VALUES ('test5', 1);
|
||||
ERROR: cannot execute a distributed query from a query on a shard
|
||||
CONTEXT: SQL statement "INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now()"
|
||||
PL/pgSQL function record_emp() line XX at SQL statement
|
||||
while executing command on localhost:xxxxx
|
||||
DELETE FROM emptest;
|
||||
ERROR: cannot execute a distributed query from a query on a shard
|
||||
CONTEXT: SQL statement "INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now()"
|
||||
PL/pgSQL function distributed_triggers.record_emp() line XX at SQL statement
|
||||
while executing command on localhost:xxxxx
|
||||
SELECT * FROM emptest;
|
||||
empname | salary
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT operation_type FROM record_op;
|
||||
operation_type
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
--
|
||||
-- Triggers on reference tables
|
||||
--
|
||||
CREATE TABLE data_ref_table (
|
||||
shard_key_value text not null,
|
||||
object_id text not null,
|
||||
value jsonb not null
|
||||
);
|
||||
ALTER TABLE data_ref_table
|
||||
ADD CONSTRAINT data_ref_pk
|
||||
PRIMARY KEY (shard_key_value, object_id);
|
||||
SELECT create_reference_table('data_ref_table');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Trigger function record_change operates on data_changes which is *not* colocated with the reference table
|
||||
CREATE TRIGGER record_change_trigger
|
||||
AFTER INSERT OR UPDATE OR DELETE ON data_ref_table
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_change();
|
||||
TRUNCATE TABLE data_changes;
|
||||
INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}');
|
||||
ERROR: cannot execute a distributed query from a query on a shard
|
||||
CONTEXT: SQL statement "SELECT change_id FROM distributed_triggers.data_changes
|
||||
WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id
|
||||
ORDER BY change_id DESC LIMIT 1"
|
||||
PL/pgSQL function record_change() line XX at SQL statement
|
||||
while executing command on localhost:xxxxx
|
||||
INSERT INTO data_ref_table VALUES ('hello2','world2','{"ref":"table"}');
|
||||
ERROR: cannot execute a distributed query from a query on a shard
|
||||
CONTEXT: SQL statement "SELECT change_id FROM distributed_triggers.data_changes
|
||||
WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id
|
||||
ORDER BY change_id DESC LIMIT 1"
|
||||
PL/pgSQL function record_change() line XX at SQL statement
|
||||
while executing command on localhost:xxxxx
|
||||
DELETE FROM data_ref_table where shard_key_value = 'hello';
|
||||
BEGIN;
|
||||
UPDATE data_ref_table SET value = '{}' where shard_key_value = 'hello2';
|
||||
END;
|
||||
TABLE data_changes ORDER BY shard_key_value, object_id, change_id;
|
||||
shard_key_value | object_id | change_id | change_time | operation_type | new_value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
TABLE data_ref_table ORDER BY shard_key_value, object_id;
|
||||
shard_key_value | object_id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- Colocate data_changes table with reference table
|
||||
SELECT undistribute_table('data_changes');
|
||||
NOTICE: creating a new table for distributed_triggers.data_changes
|
||||
NOTICE: moving the data of distributed_triggers.data_changes
|
||||
NOTICE: dropping the old distributed_triggers.data_changes
|
||||
NOTICE: renaming the new table to distributed_triggers.data_changes
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_reference_table('data_changes');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}');
|
||||
ERROR: cannot execute a distributed query from a query on a shard
|
||||
CONTEXT: SQL statement "INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value)
|
||||
VALUES (NEW.shard_key_value, NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value)"
|
||||
PL/pgSQL function record_change() line XX at SQL statement
|
||||
while executing command on localhost:xxxxx
|
||||
TABLE data_changes ORDER BY shard_key_value, object_id, change_id;
|
||||
shard_key_value | object_id | change_id | change_time | operation_type | new_value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
TABLE data_ref_table ORDER BY shard_key_value, object_id;
|
||||
shard_key_value | object_id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- Create data_changes table locally with reference table
|
||||
DROP TABLE data_changes;
|
||||
/* table of changes local to each placement of the reference table */
|
||||
CREATE TABLE data_changes (
|
||||
shard_key_value text not null,
|
||||
object_id text not null,
|
||||
change_id bigint not null,
|
||||
change_time timestamptz default now(),
|
||||
operation_type text not null,
|
||||
new_value jsonb
|
||||
);
|
||||
SELECT run_command_on_workers($$CREATE TABLE distributed_triggers.data_changes(
|
||||
shard_key_value text not null,
|
||||
object_id text not null,
|
||||
change_id bigint not null,
|
||||
change_time timestamptz default now(),
|
||||
operation_type text not null,
|
||||
new_value jsonb);
|
||||
$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,"CREATE TABLE")
|
||||
(localhost,57638,t,"CREATE TABLE")
|
||||
(2 rows)
|
||||
|
||||
SELECT run_command_on_workers('SELECT count(*) FROM distributed_triggers.data_changes;');
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,0)
|
||||
(localhost,57638,t,0)
|
||||
(2 rows)
|
||||
|
||||
INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}');
|
||||
INSERT INTO data_ref_table VALUES ('hello2','world2','{"ref":"table"}');
|
||||
BEGIN;
|
||||
UPDATE data_ref_table SET value = '{}';
|
||||
END;
|
||||
SELECT run_command_on_workers('SELECT count(*) FROM distributed_triggers.data_changes;');
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,4)
|
||||
(localhost,57638,t,4)
|
||||
(2 rows)
|
||||
|
||||
TABLE data_ref_table ORDER BY shard_key_value, object_id;
|
||||
shard_key_value | object_id | value
|
||||
---------------------------------------------------------------------
|
||||
hello | world | {}
|
||||
hello2 | world2 | {}
|
||||
(2 rows)
|
||||
|
||||
--
|
||||
--Triggers on partitioned tables
|
||||
--
|
||||
CREATE TABLE sale(sale_date date not null, state_code text, product_sku text, units integer)
|
||||
PARTITION BY list (state_code);
|
||||
ALTER TABLE sale ADD CONSTRAINT sale_pk PRIMARY KEY (state_code, sale_date);
|
||||
CREATE TABLE sale_newyork PARTITION OF sale FOR VALUES IN ('NY');
|
||||
CREATE TABLE sale_california PARTITION OF sale FOR VALUES IN ('CA');
|
||||
CREATE TABLE record_sale(operation_type text not null, product_sku text, state_code text);
|
||||
SELECT create_distributed_table('sale', 'state_code');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('record_sale', 'state_code', colocate_with := 'sale');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE OR REPLACE FUNCTION record_sale()
|
||||
RETURNS trigger
|
||||
AS $$
|
||||
BEGIN
|
||||
INSERT INTO distributed_triggers.record_sale(operation_type, product_sku, state_code)
|
||||
VALUES (TG_OP, NEW.product_sku, NEW.state_code);
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE TRIGGER record_sale_trigger
|
||||
AFTER INSERT OR UPDATE OR DELETE ON sale
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_sale();
|
||||
INSERT INTO sale(sale_date,state_code,product_sku,units) VALUES
|
||||
('2019-01-01', 'CA', 'AZ-000A1', 85),
|
||||
('2019-01-02', 'CA', 'AZ-000A1', 6),
|
||||
('2019-01-03', 'NY', 'AZ-000A2', 83),
|
||||
('2019-02-01', 'CA', 'AZ-000A2', 59),
|
||||
('2019-02-02', 'CA', 'AZ-000A1', 9),
|
||||
('2019-02-03', 'NY', 'AZ-000A1', 47);
|
||||
TABLE sale ORDER BY state_code, sale_date;
|
||||
sale_date | state_code | product_sku | units
|
||||
---------------------------------------------------------------------
|
||||
01-01-2019 | CA | AZ-000A1 | 85
|
||||
01-02-2019 | CA | AZ-000A1 | 6
|
||||
02-01-2019 | CA | AZ-000A2 | 59
|
||||
02-02-2019 | CA | AZ-000A1 | 9
|
||||
01-03-2019 | NY | AZ-000A2 | 83
|
||||
02-03-2019 | NY | AZ-000A1 | 47
|
||||
(6 rows)
|
||||
|
||||
TABLE record_sale ORDER BY 1,2,3;
|
||||
operation_type | product_sku | state_code
|
||||
---------------------------------------------------------------------
|
||||
INSERT | AZ-000A1 | CA
|
||||
INSERT | AZ-000A1 | CA
|
||||
INSERT | AZ-000A1 | CA
|
||||
INSERT | AZ-000A1 | NY
|
||||
INSERT | AZ-000A2 | CA
|
||||
INSERT | AZ-000A2 | NY
|
||||
(6 rows)
|
||||
|
||||
--
|
||||
--Test ALTER TRIGGER
|
||||
--
|
||||
CREATE VIEW sale_triggers AS
|
||||
SELECT tgname, tgrelid::regclass, tgenabled
|
||||
FROM pg_trigger
|
||||
WHERE tgrelid::regclass::text like 'sale%'
|
||||
ORDER BY 1, 2;
|
||||
SELECT * FROM sale_triggers ORDER BY 1,2;
|
||||
tgname | tgrelid | tgenabled
|
||||
---------------------------------------------------------------------
|
||||
record_sale_trigger | sale | O
|
||||
record_sale_trigger | sale_newyork | O
|
||||
record_sale_trigger | sale_california | O
|
||||
truncate_trigger_xxxxxxx | sale | O
|
||||
truncate_trigger_xxxxxxx | sale_california | O
|
||||
truncate_trigger_xxxxxxx | sale_newyork | O
|
||||
(6 rows)
|
||||
|
||||
ALTER TRIGGER "record_sale_trigger" ON "distributed_triggers"."sale" RENAME TO "new_record_sale_trigger";
|
||||
SELECT * FROM sale_triggers ORDER BY 1,2;
|
||||
tgname | tgrelid | tgenabled
|
||||
---------------------------------------------------------------------
|
||||
new_record_sale_trigger | sale | O
|
||||
record_sale_trigger | sale_newyork | O
|
||||
record_sale_trigger | sale_california | O
|
||||
truncate_trigger_xxxxxxx | sale | O
|
||||
truncate_trigger_xxxxxxx | sale_california | O
|
||||
truncate_trigger_xxxxxxx | sale_newyork | O
|
||||
(6 rows)
|
||||
|
||||
CREATE EXTENSION seg;
|
||||
ALTER TRIGGER "emptest_audit" ON "emptest" DEPENDS ON EXTENSION seg;
|
||||
ERROR: Triggers "emptest_audit" on distributed tables and local tables added to metadata are not allowed to depend on an extension
|
||||
DETAIL: Triggers from extensions are expected to be created on the workers by the extension they depend on.
|
||||
DROP TABLE data_ref_table;
|
||||
--
|
||||
--Triggers with add/remove node
|
||||
--
|
||||
SELECT * FROM master_drain_node('localhost', :worker_2_port);
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||
master_drain_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT 1 from master_remove_node('localhost', :worker_2_port);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE distributed_table(value int);
|
||||
CREATE TABLE distributed_table_change(value int);
|
||||
SELECT create_distributed_table('distributed_table', 'value', colocate_with => 'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('distributed_table_change', 'value', colocate_with => 'distributed_table');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE FUNCTION insert_99() RETURNS trigger AS $insert_99$
|
||||
BEGIN
|
||||
INSERT INTO distributed_triggers.distributed_table_change VALUES (99);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$insert_99$ LANGUAGE plpgsql;
|
||||
CREATE TRIGGER insert_99_trigger
|
||||
AFTER DELETE ON distributed_table
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.insert_99();
|
||||
SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'insert_99_trigger%' ORDER BY 1,2;
|
||||
tgrelid | tgname
|
||||
---------------------------------------------------------------------
|
||||
distributed_table | insert_99_trigger
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'insert_99_trigger%'$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,33)
|
||||
(1 row)
|
||||
|
||||
INSERT INTO distributed_table VALUES (99);
|
||||
DELETE FROM distributed_table;
|
||||
SELECT * FROM distributed_table_change;
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
99
|
||||
(1 row)
|
||||
|
||||
-- add the node back
|
||||
SELECT 1 from master_add_node('localhost', :worker_2_port);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
INSERT INTO distributed_table VALUES (99);
|
||||
DELETE FROM distributed_table;
|
||||
SELECT * FROM distributed_table_change;
|
||||
value
|
||||
---------------------------------------------------------------------
|
||||
99
|
||||
99
|
||||
(2 rows)
|
||||
|
||||
SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'insert_99_trigger%' ORDER BY 1,2;
|
||||
tgrelid | tgname
|
||||
---------------------------------------------------------------------
|
||||
distributed_table | insert_99_trigger
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'insert_99_trigger%'$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,33)
|
||||
(localhost,57638,t,1)
|
||||
(2 rows)
|
||||
|
||||
RESET client_min_messages;
|
||||
SET citus.enable_unsafe_triggers TO false;
|
||||
SELECT run_command_on_workers('ALTER SYSTEM SET citus.enable_unsafe_triggers TO false;');
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,"ALTER SYSTEM")
|
||||
(localhost,57638,t,"ALTER SYSTEM")
|
||||
(2 rows)
|
||||
|
||||
SELECT run_command_on_workers('SELECT pg_reload_conf();');
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,t)
|
||||
(localhost,57638,t,t)
|
||||
(2 rows)
|
||||
|
||||
SET citus.log_remote_commands TO off;
|
||||
DROP SCHEMA distributed_triggers CASCADE;
|
||||
NOTICE: drop cascades to 21 other objects
|
||||
DETAIL: drop cascades to table data
|
||||
drop cascades to function record_change()
|
||||
drop cascades to function insert_delete_document(text,text)
|
||||
drop cascades to function bad_shardkey_record_change()
|
||||
drop cascades to function remote_shardkey_record_change()
|
||||
drop cascades to function insert_document(text,text)
|
||||
drop cascades to table emptest
|
||||
drop cascades to table emptest_audit
|
||||
drop cascades to function process_emp_audit()
|
||||
drop cascades to view emp_triggers
|
||||
drop cascades to table record_op
|
||||
drop cascades to function record_emp()
|
||||
drop cascades to table data_changes
|
||||
drop cascades to table sale
|
||||
drop cascades to table record_sale
|
||||
drop cascades to function record_sale()
|
||||
drop cascades to view sale_triggers
|
||||
drop cascades to extension seg
|
||||
drop cascades to table distributed_table
|
||||
drop cascades to table distributed_table_change
|
||||
drop cascades to function insert_99()
|
|
@ -30,6 +30,7 @@ test: propagate_extension_commands
|
|||
test: escape_extension_name
|
||||
test: ref_citus_local_fkeys
|
||||
test: alter_database_owner
|
||||
test: distributed_triggers
|
||||
|
||||
test: multi_test_catalog_views
|
||||
test: multi_table_ddl
|
||||
|
|
|
@ -1007,7 +1007,7 @@ SELECT value, count(*) FROM trigger_table GROUP BY value ORDER BY value;
|
|||
(1 row)
|
||||
|
||||
ALTER TABLE trigger_table DISABLE TRIGGER ALL;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false"
|
||||
INSERT INTO trigger_table VALUES (1, 'trigger disabled');
|
||||
SELECT value, count(*) FROM trigger_table GROUP BY value ORDER BY value;
|
||||
value | count
|
||||
|
@ -1016,7 +1016,7 @@ SELECT value, count(*) FROM trigger_table GROUP BY value ORDER BY value;
|
|||
(1 row)
|
||||
|
||||
ALTER TABLE trigger_table ENABLE TRIGGER ALL;
|
||||
ERROR: triggers are only supported for local tables added to metadata
|
||||
ERROR: triggers are not supported on distributed tables when "citus.enable_unsafe_triggers" is set to "false"
|
||||
INSERT INTO trigger_table VALUES (1, 'trigger disabled');
|
||||
SELECT value, count(*) FROM trigger_table GROUP BY value ORDER BY value;
|
||||
value | count
|
||||
|
|
|
@ -161,12 +161,11 @@ BEFORE INSERT ON "interesting!schema"."citus_local!_table"
|
|||
FOR EACH STATEMENT EXECUTE FUNCTION dummy_function();
|
||||
|
||||
CREATE EXTENSION seg;
|
||||
-- ALTER TRIGGER DEPENDS ON
|
||||
ALTER TRIGGER "trigger\'name" ON "interesting!schema"."citus_local!_table" DEPENDS ON EXTENSION seg;
|
||||
|
||||
BEGIN;
|
||||
-- ALTER TRIGGER DEPENDS ON
|
||||
ALTER TRIGGER "trigger\'name" ON "interesting!schema"."citus_local!_table" DEPENDS ON EXTENSION seg;
|
||||
|
||||
-- show that triggers on both shell relation and shard relation are depending on seg
|
||||
-- show that triggers on both shell relation and shard relation are not depending on seg
|
||||
SELECT tgname FROM pg_depend, pg_trigger, pg_extension
|
||||
WHERE deptype = 'x' and classid='pg_trigger'::regclass and
|
||||
pg_trigger.oid=pg_depend.objid and extname='seg'
|
||||
|
@ -174,7 +173,7 @@ BEGIN;
|
|||
|
||||
DROP EXTENSION seg;
|
||||
|
||||
-- show that dropping extension drops the triggers automatically
|
||||
-- show that dropping extension doesn't drop the triggers automatically
|
||||
SELECT * FROM citus_local_table_triggers
|
||||
WHERE tgname NOT LIKE 'RI_ConstraintTrigger%';
|
||||
ROLLBACK;
|
||||
|
|
|
@ -37,22 +37,19 @@ FOR EACH ROW EXECUTE FUNCTION dummy_function();
|
|||
-- the function that trigger needs in mx workers too.
|
||||
SELECT 1 FROM citus_activate_node('localhost', :worker_1_port);
|
||||
|
||||
-- show that the trigger is not allowed to depend(explicitly) on an extension.
|
||||
CREATE EXTENSION seg;
|
||||
ALTER TRIGGER dummy_function_trigger ON citus_local_table DEPENDS ON EXTENSION seg;
|
||||
ALTER TRIGGER dummy_function_trigger ON citus_local_table RENAME TO renamed_trigger;
|
||||
ALTER TABLE citus_local_table DISABLE TRIGGER ALL;
|
||||
-- show that update trigger mx relation are depending on seg, renamed and disabled.
|
||||
-- show that update trigger mx relation is renamed and disabled.
|
||||
-- both workers should should print 1.
|
||||
SELECT run_command_on_workers(
|
||||
$$
|
||||
SELECT COUNT(*) FROM pg_depend, pg_trigger, pg_extension
|
||||
SELECT COUNT(*) FROM pg_trigger
|
||||
WHERE pg_trigger.tgrelid='citus_local_tables_mx.citus_local_table'::regclass AND
|
||||
pg_trigger.tgname='renamed_trigger' AND
|
||||
pg_trigger.tgenabled='D' AND
|
||||
pg_depend.classid='pg_trigger'::regclass AND
|
||||
pg_depend.deptype='x' AND
|
||||
pg_trigger.oid=pg_depend.objid AND
|
||||
pg_extension.extname='seg'
|
||||
pg_trigger.tgenabled='D'
|
||||
$$);
|
||||
|
||||
CREATE FUNCTION another_dummy_function() RETURNS trigger AS $another_dummy_function$
|
||||
|
|
|
@ -0,0 +1,470 @@
|
|||
SET citus.log_remote_commands TO OFF;
|
||||
DROP SCHEMA IF EXISTS distributed_triggers CASCADE;
|
||||
CREATE SCHEMA distributed_triggers;
|
||||
SET search_path TO 'distributed_triggers';
|
||||
SET citus.shard_replication_factor = 1;
|
||||
SET citus.shard_count = 32;
|
||||
SET citus.next_shard_id TO 800000;
|
||||
|
||||
--
|
||||
-- Test citus.enable_unsafe_triggers
|
||||
-- Enables arbitrary triggers on distributed tables
|
||||
--
|
||||
CREATE TABLE data (
|
||||
shard_key_value text not null,
|
||||
object_id text not null,
|
||||
value jsonb not null
|
||||
);
|
||||
ALTER TABLE data
|
||||
ADD CONSTRAINT data_pk
|
||||
PRIMARY KEY (shard_key_value, object_id);
|
||||
|
||||
/* table of changes */
|
||||
CREATE TABLE data_changes (
|
||||
shard_key_value text not null,
|
||||
object_id text not null,
|
||||
change_id bigint not null,
|
||||
change_time timestamptz default now(),
|
||||
operation_type text not null,
|
||||
new_value jsonb
|
||||
);
|
||||
ALTER TABLE data_changes
|
||||
ADD CONSTRAINT data_changes_pk
|
||||
PRIMARY KEY (shard_key_value, object_id, change_id);
|
||||
|
||||
SELECT create_distributed_table('data', 'shard_key_value');
|
||||
SELECT create_distributed_table('data_changes', 'shard_key_value', colocate_with := 'data');
|
||||
|
||||
SET citus.enable_unsafe_triggers TO true;
|
||||
SELECT run_command_on_workers('ALTER SYSTEM SET citus.enable_unsafe_triggers TO true;');
|
||||
SELECT run_command_on_workers('SELECT pg_reload_conf();');
|
||||
|
||||
/* trigger function that is called after any change */
|
||||
CREATE OR REPLACE FUNCTION record_change()
|
||||
RETURNS trigger
|
||||
AS $$
|
||||
DECLARE
|
||||
last_change_id bigint;
|
||||
BEGIN
|
||||
IF (TG_OP = 'DELETE') THEN
|
||||
/* get the last change ID for object key in OLD via index(-only) scan */
|
||||
SELECT change_id INTO last_change_id
|
||||
FROM distributed_triggers.data_changes
|
||||
WHERE shard_key_value = OLD.shard_key_value AND object_id = OLD.object_id
|
||||
ORDER BY change_id DESC LIMIT 1;
|
||||
|
||||
/* insert a change record for the delete */
|
||||
INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type)
|
||||
VALUES (OLD.shard_key_value, OLD.object_id, COALESCE(last_change_id + 1, 1), TG_OP);
|
||||
ELSE
|
||||
/* get the last change ID for object key in NEW via index(-only) scan */
|
||||
SELECT change_id INTO last_change_id
|
||||
FROM distributed_triggers.data_changes
|
||||
WHERE shard_key_value = NEW.shard_key_value AND object_id = NEW.object_id
|
||||
ORDER BY change_id DESC LIMIT 1;
|
||||
|
||||
/* insert a change record for the insert/update */
|
||||
INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value)
|
||||
VALUES (NEW.shard_key_value, NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value);
|
||||
END IF;
|
||||
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
SELECT proname from pg_proc WHERE oid='distributed_triggers.record_change'::regproc;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE oid='distributed_triggers.record_change'::regproc$$);
|
||||
|
||||
CREATE TRIGGER record_change_trigger
|
||||
AFTER INSERT OR UPDATE OR DELETE ON data
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_change();
|
||||
|
||||
-- Trigger function should appear on workers
|
||||
SELECT proname from pg_proc WHERE oid='distributed_triggers.record_change'::regproc;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE oid='distributed_triggers.record_change'::regproc$$);
|
||||
|
||||
INSERT INTO data VALUES ('hello','world','{"hello":"world"}');
|
||||
INSERT INTO data VALUES ('hello2','world2','{"hello2":"world2"}');
|
||||
INSERT INTO data VALUES ('hello3','world3','{"hello3":"world3"}');
|
||||
DELETE FROM data where shard_key_value = 'hello';
|
||||
BEGIN;
|
||||
UPDATE data SET value = '{}' where shard_key_value = 'hello3';
|
||||
END;
|
||||
DELETE FROM data where shard_key_value = 'hello3';
|
||||
|
||||
SELECT * FROM data
|
||||
ORDER BY shard_key_value, object_id;
|
||||
SELECT shard_key_value, object_id, change_id, operation_type, new_value
|
||||
FROM data_changes
|
||||
ORDER BY shard_key_value, object_id, change_id;
|
||||
|
||||
CREATE FUNCTION insert_delete_document(key text, id text)
|
||||
RETURNS void LANGUAGE plpgsql AS $fn$
|
||||
BEGIN
|
||||
INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}');
|
||||
DELETE FROM distributed_triggers.data where shard_key_value = key;
|
||||
END;
|
||||
$fn$;
|
||||
|
||||
SELECT create_distributed_function(
|
||||
'insert_delete_document(text, text)', 'key',
|
||||
colocate_with := 'data',
|
||||
force_delegation := true
|
||||
);
|
||||
|
||||
SELECT insert_delete_document('hello4', 'world4');
|
||||
BEGIN;
|
||||
SELECT insert_delete_document('hello4', 'world4');
|
||||
COMMIT;
|
||||
|
||||
SELECT * FROM data
|
||||
ORDER BY shard_key_value, object_id;
|
||||
SELECT shard_key_value, object_id, change_id, operation_type, new_value
|
||||
FROM data_changes
|
||||
ORDER BY shard_key_value, object_id, change_id;
|
||||
|
||||
SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'record_change_trigger%' ORDER BY 1,2;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'record_change_trigger%';$$);
|
||||
|
||||
ALTER TRIGGER "record_change_trigger" ON "distributed_triggers"."data" RENAME TO "new_record_change_trigger";
|
||||
|
||||
SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'record_change_trigger%' ORDER BY 1,2;
|
||||
SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'new_record_change_trigger%' ORDER BY 1,2;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'record_change_trigger%';$$);
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'new_record_change_trigger%';$$);
|
||||
|
||||
--This should fail
|
||||
DROP TRIGGER record_change_trigger ON data;
|
||||
DROP TRIGGER new_record_change_trigger ON data;
|
||||
--Trigger should go away
|
||||
SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'new_record_change_trigger%' ORDER BY 1,2;
|
||||
|
||||
--
|
||||
-- Run bad triggers
|
||||
--
|
||||
CREATE OR REPLACE FUNCTION bad_shardkey_record_change()
|
||||
RETURNS trigger
|
||||
AS $$
|
||||
DECLARE
|
||||
last_change_id bigint;
|
||||
BEGIN
|
||||
INSERT INTO distributed_triggers.data_changes (shard_key_value, object_id, change_id, operation_type, new_value)
|
||||
VALUES ('BAD', NEW.object_id, COALESCE(last_change_id + 1, 1), TG_OP, NEW.value);
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER bad_shardkey_record_change_trigger
|
||||
AFTER INSERT OR UPDATE OR DELETE ON data
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.bad_shardkey_record_change();
|
||||
|
||||
-- Bad trigger fired from an individual SQL
|
||||
-- Query-on-distributed table exception should catch this
|
||||
INSERT INTO data VALUES ('hello6','world6','{"hello6":"world6"}');
|
||||
|
||||
-- Bad trigger fired from SQL inside a force-delegated function
|
||||
-- Incorrect distribution key exception should catch this
|
||||
SELECT insert_delete_document('hello6', 'world6');
|
||||
|
||||
SELECT * FROM data
|
||||
ORDER BY shard_key_value, object_id;
|
||||
SELECT shard_key_value, object_id, change_id, operation_type, new_value
|
||||
FROM data_changes
|
||||
ORDER BY shard_key_value, object_id, change_id;
|
||||
DROP TRIGGER bad_shardkey_record_change_trigger ON data;
|
||||
|
||||
CREATE OR REPLACE FUNCTION remote_shardkey_record_change()
|
||||
RETURNS trigger
|
||||
AS $$
|
||||
DECLARE
|
||||
last_change_id bigint;
|
||||
BEGIN
|
||||
UPDATE distributed_triggers.data_changes SET operation_type = TG_OP;
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER remote_shardkey_record_change_trigger
|
||||
AFTER INSERT OR UPDATE OR DELETE ON data
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.remote_shardkey_record_change();
|
||||
|
||||
CREATE FUNCTION insert_document(key text, id text)
|
||||
RETURNS void LANGUAGE plpgsql AS $fn$
|
||||
BEGIN
|
||||
INSERT INTO distributed_triggers.data VALUES (key, id, '{"id1":"id2"}');
|
||||
DELETE FROM distributed_triggers.data where shard_key_value = key;
|
||||
END;
|
||||
$fn$;
|
||||
|
||||
SELECT create_distributed_function(
|
||||
'insert_document(text, text)', 'key',
|
||||
colocate_with := 'data',
|
||||
force_delegation := false
|
||||
);
|
||||
|
||||
BEGIN;
|
||||
SELECT insert_document('hello7', 'world7');
|
||||
END;
|
||||
|
||||
SELECT insert_document('hello7', 'world7');
|
||||
|
||||
SELECT * FROM data
|
||||
ORDER BY shard_key_value, object_id;
|
||||
SELECT shard_key_value, object_id, change_id, operation_type, new_value
|
||||
FROM data_changes
|
||||
ORDER BY shard_key_value, object_id, change_id;
|
||||
|
||||
--
|
||||
-- Triggers (tables) which are not colocated
|
||||
--
|
||||
CREATE TABLE emptest (
|
||||
empname text NOT NULL,
|
||||
salary integer
|
||||
);
|
||||
|
||||
CREATE TABLE emptest_audit(
|
||||
operation char(1) NOT NULL,
|
||||
stamp timestamp NOT NULL,
|
||||
userid text NOT NULL,
|
||||
empname text NOT NULL,
|
||||
salary integer
|
||||
);
|
||||
|
||||
SELECT create_distributed_table('emptest','empname',colocate_with :='none');
|
||||
SELECT create_distributed_table('emptest_audit','empname',colocate_with :='none');
|
||||
|
||||
CREATE OR REPLACE FUNCTION process_emp_audit() RETURNS TRIGGER AS $emp_audit$
|
||||
BEGIN
|
||||
--
|
||||
-- Create a row in emp_audit to reflect the operation performed on emp,
|
||||
-- making use of the special variable TG_OP to work out the operation.
|
||||
--
|
||||
IF (TG_OP = 'DELETE') THEN
|
||||
INSERT INTO distributed_triggers.emptest_audit SELECT 'D', now(), user, OLD.*;
|
||||
ELSIF (TG_OP = 'UPDATE') THEN
|
||||
INSERT INTO distributed_triggers.emptest_audit SELECT 'U', now(), user, NEW.*;
|
||||
ELSIF (TG_OP = 'INSERT') THEN
|
||||
INSERT INTO distributed_triggers.emptest_audit SELECT 'I', now(), user, NEW.*;
|
||||
END IF;
|
||||
RETURN NULL; -- result is ignored since this is an AFTER trigger
|
||||
END;
|
||||
$emp_audit$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER emptest_audit
|
||||
AFTER INSERT OR UPDATE OR DELETE ON emptest
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.process_emp_audit();
|
||||
|
||||
INSERT INTO emptest VALUES ('test1', 1);
|
||||
INSERT INTO emptest VALUES ('test2', 1);
|
||||
INSERT INTO emptest VALUES ('test3', 1);
|
||||
INSERT INTO emptest VALUES ('test4', 1);
|
||||
|
||||
SELECT operation, userid, empname, salary
|
||||
FROM emptest_audit
|
||||
ORDER BY 3,1;
|
||||
|
||||
DELETE from emptest;
|
||||
|
||||
SELECT operation, userid, empname, salary
|
||||
FROM emptest_audit
|
||||
ORDER BY 3,1;
|
||||
|
||||
CREATE VIEW emp_triggers AS
|
||||
SELECT tgname, tgrelid::regclass, tgenabled
|
||||
FROM pg_trigger
|
||||
WHERE tgrelid::regclass::text like 'emptest%'
|
||||
ORDER BY 1, 2;
|
||||
SELECT * FROM emp_triggers ORDER BY 1,2;
|
||||
|
||||
-- Triggers "FOR EACH STATEMENT"
|
||||
CREATE TABLE record_op (
|
||||
empname text NOT NULL,
|
||||
operation_type text not null,
|
||||
stamp timestamp NOT NULL
|
||||
);
|
||||
SELECT create_distributed_table('record_op', 'empname', colocate_with := 'emptest');
|
||||
CREATE OR REPLACE FUNCTION record_emp() RETURNS TRIGGER AS $rec_audit$
|
||||
BEGIN
|
||||
INSERT INTO distributed_triggers.record_op SELECT 'dummy', TG_OP, now();
|
||||
RETURN NULL; -- result is ignored since this is an AFTER trigger
|
||||
END;
|
||||
$rec_audit$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER record_emp_trig
|
||||
AFTER INSERT OR UPDATE OR DELETE ON emptest
|
||||
FOR EACH STATEMENT EXECUTE FUNCTION distributed_triggers.record_emp();
|
||||
|
||||
INSERT INTO emptest VALUES ('test5', 1);
|
||||
DELETE FROM emptest;
|
||||
SELECT * FROM emptest;
|
||||
SELECT operation_type FROM record_op;
|
||||
|
||||
--
|
||||
-- Triggers on reference tables
|
||||
--
|
||||
CREATE TABLE data_ref_table (
|
||||
shard_key_value text not null,
|
||||
object_id text not null,
|
||||
value jsonb not null
|
||||
);
|
||||
ALTER TABLE data_ref_table
|
||||
ADD CONSTRAINT data_ref_pk
|
||||
PRIMARY KEY (shard_key_value, object_id);
|
||||
SELECT create_reference_table('data_ref_table');
|
||||
|
||||
-- Trigger function record_change operates on data_changes which is *not* colocated with the reference table
|
||||
CREATE TRIGGER record_change_trigger
|
||||
AFTER INSERT OR UPDATE OR DELETE ON data_ref_table
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_change();
|
||||
|
||||
TRUNCATE TABLE data_changes;
|
||||
INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}');
|
||||
INSERT INTO data_ref_table VALUES ('hello2','world2','{"ref":"table"}');
|
||||
DELETE FROM data_ref_table where shard_key_value = 'hello';
|
||||
BEGIN;
|
||||
UPDATE data_ref_table SET value = '{}' where shard_key_value = 'hello2';
|
||||
END;
|
||||
TABLE data_changes ORDER BY shard_key_value, object_id, change_id;
|
||||
TABLE data_ref_table ORDER BY shard_key_value, object_id;
|
||||
|
||||
-- Colocate data_changes table with reference table
|
||||
SELECT undistribute_table('data_changes');
|
||||
SELECT create_reference_table('data_changes');
|
||||
|
||||
INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}');
|
||||
TABLE data_changes ORDER BY shard_key_value, object_id, change_id;
|
||||
TABLE data_ref_table ORDER BY shard_key_value, object_id;
|
||||
|
||||
-- Create data_changes table locally with reference table
|
||||
DROP TABLE data_changes;
|
||||
|
||||
/* table of changes local to each placement of the reference table */
|
||||
CREATE TABLE data_changes (
|
||||
shard_key_value text not null,
|
||||
object_id text not null,
|
||||
change_id bigint not null,
|
||||
change_time timestamptz default now(),
|
||||
operation_type text not null,
|
||||
new_value jsonb
|
||||
);
|
||||
SELECT run_command_on_workers($$CREATE TABLE distributed_triggers.data_changes(
|
||||
shard_key_value text not null,
|
||||
object_id text not null,
|
||||
change_id bigint not null,
|
||||
change_time timestamptz default now(),
|
||||
operation_type text not null,
|
||||
new_value jsonb);
|
||||
$$);
|
||||
|
||||
SELECT run_command_on_workers('SELECT count(*) FROM distributed_triggers.data_changes;');
|
||||
|
||||
INSERT INTO data_ref_table VALUES ('hello','world','{"ref":"table"}');
|
||||
INSERT INTO data_ref_table VALUES ('hello2','world2','{"ref":"table"}');
|
||||
BEGIN;
|
||||
UPDATE data_ref_table SET value = '{}';
|
||||
END;
|
||||
|
||||
SELECT run_command_on_workers('SELECT count(*) FROM distributed_triggers.data_changes;');
|
||||
TABLE data_ref_table ORDER BY shard_key_value, object_id;
|
||||
|
||||
--
|
||||
--Triggers on partitioned tables
|
||||
--
|
||||
CREATE TABLE sale(sale_date date not null, state_code text, product_sku text, units integer)
|
||||
PARTITION BY list (state_code);
|
||||
ALTER TABLE sale ADD CONSTRAINT sale_pk PRIMARY KEY (state_code, sale_date);
|
||||
CREATE TABLE sale_newyork PARTITION OF sale FOR VALUES IN ('NY');
|
||||
CREATE TABLE sale_california PARTITION OF sale FOR VALUES IN ('CA');
|
||||
|
||||
CREATE TABLE record_sale(operation_type text not null, product_sku text, state_code text);
|
||||
|
||||
SELECT create_distributed_table('sale', 'state_code');
|
||||
SELECT create_distributed_table('record_sale', 'state_code', colocate_with := 'sale');
|
||||
|
||||
CREATE OR REPLACE FUNCTION record_sale()
|
||||
RETURNS trigger
|
||||
AS $$
|
||||
BEGIN
|
||||
INSERT INTO distributed_triggers.record_sale(operation_type, product_sku, state_code)
|
||||
VALUES (TG_OP, NEW.product_sku, NEW.state_code);
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE TRIGGER record_sale_trigger
|
||||
AFTER INSERT OR UPDATE OR DELETE ON sale
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.record_sale();
|
||||
|
||||
INSERT INTO sale(sale_date,state_code,product_sku,units) VALUES
|
||||
('2019-01-01', 'CA', 'AZ-000A1', 85),
|
||||
('2019-01-02', 'CA', 'AZ-000A1', 6),
|
||||
('2019-01-03', 'NY', 'AZ-000A2', 83),
|
||||
('2019-02-01', 'CA', 'AZ-000A2', 59),
|
||||
('2019-02-02', 'CA', 'AZ-000A1', 9),
|
||||
('2019-02-03', 'NY', 'AZ-000A1', 47);
|
||||
|
||||
TABLE sale ORDER BY state_code, sale_date;
|
||||
TABLE record_sale ORDER BY 1,2,3;
|
||||
|
||||
--
|
||||
--Test ALTER TRIGGER
|
||||
--
|
||||
CREATE VIEW sale_triggers AS
|
||||
SELECT tgname, tgrelid::regclass, tgenabled
|
||||
FROM pg_trigger
|
||||
WHERE tgrelid::regclass::text like 'sale%'
|
||||
ORDER BY 1, 2;
|
||||
|
||||
SELECT * FROM sale_triggers ORDER BY 1,2;
|
||||
ALTER TRIGGER "record_sale_trigger" ON "distributed_triggers"."sale" RENAME TO "new_record_sale_trigger";
|
||||
SELECT * FROM sale_triggers ORDER BY 1,2;
|
||||
|
||||
CREATE EXTENSION seg;
|
||||
ALTER TRIGGER "emptest_audit" ON "emptest" DEPENDS ON EXTENSION seg;
|
||||
|
||||
DROP TABLE data_ref_table;
|
||||
--
|
||||
--Triggers with add/remove node
|
||||
--
|
||||
SELECT * FROM master_drain_node('localhost', :worker_2_port);
|
||||
SELECT 1 from master_remove_node('localhost', :worker_2_port);
|
||||
|
||||
CREATE TABLE distributed_table(value int);
|
||||
CREATE TABLE distributed_table_change(value int);
|
||||
|
||||
SELECT create_distributed_table('distributed_table', 'value', colocate_with => 'none');
|
||||
SELECT create_distributed_table('distributed_table_change', 'value', colocate_with => 'distributed_table');
|
||||
|
||||
CREATE FUNCTION insert_99() RETURNS trigger AS $insert_99$
|
||||
BEGIN
|
||||
INSERT INTO distributed_triggers.distributed_table_change VALUES (99);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$insert_99$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER insert_99_trigger
|
||||
AFTER DELETE ON distributed_table
|
||||
FOR EACH ROW EXECUTE FUNCTION distributed_triggers.insert_99();
|
||||
|
||||
SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'insert_99_trigger%' ORDER BY 1,2;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'insert_99_trigger%'$$);
|
||||
|
||||
INSERT INTO distributed_table VALUES (99);
|
||||
DELETE FROM distributed_table;
|
||||
SELECT * FROM distributed_table_change;
|
||||
|
||||
-- add the node back
|
||||
SELECT 1 from master_add_node('localhost', :worker_2_port);
|
||||
INSERT INTO distributed_table VALUES (99);
|
||||
DELETE FROM distributed_table;
|
||||
SELECT * FROM distributed_table_change;
|
||||
|
||||
SELECT tgrelid::regclass::text, tgname FROM pg_trigger WHERE tgname like 'insert_99_trigger%' ORDER BY 1,2;
|
||||
SELECT run_command_on_workers($$SELECT count(*) FROM pg_trigger WHERE tgname like 'insert_99_trigger%'$$);
|
||||
|
||||
RESET client_min_messages;
|
||||
SET citus.enable_unsafe_triggers TO false;
|
||||
SELECT run_command_on_workers('ALTER SYSTEM SET citus.enable_unsafe_triggers TO false;');
|
||||
SELECT run_command_on_workers('SELECT pg_reload_conf();');
|
||||
SET citus.log_remote_commands TO off;
|
||||
|
||||
DROP SCHEMA distributed_triggers CASCADE;
|
Loading…
Reference in New Issue