From 57b3f253c5c042187038f3e5722c28a8759297ef Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Mon, 28 May 2018 21:42:38 -0600 Subject: [PATCH] Add node_conninfo GUC and related logic To support more flexible (i.e. not at compile-time) specification of libpq connection parameters, this change adds a new GUC, node_conninfo, which must be a space-separated string of key-value pairs suitable for parsing by libpq's connection establishment methods. To avoid rebuilding and parsing these values at connection time, this change also adds a cache in front of the configuration params to permit immediate use of any previously-calculated parameters. --- Makefile | 1 + .../connection/connection_configuration.c | 336 ++++++++++++++++++ .../connection/connection_management.c | 85 ++--- src/backend/distributed/shared_library_init.c | 120 ++++++- .../distributed/utils/metadata_cache.c | 1 - .../distributed/connection_management.h | 32 +- 6 files changed, 490 insertions(+), 85 deletions(-) create mode 100644 src/backend/distributed/connection/connection_configuration.c diff --git a/Makefile b/Makefile index f22fdf9da..1521977de 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,7 @@ OBJS = src/backend/distributed/shared_library_init.o \ src/backend/distributed/commands/drop_distributed_table.o \ src/backend/distributed/commands/multi_copy.o \ src/backend/distributed/commands/transmit.o \ + src/backend/distributed/connection/connection_configuration.o \ src/backend/distributed/connection/connection_management.o \ src/backend/distributed/connection/placement_connection.o \ src/backend/distributed/connection/remote_commands.o \ diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c new file mode 100644 index 000000000..532d03c82 --- /dev/null +++ b/src/backend/distributed/connection/connection_configuration.c @@ -0,0 +1,336 @@ +/*------------------------------------------------------------------------- 
+ * + * connection_configuration.c + * Functions for controlling configuration of Citus connections + * + * Copyright (c) 2018, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/connection_management.h" +#include "distributed/metadata_cache.h" +#include "distributed/worker_manager.h" +#include "distributed/task_tracker.h" +#include "postmaster/postmaster.h" +#include "mb/pg_wchar.h" +#include "utils/builtins.h" + +/* stores the string representation of our node connection GUC */ +char *NodeConninfo = ""; + +/* represents a NULL-terminated list of libpq parameter settings */ +typedef struct ConnParamsInfo +{ + char **keywords; /* libpq keywords */ + char **values; /* desired values for above */ + Size size; /* number of array entries currently in use */ + Size maxSize; /* maximum allocated size of arrays (similar to e.g. StringInfo) */ +} ConnParamsInfo; + +/* + * Stores parsed global libpq parameter settings. static because all access + * is encapsulated in the other public functions in this file. + */ +static ConnParamsInfo ConnParams; + +/* helper functions for processing connection info */ +static Size CalculateMaxSize(void); +static int uri_prefix_length(const char *connstr); + +/* + * InitConnParams initializes the static ConnParams structure to point to + * enough memory to store settings for every known libpq parameter, though + * these regions are set to zeros from the outset and the size appropriately + * also set to zero. + * + * This function must be called before others in this file, though calling it + * after use of the initialized ConnParams structure will result in any + * populated parameter settings being lost. 
+ */
+void
+InitConnParams()
+{
+	Size maxSize = CalculateMaxSize();
+
+	/* calloc hands back zero-filled arrays, so every slot starts out empty */
+	char **keywords = calloc(maxSize, sizeof(char *));
+	char **values = calloc(maxSize, sizeof(char *));
+
+	if (keywords == NULL || values == NULL)
+	{
+		/* free(NULL) is a no-op, so this is safe whichever allocation failed */
+		free(keywords);
+		free(values);
+
+		ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY),
+						errmsg("out of memory"),
+						errdetail("Failed to allocate connection parameter "
+								  "arrays.")));
+	}
+
+	ConnParams.keywords = keywords;
+	ConnParams.values = values;
+	ConnParams.size = 0;
+	ConnParams.maxSize = maxSize;
+}
+
+
+/*
+ * ResetConnParams frees all strings in the keywords and values arrays, sets
+ * their elements to NULL, and resets the used size to zero before adding back
+ * any hardcoded global connection settings (at present, only the
+ * fallback_application_name of 'citus').
+ */
+void
+ResetConnParams()
+{
+	Index paramIdx = 0;
+
+	for (paramIdx = 0; paramIdx < ConnParams.size; paramIdx++)
+	{
+		free(ConnParams.keywords[paramIdx]);
+		free(ConnParams.values[paramIdx]);
+
+		ConnParams.keywords[paramIdx] = ConnParams.values[paramIdx] = NULL;
+	}
+
+	ConnParams.size = 0;
+
+	AddConnParam("fallback_application_name", "citus");
+}
+
+
+/*
+ * AddConnParam adds a parameter setting to the global libpq settings according
+ * to the provided keyword and value; both strings are copied. The arrays are
+ * kept NULL-terminated after every addition.
+ *
+ * Array bounds are checked in all builds (not only assert-enabled ones): an
+ * error is raised, and nothing is written, if the arrays have no room left
+ * for another entry plus the terminating NULL, or if the copies cannot be
+ * allocated.
+ */
+void
+AddConnParam(const char *keyword, const char *value)
+{
+	char *keywordCopy = NULL;
+	char *valueCopy = NULL;
+
+	/* one slot must always stay free for the terminating NULL pair */
+	if (ConnParams.size + 1 >= ConnParams.maxSize)
+	{
+		/* hopefully this error is only seen by developers */
+		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
+						errmsg("ConnParams arrays bound check failed")));
+	}
+
+	keywordCopy = strdup(keyword);
+	valueCopy = strdup(value);
+	if (keywordCopy == NULL || valueCopy == NULL)
+	{
+		free(keywordCopy);
+		free(valueCopy);
+
+		ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY),
+						errmsg("out of memory")));
+	}
+
+	ConnParams.keywords[ConnParams.size] = keywordCopy;
+	ConnParams.values[ConnParams.size] = valueCopy;
+	ConnParams.size++;
+
+	ConnParams.keywords[ConnParams.size] = ConnParams.values[ConnParams.size] = NULL;
+}
+
+
+/*
+ * CheckConninfo is a building block to help implement check constraints and
+ * other check hooks against libpq-like conninfo strings.
In particular, the
+ * provided conninfo must:
+ *
+ *   - Not use a uri-prefix such as postgres:// (it must be only keys and values)
+ *   - Parse using PQconninfoParse
+ *   - Only set keywords contained in the provided whitelist
+ *
+ * This function returns true if all of the above are satisfied, otherwise it
+ * returns false. If the provided errorMsg pointer is not NULL, it will be set
+ * to an appropriate message if the check fails and to NULL if it passes, so
+ * the caller does not need to initialize it. A NULL conninfo is accepted.
+ *
+ * The provided whitelist must be sorted in a manner usable by bsearch, though
+ * this is only validated during assert-enabled builds.
+ */
+bool
+CheckConninfo(const char *conninfo, const char **whitelist,
+			  Size whitelistLength, char **errorMsg)
+{
+	PQconninfoOption *optionArray = NULL;
+	PQconninfoOption *option = NULL;
+	Index whitelistIdx PG_USED_FOR_ASSERTS_ONLY = 0;
+	char *errorMsgString = NULL;
+
+	/*
+	 * If the user doesn't need a message, just point errorMsg at a stack
+	 * variable so we can always safely write to it.
+	 */
+	if (errorMsg == NULL)
+	{
+		errorMsg = &errorMsgString;
+	}
+
+	/*
+	 * The return value at the bottom is derived from *errorMsg, so it must
+	 * not depend on whatever the caller's pointer happened to hold.
+	 */
+	*errorMsg = NULL;
+
+	/* sure, it can be null */
+	if (conninfo == NULL)
+	{
+		return true;
+	}
+
+	/* the libpq prefix form is more complex than we need; ban it */
+	if (uri_prefix_length(conninfo) != 0)
+	{
+		*errorMsg = "Citus connection info strings must be in "
+					"'k1=v1 k2=v2 [...] kn=vn' format";
+
+		return false;
+	}
+
+	/* this should at least parse */
+	optionArray = PQconninfoParse(conninfo, NULL);
+	if (optionArray == NULL)
+	{
+		*errorMsg = "Provided string is not a valid libpq connection info string";
+
+		return false;
+	}
+
+#ifdef USE_ASSERT_CHECKING
+
+	/* verify that the whitelist is in ascending order */
+	for (whitelistIdx = 1; whitelistIdx < whitelistLength; whitelistIdx++)
+	{
+		const char *prev = whitelist[whitelistIdx - 1];
+		const char *curr = whitelist[whitelistIdx];
+
+		AssertArg(strcmp(prev, curr) < 0);
+	}
+#endif
+
+	for (option = optionArray; option->keyword != NULL; option++)
+	{
+		void *matchingKeyword = NULL;
+
+		/* keywords without a value were not set by the user; ignore them */
+		if (option->val == NULL || option->val[0] == '\0')
+		{
+			continue;
+		}
+
+		/* bsearch compares pointers-to-strings, hence &option->keyword */
+		matchingKeyword = bsearch(&option->keyword, whitelist, whitelistLength,
+								  sizeof(char *), pg_qsort_strcmp);
+		if (matchingKeyword == NULL)
+		{
+			/* the whitelist lacks this keyword; error out! */
+			StringInfoData msgString;
+			initStringInfo(&msgString);
+
+			appendStringInfo(&msgString, "Prohibited conninfo keyword detected: %s",
+							 option->keyword);
+
+			*errorMsg = msgString.data;
+
+			break;
+		}
+	}
+
+	PQconninfoFree(optionArray);
+
+	/* if error message is set we found an invalid keyword */
+	return (*errorMsg == NULL);
+}
+
+
+/*
+ * GetConnParams uses the provided key to determine libpq parameters needed to
+ * establish a connection using that key. The keywords and values are placed in
+ * the like-named out parameters. All parameter strings are allocated in the
+ * context provided by the caller, to save the caller needing to copy strings
+ * into an appropriate context later.
+ */
+void
+GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
+			  MemoryContext context)
+{
+	/* room for the port as a string: sign, 10 digits, NUL */
+	char nodePortString[12] = "";
+
+	/*
+	 * This function has three sections:
+	 *   - Initialize the keywords and values with copies of global parameters
+	 *   - Append user/host-specific parameters calculated from the given key
+	 *   - (Enterprise-only) append user/host-specific authentication params
+	 *
+	 * The global parameters have already been assigned from a GUC, so begin by
+	 * calculating the key-specific parameters (basically just the fields of
+	 * the key and the active database encoding).
+	 */
+	const char *runtimeKeywords[] = {
+		"host", "port", "dbname", "user", "client_encoding"
+	};
+	const char *runtimeValues[] = {
+		key->hostname,
+		nodePortString,
+		key->database,
+		key->user,
+		GetDatabaseEncodingName()
+	};
+
+	/* declare local params for readability; we'll assign to outparams at end */
+	char **connKeywords = NULL;
+	char **connValues = NULL;
+	Index paramIdx = 0;
+	Index runtimeIdx = 0;
+
+	/* auth keywords will begin after global and runtime ones are appended */
+	Index authParamsIdx = ConnParams.size + lengthof(runtimeKeywords);
+
+	/*
+	 * The output arrays hold maxSize entries, and the terminating NULL is
+	 * written at authParamsIdx, so that index must lie inside the arrays.
+	 */
+	if (authParamsIdx >= ConnParams.maxSize)
+	{
+		/* hopefully this error is only seen by developers */
+		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
+						errmsg("too many connParams entries")));
+	}
+
+	pg_ltoa(key->port, nodePortString); /* populate node port string with port */
+
+	/* need to zero enough space for all possible libpq parameters */
+	connKeywords = MemoryContextAllocZero(context, ConnParams.maxSize *
+										  sizeof(char *));
+	connValues = MemoryContextAllocZero(context, ConnParams.maxSize *
+										sizeof(char *));
+
+	/*
+	 * First step: copy global parameters to beginning of array. These are deep
+	 * copies because the global strings are freed by ResetConnParams, while
+	 * the arrays built here may be kept by the caller.
+	 */
+	for (paramIdx = 0; paramIdx < ConnParams.size; paramIdx++)
+	{
+		connKeywords[paramIdx] = MemoryContextStrdup(context,
+													 ConnParams.keywords[paramIdx]);
+		connValues[paramIdx] = MemoryContextStrdup(context,
+												   ConnParams.values[paramIdx]);
+	}
+
+	/* second step: begin at end of global params and copy runtime ones */
+	for (runtimeIdx = 0; runtimeIdx < lengthof(runtimeKeywords); runtimeIdx++)
+	{
+		connKeywords[ConnParams.size + runtimeIdx] =
+			MemoryContextStrdup(context, runtimeKeywords[runtimeIdx]);
+		connValues[ConnParams.size + runtimeIdx] =
+			MemoryContextStrdup(context, runtimeValues[runtimeIdx]);
+	}
+
+	/* final step: add terminal NULL, required by libpq */
+	connKeywords[authParamsIdx] = connValues[authParamsIdx] = NULL;
+
+	*keywords = connKeywords;
+	*values = connValues;
+}
+
+
+/*
+ * CalculateMaxSize simply counts the number of elements returned by
+ * PQconndefaults, including the final NULL. This helps us know how much space
+ * would be used if a connection utilizes every known libpq parameter.
+ *
+ * Raises an error if PQconndefaults cannot allocate its result.
+ */
+static Size
+CalculateMaxSize()
+{
+	PQconninfoOption *defaults = PQconndefaults();
+	PQconninfoOption *option = NULL;
+	Size maxSize = 0;
+
+	/* PQconndefaults returns NULL when memory could not be allocated */
+	if (defaults == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY),
+						errmsg("out of memory"),
+						errdetail("Failed to fetch libpq connection "
+								  "defaults.")));
+	}
+
+	for (option = defaults; option->keyword != NULL; option++)
+	{
+		maxSize++;
+	}
+
+	PQconninfoFree(defaults);
+
+	/* we've counted elements but libpq needs a final NULL, so add one */
+	maxSize++;
+
+	return maxSize;
+}
+
+
+/* *INDENT-OFF* */
+
+/*
+ * Checks if connection string starts with either of the valid URI prefix
+ * designators.
+ *
+ * Returns the URI prefix length, 0 if the string doesn't contain a URI prefix.
+ *
+ * This implementation (mostly) taken from libpq/fe-connect.c.
+ */ +static int +uri_prefix_length(const char *connstr) +{ + const char uri_designator[] = "postgresql://"; + const char short_uri_designator[] = "postgres://"; + + /* sizeof(...) - 1 is each designator's length excluding its terminating NUL */ + if (strncmp(connstr, uri_designator, + sizeof(uri_designator) - 1) == 0) + return sizeof(uri_designator) - 1; + + if (strncmp(connstr, short_uri_designator, + sizeof(short_uri_designator) - 1) == 0) + return sizeof(short_uri_designator) - 1; + + return 0; +} + +/* *INDENT-ON* */ diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index aa610ece1..81f2b4fde 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -33,11 +33,10 @@ int NodeConnectionTimeout = 5000; -int CitusSSLMode = CITUS_SSL_MODE_PREFER; HTAB *ConnectionHash = NULL; +HTAB *ConnParamsHash = NULL; MemoryContext ConnectionContext = NULL; - static uint32 ConnectionHashHash(const void *key, Size keysize); static int ConnectionHashCompare(const void *a, const void *b, Size keysize); static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); @@ -55,10 +54,9 @@ static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL; void InitializeConnectionManagement(void) { - HASHCTL info; + HASHCTL info, connParamsInfo; uint32 hashFlags = 0; - /* * Create a single context for connection and transaction related memory * management. 
Doing so, instead of allocating in TopMemoryContext, makes @@ -79,8 +77,14 @@ InitializeConnectionManagement(void) info.hcxt = ConnectionContext; hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE); + memcpy(&connParamsInfo, &info, sizeof(HASHCTL)); + connParamsInfo.entrysize = sizeof(ConnParamsHashEntry); + ConnectionHash = hash_create("citus connection cache (host,port,user,database)", 64, &info, hashFlags); + + ConnParamsHash = hash_create("citus connparams cache (host,port,user,database)", + 64, &connParamsInfo, hashFlags); } @@ -635,30 +639,29 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize) static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key) { - char nodePortString[12]; - const char *clientEncoding = GetDatabaseEncodingName(); + bool found = false; MultiConnection *connection = NULL; - const char *sslmode = CitusSSLModeString(); + ConnParamsHashEntry *entry = NULL; - const char *keywords[] = { - "host", "port", "dbname", "user", "sslmode", - "client_encoding", "fallback_application_name", - NULL - }; - const char *values[] = { - key->hostname, nodePortString, key->database, key->user, sslmode, - clientEncoding, "citus", NULL - }; + /* search our cache for precomputed connection settings */ + entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found); + if (!found) + { + /* if they're not found, compute them from GUC, runtime, etc. 
*/ + GetConnParams(key, &entry->keywords, &entry->values, ConnectionContext); + } connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection)); - sprintf(nodePortString, "%d", key->port); strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); connection->port = key->port; strlcpy(connection->database, key->database, NAMEDATALEN); strlcpy(connection->user, key->user, NAMEDATALEN); - connection->pgConn = PQconnectStartParams(keywords, values, false); + + connection->pgConn = PQconnectStartParams((const char **) entry->keywords, + (const char **) entry->values, + false); connection->connectionStart = GetCurrentTimestamp(); /* @@ -674,52 +677,6 @@ StartConnectionEstablishment(ConnectionHashKey *key) } -/* - * CitusSSLModeString returns the current value of citus.sslmode. - */ -char * -CitusSSLModeString(void) -{ - switch (CitusSSLMode) - { - case CITUS_SSL_MODE_DISABLE: - { - return "disable"; - } - - case CITUS_SSL_MODE_ALLOW: - { - return "allow"; - } - - case CITUS_SSL_MODE_PREFER: - { - return "prefer"; - } - - case CITUS_SSL_MODE_REQUIRE: - { - return "require"; - } - - case CITUS_SSL_MODE_VERIFY_CA: - { - return "verify-ca"; - } - - case CITUS_SSL_MODE_VERIFY_FULL: - { - return "verify-full"; - } - - default: - { - ereport(ERROR, (errmsg("unrecognized value for citus.sslmode"))); - } - } -} - - /* * Close all remote connections if necessary anymore (i.e. not session * lifetime), or if in a failed state. 
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 29eb737be..4d89e3f0d 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -67,6 +67,8 @@ static void RegisterCitusConfigVariables(void); static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra, GucSource source); static void NormalizeWorkerListPath(void); +static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source); +static void NodeConninfoGucAssignHook(const char *newval, void *extra); static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source); @@ -74,6 +76,19 @@ static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSour static bool ExpireCachedShards = false; static int LargeTableShardCount = 0; +/* + * SSL modes available for connecting to worker nodes. + */ +enum CitusSSLMode +{ + CITUS_SSL_MODE_DISABLE = 1 << 0, + CITUS_SSL_MODE_ALLOW = 1 << 1, + CITUS_SSL_MODE_PREFER = 1 << 2, + CITUS_SSL_MODE_REQUIRE = 1 << 3, + CITUS_SSL_MODE_VERIFY_CA = 1 << 4, + CITUS_SSL_MODE_VERIFY_FULL = 1 << 5 +}; +static int CitusSSLMode = CITUS_SSL_MODE_PREFER; /* *INDENT-OFF* */ /* GUC enum definitions */ @@ -184,6 +199,8 @@ _PG_init(void) CreateRequiredDirectories(); } + InitConnParams(); + /* * Register Citus configuration variables. 
Do so before intercepting * hooks or calling initialization functions, in case we want to do the @@ -329,15 +346,13 @@ RegisterCitusConfigVariables(void) DefineCustomEnumVariable( "citus.sslmode", - gettext_noop("SSL mode to use for connections to worker nodes."), - gettext_noop("When connecting to a worker node, specify whether the SSL mode" - "mode for the connection is 'disable', 'allow', 'prefer' " - "(the default), 'require', 'verify-ca' or 'verify-full'."), + gettext_noop("This GUC variable has been deprecated."), + NULL, &CitusSSLMode, CITUS_SSL_MODE_PREFER, citus_ssl_mode_options, PGC_POSTMASTER, - GUC_SUPERUSER_ONLY, + GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL, NULL, NULL, NULL); DefineCustomBoolVariable( @@ -926,6 +941,20 @@ RegisterCitusConfigVariables(void) &StatisticsCollectionGucCheckHook, NULL, NULL); + DefineCustomStringVariable( + "citus.node_conninfo", + gettext_noop("Sets parameters used for outbound connections."), + NULL, + &NodeConninfo, + "sslmode=prefer", + PGC_POSTMASTER, + GUC_SUPERUSER_ONLY, + NodeConninfoGucCheckHook, + NodeConninfoGucAssignHook, + NULL); + NormalizeWorkerListPath(); + + /* warn about config items in the citus namespace that are not registered above */ EmitWarningsOnPlaceholders("citus"); } @@ -997,6 +1026,87 @@ NormalizeWorkerListPath(void) } +/* + * NodeConninfoGucCheckHook ensures conninfo settings are in the expected form + * and that the keywords of all non-null settings are on a whitelist devised to + * keep users from setting options that may result in confusion. 
+ */ +static bool +NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source) +{ + /* this array _must_ be kept in ascending strcmp order: CheckConninfo looks keywords up with bsearch */ + const char *whitelist[] = { + "application_name", + "connect_timeout", + #if defined(ENABLE_GSS) && defined(ENABLE_SSPI) + "gsslib", + #endif + "keepalives", + "keepalives_count", + "keepalives_idle", + "keepalives_interval", + #if defined(ENABLE_GSS) || defined(ENABLE_SSPI) + "krbsrvname", + #endif + "sslcompression", + "sslcrl", + "sslmode", + "sslrootcert" + }; + /* starts out NULL; CheckConninfo fills it in only when validation fails */ + char *errorMsg = NULL; + bool conninfoValid = CheckConninfo(*newval, whitelist, lengthof(whitelist), + &errorMsg); + + if (!conninfoValid) + { + /* report why the value was rejected as the GUC error detail */ + GUC_check_errdetail("%s", errorMsg); + } + + return conninfoValid; +} + + +/* + * NodeConninfoGucAssignHook is the assignment hook for the node_conninfo GUC + * variable. Though this GUC is a "string", we actually parse it as a non-URI + * PQconninfo key/value setting, storing the resultant PQconninfoOption values + * using the public functions in connection_configuration.c. 
+ */ +static void +NodeConninfoGucAssignHook(const char *newval, void *extra) +{ + PQconninfoOption *optionArray = NULL; + PQconninfoOption *option = NULL; + + /* treat an unset value like an empty conninfo string */ + if (newval == NULL) + { + newval = ""; + } + + optionArray = PQconninfoParse(newval, NULL); + if (optionArray == NULL) + { + /* not expected: the check hook validates values with the same parser */ + ereport(FATAL, (errmsg("cannot parse node_conninfo value"), + errdetail("The GUC check hook should prevent " + "all malformed values."))); + } + + /* discard the previously stored settings before storing the new ones */ + ResetConnParams(); + + for (option = optionArray; option->keyword != NULL; option++) + { + /* only store keywords that were actually given a value */ + if (option->val == NULL || option->val[0] == '\0') + { + continue; + } + + AddConnParam(option->keyword, option->val); + } + + PQconninfoFree(optionArray); +} + + static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source) { diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 5be9a64f1..480ec695c 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -9,7 +9,6 @@ #include "stdint.h" #include "postgres.h" - #include "miscadmin.h" #include "access/genam.h" diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 912aee9af..fa833a29b 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -113,26 +113,21 @@ typedef struct ConnectionHashEntry dlist_head *connections; } ConnectionHashEntry; -/* - * SSL modes available for connecting to worker nodes. 
- */ -enum CitusSSLMode +/* hash entry for cached connection parameters */ +typedef struct ConnParamsHashEntry { - CITUS_SSL_MODE_DISABLE = 1 << 0, - CITUS_SSL_MODE_ALLOW = 1 << 1, - CITUS_SSL_MODE_PREFER = 1 << 2, - CITUS_SSL_MODE_REQUIRE = 1 << 3, - CITUS_SSL_MODE_VERIFY_CA = 1 << 4, - CITUS_SSL_MODE_VERIFY_FULL = 1 << 5 -}; + ConnectionHashKey key; + char **keywords; + char **values; +} ConnParamsHashEntry; -/* SSL mode to use when connecting to worker nodes */ -extern int CitusSSLMode; - /* maximum duration to wait for connection */ extern int NodeConnectionTimeout; +/* parameters used for outbound connections */ +extern char *NodeConninfo; + /* the hash table */ extern HTAB *ConnectionHash; @@ -143,6 +138,14 @@ extern struct MemoryContextData *ConnectionContext; extern void AfterXactConnectionHandling(bool isCommit); extern void InitializeConnectionManagement(void); +extern void InitConnParams(void); +extern void ResetConnParams(void); +extern void AddConnParam(const char *keyword, const char *value); +extern void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, + MemoryContext context); +extern bool CheckConninfo(const char *conninfo, const char **whitelist, + Size whitelistLength, char **errmsg); + /* Low-level connection establishment APIs */ extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname, @@ -157,7 +160,6 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, int32 port, const char *user, const char *database); -extern char * CitusSSLModeString(void); extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort); extern void CloseConnection(MultiConnection *connection); extern void ShutdownConnection(MultiConnection *connection);