Schedule parallel shard moves in background rebalancer by removing task dependencies between shard moves across colocation groups. (#6756)

DESCRIPTION: This PR removes the task dependencies between shard moves whose shards belong to different colocation groups. As a result, multiple tasks can be scheduled in the RUNNABLE state at the same time, so the background task monitor may run them concurrently.

Previously, all the shard moves planned in a rebalance operation depended on each other sequentially.
For instance, given the following tables and shards

```
colocation group 1              colocation group 2
table1    table2    table3      table4    table5
shard11   shard21   shard31     shard41   shard51
shard12   shard22   shard32     shard42   shard52
```
and the rebalance planner returned the following set of moves

`{move(shard11), move(shard12), move(shard41), move(shard42)}`

then the background rebalancer scheduled them so that each move depended on the previous one:
```
      {move(reftables) if there is any, none}
               |
      move(shard11)
               |
      move(shard12)
               |                {move(shard41) <--- move(shard12)} This is an artificial dependency
      move(shard41)
               |
      move(shard42)
```
This results in artificial dependencies between otherwise independent
moves.

Since shards in different colocation groups can be moved concurrently, this PR changes the dependency relationships between the moves as follows:

```
      {move(reftables) if there is any, none}          {move(reftables) if there is any, none}
               |                                                        |
      move(shard11)                                           move(shard41)
               |                                                        |
      move(shard12)                                           move(shard42)
```
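
With this layout, only moves within the same colocation group, or moves that share a source or target node, remain ordered; everything else can run in parallel. One way to see the resulting graph is to query the background-task catalogs after starting a rebalance, as the new regression test does. The snippet below is a minimal sketch of that check; `17777` is just the job id used in the test, so substitute whatever `citus_rebalance_start()` returns.

```sql
-- Start a rebalance in the background; the returned value is the job id.
SELECT citus_rebalance_start();

-- List each scheduled task and the task it depends on for that job.
-- Moves in different colocation groups that touch disjoint nodes
-- should have no dependency edge between them.
SELECT D.task_id,
       (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id) AS command,
       D.depends_on
FROM pg_dist_background_task_depend D
WHERE D.job_id = 17777
ORDER BY D.task_id, D.depends_on;
```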

---------

Co-authored-by: Jelte Fennema <jelte.fennema@microsoft.com>
8 changed files with 693 additions and 24 deletions


@@ -190,6 +190,19 @@ typedef struct WorkerShardStatistics
HTAB *statistics;
} WorkerShardStatistics;
/* ShardMoveDependencyInfo contains the taskId which any new shard move task within the corresponding colocation group must take a dependency on */
typedef struct ShardMoveDependencyInfo
{
int64 key;
int64 taskId;
} ShardMoveDependencyInfo;
typedef struct ShardMoveDependencies
{
HTAB *colocationDependencies;
HTAB *nodeDependencies;
} ShardMoveDependencies;
char *VariablesToBePassedToNewConnections = NULL;
/* static declarations for main logic */
@@ -1898,6 +1911,137 @@ ErrorOnConcurrentRebalance(RebalanceOptions *options)
}
/*
* GetColocationId function returns the colocationId of the shard in a PlacementUpdateEvent.
*/
static int64
GetColocationId(PlacementUpdateEvent *move)
{
ShardInterval *shardInterval = LoadShardInterval(move->shardId);
CitusTableCacheEntry *citusTableCacheEntry = GetCitusTableCacheEntry(
shardInterval->relationId);
return citusTableCacheEntry->colocationId;
}
/*
* InitializeShardMoveDependencies function creates the hash maps that we use to track
* the latest moves so that subsequent moves with the same properties must take a dependency
* on them. There are two hash maps. One is for tracking the latest move scheduled in a
* given colocation group and the other one is for tracking the latest move which involves
* a given node either as its source node or its target node.
*/
static ShardMoveDependencies
InitializeShardMoveDependencies()
{
ShardMoveDependencies shardMoveDependencies;
shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64,
ShardMoveDependencyInfo,
"colocationDependencyHashMap",
6);
shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int64,
ShardMoveDependencyInfo,
"nodeDependencyHashMap",
6);
return shardMoveDependencies;
}
/*
* GenerateTaskMoveDependencyList creates and returns a List of taskIds that
* the move must take a dependency on.
*/
static int64 *
GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
ShardMoveDependencies shardMoveDependencies, int *nDepends)
{
HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64,
"shardMoveDependencyList", 0);
bool found;
/* Check if there exists a move in the same colocation group scheduled earlier. */
ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found);
if (found)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}
/* Check if there exists a move scheduled earlier whose source or target node
* overlaps with the current move's source node. */
shardMoveDependencyInfo = hash_search(
shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
&found);
if (found)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}
/* Check if there exists a move scheduled earlier whose source or target node
* overlaps with the current move's target node. */
shardMoveDependencyInfo = hash_search(
shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_ENTER,
&found);
if (found)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}
*nDepends = hash_get_num_entries(dependsList);
int64 *dependsArray = NULL;
if (*nDepends > 0)
{
HASH_SEQ_STATUS seq;
dependsArray = palloc((*nDepends) * sizeof(int64));
hash_seq_init(&seq, dependsList);
int i = 0;
int64 *dependsTaskId;
while ((dependsTaskId = (int64 *) hash_seq_search(&seq)) != NULL)
{
dependsArray[i++] = *dependsTaskId;
}
}
return dependsArray;
}
/*
* UpdateShardMoveDependencies function updates the dependency maps with the latest move's taskId.
*/
static void
UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId,
ShardMoveDependencies shardMoveDependencies)
{
ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL);
shardMoveDependencyInfo->taskId = taskId;
shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
&move->sourceNode->nodeId, HASH_ENTER, NULL);
shardMoveDependencyInfo->taskId = taskId;
shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
&move->targetNode->nodeId, HASH_ENTER, NULL);
shardMoveDependencyInfo->taskId = taskId;
}
/*
* RebalanceTableShardsBackground rebalances the shards for the relations
* inside the relationIdList across the different workers. It does so using our
@@ -1974,18 +2118,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
StringInfoData buf = { 0 };
initStringInfo(&buf);
/*
* Currently we only have two tasks that any move can depend on:
* - replicating reference tables
* - the previous move
*
* prevJobIdx tells what slot to write the id of the task into. We only use both slots
* if we are actually replicating reference tables.
*/
int64 prevJobId[2] = { 0 };
int prevJobIdx = 0;
List *referenceTableIdList = NIL;
int64 replicateRefTablesTaskId = 0;
if (HasNodesWithMissingReferenceTables(&referenceTableIdList))
{
@@ -2001,15 +2135,15 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
appendStringInfo(&buf,
"SELECT pg_catalog.replicate_reference_tables(%s)",
quote_literal_cstr(shardTranferModeLabel));
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
prevJobIdx, prevJobId);
prevJobId[prevJobIdx] = task->taskid;
prevJobIdx++;
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, 0,
NULL);
replicateRefTablesTaskId = task->taskid;
}
PlacementUpdateEvent *move = NULL;
bool first = true;
int prevMoveIndex = prevJobIdx;
ShardMoveDependencies shardMoveDependencies = InitializeShardMoveDependencies();
foreach_ptr(move, placementUpdateList)
{
resetStringInfo(&buf);
@@ -2021,14 +2155,27 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
move->targetNode->nodeId,
quote_literal_cstr(shardTranferModeLabel));
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
prevJobIdx, prevJobId);
prevJobId[prevMoveIndex] = task->taskid;
if (first)
int64 colocationId = GetColocationId(move);
int nDepends = 0;
int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId,
shardMoveDependencies,
&nDepends);
if (nDepends == 0 && replicateRefTablesTaskId > 0)
{
first = false;
prevJobIdx++;
nDepends = 1;
dependsArray = palloc(nDepends * sizeof(int64));
dependsArray[0] = replicateRefTablesTaskId;
}
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
nDepends,
dependsArray);
UpdateShardMoveDependencies(move, colocationId, task->taskid,
shardMoveDependencies);
}
ereport(NOTICE,


@@ -223,7 +223,7 @@ check-follower-cluster: all
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_follower_schedule $(EXTRA_TESTS)
check-operations: all
$(pg_regress_multi_check) --load-extension=citus \
$(pg_regress_multi_check) --load-extension=citus --worker-count=6 \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/operations_schedule $(EXTRA_TESTS)
check-columnar: all


@@ -110,6 +110,14 @@ if __name__ == "__main__":
"multi_mx_function_table_reference",
],
),
"background_rebalance_parallel": TestDeps(
None,
[
"multi_test_helpers",
"multi_cluster_management",
],
worker_count=6,
),
"multi_mx_modifying_xacts": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_router_planner": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]),


@@ -291,6 +291,12 @@ SELECT state, details from citus_rebalance_status();
finished | {"tasks": [], "task_state_counts": {"done": 2}}
(1 row)
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- Remove coordinator again to allow rerunning of this test
SELECT 1 FROM citus_remove_node('localhost', :master_port);
?column?


@@ -0,0 +1,364 @@
/*
Test to check if the background tasks scheduled by the background rebalancer
have the correct dependencies.
*/
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO WARNING;
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50;
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg2 (b int primary key);
SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2');
create_distributed_table
---------------------------------------------------------------------
(1 row)
/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3');
create_distributed_table
---------------------------------------------------------------------
(1 row)
/* Add two new nodes so that we can rebalance */
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_4_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid;
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
table1_colg1 | 85674000 | 0 | localhost | 57637 | localhost | 57640
table1_colg1 | 85674001 | 0 | localhost | 57638 | localhost | 57639
table2_colg1 | 85674004 | 0 | localhost | 57637 | localhost | 57640
table2_colg1 | 85674005 | 0 | localhost | 57638 | localhost | 57639
table1_colg2 | 85674008 | 0 | localhost | 57637 | localhost | 57640
table1_colg2 | 85674009 | 0 | localhost | 57638 | localhost | 57639
table2_colg2 | 85674012 | 0 | localhost | 57637 | localhost | 57640
table2_colg2 | 85674013 | 0 | localhost | 57638 | localhost | 57639
table1_colg3 | 85674016 | 0 | localhost | 57637 | localhost | 57640
table1_colg3 | 85674017 | 0 | localhost | 57638 | localhost | 57639
table2_colg3 | 85674020 | 0 | localhost | 57637 | localhost | 57640
table2_colg3 | 85674021 | 0 | localhost | 57638 | localhost | 57639
(12 rows)
SELECT * FROM citus_rebalance_start();
citus_rebalance_start
---------------------------------------------------------------------
17777
(1 row)
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
/* Check that a move is dependent on
1. any other move scheduled earlier in its colocation group.
2. any other move scheduled earlier whose source node or target
node overlaps with the current move's nodes. */
SELECT S.shardid, P.colocationid
FROM pg_dist_shard S, pg_dist_partition P
WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC;
shardid | colocationid
---------------------------------------------------------------------
85674000 | 50050
85674001 | 50050
85674002 | 50050
85674003 | 50050
85674004 | 50050
85674005 | 50050
85674006 | 50050
85674007 | 50050
85674008 | 50051
85674009 | 50051
85674010 | 50051
85674011 | 50051
85674012 | 50051
85674013 | 50051
85674014 | 50051
85674015 | 50051
85674016 | 50052
85674017 | 50052
85674018 | 50052
85674019 | 50052
85674020 | 50052
85674021 | 50052
85674022 | 50052
85674023 | 50052
(24 rows)
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC;
task_id | command | depends_on | command
---------------------------------------------------------------------
1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto')
1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto') | 1000 | SELECT pg_catalog.citus_move_shard_placement(85674001,51,52,'auto')
1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1001 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,53,'auto')
1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto')
1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto') | 1002 | SELECT pg_catalog.citus_move_shard_placement(85674009,51,52,'auto')
1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1003 | SELECT pg_catalog.citus_move_shard_placement(85674008,50,53,'auto')
1005 | SELECT pg_catalog.citus_move_shard_placement(85674016,50,53,'auto') | 1004 | SELECT pg_catalog.citus_move_shard_placement(85674017,51,52,'auto')
(7 rows)
/* Check that if there is a reference table that needs to be synced to a node,
any move with no other dependency must depend on the reference table replication task. */
SELECT 1 FROM citus_drain_node('localhost',:worker_4_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true);
?column?
---------------------------------------------------------------------
1
(1 row)
/* Drain worker_3 so that we can move only one colocation group to worker_3
to create an imbalance that would cause parallel rebalancing. */
SELECT 1 FROM citus_drain_node('localhost',:worker_3_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
CREATE TABLE ref_table(a int PRIMARY KEY);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
/* Move all the shards of Colocation group 3 to worker_3.*/
SELECT
master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes')
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port
ORDER BY
shardid;
master_move_shard_placement
---------------------------------------------------------------------
(4 rows)
CALL citus_cleanup_orphaned_resources();
/* Activate and add new nodes so that we can rebalance. */
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_5_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_6_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM citus_rebalance_start();
citus_rebalance_start
---------------------------------------------------------------------
17778
(1 row)
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
SELECT S.shardid, P.colocationid
FROM pg_dist_shard S, pg_dist_partition P
WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC;
shardid | colocationid
---------------------------------------------------------------------
85674000 | 50050
85674001 | 50050
85674002 | 50050
85674003 | 50050
85674004 | 50050
85674005 | 50050
85674006 | 50050
85674007 | 50050
85674008 | 50051
85674009 | 50051
85674010 | 50051
85674011 | 50051
85674012 | 50051
85674013 | 50051
85674014 | 50051
85674015 | 50051
85674016 | 50052
85674017 | 50052
85674018 | 50052
85674019 | 50052
85674020 | 50052
85674021 | 50052
85674022 | 50052
85674023 | 50052
85674024 | 50053
(25 rows)
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC;
task_id | command | depends_on | command
---------------------------------------------------------------------
1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto')
1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto') | 1006 | SELECT pg_catalog.replicate_reference_tables('auto')
1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto')
1010 | SELECT pg_catalog.citus_move_shard_placement(85674017,52,53,'auto') | 1007 | SELECT pg_catalog.citus_move_shard_placement(85674016,52,53,'auto')
1011 | SELECT pg_catalog.citus_move_shard_placement(85674008,51,54,'auto') | 1008 | SELECT pg_catalog.citus_move_shard_placement(85674003,51,54,'auto')
1012 | SELECT pg_catalog.citus_move_shard_placement(85674001,50,55,'auto') | 1009 | SELECT pg_catalog.citus_move_shard_placement(85674000,50,55,'auto')
(6 rows)
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_3_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_4_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_5_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_6_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- keep the rest of the tests intact that depend on node/group ids
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;


@@ -5,6 +5,7 @@ test: shard_rebalancer_unit
test: shard_rebalancer
test: background_rebalance
test: worker_copy_table_to_node
test: background_rebalance_parallel
test: foreign_key_to_reference_shard_rebalance
test: multi_move_mx
test: shard_move_deferred_delete


@@ -104,6 +104,8 @@ SELECT 1 FROM citus_rebalance_start(shard_transfer_mode := 'force_logical');
SELECT citus_rebalance_wait();
SELECT state, details from citus_rebalance_status();
SELECT public.wait_for_resource_cleanup();
-- Remove coordinator again to allow rerunning of this test
SELECT 1 FROM citus_remove_node('localhost', :master_port);
SELECT public.wait_until_metadata_sync(30000);


@@ -0,0 +1,141 @@
/*
Test to check if the background tasks scheduled by the background rebalancer
have the correct dependencies.
*/
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO WARNING;
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50;
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
/* Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group */
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4 , colocate_with => 'none');
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b' , colocate_with => 'table1_colg1');
/* Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group */
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2 ', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg2 (b int primary key);
SELECT create_distributed_table('table2_colg2', 'b' , colocate_with => 'table1_colg2');
/* Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group */
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3 ', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b' , colocate_with => 'table1_colg3');
/* Add two new nodes so that we can rebalance */
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
SELECT 1 FROM citus_add_node('localhost', :worker_4_port);
SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid;
SELECT * FROM citus_rebalance_start();
SELECT citus_rebalance_wait();
/* Check that a move is dependent on
1. any other move scheduled earlier in its colocation group.
2. any other move scheduled earlier whose source node or target
node overlaps with the current move's nodes. */
SELECT S.shardid, P.colocationid
FROM pg_dist_shard S, pg_dist_partition P
WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC;
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id = 17777 ORDER BY D.task_id, D.depends_on ASC;
/* Check that if there is a reference table that needs to be synced to a node,
any move with no other dependency must depend on the reference table replication task. */
SELECT 1 FROM citus_drain_node('localhost',:worker_4_port);
SELECT public.wait_for_resource_cleanup();
SELECT 1 FROM citus_disable_node('localhost', :worker_4_port, synchronous:=true);
/* Drain worker_3 so that we can move only one colocation group to worker_3
to create an imbalance that would cause parallel rebalancing. */
SELECT 1 FROM citus_drain_node('localhost',:worker_3_port);
SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true);
CALL citus_cleanup_orphaned_resources();
CREATE TABLE ref_table(a int PRIMARY KEY);
SELECT create_reference_table('ref_table');
/* Move all the shards of Colocation group 3 to worker_3.*/
SELECT
master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes')
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port
ORDER BY
shardid;
CALL citus_cleanup_orphaned_resources();
/* Activate and add new nodes so that we can rebalance. */
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port);
SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true);
SELECT 1 FROM citus_add_node('localhost', :worker_5_port);
SELECT 1 FROM citus_add_node('localhost', :worker_6_port);
SELECT * FROM citus_rebalance_start();
SELECT citus_rebalance_wait();
SELECT S.shardid, P.colocationid
FROM pg_dist_shard S, pg_dist_partition P
WHERE S.logicalrelid = P.logicalrelid ORDER BY S.shardid ASC;
SELECT D.task_id,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id = 17778 ORDER BY D.task_id, D.depends_on ASC;
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
SELECT public.wait_for_resource_cleanup();
select citus_remove_node('localhost', :worker_3_port);
select citus_remove_node('localhost', :worker_4_port);
select citus_remove_node('localhost', :worker_5_port);
select citus_remove_node('localhost', :worker_6_port);
-- keep the rest of the tests intact that depend on node/group ids
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;