Improve code quality

cimv
Sait Talha Nisanci 2021-01-21 19:14:34 +03:00
parent 24551ccb30
commit 09abf75e5a
5 changed files with 70 additions and 204 deletions

View File

@ -23,6 +23,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/security_utils.h"
#include "distributed/sequence_utils.h"
#include "distributed/string_utils.h"
#include "distributed/worker_protocol.h"
#include "distributed/multi_executor.h"
#include "distributed/coordinator_protocol.h"
@ -89,13 +90,10 @@ static void CreateIndexOnMatTable(CimvCreate *cimvCreate);
static void DistributeTable(CimvCreate *cimvCreate, RangeVar *tableName);
static void CreateUserView(CimvCreate *cimvCreate);
static void CreateRefreshView(CimvCreate *cimvCreate);
static void CreateDataChangeTriggerFunction(CimvCreate *cimvCreate);
static void CreateCronJob(CimvCreate *cimvCreate);
static char* DataChangeTriggerInsertDeleteQueryString(CimvCreate *cimvCreate, bool isInsert,
char* insertTableName, char* newTableName);
static char* DataChangeTriggerTruncateQueryString(char* insertTableName);
static void DataChangeTriggerFunctionAppendErrorOnDelete(CimvCreate *cimvCreate,
StringInfo buf);
static void AppendOnConflict(CimvCreate *cimvCreate, StringInfo buf, bool isInsert);
static void CreateDataChangeTriggers(CimvCreate *cimvCreate);
static void CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent);
@ -116,9 +114,7 @@ static Oid PartialAggOid(void);
static void AppendQuotedLiteral(StringInfo buf, const char *val);
static void AppendStringInfoFunction(StringInfo buf, Oid fnoid);
static Oid AggregateFunctionOid(const char *functionName, Oid inputType);
static char* CIMVTriggerFuncName(int prefixId, const char* relname);
static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId);
static void AlterTableOwner(RangeVar* tableName, char* ownerName);
static void CreateDependencyFromTriggersToView(Oid baseRelationId, List* triggerNameList, Oid userViewId);
@ -127,40 +123,54 @@ static char* InsertTableName(RangeVar* insertTable);
static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate);
static void CheckSPIResultForColocatedRun(void);
static char* CreateViewCommandForShard(CimvCreate* cimvCreate, char* shardViewQueryDef);
static List* ReadTriggerNameList(ArrayType *triggerNameArrayObject);
PG_FUNCTION_INFO_V1(cimv_trigger);
PG_FUNCTION_INFO_V1(worker_record_trigger_dependency);
Datum
worker_record_trigger_dependency(PG_FUNCTION_ARGS)
Datum worker_record_trigger_dependency(PG_FUNCTION_ARGS)
{
Oid relationOid = PG_GETARG_OID(0);
Oid userViewOid = PG_GETARG_OID(1);
char* triggerName = text_to_cstring(PG_GETARG_TEXT_P(2));
CreateDependencyFromTriggersToView(relationOid, list_make1(triggerName), userViewOid);
ArrayType *triggerNameArrayObject = PG_GETARG_ARRAYTYPE_P(2);
List* triggerNameList = ReadTriggerNameList(triggerNameArrayObject);
CreateDependencyFromTriggersToView(relationOid, triggerNameList, userViewOid);
PG_RETURN_VOID();
}
static List* ReadTriggerNameList(ArrayType *triggerNameArrayObject) {
int triggerCount = ArrayObjectCount(triggerNameArrayObject);
Datum *triggerNameDatumArray = DeconstructArrayObject(triggerNameArrayObject);
List* triggerNameList = NIL;
for (int i = 0; i < triggerCount; i++)
{
text* triggerName = (text*) DatumGetPointer(triggerNameDatumArray[i]);
triggerNameList = lappend(triggerNameList, text_to_cstring(triggerName));
}
return triggerNameList;
}
Datum cimv_trigger(PG_FUNCTION_ARGS)
{
/* make sure it's called as a trigger at all */
if (!CALLED_AS_TRIGGER(fcinfo))
elog(ERROR, "trigf: not called by trigger manager");
TriggerData *trigdata = (TriggerData *) fcinfo->context;
Trigger* trigger = trigdata->tg_trigger;
HeapTuple rettuple;
if (trigger == NULL || trigger->tgnargs < 2) {
elog(ERROR, "cimv_trigger: should be called with at least two arguments");
if (trigger == NULL || trigger->tgnargs < 3) {
elog(ERROR, "cimv_trigger: should be called with at least 3 arguments");
}
List* queryStringList = NIL;
char* createViewQueryString = trigger->tgargs[0];
char* prefix = trigger->tgargs[1];
char* insertTableName = trigger->tgargs[2];
CreateTableAsStmt* stmt = ParseQueryStringToCreateTableAsStmt(createViewQueryString);
MatViewCreateOptions *options = GetMatViewCreateOptions(stmt);
@ -169,16 +179,6 @@ Datum cimv_trigger(PG_FUNCTION_ARGS)
any fields in a validation */
ValidateCimv(cimvCreate);
char* insertTableName = NULL;
if (trigger->tgnargs >= 3) {
insertTableName = trigger->tgargs[2];
}else {
RangeVar *insertTable = cimvCreate->createOptions->schedule == NULL ?
cimvCreate->matTableNameQuoted :
cimvCreate->landingTableNameQuoted;
insertTableName = InsertTableName(insertTable);
}
if (TRIGGER_FOR_INSERT(trigger->tgtype) || TRIGGER_FOR_UPDATE(trigger->tgtype)) {
char* newTableName = trigger->tgnewtable;
char* insertDeleteQueryString =
@ -209,15 +209,6 @@ Datum cimv_trigger(PG_FUNCTION_ARGS)
queryStringList = lappend(queryStringList, truncateQueryString);
}
/* tuple to return to executor */
if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) {
rettuple = trigdata->tg_newtuple;
}
else {
rettuple = trigdata->tg_trigtuple;
}
/* connect to SPI manager */
if ( SPI_connect() < 0)
elog(ERROR, "cimv_trigger: could not connect to SPI");
@ -239,8 +230,7 @@ Datum cimv_trigger(PG_FUNCTION_ARGS)
elog(ERROR, "SPI_finish failed");
}
return PointerGetDatum(rettuple);
PG_RETURN_VOID();
}
static CreateTableAsStmt* ParseQueryStringToCreateTableAsStmt(const char* queryString) {
@ -281,15 +271,8 @@ CreateTriggerTaskList(CimvCreate* cimvCreate, char* triggerName, char* event, ch
uint64 jobId = INVALID_JOB_ID;
int taskId = 1;
Oid leftSchemaId = get_rel_namespace(leftRelationId);
char* qualifiedLeftRelName = generate_qualified_relation_name(leftRelationId);
char *leftSchemaName = get_namespace_name(leftSchemaId);
char *escapedLeftSchemaName = quote_literal_cstr(leftSchemaName);
Oid rightSchemaId = get_rel_namespace(rightRelationId);
char* qualifiedRightRelName = generate_qualified_relation_name(rightRelationId);
char *rightSchemaName = get_namespace_name(rightSchemaId);
char *escapedRightSchemaName = quote_literal_cstr(rightSchemaName);
List *taskList = NIL;
@ -414,7 +397,6 @@ CreateCimv(CimvCreate *cimvCreate)
CreateUserView(cimvCreate);
CreateRefreshView(cimvCreate);
// CreateDataChangeTriggerFunction(cimvCreate);
CreateDataChangeTriggers(cimvCreate);
InsertIntoPgCimv(cimvCreate->formCimv);
@ -440,27 +422,27 @@ static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate) {
}
SPI_commit();
SPI_start_transaction();
char* triggerName = NULL;
foreach_ptr(triggerName, cimvCreate->triggerNameList) {
StringInfo queryBuf = makeStringInfo();
appendStringInfo(queryBuf,
"SELECT bool_and(success), max(result) FROM run_command_on_colocated_placements($param$%s.%s$param$, $param$%s.%s$param$, $create_dependency$ ",
cimvCreate->baseTableNameQuoted->schemaname,
cimvCreate->baseTableNameQuoted->relname,
cimvCreate->insertTable->schemaname,
cimvCreate->insertTable->relname);
char* concatenatedTriggerNames = ConcatenateStringListWithDelimiter(cimvCreate->triggerNameList, ',');
StringInfo queryBuf = makeStringInfo();
appendStringInfo(queryBuf,
"SELECT bool_and(success), max(result) FROM run_command_on_colocated_placements($param$%s.%s$param$, $param$%s.%s$param$, $create_dependency$ ",
cimvCreate->baseTableNameQuoted->schemaname,
cimvCreate->baseTableNameQuoted->relname,
cimvCreate->insertTable->schemaname,
cimvCreate->insertTable->relname);
appendStringInfo(queryBuf,
"SELECT worker_record_trigger_dependency($base_table$%%s$base_table$, $insert_table$%%s$insert_table$, $trigger_name$%s$trigger_name$)",
triggerName);
appendStringInfo(queryBuf,
"SELECT worker_record_trigger_dependency($base_table$%%s$base_table$, $insert_table$%%s$insert_table$, ARRAY[%s])",
concatenatedTriggerNames);
appendStringInfoString(queryBuf, "$create_dependency$);");
if (SPI_execute(queryBuf->data, false, 0) != SPI_OK_SELECT)
{
elog(ERROR, "SPI_exec failed: %s", queryBuf->data);
}
CheckSPIResultForColocatedRun();
appendStringInfoString(queryBuf, "$create_dependency$);");
if (SPI_execute(queryBuf->data, false, 0) != SPI_OK_SELECT)
{
elog(ERROR, "SPI_exec failed: %s", queryBuf->data);
}
CheckSPIResultForColocatedRun();
if (SPI_finish() != SPI_OK_FINISH)
{
elog(ERROR, "SPI_finish failed");
@ -472,7 +454,6 @@ static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate) {
}
static void
CreateMatTable(CimvCreate *cimvCreate, bool isLandingZone)
{
@ -643,25 +624,6 @@ DistributeTable(CimvCreate *cimvCreate, RangeVar *tableName)
pfree(querybuf.data);
}
static void AlterTableOwner(RangeVar* tableName, char* ownerName) {
StringInfoData querybuf;
initStringInfo(&querybuf);
appendStringInfo(&querybuf,
"ALTER TABLE %s.%s OWNER TO %s;",
tableName->schemaname ? tableName->schemaname : "public",
tableName->relname,
ownerName);
if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
{
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
}
pfree(querybuf.data);
}
static void
CreateUserView(CimvCreate *cimvCreate)
{
@ -870,95 +832,6 @@ CreateCronJob(CimvCreate *cimvCreate)
pfree(queryText.data);
}
static void
CreateDataChangeTriggerFunction(CimvCreate *cimvCreate)
{
StringInfoData buf;
initStringInfo(&buf);
bool isCitusTable = cimvCreate->citusTable != NULL;
if (isCitusTable)
{
appendStringInfo(&buf,
"SELECT * FROM run_command_on_workers($cmd$CREATE FUNCTION %s.%s() RETURNS trigger AS $$ BEGIN RETURN null; END; $$ LANGUAGE plpgsql$cmd$)",
quote_identifier(NameStr(
cimvCreate->formCimv->triggerfnnamespace)),
quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)));
if (SPI_execute(buf.data, false, 0) != SPI_OK_SELECT)
{
elog(ERROR, "SPI_exec failed: %s", buf.data);
}
resetStringInfo(&buf);
}
appendStringInfo(&buf,
"CREATE OR REPLACE FUNCTION %s.%s() RETURNS TRIGGER AS $trigger_function$ BEGIN\n",
quote_identifier(NameStr(cimvCreate->formCimv->triggerfnnamespace)),
quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)));
/* INSERT */
appendStringInfoString(&buf,
"IF (TG_OP = $inside_trigger_function$INSERT$inside_trigger_function$ OR TG_OP = $inside_trigger_function$UPDATE$inside_trigger_function$) THEN\n");
// DataChangeTriggerInsertDeleteQueryString(cimvCreate, &buf, true);
appendStringInfoString(&buf, "END IF;\n");
/* DELETE */
appendStringInfoString(&buf,
"IF (TG_OP = $inside_trigger_function$DELETE$inside_trigger_function$ OR TG_OP = $inside_trigger_function$UPDATE$inside_trigger_function$) THEN\n");
if (cimvCreate->supportsDelete)
{
// DataChangeTriggerInsertDeleteQueryString(cimvCreate, &buf, false);
}
else
{
DataChangeTriggerFunctionAppendErrorOnDelete(cimvCreate, &buf);
}
appendStringInfoString(&buf, "END IF;\n");
/* TRUNCATE */
appendStringInfoString(&buf,
"IF (TG_OP = $inside_trigger_function$TRUNCATE$inside_trigger_function$) THEN\n");
appendStringInfoString(&buf,
"EXECUTE format($exec_format$TRUNCATE TABLE %s; $exec_format$, TG_ARGV[0]);");
/* TODO: also truncate landing table if it exists */
appendStringInfoString(&buf, "END IF;\n");
appendStringInfoString(&buf,
"RETURN NULL; END; $trigger_function$ LANGUAGE plpgsql;");
/* CREATE */
if (SPI_execute(buf.data, false, 0) != SPI_OK_UTILITY)
{
elog(ERROR, "SPI_exec failed: %s", buf.data);
}
if (isCitusTable)
{
resetStringInfo(&buf);
appendStringInfo(&buf,
"SELECT FROM create_distributed_function($cdfn$%s.%s()$cdfn$)",
quote_identifier(NameStr(
cimvCreate->formCimv->triggerfnnamespace)),
quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)));
}
int expectedResult = isCitusTable ? SPI_OK_SELECT : SPI_OK_UTILITY;
if (SPI_execute(buf.data, false, 0) != expectedResult)
{
elog(ERROR, "SPI_exec failed: %s", buf.data);
}
pfree(buf.data);
}
static char* DataChangeTriggerTruncateQueryString(char* insertTableName) {
StringInfo queryBuf = makeStringInfo();
appendStringInfo(queryBuf, "TRUNCATE TABLE %s", insertTableName);
@ -977,7 +850,6 @@ static void CreateDependencyFromTriggersToView(Oid baseRelationId, List* trigger
bool missingOk = false;
HeapTuple triggerTuple = GetTriggerTupleById(triggerId, missingOk);
Form_pg_trigger triggerForm = (Form_pg_trigger) GETSTRUCT(triggerTuple);
char* name = NameStr(triggerForm->tgname);
if (namestrcmp(&triggerForm->tgname, targetTriggerName) == 0) {
ObjectAddress triggerAddr = {
.classId = TriggerRelationId,
@ -1035,17 +907,6 @@ DataChangeTriggerInsertDeleteQueryString(CimvCreate *cimvCreate, bool
return queryBuf->data;
}
static void
DataChangeTriggerFunctionAppendErrorOnDelete(CimvCreate *cimvCreate, StringInfo buf)
{
appendStringInfo(buf,
"RAISE EXCEPTION $ex$MATERIALIZED VIEW '%s' on table '%s' does not support UPDATE/DELETE$ex$;\n",
cimvCreate->userViewNameQuoted->relname,
cimvCreate->baseTableNameQuoted->relname);
}
static void
AppendOnConflict(CimvCreate *cimvCreate, StringInfo buf, bool isInsert)
{
@ -1147,10 +1008,6 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent)
cimvCreate->triggerNameList = lappend(cimvCreate->triggerNameList, pstrdup(triggerName.data));
MemoryContextSwitchTo(oldMemoryContext);
RangeVar *insertTable = cimvCreate->createOptions->schedule == NULL ?
cimvCreate->matTableNameQuoted :
cimvCreate->landingTableNameQuoted;
if (isCitusTable)
{
List* taskList = CreateTriggerTaskList(cimvCreate, triggerName.data, event, referencing);
@ -1174,7 +1031,7 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent)
{
appendStringInfo(&buf,
"CREATE TRIGGER %s AFTER %s ON %s.%s %s "
"FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s($view_def$%s$view_def$, $prefix$%s$prefix$)",
"FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s($view_def$%s$view_def$, $prefix$%s$prefix$, $insert_table$%s$insert_table$)",
quote_identifier(triggerName.data),
event,
cimvCreate->baseTableNameQuoted->schemaname,
@ -1184,7 +1041,8 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent)
cimvCreate->formCimv->triggerfnnamespace)),
quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)),
cimvCreate->queryString,
cimvCreate->prefix);
cimvCreate->prefix,
InsertTableName(cimvCreate->insertTable));
}
int expectedResult = isCitusTable ? SPI_OK_SELECT : SPI_OK_UTILITY;
@ -1253,7 +1111,6 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create
}
namestrcpy(&cimvCreate->formCimv->triggerfnnamespace, CIMV_INTERNAL_SCHEMA);
char* funcName = CIMVTriggerFuncName(cimvCreate->prefixId, stmt->into->rel->relname);
namestrcpy(&cimvCreate->formCimv->triggerfnname, "cimv_trigger");
StringInfo mat = makeStringInfo();
appendStringInfo(mat, "%s_cimv_%s", cimvCreate->prefix, MATERIALIZATION_TABLE_SUFFIX);
@ -1301,12 +1158,6 @@ InitializeCimvCreate(const CreateTableAsStmt *stmt, MatViewCreateOptions *create
return cimvCreate;
}
static char* CIMVTriggerFuncName(int prefixId, const char* relname) {
StringInfo funcName = makeStringInfo();
appendStringInfo(funcName, "%s_%d",quote_identifier(relname), prefixId);
return funcName->data;
}
static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId) {
if (baseTable->schemaname == NULL || baseTable->relname == NULL) {

View File

@ -18,10 +18,10 @@ CREATE FUNCTION cimv_internal.cimv_trigger()
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$cimv_trigger$$;
CREATE FUNCTION pg_catalog.worker_record_trigger_dependency(basetable_name regclass, inserttable_name regclass, trigger_name text)
CREATE FUNCTION pg_catalog.worker_record_trigger_dependency(basetable_name regclass, inserttable_name regclass, trigger_name text[])
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', 'worker_record_trigger_dependency';
COMMENT ON FUNCTION pg_catalog.worker_record_trigger_dependency(regclass,regclass,text)
COMMENT ON FUNCTION pg_catalog.worker_record_trigger_dependency(regclass,regclass,text[])
IS 'record the fact that the trigger depends on the table in pg_depend';

View File

@ -11,9 +11,13 @@
#include "postgres.h"
#include "utils/builtins.h"
#include "distributed/listutils.h"
#include "distributed/relay_utility.h"
#include "distributed/string_utils.h"
/*
* ConvertIntToString returns the string version of given integer.
*/
@ -26,3 +30,19 @@ ConvertIntToString(int val)
return str->data;
}
char* ConcatenateStringListWithDelimiter(List* stringList, char delimiter) {
StringInfo result = makeStringInfo();
bool isFirst = true;
char* string = NULL;
foreach_ptr(string, stringList) {
if (!isFirst) {
appendStringInfoChar(result, delimiter);
}else {
isFirst = false;
}
appendStringInfoString(result, quote_literal_cstr(string));
}
return result->data;
}

View File

@ -14,5 +14,6 @@
#include "postgres.h"
extern char * ConvertIntToString(int val);
extern char* ConcatenateStringListWithDelimiter(List* stringList, char delimiter);
#endif /* CITUS_STRING_UTILS_H */

View File

@ -186,11 +186,10 @@ ORDER BY a, d_hour;
(60 rows)
DELETE FROM events WHERE b < 100;
ERROR: MATERIALIZED VIEW 'mv' on table 'events' does not support UPDATE/DELETE
ERROR: MATERIALIZED VIEW mv on table events does not support UPDATE/DELETE
DROP VIEW mv;
ERROR: DROP VIEW not supported for mv
DROP MATERIALIZED VIEW mv;
NOTICE: drop cascades to 4 other objects
CREATE MATERIALIZED VIEW mv WITH (citus.cimv) AS
SELECT a,
date_trunc('hour', d) AS d_hour,
@ -408,7 +407,6 @@ ORDER BY a, d_hour;
(60 rows)
DROP MATERIALIZED VIEW mv;
NOTICE: drop cascades to 4 other objects
CREATE MATERIALIZED VIEW mv WITH (citus.cimv, citus.insertonlycapture) AS
SELECT a,
date_trunc('hour', d) AS d_hour,
@ -780,7 +778,6 @@ ORDER BY a, d_hour;
(60 rows)
DROP MATERIALIZED VIEW mv;
NOTICE: drop cascades to trigger mv_3_INSERT on table events
CREATE MATERIALIZED VIEW mv WITH (citus.cimv, citus.insertonlycapture) AS
SELECT a,
date_trunc('hour', d) AS d_hour,
@ -794,7 +791,6 @@ SELECT * FROM mv;
(0 rows)
DROP MATERIALIZED VIEW mv;
NOTICE: drop cascades to trigger mv_4_INSERT on table events
SELECT create_distributed_table('events', 'a');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
@ -992,7 +988,7 @@ ORDER BY a, d_hour;
(70 rows)
DELETE FROM events WHERE b < 100;
ERROR: MATERIALIZED VIEW 'mv' on table 'events' does not support UPDATE/DELETE
ERROR: MATERIALIZED VIEW mv on table events_400000 does not support UPDATE/DELETE
DROP VIEW mv;
ERROR: DROP VIEW not supported for mv
DROP MATERIALIZED VIEW mv;
@ -1697,7 +1693,6 @@ FROM events
WHERE b > 10
GROUP BY a, d_hour;
DROP MATERIALIZED VIEW mv;
NOTICE: drop cascades to 4 other objects
-- test that another user can create CIMV as well
CREATE USER new_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
@ -1721,6 +1716,5 @@ WHERE b > 10
GROUP BY a, d_hour;
REFRESH MATERIALIZED VIEW mv;
DROP MATERIALIZED VIEW mv CASCADE;
NOTICE: drop cascades to 4 other objects
SET client_min_messages TO WARNING; -- suppress cascade messages
DROP SCHEMA cimv CASCADE;