mirror of https://github.com/citusdata/citus.git
Merge pull request #4214 from citusdata/degrade-gracefully-when-no-background-workers
Degrade gracefully when no background workers availablepull/4216/head
commit
889fc2db5f
|
@ -41,6 +41,7 @@
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/shardinterval_utils.h"
|
#include "distributed/shardinterval_utils.h"
|
||||||
#include "distributed/shared_connection_stats.h"
|
#include "distributed/shared_connection_stats.h"
|
||||||
|
#include "distributed/string_utils.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
|
@ -101,6 +102,7 @@ static void SetUpDistributedTableDependencies(WorkerNode *workerNode);
|
||||||
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
|
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
|
||||||
static void PropagateNodeWideObjects(WorkerNode *newWorkerNode);
|
static void PropagateNodeWideObjects(WorkerNode *newWorkerNode);
|
||||||
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
|
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
|
||||||
|
static void SetLockTimeoutLocally(int32 lock_cooldown);
|
||||||
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
|
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
|
||||||
static bool UnsetMetadataSyncedForAll(void);
|
static bool UnsetMetadataSyncedForAll(void);
|
||||||
static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
|
static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
|
||||||
|
@ -757,6 +759,24 @@ master_update_node(PG_FUNCTION_ARGS)
|
||||||
if (force)
|
if (force)
|
||||||
{
|
{
|
||||||
handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
|
handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
|
||||||
|
if (!handle)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We failed to start a background worker, which probably means that we exceeded
|
||||||
|
* max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
|
||||||
|
* to repeatedly throw an error because if master_update_node is called to complete a
|
||||||
|
* failover then finishing is the only way to bring the cluster back up. Therefore we
|
||||||
|
* give up on killing other backends and simply wait for the lock. We do set
|
||||||
|
* lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
|
||||||
|
*/
|
||||||
|
SetLockTimeoutLocally(lock_cooldown);
|
||||||
|
ereport(WARNING, (errmsg(
|
||||||
|
"could not start background worker to kill backends with conflicting"
|
||||||
|
" locks to force the update. Degrading to acquiring locks "
|
||||||
|
"with a lock time out."),
|
||||||
|
errhint(
|
||||||
|
"Increasing max_worker_processes might help.")));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
|
placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
|
||||||
|
@ -807,6 +827,19 @@ master_update_node(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SetLockTimeoutLocally sets the lock_timeout to the given value.
|
||||||
|
* This setting is local.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
SetLockTimeoutLocally(int32 lockCooldown)
|
||||||
|
{
|
||||||
|
set_config_option("lock_timeout", ConvertIntToString(lockCooldown),
|
||||||
|
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
|
||||||
|
GUC_ACTION_LOCAL, true, 0, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
|
UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
|
||||||
{
|
{
|
||||||
|
|
|
@ -94,9 +94,7 @@ StartLockAcquireHelperBackgroundWorker(int backendToHelp, int32 lock_cooldown)
|
||||||
|
|
||||||
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
|
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("could not start lock acquiring background worker to "
|
return NULL;
|
||||||
"force the update"),
|
|
||||||
errhint("Increasing max_worker_processes might help.")));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
MemoryContextCallback *workerCleanup = palloc0(sizeof(MemoryContextCallback));
|
MemoryContextCallback *workerCleanup = palloc0(sizeof(MemoryContextCallback));
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* string_utils.c
|
||||||
|
*
|
||||||
|
* This file contains functions to perform useful operations on strings.
|
||||||
|
*
|
||||||
|
* Copyright (c) Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "distributed/relay_utility.h"
|
||||||
|
#include "distributed/string_utils.h"
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ConvertIntToString returns the string version of given integer.
|
||||||
|
*/
|
||||||
|
char *
|
||||||
|
ConvertIntToString(int val)
|
||||||
|
{
|
||||||
|
StringInfo str = makeStringInfo();
|
||||||
|
|
||||||
|
appendStringInfo(str, "%d", val);
|
||||||
|
|
||||||
|
return str->data;
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* string_utils.h
|
||||||
|
* Utilities related to strings.
|
||||||
|
*
|
||||||
|
* Copyright (c) Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef CITUS_STRING_UTILS_H
|
||||||
|
#define CITUS_STRING_UTILS_H
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
extern char * ConvertIntToString(int val);
|
||||||
|
|
||||||
|
#endif /* CITUS_STRING_UTILS_H */
|
Loading…
Reference in New Issue