diff --git a/src/backend/distributed/cimv/create.c b/src/backend/distributed/cimv/create.c index 0a25af7b9..fe63f8bf8 100644 --- a/src/backend/distributed/cimv/create.c +++ b/src/backend/distributed/cimv/create.c @@ -23,6 +23,7 @@ #include "distributed/metadata_cache.h" #include "distributed/security_utils.h" #include "distributed/sequence_utils.h" +#include "distributed/string_utils.h" #include "distributed/worker_protocol.h" #include "distributed/multi_executor.h" #include "distributed/coordinator_protocol.h" @@ -89,13 +90,10 @@ static void CreateIndexOnMatTable(CimvCreate *cimvCreate); static void DistributeTable(CimvCreate *cimvCreate, RangeVar *tableName); static void CreateUserView(CimvCreate *cimvCreate); static void CreateRefreshView(CimvCreate *cimvCreate); -static void CreateDataChangeTriggerFunction(CimvCreate *cimvCreate); static void CreateCronJob(CimvCreate *cimvCreate); static char* DataChangeTriggerInsertDeleteQueryString(CimvCreate *cimvCreate, bool isInsert, char* insertTableName, char* newTableName); static char* DataChangeTriggerTruncateQueryString(char* insertTableName); -static void DataChangeTriggerFunctionAppendErrorOnDelete(CimvCreate *cimvCreate, - StringInfo buf); static void AppendOnConflict(CimvCreate *cimvCreate, StringInfo buf, bool isInsert); static void CreateDataChangeTriggers(CimvCreate *cimvCreate); static void CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent); @@ -116,9 +114,7 @@ static Oid PartialAggOid(void); static void AppendQuotedLiteral(StringInfo buf, const char *val); static void AppendStringInfoFunction(StringInfo buf, Oid fnoid); static Oid AggregateFunctionOid(const char *functionName, Oid inputType); -static char* CIMVTriggerFuncName(int prefixId, const char* relname); static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId); -static void AlterTableOwner(RangeVar* tableName, char* ownerName); static void CreateDependencyFromTriggersToView(Oid baseRelationId, List* triggerNameList, Oid userViewId); @@ -127,40 +123,54 @@ static char* InsertTableName(RangeVar* insertTable); static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate); static void CheckSPIResultForColocatedRun(void); static char* CreateViewCommandForShard(CimvCreate* cimvCreate, char* shardViewQueryDef); +static List* ReadTriggerNameList(ArrayType *triggerNameArrayObject); PG_FUNCTION_INFO_V1(cimv_trigger); PG_FUNCTION_INFO_V1(worker_record_trigger_dependency); -Datum -worker_record_trigger_dependency(PG_FUNCTION_ARGS) +Datum worker_record_trigger_dependency(PG_FUNCTION_ARGS) { Oid relationOid = PG_GETARG_OID(0); Oid userViewOid = PG_GETARG_OID(1); - char* triggerName = text_to_cstring(PG_GETARG_TEXT_P(2)); - CreateDependencyFromTriggersToView(relationOid, list_make1(triggerName), userViewOid); + ArrayType *triggerNameArrayObject = PG_GETARG_ARRAYTYPE_P(2); + List* triggerNameList = ReadTriggerNameList(triggerNameArrayObject); + + CreateDependencyFromTriggersToView(relationOid, triggerNameList, userViewOid); PG_RETURN_VOID(); } +static List* ReadTriggerNameList(ArrayType *triggerNameArrayObject) { + int triggerCount = ArrayObjectCount(triggerNameArrayObject); + Datum *triggerNameDatumArray = DeconstructArrayObject(triggerNameArrayObject); + + List* triggerNameList = NIL; + for (int i = 0; i < triggerCount; i++) + { + text* triggerName = (text*) DatumGetPointer(triggerNameDatumArray[i]); + triggerNameList = lappend(triggerNameList, text_to_cstring(triggerName)); + } + return triggerNameList; +} + Datum cimv_trigger(PG_FUNCTION_ARGS) { - /* make sure it's called as a trigger at all */ if (!CALLED_AS_TRIGGER(fcinfo)) elog(ERROR, "trigf: not called by trigger manager"); TriggerData *trigdata = (TriggerData *) fcinfo->context; Trigger* trigger = trigdata->tg_trigger; - HeapTuple rettuple; - if (trigger == NULL || trigger->tgnargs < 2) { - elog(ERROR, "cimv_trigger: should be called with at least two arguments"); + if (trigger == NULL || trigger->tgnargs < 3) { + elog(ERROR, "cimv_trigger: should be called with at least 3 arguments"); } List* queryStringList = NIL; char* createViewQueryString = trigger->tgargs[0]; char* prefix = trigger->tgargs[1]; + char* insertTableName = trigger->tgargs[2]; CreateTableAsStmt* stmt = ParseQueryStringToCreateTableAsStmt(createViewQueryString); MatViewCreateOptions *options = GetMatViewCreateOptions(stmt); @@ -169,16 +179,6 @@ Datum cimv_trigger(PG_FUNCTION_ARGS) any fields in a validation */ ValidateCimv(cimvCreate); - char* insertTableName = NULL; - if (trigger->tgnargs >= 3) { - insertTableName = trigger->tgargs[2]; - }else { - RangeVar *insertTable = cimvCreate->createOptions->schedule == NULL ? - cimvCreate->matTableNameQuoted : - cimvCreate->landingTableNameQuoted; - insertTableName = InsertTableName(insertTable); - } - if (TRIGGER_FOR_INSERT(trigger->tgtype) || TRIGGER_FOR_UPDATE(trigger->tgtype)) { char* newTableName = trigger->tgnewtable; char* insertDeleteQueryString = @@ -209,15 +209,6 @@ Datum cimv_trigger(PG_FUNCTION_ARGS) queryStringList = lappend(queryStringList, truncateQueryString); } - - /* tuple to return to executor */ - if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) { - rettuple = trigdata->tg_newtuple; - } - else { - rettuple = trigdata->tg_trigtuple; - } - /* connect to SPI manager */ if ( SPI_connect() < 0) elog(ERROR, "cimv_trigger: could not connect to SPI"); @@ -239,8 +230,7 @@ Datum cimv_trigger(PG_FUNCTION_ARGS) elog(ERROR, "SPI_finish failed"); } - - return PointerGetDatum(rettuple); + PG_RETURN_VOID(); } static CreateTableAsStmt* ParseQueryStringToCreateTableAsStmt(const char* queryString) { @@ -281,15 +271,8 @@ CreateTriggerTaskList(CimvCreate* cimvCreate, char* triggerName, char* event, ch uint64 jobId = INVALID_JOB_ID; int taskId = 1; - Oid leftSchemaId = get_rel_namespace(leftRelationId); char* qualifiedLeftRelName = generate_qualified_relation_name(leftRelationId); - char *leftSchemaName = get_namespace_name(leftSchemaId); - char *escapedLeftSchemaName = quote_literal_cstr(leftSchemaName); - - Oid rightSchemaId = get_rel_namespace(rightRelationId); char* qualifiedRightRelName = generate_qualified_relation_name(rightRelationId); - char *rightSchemaName = get_namespace_name(rightSchemaId); - char *escapedRightSchemaName = quote_literal_cstr(rightSchemaName); List *taskList = NIL; @@ -414,7 +397,6 @@ CreateCimv(CimvCreate *cimvCreate) CreateUserView(cimvCreate); CreateRefreshView(cimvCreate); - // CreateDataChangeTriggerFunction(cimvCreate); CreateDataChangeTriggers(cimvCreate); InsertIntoPgCimv(cimvCreate->formCimv); @@ -440,27 +422,27 @@ static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate) { } SPI_commit(); SPI_start_transaction(); - char* triggerName = NULL; - foreach_ptr(triggerName, cimvCreate->triggerNameList) { - StringInfo queryBuf = makeStringInfo(); - appendStringInfo(queryBuf, - "SELECT bool_and(success), max(result) FROM run_command_on_colocated_placements($param$%s.%s$param$, $param$%s.%s$param$, $create_dependency$ ", - cimvCreate->baseTableNameQuoted->schemaname, - cimvCreate->baseTableNameQuoted->relname, - cimvCreate->insertTable->schemaname, - cimvCreate->insertTable->relname); + char* concatenatedTriggerNames = ConcatenateStringListWithDelimiter(cimvCreate->triggerNameList, ','); + StringInfo queryBuf = makeStringInfo(); + appendStringInfo(queryBuf, + "SELECT bool_and(success), max(result) FROM run_command_on_colocated_placements($param$%s.%s$param$, $param$%s.%s$param$, $create_dependency$ ", + cimvCreate->baseTableNameQuoted->schemaname, + cimvCreate->baseTableNameQuoted->relname, + cimvCreate->insertTable->schemaname, + cimvCreate->insertTable->relname); - appendStringInfo(queryBuf, - "SELECT worker_record_trigger_dependency($base_table$%%s$base_table$, $insert_table$%%s$insert_table$, $trigger_name$%s$trigger_name$)", - triggerName); + appendStringInfo(queryBuf, + "SELECT worker_record_trigger_dependency($base_table$%%s$base_table$, $insert_table$%%s$insert_table$, ARRAY[%s])", + concatenatedTriggerNames); - appendStringInfoString(queryBuf, "$create_dependency$);"); - if (SPI_execute(queryBuf->data, false, 0) != SPI_OK_SELECT) - { - elog(ERROR, "SPI_exec failed: %s", queryBuf->data); - } - CheckSPIResultForColocatedRun(); + appendStringInfoString(queryBuf, "$create_dependency$);"); + + if (SPI_execute(queryBuf->data, false, 0) != SPI_OK_SELECT) + { + elog(ERROR, "SPI_exec failed: %s", queryBuf->data); } + CheckSPIResultForColocatedRun(); + if (SPI_finish() != SPI_OK_FINISH) { elog(ERROR, "SPI_finish failed"); @@ -472,7 +454,6 @@ static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate) { } - static void CreateMatTable(CimvCreate *cimvCreate, bool isLandingZone) { @@ -643,25 +624,6 @@ DistributeTable(CimvCreate *cimvCreate, RangeVar *tableName) pfree(querybuf.data); } -static void AlterTableOwner(RangeVar* tableName, char* ownerName) { - StringInfoData querybuf; - initStringInfo(&querybuf); - - appendStringInfo(&querybuf, - "ALTER TABLE %s.%s OWNER TO %s;", - tableName->schemaname ? tableName->schemaname : "public", - tableName->relname, - ownerName); - - if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY) - { - elog(ERROR, "SPI_exec failed: %s", querybuf.data); - } - - pfree(querybuf.data); -} - - static void CreateUserView(CimvCreate *cimvCreate) { @@ -870,95 +832,6 @@ CreateCronJob(CimvCreate *cimvCreate) pfree(queryText.data); } - -static void -CreateDataChangeTriggerFunction(CimvCreate *cimvCreate) -{ - StringInfoData buf; - initStringInfo(&buf); - - bool isCitusTable = cimvCreate->citusTable != NULL; - - if (isCitusTable) - { - appendStringInfo(&buf, - "SELECT * FROM run_command_on_workers($cmd$CREATE FUNCTION %s.%s() RETURNS trigger AS $$ BEGIN RETURN null; END; $$ LANGUAGE plpgsql$cmd$)", - quote_identifier(NameStr( - cimvCreate->formCimv->triggerfnnamespace)), - quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname))); - - if (SPI_execute(buf.data, false, 0) != SPI_OK_SELECT) - { - elog(ERROR, "SPI_exec failed: %s", buf.data); - } - - resetStringInfo(&buf); - } - - appendStringInfo(&buf, - "CREATE OR REPLACE FUNCTION %s.%s() RETURNS TRIGGER AS $trigger_function$ BEGIN\n", - quote_identifier(NameStr(cimvCreate->formCimv->triggerfnnamespace)), - quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname))); - - /* INSERT */ - appendStringInfoString(&buf, - "IF (TG_OP = $inside_trigger_function$INSERT$inside_trigger_function$ OR TG_OP = $inside_trigger_function$UPDATE$inside_trigger_function$) THEN\n"); - // DataChangeTriggerInsertDeleteQueryString(cimvCreate, &buf, true); - appendStringInfoString(&buf, "END IF;\n"); - - /* DELETE */ - appendStringInfoString(&buf, - "IF (TG_OP = $inside_trigger_function$DELETE$inside_trigger_function$ OR TG_OP = $inside_trigger_function$UPDATE$inside_trigger_function$) THEN\n"); - if (cimvCreate->supportsDelete) - { - // DataChangeTriggerInsertDeleteQueryString(cimvCreate, &buf, false); - } - else - { - DataChangeTriggerFunctionAppendErrorOnDelete(cimvCreate, &buf); - } - - appendStringInfoString(&buf, "END IF;\n"); - - /* TRUNCATE */ - appendStringInfoString(&buf, - "IF (TG_OP = $inside_trigger_function$TRUNCATE$inside_trigger_function$) THEN\n"); - - appendStringInfoString(&buf, - "EXECUTE format($exec_format$TRUNCATE TABLE %s; $exec_format$, TG_ARGV[0]);"); - - /* TODO: also truncate landing table if it exists */ - - appendStringInfoString(&buf, "END IF;\n"); - - appendStringInfoString(&buf, - "RETURN NULL; END; $trigger_function$ LANGUAGE plpgsql;"); - - /* CREATE */ - if (SPI_execute(buf.data, false, 0) != SPI_OK_UTILITY) - { - elog(ERROR, "SPI_exec failed: %s", buf.data); - } - - if (isCitusTable) - { - resetStringInfo(&buf); - appendStringInfo(&buf, - "SELECT FROM create_distributed_function($cdfn$%s.%s()$cdfn$)", - quote_identifier(NameStr( - cimvCreate->formCimv->triggerfnnamespace)), - quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname))); - } - - int expectedResult = isCitusTable ? SPI_OK_SELECT : SPI_OK_UTILITY; - if (SPI_execute(buf.data, false, 0) != expectedResult) - { - elog(ERROR, "SPI_exec failed: %s", buf.data); - } - - pfree(buf.data); -} - static char* DataChangeTriggerTruncateQueryString(char* insertTableName) { StringInfo queryBuf = makeStringInfo(); appendStringInfo(queryBuf, "TRUNCATE TABLE %s", insertTableName); @@ -977,7 +850,6 @@ static void CreateDependencyFromTriggersToView(Oid baseRelationId, List* trigger bool missingOk = false; HeapTuple triggerTuple = GetTriggerTupleById(triggerId, missingOk); Form_pg_trigger triggerForm = (Form_pg_trigger) GETSTRUCT(triggerTuple); - char* name = NameStr(triggerForm->tgname); if (namestrcmp(&triggerForm->tgname, targetTriggerName) == 0) { ObjectAddress triggerAddr = { .classId = TriggerRelationId, @@ -1035,17 +907,6 @@ DataChangeTriggerInsertDeleteQueryString(CimvCreate *cimvCreate, bool return queryBuf->data; } - -static void -DataChangeTriggerFunctionAppendErrorOnDelete(CimvCreate *cimvCreate, StringInfo buf) -{ - appendStringInfo(buf, - "RAISE EXCEPTION $ex$MATERIALIZED VIEW '%s' on table '%s' does not support UPDATE/DELETE$ex$;\n", - cimvCreate->userViewNameQuoted->relname, - cimvCreate->baseTableNameQuoted->relname); -} - - static void AppendOnConflict(CimvCreate *cimvCreate, StringInfo buf, bool isInsert) { @@ -1147,10 +1008,6 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent) cimvCreate->triggerNameList = lappend(cimvCreate->triggerNameList, pstrdup(triggerName.data)); MemoryContextSwitchTo(oldMemoryContext); - RangeVar *insertTable = cimvCreate->createOptions->schedule == NULL ? - cimvCreate->matTableNameQuoted : - cimvCreate->landingTableNameQuoted; - if (isCitusTable) { List* taskList = CreateTriggerTaskList(cimvCreate, triggerName.data, event, referencing); @@ -1174,7 +1031,7 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent) { appendStringInfo(&buf, "CREATE TRIGGER %s AFTER %s ON %s.%s %s " - "FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s($view_def$%s$view_def$, $prefix$%s$prefix$)", + "FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s($view_def$%s$view_def$, $prefix$%s$prefix$, $insert_table$%s$insert_table$)", quote_identifier(triggerName.data), event, cimvCreate->baseTableNameQuoted->schemaname, @@ -1184,7 +1041,8 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent) cimvCreate->formCimv->triggerfnnamespace)), quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)), cimvCreate->queryString, - cimvCreate->prefix); + cimvCreate->prefix, + InsertTableName(cimvCreate->insertTable)); } int expectedResult = isCitusTable ? SPI_OK_SELECT : SPI_OK_UTILITY; @@ -1253,7 +1111,6 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create } namestrcpy(&cimvCreate->formCimv->triggerfnnamespace, CIMV_INTERNAL_SCHEMA); - char* funcName = CIMVTriggerFuncName(cimvCreate->prefixId, stmt->into->rel->relname); namestrcpy(&cimvCreate->formCimv->triggerfnname, "cimv_trigger"); StringInfo mat = makeStringInfo(); appendStringInfo(mat, "%s_cimv_%s", cimvCreate->prefix, MATERIALIZATION_TABLE_SUFFIX); @@ -1301,12 +1158,6 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create return cimvCreate; } -static char* CIMVTriggerFuncName(int prefixId, const char* relname) { - StringInfo funcName = makeStringInfo(); - appendStringInfo(funcName, "%s_%d",quote_identifier(relname), prefixId); - return funcName->data; -} - static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId) { if (baseTable->schemaname == NULL || baseTable->relname == NULL) { diff --git a/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql b/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql index 49afe7a73..8bab6eb91 100644 --- a/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql +++ b/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql @@ -18,10 +18,10 @@ CREATE FUNCTION cimv_internal.cimv_trigger() LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$cimv_trigger$$; -CREATE FUNCTION pg_catalog.worker_record_trigger_dependency(basetable_name regclass, inserttable_name regclass, trigger_name text) +CREATE FUNCTION pg_catalog.worker_record_trigger_dependency(basetable_name regclass, inserttable_name regclass, trigger_name text[]) RETURNS VOID LANGUAGE C STRICT AS 'MODULE_PATHNAME', 'worker_record_trigger_dependency'; -COMMENT ON FUNCTION pg_catalog.worker_record_trigger_dependency(regclass,regclass,text) +COMMENT ON FUNCTION pg_catalog.worker_record_trigger_dependency(regclass,regclass,text[]) IS 'record the fact that the trigger depends on the table in pg_depend'; \ No newline at end of file diff --git a/src/backend/distributed/utils/string_utils.c b/src/backend/distributed/utils/string_utils.c index de2e8ba38..ed45165cc 100644 --- a/src/backend/distributed/utils/string_utils.c +++ b/src/backend/distributed/utils/string_utils.c @@ -11,9 +11,13 @@ #include "postgres.h" +#include "utils/builtins.h" + +#include "distributed/listutils.h" #include "distributed/relay_utility.h" #include "distributed/string_utils.h" + /* * ConvertIntToString returns the string version of given integer. */ @@ -26,3 +30,19 @@ ConvertIntToString(int val) return str->data; } + +char* ConcatenateStringListWithDelimiter(List* stringList, char delimiter) { + StringInfo result = makeStringInfo(); + + bool isFirst = true; + char* string = NULL; + foreach_ptr(string, stringList) { + if (!isFirst) { + appendStringInfoChar(result, delimiter); + }else { + isFirst = false; + } + appendStringInfoString(result, quote_literal_cstr(string)); + } + return result->data; +} \ No newline at end of file diff --git a/src/include/distributed/string_utils.h b/src/include/distributed/string_utils.h index c43e63ce8..8f7675d9f 100644 --- a/src/include/distributed/string_utils.h +++ b/src/include/distributed/string_utils.h @@ -14,5 +14,6 @@ #include "postgres.h" extern char * ConvertIntToString(int val); +extern char* ConcatenateStringListWithDelimiter(List* stringList, char delimiter); #endif /* CITUS_STRING_UTILS_H */ diff --git a/src/test/regress/expected/cimv.out b/src/test/regress/expected/cimv.out index c35582906..61c181390 100644 --- a/src/test/regress/expected/cimv.out +++ b/src/test/regress/expected/cimv.out @@ -186,11 +186,10 @@ ORDER BY a, d_hour; (60 rows) DELETE FROM events WHERE b < 100; -ERROR: MATERIALIZED VIEW 'mv' on table 'events' does not support UPDATE/DELETE +ERROR: MATERIALIZED VIEW mv on table events does not support UPDATE/DELETE DROP VIEW mv; ERROR: DROP VIEW not supported for mv DROP MATERIALIZED VIEW mv; -NOTICE: drop cascades to 4 other objects CREATE MATERIALIZED VIEW mv WITH (citus.cimv) AS SELECT a, date_trunc('hour', d) AS d_hour, @@ -408,7 +407,6 @@ ORDER BY a, d_hour; (60 rows) DROP MATERIALIZED VIEW mv; -NOTICE: drop cascades to 4 other objects CREATE MATERIALIZED VIEW mv WITH (citus.cimv, citus.insertonlycapture) AS SELECT a, date_trunc('hour', d) AS d_hour, @@ -780,7 +778,6 @@ ORDER BY a, d_hour; (60 rows) DROP MATERIALIZED VIEW mv; -NOTICE: drop cascades to trigger mv_3_INSERT on table events CREATE MATERIALIZED VIEW mv WITH (citus.cimv, citus.insertonlycapture) AS SELECT a, date_trunc('hour', d) AS d_hour, @@ -794,7 +791,6 @@ SELECT * FROM mv; (0 rows) DROP MATERIALIZED VIEW mv; -NOTICE: drop cascades to trigger mv_4_INSERT on table events SELECT create_distributed_table('events', 'a'); NOTICE: Copying data from local table... NOTICE: copying the data has completed @@ -992,7 +988,7 @@ ORDER BY a, d_hour; (70 rows) DELETE FROM events WHERE b < 100; -ERROR: MATERIALIZED VIEW 'mv' on table 'events' does not support UPDATE/DELETE +ERROR: MATERIALIZED VIEW mv on table events_400000 does not support UPDATE/DELETE DROP VIEW mv; ERROR: DROP VIEW not supported for mv DROP MATERIALIZED VIEW mv; @@ -1697,7 +1693,6 @@ FROM events WHERE b > 10 GROUP BY a, d_hour; DROP MATERIALIZED VIEW mv; -NOTICE: drop cascades to 4 other objects -- test that another user can create CIMV as well CREATE USER new_user; NOTICE: not propagating CREATE ROLE/USER commands to worker nodes @@ -1721,6 +1716,5 @@ WHERE b > 10 GROUP BY a, d_hour; REFRESH MATERIALIZED VIEW mv; DROP MATERIALIZED VIEW mv CASCADE; -NOTICE: drop cascades to 4 other objects SET client_min_messages TO WARNING; -- suppress cascade messages DROP SCHEMA cimv CASCADE;