#6548 2PC recovery is extremely ineffective on a cluster with multiple DATABASEs fix (#7174)

pull/7159/head^2
Ivan Vyazmitinov 2023-09-04 21:28:22 +08:00 committed by GitHub
parent de9af078b0
commit e94bf93152
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 652 additions and 1 deletions

View File

@ -404,7 +404,7 @@ PendingWorkerTransactionList(MultiConnection *connection)
int32 coordinatorId = GetLocalGroupId();
appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts "
"WHERE gid LIKE 'citus\\_%d\\_%%'",
"WHERE gid LIKE 'citus\\_%d\\_%%' and database = current_database()",
coordinatorId);
int querySent = SendRemoteCommand(connection, command->data);

View File

@ -0,0 +1,364 @@
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT $definition$
CREATE OR REPLACE FUNCTION test.maintenance_worker()
RETURNS pg_stat_activity
LANGUAGE plpgsql
AS $$
DECLARE
activity record;
BEGIN
DO 'BEGIN END'; -- Force maintenance daemon to start
-- we don't want to wait forever; loop will exit after 20 seconds
FOR i IN 1 .. 200 LOOP
PERFORM pg_stat_clear_snapshot();
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = current_database();
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
END IF ;
END LOOP;
-- fail if we reach the end of this loop
raise 'Waited too long for maintenance daemon to start';
END;
$$;
$definition$ create_function_test_maintenance_worker
\gset
CREATE DATABASE db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
SELECT oid AS db1_oid
FROM pg_database
WHERE datname = 'db1'
\gset
\c - - - :worker_1_port
CREATE DATABASE db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c - - - :worker_2_port
CREATE DATABASE db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c db1 - - :worker_1_port
CREATE EXTENSION citus;
\c db1 - - :worker_2_port
CREATE EXTENSION citus;
\c db1 - - :master_port
CREATE EXTENSION citus;
SELECT citus_add_node('localhost', :worker_1_port);
citus_add_node
---------------------------------------------------------------------
1
(1 row)
SELECT citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
2
(1 row)
SELECT current_database();
current_database
---------------------------------------------------------------------
db1
(1 row)
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
datname | current_database | usename | extowner
---------------------------------------------------------------------
db1 | db1 | postgres | postgres
(1 row)
SELECT *
FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t
2 | 2 | localhost | 57638 | default | t | t | primary | default | t | t
(2 rows)
CREATE DATABASE db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
SELECT oid AS db2_oid
FROM pg_database
WHERE datname = 'db2'
\gset
\c - - - :worker_1_port
CREATE DATABASE db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c - - - :worker_2_port
CREATE DATABASE db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c db2 - - :worker_1_port
CREATE EXTENSION citus;
\c db2 - - :worker_2_port
CREATE EXTENSION citus;
\c db2 - - :master_port
CREATE EXTENSION citus;
SELECT citus_add_node('localhost', :worker_1_port);
citus_add_node
---------------------------------------------------------------------
1
(1 row)
SELECT citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
2
(1 row)
SELECT current_database();
current_database
---------------------------------------------------------------------
db2
(1 row)
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
datname | current_database | usename | extowner
---------------------------------------------------------------------
db2 | db2 | postgres | postgres
(1 row)
SELECT *
FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t
2 | 2 | localhost | 57638 | default | t | t | primary | default | t | t
(2 rows)
SELECT groupid AS worker_1_group_id
FROM pg_dist_node
WHERE nodeport = :worker_1_port;
worker_1_group_id
---------------------------------------------------------------------
1
(1 row)
\gset
SELECT groupid AS worker_2_group_id
FROM pg_dist_node
WHERE nodeport = :worker_2_port;
worker_2_group_id
---------------------------------------------------------------------
2
(1 row)
\gset
-- Prepare transactions on first database
\c db1 - - :worker_1_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_1_db_1_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_1_db_1_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_1_db_1_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_1_db_1_name';
\c db1 - - :worker_2_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_2_db_1_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_2_db_1_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_2_db_1_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_2_db_1_name';
-- Prepare transactions on second database
\c db2 - - :worker_1_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_1_db_2_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_1_db_2_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_1_db_2_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_1_db_2_name';
\c db2 - - :worker_2_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_2_db_2_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_2_db_2_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_2_db_2_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_2_db_2_name';
\c db1 - - :master_port
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_1_name'),
(:worker_2_group_id, :'transaction_2_worker_2_db_1_name');
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid'),
(:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid');
\c db2 - - :master_port
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_2_name'),
(:worker_2_group_id, :'transaction_2_worker_2_db_2_name');
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid'),
(:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid');
\c db1 - - :master_port
SELECT count(*) != 0
FROM pg_dist_transaction;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT recover_prepared_transactions() > 0;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0
FROM pg_dist_transaction;
?column?
---------------------------------------------------------------------
t
(1 row)
\c db2 - - :master_port
SELECT count(*) != 0
FROM pg_dist_transaction;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT recover_prepared_transactions() > 0;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0
FROM pg_dist_transaction;
?column?
---------------------------------------------------------------------
t
(1 row)
\c regression - - :master_port
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db2;
\c - - - :worker_1_port
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db2;
\c - - - :worker_2_port
-- Count of terminated sessions is not important for the test,
-- it is just to make output predictable
SELECT count(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db2;

View File

@ -205,6 +205,7 @@ test: multi_modifying_xacts
test: multi_generate_ddl_commands
test: multi_create_shards
test: multi_transaction_recovery
test: multi_transaction_recovery_multiple_databases
test: local_dist_join_modifications
test: local_table_join

View File

@ -0,0 +1,286 @@
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
SELECT pg_reload_conf();
SELECT $definition$
CREATE OR REPLACE FUNCTION test.maintenance_worker()
RETURNS pg_stat_activity
LANGUAGE plpgsql
AS $$
DECLARE
activity record;
BEGIN
DO 'BEGIN END'; -- Force maintenance daemon to start
-- we don't want to wait forever; loop will exit after 20 seconds
FOR i IN 1 .. 200 LOOP
PERFORM pg_stat_clear_snapshot();
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = current_database();
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
END IF ;
END LOOP;
-- fail if we reach the end of this loop
raise 'Waited too long for maintenance daemon to start';
END;
$$;
$definition$ create_function_test_maintenance_worker
\gset
CREATE DATABASE db1;
SELECT oid AS db1_oid
FROM pg_database
WHERE datname = 'db1'
\gset
\c - - - :worker_1_port
CREATE DATABASE db1;
\c - - - :worker_2_port
CREATE DATABASE db1;
\c db1 - - :worker_1_port
CREATE EXTENSION citus;
\c db1 - - :worker_2_port
CREATE EXTENSION citus;
\c db1 - - :master_port
CREATE EXTENSION citus;
SELECT citus_add_node('localhost', :worker_1_port);
SELECT citus_add_node('localhost', :worker_2_port);
SELECT current_database();
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
SELECT *
FROM pg_dist_node;
CREATE DATABASE db2;
SELECT oid AS db2_oid
FROM pg_database
WHERE datname = 'db2'
\gset
\c - - - :worker_1_port
CREATE DATABASE db2;
\c - - - :worker_2_port
CREATE DATABASE db2;
\c db2 - - :worker_1_port
CREATE EXTENSION citus;
\c db2 - - :worker_2_port
CREATE EXTENSION citus;
\c db2 - - :master_port
CREATE EXTENSION citus;
SELECT citus_add_node('localhost', :worker_1_port);
SELECT citus_add_node('localhost', :worker_2_port);
SELECT current_database();
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
SELECT *
FROM pg_dist_node;
SELECT groupid AS worker_1_group_id
FROM pg_dist_node
WHERE nodeport = :worker_1_port;
\gset
SELECT groupid AS worker_2_group_id
FROM pg_dist_node
WHERE nodeport = :worker_2_port;
\gset
-- Prepare transactions on first database
\c db1 - - :worker_1_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_1_db_1_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_1_db_1_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_1_db_1_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_1_db_1_name';
\c db1 - - :worker_2_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_2_db_1_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_2_db_1_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_2_db_1_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_2_db_1_name';
-- Prepare transactions on second database
\c db2 - - :worker_1_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_1_db_2_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_1_db_2_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_1_db_2_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_1_db_2_name';
\c db2 - - :worker_2_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_2_db_2_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_2_db_2_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_2_db_2_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_2_db_2_name';
\c db1 - - :master_port
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_1_name'),
(:worker_2_group_id, :'transaction_2_worker_2_db_1_name');
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid'),
(:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid');
\c db2 - - :master_port
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_2_name'),
(:worker_2_group_id, :'transaction_2_worker_2_db_2_name');
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid'),
(:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid');
\c db1 - - :master_port
SELECT count(*) != 0
FROM pg_dist_transaction;
SELECT recover_prepared_transactions() > 0;
SELECT count(*) = 0
FROM pg_dist_transaction;
\c db2 - - :master_port
SELECT count(*) != 0
FROM pg_dist_transaction;
SELECT recover_prepared_transactions() > 0;
SELECT count(*) = 0
FROM pg_dist_transaction;
\c regression - - :master_port
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
DROP DATABASE db2;
\c - - - :worker_1_port
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
DROP DATABASE db2;
\c - - - :worker_2_port
-- Count of terminated sessions is not important for the test,
-- it is just to make output predictable
SELECT count(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
DROP DATABASE db2;