Introduce a new GUC to propagate local settings to new connections in rebalancer (#6396)

DESCRIPTION: Introduce the `citus.propagate_session_settings_for_loopback_connection` GUC to propagate local settings to new connections.

Fixes: #5289
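
A minimal usage sketch, grounded in the regression test changes later in this commit (the table name and setting values below come from those tests); it assumes the new GUC is enabled at session level before the rebalancer opens its loopback connection:

```sql
-- allow the rebalancer to re-issue this session's settings on the
-- loopback connection it opens for its own commands
SET citus.propagate_session_settings_for_loopback_connection TO on;

-- session-level settings that should reach the loopback connection
SET citus.node_connection_timeout TO 60;
SET citus.shard_replication_factor TO 2;

-- the rebalancer now prefixes its loopback commands with SET LOCAL
-- versions of the settings above
SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4,
                              shard_transfer_mode := 'block_writes');
```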
Gokhan Gulbiz 2022-10-18 12:50:30 +03:00 committed by GitHub
parent 60eb67b908
commit e87eda6496
9 changed files with 207 additions and 50 deletions


@@ -25,10 +25,6 @@
#include "utils/varlena.h"
#include "distributed/remote_commands.h"
static bool IsSettingSafeToPropagate(char *name);
/*
* ShouldPropagateSetCommand determines whether a SET or RESET command should be
* propagated to the workers.
@@ -80,37 +76,6 @@ ShouldPropagateSetCommand(VariableSetStmt *setStmt)
}
/*
* IsSettingSafeToPropagate returns whether a SET LOCAL is safe to propagate.
*
* We exclude settings that are highly specific to the client or session and also
* ban propagating the citus.propagate_set_commands setting (not for correctness,
* more to avoid confusion).
*/
static bool
IsSettingSafeToPropagate(char *name)
{
/* if this list grows considerably we should switch to bsearch */
const char *skipSettings[] = {
"application_name",
"citus.propagate_set_commands",
"client_encoding",
"exit_on_error",
"max_stack_depth"
};
for (Index settingIndex = 0; settingIndex < lengthof(skipSettings); settingIndex++)
{
if (pg_strcasecmp(skipSettings[settingIndex], name) == 0)
{
return false;
}
}
return true;
}
/*
* PostprocessVariableSetStmt actually does the work of propagating a provided SET stmt
* to currently-participating worker nodes and adding the SET command test to a string


@@ -1206,3 +1206,34 @@ StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString)
appendStringInfo(queryResultString, "%s", errorMessage);
}
/*
* IsSettingSafeToPropagate returns whether a SET LOCAL is safe to propagate.
*
* We exclude settings that are highly specific to the client or session and also
* ban propagating the citus.propagate_set_commands setting (not for correctness,
* more to avoid confusion).
*/
bool
IsSettingSafeToPropagate(const char *name)
{
/* if this list grows considerably we should switch to bsearch */
const char *skipSettings[] = {
"application_name",
"citus.propagate_set_commands",
"client_encoding",
"exit_on_error",
"max_stack_depth"
};
for (Index settingIndex = 0; settingIndex < lengthof(skipSettings); settingIndex++)
{
if (pg_strcasecmp(skipSettings[settingIndex], name) == 0)
{
return false;
}
}
return true;
}
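
In SQL terms, the filter above keeps client- and session-specific settings off the loopback connection while letting ordinary settings through; a small sketch (not taken from the commit's tests):

```sql
SET application_name TO 'my_app';        -- in the skip list above: never propagated
SET citus.shard_replication_factor TO 2; -- eligible: re-issued as SET LOCAL ... TO '2'
```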


@@ -187,7 +187,11 @@ citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS)
void
DropOrphanedShardsInSeparateTransaction(void)
{
ExecuteRebalancerCommandInSeparateTransaction("CALL citus_cleanup_orphaned_shards()");
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
PostPortNumber);
ExecuteCriticalRemoteCommand(connection, "CALL citus_cleanup_orphaned_shards()");
CloseConnection(connection);
}


@@ -64,7 +64,9 @@
#include "utils/pg_lsn.h"
#include "utils/syscache.h"
#include "common/hashfn.h"
#include "utils/varlena.h"
#include "utils/guc_tables.h"
#include "distributed/commands/utility_hook.h"
/* RebalanceOptions are the options used to control the rebalance algorithm */
typedef struct RebalanceOptions
@@ -189,6 +191,7 @@ typedef struct WorkerShardStatistics
HTAB *statistics;
} WorkerShardStatistics;
char *VariablesToBePassedToNewConnections = NULL;
/* static declarations for main logic */
static int ShardActivePlacementCount(HTAB *activePlacementsHash, uint64 shardId,
@@ -258,7 +261,7 @@ static void AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int wo
uint64 shardId);
static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics);
static void ErrorOnConcurrentRebalance(RebalanceOptions *);
static List * GetSetCommandListForNewConnections(void);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(rebalance_table_shards);
@@ -276,6 +279,7 @@ PG_FUNCTION_INFO_V1(citus_rebalance_wait);
bool RunningUnderIsolationTest = false;
int MaxRebalancerLoggedIgnoredMoves = 5;
bool PropagateSessionSettingsForLoopbackConnection = false;
static const char *PlacementUpdateTypeNames[] = {
[PLACEMENT_UPDATE_INVALID_FIRST] = "unknown",
@@ -2069,9 +2073,10 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
/*
* ExecuteCriticalCommandInSeparateTransaction runs a command in a separate
* ExecuteRebalancerCommandInSeparateTransaction runs a command in a separate
* transaction that is committed right away. This is useful for things that you
* don't want to roll back when the current transaction is rolled back.
* Set 'useExclusiveTransactionBlock' to true to wrap the commands in explicit BEGIN and COMMIT statements.
*/
void
ExecuteRebalancerCommandInSeparateTransaction(char *command)
@@ -2079,17 +2084,56 @@ ExecuteRebalancerCommandInSeparateTransaction(char *command)
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
PostPortNumber);
StringInfo setApplicationName = makeStringInfo();
appendStringInfo(setApplicationName, "SET application_name TO %s",
CITUS_REBALANCER_NAME);
List *commandList = NIL;
ExecuteCriticalRemoteCommand(connection, setApplicationName->data);
ExecuteCriticalRemoteCommand(connection, command);
commandList = lappend(commandList, psprintf("SET LOCAL application_name TO %s;",
CITUS_REBALANCER_NAME));
if (PropagateSessionSettingsForLoopbackConnection)
{
List *setCommands = GetSetCommandListForNewConnections();
char *setCommand = NULL;
foreach_ptr(setCommand, setCommands)
{
commandList = lappend(commandList, setCommand);
}
}
commandList = lappend(commandList, command);
SendCommandListToWorkerOutsideTransactionWithConnection(connection, commandList);
CloseConnection(connection);
}
/*
* GetSetCommandListForNewConnections returns a list of SET statements to
* be executed in new connections to worker nodes.
*/
static List *
GetSetCommandListForNewConnections(void)
{
List *commandList = NIL;
struct config_generic **guc_vars = get_guc_variables();
int gucCount = GetNumConfigOptions();
for (int gucIndex = 0; gucIndex < gucCount; gucIndex++)
{
struct config_generic *var = (struct config_generic *) guc_vars[gucIndex];
if (var->source == PGC_S_SESSION && IsSettingSafeToPropagate(var->name))
{
const char *variableValue = GetConfigOption(var->name, true, true);
commandList = lappend(commandList, psprintf("SET LOCAL %s TO '%s';",
var->name, variableValue));
}
}
return commandList;
}
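
Putting the two functions above together: when citus.propagate_session_settings_for_loopback_connection is enabled, the command list sent over the FORCE_NEW_CONNECTION loopback connection (wrapped in BEGIN/COMMIT by SendCommandListToWorkerOutsideTransactionWithConnection) has roughly this shape, matching the regression test output later in the diff; shard IDs and values are illustrative:

```sql
SET LOCAL application_name TO citus_rebalancer;
SET LOCAL citus.log_remote_commands TO 'on';
SET LOCAL citus.node_connection_timeout TO '60';
SET LOCAL citus.shard_replication_factor TO '2';
SELECT citus_copy_shard_placement(433101, 'localhost', 57637,
                                  'localhost', 57638, 'block_writes');
```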
/*
* RebalancePlacementUpdates returns a list of placement updates which makes the
* cluster balanced. We move shards to these nodes until all nodes become utilized.


@@ -1985,6 +1985,17 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.propagate_session_settings_for_loopback_connection",
gettext_noop(
"When enabled, rebalancer propagates all the allowed GUC settings to new connections."),
NULL,
&PropagateSessionSettingsForLoopbackConnection,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.propagate_set_commands",
gettext_noop("Sets which SET commands are propagated to workers."),
@@ -2366,6 +2377,7 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
/* warn about config items in the citus namespace that are not registered above */
EmitWarningsOnPlaceholders("citus");


@@ -70,4 +70,7 @@ extern bool EvaluateSingleQueryResult(MultiConnection *connection, PGresult *que
StringInfo queryResultString);
extern void StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString);
extern bool IsSettingSafeToPropagate(const char *name);
#endif /* REMOTE_COMMAND_H */


@@ -186,8 +186,10 @@ typedef struct RebalancePlanFunctions
void *context;
} RebalancePlanFunctions;
extern char *VariablesToBePassedToNewConnections;
extern int MaxRebalancerLoggedIgnoredMoves;
extern bool RunningUnderIsolationTest;
extern bool PropagateSessionSettingsForLoopbackConnection;
/* External function declarations */
extern Datum shard_placement_rebalance_array(PG_FUNCTION_ARGS);


@@ -178,7 +178,6 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running
(1 row)
SET citus.shard_replication_factor TO 2;
SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes');
ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: <system specific error>
ALTER SYSTEM RESET citus.local_hostname;
@@ -195,17 +194,106 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running
(1 row)
-- replicate reference table should ignore the coordinator
SET citus.shard_replication_factor TO 2;
SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes');
SET citus.node_connection_timeout to 60;
BEGIN;
SET LOCAL citus.shard_replication_factor TO 2;
SET citus.log_remote_commands TO ON;
SET SESSION citus.max_adaptive_executor_pool_size TO 5;
SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes');
NOTICE: issuing CALL citus_cleanup_orphaned_shards()
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL application_name TO citus_rebalancer;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.max_adaptive_executor_pool_size TO '5';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.next_shard_id TO '433105';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.node_connection_timeout TO '60';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(433101,'localhost',57637,'localhost',57638,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL application_name TO citus_rebalancer;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.max_adaptive_executor_pool_size TO '5';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.next_shard_id TO '433105';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.node_connection_timeout TO '60';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(433102,'localhost',57638,'localhost',57637,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL application_name TO citus_rebalancer;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.max_adaptive_executor_pool_size TO '5';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.next_shard_id TO '433105';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.node_connection_timeout TO '60';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(433103,'localhost',57637,'localhost',57638,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL application_name TO citus_rebalancer;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.max_adaptive_executor_pool_size TO '5';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.next_shard_id TO '433105';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.node_connection_timeout TO '60';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(433104,'localhost',57638,'localhost',57637,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
replicate_table_shards
---------------------------------------------------------------------
(1 row)
COMMIT;
RESET citus.node_connection_timeout;
RESET citus.log_remote_commands;
DROP TABLE dist_table_test, dist_table_test_2, ref_table_test, postgres_table_test;
RESET citus.shard_count;
RESET citus.shard_replication_factor;


@@ -3,6 +3,7 @@
--
SET citus.next_shard_id TO 433000;
CREATE TABLE ref_table_test(a int primary key);
SELECT create_reference_table('ref_table_test');
CREATE TABLE dist_table_test(a int primary key);
@@ -89,7 +90,6 @@ ALTER SYSTEM SET citus.local_hostname TO 'foobar';
SELECT pg_reload_conf();
SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC
SET citus.shard_replication_factor TO 2;
SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes');
ALTER SYSTEM RESET citus.local_hostname;
@@ -97,8 +97,16 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC
SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC
-- replicate reference table should ignore the coordinator
SET citus.shard_replication_factor TO 2;
SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes');
SET citus.node_connection_timeout to 60;
BEGIN;
SET LOCAL citus.shard_replication_factor TO 2;
SET citus.log_remote_commands TO ON;
SET SESSION citus.max_adaptive_executor_pool_size TO 5;
SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes');
COMMIT;
RESET citus.node_connection_timeout;
RESET citus.log_remote_commands;
DROP TABLE dist_table_test, dist_table_test_2, ref_table_test, postgres_table_test;
RESET citus.shard_count;