mirror of https://github.com/citusdata/citus.git
Merge pull request #5979 from citusdata/improve_metadata_sync_disabled
fix assertion failure & honor enable_metadata_sync in node operationspull/5974/head
commit
fc151a9b80
|
@ -746,8 +746,6 @@ PgDistTableMetadataSyncCommandList(void)
|
||||||
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
|
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
|
||||||
colocationGroupSyncCommandList);
|
colocationGroupSyncCommandList);
|
||||||
|
|
||||||
/* As the last step, propagate the pg_dist_object entities */
|
|
||||||
Assert(ShouldPropagate());
|
|
||||||
List *distributedObjectSyncCommandList = DistributedObjectMetadataSyncCommandList();
|
List *distributedObjectSyncCommandList = DistributedObjectMetadataSyncCommandList();
|
||||||
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
|
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
|
||||||
distributedObjectSyncCommandList);
|
distributedObjectSyncCommandList);
|
||||||
|
@ -924,8 +922,6 @@ SyncPgDistTableMetadataToNodeList(List *nodeList)
|
||||||
/* send commands to new workers, the current user should be a superuser */
|
/* send commands to new workers, the current user should be a superuser */
|
||||||
Assert(superuser());
|
Assert(superuser());
|
||||||
|
|
||||||
List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList();
|
|
||||||
|
|
||||||
List *nodesWithMetadata = NIL;
|
List *nodesWithMetadata = NIL;
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
foreach_ptr(workerNode, nodeList)
|
foreach_ptr(workerNode, nodeList)
|
||||||
|
@ -936,7 +932,12 @@ SyncPgDistTableMetadataToNodeList(List *nodeList)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (nodesWithMetadata == NIL)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList();
|
||||||
SendMetadataCommandListToWorkerListInCoordinatedTransaction(
|
SendMetadataCommandListToWorkerListInCoordinatedTransaction(
|
||||||
nodesWithMetadata,
|
nodesWithMetadata,
|
||||||
CurrentUserName(),
|
CurrentUserName(),
|
||||||
|
@ -1898,12 +1899,15 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
|
||||||
|
|
||||||
RemoveOldShardPlacementForNodeGroup(workerNode->groupId);
|
RemoveOldShardPlacementForNodeGroup(workerNode->groupId);
|
||||||
|
|
||||||
char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
|
|
||||||
|
|
||||||
/* make sure we don't have any lingering session lifespan connections */
|
/* make sure we don't have any lingering session lifespan connections */
|
||||||
CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort);
|
CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort);
|
||||||
|
|
||||||
SendCommandToWorkersWithMetadata(nodeDeleteCommand);
|
if (EnableMetadataSync)
|
||||||
|
{
|
||||||
|
char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
|
||||||
|
|
||||||
|
SendCommandToWorkersWithMetadata(nodeDeleteCommand);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2154,18 +2158,21 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
|
||||||
|
|
||||||
workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
|
workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
|
||||||
|
|
||||||
/* send the delete command to all primary nodes with metadata */
|
if (EnableMetadataSync)
|
||||||
char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
|
|
||||||
SendCommandToWorkersWithMetadata(nodeDeleteCommand);
|
|
||||||
|
|
||||||
/* finally prepare the insert command and send it to all primary nodes */
|
|
||||||
uint32 primariesWithMetadata = CountPrimariesWithMetadata();
|
|
||||||
if (primariesWithMetadata != 0)
|
|
||||||
{
|
{
|
||||||
List *workerNodeList = list_make1(workerNode);
|
/* send the delete command to all primary nodes with metadata */
|
||||||
char *nodeInsertCommand = NodeListInsertCommand(workerNodeList);
|
char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
|
||||||
|
SendCommandToWorkersWithMetadata(nodeDeleteCommand);
|
||||||
|
|
||||||
SendCommandToWorkersWithMetadata(nodeInsertCommand);
|
/* finally prepare the insert command and send it to all primary nodes */
|
||||||
|
uint32 primariesWithMetadata = CountPrimariesWithMetadata();
|
||||||
|
if (primariesWithMetadata != 0)
|
||||||
|
{
|
||||||
|
List *workerNodeList = list_make1(workerNode);
|
||||||
|
char *nodeInsertCommand = NodeListInsertCommand(workerNodeList);
|
||||||
|
|
||||||
|
SendCommandToWorkersWithMetadata(nodeInsertCommand);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return workerNode->nodeId;
|
return workerNode->nodeId;
|
||||||
|
@ -2184,11 +2191,13 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
|
||||||
{
|
{
|
||||||
workerNode = SetWorkerColumnLocalOnly(workerNode, columnIndex, value);
|
workerNode = SetWorkerColumnLocalOnly(workerNode, columnIndex, value);
|
||||||
|
|
||||||
char *metadataSyncCommand = GetMetadataSyncCommandToSetNodeColumn(workerNode,
|
if (EnableMetadataSync)
|
||||||
columnIndex,
|
{
|
||||||
value);
|
char *metadataSyncCommand =
|
||||||
|
GetMetadataSyncCommandToSetNodeColumn(workerNode, columnIndex, value);
|
||||||
|
|
||||||
SendCommandToWorkersWithMetadata(metadataSyncCommand);
|
SendCommandToWorkersWithMetadata(metadataSyncCommand);
|
||||||
|
}
|
||||||
|
|
||||||
return workerNode;
|
return workerNode;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1190,6 +1190,52 @@ WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER
|
||||||
|
|
||||||
SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproperty', false);
|
SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproperty', false);
|
||||||
ERROR: only the 'shouldhaveshards' property can be set using this function
|
ERROR: only the 'shouldhaveshards' property can be set using this function
|
||||||
|
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset
|
||||||
|
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset
|
||||||
|
BEGIN;
|
||||||
|
-- show that we do not send any metadata to any nodes if not enabled
|
||||||
|
SET LOCAL citus.log_remote_commands TO ON;
|
||||||
|
SET LOCAL citus.grep_remote_commands TO '%pg_dist%';
|
||||||
|
SET citus.enable_metadata_sync TO OFF;
|
||||||
|
SELECT start_metadata_sync_to_all_nodes();
|
||||||
|
start_metadata_sync_to_all_nodes
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated;
|
||||||
|
SELECT 1 FROM citus_remove_node('localhost', :worker_1_port);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
|
||||||
|
NOTICE: issuing SELECT metadata ->> 'server_id' AS server_id FROM pg_dist_node_metadata
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
||||||
|
NOTICE: issuing SELECT metadata ->> 'server_id' AS server_id FROM pg_dist_node_metadata
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- keep the rest of the tests inact that depends node/group ids
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;
|
||||||
DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated;
|
DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT start_metadata_sync_to_all_nodes();
|
SELECT start_metadata_sync_to_all_nodes();
|
||||||
|
|
|
@ -482,6 +482,26 @@ WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER
|
||||||
|
|
||||||
SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproperty', false);
|
SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproperty', false);
|
||||||
|
|
||||||
|
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset
|
||||||
|
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- show that we do not send any metadata to any nodes if not enabled
|
||||||
|
SET LOCAL citus.log_remote_commands TO ON;
|
||||||
|
SET LOCAL citus.grep_remote_commands TO '%pg_dist%';
|
||||||
|
SET citus.enable_metadata_sync TO OFF;
|
||||||
|
SELECT start_metadata_sync_to_all_nodes();
|
||||||
|
DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated;
|
||||||
|
SELECT 1 FROM citus_remove_node('localhost', :worker_1_port);
|
||||||
|
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- keep the rest of the tests inact that depends node/group ids
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;
|
||||||
|
|
||||||
DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated;
|
DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated;
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
Loading…
Reference in New Issue