mirror of https://github.com/citusdata/citus.git
Implementation of a dedicated maintenance database
parent
5880ecc680
commit
9f26f744b2
|
@ -42,6 +42,12 @@
|
||||||
#include "distributed/time_constants.h"
|
#include "distributed/time_constants.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "distributed/worker_log_messages.h"
|
#include "distributed/worker_log_messages.h"
|
||||||
|
#include "mb/pg_wchar.h"
|
||||||
|
#include "pg_config.h"
|
||||||
|
#include "portability/instr_time.h"
|
||||||
|
#include "storage/ipc.h"
|
||||||
|
#include "utils/hsearch.h"
|
||||||
|
#include "utils/memutils.h"
|
||||||
|
|
||||||
|
|
||||||
int NodeConnectionTimeout = 30000;
|
int NodeConnectionTimeout = 30000;
|
||||||
|
@ -1504,14 +1510,16 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
|
||||||
* escalating the number of cached connections. We can recognize such backends
|
* escalating the number of cached connections. We can recognize such backends
|
||||||
* from their application name.
|
* from their application name.
|
||||||
*/
|
*/
|
||||||
return (isCitusMaintenanceDaemonBackend() || IsCitusInternalBackend() || IsRebalancerInternalBackend()) ||
|
return ((IsCitusMaintenanceDaemonBackend() && !IsMaintenanceManagementDatabase(MyDatabaseId)) ||
|
||||||
|
IsCitusInternalBackend() ||
|
||||||
|
IsRebalancerInternalBackend()) ||
|
||||||
connection->initializationState != POOL_STATE_INITIALIZED ||
|
connection->initializationState != POOL_STATE_INITIALIZED ||
|
||||||
cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
|
cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
|
||||||
connection->forceCloseAtTransactionEnd ||
|
connection->forceCloseAtTransactionEnd ||
|
||||||
PQstatus(connection->pgConn) != CONNECTION_OK ||
|
PQstatus(connection->pgConn) != CONNECTION_OK ||
|
||||||
!RemoteTransactionIdle(connection) ||
|
!RemoteTransactionIdle(connection) ||
|
||||||
connection->requiresReplication ||
|
connection->requiresReplication ||
|
||||||
connection->isReplicationOriginSessionSetup ||
|
connection->isReplicationOriginSessionSetup ||
|
||||||
(MaxCachedConnectionLifetime >= 0
|
(MaxCachedConnectionLifetime >= 0
|
||||||
&& MillisecondsToTimeout(connection->connectionEstablishmentStart, MaxCachedConnectionLifetime) <= 0);
|
&& MillisecondsToTimeout(connection->connectionEstablishmentStart, MaxCachedConnectionLifetime) <= 0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2706,6 +2706,16 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_STANDARD,
|
GUC_STANDARD,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomStringVariable(
|
||||||
|
"citus.maintenance_management_database",
|
||||||
|
gettext_noop("Database for cluster-wide maintenance operations across all databases"),
|
||||||
|
NULL,
|
||||||
|
&MaintenanceManagementDatabase,
|
||||||
|
"",
|
||||||
|
PGC_SIGHUP,
|
||||||
|
GUC_STANDARD,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
/* warn about config items in the citus namespace that are not registered above */
|
/* warn about config items in the citus namespace that are not registered above */
|
||||||
EmitWarningsOnPlaceholders("citus");
|
EmitWarningsOnPlaceholders("citus");
|
||||||
|
|
||||||
|
|
|
@ -1446,7 +1446,8 @@ IsCitusShardTransferBackend(void)
|
||||||
prefixLength) == 0;
|
prefixLength) == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isCitusMaintenanceDaemonBackend(void)
|
bool
|
||||||
|
IsCitusMaintenanceDaemonBackend(void)
|
||||||
{
|
{
|
||||||
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
||||||
{
|
{
|
||||||
|
|
|
@ -96,6 +96,7 @@ typedef struct MaintenanceDaemonDBData
|
||||||
|
|
||||||
/* config variable for distributed deadlock detection timeout */
|
/* config variable for distributed deadlock detection timeout */
|
||||||
double DistributedDeadlockDetectionTimeoutFactor = 2.0;
|
double DistributedDeadlockDetectionTimeoutFactor = 2.0;
|
||||||
|
char *MaintenanceManagementDatabase = "";
|
||||||
int Recover2PCInterval = 60000;
|
int Recover2PCInterval = 60000;
|
||||||
int DeferShardDeleteInterval = 15000;
|
int DeferShardDeleteInterval = 15000;
|
||||||
int BackgroundTaskQueueCheckInterval = 5000;
|
int BackgroundTaskQueueCheckInterval = 5000;
|
||||||
|
@ -705,32 +706,39 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
timeout = Min(timeout, Recover2PCInterval);
|
timeout = Min(timeout, Recover2PCInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* the config value -1 disables the distributed deadlock detection */
|
/*
|
||||||
if (DistributedDeadlockDetectionTimeoutFactor != -1.0)
|
* Execute only on the maintenance database, if it configured, otherwise run from every daemon.
|
||||||
|
* The config value -1 disables the distributed deadlock detection
|
||||||
|
*/
|
||||||
|
if (DistributedDeadlockDetectionTimeoutFactor != -1.0)
|
||||||
{
|
{
|
||||||
double deadlockTimeout =
|
double deadlockTimeout =
|
||||||
DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout;
|
DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout;
|
||||||
|
|
||||||
InvalidateMetadataSystemCache();
|
InvalidateMetadataSystemCache();
|
||||||
StartTransactionCommand();
|
StartTransactionCommand();
|
||||||
|
|
||||||
/*
|
|
||||||
* We skip the deadlock detection if citus extension
|
if ((strcmp(GetMaintenanceManagementDatabase(), "") == 0 || IsMaintenanceManagementDatabase(databaseOid)))
|
||||||
* is not accessible.
|
{
|
||||||
*
|
/*
|
||||||
* Similarly, we skip to run the deadlock checks if
|
* We skip the deadlock detection if citus extension
|
||||||
* there exists any version mismatch or the extension
|
* is not accessible.
|
||||||
* is not fully created yet.
|
*
|
||||||
*/
|
* Similarly, we skip to run the deadlock checks if
|
||||||
if (!LockCitusExtension())
|
* there exists any version mismatch or the extension
|
||||||
{
|
* is not fully created yet.
|
||||||
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
|
*/
|
||||||
"skipping deadlock detection")));
|
if (!LockCitusExtension())
|
||||||
}
|
{
|
||||||
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
|
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
|
||||||
{
|
"skipping deadlock detection")));
|
||||||
foundDeadlock = CheckForDistributedDeadlocks();
|
}
|
||||||
}
|
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
|
||||||
|
{
|
||||||
|
foundDeadlock = CheckForDistributedDeadlocks();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
CommitTransactionCommand();
|
CommitTransactionCommand();
|
||||||
|
|
||||||
|
@ -1228,3 +1236,35 @@ MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData)
|
||||||
|
|
||||||
return metadataSyncTriggered;
|
return metadataSyncTriggered;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char
|
||||||
|
*GetMaintenanceManagementDatabase(void)
|
||||||
|
{
|
||||||
|
char *result = MaintenanceManagementDatabase;
|
||||||
|
/* If MaintenanceManagementDatabase is not set, all maintenance daemons are considered independent */
|
||||||
|
if (strcmp(MaintenanceManagementDatabase, "") != 0)
|
||||||
|
{
|
||||||
|
Oid maintenanceDatabaseOid = get_database_oid(MaintenanceManagementDatabase, true);
|
||||||
|
if (!maintenanceDatabaseOid)
|
||||||
|
{
|
||||||
|
ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
|
errmsg("Database %s doesn't exists, please check the citus.maintenance_management_database parameter.",
|
||||||
|
MaintenanceManagementDatabase)));
|
||||||
|
result = "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
IsMaintenanceManagementDatabase(Oid databaseOid)
|
||||||
|
{
|
||||||
|
if (strcmp(GetMaintenanceManagementDatabase(), "") == 0)
|
||||||
|
{
|
||||||
|
/* If MaintenanceManagementDatabase is not set, all maintenance daemons are considered independent */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Oid maintenanceDatabaseOid = get_database_oid(MaintenanceManagementDatabase, true);
|
||||||
|
return maintenanceDatabaseOid == databaseOid;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -78,7 +78,7 @@ extern bool IsRebalancerInternalBackend(void);
|
||||||
extern bool IsCitusRunCommandBackend(void);
|
extern bool IsCitusRunCommandBackend(void);
|
||||||
extern bool IsExternalClientBackend(void);
|
extern bool IsExternalClientBackend(void);
|
||||||
extern bool IsCitusShardTransferBackend(void);
|
extern bool IsCitusShardTransferBackend(void);
|
||||||
extern bool isCitusMaintenanceDaemonBackend(void);
|
extern bool IsCitusMaintenanceDaemonBackend(void);
|
||||||
|
|
||||||
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
|
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
|
||||||
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999
|
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999
|
||||||
|
|
|
@ -12,6 +12,8 @@
|
||||||
#ifndef MAINTENANCED_H
|
#ifndef MAINTENANCED_H
|
||||||
#define MAINTENANCED_H
|
#define MAINTENANCED_H
|
||||||
|
|
||||||
|
#include "commands/dbcommands.h"
|
||||||
|
|
||||||
/* collect statistics every 24 hours */
|
/* collect statistics every 24 hours */
|
||||||
#define STATS_COLLECTION_TIMEOUT_MILLIS (24 * 60 * 60 * 1000)
|
#define STATS_COLLECTION_TIMEOUT_MILLIS (24 * 60 * 60 * 1000)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue