Add support for CREATE DATABASE queries from Citus non-main databases

pull/7439/head
Halil Ozan Akgul 2024-01-23 11:52:03 +03:00 committed by Onur Tirtir
parent b3ef1b7e39
commit 0ab2ee8ed4
6 changed files with 176 additions and 12 deletions

View File

@ -187,6 +187,7 @@ static const NonMainDbDistributedStatementInfo NonMainDbSupportedStatements[] =
{ T_GrantStmt, false, NonMainDbCheckSupportedObjectTypeForGrant } { T_GrantStmt, false, NonMainDbCheckSupportedObjectTypeForGrant }
}; };
static bool IsCommandToCreateOrDropMainDB(Node *parsetree);
/* /*
* ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of * ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of
@ -318,9 +319,23 @@ citus_ProcessUtility(PlannedStmt *pstmt,
if (!CitusHasBeenLoaded()) if (!CitusHasBeenLoaded())
{ {
if (!IsMainDB) /*
* We always execute CREATE/DROP DATABASE from the main database. There are no
* transactional visibility issues, since these commands are non-transactional.
* And this way we only have to consider one codepath when creating databases.
* We don't try to send the query to the main database if the CREATE/DROP DATABASE
* command is for the main database itself, this is a very rare case but it's
* exercised by our test suite.
*/
if (!IsMainDB &&
!IsCommandToCreateOrDropMainDB(parsetree))
{ {
RunPreprocessMainDBCommand(parsetree); RunPreprocessMainDBCommand(parsetree);
if (IsA(parsetree, CreatedbStmt) ||
IsA(parsetree, DropdbStmt))
{
return;
}
} }
/* /*
@ -1679,6 +1694,17 @@ RunPreprocessMainDBCommand(Node *parsetree)
} }
char *queryString = DeparseTreeNode(parsetree); char *queryString = DeparseTreeNode(parsetree);
if (IsA(parsetree, CreatedbStmt) ||
IsA(parsetree, DropdbStmt))
{
IsMainDBCommandInXact = false;
RunCitusMainDBQuery((char *) queryString);
return;
}
IsMainDBCommandInXact = true;
StringInfo mainDBQuery = makeStringInfo(); StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery, appendStringInfo(mainDBQuery,
START_MANAGEMENT_TRANSACTION, START_MANAGEMENT_TRANSACTION,
@ -1810,3 +1836,25 @@ NonMainDbCheckSupportedObjectTypeForGrant(Node *node)
GrantStmt *stmt = castNode(GrantStmt, node); GrantStmt *stmt = castNode(GrantStmt, node);
return stmt->objtype == OBJECT_DATABASE; return stmt->objtype == OBJECT_DATABASE;
} }
/*
* IsCommandToCreateOrDropMainDB checks if this query creates or drops the
* main database, so we can make an exception and not send this query to
* the main database.
*/
static bool
IsCommandToCreateOrDropMainDB(Node *parsetree)
{
if (IsA(parsetree, CreatedbStmt))
{
CreatedbStmt *createdbStmt = castNode(CreatedbStmt, parsetree);
return strcmp(createdbStmt->dbname, MainDb) == 0;
}
else if (IsA(parsetree, DropdbStmt))
{
DropdbStmt *dropdbStmt = castNode(DropdbStmt, parsetree);
return strcmp(dropdbStmt->dbname, MainDb) == 0;
}
return false;
}

View File

@ -107,6 +107,12 @@ bool IsMainDB = true;
*/ */
char *SuperuserRole = NULL; char *SuperuserRole = NULL;
/*
* IsMainDBCommandInXact shows if the query sent to the main database requires
* a transaction
*/
bool IsMainDBCommandInXact = true;
/* /*
* start_management_transaction starts a management transaction * start_management_transaction starts a management transaction
@ -190,7 +196,11 @@ RunCitusMainDBQuery(char *query)
PostPortNumber, PostPortNumber,
SuperuserRole, SuperuserRole,
MainDb); MainDb);
RemoteTransactionBegin(MainDBConnection);
if (IsMainDBCommandInXact)
{
RemoteTransactionBegin(MainDBConnection);
}
} }
SendRemoteCommand(MainDBConnection, query); SendRemoteCommand(MainDBConnection, query);

View File

@ -333,7 +333,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* If this is a non-Citus main database we should try to commit the prepared * If this is a non-Citus main database we should try to commit the prepared
* transactions created by the Citus main database on the worker nodes. * transactions created by the Citus main database on the worker nodes.
*/ */
if (!IsMainDB && MainDBConnection != NULL) if (!IsMainDB && MainDBConnection != NULL && IsMainDBCommandInXact)
{ {
RunCitusMainDBQuery(COMMIT_MANAGEMENT_COMMAND_2PC); RunCitusMainDBQuery(COMMIT_MANAGEMENT_COMMAND_2PC);
CleanCitusMainDBConnection(); CleanCitusMainDBConnection();
@ -533,7 +533,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* main database query. So if some error happens on the distributed main * main database query. So if some error happens on the distributed main
* database query we wouldn't have committed the current query. * database query we wouldn't have committed the current query.
*/ */
if (!IsMainDB && MainDBConnection != NULL) if (!IsMainDB && MainDBConnection != NULL && IsMainDBCommandInXact)
{ {
RunCitusMainDBQuery("COMMIT"); RunCitusMainDBQuery("COMMIT");
} }

View File

@ -152,5 +152,6 @@ extern bool IsMainDB;
extern char *SuperuserRole; extern char *SuperuserRole;
extern char *MainDb; extern char *MainDb;
extern struct MultiConnection *MainDBConnection; extern struct MultiConnection *MainDBConnection;
extern bool IsMainDBCommandInXact;
#endif /* REMOTE_TRANSACTION_H */ #endif /* REMOTE_TRANSACTION_H */

View File

@ -98,11 +98,11 @@ REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser;
DROP USER other_db_user9, nonsuperuser; DROP USER other_db_user9, nonsuperuser;
-- test from a worker -- test from a worker
\c - - - :worker_1_port \c - - - :worker_1_port
CREATE DATABASE other_db2; CREATE DATABASE worker_other_db;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to other nodes DETAIL: Citus does not propagate CREATE DATABASE command to other nodes
HINT: You can manually create a database and its extensions on other nodes. HINT: You can manually create a database and its extensions on other nodes.
\c other_db2 \c worker_other_db
CREATE USER worker_user1; CREATE USER worker_user1;
BEGIN; BEGIN;
CREATE USER worker_user2; CREATE USER worker_user2;
@ -129,8 +129,84 @@ SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
-- some user creation commands will fail but let's make sure we try to drop them just in case -- some user creation commands will fail but let's make sure we try to drop them just in case
DROP USER IF EXISTS worker_user1, worker_user2, worker_user3; DROP USER IF EXISTS worker_user1, worker_user2, worker_user3;
NOTICE: role "worker_user3" does not exist, skipping NOTICE: role "worker_user3" does not exist, skipping
\c - - - :worker_1_port -- test creating and dropping a database from a Citus non-main database
DROP DATABASE other_db2; SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO true$$);
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
result
---------------------------------------------------------------------
t
t
t
(3 rows)
\c other_db1
CREATE DATABASE other_db3;
\c regression
SELECT result FROM run_command_on_all_nodes($$SELECT datname FROM pg_database WHERE datname = 'other_db3'$$);
result
---------------------------------------------------------------------
other_db3
other_db3
other_db3
(3 rows)
\c other_db1
DROP DATABASE other_db3;
\c regression
SELECT result FROM run_command_on_all_nodes($$SELECT datname FROM pg_database WHERE datname = 'other_db3'$$);
result
---------------------------------------------------------------------
(3 rows)
\c worker_other_db - - :worker_1_port
CREATE DATABASE other_db4;
\c regression
SELECT result FROM run_command_on_all_nodes($$SELECT datname FROM pg_database WHERE datname = 'other_db4'$$);
result
---------------------------------------------------------------------
other_db4
other_db4
other_db4
(3 rows)
\c worker_other_db
DROP DATABASE other_db4;
\c regression
SELECT result FROM run_command_on_all_nodes($$SELECT datname FROM pg_database WHERE datname = 'other_db4'$$);
result
---------------------------------------------------------------------
(3 rows)
DROP DATABASE worker_other_db;
\c - - - :master_port \c - - - :master_port
SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO false$$);
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
result
---------------------------------------------------------------------
t
t
t
(3 rows)
DROP SCHEMA other_databases; DROP SCHEMA other_databases;
DROP DATABASE other_db1; DROP DATABASE other_db1;

View File

@ -75,9 +75,9 @@ DROP USER other_db_user9, nonsuperuser;
-- test from a worker -- test from a worker
\c - - - :worker_1_port \c - - - :worker_1_port
CREATE DATABASE other_db2; CREATE DATABASE worker_other_db;
\c other_db2 \c worker_other_db
CREATE USER worker_user1; CREATE USER worker_user1;
@ -98,9 +98,38 @@ SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
-- some user creation commands will fail but let's make sure we try to drop them just in case -- some user creation commands will fail but let's make sure we try to drop them just in case
DROP USER IF EXISTS worker_user1, worker_user2, worker_user3; DROP USER IF EXISTS worker_user1, worker_user2, worker_user3;
\c - - - :worker_1_port -- test creating and dropping a database from a Citus non-main database
DROP DATABASE other_db2; SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO true$$);
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
\c other_db1
CREATE DATABASE other_db3;
\c regression
SELECT result FROM run_command_on_all_nodes($$SELECT datname FROM pg_database WHERE datname = 'other_db3'$$);
\c other_db1
DROP DATABASE other_db3;
\c regression
SELECT result FROM run_command_on_all_nodes($$SELECT datname FROM pg_database WHERE datname = 'other_db3'$$);
\c worker_other_db - - :worker_1_port
CREATE DATABASE other_db4;
\c regression
SELECT result FROM run_command_on_all_nodes($$SELECT datname FROM pg_database WHERE datname = 'other_db4'$$);
\c worker_other_db
DROP DATABASE other_db4;
\c regression
SELECT result FROM run_command_on_all_nodes($$SELECT datname FROM pg_database WHERE datname = 'other_db4'$$);
DROP DATABASE worker_other_db;
\c - - - :master_port \c - - - :master_port
SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO false$$);
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
DROP SCHEMA other_databases; DROP SCHEMA other_databases;
DROP DATABASE other_db1; DROP DATABASE other_db1;