mirror of https://github.com/citusdata/citus.git
Implement methods to process & recreate triggers on citus tables
parent
5af64084ea
commit
6e6bc155a9
|
@ -0,0 +1,141 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
* trigger.c
|
||||||
|
*
|
||||||
|
* This file contains functions to create and process trigger objects on
|
||||||
|
* citus tables.
|
||||||
|
*
|
||||||
|
* Copyright (c) Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
#include "postgres.h"
|
||||||
|
#include "distributed/pg_version_constants.h"
|
||||||
|
|
||||||
|
#include "access/genam.h"
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_12
|
||||||
|
#include "access/table.h"
|
||||||
|
#else
|
||||||
|
#include "access/heapam.h"
|
||||||
|
#include "access/htup_details.h"
|
||||||
|
#endif
|
||||||
|
#include "catalog/indexing.h"
|
||||||
|
#include "catalog/namespace.h"
|
||||||
|
#include "catalog/pg_trigger.h"
|
||||||
|
#include "distributed/citus_ruleutils.h"
|
||||||
|
#include "distributed/commands.h"
|
||||||
|
#include "distributed/listutils.h"
|
||||||
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "utils/fmgroids.h"
|
||||||
|
#include "utils/lsyscache.h"
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetExplicitTriggerCommandList returns the list of DDL commands to create
|
||||||
|
* triggers that are explicitly created for the table with relationId. See
|
||||||
|
* comment of GetExplicitTriggerIdList function.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
GetExplicitTriggerCommandList(Oid relationId)
|
||||||
|
{
|
||||||
|
List *createTriggerCommandList = NIL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Set search_path to NIL so that all objects outside of pg_catalog will be
|
||||||
|
* schema-prefixed. pg_catalog will be added automatically when we call
|
||||||
|
* PushOverrideSearchPath(), since we set addCatalog to true;
|
||||||
|
*/
|
||||||
|
OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
|
||||||
|
overridePath->schemas = NIL;
|
||||||
|
overridePath->addCatalog = true;
|
||||||
|
PushOverrideSearchPath(overridePath);
|
||||||
|
|
||||||
|
List *triggerIdList = GetExplicitTriggerIdList(relationId);
|
||||||
|
|
||||||
|
Oid triggerId = InvalidOid;
|
||||||
|
foreach_oid(triggerId, triggerIdList)
|
||||||
|
{
|
||||||
|
char *createTriggerCommand = pg_get_triggerdef_command(triggerId);
|
||||||
|
|
||||||
|
createTriggerCommandList = lappend(createTriggerCommandList,
|
||||||
|
createTriggerCommand);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* revert back to original search_path */
|
||||||
|
PopOverrideSearchPath();
|
||||||
|
|
||||||
|
return createTriggerCommandList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetExplicitTriggerIdList returns a list of OIDs corresponding to the triggers
|
||||||
|
* that are explicitly created on the relation with relationId. That means,
|
||||||
|
* this function discards internal triggers implicitly created by postgres for
|
||||||
|
* foreign key constraint validation and the citus_truncate_trigger.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
GetExplicitTriggerIdList(Oid relationId)
|
||||||
|
{
|
||||||
|
List *triggerIdList = NIL;
|
||||||
|
|
||||||
|
Relation pgTrigger = heap_open(TriggerRelationId, AccessShareLock);
|
||||||
|
|
||||||
|
int scanKeyCount = 1;
|
||||||
|
ScanKeyData scanKey[1];
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_pg_trigger_tgrelid,
|
||||||
|
BTEqualStrategyNumber, F_OIDEQ, relationId);
|
||||||
|
|
||||||
|
bool useIndex = true;
|
||||||
|
SysScanDesc scanDescriptor = systable_beginscan(pgTrigger, TriggerRelidNameIndexId,
|
||||||
|
useIndex, NULL, scanKeyCount,
|
||||||
|
scanKey);
|
||||||
|
|
||||||
|
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
while (HeapTupleIsValid(heapTuple))
|
||||||
|
{
|
||||||
|
Form_pg_trigger triggerForm = (Form_pg_trigger) GETSTRUCT(heapTuple);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Note that we mark truncate trigger that we create on citus tables as
|
||||||
|
* internal. Hence, below we discard citus_truncate_trigger as well as
|
||||||
|
* the implicit triggers created by postgres for foreign key validation.
|
||||||
|
*/
|
||||||
|
if (!triggerForm->tgisinternal)
|
||||||
|
{
|
||||||
|
Oid triggerId = get_relation_trigger_oid_compat(heapTuple);
|
||||||
|
triggerIdList = lappend_oid(triggerIdList, triggerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan(scanDescriptor);
|
||||||
|
heap_close(pgTrigger, NoLock);
|
||||||
|
|
||||||
|
return triggerIdList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* get_relation_trigger_oid_compat returns OID of the trigger represented
|
||||||
|
* by the constraintForm, which is passed as an heapTuple. OID of the
|
||||||
|
* trigger is already stored in the triggerForm struct if major PostgreSQL
|
||||||
|
* version is 12. However, in the older versions, we should utilize
|
||||||
|
* HeapTupleGetOid to deduce that OID with no cost.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
get_relation_trigger_oid_compat(HeapTuple heapTuple)
|
||||||
|
{
|
||||||
|
Assert(HeapTupleIsValid(heapTuple));
|
||||||
|
|
||||||
|
Oid triggerOid = InvalidOid;
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_12
|
||||||
|
Form_pg_trigger triggerForm = (Form_pg_trigger) GETSTRUCT(heapTuple);
|
||||||
|
triggerOid = triggerForm->oid;
|
||||||
|
#else
|
||||||
|
triggerOid = HeapTupleGetOid(heapTuple);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
return triggerOid;
|
||||||
|
}
|
|
@ -7485,6 +7485,16 @@ get_tablesample_def(TableSampleClause *tablesample, deparse_context *context)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char *
|
||||||
|
pg_get_triggerdef_command(Oid triggerId)
|
||||||
|
{
|
||||||
|
Assert(OidIsValid(triggerId));
|
||||||
|
|
||||||
|
/* no need to have pretty SQL command */
|
||||||
|
bool prettyOutput = false;
|
||||||
|
return pg_get_triggerdef_worker(triggerId, prettyOutput);
|
||||||
|
}
|
||||||
|
|
||||||
static char *
|
static char *
|
||||||
pg_get_triggerdef_worker(Oid trigid, bool pretty)
|
pg_get_triggerdef_worker(Oid trigid, bool pretty)
|
||||||
{
|
{
|
||||||
|
|
|
@ -7485,6 +7485,16 @@ get_tablesample_def(TableSampleClause *tablesample, deparse_context *context)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char *
|
||||||
|
pg_get_triggerdef_command(Oid triggerId)
|
||||||
|
{
|
||||||
|
Assert(OidIsValid(triggerId));
|
||||||
|
|
||||||
|
/* no need to have pretty SQL command */
|
||||||
|
bool prettyOutput = false;
|
||||||
|
return pg_get_triggerdef_worker(triggerId, prettyOutput);
|
||||||
|
}
|
||||||
|
|
||||||
static char *
|
static char *
|
||||||
pg_get_triggerdef_worker(Oid trigid, bool pretty)
|
pg_get_triggerdef_worker(Oid trigid, bool pretty)
|
||||||
{
|
{
|
||||||
|
|
|
@ -522,7 +522,7 @@ ResolveRelationId(text *relationName, bool missingOk)
|
||||||
* DEFAULT clauses for columns getting their default values from a sequence.
|
* DEFAULT clauses for columns getting their default values from a sequence.
|
||||||
* These DDL commands are all palloced; and include the table's schema
|
* These DDL commands are all palloced; and include the table's schema
|
||||||
* definition, optional column storage and statistics definitions, and index
|
* definition, optional column storage and statistics definitions, and index
|
||||||
* and constraint definitions.
|
* constraint and trigger definitions.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
|
GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
|
||||||
|
@ -542,6 +542,9 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
|
||||||
List *policyCommands = CreatePolicyCommands(relationId);
|
List *policyCommands = CreatePolicyCommands(relationId);
|
||||||
tableDDLEventList = list_concat(tableDDLEventList, policyCommands);
|
tableDDLEventList = list_concat(tableDDLEventList, policyCommands);
|
||||||
|
|
||||||
|
List *triggerCommands = GetExplicitTriggerCommandList(relationId);
|
||||||
|
tableDDLEventList = list_concat(tableDDLEventList, triggerCommands);
|
||||||
|
|
||||||
return tableDDLEventList;
|
return tableDDLEventList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,6 +48,7 @@ extern void pg_get_query_def(Query *query, StringInfo buffer);
|
||||||
char * pg_get_rule_expr(Node *expression);
|
char * pg_get_rule_expr(Node *expression);
|
||||||
extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid,
|
extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid,
|
||||||
StringInfo buffer);
|
StringInfo buffer);
|
||||||
|
extern char * pg_get_triggerdef_command(Oid triggerId);
|
||||||
extern char * generate_relation_name(Oid relid, List *namespaces);
|
extern char * generate_relation_name(Oid relid, List *namespaces);
|
||||||
extern char * generate_qualified_relation_name(Oid relid);
|
extern char * generate_qualified_relation_name(Oid relid);
|
||||||
extern char * generate_operator_name(Oid operid, Oid arg1, Oid arg2);
|
extern char * generate_operator_name(Oid operid, Oid arg1, Oid arg2);
|
||||||
|
|
|
@ -274,6 +274,11 @@ extern ObjectWithArgs * ObjectWithArgsFromOid(Oid funcOid);
|
||||||
/* vacuum.c - forward declarations */
|
/* vacuum.c - forward declarations */
|
||||||
extern void PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
|
extern void PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
|
||||||
|
|
||||||
|
/* trigger.c - forward declarations */
|
||||||
|
extern List * GetExplicitTriggerCommandList(Oid relationId);
|
||||||
|
extern List * GetExplicitTriggerIdList(Oid relationId);
|
||||||
|
extern Oid get_relation_trigger_oid_compat(HeapTuple heapTuple);
|
||||||
|
|
||||||
extern bool ShouldPropagateSetCommand(VariableSetStmt *setStmt);
|
extern bool ShouldPropagateSetCommand(VariableSetStmt *setStmt);
|
||||||
extern void PostprocessVariableSetStmt(VariableSetStmt *setStmt, const char *setCommand);
|
extern void PostprocessVariableSetStmt(VariableSetStmt *setStmt, const char *setCommand);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue