mirror of https://github.com/citusdata/citus.git
Introducing new backend type and disabling caching for daemon backends
parent
76d10cc413
commit
19681ca592
|
@ -502,6 +502,12 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((flags & REQUIRE_MAINTENANCE_CONNECTION) &&
|
||||||
|
!connection->useForMaintenanceOperations)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if ((flags & REQUIRE_METADATA_CONNECTION) &&
|
if ((flags & REQUIRE_METADATA_CONNECTION) &&
|
||||||
!connection->useForMetadataOperations)
|
!connection->useForMetadataOperations)
|
||||||
{
|
{
|
||||||
|
@ -1498,7 +1504,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
|
||||||
* escalating the number of cached connections. We can recognize such backends
|
* escalating the number of cached connections. We can recognize such backends
|
||||||
* from their application name.
|
* from their application name.
|
||||||
*/
|
*/
|
||||||
return (IsCitusInternalBackend() || IsRebalancerInternalBackend()) ||
|
return (isCitusMaintenanceDaemonBackend() || IsCitusInternalBackend() || IsRebalancerInternalBackend()) ||
|
||||||
connection->initializationState != POOL_STATE_INITIALIZED ||
|
connection->initializationState != POOL_STATE_INITIALIZED ||
|
||||||
cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
|
cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
|
||||||
connection->forceCloseAtTransactionEnd ||
|
connection->forceCloseAtTransactionEnd ||
|
||||||
|
@ -1506,9 +1512,8 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
|
||||||
!RemoteTransactionIdle(connection) ||
|
!RemoteTransactionIdle(connection) ||
|
||||||
connection->requiresReplication ||
|
connection->requiresReplication ||
|
||||||
connection->isReplicationOriginSessionSetup ||
|
connection->isReplicationOriginSessionSetup ||
|
||||||
(MaxCachedConnectionLifetime >= 0 &&
|
(MaxCachedConnectionLifetime >= 0
|
||||||
MillisecondsToTimeout(connection->connectionEstablishmentStart,
|
&& MillisecondsToTimeout(connection->connectionEstablishmentStart, MaxCachedConnectionLifetime) <= 0);
|
||||||
MaxCachedConnectionLifetime) <= 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -86,6 +86,7 @@ typedef struct BackendManagementShmemData
|
||||||
typedef enum CitusBackendType
|
typedef enum CitusBackendType
|
||||||
{
|
{
|
||||||
CITUS_BACKEND_NOT_ASSIGNED,
|
CITUS_BACKEND_NOT_ASSIGNED,
|
||||||
|
CITUS_MAINTENANCE_DAEMON_BACKEND,
|
||||||
CITUS_INTERNAL_BACKEND,
|
CITUS_INTERNAL_BACKEND,
|
||||||
CITUS_REBALANCER_BACKEND,
|
CITUS_REBALANCER_BACKEND,
|
||||||
CITUS_RUN_COMMAND_BACKEND,
|
CITUS_RUN_COMMAND_BACKEND,
|
||||||
|
@ -96,6 +97,7 @@ static const char *CitusBackendPrefixes[] = {
|
||||||
CITUS_APPLICATION_NAME_PREFIX,
|
CITUS_APPLICATION_NAME_PREFIX,
|
||||||
CITUS_REBALANCER_APPLICATION_NAME_PREFIX,
|
CITUS_REBALANCER_APPLICATION_NAME_PREFIX,
|
||||||
CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX,
|
CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX,
|
||||||
|
CITUS_MAINTENANCE_DAEMON_APPLICATION_NAME_PREFIX,
|
||||||
};
|
};
|
||||||
|
|
||||||
static const CitusBackendType CitusBackendTypes[] = {
|
static const CitusBackendType CitusBackendTypes[] = {
|
||||||
|
@ -1071,7 +1073,6 @@ citus_pid_for_gpid(PG_FUNCTION_ARGS)
|
||||||
PG_RETURN_INT32(ExtractProcessIdFromGlobalPID(globalPID));
|
PG_RETURN_INT32(ExtractProcessIdFromGlobalPID(globalPID));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExtractGlobalPID extracts the global process id from the application name and returns it
|
* ExtractGlobalPID extracts the global process id from the application name and returns it
|
||||||
* if the application name is not compatible with Citus' application names returns 0.
|
* if the application name is not compatible with Citus' application names returns 0.
|
||||||
|
@ -1445,6 +1446,17 @@ IsCitusShardTransferBackend(void)
|
||||||
prefixLength) == 0;
|
prefixLength) == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool isCitusMaintenanceDaemonBackend(void)
|
||||||
|
{
|
||||||
|
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
||||||
|
{
|
||||||
|
DetermineCitusBackendType(application_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
return CurrentBackendType == CITUS_MAINTENANCE_DAEMON_BACKEND;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DetermineCitusBackendType determines the type of backend based on the application_name.
|
* DetermineCitusBackendType determines the type of backend based on the application_name.
|
||||||
|
|
|
@ -60,6 +60,7 @@
|
||||||
#include "distributed/statistics_collection.h"
|
#include "distributed/statistics_collection.h"
|
||||||
#include "distributed/transaction_recovery.h"
|
#include "distributed/transaction_recovery.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
|
#include "distributed/connection_management.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Shared memory data for all maintenance workers.
|
* Shared memory data for all maintenance workers.
|
||||||
|
@ -506,7 +507,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid);
|
MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid);
|
||||||
|
|
||||||
/* make worker recognizable in pg_stat_activity */
|
/* make worker recognizable in pg_stat_activity */
|
||||||
pgstat_report_appname("Citus Maintenance Daemon");
|
pgstat_report_appname(CITUS_MAINTENANCE_DAEMON_APPLICATION_NAME_PREFIX);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Terminate orphaned metadata sync daemons spawned from previously terminated
|
* Terminate orphaned metadata sync daemons spawned from previously terminated
|
||||||
|
|
|
@ -78,6 +78,7 @@ extern bool IsRebalancerInternalBackend(void);
|
||||||
extern bool IsCitusRunCommandBackend(void);
|
extern bool IsCitusRunCommandBackend(void);
|
||||||
extern bool IsExternalClientBackend(void);
|
extern bool IsExternalClientBackend(void);
|
||||||
extern bool IsCitusShardTransferBackend(void);
|
extern bool IsCitusShardTransferBackend(void);
|
||||||
|
extern bool isCitusMaintenanceDaemonBackend(void);
|
||||||
|
|
||||||
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
|
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
|
||||||
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999
|
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999
|
||||||
|
|
|
@ -44,6 +44,8 @@
|
||||||
/* application name used for connections made by run_command_on_* */
|
/* application name used for connections made by run_command_on_* */
|
||||||
#define CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX "citus_run_command gpid="
|
#define CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX "citus_run_command gpid="
|
||||||
|
|
||||||
|
#define CITUS_MAINTENANCE_DAEMON_APPLICATION_NAME_PREFIX "Citus Maintenance Daemon"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* application name prefix for move/split replication connections.
|
* application name prefix for move/split replication connections.
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue