From 00d836e5a334b2a872cc00b6b7e9f5a3ab3c3f02 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Sun, 17 Mar 2019 22:50:43 -0600 Subject: [PATCH 1/3] alloc non-global conn. params in provided context Having DATA-segment string literals made blindly freeing the keywords/ values difficult, so I've switched to allocating all in the provided context; because of this (and with the knowledge of the end point of the global parameters), we can safely pfree non-global parameters when we come across an invalid connection parameter entry. --- .../connection/connection_configuration.c | 29 ++++++++++++------- .../connection/connection_management.c | 26 +++++++++++++++-- .../distributed/connection_management.h | 3 +- 3 files changed, 45 insertions(+), 13 deletions(-) diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index a689a6a92..86f5ebcf6 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -80,6 +80,7 @@ ResetConnParams() for (paramIdx = 0; paramIdx < ConnParams.size; paramIdx++) { + /* FIXME: People still have references to these! */ free((void *) ConnParams.keywords[paramIdx]); free((void *) ConnParams.values[paramIdx]); @@ -94,15 +95,14 @@ ResetConnParams() /* * AddConnParam adds a parameter setting to the global libpq settings according - * to the provided keyword and value. Under assert-enabled builds, array bounds - * checking is performed. + * to the provided keyword and value. */ void AddConnParam(const char *keyword, const char *value) { if (ConnParams.size + 1 >= ConnParams.maxSize) { - /* we expect developers to see that error messages */ + /* hopefully this error is only seen by developers */ ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_RESOURCES), errmsg("ConnParams arrays bound check failed"))); } @@ -227,7 +227,7 @@ CheckConninfo(const char *conninfo, const char **whitelist, */ void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, - MemoryContext context) + Index *nonGlobalParamStart, MemoryContext context) { /* make space for the port as a string: sign, 10 digits, NUL */ char *nodePortString = MemoryContextAlloc(context, 12 * sizeof(char *)); @@ -241,16 +241,24 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, * The global parameters have already been assigned from a GUC, so begin by * calculating the key-specific parameters (basically just the fields of * the key and the active database encoding). + * + * We allocate everything in the provided context so as to facilitate using + * pfree on all runtime parameters when connections using these entries are + * invalidated during config reloads. */ const char *runtimeKeywords[] = { - "host", "port", "dbname", "user", "client_encoding" + MemoryContextStrdup(context, "host"), + MemoryContextStrdup(context, "port"), + MemoryContextStrdup(context, "dbname"), + MemoryContextStrdup(context, "user"), + MemoryContextStrdup(context, "client_encoding") }; const char *runtimeValues[] = { MemoryContextStrdup(context, key->hostname), nodePortString, MemoryContextStrdup(context, key->database), MemoryContextStrdup(context, key->user), - GetDatabaseEncodingName() + MemoryContextStrdup(context, GetDatabaseEncodingName()) }; /* @@ -265,12 +273,12 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, /* auth keywords will begin after global and runtime ones are appended */ Index authParamsIdx = ConnParams.size + lengthof(runtimeKeywords); - int paramIndex = 0; - int runtimeParamIndex = 0; + Index paramIndex = 0; + Index runtimeParamIndex = 0; if (ConnParams.size + lengthof(runtimeKeywords) >= ConnParams.maxSize) { - /* unexpected, intended as developers rather than users */ + /* hopefully this error is only seen by developers */ ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("too many connParams entries"))); } @@ -302,6 +310,7 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, *keywords = connKeywords; *values = connValues; + *nonGlobalParamStart = ConnParams.size; } @@ -312,7 +321,7 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, const char * GetConnParam(const char *keyword) { - int i = 0; + Index i = 0; for (i = 0; i < ConnParams.size; i++) { diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index f95f5f8ab..602858fb3 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -694,8 +694,30 @@ StartConnectionEstablishment(ConnectionHashKey *key) entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found); if (!found || !entry->isValid) { - /* if they're not found, compute them from GUC, runtime, etc. */ - GetConnParams(key, &entry->keywords, &entry->values, ConnectionContext); + if (found && !entry->isValid) + { + char **keyword = &entry->keywords[entry->nonGlobalParamStart]; + char **value = &entry->values[entry->nonGlobalParamStart]; + + while (*keyword != NULL) + { + pfree(*keyword); + keyword++; + } + + while (*value != NULL) + { + pfree(*value); + value++; + } + + pfree(entry->keywords); + pfree(entry->values); + } + + /* if not found or not valid, compute them from GUC, runtime, etc. */ + GetConnParams(key, &entry->keywords, &entry->values, &entry->nonGlobalParamStart, + ConnectionContext); entry->isValid = true; } diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 6b2e209c0..5964b95a9 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -118,6 +118,7 @@ typedef struct ConnParamsHashEntry { ConnectionHashKey key; bool isValid; + Index nonGlobalParamStart; char **keywords; char **values; } ConnParamsHashEntry; @@ -144,7 +145,7 @@ extern void InitConnParams(void); extern void ResetConnParams(void); extern void AddConnParam(const char *keyword, const char *value); extern void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, - MemoryContext context); + Index *nonGlobalParamStart, MemoryContext context); extern const char * GetConnParam(const char *keyword); extern bool CheckConninfo(const char *conninfo, const char **whitelist, Size whitelistLength, char **errmsg); From 04aa34da68e1f05f6ec4a65a0ae090b672b1d68e Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Sun, 17 Mar 2019 23:07:11 -0600 Subject: [PATCH 2/3] Invalidate ConnParamsHash at config reload At configuration reload, we free all "global" (i.e. GUC-set) connection parameters, but these may still have live references in the connection parameters hash. By marking the entries as invalid, we can ensure they will not be used after free. --- .../connection/connection_configuration.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index 86f5ebcf6..c6331e935 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -77,16 +77,27 @@ void ResetConnParams() { Index paramIdx = 0; - for (paramIdx = 0; paramIdx < ConnParams.size; paramIdx++) { - /* FIXME: People still have references to these! */ free((void *) ConnParams.keywords[paramIdx]); free((void *) ConnParams.values[paramIdx]); ConnParams.keywords[paramIdx] = ConnParams.values[paramIdx] = NULL; } + if (ConnParamsHash != NULL) + { + ConnParamsHashEntry *entry = NULL; + HASH_SEQ_STATUS status; + + hash_seq_init(&status, ConnParamsHash); + while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL) + { + entry->isValid = false; + } + } + + ConnParams.size = 0; AddConnParam("fallback_application_name", "citus"); From a2c6f596f9ff665c86f2b167ac11120b33b60528 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Thu, 21 Mar 2019 00:03:18 -0600 Subject: [PATCH 3/3] Address code review comments --- .../connection/connection_configuration.c | 21 ++---- .../connection/connection_management.c | 70 ++++++++++++++----- .../distributed/utils/metadata_cache.c | 10 +-- src/backend/distributed/worker/task_tracker.c | 10 +-- .../distributed/connection_management.h | 5 +- .../regress/expected/multi_size_queries.out | 41 +++++++++++ src/test/regress/sql/multi_size_queries.sql | 14 ++++ 7 files changed, 118 insertions(+), 53 deletions(-) diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index c6331e935..4852a6ef5 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -85,21 +85,10 @@ ResetConnParams() ConnParams.keywords[paramIdx] = ConnParams.values[paramIdx] = NULL; } - if (ConnParamsHash != NULL) - { - ConnParamsHashEntry *entry = NULL; - HASH_SEQ_STATUS status; - - hash_seq_init(&status, ConnParamsHash); - while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL) - { - entry->isValid = false; - } - } - - ConnParams.size = 0; + InvalidateConnParamsHashEntries(); + AddConnParam("fallback_application_name", "citus"); } @@ -238,7 +227,7 @@ CheckConninfo(const char *conninfo, const char **whitelist, */ void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, - Index *nonGlobalParamStart, MemoryContext context) + Index *runtimeParamStart, MemoryContext context) { /* make space for the port as a string: sign, 10 digits, NUL */ char *nodePortString = MemoryContextAlloc(context, 12 * sizeof(char *)); @@ -304,6 +293,9 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, connValues[paramIndex] = ConnParams.values[paramIndex]; } + /* remember where global/GUC params end and runtime ones start */ + *runtimeParamStart = ConnParams.size; + /* second step: begin at end of global params and copy runtime ones */ for (runtimeParamIndex = 0; runtimeParamIndex < lengthof(runtimeKeywords); @@ -321,7 +313,6 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, *keywords = connKeywords; *values = connValues; - *nonGlobalParamStart = ConnParams.size; } diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 602858fb3..10a0e93af 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -42,6 +42,7 @@ MemoryContext ConnectionContext = NULL; static uint32 ConnectionHashHash(const void *key, Size keysize); static int ConnectionHashCompare(const void *a, const void *b, Size keysize); static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); +static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static void DefaultCitusNoticeProcessor(void *arg, const char *message); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); @@ -91,6 +92,26 @@ InitializeConnectionManagement(void) } +/* + * InvalidateConnParamsHashEntries sets every hash entry's isValid flag to false. + */ +void +InvalidateConnParamsHashEntries(void) +{ + if (ConnParamsHash != NULL) + { + ConnParamsHashEntry *entry = NULL; + HASH_SEQ_STATUS status; + + hash_seq_init(&status, ConnParamsHash); + while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL) + { + entry->isValid = false; + } + } +} + + /* * Perform connection management activity after the end of a transaction. Both * COMMIT and ABORT paths are handled here. @@ -694,29 +715,14 @@ StartConnectionEstablishment(ConnectionHashKey *key) entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found); if (!found || !entry->isValid) { + /* avoid leaking memory in the keys and values arrays */ if (found && !entry->isValid) { - char **keyword = &entry->keywords[entry->nonGlobalParamStart]; - char **value = &entry->values[entry->nonGlobalParamStart]; - - while (*keyword != NULL) - { - pfree(*keyword); - keyword++; - } - - while (*value != NULL) - { - pfree(*value); - value++; - } - - pfree(entry->keywords); - pfree(entry->values); + FreeConnParamsHashEntryFields(entry); } /* if not found or not valid, compute them from GUC, runtime, etc. */ - GetConnParams(key, &entry->keywords, &entry->values, &entry->nonGlobalParamStart, + GetConnParams(key, &entry->keywords, &entry->values, &entry->runtimeParamStart, ConnectionContext); entry->isValid = true; @@ -748,6 +754,34 @@ StartConnectionEstablishment(ConnectionHashKey *key) } +/* + * FreeConnParamsHashEntryFields frees any dynamically allocated memory reachable + * from the fields of the provided ConnParamsHashEntry. This includes all runtime + * libpq keywords and values, as well as the actual arrays storing them. + */ +static void +FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry) +{ + char **keyword = &entry->keywords[entry->runtimeParamStart]; + char **value = &entry->values[entry->runtimeParamStart]; + + while (*keyword != NULL) + { + pfree(*keyword); + keyword++; + } + + while (*value != NULL) + { + pfree(*value); + value++; + } + + pfree(entry->keywords); + pfree(entry->values); +} + + /* * Close all remote connections if necessary anymore (i.e. not session * lifetime), or if in a failed state. diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 14159a3ae..352239c2f 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -3178,15 +3178,7 @@ CreateDistTableCache(void) void InvalidateMetadataSystemCache(void) { - ConnParamsHashEntry *entry = NULL; - HASH_SEQ_STATUS status; - - hash_seq_init(&status, ConnParamsHash); - - while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL) - { - entry->isValid = false; - } + InvalidateConnParamsHashEntries(); memset(&MetadataCache, 0, sizeof(MetadataCache)); workerNodeHashValid = false; diff --git a/src/backend/distributed/worker/task_tracker.c b/src/backend/distributed/worker/task_tracker.c index 8bd110007..3ff754e39 100644 --- a/src/backend/distributed/worker/task_tracker.c +++ b/src/backend/distributed/worker/task_tracker.c @@ -861,15 +861,7 @@ ManageWorkerTasksHash(HTAB *WorkerTasksHash) if (!WorkerTasksSharedState->conninfosValid) { - ConnParamsHashEntry *entry = NULL; - HASH_SEQ_STATUS status; - - hash_seq_init(&status, ConnParamsHash); - - while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL) - { - entry->isValid = false; - } + InvalidateConnParamsHashEntries(); } /* schedule new tasks if we have any */ diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 5964b95a9..b6b6b0f1e 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -118,7 +118,7 @@ typedef struct ConnParamsHashEntry { ConnectionHashKey key; bool isValid; - Index nonGlobalParamStart; + Index runtimeParamStart; char **keywords; char **values; } ConnParamsHashEntry; @@ -143,9 +143,10 @@ extern void InitializeConnectionManagement(void); extern void InitConnParams(void); extern void ResetConnParams(void); +extern void InvalidateConnParamsHashEntries(void); extern void AddConnParam(const char *keyword, const char *value); extern void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, - Index *nonGlobalParamStart, MemoryContext context); + Index *runtimeParamStart, MemoryContext context); extern const char * GetConnParam(const char *keyword); extern bool CheckConninfo(const char *conninfo, const char **whitelist, Size whitelistLength, char **errmsg); diff --git a/src/test/regress/expected/multi_size_queries.out b/src/test/regress/expected/multi_size_queries.out index 648b81b78..27db22100 100644 --- a/src/test/regress/expected/multi_size_queries.out +++ b/src/test/regress/expected/multi_size_queries.out @@ -125,5 +125,46 @@ ALTER TABLE supplier ALTER COLUMN s_suppkey SET NOT NULL; select citus_table_size('supplier'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications END; +show citus.node_conninfo; + citus.node_conninfo +--------------------- + sslmode=require +(1 row) + +ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require'; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +-- make sure that any invalidation to the connection info +-- wouldn't prevent future commands to fail +SELECT citus_total_relation_size('customer_copy_hash'); + citus_total_relation_size +--------------------------- + 2646016 +(1 row) + +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SELECT citus_total_relation_size('customer_copy_hash'); + citus_total_relation_size +--------------------------- + 2646016 +(1 row) + +-- reset back to the original node_conninfo +ALTER SYSTEM RESET citus.node_conninfo; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + DROP INDEX index_1; DROP INDEX index_2; diff --git a/src/test/regress/sql/multi_size_queries.sql b/src/test/regress/sql/multi_size_queries.sql index 8387667c2..220c4e18d 100644 --- a/src/test/regress/sql/multi_size_queries.sql +++ b/src/test/regress/sql/multi_size_queries.sql @@ -65,5 +65,19 @@ ALTER TABLE supplier ALTER COLUMN s_suppkey SET NOT NULL; select citus_table_size('supplier'); END; +show citus.node_conninfo; +ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require'; +SELECT pg_reload_conf(); + +-- make sure that any invalidation to the connection info +-- wouldn't prevent future commands to fail +SELECT citus_total_relation_size('customer_copy_hash'); +SELECT pg_reload_conf(); +SELECT citus_total_relation_size('customer_copy_hash'); + +-- reset back to the original node_conninfo +ALTER SYSTEM RESET citus.node_conninfo; +SELECT pg_reload_conf(); + DROP INDEX index_1; DROP INDEX index_2;