diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 93264b8b1..f1db92c07 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -110,6 +110,7 @@ static void SyncDistributedObjectsToNode(WorkerNode *workerNode); static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode); static void SyncPgDistTableMetadataToNode(WorkerNode *workerNode); static List * InterTableRelationshipCommandList(); +static void BlockDistributedQueriesOnMetadataNodes(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static List * PropagateNodeWideObjectsCommandList(); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); @@ -488,7 +489,7 @@ citus_disable_node(PG_FUNCTION_ARGS) "citus_disable_node('%s', %d, force:=true, " "synchronous:=true); " "Passing synchronous:=false might cause replicated shards " - "to diverge.",workerNode->workerName, nodePort), + "to diverge.", workerNode->workerName, nodePort), errdetail("Citus uses the first worker node in the " "metadata for certain internal operations when " "replicated tables are modified. Synchronous mode " @@ -545,8 +546,19 @@ citus_disable_node(PG_FUNCTION_ARGS) * than the current node being disabled), the sync option would * fail because it'd try to sync the metadata changes to a node * that is not up and running. - * */ + if (firstWorkerNode && firstWorkerNode->nodeId == workerNode->nodeId) + { + /* + * We cannot allow any modification query on a replicated table to run + * concurrently with citus_disable_node() on the first worker node. If + * we allowed that, some worker nodes might calculate FirstWorkerNode() + * differently than others. See LockShardListResourcesOnFirstWorker() + * for the details. 
+ */ + BlockDistributedQueriesOnMetadataNodes(); + } + SyncNodeMetadataToNodes(); } else if (UnsetMetadataSyncedForAllWorkers()) @@ -571,6 +583,48 @@ citus_disable_node(PG_FUNCTION_ARGS) } +/* + * BlockDistributedQueriesOnMetadataNodes locks pg_dist_node on every metadata node. + */ +static void +BlockDistributedQueriesOnMetadataNodes(void) +{ + /* first, block on the coordinator */ + LockRelationOid(DistNodeRelationId(), ExclusiveLock); + + List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock); + WorkerNode *workerNodeToLock = NULL; + foreach_ptr(workerNodeToLock, workerList) + { + if (!workerNodeToLock->hasMetadata) + { + /* non-metadata workers cannot run distributed queries, so skip */ + continue; + } + + /* + * Note that we might re-design this lock to be more granular than + * pg_dist_node, scoping only for modifications on the replicated + * tables. However, we currently do not have any such mechanism and + * given that citus_disable_node() runs instantly, it seems acceptable + * to block reads (or modifications on non-replicated tables) for + * a while. + */ + + /* only superuser can disable node */ + Assert(superuser()); + + List *commandList = + list_make1("LOCK TABLE pg_dist_node IN EXCLUSIVE MODE;"); + SendMetadataCommandListToWorkerInCoordinatedTransaction( + workerNodeToLock->workerName, + workerNodeToLock->workerPort, + CurrentUserName(), + commandList); + } +} + + /* * master_disable_node is a wrapper function for old UDF name. 
*/ diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 1054049e4..16c0afb54 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -393,7 +393,7 @@ NodeNamePortCompare(const char *workerLhsName, const char *workerRhsName, WorkerNode * GetFirstPrimaryWorkerNode(void) { - List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(RowShareLock); WorkerNode *firstWorkerNode = NULL; WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList)