Improve node metadata when coordinator is added

The coordinator should always be active, with hasmetadata and
metadatasynced set to true. Prevent changing those fields.
pull/4167/head
Onder Kalaci 2020-09-18 15:49:52 +02:00
parent 6fc1dea85c
commit 5d017cd123
5 changed files with 83 additions and 65 deletions
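In practice this means that once the coordinator is registered in pg_dist_node (group 0), its isactive, hasmetadata and metadatasynced columns are all forced to true, and any attempt to clear them is rejected. Below is a minimal SQL sketch of the intended behavior, not part of this diff; it assumes a coordinator registered on the regression tests' :master_port, and the error wording is taken from the updated single_node test output:

-- idempotently register the coordinator; group 0 is reserved for it
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);

-- its metadata flags are now pinned to true
SELECT isactive, hasmetadata, metadatasynced
FROM pg_dist_node
WHERE groupid = 0;
--  isactive | hasmetadata | metadatasynced
-- ----------+-------------+----------------
--  t        | t           | t

-- clearing any of them is rejected, e.g. via master_disable_node()
SELECT 1 FROM master_disable_node('localhost', :master_port);
-- ERROR:  Disabling localhost:xxxxx failed
-- DETAIL:  cannot change "isactive" field of the coordinator node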


@@ -926,24 +926,6 @@ ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards)
}
/*
* MetadataSyncedUpdateCommand generates a command that can be executed to
* update the metadatasynced column of a node in pg_dist_node table.
*/
char *
MetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced)
{
StringInfo nodeStateUpdateCommand = makeStringInfo();
char *metadataSyncedString = metadataSynced ? "TRUE" : "FALSE";
appendStringInfo(nodeStateUpdateCommand,
"UPDATE pg_catalog.pg_dist_node SET metadatasynced = %s "
"WHERE nodeid = %u", metadataSyncedString, nodeId);
return nodeStateUpdateCommand->data;
}
/*
* ColocationIdUpdateCommand creates the SQL command to change the colocationId
* of the table with the given name to the given colocationId in pg_dist_partition


@@ -91,8 +91,6 @@ static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata
*nodeMetadata, bool *nodeAlreadyExists);
static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive);
static WorkerNode * SetNodeMetadataSync(char *nodeName, int nodePort, bool
metadataSynced);
static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort);
static int32 GetNextGroupId(void);
static int GetNextNodeId(void);
@@ -105,6 +103,8 @@ static void PropagateNodeWideObjects(WorkerNode *newWorkerNode);
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
static bool UnsetMetadataSyncedForAll(void);
static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
char *field);
static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
/* declarations for dynamic loading */
@@ -211,6 +211,11 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
if (nodeMetadata.groupId == COORDINATOR_GROUP_ID)
{
ereport(ERROR, (errmsg("coordinator node cannot be added as inactive node")));
}
int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
&nodeAlreadyExists);
TransactionModifiedNodeMetadata = true;
@@ -333,13 +338,23 @@ master_disable_node(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(savedContext);
ErrorData *edata = CopyErrorData();
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Disabling %s:%d failed", workerNode->workerName,
nodePort),
errdetail("%s", edata->message),
errhint(
"If you are using MX, try stop_metadata_sync_to_node(hostname, port) "
"for nodes that are down before disabling them.")));
if (ClusterHasKnownMetadataWorkers())
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Disabling %s:%d failed", workerNode->workerName,
nodePort),
errdetail("%s", edata->message),
errhint(
"If you are using MX, try stop_metadata_sync_to_node(hostname, port) "
"for nodes that are down before disabling them.")));
}
else
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Disabling %s:%d failed", workerNode->workerName,
nodePort),
errdetail("%s", edata->message)));
}
}
PG_END_TRY();
@@ -644,16 +659,6 @@ ActivateNode(char *nodeName, int nodePort)
WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive);
/*
* The coordinator always has the authoritative metadata; reflect this
* fact in pg_dist_node.
*/
if (newWorkerNode->groupId == COORDINATOR_GROUP_ID)
{
bool metadataSynced = true;
SetNodeMetadataSync(nodeName, nodePort, metadataSynced);
}
SetUpDistributedTableDependencies(newWorkerNode);
return newWorkerNode->nodeId;
}
@@ -1186,7 +1191,18 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
if (nodeMetadata->groupId == COORDINATOR_GROUP_ID)
{
nodeMetadata->shouldHaveShards = false;
/*
* The coordinator always has the authoritative metadata; reflect this
* fact in pg_dist_node.
*/
nodeMetadata->hasMetadata = true;
nodeMetadata->metadataSynced = true;
/*
* There is no concept of an "inactive" coordinator, so hard-code it.
*/
nodeMetadata->isActive = true;
}
/* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */
@@ -1258,8 +1274,17 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
switch (columnIndex)
{
case Anum_pg_dist_node_hasmetadata:
{
ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "hasmetadata");
break;
}
case Anum_pg_dist_node_isactive:
{
ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "isactive");
metadataSyncCommand = NodeStateUpdateCommand(workerNode->nodeId,
DatumGetBool(value));
break;
@@ -1274,8 +1299,8 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
case Anum_pg_dist_node_metadatasynced:
{
metadataSyncCommand = MetadataSyncedUpdateCommand(workerNode->nodeId,
DatumGetBool(value));
ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "metadatasynced");
break;
}
@@ -1314,6 +1339,22 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
}
/*
* ErrorIfCoordinatorMetadataSetFalse throws an error if the input node
* is the coordinator and the value is false.
*/
static void
ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *field)
{
bool valueBool = DatumGetBool(value);
if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID)
{
ereport(ERROR, (errmsg("cannot change \"%s\" field of the "
"coordinator node", field)));
}
}
/*
* SetShouldHaveShards function sets the shouldhaveshards column of the
* specified worker in pg_dist_node.
@@ -1341,20 +1382,6 @@ SetNodeState(char *nodeName, int nodePort, bool isActive)
}
/*
* SetNodeMetadataSync function sets the metadatasynced column of the
* specified worker in pg_dist_node to metadataSynced.
* It returns the new worker node after the modification.
*/
static WorkerNode *
SetNodeMetadataSync(char *nodeName, int nodePort, bool metadataSynced)
{
WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
return SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(metadataSynced));
}
/*
* GetNodeTuple function returns the heap tuple of given nodeName and nodePort. If the
* node is not found this function returns NULL.


@@ -43,7 +43,6 @@ extern List * ShardDeleteCommandList(ShardInterval *shardInterval);
extern char * NodeDeleteCommand(uint32 nodeId);
extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive);
extern char * ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards);
extern char * MetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced);
extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId);
extern char * CreateSchemaDDLCommand(Oid schemaId);
extern List * GrantOnSchemaDDLCommands(Oid schemaId);


@@ -4,6 +4,9 @@ SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 90630500;
SET citus.replication_model TO 'streaming';
-- adding the coordinator as inactive is disallowed
SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
ERROR: coordinator node cannot be added as inactive node
-- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
@@ -12,6 +15,10 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
1
(1 row)
-- coordinator cannot be disabled
SELECT 1 FROM master_disable_node('localhost', :master_port);
ERROR: Disabling localhost:xxxxx failed
DETAIL: cannot change "isactive" field of the coordinator node
RESET client_min_messages;
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
?column?
@@ -751,14 +758,6 @@ SELECT create_distributed_function('function_delegation(int)', '$1', 'test');
SET client_min_messages TO DEBUG1;
CALL call_delegation(1);
<<<<<<< HEAD
DROP TABLE test CASCADE;
NOTICE: drop cascades to view single_node_view
-- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
SELECT 1 FROM master_remove_node('localhost', :master_port);
ERROR: cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables
HINT: To proceed, either drop the reference tables or use undistribute_table() function to convert them to local tables
=======
DEBUG: not pushing down procedure to the same node
SELECT function_delegation(1);
DEBUG: not pushing down function to the same node
@@ -767,9 +766,13 @@ DEBUG: not pushing down function to the same node
(1 row)
>>>>>>> 61cf557a8... Improve the robustness of function call delegation
-- Cleanup
SET client_min_messages TO WARNING;
DROP TABLE test CASCADE;
-- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
SELECT 1 FROM master_remove_node('localhost', :master_port);
ERROR: cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables
HINT: To proceed, either drop the reference tables or use undistribute_table() function to convert them to local tables
-- Cleanup
DROP SCHEMA single_node CASCADE;
-- Remove the coordinator again
SELECT 1 FROM master_remove_node('localhost', :master_port);


@@ -5,9 +5,16 @@ SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 90630500;
SET citus.replication_model TO 'streaming';
-- adding the coordinator as inactive is disallowed
SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
-- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
-- coordinator cannot be disabled
SELECT 1 FROM master_disable_node('localhost', :master_port);
RESET client_min_messages;
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
@@ -372,12 +379,12 @@ SET client_min_messages TO DEBUG1;
CALL call_delegation(1);
SELECT function_delegation(1);
SET client_min_messages TO WARNING;
DROP TABLE test CASCADE;
-- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
SELECT 1 FROM master_remove_node('localhost', :master_port);
-- Cleanup
SET client_min_messages TO WARNING;
DROP SCHEMA single_node CASCADE;
-- Remove the coordinator again
SELECT 1 FROM master_remove_node('localhost', :master_port);