mirror of https://github.com/citusdata/citus.git
Address code review comments
parent
04aa34da68
commit
a2c6f596f9
|
@ -85,21 +85,10 @@ ResetConnParams()
|
|||
ConnParams.keywords[paramIdx] = ConnParams.values[paramIdx] = NULL;
|
||||
}
|
||||
|
||||
if (ConnParamsHash != NULL)
|
||||
{
|
||||
ConnParamsHashEntry *entry = NULL;
|
||||
HASH_SEQ_STATUS status;
|
||||
|
||||
hash_seq_init(&status, ConnParamsHash);
|
||||
while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
entry->isValid = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ConnParams.size = 0;
|
||||
|
||||
InvalidateConnParamsHashEntries();
|
||||
|
||||
AddConnParam("fallback_application_name", "citus");
|
||||
}
|
||||
|
||||
|
@ -238,7 +227,7 @@ CheckConninfo(const char *conninfo, const char **whitelist,
|
|||
*/
|
||||
void
|
||||
GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
||||
Index *nonGlobalParamStart, MemoryContext context)
|
||||
Index *runtimeParamStart, MemoryContext context)
|
||||
{
|
||||
/* make space for the port as a string: sign, 10 digits, NUL */
|
||||
char *nodePortString = MemoryContextAlloc(context, 12 * sizeof(char *));
|
||||
|
@ -304,6 +293,9 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
|||
connValues[paramIndex] = ConnParams.values[paramIndex];
|
||||
}
|
||||
|
||||
/* remember where global/GUC params end and runtime ones start */
|
||||
*runtimeParamStart = ConnParams.size;
|
||||
|
||||
/* second step: begin at end of global params and copy runtime ones */
|
||||
for (runtimeParamIndex = 0;
|
||||
runtimeParamIndex < lengthof(runtimeKeywords);
|
||||
|
@ -321,7 +313,6 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
|||
|
||||
*keywords = connKeywords;
|
||||
*values = connValues;
|
||||
*nonGlobalParamStart = ConnParams.size;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ MemoryContext ConnectionContext = NULL;
|
|||
static uint32 ConnectionHashHash(const void *key, Size keysize);
|
||||
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
|
||||
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
|
||||
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
|
||||
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
|
||||
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
|
||||
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
|
||||
|
@ -91,6 +92,26 @@ InitializeConnectionManagement(void)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* InvalidateConnParamsHashEntries sets every hash entry's isValid flag to false.
|
||||
*/
|
||||
void
|
||||
InvalidateConnParamsHashEntries(void)
|
||||
{
|
||||
if (ConnParamsHash != NULL)
|
||||
{
|
||||
ConnParamsHashEntry *entry = NULL;
|
||||
HASH_SEQ_STATUS status;
|
||||
|
||||
hash_seq_init(&status, ConnParamsHash);
|
||||
while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
entry->isValid = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Perform connection management activity after the end of a transaction. Both
|
||||
* COMMIT and ABORT paths are handled here.
|
||||
|
@ -694,29 +715,14 @@ StartConnectionEstablishment(ConnectionHashKey *key)
|
|||
entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
|
||||
if (!found || !entry->isValid)
|
||||
{
|
||||
/* avoid leaking memory in the keys and values arrays */
|
||||
if (found && !entry->isValid)
|
||||
{
|
||||
char **keyword = &entry->keywords[entry->nonGlobalParamStart];
|
||||
char **value = &entry->values[entry->nonGlobalParamStart];
|
||||
|
||||
while (*keyword != NULL)
|
||||
{
|
||||
pfree(*keyword);
|
||||
keyword++;
|
||||
}
|
||||
|
||||
while (*value != NULL)
|
||||
{
|
||||
pfree(*value);
|
||||
value++;
|
||||
}
|
||||
|
||||
pfree(entry->keywords);
|
||||
pfree(entry->values);
|
||||
FreeConnParamsHashEntryFields(entry);
|
||||
}
|
||||
|
||||
/* if not found or not valid, compute them from GUC, runtime, etc. */
|
||||
GetConnParams(key, &entry->keywords, &entry->values, &entry->nonGlobalParamStart,
|
||||
GetConnParams(key, &entry->keywords, &entry->values, &entry->runtimeParamStart,
|
||||
ConnectionContext);
|
||||
|
||||
entry->isValid = true;
|
||||
|
@ -748,6 +754,34 @@ StartConnectionEstablishment(ConnectionHashKey *key)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* FreeConnParamsHashEntryFields frees any dynamically allocated memory reachable
|
||||
* from the fields of the provided ConnParamsHashEntry. This includes all runtime
|
||||
* libpq keywords and values, as well as the actual arrays storing them.
|
||||
*/
|
||||
static void
|
||||
FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry)
|
||||
{
|
||||
char **keyword = &entry->keywords[entry->runtimeParamStart];
|
||||
char **value = &entry->values[entry->runtimeParamStart];
|
||||
|
||||
while (*keyword != NULL)
|
||||
{
|
||||
pfree(*keyword);
|
||||
keyword++;
|
||||
}
|
||||
|
||||
while (*value != NULL)
|
||||
{
|
||||
pfree(*value);
|
||||
value++;
|
||||
}
|
||||
|
||||
pfree(entry->keywords);
|
||||
pfree(entry->values);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Close all remote connections if necessary anymore (i.e. not session
|
||||
* lifetime), or if in a failed state.
|
||||
|
|
|
@ -3178,15 +3178,7 @@ CreateDistTableCache(void)
|
|||
void
|
||||
InvalidateMetadataSystemCache(void)
|
||||
{
|
||||
ConnParamsHashEntry *entry = NULL;
|
||||
HASH_SEQ_STATUS status;
|
||||
|
||||
hash_seq_init(&status, ConnParamsHash);
|
||||
|
||||
while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
entry->isValid = false;
|
||||
}
|
||||
InvalidateConnParamsHashEntries();
|
||||
|
||||
memset(&MetadataCache, 0, sizeof(MetadataCache));
|
||||
workerNodeHashValid = false;
|
||||
|
|
|
@ -861,15 +861,7 @@ ManageWorkerTasksHash(HTAB *WorkerTasksHash)
|
|||
|
||||
if (!WorkerTasksSharedState->conninfosValid)
|
||||
{
|
||||
ConnParamsHashEntry *entry = NULL;
|
||||
HASH_SEQ_STATUS status;
|
||||
|
||||
hash_seq_init(&status, ConnParamsHash);
|
||||
|
||||
while ((entry = (ConnParamsHashEntry *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
entry->isValid = false;
|
||||
}
|
||||
InvalidateConnParamsHashEntries();
|
||||
}
|
||||
|
||||
/* schedule new tasks if we have any */
|
||||
|
|
|
@ -118,7 +118,7 @@ typedef struct ConnParamsHashEntry
|
|||
{
|
||||
ConnectionHashKey key;
|
||||
bool isValid;
|
||||
Index nonGlobalParamStart;
|
||||
Index runtimeParamStart;
|
||||
char **keywords;
|
||||
char **values;
|
||||
} ConnParamsHashEntry;
|
||||
|
@ -143,9 +143,10 @@ extern void InitializeConnectionManagement(void);
|
|||
|
||||
extern void InitConnParams(void);
|
||||
extern void ResetConnParams(void);
|
||||
extern void InvalidateConnParamsHashEntries(void);
|
||||
extern void AddConnParam(const char *keyword, const char *value);
|
||||
extern void GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
||||
Index *nonGlobalParamStart, MemoryContext context);
|
||||
Index *runtimeParamStart, MemoryContext context);
|
||||
extern const char * GetConnParam(const char *keyword);
|
||||
extern bool CheckConninfo(const char *conninfo, const char **whitelist,
|
||||
Size whitelistLength, char **errmsg);
|
||||
|
|
|
@ -125,5 +125,46 @@ ALTER TABLE supplier ALTER COLUMN s_suppkey SET NOT NULL;
|
|||
select citus_table_size('supplier');
|
||||
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
|
||||
END;
|
||||
show citus.node_conninfo;
|
||||
citus.node_conninfo
|
||||
---------------------
|
||||
sslmode=require
|
||||
(1 row)
|
||||
|
||||
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require';
|
||||
SELECT pg_reload_conf();
|
||||
pg_reload_conf
|
||||
----------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- make sure that any invalidation to the connection info
|
||||
-- wouldn't prevent future commands to fail
|
||||
SELECT citus_total_relation_size('customer_copy_hash');
|
||||
citus_total_relation_size
|
||||
---------------------------
|
||||
2646016
|
||||
(1 row)
|
||||
|
||||
SELECT pg_reload_conf();
|
||||
pg_reload_conf
|
||||
----------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT citus_total_relation_size('customer_copy_hash');
|
||||
citus_total_relation_size
|
||||
---------------------------
|
||||
2646016
|
||||
(1 row)
|
||||
|
||||
-- reset back to the original node_conninfo
|
||||
ALTER SYSTEM RESET citus.node_conninfo;
|
||||
SELECT pg_reload_conf();
|
||||
pg_reload_conf
|
||||
----------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
DROP INDEX index_1;
|
||||
DROP INDEX index_2;
|
||||
|
|
|
@ -65,5 +65,19 @@ ALTER TABLE supplier ALTER COLUMN s_suppkey SET NOT NULL;
|
|||
select citus_table_size('supplier');
|
||||
END;
|
||||
|
||||
show citus.node_conninfo;
|
||||
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require';
|
||||
SELECT pg_reload_conf();
|
||||
|
||||
-- make sure that any invalidation to the connection info
|
||||
-- wouldn't prevent future commands to fail
|
||||
SELECT citus_total_relation_size('customer_copy_hash');
|
||||
SELECT pg_reload_conf();
|
||||
SELECT citus_total_relation_size('customer_copy_hash');
|
||||
|
||||
-- reset back to the original node_conninfo
|
||||
ALTER SYSTEM RESET citus.node_conninfo;
|
||||
SELECT pg_reload_conf();
|
||||
|
||||
DROP INDEX index_1;
|
||||
DROP INDEX index_2;
|
||||
|
|
Loading…
Reference in New Issue