From bc747206d535a14f2d5d8ef482e459c4d3f47d3d Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Tue, 19 Jan 2021 10:49:42 +0300 Subject: [PATCH] wip --- src/backend/distributed/cimv/create.c | 446 ++++++++++++++---- src/backend/distributed/cimv/drop.c | 2 +- .../distributed/sql/citus--9.5-1--10.0-1.sql | 15 +- 3 files changed, 359 insertions(+), 104 deletions(-) diff --git a/src/backend/distributed/cimv/create.c b/src/backend/distributed/cimv/create.c index a6372d552..7f5f3acd6 100644 --- a/src/backend/distributed/cimv/create.c +++ b/src/backend/distributed/cimv/create.c @@ -9,17 +9,21 @@ #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "catalog/pg_trigger.h" #include "catalog/toasting.h" #include "commands/defrem.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "commands/view.h" +#include "distributed/commands.h" #include "distributed/citus_ruleutils.h" #include "distributed/pg_cimv.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/security_utils.h" #include "distributed/sequence_utils.h" +#include "distributed/worker_protocol.h" +#include "distributed/multi_executor.h" #include "distributed/coordinator_protocol.h" #include "executor/spi.h" #include "miscadmin.h" @@ -50,6 +54,7 @@ typedef struct typedef struct { const CreateTableAsStmt *stmt; + const char* queryString; MatViewCreateOptions *createOptions; Form_pg_cimv formCimv; RangeVar *baseTableName; @@ -62,6 +67,7 @@ typedef struct RangeVar *landingTableNameQuoted; RangeVar *userViewNameQuoted; RangeVar *refreshViewNameQuoted; + RangeVar* insertTable; List *targetListEntries; List *groupTargetListEntries; List *aggTargetListEntries; @@ -70,6 +76,9 @@ typedef struct bool supportsDelete; char* prefix; int prefixId; + List* triggerNameList; + MemoryContext memoryContext; + } CimvCreate; static void CreateCimv(CimvCreate *cimvCreate); @@ -80,8 +89,9 @@ static void CreateUserView(CimvCreate *cimvCreate); static void CreateRefreshView(CimvCreate *cimvCreate); static void CreateDataChangeTriggerFunction(CimvCreate *cimvCreate); static void CreateCronJob(CimvCreate *cimvCreate); -static void DataChangeTriggerFunctionAppendInsertDelete(CimvCreate *cimvCreate, StringInfo - buf, bool isInsert); +static char* DataChangeTriggerInsertDeleteQueryString(CimvCreate *cimvCreate, bool isInsert, + char* insertTableName, char* newTableName); +static char* DataChangeTriggerTruncateQueryString(char* insertTableName); static void DataChangeTriggerFunctionAppendErrorOnDelete(CimvCreate *cimvCreate, StringInfo buf); static void AppendOnConflict(CimvCreate *cimvCreate, StringInfo buf, bool isInsert); @@ -93,7 +103,8 @@ static void ValidateAgg(Aggref *agg, bool *supportsDelete); static void AddCountAgg(Query *query, bool isInsert); static Node * PartializeAggs(Node *node, void *context); static CimvCreate * InitializeCimvCreate(const CreateTableAsStmt *stmt, - MatViewCreateOptions *createOptions); + MatViewCreateOptions *createOptions, + const char* query_string, char* prefix); static MatViewCreateOptions * GetMatViewCreateOptions(const CreateTableAsStmt *stmt); static ObjectAddress DefineVirtualRelation(RangeVar *relation, List *tlist, @@ -106,81 +117,211 @@ static Oid AggregateFunctionOid(const char *functionName, Oid inputType); static char* CIMVTriggerFuncName(int prefixId, const char* relname); static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId); static void AlterTableOwner(RangeVar* tableName, char* ownerName); +static void CreateDependencyFromTriggersToView(Oid baseRelationId, List* triggerNameList, Oid userViewId); +static CreateTableAsStmt* ParseQueryStringToCreateTableAsStmt(const char* queryString); +static char* InsertTableName(RangeVar* insertTable); +static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate); +static void CheckSPIResultForColocatedRun(void); -extern Datum trigf(PG_FUNCTION_ARGS); +PG_FUNCTION_INFO_V1(cimv_trigger); +PG_FUNCTION_INFO_V1(worker_record_trigger_dependency); -PG_FUNCTION_INFO_V1(trigf); +static char *str_replace(char *orig, char *rep, char *with); Datum -trigf(PG_FUNCTION_ARGS) +worker_record_trigger_dependency(PG_FUNCTION_ARGS) { - TriggerData *trigdata = (TriggerData *) fcinfo->context; - Trigger* trigger = trigdata->tg_trigger; - TupleDesc tupdesc; - HeapTuple rettuple; - char *when; - bool checknull = false; - bool isnull; - int ret, i; + Oid relationOid = PG_GETARG_OID(0); + Oid userViewOid = PG_GETARG_OID(1); + char* triggerName = text_to_cstring(PG_GETARG_TEXT_P(2)); + CreateDependencyFromTriggersToView(relationOid, list_make1(triggerName), userViewOid); + + PG_RETURN_VOID(); +} + +Datum cimv_trigger(PG_FUNCTION_ARGS) +{ /* make sure it's called as a trigger at all */ if (!CALLED_AS_TRIGGER(fcinfo)) elog(ERROR, "trigf: not called by trigger manager"); - /* tuple to return to executor */ - if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) - rettuple = trigdata->tg_newtuple; - else - rettuple = trigdata->tg_trigtuple; - - if (TRIGGER_FOR_INSERT(trigger->tgtype)) { - + TriggerData *trigdata = (TriggerData *) fcinfo->context; + Trigger* trigger = trigdata->tg_trigger; + TupleDesc tupdesc; + HeapTuple rettuple; + char *when; + if (trigger == NULL || trigger->tgnargs < 2) { + elog(ERROR, "cimv_trigger: should be called with at least two arguments"); } - /* check for null values */ - if (!TRIGGER_FIRED_BY_DELETE(trigdata->tg_event) - && TRIGGER_FIRED_BEFORE(trigdata->tg_event)) - checknull = true; + List* queryStringList = NIL; - if (TRIGGER_FIRED_BEFORE(trigdata->tg_event)) - when = "before"; - else - when = "after "; + char* createViewQueryString = trigger->tgargs[0]; + char* prefix = trigger->tgargs[1]; + + if (trigger->tgnargs >= 3) { + char* baseTableName = trigger->tgargs[2]; + char* relName = get_rel_name(trigdata->tg_relation->rd_id); + StringInfo first = makeStringInfo(); + appendStringInfo(first, "FROM %s", baseTableName); + + StringInfo second = makeStringInfo(); + appendStringInfo(second, "FROM %s", relName); + createViewQueryString = str_replace(createViewQueryString, first->data, second->data); + + } + + CreateTableAsStmt* stmt = ParseQueryStringToCreateTableAsStmt(createViewQueryString); + MatViewCreateOptions *options = GetMatViewCreateOptions(stmt); + CimvCreate* cimvCreate = InitializeCimvCreate(stmt, options, NULL, prefix); + /* we dont really need to validate but this also sets some fields, we should possibly not set + any fields in a validation */ + ValidateCimv(cimvCreate); + + char* insertTableName = NULL; + if (trigger->tgnargs >= 4) { + insertTableName = trigger->tgargs[3]; + }else { + RangeVar *insertTable = cimvCreate->createOptions->schedule == NULL ? + cimvCreate->matTableNameQuoted : + cimvCreate->landingTableNameQuoted; + insertTableName = InsertTableName(insertTable); + } + + if (TRIGGER_FOR_INSERT(trigger->tgtype) || TRIGGER_FOR_UPDATE(trigger->tgtype)) { + char* newTableName = trigger->tgnewtable; + char* insertDeleteQueryString = + DataChangeTriggerInsertDeleteQueryString(cimvCreate, true, insertTableName, newTableName); + queryStringList = lappend(queryStringList, insertDeleteQueryString); + } + + if (TRIGGER_FOR_DELETE(trigger->tgtype) || TRIGGER_FOR_UPDATE(trigger->tgtype)) { + if (cimvCreate->supportsDelete) + { + char* oldTableName = trigger->tgoldtable; + char* insertDeleteQueryString = + DataChangeTriggerInsertDeleteQueryString(cimvCreate, false, insertTableName, oldTableName); + queryStringList = lappend(queryStringList, insertDeleteQueryString); + } + else + { + elog(ERROR, + "MATERIALIZED VIEW %s on table %s does not support UPDATE/DELETE", + cimvCreate->userViewNameQuoted->relname, + cimvCreate->baseTableNameQuoted->relname); + } + } + + if (TRIGGER_FOR_TRUNCATE(trigger->tgtype)) { + char* truncateQueryString = + DataChangeTriggerTruncateQueryString(insertTableName); + queryStringList = lappend(queryStringList, truncateQueryString); + } + + + /* tuple to return to executor */ + if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) { + rettuple = trigdata->tg_newtuple; + } + else { + rettuple = trigdata->tg_trigtuple; + } tupdesc = trigdata->tg_relation->rd_att; - /* connect to SPI manager */ - if ((ret = SPI_connect()) < 0) - elog(INFO, "trigf (fired %s): SPI_connect returned %d", when, ret); + if ( SPI_connect() < 0) + elog(ERROR, "cimv_trigger: could not connect to SPI"); - /* get number of rows in table */ - ret = SPI_exec("SELECT count(*) FROM ttest", 0); + /* make the temporary tables available to this transaction */ + if (SPI_register_trigger_data(trigdata) < 0) { + elog(ERROR, "cimv_trigger: could not create temporary trigger tables"); + } - if (ret < 0) - elog(NOTICE, "trigf (fired %s): SPI_exec returned %d", when, ret); + char* queryString = NULL; + foreach_ptr(queryString, queryStringList) { + if (SPI_execute(queryString, false, 0) < 0) { + elog(ERROR, "cimv_trigger: failed to run %s", queryString); + } + } - /* count(*) returns int8, so be careful to convert */ - i = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0], - SPI_tuptable->tupdesc, - 1, - &isnull)); + if (SPI_finish() != SPI_OK_FINISH) + { + elog(ERROR, "SPI_finish failed"); + } - elog (INFO, "trigf (fired %s): there are %d rows in ttest", when, i); - - SPI_finish(); - - if (checknull) - { - SPI_getbinval(rettuple, tupdesc, 1, &isnull); - if (isnull) - rettuple = NULL; - } return PointerGetDatum(rettuple); } +char *str_replace(char *orig, char *rep, char *with) { + char *result; // the return string + char *ins; // the next insert point + char *tmp; // varies + int len_rep; // length of rep (the string to remove) + int len_with; // length of with (the string to replace rep with) + int len_front; // distance between rep and end of last rep + int count; // number of replacements + + // sanity checks and initialization + if (!orig || !rep) + return NULL; + len_rep = strlen(rep); + if (len_rep == 0) + return NULL; // empty rep causes infinite loop during count + if (!with) + with = ""; + len_with = strlen(with); + + // count the number of replacements needed + ins = orig; + for (count = 0; tmp = strstr(ins, rep); ++count) { + ins = tmp + len_rep; + } + + tmp = result = malloc(strlen(orig) + (len_with - len_rep) * count + 1); + + if (!result) + return NULL; + + // first time through the loop, all the variable are set correctly + // from here on, + // tmp points to the end of the result string + // ins points to the next occurrence of rep in orig + // orig points to the remainder of orig after "end of rep" + while (count--) { + ins = strstr(orig, rep); + len_front = ins - orig; + tmp = strncpy(tmp, orig, len_front) + len_front; + tmp = strcpy(tmp, with) + len_with; + orig += len_front + len_rep; // move to next "end of rep" + } + strcpy(tmp, orig); + return result; +} + + +static CreateTableAsStmt* ParseQueryStringToCreateTableAsStmt(const char* queryString) { + Query *query = ParseQueryString(queryString, NULL, 0); + int cursorOptions = 0; + ParamListInfo paramListInfo = NULL; + PlannedStmt *plan = + linitial(pg_plan_queries(list_make1(query), queryString, cursorOptions, paramListInfo)); + Node* parsetree = plan->utilityStmt; + if (!IsA(parsetree, CreateTableAsStmt)){ + elog(ERROR, "cimv_trigger: the first argument should be a CreateTableAsStmt query string"); + } + CreateTableAsStmt* stmt = (CreateTableAsStmt*) parsetree; + return stmt; +} + +static char* InsertTableName(RangeVar* insertTable) { + StringInfo stringInfo = makeStringInfo(); + appendStringInfo(stringInfo, "%s.%s", insertTable->schemaname, insertTable->relname); + return stringInfo->data; +} bool ProcessCreateMaterializedViewStmt(const CreateTableAsStmt *stmt, const char *query_string, @@ -198,7 +339,8 @@ ProcessCreateMaterializedViewStmt(const CreateTableAsStmt *stmt, const char *que if (options->isCimv) { - cimvCreate = InitializeCimvCreate(stmt, options); + char* prefix = NULL; /* here we don't know the prefix so need to create it */ + cimvCreate = InitializeCimvCreate(stmt, options, query_string, prefix); ValidateCimv(cimvCreate); CreateCimv(cimvCreate); @@ -236,7 +378,7 @@ CreateCimv(CimvCreate *cimvCreate) CreateUserView(cimvCreate); CreateRefreshView(cimvCreate); - CreateDataChangeTriggerFunction(cimvCreate); + // CreateDataChangeTriggerFunction(cimvCreate); CreateDataChangeTriggers(cimvCreate); InsertIntoPgCimv(cimvCreate->formCimv); @@ -250,6 +392,46 @@ CreateCimv(CimvCreate *cimvCreate) RefreshCimv(cimvCreate->formCimv, cimvCreate->stmt->into->skipData, true); } + CreateDependenciesFromTriggersToView(cimvCreate); + +} + +static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate) { + if (cimvCreate->citusTable) { + if (SPI_connect() != SPI_OK_CONNECT) + { + elog(ERROR, "SPI_connect failed"); + } + char* triggerName = NULL; + foreach_ptr(triggerName, cimvCreate->triggerNameList) { + StringInfo queryBuf = makeStringInfo(); + appendStringInfo(queryBuf, + "SELECT bool_and(success), max(result) FROM run_command_on_colocated_placements($param$%s.%s$param$, $param$%s.%s$param$, $create_dependency$ ", + cimvCreate->baseTableNameQuoted->schemaname, + cimvCreate->baseTableNameQuoted->relname, + cimvCreate->insertTable->schemaname, + cimvCreate->insertTable->relname); + + appendStringInfo(queryBuf, + "SELECT worker_record_trigger_dependency($base_table$%%s$base_table$, $insert_table$%%s$insert_table$, $trigger_name$%s$trigger_name$)", + triggerName); + + appendStringInfoString(queryBuf, "$create_dependency$);"); + if (SPI_execute(queryBuf->data, false, 0) != SPI_OK_SELECT) + { + elog(ERROR, "SPI_exec failed: %s", queryBuf->data); + } + CheckSPIResultForColocatedRun(); + } + if (SPI_finish() != SPI_OK_FINISH) + { + elog(ERROR, "SPI_finish failed"); + } + }else { + CreateDependencyFromTriggersToView(cimvCreate->formCimv->basetable, + cimvCreate->triggerNameList, cimvCreate->formCimv->userview); + } + } @@ -682,7 +864,7 @@ CreateDataChangeTriggerFunction(CimvCreate *cimvCreate) /* INSERT */ appendStringInfoString(&buf, "IF (TG_OP = $inside_trigger_function$INSERT$inside_trigger_function$ OR TG_OP = $inside_trigger_function$UPDATE$inside_trigger_function$) THEN\n"); - DataChangeTriggerFunctionAppendInsertDelete(cimvCreate, &buf, true); + // DataChangeTriggerInsertDeleteQueryString(cimvCreate, &buf, true); appendStringInfoString(&buf, "END IF;\n"); /* DELETE */ @@ -690,7 +872,7 @@ CreateDataChangeTriggerFunction(CimvCreate *cimvCreate) "IF (TG_OP = $inside_trigger_function$DELETE$inside_trigger_function$ OR TG_OP = $inside_trigger_function$UPDATE$inside_trigger_function$) THEN\n"); if (cimvCreate->supportsDelete) { - DataChangeTriggerFunctionAppendInsertDelete(cimvCreate, &buf, false); + // DataChangeTriggerInsertDeleteQueryString(cimvCreate, &buf, false); } else { @@ -738,13 +920,51 @@ CreateDataChangeTriggerFunction(CimvCreate *cimvCreate) pfree(buf.data); } +static char* DataChangeTriggerTruncateQueryString(char* insertTableName) { + StringInfo queryBuf = makeStringInfo(); + appendStringInfo(queryBuf, "TRUNCATE TABLE %s", insertTableName); + /* TODO: also truncate landing table if it exists */ + return queryBuf->data; +} -static void -DataChangeTriggerFunctionAppendInsertDelete(CimvCreate *cimvCreate, StringInfo buf, bool - isInsert) +static void CreateDependencyFromTriggersToView(Oid baseRelationId, List* triggerNameList, Oid userViewId) { + + List* triggerIdList = GetExplicitTriggerIdList(baseRelationId); + char* targetTriggerName = NULL; + foreach_ptr(targetTriggerName, triggerNameList) { + Oid triggerId = InvalidOid; + foreach_oid(triggerId, triggerIdList) + { + bool missingOk = false; + HeapTuple triggerTuple = GetTriggerTupleById(triggerId, missingOk); + Form_pg_trigger triggerForm = (Form_pg_trigger) GETSTRUCT(triggerTuple); + char* name = NameStr(triggerForm->tgname); + if (namestrcmp(&triggerForm->tgname, targetTriggerName) == 0) { + ObjectAddress triggerAddr = { + .classId = TriggerRelationId, + .objectId = triggerId, + .objectSubId = 0 + }; + ObjectAddress userViewAddr = { + .classId = RelationRelationId, + .objectId = userViewId, + .objectSubId = 0 + }; + + /* dependency from trigger to user view */ + recordDependencyOn(&triggerAddr, &userViewAddr, DEPENDENCY_AUTO); + } + } + } + +} + +static char* +DataChangeTriggerInsertDeleteQueryString(CimvCreate *cimvCreate, bool + isInsert, char* insertTableName, char* newTableName) { - StringInfoData querybuf; - initStringInfo(&querybuf); + StringInfo viewQueryBuf = makeStringInfo(); + StringInfo queryBuf = makeStringInfo(); Query *query = (Query *) copyObject(cimvCreate->stmt->into->viewQuery); int inverse = isInsert ? 0 : 1; @@ -753,31 +973,27 @@ DataChangeTriggerFunctionAppendInsertDelete(CimvCreate *cimvCreate, StringInfo b RangeTblEntry *baseRte = (RangeTblEntry *) linitial(query->rtable); baseRte->rtekind = RTE_CTE; - baseRte->ctename = isInsert ? "__ins__" : "__del__"; + baseRte->ctename = newTableName; - pg_get_query_def(query, &querybuf); + pg_get_query_def(query, viewQueryBuf); - appendStringInfoString(buf, "EXECUTE format($exec_format$INSERT INTO %s AS __mat__ "); - - /* SELECT */ - appendStringInfoString(buf, querybuf.data); + appendStringInfo(queryBuf, "INSERT INTO %s AS __mat__ %s", insertTableName, viewQueryBuf->data); /* ON CONFLICT */ if (cimvCreate->createOptions->schedule == NULL) { - AppendOnConflict(cimvCreate, buf, isInsert); + AppendOnConflict(cimvCreate, queryBuf, isInsert); } - appendStringInfoString(buf, ";\n$exec_format$, TG_ARGV[0]);"); + appendStringInfoChar(queryBuf, ';'); if (!isInsert && cimvCreate->createOptions->schedule == NULL) { - appendStringInfoString(buf, - "EXECUTE format($exec_format$DELETE FROM %s WHERE __count__ = 0;$exec_format$, TG_ARGV[0]);"); + appendStringInfo(queryBuf, + "DELETE FROM %s WHERE __count__ = 0;", insertTableName); } - pfree(querybuf.data); - pfree(query); + return queryBuf->data; } @@ -847,13 +1063,16 @@ CreateDataChangeTriggers(CimvCreate *cimvCreate) static void CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent) { + StringInfoData buf; initStringInfo(&buf); bool isCitusTable = cimvCreate->citusTable != NULL; - char *event; - char *referencing; + + char* event = NULL; + char* referencing = NULL; + switch (triggerEvent) { case TRIGGER_EVENT_INSERT: @@ -883,8 +1102,11 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent) /* TODO: UPDATE [ OF column_name [, ... ] ] */ StringInfoData triggerName; initStringInfo(&triggerName); - appendStringInfo(&triggerName, "%s_%s", NameStr(cimvCreate->formCimv->triggerfnname), - event); + appendStringInfo(&triggerName, "%s_%s", cimvCreate->prefix, event); + + MemoryContext oldMemoryContext = MemoryContextSwitchTo(cimvCreate->memoryContext); + cimvCreate->triggerNameList = lappend(cimvCreate->triggerNameList, pstrdup(triggerName.data)); + MemoryContextSwitchTo(oldMemoryContext); RangeVar *insertTable = cimvCreate->createOptions->schedule == NULL ? cimvCreate->matTableNameQuoted : @@ -901,13 +1123,16 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent) appendStringInfo(&buf, "CREATE TRIGGER %s AFTER %s ON %%s %s " - "FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s(%%L)", + "FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s($view_def$%s$view_def$, $prefix$%s$prefix$, $baserel$%s$baserel$, %%L )", quote_identifier(triggerName.data), event, referencing, quote_identifier(NameStr( cimvCreate->formCimv->triggerfnnamespace)), - quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname))); + quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)), + cimvCreate->queryString, + cimvCreate->prefix, + cimvCreate->baseTableNameQuoted->relname); appendStringInfoString(&buf, "$create_trigger$);"); } @@ -915,7 +1140,7 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent) { appendStringInfo(&buf, "CREATE TRIGGER %s AFTER %s ON %s.%s %s " - "FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s('%s.%s')", + "FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s($view_def$%s$view_def$, $prefix$%s$prefix$)", quote_identifier(triggerName.data), event, cimvCreate->baseTableNameQuoted->schemaname, @@ -924,8 +1149,8 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent) quote_identifier(NameStr( cimvCreate->formCimv->triggerfnnamespace)), quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)), - insertTable->schemaname, - insertTable->relname); + cimvCreate->queryString, + cimvCreate->prefix); } int expectedResult = isCitusTable ? SPI_OK_SELECT : SPI_OK_UTILITY; @@ -936,36 +1161,43 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent) if (isCitusTable) { - if (SPI_tuptable != NULL && SPI_tuptable->tupdesc->natts == 2 && SPI_processed == - 1) - { - SPITupleTable *tuptable = SPI_tuptable; - TupleDesc tupdesc = tuptable->tupdesc; - HeapTuple tuple = tuptable->vals[0]; - - SPI_getvalue(tuple, tupdesc, 1); - bool isNull; - Datum isSuccessDatum = SPI_getbinval(tuple, tupdesc, 1, &isNull); - - if (!isNull && !DatumGetBool(isSuccessDatum)) - { - elog(ERROR, "SPI_exec failed: %s", SPI_getvalue(tuple, tupdesc, 2)); - } - } + CheckSPIResultForColocatedRun(); } pfree(buf.data); } +static void CheckSPIResultForColocatedRun(void) { + if (SPI_tuptable != NULL && SPI_tuptable->tupdesc->natts == 2 && SPI_processed == + 1) + { + SPITupleTable *tuptable = SPI_tuptable; + TupleDesc tupdesc = tuptable->tupdesc; + HeapTuple tuple = tuptable->vals[0]; + + SPI_getvalue(tuple, tupdesc, 1); + bool isNull; + Datum isSuccessDatum = SPI_getbinval(tuple, tupdesc, 1, &isNull); + + if (!isNull && !DatumGetBool(isSuccessDatum)) + { + elog(ERROR, "SPI_exec failed: %s", SPI_getvalue(tuple, tupdesc, 2)); + } + } +} static CimvCreate * -InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *createOptions) +InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *createOptions, + const char* query_string, char* prefix) { CimvCreate *cimvCreate = palloc(sizeof(CimvCreate)); cimvCreate->formCimv = palloc(sizeof(FormData_pg_cimv)); + cimvCreate->memoryContext = CurrentMemoryContext; + cimvCreate->formCimv->jobid = 0; cimvCreate->formCimv->landingtable = InvalidOid; + cimvCreate->queryString = query_string; Query *query = (Query *) stmt->query; RangeTblEntry *baseRte = (RangeTblEntry *) linitial(query->rtable); @@ -979,11 +1211,16 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create cimvCreate->formCimv->basetable = baseRte->relid; - cimvCreate->prefixId = UniqueId(); - cimvCreate->prefix = CIMVInternalPrefix(cimvCreate->baseTableName, cimvCreate->prefixId); + if (prefix) { + cimvCreate->prefix = prefix; + }else { + cimvCreate->prefixId = UniqueId(); + cimvCreate->prefix = CIMVInternalPrefix(cimvCreate->baseTableName, cimvCreate->prefixId); + } + namestrcpy(&cimvCreate->formCimv->triggerfnnamespace, CIMV_INTERNAL_SCHEMA); char* funcName = CIMVTriggerFuncName(cimvCreate->prefixId, stmt->into->rel->relname); - namestrcpy(&cimvCreate->formCimv->triggerfnname, funcName); + namestrcpy(&cimvCreate->formCimv->triggerfnname, "cimv_trigger"); StringInfo mat = makeStringInfo(); appendStringInfo(mat, "%s_cimv_%s", cimvCreate->prefix, MATERIALIZATION_TABLE_SUFFIX); @@ -1000,6 +1237,7 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create cimvCreate->targetListEntries = NIL; cimvCreate->groupTargetListEntries = NIL; cimvCreate->aggTargetListEntries = NIL; + cimvCreate->triggerNameList = NIL; cimvCreate->citusTable = IsCitusTable(baseRte->relid) ? LookupCitusTableCacheEntry( baseRte->relid) : NULL; cimvCreate->partitionColumn = NULL; @@ -1022,6 +1260,10 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create (char *) quote_identifier(cimvCreate->landingTableName->schemaname), (char *) quote_identifier(cimvCreate->landingTableName->relname), -1); + cimvCreate->insertTable = cimvCreate->createOptions->schedule == NULL ? + cimvCreate->matTableNameQuoted : + cimvCreate->landingTableNameQuoted; + return cimvCreate; } diff --git a/src/backend/distributed/cimv/drop.c b/src/backend/distributed/cimv/drop.c index b709eec9e..37dd6b876 100644 --- a/src/backend/distributed/cimv/drop.c +++ b/src/backend/distributed/cimv/drop.c @@ -151,7 +151,7 @@ DropCimv(Form_pg_cimv formCimv, DropBehavior behavior) } DropCronJob(formCimv); - DropDmlTriggers(formCimv); + // DropDmlTriggers(formCimv); /* Lock */ if (OidIsValid(matTableAddress.objectId)) diff --git a/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql b/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql index 63fef6b0a..49afe7a73 100644 --- a/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql +++ b/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql @@ -11,4 +11,17 @@ DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass); #include "../../columnar/sql/columnar--9.5-1--10.0-1.sql" CREATE SCHEMA cimv_internal; -GRANT ALL ON SCHEMA cimv_internal to public; \ No newline at end of file +GRANT ALL ON SCHEMA cimv_internal to public; + +CREATE FUNCTION cimv_internal.cimv_trigger() + RETURNS trigger + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$cimv_trigger$$; + +CREATE FUNCTION pg_catalog.worker_record_trigger_dependency(basetable_name regclass, inserttable_name regclass, trigger_name text) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', 'worker_record_trigger_dependency'; +COMMENT ON FUNCTION pg_catalog.worker_record_trigger_dependency(regclass,regclass,text) + IS 'record the fact that the trigger depends on the table in pg_depend'; + \ No newline at end of file