address feedback

pull/7483/head
Onur Tirtir 2024-02-21 17:40:07 +03:00
parent 62857d59ab
commit 015c598948
9 changed files with 132 additions and 15 deletions

View File

@ -309,8 +309,9 @@ PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
* NontransactionalNodeDDLTask to run the command on the workers outside
* the transaction block.
*/
return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands);
bool warnForPartialFailure = true;
return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands,
warnForPartialFailure);
}
else
{
@ -522,6 +523,14 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
CreateDatabaseCommandOriginalDbName = stmt->dbname;
stmt->dbname = tempDatabaseName;
/*
* Delete cleanup records in the same transaction so that if the current
* transactions fails for some reason, then the cleanup records won't be
* deleted. In the happy path, we will delete the cleanup records without
* deferring them to the background worker.
*/
FinalizeOperationNeedingCleanupOnSuccess("create database");
return NIL;
}
@ -537,7 +546,6 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
* cleanup records that we inserted in PreprocessCreateDatabaseStmt() and in case of a
* failure, we won't leak any databases called as the name that user intended to use for
* the database.
*
*/
List *
PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
@ -570,8 +578,10 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
* block, we need to use NontransactionalNodeDDLTaskList() to send the CREATE
* DATABASE statement to the workers.
*/
bool warnForPartialFailure = false;
List *createDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands);
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands,
warnForPartialFailure);
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
@ -670,8 +680,10 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
* use NontransactionalNodeDDLTaskList() to send the DROP DATABASE statement
* to the workers.
*/
bool warnForPartialFailure = true;
List *dropDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, dropDatabaseCommands);
NontransactionalNodeDDLTaskList(REMOTE_NODES, dropDatabaseCommands,
warnForPartialFailure);
return dropDatabaseDDLJobList;
}

View File

@ -493,6 +493,7 @@ GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, const char *createInd
ddlJob->startNewTransaction = createIndexStatement->concurrent;
ddlJob->metadataSyncCommand = createIndexCommand;
ddlJob->taskList = CreateIndexTaskList(createIndexStatement);
ddlJob->warnForPartialFailure = true;
return ddlJob;
}
@ -652,6 +653,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
"concurrently");
ddlJob->metadataSyncCommand = reindexCommand;
ddlJob->taskList = CreateReindexTaskList(relationId, reindexStatement);
ddlJob->warnForPartialFailure = true;
ddlJobs = list_make1(ddlJob);
}
@ -780,6 +782,7 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
ddlJob->metadataSyncCommand = dropIndexCommand;
ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
dropIndexStatement);
ddlJob->warnForPartialFailure = true;
ddlJobs = list_make1(ddlJob);
}

View File

@ -1287,7 +1287,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
errhint("Use DROP INDEX CONCURRENTLY IF EXISTS to remove the "
"invalid index, then retry the original command.")));
}
else
else if (ddlJob->warnForPartialFailure)
{
ereport(WARNING,
(errmsg(
@ -1296,9 +1296,9 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
"state.\nIf the problematic command is a CREATE operation, "
"consider using the 'IF EXISTS' syntax to drop the object,"
"\nif applicable, and then re-attempt the original command.")));
PG_RE_THROW();
}
PG_RE_THROW();
}
PG_END_TRY();
}
@ -1514,9 +1514,12 @@ DDLTaskList(Oid relationId, const char *commandString)
* NontransactionalNodeDDLTaskList builds a list of tasks to execute a DDL command on a
* given target set of nodes with cannotBeExecutedInTransaction is set to make sure
* that task list is executed outside a transaction block.
*
* Also sets warnForPartialFailure for the returned DDLJobs.
*/
List *
NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands)
NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands,
bool warnForPartialFailure)
{
List *ddlJobs = NodeDDLTaskList(targets, commands);
DDLJob *ddlJob = NULL;
@ -1527,6 +1530,8 @@ NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands)
{
task->cannotBeExecutedInTransaction = true;
}
ddlJob->warnForPartialFailure = warnForPartialFailure;
}
return ddlJobs;
}

View File

@ -929,6 +929,11 @@ TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, int nodePo
const char *commandString = NULL;
foreach_ptr(commandString, commandList)
{
/*
* Cannot use SendOptionalCommandListToWorkerOutsideTransactionWithConnection()
* because we don't want to open a transaction block on remote nodes as DROP
* DATABASE commands cannot be run inside a transaction block.
*/
if (ExecuteOptionalRemoteCommand(connection, commandString, NULL) !=
RESPONSE_OKAY)
{

View File

@ -75,6 +75,15 @@ typedef struct DDLJob
const char *metadataSyncCommand;
List *taskList; /* worker DDL tasks to execute */
/*
* Only applicable when any of the tasks cannot be executed in a
* transaction block.
*
* Controls whether to emit a warning within the utility hook in case of a
* failure.
*/
bool warnForPartialFailure;
} DDLJob;
extern ProcessUtility_hook_type PrevProcessUtility;
@ -94,7 +103,8 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString,
extern void MarkInvalidateForeignKeyGraph(void);
extern void InvalidateForeignKeyGraphForDDL(void);
extern List * DDLTaskList(Oid relationId, const char *commandString);
extern List * NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands);
extern List * NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands,
bool warnForPartialFailure);
extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands);
extern bool AlterTableInProgress(void);
extern bool DropSchemaOrDBInProgress(void);

View File

@ -427,13 +427,17 @@ SELECT * FROM public.check_database_on_all_nodes('my_template_database') ORDER B
--tests for special characters in database name
set citus.enable_create_database_propagation=on;
SET citus.log_remote_commands = true;
set citus.grep_remote_commands = '%CREATE DATABASE%';
set citus.grep_remote_commands = '%DATABASE%';
SET citus.next_operation_id TO 2000;
create database "mydatabase#1'2";
NOTICE: issuing CREATE DATABASE citus_temp_database_2000_0
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE DATABASE citus_temp_database_2000_0
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE citus_temp_database_2000_0 RENAME TO "mydatabase#1'2"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE citus_temp_database_2000_0 RENAME TO "mydatabase#1'2"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
set citus.grep_remote_commands = '%DROP DATABASE%';
drop database if exists "mydatabase#1'2";
NOTICE: issuing DROP DATABASE IF EXISTS "mydatabase#1'2"

View File

@ -5,6 +5,11 @@ RETURNS TEXT AS $func$
SELECT array_agg(DISTINCT result ORDER BY result) AS temp_databases_on_nodes FROM run_command_on_all_nodes($$SELECT datname FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$) WHERE result != '';
$func$
LANGUAGE sql;
CREATE FUNCTION count_db_cleanup_records()
RETURNS TABLE(object_name TEXT, count INTEGER) AS $func$
SELECT object_name, COUNT(*) FROM pg_dist_cleanup WHERE object_name LIKE 'citus_temp_database_%' GROUP BY object_name;
$func$
LANGUAGE sql;
CREATE FUNCTION ensure_no_temp_databases_on_any_nodes()
RETURNS BOOLEAN AS $func$
SELECT bool_and(result::boolean) AS no_temp_databases_on_any_nodes FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$);
@ -46,6 +51,11 @@ SELECT get_temp_databases_on_nodes();
(1 row)
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
(0 rows)
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
@ -67,9 +77,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^CREATE DATABASE").cancel(' || pg_ba
(1 row)
CREATE DATABASE db1;
WARNING: Commands that are not transaction-safe may result in partial failure, potentially leading to an inconsistent state.
If the problematic command is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the object,
if applicable, and then re-attempt the original command.
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
@ -83,6 +90,12 @@ SELECT get_temp_databases_on_nodes();
{citus_temp_database_4000_0}
(1 row)
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
citus_temp_database_4000_0 | 2
(1 row)
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
@ -117,6 +130,12 @@ SELECT get_temp_databases_on_nodes();
{citus_temp_database_4001_0}
(1 row)
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
citus_temp_database_4001_0 | 2
(1 row)
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
@ -151,6 +170,11 @@ SELECT get_temp_databases_on_nodes();
(1 row)
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
(0 rows)
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
@ -186,6 +210,12 @@ SELECT get_temp_databases_on_nodes();
{citus_temp_database_4002_0}
(1 row)
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
citus_temp_database_4002_0 | 2
(1 row)
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
@ -237,6 +267,12 @@ SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, re
(2 rows)
DROP DATABASE db1;
-- after recovering the prepared transactions, cleanup records should also be removed
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
(0 rows)
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_internal.acquire_citus_advisory_object_class_lock").kill()');
mitmproxy
---------------------------------------------------------------------
@ -257,6 +293,11 @@ SELECT get_temp_databases_on_nodes();
(1 row)
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
(0 rows)
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
@ -292,6 +333,12 @@ SELECT get_temp_databases_on_nodes();
{citus_temp_database_4004_0}
(1 row)
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
citus_temp_database_4004_0 | 2
(1 row)
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
@ -306,5 +353,13 @@ SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, re
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(2 rows)
CREATE DATABASE db1;
-- show that a successful database creation doesn't leave any pg_dist_cleanup records behind
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
(0 rows)
DROP DATABASE db1;
DROP FUNCTION get_temp_databases_on_nodes();
DROP FUNCTION ensure_no_temp_databases_on_any_nodes();

View File

@ -218,7 +218,7 @@ SELECT * FROM public.check_database_on_all_nodes('my_template_database') ORDER B
--tests for special characters in database name
set citus.enable_create_database_propagation=on;
SET citus.log_remote_commands = true;
set citus.grep_remote_commands = '%CREATE DATABASE%';
set citus.grep_remote_commands = '%DATABASE%';
SET citus.next_operation_id TO 2000;
create database "mydatabase#1'2";

View File

@ -7,6 +7,12 @@ RETURNS TEXT AS $func$
$func$
LANGUAGE sql;
CREATE FUNCTION count_db_cleanup_records()
RETURNS TABLE(object_name TEXT, count INTEGER) AS $func$
SELECT object_name, COUNT(*) FROM pg_dist_cleanup WHERE object_name LIKE 'citus_temp_database_%' GROUP BY object_name;
$func$
LANGUAGE sql;
CREATE FUNCTION ensure_no_temp_databases_on_any_nodes()
RETURNS BOOLEAN AS $func$
SELECT bool_and(result::boolean) AS no_temp_databases_on_any_nodes FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$);
@ -27,6 +33,7 @@ CREATE DATABASE db1;
SELECT citus.mitmproxy('conn.allow()');
SELECT get_temp_databases_on_nodes();
SELECT * FROM count_db_cleanup_records();
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
@ -36,6 +43,7 @@ CREATE DATABASE db1;
SELECT citus.mitmproxy('conn.allow()');
SELECT get_temp_databases_on_nodes();
SELECT * FROM count_db_cleanup_records();
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
@ -45,6 +53,7 @@ CREATE DATABASE db1;
SELECT citus.mitmproxy('conn.allow()');
SELECT get_temp_databases_on_nodes();
SELECT * FROM count_db_cleanup_records();
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
@ -54,6 +63,7 @@ CREATE DATABASE db1;
SELECT citus.mitmproxy('conn.allow()');
SELECT get_temp_databases_on_nodes();
SELECT * FROM count_db_cleanup_records();
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
@ -63,6 +73,7 @@ CREATE DATABASE db1;
SELECT citus.mitmproxy('conn.allow()');
SELECT get_temp_databases_on_nodes();
SELECT * FROM count_db_cleanup_records();
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
@ -78,11 +89,15 @@ SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, re
DROP DATABASE db1;
-- after recovering the prepared transactions, cleanup records should also be removed
SELECT * FROM count_db_cleanup_records();
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_internal.acquire_citus_advisory_object_class_lock").kill()');
CREATE DATABASE db1;
SELECT citus.mitmproxy('conn.allow()');
SELECT get_temp_databases_on_nodes();
SELECT * FROM count_db_cleanup_records();
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
@ -92,9 +107,17 @@ CREATE DATABASE db1;
SELECT citus.mitmproxy('conn.allow()');
SELECT get_temp_databases_on_nodes();
SELECT * FROM count_db_cleanup_records();
CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
CREATE DATABASE db1;
-- show that a successful database creation doesn't leave any pg_dist_cleanup records behind
SELECT * FROM count_db_cleanup_records();
DROP DATABASE db1;
DROP FUNCTION get_temp_databases_on_nodes();
DROP FUNCTION ensure_no_temp_databases_on_any_nodes();