diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 0cf87fe51..ff547f1cb 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -129,6 +129,18 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, ereport(DEBUG1, (errmsg("there is no worker node with metadata"))); return false; } + else if (workerNode->groupId == GetLocalGroupId()) + { + /* + * Two reasons for this: + * (a) It would lead to infinite recursion as the node would + * keep pushing down the procedure as it gets + * (b) It doesn't have any value to pushdown as we are already + * on the node itself + */ + ereport(DEBUG1, (errmsg("not pushing down procedure to the same node"))); + return false; + } ereport(DEBUG1, (errmsg("pushing down the procedure"))); @@ -175,6 +187,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, tupleDesc); executionParams->expectResults = expectResults; executionParams->xactProperties = xactProperties; + executionParams->isUtilityCommand = true; ExecuteTaskListExtended(executionParams); DisableWorkerMessagePropagation(); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 163cea3cc..e6e411ed6 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -926,6 +926,24 @@ ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards) } +/* + * MetadataSyncedUpdateCommand generates a command that can be executed to + * update the metadatasynced column of a node in pg_dist_node table. + */ +char * +MetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced) +{ + StringInfo nodeStateUpdateCommand = makeStringInfo(); + char *metadataSyncedString = metadataSynced ? "TRUE" : "FALSE"; + + appendStringInfo(nodeStateUpdateCommand, + "UPDATE pg_catalog.pg_dist_node SET metadatasynced = %s " + "WHERE nodeid = %u", metadataSyncedString, nodeId); + + return nodeStateUpdateCommand->data; +} + + /* * ColocationIdUpdateCommand creates the SQL command to change the colocationId * of the table with the given name to the given colocationId in pg_dist_partition diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index ba28fedb6..49ff2dfa6 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -91,6 +91,8 @@ static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata, bool *nodeAlreadyExists); static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive); +static WorkerNode * SetNodeMetadataSync(char *nodeName, int nodePort, bool + metadataSynced); static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort); static int32 GetNextGroupId(void); static int GetNextNodeId(void); @@ -642,6 +644,16 @@ ActivateNode(char *nodeName, int nodePort) WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive); + /* + * Coordinator has always the authoritative metadata, reflect this + * fact in the pg_dist_node. + */ + if (newWorkerNode->groupId == COORDINATOR_GROUP_ID) + { + bool metadataSynced = true; + SetNodeMetadataSync(nodeName, nodePort, metadataSynced); + } + SetUpDistributedTableDependencies(newWorkerNode); return newWorkerNode->nodeId; } @@ -1260,6 +1272,13 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value) break; } + case Anum_pg_dist_node_metadatasynced: + { + metadataSyncCommand = MetadataSyncedUpdateCommand(workerNode->nodeId, + DatumGetBool(value)); + break; + } + default: { ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", @@ -1322,6 +1341,20 @@ SetNodeState(char *nodeName, int nodePort, bool isActive) } +/* + * SetNodeState function sets the isactive column of the specified worker in + * pg_dist_node to isActive. + * It returns the new worker node after the modification. + */ +static WorkerNode * +SetNodeMetadataSync(char *nodeName, int nodePort, bool metadataSynced) +{ + WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); + return SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, + BoolGetDatum(metadataSynced)); +} + + /* * GetNodeTuple function returns the heap tuple of given nodeName and nodePort. If the * node is not found this function returns NULL. diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 533a26689..e389a2b92 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -302,6 +302,18 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) ereport(DEBUG1, (errmsg("the worker node does not have metadata"))); return NULL; } + else if (workerNode->groupId == GetLocalGroupId()) + { + /* + * Two reasons for this: + * (a) It would lead to infinite recursion as the node would + * keep pushing down the procedure as it gets + * (b) It doesn't have any value to pushdown as we are already + * on the node itself + */ + ereport(DEBUG1, (errmsg("not pushing down function to the same node"))); + return NULL; + } (void) expression_tree_walker((Node *) funcExpr->args, contain_param_walker, &walkerParamContext); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 78391ebbf..888f64c1d 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -43,6 +43,7 @@ extern List * ShardDeleteCommandList(ShardInterval *shardInterval); extern char * NodeDeleteCommand(uint32 nodeId); extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive); extern char * ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards); +extern char * MetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced); extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId); extern char * CreateSchemaDDLCommand(Oid schemaId); extern List * GrantOnSchemaDDLCommands(Oid schemaId); diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 3eb117231..d16df188b 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -729,7 +729,7 @@ END;$$; SELECT * FROM pg_dist_node; nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards --------------------------------------------------------------------- - 1 | 0 | localhost | 57636 | default | t | t | primary | default | f | t + 1 | 0 | localhost | 57636 | default | t | t | primary | default | t | t (1 row) SELECT create_distributed_function('call_delegation(int)', '$1', 'test'); @@ -738,13 +738,36 @@ SELECT create_distributed_function('call_delegation(int)', '$1', 'test'); (1 row) +CREATE FUNCTION function_delegation(int) RETURNS void AS $$ +BEGIN +UPDATE test SET y = y + 1 WHERE x < $1; +END; +$$ LANGUAGE plpgsql; +SELECT create_distributed_function('function_delegation(int)', '$1', 'test'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; CALL call_delegation(1); +<<<<<<< HEAD DROP TABLE test CASCADE; NOTICE: drop cascades to view single_node_view -- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added SELECT 1 FROM master_remove_node('localhost', :master_port); ERROR: cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables HINT: To proceed, either drop the reference tables or use undistribute_table() function to convert them to local tables +======= +DEBUG: not pushing down procedure to the same node +SELECT function_delegation(1); +DEBUG: not pushing down function to the same node + function_delegation +--------------------------------------------------------------------- + +(1 row) + +>>>>>>> 61cf557a8... Improve the robustness of function call delegation -- Cleanup SET client_min_messages TO WARNING; DROP SCHEMA single_node CASCADE; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 2a744178f..69464a833 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -360,7 +360,18 @@ BEGIN END;$$; SELECT * FROM pg_dist_node; SELECT create_distributed_function('call_delegation(int)', '$1', 'test'); + +CREATE FUNCTION function_delegation(int) RETURNS void AS $$ +BEGIN +UPDATE test SET y = y + 1 WHERE x < $1; +END; +$$ LANGUAGE plpgsql; +SELECT create_distributed_function('function_delegation(int)', '$1', 'test'); + +SET client_min_messages TO DEBUG1; CALL call_delegation(1); +SELECT function_delegation(1); + DROP TABLE test CASCADE; -- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added SELECT 1 FROM master_remove_node('localhost', :master_port);