Merge branch 'main' into alter_database_propagation

pull/7178/head
Gürkan İndibay 2023-09-04 18:53:41 +03:00 committed by GitHub
commit fafa508ce2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 902 additions and 34 deletions

View File

@ -1262,19 +1262,21 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
CreateTruncateTrigger(relationId);
}
/* create shards for hash distributed and reference tables */
if (tableType == HASH_DISTRIBUTED)
{
/* create shards for hash distributed table */
CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
colocatedTableId,
localTableEmpty);
}
else if (tableType == REFERENCE_TABLE)
{
/* create shards for reference table */
CreateReferenceTableShard(relationId);
}
else if (tableType == SINGLE_SHARD_DISTRIBUTED)
{
/* create the shard of given single-shard distributed table */
CreateSingleShardTableShard(relationId, colocatedTableId,
colocationId);
}
@ -1900,7 +1902,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
/*
* CreateHashDistributedTableShards creates the shard of given single-shard
* CreateSingleShardTableShard creates the shard of given single-shard
* distributed table.
*/
static void

View File

@ -83,6 +83,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
List *insertedShardPlacements = NIL;
List *insertedShardIds = NIL;
/* make sure table is hash partitioned */
CheckHashPartitionedTable(distributedTableId);
@ -174,7 +175,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* initialize the hash token space for this shard */
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
uint64 shardId = GetNextShardId();
uint64 *shardIdPtr = (uint64 *) palloc0(sizeof(uint64));
*shardIdPtr = GetNextShardId();
insertedShardIds = lappend(insertedShardIds, shardIdPtr);
/* if we are at the last shard, make sure the max token value is INT_MAX */
if (shardIndex == (shardCount - 1))
@ -186,17 +189,27 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
text *minHashTokenText = IntegerToText(shardMinHashToken);
text *maxHashTokenText = IntegerToText(shardMaxHashToken);
InsertShardRow(distributedTableId, shardId, shardStorageType,
InsertShardRow(distributedTableId, *shardIdPtr, shardStorageType,
minHashTokenText, maxHashTokenText);
List *currentInsertedShardPlacements = InsertShardPlacementRows(
distributedTableId,
shardId,
workerNodeList,
roundRobinNodeIndex,
replicationFactor);
InsertShardPlacementRows(distributedTableId,
*shardIdPtr,
workerNodeList,
roundRobinNodeIndex,
replicationFactor);
}
/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
uint64 *shardIdPtr;
foreach_ptr(shardIdPtr, insertedShardIds)
{
List *placementsForShard = ShardPlacementList(*shardIdPtr);
insertedShardPlacements = list_concat(insertedShardPlacements,
currentInsertedShardPlacements);
placementsForShard);
}
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
@ -292,7 +305,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
/*
* load shard placements for the shard at once after all placement insertions
* finished. That prevents MetadataCache from rebuilding unnecessarily after
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
uint64 *shardIdPtr;
@ -360,9 +373,18 @@ CreateReferenceTableShard(Oid distributedTableId)
InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
shardMaxValue);
List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
nodeList, workerStartIndex,
replicationFactor);
InsertShardPlacementRows(distributedTableId,
shardId,
nodeList,
workerStartIndex,
replicationFactor);
/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedShardPlacements = ShardPlacementList(shardId);
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnection);
@ -408,12 +430,18 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
minHashTokenText, maxHashTokenText);
int replicationFactor = 1;
List *insertedShardPlacements = InsertShardPlacementRows(
relationId,
shardId,
workerNodeList,
roundRobinNodeIdx,
replicationFactor);
InsertShardPlacementRows(relationId,
shardId,
workerNodeList,
roundRobinNodeIdx,
replicationFactor);
/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedShardPlacements = ShardPlacementList(shardId);
/*
* We don't need to force using exclusive connections because we're anyway

View File

@ -383,14 +383,13 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
/*
* InsertShardPlacementRows inserts shard placements to the metadata table on
* the coordinator node. Then, returns the list of added shard placements.
* the coordinator node.
*/
List *
void
InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
int workerStartIndex, int replicationFactor)
{
int workerNodeCount = list_length(workerNodeList);
List *insertedShardPlacements = NIL;
for (int placementIndex = 0; placementIndex < replicationFactor; placementIndex++)
{
@ -399,13 +398,11 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
uint32 nodeGroupId = workerNode->groupId;
const uint64 shardSize = 0;
uint64 shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID,
shardSize, nodeGroupId);
ShardPlacement *shardPlacement = LoadShardPlacement(shardId, shardPlacementId);
insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
InsertShardPlacementRow(shardId,
INVALID_PLACEMENT_ID,
shardSize,
nodeGroupId);
}
return insertedShardPlacements;
}

View File

@ -404,7 +404,7 @@ PendingWorkerTransactionList(MultiConnection *connection)
int32 coordinatorId = GetLocalGroupId();
appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts "
"WHERE gid LIKE 'citus\\_%d\\_%%'",
"WHERE gid LIKE 'citus\\_%d\\_%%' and database = current_database()",
coordinatorId);
int querySent = SendRemoteCommand(connection, command->data);

View File

@ -251,9 +251,9 @@ extern void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId
replicationFactor);
extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection);
extern List * InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor,

View File

@ -0,0 +1,364 @@
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT $definition$
CREATE OR REPLACE FUNCTION test.maintenance_worker()
RETURNS pg_stat_activity
LANGUAGE plpgsql
AS $$
DECLARE
activity record;
BEGIN
DO 'BEGIN END'; -- Force maintenance daemon to start
-- we don't want to wait forever; loop will exit after 20 seconds
FOR i IN 1 .. 200 LOOP
PERFORM pg_stat_clear_snapshot();
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = current_database();
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
END IF ;
END LOOP;
-- fail if we reach the end of this loop
raise 'Waited too long for maintenance daemon to start';
END;
$$;
$definition$ create_function_test_maintenance_worker
\gset
CREATE DATABASE db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
SELECT oid AS db1_oid
FROM pg_database
WHERE datname = 'db1'
\gset
\c - - - :worker_1_port
CREATE DATABASE db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c - - - :worker_2_port
CREATE DATABASE db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c db1 - - :worker_1_port
CREATE EXTENSION citus;
\c db1 - - :worker_2_port
CREATE EXTENSION citus;
\c db1 - - :master_port
CREATE EXTENSION citus;
SELECT citus_add_node('localhost', :worker_1_port);
citus_add_node
---------------------------------------------------------------------
1
(1 row)
SELECT citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
2
(1 row)
SELECT current_database();
current_database
---------------------------------------------------------------------
db1
(1 row)
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
datname | current_database | usename | extowner
---------------------------------------------------------------------
db1 | db1 | postgres | postgres
(1 row)
SELECT *
FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t
2 | 2 | localhost | 57638 | default | t | t | primary | default | t | t
(2 rows)
CREATE DATABASE db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
SELECT oid AS db2_oid
FROM pg_database
WHERE datname = 'db2'
\gset
\c - - - :worker_1_port
CREATE DATABASE db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c - - - :worker_2_port
CREATE DATABASE db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c db2 - - :worker_1_port
CREATE EXTENSION citus;
\c db2 - - :worker_2_port
CREATE EXTENSION citus;
\c db2 - - :master_port
CREATE EXTENSION citus;
SELECT citus_add_node('localhost', :worker_1_port);
citus_add_node
---------------------------------------------------------------------
1
(1 row)
SELECT citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
2
(1 row)
SELECT current_database();
current_database
---------------------------------------------------------------------
db2
(1 row)
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
datname | current_database | usename | extowner
---------------------------------------------------------------------
db2 | db2 | postgres | postgres
(1 row)
SELECT *
FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t
2 | 2 | localhost | 57638 | default | t | t | primary | default | t | t
(2 rows)
SELECT groupid AS worker_1_group_id
FROM pg_dist_node
WHERE nodeport = :worker_1_port;
worker_1_group_id
---------------------------------------------------------------------
1
(1 row)
\gset
SELECT groupid AS worker_2_group_id
FROM pg_dist_node
WHERE nodeport = :worker_2_port;
worker_2_group_id
---------------------------------------------------------------------
2
(1 row)
\gset
-- Prepare transactions on first database
\c db1 - - :worker_1_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_1_db_1_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_1_db_1_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_1_db_1_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_1_db_1_name';
\c db1 - - :worker_2_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_2_db_1_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_2_db_1_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_2_db_1_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_2_db_1_name';
-- Prepare transactions on second database
\c db2 - - :worker_1_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_1_db_2_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_1_db_2_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_1_db_2_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_1_db_2_name';
\c db2 - - :worker_2_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_2_db_2_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_2_db_2_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_2_db_2_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_2_db_2_name';
\c db1 - - :master_port
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_1_name'),
(:worker_2_group_id, :'transaction_2_worker_2_db_1_name');
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid'),
(:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid');
\c db2 - - :master_port
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_2_name'),
(:worker_2_group_id, :'transaction_2_worker_2_db_2_name');
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid'),
(:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid');
\c db1 - - :master_port
SELECT count(*) != 0
FROM pg_dist_transaction;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT recover_prepared_transactions() > 0;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0
FROM pg_dist_transaction;
?column?
---------------------------------------------------------------------
t
(1 row)
\c db2 - - :master_port
SELECT count(*) != 0
FROM pg_dist_transaction;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT recover_prepared_transactions() > 0;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0
FROM pg_dist_transaction;
?column?
---------------------------------------------------------------------
t
(1 row)
\c regression - - :master_port
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db2;
\c - - - :worker_1_port
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db2;
\c - - - :worker_2_port
-- Count of terminated sessions is not important for the test,
-- it is just to make output predictable
SELECT count(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db2;

View File

@ -202,6 +202,118 @@ SELECT create_distributed_table('test_storage', 'a');
ALTER TABLE test_storage ALTER a SET STORAGE default;
ERROR: alter table command is currently unsupported
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP|VALIDATE CONSTRAINT, SET (), RESET (), ENABLE|DISABLE|NO FORCE|FORCE ROW LEVEL SECURITY, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
-- New ICU_RULES option added to CREATE DATABASE
-- Relevant PG commit:
-- https://github.com/postgres/postgres/commit/30a53b7
CREATE DATABASE test_db WITH LOCALE_PROVIDER = 'icu' LOCALE = '' ICU_RULES = '&a < g' TEMPLATE = 'template0';
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
NOTICE: using standard form "und" for ICU locale ""
SELECT result FROM run_command_on_workers
($$CREATE DATABASE test_db WITH LOCALE_PROVIDER = 'icu' LOCALE = '' ICU_RULES = '&a < g' TEMPLATE = 'template0'$$);
result
---------------------------------------------------------------------
CREATE DATABASE
CREATE DATABASE
(2 rows)
CREATE TABLE test_db_table (a text);
SELECT create_distributed_table('test_db_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_db_table VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
-- icu default rules order
SELECT * FROM test_db_table ORDER BY a COLLATE "en-x-icu";
a
---------------------------------------------------------------------
Abernathy
apple
bird
Boston
Graham
green
(6 rows)
-- regression database's default order
SELECT * FROM test_db_table ORDER BY a;
a
---------------------------------------------------------------------
Abernathy
Boston
Graham
apple
bird
green
(6 rows)
-- now see the order in the new database
\c test_db
CREATE EXTENSION citus;
\c - - - :worker_1_port
CREATE EXTENSION citus;
\c - - - :worker_2_port
CREATE EXTENSION citus;
\c - - - :master_port
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE TABLE test_db_table (a text);
SELECT create_distributed_table('test_db_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_db_table VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
-- icu default rules order
SELECT * FROM test_db_table ORDER BY a COLLATE "en-x-icu";
a
---------------------------------------------------------------------
Abernathy
apple
bird
Boston
Graham
green
(6 rows)
-- test_db database's default order with ICU_RULES = '&a < g'
SELECT * FROM test_db_table ORDER BY a;
a
---------------------------------------------------------------------
Abernathy
apple
green
bird
Boston
Graham
(6 rows)
\c regression
\c - - - :master_port
DROP DATABASE test_db;
SELECT result FROM run_command_on_workers
($$DROP DATABASE test_db$$);
result
---------------------------------------------------------------------
DROP DATABASE
DROP DATABASE
(2 rows)
SET search_path TO pg16;
--
-- COPY FROM ... DEFAULT
-- Already supported in Citus, adding all PG tests with a distributed table
@ -542,6 +654,28 @@ SELECT pg_get_viewdef('pg16.prop_view_1', true);
\c - - - :master_port
SET search_path TO pg16;
-- REINDEX DATABASE/SYSTEM name is optional
-- We already don't propagate these commands automatically
-- Testing here with run_command_on_workers
-- Relevant PG commit: https://github.com/postgres/postgres/commit/2cbc3c1
REINDEX DATABASE;
SELECT result FROM run_command_on_workers
($$REINDEX DATABASE$$);
result
---------------------------------------------------------------------
REINDEX
REINDEX
(2 rows)
REINDEX SYSTEM;
SELECT result FROM run_command_on_workers
($$REINDEX SYSTEM$$);
result
---------------------------------------------------------------------
REINDEX
REINDEX
(2 rows)
\set VERBOSITY terse
SET client_min_messages TO ERROR;
DROP SCHEMA pg16 CASCADE;

View File

@ -206,6 +206,7 @@ test: multi_modifying_xacts
test: multi_generate_ddl_commands
test: multi_create_shards
test: multi_transaction_recovery
test: multi_transaction_recovery_multiple_databases
test: local_dist_join_modifications
test: local_table_join

View File

@ -0,0 +1,286 @@
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
SELECT pg_reload_conf();
SELECT $definition$
CREATE OR REPLACE FUNCTION test.maintenance_worker()
RETURNS pg_stat_activity
LANGUAGE plpgsql
AS $$
DECLARE
activity record;
BEGIN
DO 'BEGIN END'; -- Force maintenance daemon to start
-- we don't want to wait forever; loop will exit after 20 seconds
FOR i IN 1 .. 200 LOOP
PERFORM pg_stat_clear_snapshot();
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = current_database();
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
END IF ;
END LOOP;
-- fail if we reach the end of this loop
raise 'Waited too long for maintenance daemon to start';
END;
$$;
$definition$ create_function_test_maintenance_worker
\gset
CREATE DATABASE db1;
SELECT oid AS db1_oid
FROM pg_database
WHERE datname = 'db1'
\gset
\c - - - :worker_1_port
CREATE DATABASE db1;
\c - - - :worker_2_port
CREATE DATABASE db1;
\c db1 - - :worker_1_port
CREATE EXTENSION citus;
\c db1 - - :worker_2_port
CREATE EXTENSION citus;
\c db1 - - :master_port
CREATE EXTENSION citus;
SELECT citus_add_node('localhost', :worker_1_port);
SELECT citus_add_node('localhost', :worker_2_port);
SELECT current_database();
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
SELECT *
FROM pg_dist_node;
CREATE DATABASE db2;
SELECT oid AS db2_oid
FROM pg_database
WHERE datname = 'db2'
\gset
\c - - - :worker_1_port
CREATE DATABASE db2;
\c - - - :worker_2_port
CREATE DATABASE db2;
\c db2 - - :worker_1_port
CREATE EXTENSION citus;
\c db2 - - :worker_2_port
CREATE EXTENSION citus;
\c db2 - - :master_port
CREATE EXTENSION citus;
SELECT citus_add_node('localhost', :worker_1_port);
SELECT citus_add_node('localhost', :worker_2_port);
SELECT current_database();
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
SELECT *
FROM pg_dist_node;
SELECT groupid AS worker_1_group_id
FROM pg_dist_node
WHERE nodeport = :worker_1_port;
\gset
SELECT groupid AS worker_2_group_id
FROM pg_dist_node
WHERE nodeport = :worker_2_port;
\gset
-- Prepare transactions on first database
\c db1 - - :worker_1_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_1_db_1_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_1_db_1_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_1_db_1_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_1_db_1_name';
\c db1 - - :worker_2_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_2_db_1_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_2_db_1_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_2_db_1_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_2_db_1_name';
-- Prepare transactions on second database
\c db2 - - :worker_1_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_1_db_2_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_1_db_2_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_1_db_2_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_1_db_2_name';
\c db2 - - :worker_2_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_2_db_2_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_2_db_2_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_2_db_2_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_2_db_2_name';
\c db1 - - :master_port
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_1_name'),
(:worker_2_group_id, :'transaction_2_worker_2_db_1_name');
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid'),
(:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid');
\c db2 - - :master_port
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_2_name'),
(:worker_2_group_id, :'transaction_2_worker_2_db_2_name');
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid'),
(:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid');
\c db1 - - :master_port
SELECT count(*) != 0
FROM pg_dist_transaction;
SELECT recover_prepared_transactions() > 0;
SELECT count(*) = 0
FROM pg_dist_transaction;
\c db2 - - :master_port
SELECT count(*) != 0
FROM pg_dist_transaction;
SELECT recover_prepared_transactions() > 0;
SELECT count(*) = 0
FROM pg_dist_transaction;
\c regression - - :master_port
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
DROP DATABASE db2;
\c - - - :worker_1_port
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
DROP DATABASE db2;
\c - - - :worker_2_port
-- Count of terminated sessions is not important for the test,
-- it is just to make output predictable
SELECT count(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
DROP DATABASE db2;

View File

@ -104,6 +104,49 @@ SELECT result FROM run_command_on_all_nodes
SELECT create_distributed_table('test_storage', 'a');
ALTER TABLE test_storage ALTER a SET STORAGE default;
-- New ICU_RULES option added to CREATE DATABASE
-- Relevant PG commit:
-- https://github.com/postgres/postgres/commit/30a53b7
CREATE DATABASE test_db WITH LOCALE_PROVIDER = 'icu' LOCALE = '' ICU_RULES = '&a < g' TEMPLATE = 'template0';
SELECT result FROM run_command_on_workers
($$CREATE DATABASE test_db WITH LOCALE_PROVIDER = 'icu' LOCALE = '' ICU_RULES = '&a < g' TEMPLATE = 'template0'$$);
CREATE TABLE test_db_table (a text);
SELECT create_distributed_table('test_db_table', 'a');
INSERT INTO test_db_table VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
-- icu default rules order
SELECT * FROM test_db_table ORDER BY a COLLATE "en-x-icu";
-- regression database's default order
SELECT * FROM test_db_table ORDER BY a;
-- now see the order in the new database
\c test_db
CREATE EXTENSION citus;
\c - - - :worker_1_port
CREATE EXTENSION citus;
\c - - - :worker_2_port
CREATE EXTENSION citus;
\c - - - :master_port
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
CREATE TABLE test_db_table (a text);
SELECT create_distributed_table('test_db_table', 'a');
INSERT INTO test_db_table VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
-- icu default rules order
SELECT * FROM test_db_table ORDER BY a COLLATE "en-x-icu";
-- test_db database's default order with ICU_RULES = '&a < g'
SELECT * FROM test_db_table ORDER BY a;
\c regression
\c - - - :master_port
DROP DATABASE test_db;
SELECT result FROM run_command_on_workers
($$DROP DATABASE test_db$$);
SET search_path TO pg16;
--
-- COPY FROM ... DEFAULT
-- Already supported in Citus, adding all PG tests with a distributed table
@ -332,6 +375,19 @@ SELECT pg_get_viewdef('pg16.prop_view_1', true);
\c - - - :master_port
SET search_path TO pg16;
-- REINDEX DATABASE/SYSTEM name is optional
-- We already don't propagate these commands automatically
-- Testing here with run_command_on_workers
-- Relevant PG commit: https://github.com/postgres/postgres/commit/2cbc3c1
REINDEX DATABASE;
SELECT result FROM run_command_on_workers
($$REINDEX DATABASE$$);
REINDEX SYSTEM;
SELECT result FROM run_command_on_workers
($$REINDEX SYSTEM$$);
\set VERBOSITY terse
SET client_min_messages TO ERROR;
DROP SCHEMA pg16 CASCADE;