From aed5f817fadebd55e99f3d35d9e525dc8e15fb6a Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 12 Jan 2017 03:16:56 +0200 Subject: [PATCH] Refactor CheckShardPlacements() and improve support for node removal This commit refactors CheckShardPlacements() so that it only considers modifyingConnection. Also, it skips nodes which are removed from the cluster. --- .../connection/connection_management.c | 18 +++++----- .../connection/placement_connection.c | 33 +++++-------------- src/backend/distributed/utils/node_metadata.c | 4 +-- .../distributed/connection_management.h | 2 +- 4 files changed, 21 insertions(+), 36 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 2ce448332..22e391b4f 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -307,11 +307,11 @@ GetConnectionFromPGconn(struct pg_conn *pqConn) /* - * CloseNodeConnections closes all the connections to a particular node. - * This is mainly used when a worker leaves the cluster + * CloseNodeConnectionsAfterTransaction sets the sessionLifespan flag of the connections + * to a particular node as false. This is mainly used when a worker leaves the cluster. */ void -CloseNodeConnections(char *nodeName, int nodePort) +CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort) { HASH_SEQ_STATUS status; ConnectionHashEntry *entry; @@ -319,21 +319,21 @@ CloseNodeConnections(char *nodeName, int nodePort) hash_seq_init(&status, ConnectionHash); while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0) { - dlist_head *connections = entry->connections; + dlist_iter iter; + dlist_head *connections = NULL; if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort) { continue; } - while (!dlist_is_empty(connections)) + connections = entry->connections; + dlist_foreach(iter, connections) { - dlist_node *currentNode = dlist_pop_head_node(connections); - MultiConnection *connection = - dlist_container(MultiConnection, connectionNode, currentNode); + dlist_container(MultiConnection, connectionNode, iter.cur); - CloseConnection(connection); + connection->sessionLifespan = false; } } } diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 0b3d7e78b..4ad5bb947 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -546,35 +546,20 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry, { ConnectionPlacementHashEntry *placementEntry = dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur); - dlist_iter referenceIter; + ConnectionReference *modifyingConnection = placementEntry->modifyingConnection; + MultiConnection *connection = NULL; - dlist_foreach(referenceIter, &placementEntry->connectionReferences) + /* we only consider shards that are modified */ + if (modifyingConnection == NULL) { - ConnectionReference *reference = - dlist_container(ConnectionReference, placementNode, referenceIter.cur); - MultiConnection *connection = reference->connection; - - /* - * If neither DDL nor DML were executed, there's no need for - * invalidation. - */ - if (!reference->hadDDL && !reference->hadDML) - { - continue; - } - - /* - * Failed if connection was closed, or remote transaction was - * unsuccessful. - */ - if (!connection || connection->remoteTransaction.transactionFailed) - { - placementEntry->failed = true; - } + continue; } - if (placementEntry->failed) + connection = modifyingConnection->connection; + + if (!connection || connection->remoteTransaction.transactionFailed) { + placementEntry->failed = true; failures++; } else diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 263b11d74..ebcc52215 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -381,8 +381,8 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); - /* make sure we don't have any open connections */ - CloseNodeConnections(nodeName, nodePort); + /* make sure we don't have any lingering session lifespan connections */ + CloseNodeConnectionsAfterTransaction(nodeName, nodePort); SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand); } diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index c3967365d..11b023630 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -131,7 +131,7 @@ extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, const char *user, const char *database); extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn); -extern void CloseNodeConnections(char *nodeName, int nodePort); +extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort); extern void CloseConnection(MultiConnection *connection); extern void CloseConnectionByPGconn(struct pg_conn *pqConn);