mirror of https://github.com/citusdata/citus.git
Refactor CheckShardPlacements() and improve support for node removal
This commit refactors CheckShardPlacements() so that it only considers modifyingConnection. Also, it skips nodes which are removed from the cluster.pull/1122/head
parent
dbaf7f0e7e
commit
aed5f817fa
|
@ -307,11 +307,11 @@ GetConnectionFromPGconn(struct pg_conn *pqConn)
|
|||
|
||||
|
||||
/*
|
||||
* CloseNodeConnections closes all the connections to a particular node.
|
||||
* This is mainly used when a worker leaves the cluster
|
||||
* CloseNodeConnectionsAfterTransaction sets the sessionLifespan flag of the connections
|
||||
* to a particular node as false. This is mainly used when a worker leaves the cluster.
|
||||
*/
|
||||
void
|
||||
CloseNodeConnections(char *nodeName, int nodePort)
|
||||
CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
|
||||
{
|
||||
HASH_SEQ_STATUS status;
|
||||
ConnectionHashEntry *entry;
|
||||
|
@ -319,21 +319,21 @@ CloseNodeConnections(char *nodeName, int nodePort)
|
|||
hash_seq_init(&status, ConnectionHash);
|
||||
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
||||
{
|
||||
dlist_head *connections = entry->connections;
|
||||
dlist_iter iter;
|
||||
dlist_head *connections = NULL;
|
||||
|
||||
if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
while (!dlist_is_empty(connections))
|
||||
connections = entry->connections;
|
||||
dlist_foreach(iter, connections)
|
||||
{
|
||||
dlist_node *currentNode = dlist_pop_head_node(connections);
|
||||
|
||||
MultiConnection *connection =
|
||||
dlist_container(MultiConnection, connectionNode, currentNode);
|
||||
dlist_container(MultiConnection, connectionNode, iter.cur);
|
||||
|
||||
CloseConnection(connection);
|
||||
connection->sessionLifespan = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -546,35 +546,20 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry,
|
|||
{
|
||||
ConnectionPlacementHashEntry *placementEntry =
|
||||
dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
|
||||
dlist_iter referenceIter;
|
||||
ConnectionReference *modifyingConnection = placementEntry->modifyingConnection;
|
||||
MultiConnection *connection = NULL;
|
||||
|
||||
dlist_foreach(referenceIter, &placementEntry->connectionReferences)
|
||||
{
|
||||
ConnectionReference *reference =
|
||||
dlist_container(ConnectionReference, placementNode, referenceIter.cur);
|
||||
MultiConnection *connection = reference->connection;
|
||||
|
||||
/*
|
||||
* If neither DDL nor DML were executed, there's no need for
|
||||
* invalidation.
|
||||
*/
|
||||
if (!reference->hadDDL && !reference->hadDML)
|
||||
/* we only consider shards that are modified */
|
||||
if (modifyingConnection == NULL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* Failed if connection was closed, or remote transaction was
|
||||
* unsuccessful.
|
||||
*/
|
||||
connection = modifyingConnection->connection;
|
||||
|
||||
if (!connection || connection->remoteTransaction.transactionFailed)
|
||||
{
|
||||
placementEntry->failed = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (placementEntry->failed)
|
||||
{
|
||||
failures++;
|
||||
}
|
||||
else
|
||||
|
|
|
@ -381,8 +381,8 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
|
|||
|
||||
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
|
||||
|
||||
/* make sure we don't have any open connections */
|
||||
CloseNodeConnections(nodeName, nodePort);
|
||||
/* make sure we don't have any lingering session lifespan connections */
|
||||
CloseNodeConnectionsAfterTransaction(nodeName, nodePort);
|
||||
|
||||
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand);
|
||||
}
|
||||
|
|
|
@ -131,7 +131,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
|
|||
const char *user,
|
||||
const char *database);
|
||||
extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn);
|
||||
extern void CloseNodeConnections(char *nodeName, int nodePort);
|
||||
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
|
||||
extern void CloseConnection(MultiConnection *connection);
|
||||
extern void CloseConnectionByPGconn(struct pg_conn *pqConn);
|
||||
|
||||
|
|
Loading…
Reference in New Issue