mirror of https://github.com/citusdata/citus.git
Implement master_update_node
parent
531de5d848
commit
7bff12571d
|
@ -152,12 +152,7 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
|
||||||
Datum values[REMOTE_CONNECTION_STATS_COLUMNS];
|
Datum values[REMOTE_CONNECTION_STATS_COLUMNS];
|
||||||
bool isNulls[REMOTE_CONNECTION_STATS_COLUMNS];
|
bool isNulls[REMOTE_CONNECTION_STATS_COLUMNS];
|
||||||
|
|
||||||
/*
|
/* we're reading all shared connections, prevent any changes */
|
||||||
* TODO: We should not do all the iterations/operations while
|
|
||||||
* holding the spinlock.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/* we're reading all distributed transactions, prevent new backends */
|
|
||||||
LockConnectionSharedMemory(LW_SHARED);
|
LockConnectionSharedMemory(LW_SHARED);
|
||||||
|
|
||||||
HASH_SEQ_STATUS status;
|
HASH_SEQ_STATUS status;
|
||||||
|
@ -189,6 +184,29 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoveAllSharedConnectionEntriesForNode gets a nodename and nodeport, and removes
|
||||||
|
* the corresponding entry in the shared memory for the current database.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
RemoveAllSharedConnectionEntriesForNode(char *hostname, int port)
|
||||||
|
{
|
||||||
|
SharedConnStatsHashKey connKey;
|
||||||
|
|
||||||
|
connKey.databaseOid = MyDatabaseId;
|
||||||
|
connKey.port = port;
|
||||||
|
strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
|
||||||
|
|
||||||
|
/* we're reading all shared connections, prevent any changes */
|
||||||
|
LockConnectionSharedMemory(LW_EXCLUSIVE);
|
||||||
|
|
||||||
|
bool entryFound = false;
|
||||||
|
hash_search(SharedConnStatsHash, &connKey, HASH_REMOVE, &entryFound);
|
||||||
|
|
||||||
|
UnLockConnectionSharedMemory();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled
|
* GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled
|
||||||
* via a GUC.
|
* via a GUC.
|
||||||
|
@ -386,8 +404,23 @@ DecrementSharedConnectionCounter(const char *hostname, int port)
|
||||||
SharedConnStatsHashEntry *connectionEntry =
|
SharedConnStatsHashEntry *connectionEntry =
|
||||||
hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound);
|
hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound);
|
||||||
|
|
||||||
/* we should never decrement for non-existing connections */
|
/* this worker node is removed or updated */
|
||||||
Assert((connectionEntry && entryFound && connectionEntry->connectionCount > 0));
|
if (!entryFound)
|
||||||
|
{
|
||||||
|
UnLockConnectionSharedMemory();
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We might have some connection lingering for worker nodes
|
||||||
|
* that are removed/updated.
|
||||||
|
*/
|
||||||
|
if (connectionEntry->connectionCount <= 0)
|
||||||
|
{
|
||||||
|
UnLockConnectionSharedMemory();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
connectionEntry->connectionCount -= 1;
|
connectionEntry->connectionCount -= 1;
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,7 @@
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/shardinterval_utils.h"
|
#include "distributed/shardinterval_utils.h"
|
||||||
|
#include "distributed/shared_connection_stats.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
|
@ -651,6 +652,7 @@ master_update_node(PG_FUNCTION_ARGS)
|
||||||
errmsg("node %u not found", nodeId)));
|
errmsg("node %u not found", nodeId)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If the node is a primary node we block reads and writes.
|
* If the node is a primary node we block reads and writes.
|
||||||
*
|
*
|
||||||
|
@ -692,10 +694,22 @@ master_update_node(PG_FUNCTION_ARGS)
|
||||||
LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock);
|
LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char *oldWorkerName = pstrdup(workerNode->workerName);
|
||||||
|
int oldWorkerPort = workerNode->workerPort;
|
||||||
|
|
||||||
|
/* make sure we don't have any lingering session lifespan connections */
|
||||||
|
CloseNodeConnectionsAfterTransaction(oldWorkerName, oldWorkerPort);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We're not sure that there is no concurrent queries going on, so remove all
|
||||||
|
* entries in shared connection stats for the current database.
|
||||||
|
*/
|
||||||
|
RemoveAllSharedConnectionEntriesForNode(oldWorkerName, oldWorkerPort);
|
||||||
|
|
||||||
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
|
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
|
||||||
|
|
||||||
strlcpy(workerNode->workerName, newNodeNameString, WORKER_LENGTH);
|
workerNode = FindWorkerNode(newNodeNameString, newNodePort);
|
||||||
workerNode->workerPort = newNodePort;
|
Assert(workerNode->nodeId == nodeId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Propagate the updated pg_dist_node entry to all metadata workers.
|
* Propagate the updated pg_dist_node entry to all metadata workers.
|
||||||
|
|
|
@ -16,6 +16,7 @@ extern int ConnectionRetryTimout;
|
||||||
|
|
||||||
|
|
||||||
extern void InitializeSharedConnectionStats(void);
|
extern void InitializeSharedConnectionStats(void);
|
||||||
|
extern void RemoveAllSharedConnectionEntriesForNode(char *hostname, int port);
|
||||||
extern int GetMaxSharedPoolSize(void);
|
extern int GetMaxSharedPoolSize(void);
|
||||||
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
|
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
|
||||||
extern void WaitOrErrorForSharedConnection(const char *hostname, int port);
|
extern void WaitOrErrorForSharedConnection(const char *hostname, int port);
|
||||||
|
|
|
@ -10,7 +10,6 @@ select create_distributed_table('test', 'a');
|
||||||
|
|
||||||
-- Make sure a connection is opened and cached
|
-- Make sure a connection is opened and cached
|
||||||
select count(*) from test where a = 0;
|
select count(*) from test where a = 0;
|
||||||
|
|
||||||
show citus.node_conninfo;
|
show citus.node_conninfo;
|
||||||
|
|
||||||
-- Set sslmode to something that does not work when connecting
|
-- Set sslmode to something that does not work when connecting
|
||||||
|
@ -30,8 +29,8 @@ show citus.node_conninfo;
|
||||||
|
|
||||||
-- Should work again
|
-- Should work again
|
||||||
select count(*) from test where a = 0;
|
select count(*) from test where a = 0;
|
||||||
|
|
||||||
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist';
|
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist';
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
-- Should still work (no SIGHUP yet);
|
-- Should still work (no SIGHUP yet);
|
||||||
select count(*) from test where a = 0;
|
select count(*) from test where a = 0;
|
||||||
|
@ -43,19 +42,15 @@ show citus.node_conninfo;
|
||||||
-- query
|
-- query
|
||||||
select count(*) from test where a = 0;
|
select count(*) from test where a = 0;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
-- Should fail now with connection error, when transaction is finished
|
-- Should fail now with connection error, when transaction is finished
|
||||||
select count(*) from test where a = 0;
|
select count(*) from test where a = 0;
|
||||||
|
|
||||||
-- Reset it again
|
-- Reset it again
|
||||||
ALTER SYSTEM RESET citus.node_conninfo;
|
ALTER SYSTEM RESET citus.node_conninfo;
|
||||||
select pg_reload_conf();
|
select pg_reload_conf();
|
||||||
select pg_sleep(0.1); -- wait for config reload to apply
|
select pg_sleep(0.1); -- wait for config reload to apply
|
||||||
show citus.node_conninfo;
|
show citus.node_conninfo;
|
||||||
|
|
||||||
-- Should work again
|
-- Should work again
|
||||||
select count(*) from test where a = 0;
|
select count(*) from test where a = 0;
|
||||||
|
|
||||||
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist';
|
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=doesnotexist';
|
||||||
BEGIN;
|
BEGIN;
|
||||||
-- Should still work (no SIGHUP yet);
|
-- Should still work (no SIGHUP yet);
|
||||||
|
@ -68,10 +63,8 @@ show citus.node_conninfo;
|
||||||
-- query
|
-- query
|
||||||
select count(*) from test where a = 0;
|
select count(*) from test where a = 0;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
-- Should fail now, when transaction is finished
|
-- Should fail now, when transaction is finished
|
||||||
select count(*) from test where a = 0;
|
select count(*) from test where a = 0;
|
||||||
|
|
||||||
-- Reset it again
|
-- Reset it again
|
||||||
ALTER SYSTEM RESET citus.node_conninfo;
|
ALTER SYSTEM RESET citus.node_conninfo;
|
||||||
select pg_reload_conf();
|
select pg_reload_conf();
|
||||||
|
|
Loading…
Reference in New Issue