use all nodes again

pull/7483/head
Onur Tirtir 2024-02-22 14:09:39 +03:00
parent 4c46d4b265
commit 63e0e8397e
3 changed files with 39 additions and 19 deletions

View File

@ -480,7 +480,7 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
* implicit transaction that creates a database. Also in this stage, we save the original
* database name and replace dbname field with a temporary name for failure handling
* purposes. We let Postgres create the database with the temporary name, insert a cleanup
* record for the temporary database name on all workers and let PostprocessCreateDatabaseStmt()
* record for the temporary database name on all nodes and let PostprocessCreateDatabaseStmt()
* return the distributed DDL job that both creates the database with the temporary name
* and then renames it back to its original name.
*
@ -496,7 +496,7 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
return NIL;
}
EnsurePropagationToCoordinator();
EnsureCoordinatorIsInMetadata();
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
EnsureSupportedCreateDatabaseCommand(stmt);
@ -508,11 +508,7 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
char *tempDatabaseName = psprintf(TEMP_DATABASE_NAME_FMT,
operationId, GetLocalGroupId());
/*
* Temporary database creation on local node would anyway rollback
* in case of a failure, so only insert the records for remote nodes.
*/
List *remoteNodes = TargetWorkerSetNodeList(REMOTE_NODES, RowShareLock);
List *remoteNodes = TargetWorkerSetNodeList(ALL_SHARD_NODES, RowShareLock);
WorkerNode *remoteNode = NULL;
foreach_ptr(remoteNode, remoteNodes)
{

View File

@ -1,5 +1,11 @@
SET citus.enable_create_database_propagation TO ON;
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_add_node('localhost', :master_port, 0);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE FUNCTION get_temp_databases_on_nodes()
RETURNS TEXT AS $func$
SELECT array_agg(DISTINCT result ORDER BY result) AS temp_databases_on_nodes FROM run_command_on_all_nodes($$SELECT datname FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$) WHERE result != '';
@ -66,9 +72,10 @@ SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(2 rows)
(3 rows)
SELECT citus.mitmproxy('conn.onQuery(query="^CREATE DATABASE").cancel(' || pg_backend_pid() || ')');
mitmproxy
@ -93,7 +100,7 @@ SELECT get_temp_databases_on_nodes();
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
citus_temp_database_4000_0 | 2
citus_temp_database_4000_0 | 3
(1 row)
CALL citus_cleanup_orphaned_resources();
@ -106,9 +113,10 @@ SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(2 rows)
(3 rows)
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER DATABASE").cancel(' || pg_backend_pid() || ')');
mitmproxy
@ -133,7 +141,7 @@ SELECT get_temp_databases_on_nodes();
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
citus_temp_database_4001_0 | 2
citus_temp_database_4001_0 | 3
(1 row)
CALL citus_cleanup_orphaned_resources();
@ -146,9 +154,10 @@ SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(2 rows)
(3 rows)
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
mitmproxy
@ -185,9 +194,10 @@ SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(2 rows)
(3 rows)
SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()');
mitmproxy
@ -213,7 +223,7 @@ SELECT get_temp_databases_on_nodes();
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
citus_temp_database_4002_0 | 2
citus_temp_database_4002_0 | 3
(1 row)
CALL citus_cleanup_orphaned_resources();
@ -226,9 +236,10 @@ SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(2 rows)
(3 rows)
SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT PREPARED").kill()');
mitmproxy
@ -262,9 +273,10 @@ SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": {"datacl": null, "datname": "db1", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": true, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": {"datacl": null, "datname": "db1", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": true, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": {"datacl": null, "datname": "db1", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "pg_default", "daticurules": null, "datallowconn": true, "datconnlimit": -1, "daticulocale": null, "datistemplate": false, "database_owner": "postgres", "datcollversion": null, "datlocprovider": "c"}, "pg_dist_object_record_for_db_exists": true, "stale_pg_dist_object_record_for_a_db_exists": false}
(2 rows)
(3 rows)
DROP DATABASE db1;
-- after recovering the prepared transactions, cleanup records should also be removed
@ -308,9 +320,10 @@ SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(2 rows)
(3 rows)
SELECT citus.mitmproxy('conn.onParse(query="^WITH distributed_object_data").kill()');
mitmproxy
@ -336,7 +349,7 @@ SELECT get_temp_databases_on_nodes();
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
citus_temp_database_4004_0 | 2
citus_temp_database_4004_0 | 3
(1 row)
CALL citus_cleanup_orphaned_resources();
@ -349,9 +362,10 @@ SELECT ensure_no_temp_databases_on_any_nodes();
SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, result;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(2 rows)
(3 rows)
CREATE DATABASE db1;
-- show that a successful database creation doesn't leave any pg_dist_cleanup records behind
@ -364,3 +378,9 @@ DROP DATABASE db1;
DROP FUNCTION get_temp_databases_on_nodes();
DROP FUNCTION ensure_no_temp_databases_on_any_nodes();
DROP FUNCTION count_db_cleanup_records();
SELECT 1 FROM citus_remove_node('localhost', :master_port);
?column?
---------------------------------------------------------------------
1
(1 row)

View File

@ -1,6 +1,8 @@
SET citus.enable_create_database_propagation TO ON;
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_add_node('localhost', :master_port, 0);
CREATE FUNCTION get_temp_databases_on_nodes()
RETURNS TEXT AS $func$
SELECT array_agg(DISTINCT result ORDER BY result) AS temp_databases_on_nodes FROM run_command_on_all_nodes($$SELECT datname FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$) WHERE result != '';
@ -122,3 +124,5 @@ DROP DATABASE db1;
DROP FUNCTION get_temp_databases_on_nodes();
DROP FUNCTION ensure_no_temp_databases_on_any_nodes();
DROP FUNCTION count_db_cleanup_records();
SELECT 1 FROM citus_remove_node('localhost', :master_port);