mirror of https://github.com/citusdata/citus.git
Merge pull request #996 from citusdata/sync_pg_dist_node
Propagate node add/remove to the nodes with hasmetadata=truepull/911/head
commit
8013437fc3
|
@ -63,9 +63,6 @@ GetWorkerTransactions(void)
|
|||
workerConnectionList = OpenWorkerTransactions();
|
||||
}
|
||||
|
||||
/* ensure that number of workers has not change */
|
||||
Assert(list_length(WorkerNodeList()) == list_length(workerConnectionList));
|
||||
|
||||
return workerConnectionList;
|
||||
}
|
||||
|
||||
|
@ -267,6 +264,29 @@ IsWorkerTransactionActive(void)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* RemoveWorkerTransaction removes the transaction connection to the specified node from
|
||||
* the transaction connection list.
|
||||
*/
|
||||
void
|
||||
RemoveWorkerTransaction(char *nodeName, int32 nodePort)
|
||||
{
|
||||
TransactionConnection *transactionConnection =
|
||||
GetWorkerTransaction(nodeName, nodePort);
|
||||
|
||||
/* transactionConnection = NULL if the worker transactions have not opened before */
|
||||
if (transactionConnection != NULL)
|
||||
{
|
||||
PGconn *connection = transactionConnection->connection;
|
||||
|
||||
/* closing the connection will rollback all uncommited transactions */
|
||||
PQfinish(connection);
|
||||
|
||||
workerConnectionList = list_delete(workerConnectionList, transactionConnection);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnableXactCallback registers the CompleteWorkerTransactions function as the callback
|
||||
* of the worker transactions.
|
||||
|
|
|
@ -26,8 +26,10 @@
|
|||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/pg_dist_node.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "storage/lock.h"
|
||||
#include "storage/fd.h"
|
||||
|
@ -81,7 +83,8 @@ master_add_node(PG_FUNCTION_ARGS)
|
|||
|
||||
|
||||
/*
|
||||
* master_remove_node function removes the provided node from the pg_dist_node table.
|
||||
* master_remove_node function removes the provided node from the pg_dist_node table of
|
||||
* the master node and all nodes with metadata.
|
||||
* The call to the master_remove_node should be done by the super user and the specified
|
||||
* node should not have any active placements.
|
||||
*/
|
||||
|
@ -91,7 +94,9 @@ master_remove_node(PG_FUNCTION_ARGS)
|
|||
text *nodeName = PG_GETARG_TEXT_P(0);
|
||||
int32 nodePort = PG_GETARG_INT32(1);
|
||||
char *nodeNameString = text_to_cstring(nodeName);
|
||||
char *nodeDeleteCommand = NULL;
|
||||
bool hasShardPlacements = false;
|
||||
WorkerNode *workerNode = NULL;
|
||||
|
||||
EnsureSuperUser();
|
||||
|
||||
|
@ -102,8 +107,16 @@ master_remove_node(PG_FUNCTION_ARGS)
|
|||
"shard placements")));
|
||||
}
|
||||
|
||||
workerNode = FindWorkerNode(nodeNameString, nodePort);
|
||||
|
||||
DeleteNodeRow(nodeNameString, nodePort);
|
||||
|
||||
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
|
||||
|
||||
RemoveWorkerTransaction(nodeNameString, nodePort);
|
||||
|
||||
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
@ -196,12 +209,12 @@ ReadWorkerNodes()
|
|||
|
||||
/*
|
||||
* AddNodeMetadata checks the given node information and adds the specified node to the
|
||||
* pg_dist_node table. If the node already exists, the function returns with the
|
||||
* information about the node. If not, the following prodecure is followed while adding a
|
||||
* node.
|
||||
* If the groupId is not explicitly given by the user, the function picks the
|
||||
* group that the new node should be in with respect to GroupSize. Then, the
|
||||
* new node is inserted into the local pg_dist_node.
|
||||
* pg_dist_node table of the master and workers with metadata.
|
||||
* If the node already exists, the function returns the information about the node.
|
||||
* If not, the following prodecure is followed while adding a node: If the groupId is not
|
||||
* explicitly given by the user, the function picks the group that the new node should
|
||||
* be in with respect to GroupSize. Then, the new node is inserted into the local
|
||||
* pg_dist_node as well as the nodes with hasmetadata=true.
|
||||
*/
|
||||
static Datum
|
||||
AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
|
||||
|
@ -211,6 +224,9 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
|
|||
int nextNodeIdInt = 0;
|
||||
Datum returnData = 0;
|
||||
WorkerNode *workerNode = NULL;
|
||||
char *nodeDeleteCommand = NULL;
|
||||
char *nodeInsertCommand = NULL;
|
||||
List *workerNodeList = NIL;
|
||||
|
||||
EnsureSuperUser();
|
||||
|
||||
|
@ -250,6 +266,17 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
|
|||
|
||||
InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata);
|
||||
|
||||
workerNode = FindWorkerNode(nodeName, nodePort);
|
||||
|
||||
/* send the delete command all nodes with metadata */
|
||||
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
|
||||
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand);
|
||||
|
||||
/* finally prepare the insert command and send it to all primary nodes */
|
||||
workerNodeList = list_make1(workerNode);
|
||||
nodeInsertCommand = NodeListInsertCommand(workerNodeList);
|
||||
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand);
|
||||
|
||||
heap_close(pgDistNode, AccessExclusiveLock);
|
||||
|
||||
/* fetch the worker node, and generate the output */
|
||||
|
|
|
@ -35,6 +35,7 @@ extern void SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *co
|
|||
const char *const *parameterValues);
|
||||
extern void SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort,
|
||||
char *nodeUser, List *commandList);
|
||||
extern void RemoveWorkerTransaction(char *nodeName, int32 nodePort);
|
||||
|
||||
/* helper functions for worker transactions */
|
||||
extern bool IsWorkerTransactionActive(void);
|
||||
|
|
|
@ -132,3 +132,209 @@ SELECT master_add_node('localhost', :worker_2_port);
|
|||
|
||||
UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;
|
||||
DROP TABLE cluster_management_test;
|
||||
-- check that adding/removing nodes are propagated to nodes with hasmetadata=true
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
master_remove_node
|
||||
--------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
master_add_node
|
||||
---------------------------------
|
||||
(5,5,localhost,57638,default,f)
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
|
||||
nodename | nodeport
|
||||
-----------+----------
|
||||
localhost | 57638
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
master_remove_node
|
||||
--------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
|
||||
nodename | nodeport
|
||||
----------+----------
|
||||
(0 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
-- check that added nodes are not propagated to nodes with hasmetadata=false
|
||||
UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port;
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
master_add_node
|
||||
---------------------------------
|
||||
(6,6,localhost,57638,default,f)
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
|
||||
nodename | nodeport
|
||||
----------+----------
|
||||
(0 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
-- check that removing two nodes in the same transaction works
|
||||
SELECT
|
||||
master_remove_node('localhost', :worker_1_port),
|
||||
master_remove_node('localhost', :worker_2_port);
|
||||
master_remove_node | master_remove_node
|
||||
--------------------+--------------------
|
||||
|
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata
|
||||
--------+---------+----------+----------+----------+-------------
|
||||
(0 rows)
|
||||
|
||||
-- check that adding two nodes in the same transaction works
|
||||
SELECT
|
||||
master_add_node('localhost', :worker_1_port),
|
||||
master_add_node('localhost', :worker_2_port);
|
||||
master_add_node | master_add_node
|
||||
---------------------------------+---------------------------------
|
||||
(7,7,localhost,57637,default,f) | (8,8,localhost,57638,default,f)
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata
|
||||
--------+---------+-----------+----------+----------+-------------
|
||||
7 | 7 | localhost | 57637 | default | f
|
||||
8 | 8 | localhost | 57638 | default | f
|
||||
(2 rows)
|
||||
|
||||
-- check that mixed add/remove node commands work fine inside transaction
|
||||
BEGIN;
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
master_remove_node
|
||||
--------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
master_add_node
|
||||
---------------------------------
|
||||
(9,9,localhost,57638,default,f)
|
||||
(1 row)
|
||||
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
master_remove_node
|
||||
--------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
|
||||
nodename | nodeport
|
||||
----------+----------
|
||||
(0 rows)
|
||||
|
||||
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
|
||||
BEGIN;
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
master_add_node
|
||||
-----------------------------------
|
||||
(10,10,localhost,57638,default,f)
|
||||
(1 row)
|
||||
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
master_remove_node
|
||||
--------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
master_add_node
|
||||
-----------------------------------
|
||||
(11,11,localhost,57638,default,f)
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
|
||||
nodename | nodeport
|
||||
-----------+----------
|
||||
localhost | 57638
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
|
||||
nodename | nodeport
|
||||
-----------+----------
|
||||
localhost | 57638
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
|
||||
master_remove_node
|
||||
--------------------
|
||||
|
||||
|
||||
(2 rows)
|
||||
|
||||
SELECT master_add_node('localhost', :worker_1_port);
|
||||
master_add_node
|
||||
-----------------------------------
|
||||
(12,12,localhost,57637,default,f)
|
||||
(1 row)
|
||||
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
master_add_node
|
||||
-----------------------------------
|
||||
(13,13,localhost,57638,default,f)
|
||||
(1 row)
|
||||
|
||||
-- check that a distributed table can be created after adding a node in a transaction
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
master_remove_node
|
||||
--------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
master_add_node
|
||||
-----------------------------------
|
||||
(14,14,localhost,57638,default,f)
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE temp(col1 text, col2 int);
|
||||
SELECT create_distributed_table('temp', 'col1');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO temp VALUES ('row1', 1);
|
||||
INSERT INTO temp VALUES ('row2', 2);
|
||||
COMMIT;
|
||||
SELECT col1, col2 FROM temp ORDER BY col1;
|
||||
col1 | col2
|
||||
------+------
|
||||
row1 | 1
|
||||
row2 | 2
|
||||
(2 rows)
|
||||
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
pg_dist_shard_placement, pg_dist_shard
|
||||
WHERE
|
||||
pg_dist_shard_placement.shardid = pg_dist_shard.shardid
|
||||
AND pg_dist_shard.logicalrelid = 'temp'::regclass
|
||||
AND pg_dist_shard_placement.nodeport = :worker_2_port;
|
||||
count
|
||||
-------
|
||||
32
|
||||
(1 row)
|
||||
|
||||
|
||||
DROP TABLE temp;
|
||||
|
|
|
@ -47,3 +47,84 @@ SELECT master_get_active_worker_nodes();
|
|||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;
|
||||
DROP TABLE cluster_management_test;
|
||||
|
||||
-- check that adding/removing nodes are propagated to nodes with hasmetadata=true
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
\c - - - :worker_1_port
|
||||
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
|
||||
\c - - - :master_port
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
\c - - - :worker_1_port
|
||||
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
|
||||
\c - - - :master_port
|
||||
|
||||
-- check that added nodes are not propagated to nodes with hasmetadata=false
|
||||
UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port;
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
\c - - - :worker_1_port
|
||||
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
|
||||
\c - - - :master_port
|
||||
|
||||
-- check that removing two nodes in the same transaction works
|
||||
SELECT
|
||||
master_remove_node('localhost', :worker_1_port),
|
||||
master_remove_node('localhost', :worker_2_port);
|
||||
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||
|
||||
-- check that adding two nodes in the same transaction works
|
||||
SELECT
|
||||
master_add_node('localhost', :worker_1_port),
|
||||
master_add_node('localhost', :worker_2_port);
|
||||
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||
|
||||
-- check that mixed add/remove node commands work fine inside transaction
|
||||
BEGIN;
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
COMMIT;
|
||||
|
||||
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
|
||||
|
||||
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
|
||||
BEGIN;
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
COMMIT;
|
||||
|
||||
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
|
||||
\c - - - :master_port
|
||||
|
||||
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
|
||||
SELECT master_add_node('localhost', :worker_1_port);
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
|
||||
-- check that a distributed table can be created after adding a node in a transaction
|
||||
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
BEGIN;
|
||||
SELECT master_add_node('localhost', :worker_2_port);
|
||||
CREATE TABLE temp(col1 text, col2 int);
|
||||
SELECT create_distributed_table('temp', 'col1');
|
||||
INSERT INTO temp VALUES ('row1', 1);
|
||||
INSERT INTO temp VALUES ('row2', 2);
|
||||
COMMIT;
|
||||
|
||||
SELECT col1, col2 FROM temp ORDER BY col1;
|
||||
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
pg_dist_shard_placement, pg_dist_shard
|
||||
WHERE
|
||||
pg_dist_shard_placement.shardid = pg_dist_shard.shardid
|
||||
AND pg_dist_shard.logicalrelid = 'temp'::regclass
|
||||
AND pg_dist_shard_placement.nodeport = :worker_2_port;
|
||||
|
||||
DROP TABLE temp;
|
||||
|
|
Loading…
Reference in New Issue