diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 0cf87fe51..ff547f1cb 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -129,6 +129,18 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, ereport(DEBUG1, (errmsg("there is no worker node with metadata"))); return false; } + else if (workerNode->groupId == GetLocalGroupId()) + { + /* + * Two reasons for this: + * (a) It would lead to infinite recursion as the node would + * keep pushing down the procedure as it gets + * (b) It doesn't have any value to pushdown as we are already + * on the node itself + */ + ereport(DEBUG1, (errmsg("not pushing down procedure to the same node"))); + return false; + } ereport(DEBUG1, (errmsg("pushing down the procedure"))); @@ -175,6 +187,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, tupleDesc); executionParams->expectResults = expectResults; executionParams->xactProperties = xactProperties; + executionParams->isUtilityCommand = true; ExecuteTaskListExtended(executionParams); DisableWorkerMessagePropagation(); diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index ba28fedb6..e24271b6b 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -103,6 +103,8 @@ static void PropagateNodeWideObjects(WorkerNode *newWorkerNode); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static bool UnsetMetadataSyncedForAll(void); +static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, + char *field); static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards); /* declarations for dynamic loading */ @@ -209,6 +211,11 @@ master_add_inactive_node(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); + if (nodeMetadata.groupId == COORDINATOR_GROUP_ID) + { + ereport(ERROR, (errmsg("coordinator node cannot be added as inactive node"))); + } + int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, &nodeAlreadyExists); TransactionModifiedNodeMetadata = true; @@ -331,13 +338,23 @@ master_disable_node(PG_FUNCTION_ARGS) MemoryContextSwitchTo(savedContext); ErrorData *edata = CopyErrorData(); - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("Disabling %s:%d failed", workerNode->workerName, - nodePort), - errdetail("%s", edata->message), - errhint( - "If you are using MX, try stop_metadata_sync_to_node(hostname, port) " - "for nodes that are down before disabling them."))); + if (ClusterHasKnownMetadataWorkers()) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Disabling %s:%d failed", workerNode->workerName, + nodePort), + errdetail("%s", edata->message), + errhint( + "If you are using MX, try stop_metadata_sync_to_node(hostname, port) " + "for nodes that are down before disabling them."))); + } + else + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Disabling %s:%d failed", workerNode->workerName, + nodePort), + errdetail("%s", edata->message))); + } } PG_END_TRY(); @@ -1174,7 +1191,18 @@ AddNodeMetadata(char *nodeName, int32 nodePort, if (nodeMetadata->groupId == COORDINATOR_GROUP_ID) { nodeMetadata->shouldHaveShards = false; + + /* + * Coordinator has always the authoritative metadata, reflect this + * fact in the pg_dist_node. + */ nodeMetadata->hasMetadata = true; + nodeMetadata->metadataSynced = true; + + /* + * There is no concept of "inactive" coordinator, so hard code it. + */ + nodeMetadata->isActive = true; } /* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */ @@ -1246,8 +1274,17 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value) switch (columnIndex) { + case Anum_pg_dist_node_hasmetadata: + { + ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "hasmetadata"); + + break; + } + case Anum_pg_dist_node_isactive: { + ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "isactive"); + metadataSyncCommand = NodeStateUpdateCommand(workerNode->nodeId, DatumGetBool(value)); break; @@ -1260,6 +1297,13 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value) break; } + case Anum_pg_dist_node_metadatasynced: + { + ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "metadatasynced"); + + break; + } + default: { ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", @@ -1295,6 +1339,22 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value) } +/* + * ErrorIfCoordinatorMetadataSetFalse throws an error if the input node + * is the coordinator and the value is false. + */ +static void +ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *field) +{ + bool valueBool = DatumGetBool(value); + if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID) + { + ereport(ERROR, (errmsg("cannot change \"%s\" field of the " + "coordinator node", field))); + } +} + + /* * SetShouldHaveShards function sets the shouldhaveshards column of the * specified worker in pg_dist_node. diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 533a26689..e389a2b92 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -302,6 +302,18 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) ereport(DEBUG1, (errmsg("the worker node does not have metadata"))); return NULL; } + else if (workerNode->groupId == GetLocalGroupId()) + { + /* + * Two reasons for this: + * (a) It would lead to infinite recursion as the node would + * keep pushing down the procedure as it gets + * (b) It doesn't have any value to pushdown as we are already + * on the node itself + */ + ereport(DEBUG1, (errmsg("not pushing down function to the same node"))); + return NULL; + } (void) expression_tree_walker((Node *) funcExpr->args, contain_param_walker, &walkerParamContext); diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 3eb117231..d0610d6cf 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -4,6 +4,9 @@ SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 90630500; SET citus.replication_model TO 'streaming'; +-- adding the coordinator as inactive is disallowed +SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); +ERROR: coordinator node cannot be added as inactive node -- idempotently add node to allow this test to run without add_coordinator SET client_min_messages TO WARNING; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); @@ -12,6 +15,10 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); 1 (1 row) +-- coordinator cannot be disabled +SELECT 1 FROM master_disable_node('localhost', :master_port); +ERROR: Disabling localhost:xxxxx failed +DETAIL: cannot change "isactive" field of the coordinator node RESET client_min_messages; SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); ?column? @@ -729,7 +736,7 @@ END;$$; SELECT * FROM pg_dist_node; nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards --------------------------------------------------------------------- - 1 | 0 | localhost | 57636 | default | t | t | primary | default | f | t + 1 | 0 | localhost | 57636 | default | t | t | primary | default | t | t (1 row) SELECT create_distributed_function('call_delegation(int)', '$1', 'test'); @@ -738,15 +745,34 @@ SELECT create_distributed_function('call_delegation(int)', '$1', 'test'); (1 row) +CREATE FUNCTION function_delegation(int) RETURNS void AS $$ +BEGIN +UPDATE test SET y = y + 1 WHERE x < $1; +END; +$$ LANGUAGE plpgsql; +SELECT create_distributed_function('function_delegation(int)', '$1', 'test'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; CALL call_delegation(1); +DEBUG: not pushing down procedure to the same node +SELECT function_delegation(1); +DEBUG: not pushing down function to the same node + function_delegation +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO WARNING; DROP TABLE test CASCADE; -NOTICE: drop cascades to view single_node_view -- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added SELECT 1 FROM master_remove_node('localhost', :master_port); ERROR: cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables HINT: To proceed, either drop the reference tables or use undistribute_table() function to convert them to local tables -- Cleanup -SET client_min_messages TO WARNING; DROP SCHEMA single_node CASCADE; -- Remove the coordinator again SELECT 1 FROM master_remove_node('localhost', :master_port); diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 2a744178f..59463bcce 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -5,9 +5,16 @@ SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 90630500; SET citus.replication_model TO 'streaming'; +-- adding the coordinator as inactive is disallowed +SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); + -- idempotently add node to allow this test to run without add_coordinator SET client_min_messages TO WARNING; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); + +-- coordinator cannot be disabled +SELECT 1 FROM master_disable_node('localhost', :master_port); + RESET client_min_messages; SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); @@ -360,13 +367,24 @@ BEGIN END;$$; SELECT * FROM pg_dist_node; SELECT create_distributed_function('call_delegation(int)', '$1', 'test'); + +CREATE FUNCTION function_delegation(int) RETURNS void AS $$ +BEGIN +UPDATE test SET y = y + 1 WHERE x < $1; +END; +$$ LANGUAGE plpgsql; +SELECT create_distributed_function('function_delegation(int)', '$1', 'test'); + +SET client_min_messages TO DEBUG1; CALL call_delegation(1); +SELECT function_delegation(1); + +SET client_min_messages TO WARNING; DROP TABLE test CASCADE; -- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added SELECT 1 FROM master_remove_node('localhost', :master_port); -- Cleanup -SET client_min_messages TO WARNING; DROP SCHEMA single_node CASCADE; -- Remove the coordinator again SELECT 1 FROM master_remove_node('localhost', :master_port);