mirror of https://github.com/citusdata/citus.git
Add caching for functions that check the backend type
parent
37dda19f31
commit
7abcfac61f
|
@ -17,6 +17,7 @@
|
|||
|
||||
#include "catalog/pg_proc.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "distributed/backend_data.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/commands.h"
|
||||
|
|
|
@ -42,6 +42,7 @@
|
|||
#include "commands/defrem.h"
|
||||
#include "commands/tablecmds.h"
|
||||
#include "distributed/adaptive_executor.h"
|
||||
#include "distributed/backend_data.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/commands.h"
|
||||
#include "distributed/commands/multi_copy.h"
|
||||
|
|
|
@ -1466,40 +1466,6 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsRebalancerInitiatedBackend returns true if we are in a backend that citus
|
||||
* rebalancer initiated.
|
||||
*/
|
||||
bool
|
||||
IsRebalancerInternalBackend(void)
|
||||
{
|
||||
return application_name && strcmp(application_name, CITUS_REBALANCER_NAME) == 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsCitusRunCommandBackend returns true if we are in a backend that one of
|
||||
* the run_command_on_* functions initiated.
|
||||
*/
|
||||
bool
|
||||
IsCitusRunCommandBackend(void)
|
||||
{
|
||||
return application_name &&
|
||||
strcmp(application_name, CITUS_RUN_COMMAND_APPLICATION_NAME) == 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsCitusInitiatedRemoteBackend returns true if we are in a backend that citus
|
||||
* initiated via remote connection.
|
||||
*/
|
||||
bool
|
||||
IsCitusInternalBackend(void)
|
||||
{
|
||||
return ExtractGlobalPID(application_name) != INVALID_CITUS_INTERNAL_BACKEND_GPID;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ResetConnection preserves the given connection for later usage by
|
||||
* resetting its states.
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "catalog/dependency.h"
|
||||
#include "catalog/pg_class.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "distributed/backend_data.h"
|
||||
#include "distributed/citus_custom_scan.h"
|
||||
#include "distributed/commands/multi_copy.h"
|
||||
#include "distributed/commands/utility_hook.h"
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "catalog/pg_proc.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "distributed/backend_data.h"
|
||||
#include "distributed/metadata_utility.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
|
|
|
@ -2081,6 +2081,7 @@ static void
|
|||
ApplicationNameAssignHook(const char *newval, void *extra)
|
||||
{
|
||||
ResetHideShardsDecision();
|
||||
ResetCitusBackendType();
|
||||
OldApplicationNameAssignHook(newval, extra);
|
||||
}
|
||||
|
||||
|
|
|
@ -78,6 +78,19 @@ typedef struct BackendManagementShmemData
|
|||
BackendData backends[FLEXIBLE_ARRAY_MEMBER];
|
||||
} BackendManagementShmemData;
|
||||
|
||||
/*
|
||||
* CitusBackendType reflects what type of backend we are in. This
|
||||
* can change depending on the application_name.
|
||||
*/
|
||||
typedef enum CitusBackendType
|
||||
{
|
||||
CITUS_BACKEND_NOT_ASSIGNED,
|
||||
CITUS_INTERNAL_BACKEND,
|
||||
CITUS_REBALANCER_BACKEND,
|
||||
CITUS_RUN_COMMAND_BACKEND,
|
||||
EXTERNAL_CLIENT_BACKEND
|
||||
} CitusBackendType;
|
||||
|
||||
|
||||
static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc
|
||||
tupleDescriptor);
|
||||
|
@ -88,10 +101,12 @@ static uint64 GenerateGlobalPID(void);
|
|||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||
static BackendManagementShmemData *backendManagementShmemData = NULL;
|
||||
static BackendData *MyBackendData = NULL;
|
||||
static CitusBackendType CurrentBackendType = CITUS_BACKEND_NOT_ASSIGNED;
|
||||
|
||||
|
||||
static void BackendManagementShmemInit(void);
|
||||
static size_t BackendManagementShmemSize(void);
|
||||
static void DetermineCitusBackendType(void);
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
|
||||
|
@ -1278,3 +1293,87 @@ DecrementExternalClientBackendCounter(void)
|
|||
{
|
||||
pg_atomic_sub_fetch_u32(&backendManagementShmemData->externalClientBackendCounter, 1);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ResetCitusBackendType resets the backend type cache.
|
||||
*/
|
||||
void
|
||||
ResetCitusBackendType(void)
|
||||
{
|
||||
CurrentBackendType = CITUS_BACKEND_NOT_ASSIGNED;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsRebalancerInitiatedBackend returns true if we are in a backend that citus
|
||||
* rebalancer initiated.
|
||||
*/
|
||||
bool
|
||||
IsRebalancerInternalBackend(void)
|
||||
{
|
||||
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
||||
{
|
||||
DetermineCitusBackendType();
|
||||
}
|
||||
|
||||
return CurrentBackendType == CITUS_REBALANCER_BACKEND;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsCitusInitiatedRemoteBackend returns true if we are in a backend that citus
|
||||
* initiated via remote connection.
|
||||
*/
|
||||
bool
|
||||
IsCitusInternalBackend(void)
|
||||
{
|
||||
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
||||
{
|
||||
DetermineCitusBackendType();
|
||||
}
|
||||
|
||||
return CurrentBackendType == CITUS_INTERNAL_BACKEND;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsCitusRunCommandBackend returns true if we are in a backend that one of
|
||||
* the run_command_on_* functions initiated.
|
||||
*/
|
||||
bool
|
||||
IsCitusRunCommandBackend(void)
|
||||
{
|
||||
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
||||
{
|
||||
DetermineCitusBackendType();
|
||||
}
|
||||
|
||||
return CurrentBackendType == CITUS_RUN_COMMAND_BACKEND;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DetermineCitusBackendType determines the type of backend based on the application_name.
|
||||
*/
|
||||
static void
|
||||
DetermineCitusBackendType(void)
|
||||
{
|
||||
if (ExtractGlobalPID(application_name) != INVALID_CITUS_INTERNAL_BACKEND_GPID)
|
||||
{
|
||||
CurrentBackendType = CITUS_INTERNAL_BACKEND;
|
||||
}
|
||||
else if (application_name && strcmp(application_name, CITUS_REBALANCER_NAME) == 0)
|
||||
{
|
||||
CurrentBackendType = CITUS_REBALANCER_BACKEND;
|
||||
}
|
||||
else if (application_name &&
|
||||
strcmp(application_name, CITUS_RUN_COMMAND_APPLICATION_NAME) == 0)
|
||||
{
|
||||
CurrentBackendType = CITUS_RUN_COMMAND_BACKEND;
|
||||
}
|
||||
else
|
||||
{
|
||||
CurrentBackendType = EXTERNAL_CLIENT_BACKEND;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_class.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/backend_data.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
#include "distributed/listutils.h"
|
||||
|
|
|
@ -69,6 +69,10 @@ extern LocalTransactionId GetMyProcLocalTransactionId(void);
|
|||
extern int GetExternalClientBackendCount(void);
|
||||
extern uint32 IncrementExternalClientBackendCounter(void);
|
||||
extern void DecrementExternalClientBackendCounter(void);
|
||||
extern bool IsCitusInternalBackend(void);
|
||||
extern bool IsRebalancerInternalBackend(void);
|
||||
extern bool IsCitusRunCommandBackend(void);
|
||||
extern void ResetCitusBackendType(void);
|
||||
|
||||
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
|
||||
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999
|
||||
|
|
|
@ -288,9 +288,6 @@ extern void FinishConnectionListEstablishment(List *multiConnectionList);
|
|||
extern void FinishConnectionEstablishment(MultiConnection *connection);
|
||||
extern void ClaimConnectionExclusively(MultiConnection *connection);
|
||||
extern void UnclaimConnection(MultiConnection *connection);
|
||||
extern bool IsCitusInternalBackend(void);
|
||||
extern bool IsRebalancerInternalBackend(void);
|
||||
extern bool IsCitusRunCommandBackend(void);
|
||||
extern void MarkConnectionConnected(MultiConnection *connection);
|
||||
|
||||
/* waiteventset utilities */
|
||||
|
|
Loading…
Reference in New Issue