Commit Graph

3 Commits (17f09d4ad7ec0e95704e01bdf5492c9b4cb7b316)

Author SHA1 Message Date
Hanefi Onaldi 15f5796eee
Fix flaky background rebalance parallel test (#6893)
A test in background_rebalance_parallel.sql was failing intermittently
where the order of tasks in the output was not deterministic. This
commit fixes the test by removing id columns for the background tasks in
the output.

A sample failing diff before this patch is below:

```diff
 SELECT D.task_id,
        (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
        D.depends_on,
        (SELECT T.command FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
 FROM pg_dist_background_task_depend D  WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
  task_id |                               command                               | depends_on |                               command
 ---------+---------------------------------------------------------------------+------------+---------------------------------------------------------------------
-    1014 | SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') |       1013 | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto')
-    1016 | SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') |       1015 | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto')
-    1018 | SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') |       1017 | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto')
-    1020 | SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') |       1019 | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto')
+    1014 | SELECT pg_catalog.citus_move_shard_placement(85674038,50,57,'auto') |       1013 | SELECT pg_catalog.citus_move_shard_placement(85674037,50,56,'auto')
+    1016 | SELECT pg_catalog.citus_move_shard_placement(85674044,50,57,'auto') |       1015 | SELECT pg_catalog.citus_move_shard_placement(85674043,50,56,'auto')
+    1018 | SELECT pg_catalog.citus_move_shard_placement(85674026,50,57,'auto') |       1017 | SELECT pg_catalog.citus_move_shard_placement(85674025,50,56,'auto')
+    1020 | SELECT pg_catalog.citus_move_shard_placement(85674032,50,57,'auto') |       1019 | SELECT pg_catalog.citus_move_shard_placement(85674031,50,56,'auto')
 (4 rows)
```

Notice that the dependent and dependee tasks have some commands, but
they have different task ids.

(cherry picked from commit 3217e3f181)
2023-05-05 12:18:39 +03:00
Naisila Puka 84f2d8685a
Adds control for background task executors involving a node (#6771)
DESCRIPTION: Adds control for background task executors involving a node

### Background and motivation

Nonblocking concurrent task execution via background workers was
introduced in [#6459](https://github.com/citusdata/citus/pull/6459), and
concurrent shard moves in the background rebalancer were introduced in
[#6756](https://github.com/citusdata/citus/pull/6756) - with a hard
dependency that limits to 1 shard move per node. As we know, a shard
move consists of a shard moving from a source node to a target node. The
hard dependency was used because the background task runner didn't have
an option to limit the parallel shard moves per node.

With the motivation of controlling the number of concurrent shard
moves that involve a particular node, either as source or target, this
PR introduces a general new GUC
citus.max_background_task_executors_per_node to be used in the
background task runner infrastructure. So, why do we even want to
control and limit the concurrency? Well, it's all about resource
availability: because the moves involve the same nodes, extra
parallelism won’t make the rebalance complete faster if some resource is
already maxed out (usually cpu or disk). Or, if the cluster is being
used in a production setting, the moves might compete for resources with
production queries much more than if they had been executed
sequentially.

### How does it work?

A new column named nodes_involved is added to the catalog table that
keeps track of the scheduled background tasks,
pg_dist_background_task. It is of type integer[] - to store a list
of node ids. It is NULL by default - the column will be filled by the
rebalancer, but we may not care about the nodes involved in other uses
of the background task runner.

Table "pg_catalog.pg_dist_background_task"

     Column     |           Type           
============================================
 job_id         | bigint
 task_id        | bigint
 owner          | regrole
 pid            | integer
 status         | citus_task_status
 command        | text
 retry_count    | integer
 not_before     | timestamp with time zone
 message        | text
+nodes_involved | integer[]

A hashtable named ParallelTasksPerNode keeps track of the number of
parallel running background tasks per node. An entry in the hashtable is
as follows:

ParallelTasksPerNodeEntry
{
	node_id // The node is used as the hash table key 
	counter // Number of concurrent background tasks that involve node node_id
                // The counter limit is citus.max_background_task_executors_per_node
}

When the background task runner assigns a runnable task to a new
executor, it increments the counter for each of the nodes involved with
that runnable task. The limit of each counter is
citus.max_background_task_executors_per_node. If the limit is reached
for any of the nodes involved, this runnable task is skipped. And then,
later, when the running task finishes, the background task runner
decrements the counter for each of the nodes involved with the done
task. The following functions take care of these increment-decrement
steps:

IncrementParallelTaskCountForNodesInvolved(task)
DecrementParallelTaskCountForNodesInvolved(task)

citus.max_background_task_executors_per_node can be changed in the
fly. In the background rebalancer, we simply give {source_node,
target_node} as the nodesInvolved input to the
ScheduleBackgroundTask function. The rest is taken care of by the
general background task runner infrastructure explained above. Check
background_task_queue_monitor.sql and
background_rebalance_parallel.sql tests for detailed examples.

#### Note

This PR also adds a hard node dependency if a node is first being used
as a source for a move, and then later as a target. The reason this
should be a hard dependency is that the first move might make space for
the second move. So, we could run out of disk space (or at least
overload the node) if we move the second shard to it before the first
one is moved away.

Fixes https://github.com/citusdata/citus/issues/6716
2023-04-06 14:12:39 +03:00
Emel Şimşek d3fb9288ab
Schedule parallel shard moves in background rebalancer by removing task dependencies between shard moves across colocation groups. (#6756)
DESCRIPTION: This PR removes the task dependencies between shard moves
for which the shards belong to different colocation groups. This change
results in scheduling multiple tasks in the RUNNABLE state. Therefore it
is possible that the background task monitor can run them concurrently.

Previously, all the shard moves planned in a rebalance operation took
dependency on each other sequentially.
For instance, given the following table and shards 

colocation group 1 colocation group 2
table1 table2 table3 table4 table 5
shard11 shard21 shard31 shard41 shard51
shard12 shard22 shard32 shard42 shard52
  
 if the rebalancer planner returned the below set of moves 
` {move(shard11), move(shard12), move(shard41), move(shard42)}`

background rebalancer scheduled them such that they depend on each other
sequentially.
```
      {move(reftables) if there is any, none}
               |
      move( shard11)
               |
      move(shard12)
               |                {move(shard41)<--- move(shard12)} This is an artificial dependency  
      move(shard41)
               |
      move(shard42) 

```
This results in artificial dependencies between otherwise independent
moves.

Considering that the shards in different colocation groups can be moved
concurrently, this PR changes the dependency relationship between the
moves as follows:

```
      {move(reftables) if there is any, none}          {move(reftables) if there is any, none}     
               |                                                            |
      move(shard11)                                                  move(shard41)
               |                                                            |
      move(shard12)                                                   move(shard42) 
   
```

---------

Co-authored-by: Jelte Fennema <jelte.fennema@microsoft.com>
2023-03-29 22:03:37 +03:00