diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index e6e411ed6..163cea3cc 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -926,24 +926,6 @@ ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards) } -/* - * MetadataSyncedUpdateCommand generates a command that can be executed to - * update the metadatasynced column of a node in pg_dist_node table. - */ -char * -MetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced) -{ - StringInfo nodeStateUpdateCommand = makeStringInfo(); - char *metadataSyncedString = metadataSynced ? "TRUE" : "FALSE"; - - appendStringInfo(nodeStateUpdateCommand, - "UPDATE pg_catalog.pg_dist_node SET metadatasynced = %s " - "WHERE nodeid = %u", metadataSyncedString, nodeId); - - return nodeStateUpdateCommand->data; -} - - /* * ColocationIdUpdateCommand creates the SQL command to change the colocationId * of the table with the given name to the given colocationId in pg_dist_partition diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 49ff2dfa6..e24271b6b 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -91,8 +91,6 @@ static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata, bool *nodeAlreadyExists); static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive); -static WorkerNode * SetNodeMetadataSync(char *nodeName, int nodePort, bool - metadataSynced); static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort); static int32 GetNextGroupId(void); static int GetNextNodeId(void); @@ -105,6 +103,8 @@ static void PropagateNodeWideObjects(WorkerNode *newWorkerNode); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static bool UnsetMetadataSyncedForAll(void); +static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, + char *field); static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards); /* declarations for dynamic loading */ @@ -211,6 +211,11 @@ master_add_inactive_node(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); + if (nodeMetadata.groupId == COORDINATOR_GROUP_ID) + { + ereport(ERROR, (errmsg("coordinator node cannot be added as inactive node"))); + } + int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, &nodeAlreadyExists); TransactionModifiedNodeMetadata = true; @@ -333,13 +338,23 @@ master_disable_node(PG_FUNCTION_ARGS) MemoryContextSwitchTo(savedContext); ErrorData *edata = CopyErrorData(); - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("Disabling %s:%d failed", workerNode->workerName, - nodePort), - errdetail("%s", edata->message), - errhint( - "If you are using MX, try stop_metadata_sync_to_node(hostname, port) " - "for nodes that are down before disabling them."))); + if (ClusterHasKnownMetadataWorkers()) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Disabling %s:%d failed", workerNode->workerName, + nodePort), + errdetail("%s", edata->message), + errhint( + "If you are using MX, try stop_metadata_sync_to_node(hostname, port) " + "for nodes that are down before disabling them."))); + } + else + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Disabling %s:%d failed", workerNode->workerName, + nodePort), + errdetail("%s", edata->message))); + } } PG_END_TRY(); @@ -644,16 +659,6 @@ ActivateNode(char *nodeName, int nodePort) WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive); - /* - * Coordinator has always the authoritative metadata, reflect this - * fact in the pg_dist_node. - */ - if (newWorkerNode->groupId == COORDINATOR_GROUP_ID) - { - bool metadataSynced = true; - SetNodeMetadataSync(nodeName, nodePort, metadataSynced); - } - SetUpDistributedTableDependencies(newWorkerNode); return newWorkerNode->nodeId; } @@ -1186,7 +1191,18 @@ AddNodeMetadata(char *nodeName, int32 nodePort, if (nodeMetadata->groupId == COORDINATOR_GROUP_ID) { nodeMetadata->shouldHaveShards = false; + + /* + * Coordinator has always the authoritative metadata, reflect this + * fact in the pg_dist_node. + */ nodeMetadata->hasMetadata = true; + nodeMetadata->metadataSynced = true; + + /* + * There is no concept of "inactive" coordinator, so hard code it. + */ + nodeMetadata->isActive = true; } /* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */ @@ -1258,8 +1274,17 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value) switch (columnIndex) { + case Anum_pg_dist_node_hasmetadata: + { + ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "hasmetadata"); + + break; + } + case Anum_pg_dist_node_isactive: { + ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "isactive"); + metadataSyncCommand = NodeStateUpdateCommand(workerNode->nodeId, DatumGetBool(value)); break; @@ -1274,8 +1299,8 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value) case Anum_pg_dist_node_metadatasynced: { - metadataSyncCommand = MetadataSyncedUpdateCommand(workerNode->nodeId, - DatumGetBool(value)); + ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "metadatasynced"); + break; } @@ -1314,6 +1339,22 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value) } +/* + * ErrorIfCoordinatorMetadataSetFalse throws an error if the input node + * is the coordinator and the value is false. + */ +static void +ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *field) +{ + bool valueBool = DatumGetBool(value); + if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID) + { + ereport(ERROR, (errmsg("cannot change \"%s\" field of the " + "coordinator node", field))); + } +} + + /* * SetShouldHaveShards function sets the shouldhaveshards column of the * specified worker in pg_dist_node. @@ -1341,20 +1382,6 @@ SetNodeState(char *nodeName, int nodePort, bool isActive) } -/* - * SetNodeState function sets the isactive column of the specified worker in - * pg_dist_node to isActive. - * It returns the new worker node after the modification. - */ -static WorkerNode * -SetNodeMetadataSync(char *nodeName, int nodePort, bool metadataSynced) -{ - WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); - return SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, - BoolGetDatum(metadataSynced)); -} - - /* * GetNodeTuple function returns the heap tuple of given nodeName and nodePort. If the * node is not found this function returns NULL. diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 888f64c1d..78391ebbf 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -43,7 +43,6 @@ extern List * ShardDeleteCommandList(ShardInterval *shardInterval); extern char * NodeDeleteCommand(uint32 nodeId); extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive); extern char * ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards); -extern char * MetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced); extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId); extern char * CreateSchemaDDLCommand(Oid schemaId); extern List * GrantOnSchemaDDLCommands(Oid schemaId); diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index d16df188b..d0610d6cf 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -4,6 +4,9 @@ SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 90630500; SET citus.replication_model TO 'streaming'; +-- adding the coordinator as inactive is disallowed +SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); +ERROR: coordinator node cannot be added as inactive node -- idempotently add node to allow this test to run without add_coordinator SET client_min_messages TO WARNING; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); @@ -12,6 +15,10 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); 1 (1 row) +-- coordinator cannot be disabled +SELECT 1 FROM master_disable_node('localhost', :master_port); +ERROR: Disabling localhost:xxxxx failed +DETAIL: cannot change "isactive" field of the coordinator node RESET client_min_messages; SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); ?column? @@ -751,14 +758,6 @@ SELECT create_distributed_function('function_delegation(int)', '$1', 'test'); SET client_min_messages TO DEBUG1; CALL call_delegation(1); -<<<<<<< HEAD -DROP TABLE test CASCADE; -NOTICE: drop cascades to view single_node_view --- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added -SELECT 1 FROM master_remove_node('localhost', :master_port); -ERROR: cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables -HINT: To proceed, either drop the reference tables or use undistribute_table() function to convert them to local tables -======= DEBUG: not pushing down procedure to the same node SELECT function_delegation(1); DEBUG: not pushing down function to the same node @@ -767,9 +766,13 @@ DEBUG: not pushing down function to the same node (1 row) ->>>>>>> 61cf557a8... Improve the robustness of function call delegation --- Cleanup SET client_min_messages TO WARNING; +DROP TABLE test CASCADE; +-- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added +SELECT 1 FROM master_remove_node('localhost', :master_port); +ERROR: cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables +HINT: To proceed, either drop the reference tables or use undistribute_table() function to convert them to local tables +-- Cleanup DROP SCHEMA single_node CASCADE; -- Remove the coordinator again SELECT 1 FROM master_remove_node('localhost', :master_port); diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 69464a833..59463bcce 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -5,9 +5,16 @@ SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 90630500; SET citus.replication_model TO 'streaming'; +-- adding the coordinator as inactive is disallowed +SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); + -- idempotently add node to allow this test to run without add_coordinator SET client_min_messages TO WARNING; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); + +-- coordinator cannot be disabled +SELECT 1 FROM master_disable_node('localhost', :master_port); + RESET client_min_messages; SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); @@ -372,12 +379,12 @@ SET client_min_messages TO DEBUG1; CALL call_delegation(1); SELECT function_delegation(1); +SET client_min_messages TO WARNING; DROP TABLE test CASCADE; -- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added SELECT 1 FROM master_remove_node('localhost', :master_port); -- Cleanup -SET client_min_messages TO WARNING; DROP SCHEMA single_node CASCADE; -- Remove the coordinator again SELECT 1 FROM master_remove_node('localhost', :master_port);