mirror of https://github.com/citusdata/citus.git
Merge pull request #1122 from citusdata/onder_update_ref
Refactor CheckShardPlacements() and improve support for node removalpull/1128/head
commit
efa01be99e
|
@ -307,11 +307,11 @@ GetConnectionFromPGconn(struct pg_conn *pqConn)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CloseNodeConnections closes all the connections to a particular node.
|
* CloseNodeConnectionsAfterTransaction sets the sessionLifespan flag of the connections
|
||||||
* This is mainly used when a worker leaves the cluster
|
* to a particular node as false. This is mainly used when a worker leaves the cluster.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
CloseNodeConnections(char *nodeName, int nodePort)
|
CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
|
||||||
{
|
{
|
||||||
HASH_SEQ_STATUS status;
|
HASH_SEQ_STATUS status;
|
||||||
ConnectionHashEntry *entry;
|
ConnectionHashEntry *entry;
|
||||||
|
@ -319,21 +319,21 @@ CloseNodeConnections(char *nodeName, int nodePort)
|
||||||
hash_seq_init(&status, ConnectionHash);
|
hash_seq_init(&status, ConnectionHash);
|
||||||
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
||||||
{
|
{
|
||||||
dlist_head *connections = entry->connections;
|
dlist_iter iter;
|
||||||
|
dlist_head *connections = NULL;
|
||||||
|
|
||||||
if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort)
|
if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort)
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (!dlist_is_empty(connections))
|
connections = entry->connections;
|
||||||
|
dlist_foreach(iter, connections)
|
||||||
{
|
{
|
||||||
dlist_node *currentNode = dlist_pop_head_node(connections);
|
|
||||||
|
|
||||||
MultiConnection *connection =
|
MultiConnection *connection =
|
||||||
dlist_container(MultiConnection, connectionNode, currentNode);
|
dlist_container(MultiConnection, connectionNode, iter.cur);
|
||||||
|
|
||||||
CloseConnection(connection);
|
connection->sessionLifespan = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -546,35 +546,20 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry,
|
||||||
{
|
{
|
||||||
ConnectionPlacementHashEntry *placementEntry =
|
ConnectionPlacementHashEntry *placementEntry =
|
||||||
dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
|
dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
|
||||||
dlist_iter referenceIter;
|
ConnectionReference *modifyingConnection = placementEntry->modifyingConnection;
|
||||||
|
MultiConnection *connection = NULL;
|
||||||
|
|
||||||
dlist_foreach(referenceIter, &placementEntry->connectionReferences)
|
/* we only consider shards that are modified */
|
||||||
{
|
if (modifyingConnection == NULL)
|
||||||
ConnectionReference *reference =
|
|
||||||
dlist_container(ConnectionReference, placementNode, referenceIter.cur);
|
|
||||||
MultiConnection *connection = reference->connection;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If neither DDL nor DML were executed, there's no need for
|
|
||||||
* invalidation.
|
|
||||||
*/
|
|
||||||
if (!reference->hadDDL && !reference->hadDML)
|
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
connection = modifyingConnection->connection;
|
||||||
* Failed if connection was closed, or remote transaction was
|
|
||||||
* unsuccessful.
|
|
||||||
*/
|
|
||||||
if (!connection || connection->remoteTransaction.transactionFailed)
|
if (!connection || connection->remoteTransaction.transactionFailed)
|
||||||
{
|
{
|
||||||
placementEntry->failed = true;
|
placementEntry->failed = true;
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (placementEntry->failed)
|
|
||||||
{
|
|
||||||
failures++;
|
failures++;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
@ -381,8 +381,8 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
|
||||||
|
|
||||||
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
|
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
|
||||||
|
|
||||||
/* make sure we don't have any open connections */
|
/* make sure we don't have any lingering session lifespan connections */
|
||||||
CloseNodeConnections(nodeName, nodePort);
|
CloseNodeConnectionsAfterTransaction(nodeName, nodePort);
|
||||||
|
|
||||||
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand);
|
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand);
|
||||||
}
|
}
|
||||||
|
|
|
@ -131,7 +131,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
|
||||||
const char *user,
|
const char *user,
|
||||||
const char *database);
|
const char *database);
|
||||||
extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn);
|
extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn);
|
||||||
extern void CloseNodeConnections(char *nodeName, int nodePort);
|
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
|
||||||
extern void CloseConnection(MultiConnection *connection);
|
extern void CloseConnection(MultiConnection *connection);
|
||||||
extern void CloseConnectionByPGconn(struct pg_conn *pqConn);
|
extern void CloseConnectionByPGconn(struct pg_conn *pqConn);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue