From 761fb13ac81409245a8196a7ab9cc51dcd95c5ad Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 13 Oct 2023 03:10:17 +0300 Subject: [PATCH] Fixes review notes --- src/backend/distributed/commands/database.c | 127 +++--------------- src/backend/distributed/commands/index.c | 6 +- src/backend/distributed/commands/vacuum.c | 4 +- .../distributed/deparser/citus_deparseutils.c | 12 +- .../deparser/deparse_database_stmts.c | 32 ++--- .../executor/executor_util_tasks.c | 4 +- .../distributed/utils/citus_copyfuncs.c | 2 +- .../distributed/utils/citus_outfuncs.c | 2 +- src/include/distributed/deparser.h | 11 +- .../distributed/multi_physical_planner.h | 2 +- 10 files changed, 57 insertions(+), 145 deletions(-) diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 067701114..93cf87b42 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -39,8 +39,6 @@ static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid); -static List * CreateDDLTaskList(char *command, List *workerNodeList, - bool outsideTransaction); PG_FUNCTION_INFO_V1(citus_internal_database_command); static Oid get_database_owner(Oid db_oid); @@ -226,36 +224,6 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString, #endif -/* - * CreateDDLTaskList creates a task list for running a single DDL command. - */ -static List * -CreateDDLTaskList(char *command, List *workerNodeList, bool outsideTransaction) -{ - List *commandList = list_make3(DISABLE_DDL_PROPAGATION, - command, - ENABLE_DDL_PROPAGATION); - - Task *task = CitusMakeNode(Task); - task->taskType = DDL_TASK; - SetTaskQueryStringList(task, commandList); - task->cannotBeExecutedInTransaction = outsideTransaction; - - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, workerNodeList) - { - ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement); - targetPlacement->nodeName = workerNode->workerName; - targetPlacement->nodePort = workerNode->workerPort; - targetPlacement->groupId = workerNode->groupId; - task->taskPlacementList = lappend(task->taskPlacementList, - targetPlacement); - } - - return list_make1(task); -} - - /* * PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local * postgres instance. @@ -295,55 +263,25 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString, List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString) { - if (!EnableCreateDatabasePropagation || !ShouldPropagate()) + if (!ShouldPropagate()) { return NIL; } EnsureCoordinator(); - CreatedbStmt *stmt = castNode(CreatedbStmt, node); - char *databaseName = stmt->dbname; - bool missingOk = false; - Oid databaseOid = get_database_oid(databaseName, missingOk); + char *createDatabaseCommand = DeparseTreeNode(node); - /* - * TODO: try to reuse regular DDL infrastructure - * - * We do not do this right now because of the AssignDatabaseToShard at the end. - */ - List *workerNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, - RowShareLock); - if (list_length(workerNodes) > 0) - { - char *createDatabaseCommand = DeparseTreeNode(node); + StringInfo internalCreateCommand = makeStringInfo(); + appendStringInfo(internalCreateCommand, + "SELECT pg_catalog.citus_internal_database_command(%s)", + quote_literal_cstr(createDatabaseCommand)); - StringInfo internalCreateCommand = makeStringInfo(); - appendStringInfo(internalCreateCommand, - "SELECT pg_catalog.citus_internal_database_command(%s)", - quote_literal_cstr(createDatabaseCommand)); + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) internalCreateCommand->data, + ENABLE_DDL_PROPAGATION); - /* - * For the moment, we run CREATE DATABASE in 2PC, though that prevents - * us from immediately doing a pg_dump | pg_restore when dealing with - * a remote template database. - */ - bool outsideTransaction = false; - - List *taskList = CreateDDLTaskList(internalCreateCommand->data, workerNodes, - outsideTransaction); - - bool localExecutionSupported = false; - ExecuteUtilityTaskList(taskList, localExecutionSupported); - } - - /* synchronize pg_dist_object records */ - ObjectAddress dbAddress = { 0 }; - ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid); - MarkObjectDistributed(&dbAddress); - - - return NIL; + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } @@ -418,37 +356,13 @@ List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext) { - if (!EnableCreateDatabasePropagation || !ShouldPropagate()) + if (!ShouldPropagate()) { return NIL; } EnsureCoordinator(); - DropdbStmt *stmt = (DropdbStmt *) node; - char *databaseName = stmt->dbname; - bool missingOk = true; - Oid databaseOid = get_database_oid(databaseName, missingOk); - if (databaseOid == InvalidOid) - { - /* let regular ProcessUtility deal with IF NOT EXISTS */ - return NIL; - } - - ObjectAddress dbAddress = { 0 }; - ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid); - if (!IsObjectDistributed(&dbAddress)) - { - return NIL; - } - - List *workerNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, - RowShareLock); - if (list_length(workerNodes) == 0) - { - return NIL; - } - char *dropDatabaseCommand = DeparseTreeNode(node); StringInfo internalDropCommand = makeStringInfo(); @@ -456,20 +370,9 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, "SELECT pg_catalog.citus_internal_database_command(%s)", quote_literal_cstr(dropDatabaseCommand)); - /* Delete from pg_dist_object */ + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + (void *) internalDropCommand->data, + ENABLE_DDL_PROPAGATION); - UnmarkObjectDistributed(&dbAddress); - - /* ExecuteDistributedDDLJob could not be used since it depends on namespace and - * database does not have namespace. - */ - - bool outsideTransaction = false; - List *taskList = CreateDDLTaskList(internalDropCommand->data, workerNodes, - outsideTransaction); - - bool localExecutionSupported = false; - ExecuteUtilityTaskList(taskList, localExecutionSupported); - - return NIL; + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 275f253b3..8271cc4f4 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -938,7 +938,7 @@ CreateIndexTaskList(IndexStmt *indexStmt) task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = ActiveShardPlacementList(shardId); - task->cannotBeExecutedInTransaction = indexStmt->concurrent; + task->cannotBeExecutedInTransction = indexStmt->concurrent; taskList = lappend(taskList, task); @@ -983,7 +983,7 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt) task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = ActiveShardPlacementList(shardId); - task->cannotBeExecutedInTransaction = + task->cannotBeExecutedInTransction = IsReindexWithParam_compat(reindexStmt, "concurrently"); taskList = lappend(taskList, task); @@ -1309,7 +1309,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt) task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = ActiveShardPlacementList(shardId); - task->cannotBeExecutedInTransaction = dropStmt->concurrent; + task->cannotBeExecutedInTransction = dropStmt->concurrent; taskList = lappend(taskList, task); diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index 21638ba7f..ee03aeae1 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -279,7 +279,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = shardId; task->taskPlacementList = ActiveShardPlacementList(shardId); - task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM); + task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM); taskList = lappend(taskList, task); } @@ -719,7 +719,7 @@ ExecuteUnqualifiedVacuumTasks(VacuumStmt *vacuumStmt, CitusVacuumParams vacuumPa SetTaskQueryStringList(task, unqualifiedVacuumCommands); task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; - task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM); + task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM); bool hasPeerWorker = false; diff --git a/src/backend/distributed/deparser/citus_deparseutils.c b/src/backend/distributed/deparser/citus_deparseutils.c index 1dd8b33bc..52d96930e 100644 --- a/src/backend/distributed/deparser/citus_deparseutils.c +++ b/src/backend/distributed/deparser/citus_deparseutils.c @@ -31,36 +31,36 @@ optionToStatement(StringInfo buf, DefElem *option, const struct { if (strcmp(name, opt_formats[i].name) == 0) { - if (strcmp(opt_formats[i].type, "string") == 0) + if (opt_formats[i].type == OPTION_FORMAT_STRING) { char *value = defGetString(option); appendStringInfo(buf, opt_formats[i].format, quote_identifier(value)); } - else if (strcmp(opt_formats[i].type, "integer") == 0) + else if (opt_formats[i].type == OPTION_FORMAT_INTEGER) { int32 value = defGetInt32(option); appendStringInfo(buf, opt_formats[i].format, value); } - else if (strcmp(opt_formats[i].type, "boolean") == 0) + else if (opt_formats[i].type == OPTION_FORMAT_BOOLEAN) { bool value = defGetBoolean(option); appendStringInfo(buf, opt_formats[i].format, value ? "true" : "false"); } #if PG_VERSION_NUM >= PG_VERSION_15 - else if (strcmp(opt_formats[i].type, "object_id") == 0) + else if (opt_formats[i].type == OPTION_FORMAT_OBJECT_ID) { Oid value = defGetObjectId(option); appendStringInfo(buf, opt_formats[i].format, value); } #endif - else if (strcmp(opt_formats[i].type, "literal_cstr") == 0) + else if (opt_formats[i].type == OPTION_FORMAT_LITERAL_CSTR) { char *value = defGetString(option); appendStringInfo(buf, opt_formats[i].format, quote_literal_cstr(value)); } else { - elog(ERROR, "unrecognized option type: %s", opt_formats[i].type); + elog(ERROR, "unrecognized option type: %d", opt_formats[i].type); } break; } diff --git a/src/backend/distributed/deparser/deparse_database_stmts.c b/src/backend/distributed/deparser/deparse_database_stmts.c index 7c7544694..bf98b9622 100644 --- a/src/backend/distributed/deparser/deparse_database_stmts.c +++ b/src/backend/distributed/deparser/deparse_database_stmts.c @@ -31,22 +31,22 @@ static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt); static void AppendDefElemConnLimit(StringInfo buf, DefElem *def); const struct option_format create_database_option_formats[] = { - { "template", " TEMPLATE %s", "string" }, - { "owner", " OWNER %s", "string" }, - { "tablespace", " TABLESPACE %s", "string" }, - { "connection_limit", " CONNECTION LIMIT %d", "integer" }, - { "encoding", " ENCODING %s", "literal_cstr" }, - { "locale", " LOCALE %s", "literal_cstr" }, - { "lc_collate", " LC_COLLATE %s", "literal_cstr" }, - { "lc_ctype", " LC_CTYPE %s", "literal_cstr" }, - { "icu_locale", " ICU_LOCALE %s", "literal_cstr" }, - { "icu_rules", " ICU_RULES %s", "literal_cstr" }, - { "locale_provider", " LOCALE_PROVIDER %s", "literal_cstr" }, - { "is_template", " IS_TEMPLATE %s", "boolean" }, - { "allow_connections", " ALLOW_CONNECTIONS %s", "boolean" }, - { "collation_version", " COLLATION_VERSION %s", "literal_cstr" }, - { "strategy", " STRATEGY %s", "literal_cstr" }, - { "oid", " OID %d", "object_id" }, + { "owner", " OWNER %s", OPTION_FORMAT_STRING }, + { "template", " TEMPLATE %s", OPTION_FORMAT_STRING }, + { "encoding", " ENCODING %s", OPTION_FORMAT_LITERAL_CSTR }, + { "strategy", " STRATEGY %s", OPTION_FORMAT_LITERAL_CSTR }, + { "locale", " LOCALE %s", OPTION_FORMAT_LITERAL_CSTR }, + { "lc_collate", " LC_COLLATE %s", OPTION_FORMAT_LITERAL_CSTR }, + { "lc_ctype", " LC_CTYPE %s", OPTION_FORMAT_LITERAL_CSTR }, + { "icu_locale", " ICU_LOCALE %s", OPTION_FORMAT_LITERAL_CSTR }, + { "icu_rules", " ICU_RULES %s", OPTION_FORMAT_LITERAL_CSTR }, + { "locale_provider", " LOCALE_PROVIDER %s", OPTION_FORMAT_LITERAL_CSTR }, + { "collation_version", " COLLATION_VERSION %s", OPTION_FORMAT_LITERAL_CSTR }, + { "tablespace", " TABLESPACE %s", OPTION_FORMAT_STRING }, + { "allow_connections", " ALLOW_CONNECTIONS %s", OPTION_FORMAT_BOOLEAN }, + { "connection_limit", " CONNECTION LIMIT %d", OPTION_FORMAT_INTEGER }, + { "is_template", " IS_TEMPLATE %s", OPTION_FORMAT_BOOLEAN }, + { "oid", " OID %d", OPTION_FORMAT_OBJECT_ID } }; char * diff --git a/src/backend/distributed/executor/executor_util_tasks.c b/src/backend/distributed/executor/executor_util_tasks.c index 483fd55a7..abf721196 100644 --- a/src/backend/distributed/executor/executor_util_tasks.c +++ b/src/backend/distributed/executor/executor_util_tasks.c @@ -61,7 +61,7 @@ TaskListRequiresRollback(List *taskList) } Task *task = (Task *) linitial(taskList); - if (task->cannotBeExecutedInTransaction) + if (task->cannotBeExecutedInTransction) { /* vacuum, create index concurrently etc. */ return false; @@ -164,7 +164,7 @@ TaskListCannotBeExecutedInTransaction(List *taskList) Task *task = NULL; foreach_ptr(task, taskList) { - if (task->cannotBeExecutedInTransaction) + if (task->cannotBeExecutedInTransction) { return true; } diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index fe4429f04..7e1379ef3 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -326,7 +326,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_STRING_FIELD(fetchedExplainAnalyzePlan); COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration); COPY_SCALAR_FIELD(isLocalTableModification); - COPY_SCALAR_FIELD(cannotBeExecutedInTransaction); + COPY_SCALAR_FIELD(cannotBeExecutedInTransction); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 9b4ac809c..b4062751a 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -535,7 +535,7 @@ OutTask(OUTFUNC_ARGS) WRITE_STRING_FIELD(fetchedExplainAnalyzePlan); WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f"); WRITE_BOOL_FIELD(isLocalTableModification); - WRITE_BOOL_FIELD(cannotBeExecutedInTransaction); + WRITE_BOOL_FIELD(cannotBeExecutedInTransction); } diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index d47d3c18a..66ead2b4d 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -127,9 +127,18 @@ struct option_format { const char *name; const char *format; - const char *type; + const int type; }; +typedef enum OptionFormatType +{ + OPTION_FORMAT_STRING, + OPTION_FORMAT_LITERAL_CSTR, + OPTION_FORMAT_BOOLEAN, + OPTION_FORMAT_INTEGER, + OPTION_FORMAT_OBJECT_ID +} OptionFormatType; + extern void optionToStatement(StringInfo buf, DefElem *option, const struct option_format *opt_formats, int diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 35d83eb33..b7acc0574 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -329,7 +329,7 @@ typedef struct Task /* * Vacuum, create/drop/reindex concurrently cannot be executed in a transaction. */ - bool cannotBeExecutedInTransaction; + bool cannotBeExecutedInTransction; Const *partitionKeyValue; int colocationId;