diff --git a/src/backend/distributed/commands/variableset.c b/src/backend/distributed/commands/variableset.c index ca17856f1..a086274e6 100644 --- a/src/backend/distributed/commands/variableset.c +++ b/src/backend/distributed/commands/variableset.c @@ -25,10 +25,6 @@ #include "utils/varlena.h" #include "distributed/remote_commands.h" - -static bool IsSettingSafeToPropagate(char *name); - - /* * ShouldPropagateSetCommand determines whether a SET or RESET command should be * propagated to the workers. @@ -80,37 +76,6 @@ ShouldPropagateSetCommand(VariableSetStmt *setStmt) } -/* - * IsSettingSafeToPropagate returns whether a SET LOCAL is safe to propagate. - * - * We exclude settings that are highly specific to the client or session and also - * ban propagating the citus.propagate_set_commands setting (not for correctness, - * more to avoid confusion). - */ -static bool -IsSettingSafeToPropagate(char *name) -{ - /* if this list grows considerably we should switch to bsearch */ - const char *skipSettings[] = { - "application_name", - "citus.propagate_set_commands", - "client_encoding", - "exit_on_error", - "max_stack_depth" - }; - - for (Index settingIndex = 0; settingIndex < lengthof(skipSettings); settingIndex++) - { - if (pg_strcasecmp(skipSettings[settingIndex], name) == 0) - { - return false; - } - } - - return true; -} - - /* * PostprocessVariableSetStmt actually does the work of propagating a provided SET stmt * to currently-participating worker nodes and adding the SET command test to a string diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 2f4ec99a3..6ae514760 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -1206,3 +1206,34 @@ StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString) appendStringInfo(queryResultString, "%s", errorMessage); } + + +/* + * IsSettingSafeToPropagate returns whether a SET LOCAL is safe to propagate. + * + * We exclude settings that are highly specific to the client or session and also + * ban propagating the citus.propagate_set_commands setting (not for correctness, + * more to avoid confusion). + */ +bool +IsSettingSafeToPropagate(const char *name) +{ + /* if this list grows considerably we should switch to bsearch */ + const char *skipSettings[] = { + "application_name", + "citus.propagate_set_commands", + "client_encoding", + "exit_on_error", + "max_stack_depth" + }; + + for (Index settingIndex = 0; settingIndex < lengthof(skipSettings); settingIndex++) + { + if (pg_strcasecmp(skipSettings[settingIndex], name) == 0) + { + return false; + } + } + + return true; +} diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index 8df06b70a..c74b86071 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -187,7 +187,11 @@ citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS) void DropOrphanedShardsInSeparateTransaction(void) { - ExecuteRebalancerCommandInSeparateTransaction("CALL citus_cleanup_orphaned_shards()"); + int connectionFlag = FORCE_NEW_CONNECTION; + MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, + PostPortNumber); + ExecuteCriticalRemoteCommand(connection, "CALL citus_cleanup_orphaned_shards()"); + CloseConnection(connection); } diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 47cc94959..9f348f022 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -64,7 +64,9 @@ #include "utils/pg_lsn.h" #include "utils/syscache.h" #include "common/hashfn.h" - +#include "utils/varlena.h" +#include "utils/guc_tables.h" +#include "distributed/commands/utility_hook.h" /* RebalanceOptions are the options used to control the rebalance algorithm */ typedef struct RebalanceOptions @@ -189,6 +191,7 @@ typedef struct WorkerShardStatistics HTAB *statistics; } WorkerShardStatistics; +char *VariablesToBePassedToNewConnections = NULL; /* static declarations for main logic */ static int ShardActivePlacementCount(HTAB *activePlacementsHash, uint64 shardId, @@ -258,7 +261,7 @@ static void AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int wo uint64 shardId); static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics); static void ErrorOnConcurrentRebalance(RebalanceOptions *); - +static List * GetSetCommandListForNewConnections(void); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(rebalance_table_shards); @@ -276,6 +279,7 @@ PG_FUNCTION_INFO_V1(citus_rebalance_wait); bool RunningUnderIsolationTest = false; int MaxRebalancerLoggedIgnoredMoves = 5; +bool PropagateSessionSettingsForLoopbackConnection = false; static const char *PlacementUpdateTypeNames[] = { [PLACEMENT_UPDATE_INVALID_FIRST] = "unknown", @@ -2069,9 +2073,10 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, /* - * ExecuteCriticalCommandInSeparateTransaction runs a command in a separate + * ExecuteRebalancerCommandInSeparateTransaction runs a command in a separate * transaction that is commited right away. This is useful for things that you * don't want to rollback when the current transaction is rolled back. + * Set true to 'useExclusiveTransactionBlock' to initiate a BEGIN and COMMIT statements. */ void ExecuteRebalancerCommandInSeparateTransaction(char *command) @@ -2079,17 +2084,56 @@ ExecuteRebalancerCommandInSeparateTransaction(char *command) int connectionFlag = FORCE_NEW_CONNECTION; MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, PostPortNumber); - StringInfo setApplicationName = makeStringInfo(); - appendStringInfo(setApplicationName, "SET application_name TO %s", - CITUS_REBALANCER_NAME); + List *commandList = NIL; - ExecuteCriticalRemoteCommand(connection, setApplicationName->data); - ExecuteCriticalRemoteCommand(connection, command); + commandList = lappend(commandList, psprintf("SET LOCAL application_name TO %s;", + CITUS_REBALANCER_NAME)); + if (PropagateSessionSettingsForLoopbackConnection) + { + List *setCommands = GetSetCommandListForNewConnections(); + char *setCommand = NULL; + + foreach_ptr(setCommand, setCommands) + { + commandList = lappend(commandList, setCommand); + } + } + + commandList = lappend(commandList, command); + + SendCommandListToWorkerOutsideTransactionWithConnection(connection, commandList); CloseConnection(connection); } +/* + * GetSetCommandListForNewConnections returns a list of SET statements to + * be executed in new connections to worker nodes. + */ +static List * +GetSetCommandListForNewConnections(void) +{ + List *commandList = NIL; + + struct config_generic **guc_vars = get_guc_variables(); + int gucCount = GetNumConfigOptions(); + + for (int gucIndex = 0; gucIndex < gucCount; gucIndex++) + { + struct config_generic *var = (struct config_generic *) guc_vars[gucIndex]; + if (var->source == PGC_S_SESSION && IsSettingSafeToPropagate(var->name)) + { + const char *variableValue = GetConfigOption(var->name, true, true); + commandList = lappend(commandList, psprintf("SET LOCAL %s TO '%s';", + var->name, variableValue)); + } + } + + return commandList; +} + + /* * RebalancePlacementUpdates returns a list of placement updates which makes the * cluster balanced. We move shards to these nodes until all nodes become utilized. diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index a5868dc31..42822470e 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1985,6 +1985,17 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.propagate_session_settings_for_loopback_connection", + gettext_noop( + "When enabled, rebalancer propagates all the allowed GUC settings to new connections."), + NULL, + &PropagateSessionSettingsForLoopbackConnection, + true, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomEnumVariable( "citus.propagate_set_commands", gettext_noop("Sets which SET commands are propagated to workers."), @@ -2366,6 +2377,7 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + /* warn about config items in the citus namespace that are not registered above */ EmitWarningsOnPlaceholders("citus"); diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 93d4f6bfd..02e7441c7 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -70,4 +70,7 @@ extern bool EvaluateSingleQueryResult(MultiConnection *connection, PGresult *que StringInfo queryResultString); extern void StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString); +extern bool IsSettingSafeToPropagate(const char *name); + + #endif /* REMOTE_COMMAND_H */ diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index ac4864422..90f73e2f3 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -186,8 +186,10 @@ typedef struct RebalancePlanFunctions void *context; } RebalancePlanFunctions; +extern char *VariablesToBePassedToNewConnections; extern int MaxRebalancerLoggedIgnoredMoves; extern bool RunningUnderIsolationTest; +extern bool PropagateSessionSettingsForLoopbackConnection; /* External function declarations */ extern Datum shard_placement_rebalance_array(PG_FUNCTION_ARGS); diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 9cdc0d4a3..2efc70676 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -178,7 +178,6 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running (1 row) -SET citus.shard_replication_factor TO 2; SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes'); ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: ALTER SYSTEM RESET citus.local_hostname; @@ -195,17 +194,106 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running (1 row) -- replicate reference table should ignore the coordinator -SET citus.shard_replication_factor TO 2; -SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes'); +SET citus.node_connection_timeout to 60; +BEGIN; + SET LOCAL citus.shard_replication_factor TO 2; + SET citus.log_remote_commands TO ON; + SET SESSION citus.max_adaptive_executor_pool_size TO 5; + SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes'); +NOTICE: issuing CALL citus_cleanup_orphaned_shards() +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL application_name TO citus_rebalancer; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.max_adaptive_executor_pool_size TO '5'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.next_shard_id TO '433105'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.node_connection_timeout TO '60'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.shard_count TO '4'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT citus_copy_shard_placement(433101,'localhost',57637,'localhost',57638,'block_writes') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL application_name TO citus_rebalancer; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.max_adaptive_executor_pool_size TO '5'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.next_shard_id TO '433105'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.node_connection_timeout TO '60'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.shard_count TO '4'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT citus_copy_shard_placement(433102,'localhost',57638,'localhost',57637,'block_writes') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL application_name TO citus_rebalancer; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.max_adaptive_executor_pool_size TO '5'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.next_shard_id TO '433105'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.node_connection_timeout TO '60'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.shard_count TO '4'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT citus_copy_shard_placement(433103,'localhost',57637,'localhost',57638,'block_writes') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL application_name TO citus_rebalancer; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.max_adaptive_executor_pool_size TO '5'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.next_shard_id TO '433105'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.node_connection_timeout TO '60'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.shard_count TO '4'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2'; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT citus_copy_shard_placement(433104,'localhost',57638,'localhost',57637,'block_writes') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx replicate_table_shards --------------------------------------------------------------------- (1 row) +COMMIT; +RESET citus.node_connection_timeout; +RESET citus.log_remote_commands; DROP TABLE dist_table_test, dist_table_test_2, ref_table_test, postgres_table_test; RESET citus.shard_count; RESET citus.shard_replication_factor; diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index e5ef36b52..83f5b4f6d 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -3,6 +3,7 @@ -- SET citus.next_shard_id TO 433000; + CREATE TABLE ref_table_test(a int primary key); SELECT create_reference_table('ref_table_test'); CREATE TABLE dist_table_test(a int primary key); @@ -89,7 +90,6 @@ ALTER SYSTEM SET citus.local_hostname TO 'foobar'; SELECT pg_reload_conf(); SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC -SET citus.shard_replication_factor TO 2; SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes'); ALTER SYSTEM RESET citus.local_hostname; @@ -97,8 +97,16 @@ SELECT pg_reload_conf(); SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC -- replicate reference table should ignore the coordinator -SET citus.shard_replication_factor TO 2; -SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes'); +SET citus.node_connection_timeout to 60; +BEGIN; + SET LOCAL citus.shard_replication_factor TO 2; + SET citus.log_remote_commands TO ON; + SET SESSION citus.max_adaptive_executor_pool_size TO 5; + SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes'); +COMMIT; + +RESET citus.node_connection_timeout; +RESET citus.log_remote_commands; DROP TABLE dist_table_test, dist_table_test_2, ref_table_test, postgres_table_test; RESET citus.shard_count;