wip: working version of c as a trigger function

cimv
Sait Talha Nisanci 2021-01-20 19:41:38 +03:00
parent d5c14f3b0b
commit 24551ccb30
2 changed files with 159 additions and 90 deletions

View File

@ -17,6 +17,7 @@
#include "commands/view.h"
#include "distributed/commands.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/pg_cimv.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
@ -25,6 +26,7 @@
#include "distributed/worker_protocol.h"
#include "distributed/multi_executor.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/resource_lock.h"
#include "executor/spi.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
@ -119,16 +121,16 @@ static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId);
static void AlterTableOwner(RangeVar* tableName, char* ownerName);
static void CreateDependencyFromTriggersToView(Oid baseRelationId, List* triggerNameList, Oid userViewId);
static CreateTableAsStmt* ParseQueryStringToCreateTableAsStmt(const char* queryString);
static char* InsertTableName(RangeVar* insertTable);
static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate);
static void CheckSPIResultForColocatedRun(void);
static char* CreateViewCommandForShard(CimvCreate* cimvCreate, char* shardViewQueryDef);
PG_FUNCTION_INFO_V1(cimv_trigger);
PG_FUNCTION_INFO_V1(worker_record_trigger_dependency);
static char *str_replace(char *orig, char *rep, char *with);
Datum
worker_record_trigger_dependency(PG_FUNCTION_ARGS)
{
@ -150,9 +152,7 @@ Datum cimv_trigger(PG_FUNCTION_ARGS)
TriggerData *trigdata = (TriggerData *) fcinfo->context;
Trigger* trigger = trigdata->tg_trigger;
TupleDesc tupdesc;
HeapTuple rettuple;
char *when;
if (trigger == NULL || trigger->tgnargs < 2) {
elog(ERROR, "cimv_trigger: should be called with at least two arguments");
}
@ -162,18 +162,6 @@ Datum cimv_trigger(PG_FUNCTION_ARGS)
char* createViewQueryString = trigger->tgargs[0];
char* prefix = trigger->tgargs[1];
if (trigger->tgnargs >= 3) {
char* baseTableName = trigger->tgargs[2];
char* relName = get_rel_name(trigdata->tg_relation->rd_id);
StringInfo first = makeStringInfo();
appendStringInfo(first, "FROM %s", baseTableName);
StringInfo second = makeStringInfo();
appendStringInfo(second, "FROM %s", relName);
createViewQueryString = str_replace(createViewQueryString, first->data, second->data);
}
CreateTableAsStmt* stmt = ParseQueryStringToCreateTableAsStmt(createViewQueryString);
MatViewCreateOptions *options = GetMatViewCreateOptions(stmt);
CimvCreate* cimvCreate = InitializeCimvCreate(stmt, options, NULL, prefix);
@ -182,8 +170,8 @@ Datum cimv_trigger(PG_FUNCTION_ARGS)
ValidateCimv(cimvCreate);
char* insertTableName = NULL;
if (trigger->tgnargs >= 4) {
insertTableName = trigger->tgargs[3];
if (trigger->tgnargs >= 3) {
insertTableName = trigger->tgargs[2];
}else {
RangeVar *insertTable = cimvCreate->createOptions->schedule == NULL ?
cimvCreate->matTableNameQuoted :
@ -230,7 +218,6 @@ Datum cimv_trigger(PG_FUNCTION_ARGS)
rettuple = trigdata->tg_trigtuple;
}
tupdesc = trigdata->tg_relation->rd_att;
/* connect to SPI manager */
if ( SPI_connect() < 0)
elog(ERROR, "cimv_trigger: could not connect to SPI");
@ -256,53 +243,6 @@ Datum cimv_trigger(PG_FUNCTION_ARGS)
return PointerGetDatum(rettuple);
}
char *str_replace(char *orig, char *rep, char *with) {
char *result; // the return string
char *ins; // the next insert point
char *tmp; // varies
int len_rep; // length of rep (the string to remove)
int len_with; // length of with (the string to replace rep with)
int len_front; // distance between rep and end of last rep
int count; // number of replacements
// sanity checks and initialization
if (!orig || !rep)
return NULL;
len_rep = strlen(rep);
if (len_rep == 0)
return NULL; // empty rep causes infinite loop during count
if (!with)
with = "";
len_with = strlen(with);
// count the number of replacements needed
ins = orig;
for (count = 0; tmp = strstr(ins, rep); ++count) {
ins = tmp + len_rep;
}
tmp = result = malloc(strlen(orig) + (len_with - len_rep) * count + 1);
if (!result)
return NULL;
// first time through the loop, all the variable are set correctly
// from here on,
// tmp points to the end of the result string
// ins points to the next occurrence of rep in orig
// orig points to the remainder of orig after "end of rep"
while (count--) {
ins = strstr(orig, rep);
len_front = ins - orig;
tmp = strncpy(tmp, orig, len_front) + len_front;
tmp = strcpy(tmp, with) + len_with;
orig += len_front + len_rep; // move to next "end of rep"
}
strcpy(tmp, orig);
return result;
}
static CreateTableAsStmt* ParseQueryStringToCreateTableAsStmt(const char* queryString) {
Query *query = ParseQueryString(queryString, NULL, 0);
int cursorOptions = 0;
@ -323,6 +263,102 @@ static char* InsertTableName(RangeVar* insertTable) {
return stringInfo->data;
}
static List *
CreateTriggerTaskList(CimvCreate* cimvCreate, char* triggerName, char* event, char* referencing)
{
Oid leftRelationId = cimvCreate->formCimv->basetable;
Oid rightRelationId = cimvCreate->createOptions->schedule == NULL ?
cimvCreate->formCimv->mattable :
cimvCreate->formCimv->landingtable;
List *leftShardList = LoadShardIntervalList(leftRelationId);
List *rightShardList = LoadShardIntervalList(rightRelationId);
/* lock metadata before getting placement lists */
LockShardListMetadata(leftShardList, ShareLock);
uint64 jobId = INVALID_JOB_ID;
int taskId = 1;
Oid leftSchemaId = get_rel_namespace(leftRelationId);
char* qualifiedLeftRelName = generate_qualified_relation_name(leftRelationId);
char *leftSchemaName = get_namespace_name(leftSchemaId);
char *escapedLeftSchemaName = quote_literal_cstr(leftSchemaName);
Oid rightSchemaId = get_rel_namespace(rightRelationId);
char* qualifiedRightRelName = generate_qualified_relation_name(rightRelationId);
char *rightSchemaName = get_namespace_name(rightSchemaId);
char *escapedRightSchemaName = quote_literal_cstr(rightSchemaName);
List *taskList = NIL;
Query* originalUserView = (Query*)cimvCreate->stmt->query;
RangeTblEntry* baseRte = (RangeTblEntry*) linitial(originalUserView->rtable);
ListCell *leftShardCell = NULL;
ListCell *rightShardCell = NULL;
forboth(leftShardCell, leftShardList, rightShardCell, rightShardList)
{
ShardInterval *leftShardInterval = (ShardInterval *) lfirst(leftShardCell);
ShardInterval *rightShardInterval = (ShardInterval *) lfirst(rightShardCell);
uint64 leftShardId = leftShardInterval->shardId;
uint64 rightShardId = rightShardInterval->shardId;
char* insertShardTable = pstrdup(qualifiedRightRelName);
AppendShardIdToName(&insertShardTable, rightShardId);
char* baseShardTable = pstrdup(qualifiedLeftRelName);
AppendShardIdToName(&baseShardTable, leftShardId);
StringInfo userViewBuf = makeStringInfo();
deparse_shard_query(originalUserView, baseRte->relid, leftShardId, userViewBuf);
char* queryString = CreateViewCommandForShard(cimvCreate, userViewBuf->data);
StringInfo applyCommand = makeStringInfo();
appendStringInfo(applyCommand,
"CREATE TRIGGER %s AFTER %s ON %s %s "
"FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s(%s, %s, %s )",
quote_identifier(triggerName),
event,
baseShardTable,
referencing,
quote_identifier(NameStr(
cimvCreate->formCimv->triggerfnnamespace)),
quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)),
quote_literal_cstr(queryString),
quote_literal_cstr(cimvCreate->prefix),
quote_literal_cstr(insertShardTable)
);
Task *task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
SetTaskQueryString(task, applyCommand->data);
task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = leftShardId;
task->taskPlacementList = ActiveShardPlacementList(leftShardId);
RelationShard *leftRelationShard = CitusMakeNode(RelationShard);
leftRelationShard->relationId = leftShardInterval->relationId;
leftRelationShard->shardId = leftShardInterval->shardId;
RelationShard *rightRelationShard = CitusMakeNode(RelationShard);
rightRelationShard->relationId = rightShardInterval->relationId;
rightRelationShard->shardId = rightShardInterval->shardId;
task->relationShardList = list_make2(leftRelationShard, rightRelationShard);
taskList = lappend(taskList, task);
}
return taskList;
}
bool
ProcessCreateMaterializedViewStmt(const CreateTableAsStmt *stmt, const char *query_string,
PlannedStmt *pstmt)
@ -398,10 +434,12 @@ CreateCimv(CimvCreate *cimvCreate)
static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate) {
if (cimvCreate->citusTable) {
if (SPI_connect() != SPI_OK_CONNECT)
if (SPI_connect_ext(SPI_OPT_NONATOMIC) != SPI_OK_CONNECT)
{
elog(ERROR, "SPI_connect failed");
}
SPI_commit();
SPI_start_transaction();
char* triggerName = NULL;
foreach_ptr(triggerName, cimvCreate->triggerNameList) {
StringInfo queryBuf = makeStringInfo();
@ -1115,27 +1153,22 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent)
if (isCitusTable)
{
appendStringInfo(&buf,
"SELECT bool_and(success), max(result) FROM run_command_on_colocated_placements($param$%s.%s$param$, $param$%s.%s$param$, $create_trigger$ ",
cimvCreate->baseTableNameQuoted->schemaname,
cimvCreate->baseTableNameQuoted->relname,
insertTable->schemaname,
insertTable->relname);
List* taskList = CreateTriggerTaskList(cimvCreate, triggerName.data, event, referencing);
TransactionProperties xactProperties = {
.errorOnAnyFailure = true,
.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_ALLOWED,
.requires2PC = false
};
appendStringInfo(&buf,
"CREATE TRIGGER %s AFTER %s ON %%s %s "
"FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s($view_def$%s$view_def$, $prefix$%s$prefix$, $baserel$%s$baserel$, %%L )",
quote_identifier(triggerName.data),
event,
referencing,
quote_identifier(NameStr(
cimvCreate->formCimv->triggerfnnamespace)),
quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)),
cimvCreate->queryString,
cimvCreate->prefix,
cimvCreate->baseTableNameQuoted->relname);
appendStringInfoString(&buf, "$create_trigger$);");
bool localExecutionSupported = true;
ExecutionParams *executionParams = CreateBasicExecutionParams(
ROW_MODIFY_NONE, taskList, MaxAdaptiveExecutorPoolSize,
localExecutionSupported
);
executionParams->xactProperties = xactProperties;
executionParams->isUtilityCommand = true;
ExecuteTaskListExtended(executionParams);
return;
}
else
{
@ -1337,6 +1370,44 @@ GetMatViewCreateOptions(const CreateTableAsStmt *stmt)
return result;
}
static char* CreateViewCommandForShard(CimvCreate* cimvCreate, char* shardViewQueryDef) {
StringInfo command = makeStringInfo();
appendStringInfo(command, "CREATE MATERIALIZED VIEW");
if (cimvCreate->stmt->if_not_exists) {
appendStringInfo(command, " IF NOT EXISTS");
}
RangeVar* rel = cimvCreate->stmt->into->rel;
char* qualifiedRelName = quote_qualified_identifier(rel->schemaname, rel->relname);
appendStringInfo(command, " %s WITH(", qualifiedRelName);
bool isFirst = true;
DefElem* def = NULL;
foreach_ptr(def, cimvCreate->stmt->into->options)
{
if (def->defnamespace != NULL && pg_strcasecmp(def->defnamespace,
CITUS_NAMESPACE) == 0)
{
if (!isFirst) {
appendStringInfoChar(command, ',');
}else {
isFirst = false;
}
if (defGetBoolean(def)) {
appendStringInfo(command, "citus.%s", def->defname);
}
}
}
appendStringInfo(command, ") AS %s", shardViewQueryDef);
if (cimvCreate->stmt->into->skipData) {
appendStringInfo(command, " WITH NO DATA;");
}else {
appendStringInfo(command, " WITH DATA;");
}
return command->data;
}
static ObjectAddress
DefineVirtualRelation(RangeVar *relation, List *tlist, Query *viewParse)

View File

@ -461,9 +461,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
bool continueProcessing = true;
if (IsA(parsetree, CreateTableAsStmt))
{
StringInfo buf = makeStringInfo();
pg_get_query_def(parsetree, buf);
elog(WARNING, "%s", buf->data);
continueProcessing = !ProcessCreateMaterializedViewStmt((const
CreateTableAsStmt *)
parsetree, queryString,