From 6fc1dea85c54f2332dc4003457c9ddfc49ed1773 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Thu, 17 Sep 2020 09:28:34 +0200
Subject: [PATCH] Improve the robustness of function call delegation

Pushing down a CALL to the same node on which it is already executing is
dangerous and could lead to infinite recursion. When the coordinator is
added as a worker, Citus prevented this only by chance: the coordinator was
marked as a "not metadatasynced" node in pg_dist_node, which prevented
CALL/function delegation from happening.

With this commit, we do the following:

- Fix the metadatasynced column for the coordinator in pg_dist_node
- Prevent pushdown of a function/procedure to the same node on which the
  function/procedure is already being executed

Today, we do not sync pg_dist_object (e.g., distributed function metadata)
to the worker nodes. But even if we did, the new check in function call
delegation would still prevent the infinite recursion.
---
 src/backend/distributed/commands/call.c       | 13 ++++++++
 .../distributed/metadata/metadata_sync.c      | 18 ++++++++++
 .../distributed/metadata/node_metadata.c      | 33 +++++++++++++++++++
 .../planner/function_call_delegation.c        | 12 +++++++
 src/include/distributed/metadata_sync.h       |  1 +
 src/test/regress/expected/single_node.out     | 22 +++++++++++++-
 src/test/regress/sql/single_node.sql          | 11 +++++++
 7 files changed, 109 insertions(+), 1 deletion(-)

diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c
index 0cf87fe51..ff547f1cb 100644
--- a/src/backend/distributed/commands/call.c
+++ b/src/backend/distributed/commands/call.c
@@ -129,6 +129,18 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
 		ereport(DEBUG1, (errmsg("there is no worker node with metadata")));
 		return false;
 	}
+	else if (workerNode->groupId == GetLocalGroupId())
+	{
+		/*
+		 * Two reasons for this:
+		 * (a) It would lead to infinite recursion, as the node
+		 *     would keep pushing the call back down to itself
+		 * (b) There is no value in pushing down, as we are
+		 *     already on the node itself
+		 */
+		ereport(DEBUG1, (errmsg("not pushing down procedure to the same node")));
+		return false;
+	}
 
 	ereport(DEBUG1, (errmsg("pushing down the procedure")));
 
@@ -175,6 +187,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
 													tupleDesc);
 	executionParams->expectResults = expectResults;
 	executionParams->xactProperties = xactProperties;
+	executionParams->isUtilityCommand = true;
 	ExecuteTaskListExtended(executionParams);
 
 	DisableWorkerMessagePropagation();
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 163cea3cc..e6e411ed6 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -926,6 +926,24 @@ ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards)
 }
 
 
+/*
+ * MetadataSyncedUpdateCommand generates a command that can be executed to
+ * update the metadatasynced column of a node in the pg_dist_node table.
+ */
+char *
+MetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced)
+{
+	StringInfo nodeStateUpdateCommand = makeStringInfo();
+	char *metadataSyncedString = metadataSynced ? "TRUE" : "FALSE";
+
+	appendStringInfo(nodeStateUpdateCommand,
+					 "UPDATE pg_catalog.pg_dist_node SET metadatasynced = %s "
+					 "WHERE nodeid = %u", metadataSyncedString, nodeId);
+
+	return nodeStateUpdateCommand->data;
+}
+
+
 /*
  * ColocationIdUpdateCommand creates the SQL command to change the colocationId
  * of the table with the given name to the given colocationId in pg_dist_partition
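
As an illustrative sketch (not part of the diff): given the format string
above, the helper renders a plain catalog UPDATE. A hypothetical caller, using
node id 1 to match the coordinator row in the single_node test below:

    /* hypothetical usage of the helper added above */
    char *command = MetadataSyncedUpdateCommand(1, true);

    /*
     * command now holds:
     *   UPDATE pg_catalog.pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 1
     * The string is built with makeStringInfo(), so it is palloc'd in the
     * current memory context and needs no explicit free here.
     */

SetWorkerColumn() in node_metadata.c below consumes the helper exactly this
way, taking the node id from the WorkerNode and the boolean from the Datum.
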
"TRUE" : "FALSE"; + + appendStringInfo(nodeStateUpdateCommand, + "UPDATE pg_catalog.pg_dist_node SET metadatasynced = %s " + "WHERE nodeid = %u", metadataSyncedString, nodeId); + + return nodeStateUpdateCommand->data; +} + + /* * ColocationIdUpdateCommand creates the SQL command to change the colocationId * of the table with the given name to the given colocationId in pg_dist_partition diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index ba28fedb6..49ff2dfa6 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -91,6 +91,8 @@ static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata, bool *nodeAlreadyExists); static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive); +static WorkerNode * SetNodeMetadataSync(char *nodeName, int nodePort, bool + metadataSynced); static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort); static int32 GetNextGroupId(void); static int GetNextNodeId(void); @@ -642,6 +644,16 @@ ActivateNode(char *nodeName, int nodePort) WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive); + /* + * Coordinator has always the authoritative metadata, reflect this + * fact in the pg_dist_node. + */ + if (newWorkerNode->groupId == COORDINATOR_GROUP_ID) + { + bool metadataSynced = true; + SetNodeMetadataSync(nodeName, nodePort, metadataSynced); + } + SetUpDistributedTableDependencies(newWorkerNode); return newWorkerNode->nodeId; } @@ -1260,6 +1272,13 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value) break; } + case Anum_pg_dist_node_metadatasynced: + { + metadataSyncCommand = MetadataSyncedUpdateCommand(workerNode->nodeId, + DatumGetBool(value)); + break; + } + default: { ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", @@ -1322,6 +1341,20 @@ SetNodeState(char *nodeName, int nodePort, bool isActive) } +/* + * SetNodeState function sets the isactive column of the specified worker in + * pg_dist_node to isActive. + * It returns the new worker node after the modification. + */ +static WorkerNode * +SetNodeMetadataSync(char *nodeName, int nodePort, bool metadataSynced) +{ + WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); + return SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, + BoolGetDatum(metadataSynced)); +} + + /* * GetNodeTuple function returns the heap tuple of given nodeName and nodePort. If the * node is not found this function returns NULL. 
diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c
index 533a26689..e389a2b92 100644
--- a/src/backend/distributed/planner/function_call_delegation.c
+++ b/src/backend/distributed/planner/function_call_delegation.c
@@ -302,6 +302,18 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
 		ereport(DEBUG1, (errmsg("the worker node does not have metadata")));
 		return NULL;
 	}
+	else if (workerNode->groupId == GetLocalGroupId())
+	{
+		/*
+		 * Two reasons for this:
+		 * (a) It would lead to infinite recursion, as the node
+		 *     would keep pushing the call back down to itself
+		 * (b) There is no value in pushing down, as we are
+		 *     already on the node itself
+		 */
+		ereport(DEBUG1, (errmsg("not pushing down function to the same node")));
+		return NULL;
+	}
 
 	(void) expression_tree_walker((Node *) funcExpr->args, contain_param_walker,
 								  &walkerParamContext);
diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h
index 78391ebbf..888f64c1d 100644
--- a/src/include/distributed/metadata_sync.h
+++ b/src/include/distributed/metadata_sync.h
@@ -43,6 +43,7 @@ extern List * ShardDeleteCommandList(ShardInterval *shardInterval);
 extern char * NodeDeleteCommand(uint32 nodeId);
 extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive);
 extern char * ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards);
+extern char * MetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced);
 extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId);
 extern char * CreateSchemaDDLCommand(Oid schemaId);
 extern List * GrantOnSchemaDDLCommands(Oid schemaId);
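
The guard added to call.c and the one above implement the same decision at two
entry points: utility processing for CALL, and planning for functions invoked
inside queries. Condensed into one hypothetical helper for clarity;
ShouldDelegateToNode() is invented here, and the real call sites also check
metadata availability first, as the surrounding context shows:

    /* hypothetical condensation of the two guards; not in the patch */
    static bool
    ShouldDelegateToNode(WorkerNode *workerNode)
    {
        if (workerNode == NULL)
        {
            return false;    /* no worker with synced metadata: run locally */
        }
        else if (workerNode->groupId == GetLocalGroupId())
        {
            return false;    /* we are that node: delegating would recurse */
        }

        return true;         /* safe to build a Task for workerNode */
    }

Keeping the two call sites aligned matters: a bare CALL is caught in call.c,
while a function call in a SELECT is caught in function_call_delegation.c.
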
diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out
index 3eb117231..d16df188b 100644
--- a/src/test/regress/expected/single_node.out
+++ b/src/test/regress/expected/single_node.out
@@ -729,7 +729,7 @@ END;$$;
 SELECT * FROM pg_dist_node;
  nodeid | groupid | nodename  | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
 ---------------------------------------------------------------------
-      1 |       0 | localhost |    57636 | default  | t           | t        | primary  | default     | f              | t
+      1 |       0 | localhost |    57636 | default  | t           | t        | primary  | default     | t              | t
 (1 row)
 
 SELECT create_distributed_function('call_delegation(int)', '$1', 'test');
@@ -738,13 +738,33 @@ SELECT create_distributed_function('call_delegation(int)', '$1', 'test');
 
 (1 row)
 
+CREATE FUNCTION function_delegation(int) RETURNS void AS $$
+BEGIN
+UPDATE test SET y = y + 1 WHERE x < $1;
+END;
+$$ LANGUAGE plpgsql;
+SELECT create_distributed_function('function_delegation(int)', '$1', 'test');
+ create_distributed_function
+---------------------------------------------------------------------
+
+(1 row)
+
+SET client_min_messages TO DEBUG1;
 CALL call_delegation(1);
+DEBUG:  not pushing down procedure to the same node
+SELECT function_delegation(1);
+DEBUG:  not pushing down function to the same node
+ function_delegation
+---------------------------------------------------------------------
+
+(1 row)
+
 DROP TABLE test CASCADE;
 NOTICE:  drop cascades to view single_node_view
 -- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
 SELECT 1 FROM master_remove_node('localhost', :master_port);
 ERROR:  cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables
 HINT:  To proceed, either drop the reference tables or use undistribute_table() function to convert them to local tables
 -- Cleanup
 SET client_min_messages TO WARNING;
 DROP SCHEMA single_node CASCADE;
diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql
index 2a744178f..69464a833 100644
--- a/src/test/regress/sql/single_node.sql
+++ b/src/test/regress/sql/single_node.sql
@@ -360,7 +360,18 @@ BEGIN
 END;$$;
 SELECT * FROM pg_dist_node;
 SELECT create_distributed_function('call_delegation(int)', '$1', 'test');
+
+CREATE FUNCTION function_delegation(int) RETURNS void AS $$
+BEGIN
+UPDATE test SET y = y + 1 WHERE x < $1;
+END;
+$$ LANGUAGE plpgsql;
+SELECT create_distributed_function('function_delegation(int)', '$1', 'test');
+
+SET client_min_messages TO DEBUG1;
 CALL call_delegation(1);
+SELECT function_delegation(1);
+
 DROP TABLE test CASCADE;
 -- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
 SELECT 1 FROM master_remove_node('localhost', :master_port);