diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 086d9360e..3b993250f 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -1262,19 +1262,21 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
 		CreateTruncateTrigger(relationId);
 	}
 
-	/* create shards for hash distributed and reference tables */
 	if (tableType == HASH_DISTRIBUTED)
 	{
+		/* create shards for hash distributed table */
 		CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
 										 colocatedTableId,
 										 localTableEmpty);
 	}
 	else if (tableType == REFERENCE_TABLE)
 	{
+		/* create shards for reference table */
 		CreateReferenceTableShard(relationId);
 	}
 	else if (tableType == SINGLE_SHARD_DISTRIBUTED)
 	{
+		/* create the shard of the given single-shard distributed table */
 		CreateSingleShardTableShard(relationId, colocatedTableId,
 									colocationId);
 	}
@@ -1900,7 +1902,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
 
 
 /*
- * CreateHashDistributedTableShards creates the shard of given single-shard
+ * CreateSingleShardTableShard creates the shard of the given single-shard
  * distributed table.
  */
 static void
diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c
index 26946515b..d0fcc9612 100644
--- a/src/backend/distributed/operations/create_shards.c
+++ b/src/backend/distributed/operations/create_shards.c
@@ -83,6 +83,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 {
 	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
 	List *insertedShardPlacements = NIL;
+	List *insertedShardIds = NIL;
 
 	/* make sure table is hash partitioned */
 	CheckHashPartitionedTable(distributedTableId);
@@ -174,7 +175,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 		/* initialize the hash token space for this shard */
 		int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
 		int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
-		uint64 shardId = GetNextShardId();
+		uint64 *shardIdPtr = (uint64 *) palloc0(sizeof(uint64));
+		*shardIdPtr = GetNextShardId();
+		insertedShardIds = lappend(insertedShardIds, shardIdPtr);
 
 		/* if we are at the last shard, make sure the max token value is INT_MAX */
 		if (shardIndex == (shardCount - 1))
@@ -186,17 +189,27 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 		text *minHashTokenText = IntegerToText(shardMinHashToken);
 		text *maxHashTokenText = IntegerToText(shardMaxHashToken);
 
-		InsertShardRow(distributedTableId, shardId, shardStorageType,
+		InsertShardRow(distributedTableId, *shardIdPtr, shardStorageType,
 					   minHashTokenText, maxHashTokenText);
 
-		List *currentInsertedShardPlacements = InsertShardPlacementRows(
-			distributedTableId,
-			shardId,
-			workerNodeList,
-			roundRobinNodeIndex,
-			replicationFactor);
+		InsertShardPlacementRows(distributedTableId,
+								 *shardIdPtr,
+								 workerNodeList,
+								 roundRobinNodeIndex,
+								 replicationFactor);
+	}
+
+	/*
+	 * load shard placements for the shard at once after all placement insertions
+	 * finished. This prevents MetadataCache from rebuilding unnecessarily after
+	 * each placement insertion.
+	 */
+	uint64 *shardIdPtr;
+	foreach_ptr(shardIdPtr, insertedShardIds)
+	{
+		List *placementsForShard = ShardPlacementList(*shardIdPtr);
 		insertedShardPlacements = list_concat(insertedShardPlacements,
-											  currentInsertedShardPlacements);
+											  placementsForShard);
 	}
 
 	CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
@@ -292,7 +305,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
 
 	/*
 	 * load shard placements for the shard at once after all placement insertions
-	 * finished. That prevents MetadataCache from rebuilding unnecessarily after
+	 * finished. This prevents MetadataCache from rebuilding unnecessarily after
 	 * each placement insertion.
 	 */
 	uint64 *shardIdPtr;
@@ -360,9 +373,18 @@ CreateReferenceTableShard(Oid distributedTableId)
 	InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
 				   shardMaxValue);
 
-	List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
-															  nodeList, workerStartIndex,
-															  replicationFactor);
+	InsertShardPlacementRows(distributedTableId,
+							 shardId,
+							 nodeList,
+							 workerStartIndex,
+							 replicationFactor);
+
+	/*
+	 * load shard placements for the shard at once after all placement insertions
+	 * finished. This prevents MetadataCache from rebuilding unnecessarily after
+	 * each placement insertion.
+	 */
+	List *insertedShardPlacements = ShardPlacementList(shardId);
 
 	CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
 						  useExclusiveConnection);
@@ -408,12 +430,18 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
 				   minHashTokenText, maxHashTokenText);
 
 	int replicationFactor = 1;
-	List *insertedShardPlacements = InsertShardPlacementRows(
-		relationId,
-		shardId,
-		workerNodeList,
-		roundRobinNodeIdx,
-		replicationFactor);
+	InsertShardPlacementRows(relationId,
+							 shardId,
+							 workerNodeList,
+							 roundRobinNodeIdx,
+							 replicationFactor);
+
+	/*
+	 * load shard placements for the shard at once after all placement insertions
+	 * finished. This prevents MetadataCache from rebuilding unnecessarily after
+	 * each placement insertion.
+	 */
+	List *insertedShardPlacements = ShardPlacementList(shardId);
 
 	/*
	 * We don't need to force using exclusive connections because we're anyway
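Note that the refactor above only defers the ShardPlacementList() lookups until every placement row has been inserted; the hash token arithmetic itself is unchanged. For reference, the ranges it produces can be recomputed in plain SQL. The sketch below is illustrative only: shardCount = 4 is an arbitrary choice, and the output columns merely mirror pg_dist_shard's shardminvalue/shardmaxvalue.

    -- Recompute per-shard hash token ranges from the arithmetic in
    -- CreateShardsWithRoundRobinPolicy: hashTokenIncrement = 2^32 / shardCount,
    -- each shard starts at PG_INT32_MIN + shardIndex * hashTokenIncrement,
    -- and the last shard's max is clamped to PG_INT32_MAX.
    WITH params AS (
        SELECT 4::bigint AS shard_count, 4294967296::bigint AS hash_token_count
    )
    SELECT shard_index,
           -2147483648 + shard_index * (hash_token_count / shard_count) AS shardminvalue,
           CASE
               WHEN shard_index = shard_count - 1 THEN 2147483647
               ELSE -2147483648 + (shard_index + 1) * (hash_token_count / shard_count) - 1
           END AS shardmaxvalue
    FROM params, generate_series(0, shard_count - 1) AS shard_index;

For shardCount = 4 this yields the familiar quarters of the int32 space, e.g. shard 0 covers -2147483648 .. -1073741825 and shard 3 covers 1073741824 .. 2147483647.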
diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c
index 402f949b2..421593c66 100644
--- a/src/backend/distributed/operations/stage_protocol.c
+++ b/src/backend/distributed/operations/stage_protocol.c
@@ -383,14 +383,13 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
 
 
 /*
  * InsertShardPlacementRows inserts shard placements to the metadata table on
- * the coordinator node. Then, returns the list of added shard placements.
+ * the coordinator node.
  */
-List *
+void
 InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
 						 int workerStartIndex, int replicationFactor)
 {
 	int workerNodeCount = list_length(workerNodeList);
-	List *insertedShardPlacements = NIL;
 
 	for (int placementIndex = 0; placementIndex < replicationFactor; placementIndex++)
 	{
@@ -399,13 +398,11 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
 		uint32 nodeGroupId = workerNode->groupId;
 		const uint64 shardSize = 0;
 
-		uint64 shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID,
-														  shardSize, nodeGroupId);
-		ShardPlacement *shardPlacement = LoadShardPlacement(shardId, shardPlacementId);
-		insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
+		InsertShardPlacementRow(shardId,
+								INVALID_PLACEMENT_ID,
+								shardSize,
+								nodeGroupId);
 	}
-
-	return insertedShardPlacements;
 }
 
 
diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c
index c2ccd2478..b46419dc2 100644
--- a/src/backend/distributed/transaction/transaction_recovery.c
+++ b/src/backend/distributed/transaction/transaction_recovery.c
@@ -404,7 +404,7 @@ PendingWorkerTransactionList(MultiConnection *connection)
 	int32 coordinatorId = GetLocalGroupId();
 
 	appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts "
-					 "WHERE gid LIKE 'citus\\_%d\\_%%'",
+					 "WHERE gid LIKE 'citus\\_%d\\_%%' AND database = current_database()",
 					 coordinatorId);
 
 	int querySent = SendRemoteCommand(connection, command->data);
diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h
index 4f8f12580..0dcc66141 100644
--- a/src/include/distributed/coordinator_protocol.h
+++ b/src/include/distributed/coordinator_protocol.h
@@ -251,9 +251,9 @@ extern void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId
 												   replicationFactor);
 extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
 								  bool useExclusiveConnection);
-extern List * InsertShardPlacementRows(Oid relationId, int64 shardId,
-									   List *workerNodeList, int workerStartIndex,
-									   int replicationFactor);
+extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
+									 List *workerNodeList, int workerStartIndex,
+									 int replicationFactor);
 extern uint64 UpdateShardStatistics(int64 shardId);
 extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 											 int32 replicationFactor,
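The one-line change to PendingWorkerTransactionList above is the core fix: pg_prepared_xacts is instance-wide, so without a database filter a recovery pass initiated from one Citus database could also see, and then wrongly abort or forget, prepared transactions that belong to another database. After interpolating the format string, the probe a worker receives looks roughly as follows; the coordinator group id 0 is assumed here purely for illustration.

    -- Sketch of the recovery probe sent to a worker, with %d already
    -- interpolated as coordinator group id 0. The LIKE pattern keeps gids
    -- created by this coordinator; the new predicate drops gids that were
    -- prepared from a different database on the same instance.
    SELECT gid
    FROM pg_prepared_xacts
    WHERE gid LIKE 'citus\_0\_%'
      AND database = current_database();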
diff --git a/src/test/regress/expected/multi_transaction_recovery_multiple_databases.out b/src/test/regress/expected/multi_transaction_recovery_multiple_databases.out
new file mode 100644
index 000000000..2e396da7d
--- /dev/null
+++ b/src/test/regress/expected/multi_transaction_recovery_multiple_databases.out
@@ -0,0 +1,364 @@
+ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT $definition$
+CREATE OR REPLACE FUNCTION test.maintenance_worker()
+    RETURNS pg_stat_activity
+    LANGUAGE plpgsql
+AS $$
+DECLARE
+    activity record;
+BEGIN
+    DO 'BEGIN END'; -- Force maintenance daemon to start
+    -- we don't want to wait forever; loop will exit after 20 seconds
+    FOR i IN 1 .. 200 LOOP
+        PERFORM pg_stat_clear_snapshot();
+        SELECT * INTO activity FROM pg_stat_activity
+        WHERE application_name = 'Citus Maintenance Daemon' AND datname = current_database();
+        IF activity.pid IS NOT NULL THEN
+            RETURN activity;
+        ELSE
+            PERFORM pg_sleep(0.1);
+        END IF ;
+    END LOOP;
+    -- fail if we reach the end of this loop
+    raise 'Waited too long for maintenance daemon to start';
+END;
+$$;
+$definition$ create_function_test_maintenance_worker
+\gset
+CREATE DATABASE db1;
+NOTICE:  Citus partially supports CREATE DATABASE for distributed databases
+DETAIL:  Citus does not propagate CREATE DATABASE command to workers
+HINT:  You can manually create a database and its extensions on workers.
+SELECT oid AS db1_oid
+FROM pg_database
+WHERE datname = 'db1'
+\gset
+\c - - - :worker_1_port
+CREATE DATABASE db1;
+NOTICE:  Citus partially supports CREATE DATABASE for distributed databases
+DETAIL:  Citus does not propagate CREATE DATABASE command to workers
+HINT:  You can manually create a database and its extensions on workers.
+\c - - - :worker_2_port
+CREATE DATABASE db1;
+NOTICE:  Citus partially supports CREATE DATABASE for distributed databases
+DETAIL:  Citus does not propagate CREATE DATABASE command to workers
+HINT:  You can manually create a database and its extensions on workers.
+\c db1 - - :worker_1_port
+CREATE EXTENSION citus;
+\c db1 - - :worker_2_port
+CREATE EXTENSION citus;
+\c db1 - - :master_port
+CREATE EXTENSION citus;
+SELECT citus_add_node('localhost', :worker_1_port);
+ citus_add_node
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+SELECT citus_add_node('localhost', :worker_2_port);
+ citus_add_node
+---------------------------------------------------------------------
+ 2
+(1 row)
+
+SELECT current_database();
+ current_database
+---------------------------------------------------------------------
+ db1
+(1 row)
+
+CREATE SCHEMA test;
+:create_function_test_maintenance_worker
+-- check maintenance daemon is started
+SELECT datname, current_database(),
+    usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
+FROM test.maintenance_worker();
+ datname | current_database | usename | extowner
+---------------------------------------------------------------------
+ db1 | db1 | postgres | postgres
+(1 row)
+
+SELECT *
+FROM pg_dist_node;
+ nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
+---------------------------------------------------------------------
+ 1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t
+ 2 | 2 | localhost | 57638 | default | t | t | primary | default | t | t
+(2 rows)
+
+CREATE DATABASE db2;
+NOTICE:  Citus partially supports CREATE DATABASE for distributed databases
+DETAIL:  Citus does not propagate CREATE DATABASE command to workers
+HINT:  You can manually create a database and its extensions on workers.
+SELECT oid AS db2_oid
+FROM pg_database
+WHERE datname = 'db2'
+\gset
+\c - - - :worker_1_port
+CREATE DATABASE db2;
+NOTICE:  Citus partially supports CREATE DATABASE for distributed databases
+DETAIL:  Citus does not propagate CREATE DATABASE command to workers
+HINT:  You can manually create a database and its extensions on workers.
+\c - - - :worker_2_port
+CREATE DATABASE db2;
+NOTICE:  Citus partially supports CREATE DATABASE for distributed databases
+DETAIL:  Citus does not propagate CREATE DATABASE command to workers
+HINT:  You can manually create a database and its extensions on workers.
+\c db2 - - :worker_1_port
+CREATE EXTENSION citus;
+\c db2 - - :worker_2_port
+CREATE EXTENSION citus;
+\c db2 - - :master_port
+CREATE EXTENSION citus;
+SELECT citus_add_node('localhost', :worker_1_port);
+ citus_add_node
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+SELECT citus_add_node('localhost', :worker_2_port);
+ citus_add_node
+---------------------------------------------------------------------
+ 2
+(1 row)
+
+SELECT current_database();
+ current_database
+---------------------------------------------------------------------
+ db2
+(1 row)
+
+CREATE SCHEMA test;
+:create_function_test_maintenance_worker
+-- check maintenance daemon is started
+SELECT datname, current_database(),
+    usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
+FROM test.maintenance_worker();
+ datname | current_database | usename | extowner
+---------------------------------------------------------------------
+ db2 | db2 | postgres | postgres
+(1 row)
+
+SELECT *
+FROM pg_dist_node;
+ nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
+---------------------------------------------------------------------
+ 1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t
+ 2 | 2 | localhost | 57638 | default | t | t | primary | default | t | t
+(2 rows)
+
+SELECT groupid AS worker_1_group_id
+FROM pg_dist_node
+WHERE nodeport = :worker_1_port;
+ worker_1_group_id
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+\gset
+SELECT groupid AS worker_2_group_id
+FROM pg_dist_node
+WHERE nodeport = :worker_2_port;
+ worker_2_group_id
+---------------------------------------------------------------------
+ 2
+(1 row)
+
+\gset
+-- Prepare transactions on first database
+\c db1 - - :worker_1_port
+BEGIN;
+CREATE TABLE should_abort
+(
+    value int
+);
+SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_1_db_1_name
+\gset
+PREPARE TRANSACTION :'transaction_1_worker_1_db_1_name';
+BEGIN;
+CREATE TABLE should_commit
+(
+    value int
+);
+SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_1_db_1_name
+\gset
+PREPARE TRANSACTION :'transaction_2_worker_1_db_1_name';
+\c db1 - - :worker_2_port
+BEGIN;
+CREATE TABLE should_abort
+(
+    value int
+);
+SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_2_db_1_name
+\gset
+PREPARE TRANSACTION :'transaction_1_worker_2_db_1_name';
+BEGIN;
+CREATE TABLE should_commit
+(
+    value int
+);
+SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_2_db_1_name
+\gset
+PREPARE TRANSACTION :'transaction_2_worker_2_db_1_name';
+-- Prepare transactions on second database
+\c db2 - - :worker_1_port
+BEGIN;
+CREATE TABLE should_abort
+(
+    value int
+);
+SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_1_db_2_name
+\gset
+PREPARE TRANSACTION :'transaction_1_worker_1_db_2_name';
+BEGIN;
+CREATE TABLE should_commit
+(
+    value int
+);
+SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_1_db_2_name
+\gset
+PREPARE TRANSACTION :'transaction_2_worker_1_db_2_name';
+\c db2 - - :worker_2_port
+BEGIN;
+CREATE TABLE should_abort
+(
+    value int
+);
+SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_2_db_2_name
+\gset
+PREPARE TRANSACTION :'transaction_1_worker_2_db_2_name';
+BEGIN;
+CREATE TABLE should_commit
+(
+    value int
+);
+SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_2_db_2_name
+\gset
+PREPARE TRANSACTION :'transaction_2_worker_2_db_2_name';
+\c db1 - - :master_port
+INSERT INTO pg_dist_transaction
+VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_1_name'),
+       (:worker_2_group_id, :'transaction_2_worker_2_db_1_name');
+INSERT INTO pg_dist_transaction
+VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid'),
+       (:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid');
+\c db2 - - :master_port
+INSERT INTO pg_dist_transaction
+VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_2_name'),
+       (:worker_2_group_id, :'transaction_2_worker_2_db_2_name');
+INSERT INTO pg_dist_transaction
+VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid'),
+       (:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid');
+\c db1 - - :master_port
+SELECT count(*) != 0
+FROM pg_dist_transaction;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT recover_prepared_transactions() > 0;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT count(*) = 0
+FROM pg_dist_transaction;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+\c db2 - - :master_port
+SELECT count(*) != 0
+FROM pg_dist_transaction;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT recover_prepared_transactions() > 0;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT count(*) = 0
+FROM pg_dist_transaction;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+\c regression - - :master_port
+SELECT count(pg_terminate_backend(pid)) > 0
+FROM pg_stat_activity
+WHERE pid <> pg_backend_pid()
+  AND datname = 'db1' ;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+DROP DATABASE db1;
+SELECT count(pg_terminate_backend(pid)) > 0
+FROM pg_stat_activity
+WHERE pid <> pg_backend_pid()
+  AND datname = 'db2' ;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+DROP DATABASE db2;
+\c - - - :worker_1_port
+SELECT count(pg_terminate_backend(pid)) > 0
+FROM pg_stat_activity
+WHERE pid <> pg_backend_pid()
+  AND datname = 'db1' ;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+DROP DATABASE db1;
+SELECT count(pg_terminate_backend(pid)) > 0
+FROM pg_stat_activity
+WHERE pid <> pg_backend_pid()
+  AND datname = 'db2' ;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+DROP DATABASE db2;
+\c - - - :worker_2_port
+-- Count of terminated sessions is not important for the test,
+-- it is just to make output predictable
+SELECT count(pg_terminate_backend(pid)) >= 0
+FROM pg_stat_activity
+WHERE pid <> pg_backend_pid()
+  AND datname = 'db1' ;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+DROP DATABASE db1;
+SELECT count(pg_terminate_backend(pid)) >= 0
+FROM pg_stat_activity
+WHERE pid <> pg_backend_pid()
+  AND datname = 'db2' ;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+DROP DATABASE db2;
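While the prepared transactions staged by the test above are still pending, their visibility is easy to inspect by hand on a worker. A hedged helper query, not part of the test itself; the group id 0 in the pattern is the single-coordinator default assumed throughout the test:

    -- Every gid is visible instance-wide in pg_prepared_xacts, but only rows
    -- whose database column matches the connection's database are now
    -- considered by recover_prepared_transactions().
    SELECT database, gid,
           gid LIKE 'citus\_0\_%' AS matches_coordinator_pattern
    FROM pg_prepared_xacts
    ORDER BY database, gid;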
diff --git a/src/test/regress/expected/pg16.out b/src/test/regress/expected/pg16.out
index d2241c0c6..8c0fdc859 100644
--- a/src/test/regress/expected/pg16.out
+++ b/src/test/regress/expected/pg16.out
@@ -202,6 +202,118 @@ SELECT create_distributed_table('test_storage', 'a');
 ALTER TABLE test_storage ALTER a SET STORAGE default;
 ERROR:  alter table command is currently unsupported
 DETAIL:  Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP|VALIDATE CONSTRAINT, SET (), RESET (), ENABLE|DISABLE|NO FORCE|FORCE ROW LEVEL SECURITY, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
+-- New ICU_RULES option added to CREATE DATABASE
+-- Relevant PG commit:
+-- https://github.com/postgres/postgres/commit/30a53b7
+CREATE DATABASE test_db WITH LOCALE_PROVIDER = 'icu' LOCALE = '' ICU_RULES = '&a < g' TEMPLATE = 'template0';
+NOTICE:  Citus partially supports CREATE DATABASE for distributed databases
+DETAIL:  Citus does not propagate CREATE DATABASE command to workers
+HINT:  You can manually create a database and its extensions on workers.
+NOTICE:  using standard form "und" for ICU locale ""
+SELECT result FROM run_command_on_workers
+($$CREATE DATABASE test_db WITH LOCALE_PROVIDER = 'icu' LOCALE = '' ICU_RULES = '&a < g' TEMPLATE = 'template0'$$);
+ result
+---------------------------------------------------------------------
+ CREATE DATABASE
+ CREATE DATABASE
+(2 rows)
+
+CREATE TABLE test_db_table (a text);
+SELECT create_distributed_table('test_db_table', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO test_db_table VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
+-- icu default rules order
+SELECT * FROM test_db_table ORDER BY a COLLATE "en-x-icu";
+ a
+---------------------------------------------------------------------
+ Abernathy
+ apple
+ bird
+ Boston
+ Graham
+ green
+(6 rows)
+
+-- regression database's default order
+SELECT * FROM test_db_table ORDER BY a;
+ a
+---------------------------------------------------------------------
+ Abernathy
+ Boston
+ Graham
+ apple
+ bird
+ green
+(6 rows)
+
+-- now see the order in the new database
+\c test_db
+CREATE EXTENSION citus;
+\c - - - :worker_1_port
+CREATE EXTENSION citus;
+\c - - - :worker_2_port
+CREATE EXTENSION citus;
+\c - - - :master_port
+SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
+ ?column?
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
+ ?column?
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+CREATE TABLE test_db_table (a text);
+SELECT create_distributed_table('test_db_table', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO test_db_table VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
+-- icu default rules order
+SELECT * FROM test_db_table ORDER BY a COLLATE "en-x-icu";
+ a
+---------------------------------------------------------------------
+ Abernathy
+ apple
+ bird
+ Boston
+ Graham
+ green
+(6 rows)
+
+-- test_db database's default order with ICU_RULES = '&a < g'
+SELECT * FROM test_db_table ORDER BY a;
+ a
+---------------------------------------------------------------------
+ Abernathy
+ apple
+ green
+ bird
+ Boston
+ Graham
+(6 rows)
+
+\c regression
+\c - - - :master_port
+DROP DATABASE test_db;
+SELECT result FROM run_command_on_workers
+($$DROP DATABASE test_db$$);
+ result
+---------------------------------------------------------------------
+ DROP DATABASE
+ DROP DATABASE
+(2 rows)
+
+SET search_path TO pg16;
 --
 -- COPY FROM ... DEFAULT
 -- Already supported in Citus, adding all PG tests with a distributed table
@@ -542,6 +654,28 @@ SELECT pg_get_viewdef('pg16.prop_view_1', true);
 \c - - - :master_port
 SET search_path TO pg16;
+-- REINDEX DATABASE/SYSTEM name is optional
+-- We already don't propagate these commands automatically
+-- Testing here with run_command_on_workers
+-- Relevant PG commit: https://github.com/postgres/postgres/commit/2cbc3c1
+REINDEX DATABASE;
+SELECT result FROM run_command_on_workers
+($$REINDEX DATABASE$$);
+ result
+---------------------------------------------------------------------
+ REINDEX
+ REINDEX
+(2 rows)
+
+REINDEX SYSTEM;
+SELECT result FROM run_command_on_workers
+($$REINDEX SYSTEM$$);
+ result
+---------------------------------------------------------------------
+ REINDEX
+ REINDEX
+(2 rows)
+
 \set VERBOSITY terse
 SET client_min_messages TO ERROR;
 DROP SCHEMA pg16 CASCADE;
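The reconnect dance in the test above is needed because ICU_RULES is fixed at CREATE DATABASE time. PostgreSQL 16 also accepts the same tailoring syntax on a single collation, which can make experiments cheaper than creating a throwaway database. A sketch; the collation name a_then_g is made up for illustration:

    -- Same '&a < g' tailoring as test_db, scoped to one collation instead of
    -- a whole database (PostgreSQL 16+, icu provider required).
    CREATE COLLATION a_then_g (provider = icu, locale = 'und', rules = '&a < g');
    SELECT a FROM (VALUES ('Abernathy'), ('apple'), ('green'), ('bird')) t(a)
    ORDER BY a COLLATE a_then_g;

The rule reads as "sort g immediately after a", which is why 'green' jumps ahead of 'bird' and 'Boston' in the tailored ordering shown in the expected output.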
diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule
index 0c9ec4ab3..a43360d2f 100644
--- a/src/test/regress/multi_1_schedule
+++ b/src/test/regress/multi_1_schedule
@@ -206,6 +206,7 @@ test: multi_modifying_xacts
 test: multi_generate_ddl_commands
 test: multi_create_shards
 test: multi_transaction_recovery
+test: multi_transaction_recovery_multiple_databases
 
 test: local_dist_join_modifications
 test: local_table_join
diff --git a/src/test/regress/sql/multi_transaction_recovery_multiple_databases.sql b/src/test/regress/sql/multi_transaction_recovery_multiple_databases.sql
new file mode 100644
index 000000000..768cd1628
--- /dev/null
+++ b/src/test/regress/sql/multi_transaction_recovery_multiple_databases.sql
@@ -0,0 +1,286 @@
+ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
+SELECT pg_reload_conf();
+
+SELECT $definition$
+CREATE OR REPLACE FUNCTION test.maintenance_worker()
+    RETURNS pg_stat_activity
+    LANGUAGE plpgsql
+AS $$
+DECLARE
+    activity record;
+BEGIN
+    DO 'BEGIN END'; -- Force maintenance daemon to start
+    -- we don't want to wait forever; loop will exit after 20 seconds
+    FOR i IN 1 .. 200 LOOP
+        PERFORM pg_stat_clear_snapshot();
+        SELECT * INTO activity FROM pg_stat_activity
+        WHERE application_name = 'Citus Maintenance Daemon' AND datname = current_database();
+        IF activity.pid IS NOT NULL THEN
+            RETURN activity;
+        ELSE
+            PERFORM pg_sleep(0.1);
+        END IF ;
+    END LOOP;
+    -- fail if we reach the end of this loop
+    raise 'Waited too long for maintenance daemon to start';
+END;
+$$;
+$definition$ create_function_test_maintenance_worker
+\gset
+
+CREATE DATABASE db1;
+
+SELECT oid AS db1_oid
+FROM pg_database
+WHERE datname = 'db1'
+\gset
+
+\c - - - :worker_1_port
+
+CREATE DATABASE db1;
+
+\c - - - :worker_2_port
+
+CREATE DATABASE db1;
+
+\c db1 - - :worker_1_port
+
+CREATE EXTENSION citus;
+
+\c db1 - - :worker_2_port
+
+CREATE EXTENSION citus;
+
+\c db1 - - :master_port
+
+CREATE EXTENSION citus;
+
+
+SELECT citus_add_node('localhost', :worker_1_port);
+SELECT citus_add_node('localhost', :worker_2_port);
+
+SELECT current_database();
+
+CREATE SCHEMA test;
+:create_function_test_maintenance_worker
+
+-- check maintenance daemon is started
+SELECT datname, current_database(),
+    usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
+FROM test.maintenance_worker();
+
+SELECT *
+FROM pg_dist_node;
+
+CREATE DATABASE db2;
+
+SELECT oid AS db2_oid
+FROM pg_database
+WHERE datname = 'db2'
+\gset
+
+\c - - - :worker_1_port
+CREATE DATABASE db2;
+\c - - - :worker_2_port
+CREATE DATABASE db2;
+
+
+\c db2 - - :worker_1_port
+CREATE EXTENSION citus;
+\c db2 - - :worker_2_port
+CREATE EXTENSION citus;
+\c db2 - - :master_port
+CREATE EXTENSION citus;
+
+SELECT citus_add_node('localhost', :worker_1_port);
+SELECT citus_add_node('localhost', :worker_2_port);
+
+SELECT current_database();
+
+CREATE SCHEMA test;
+:create_function_test_maintenance_worker
+
+-- check maintenance daemon is started
+SELECT datname, current_database(),
+    usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
+FROM test.maintenance_worker();
+
+SELECT *
+FROM pg_dist_node;
+
+SELECT groupid AS worker_1_group_id
+FROM pg_dist_node
+WHERE nodeport = :worker_1_port;
+\gset
+
+SELECT groupid AS worker_2_group_id
+FROM pg_dist_node
+WHERE nodeport = :worker_2_port;
+\gset
+
+-- Prepare transactions on first database
+\c db1 - - :worker_1_port
+
+BEGIN;
+CREATE TABLE should_abort
+(
+    value int
+);
+SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_1_db_1_name
+\gset
+PREPARE TRANSACTION :'transaction_1_worker_1_db_1_name';
+
+BEGIN;
+CREATE TABLE should_commit
+(
+    value int
+);
+SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_1_db_1_name
+\gset
+PREPARE TRANSACTION :'transaction_2_worker_1_db_1_name';
+
+\c db1 - - :worker_2_port
+
+BEGIN;
+CREATE TABLE should_abort
+(
+    value int
+);
+SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_2_db_1_name
+\gset
+PREPARE TRANSACTION :'transaction_1_worker_2_db_1_name';
+
+BEGIN;
+CREATE TABLE should_commit
+(
+    value int
+);
+SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_2_db_1_name
+\gset
+PREPARE TRANSACTION :'transaction_2_worker_2_db_1_name';
+
+-- Prepare transactions on second database
+\c db2 - - :worker_1_port
+
+BEGIN;
+CREATE TABLE should_abort
+(
+    value int
+);
+SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_1_db_2_name
+\gset
+PREPARE TRANSACTION :'transaction_1_worker_1_db_2_name';
+
+BEGIN;
+CREATE TABLE should_commit
+(
+    value int
+);
+SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_1_db_2_name
+\gset
+PREPARE TRANSACTION :'transaction_2_worker_1_db_2_name';
+
+\c db2 - - :worker_2_port
+
+BEGIN;
+CREATE TABLE should_abort
+(
+    value int
+);
+SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_2_db_2_name
+\gset
+PREPARE TRANSACTION :'transaction_1_worker_2_db_2_name';
+
+BEGIN;
+CREATE TABLE should_commit
+(
+    value int
+);
+SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_2_db_2_name
+\gset
+PREPARE TRANSACTION :'transaction_2_worker_2_db_2_name';
+
+\c db1 - - :master_port
+
+INSERT INTO pg_dist_transaction
+VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_1_name'),
+       (:worker_2_group_id, :'transaction_2_worker_2_db_1_name');
+INSERT INTO pg_dist_transaction
+VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid'),
+       (:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid');
+
+\c db2 - - :master_port
+
+INSERT INTO pg_dist_transaction
+VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_2_name'),
+       (:worker_2_group_id, :'transaction_2_worker_2_db_2_name');
+INSERT INTO pg_dist_transaction
+VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid'),
+       (:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid');
+
+\c db1 - - :master_port
+
+SELECT count(*) != 0
+FROM pg_dist_transaction;
+
+SELECT recover_prepared_transactions() > 0;
+
+SELECT count(*) = 0
+FROM pg_dist_transaction;
+
+\c db2 - - :master_port
+
+SELECT count(*) != 0
+FROM pg_dist_transaction;
+
+SELECT recover_prepared_transactions() > 0;
+
+SELECT count(*) = 0
+FROM pg_dist_transaction;
+
+\c regression - - :master_port
+
+SELECT count(pg_terminate_backend(pid)) > 0
+FROM pg_stat_activity
+WHERE pid <> pg_backend_pid()
+  AND datname = 'db1' ;
+
+DROP DATABASE db1;
+
+SELECT count(pg_terminate_backend(pid)) > 0
+FROM pg_stat_activity
+WHERE pid <> pg_backend_pid()
+  AND datname = 'db2' ;
+DROP DATABASE db2;
+
+\c - - - :worker_1_port
+
+SELECT count(pg_terminate_backend(pid)) > 0
+FROM pg_stat_activity
+WHERE pid <> pg_backend_pid()
+  AND datname = 'db1' ;
+
+DROP DATABASE db1;
+
+SELECT count(pg_terminate_backend(pid)) > 0
+FROM pg_stat_activity
+WHERE pid <> pg_backend_pid()
+  AND datname = 'db2' ;
+DROP DATABASE db2;
+
+\c - - - :worker_2_port
+
+-- Count of terminated sessions is not important for the test,
+-- it is just to make output predictable
+SELECT count(pg_terminate_backend(pid)) >= 0
+FROM pg_stat_activity
+WHERE pid <> pg_backend_pid()
+  AND datname = 'db1' ;
+
+DROP DATABASE db1;
+
+SELECT count(pg_terminate_backend(pid)) >= 0
+FROM pg_stat_activity
+WHERE pid <> pg_backend_pid()
+  AND datname = 'db2' ;
+DROP DATABASE db2;
diff --git a/src/test/regress/sql/pg16.sql b/src/test/regress/sql/pg16.sql
index 4ce1321ee..1df96e6a7 100644
--- a/src/test/regress/sql/pg16.sql
+++ b/src/test/regress/sql/pg16.sql
@@ -104,6 +104,49 @@ SELECT result FROM run_command_on_all_nodes
 SELECT create_distributed_table('test_storage', 'a');
 ALTER TABLE test_storage ALTER a SET STORAGE default;
 
+-- New ICU_RULES option added to CREATE DATABASE
+-- Relevant PG commit:
+-- https://github.com/postgres/postgres/commit/30a53b7
+
+CREATE DATABASE test_db WITH LOCALE_PROVIDER = 'icu' LOCALE = '' ICU_RULES = '&a < g' TEMPLATE = 'template0';
+SELECT result FROM run_command_on_workers
+($$CREATE DATABASE test_db WITH LOCALE_PROVIDER = 'icu' LOCALE = '' ICU_RULES = '&a < g' TEMPLATE = 'template0'$$);
+
+CREATE TABLE test_db_table (a text);
+SELECT create_distributed_table('test_db_table', 'a');
+INSERT INTO test_db_table VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
+-- icu default rules order
+SELECT * FROM test_db_table ORDER BY a COLLATE "en-x-icu";
+-- regression database's default order
+SELECT * FROM test_db_table ORDER BY a;
+
+-- now see the order in the new database
+\c test_db
+CREATE EXTENSION citus;
+\c - - - :worker_1_port
+CREATE EXTENSION citus;
+\c - - - :worker_2_port
+CREATE EXTENSION citus;
+\c - - - :master_port
+
+SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
+SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
+
+CREATE TABLE test_db_table (a text);
+SELECT create_distributed_table('test_db_table', 'a');
+INSERT INTO test_db_table VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
+-- icu default rules order
+SELECT * FROM test_db_table ORDER BY a COLLATE "en-x-icu";
+-- test_db database's default order with ICU_RULES = '&a < g'
+SELECT * FROM test_db_table ORDER BY a;
+
+\c regression
+\c - - - :master_port
+DROP DATABASE test_db;
+SELECT result FROM run_command_on_workers
+($$DROP DATABASE test_db$$);
+SET search_path TO pg16;
+
 --
 -- COPY FROM ... DEFAULT
 -- Already supported in Citus, adding all PG tests with a distributed table
@@ -332,6 +375,19 @@ SELECT pg_get_viewdef('pg16.prop_view_1', true);
 \c - - - :master_port
 SET search_path TO pg16;
 
+-- REINDEX DATABASE/SYSTEM name is optional
+-- We already don't propagate these commands automatically
+-- Testing here with run_command_on_workers
+-- Relevant PG commit: https://github.com/postgres/postgres/commit/2cbc3c1
+
+REINDEX DATABASE;
+SELECT result FROM run_command_on_workers
+($$REINDEX DATABASE$$);
+
+REINDEX SYSTEM;
+SELECT result FROM run_command_on_workers
+($$REINDEX SYSTEM$$);
+
 \set VERBOSITY terse
 SET client_min_messages TO ERROR;
 DROP SCHEMA pg16 CASCADE;
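A footnote on the REINDEX hunks: PostgreSQL 16 made the object name optional, and a supplied name must match the current database anyway, so the bare forms used in the test are the natural spelling. Illustrative equivalents, assuming the current database is regression:

    -- These pairs behave the same; the named forms error out if the name is
    -- not the database this session is connected to.
    REINDEX DATABASE;
    REINDEX DATABASE regression;
    REINDEX SYSTEM;
    REINDEX SYSTEM regression;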