Support changing CPU priorities for backends and shard moves (#6126)

**Intro**
This adds support to Citus for changing the CPU priority values of
backends. It was created with two main use cases in mind:

1. Users might want to run the logical replication part of shard moves
   or shard splits at a higher priority than it would get by default.
   This might cause some small loss of DB performance for their regular
   queries, but this is often worth it. During high load it's very possible
   that the logical replication WAL sender is not able to keep up with the
   WAL that is generated. This is especially a big problem when the
   machine is close to running out of disk while doing a rebalance.
2. Users might have certain long-running queries that they want to run
   at a lower priority, so that they don't impact their regular workload
   too much.

**Be very careful!!!**
Using CPU priorities to control scheduling can be helpful in some cases
to control which processes get more CPU time than others.
However, due to an issue called "[priority inversion][1]", using CPU
priorities together with the many locks that are used within Postgres
can cause the exact opposite behavior of what you intended. This
is why this PR only allows the PG superuser to change the CPU priority
of its own processes. Setting `citus.cpu_priority` directly is currently
not recommended. The only recommended interface for users is the setting
called `citus.cpu_priority_for_logical_replication_senders`. This setting
controls the CPU priority for a very limited set of processes (the logical
replication senders), so the dangers of priority inversion are also
limited for this use case.
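
As an illustration of this restriction, here is what a regular user gets
when trying to change the setting (taken from the regression test added
by this PR):
```sql
SET citus.cpu_priority = 4;
-- ERROR: permission denied to set parameter "citus.cpu_priority"
```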

**Background**
Before reading the rest it's important to understand some basic
background regarding process CPU priorities, because they are a bit
counterintuitive. A lower priority value means that the process will
be scheduled more, so whatever it's doing will complete faster. The
default priority for processes is 0. Valid values range from -20 to 19
inclusive. On Linux, a larger difference between the values of two
processes results in a bigger difference in their share of scheduling.
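
As a rough rule of thumb (a property of Linux's CFS scheduler, not of
this PR): each priority step changes a process's scheduling weight by
about a factor of 1.25, so two CPU-bound processes at priorities 0 and
-5 competing for the same core end up with roughly a 1:3 split of CPU
time, since 1.25^5 ≈ 3.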

**Handling the use cases**
Use case 1 can be achieved by setting `citus.cpu_priority_for_logical_replication_senders`
to the priority value that you want the senders to have. It's necessary
to set this on both the workers and the coordinator. Example:
```
citus.cpu_priority_for_logical_replication_senders = -10
```
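
If you'd rather set this from SQL than by editing `postgresql.conf`, a
sketch like the following should also work. Note that `ALTER SYSTEM`
only affects the node it runs on, so it has to be repeated on the
coordinator and on every worker:
```sql
-- Run on the coordinator and on every worker node.
ALTER SYSTEM SET citus.cpu_priority_for_logical_replication_senders = -10;
SELECT pg_reload_conf(); -- pick up the change without a restart
```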

Use case 2 can be achieved with this PR by running the following as
superuser. Note that this is currently only possible as superuser,
due to the dangers mentioned in the "Be very careful!!!" section.
And although this is possible, it's **NOT** recommended:
```sql
ALTER USER background_job_user SET citus.cpu_priority = 5;
```
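
The regression tests added by this PR also exercise a transaction-scoped
variant, which restores the priority when the transaction ends (`t1` is
just a test table; substitute your long-running query):
```sql
BEGIN;
SET LOCAL citus.cpu_priority = 10; -- deprioritize only this transaction
SELECT count(*) FROM t1;           -- the long-running query
COMMIT; -- the priority is reset afterwards; this warns unless the
        -- OS configuration described below is in place
```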

**OS configuration**
To actually make these settings work well, it's important to run
Postgres with a more permissive value for the 'nice' resource limit
than Linux allows by default. By default, Linux will not allow a
process to set its priority value lower than it currently is, even if
it was lower when the process originally started. This capability is
necessary to reset the CPU priority to its original value after a
transaction finishes. Depending on how you run Postgres, this needs to
be done in one of two ways:

If you use systemd to start Postgres, all you have to do is add a line
like this to the systemd service file:
```conf
LimitNICE=+0 # the + is important, otherwise it's interpreted incorrectly as 20
```
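
For context: systemd passes this to the kernel as RLIMIT_NICE, whose raw
value is defined as 20 minus the lowest allowed nice value. `LimitNICE=+0`
therefore corresponds to a raw limit of 20; without the `+` prefix,
systemd interprets the number as that raw limit instead of as a nice
value (see systemd.exec(5)).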

If that's not the case, you'll have to configure `/etc/security/limits.conf`
like so, assuming that you are running Postgres as the `postgres` OS user:
```
postgres            soft    nice            0
postgres            hard    nice            0
```
Finally, you'd have to add the following line to `/etc/pam.d/common-session`:
```
session required pam_limits.so
```

These settings allow the process to change its priority back after
setting it to a higher value.

However, to actually allow setting priorities even lower than the
default priority value, you need to change the values in the config to
something lower than 0. For example:
```conf
LimitNICE=-10
```

or

```
postgres            soft    nice            -10
postgres            hard    nice            -10
```

If you use WSL2, you'll likely have to do one more thing: open a new
login shell. PAM limits are only applied during login, and WSL2 doesn't
actually log you in. You can force a login like this:
```
sudo su $USER --shell /bin/bash
```
Source: https://stackoverflow.com/a/68322992/2570866

[1]: https://en.wikipedia.org/wiki/Priority_inversion


@@ -18,6 +18,7 @@
#include "distributed/citus_ruleutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/priority.h"
#include "distributed/worker_shard_copy.h"
PG_FUNCTION_INFO_V1(worker_copy_table_to_node);


@@ -41,6 +41,7 @@
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_replication.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/priority.h"
#include "distributed/distributed_planner.h"
#include "distributed/remote_commands.h"
#include "distributed/repair_shards.h"
@@ -1794,6 +1795,13 @@ CreateSubscriptions(MultiConnection *sourceConnection,
sourceConnection->port,
escape_param_str(sourceConnection->user), escape_param_str(
databaseName));
if (CpuPriorityLogicalRepSender != CPU_PRIORITY_INHERIT &&
list_length(logicalRepTargetList) <= MaxHighPriorityBackgroundProcesess)
{
appendStringInfo(conninfo,
" options='-c citus.cpu_priority=%d'",
CpuPriorityLogicalRepSender);
}
StringInfo createSubscriptionCommand = makeStringInfo();
appendStringInfo(createSubscriptionCommand,


@@ -16,6 +16,7 @@
#include "distributed/hash_helpers.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/priority.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/connection_management.h"
#include "distributed/remote_commands.h"


@@ -67,6 +67,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/placement_connection.h"
#include "distributed/priority.h"
#include "distributed/query_stats.h"
#include "distributed/recursive_planning.h"
#include "distributed/reference_table_utils.h"
@@ -184,6 +185,7 @@ static bool ShowShardsForAppNamePrefixesCheckHook(char **newval, void **extra,
GucSource source);
static void ShowShardsForAppNamePrefixesAssignHook(const char *newval, void *extra);
static void ApplicationNameAssignHook(const char *newval, void *extra);
static void CpuPriorityAssignHook(int newval, void *extra);
static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source);
static void NodeConninfoGucAssignHook(const char *newval, void *extra);
static const char * MaxSharedPoolSizeGucShowHook(void);
@@ -288,6 +290,64 @@ static const struct config_enum_entry create_object_propagation_options[] = {
{NULL, 0, false}
};
/*
* This is used to choose CPU priorities for GUCs. For most other integer options
* we use the -1 value as inherit/default/unset. For CPU priorities this isn't
* possible, because they can actually have negative values. So we need a value
* outside of the range that's valid for priorities. But if this is only one
* more or less than the valid values, this can also be quite confusing for
* people that don't know the exact range of valid values.
*
* So, instead we opt for using an enum that contains all valid priority values
* as strings, as well as the "inherit" string to indicate that the priority
* value should not be changed.
*/
static const struct config_enum_entry cpu_priority_options[] = {
{ "inherit", CPU_PRIORITY_INHERIT, false },
{ "-20", -20, false},
{ "-19", -19, false},
{ "-18", -18, false},
{ "-17", -17, false},
{ "-16", -16, false},
{ "-15", -15, false},
{ "-14", -14, false},
{ "-13", -13, false},
{ "-12", -12, false},
{ "-11", -11, false},
{ "-10", -10, false},
{ "-9", -9, false},
{ "-8", -8, false},
{ "-7", -7, false},
{ "-6", -6, false},
{ "-5", -5, false},
{ "-4", -4, false},
{ "-3", -3, false},
{ "-2", -2, false},
{ "-1", -1, false},
{ "0", 0, false},
{ "1", 1, false},
{ "2", 2, false},
{ "3", 3, false},
{ "4", 4, false},
{ "5", 5, false},
{ "6", 6, false},
{ "7", 7, false},
{ "8", 8, false},
{ "9", 9, false},
{ "10", 10, false},
{ "11", 11, false},
{ "12", 12, false},
{ "13", 13, false},
{ "14", 14, false},
{ "15", 15, false},
{ "16", 16, false},
{ "17", 17, false},
{ "18", 18, false},
{ "19", 19, false},
{ NULL, 0, false}
};
/* *INDENT-ON* */
@@ -843,6 +903,46 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
/*
* This doesn't use cpu_priority_options on purpose, because we always need
* to know the actual priority value so that `RESET citus.cpu_priority`
* actually changes the priority back.
*/
DefineCustomIntVariable(
"citus.cpu_priority",
gettext_noop("Sets the CPU priority of the current backend."),
gettext_noop("Lower numbers cause more favorable scheduling, so the "
"queries that this backend runs will be able to use more "
"CPU resources compared to queries from other backends. "
"WARNING: Changing this setting can lead to a phenomenon "
"called 'priority inversion', due to locks being held "
"between different backends. This means that processes "
"might be scheduled in the exact opposite way of what you "
"want, i.e. processes that you want scheduled a lot are "
"scheduled very little. So use this setting at your own "
"risk."),
&CpuPriority,
GetOwnPriority(), -20, 19,
PGC_SUSET,
GUC_STANDARD,
NULL, CpuPriorityAssignHook, NULL);
DefineCustomEnumVariable(
"citus.cpu_priority_for_logical_replication_senders",
gettext_noop("Sets the CPU priority for backends that send logical "
"replication changes to other nodes for online shard "
"moves and splits."),
gettext_noop("Lower numbers cause more favorable scheduling, so the "
"backends used to do the shard move will get more CPU "
"resources. 'inherit' is a special value and disables "
"overriding the CPU priority for backends that send "
"logical replication changes."),
&CpuPriorityLogicalRepSender,
CPU_PRIORITY_INHERIT, cpu_priority_options,
PGC_SUSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.create_object_propagation",
gettext_noop("Controls the behavior of CREATE statements in transactions for "
@@ -1564,6 +1664,20 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_high_priority_background_processes",
gettext_noop("Sets the maximum number of background processes "
"that can have their CPU priority increased at the same "
"time on a specific node."),
gettext_noop("This setting is useful to make sure logical replication "
"senders don't take over the CPU of the entire machine."),
&MaxHighPriorityBackgroundProcesess,
2, 0, 10000,
PGC_SUSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_intermediate_result_size",
gettext_noop("Sets the maximum size of the intermediate results in KB for "
@@ -2353,6 +2467,17 @@ NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source)
}
/*
* CpuPriorityAssignHook changes the priority of the current backend to match
* the chosen value.
*/
static void
CpuPriorityAssignHook(int newval, void *extra)
{
SetOwnPriority(newval);
}
/*
* NodeConninfoGucAssignHook is the assignment hook for the node_conninfo GUC
* variable. Though this GUC is a "string", we actually parse it as a non-URI


@@ -0,0 +1,72 @@
/*-------------------------------------------------------------------------
*
* priority.c
* Utilities for managing CPU priority.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <unistd.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include "distributed/priority.h"
int CpuPriority = 0;
int CpuPriorityLogicalRepSender = CPU_PRIORITY_INHERIT;
int MaxHighPriorityBackgroundProcesess = 2;
/*
* SetOwnPriority changes the CPU priority of the current backend to the given
* priority. If the OS doesn't allow us to set the priority to the given
* value, we only warn about it.
*/
void
SetOwnPriority(int priority)
{
if (priority == CPU_PRIORITY_INHERIT)
{
return;
}
if (setpriority(PRIO_PROCESS, getpid(), priority) == -1)
{
ereport(WARNING, (
errmsg("could not set cpu priority to %d: %m", priority),
errhint("Try changing the 'nice' resource limit by changing "
"/etc/security/limits.conf for the postgres user "
"and/or by setting LimitNICE in your systemd "
"service file (depending on how you start "
"postgres)."
)));
}
}
/*
* GetOwnPriority returns the current CPU priority value of the backend.
*/
int
GetOwnPriority(void)
{
errno = 0;
int result = getpriority(PRIO_PROCESS, getpid());
/*
* We explicitly check errno because getpriority can also return -1 on
* success, when the actual priority value is -1
*/
if (result == -1 && errno != 0)
{
ereport(WARNING, (errmsg("could not get current cpu priority value, "
"assuming 0: %m")));
return 0;
}
return result;
}


@@ -0,0 +1,31 @@
/*-------------------------------------------------------------------------
*
* priority.h
* Shared declarations for managing CPU priority.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_PRIORITY_H
#define CITUS_PRIORITY_H
#include "c.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "storage/fd.h"
extern int CpuPriority;
extern int CpuPriorityLogicalRepSender;
extern int MaxHighPriorityBackgroundProcesess;
#define CPU_PRIORITY_INHERIT 1234
/* Function declarations for managing CPU priority */
extern void SetOwnPriority(int priority);
extern int GetOwnPriority(void);
#endif /* CITUS_PRIORITY_H */


@@ -43,6 +43,11 @@ s/"citus_local_table_([0-9]+)_[0-9]+"/"citus_local_table_\1_xxxxxxx"/g
# normalize relation oid suffix for the truncate triggers created by citus
s/truncate_trigger_[0-9]+/truncate_trigger_xxxxxxx/g
# shard move subscription and publication names contain the oid of the
# table owner, which can change across runs
s/(citus_shard_(move|split)_subscription_)[0-9]+/\1xxxxxxx/g
s/(citus_shard_(move|split)_(slot|publication)_)[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx/g
# In foreign_key_restriction_enforcement, normalize shard names
s/"(on_update_fkey_table_|fkey_)[0-9]+"/"\1xxxxxxx"/g


@@ -0,0 +1,198 @@
CREATE SCHEMA cpu_priority;
SET search_path TO cpu_priority;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 11568900;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- This test depends on the fact that CI and dev machines don't have their
-- resource limits configured in a way that allows raising CPU priority. This
-- still tries to test as much functionality as possible by expecting certain
-- error messages to appear, but this is not the environment that
-- production is supposed to run in.
CREATE USER cpu_priority_user1;
CREATE USER cpu_priority_user2;
CREATE USER cpu_priority_user3;
CREATE USER cpu_priority_user_background;
GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user1;
GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user2;
GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user3;
GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user_background;
CREATE TABLE t1(a int);
CREATE TABLE t2(a int);
CREATE TABLE t3(a int);
CREATE TABLE t4(a int);
SELECT create_distributed_table('t1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('t2', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('t3', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('t4', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO t1 SELECT generate_series(1, 100);
INSERT INTO t2 SELECT generate_series(1, 100);
INSERT INTO t3 SELECT generate_series(1, 100);
INSERT INTO t4 SELECT generate_series(1, 100);
ALTER TABLE t1 OWNER TO cpu_priority_user1;
ALTER TABLE t2 OWNER TO cpu_priority_user2;
ALTER TABLE t3 OWNER TO cpu_priority_user3;
-- gives a warning because this is not allowed
SET citus.cpu_priority = -20;
WARNING: could not set cpu priority to -20: Permission denied
HINT: Try changing the 'nice' resource limit by changing /etc/security/limits.conf for the postgres user and/or by setting LimitNICE in your systemd service file (depending on how you start postgres).
-- no-op should be allowed
SET citus.cpu_priority = 0;
-- lowering should be allowed
SET citus.cpu_priority = 1;
-- resetting should be allowed, but warns
RESET citus.cpu_priority;
WARNING: could not set cpu priority to 0: Permission denied
HINT: Try changing the 'nice' resource limit by changing /etc/security/limits.conf for the postgres user and/or by setting LimitNICE in your systemd service file (depending on how you start postgres).
SET citus.propagate_set_commands = local;
BEGIN;
SET LOCAL citus.cpu_priority = 10;
SELECT count(*) FROM t1;
count
---------------------------------------------------------------------
100
(1 row)
-- warning is expected here because raising isn't allowed by the OS
COMMIT;
WARNING: could not set cpu priority to 0: Permission denied
HINT: Try changing the 'nice' resource limit by changing /etc/security/limits.conf for the postgres user and/or by setting LimitNICE in your systemd service file (depending on how you start postgres).
-- reconnect to get a new backend to reset our priority
\c - - - -
SET search_path TO cpu_priority;
-- Make sure shard moves use citus.cpu_priority_for_logical_replication_senders
-- in their CREATE SUBSCRIPTION commands.
SET citus.log_remote_commands TO ON;
SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
SET citus.cpu_priority_for_logical_replication_senders = 15;
SELECT master_move_shard_placement(11568900, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
SET citus.max_high_priority_background_processes = 3;
SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 4 orphaned shards
-- Make sure shard splits use citus.cpu_priority_for_logical_replication_senders
-- in their CREATE SUBSCRIPTION commands.
SELECT pg_catalog.citus_split_shard_by_split_points(
11568900,
ARRAY['-1500000000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
ALTER USER cpu_priority_user_background SET citus.cpu_priority = 5;
\c - cpu_priority_user_background - -
show citus.cpu_priority;
citus.cpu_priority
---------------------------------------------------------------------
5
(1 row)
show citus.cpu_priority_for_logical_replication_senders;
citus.cpu_priority_for_logical_replication_senders
---------------------------------------------------------------------
inherit
(1 row)
show citus.max_high_priority_background_processes;
citus.max_high_priority_background_processes
---------------------------------------------------------------------
2
(1 row)
-- not allowed to change any of the settings related to CPU priority
SET citus.cpu_priority = 4;
ERROR: permission denied to set parameter "citus.cpu_priority"
SET citus.cpu_priority = 6;
ERROR: permission denied to set parameter "citus.cpu_priority"
SET citus.cpu_priority_for_logical_replication_senders = 15;
ERROR: permission denied to set parameter "citus.cpu_priority_for_logical_replication_senders"
SET citus.max_high_priority_background_processes = 3;
ERROR: permission denied to set parameter "citus.max_high_priority_background_processes"
\c - postgres - -
SET search_path TO cpu_priority;
SET client_min_messages TO WARNING;
DROP SCHEMA cpu_priority CASCADE;
DROP USER cpu_priority_user1;
DROP USER cpu_priority_user2;
DROP USER cpu_priority_user3;
DROP USER cpu_priority_user_background;


@@ -4,6 +4,19 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13000000;
SET citus.shard_count TO 6;
SET citus.shard_replication_factor TO 1;
-- this function is dropped in Citus10, added here for tests
SET citus.enable_metadata_sync TO OFF;
CREATE OR REPLACE FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
distribution_column text,
distribution_method citus.distribution_type)
RETURNS void
LANGUAGE C STRICT
AS 'citus', $$master_create_distributed_table$$;
COMMENT ON FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
distribution_column text,
distribution_method citus.distribution_type)
IS 'define the table distribution functions';
RESET citus.enable_metadata_sync;
-- create distributed tables
CREATE TABLE table1_group1 ( id int PRIMARY KEY);
SELECT create_distributed_table('table1_group1', 'id', 'hash');
@@ -886,6 +899,11 @@ WHERE logicalrelid::text LIKE 'move_partitions.events%' AND nodeport = :worker_1
(1 row)
DROP TABLE move_partitions.events;
-- set back to the defaults and drop the table
-- set back to the defaults and drop the tables
SET client_min_messages TO DEFAULT;
DROP TABLE test_with_pkey;
DROP TABLE table2_group1;
DROP TABLE table1_group1;
DROP TABLE table5_groupX;
DROP TABLE table6_append;
DROP TABLE serial_move_test;


@@ -9,4 +9,5 @@ test: multi_move_mx
test: shard_move_deferred_delete
test: multi_colocated_shard_rebalance
test: ignoring_orphaned_shards
test: cpu_priority
test: check_mx


@@ -458,6 +458,10 @@ push(@pgOptions, "wal_receiver_status_interval=1");
# src/backend/replication/logical/launcher.c.
push(@pgOptions, "wal_retrieve_retry_interval=1000");
push(@pgOptions, "max_logical_replication_workers=50");
push(@pgOptions, "max_wal_senders=50");
push(@pgOptions, "max_worker_processes=50");
if ($majorversion >= "14") {
# disable compute_query_id so that we don't get Query Identifiers
# in explain outputs


@@ -0,0 +1,103 @@
CREATE SCHEMA cpu_priority;
SET search_path TO cpu_priority;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 11568900;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- This test depends on the fact that CI and dev machines don't have their
-- resource limits configured in a way that allows raising CPU priority. This
-- still tries to test as much functionality as possible by expecting certain
-- error messages to appear, but this is not the environment that
-- production is supposed to run in.
CREATE USER cpu_priority_user1;
CREATE USER cpu_priority_user2;
CREATE USER cpu_priority_user3;
CREATE USER cpu_priority_user_background;
GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user1;
GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user2;
GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user3;
GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user_background;
CREATE TABLE t1(a int);
CREATE TABLE t2(a int);
CREATE TABLE t3(a int);
CREATE TABLE t4(a int);
SELECT create_distributed_table('t1', 'a');
SELECT create_distributed_table('t2', 'a');
SELECT create_distributed_table('t3', 'a');
SELECT create_distributed_table('t4', 'a');
INSERT INTO t1 SELECT generate_series(1, 100);
INSERT INTO t2 SELECT generate_series(1, 100);
INSERT INTO t3 SELECT generate_series(1, 100);
INSERT INTO t4 SELECT generate_series(1, 100);
ALTER TABLE t1 OWNER TO cpu_priority_user1;
ALTER TABLE t2 OWNER TO cpu_priority_user2;
ALTER TABLE t3 OWNER TO cpu_priority_user3;
-- gives a warning because this is not allowed
SET citus.cpu_priority = -20;
-- no-op should be allowed
SET citus.cpu_priority = 0;
-- lowering should be allowed
SET citus.cpu_priority = 1;
-- resetting should be allowed, but warns
RESET citus.cpu_priority;
SET citus.propagate_set_commands = local;
BEGIN;
SET LOCAL citus.cpu_priority = 10;
SELECT count(*) FROM t1;
-- warning is expected here because raising isn't allowed by the OS
COMMIT;
-- reconnect to get a new backend to reset our priority
\c - - - -
SET search_path TO cpu_priority;
-- Make sure shard moves use citus.cpu_priority_for_logical_replication_senders
-- in their CREATE SUBSCRIPTION commands.
SET citus.log_remote_commands TO ON;
SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
SET citus.cpu_priority_for_logical_replication_senders = 15;
SELECT master_move_shard_placement(11568900, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
SET citus.max_high_priority_background_processes = 3;
SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
CALL citus_cleanup_orphaned_shards();
-- Make sure shard splits use citus.cpu_priority_for_logical_replication_senders
-- in their CREATE SUBSCRIPTION commands.
SELECT pg_catalog.citus_split_shard_by_split_points(
11568900,
ARRAY['-1500000000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
ALTER USER cpu_priority_user_background SET citus.cpu_priority = 5;
\c - cpu_priority_user_background - -
show citus.cpu_priority;
show citus.cpu_priority_for_logical_replication_senders;
show citus.max_high_priority_background_processes;
-- not allowed to change any of the settings related to CPU priority
SET citus.cpu_priority = 4;
SET citus.cpu_priority = 6;
SET citus.cpu_priority_for_logical_replication_senders = 15;
SET citus.max_high_priority_background_processes = 3;
\c - postgres - -
SET search_path TO cpu_priority;
SET client_min_messages TO WARNING;
DROP SCHEMA cpu_priority CASCADE;
DROP USER cpu_priority_user1;
DROP USER cpu_priority_user2;
DROP USER cpu_priority_user3;
DROP USER cpu_priority_user_background;


@@ -7,6 +7,20 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13000000;
SET citus.shard_count TO 6;
SET citus.shard_replication_factor TO 1;
-- this function is dropped in Citus10, added here for tests
SET citus.enable_metadata_sync TO OFF;
CREATE OR REPLACE FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
distribution_column text,
distribution_method citus.distribution_type)
RETURNS void
LANGUAGE C STRICT
AS 'citus', $$master_create_distributed_table$$;
COMMENT ON FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
distribution_column text,
distribution_method citus.distribution_type)
IS 'define the table distribution functions';
RESET citus.enable_metadata_sync;
-- create distributed tables
CREATE TABLE table1_group1 ( id int PRIMARY KEY);
@@ -479,6 +493,11 @@ WHERE logicalrelid::text LIKE 'move_partitions.events%' AND nodeport = :worker_1
DROP TABLE move_partitions.events;
-- set back to the defaults and drop the table
-- set back to the defaults and drop the tables
SET client_min_messages TO DEFAULT;
DROP TABLE test_with_pkey;
DROP TABLE table2_group1;
DROP TABLE table1_group1;
DROP TABLE table5_groupX;
DROP TABLE table6_append;
DROP TABLE serial_move_test;