mirror of https://github.com/citusdata/citus.git
Merge pull request #2632 from citusdata/fix_conninfo_memory_bugs
Fix conninfo memory bugs cr: @onderkalaci, @marcocituspull/2635/head
commit
1a7c73c37b
|
@ -77,7 +77,6 @@ void
|
||||||
ResetConnParams()
|
ResetConnParams()
|
||||||
{
|
{
|
||||||
Index paramIdx = 0;
|
Index paramIdx = 0;
|
||||||
|
|
||||||
for (paramIdx = 0; paramIdx < ConnParams.size; paramIdx++)
|
for (paramIdx = 0; paramIdx < ConnParams.size; paramIdx++)
|
||||||
{
|
{
|
||||||
free((void *) ConnParams.keywords[paramIdx]);
|
free((void *) ConnParams.keywords[paramIdx]);
|
||||||
|
@ -88,21 +87,22 @@ ResetConnParams()
|
||||||
|
|
||||||
ConnParams.size = 0;
|
ConnParams.size = 0;
|
||||||
|
|
||||||
|
InvalidateConnParamsHashEntries();
|
||||||
|
|
||||||
AddConnParam("fallback_application_name", "citus");
|
AddConnParam("fallback_application_name", "citus");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AddConnParam adds a parameter setting to the global libpq settings according
|
* AddConnParam adds a parameter setting to the global libpq settings according
|
||||||
* to the provided keyword and value. Under assert-enabled builds, array bounds
|
* to the provided keyword and value.
|
||||||
* checking is performed.
|
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
AddConnParam(const char *keyword, const char *value)
|
AddConnParam(const char *keyword, const char *value)
|
||||||
{
|
{
|
||||||
if (ConnParams.size + 1 >= ConnParams.maxSize)
|
if (ConnParams.size + 1 >= ConnParams.maxSize)
|
||||||
{
|
{
|
||||||
/* we expect developers to see that error messages */
|
/* hopefully this error is only seen by developers */
|
||||||
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
|
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
|
||||||
errmsg("ConnParams arrays bound check failed")));
|
errmsg("ConnParams arrays bound check failed")));
|
||||||
}
|
}
|
||||||
|
@ -227,7 +227,7 @@ CheckConninfo(const char *conninfo, const char **whitelist,
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
||||||
MemoryContext context)
|
Index *runtimeParamStart, MemoryContext context)
|
||||||
{
|
{
|
||||||
/* make space for the port as a string: sign, 10 digits, NUL */
|
/* make space for the port as a string: sign, 10 digits, NUL */
|
||||||
char *nodePortString = MemoryContextAlloc(context, 12 * sizeof(char *));
|
char *nodePortString = MemoryContextAlloc(context, 12 * sizeof(char *));
|
||||||
|
@ -241,16 +241,24 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
||||||
* The global parameters have already been assigned from a GUC, so begin by
|
* The global parameters have already been assigned from a GUC, so begin by
|
||||||
* calculating the key-specific parameters (basically just the fields of
|
* calculating the key-specific parameters (basically just the fields of
|
||||||
* the key and the active database encoding).
|
* the key and the active database encoding).
|
||||||
|
*
|
||||||
|
* We allocate everything in the provided context so as to facilitate using
|
||||||
|
* pfree on all runtime parameters when connections using these entries are
|
||||||
|
* invalidated during config reloads.
|
||||||
*/
|
*/
|
||||||
const char *runtimeKeywords[] = {
|
const char *runtimeKeywords[] = {
|
||||||
"host", "port", "dbname", "user", "client_encoding"
|
MemoryContextStrdup(context, "host"),
|
||||||
|
MemoryContextStrdup(context, "port"),
|
||||||
|
MemoryContextStrdup(context, "dbname"),
|
||||||
|
MemoryContextStrdup(context, "user"),
|
||||||
|
MemoryContextStrdup(context, "client_encoding")
|
||||||
};
|
};
|
||||||
const char *runtimeValues[] = {
|
const char *runtimeValues[] = {
|
||||||
MemoryContextStrdup(context, key->hostname),
|
MemoryContextStrdup(context, key->hostname),
|
||||||
nodePortString,
|
nodePortString,
|
||||||
MemoryContextStrdup(context, key->database),
|
MemoryContextStrdup(context, key->database),
|
||||||
MemoryContextStrdup(context, key->user),
|
MemoryContextStrdup(context, key->user),
|
||||||
GetDatabaseEncodingName()
|
MemoryContextStrdup(context, GetDatabaseEncodingName())
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -265,12 +273,12 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
||||||
/* auth keywords will begin after global and runtime ones are appended */
|
/* auth keywords will begin after global and runtime ones are appended */
|
||||||
Index authParamsIdx = ConnParams.size + lengthof(runtimeKeywords);
|
Index authParamsIdx = ConnParams.size + lengthof(runtimeKeywords);
|
||||||
|
|
||||||
int paramIndex = 0;
|
Index paramIndex = 0;
|
||||||
int runtimeParamIndex = 0;
|
Index runtimeParamIndex = 0;
|
||||||
|
|
||||||
if (ConnParams.size + lengthof(runtimeKeywords) >= ConnParams.maxSize)
|
if (ConnParams.size + lengthof(runtimeKeywords) >= ConnParams.maxSize)
|
||||||
{
|
{
|
||||||
/* unexpected, intended as developers rather than users */
|
/* hopefully this error is only seen by developers */
|
||||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
errmsg("too many connParams entries")));
|
errmsg("too many connParams entries")));
|
||||||
}
|
}
|
||||||
|
@ -285,6 +293,9 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
||||||
connValues[paramIndex] = ConnParams.values[paramIndex];
|
connValues[paramIndex] = ConnParams.values[paramIndex];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* remember where global/GUC params end and runtime ones start */
|
||||||
|
*runtimeParamStart = ConnParams.size;
|
||||||
|
|
||||||
/* second step: begin at end of global params and copy runtime ones */
|
/* second step: begin at end of global params and copy runtime ones */
|
||||||
for (runtimeParamIndex = 0;
|
for (runtimeParamIndex = 0;
|
||||||
runtimeParamIndex < lengthof(runtimeKeywords);
|
runtimeParamIndex < lengthof(runtimeKeywords);
|
||||||
|
@ -312,7 +323,7 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
||||||
const char *
|
const char *
|
||||||
GetConnParam(const char *keyword)
|
GetConnParam(const char *keyword)
|
||||||
{
|
{
|
||||||
int i = 0;
|
Index i = 0;
|
||||||
|
|
||||||
for (i = 0; i < ConnParams.size; i++)
|
for (i = 0; i < ConnParams.size; i++)
|
||||||
{
|
{
|
||||||
|
|
|
@ -42,6 +42,7 @@ MemoryContext ConnectionContext = NULL;
|
||||||
static uint32 ConnectionHashHash(const void *key, Size keysize);
|
static uint32 ConnectionHashHash(const void *key, Size keysize);
|
||||||
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
|
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
|
||||||
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
|
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
|
||||||
|
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
|
||||||
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
|
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
|
||||||
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
|
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
|
||||||
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
|
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
|
||||||
|
@ -91,6 +92,26 @@ InitializeConnectionManagement(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InvalidateConnParamsHashEntries sets every hash entry's isValid flag to false.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
InvalidateConnParamsHashEntries(void)
|
||||||
|
{
|
||||||
|
if (ConnParamsHash != NULL)
|
||||||
|
{
|
||||||
|
ConnParamsHashEntry *entry = NULL;
|
||||||
|
HASH_SEQ_STATUS status;
|
||||||
|
|
||||||
|
hash_seq_init(&status, ConnParamsHash);
|
||||||
|
while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL)
|
||||||
|
{
|
||||||
|
entry->isValid = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Perform connection management activity after the end of a transaction. Both
|
* Perform connection management activity after the end of a transaction. Both
|
||||||
* COMMIT and ABORT paths are handled here.
|
* COMMIT and ABORT paths are handled here.
|
||||||
|
@ -694,8 +715,15 @@ StartConnectionEstablishment(ConnectionHashKey *key)
|
||||||
entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
|
entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
|
||||||
if (!found || !entry->isValid)
|
if (!found || !entry->isValid)
|
||||||
{
|
{
|
||||||
/* if they're not found, compute them from GUC, runtime, etc. */
|
/* avoid leaking memory in the keys and values arrays */
|
||||||
GetConnParams(key, &entry->keywords, &entry->values, ConnectionContext);
|
if (found && !entry->isValid)
|
||||||
|
{
|
||||||
|
FreeConnParamsHashEntryFields(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* if not found or not valid, compute them from GUC, runtime, etc. */
|
||||||
|
GetConnParams(key, &entry->keywords, &entry->values, &entry->runtimeParamStart,
|
||||||
|
ConnectionContext);
|
||||||
|
|
||||||
entry->isValid = true;
|
entry->isValid = true;
|
||||||
}
|
}
|
||||||
|
@ -726,6 +754,34 @@ StartConnectionEstablishment(ConnectionHashKey *key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FreeConnParamsHashEntryFields frees any dynamically allocated memory reachable
|
||||||
|
* from the fields of the provided ConnParamsHashEntry. This includes all runtime
|
||||||
|
* libpq keywords and values, as well as the actual arrays storing them.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry)
|
||||||
|
{
|
||||||
|
char **keyword = &entry->keywords[entry->runtimeParamStart];
|
||||||
|
char **value = &entry->values[entry->runtimeParamStart];
|
||||||
|
|
||||||
|
while (*keyword != NULL)
|
||||||
|
{
|
||||||
|
pfree(*keyword);
|
||||||
|
keyword++;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (*value != NULL)
|
||||||
|
{
|
||||||
|
pfree(*value);
|
||||||
|
value++;
|
||||||
|
}
|
||||||
|
|
||||||
|
pfree(entry->keywords);
|
||||||
|
pfree(entry->values);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Close all remote connections if necessary anymore (i.e. not session
|
* Close all remote connections if necessary anymore (i.e. not session
|
||||||
* lifetime), or if in a failed state.
|
* lifetime), or if in a failed state.
|
||||||
|
|
|
@ -3178,15 +3178,7 @@ CreateDistTableCache(void)
|
||||||
void
|
void
|
||||||
InvalidateMetadataSystemCache(void)
|
InvalidateMetadataSystemCache(void)
|
||||||
{
|
{
|
||||||
ConnParamsHashEntry *entry = NULL;
|
InvalidateConnParamsHashEntries();
|
||||||
HASH_SEQ_STATUS status;
|
|
||||||
|
|
||||||
hash_seq_init(&status, ConnParamsHash);
|
|
||||||
|
|
||||||
while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL)
|
|
||||||
{
|
|
||||||
entry->isValid = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
memset(&MetadataCache, 0, sizeof(MetadataCache));
|
memset(&MetadataCache, 0, sizeof(MetadataCache));
|
||||||
workerNodeHashValid = false;
|
workerNodeHashValid = false;
|
||||||
|
|
|
@ -861,15 +861,7 @@ ManageWorkerTasksHash(HTAB *WorkerTasksHash)
|
||||||
|
|
||||||
if (!WorkerTasksSharedState->conninfosValid)
|
if (!WorkerTasksSharedState->conninfosValid)
|
||||||
{
|
{
|
||||||
ConnParamsHashEntry *entry = NULL;
|
InvalidateConnParamsHashEntries();
|
||||||
HASH_SEQ_STATUS status;
|
|
||||||
|
|
||||||
hash_seq_init(&status, ConnParamsHash);
|
|
||||||
|
|
||||||
while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL)
|
|
||||||
{
|
|
||||||
entry->isValid = false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* schedule new tasks if we have any */
|
/* schedule new tasks if we have any */
|
||||||
|
|
|
@ -118,6 +118,7 @@ typedef struct ConnParamsHashEntry
|
||||||
{
|
{
|
||||||
ConnectionHashKey key;
|
ConnectionHashKey key;
|
||||||
bool isValid;
|
bool isValid;
|
||||||
|
Index runtimeParamStart;
|
||||||
char **keywords;
|
char **keywords;
|
||||||
char **values;
|
char **values;
|
||||||
} ConnParamsHashEntry;
|
} ConnParamsHashEntry;
|
||||||
|
@ -142,9 +143,10 @@ extern void InitializeConnectionManagement(void);
|
||||||
|
|
||||||
extern void InitConnParams(void);
|
extern void InitConnParams(void);
|
||||||
extern void ResetConnParams(void);
|
extern void ResetConnParams(void);
|
||||||
|
extern void InvalidateConnParamsHashEntries(void);
|
||||||
extern void AddConnParam(const char *keyword, const char *value);
|
extern void AddConnParam(const char *keyword, const char *value);
|
||||||
extern void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
extern void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
||||||
MemoryContext context);
|
Index *runtimeParamStart, MemoryContext context);
|
||||||
extern const char * GetConnParam(const char *keyword);
|
extern const char * GetConnParam(const char *keyword);
|
||||||
extern bool CheckConninfo(const char *conninfo, const char **whitelist,
|
extern bool CheckConninfo(const char *conninfo, const char **whitelist,
|
||||||
Size whitelistLength, char **errmsg);
|
Size whitelistLength, char **errmsg);
|
||||||
|
|
|
@ -125,5 +125,46 @@ ALTER TABLE supplier ALTER COLUMN s_suppkey SET NOT NULL;
|
||||||
select citus_table_size('supplier');
|
select citus_table_size('supplier');
|
||||||
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
|
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
|
||||||
END;
|
END;
|
||||||
|
show citus.node_conninfo;
|
||||||
|
citus.node_conninfo
|
||||||
|
---------------------
|
||||||
|
sslmode=require
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require';
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
pg_reload_conf
|
||||||
|
----------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- make sure that any invalidation to the connection info
|
||||||
|
-- wouldn't prevent future commands to fail
|
||||||
|
SELECT citus_total_relation_size('customer_copy_hash');
|
||||||
|
citus_total_relation_size
|
||||||
|
---------------------------
|
||||||
|
2646016
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
pg_reload_conf
|
||||||
|
----------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_total_relation_size('customer_copy_hash');
|
||||||
|
citus_total_relation_size
|
||||||
|
---------------------------
|
||||||
|
2646016
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- reset back to the original node_conninfo
|
||||||
|
ALTER SYSTEM RESET citus.node_conninfo;
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
pg_reload_conf
|
||||||
|
----------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
DROP INDEX index_1;
|
DROP INDEX index_1;
|
||||||
DROP INDEX index_2;
|
DROP INDEX index_2;
|
||||||
|
|
|
@ -65,5 +65,19 @@ ALTER TABLE supplier ALTER COLUMN s_suppkey SET NOT NULL;
|
||||||
select citus_table_size('supplier');
|
select citus_table_size('supplier');
|
||||||
END;
|
END;
|
||||||
|
|
||||||
|
show citus.node_conninfo;
|
||||||
|
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require';
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
|
||||||
|
-- make sure that any invalidation to the connection info
|
||||||
|
-- wouldn't prevent future commands to fail
|
||||||
|
SELECT citus_total_relation_size('customer_copy_hash');
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
SELECT citus_total_relation_size('customer_copy_hash');
|
||||||
|
|
||||||
|
-- reset back to the original node_conninfo
|
||||||
|
ALTER SYSTEM RESET citus.node_conninfo;
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
|
||||||
DROP INDEX index_1;
|
DROP INDEX index_1;
|
||||||
DROP INDEX index_2;
|
DROP INDEX index_2;
|
||||||
|
|
Loading…
Reference in New Issue