From 7bff12571d8da87cd6ada475e2f7f19d401df6e5 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 30 Mar 2020 13:50:17 +0200 Subject: [PATCH] Implement master_update_node --- .../connection/shared_connection_stats.c | 49 ++++++++++++++++--- .../distributed/metadata/node_metadata.c | 18 ++++++- .../distributed/shared_connection_stats.h | 1 + src/test/regress/sql/node_conninfo_reload.sql | 9 +--- 4 files changed, 59 insertions(+), 18 deletions(-) diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index e35925fbc..0cad7efdf 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -152,12 +152,7 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) Datum values[REMOTE_CONNECTION_STATS_COLUMNS]; bool isNulls[REMOTE_CONNECTION_STATS_COLUMNS]; - /* - * TODO: We should not do all the iterations/operations while - * holding the spinlock. - */ - - /* we're reading all distributed transactions, prevent new backends */ + /* we're reading all shared connections, prevent any changes */ LockConnectionSharedMemory(LW_SHARED); HASH_SEQ_STATUS status; @@ -189,6 +184,29 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) } +/* + * RemoveAllSharedConnectionEntriesForNode gets a nodename and nodeport, and removes + * the corresponding entry in the shared memory for the current database. 
+ */ +void +RemoveAllSharedConnectionEntriesForNode(char *hostname, int port) +{ + SharedConnStatsHashKey connKey; + + connKey.databaseOid = MyDatabaseId; + connKey.port = port; + strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); + + /* we're removing an entry from the shared connections, prevent any concurrent access */ + LockConnectionSharedMemory(LW_EXCLUSIVE); + + bool entryFound = false; + hash_search(SharedConnStatsHash, &connKey, HASH_REMOVE, &entryFound); + + UnLockConnectionSharedMemory(); +} + + /* * GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled * via a GUC. @@ -386,8 +404,23 @@ DecrementSharedConnectionCounter(const char *hostname, int port) SharedConnStatsHashEntry *connectionEntry = hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound); - /* we should never decrement for non-existing connections */ - Assert((connectionEntry && entryFound && connectionEntry->connectionCount > 0)); + /* this worker node is removed or updated */ + if (!entryFound) + { + UnLockConnectionSharedMemory(); + + return; + } + + /* + * We might have some connections lingering for worker nodes + * that are removed/updated. 
+ */ + if (connectionEntry->connectionCount <= 0) + { + UnLockConnectionSharedMemory(); + return; + } connectionEntry->connectionCount -= 1; diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 2b869d4f9..e87848ed8 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -39,6 +39,7 @@ #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" +#include "distributed/shared_connection_stats.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" #include "lib/stringinfo.h" @@ -651,6 +652,7 @@ master_update_node(PG_FUNCTION_ARGS) errmsg("node %u not found", nodeId))); } + /* * If the node is a primary node we block reads and writes. * @@ -692,10 +694,22 @@ master_update_node(PG_FUNCTION_ARGS) LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock); } + char *oldWorkerName = pstrdup(workerNode->workerName); + int oldWorkerPort = workerNode->workerPort; + + /* make sure we don't have any lingering session lifespan connections */ + CloseNodeConnectionsAfterTransaction(oldWorkerName, oldWorkerPort); + + /* + * We cannot be sure that there are no concurrent queries going on, so remove all + * entries in shared connection stats for the current database. + */ + RemoveAllSharedConnectionEntriesForNode(oldWorkerName, oldWorkerPort); + UpdateNodeLocation(nodeId, newNodeNameString, newNodePort); - strlcpy(workerNode->workerName, newNodeNameString, WORKER_LENGTH); - workerNode->workerPort = newNodePort; + workerNode = FindWorkerNode(newNodeNameString, newNodePort); + Assert(workerNode->nodeId == nodeId); /* * Propagate the updated pg_dist_node entry to all metadata workers. 
diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index b2265e678..78ea95183 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -16,6 +16,7 @@ extern int ConnectionRetryTimout; extern void InitializeSharedConnectionStats(void); +extern void RemoveAllSharedConnectionEntriesForNode(char *hostname, int port); extern int GetMaxSharedPoolSize(void); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); extern void WaitOrErrorForSharedConnection(const char *hostname, int port); diff --git a/src/test/regress/sql/node_conninfo_reload.sql b/src/test/regress/sql/node_conninfo_reload.sql index dda8dae21..3754130a7 100644 --- a/src/test/regress/sql/node_conninfo_reload.sql +++ b/src/test/regress/sql/node_conninfo_reload.sql @@ -10,7 +10,6 @@ select create_distributed_table('test', 'a'); -- Make sure a connection is opened and cached select count(*) from test where a = 0; - show citus.node_conninfo; -- Set sslmode to something that does not work when connecting @@ -30,8 +29,8 @@ show citus.node_conninfo; -- Should work again select count(*) from test where a = 0; - ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist'; + BEGIN; -- Should still work (no SIGHUP yet); select count(*) from test where a = 0; @@ -43,19 +42,15 @@ show citus.node_conninfo; -- query select count(*) from test where a = 0; COMMIT; - -- Should fail now with connection error, when transaction is finished select count(*) from test where a = 0; - -- Reset it again ALTER SYSTEM RESET citus.node_conninfo; select pg_reload_conf(); select pg_sleep(0.1); -- wait for config reload to apply show citus.node_conninfo; - -- Should work again select count(*) from test where a = 0; - ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist'; BEGIN; -- Should still work (no SIGHUP yet); @@ -68,10 +63,8 @@ show citus.node_conninfo; -- query select 
count(*) from test where a = 0; COMMIT; - -- Should fail now, when transaction is finished select count(*) from test where a = 0; - -- Reset it again ALTER SYSTEM RESET citus.node_conninfo; select pg_reload_conf();