Add node_conninfo GUC and related logic

To support more flexible (i.e. not at compile-time) specification of
libpq connection parameters, this change adds a new GUC, node_conninfo,
which must be a space-separated string of key-value pairs suitable for
parsing by libpq's connection establishment methods.

To avoid rebuilding and parsing these values at connection time, this
change also adds a cache in front of the configuration params to permit
immediate use of any previously-calculated parameters.
pull/2191/head
Jason Petersen 2018-05-28 21:42:38 -06:00
parent a0651df574
commit 57b3f253c5
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
6 changed files with 490 additions and 85 deletions

View File

@ -9,6 +9,7 @@ OBJS = src/backend/distributed/shared_library_init.o \
src/backend/distributed/commands/drop_distributed_table.o \
src/backend/distributed/commands/multi_copy.o \
src/backend/distributed/commands/transmit.o \
src/backend/distributed/connection/connection_configuration.o \
src/backend/distributed/connection/connection_management.o \
src/backend/distributed/connection/placement_connection.o \
src/backend/distributed/connection/remote_commands.o \

View File

@ -0,0 +1,336 @@
/*-------------------------------------------------------------------------
*
* connection_configuration.c
* Functions for controlling configuration of Citus connections
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h"
#include "distributed/worker_manager.h"
#include "distributed/task_tracker.h"
#include "postmaster/postmaster.h"
#include "mb/pg_wchar.h"
#include "utils/builtins.h"
/* stores the string representation of our node connection GUC */
char *NodeConninfo = "";
/* represents a list of libpq parameter settings */
typedef struct ConnParamsInfo
{
char **keywords; /* libpq keywords */
char **values; /* desired values for above */
Size size; /* current used size of arrays */
Size maxSize; /* maximum allocated size of arrays (similar to e.g. StringInfo) */
} ConnParamsInfo;
/*
* Stores parsed global libpq parameter settings. static because all access
* is encapsulated in the other public functions in this file.
*/
static ConnParamsInfo ConnParams;
/* helper functions for processing connection info */
static Size CalculateMaxSize(void);
static int uri_prefix_length(const char *connstr);
/*
* InitConnParms initializes the ConnParams field to point to enough memory to
* store settings for every valid libpq value, though these regions are set to
* zeros from the outset and the size appropriately also set to zero.
*
* This function must be called before others in this file, though calling it
* after use of the initialized ConnParams structure will result in any
* populated parameter settings being lost.
*/
void
InitConnParams()
{
Size maxSize = CalculateMaxSize();
ConnParamsInfo connParams = {
.keywords = malloc(maxSize * sizeof(char *)),
.values = malloc(maxSize * sizeof(char *)),
.size = 0,
.maxSize = maxSize
};
memset(connParams.keywords, 0, maxSize * sizeof(char *));
memset(connParams.values, 0, maxSize * sizeof(char *));
ConnParams = connParams;
}
/*
* ResetConnParams frees all strings in the keywords and parameters arrays,
* sets their elements to null, and resets the ConnParamsSize to zero before
* adding back any hardcoded global connection settings (at present, only the
* fallback_application_name of 'citus').
*/
void
ResetConnParams()
{
Index paramIdx = 0;
for (paramIdx = 0; paramIdx < ConnParams.size; paramIdx++)
{
free((void *) ConnParams.keywords[paramIdx]);
free((void *) ConnParams.values[paramIdx]);
ConnParams.keywords[paramIdx] = ConnParams.values[paramIdx] = NULL;
}
ConnParams.size = 0;
AddConnParam("fallback_application_name", "citus");
}
/*
* AddConnParam adds a parameter setting to the global libpq settings according
* to the provided keyword and value. Under assert-enabled builds, array bounds
* checking is performed.
*/
void
AddConnParam(const char *keyword, const char *value)
{
Assert((ConnParams.size + 1) < ConnParams.maxSize);
ConnParams.keywords[ConnParams.size] = strdup(keyword);
ConnParams.values[ConnParams.size] = strdup(value);
ConnParams.size++;
ConnParams.keywords[ConnParams.size] = ConnParams.values[ConnParams.size] = NULL;
}
/*
* CheckConninfo is a building block to help implement check constraints and
* other check hooks against libpq-like conninfo strings. In particular, the
* provided conninfo must:
*
* - Not use a uri-prefix such as postgres:// (it must be only keys and values)
* - Parse using PQconninfoParse
* - Only set keywords contained in the provided whitelist
*
* This function returns true if all of the above are satisfied, otherwise it
* returns false. If the provided errmsg pointer is not NULL, it will be set
* to an appropriate message if the check fails.
*
* The provided whitelist must be sorted in a manner usable by bsearch, though
* this is only validated during assert-enabled builds.
*/
bool
CheckConninfo(const char *conninfo, const char **whitelist,
Size whitelistLength, char **errorMsg)
{
PQconninfoOption *optionArray = NULL;
PQconninfoOption *option = NULL;
Index whitelistIdx PG_USED_FOR_ASSERTS_ONLY = 0;
char *errorMsgString = NULL;
/*
* If the user doesn't need a message, just overwrite errmsg with a stack
* variable so we can always safely write to it.
*/
if (errorMsg == NULL)
{
errorMsg = &errorMsgString;
}
/* sure, it can be null */
if (conninfo == NULL)
{
return true;
}
/* the libpq prefix form is more complex than we need; ban it */
if (uri_prefix_length(conninfo) != 0)
{
*errorMsg = "Citus connection info strings must be in "
"'k1=v1 k2=v2 [...] kn=vn' format";
return false;
}
/* this should at least parse */
optionArray = PQconninfoParse(conninfo, NULL);
if (optionArray == NULL)
{
*errorMsg = "Provided string is not a valid libpq connection info string";
return false;
}
#ifdef USE_ASSERT_CHECKING
/* verify that the whitelist is in ascending order */
for (whitelistIdx = 1; whitelistIdx < whitelistLength; whitelistIdx++)
{
const char *prev = whitelist[whitelistIdx - 1];
const char *curr = whitelist[whitelistIdx];
AssertArg(strcmp(prev, curr) < 0);
}
#endif
for (option = optionArray; option->keyword != NULL; option++)
{
void *matchingKeyword = NULL;
if (option->val == NULL || option->val[0] == '\0')
{
continue;
}
matchingKeyword = bsearch(&option->keyword, whitelist, whitelistLength,
sizeof(char *), pg_qsort_strcmp);
if (matchingKeyword == NULL)
{
/* the whitelist lacks this keyword; error out! */
StringInfoData msgString;
initStringInfo(&msgString);
appendStringInfo(&msgString, "Prohibited conninfo keyword detected: %s",
option->keyword);
*errorMsg = msgString.data;
break;
}
}
PQconninfoFree(optionArray);
/* if error message is set we found an invalid keyword */
return (*errorMsg == NULL);
}
/*
* GetConnParams uses the provided key to determine libpq parameters needed to
* establish a connection using that key. The keywords and values are placed in
* the like-named out parameters. All parameter strings are allocated in the
* context provided by the caller, to save the caller needing to copy strings
* into an appropriate context later.
*/
void
GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
MemoryContext context)
{
/* make space for the port as a string: sign, 10 digits, NUL */
char *nodePortString = MemoryContextAlloc(context, 12 * sizeof(char *));
/*
* This function has three sections:
* - Initialize the keywords and values with copies of global parameters
* - Append user/host-specific parameters calculated from the given key
* - (Enterprise-only) append user/host-specific authentication params
*
* The global parameters have already been assigned from a GUC, so begin by
* calculating the key-specific parameters (basically just the fields of
* the key and the active database encoding).
*/
const char *runtimeKeywords[] = {
"host", "port", "dbname", "user", "client_encoding"
};
const char *runtimeValues[] = {
MemoryContextStrdup(context, key->hostname),
nodePortString,
MemoryContextStrdup(context, key->database),
MemoryContextStrdup(context, key->user),
GetDatabaseEncodingName()
};
/*
* Declare local params for readability; we'll assign to outparams at end.
* Need to zero enough space for all possible libpq parameters.
*/
char **connKeywords = MemoryContextAllocZero(context, ConnParams.maxSize *
sizeof(char *));
char **connValues = MemoryContextAllocZero(context, ConnParams.maxSize *
sizeof(char *));
/* auth keywords will begin after global and runtime ones are appended */
Index authParamsIdx = ConnParams.size + lengthof(runtimeKeywords);
pg_ltoa(key->port, nodePortString); /* populate node port string with port */
/* first step: copy global parameters to beginning of array */
memcpy(connKeywords, ConnParams.keywords, ConnParams.size * sizeof(char *));
memcpy(connValues, ConnParams.values, ConnParams.size * sizeof(char *));
/* second step: begin at end of global params and copy runtime ones */
memcpy(&connKeywords[ConnParams.size], runtimeKeywords, sizeof(runtimeKeywords));
memcpy(&connValues[ConnParams.size], runtimeValues, sizeof(runtimeValues));
/* final step: add terminal NULL, required by libpq */
connKeywords[authParamsIdx] = connValues[authParamsIdx] = NULL;
*keywords = connKeywords;
*values = connValues;
}
/*
* CalculateMaxSize simply counts the number of elements returned by
* PQconnDefaults, including the final NULL. This helps us know how space would
* be used if a connection utilizes every known libpq parameter.
*/
static Size
CalculateMaxSize()
{
PQconninfoOption *defaults = PQconndefaults();
PQconninfoOption *option = NULL;
Size maxSize = 0;
for (option = defaults;
option->keyword != NULL;
option++, maxSize++)
{
/* do nothing, we're just counting the elements */
}
PQconninfoFree(defaults);
/* we've counted elements but libpq needs a final NULL, so add one */
maxSize++;
return maxSize;
}
/* *INDENT-OFF* */
/*
* Checks if connection string starts with either of the valid URI prefix
* designators.
*
* Returns the URI prefix length, 0 if the string doesn't contain a URI prefix.
*
* This implementation (mostly) taken from libpq/fe-connect.c.
*/
static int
uri_prefix_length(const char *connstr)
{
const char uri_designator[] = "postgresql://";
const char short_uri_designator[] = "postgres://";
if (strncmp(connstr, uri_designator,
sizeof(uri_designator) - 1) == 0)
return sizeof(uri_designator) - 1;
if (strncmp(connstr, short_uri_designator,
sizeof(short_uri_designator) - 1) == 0)
return sizeof(short_uri_designator) - 1;
return 0;
}
/* *INDENT-ON* */

View File

@ -33,11 +33,10 @@
int NodeConnectionTimeout = 5000;
int CitusSSLMode = CITUS_SSL_MODE_PREFER;
HTAB *ConnectionHash = NULL;
HTAB *ConnParamsHash = NULL;
MemoryContext ConnectionContext = NULL;
static uint32 ConnectionHashHash(const void *key, Size keysize);
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
@ -55,10 +54,9 @@ static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
void
InitializeConnectionManagement(void)
{
HASHCTL info;
HASHCTL info, connParamsInfo;
uint32 hashFlags = 0;
/*
* Create a single context for connection and transaction related memory
* management. Doing so, instead of allocating in TopMemoryContext, makes
@ -79,8 +77,14 @@ InitializeConnectionManagement(void)
info.hcxt = ConnectionContext;
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
memcpy(&connParamsInfo, &info, sizeof(HASHCTL));
connParamsInfo.entrysize = sizeof(ConnParamsHashEntry);
ConnectionHash = hash_create("citus connection cache (host,port,user,database)",
64, &info, hashFlags);
ConnParamsHash = hash_create("citus connparams cache (host,port,user,database)",
64, &connParamsInfo, hashFlags);
}
@ -635,30 +639,29 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize)
static MultiConnection *
StartConnectionEstablishment(ConnectionHashKey *key)
{
char nodePortString[12];
const char *clientEncoding = GetDatabaseEncodingName();
bool found = false;
MultiConnection *connection = NULL;
const char *sslmode = CitusSSLModeString();
ConnParamsHashEntry *entry = NULL;
const char *keywords[] = {
"host", "port", "dbname", "user", "sslmode",
"client_encoding", "fallback_application_name",
NULL
};
const char *values[] = {
key->hostname, nodePortString, key->database, key->user, sslmode,
clientEncoding, "citus", NULL
};
/* search our cache for precomputed connection settings */
entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
if (!found)
{
/* if they're not found, compute them from GUC, runtime, etc. */
GetConnParams(key, &entry->keywords, &entry->values, ConnectionContext);
}
connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection));
sprintf(nodePortString, "%d", key->port);
strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
connection->port = key->port;
strlcpy(connection->database, key->database, NAMEDATALEN);
strlcpy(connection->user, key->user, NAMEDATALEN);
connection->pgConn = PQconnectStartParams(keywords, values, false);
connection->pgConn = PQconnectStartParams((const char **) entry->keywords,
(const char **) entry->values,
false);
connection->connectionStart = GetCurrentTimestamp();
/*
@ -674,52 +677,6 @@ StartConnectionEstablishment(ConnectionHashKey *key)
}
/*
* CitusSSLModeString returns the current value of citus.sslmode.
*/
char *
CitusSSLModeString(void)
{
switch (CitusSSLMode)
{
case CITUS_SSL_MODE_DISABLE:
{
return "disable";
}
case CITUS_SSL_MODE_ALLOW:
{
return "allow";
}
case CITUS_SSL_MODE_PREFER:
{
return "prefer";
}
case CITUS_SSL_MODE_REQUIRE:
{
return "require";
}
case CITUS_SSL_MODE_VERIFY_CA:
{
return "verify-ca";
}
case CITUS_SSL_MODE_VERIFY_FULL:
{
return "verify-full";
}
default:
{
ereport(ERROR, (errmsg("unrecognized value for citus.sslmode")));
}
}
}
/*
* Close all remote connections if necessary anymore (i.e. not session
* lifetime), or if in a failed state.

View File

@ -67,6 +67,8 @@ static void RegisterCitusConfigVariables(void);
static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
GucSource source);
static void NormalizeWorkerListPath(void);
static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source);
static void NodeConninfoGucAssignHook(const char *newval, void *extra);
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
source);
@ -74,6 +76,19 @@ static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSour
static bool ExpireCachedShards = false;
static int LargeTableShardCount = 0;
/*
* SSL modes available for connecting to worker nodes.
*/
enum CitusSSLMode
{
CITUS_SSL_MODE_DISABLE = 1 << 0,
CITUS_SSL_MODE_ALLOW = 1 << 1,
CITUS_SSL_MODE_PREFER = 1 << 2,
CITUS_SSL_MODE_REQUIRE = 1 << 3,
CITUS_SSL_MODE_VERIFY_CA = 1 << 4,
CITUS_SSL_MODE_VERIFY_FULL = 1 << 5
};
static int CitusSSLMode = CITUS_SSL_MODE_PREFER;
/* *INDENT-OFF* */
/* GUC enum definitions */
@ -184,6 +199,8 @@ _PG_init(void)
CreateRequiredDirectories();
}
InitConnParams();
/*
* Register Citus configuration variables. Do so before intercepting
* hooks or calling initialization functions, in case we want to do the
@ -329,15 +346,13 @@ RegisterCitusConfigVariables(void)
DefineCustomEnumVariable(
"citus.sslmode",
gettext_noop("SSL mode to use for connections to worker nodes."),
gettext_noop("When connecting to a worker node, specify whether the SSL mode"
"mode for the connection is 'disable', 'allow', 'prefer' "
"(the default), 'require', 'verify-ca' or 'verify-full'."),
gettext_noop("This GUC variable has been deprecated."),
NULL,
&CitusSSLMode,
CITUS_SSL_MODE_PREFER,
citus_ssl_mode_options,
PGC_POSTMASTER,
GUC_SUPERUSER_ONLY,
GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
@ -926,6 +941,20 @@ RegisterCitusConfigVariables(void)
&StatisticsCollectionGucCheckHook,
NULL, NULL);
DefineCustomStringVariable(
"citus.node_conninfo",
gettext_noop("Sets parameters used for outbound connections."),
NULL,
&NodeConninfo,
"sslmode=prefer",
PGC_POSTMASTER,
GUC_SUPERUSER_ONLY,
NodeConninfoGucCheckHook,
NodeConninfoGucAssignHook,
NULL);
NormalizeWorkerListPath();
/* warn about config items in the citus namespace that are not registered above */
EmitWarningsOnPlaceholders("citus");
}
@ -997,6 +1026,87 @@ NormalizeWorkerListPath(void)
}
/*
* NodeConninfoGucCheckHook ensures conninfo settings are in the expected form
* and that the keywords of all non-null settings are on a whitelist devised to
* keep users from setting options that may result in confusion.
*/
static bool
NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source)
{
/* this array _must_ be kept in an order usable by bsearch */
const char *whitelist[] = {
"application_name",
"connect_timeout",
#if defined(ENABLE_GSS) && defined(ENABLE_SSPI)
"gsslib",
#endif
"keepalives",
"keepalives_count",
"keepalives_idle",
"keepalives_interval",
#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
"krbsrvname",
#endif
"sslcompression",
"sslcrl",
"sslmode",
"sslrootcert"
};
char *errorMsg = NULL;
bool conninfoValid = CheckConninfo(*newval, whitelist, lengthof(whitelist),
&errorMsg);
if (!conninfoValid)
{
GUC_check_errdetail("%s", errorMsg);
}
return conninfoValid;
}
/*
* NodeConninfoGucAssignHook is the assignment hook for the node_conninfo GUC
* variable. Though this GUC is a "string", we actually parse it as a non-URI
* PQconninfo key/value setting, storing the resultant PQconninfoOption values
* using the public functions in connection_configuration.c.
*/
static void
NodeConninfoGucAssignHook(const char *newval, void *extra)
{
PQconninfoOption *optionArray = NULL;
PQconninfoOption *option = NULL;
if (newval == NULL)
{
newval = "";
}
optionArray = PQconninfoParse(newval, NULL);
if (optionArray == NULL)
{
ereport(FATAL, (errmsg("cannot parse node_conninfo value"),
errdetail("The GUC check hook should prevent "
"all malformed values.")));
}
ResetConnParams();
for (option = optionArray; option->keyword != NULL; option++)
{
if (option->val == NULL || option->val[0] == '\0')
{
continue;
}
AddConnParam(option->keyword, option->val);
}
PQconninfoFree(optionArray);
}
static bool
StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
{

View File

@ -9,7 +9,6 @@
#include "stdint.h"
#include "postgres.h"
#include "miscadmin.h"
#include "access/genam.h"

View File

@ -113,26 +113,21 @@ typedef struct ConnectionHashEntry
dlist_head *connections;
} ConnectionHashEntry;
/*
* SSL modes available for connecting to worker nodes.
*/
enum CitusSSLMode
/* hash entry for cached connection parameters */
typedef struct ConnParamsHashEntry
{
CITUS_SSL_MODE_DISABLE = 1 << 0,
CITUS_SSL_MODE_ALLOW = 1 << 1,
CITUS_SSL_MODE_PREFER = 1 << 2,
CITUS_SSL_MODE_REQUIRE = 1 << 3,
CITUS_SSL_MODE_VERIFY_CA = 1 << 4,
CITUS_SSL_MODE_VERIFY_FULL = 1 << 5
};
ConnectionHashKey key;
char **keywords;
char **values;
} ConnParamsHashEntry;
/* SSL mode to use when connecting to worker nodes */
extern int CitusSSLMode;
/* maximum duration to wait for connection */
extern int NodeConnectionTimeout;
/* parameters used for outbound connections */
extern char *NodeConninfo;
/* the hash table */
extern HTAB *ConnectionHash;
@ -143,6 +138,14 @@ extern struct MemoryContextData *ConnectionContext;
extern void AfterXactConnectionHandling(bool isCommit);
extern void InitializeConnectionManagement(void);
extern void InitConnParams(void);
extern void ResetConnParams(void);
extern void AddConnParam(const char *keyword, const char *value);
extern void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
MemoryContext context);
extern bool CheckConninfo(const char *conninfo, const char **whitelist,
Size whitelistLength, char **errmsg);
/* Low-level connection establishment APIs */
extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname,
@ -157,7 +160,6 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
int32 port,
const char *user,
const char *database);
extern char * CitusSSLModeString(void);
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
extern void CloseConnection(MultiConnection *connection);
extern void ShutdownConnection(MultiConnection *connection);