mirror of https://github.com/citusdata/citus.git
Merge pull request #2190 from citusdata/conninfo_guc
Add node_conninfo GUC and related logic cr: @marcocituspull/2218/head
commit
95e546ba5f
1
Makefile
1
Makefile
|
@ -9,6 +9,7 @@ OBJS = src/backend/distributed/shared_library_init.o \
|
|||
src/backend/distributed/commands/drop_distributed_table.o \
|
||||
src/backend/distributed/commands/multi_copy.o \
|
||||
src/backend/distributed/commands/transmit.o \
|
||||
src/backend/distributed/connection/connection_configuration.o \
|
||||
src/backend/distributed/connection/connection_management.o \
|
||||
src/backend/distributed/connection/placement_connection.o \
|
||||
src/backend/distributed/connection/remote_commands.o \
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Citus extension
|
||||
comment = 'Citus distributed database'
|
||||
default_version = '7.5-1'
|
||||
default_version = '7.5-2'
|
||||
module_pathname = '$libdir/citus'
|
||||
relocatable = false
|
||||
schema = pg_catalog
|
||||
|
|
|
@ -16,7 +16,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
|||
7.2-1 7.2-2 7.2-3 \
|
||||
7.3-1 7.3-2 7.3-3 \
|
||||
7.4-1 7.4-2 7.4-3 \
|
||||
7.5-1
|
||||
7.5-1 7.5-2
|
||||
|
||||
# All citus--*.sql files in the source directory
|
||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||
|
@ -202,6 +202,8 @@ $(EXTENSION)--7.4-3.sql: $(EXTENSION)--7.4-2.sql $(EXTENSION)--7.4-2--7.4-3.sql
|
|||
cat $^ > $@
|
||||
$(EXTENSION)--7.5-1.sql: $(EXTENSION)--7.4-3.sql $(EXTENSION)--7.4-3--7.5-1.sql
|
||||
cat $^ > $@
|
||||
$(EXTENSION)--7.5-2.sql: $(EXTENSION)--7.5-1.sql $(EXTENSION)--7.5-1--7.5-2.sql
|
||||
cat $^ > $@
|
||||
|
||||
NO_PGXS = 1
|
||||
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/* citus--7.5-1--7.5-2 */
|
||||
SET search_path = 'pg_catalog';
|
||||
|
||||
-- note that we're not dropping the older version of the function
|
||||
CREATE FUNCTION pg_catalog.role_exists(name)
|
||||
RETURNS boolean
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$role_exists$$;
|
||||
COMMENT ON FUNCTION role_exists(name) IS 'returns whether a role exists';
|
||||
|
||||
CREATE FUNCTION pg_catalog.authinfo_valid(text)
|
||||
RETURNS boolean
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$authinfo_valid$$;
|
||||
COMMENT ON FUNCTION authinfo_valid(text) IS 'returns whether an authinfo is valid';
|
||||
|
||||
CREATE TABLE citus.pg_dist_authinfo (
|
||||
nodeid integer NOT NULL,
|
||||
rolename name NOT NULL
|
||||
CONSTRAINT role_exists
|
||||
CHECK (role_exists(rolename)),
|
||||
authinfo text NOT NULL
|
||||
CONSTRAINT authinfo_valid
|
||||
CHECK (authinfo_valid(authinfo))
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX pg_dist_authinfo_identification_index
|
||||
ON citus.pg_dist_authinfo (rolename, nodeid DESC);
|
||||
|
||||
ALTER TABLE citus.pg_dist_authinfo SET SCHEMA pg_catalog;
|
||||
|
||||
REVOKE ALL ON pg_catalog.pg_dist_authinfo FROM PUBLIC;
|
||||
|
||||
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
|||
# Citus extension
|
||||
comment = 'Citus distributed database'
|
||||
default_version = '7.5-1'
|
||||
default_version = '7.5-2'
|
||||
module_pathname = '$libdir/citus'
|
||||
relocatable = false
|
||||
schema = pg_catalog
|
||||
|
|
|
@ -0,0 +1,336 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* connection_configuration.c
|
||||
* Functions for controlling configuration of Citus connections
|
||||
*
|
||||
* Copyright (c) 2018, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/task_tracker.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "mb/pg_wchar.h"
|
||||
#include "utils/builtins.h"
|
||||
|
||||
/* stores the string representation of our node connection GUC */
|
||||
char *NodeConninfo = "";
|
||||
|
||||
/* represents a list of libpq parameter settings */
|
||||
typedef struct ConnParamsInfo
|
||||
{
|
||||
char **keywords; /* libpq keywords */
|
||||
char **values; /* desired values for above */
|
||||
Size size; /* current used size of arrays */
|
||||
Size maxSize; /* maximum allocated size of arrays (similar to e.g. StringInfo) */
|
||||
} ConnParamsInfo;
|
||||
|
||||
/*
|
||||
* Stores parsed global libpq parameter settings. static because all access
|
||||
* is encapsulated in the other public functions in this file.
|
||||
*/
|
||||
static ConnParamsInfo ConnParams;
|
||||
|
||||
/* helper functions for processing connection info */
|
||||
static Size CalculateMaxSize(void);
|
||||
static int uri_prefix_length(const char *connstr);
|
||||
|
||||
/*
|
||||
* InitConnParms initializes the ConnParams field to point to enough memory to
|
||||
* store settings for every valid libpq value, though these regions are set to
|
||||
* zeros from the outset and the size appropriately also set to zero.
|
||||
*
|
||||
* This function must be called before others in this file, though calling it
|
||||
* after use of the initialized ConnParams structure will result in any
|
||||
* populated parameter settings being lost.
|
||||
*/
|
||||
void
|
||||
InitConnParams()
|
||||
{
|
||||
Size maxSize = CalculateMaxSize();
|
||||
ConnParamsInfo connParams = {
|
||||
.keywords = malloc(maxSize * sizeof(char *)),
|
||||
.values = malloc(maxSize * sizeof(char *)),
|
||||
.size = 0,
|
||||
.maxSize = maxSize
|
||||
};
|
||||
|
||||
memset(connParams.keywords, 0, maxSize * sizeof(char *));
|
||||
memset(connParams.values, 0, maxSize * sizeof(char *));
|
||||
|
||||
ConnParams = connParams;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ResetConnParams frees all strings in the keywords and parameters arrays,
|
||||
* sets their elements to null, and resets the ConnParamsSize to zero before
|
||||
* adding back any hardcoded global connection settings (at present, only the
|
||||
* fallback_application_name of 'citus').
|
||||
*/
|
||||
void
|
||||
ResetConnParams()
|
||||
{
|
||||
Index paramIdx = 0;
|
||||
|
||||
for (paramIdx = 0; paramIdx < ConnParams.size; paramIdx++)
|
||||
{
|
||||
free((void *) ConnParams.keywords[paramIdx]);
|
||||
free((void *) ConnParams.values[paramIdx]);
|
||||
|
||||
ConnParams.keywords[paramIdx] = ConnParams.values[paramIdx] = NULL;
|
||||
}
|
||||
|
||||
ConnParams.size = 0;
|
||||
|
||||
AddConnParam("fallback_application_name", "citus");
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AddConnParam adds a parameter setting to the global libpq settings according
|
||||
* to the provided keyword and value. Under assert-enabled builds, array bounds
|
||||
* checking is performed.
|
||||
*/
|
||||
void
|
||||
AddConnParam(const char *keyword, const char *value)
|
||||
{
|
||||
Assert((ConnParams.size + 1) < ConnParams.maxSize);
|
||||
|
||||
ConnParams.keywords[ConnParams.size] = strdup(keyword);
|
||||
ConnParams.values[ConnParams.size] = strdup(value);
|
||||
ConnParams.size++;
|
||||
|
||||
ConnParams.keywords[ConnParams.size] = ConnParams.values[ConnParams.size] = NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CheckConninfo is a building block to help implement check constraints and
|
||||
* other check hooks against libpq-like conninfo strings. In particular, the
|
||||
* provided conninfo must:
|
||||
*
|
||||
* - Not use a uri-prefix such as postgres:// (it must be only keys and values)
|
||||
* - Parse using PQconninfoParse
|
||||
* - Only set keywords contained in the provided whitelist
|
||||
*
|
||||
* This function returns true if all of the above are satisfied, otherwise it
|
||||
* returns false. If the provided errmsg pointer is not NULL, it will be set
|
||||
* to an appropriate message if the check fails.
|
||||
*
|
||||
* The provided whitelist must be sorted in a manner usable by bsearch, though
|
||||
* this is only validated during assert-enabled builds.
|
||||
*/
|
||||
bool
|
||||
CheckConninfo(const char *conninfo, const char **whitelist,
|
||||
Size whitelistLength, char **errorMsg)
|
||||
{
|
||||
PQconninfoOption *optionArray = NULL;
|
||||
PQconninfoOption *option = NULL;
|
||||
Index whitelistIdx PG_USED_FOR_ASSERTS_ONLY = 0;
|
||||
char *errorMsgString = NULL;
|
||||
|
||||
/*
|
||||
* If the user doesn't need a message, just overwrite errmsg with a stack
|
||||
* variable so we can always safely write to it.
|
||||
*/
|
||||
if (errorMsg == NULL)
|
||||
{
|
||||
errorMsg = &errorMsgString;
|
||||
}
|
||||
|
||||
/* sure, it can be null */
|
||||
if (conninfo == NULL)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
/* the libpq prefix form is more complex than we need; ban it */
|
||||
if (uri_prefix_length(conninfo) != 0)
|
||||
{
|
||||
*errorMsg = "Citus connection info strings must be in "
|
||||
"'k1=v1 k2=v2 [...] kn=vn' format";
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/* this should at least parse */
|
||||
optionArray = PQconninfoParse(conninfo, NULL);
|
||||
if (optionArray == NULL)
|
||||
{
|
||||
*errorMsg = "Provided string is not a valid libpq connection info string";
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
|
||||
/* verify that the whitelist is in ascending order */
|
||||
for (whitelistIdx = 1; whitelistIdx < whitelistLength; whitelistIdx++)
|
||||
{
|
||||
const char *prev = whitelist[whitelistIdx - 1];
|
||||
const char *curr = whitelist[whitelistIdx];
|
||||
|
||||
AssertArg(strcmp(prev, curr) < 0);
|
||||
}
|
||||
#endif
|
||||
|
||||
for (option = optionArray; option->keyword != NULL; option++)
|
||||
{
|
||||
void *matchingKeyword = NULL;
|
||||
|
||||
if (option->val == NULL || option->val[0] == '\0')
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
matchingKeyword = bsearch(&option->keyword, whitelist, whitelistLength,
|
||||
sizeof(char *), pg_qsort_strcmp);
|
||||
if (matchingKeyword == NULL)
|
||||
{
|
||||
/* the whitelist lacks this keyword; error out! */
|
||||
StringInfoData msgString;
|
||||
initStringInfo(&msgString);
|
||||
|
||||
appendStringInfo(&msgString, "Prohibited conninfo keyword detected: %s",
|
||||
option->keyword);
|
||||
|
||||
*errorMsg = msgString.data;
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
PQconninfoFree(optionArray);
|
||||
|
||||
/* if error message is set we found an invalid keyword */
|
||||
return (*errorMsg == NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetConnParams uses the provided key to determine libpq parameters needed to
|
||||
* establish a connection using that key. The keywords and values are placed in
|
||||
* the like-named out parameters. All parameter strings are allocated in the
|
||||
* context provided by the caller, to save the caller needing to copy strings
|
||||
* into an appropriate context later.
|
||||
*/
|
||||
void
|
||||
GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
||||
MemoryContext context)
|
||||
{
|
||||
/* make space for the port as a string: sign, 10 digits, NUL */
|
||||
char *nodePortString = MemoryContextAlloc(context, 12 * sizeof(char *));
|
||||
|
||||
/*
|
||||
* This function has three sections:
|
||||
* - Initialize the keywords and values with copies of global parameters
|
||||
* - Append user/host-specific parameters calculated from the given key
|
||||
* - (Enterprise-only) append user/host-specific authentication params
|
||||
*
|
||||
* The global parameters have already been assigned from a GUC, so begin by
|
||||
* calculating the key-specific parameters (basically just the fields of
|
||||
* the key and the active database encoding).
|
||||
*/
|
||||
const char *runtimeKeywords[] = {
|
||||
"host", "port", "dbname", "user", "client_encoding"
|
||||
};
|
||||
const char *runtimeValues[] = {
|
||||
MemoryContextStrdup(context, key->hostname),
|
||||
nodePortString,
|
||||
MemoryContextStrdup(context, key->database),
|
||||
MemoryContextStrdup(context, key->user),
|
||||
GetDatabaseEncodingName()
|
||||
};
|
||||
|
||||
/*
|
||||
* Declare local params for readability; we'll assign to outparams at end.
|
||||
* Need to zero enough space for all possible libpq parameters.
|
||||
*/
|
||||
char **connKeywords = MemoryContextAllocZero(context, ConnParams.maxSize *
|
||||
sizeof(char *));
|
||||
char **connValues = MemoryContextAllocZero(context, ConnParams.maxSize *
|
||||
sizeof(char *));
|
||||
|
||||
/* auth keywords will begin after global and runtime ones are appended */
|
||||
Index authParamsIdx = ConnParams.size + lengthof(runtimeKeywords);
|
||||
|
||||
pg_ltoa(key->port, nodePortString); /* populate node port string with port */
|
||||
|
||||
/* first step: copy global parameters to beginning of array */
|
||||
memcpy(connKeywords, ConnParams.keywords, ConnParams.size * sizeof(char *));
|
||||
memcpy(connValues, ConnParams.values, ConnParams.size * sizeof(char *));
|
||||
|
||||
/* second step: begin at end of global params and copy runtime ones */
|
||||
memcpy(&connKeywords[ConnParams.size], runtimeKeywords, sizeof(runtimeKeywords));
|
||||
memcpy(&connValues[ConnParams.size], runtimeValues, sizeof(runtimeValues));
|
||||
|
||||
/* final step: add terminal NULL, required by libpq */
|
||||
connKeywords[authParamsIdx] = connValues[authParamsIdx] = NULL;
|
||||
|
||||
*keywords = connKeywords;
|
||||
*values = connValues;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CalculateMaxSize simply counts the number of elements returned by
|
||||
* PQconnDefaults, including the final NULL. This helps us know how space would
|
||||
* be used if a connection utilizes every known libpq parameter.
|
||||
*/
|
||||
static Size
|
||||
CalculateMaxSize()
|
||||
{
|
||||
PQconninfoOption *defaults = PQconndefaults();
|
||||
PQconninfoOption *option = NULL;
|
||||
Size maxSize = 0;
|
||||
|
||||
for (option = defaults;
|
||||
option->keyword != NULL;
|
||||
option++, maxSize++)
|
||||
{
|
||||
/* do nothing, we're just counting the elements */
|
||||
}
|
||||
|
||||
PQconninfoFree(defaults);
|
||||
|
||||
/* we've counted elements but libpq needs a final NULL, so add one */
|
||||
maxSize++;
|
||||
|
||||
return maxSize;
|
||||
}
|
||||
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
|
||||
/*
|
||||
* Checks if connection string starts with either of the valid URI prefix
|
||||
* designators.
|
||||
*
|
||||
* Returns the URI prefix length, 0 if the string doesn't contain a URI prefix.
|
||||
*
|
||||
* This implementation (mostly) taken from libpq/fe-connect.c.
|
||||
*/
|
||||
static int
|
||||
uri_prefix_length(const char *connstr)
|
||||
{
|
||||
const char uri_designator[] = "postgresql://";
|
||||
const char short_uri_designator[] = "postgres://";
|
||||
|
||||
if (strncmp(connstr, uri_designator,
|
||||
sizeof(uri_designator) - 1) == 0)
|
||||
return sizeof(uri_designator) - 1;
|
||||
|
||||
if (strncmp(connstr, short_uri_designator,
|
||||
sizeof(short_uri_designator) - 1) == 0)
|
||||
return sizeof(short_uri_designator) - 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* *INDENT-ON* */
|
|
@ -33,11 +33,10 @@
|
|||
|
||||
|
||||
int NodeConnectionTimeout = 5000;
|
||||
int CitusSSLMode = CITUS_SSL_MODE_PREFER;
|
||||
HTAB *ConnectionHash = NULL;
|
||||
HTAB *ConnParamsHash = NULL;
|
||||
MemoryContext ConnectionContext = NULL;
|
||||
|
||||
|
||||
static uint32 ConnectionHashHash(const void *key, Size keysize);
|
||||
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
|
||||
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
|
||||
|
@ -55,10 +54,9 @@ static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
|
|||
void
|
||||
InitializeConnectionManagement(void)
|
||||
{
|
||||
HASHCTL info;
|
||||
HASHCTL info, connParamsInfo;
|
||||
uint32 hashFlags = 0;
|
||||
|
||||
|
||||
/*
|
||||
* Create a single context for connection and transaction related memory
|
||||
* management. Doing so, instead of allocating in TopMemoryContext, makes
|
||||
|
@ -79,8 +77,14 @@ InitializeConnectionManagement(void)
|
|||
info.hcxt = ConnectionContext;
|
||||
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
|
||||
|
||||
memcpy(&connParamsInfo, &info, sizeof(HASHCTL));
|
||||
connParamsInfo.entrysize = sizeof(ConnParamsHashEntry);
|
||||
|
||||
ConnectionHash = hash_create("citus connection cache (host,port,user,database)",
|
||||
64, &info, hashFlags);
|
||||
|
||||
ConnParamsHash = hash_create("citus connparams cache (host,port,user,database)",
|
||||
64, &connParamsInfo, hashFlags);
|
||||
}
|
||||
|
||||
|
||||
|
@ -635,30 +639,29 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize)
|
|||
static MultiConnection *
|
||||
StartConnectionEstablishment(ConnectionHashKey *key)
|
||||
{
|
||||
char nodePortString[12];
|
||||
const char *clientEncoding = GetDatabaseEncodingName();
|
||||
bool found = false;
|
||||
MultiConnection *connection = NULL;
|
||||
const char *sslmode = CitusSSLModeString();
|
||||
ConnParamsHashEntry *entry = NULL;
|
||||
|
||||
const char *keywords[] = {
|
||||
"host", "port", "dbname", "user", "sslmode",
|
||||
"client_encoding", "fallback_application_name",
|
||||
NULL
|
||||
};
|
||||
const char *values[] = {
|
||||
key->hostname, nodePortString, key->database, key->user, sslmode,
|
||||
clientEncoding, "citus", NULL
|
||||
};
|
||||
/* search our cache for precomputed connection settings */
|
||||
entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
|
||||
if (!found)
|
||||
{
|
||||
/* if they're not found, compute them from GUC, runtime, etc. */
|
||||
GetConnParams(key, &entry->keywords, &entry->values, ConnectionContext);
|
||||
}
|
||||
|
||||
connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection));
|
||||
sprintf(nodePortString, "%d", key->port);
|
||||
|
||||
strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
|
||||
connection->port = key->port;
|
||||
strlcpy(connection->database, key->database, NAMEDATALEN);
|
||||
strlcpy(connection->user, key->user, NAMEDATALEN);
|
||||
|
||||
connection->pgConn = PQconnectStartParams(keywords, values, false);
|
||||
|
||||
connection->pgConn = PQconnectStartParams((const char **) entry->keywords,
|
||||
(const char **) entry->values,
|
||||
false);
|
||||
connection->connectionStart = GetCurrentTimestamp();
|
||||
|
||||
/*
|
||||
|
@ -674,52 +677,6 @@ StartConnectionEstablishment(ConnectionHashKey *key)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CitusSSLModeString returns the current value of citus.sslmode.
|
||||
*/
|
||||
char *
|
||||
CitusSSLModeString(void)
|
||||
{
|
||||
switch (CitusSSLMode)
|
||||
{
|
||||
case CITUS_SSL_MODE_DISABLE:
|
||||
{
|
||||
return "disable";
|
||||
}
|
||||
|
||||
case CITUS_SSL_MODE_ALLOW:
|
||||
{
|
||||
return "allow";
|
||||
}
|
||||
|
||||
case CITUS_SSL_MODE_PREFER:
|
||||
{
|
||||
return "prefer";
|
||||
}
|
||||
|
||||
case CITUS_SSL_MODE_REQUIRE:
|
||||
{
|
||||
return "require";
|
||||
}
|
||||
|
||||
case CITUS_SSL_MODE_VERIFY_CA:
|
||||
{
|
||||
return "verify-ca";
|
||||
}
|
||||
|
||||
case CITUS_SSL_MODE_VERIFY_FULL:
|
||||
{
|
||||
return "verify-full";
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
ereport(ERROR, (errmsg("unrecognized value for citus.sslmode")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Close all remote connections if necessary anymore (i.e. not session
|
||||
* lifetime), or if in a failed state.
|
||||
|
|
|
@ -67,6 +67,8 @@ static void RegisterCitusConfigVariables(void);
|
|||
static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
|
||||
GucSource source);
|
||||
static void NormalizeWorkerListPath(void);
|
||||
static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source);
|
||||
static void NodeConninfoGucAssignHook(const char *newval, void *extra);
|
||||
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
|
||||
source);
|
||||
|
||||
|
@ -74,6 +76,19 @@ static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSour
|
|||
static bool ExpireCachedShards = false;
|
||||
static int LargeTableShardCount = 0;
|
||||
|
||||
/*
|
||||
* SSL modes available for connecting to worker nodes.
|
||||
*/
|
||||
enum CitusSSLMode
|
||||
{
|
||||
CITUS_SSL_MODE_DISABLE = 1 << 0,
|
||||
CITUS_SSL_MODE_ALLOW = 1 << 1,
|
||||
CITUS_SSL_MODE_PREFER = 1 << 2,
|
||||
CITUS_SSL_MODE_REQUIRE = 1 << 3,
|
||||
CITUS_SSL_MODE_VERIFY_CA = 1 << 4,
|
||||
CITUS_SSL_MODE_VERIFY_FULL = 1 << 5
|
||||
};
|
||||
static int CitusSSLMode = CITUS_SSL_MODE_PREFER;
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
/* GUC enum definitions */
|
||||
|
@ -184,6 +199,8 @@ _PG_init(void)
|
|||
CreateRequiredDirectories();
|
||||
}
|
||||
|
||||
InitConnParams();
|
||||
|
||||
/*
|
||||
* Register Citus configuration variables. Do so before intercepting
|
||||
* hooks or calling initialization functions, in case we want to do the
|
||||
|
@ -329,15 +346,13 @@ RegisterCitusConfigVariables(void)
|
|||
|
||||
DefineCustomEnumVariable(
|
||||
"citus.sslmode",
|
||||
gettext_noop("SSL mode to use for connections to worker nodes."),
|
||||
gettext_noop("When connecting to a worker node, specify whether the SSL mode"
|
||||
"mode for the connection is 'disable', 'allow', 'prefer' "
|
||||
"(the default), 'require', 'verify-ca' or 'verify-full'."),
|
||||
gettext_noop("This GUC variable has been deprecated."),
|
||||
NULL,
|
||||
&CitusSSLMode,
|
||||
CITUS_SSL_MODE_PREFER,
|
||||
citus_ssl_mode_options,
|
||||
PGC_POSTMASTER,
|
||||
GUC_SUPERUSER_ONLY,
|
||||
GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
|
@ -926,6 +941,20 @@ RegisterCitusConfigVariables(void)
|
|||
&StatisticsCollectionGucCheckHook,
|
||||
NULL, NULL);
|
||||
|
||||
DefineCustomStringVariable(
|
||||
"citus.node_conninfo",
|
||||
gettext_noop("Sets parameters used for outbound connections."),
|
||||
NULL,
|
||||
&NodeConninfo,
|
||||
"sslmode=prefer",
|
||||
PGC_POSTMASTER,
|
||||
GUC_SUPERUSER_ONLY,
|
||||
NodeConninfoGucCheckHook,
|
||||
NodeConninfoGucAssignHook,
|
||||
NULL);
|
||||
NormalizeWorkerListPath();
|
||||
|
||||
|
||||
/* warn about config items in the citus namespace that are not registered above */
|
||||
EmitWarningsOnPlaceholders("citus");
|
||||
}
|
||||
|
@ -997,6 +1026,87 @@ NormalizeWorkerListPath(void)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* NodeConninfoGucCheckHook ensures conninfo settings are in the expected form
|
||||
* and that the keywords of all non-null settings are on a whitelist devised to
|
||||
* keep users from setting options that may result in confusion.
|
||||
*/
|
||||
static bool
|
||||
NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source)
|
||||
{
|
||||
/* this array _must_ be kept in an order usable by bsearch */
|
||||
const char *whitelist[] = {
|
||||
"application_name",
|
||||
"connect_timeout",
|
||||
#if defined(ENABLE_GSS) && defined(ENABLE_SSPI)
|
||||
"gsslib",
|
||||
#endif
|
||||
"keepalives",
|
||||
"keepalives_count",
|
||||
"keepalives_idle",
|
||||
"keepalives_interval",
|
||||
#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
|
||||
"krbsrvname",
|
||||
#endif
|
||||
"sslcompression",
|
||||
"sslcrl",
|
||||
"sslmode",
|
||||
"sslrootcert"
|
||||
};
|
||||
char *errorMsg = NULL;
|
||||
bool conninfoValid = CheckConninfo(*newval, whitelist, lengthof(whitelist),
|
||||
&errorMsg);
|
||||
|
||||
if (!conninfoValid)
|
||||
{
|
||||
GUC_check_errdetail("%s", errorMsg);
|
||||
}
|
||||
|
||||
return conninfoValid;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* NodeConninfoGucAssignHook is the assignment hook for the node_conninfo GUC
|
||||
* variable. Though this GUC is a "string", we actually parse it as a non-URI
|
||||
* PQconninfo key/value setting, storing the resultant PQconninfoOption values
|
||||
* using the public functions in connection_configuration.c.
|
||||
*/
|
||||
static void
|
||||
NodeConninfoGucAssignHook(const char *newval, void *extra)
|
||||
{
|
||||
PQconninfoOption *optionArray = NULL;
|
||||
PQconninfoOption *option = NULL;
|
||||
|
||||
if (newval == NULL)
|
||||
{
|
||||
newval = "";
|
||||
}
|
||||
|
||||
optionArray = PQconninfoParse(newval, NULL);
|
||||
if (optionArray == NULL)
|
||||
{
|
||||
ereport(FATAL, (errmsg("cannot parse node_conninfo value"),
|
||||
errdetail("The GUC check hook should prevent "
|
||||
"all malformed values.")));
|
||||
}
|
||||
|
||||
ResetConnParams();
|
||||
|
||||
for (option = optionArray; option->keyword != NULL; option++)
|
||||
{
|
||||
if (option->val == NULL || option->val[0] == '\0')
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
AddConnParam(option->keyword, option->val);
|
||||
}
|
||||
|
||||
PQconninfoFree(optionArray);
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
|
||||
{
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
|
||||
#include "stdint.h"
|
||||
#include "postgres.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "access/genam.h"
|
||||
|
@ -27,6 +27,7 @@
|
|||
#include "commands/extension.h"
|
||||
#include "commands/trigger.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
@ -49,6 +50,7 @@
|
|||
#include "utils/builtins.h"
|
||||
#include "utils/catcache.h"
|
||||
#include "utils/datum.h"
|
||||
#include "utils/elog.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/inval.h"
|
||||
#include "utils/fmgroids.h"
|
||||
|
@ -210,6 +212,8 @@ PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate);
|
|||
PG_FUNCTION_INFO_V1(master_dist_placement_cache_invalidate);
|
||||
PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate);
|
||||
PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate);
|
||||
PG_FUNCTION_INFO_V1(role_exists);
|
||||
PG_FUNCTION_INFO_V1(authinfo_valid);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -3430,3 +3434,34 @@ DistNodeMetadata(void)
|
|||
|
||||
return metadata;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* role_exists is a check constraint which ensures that roles referenced in the
|
||||
* pg_dist_authinfo catalog actually exist (at least at the time of insertion).
|
||||
*/
|
||||
Datum
|
||||
role_exists(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Name roleName = PG_GETARG_NAME(0);
|
||||
bool roleExists = SearchSysCacheExists1(AUTHNAME, NameGetDatum(roleName));
|
||||
|
||||
PG_RETURN_BOOL(roleExists);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* authinfo_valid is a check constraint which errors on all rows, intended for
|
||||
* use in prohibiting writes to pg_dist_authinfo in Citus Community.
|
||||
*/
|
||||
Datum
|
||||
authinfo_valid(PG_FUNCTION_ARGS)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot write to pg_dist_authinfo"),
|
||||
errdetail("Citus Community Edition does not support the use of "
|
||||
"custom authentication options."),
|
||||
errhint("To learn more about using advanced authentication schemes "
|
||||
"with Citus, please contact us at "
|
||||
"https://citusdata.com/about/contact_us")));
|
||||
}
|
||||
|
|
|
@ -113,26 +113,21 @@ typedef struct ConnectionHashEntry
|
|||
dlist_head *connections;
|
||||
} ConnectionHashEntry;
|
||||
|
||||
/*
|
||||
* SSL modes available for connecting to worker nodes.
|
||||
*/
|
||||
enum CitusSSLMode
|
||||
/* hash entry for cached connection parameters */
|
||||
typedef struct ConnParamsHashEntry
|
||||
{
|
||||
CITUS_SSL_MODE_DISABLE = 1 << 0,
|
||||
CITUS_SSL_MODE_ALLOW = 1 << 1,
|
||||
CITUS_SSL_MODE_PREFER = 1 << 2,
|
||||
CITUS_SSL_MODE_REQUIRE = 1 << 3,
|
||||
CITUS_SSL_MODE_VERIFY_CA = 1 << 4,
|
||||
CITUS_SSL_MODE_VERIFY_FULL = 1 << 5
|
||||
};
|
||||
ConnectionHashKey key;
|
||||
char **keywords;
|
||||
char **values;
|
||||
} ConnParamsHashEntry;
|
||||
|
||||
|
||||
/* SSL mode to use when connecting to worker nodes */
|
||||
extern int CitusSSLMode;
|
||||
|
||||
/* maximum duration to wait for connection */
|
||||
extern int NodeConnectionTimeout;
|
||||
|
||||
/* parameters used for outbound connections */
|
||||
extern char *NodeConninfo;
|
||||
|
||||
/* the hash table */
|
||||
extern HTAB *ConnectionHash;
|
||||
|
||||
|
@ -143,6 +138,14 @@ extern struct MemoryContextData *ConnectionContext;
|
|||
extern void AfterXactConnectionHandling(bool isCommit);
|
||||
extern void InitializeConnectionManagement(void);
|
||||
|
||||
extern void InitConnParams(void);
|
||||
extern void ResetConnParams(void);
|
||||
extern void AddConnParam(const char *keyword, const char *value);
|
||||
extern void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
||||
MemoryContext context);
|
||||
extern bool CheckConninfo(const char *conninfo, const char **whitelist,
|
||||
Size whitelistLength, char **errmsg);
|
||||
|
||||
|
||||
/* Low-level connection establishment APIs */
|
||||
extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname,
|
||||
|
@ -157,7 +160,6 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
|
|||
int32 port,
|
||||
const char *user,
|
||||
const char *database);
|
||||
extern char * CitusSSLModeString(void);
|
||||
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
|
||||
extern void CloseConnection(MultiConnection *connection);
|
||||
extern void ShutdownConnection(MultiConnection *connection);
|
||||
|
|
|
@ -19,8 +19,9 @@ WHERE
|
|||
AND nsp.nspname = 'pg_catalog'
|
||||
AND NOT has_table_privilege(pg_class.oid, 'select');
|
||||
oid
|
||||
-----
|
||||
(0 rows)
|
||||
------------------
|
||||
pg_dist_authinfo
|
||||
(1 row)
|
||||
|
||||
RESET role;
|
||||
DROP USER no_access;
|
||||
|
|
|
@ -14,3 +14,7 @@ HINT: Connect to worker nodes directly to manually create all necessary users a
|
|||
CREATE USER new_user;
|
||||
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
|
||||
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
|
||||
INSERT INTO pg_dist_authinfo VALUES (0, 'new_user', 'password=1234');
|
||||
ERROR: cannot write to pg_dist_authinfo
|
||||
DETAIL: Citus Community Edition does not support the use of custom authentication options.
|
||||
HINT: To learn more about using advanced authentication schemes with Citus, please contact us at https://citusdata.com/about/contact_us
|
||||
|
|
|
@ -129,7 +129,7 @@ test: multi_create_schema
|
|||
|
||||
# ----------
|
||||
# Tests to check if we inform the user about potential caveats of creating new
|
||||
# databases, schemas, and roles.
|
||||
# databases, schemas, roles, and authentication information.
|
||||
# ----------
|
||||
test: multi_utility_warnings
|
||||
|
||||
|
|
|
@ -14,3 +14,5 @@ CREATE DATABASE new_database;
|
|||
CREATE ROLE new_role;
|
||||
|
||||
CREATE USER new_user;
|
||||
|
||||
INSERT INTO pg_dist_authinfo VALUES (0, 'new_user', 'password=1234');
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
#define CITUS_EDITION "community"
|
||||
|
||||
/* Extension version expected by this Citus build */
|
||||
#define CITUS_EXTENSIONVERSION "7.5-1"
|
||||
#define CITUS_EXTENSIONVERSION "7.5-2"
|
||||
|
||||
/* Citus major version as a string */
|
||||
#define CITUS_MAJORVERSION "7.5"
|
||||
|
|
Loading…
Reference in New Issue