Merge pull request #4167 from citusdata/metadata_improvements

Improve the robustness of function call delegation
Önder Kalacı 2020-09-21 15:14:21 +02:00 committed by GitHub
commit bc293d9d5e
5 changed files with 140 additions and 11 deletions

src/backend/distributed/commands/call.c

@@ -129,6 +129,18 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
ereport(DEBUG1, (errmsg("there is no worker node with metadata")));
return false;
}
else if (workerNode->groupId == GetLocalGroupId())
{
/*
* Two reasons for this:
* (a) It would lead to infinite recursion, as the node would
* keep pushing the procedure down to itself
* (b) Pushing down has no value, as we are already
* on the node itself
*/
ereport(DEBUG1, (errmsg("not pushing down procedure to the same node")));
return false;
}
ereport(DEBUG1, (errmsg("pushing down the procedure")));
@@ -175,6 +187,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
tupleDesc);
executionParams->expectResults = expectResults;
executionParams->xactProperties = xactProperties;
executionParams->isUtilityCommand = true;
ExecuteTaskListExtended(executionParams);
DisableWorkerMessagePropagation();
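
For illustration, the two early-exit guards above reduce to a small decision: bail out when no metadata-holding worker exists, and bail out when the chosen worker is the local node itself. A minimal standalone sketch of that control flow (plain C; GetLocalGroupId() and the node struct are mocked stand-ins, not the actual Citus internals):

#include <stdbool.h>
#include <stdio.h>

#define COORDINATOR_GROUP_ID 0

typedef struct WorkerNode
{
	int groupId;
	bool hasMetadata;
} WorkerNode;

/* mocked: Citus reads the local group id from pg_dist_local_group */
static int
GetLocalGroupId(void)
{
	return COORDINATOR_GROUP_ID;
}

/* simplified mirror of the guards in CallFuncExprRemotely */
static bool
ShouldPushDown(const WorkerNode *workerNode)
{
	if (workerNode == NULL || !workerNode->hasMetadata)
	{
		fprintf(stderr, "DEBUG: there is no worker node with metadata\n");
		return false;
	}
	else if (workerNode->groupId == GetLocalGroupId())
	{
		/* delegating to ourselves would recurse and gains nothing */
		fprintf(stderr, "DEBUG: not pushing down to the same node\n");
		return false;
	}

	fprintf(stderr, "DEBUG: pushing down the procedure\n");
	return true;
}

int
main(void)
{
	WorkerNode coordinator = { COORDINATOR_GROUP_ID, true };
	WorkerNode worker = { 1, true };

	ShouldPushDown(&coordinator); /* refused: same node */
	ShouldPushDown(&worker);      /* pushed down */
	return 0;
}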

src/backend/distributed/metadata/node_metadata.c

@@ -103,6 +103,8 @@ static void PropagateNodeWideObjects(WorkerNode *newWorkerNode);
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
static bool UnsetMetadataSyncedForAll(void);
static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
char *field);
static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
/* declarations for dynamic loading */
@@ -209,6 +211,11 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
if (nodeMetadata.groupId == COORDINATOR_GROUP_ID)
{
ereport(ERROR, (errmsg("coordinator node cannot be added as inactive node")));
}
int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
&nodeAlreadyExists);
TransactionModifiedNodeMetadata = true;
@@ -331,13 +338,23 @@ master_disable_node(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(savedContext);
ErrorData *edata = CopyErrorData();
if (ClusterHasKnownMetadataWorkers())
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Disabling %s:%d failed", workerNode->workerName,
nodePort),
errdetail("%s", edata->message),
errhint(
"If you are using MX, try stop_metadata_sync_to_node(hostname, port) "
"for nodes that are down before disabling them.")));
}
else
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Disabling %s:%d failed", workerNode->workerName,
nodePort),
errdetail("%s", edata->message)));
}
}
PG_END_TRY();
@@ -1174,7 +1191,18 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
if (nodeMetadata->groupId == COORDINATOR_GROUP_ID)
{
nodeMetadata->shouldHaveShards = false;
/*
* The coordinator always has the authoritative metadata; reflect
* this fact in pg_dist_node.
*/
nodeMetadata->hasMetadata = true;
nodeMetadata->metadataSynced = true;
/*
* There is no concept of an "inactive" coordinator, so hard-code it
* to active.
*/
nodeMetadata->isActive = true;
}
/* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */
@@ -1246,8 +1274,17 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
switch (columnIndex)
{
case Anum_pg_dist_node_hasmetadata:
{
ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "hasmetadata");
break;
}
case Anum_pg_dist_node_isactive:
{
ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "isactive");
metadataSyncCommand = NodeStateUpdateCommand(workerNode->nodeId,
DatumGetBool(value));
break;
@@ -1260,6 +1297,13 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
break;
}
case Anum_pg_dist_node_metadatasynced:
{
ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "metadatasynced");
break;
}
default:
{
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
@@ -1295,6 +1339,22 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
}
/*
* ErrorIfCoordinatorMetadataSetFalse throws an error if the input node
* is the coordinator and the value is false.
*/
static void
ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *field)
{
bool valueBool = DatumGetBool(value);
if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID)
{
ereport(ERROR, (errmsg("cannot change \"%s\" field of the "
"coordinator node", field)));
}
}
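
The three metadata-related columns above funnel into a single invariant: on the coordinator's pg_dist_node row, hasmetadata, isactive, and metadatasynced may only move to true. A self-contained sketch of that invariant (plain C; Datum and the error reporting are hypothetical stand-ins for the PostgreSQL machinery, not the real API):

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

#define COORDINATOR_GROUP_ID 0

/* hypothetical stand-ins for PostgreSQL's Datum machinery */
typedef bool Datum;
#define DatumGetBool(d) (d)

typedef struct WorkerNode
{
	int groupId;
} WorkerNode;

/* mirrors ErrorIfCoordinatorMetadataSetFalse, with exit() instead of ereport */
static void
ErrorIfCoordinatorMetadataSetFalse(const WorkerNode *workerNode, Datum value,
								   const char *field)
{
	bool valueBool = DatumGetBool(value);
	if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID)
	{
		fprintf(stderr, "ERROR: cannot change \"%s\" field of the "
				"coordinator node\n", field);
		exit(1);
	}
}

int
main(void)
{
	WorkerNode coordinator = { COORDINATOR_GROUP_ID };
	WorkerNode worker = { 1 };

	/* fine: a worker's flags may be toggled either way */
	ErrorIfCoordinatorMetadataSetFalse(&worker, false, "hasmetadata");

	/* fine: the coordinator's flags may be (re)set to true */
	ErrorIfCoordinatorMetadataSetFalse(&coordinator, true, "isactive");

	/* errors out: the coordinator's metadata flags must stay true */
	ErrorIfCoordinatorMetadataSetFalse(&coordinator, false, "metadatasynced");
	return 0;
}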
/*
* SetShouldHaveShards function sets the shouldhaveshards column of the
* specified worker in pg_dist_node.

src/backend/distributed/planner/function_call_delegation.c

@@ -302,6 +302,18 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
ereport(DEBUG1, (errmsg("the worker node does not have metadata")));
return NULL;
}
else if (workerNode->groupId == GetLocalGroupId())
{
/*
* Two reasons for this:
* (a) It would lead to infinite recursion, as the node would
* keep pushing the function down to itself
* (b) Pushing down has no value, as we are already
* on the node itself
*/
ereport(DEBUG1, (errmsg("not pushing down function to the same node")));
return NULL;
}
(void) expression_tree_walker((Node *) funcExpr->args, contain_param_walker,
&walkerParamContext);
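
Reason (a) in the comment above deserves a concrete picture: without the local-group check, the node receiving the delegated call would plan it and delegate it to itself again, without bound. A toy model of that failure mode (plain C, hypothetical names, not Citus code):

#include <stdbool.h>
#include <stdio.h>

/* toy model: each delegated call is planned again on the receiving node */
static int delegationDepth = 0;

static void
PlanFunctionCall(bool targetIsLocalNode)
{
	if (targetIsLocalNode)
	{
		delegationDepth++;
		if (delegationDepth > 5)
		{
			/* a real server would recurse until it runs out of resources */
			printf("gave up after %d self-delegations\n", delegationDepth);
			return;
		}
		PlanFunctionCall(targetIsLocalNode); /* delegates to itself again */
	}
	else
	{
		printf("delegated to a remote node exactly once\n");
	}
}

int
main(void)
{
	PlanFunctionCall(true);  /* unbounded without the local-group guard */
	PlanFunctionCall(false); /* remote delegation terminates */
	return 0;
}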

src/test/regress/expected/single_node.out

@@ -4,6 +4,9 @@ SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 90630500;
SET citus.replication_model TO 'streaming';
-- adding the coordinator as inactive is disallowed
SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
ERROR: coordinator node cannot be added as inactive node
-- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
@@ -12,6 +15,10 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
1
(1 row)
-- coordinator cannot be disabled
SELECT 1 FROM master_disable_node('localhost', :master_port);
ERROR: Disabling localhost:xxxxx failed
DETAIL: cannot change "isactive" field of the coordinator node
RESET client_min_messages;
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
?column?
@@ -729,7 +736,7 @@ END;$$;
SELECT * FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
1 | 0 | localhost | 57636 | default | t | t | primary | default | t | t
(1 row)
SELECT create_distributed_function('call_delegation(int)', '$1', 'test');
@@ -738,15 +745,34 @@ SELECT create_distributed_function('call_delegation(int)', '$1', 'test');
(1 row)
CREATE FUNCTION function_delegation(int) RETURNS void AS $$
BEGIN
UPDATE test SET y = y + 1 WHERE x < $1;
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function('function_delegation(int)', '$1', 'test');
create_distributed_function
---------------------------------------------------------------------
(1 row)
SET client_min_messages TO DEBUG1;
CALL call_delegation(1);
DEBUG: not pushing down procedure to the same node
SELECT function_delegation(1);
DEBUG: not pushing down function to the same node
function_delegation
---------------------------------------------------------------------
(1 row)
SET client_min_messages TO WARNING;
DROP TABLE test CASCADE;
NOTICE: drop cascades to view single_node_view
-- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
SELECT 1 FROM master_remove_node('localhost', :master_port);
ERROR: cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables
HINT: To proceed, either drop the reference tables or use undistribute_table() function to convert them to local tables
-- Cleanup
SET client_min_messages TO WARNING;
DROP SCHEMA single_node CASCADE;
-- Remove the coordinator again
SELECT 1 FROM master_remove_node('localhost', :master_port);

src/test/regress/sql/single_node.sql

@@ -5,9 +5,16 @@ SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 90630500;
SET citus.replication_model TO 'streaming';
-- adding the coordinator as inactive is disallowed
SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
-- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
-- coordinator cannot be disabled
SELECT 1 FROM master_disable_node('localhost', :master_port);
RESET client_min_messages;
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
@@ -360,13 +367,24 @@ BEGIN
END;$$;
SELECT * FROM pg_dist_node;
SELECT create_distributed_function('call_delegation(int)', '$1', 'test');
CREATE FUNCTION function_delegation(int) RETURNS void AS $$
BEGIN
UPDATE test SET y = y + 1 WHERE x < $1;
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function('function_delegation(int)', '$1', 'test');
SET client_min_messages TO DEBUG1;
CALL call_delegation(1);
SELECT function_delegation(1);
SET client_min_messages TO WARNING;
DROP TABLE test CASCADE;
-- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
SELECT 1 FROM master_remove_node('localhost', :master_port);
-- Cleanup
SET client_min_messages TO WARNING;
DROP SCHEMA single_node CASCADE;
-- Remove the coordinator again
SELECT 1 FROM master_remove_node('localhost', :master_port);