From fb88b167a7913b36b41ee67ee866ba564a6051da Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Tue, 8 Nov 2016 11:47:12 -0800 Subject: [PATCH] Propagate node add/remove to the nodes with hasmetadata=true This change propagates the changes done by `master_add_node` and `master_remove_node` to the workers that contain metadata. --- .../transaction/worker_transaction.c | 26 ++- src/backend/distributed/utils/node_metadata.c | 41 +++- src/include/distributed/worker_transaction.h | 1 + .../expected/multi_cluster_management.out | 206 ++++++++++++++++++ .../regress/sql/multi_cluster_management.sql | 81 +++++++ 5 files changed, 345 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index ef64723ab..ba5cebdb4 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -63,9 +63,6 @@ GetWorkerTransactions(void) workerConnectionList = OpenWorkerTransactions(); } - /* ensure that number of workers has not change */ - Assert(list_length(WorkerNodeList()) == list_length(workerConnectionList)); - return workerConnectionList; } @@ -267,6 +264,29 @@ IsWorkerTransactionActive(void) } +/* + * RemoveWorkerTransaction removes the transaction connection to the specified node from + * the transaction connection list. 
+ */ +void +RemoveWorkerTransaction(char *nodeName, int32 nodePort) +{ + TransactionConnection *transactionConnection = + GetWorkerTransaction(nodeName, nodePort); + + /* transactionConnection is NULL if worker transactions have not been opened yet */ + if (transactionConnection != NULL) + { + PGconn *connection = transactionConnection->connection; + + /* closing the connection will roll back all uncommitted transactions */ + PQfinish(connection); + + workerConnectionList = list_delete(workerConnectionList, transactionConnection); + } +} + + /* * EnableXactCallback registers the CompleteWorkerTransactions function as the callback * of the worker transactions. diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 9bdf0b592..f6379ca58 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -26,8 +26,10 @@ #include "distributed/master_protocol.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/pg_dist_node.h" #include "distributed/worker_manager.h" +#include "distributed/worker_transaction.h" #include "lib/stringinfo.h" #include "storage/lock.h" #include "storage/fd.h" @@ -81,7 +83,8 @@ master_add_node(PG_FUNCTION_ARGS) /* - * master_remove_node function removes the provided node from the pg_dist_node table. + * master_remove_node function removes the provided node from the pg_dist_node table of + * the master node and all nodes with metadata. * The call to the master_remove_node should be done by the super user and the specified * node should not have any active placements. 
*/ @@ -91,7 +94,9 @@ master_remove_node(PG_FUNCTION_ARGS) text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); + char *nodeDeleteCommand = NULL; bool hasShardPlacements = false; + WorkerNode *workerNode = NULL; EnsureSuperUser(); @@ -102,8 +107,16 @@ master_remove_node(PG_FUNCTION_ARGS) "shard placements"))); } + workerNode = FindWorkerNode(nodeNameString, nodePort); + DeleteNodeRow(nodeNameString, nodePort); + nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); + + RemoveWorkerTransaction(nodeNameString, nodePort); + + SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand); + PG_RETURN_VOID(); } @@ -196,12 +209,12 @@ ReadWorkerNodes() /* * AddNodeMetadata checks the given node information and adds the specified node to the - * pg_dist_node table. If the node already exists, the function returns with the - * information about the node. If not, the following prodecure is followed while adding a - * node. - * If the groupId is not explicitly given by the user, the function picks the - * group that the new node should be in with respect to GroupSize. Then, the - * new node is inserted into the local pg_dist_node. + * pg_dist_node table of the master and workers with metadata. + * If the node already exists, the function returns the information about the node. + * If not, the following procedure is followed while adding a node: If the groupId is not + * explicitly given by the user, the function picks the group that the new node should + * be in with respect to GroupSize. Then, the new node is inserted into the local + * pg_dist_node as well as the nodes with hasmetadata=true. 
*/ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, @@ -211,6 +224,9 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, int nextNodeIdInt = 0; Datum returnData = 0; WorkerNode *workerNode = NULL; + char *nodeDeleteCommand = NULL; + char *nodeInsertCommand = NULL; + List *workerNodeList = NIL; EnsureSuperUser(); @@ -250,6 +266,17 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata); + workerNode = FindWorkerNode(nodeName, nodePort); + + /* send the delete command to all nodes with metadata */ + nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); + SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand); + + /* finally prepare the insert command and send it to all nodes with metadata */ + workerNodeList = list_make1(workerNode); + nodeInsertCommand = NodeListInsertCommand(workerNodeList); + SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand); + heap_close(pgDistNode, AccessExclusiveLock); /* fetch the worker node, and generate the output */ diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 9fa28c335..871c1ac4c 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -35,6 +35,7 @@ extern void SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *co const char *const *parameterValues); extern void SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char *nodeUser, List *commandList); +extern void RemoveWorkerTransaction(char *nodeName, int32 nodePort); /* helper functions for worker transactions */ extern bool IsWorkerTransactionActive(void); diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 1fe320480..48dfb6130 100644 --- 
a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -132,3 +132,209 @@ SELECT master_add_node('localhost', :worker_2_port); UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; DROP TABLE cluster_management_test; +-- check that adding/removing nodes are propagated to nodes with hasmetadata=true +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port; +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +--------------------------------- + (5,5,localhost,57638,default,f) +(1 row) + +\c - - - :worker_1_port +SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port; + nodename | nodeport +-----------+---------- + localhost | 57638 +(1 row) + +\c - - - :master_port +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +\c - - - :worker_1_port +SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port; + nodename | nodeport +----------+---------- +(0 rows) + +\c - - - :master_port +-- check that added nodes are not propagated to nodes with hasmetadata=false +UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port; +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +--------------------------------- + (6,6,localhost,57638,default,f) +(1 row) + +\c - - - :worker_1_port +SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port; + nodename | nodeport +----------+---------- +(0 rows) + +\c - - - :master_port +-- check that removing two nodes in the same transaction works +SELECT + master_remove_node('localhost', :worker_1_port), + master_remove_node('localhost', :worker_2_port); + master_remove_node | master_remove_node 
+--------------------+-------------------- + | +(1 row) + +SELECT * FROM pg_dist_node ORDER BY nodeid; + nodeid | groupid | nodename | nodeport | noderack | hasmetadata +--------+---------+----------+----------+----------+------------- +(0 rows) + +-- check that adding two nodes in the same transaction works +SELECT + master_add_node('localhost', :worker_1_port), + master_add_node('localhost', :worker_2_port); + master_add_node | master_add_node +---------------------------------+--------------------------------- + (7,7,localhost,57637,default,f) | (8,8,localhost,57638,default,f) +(1 row) + +SELECT * FROM pg_dist_node ORDER BY nodeid; + nodeid | groupid | nodename | nodeport | noderack | hasmetadata +--------+---------+-----------+----------+----------+------------- + 7 | 7 | localhost | 57637 | default | f + 8 | 8 | localhost | 57638 | default | f +(2 rows) + +-- check that mixed add/remove node commands work fine inside transaction +BEGIN; +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +--------------------------------- + (9,9,localhost,57638,default,f) +(1 row) + +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +COMMIT; +SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port; + nodename | nodeport +----------+---------- +(0 rows) + +UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port; +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +----------------------------------- + (10,10,localhost,57638,default,f) +(1 row) + +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +----------------------------------- + (11,11,localhost,57638,default,f) +(1 
row) + +COMMIT; +SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port; + nodename | nodeport +-----------+---------- + localhost | 57638 +(1 row) + +\c - - - :worker_1_port +SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port; + nodename | nodeport +-----------+---------- + localhost | 57638 +(1 row) + +\c - - - :master_port +SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; + master_remove_node +-------------------- + + +(2 rows) + +SELECT master_add_node('localhost', :worker_1_port); + master_add_node +----------------------------------- + (12,12,localhost,57637,default,f) +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +----------------------------------- + (13,13,localhost,57638,default,f) +(1 row) + +-- check that a distributed table can be created after adding a node in a transaction +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +----------------------------------- + (14,14,localhost,57638,default,f) +(1 row) + +CREATE TABLE temp(col1 text, col2 int); +SELECT create_distributed_table('temp', 'col1'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO temp VALUES ('row1', 1); +INSERT INTO temp VALUES ('row2', 2); +COMMIT; +SELECT col1, col2 FROM temp ORDER BY col1; + col1 | col2 +------+------ + row1 | 1 + row2 | 2 +(2 rows) + +SELECT + count(*) +FROM + pg_dist_shard_placement, pg_dist_shard +WHERE + pg_dist_shard_placement.shardid = pg_dist_shard.shardid + AND pg_dist_shard.logicalrelid = 'temp'::regclass + AND pg_dist_shard_placement.nodeport = :worker_2_port; + count +------- + 32 +(1 row) + + +DROP TABLE temp; diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index 
9225bc2c3..7db69889d 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -47,3 +47,84 @@ SELECT master_get_active_worker_nodes(); SELECT master_add_node('localhost', :worker_2_port); UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; DROP TABLE cluster_management_test; + +-- check that adding/removing nodes are propagated to nodes with hasmetadata=true +SELECT master_remove_node('localhost', :worker_2_port); +UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port; +SELECT master_add_node('localhost', :worker_2_port); +\c - - - :worker_1_port +SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port; +\c - - - :master_port +SELECT master_remove_node('localhost', :worker_2_port); +\c - - - :worker_1_port +SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port; +\c - - - :master_port + +-- check that added nodes are not propagated to nodes with hasmetadata=false +UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port; +SELECT master_add_node('localhost', :worker_2_port); +\c - - - :worker_1_port +SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port; +\c - - - :master_port + +-- check that removing two nodes in the same transaction works +SELECT + master_remove_node('localhost', :worker_1_port), + master_remove_node('localhost', :worker_2_port); +SELECT * FROM pg_dist_node ORDER BY nodeid; + +-- check that adding two nodes in the same transaction works +SELECT + master_add_node('localhost', :worker_1_port), + master_add_node('localhost', :worker_2_port); +SELECT * FROM pg_dist_node ORDER BY nodeid; + +-- check that mixed add/remove node commands work fine inside transaction +BEGIN; +SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_add_node('localhost', :worker_2_port); +SELECT 
master_remove_node('localhost', :worker_2_port); +COMMIT; + +SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port; + +UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port; +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_add_node('localhost', :worker_2_port); +COMMIT; + +SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port; + +\c - - - :worker_1_port +SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port; +\c - - - :master_port + +SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; +SELECT master_add_node('localhost', :worker_1_port); +SELECT master_add_node('localhost', :worker_2_port); + +-- check that a distributed table can be created after adding a node in a transaction + +SELECT master_remove_node('localhost', :worker_2_port); +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +CREATE TABLE temp(col1 text, col2 int); +SELECT create_distributed_table('temp', 'col1'); +INSERT INTO temp VALUES ('row1', 1); +INSERT INTO temp VALUES ('row2', 2); +COMMIT; + +SELECT col1, col2 FROM temp ORDER BY col1; + +SELECT + count(*) +FROM + pg_dist_shard_placement, pg_dist_shard +WHERE + pg_dist_shard_placement.shardid = pg_dist_shard.shardid + AND pg_dist_shard.logicalrelid = 'temp'::regclass + AND pg_dist_shard_placement.nodeport = :worker_2_port; + +DROP TABLE temp;