Merge branch 'alter_database_propagation' of https://github.com/citusdata/citus into alter_database_propagation

pull/7178/head
gindibay 2023-09-04 19:11:06 +03:00
commit 03d34fc1d5
10 changed files with 902 additions and 34 deletions


@@ -1262,19 +1262,21 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
 		CreateTruncateTrigger(relationId);
 	}
 
-	/* create shards for hash distributed and reference tables */
 	if (tableType == HASH_DISTRIBUTED)
 	{
+		/* create shards for hash distributed table */
 		CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
 										 colocatedTableId,
 										 localTableEmpty);
 	}
 	else if (tableType == REFERENCE_TABLE)
 	{
+		/* create shards for reference table */
 		CreateReferenceTableShard(relationId);
 	}
 	else if (tableType == SINGLE_SHARD_DISTRIBUTED)
 	{
+		/* create the shard of given single-shard distributed table */
 		CreateSingleShardTableShard(relationId, colocatedTableId,
 									colocationId);
 	}
@@ -1900,7 +1902,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
 /*
- * CreateHashDistributedTableShards creates the shard of given single-shard
+ * CreateSingleShardTableShard creates the shard of given single-shard
  * distributed table.
  */
 static void


@@ -83,6 +83,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 {
 	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
 	List *insertedShardPlacements = NIL;
+	List *insertedShardIds = NIL;
 
 	/* make sure table is hash partitioned */
 	CheckHashPartitionedTable(distributedTableId);
@@ -174,7 +175,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 		/* initialize the hash token space for this shard */
 		int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
 		int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
-		uint64 shardId = GetNextShardId();
+		uint64 *shardIdPtr = (uint64 *) palloc0(sizeof(uint64));
+		*shardIdPtr = GetNextShardId();
+		insertedShardIds = lappend(insertedShardIds, shardIdPtr);
 
 		/* if we are at the last shard, make sure the max token value is INT_MAX */
 		if (shardIndex == (shardCount - 1))
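
The hash-token math above splits the signed 32-bit hash space evenly across shards. As a sketch (not part of this commit), the resulting ranges for an assumed shardCount of 4, where hashTokenIncrement = 2^32 / 4 = 1073741824, can be checked directly in SQL:

    -- hypothetical sanity check of the shard hash ranges for shardCount = 4
    SELECT shardindex,
           -2147483648 + shardindex * 1073741824::bigint AS shardminvalue,
           -2147483648 + (shardindex + 1) * 1073741824::bigint - 1 AS shardmaxvalue
    FROM generate_series(0, 3) AS shardindex;

For a power-of-two shard count the last range already ends at INT_MAX (2147483647); the shardIndex == (shardCount - 1) branch above clamps it for counts that don't divide the space evenly.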
@@ -186,17 +189,27 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 		text *minHashTokenText = IntegerToText(shardMinHashToken);
 		text *maxHashTokenText = IntegerToText(shardMaxHashToken);
 
-		InsertShardRow(distributedTableId, shardId, shardStorageType,
+		InsertShardRow(distributedTableId, *shardIdPtr, shardStorageType,
 					   minHashTokenText, maxHashTokenText);
 
-		List *currentInsertedShardPlacements = InsertShardPlacementRows(
-			distributedTableId,
-			shardId,
-			workerNodeList,
-			roundRobinNodeIndex,
-			replicationFactor);
+		InsertShardPlacementRows(distributedTableId,
+								 *shardIdPtr,
+								 workerNodeList,
+								 roundRobinNodeIndex,
+								 replicationFactor);
+	}
+
+	/*
+	 * load shard placements for the shard at once after all placement insertions
+	 * finished. This prevents MetadataCache from rebuilding unnecessarily after
+	 * each placement insertion.
+	 */
+	uint64 *shardIdPtr;
+	foreach_ptr(shardIdPtr, insertedShardIds)
+	{
+		List *placementsForShard = ShardPlacementList(*shardIdPtr);
 		insertedShardPlacements = list_concat(insertedShardPlacements,
-											  currentInsertedShardPlacements);
+											  placementsForShard);
 	}
 
 	CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
@@ -292,7 +305,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
 	/*
 	 * load shard placements for the shard at once after all placement insertions
-	 * finished. That prevents MetadataCache from rebuilding unnecessarily after
+	 * finished. This prevents MetadataCache from rebuilding unnecessarily after
 	 * each placement insertion.
 	 */
 	uint64 *shardIdPtr;
@@ -360,9 +373,18 @@ CreateReferenceTableShard(Oid distributedTableId)
 	InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
 				   shardMaxValue);
 
-	List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
-															 nodeList, workerStartIndex,
-															 replicationFactor);
+	InsertShardPlacementRows(distributedTableId,
+							 shardId,
+							 nodeList,
+							 workerStartIndex,
+							 replicationFactor);
+
+	/*
+	 * load shard placements for the shard at once after all placement insertions
+	 * finished. This prevents MetadataCache from rebuilding unnecessarily after
+	 * each placement insertion.
+	 */
+	List *insertedShardPlacements = ShardPlacementList(shardId);
 
 	CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
 						  useExclusiveConnection);
@@ -408,12 +430,18 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
 				   minHashTokenText, maxHashTokenText);
 
 	int replicationFactor = 1;
-	List *insertedShardPlacements = InsertShardPlacementRows(
-		relationId,
-		shardId,
-		workerNodeList,
-		roundRobinNodeIdx,
-		replicationFactor);
+	InsertShardPlacementRows(relationId,
+							 shardId,
+							 workerNodeList,
+							 roundRobinNodeIdx,
+							 replicationFactor);
+
+	/*
+	 * load shard placements for the shard at once after all placement insertions
+	 * finished. This prevents MetadataCache from rebuilding unnecessarily after
+	 * each placement insertion.
+	 */
+	List *insertedShardPlacements = ShardPlacementList(shardId);
 
 	/*
 	 * We don't need to force using exclusive connections because we're anyway
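
All three call sites in this file now follow the same pattern: insert the placement rows first, then read them back with one ShardPlacementList() call per shard, so the metadata cache is rebuilt once per shard rather than once per placement. The rows themselves live in pg_dist_placement; a hypothetical way to see what ShardPlacementList() would read back for a single shard (102008 is a made-up shard id):

    SELECT placementid, shardid, shardstate, groupid
    FROM pg_dist_placement
    WHERE shardid = 102008;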


@@ -383,14 +383,13 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
 /*
  * InsertShardPlacementRows inserts shard placements to the metadata table on
- * the coordinator node. Then, returns the list of added shard placements.
+ * the coordinator node.
  */
-List *
+void
 InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
 						 int workerStartIndex, int replicationFactor)
 {
 	int workerNodeCount = list_length(workerNodeList);
-	List *insertedShardPlacements = NIL;
 
 	for (int placementIndex = 0; placementIndex < replicationFactor; placementIndex++)
 	{
@@ -399,13 +398,11 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
 		uint32 nodeGroupId = workerNode->groupId;
 		const uint64 shardSize = 0;
 
-		uint64 shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID,
-														  shardSize, nodeGroupId);
-		ShardPlacement *shardPlacement = LoadShardPlacement(shardId, shardPlacementId);
-		insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
+		InsertShardPlacementRow(shardId,
+								INVALID_PLACEMENT_ID,
+								shardSize,
+								nodeGroupId);
 	}
-
-	return insertedShardPlacements;
 }


@@ -404,7 +404,7 @@ PendingWorkerTransactionList(MultiConnection *connection)
 	int32 coordinatorId = GetLocalGroupId();
 
 	appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts "
-					 "WHERE gid LIKE 'citus\\_%d\\_%%'",
+					 "WHERE gid LIKE 'citus\\_%d\\_%%' and database = current_database()",
 					 coordinatorId);
 
 	int querySent = SendRemoteCommand(connection, command->data);
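
The added database = current_database() predicate scopes recovery to the database it runs in, which matters once more than one database on the same cluster has Citus installed. For a coordinator group id of 1, the command built above comes out as:

    SELECT gid FROM pg_prepared_xacts
    WHERE gid LIKE 'citus\_1\_%' and database = current_database();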


@@ -251,9 +251,9 @@ extern void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId
 												   replicationFactor);
 extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
 								  bool useExclusiveConnection);
-extern List * InsertShardPlacementRows(Oid relationId, int64 shardId,
-									   List *workerNodeList, int workerStartIndex,
-									   int replicationFactor);
+extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
+									 List *workerNodeList, int workerStartIndex,
+									 int replicationFactor);
 extern uint64 UpdateShardStatistics(int64 shardId);
 extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 											 int32 replicationFactor,


@@ -0,0 +1,364 @@
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
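
Setting citus.recover_2pc_interval to -1 stops the maintenance daemon from running two-phase commit recovery on its own, so the test can trigger it deterministically with recover_prepared_transactions(). A sketch of undoing this after the test (assuming the previous setting should simply come back):

    ALTER SYSTEM RESET citus.recover_2pc_interval;
    SELECT pg_reload_conf();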
SELECT $definition$
CREATE OR REPLACE FUNCTION test.maintenance_worker()
RETURNS pg_stat_activity
LANGUAGE plpgsql
AS $$
DECLARE
activity record;
BEGIN
DO 'BEGIN END'; -- Force maintenance daemon to start
-- we don't want to wait forever; loop will exit after 20 seconds
FOR i IN 1 .. 200 LOOP
PERFORM pg_stat_clear_snapshot();
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = current_database();
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
END IF ;
END LOOP;
-- fail if we reach the end of this loop
raise 'Waited too long for maintenance daemon to start';
END;
$$;
$definition$ create_function_test_maintenance_worker
\gset
CREATE DATABASE db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
SELECT oid AS db1_oid
FROM pg_database
WHERE datname = 'db1'
\gset
\c - - - :worker_1_port
CREATE DATABASE db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c - - - :worker_2_port
CREATE DATABASE db1;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c db1 - - :worker_1_port
CREATE EXTENSION citus;
\c db1 - - :worker_2_port
CREATE EXTENSION citus;
\c db1 - - :master_port
CREATE EXTENSION citus;
SELECT citus_add_node('localhost', :worker_1_port);
citus_add_node
---------------------------------------------------------------------
1
(1 row)
SELECT citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
2
(1 row)
SELECT current_database();
current_database
---------------------------------------------------------------------
db1
(1 row)
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
datname | current_database | usename | extowner
---------------------------------------------------------------------
db1 | db1 | postgres | postgres
(1 row)
SELECT *
FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t
2 | 2 | localhost | 57638 | default | t | t | primary | default | t | t
(2 rows)
CREATE DATABASE db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
SELECT oid AS db2_oid
FROM pg_database
WHERE datname = 'db2'
\gset
\c - - - :worker_1_port
CREATE DATABASE db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c - - - :worker_2_port
CREATE DATABASE db2;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c db2 - - :worker_1_port
CREATE EXTENSION citus;
\c db2 - - :worker_2_port
CREATE EXTENSION citus;
\c db2 - - :master_port
CREATE EXTENSION citus;
SELECT citus_add_node('localhost', :worker_1_port);
citus_add_node
---------------------------------------------------------------------
1
(1 row)
SELECT citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
2
(1 row)
SELECT current_database();
current_database
---------------------------------------------------------------------
db2
(1 row)
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
datname | current_database | usename | extowner
---------------------------------------------------------------------
db2 | db2 | postgres | postgres
(1 row)
SELECT *
FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t
2 | 2 | localhost | 57638 | default | t | t | primary | default | t | t
(2 rows)
SELECT groupid AS worker_1_group_id
FROM pg_dist_node
WHERE nodeport = :worker_1_port;
worker_1_group_id
---------------------------------------------------------------------
1
(1 row)
\gset
SELECT groupid AS worker_2_group_id
FROM pg_dist_node
WHERE nodeport = :worker_2_port;
worker_2_group_id
---------------------------------------------------------------------
2
(1 row)
\gset
-- Prepare transactions on first database
\c db1 - - :worker_1_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_1_db_1_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_1_db_1_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_1_db_1_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_1_db_1_name';
\c db1 - - :worker_2_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_2_db_1_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_2_db_1_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_2_db_1_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_2_db_1_name';
-- Prepare transactions on second database
\c db2 - - :worker_1_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_1_db_2_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_1_db_2_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_1_db_2_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_1_db_2_name';
\c db2 - - :worker_2_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_2_db_2_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_2_db_2_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_2_db_2_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_2_db_2_name';
\c db1 - - :master_port
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_1_name'),
(:worker_2_group_id, :'transaction_2_worker_2_db_1_name');
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid'),
(:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid');
\c db2 - - :master_port
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_2_name'),
(:worker_2_group_id, :'transaction_2_worker_2_db_2_name');
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid'),
(:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid');
\c db1 - - :master_port
SELECT count(*) != 0
FROM pg_dist_transaction;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT recover_prepared_transactions() > 0;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0
FROM pg_dist_transaction;
?column?
---------------------------------------------------------------------
t
(1 row)
\c db2 - - :master_port
SELECT count(*) != 0
FROM pg_dist_transaction;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT recover_prepared_transactions() > 0;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0
FROM pg_dist_transaction;
?column?
---------------------------------------------------------------------
t
(1 row)
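
The two recovery rounds above exercise the per-database filter: recover_prepared_transactions() matches the citus_-prefixed gids in pg_prepared_xacts against pg_dist_transaction, commits the matched ones (the should_commit tables), aborts the unmatched ones (should_abort), and forgets stale pg_dist_transaction entries, all without touching the other database's prepared transactions. A hypothetical manual check on a worker:

    SELECT gid, database
    FROM pg_prepared_xacts
    WHERE gid LIKE 'citus\_%'
      AND database = current_database();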
\c regression - - :master_port
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db2;
\c - - - :worker_1_port
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db2;
\c - - - :worker_2_port
-- Count of terminated sessions is not important for the test,
-- it is just to make output predictable
SELECT count(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP DATABASE db2;


@@ -202,6 +202,118 @@ SELECT create_distributed_table('test_storage', 'a');
ALTER TABLE test_storage ALTER a SET STORAGE default;
ERROR: alter table command is currently unsupported
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP|VALIDATE CONSTRAINT, SET (), RESET (), ENABLE|DISABLE|NO FORCE|FORCE ROW LEVEL SECURITY, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
-- New ICU_RULES option added to CREATE DATABASE
-- Relevant PG commit:
-- https://github.com/postgres/postgres/commit/30a53b7
CREATE DATABASE test_db WITH LOCALE_PROVIDER = 'icu' LOCALE = '' ICU_RULES = '&a < g' TEMPLATE = 'template0';
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
NOTICE: using standard form "und" for ICU locale ""
SELECT result FROM run_command_on_workers
($$CREATE DATABASE test_db WITH LOCALE_PROVIDER = 'icu' LOCALE = '' ICU_RULES = '&a < g' TEMPLATE = 'template0'$$);
result
---------------------------------------------------------------------
CREATE DATABASE
CREATE DATABASE
(2 rows)
CREATE TABLE test_db_table (a text);
SELECT create_distributed_table('test_db_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_db_table VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
-- icu default rules order
SELECT * FROM test_db_table ORDER BY a COLLATE "en-x-icu";
a
---------------------------------------------------------------------
Abernathy
apple
bird
Boston
Graham
green
(6 rows)
-- regression database's default order
SELECT * FROM test_db_table ORDER BY a;
a
---------------------------------------------------------------------
Abernathy
Boston
Graham
apple
bird
green
(6 rows)
-- now see the order in the new database
\c test_db
CREATE EXTENSION citus;
\c - - - :worker_1_port
CREATE EXTENSION citus;
\c - - - :worker_2_port
CREATE EXTENSION citus;
\c - - - :master_port
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE TABLE test_db_table (a text);
SELECT create_distributed_table('test_db_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_db_table VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
-- icu default rules order
SELECT * FROM test_db_table ORDER BY a COLLATE "en-x-icu";
a
---------------------------------------------------------------------
Abernathy
apple
bird
Boston
Graham
green
(6 rows)
-- test_db database's default order with ICU_RULES = '&a < g'
SELECT * FROM test_db_table ORDER BY a;
a
---------------------------------------------------------------------
Abernathy
apple
green
bird
Boston
Graham
(6 rows)
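
The tailoring '&a < g' places 'g' immediately after 'a' at the primary level, which is why 'green' now sorts right after 'apple'. As a sketch (PG16 accepts the same rules option on a single collation, if a whole database is not needed):

    CREATE COLLATION a_then_g (provider = icu, locale = 'und', rules = '&a < g');
    SELECT * FROM test_db_table ORDER BY a COLLATE a_then_g;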
\c regression
\c - - - :master_port
DROP DATABASE test_db;
SELECT result FROM run_command_on_workers
($$DROP DATABASE test_db$$);
result
---------------------------------------------------------------------
DROP DATABASE
DROP DATABASE
(2 rows)
SET search_path TO pg16;
--
-- COPY FROM ... DEFAULT
-- Already supported in Citus, adding all PG tests with a distributed table
@@ -542,6 +654,28 @@ SELECT pg_get_viewdef('pg16.prop_view_1', true);
\c - - - :master_port
SET search_path TO pg16;
-- REINDEX DATABASE/SYSTEM name is optional
-- We already don't propagate these commands automatically
-- Testing here with run_command_on_workers
-- Relevant PG commit: https://github.com/postgres/postgres/commit/2cbc3c1
REINDEX DATABASE;
SELECT result FROM run_command_on_workers
($$REINDEX DATABASE$$);
result
---------------------------------------------------------------------
REINDEX
REINDEX
(2 rows)
REINDEX SYSTEM;
SELECT result FROM run_command_on_workers
($$REINDEX SYSTEM$$);
result
---------------------------------------------------------------------
REINDEX
REINDEX
(2 rows)
\set VERBOSITY terse
SET client_min_messages TO ERROR;
DROP SCHEMA pg16 CASCADE;


@@ -206,6 +206,7 @@ test: multi_modifying_xacts
 test: multi_generate_ddl_commands
 test: multi_create_shards
 test: multi_transaction_recovery
+test: multi_transaction_recovery_multiple_databases
 test: local_dist_join_modifications
 test: local_table_join


@@ -0,0 +1,286 @@
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
SELECT pg_reload_conf();
SELECT $definition$
CREATE OR REPLACE FUNCTION test.maintenance_worker()
RETURNS pg_stat_activity
LANGUAGE plpgsql
AS $$
DECLARE
activity record;
BEGIN
DO 'BEGIN END'; -- Force maintenance daemon to start
-- we don't want to wait forever; loop will exit after 20 seconds
FOR i IN 1 .. 200 LOOP
PERFORM pg_stat_clear_snapshot();
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = current_database();
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
END IF ;
END LOOP;
-- fail if we reach the end of this loop
raise 'Waited too long for maintenance daemon to start';
END;
$$;
$definition$ create_function_test_maintenance_worker
\gset
CREATE DATABASE db1;
SELECT oid AS db1_oid
FROM pg_database
WHERE datname = 'db1'
\gset
\c - - - :worker_1_port
CREATE DATABASE db1;
\c - - - :worker_2_port
CREATE DATABASE db1;
\c db1 - - :worker_1_port
CREATE EXTENSION citus;
\c db1 - - :worker_2_port
CREATE EXTENSION citus;
\c db1 - - :master_port
CREATE EXTENSION citus;
SELECT citus_add_node('localhost', :worker_1_port);
SELECT citus_add_node('localhost', :worker_2_port);
SELECT current_database();
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
SELECT *
FROM pg_dist_node;
CREATE DATABASE db2;
SELECT oid AS db2_oid
FROM pg_database
WHERE datname = 'db2'
\gset
\c - - - :worker_1_port
CREATE DATABASE db2;
\c - - - :worker_2_port
CREATE DATABASE db2;
\c db2 - - :worker_1_port
CREATE EXTENSION citus;
\c db2 - - :worker_2_port
CREATE EXTENSION citus;
\c db2 - - :master_port
CREATE EXTENSION citus;
SELECT citus_add_node('localhost', :worker_1_port);
SELECT citus_add_node('localhost', :worker_2_port);
SELECT current_database();
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
SELECT *
FROM pg_dist_node;
SELECT groupid AS worker_1_group_id
FROM pg_dist_node
WHERE nodeport = :worker_1_port;
\gset
SELECT groupid AS worker_2_group_id
FROM pg_dist_node
WHERE nodeport = :worker_2_port;
\gset
-- Prepare transactions on first database
\c db1 - - :worker_1_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_1_db_1_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_1_db_1_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_1_db_1_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_1_db_1_name';
\c db1 - - :worker_2_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_0_0_' || :'db1_oid' AS transaction_1_worker_2_db_1_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_2_db_1_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_1_0_' || :'db1_oid' AS transaction_2_worker_2_db_1_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_2_db_1_name';
-- Prepare transactions on second database
\c db2 - - :worker_1_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_1_db_2_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_1_db_2_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_1_db_2_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_1_db_2_name';
\c db2 - - :worker_2_port
BEGIN;
CREATE TABLE should_abort
(
value int
);
SELECT 'citus_0_1234_3_0_' || :'db2_oid' AS transaction_1_worker_2_db_2_name
\gset
PREPARE TRANSACTION :'transaction_1_worker_2_db_2_name';
BEGIN;
CREATE TABLE should_commit
(
value int
);
SELECT 'citus_0_1234_4_0_' || :'db2_oid' AS transaction_2_worker_2_db_2_name
\gset
PREPARE TRANSACTION :'transaction_2_worker_2_db_2_name';
\c db1 - - :master_port
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_1_name'),
(:worker_2_group_id, :'transaction_2_worker_2_db_1_name');
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid'),
(:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db1_oid');
\c db2 - - :master_port
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, :'transaction_2_worker_1_db_2_name'),
(:worker_2_group_id, :'transaction_2_worker_2_db_2_name');
INSERT INTO pg_dist_transaction
VALUES (:worker_1_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid'),
(:worker_2_group_id, 'citus_0_should_be_forgotten_' || :'db2_oid');
\c db1 - - :master_port
SELECT count(*) != 0
FROM pg_dist_transaction;
SELECT recover_prepared_transactions() > 0;
SELECT count(*) = 0
FROM pg_dist_transaction;
\c db2 - - :master_port
SELECT count(*) != 0
FROM pg_dist_transaction;
SELECT recover_prepared_transactions() > 0;
SELECT count(*) = 0
FROM pg_dist_transaction;
\c regression - - :master_port
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
DROP DATABASE db2;
\c - - - :worker_1_port
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) > 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
DROP DATABASE db2;
\c - - - :worker_2_port
-- Count of terminated sessions is not important for the test,
-- it is just to make output predictable
SELECT count(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db1' ;
DROP DATABASE db1;
SELECT count(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND datname = 'db2' ;
DROP DATABASE db2;


@@ -104,6 +104,49 @@ SELECT result FROM run_command_on_all_nodes
SELECT create_distributed_table('test_storage', 'a');
ALTER TABLE test_storage ALTER a SET STORAGE default;
-- New ICU_RULES option added to CREATE DATABASE
-- Relevant PG commit:
-- https://github.com/postgres/postgres/commit/30a53b7
CREATE DATABASE test_db WITH LOCALE_PROVIDER = 'icu' LOCALE = '' ICU_RULES = '&a < g' TEMPLATE = 'template0';
SELECT result FROM run_command_on_workers
($$CREATE DATABASE test_db WITH LOCALE_PROVIDER = 'icu' LOCALE = '' ICU_RULES = '&a < g' TEMPLATE = 'template0'$$);
CREATE TABLE test_db_table (a text);
SELECT create_distributed_table('test_db_table', 'a');
INSERT INTO test_db_table VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
-- icu default rules order
SELECT * FROM test_db_table ORDER BY a COLLATE "en-x-icu";
-- regression database's default order
SELECT * FROM test_db_table ORDER BY a;
-- now see the order in the new database
\c test_db
CREATE EXTENSION citus;
\c - - - :worker_1_port
CREATE EXTENSION citus;
\c - - - :worker_2_port
CREATE EXTENSION citus;
\c - - - :master_port
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
CREATE TABLE test_db_table (a text);
SELECT create_distributed_table('test_db_table', 'a');
INSERT INTO test_db_table VALUES ('Abernathy'), ('apple'), ('bird'), ('Boston'), ('Graham'), ('green');
-- icu default rules order
SELECT * FROM test_db_table ORDER BY a COLLATE "en-x-icu";
-- test_db database's default order with ICU_RULES = '&a < g'
SELECT * FROM test_db_table ORDER BY a;
\c regression
\c - - - :master_port
DROP DATABASE test_db;
SELECT result FROM run_command_on_workers
($$DROP DATABASE test_db$$);
SET search_path TO pg16;
--
-- COPY FROM ... DEFAULT
-- Already supported in Citus, adding all PG tests with a distributed table
@@ -332,6 +375,19 @@ SELECT pg_get_viewdef('pg16.prop_view_1', true);
\c - - - :master_port
SET search_path TO pg16;
-- REINDEX DATABASE/SYSTEM name is optional
-- We already don't propagate these commands automatically
-- Testing here with run_command_on_workers
-- Relevant PG commit: https://github.com/postgres/postgres/commit/2cbc3c1
REINDEX DATABASE;
SELECT result FROM run_command_on_workers
($$REINDEX DATABASE$$);
REINDEX SYSTEM;
SELECT result FROM run_command_on_workers
($$REINDEX SYSTEM$$);
\set VERBOSITY terse
SET client_min_messages TO ERROR;
DROP SCHEMA pg16 CASCADE;