mirror of https://github.com/citusdata/citus.git
Prevent distributed queries while disabling first worker node
First worker node has a special meaning for modifications on the replicated tables It is used to acquire a remote lock, such that the modifications are serialized. With this commit, we make sure that we do not let any distributed query to see a different 'first worker node' while first worker node is disabled. Note that, maybe implicitly mentioned above, when first worker node is disabled, the first worker node changes, that's why we have to handle the situation.pull/5912/head
parent
db998b3d66
commit
b4dbd84743
|
@ -110,6 +110,7 @@ static void SyncDistributedObjectsToNode(WorkerNode *workerNode);
|
|||
static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode);
|
||||
static void SyncPgDistTableMetadataToNode(WorkerNode *workerNode);
|
||||
static List * InterTableRelationshipCommandList();
|
||||
static void BlockDistributedQueriesOnMetadataNodes(void);
|
||||
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
|
||||
static List * PropagateNodeWideObjectsCommandList();
|
||||
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
|
||||
|
@ -488,7 +489,7 @@ citus_disable_node(PG_FUNCTION_ARGS)
|
|||
"citus_disable_node('%s', %d, force:=true, "
|
||||
"synchronous:=true); "
|
||||
"Passing synchronous:=false might cause replicated shards "
|
||||
"to diverge.",workerNode->workerName, nodePort),
|
||||
"to diverge.", workerNode->workerName, nodePort),
|
||||
errdetail("Citus uses the first worker node in the "
|
||||
"metadata for certain internal operations when "
|
||||
"replicated tables are modified. Synchronous mode "
|
||||
|
@ -545,8 +546,19 @@ citus_disable_node(PG_FUNCTION_ARGS)
|
|||
* than the current node being disabled), the sync option would
|
||||
* fail because it'd try to sync the metadata changes to a node
|
||||
* that is not up and running.
|
||||
*
|
||||
*/
|
||||
if (firstWorkerNode && firstWorkerNode->nodeId == workerNode->nodeId)
|
||||
{
|
||||
/*
|
||||
* We cannot let any modification query on a replicated table to run
|
||||
* concurrently with citus_disable_node() on the first worker node. If
|
||||
* we let that, some worker nodes might calculate FirstWorkerNode()
|
||||
* different than others. See LockShardListResourcesOnFirstWorker()
|
||||
* for the details.
|
||||
*/
|
||||
BlockDistributedQueriesOnMetadataNodes();
|
||||
}
|
||||
|
||||
SyncNodeMetadataToNodes();
|
||||
}
|
||||
else if (UnsetMetadataSyncedForAllWorkers())
|
||||
|
@ -571,6 +583,48 @@ citus_disable_node(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* BlockDistributedQueriesOnMetadataNodes blocks all the
|
||||
*/
|
||||
static void
|
||||
BlockDistributedQueriesOnMetadataNodes(void)
|
||||
{
|
||||
/* first, block on the coordinator */
|
||||
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
|
||||
|
||||
List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
|
||||
WorkerNode *workerNodeToLock = NULL;
|
||||
foreach_ptr(workerNodeToLock, workerList)
|
||||
{
|
||||
if (!workerNodeToLock->hasMetadata)
|
||||
{
|
||||
/* non-metadata workers cannot run distributed queries, so skip */
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* Note that we might re-design this lock to be more granular than
|
||||
* pg_dist_node, scoping only for modifications on the replicated
|
||||
* tables. However, we currently do not have any such mechanism and
|
||||
* given that citus_disable_node() runs instantly, it seems acceptable
|
||||
* to block reads (or modifications on non-replicated tables) for
|
||||
* a while.
|
||||
*/
|
||||
|
||||
/* only superuser can disable node */
|
||||
Assert(superuser());
|
||||
|
||||
List *commandList =
|
||||
list_make1("LOCK TABLE pg_dist_node IN EXCLUSIVE MODE;");
|
||||
SendMetadataCommandListToWorkerInCoordinatedTransaction(
|
||||
workerNodeToLock->workerName,
|
||||
workerNodeToLock->workerPort,
|
||||
CurrentUserName(),
|
||||
commandList);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* master_disable_node is a wrapper function for old UDF name.
|
||||
*/
|
||||
|
|
|
@ -393,7 +393,7 @@ NodeNamePortCompare(const char *workerLhsName, const char *workerRhsName,
|
|||
WorkerNode *
|
||||
GetFirstPrimaryWorkerNode(void)
|
||||
{
|
||||
List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
|
||||
List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(RowShareLock);
|
||||
WorkerNode *firstWorkerNode = NULL;
|
||||
WorkerNode *workerNode = NULL;
|
||||
foreach_ptr(workerNode, workerNodeList)
|
||||
|
|
Loading…
Reference in New Issue