Propagate node add/remove to the nodes with hasmetadata=true

This change propagates the changes done by `master_add_node` and `master_remove_node`
to the workers that contain metadata.
pull/996/head
Eren Basak 2016-11-08 11:47:12 -08:00
parent a4096c9f45
commit fb88b167a7
5 changed files with 345 additions and 10 deletions

View File

@ -63,9 +63,6 @@ GetWorkerTransactions(void)
workerConnectionList = OpenWorkerTransactions(); workerConnectionList = OpenWorkerTransactions();
} }
/* ensure that number of workers has not change */
Assert(list_length(WorkerNodeList()) == list_length(workerConnectionList));
return workerConnectionList; return workerConnectionList;
} }
@ -267,6 +264,29 @@ IsWorkerTransactionActive(void)
} }
/*
* RemoveWorkerTransaction removes the transaction connection to the specified node from
* the transaction connection list.
*/
void
RemoveWorkerTransaction(char *nodeName, int32 nodePort)
{
TransactionConnection *transactionConnection =
GetWorkerTransaction(nodeName, nodePort);
/* transactionConnection = NULL if the worker transactions have not opened before */
if (transactionConnection != NULL)
{
PGconn *connection = transactionConnection->connection;
/* closing the connection will rollback all uncommited transactions */
PQfinish(connection);
workerConnectionList = list_delete(workerConnectionList, transactionConnection);
}
}
/* /*
* EnableXactCallback registers the CompleteWorkerTransactions function as the callback * EnableXactCallback registers the CompleteWorkerTransactions function as the callback
* of the worker transactions. * of the worker transactions.

View File

@ -26,8 +26,10 @@
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/pg_dist_node.h" #include "distributed/pg_dist_node.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "storage/lock.h" #include "storage/lock.h"
#include "storage/fd.h" #include "storage/fd.h"
@ -81,7 +83,8 @@ master_add_node(PG_FUNCTION_ARGS)
/* /*
* master_remove_node function removes the provided node from the pg_dist_node table. * master_remove_node function removes the provided node from the pg_dist_node table of
* the master node and all nodes with metadata.
* The call to the master_remove_node should be done by the super user and the specified * The call to the master_remove_node should be done by the super user and the specified
* node should not have any active placements. * node should not have any active placements.
*/ */
@ -91,7 +94,9 @@ master_remove_node(PG_FUNCTION_ARGS)
text *nodeName = PG_GETARG_TEXT_P(0); text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1); int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName); char *nodeNameString = text_to_cstring(nodeName);
char *nodeDeleteCommand = NULL;
bool hasShardPlacements = false; bool hasShardPlacements = false;
WorkerNode *workerNode = NULL;
EnsureSuperUser(); EnsureSuperUser();
@ -102,8 +107,16 @@ master_remove_node(PG_FUNCTION_ARGS)
"shard placements"))); "shard placements")));
} }
workerNode = FindWorkerNode(nodeNameString, nodePort);
DeleteNodeRow(nodeNameString, nodePort); DeleteNodeRow(nodeNameString, nodePort);
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
RemoveWorkerTransaction(nodeNameString, nodePort);
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -196,12 +209,12 @@ ReadWorkerNodes()
/* /*
* AddNodeMetadata checks the given node information and adds the specified node to the * AddNodeMetadata checks the given node information and adds the specified node to the
* pg_dist_node table. If the node already exists, the function returns with the * pg_dist_node table of the master and workers with metadata.
* information about the node. If not, the following prodecure is followed while adding a * If the node already exists, the function returns the information about the node.
* node. * If not, the following prodecure is followed while adding a node: If the groupId is not
* If the groupId is not explicitly given by the user, the function picks the * explicitly given by the user, the function picks the group that the new node should
* group that the new node should be in with respect to GroupSize. Then, the * be in with respect to GroupSize. Then, the new node is inserted into the local
* new node is inserted into the local pg_dist_node. * pg_dist_node as well as the nodes with hasmetadata=true.
*/ */
static Datum static Datum
AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
@ -211,6 +224,9 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
int nextNodeIdInt = 0; int nextNodeIdInt = 0;
Datum returnData = 0; Datum returnData = 0;
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
char *nodeDeleteCommand = NULL;
char *nodeInsertCommand = NULL;
List *workerNodeList = NIL;
EnsureSuperUser(); EnsureSuperUser();
@ -250,6 +266,17 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata); InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata);
workerNode = FindWorkerNode(nodeName, nodePort);
/* send the delete command all nodes with metadata */
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand);
/* finally prepare the insert command and send it to all primary nodes */
workerNodeList = list_make1(workerNode);
nodeInsertCommand = NodeListInsertCommand(workerNodeList);
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand);
heap_close(pgDistNode, AccessExclusiveLock); heap_close(pgDistNode, AccessExclusiveLock);
/* fetch the worker node, and generate the output */ /* fetch the worker node, and generate the output */

View File

@ -35,6 +35,7 @@ extern void SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *co
const char *const *parameterValues); const char *const *parameterValues);
extern void SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, extern void SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort,
char *nodeUser, List *commandList); char *nodeUser, List *commandList);
extern void RemoveWorkerTransaction(char *nodeName, int32 nodePort);
/* helper functions for worker transactions */ /* helper functions for worker transactions */
extern bool IsWorkerTransactionActive(void); extern bool IsWorkerTransactionActive(void);

View File

@ -132,3 +132,209 @@ SELECT master_add_node('localhost', :worker_2_port);
UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;
DROP TABLE cluster_management_test; DROP TABLE cluster_management_test;
-- check that adding/removing nodes are propagated to nodes with hasmetadata=true
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
---------------------------------
(5,5,localhost,57638,default,f)
(1 row)
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
nodename | nodeport
-----------+----------
localhost | 57638
(1 row)
\c - - - :master_port
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
nodename | nodeport
----------+----------
(0 rows)
\c - - - :master_port
-- check that added nodes are not propagated to nodes with hasmetadata=false
UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port;
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
---------------------------------
(6,6,localhost,57638,default,f)
(1 row)
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
nodename | nodeport
----------+----------
(0 rows)
\c - - - :master_port
-- check that removing two nodes in the same transaction works
SELECT
master_remove_node('localhost', :worker_1_port),
master_remove_node('localhost', :worker_2_port);
master_remove_node | master_remove_node
--------------------+--------------------
|
(1 row)
SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata
--------+---------+----------+----------+----------+-------------
(0 rows)
-- check that adding two nodes in the same transaction works
SELECT
master_add_node('localhost', :worker_1_port),
master_add_node('localhost', :worker_2_port);
master_add_node | master_add_node
---------------------------------+---------------------------------
(7,7,localhost,57637,default,f) | (8,8,localhost,57638,default,f)
(1 row)
SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata
--------+---------+-----------+----------+----------+-------------
7 | 7 | localhost | 57637 | default | f
8 | 8 | localhost | 57638 | default | f
(2 rows)
-- check that mixed add/remove node commands work fine inside transaction
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
---------------------------------
(9,9,localhost,57638,default,f)
(1 row)
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
COMMIT;
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
nodename | nodeport
----------+----------
(0 rows)
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(10,10,localhost,57638,default,f)
(1 row)
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(11,11,localhost,57638,default,f)
(1 row)
COMMIT;
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
nodename | nodeport
-----------+----------
localhost | 57638
(1 row)
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
nodename | nodeport
-----------+----------
localhost | 57638
(1 row)
\c - - - :master_port
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
master_remove_node
--------------------
(2 rows)
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------------------
(12,12,localhost,57637,default,f)
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(13,13,localhost,57638,default,f)
(1 row)
-- check that a distributed table can be created after adding a node in a transaction
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(14,14,localhost,57638,default,f)
(1 row)
CREATE TABLE temp(col1 text, col2 int);
SELECT create_distributed_table('temp', 'col1');
create_distributed_table
--------------------------
(1 row)
INSERT INTO temp VALUES ('row1', 1);
INSERT INTO temp VALUES ('row2', 2);
COMMIT;
SELECT col1, col2 FROM temp ORDER BY col1;
col1 | col2
------+------
row1 | 1
row2 | 2
(2 rows)
SELECT
count(*)
FROM
pg_dist_shard_placement, pg_dist_shard
WHERE
pg_dist_shard_placement.shardid = pg_dist_shard.shardid
AND pg_dist_shard.logicalrelid = 'temp'::regclass
AND pg_dist_shard_placement.nodeport = :worker_2_port;
count
-------
32
(1 row)
DROP TABLE temp;

View File

@ -47,3 +47,84 @@ SELECT master_get_active_worker_nodes();
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;
DROP TABLE cluster_management_test; DROP TABLE cluster_management_test;
-- check that adding/removing nodes are propagated to nodes with hasmetadata=true
SELECT master_remove_node('localhost', :worker_2_port);
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
SELECT master_add_node('localhost', :worker_2_port);
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
\c - - - :master_port
SELECT master_remove_node('localhost', :worker_2_port);
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
\c - - - :master_port
-- check that added nodes are not propagated to nodes with hasmetadata=false
UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port;
SELECT master_add_node('localhost', :worker_2_port);
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
\c - - - :master_port
-- check that removing two nodes in the same transaction works
SELECT
master_remove_node('localhost', :worker_1_port),
master_remove_node('localhost', :worker_2_port);
SELECT * FROM pg_dist_node ORDER BY nodeid;
-- check that adding two nodes in the same transaction works
SELECT
master_add_node('localhost', :worker_1_port),
master_add_node('localhost', :worker_2_port);
SELECT * FROM pg_dist_node ORDER BY nodeid;
-- check that mixed add/remove node commands work fine inside transaction
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_add_node('localhost', :worker_2_port);
SELECT master_remove_node('localhost', :worker_2_port);
COMMIT;
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_add_node('localhost', :worker_2_port);
COMMIT;
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
\c - - - :master_port
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
SELECT master_add_node('localhost', :worker_1_port);
SELECT master_add_node('localhost', :worker_2_port);
-- check that a distributed table can be created after adding a node in a transaction
SELECT master_remove_node('localhost', :worker_2_port);
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
CREATE TABLE temp(col1 text, col2 int);
SELECT create_distributed_table('temp', 'col1');
INSERT INTO temp VALUES ('row1', 1);
INSERT INTO temp VALUES ('row2', 2);
COMMIT;
SELECT col1, col2 FROM temp ORDER BY col1;
SELECT
count(*)
FROM
pg_dist_shard_placement, pg_dist_shard
WHERE
pg_dist_shard_placement.shardid = pg_dist_shard.shardid
AND pg_dist_shard.logicalrelid = 'temp'::regclass
AND pg_dist_shard_placement.nodeport = :worker_2_port;
DROP TABLE temp;