diff --git a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c
index 46391160c..7af80ef55 100644
--- a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c
+++ b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c
@@ -18,6 +18,7 @@
 #include "distributed/citus_ruleutils.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_executor.h"
+#include "distributed/priority.h"
 #include "distributed/worker_shard_copy.h"
 
 PG_FUNCTION_INFO_V1(worker_copy_table_to_node);
diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c
index 794481b9a..fae6f7c39 100644
--- a/src/backend/distributed/replication/multi_logical_replication.c
+++ b/src/backend/distributed/replication/multi_logical_replication.c
@@ -41,6 +41,7 @@
 #include "distributed/multi_join_order.h"
 #include "distributed/multi_logical_replication.h"
 #include "distributed/multi_partitioning_utils.h"
+#include "distributed/priority.h"
 #include "distributed/distributed_planner.h"
 #include "distributed/remote_commands.h"
 #include "distributed/repair_shards.h"
@@ -1794,6 +1795,13 @@ CreateSubscriptions(MultiConnection *sourceConnection,
 					 sourceConnection->port,
 					 escape_param_str(sourceConnection->user), escape_param_str(
 						 databaseName));
+		if (CpuPriorityLogicalRepSender != CPU_PRIORITY_INHERIT &&
+			list_length(logicalRepTargetList) <= MaxHighPriorityBackgroundProcesess)
+		{
+			appendStringInfo(conninfo,
+							 " options='-c citus.cpu_priority=%d'",
+							 CpuPriorityLogicalRepSender);
+		}
 
 		StringInfo createSubscriptionCommand = makeStringInfo();
 		appendStringInfo(createSubscriptionCommand,
diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c
index e9b703e11..8ffccb90c 100644
--- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c
+++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c
@@ -16,6 +16,7 @@
 #include "distributed/hash_helpers.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_partitioning_utils.h"
+#include "distributed/priority.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/connection_management.h"
 #include "distributed/remote_commands.h"
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index a76a9a6a1..dd79f8d1e 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -67,6 +67,7 @@
 #include "distributed/multi_server_executor.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/placement_connection.h"
+#include "distributed/priority.h"
 #include "distributed/query_stats.h"
 #include "distributed/recursive_planning.h"
 #include "distributed/reference_table_utils.h"
@@ -184,6 +185,7 @@ static bool ShowShardsForAppNamePrefixesCheckHook(char **newval, void **extra,
 												  GucSource source);
 static void ShowShardsForAppNamePrefixesAssignHook(const char *newval, void *extra);
 static void ApplicationNameAssignHook(const char *newval, void *extra);
+static void CpuPriorityAssignHook(int newval, void *extra);
 static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source);
 static void NodeConninfoGucAssignHook(const char *newval, void *extra);
 static const char * MaxSharedPoolSizeGucShowHook(void);
@@ -288,6 +290,64 @@ static const struct config_enum_entry create_object_propagation_options[] = {
 	{NULL, 0, false}
 };
 
+/*
+ * This is used to choose CPU priority values for GUCs. For most other integer
+ * GUCs we use -1 as the inherit/default/unset value. For CPU priorities that
+ * isn't possible, because priorities can actually be negative. So we need a
+ * value outside of the range that's valid for priorities. But if that value
+ * were only one more or one less than the valid values, it could be quite
+ * confusing for people who don't know the exact range of valid values.
+ *
+ * So, instead we opt for using an enum that contains all valid priority
+ * values as strings, as well as the "inherit" string to indicate that the
+ * priority value should not be changed.
+ */
+static const struct config_enum_entry cpu_priority_options[] = {
+	{ "inherit", CPU_PRIORITY_INHERIT, false },
+	{ "-20", -20, false},
+	{ "-19", -19, false},
+	{ "-18", -18, false},
+	{ "-17", -17, false},
+	{ "-16", -16, false},
+	{ "-15", -15, false},
+	{ "-14", -14, false},
+	{ "-13", -13, false},
+	{ "-12", -12, false},
+	{ "-11", -11, false},
+	{ "-10", -10, false},
+	{ "-9", -9, false},
+	{ "-8", -8, false},
+	{ "-7", -7, false},
+	{ "-6", -6, false},
+	{ "-5", -5, false},
+	{ "-4", -4, false},
+	{ "-3", -3, false},
+	{ "-2", -2, false},
+	{ "-1", -1, false},
+	{ "0", 0, false},
+	{ "1", 1, false},
+	{ "2", 2, false},
+	{ "3", 3, false},
+	{ "4", 4, false},
+	{ "5", 5, false},
+	{ "6", 6, false},
+	{ "7", 7, false},
+	{ "8", 8, false},
+	{ "9", 9, false},
+	{ "10", 10, false},
+	{ "11", 11, false},
+	{ "12", 12, false},
+	{ "13", 13, false},
+	{ "14", 14, false},
+	{ "15", 15, false},
+	{ "16", 16, false},
+	{ "17", 17, false},
+	{ "18", 18, false},
+	{ "19", 19, false},
+	{ NULL, 0, false}
+};
+
 
 /* *INDENT-ON* */
@@ -843,6 +903,46 @@ RegisterCitusConfigVariables(void)
 		GUC_STANDARD,
 		NULL, NULL, NULL);
 
+	/*
+	 * This doesn't use cpu_priority_options on purpose, because we always need
+	 * to know the actual priority value so that `RESET citus.cpu_priority`
+	 * actually changes the priority back.
+	 */
+	DefineCustomIntVariable(
+		"citus.cpu_priority",
+		gettext_noop("Sets the CPU priority of the current backend."),
+		gettext_noop("Lower numbers cause more favorable scheduling, so the "
+					 "queries that this backend runs will be able to use more "
+					 "CPU resources compared to queries from other backends. "
+					 "WARNING: Changing this setting can lead to a phenomenon "
+					 "called 'priority inversion', due to locks being held "
+					 "between different backends. This means that processes "
+					 "might be scheduled in the exact opposite way of what you "
+					 "want, i.e. processes that you want scheduled a lot are "
+					 "scheduled very little. So use this setting at your own "
+					 "risk."),
+		&CpuPriority,
+		GetOwnPriority(), -20, 19,
+		PGC_SUSET,
+		GUC_STANDARD,
+		NULL, CpuPriorityAssignHook, NULL);
+
+	DefineCustomEnumVariable(
+		"citus.cpu_priority_for_logical_replication_senders",
+		gettext_noop("Sets the CPU priority for backends that send logical "
+					 "replication changes to other nodes for online shard "
+					 "moves and splits."),
+		gettext_noop("Lower numbers cause more favorable scheduling, so the "
+					 "backends used to do the shard move will get more CPU "
+					 "resources. 'inherit' is a special value and disables "
+					 "overriding the CPU priority for backends that send "
+					 "logical replication changes."),
+		&CpuPriorityLogicalRepSender,
+		CPU_PRIORITY_INHERIT, cpu_priority_options,
+		PGC_SUSET,
+		GUC_STANDARD,
+		NULL, NULL, NULL);
+
 	DefineCustomEnumVariable(
 		"citus.create_object_propagation",
 		gettext_noop("Controls the behavior of CREATE statements in transactions for "
@@ -1564,6 +1664,20 @@ RegisterCitusConfigVariables(void)
 		GUC_STANDARD,
 		NULL, NULL, NULL);
 
+	DefineCustomIntVariable(
+		"citus.max_high_priority_background_processes",
+		gettext_noop("Sets the maximum number of background processes "
+					 "that can have their CPU priority increased at the same "
+					 "time on a specific node."),
+		gettext_noop("This setting is useful to make sure logical replication "
+					 "senders don't take over the CPU of the entire machine."),
+		&MaxHighPriorityBackgroundProcesess,
+		2, 0, 10000,
+		PGC_SUSET,
+		GUC_STANDARD,
+		NULL, NULL, NULL);
+
+
 	DefineCustomIntVariable(
 		"citus.max_intermediate_result_size",
 		gettext_noop("Sets the maximum size of the intermediate results in KB for "
@@ -2353,6 +2467,17 @@ NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source)
 }
 
 
+/*
+ * CpuPriorityAssignHook changes the priority of the current backend to match
+ * the chosen value.
+ */
+static void
+CpuPriorityAssignHook(int newval, void *extra)
+{
+	SetOwnPriority(newval);
+}
+
+
 /*
  * NodeConninfoGucAssignHook is the assignment hook for the node_conninfo GUC
  * variable. Though this GUC is a "string", we actually parse it as a non-URI
diff --git a/src/backend/distributed/utils/priority.c b/src/backend/distributed/utils/priority.c
new file mode 100644
index 000000000..2e7972d2d
--- /dev/null
+++ b/src/backend/distributed/utils/priority.c
@@ -0,0 +1,72 @@
+/*-------------------------------------------------------------------------
+ *
+ * priority.c
+ *	  Utilities for managing CPU priority.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <errno.h>
+#include <sys/resource.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include "distributed/priority.h"
+
+int CpuPriority = 0;
+int CpuPriorityLogicalRepSender = CPU_PRIORITY_INHERIT;
+int MaxHighPriorityBackgroundProcesess = 2;
+
+
+/*
+ * SetOwnPriority changes the CPU priority of the current backend to the given
+ * priority. If the OS does not allow setting the priority to the given value,
+ * we only warn about it.
+ */
+void
+SetOwnPriority(int priority)
+{
+	if (priority == CPU_PRIORITY_INHERIT)
+	{
+		return;
+	}
+
+	if (setpriority(PRIO_PROCESS, getpid(), priority) == -1)
+	{
+		ereport(WARNING, (
+					errmsg("could not set cpu priority to %d: %m", priority),
+					errhint("Try changing the 'nice' resource limit by changing "
+							"/etc/security/limits.conf for the postgres user "
+							"and/or by setting LimitNICE in your systemd "
+							"service file (depending on how you start "
+							"postgres).")));
+	}
+}
+
+
+/*
+ * GetOwnPriority returns the current CPU priority value of the backend.
+ */
+int
+GetOwnPriority(void)
+{
+	errno = 0;
+	int result = getpriority(PRIO_PROCESS, getpid());
+
+	/*
+	 * We explicitly check errno too, because getpriority can also return -1
+	 * on success, if the actual priority value is -1.
+	 */
+	if (result == -1 && errno != 0)
+	{
+		ereport(WARNING, (errmsg("could not get current cpu priority value, "
+								 "assuming 0: %m")));
+		return 0;
+	}
+	return result;
+}
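A note on the WARNING-only behavior of SetOwnPriority above: on Linux an unprivileged process can always lower its own CPU priority (raise its nice value), but raising the priority past the RLIMIT_NICE ceiling fails with EACCES unless the process has CAP_SYS_NICE. A minimal standalone sketch of that asymmetry, not part of this patch:

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/resource.h>
#include <sys/time.h>
#include <unistd.h>

int
main(void)
{
	/* lowering CPU priority (raising the nice value) is always allowed */
	if (setpriority(PRIO_PROCESS, getpid(), 5) == -1)
	{
		printf("nice 5 failed: %s\n", strerror(errno));
	}

	/* raising CPU priority usually fails for unprivileged processes */
	if (setpriority(PRIO_PROCESS, getpid(), -20) == -1)
	{
		printf("nice -20 failed: %s\n", strerror(errno));
	}

	return 0;
}

This is the same failure mode the cpu_priority regression test below relies on when it expects "could not set cpu priority" warnings.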
diff --git a/src/include/distributed/priority.h b/src/include/distributed/priority.h
new file mode 100644
index 000000000..0f2d2b863
--- /dev/null
+++ b/src/include/distributed/priority.h
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * priority.h
+ *	  Shared declarations for managing CPU priority.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef CITUS_PRIORITY_H
+#define CITUS_PRIORITY_H
+
+#include "c.h"
+
+#include "lib/stringinfo.h"
+#include "nodes/parsenodes.h"
+#include "storage/fd.h"
+
+extern int CpuPriority;
+extern int CpuPriorityLogicalRepSender;
+extern int MaxHighPriorityBackgroundProcesess;
+
+#define CPU_PRIORITY_INHERIT 1234
+
+/* Function declarations for managing CPU priority */
+extern void SetOwnPriority(int priority);
+extern int GetOwnPriority(void);
+
+
+#endif /* CITUS_PRIORITY_H */
diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed
index 8bae7763f..19bbdaaea 100644
--- a/src/test/regress/bin/normalize.sed
+++ b/src/test/regress/bin/normalize.sed
@@ -43,6 +43,11 @@ s/"citus_local_table_([0-9]+)_[0-9]+"/"citus_local_table_\1_xxxxxxx"/g
 # normalize relation oid suffix for the truncate triggers created by citus
 s/truncate_trigger_[0-9]+/truncate_trigger_xxxxxxx/g
 
+# shard move subscription and publication names contain the oid of the
+# table owner, which can change across runs
+s/(citus_shard_(move|split)_subscription_)[0-9]+/\1xxxxxxx/g
+s/(citus_shard_(move|split)_(slot|publication)_)[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx/g
+
 # In foreign_key_restriction_enforcement, normalize shard names
 s/"(on_update_fkey_table_|fkey_)[0-9]+"/"\1xxxxxxx"/g
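The cpu_priority test added below assumes an environment where raising CPU priority is not allowed. Whether that holds for a given machine can be checked with getrlimit(2); this sketch assumes Linux, where the nice ceiling for RLIMIT_NICE is calculated as 20 - rlim_cur:

#include <stdio.h>
#include <sys/resource.h>

int
main(void)
{
	struct rlimit rl;

	if (getrlimit(RLIMIT_NICE, &rl) == -1)
	{
		perror("getrlimit");
		return 1;
	}

	/*
	 * With the common default of rlim_cur = 0 the ceiling is nice 20, so the
	 * process can never lower its nice value, which is exactly the situation
	 * the test's expected warnings depend on.
	 */
	printf("lowest settable nice value: %ld\n", 20 - (long) rl.rlim_cur);
	return 0;
}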
diff --git a/src/test/regress/expected/cpu_priority.out b/src/test/regress/expected/cpu_priority.out
new file mode 100644
index 000000000..5a397ef1c
--- /dev/null
+++ b/src/test/regress/expected/cpu_priority.out
@@ -0,0 +1,198 @@
+CREATE SCHEMA cpu_priority;
+SET search_path TO cpu_priority;
+SET citus.shard_count TO 4;
+SET citus.shard_replication_factor TO 1;
+SET citus.next_shard_id TO 11568900;
+SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
+SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
+-- This test depends on the fact that CI and dev machines don't have their
+-- resource limits configured in a way that allows raising CPU priority. The
+-- test still exercises as much functionality as possible by expecting certain
+-- error messages to appear, but it's not the environment that production is
+-- supposed to run in.
+CREATE USER cpu_priority_user1;
+CREATE USER cpu_priority_user2;
+CREATE USER cpu_priority_user3;
+CREATE USER cpu_priority_user_background;
+GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user1;
+GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user2;
+GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user3;
+GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user_background;
+CREATE TABLE t1(a int);
+CREATE TABLE t2(a int);
+CREATE TABLE t3(a int);
+CREATE TABLE t4(a int);
+SELECT create_distributed_table('t1', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT create_distributed_table('t2', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT create_distributed_table('t3', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT create_distributed_table('t4', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO t1 SELECT generate_series(1, 100);
+INSERT INTO t2 SELECT generate_series(1, 100);
+INSERT INTO t3 SELECT generate_series(1, 100);
+INSERT INTO t4 SELECT generate_series(1, 100);
+ALTER TABLE t1 OWNER TO cpu_priority_user1;
+ALTER TABLE t2 OWNER TO cpu_priority_user2;
+ALTER TABLE t3 OWNER TO cpu_priority_user3;
+-- gives a warning because this is not allowed
+SET citus.cpu_priority = -20;
+WARNING: could not set cpu priority to -20: Permission denied
+HINT: Try changing the 'nice' resource limit by changing /etc/security/limits.conf for the postgres user and/or by setting LimitNICE in your systemd service file (depending on how you start postgres).
+-- no-op should be allowed
+SET citus.cpu_priority = 0;
+-- lowering should be allowed
+SET citus.cpu_priority = 1;
+-- resetting should be allowed, but warns
+RESET citus.cpu_priority;
+WARNING: could not set cpu priority to 0: Permission denied
+HINT: Try changing the 'nice' resource limit by changing /etc/security/limits.conf for the postgres user and/or by setting LimitNICE in your systemd service file (depending on how you start postgres).
+SET citus.propagate_set_commands = local;
+BEGIN;
+    SET LOCAL citus.cpu_priority = 10;
+    SELECT count(*) FROM t1;
+ count
+---------------------------------------------------------------------
+   100
+(1 row)
+
+    -- warning is expected here because raising isn't allowed by the OS
+COMMIT;
+WARNING: could not set cpu priority to 0: Permission denied
+HINT: Try changing the 'nice' resource limit by changing /etc/security/limits.conf for the postgres user and/or by setting LimitNICE in your systemd service file (depending on how you start postgres).
+-- reconnect to get a new backend to reset our priority
+\c - - - -
+SET search_path TO cpu_priority;
+-- Make sure shard moves use citus.cpu_priority_for_logical_replication_senders
+-- in their CREATE SUBSCRIPTION commands.
+SET citus.log_remote_commands TO ON;
+SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
+SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+ master_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.cpu_priority_for_logical_replication_senders = 15;
+SELECT master_move_shard_placement(11568900, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+ master_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.max_high_priority_background_processes = 3;
+SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+ master_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+CALL citus_cleanup_orphaned_shards();
+NOTICE: cleaned up 4 orphaned shards
+-- Make sure shard splits use citus.cpu_priority_for_logical_replication_senders
+-- in their CREATE SUBSCRIPTION commands.
+SELECT pg_catalog.citus_split_shard_by_split_points(
+    11568900,
+    ARRAY['-1500000000'],
+    ARRAY[:worker_1_node, :worker_2_node],
+    'force_logical');
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+ citus_split_shard_by_split_points
+---------------------------------------------------------------------
+
+(1 row)
+
+ALTER USER cpu_priority_user_background SET citus.cpu_priority = 5;
+\c - cpu_priority_user_background - -
+show citus.cpu_priority;
+ citus.cpu_priority
+---------------------------------------------------------------------
+ 5
+(1 row)
+
+show citus.cpu_priority_for_logical_replication_senders;
+ citus.cpu_priority_for_logical_replication_senders
+---------------------------------------------------------------------
+ inherit
+(1 row)
+
+show citus.max_high_priority_background_processes;
+ citus.max_high_priority_background_processes
+---------------------------------------------------------------------
+ 2
+(1 row)
+
+-- not allowed to change any of the settings related to CPU priority
+SET citus.cpu_priority = 4;
+ERROR: permission denied to set parameter "citus.cpu_priority"
+SET citus.cpu_priority = 6;
+ERROR: permission denied to set parameter "citus.cpu_priority"
+SET citus.cpu_priority_for_logical_replication_senders = 15;
+ERROR: permission denied to set parameter "citus.cpu_priority_for_logical_replication_senders"
+SET citus.max_high_priority_background_processes = 3;
+ERROR: permission denied to set parameter "citus.max_high_priority_background_processes"
+\c - postgres - -
+SET search_path TO cpu_priority;
+SET client_min_messages TO WARNING;
+DROP SCHEMA cpu_priority CASCADE;
+DROP USER cpu_priority_user1;
+DROP USER cpu_priority_user2;
+DROP USER cpu_priority_user3;
+DROP USER cpu_priority_user_background;
diff --git a/src/test/regress/expected/multi_colocated_shard_rebalance.out b/src/test/regress/expected/multi_colocated_shard_rebalance.out
index 7484dc702..68b8925ed 100644
--- a/src/test/regress/expected/multi_colocated_shard_rebalance.out
+++ b/src/test/regress/expected/multi_colocated_shard_rebalance.out
@@ -4,6 +4,19 @@
 ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13000000;
 SET citus.shard_count TO 6;
 SET citus.shard_replication_factor TO 1;
+-- this function is dropped in Citus 10, added here for tests
+SET citus.enable_metadata_sync TO OFF;
+CREATE OR REPLACE FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
+                                                                      distribution_column text,
+                                                                      distribution_method citus.distribution_type)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'citus', $$master_create_distributed_table$$;
+COMMENT ON FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
+                                                               distribution_column text,
+                                                               distribution_method citus.distribution_type)
+    IS 'define the table distribution functions';
+RESET citus.enable_metadata_sync;
 -- create distributed tables
 CREATE TABLE table1_group1 ( id int PRIMARY KEY);
 SELECT create_distributed_table('table1_group1', 'id', 'hash');
@@ -886,6 +899,11 @@ WHERE logicalrelid::text LIKE 'move_partitions.events%' AND nodeport = :worker_1
 (1 row)
 
 DROP TABLE move_partitions.events;
--- set back to the defaults and drop the table
+-- set back to the defaults and drop the tables
 SET client_min_messages TO DEFAULT;
 DROP TABLE test_with_pkey;
+DROP TABLE table2_group1;
+DROP TABLE table1_group1;
+DROP TABLE table5_groupX;
+DROP TABLE table6_append;
+DROP TABLE serial_move_test;
diff --git a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule
index 353eabfcd..3eeb3e8db 100644
--- a/src/test/regress/operations_schedule
+++ b/src/test/regress/operations_schedule
@@ -9,4 +9,5 @@ test: multi_move_mx
 test: shard_move_deferred_delete
 test: multi_colocated_shard_rebalance
 test: ignoring_orphaned_shards
+test: cpu_priority
 test: check_mx
diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl
index 4b60a2d6a..4bceecd7f 100755
--- a/src/test/regress/pg_regress_multi.pl
+++ b/src/test/regress/pg_regress_multi.pl
@@ -458,6 +458,10 @@ push(@pgOptions, "wal_receiver_status_interval=1");
 # src/backend/replication/logical/launcher.c.
 push(@pgOptions, "wal_retrieve_retry_interval=1000");
 
+push(@pgOptions, "max_logical_replication_workers=50");
+push(@pgOptions, "max_wal_senders=50");
+push(@pgOptions, "max_worker_processes=50");
+
 if ($majorversion >= "14") {
 	# disable compute_query_id so that we don't get Query Identifiers
 	# in explain outputs
diff --git a/src/test/regress/sql/cpu_priority.sql b/src/test/regress/sql/cpu_priority.sql
new file mode 100644
index 000000000..787018622
--- /dev/null
+++ b/src/test/regress/sql/cpu_priority.sql
@@ -0,0 +1,103 @@
+CREATE SCHEMA cpu_priority;
+SET search_path TO cpu_priority;
+SET citus.shard_count TO 4;
+SET citus.shard_replication_factor TO 1;
+SET citus.next_shard_id TO 11568900;
+
+SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
+SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
+
+-- This test depends on the fact that CI and dev machines don't have their
+-- resource limits configured in a way that allows raising CPU priority. The
+-- test still exercises as much functionality as possible by expecting certain
+-- error messages to appear, but it's not the environment that production is
+-- supposed to run in.
+
+CREATE USER cpu_priority_user1;
+CREATE USER cpu_priority_user2;
+CREATE USER cpu_priority_user3;
+CREATE USER cpu_priority_user_background;
+GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user1;
+GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user2;
+GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user3;
+GRANT ALL ON SCHEMA cpu_priority to cpu_priority_user_background;
+
+CREATE TABLE t1(a int);
+CREATE TABLE t2(a int);
+CREATE TABLE t3(a int);
+CREATE TABLE t4(a int);
+SELECT create_distributed_table('t1', 'a');
+SELECT create_distributed_table('t2', 'a');
+SELECT create_distributed_table('t3', 'a');
+SELECT create_distributed_table('t4', 'a');
+INSERT INTO t1 SELECT generate_series(1, 100);
+INSERT INTO t2 SELECT generate_series(1, 100);
+INSERT INTO t3 SELECT generate_series(1, 100);
+INSERT INTO t4 SELECT generate_series(1, 100);
+
+ALTER TABLE t1 OWNER TO cpu_priority_user1;
+ALTER TABLE t2 OWNER TO cpu_priority_user2;
+ALTER TABLE t3 OWNER TO cpu_priority_user3;
+
+-- gives a warning because this is not allowed
+SET citus.cpu_priority = -20;
+-- no-op should be allowed
+SET citus.cpu_priority = 0;
+-- lowering should be allowed
+SET citus.cpu_priority = 1;
+-- resetting should be allowed, but warns
+RESET citus.cpu_priority;
+
+SET citus.propagate_set_commands = local;
+BEGIN;
+    SET LOCAL citus.cpu_priority = 10;
+    SELECT count(*) FROM t1;
+    -- warning is expected here because raising isn't allowed by the OS
+COMMIT;
+
+-- reconnect to get a new backend to reset our priority
+\c - - - -
+SET search_path TO cpu_priority;
+
+-- Make sure shard moves use citus.cpu_priority_for_logical_replication_senders
+-- in their CREATE SUBSCRIPTION commands.
+SET citus.log_remote_commands TO ON;
+SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
+SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
+SET citus.cpu_priority_for_logical_replication_senders = 15;
+SELECT master_move_shard_placement(11568900, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
+SET citus.max_high_priority_background_processes = 3;
+SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
+CALL citus_cleanup_orphaned_shards();
+
+-- Make sure shard splits use citus.cpu_priority_for_logical_replication_senders
+-- in their CREATE SUBSCRIPTION commands.
+SELECT pg_catalog.citus_split_shard_by_split_points(
+    11568900,
+    ARRAY['-1500000000'],
+    ARRAY[:worker_1_node, :worker_2_node],
+    'force_logical');
+
+ALTER USER cpu_priority_user_background SET citus.cpu_priority = 5;
+
+\c - cpu_priority_user_background - -
+
+show citus.cpu_priority;
+show citus.cpu_priority_for_logical_replication_senders;
+show citus.max_high_priority_background_processes;
+-- not allowed to change any of the settings related to CPU priority
+SET citus.cpu_priority = 4;
+SET citus.cpu_priority = 6;
+SET citus.cpu_priority_for_logical_replication_senders = 15;
+SET citus.max_high_priority_background_processes = 3;
+
+
+\c - postgres - -
+SET search_path TO cpu_priority;
+
+SET client_min_messages TO WARNING;
+DROP SCHEMA cpu_priority CASCADE;
+DROP USER cpu_priority_user1;
+DROP USER cpu_priority_user2;
+DROP USER cpu_priority_user3;
+DROP USER cpu_priority_user_background;
diff --git a/src/test/regress/sql/multi_colocated_shard_rebalance.sql b/src/test/regress/sql/multi_colocated_shard_rebalance.sql
index 9d05170cc..2c43460a2 100644
--- a/src/test/regress/sql/multi_colocated_shard_rebalance.sql
+++ b/src/test/regress/sql/multi_colocated_shard_rebalance.sql
@@ -7,6 +7,20 @@
 ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13000000;
 SET citus.shard_count TO 6;
 SET citus.shard_replication_factor TO 1;
 
+-- this function is dropped in Citus 10, added here for tests
+SET citus.enable_metadata_sync TO OFF;
+CREATE OR REPLACE FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
+                                                                      distribution_column text,
+                                                                      distribution_method citus.distribution_type)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'citus', $$master_create_distributed_table$$;
+COMMENT ON FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
+                                                               distribution_column text,
+                                                               distribution_method citus.distribution_type)
+    IS 'define the table distribution functions';
+RESET citus.enable_metadata_sync;
+
 -- create distributed tables
 CREATE TABLE table1_group1 ( id int PRIMARY KEY);
 
 SELECT create_distributed_table('table1_group1', 'id', 'hash');
@@ -479,6 +493,11 @@ WHERE logicalrelid::text LIKE 'move_partitions.events%' AND nodeport = :worker_1
 
 DROP TABLE move_partitions.events;
 
--- set back to the defaults and drop the table
+-- set back to the defaults and drop the tables
 SET client_min_messages TO DEFAULT;
 DROP TABLE test_with_pkey;
+DROP TABLE table2_group1;
+DROP TABLE table1_group1;
+DROP TABLE table5_groupX;
+DROP TABLE table6_append;
+DROP TABLE serial_move_test;
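The regression tests above only verify which CREATE SUBSCRIPTION commands are sent; the priority itself travels in the conninfo that CreateSubscriptions builds, using libpq's options keyword to pass a command-line style GUC assignment that the logical replication sender applies at backend startup. A minimal sketch of that mechanism, assuming a locally running server and a superuser connection (citus.cpu_priority is PGC_SUSET):

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	/*
	 * The options keyword sets the GUC before the backend runs anything,
	 * which also fires CpuPriorityAssignHook in that backend.
	 */
	PGconn *conn = PQconnectdb(
		"dbname=postgres options='-c citus.cpu_priority=5'");

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		PQfinish(conn);
		return 1;
	}

	PGresult *res = PQexec(conn, "SHOW citus.cpu_priority;");
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
	{
		/* prints 5: the whole session runs at the requested priority */
		printf("citus.cpu_priority = %s\n", PQgetvalue(res, 0, 0));
	}

	PQclear(res);
	PQfinish(conn);
	return 0;
}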