diff --git a/src/backend/distributed/cimv/create.c b/src/backend/distributed/cimv/create.c index 73fd21f93..0a25af7b9 100644 --- a/src/backend/distributed/cimv/create.c +++ b/src/backend/distributed/cimv/create.c @@ -17,6 +17,7 @@ #include "commands/view.h" #include "distributed/commands.h" #include "distributed/citus_ruleutils.h" +#include "distributed/deparse_shard_query.h" #include "distributed/pg_cimv.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" @@ -25,6 +26,7 @@ #include "distributed/worker_protocol.h" #include "distributed/multi_executor.h" #include "distributed/coordinator_protocol.h" +#include "distributed/resource_lock.h" #include "executor/spi.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -119,16 +121,16 @@ static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId); static void AlterTableOwner(RangeVar* tableName, char* ownerName); static void CreateDependencyFromTriggersToView(Oid baseRelationId, List* triggerNameList, Oid userViewId); + static CreateTableAsStmt* ParseQueryStringToCreateTableAsStmt(const char* queryString); static char* InsertTableName(RangeVar* insertTable); static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate); static void CheckSPIResultForColocatedRun(void); +static char* CreateViewCommandForShard(CimvCreate* cimvCreate, char* shardViewQueryDef); PG_FUNCTION_INFO_V1(cimv_trigger); PG_FUNCTION_INFO_V1(worker_record_trigger_dependency); -static char *str_replace(char *orig, char *rep, char *with); - Datum worker_record_trigger_dependency(PG_FUNCTION_ARGS) { @@ -150,9 +152,7 @@ Datum cimv_trigger(PG_FUNCTION_ARGS) TriggerData *trigdata = (TriggerData *) fcinfo->context; Trigger* trigger = trigdata->tg_trigger; - TupleDesc tupdesc; HeapTuple rettuple; - char *when; if (trigger == NULL || trigger->tgnargs < 2) { elog(ERROR, "cimv_trigger: should be called with at least two arguments"); } @@ -162,18 +162,6 @@ Datum cimv_trigger(PG_FUNCTION_ARGS) char* createViewQueryString = trigger->tgargs[0]; char* prefix = trigger->tgargs[1]; - if (trigger->tgnargs >= 3) { - char* baseTableName = trigger->tgargs[2]; - char* relName = get_rel_name(trigdata->tg_relation->rd_id); - StringInfo first = makeStringInfo(); - appendStringInfo(first, "FROM %s", baseTableName); - - StringInfo second = makeStringInfo(); - appendStringInfo(second, "FROM %s", relName); - createViewQueryString = str_replace(createViewQueryString, first->data, second->data); - - } - CreateTableAsStmt* stmt = ParseQueryStringToCreateTableAsStmt(createViewQueryString); MatViewCreateOptions *options = GetMatViewCreateOptions(stmt); CimvCreate* cimvCreate = InitializeCimvCreate(stmt, options, NULL, prefix); @@ -182,8 +170,8 @@ Datum cimv_trigger(PG_FUNCTION_ARGS) ValidateCimv(cimvCreate); char* insertTableName = NULL; - if (trigger->tgnargs >= 4) { - insertTableName = trigger->tgargs[3]; + if (trigger->tgnargs >= 3) { + insertTableName = trigger->tgargs[2]; }else { RangeVar *insertTable = cimvCreate->createOptions->schedule == NULL ? cimvCreate->matTableNameQuoted : @@ -230,7 +218,6 @@ Datum cimv_trigger(PG_FUNCTION_ARGS) rettuple = trigdata->tg_trigtuple; } - tupdesc = trigdata->tg_relation->rd_att; /* connect to SPI manager */ if ( SPI_connect() < 0) elog(ERROR, "cimv_trigger: could not connect to SPI"); @@ -256,53 +243,6 @@ Datum cimv_trigger(PG_FUNCTION_ARGS) return PointerGetDatum(rettuple); } -char *str_replace(char *orig, char *rep, char *with) { - char *result; // the return string - char *ins; // the next insert point - char *tmp; // varies - int len_rep; // length of rep (the string to remove) - int len_with; // length of with (the string to replace rep with) - int len_front; // distance between rep and end of last rep - int count; // number of replacements - - // sanity checks and initialization - if (!orig || !rep) - return NULL; - len_rep = strlen(rep); - if (len_rep == 0) - return NULL; // empty rep causes infinite loop during count - if (!with) - with = ""; - len_with = strlen(with); - - // count the number of replacements needed - ins = orig; - for (count = 0; tmp = strstr(ins, rep); ++count) { - ins = tmp + len_rep; - } - - tmp = result = malloc(strlen(orig) + (len_with - len_rep) * count + 1); - - if (!result) - return NULL; - - // first time through the loop, all the variable are set correctly - // from here on, - // tmp points to the end of the result string - // ins points to the next occurrence of rep in orig - // orig points to the remainder of orig after "end of rep" - while (count--) { - ins = strstr(orig, rep); - len_front = ins - orig; - tmp = strncpy(tmp, orig, len_front) + len_front; - tmp = strcpy(tmp, with) + len_with; - orig += len_front + len_rep; // move to next "end of rep" - } - strcpy(tmp, orig); - return result; -} - - static CreateTableAsStmt* ParseQueryStringToCreateTableAsStmt(const char* queryString) { Query *query = ParseQueryString(queryString, NULL, 0); int cursorOptions = 0; @@ -323,6 +263,102 @@ static char* InsertTableName(RangeVar* insertTable) { return stringInfo->data; } +static List * +CreateTriggerTaskList(CimvCreate* cimvCreate, char* triggerName, char* event, char* referencing) +{ + + Oid leftRelationId = cimvCreate->formCimv->basetable; + + Oid rightRelationId = cimvCreate->createOptions->schedule == NULL ? + cimvCreate->formCimv->mattable : + cimvCreate->formCimv->landingtable; + List *leftShardList = LoadShardIntervalList(leftRelationId); + List *rightShardList = LoadShardIntervalList(rightRelationId); + + /* lock metadata before getting placement lists */ + LockShardListMetadata(leftShardList, ShareLock); + + uint64 jobId = INVALID_JOB_ID; + int taskId = 1; + + Oid leftSchemaId = get_rel_namespace(leftRelationId); + char* qualifiedLeftRelName = generate_qualified_relation_name(leftRelationId); + char *leftSchemaName = get_namespace_name(leftSchemaId); + char *escapedLeftSchemaName = quote_literal_cstr(leftSchemaName); + + Oid rightSchemaId = get_rel_namespace(rightRelationId); + char* qualifiedRightRelName = generate_qualified_relation_name(rightRelationId); + char *rightSchemaName = get_namespace_name(rightSchemaId); + char *escapedRightSchemaName = quote_literal_cstr(rightSchemaName); + + List *taskList = NIL; + + Query* originalUserView = (Query*)cimvCreate->stmt->query; + RangeTblEntry* baseRte = (RangeTblEntry*) linitial(originalUserView->rtable); + + ListCell *leftShardCell = NULL; + ListCell *rightShardCell = NULL; + forboth(leftShardCell, leftShardList, rightShardCell, rightShardList) + { + ShardInterval *leftShardInterval = (ShardInterval *) lfirst(leftShardCell); + ShardInterval *rightShardInterval = (ShardInterval *) lfirst(rightShardCell); + + uint64 leftShardId = leftShardInterval->shardId; + uint64 rightShardId = rightShardInterval->shardId; + + char* insertShardTable = pstrdup(qualifiedRightRelName); + AppendShardIdToName(&insertShardTable, rightShardId); + + char* baseShardTable = pstrdup(qualifiedLeftRelName); + AppendShardIdToName(&baseShardTable, leftShardId); + + StringInfo userViewBuf = makeStringInfo(); + deparse_shard_query(originalUserView, baseRte->relid, leftShardId, userViewBuf); + + char* queryString = CreateViewCommandForShard(cimvCreate, userViewBuf->data); + + StringInfo applyCommand = makeStringInfo(); + + appendStringInfo(applyCommand, + "CREATE TRIGGER %s AFTER %s ON %s %s " + "FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s(%s, %s, %s )", + quote_identifier(triggerName), + event, + baseShardTable, + referencing, + quote_identifier(NameStr( + cimvCreate->formCimv->triggerfnnamespace)), + quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)), + quote_literal_cstr(queryString), + quote_literal_cstr(cimvCreate->prefix), + quote_literal_cstr(insertShardTable) + ); + + Task *task = CitusMakeNode(Task); + task->jobId = jobId; + task->taskId = taskId++; + task->taskType = DDL_TASK; + SetTaskQueryString(task, applyCommand->data); + task->dependentTaskList = NULL; + task->replicationModel = REPLICATION_MODEL_INVALID; + task->anchorShardId = leftShardId; + task->taskPlacementList = ActiveShardPlacementList(leftShardId); + RelationShard *leftRelationShard = CitusMakeNode(RelationShard); + leftRelationShard->relationId = leftShardInterval->relationId; + leftRelationShard->shardId = leftShardInterval->shardId; + + RelationShard *rightRelationShard = CitusMakeNode(RelationShard); + rightRelationShard->relationId = rightShardInterval->relationId; + rightRelationShard->shardId = rightShardInterval->shardId; + + task->relationShardList = list_make2(leftRelationShard, rightRelationShard); + + taskList = lappend(taskList, task); + } + + return taskList; +} + bool ProcessCreateMaterializedViewStmt(const CreateTableAsStmt *stmt, const char *query_string, PlannedStmt *pstmt) @@ -398,10 +434,12 @@ CreateCimv(CimvCreate *cimvCreate) static void CreateDependenciesFromTriggersToView(CimvCreate* cimvCreate) { if (cimvCreate->citusTable) { - if (SPI_connect() != SPI_OK_CONNECT) + if (SPI_connect_ext(SPI_OPT_NONATOMIC) != SPI_OK_CONNECT) { elog(ERROR, "SPI_connect failed"); } + SPI_commit(); + SPI_start_transaction(); char* triggerName = NULL; foreach_ptr(triggerName, cimvCreate->triggerNameList) { StringInfo queryBuf = makeStringInfo(); @@ -1115,27 +1153,22 @@ CreateDataChangeTrigger(CimvCreate *cimvCreate, int triggerEvent) if (isCitusTable) { - appendStringInfo(&buf, - "SELECT bool_and(success), max(result) FROM run_command_on_colocated_placements($param$%s.%s$param$, $param$%s.%s$param$, $create_trigger$ ", - cimvCreate->baseTableNameQuoted->schemaname, - cimvCreate->baseTableNameQuoted->relname, - insertTable->schemaname, - insertTable->relname); + List* taskList = CreateTriggerTaskList(cimvCreate, triggerName.data, event, referencing); + TransactionProperties xactProperties = { + .errorOnAnyFailure = true, + .useRemoteTransactionBlocks = TRANSACTION_BLOCKS_ALLOWED, + .requires2PC = false + }; - appendStringInfo(&buf, - "CREATE TRIGGER %s AFTER %s ON %%s %s " - "FOR EACH STATEMENT EXECUTE PROCEDURE %s.%s($view_def$%s$view_def$, $prefix$%s$prefix$, $baserel$%s$baserel$, %%L )", - quote_identifier(triggerName.data), - event, - referencing, - quote_identifier(NameStr( - cimvCreate->formCimv->triggerfnnamespace)), - quote_identifier(NameStr(cimvCreate->formCimv->triggerfnname)), - cimvCreate->queryString, - cimvCreate->prefix, - cimvCreate->baseTableNameQuoted->relname); - - appendStringInfoString(&buf, "$create_trigger$);"); + bool localExecutionSupported = true; + ExecutionParams *executionParams = CreateBasicExecutionParams( + ROW_MODIFY_NONE, taskList, MaxAdaptiveExecutorPoolSize, + localExecutionSupported + ); + executionParams->xactProperties = xactProperties; + executionParams->isUtilityCommand = true; + ExecuteTaskListExtended(executionParams); + return; } else { @@ -1337,6 +1370,44 @@ GetMatViewCreateOptions(const CreateTableAsStmt *stmt) return result; } +static char* CreateViewCommandForShard(CimvCreate* cimvCreate, char* shardViewQueryDef) { + StringInfo command = makeStringInfo(); + appendStringInfo(command, "CREATE MATERIALIZED VIEW"); + if (cimvCreate->stmt->if_not_exists) { + appendStringInfo(command, " IF NOT EXISTS"); + } + RangeVar* rel = cimvCreate->stmt->into->rel; + char* qualifiedRelName = quote_qualified_identifier(rel->schemaname, rel->relname); + + appendStringInfo(command, " %s WITH(", qualifiedRelName); + + bool isFirst = true; + DefElem* def = NULL; + foreach_ptr(def, cimvCreate->stmt->into->options) + { + if (def->defnamespace != NULL && pg_strcasecmp(def->defnamespace, + CITUS_NAMESPACE) == 0) + { + if (!isFirst) { + appendStringInfoChar(command, ','); + }else { + isFirst = false; + } + if (defGetBoolean(def)) { + appendStringInfo(command, "citus.%s", def->defname); + } + + } + } + appendStringInfo(command, ") AS %s", shardViewQueryDef); + if (cimvCreate->stmt->into->skipData) { + appendStringInfo(command, " WITH NO DATA;"); + }else { + appendStringInfo(command, " WITH DATA;"); + } + return command->data; +} + static ObjectAddress DefineVirtualRelation(RangeVar *relation, List *tlist, Query *viewParse) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 41643c4ef..58cf089b5 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -461,9 +461,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, bool continueProcessing = true; if (IsA(parsetree, CreateTableAsStmt)) { - StringInfo buf = makeStringInfo(); - pg_get_query_def(parsetree, buf); - elog(WARNING, "%s", buf->data); + continueProcessing = !ProcessCreateMaterializedViewStmt((const CreateTableAsStmt *) parsetree, queryString,