Add support for create / drop database propagation from non-main databases (#7439)

DESCRIPTION: Adds support for distributed `CREATE/DROP DATABASE `
commands from the databases where Citus is not installed

---------

Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
pull/7492/head^2
Halil Ozan Akgül 2024-02-21 13:44:01 +03:00 committed by GitHub
parent b3ef1b7e39
commit 852bcc5483
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 355 additions and 13 deletions

View File

@ -162,6 +162,7 @@ static bool ShouldCheckUndistributeCitusLocalTables(void);
* Functions to support commands used to manage node-wide objects from non-main * Functions to support commands used to manage node-wide objects from non-main
* databases. * databases.
*/ */
static bool IsCommandToCreateOrDropMainDB(Node *parsetree);
static void RunPreprocessMainDBCommand(Node *parsetree); static void RunPreprocessMainDBCommand(Node *parsetree);
static void RunPostprocessMainDBCommand(Node *parsetree); static void RunPostprocessMainDBCommand(Node *parsetree);
static bool IsStatementSupportedFromNonMainDb(Node *parsetree); static bool IsStatementSupportedFromNonMainDb(Node *parsetree);
@ -184,7 +185,9 @@ ObjectType supportedObjectTypesForGrantStmt[] = { OBJECT_DATABASE };
static const NonMainDbDistributedStatementInfo NonMainDbSupportedStatements[] = { static const NonMainDbDistributedStatementInfo NonMainDbSupportedStatements[] = {
{ T_GrantRoleStmt, false, NULL }, { T_GrantRoleStmt, false, NULL },
{ T_CreateRoleStmt, true, NULL }, { T_CreateRoleStmt, true, NULL },
{ T_GrantStmt, false, NonMainDbCheckSupportedObjectTypeForGrant } { T_GrantStmt, false, NonMainDbCheckSupportedObjectTypeForGrant },
{ T_CreatedbStmt, false, NULL },
{ T_DropdbStmt, false, NULL },
}; };
@ -318,9 +321,24 @@ citus_ProcessUtility(PlannedStmt *pstmt,
if (!CitusHasBeenLoaded()) if (!CitusHasBeenLoaded())
{ {
if (!IsMainDB) /*
* We always execute CREATE/DROP DATABASE from the main database. There are no
* transactional visibility issues, since these commands are non-transactional.
* And this way we only have to consider one codepath when creating databases.
* We don't try to send the query to the main database if the CREATE/DROP DATABASE
* command is for the main database itself, this is a very rare case but it's
* exercised by our test suite.
*/
if (!IsMainDB &&
!IsCommandToCreateOrDropMainDB(parsetree))
{ {
RunPreprocessMainDBCommand(parsetree); RunPreprocessMainDBCommand(parsetree);
if (IsA(parsetree, CreatedbStmt) ||
IsA(parsetree, DropdbStmt))
{
return;
}
} }
/* /*
@ -1666,6 +1684,29 @@ DropSchemaOrDBInProgress(void)
} }
/*
* IsCommandToCreateOrDropMainDB checks if this query creates or drops the
* main database, so we can make an exception and not send this query to
* the main database.
*/
static bool
IsCommandToCreateOrDropMainDB(Node *parsetree)
{
if (IsA(parsetree, CreatedbStmt))
{
CreatedbStmt *createdbStmt = castNode(CreatedbStmt, parsetree);
return strcmp(createdbStmt->dbname, MainDb) == 0;
}
else if (IsA(parsetree, DropdbStmt))
{
DropdbStmt *dropdbStmt = castNode(DropdbStmt, parsetree);
return strcmp(dropdbStmt->dbname, MainDb) == 0;
}
return false;
}
/* /*
* RunPreprocessMainDBCommand runs the necessary commands for a query, in main * RunPreprocessMainDBCommand runs the necessary commands for a query, in main
* database before query is run on the local node with PrevProcessUtility * database before query is run on the local node with PrevProcessUtility
@ -1679,6 +1720,17 @@ RunPreprocessMainDBCommand(Node *parsetree)
} }
char *queryString = DeparseTreeNode(parsetree); char *queryString = DeparseTreeNode(parsetree);
if (IsA(parsetree, CreatedbStmt) ||
IsA(parsetree, DropdbStmt))
{
IsMainDBCommandInXact = false;
RunCitusMainDBQuery((char *) queryString);
return;
}
IsMainDBCommandInXact = true;
StringInfo mainDBQuery = makeStringInfo(); StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery, appendStringInfo(mainDBQuery,
START_MANAGEMENT_TRANSACTION, START_MANAGEMENT_TRANSACTION,

View File

@ -107,6 +107,12 @@ bool IsMainDB = true;
*/ */
char *SuperuserRole = NULL; char *SuperuserRole = NULL;
/*
* IsMainDBCommandInXact shows if the query sent to the main database requires
* a transaction
*/
bool IsMainDBCommandInXact = true;
/* /*
* start_management_transaction starts a management transaction * start_management_transaction starts a management transaction
@ -190,7 +196,11 @@ RunCitusMainDBQuery(char *query)
PostPortNumber, PostPortNumber,
SuperuserRole, SuperuserRole,
MainDb); MainDb);
RemoteTransactionBegin(MainDBConnection);
if (IsMainDBCommandInXact)
{
RemoteTransactionBegin(MainDBConnection);
}
} }
SendRemoteCommand(MainDBConnection, query); SendRemoteCommand(MainDBConnection, query);

View File

@ -333,7 +333,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* If this is a non-Citus main database we should try to commit the prepared * If this is a non-Citus main database we should try to commit the prepared
* transactions created by the Citus main database on the worker nodes. * transactions created by the Citus main database on the worker nodes.
*/ */
if (!IsMainDB && MainDBConnection != NULL) if (!IsMainDB && MainDBConnection != NULL && IsMainDBCommandInXact)
{ {
RunCitusMainDBQuery(COMMIT_MANAGEMENT_COMMAND_2PC); RunCitusMainDBQuery(COMMIT_MANAGEMENT_COMMAND_2PC);
CleanCitusMainDBConnection(); CleanCitusMainDBConnection();
@ -533,7 +533,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* main database query. So if some error happens on the distributed main * main database query. So if some error happens on the distributed main
* database query we wouldn't have committed the current query. * database query we wouldn't have committed the current query.
*/ */
if (!IsMainDB && MainDBConnection != NULL) if (!IsMainDB && MainDBConnection != NULL && IsMainDBCommandInXact)
{ {
RunCitusMainDBQuery("COMMIT"); RunCitusMainDBQuery("COMMIT");
} }

View File

@ -152,5 +152,6 @@ extern bool IsMainDB;
extern char *SuperuserRole; extern char *SuperuserRole;
extern char *MainDb; extern char *MainDb;
extern struct MultiConnection *MainDBConnection; extern struct MultiConnection *MainDBConnection;
extern bool IsMainDBCommandInXact;
#endif /* REMOTE_TRANSACTION_H */ #endif /* REMOTE_TRANSACTION_H */

View File

@ -98,11 +98,11 @@ REVOKE ALL ON SCHEMA citus_internal FROM nonsuperuser;
DROP USER other_db_user9, nonsuperuser; DROP USER other_db_user9, nonsuperuser;
-- test from a worker -- test from a worker
\c - - - :worker_1_port \c - - - :worker_1_port
CREATE DATABASE other_db2; CREATE DATABASE worker_other_db;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to other nodes DETAIL: Citus does not propagate CREATE DATABASE command to other nodes
HINT: You can manually create a database and its extensions on other nodes. HINT: You can manually create a database and its extensions on other nodes.
\c other_db2 \c worker_other_db
CREATE USER worker_user1; CREATE USER worker_user1;
BEGIN; BEGIN;
CREATE USER worker_user2; CREATE USER worker_user2;
@ -129,8 +129,211 @@ SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
-- some user creation commands will fail but let's make sure we try to drop them just in case -- some user creation commands will fail but let's make sure we try to drop them just in case
DROP USER IF EXISTS worker_user1, worker_user2, worker_user3; DROP USER IF EXISTS worker_user1, worker_user2, worker_user3;
NOTICE: role "worker_user3" does not exist, skipping NOTICE: role "worker_user3" does not exist, skipping
\c - - - :worker_1_port -- test creating and dropping a database from a Citus non-main database
DROP DATABASE other_db2; SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO true$$);
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
result
---------------------------------------------------------------------
t
t
t
(3 rows)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
\c other_db1
CREATE DATABASE other_db3;
\c regression
SELECT * FROM public.check_database_on_all_nodes('other_db3') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": {"datacl": null, "datname": "other_db3", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": true, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": {"datacl": null, "datname": "other_db3", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": true, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": {"datacl": null, "datname": "other_db3", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": true, "stale_pg_dist_object_record_for_a_db_exists": false}
(3 rows)
\c other_db1
DROP DATABASE other_db3;
\c regression
SELECT * FROM public.check_database_on_all_nodes('other_db3') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(3 rows)
\c worker_other_db - - :worker_1_port
CREATE DATABASE other_db4;
\c regression
SELECT * FROM public.check_database_on_all_nodes('other_db4') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | {"database_properties": {"datacl": null, "datname": "other_db4", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": true, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (local) | {"database_properties": {"datacl": null, "datname": "other_db4", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": true, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": {"datacl": null, "datname": "other_db4", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": true, "stale_pg_dist_object_record_for_a_db_exists": false}
(3 rows)
\c worker_other_db
DROP DATABASE other_db4;
\c regression
SELECT * FROM public.check_database_on_all_nodes('other_db4') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(3 rows)
DROP DATABASE worker_other_db;
CREATE DATABASE other_db5;
-- disable create database propagation for the next test
SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO false$$);
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
result
---------------------------------------------------------------------
t
t
t
(3 rows)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
\c other_db5 - - :worker_2_port
-- locally create a database
CREATE DATABASE local_db;
\c regression - - -
-- re-enable create database propagation
SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO true$$);
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
result
---------------------------------------------------------------------
t
t
t
(3 rows)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
\c other_db5 - - :master_port
-- Test a scenario where create database fails because the database
-- already exists on another node and we don't crash etc.
CREATE DATABASE local_db;
ERROR: database "local_db" already exists
CONTEXT: while executing command on localhost:xxxxx
while executing command on localhost:xxxxx
\c regression - - -
SELECT * FROM public.check_database_on_all_nodes('local_db') ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": {"datacl": null, "datname": "local_db", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(3 rows)
\c - - - :worker_2_port
-- locally drop the database for cleanup purposes
SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO false$$);
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
result
---------------------------------------------------------------------
t
t
t
(3 rows)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
DROP DATABASE local_db;
SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO true$$);
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
result
---------------------------------------------------------------------
t
t
t
(3 rows)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
\c - - - :master_port \c - - - :master_port
DROP DATABASE other_db5;
SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO false$$);
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
result
---------------------------------------------------------------------
t
t
t
(3 rows)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
DROP SCHEMA other_databases; DROP SCHEMA other_databases;
DROP DATABASE other_db1; DROP DATABASE other_db1;

View File

@ -75,9 +75,9 @@ DROP USER other_db_user9, nonsuperuser;
-- test from a worker -- test from a worker
\c - - - :worker_1_port \c - - - :worker_1_port
CREATE DATABASE other_db2; CREATE DATABASE worker_other_db;
\c other_db2 \c worker_other_db
CREATE USER worker_user1; CREATE USER worker_user1;
@ -98,9 +98,85 @@ SELECT usename FROM pg_user WHERE usename LIKE 'worker\_user%' ORDER BY 1;
-- some user creation commands will fail but let's make sure we try to drop them just in case -- some user creation commands will fail but let's make sure we try to drop them just in case
DROP USER IF EXISTS worker_user1, worker_user2, worker_user3; DROP USER IF EXISTS worker_user1, worker_user2, worker_user3;
\c - - - :worker_1_port -- test creating and dropping a database from a Citus non-main database
DROP DATABASE other_db2; SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO true$$);
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
SELECT pg_sleep(0.1);
\c other_db1
CREATE DATABASE other_db3;
\c regression
SELECT * FROM public.check_database_on_all_nodes('other_db3') ORDER BY node_type;
\c other_db1
DROP DATABASE other_db3;
\c regression
SELECT * FROM public.check_database_on_all_nodes('other_db3') ORDER BY node_type;
\c worker_other_db - - :worker_1_port
CREATE DATABASE other_db4;
\c regression
SELECT * FROM public.check_database_on_all_nodes('other_db4') ORDER BY node_type;
\c worker_other_db
DROP DATABASE other_db4;
\c regression
SELECT * FROM public.check_database_on_all_nodes('other_db4') ORDER BY node_type;
DROP DATABASE worker_other_db;
CREATE DATABASE other_db5;
-- disable create database propagation for the next test
SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO false$$);
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
SELECT pg_sleep(0.1);
\c other_db5 - - :worker_2_port
-- locally create a database
CREATE DATABASE local_db;
\c regression - - -
-- re-enable create database propagation
SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO true$$);
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
SELECT pg_sleep(0.1);
\c other_db5 - - :master_port
-- Test a scenario where create database fails because the database
-- already exists on another node and we don't crash etc.
CREATE DATABASE local_db;
\c regression - - -
SELECT * FROM public.check_database_on_all_nodes('local_db') ORDER BY node_type, result;
\c - - - :worker_2_port
-- locally drop the database for cleanup purposes
SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO false$$);
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
SELECT pg_sleep(0.1);
DROP DATABASE local_db;
SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO true$$);
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
SELECT pg_sleep(0.1);
\c - - - :master_port \c - - - :master_port
DROP DATABASE other_db5;
SELECT result FROM run_command_on_all_nodes($$ALTER SYSTEM SET citus.enable_create_database_propagation TO false$$);
SELECT result FROM run_command_on_all_nodes($$SELECT pg_reload_conf()$$);
SELECT pg_sleep(0.1);
DROP SCHEMA other_databases; DROP SCHEMA other_databases;
DROP DATABASE other_db1; DROP DATABASE other_db1;