From 61ccecbd26ff8547c9b019f505966968a9d87806 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 4 Jan 2023 18:59:44 +0100 Subject: [PATCH] Use the executor to sync objects during node activation --- .../distributed/metadata/node_metadata.c | 40 ++++++++++++++++--- .../expected/failure_add_disable_node.out | 4 +- .../expected/propagate_extension_commands.out | 3 +- 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 9cc4c44d2..1869cd721 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -21,6 +21,7 @@ #include "catalog/indexing.h" #include "catalog/namespace.h" #include "commands/sequence.h" +#include "distributed/adaptive_executor.h" #include "distributed/citus_acquire_lock.h" #include "distributed/citus_safe_lib.h" #include "distributed/colocation_utils.h" @@ -29,6 +30,7 @@ #include "distributed/connection_management.h" #include "distributed/maintenanced.h" #include "distributed/coordinator_protocol.h" +#include "distributed/deparse_shard_query.h" #include "distributed/metadata_utility.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata_cache.h" @@ -101,6 +103,7 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); static void SyncDistributedObjectsToNodeList(List *workerNodeList); +static void ExecuteCommandListOnWorkerList(List *commandList, List *workerList); static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode); static void SyncPgDistTableMetadataToNodeList(List *nodeList); static List * InterTableRelationshipCommandList(); @@ -873,16 +876,41 @@ SyncDistributedObjectsToNodeList(List *workerNodeList) EnsureSequentialModeMetadataOperations(); + Assert(superuser()); Assert(ShouldPropagate()); List *commandList = SyncDistributedObjectsCommandList(workerNode); - /* send commands to new workers, the current user should be a superuser */ - Assert(superuser()); - SendMetadataCommandListToWorkerListInCoordinatedTransaction( - workerNodesToSync, - CurrentUserName(), - commandList); + ExecuteCommandListOnWorkerList(commandList, workerNodesToSync); +} + + +/* + * ExecuteCommandListOnWorkerList executes a command list on all nodes + * in the given list. + */ +static void +ExecuteCommandListOnWorkerList(List *commandList, List *workerList) +{ + List *taskList = NIL; + + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerList) + { + ShardPlacement *placement = CitusMakeNode(ShardPlacement); + SetPlacementNodeMetadata(placement, workerNode); + + Task *task = CitusMakeNode(Task); + task->taskType = DDL_TASK; + SetTaskQueryStringList(task, commandList); + task->taskPlacementList = list_make1(placement); + + taskList = lappend(taskList, task); + } + + bool localExecutionSupported = false; + + ExecuteUtilityTaskList(taskList, localExecutionSupported); } diff --git a/src/test/regress/expected/failure_add_disable_node.out b/src/test/regress/expected/failure_add_disable_node.out index d2a389d96..f2e842ed2 100644 --- a/src/test/regress/expected/failure_add_disable_node.out +++ b/src/test/regress/expected/failure_add_disable_node.out @@ -105,9 +105,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA").kill()'); (1 row) SELECT master_activate_node('localhost', :worker_2_proxy_port); -WARNING: connection not open -CONTEXT: while executing command on localhost:xxxxx -ERROR: failure on connection marked as essential: localhost:xxxxx +ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -- verify node is not activated SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; diff --git a/src/test/regress/expected/propagate_extension_commands.out b/src/test/regress/expected/propagate_extension_commands.out index 8d43769f1..23e86c2cb 100644 --- a/src/test/regress/expected/propagate_extension_commands.out +++ b/src/test/regress/expected/propagate_extension_commands.out @@ -210,9 +210,8 @@ SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extnam -- adding the second node will fail as the text search template needs to be created manually SELECT 1 from master_add_node('localhost', :worker_2_port); -WARNING: text search template "public.intdict_template" does not exist +ERROR: text search template "public.intdict_template" does not exist CONTEXT: while executing command on localhost:xxxxx -ERROR: failure on connection marked as essential: localhost:xxxxx -- create the text search template manually on the worker \c - - - :worker_2_port SET citus.enable_metadata_sync TO false;