Fixes review notes

create_drop_db_gh
gindibay 2023-10-13 03:10:17 +03:00
parent 81766e48da
commit 761fb13ac8
10 changed files with 57 additions and 145 deletions

View File

@ -39,8 +39,6 @@
static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid); static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid);
static List * CreateDDLTaskList(char *command, List *workerNodeList,
bool outsideTransaction);
PG_FUNCTION_INFO_V1(citus_internal_database_command); PG_FUNCTION_INFO_V1(citus_internal_database_command);
static Oid get_database_owner(Oid db_oid); static Oid get_database_owner(Oid db_oid);
@ -226,36 +224,6 @@ PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *queryString,
#endif #endif
/*
* CreateDDLTaskList creates a task list for running a single DDL command.
*/
static List *
CreateDDLTaskList(char *command, List *workerNodeList, bool outsideTransaction)
{
List *commandList = list_make3(DISABLE_DDL_PROPAGATION,
command,
ENABLE_DDL_PROPAGATION);
Task *task = CitusMakeNode(Task);
task->taskType = DDL_TASK;
SetTaskQueryStringList(task, commandList);
task->cannotBeExecutedInTransaction = outsideTransaction;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
targetPlacement->nodeName = workerNode->workerName;
targetPlacement->nodePort = workerNode->workerPort;
targetPlacement->groupId = workerNode->groupId;
task->taskPlacementList = lappend(task->taskPlacementList,
targetPlacement);
}
return list_make1(task);
}
/* /*
* PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local * PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local
* postgres instance. * postgres instance.
@ -295,55 +263,25 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
List * List *
PostprocessCreateDatabaseStmt(Node *node, const char *queryString) PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
{ {
if (!EnableCreateDatabasePropagation || !ShouldPropagate()) if (!ShouldPropagate())
{ {
return NIL; return NIL;
} }
EnsureCoordinator(); EnsureCoordinator();
CreatedbStmt *stmt = castNode(CreatedbStmt, node); char *createDatabaseCommand = DeparseTreeNode(node);
char *databaseName = stmt->dbname;
bool missingOk = false;
Oid databaseOid = get_database_oid(databaseName, missingOk);
/* StringInfo internalCreateCommand = makeStringInfo();
* TODO: try to reuse regular DDL infrastructure appendStringInfo(internalCreateCommand,
* "SELECT pg_catalog.citus_internal_database_command(%s)",
* We do not do this right now because of the AssignDatabaseToShard at the end. quote_literal_cstr(createDatabaseCommand));
*/
List *workerNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
RowShareLock);
if (list_length(workerNodes) > 0)
{
char *createDatabaseCommand = DeparseTreeNode(node);
StringInfo internalCreateCommand = makeStringInfo(); List *commands = list_make3(DISABLE_DDL_PROPAGATION,
appendStringInfo(internalCreateCommand, (void *) internalCreateCommand->data,
"SELECT pg_catalog.citus_internal_database_command(%s)", ENABLE_DDL_PROPAGATION);
quote_literal_cstr(createDatabaseCommand));
/* return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
* For the moment, we run CREATE DATABASE in 2PC, though that prevents
* us from immediately doing a pg_dump | pg_restore when dealing with
* a remote template database.
*/
bool outsideTransaction = false;
List *taskList = CreateDDLTaskList(internalCreateCommand->data, workerNodes,
outsideTransaction);
bool localExecutionSupported = false;
ExecuteUtilityTaskList(taskList, localExecutionSupported);
}
/* synchronize pg_dist_object records */
ObjectAddress dbAddress = { 0 };
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
MarkObjectDistributed(&dbAddress);
return NIL;
} }
@ -418,37 +356,13 @@ List *
PreprocessDropDatabaseStmt(Node *node, const char *queryString, PreprocessDropDatabaseStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext) ProcessUtilityContext processUtilityContext)
{ {
if (!EnableCreateDatabasePropagation || !ShouldPropagate()) if (!ShouldPropagate())
{ {
return NIL; return NIL;
} }
EnsureCoordinator(); EnsureCoordinator();
DropdbStmt *stmt = (DropdbStmt *) node;
char *databaseName = stmt->dbname;
bool missingOk = true;
Oid databaseOid = get_database_oid(databaseName, missingOk);
if (databaseOid == InvalidOid)
{
/* let regular ProcessUtility deal with IF NOT EXISTS */
return NIL;
}
ObjectAddress dbAddress = { 0 };
ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
if (!IsObjectDistributed(&dbAddress))
{
return NIL;
}
List *workerNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
RowShareLock);
if (list_length(workerNodes) == 0)
{
return NIL;
}
char *dropDatabaseCommand = DeparseTreeNode(node); char *dropDatabaseCommand = DeparseTreeNode(node);
StringInfo internalDropCommand = makeStringInfo(); StringInfo internalDropCommand = makeStringInfo();
@ -456,20 +370,9 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
"SELECT pg_catalog.citus_internal_database_command(%s)", "SELECT pg_catalog.citus_internal_database_command(%s)",
quote_literal_cstr(dropDatabaseCommand)); quote_literal_cstr(dropDatabaseCommand));
/* Delete from pg_dist_object */ List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) internalDropCommand->data,
ENABLE_DDL_PROPAGATION);
UnmarkObjectDistributed(&dbAddress); return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
/* ExecuteDistributedDDLJob could not be used since it depends on namespace and
* database does not have namespace.
*/
bool outsideTransaction = false;
List *taskList = CreateDDLTaskList(internalDropCommand->data, workerNodes,
outsideTransaction);
bool localExecutionSupported = false;
ExecuteUtilityTaskList(taskList, localExecutionSupported);
return NIL;
} }

View File

@ -938,7 +938,7 @@ CreateIndexTaskList(IndexStmt *indexStmt)
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->taskPlacementList = ActiveShardPlacementList(shardId); task->taskPlacementList = ActiveShardPlacementList(shardId);
task->cannotBeExecutedInTransaction = indexStmt->concurrent; task->cannotBeExecutedInTransction = indexStmt->concurrent;
taskList = lappend(taskList, task); taskList = lappend(taskList, task);
@ -983,7 +983,7 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt)
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->taskPlacementList = ActiveShardPlacementList(shardId); task->taskPlacementList = ActiveShardPlacementList(shardId);
task->cannotBeExecutedInTransaction = task->cannotBeExecutedInTransction =
IsReindexWithParam_compat(reindexStmt, "concurrently"); IsReindexWithParam_compat(reindexStmt, "concurrently");
taskList = lappend(taskList, task); taskList = lappend(taskList, task);
@ -1309,7 +1309,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->taskPlacementList = ActiveShardPlacementList(shardId); task->taskPlacementList = ActiveShardPlacementList(shardId);
task->cannotBeExecutedInTransaction = dropStmt->concurrent; task->cannotBeExecutedInTransction = dropStmt->concurrent;
taskList = lappend(taskList, task); taskList = lappend(taskList, task);

View File

@ -279,7 +279,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->taskPlacementList = ActiveShardPlacementList(shardId); task->taskPlacementList = ActiveShardPlacementList(shardId);
task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM); task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM);
taskList = lappend(taskList, task); taskList = lappend(taskList, task);
} }
@ -719,7 +719,7 @@ ExecuteUnqualifiedVacuumTasks(VacuumStmt *vacuumStmt, CitusVacuumParams vacuumPa
SetTaskQueryStringList(task, unqualifiedVacuumCommands); SetTaskQueryStringList(task, unqualifiedVacuumCommands);
task->dependentTaskList = NULL; task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM); task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM);
bool hasPeerWorker = false; bool hasPeerWorker = false;

View File

@ -31,36 +31,36 @@ optionToStatement(StringInfo buf, DefElem *option, const struct
{ {
if (strcmp(name, opt_formats[i].name) == 0) if (strcmp(name, opt_formats[i].name) == 0)
{ {
if (strcmp(opt_formats[i].type, "string") == 0) if (opt_formats[i].type == OPTION_FORMAT_STRING)
{ {
char *value = defGetString(option); char *value = defGetString(option);
appendStringInfo(buf, opt_formats[i].format, quote_identifier(value)); appendStringInfo(buf, opt_formats[i].format, quote_identifier(value));
} }
else if (strcmp(opt_formats[i].type, "integer") == 0) else if (opt_formats[i].type == OPTION_FORMAT_INTEGER)
{ {
int32 value = defGetInt32(option); int32 value = defGetInt32(option);
appendStringInfo(buf, opt_formats[i].format, value); appendStringInfo(buf, opt_formats[i].format, value);
} }
else if (strcmp(opt_formats[i].type, "boolean") == 0) else if (opt_formats[i].type == OPTION_FORMAT_BOOLEAN)
{ {
bool value = defGetBoolean(option); bool value = defGetBoolean(option);
appendStringInfo(buf, opt_formats[i].format, value ? "true" : "false"); appendStringInfo(buf, opt_formats[i].format, value ? "true" : "false");
} }
#if PG_VERSION_NUM >= PG_VERSION_15 #if PG_VERSION_NUM >= PG_VERSION_15
else if (strcmp(opt_formats[i].type, "object_id") == 0) else if (opt_formats[i].type == OPTION_FORMAT_OBJECT_ID)
{ {
Oid value = defGetObjectId(option); Oid value = defGetObjectId(option);
appendStringInfo(buf, opt_formats[i].format, value); appendStringInfo(buf, opt_formats[i].format, value);
} }
#endif #endif
else if (strcmp(opt_formats[i].type, "literal_cstr") == 0) else if (opt_formats[i].type == OPTION_FORMAT_LITERAL_CSTR)
{ {
char *value = defGetString(option); char *value = defGetString(option);
appendStringInfo(buf, opt_formats[i].format, quote_literal_cstr(value)); appendStringInfo(buf, opt_formats[i].format, quote_literal_cstr(value));
} }
else else
{ {
elog(ERROR, "unrecognized option type: %s", opt_formats[i].type); elog(ERROR, "unrecognized option type: %d", opt_formats[i].type);
} }
break; break;
} }

View File

@ -31,22 +31,22 @@ static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt);
static void AppendDefElemConnLimit(StringInfo buf, DefElem *def); static void AppendDefElemConnLimit(StringInfo buf, DefElem *def);
const struct option_format create_database_option_formats[] = { const struct option_format create_database_option_formats[] = {
{ "template", " TEMPLATE %s", "string" }, { "owner", " OWNER %s", OPTION_FORMAT_STRING },
{ "owner", " OWNER %s", "string" }, { "template", " TEMPLATE %s", OPTION_FORMAT_STRING },
{ "tablespace", " TABLESPACE %s", "string" }, { "encoding", " ENCODING %s", OPTION_FORMAT_LITERAL_CSTR },
{ "connection_limit", " CONNECTION LIMIT %d", "integer" }, { "strategy", " STRATEGY %s", OPTION_FORMAT_LITERAL_CSTR },
{ "encoding", " ENCODING %s", "literal_cstr" }, { "locale", " LOCALE %s", OPTION_FORMAT_LITERAL_CSTR },
{ "locale", " LOCALE %s", "literal_cstr" }, { "lc_collate", " LC_COLLATE %s", OPTION_FORMAT_LITERAL_CSTR },
{ "lc_collate", " LC_COLLATE %s", "literal_cstr" }, { "lc_ctype", " LC_CTYPE %s", OPTION_FORMAT_LITERAL_CSTR },
{ "lc_ctype", " LC_CTYPE %s", "literal_cstr" }, { "icu_locale", " ICU_LOCALE %s", OPTION_FORMAT_LITERAL_CSTR },
{ "icu_locale", " ICU_LOCALE %s", "literal_cstr" }, { "icu_rules", " ICU_RULES %s", OPTION_FORMAT_LITERAL_CSTR },
{ "icu_rules", " ICU_RULES %s", "literal_cstr" }, { "locale_provider", " LOCALE_PROVIDER %s", OPTION_FORMAT_LITERAL_CSTR },
{ "locale_provider", " LOCALE_PROVIDER %s", "literal_cstr" }, { "collation_version", " COLLATION_VERSION %s", OPTION_FORMAT_LITERAL_CSTR },
{ "is_template", " IS_TEMPLATE %s", "boolean" }, { "tablespace", " TABLESPACE %s", OPTION_FORMAT_STRING },
{ "allow_connections", " ALLOW_CONNECTIONS %s", "boolean" }, { "allow_connections", " ALLOW_CONNECTIONS %s", OPTION_FORMAT_BOOLEAN },
{ "collation_version", " COLLATION_VERSION %s", "literal_cstr" }, { "connection_limit", " CONNECTION LIMIT %d", OPTION_FORMAT_INTEGER },
{ "strategy", " STRATEGY %s", "literal_cstr" }, { "is_template", " IS_TEMPLATE %s", OPTION_FORMAT_BOOLEAN },
{ "oid", " OID %d", "object_id" }, { "oid", " OID %d", OPTION_FORMAT_OBJECT_ID }
}; };
char * char *

View File

@ -61,7 +61,7 @@ TaskListRequiresRollback(List *taskList)
} }
Task *task = (Task *) linitial(taskList); Task *task = (Task *) linitial(taskList);
if (task->cannotBeExecutedInTransaction) if (task->cannotBeExecutedInTransction)
{ {
/* vacuum, create index concurrently etc. */ /* vacuum, create index concurrently etc. */
return false; return false;
@ -164,7 +164,7 @@ TaskListCannotBeExecutedInTransaction(List *taskList)
Task *task = NULL; Task *task = NULL;
foreach_ptr(task, taskList) foreach_ptr(task, taskList)
{ {
if (task->cannotBeExecutedInTransaction) if (task->cannotBeExecutedInTransction)
{ {
return true; return true;
} }

View File

@ -326,7 +326,7 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_STRING_FIELD(fetchedExplainAnalyzePlan); COPY_STRING_FIELD(fetchedExplainAnalyzePlan);
COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration); COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration);
COPY_SCALAR_FIELD(isLocalTableModification); COPY_SCALAR_FIELD(isLocalTableModification);
COPY_SCALAR_FIELD(cannotBeExecutedInTransaction); COPY_SCALAR_FIELD(cannotBeExecutedInTransction);
} }

View File

@ -535,7 +535,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_STRING_FIELD(fetchedExplainAnalyzePlan); WRITE_STRING_FIELD(fetchedExplainAnalyzePlan);
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f"); WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
WRITE_BOOL_FIELD(isLocalTableModification); WRITE_BOOL_FIELD(isLocalTableModification);
WRITE_BOOL_FIELD(cannotBeExecutedInTransaction); WRITE_BOOL_FIELD(cannotBeExecutedInTransction);
} }

View File

@ -127,9 +127,18 @@ struct option_format
{ {
const char *name; const char *name;
const char *format; const char *format;
const char *type; const int type;
}; };
typedef enum OptionFormatType
{
OPTION_FORMAT_STRING,
OPTION_FORMAT_LITERAL_CSTR,
OPTION_FORMAT_BOOLEAN,
OPTION_FORMAT_INTEGER,
OPTION_FORMAT_OBJECT_ID
} OptionFormatType;
extern void optionToStatement(StringInfo buf, DefElem *option, const struct extern void optionToStatement(StringInfo buf, DefElem *option, const struct
option_format *opt_formats, int option_format *opt_formats, int

View File

@ -329,7 +329,7 @@ typedef struct Task
/* /*
* Vacuum, create/drop/reindex concurrently cannot be executed in a transaction. * Vacuum, create/drop/reindex concurrently cannot be executed in a transaction.
*/ */
bool cannotBeExecutedInTransaction; bool cannotBeExecutedInTransction;
Const *partitionKeyValue; Const *partitionKeyValue;
int colocationId; int colocationId;