mirror of https://github.com/citusdata/citus.git
Add failure handling for CREATE DATABASE commands
parent
2fae91c5df
commit
a5748fc9f3
|
@ -40,15 +40,38 @@
|
|||
#include "distributed/deparse_shard_query.h"
|
||||
#include "distributed/deparser.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/local_executor.h"
|
||||
#include "distributed/metadata/distobject.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/metadata_utility.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/relation_access_tracking.h"
|
||||
#include "distributed/serialize_distributed_ddls.h"
|
||||
#include "distributed/shard_cleaner.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
|
||||
|
||||
/*
|
||||
* Used to save original name of the database before it is replaced with a
|
||||
* temporary name for failure handling purposes in PreprocessCreateDatabaseStmt().
|
||||
*/
|
||||
static char *CreateDatabaseCommandOriginalDbName = NULL;
|
||||
|
||||
|
||||
/*
|
||||
* The format string used when creating temporary databases for failure
|
||||
* handling purposes.
|
||||
*
|
||||
* The fields are as follows to ensure using a unique name for each temporary
|
||||
* database:
|
||||
* - operationId: The operation id returned by RegisterOperationNeedingCleanup().
|
||||
* - groupId: The group id of the node where CREATE DATABASE command
|
||||
* is issued from.
|
||||
*/
|
||||
#define TEMP_DATABASE_NAME_FMT "citus_temp_database_%lu_%d"
|
||||
|
||||
|
||||
/*
|
||||
* DatabaseCollationInfo is used to store collation related information of a database.
|
||||
*/
|
||||
|
@ -453,7 +476,12 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
|
|||
*
|
||||
* In this stage, we perform validations that we want to ensure before delegating to
|
||||
* previous utility hooks because it might not be convenient to throw an error in an
|
||||
* implicit transaction that creates a database.
|
||||
* implicit transaction that creates a database. Also in this stage, we save the original
|
||||
* database name and replace dbname field with a temporary name for failure handling
|
||||
* purposes. We let Postgres create the database with the temporary name, insert a cleanup
|
||||
* record for the temporary database name on all workers and let PostprocessCreateDatabaseStmt()
|
||||
* return the distributed DDL job that both creates the database with the temporary name
|
||||
* and then renames it back to its original name.
|
||||
*
|
||||
* We also serialize database commands globally by acquiring a Citus specific advisory
|
||||
* lock based on OCLASS_DATABASE on the first primary worker node.
|
||||
|
@ -474,14 +502,41 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
|
|||
|
||||
SerializeDistributedDDLsOnObjectClass(OCLASS_DATABASE);
|
||||
|
||||
OperationId operationId = RegisterOperationNeedingCleanup();
|
||||
|
||||
char *tempDatabaseName = psprintf(TEMP_DATABASE_NAME_FMT,
|
||||
operationId, GetLocalGroupId());
|
||||
|
||||
List *allNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock);
|
||||
WorkerNode *workerNode = NULL;
|
||||
foreach_ptr(workerNode, allNodes)
|
||||
{
|
||||
InsertCleanupRecordInSubtransaction(
|
||||
CLEANUP_OBJECT_DATABASE,
|
||||
pstrdup(quote_identifier(tempDatabaseName)),
|
||||
workerNode->groupId,
|
||||
CLEANUP_ON_FAILURE
|
||||
);
|
||||
}
|
||||
|
||||
CreateDatabaseCommandOriginalDbName = stmt->dbname;
|
||||
stmt->dbname = tempDatabaseName;
|
||||
|
||||
return NIL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PostprocessCreateDatabaseStmt is executed after the statement is applied to the local
|
||||
* postgres instance. In this stage we prepare the commands that need to be run on
|
||||
* all workers to create the database.
|
||||
* postgres instance.
|
||||
*
|
||||
* In this stage, we first rename the temporary database back to its original name for
|
||||
* local node and then return a list of distributed DDL jobs to create the database with
|
||||
* the temporary name and then to rename it back to its original name. That way, if CREATE
|
||||
* DATABASE fails on any of the nodes, the temporary database will be cleaned up by the
|
||||
* cleanup records that we inserted in PreprocessCreateDatabaseStmt() and in case of a
|
||||
* failure, we won't leak any databases with the name that the user intended to use for
|
||||
* the database.
|
||||
*
|
||||
*/
|
||||
List *
|
||||
|
@ -517,7 +572,51 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
|
|||
*/
|
||||
List *createDatabaseDDLJobList =
|
||||
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands);
|
||||
return createDatabaseDDLJobList;
|
||||
|
||||
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
|
||||
|
||||
char *renameDatabaseCommand =
|
||||
psprintf("ALTER DATABASE %s RENAME TO %s",
|
||||
quote_identifier(stmt->dbname),
|
||||
quote_identifier(CreateDatabaseCommandOriginalDbName));
|
||||
|
||||
List *renameDatabaseCommands = list_make3(DISABLE_DDL_PROPAGATION,
|
||||
renameDatabaseCommand,
|
||||
ENABLE_DDL_PROPAGATION);
|
||||
|
||||
/*
|
||||
* We use NodeDDLTaskList() to send the RENAME DATABASE statement to the
|
||||
* workers because we want to execute it in a coordinated transaction.
|
||||
*/
|
||||
List *renameDatabaseDDLJobList =
|
||||
NodeDDLTaskList(REMOTE_NODES, renameDatabaseCommands);
|
||||
|
||||
/*
|
||||
* Temporarily disable citus.enable_ddl_propagation before issuing
|
||||
* rename command locally because we don't want to execute it on remote
|
||||
* nodes yet. We will execute it on remote nodes by returning it as a
|
||||
* distributed DDL job.
|
||||
*
|
||||
* The reason why we don't want to execute it on remote nodes yet is that
|
||||
* the database is not created on remote nodes yet.
|
||||
*/
|
||||
int saveNestLevel = NewGUCNestLevel();
|
||||
set_config_option("citus.enable_ddl_propagation", "off",
|
||||
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
|
||||
GUC_ACTION_LOCAL, true, 0, false);
|
||||
|
||||
ExecuteUtilityCommand(renameDatabaseCommand);
|
||||
|
||||
AtEOXact_GUC(true, saveNestLevel);
|
||||
|
||||
/*
|
||||
* Restore the original database name because MarkObjectDistributed()
|
||||
* resolves oid of the object based on the database name and is called
|
||||
* after executing the distributed DDL job that renames temporary database.
|
||||
*/
|
||||
stmt->dbname = CreateDatabaseCommandOriginalDbName;
|
||||
|
||||
return list_concat(createDatabaseDDLJobList, renameDatabaseDDLJobList);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -92,6 +92,8 @@ static bool TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
|
|||
char *nodeName,
|
||||
int nodePort);
|
||||
static bool TryDropUserOutsideTransaction(char *username, char *nodeName, int nodePort);
|
||||
static bool TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName,
|
||||
int nodePort);
|
||||
|
||||
static CleanupRecord * GetCleanupRecordByNameAndType(char *objectName,
|
||||
CleanupObject type);
|
||||
|
@ -141,7 +143,6 @@ Datum
|
|||
citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
EnsureCoordinator();
|
||||
PreventInTransactionBlock(true, "citus_cleanup_orphaned_resources");
|
||||
|
||||
int droppedCount = DropOrphanedResourcesForCleanup();
|
||||
|
@ -245,12 +246,6 @@ TryDropOrphanedResources()
|
|||
static int
|
||||
DropOrphanedResourcesForCleanup()
|
||||
{
|
||||
/* Only runs on Coordinator */
|
||||
if (!IsCoordinator())
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
List *cleanupRecordList = ListCleanupRecords();
|
||||
|
||||
/*
|
||||
|
@ -603,6 +598,12 @@ TryDropResourceByCleanupRecordOutsideTransaction(CleanupRecord *record,
|
|||
return TryDropUserOutsideTransaction(record->objectName, nodeName, nodePort);
|
||||
}
|
||||
|
||||
case CLEANUP_OBJECT_DATABASE:
|
||||
{
|
||||
return TryDropDatabaseOutsideTransaction(record->objectName, nodeName,
|
||||
nodePort);
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
ereport(WARNING, (errmsg(
|
||||
|
@ -883,6 +884,64 @@ TryDropUserOutsideTransaction(char *username,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* TryDropDatabaseOutsideTransaction drops the database with the given name
|
||||
* if it exists.
|
||||
*
|
||||
* It requests a connection to the given node (nodeName, nodePort) with the
|
||||
* OUTSIDE_TRANSACTION | FORCE_NEW_CONNECTION flags, for the user returned by
|
||||
* CitusExtensionOwnerName(), and sends the commands in commandList one by one,
|
||||
* stopping at the first command that does not return RESPONSE_OKAY.
|
||||
*
|
||||
* Returns false if the connection status is not CONNECTION_OK or if any of the
|
||||
* commands fails, and true if all commands return RESPONSE_OKAY.
|
||||
*
|
||||
* NOTE(review): databaseName is passed through quote_identifier() here, and the
|
||||
* cleanup records inserted in PreprocessCreateDatabaseStmt() already store a
|
||||
* quote_identifier()'d name. This looks harmless for names generated from
|
||||
* TEMP_DATABASE_NAME_FMT, but confirm that no other caller records a name that
|
||||
* would be changed by quoting it twice.
|
||||
*/
|
||||
static bool
|
||||
TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, int nodePort)
|
||||
{
|
||||
int connectionFlags = (OUTSIDE_TRANSACTION | FORCE_NEW_CONNECTION);
|
||||
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
|
||||
nodeName, nodePort,
|
||||
CitusExtensionOwnerName(),
|
||||
NULL);
|
||||

||||
if (PQstatus(connection->pgConn) != CONNECTION_OK)
|
||||
{
|
||||
return false;
|
||||
}
|
||||

||||
/*
|
||||
* We want to disable DDL propagation and set lock_timeout before issuing
|
||||
* the DROP DATABASE command but we cannot do so in a way that's scoped
|
||||
* to the DROP DATABASE command. This is because we cannot use a
|
||||
* transaction block for the DROP DATABASE command.
|
||||
*
|
||||
* For this reason, to avoid leaking the lock_timeout and DDL propagation
|
||||
* settings to future commands, we force the connection to close at the end
|
||||
* of the transaction.
|
||||
*/
|
||||
ForceConnectionCloseAtTransactionEnd(connection);
|
||||

||||
/*
|
||||
* The DROP DATABASE command should not propagate, so we disable DDL
|
||||
* propagation. The list also sets lock_timeout to 1s before the DROP
|
||||
* DATABASE command is sent; the commands are sent in list order.
|
||||
*/
|
||||
List *commandList = list_make3(
|
||||
"SET lock_timeout TO '1s'",
|
||||
"SET citus.enable_ddl_propagation TO OFF;",
|
||||
psprintf("DROP DATABASE IF EXISTS %s;", quote_identifier(databaseName))
|
||||
);
|
||||

||||
bool executeCommand = true;
|
||||

||||
const char *commandString = NULL;
|
||||
foreach_ptr(commandString, commandList)
|
||||
{
|
||||
if (ExecuteOptionalRemoteCommand(connection, commandString, NULL) !=
|
||||
RESPONSE_OKAY)
|
||||
{
|
||||
executeCommand = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||

||||
CloseConnection(connection);
|
||||
return executeCommand;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfCleanupRecordForShardExists errors out if a cleanup record for the given
|
||||
* shard name exists.
|
||||
|
|
|
@ -41,7 +41,8 @@ typedef enum CleanupObject
|
|||
CLEANUP_OBJECT_SUBSCRIPTION = 2,
|
||||
CLEANUP_OBJECT_REPLICATION_SLOT = 3,
|
||||
CLEANUP_OBJECT_PUBLICATION = 4,
|
||||
CLEANUP_OBJECT_USER = 5
|
||||
CLEANUP_OBJECT_USER = 5,
|
||||
CLEANUP_OBJECT_DATABASE = 6
|
||||
} CleanupObject;
|
||||
|
||||
/*
|
||||
|
|
|
@ -140,7 +140,12 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
|||
NOTICE: issuing ALTER DATABASE regression RESET lock_timeout
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
set citus.enable_create_database_propagation=on;
|
||||
SET citus.next_operation_id TO 3000;
|
||||
create database "regression!'2";
|
||||
NOTICE: issuing ALTER DATABASE citus_temp_database_3000_0 RENAME TO "regression!'2"
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE citus_temp_database_3000_0 RENAME TO "regression!'2"
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
alter database "regression!'2" with CONNECTION LIMIT 100;
|
||||
NOTICE: issuing ALTER DATABASE "regression!'2" WITH CONNECTION LIMIT 100;
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
|
@ -189,7 +194,12 @@ alter DATABASE local_regression rename to local_regression2;
|
|||
drop database local_regression2;
|
||||
set citus.enable_create_database_propagation=on;
|
||||
drop database regression3;
|
||||
SET citus.next_operation_id TO 3100;
|
||||
create database "regression!'4";
|
||||
NOTICE: issuing ALTER DATABASE citus_temp_database_3100_0 RENAME TO "regression!'4"
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing ALTER DATABASE citus_temp_database_3100_0 RENAME TO "regression!'4"
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
SELECT result FROM run_command_on_all_nodes(
|
||||
$$
|
||||
ALTER TABLESPACE alter_db_tablespace RENAME TO "ts-needs\!escape"
|
||||
|
|
|
@ -428,10 +428,11 @@ SELECT * FROM public.check_database_on_all_nodes('my_template_database') ORDER B
|
|||
set citus.enable_create_database_propagation=on;
|
||||
SET citus.log_remote_commands = true;
|
||||
set citus.grep_remote_commands = '%CREATE DATABASE%';
|
||||
SET citus.next_operation_id TO 2000;
|
||||
create database "mydatabase#1'2";
|
||||
NOTICE: issuing CREATE DATABASE "mydatabase#1'2"
|
||||
NOTICE: issuing CREATE DATABASE citus_temp_database_2000_0
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
NOTICE: issuing CREATE DATABASE "mydatabase#1'2"
|
||||
NOTICE: issuing CREATE DATABASE citus_temp_database_2000_0
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
set citus.grep_remote_commands = '%DROP DATABASE%';
|
||||
drop database if exists "mydatabase#1'2";
|
||||
|
@ -1264,6 +1265,69 @@ SELECT 1 FROM run_command_on_all_nodes($$REVOKE ALL ON TABLESPACE pg_default FRO
|
|||
|
||||
DROP DATABASE no_createdb;
|
||||
DROP USER no_createdb;
|
||||
-- Test a failure scenario by trying to create a distributed database that
|
||||
-- already exists on one of the nodes.
|
||||
\c - - - :worker_1_port
|
||||
CREATE DATABASE "test_\!failure";
|
||||
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
|
||||
DETAIL: Citus does not propagate CREATE DATABASE command to other nodes
|
||||
HINT: You can manually create a database and its extensions on other nodes.
|
||||
\c - - - :master_port
|
||||
SET citus.enable_create_database_propagation TO ON;
|
||||
CREATE DATABASE "test_\!failure";
|
||||
ERROR: database "test_\!failure" already exists
|
||||
CONTEXT: while executing command on localhost:xxxxx
|
||||
SET client_min_messages TO WARNING;
|
||||
CALL citus_cleanup_orphaned_resources();
|
||||
RESET client_min_messages;
|
||||
SELECT result AS database_cleanedup_on_node FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$);
|
||||
database_cleanedup_on_node
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
t
|
||||
t
|
||||
(3 rows)
|
||||
|
||||
SELECT * FROM public.check_database_on_all_nodes($$test_\!failure$$) ORDER BY node_type, result;
|
||||
node_type | result
|
||||
---------------------------------------------------------------------
|
||||
coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
|
||||
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
|
||||
worker node (remote) | {"database_properties": {"datacl": null, "datname": "test_\\!failure", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
|
||||
(3 rows)
|
||||
|
||||
SET citus.enable_create_database_propagation TO OFF;
|
||||
CREATE DATABASE "test_\!failure1";
|
||||
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
|
||||
DETAIL: Citus does not propagate CREATE DATABASE command to other nodes
|
||||
HINT: You can manually create a database and its extensions on other nodes.
|
||||
\c - - - :worker_1_port
|
||||
DROP DATABASE "test_\!failure";
|
||||
SET citus.enable_create_database_propagation TO ON;
|
||||
CREATE DATABASE "test_\!failure1";
|
||||
ERROR: database "test_\!failure1" already exists
|
||||
CONTEXT: while executing command on localhost:xxxxx
|
||||
SET client_min_messages TO WARNING;
|
||||
CALL citus_cleanup_orphaned_resources();
|
||||
RESET client_min_messages;
|
||||
SELECT result AS database_cleanedup_on_node FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$);
|
||||
database_cleanedup_on_node
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
t
|
||||
t
|
||||
(3 rows)
|
||||
|
||||
SELECT * FROM public.check_database_on_all_nodes($$test_\!failure1$$) ORDER BY node_type, result;
|
||||
node_type | result
|
||||
---------------------------------------------------------------------
|
||||
coordinator (remote) | {"database_properties": {"datacl": null, "datname": "test_\\!failure1", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
|
||||
worker node (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
|
||||
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
|
||||
(3 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
DROP DATABASE "test_\!failure1";
|
||||
SET citus.enable_create_database_propagation TO ON;
|
||||
--clean up resources created by this test
|
||||
-- DROP TABLESPACE is not supported, so we need to drop it manually.
|
||||
|
|
|
@ -49,6 +49,7 @@ alter database regression set lock_timeout to DEFAULT;
|
|||
alter database regression RESET lock_timeout;
|
||||
|
||||
set citus.enable_create_database_propagation=on;
|
||||
SET citus.next_operation_id TO 3000;
|
||||
create database "regression!'2";
|
||||
alter database "regression!'2" with CONNECTION LIMIT 100;
|
||||
alter database "regression!'2" with IS_TEMPLATE true CONNECTION LIMIT 50;
|
||||
|
@ -90,6 +91,7 @@ set citus.enable_create_database_propagation=on;
|
|||
|
||||
drop database regression3;
|
||||
|
||||
SET citus.next_operation_id TO 3100;
|
||||
create database "regression!'4";
|
||||
|
||||
|
||||
|
|
|
@ -219,6 +219,7 @@ SELECT * FROM public.check_database_on_all_nodes('my_template_database') ORDER B
|
|||
set citus.enable_create_database_propagation=on;
|
||||
SET citus.log_remote_commands = true;
|
||||
set citus.grep_remote_commands = '%CREATE DATABASE%';
|
||||
SET citus.next_operation_id TO 2000;
|
||||
|
||||
create database "mydatabase#1'2";
|
||||
|
||||
|
@ -746,6 +747,46 @@ SELECT 1 FROM run_command_on_all_nodes($$REVOKE ALL ON TABLESPACE pg_default FRO
|
|||
DROP DATABASE no_createdb;
|
||||
DROP USER no_createdb;
|
||||
|
||||
-- Test a failure scenario by trying to create a distributed database that
|
||||
-- already exists on one of the nodes.
|
||||
|
||||
\c - - - :worker_1_port
|
||||
CREATE DATABASE "test_\!failure";
|
||||
|
||||
\c - - - :master_port
|
||||
|
||||
SET citus.enable_create_database_propagation TO ON;
|
||||
|
||||
CREATE DATABASE "test_\!failure";
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
CALL citus_cleanup_orphaned_resources();
|
||||
RESET client_min_messages;
|
||||
|
||||
SELECT result AS database_cleanedup_on_node FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$);
|
||||
SELECT * FROM public.check_database_on_all_nodes($$test_\!failure$$) ORDER BY node_type, result;
|
||||
|
||||
SET citus.enable_create_database_propagation TO OFF;
|
||||
CREATE DATABASE "test_\!failure1";
|
||||
|
||||
\c - - - :worker_1_port
|
||||
DROP DATABASE "test_\!failure";
|
||||
|
||||
SET citus.enable_create_database_propagation TO ON;
|
||||
|
||||
CREATE DATABASE "test_\!failure1";
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
CALL citus_cleanup_orphaned_resources();
|
||||
RESET client_min_messages;
|
||||
|
||||
SELECT result AS database_cleanedup_on_node FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$);
|
||||
SELECT * FROM public.check_database_on_all_nodes($$test_\!failure1$$) ORDER BY node_type, result;
|
||||
|
||||
\c - - - :master_port
|
||||
|
||||
DROP DATABASE "test_\!failure1";
|
||||
|
||||
SET citus.enable_create_database_propagation TO ON;
|
||||
|
||||
--clean up resources created by this test
|
||||
|
|
Loading…
Reference in New Issue