Sait Talha Nisanci 2021-01-19 10:49:42 +03:00
parent 720008c918
commit bc747206d5
3 changed files with 359 additions and 104 deletions

View File

@ -9,17 +9,21 @@
#include "catalog/pg_operator.h" #include "catalog/pg_operator.h"
#include "catalog/pg_proc.h" #include "catalog/pg_proc.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "catalog/pg_trigger.h"
#include "catalog/toasting.h" #include "catalog/toasting.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "commands/tablecmds.h" #include "commands/tablecmds.h"
#include "commands/trigger.h" #include "commands/trigger.h"
#include "commands/view.h" #include "commands/view.h"
#include "distributed/commands.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/pg_cimv.h" #include "distributed/pg_cimv.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/security_utils.h" #include "distributed/security_utils.h"
#include "distributed/sequence_utils.h" #include "distributed/sequence_utils.h"
#include "distributed/worker_protocol.h"
#include "distributed/multi_executor.h"
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "executor/spi.h" #include "executor/spi.h"
#include "miscadmin.h" #include "miscadmin.h"
@ -50,6 +54,7 @@ typedef struct
typedef struct typedef struct
{ {
const CreateTableAsStmt *stmt; const CreateTableAsStmt *stmt;
const char* queryString;
MatViewCreateOptions *createOptions; MatViewCreateOptions *createOptions;
Form_pg_cimv formCimv; Form_pg_cimv formCimv;
RangeVar *baseTableName; RangeVar *baseTableName;
@ -62,6 +67,7 @@ typedef struct
RangeVar *landingTableNameQuoted; RangeVar *landingTableNameQuoted;
RangeVar *userViewNameQuoted; RangeVar *userViewNameQuoted;
RangeVar *refreshViewNameQuoted; RangeVar *refreshViewNameQuoted;
RangeVar* insertTable;
List *targetListEntries; List *targetListEntries;
List *groupTargetListEntries; List *groupTargetListEntries;
List *aggTargetListEntries; List *aggTargetListEntries;
@ -70,6 +76,9 @@ typedef struct
bool supportsDelete; bool supportsDelete;
char* prefix; char* prefix;
int prefixId; int prefixId;
List* triggerNameList;
MemoryContext memoryContext;
} CimvCreate; } CimvCreate;
static void CreateCimv(CimvCreate *cimvCreate); static void CreateCimv(CimvCreate *cimvCreate);
@ -80,8 +89,9 @@ static void CreateUserView(CimvCreate *cimvCreate);
static void CreateRefreshView(CimvCreate *cimvCreate); static void CreateRefreshView(CimvCreate *cimvCreate);
static void CreateDataChangeTriggerFunction(CimvCreate *cimvCreate); static void CreateDataChangeTriggerFunction(CimvCreate *cimvCreate);
static void CreateCronJob(CimvCreate *cimvCreate); static void CreateCronJob(CimvCreate *cimvCreate);
static void DataChangeTriggerFunctionAppendInsertDelete(CimvCreate *cimvCreate, StringInfo static char* DataChangeTriggerInsertDeleteQueryString(CimvCreate *cimvCreate, bool isInsert,
buf, bool isInsert); char* insertTableName, char* newTableName);
static char* DataChangeTriggerTruncateQueryString(char* insertTableName);
static void DataChangeTriggerFunctionAppendErrorOnDelete(CimvCreate *cimvCreate, static void DataChangeTriggerFunctionAppendErrorOnDelete(CimvCreate *cimvCreate,
StringInfo buf); StringInfo buf);
static void AppendOnConflict(CimvCreate *cimvCreate, StringInfo buf, bool isInsert); static void AppendOnConflict(CimvCreate *cimvCreate, StringInfo buf, bool isInsert);
@ -93,7 +103,8 @@ static void ValidateAgg(Aggref *agg, bool *supportsDelete);
static void AddCountAgg(Query *query, bool isInsert); static void AddCountAgg(Query *query, bool isInsert);
static Node * PartializeAggs(Node *node, void *context); static Node * PartializeAggs(Node *node, void *context);
static CimvCreate * InitializeCimvCreate(const CreateTableAsStmt *stmt, static CimvCreate * InitializeCimvCreate(const CreateTableAsStmt *stmt,
MatViewCreateOptions *createOptions); MatViewCreateOptions *createOptions,
const char* query_string, char* prefix);
static MatViewCreateOptions * GetMatViewCreateOptions(const CreateTableAsStmt *stmt); static MatViewCreateOptions * GetMatViewCreateOptions(const CreateTableAsStmt *stmt);
static ObjectAddress DefineVirtualRelation(RangeVar *relation, List *tlist, static ObjectAddress DefineVirtualRelation(RangeVar *relation, List *tlist,
@ -106,81 +117,211 @@ static Oid AggregateFunctionOid(const char *functionName, Oid inputType);
static char* CIMVTriggerFuncName(int prefixId, const char* relname); static char* CIMVTriggerFuncName(int prefixId, const char* relname);
static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId); static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId);
static void AlterTableOwner(RangeVar* tableName, char* ownerName); static void AlterTableOwner(RangeVar* tableName, char* ownerName);
static void CreateDependencyFromTriggersToView(Oid baseRelationId, List* triggerNameList, Oid userViewId);
static CreateTableAsStmt* ParseQueryStringToCreateTableAsStmt(const char* queryString);
static char* InsertTableName(RangeVar* insertTable);
static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate);
static void CheckSPIResultForColocatedRun(void);
extern Datum trigf(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(cimv_trigger);
PG_FUNCTION_INFO_V1(worker_record_trigger_dependency);
PG_FUNCTION_INFO_V1(trigf); static char *str_replace(char *orig, char *rep, char *with);
Datum Datum
trigf(PG_FUNCTION_ARGS) worker_record_trigger_dependency(PG_FUNCTION_ARGS)
{ {
TriggerData *trigdata = (TriggerData *) fcinfo->context; Oid relationOid = PG_GETARG_OID(0);
Trigger* trigger = trigdata->tg_trigger; Oid userViewOid = PG_GETARG_OID(1);
TupleDesc tupdesc; char* triggerName = text_to_cstring(PG_GETARG_TEXT_P(2));
HeapTuple rettuple;
char *when;
bool checknull = false;
bool isnull;
int ret, i;
CreateDependencyFromTriggersToView(relationOid, list_make1(triggerName), userViewOid);
PG_RETURN_VOID();
}
Datum cimv_trigger(PG_FUNCTION_ARGS)
{
/* make sure it's called as a trigger at all */ /* make sure it's called as a trigger at all */
if (!CALLED_AS_TRIGGER(fcinfo)) if (!CALLED_AS_TRIGGER(fcinfo))
elog(ERROR, "trigf: not called by trigger manager"); elog(ERROR, "trigf: not called by trigger manager");
/* tuple to return to executor */ TriggerData *trigdata = (TriggerData *) fcinfo->context;
if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) Trigger* trigger = trigdata->tg_trigger;
rettuple = trigdata->tg_newtuple; TupleDesc tupdesc;
else HeapTuple rettuple;
rettuple = trigdata->tg_trigtuple; char *when;
if (trigger == NULL || trigger->tgnargs < 2) {
if (TRIGGER_FOR_INSERT(trigger->tgtype)) { elog(ERROR, "cimv_trigger: should be called with at least two arguments");
} }
/* check for null values */ List* queryStringList = NIL;
if (!TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)
&& TRIGGER_FIRED_BEFORE(trigdata->tg_event))
checknull = true;
if (TRIGGER_FIRED_BEFORE(trigdata->tg_event)) char* createViewQueryString = trigger->tgargs[0];
when = "before"; char* prefix = trigger->tgargs[1];
else
when = "after "; if (trigger->tgnargs >= 3) {
char* baseTableName = trigger->tgargs[2];
char* relName = get_rel_name(trigdata->tg_relation->rd_id);
StringInfo first = makeStringInfo();
appendStringInfo(first, "FROM %s", baseTableName);
StringInfo second = makeStringInfo();
appendStringInfo(second, "FROM %s", relName);
createViewQueryString = str_replace(createViewQueryString, first->data, second->data);
}
CreateTableAsStmt* stmt = ParseQueryStringToCreateTableAsStmt(createViewQueryString);
MatViewCreateOptions *options = GetMatViewCreateOptions(stmt);
CimvCreate* cimvCreate = InitializeCimvCreate(stmt, options, NULL, prefix);
/* we dont really need to validate but this also sets some fields, we should possibly not set
any fields in a validation */
ValidateCimv(cimvCreate);
char* insertTableName = NULL;
if (trigger->tgnargs >= 4) {
insertTableName = trigger->tgargs[3];
}else {
RangeVar *insertTable = cimvCreate->createOptions->schedule == NULL ?
cimvCreate->matTableNameQuoted :
cimvCreate->landingTableNameQuoted;
insertTableName = InsertTableName(insertTable);
}
if (TRIGGER_FOR_INSERT(trigger->tgtype) || TRIGGER_FOR_UPDATE(trigger->tgtype)) {
char* newTableName = trigger->tgnewtable;
char* insertDeleteQueryString =
DataChangeTriggerInsertDeleteQueryString(cimvCreate, true, insertTableName, newTableName);
queryStringList = lappend(queryStringList, insertDeleteQueryString);
}
if (TRIGGER_FOR_DELETE(trigger->tgtype) || TRIGGER_FOR_UPDATE(trigger->tgtype)) {
if (cimvCreate->supportsDelete)
{
char* oldTableName = trigger->tgoldtable;
char* insertDeleteQueryString =
DataChangeTriggerInsertDeleteQueryString(cimvCreate, false, insertTableName, oldTableName);
queryStringList = lappend(queryStringList, insertDeleteQueryString);
}
else
{
elog(ERROR,
"MATERIALIZED VIEW %s on table %s does not support UPDATE/DELETE",
cimvCreate->userViewNameQuoted->relname,
cimvCreate->baseTableNameQuoted->relname);
}
}
if (TRIGGER_FOR_TRUNCATE(trigger->tgtype)) {
char* truncateQueryString =
DataChangeTriggerTruncateQueryString(insertTableName);
queryStringList = lappend(queryStringList, truncateQueryString);
}
/* tuple to return to executor */
if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) {
rettuple = trigdata->tg_newtuple;
}
else {
rettuple = trigdata->tg_trigtuple;
}
tupdesc = trigdata->tg_relation->rd_att; tupdesc = trigdata->tg_relation->rd_att;
/* connect to SPI manager */ /* connect to SPI manager */
if ((ret = SPI_connect()) < 0) if ( SPI_connect() < 0)
elog(INFO, "trigf (fired %s): SPI_connect returned %d", when, ret); elog(ERROR, "cimv_trigger: could not connect to SPI");
/* get number of rows in table */ /* make the temporary tables available to this transaction */
ret = SPI_exec("SELECT count(*) FROM ttest", 0); if (SPI_register_trigger_data(trigdata) < 0) {
elog(ERROR, "cimv_trigger: could not create temporary trigger tables");
}
if (ret < 0) char* queryString = NULL;
elog(NOTICE, "trigf (fired %s): SPI_exec returned %d", when, ret); foreach_ptr(queryString, queryStringList) {
if (SPI_execute(queryString, false, 0) < 0) {
elog(ERROR, "cimv_trigger: failed to run %s", queryString);
}
}
/* count(*) returns int8, so be careful to convert */ if (SPI_finish() != SPI_OK_FINISH)
i = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0], {
SPI_tuptable->tupdesc, elog(ERROR, "SPI_finish failed");
1, }
&isnull));
elog (INFO, "trigf (fired %s): there are %d rows in ttest", when, i);
SPI_finish();
if (checknull)
{
SPI_getbinval(rettuple, tupdesc, 1, &isnull);
if (isnull)
rettuple = NULL;
}
return PointerGetDatum(rettuple); return PointerGetDatum(rettuple);
} }
char *str_replace(char *orig, char *rep, char *with) {
char *result; // the return string
char *ins; // the next insert point
char *tmp; // varies
int len_rep; // length of rep (the string to remove)
int len_with; // length of with (the string to replace rep with)
int len_front; // distance between rep and end of last rep
int count; // number of replacements
// sanity checks and initialization
if (!orig || !rep)
return NULL;
len_rep = strlen(rep);
if (len_rep == 0)
return NULL; // empty rep causes infinite loop during count
if (!with)
with = "";
len_with = strlen(with);
// count the number of replacements needed
ins = orig;
for (count = 0; tmp = strstr(ins, rep); ++count) {
ins = tmp + len_rep;
}
tmp = result = malloc(strlen(orig) + (len_with - len_rep) * count + 1);
if (!result)
return NULL;
// first time through the loop, all the variable are set correctly
// from here on,
// tmp points to the end of the result string
// ins points to the next occurrence of rep in orig
// orig points to the remainder of orig after "end of rep"
while (count--) {
ins = strstr(orig, rep);
len_front = ins - orig;
tmp = strncpy(tmp, orig, len_front) + len_front;
tmp = strcpy(tmp, with) + len_with;
orig += len_front + len_rep; // move to next "end of rep"
}
strcpy(tmp, orig);
return result;
}
static CreateTableAsStmt* ParseQueryStringToCreateTableAsStmt(const char* queryString) {
Query *query = ParseQueryString(queryString, NULL, 0);
int cursorOptions = 0;
ParamListInfo paramListInfo = NULL;
PlannedStmt *plan =
linitial(pg_plan_queries(list_make1(query), queryString, cursorOptions, paramListInfo));
Node* parsetree = plan->utilityStmt;
if (!IsA(parsetree, CreateTableAsStmt)){
elog(ERROR, "cimv_trigger: the first argument should be a CreateTableAsStmt query string");
}
CreateTableAsStmt* stmt = (CreateTableAsStmt*) parsetree;
return stmt;
}
static char* InsertTableName(RangeVar* insertTable) {
StringInfo stringInfo = makeStringInfo();
appendStringInfo(stringInfo, "%s.%s", insertTable->schemaname, insertTable->relname);
return stringInfo->data;
}
bool bool
ProcessCreateMaterializedViewStmt(const CreateTableAsStmt *stmt, const char *query_string, ProcessCreateMaterializedViewStmt(const CreateTableAsStmt *stmt, const char *query_string,
@ -198,7 +339,8 @@ ProcessCreateMaterializedViewStmt(const CreateTableAsStmt *stmt, const char *que
if (options->isCimv) if (options->isCimv)
{ {
cimvCreate = InitializeCimvCreate(stmt, options); char* prefix = NULL; /* here we don't know the prefix so need to create it */
cimvCreate = InitializeCimvCreate(stmt, options, query_string, prefix);
ValidateCimv(cimvCreate); ValidateCimv(cimvCreate);
CreateCimv(cimvCreate); CreateCimv(cimvCreate);
@ -236,7 +378,7 @@ CreateCimv(CimvCreate *cimvCreate)
CreateUserView(cimvCreate); CreateUserView(cimvCreate);
CreateRefreshView(cimvCreate); CreateRefreshView(cimvCreate);
CreateDataChangeTriggerFunction(cimvCreate); // CreateDataChangeTriggerFunction(cimvCreate);
CreateDataChangeTriggers(cimvCreate); CreateDataChangeTriggers(cimvCreate);
InsertIntoPgCimv(cimvCreate->formCimv); InsertIntoPgCimv(cimvCreate->formCimv);
@ -250,6 +392,46 @@ CreateCimv(CimvCreate *cimvCreate)
RefreshCimv(cimvCreate->formCimv, cimvCreate->stmt->into->skipData, true); RefreshCimv(cimvCreate->formCimv, cimvCreate->stmt->into->skipData, true);
} }
CreateDependenciesFromTriggersToView(cimvCreate);
}
static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate) {
if (cimvCreate->citusTable) {
if (SPI_connect() != SPI_OK_CONNECT)
{
elog(ERROR, "SPI_connect failed");
}
char* triggerName = NULL;
foreach_ptr(triggerName, cimvCreate->triggerNameList) {
StringInfo queryBuf = makeStringInfo();
appendStringInfo(queryBuf,
"SELECT bool_and(success), max(result) FROM run_command_on_colocated_placements($param$%s.%s$param$, $param$%s.%s$param$, $create_dependency$ ",
cimvCreate->baseTableNameQuoted->schemaname,
cimvCreate->baseTableNameQuoted->relname,
cimvCreate->insertTable->schemaname,
cimvCreate->insertTable->relname);
appendStringInfo(queryBuf,
"SELECT worker_record_trigger_dependency($base_table$%%s$base_table$, $insert_table$%%s$insert_table$, $trigger_name$%s$trigger_name$)",
triggerName);
appendStringInfoString(queryBuf, "$create_dependency$);");
if (SPI_execute(queryBuf->data, false, 0) != SPI_OK_SELECT)
{
elog(ERROR, "SPI_exec failed: %s", queryBuf->data);
}
CheckSPIResultForColocatedRun();
}
if (SPI_finish() != SPI_OK_FINISH)
{
elog(ERROR, "SPI_finish failed");
}
}else {
CreateDependencyFromTriggersToView(cimvCreate->formCimv->basetable,
cimvCreate->triggerNameList, cimvCreate->formCimv->userview);
}
} }
@ -682,7 +864,7 @@ CreateDataChangeTriggerFunction(CimvCreate *cimvCreate)
/* INSERT */ /* INSERT */
appendStringInfoString(&buf, appendStringInfoString(&buf,
"IF (TG_OP = $inside_trigger_function$INSERT$inside_trigger_function$ OR TG_OP = $inside_trigger_function$UPDATE$inside_trigger_function$) THEN\n"); "IF (TG_OP = $inside_trigger_function$INSERT$inside_trigger_function$ OR TG_OP = $inside_trigger_function$UPDATE$inside_trigger_function$) THEN\n");
DataChangeTriggerFunctionAppendInsertDelete(cimvCreate, &buf, true); // DataChangeTriggerInsertDeleteQueryString(cimvCreate, &buf, true);
appendStringInfoString(&buf, "END IF;\n"); appendStringInfoString(&buf, "END IF;\n");
/* DELETE */ /* DELETE */
@ -690,7 +872,7 @@ CreateDataChangeTriggerFunction(CimvCreate *cimvCreate)
"IF (TG_OP = $inside_trigger_function$DELETE$inside_trigger_function$ OR TG_OP = $inside_trigger_function$UPDATE$inside_trigger_function$) THEN\n"); "IF (TG_OP = $inside_trigger_function$DELETE$inside_trigger_function$ OR TG_OP = $inside_trigger_function$UPDATE$inside_trigger_function$) THEN\n");
if (cimvCreate->supportsDelete) if (cimvCreate->supportsDelete)
{ {
DataChangeTriggerFunctionAppendInsertDelete(cimvCreate, &buf, false); // DataChangeTriggerInsertDeleteQueryString(cimvCreate, &buf, false);
} }
else else
{ {
@ -738,13 +920,51 @@ CreateDataChangeTriggerFunction(CimvCreate *cimvCreate)
pfree(buf.data); pfree(buf.data);
} }
static char* DataChangeTriggerTruncateQueryString(char* insertTableName) {
StringInfo queryBuf = makeStringInfo();
appendStringInfo(queryBuf, "TRUNCATE TABLE %s", insertTableName);
/* TODO: also truncate landing table if it exists */
return queryBuf->data;
}
static void static void CreateDependencyFromTriggersToView(Oid baseRelationId, List* triggerNameList, Oid userViewId) {
DataChangeTriggerFunctionAppendInsertDelete(CimvCreate *cimvCreate, StringInfo buf, bool
isInsert) List* triggerIdList = GetExplicitTriggerIdList(baseRelationId);
char* targetTriggerName = NULL;
foreach_ptr(targetTriggerName, triggerNameList) {
Oid triggerId = InvalidOid;
foreach_oid(triggerId, triggerIdList)
{
bool missingOk = false;
HeapTuple triggerTuple = GetTriggerTupleById(triggerId, missingOk);
Form_pg_trigger triggerForm = (Form_pg_trigger) GETSTRUCT(triggerTuple);
char* name = NameStr(triggerForm->tgname);
if (namestrcmp(&triggerForm->tgname, targetTriggerName) == 0) {
ObjectAddress triggerAddr = {
.classId = TriggerRelationId,
.objectId = triggerId,
.objectSubId = 0
};
ObjectAddress userViewAddr = {
.classId = RelationRelationId,
.objectId = userViewId,
.objectSubId = 0
};
/* dependency from trigger to user view */
recordDependencyOn(&triggerAddr, &userViewAddr, DEPENDENCY_AUTO);
}
}
}
}
static char*
DataChangeTriggerInsertDeleteQueryString(CimvCreate *cimvCreate, bool
isInsert, char* insertTableName, char* newTableName)
{ {
StringInfoData querybuf; StringInfo viewQueryBuf = makeStringInfo();
initStringInfo(&querybuf); StringInfo queryBuf = makeStringInfo();
Query *query = (Query *) copyObject(cimvCreate->stmt->into->viewQuery); Query *query = (Query *) copyObject(cimvCreate->stmt->into->viewQuery);
int inverse = isInsert ? 0 : 1; int inverse = isInsert ? 0 : 1;
@ -753,31 +973,27 @@ DataChangeTriggerFunctionAppendInsertDelete(CimvCreate *cimvCreate, StringInfo b
RangeTblEntry *baseRte = (RangeTblEntry *) linitial(query->rtable); RangeTblEntry *baseRte = (RangeTblEntry *) linitial(query->rtable);
baseRte->rtekind = RTE_CTE; baseRte->rtekind = RTE_CTE;
baseRte->ctename = isInsert ? "__ins__" : "__del__"; baseRte->ctename = newTableName;
pg_get_query_def(query, &querybuf); pg_get_query_def(query, viewQueryBuf);
appendStringInfoString(buf, "EXECUTE format($exec_format$INSERT INTO %s AS __mat__ "); appendStringInfo(queryBuf, "INSERT INTO %s AS __mat__ %s", insertTableName, viewQueryBuf->data);
/* SELECT */
appendStringInfoString(buf, querybuf.data);
/* ON CONFLICT */ /* ON CONFLICT */
if (cimvCreate->createOptions->schedule == NULL) if (cimvCreate->createOptions->schedule == NULL)
{ {
AppendOnConflict(cimvCreate, buf, isInsert); AppendOnConflict(cimvCreate, queryBuf, isInsert);
} }
appendStringInfoString(buf, ";\n$exec_format$, TG_ARGV[0]);"); appendStringInfoChar(queryBuf, ';');
if (!isInsert && cimvCreate->createOptions->schedule == NULL) if (!isInsert && cimvCreate->createOptions->schedule == NULL)
{ {
appendStringInfoString(buf, appendStringInfo(queryBuf,
"EXECUTE format($exec_format$DELETE FROM %s WHERE __count__ = 0;$exec_format$, TG_ARGV[0]);"); "DELETE FROM %s WHERE __count__ = 0;", insertTableName);
} }
pfree(querybuf.data); return queryBuf->data;
pfree(query);
} }
@ -847,13 +1063,16 @@ CreateDataChangeTriggers(CimvCreate *cimvCreate)
static void static void
CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent) CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent)
{ {
StringInfoData buf; StringInfoData buf;
initStringInfo(&buf); initStringInfo(&buf);
bool isCitusTable = cimvCreate->citusTable != NULL; bool isCitusTable = cimvCreate->citusTable != NULL;
char *event;
char *referencing; char* event = NULL;
char* referencing = NULL;
switch (triggerEvent) switch (triggerEvent)
{ {
case TRIGGER_EVENT_INSERT: case TRIGGER_EVENT_INSERT:
@ -883,8 +1102,11 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent)
/* TODO: UPDATE [ OF column_name [, ... ] ] */ /* TODO: UPDATE [ OF column_name [, ... ] ] */
StringInfoData triggerName; StringInfoData triggerName;
initStringInfo(&triggerName); initStringInfo(&triggerName);
appendStringInfo(&triggerName, "%s_%s", NameStr(cimvCreate->formCimv->triggerfnname), appendStringInfo(&triggerName, "%s_%s", cimvCreate->prefix, event);
event);
MemoryContext oldMemoryContext = MemoryContextSwitchTo(cimvCreate->memoryContext);
cimvCreate->triggerNameList = lappend(cimvCreate->triggerNameList, pstrdup(triggerName.data));
MemoryContextSwitchTo(oldMemoryContext);
RangeVar *insertTable = cimvCreate->createOptions->schedule == NULL ? RangeVar *insertTable = cimvCreate->createOptions->schedule == NULL ?
cimvCreate->matTableNameQuoted : cimvCreate->matTableNameQuoted :
@ -901,13 +1123,16 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent)
appendStringInfo(&buf, appendStringInfo(&buf,
"CREATE TRIGGER %s AFTER %s ON %%s %s " "CREATE TRIGGER %s AFTER %s ON %%s %s "
"FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s(%%L)", "FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s($view_def$%s$view_def$, $prefix$%s$prefix$, $baserel$%s$baserel$, %%L )",
quote_identifier(triggerName.data), quote_identifier(triggerName.data),
event, event,
referencing, referencing,
quote_identifier(NameStr( quote_identifier(NameStr(
cimvCreate->formCimv->triggerfnnamespace)), cimvCreate->formCimv->triggerfnnamespace)),
quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname))); quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)),
cimvCreate->queryString,
cimvCreate->prefix,
cimvCreate->baseTableNameQuoted->relname);
appendStringInfoString(&buf, "$create_trigger$);"); appendStringInfoString(&buf, "$create_trigger$);");
} }
@ -915,7 +1140,7 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent)
{ {
appendStringInfo(&buf, appendStringInfo(&buf,
"CREATE TRIGGER %s AFTER %s ON %s.%s %s " "CREATE TRIGGER %s AFTER %s ON %s.%s %s "
"FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s('%s.%s')", "FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s($view_def$%s$view_def$, $prefix$%s$prefix$)",
quote_identifier(triggerName.data), quote_identifier(triggerName.data),
event, event,
cimvCreate->baseTableNameQuoted->schemaname, cimvCreate->baseTableNameQuoted->schemaname,
@ -924,8 +1149,8 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent)
quote_identifier(NameStr( quote_identifier(NameStr(
cimvCreate->formCimv->triggerfnnamespace)), cimvCreate->formCimv->triggerfnnamespace)),
quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)), quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)),
insertTable->schemaname, cimvCreate->queryString,
insertTable->relname); cimvCreate->prefix);
} }
int expectedResult = isCitusTable ? SPI_OK_SELECT : SPI_OK_UTILITY; int expectedResult = isCitusTable ? SPI_OK_SELECT : SPI_OK_UTILITY;
@ -936,36 +1161,43 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent)
if (isCitusTable) if (isCitusTable)
{ {
if (SPI_tuptable != NULL && SPI_tuptable->tupdesc->natts == 2 && SPI_processed == CheckSPIResultForColocatedRun();
1)
{
SPITupleTable *tuptable = SPI_tuptable;
TupleDesc tupdesc = tuptable->tupdesc;
HeapTuple tuple = tuptable->vals[0];
SPI_getvalue(tuple, tupdesc, 1);
bool isNull;
Datum isSuccessDatum = SPI_getbinval(tuple, tupdesc, 1, &isNull);
if (!isNull && !DatumGetBool(isSuccessDatum))
{
elog(ERROR, "SPI_exec failed: %s", SPI_getvalue(tuple, tupdesc, 2));
}
}
} }
pfree(buf.data); pfree(buf.data);
} }
static void CheckSPIResultForColocatedRun(void) {
if (SPI_tuptable != NULL && SPI_tuptable->tupdesc->natts == 2 && SPI_processed ==
1)
{
SPITupleTable *tuptable = SPI_tuptable;
TupleDesc tupdesc = tuptable->tupdesc;
HeapTuple tuple = tuptable->vals[0];
SPI_getvalue(tuple, tupdesc, 1);
bool isNull;
Datum isSuccessDatum = SPI_getbinval(tuple, tupdesc, 1, &isNull);
if (!isNull && !DatumGetBool(isSuccessDatum))
{
elog(ERROR, "SPI_exec failed: %s", SPI_getvalue(tuple, tupdesc, 2));
}
}
}
static CimvCreate * static CimvCreate *
InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *createOptions) InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *createOptions,
const char* query_string, char* prefix)
{ {
CimvCreate *cimvCreate = palloc(sizeof(CimvCreate)); CimvCreate *cimvCreate = palloc(sizeof(CimvCreate));
cimvCreate->formCimv = palloc(sizeof(FormData_pg_cimv)); cimvCreate->formCimv = palloc(sizeof(FormData_pg_cimv));
cimvCreate->memoryContext = CurrentMemoryContext;
cimvCreate->formCimv->jobid = 0; cimvCreate->formCimv->jobid = 0;
cimvCreate->formCimv->landingtable = InvalidOid; cimvCreate->formCimv->landingtable = InvalidOid;
cimvCreate->queryString = query_string;
Query *query = (Query *) stmt->query; Query *query = (Query *) stmt->query;
RangeTblEntry *baseRte = (RangeTblEntry *) linitial(query->rtable); RangeTblEntry *baseRte = (RangeTblEntry *) linitial(query->rtable);
@ -979,11 +1211,16 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create
cimvCreate->formCimv->basetable = baseRte->relid; cimvCreate->formCimv->basetable = baseRte->relid;
cimvCreate->prefixId = UniqueId(); if (prefix) {
cimvCreate->prefix = CIMVInternalPrefix(cimvCreate->baseTableName, cimvCreate->prefixId); cimvCreate->prefix = prefix;
}else {
cimvCreate->prefixId = UniqueId();
cimvCreate->prefix = CIMVInternalPrefix(cimvCreate->baseTableName, cimvCreate->prefixId);
}
namestrcpy(&cimvCreate->formCimv->triggerfnnamespace, CIMV_INTERNAL_SCHEMA); namestrcpy(&cimvCreate->formCimv->triggerfnnamespace, CIMV_INTERNAL_SCHEMA);
char* funcName = CIMVTriggerFuncName(cimvCreate->prefixId, stmt->into->rel->relname); char* funcName = CIMVTriggerFuncName(cimvCreate->prefixId, stmt->into->rel->relname);
namestrcpy(&cimvCreate->formCimv->triggerfnname, funcName); namestrcpy(&cimvCreate->formCimv->triggerfnname, "cimv_trigger");
StringInfo mat = makeStringInfo(); StringInfo mat = makeStringInfo();
appendStringInfo(mat, "%s_cimv_%s", cimvCreate->prefix, MATERIALIZATION_TABLE_SUFFIX); appendStringInfo(mat, "%s_cimv_%s", cimvCreate->prefix, MATERIALIZATION_TABLE_SUFFIX);
@ -1000,6 +1237,7 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create
cimvCreate->targetListEntries = NIL; cimvCreate->targetListEntries = NIL;
cimvCreate->groupTargetListEntries = NIL; cimvCreate->groupTargetListEntries = NIL;
cimvCreate->aggTargetListEntries = NIL; cimvCreate->aggTargetListEntries = NIL;
cimvCreate->triggerNameList = NIL;
cimvCreate->citusTable = IsCitusTable(baseRte->relid) ? LookupCitusTableCacheEntry( cimvCreate->citusTable = IsCitusTable(baseRte->relid) ? LookupCitusTableCacheEntry(
baseRte->relid) : NULL; baseRte->relid) : NULL;
cimvCreate->partitionColumn = NULL; cimvCreate->partitionColumn = NULL;
@ -1022,6 +1260,10 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create
(char *) quote_identifier(cimvCreate->landingTableName->schemaname), (char *) quote_identifier(cimvCreate->landingTableName->schemaname),
(char *) quote_identifier(cimvCreate->landingTableName->relname), -1); (char *) quote_identifier(cimvCreate->landingTableName->relname), -1);
cimvCreate->insertTable = cimvCreate->createOptions->schedule == NULL ?
cimvCreate->matTableNameQuoted :
cimvCreate->landingTableNameQuoted;
return cimvCreate; return cimvCreate;
} }

View File

@ -151,7 +151,7 @@ DropCimv(Form_pg_cimv formCimv, DropBehavior behavior)
} }
DropCronJob(formCimv); DropCronJob(formCimv);
DropDmlTriggers(formCimv); // DropDmlTriggers(formCimv);
/* Lock */ /* Lock */
if (OidIsValid(matTableAddress.objectId)) if (OidIsValid(matTableAddress.objectId))

View File

@ -11,4 +11,17 @@ DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass);
#include "../../columnar/sql/columnar--9.5-1--10.0-1.sql" #include "../../columnar/sql/columnar--9.5-1--10.0-1.sql"
CREATE SCHEMA cimv_internal; CREATE SCHEMA cimv_internal;
GRANT ALL ON SCHEMA cimv_internal to public; GRANT ALL ON SCHEMA cimv_internal to public;
CREATE FUNCTION cimv_internal.cimv_trigger()
RETURNS trigger
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$cimv_trigger$$;
CREATE FUNCTION pg_catalog.worker_record_trigger_dependency(basetable_name regclass, inserttable_name regclass, trigger_name text)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', 'worker_record_trigger_dependency';
COMMENT ON FUNCTION pg_catalog.worker_record_trigger_dependency(regclass,regclass,text)
IS 'record the fact that the trigger depends on the table in pg_depend';