mirror of https://github.com/citusdata/citus.git
Metadata sync also syncs nodes in other clusters
parent
0dee4f8418
commit
3fc87a7a29
|
@ -227,7 +227,8 @@ MetadataCreateCommands(void)
|
||||||
List *metadataSnapshotCommandList = NIL;
|
List *metadataSnapshotCommandList = NIL;
|
||||||
List *distributedTableList = DistributedTableList();
|
List *distributedTableList = DistributedTableList();
|
||||||
List *propagatedTableList = NIL;
|
List *propagatedTableList = NIL;
|
||||||
List *workerNodeList = ReadWorkerNodes();
|
bool includeNodesFromOtherClusters = true;
|
||||||
|
List *workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters);
|
||||||
ListCell *distributedTableCell = NULL;
|
ListCell *distributedTableCell = NULL;
|
||||||
char *nodeListInsertCommand = NULL;
|
char *nodeListInsertCommand = NULL;
|
||||||
bool includeSequenceDefaults = true;
|
bool includeSequenceDefaults = true;
|
||||||
|
|
|
@ -2309,6 +2309,7 @@ InitializeWorkerNodeCache(void)
|
||||||
HASHCTL info;
|
HASHCTL info;
|
||||||
int hashFlags = 0;
|
int hashFlags = 0;
|
||||||
long maxTableSize = (long) MaxWorkerNodesTracked;
|
long maxTableSize = (long) MaxWorkerNodesTracked;
|
||||||
|
bool includeNodesFromOtherClusters = false;
|
||||||
|
|
||||||
InitializeCaches();
|
InitializeCaches();
|
||||||
|
|
||||||
|
@ -2331,7 +2332,7 @@ InitializeWorkerNodeCache(void)
|
||||||
&info, hashFlags);
|
&info, hashFlags);
|
||||||
|
|
||||||
/* read the list from pg_dist_node */
|
/* read the list from pg_dist_node */
|
||||||
workerNodeList = ReadWorkerNodes();
|
workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters);
|
||||||
|
|
||||||
/* iterate over the worker node list */
|
/* iterate over the worker node list */
|
||||||
foreach(workerNodeCell, workerNodeList)
|
foreach(workerNodeCell, workerNodeList)
|
||||||
|
|
|
@ -596,9 +596,12 @@ FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort)
|
||||||
* ReadWorkerNodes iterates over pg_dist_node table, converts each row
|
* ReadWorkerNodes iterates over pg_dist_node table, converts each row
|
||||||
* into it's memory representation (i.e., WorkerNode) and adds them into
|
* into it's memory representation (i.e., WorkerNode) and adds them into
|
||||||
* a list. Lastly, the list is returned to the caller.
|
* a list. Lastly, the list is returned to the caller.
|
||||||
|
*
|
||||||
|
* It skips nodes which are not in the current clusters unless requested to do otherwise
|
||||||
|
* by includeNodesFromOtherClusters.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
ReadWorkerNodes()
|
ReadWorkerNodes(bool includeNodesFromOtherClusters)
|
||||||
{
|
{
|
||||||
SysScanDesc scanDescriptor = NULL;
|
SysScanDesc scanDescriptor = NULL;
|
||||||
ScanKeyData scanKey[1];
|
ScanKeyData scanKey[1];
|
||||||
|
@ -620,7 +623,8 @@ ReadWorkerNodes()
|
||||||
{
|
{
|
||||||
WorkerNode *workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
|
WorkerNode *workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
|
||||||
|
|
||||||
if (strncmp(workerNode->nodeCluster, CurrentCluster, WORKER_LENGTH) == 0)
|
if (includeNodesFromOtherClusters ||
|
||||||
|
strncmp(workerNode->nodeCluster, CurrentCluster, WORKER_LENGTH) == 0)
|
||||||
{
|
{
|
||||||
/* the coordinator acts as if it never sees nodes not in it's cluster */
|
/* the coordinator acts as if it never sees nodes not in it's cluster */
|
||||||
workerNodeList = lappend(workerNodeList, workerNode);
|
workerNodeList = lappend(workerNodeList, workerNode);
|
||||||
|
|
|
@ -66,7 +66,7 @@ extern uint32 ActivePrimaryNodeCount(void);
|
||||||
extern List * ActivePrimaryNodeList(void);
|
extern List * ActivePrimaryNodeList(void);
|
||||||
extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
|
extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
|
||||||
extern WorkerNode * FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort);
|
extern WorkerNode * FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort);
|
||||||
extern List * ReadWorkerNodes(void);
|
extern List * ReadWorkerNodes(bool includeNodesFromOtherClusters);
|
||||||
extern void EnsureCoordinator(void);
|
extern void EnsureCoordinator(void);
|
||||||
extern uint32 GroupForNode(char *nodeName, int32 nodePorT);
|
extern uint32 GroupForNode(char *nodeName, int32 nodePorT);
|
||||||
extern WorkerNode * PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes);
|
extern WorkerNode * PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes);
|
||||||
|
|
|
@ -207,6 +207,13 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport = 8888;
|
||||||
f
|
f
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Add a node to another cluster to make sure it's also synced
|
||||||
|
SELECT master_add_secondary_node('localhost', 8889, 'localhost', :worker_1_port, nodecluster => 'second-cluster');
|
||||||
|
master_add_secondary_node
|
||||||
|
-----------------------------------------------------------
|
||||||
|
(5,1,localhost,8889,default,f,t,secondary,second-cluster)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- Run start_metadata_sync_to_node and check that it marked hasmetadata for that worker
|
-- Run start_metadata_sync_to_node and check that it marked hasmetadata for that worker
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
start_metadata_sync_to_node
|
start_metadata_sync_to_node
|
||||||
|
@ -229,12 +236,13 @@ SELECT * FROM pg_dist_local_group;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
|
||||||
--------+---------+-----------+----------+----------+-------------+----------+-----------+-------------
|
--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------
|
||||||
1 | 1 | localhost | 57637 | default | t | t | primary | default
|
1 | 1 | localhost | 57637 | default | t | t | primary | default
|
||||||
2 | 2 | localhost | 57638 | default | f | t | primary | default
|
2 | 2 | localhost | 57638 | default | f | t | primary | default
|
||||||
4 | 1 | localhost | 8888 | default | f | t | secondary | default
|
4 | 1 | localhost | 8888 | default | f | t | secondary | default
|
||||||
(3 rows)
|
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
|
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
|
||||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
logicalrelid | partmethod | partkey | colocationid | repmodel
|
||||||
|
@ -367,12 +375,13 @@ SELECT * FROM pg_dist_local_group;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
|
||||||
--------+---------+-----------+----------+----------+-------------+----------+-----------+-------------
|
--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------
|
||||||
1 | 1 | localhost | 57637 | default | t | t | primary | default
|
1 | 1 | localhost | 57637 | default | t | t | primary | default
|
||||||
2 | 2 | localhost | 57638 | default | f | t | primary | default
|
2 | 2 | localhost | 57638 | default | f | t | primary | default
|
||||||
4 | 1 | localhost | 8888 | default | f | t | secondary | default
|
4 | 1 | localhost | 8888 | default | f | t | secondary | default
|
||||||
(3 rows)
|
5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
|
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
|
||||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
logicalrelid | partmethod | partkey | colocationid | repmodel
|
||||||
|
@ -1165,7 +1174,7 @@ SELECT create_distributed_table('mx_table', 'a');
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------------
|
---------------------------------------------------
|
||||||
(5,4,localhost,57638,default,f,t,primary,default)
|
(6,4,localhost,57638,default,f,t,primary,default)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
|
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
|
||||||
|
@ -1378,7 +1387,7 @@ SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "mx_ref" to the node localhost:57638
|
NOTICE: Replicating reference table "mx_ref" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------------
|
---------------------------------------------------
|
||||||
(6,5,localhost,57638,default,f,t,primary,default)
|
(7,5,localhost,57638,default,f,t,primary,default)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT shardid, nodename, nodeport
|
SELECT shardid, nodename, nodeport
|
||||||
|
|
|
@ -75,6 +75,9 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport = 8888;
|
||||||
SELECT stop_metadata_sync_to_node('localhost', 8888);
|
SELECT stop_metadata_sync_to_node('localhost', 8888);
|
||||||
SELECT hasmetadata FROM pg_dist_node WHERE nodeport = 8888;
|
SELECT hasmetadata FROM pg_dist_node WHERE nodeport = 8888;
|
||||||
|
|
||||||
|
-- Add a node to another cluster to make sure it's also synced
|
||||||
|
SELECT master_add_secondary_node('localhost', 8889, 'localhost', :worker_1_port, nodecluster => 'second-cluster');
|
||||||
|
|
||||||
-- Run start_metadata_sync_to_node and check that it marked hasmetadata for that worker
|
-- Run start_metadata_sync_to_node and check that it marked hasmetadata for that worker
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_1_port;
|
SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_1_port;
|
||||||
|
|
Loading…
Reference in New Issue