Use the executor to sync objects during node activation

marcocitus/sync-via-executor
Marco Slot 2023-01-04 18:59:44 +01:00
parent eb75decbeb
commit 61ccecbd26
3 changed files with 36 additions and 11 deletions

View File

@ -21,6 +21,7 @@
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "commands/sequence.h"
#include "distributed/adaptive_executor.h"
#include "distributed/citus_acquire_lock.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/colocation_utils.h"
@ -29,6 +30,7 @@
#include "distributed/connection_management.h"
#include "distributed/maintenanced.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h"
@ -101,6 +103,7 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada
*nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SyncDistributedObjectsToNodeList(List *workerNodeList);
static void ExecuteCommandListOnWorkerList(List *commandList, List *workerList);
static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode);
static void SyncPgDistTableMetadataToNodeList(List *nodeList);
static List * InterTableRelationshipCommandList();
@ -873,16 +876,41 @@ SyncDistributedObjectsToNodeList(List *workerNodeList)
EnsureSequentialModeMetadataOperations();
Assert(superuser());
Assert(ShouldPropagate());
List *commandList = SyncDistributedObjectsCommandList(workerNode);
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerListInCoordinatedTransaction(
workerNodesToSync,
CurrentUserName(),
commandList);
ExecuteCommandListOnWorkerList(commandList, workerNodesToSync);
}
/*
* ExecuteCommandListOnWorkerList executes a command list on all nodes
* in the given list.
*/
static void
ExecuteCommandListOnWorkerList(List *commandList, List *workerList)
{
List *taskList = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerList)
{
ShardPlacement *placement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(placement, workerNode);
Task *task = CitusMakeNode(Task);
task->taskType = DDL_TASK;
SetTaskQueryStringList(task, commandList);
task->taskPlacementList = list_make1(placement);
taskList = lappend(taskList, task);
}
bool localExecutionSupported = false;
ExecuteUtilityTaskList(taskList, localExecutionSupported);
}

View File

@ -105,9 +105,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA").kill()');
(1 row)
SELECT master_activate_node('localhost', :worker_2_proxy_port);
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: failure on connection marked as essential: localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- verify node is not activated
SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;

View File

@ -210,9 +210,8 @@ SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extnam
-- adding the second node will fail as the text search template needs to be created manually
SELECT 1 from master_add_node('localhost', :worker_2_port);
WARNING: text search template "public.intdict_template" does not exist
ERROR: text search template "public.intdict_template" does not exist
CONTEXT: while executing command on localhost:xxxxx
ERROR: failure on connection marked as essential: localhost:xxxxx
-- create the text search template manually on the worker
\c - - - :worker_2_port
SET citus.enable_metadata_sync TO false;