mirror of https://github.com/citusdata/citus.git
Add caching for functions that check the backend type
parent
556f43f24a
commit
7c6784b1f4
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
#include "catalog/pg_proc.h"
|
#include "catalog/pg_proc.h"
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
|
#include "distributed/backend_data.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
#include "distributed/commands.h"
|
#include "distributed/commands.h"
|
||||||
|
|
|
@ -42,6 +42,7 @@
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
#include "commands/tablecmds.h"
|
#include "commands/tablecmds.h"
|
||||||
#include "distributed/adaptive_executor.h"
|
#include "distributed/adaptive_executor.h"
|
||||||
|
#include "distributed/backend_data.h"
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
#include "distributed/commands.h"
|
#include "distributed/commands.h"
|
||||||
#include "distributed/commands/multi_copy.h"
|
#include "distributed/commands/multi_copy.h"
|
||||||
|
|
|
@ -1466,40 +1466,6 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* IsRebalancerInitiatedBackend returns true if we are in a backend that citus
|
|
||||||
* rebalancer initiated.
|
|
||||||
*/
|
|
||||||
bool
|
|
||||||
IsRebalancerInternalBackend(void)
|
|
||||||
{
|
|
||||||
return application_name && strcmp(application_name, CITUS_REBALANCER_NAME) == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* IsCitusRunCommandBackend returns true if we are in a backend that one of
|
|
||||||
* the run_command_on_* functions initiated.
|
|
||||||
*/
|
|
||||||
bool
|
|
||||||
IsCitusRunCommandBackend(void)
|
|
||||||
{
|
|
||||||
return application_name &&
|
|
||||||
strcmp(application_name, CITUS_RUN_COMMAND_APPLICATION_NAME) == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* IsCitusInitiatedRemoteBackend returns true if we are in a backend that citus
|
|
||||||
* initiated via remote connection.
|
|
||||||
*/
|
|
||||||
bool
|
|
||||||
IsCitusInternalBackend(void)
|
|
||||||
{
|
|
||||||
return ExtractGlobalPID(application_name) != INVALID_CITUS_INTERNAL_BACKEND_GPID;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ResetConnection preserves the given connection for later usage by
|
* ResetConnection preserves the given connection for later usage by
|
||||||
* resetting its states.
|
* resetting its states.
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "catalog/dependency.h"
|
#include "catalog/dependency.h"
|
||||||
#include "catalog/pg_class.h"
|
#include "catalog/pg_class.h"
|
||||||
#include "catalog/namespace.h"
|
#include "catalog/namespace.h"
|
||||||
|
#include "distributed/backend_data.h"
|
||||||
#include "distributed/citus_custom_scan.h"
|
#include "distributed/citus_custom_scan.h"
|
||||||
#include "distributed/commands/multi_copy.h"
|
#include "distributed/commands/multi_copy.h"
|
||||||
#include "distributed/commands/utility_hook.h"
|
#include "distributed/commands/utility_hook.h"
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "catalog/pg_proc.h"
|
#include "catalog/pg_proc.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
|
#include "distributed/backend_data.h"
|
||||||
#include "distributed/metadata_utility.h"
|
#include "distributed/metadata_utility.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
|
|
|
@ -2032,6 +2032,7 @@ static void
|
||||||
ApplicationNameAssignHook(const char *newval, void *extra)
|
ApplicationNameAssignHook(const char *newval, void *extra)
|
||||||
{
|
{
|
||||||
ResetHideShardsDecision();
|
ResetHideShardsDecision();
|
||||||
|
ResetCitusBackendType();
|
||||||
OldApplicationNameAssignHook(newval, extra);
|
OldApplicationNameAssignHook(newval, extra);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -78,6 +78,19 @@ typedef struct BackendManagementShmemData
|
||||||
BackendData backends[FLEXIBLE_ARRAY_MEMBER];
|
BackendData backends[FLEXIBLE_ARRAY_MEMBER];
|
||||||
} BackendManagementShmemData;
|
} BackendManagementShmemData;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CitusBackendType reflects what type of backend we are in. This
|
||||||
|
* can change depending on the application_name.
|
||||||
|
*/
|
||||||
|
typedef enum CitusBackendType
|
||||||
|
{
|
||||||
|
CITUS_BACKEND_NOT_ASSIGNED,
|
||||||
|
CITUS_INTERNAL_BACKEND,
|
||||||
|
CITUS_REBALANCER_BACKEND,
|
||||||
|
CITUS_RUN_COMMAND_BACKEND,
|
||||||
|
EXTERNAL_CLIENT_BACKEND
|
||||||
|
} CitusBackendType;
|
||||||
|
|
||||||
|
|
||||||
static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc
|
static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc
|
||||||
tupleDescriptor);
|
tupleDescriptor);
|
||||||
|
@ -88,10 +101,12 @@ static uint64 GenerateGlobalPID(void);
|
||||||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||||
static BackendManagementShmemData *backendManagementShmemData = NULL;
|
static BackendManagementShmemData *backendManagementShmemData = NULL;
|
||||||
static BackendData *MyBackendData = NULL;
|
static BackendData *MyBackendData = NULL;
|
||||||
|
static CitusBackendType CurrentBackendType = CITUS_BACKEND_NOT_ASSIGNED;
|
||||||
|
|
||||||
|
|
||||||
static void BackendManagementShmemInit(void);
|
static void BackendManagementShmemInit(void);
|
||||||
static size_t BackendManagementShmemSize(void);
|
static size_t BackendManagementShmemSize(void);
|
||||||
|
static void DetermineCitusBackendType(void);
|
||||||
|
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
|
PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
|
||||||
|
@ -1278,3 +1293,87 @@ DecrementExternalClientBackendCounter(void)
|
||||||
{
|
{
|
||||||
pg_atomic_sub_fetch_u32(&backendManagementShmemData->externalClientBackendCounter, 1);
|
pg_atomic_sub_fetch_u32(&backendManagementShmemData->externalClientBackendCounter, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ResetCitusBackendType resets the backend type cache.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ResetCitusBackendType(void)
|
||||||
|
{
|
||||||
|
CurrentBackendType = CITUS_BACKEND_NOT_ASSIGNED;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsRebalancerInitiatedBackend returns true if we are in a backend that citus
|
||||||
|
* rebalancer initiated.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
IsRebalancerInternalBackend(void)
|
||||||
|
{
|
||||||
|
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
||||||
|
{
|
||||||
|
DetermineCitusBackendType();
|
||||||
|
}
|
||||||
|
|
||||||
|
return CurrentBackendType == CITUS_REBALANCER_BACKEND;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsCitusInitiatedRemoteBackend returns true if we are in a backend that citus
|
||||||
|
* initiated via remote connection.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
IsCitusInternalBackend(void)
|
||||||
|
{
|
||||||
|
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
||||||
|
{
|
||||||
|
DetermineCitusBackendType();
|
||||||
|
}
|
||||||
|
|
||||||
|
return CurrentBackendType == CITUS_INTERNAL_BACKEND;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsCitusRunCommandBackend returns true if we are in a backend that one of
|
||||||
|
* the run_command_on_* functions initiated.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
IsCitusRunCommandBackend(void)
|
||||||
|
{
|
||||||
|
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
||||||
|
{
|
||||||
|
DetermineCitusBackendType();
|
||||||
|
}
|
||||||
|
|
||||||
|
return CurrentBackendType == CITUS_RUN_COMMAND_BACKEND;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DetermineCitusBackendType determines the type of backend based on the application_name.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
DetermineCitusBackendType(void)
|
||||||
|
{
|
||||||
|
if (ExtractGlobalPID(application_name) != INVALID_CITUS_INTERNAL_BACKEND_GPID)
|
||||||
|
{
|
||||||
|
CurrentBackendType = CITUS_INTERNAL_BACKEND;
|
||||||
|
}
|
||||||
|
else if (application_name && strcmp(application_name, CITUS_REBALANCER_NAME) == 0)
|
||||||
|
{
|
||||||
|
CurrentBackendType = CITUS_REBALANCER_BACKEND;
|
||||||
|
}
|
||||||
|
else if (application_name &&
|
||||||
|
strcmp(application_name, CITUS_RUN_COMMAND_APPLICATION_NAME) == 0)
|
||||||
|
{
|
||||||
|
CurrentBackendType = CITUS_RUN_COMMAND_BACKEND;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
CurrentBackendType = EXTERNAL_CLIENT_BACKEND;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
#include "catalog/namespace.h"
|
#include "catalog/namespace.h"
|
||||||
#include "catalog/pg_class.h"
|
#include "catalog/pg_class.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
|
#include "distributed/backend_data.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/coordinator_protocol.h"
|
#include "distributed/coordinator_protocol.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
|
|
|
@ -69,6 +69,10 @@ extern LocalTransactionId GetMyProcLocalTransactionId(void);
|
||||||
extern int GetExternalClientBackendCount(void);
|
extern int GetExternalClientBackendCount(void);
|
||||||
extern uint32 IncrementExternalClientBackendCounter(void);
|
extern uint32 IncrementExternalClientBackendCounter(void);
|
||||||
extern void DecrementExternalClientBackendCounter(void);
|
extern void DecrementExternalClientBackendCounter(void);
|
||||||
|
extern bool IsCitusInternalBackend(void);
|
||||||
|
extern bool IsRebalancerInternalBackend(void);
|
||||||
|
extern bool IsCitusRunCommandBackend(void);
|
||||||
|
extern void ResetCitusBackendType(void);
|
||||||
|
|
||||||
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
|
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
|
||||||
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999
|
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999
|
||||||
|
|
|
@ -288,9 +288,6 @@ extern void FinishConnectionListEstablishment(List *multiConnectionList);
|
||||||
extern void FinishConnectionEstablishment(MultiConnection *connection);
|
extern void FinishConnectionEstablishment(MultiConnection *connection);
|
||||||
extern void ClaimConnectionExclusively(MultiConnection *connection);
|
extern void ClaimConnectionExclusively(MultiConnection *connection);
|
||||||
extern void UnclaimConnection(MultiConnection *connection);
|
extern void UnclaimConnection(MultiConnection *connection);
|
||||||
extern bool IsCitusInternalBackend(void);
|
|
||||||
extern bool IsRebalancerInternalBackend(void);
|
|
||||||
extern bool IsCitusRunCommandBackend(void);
|
|
||||||
extern void MarkConnectionConnected(MultiConnection *connection);
|
extern void MarkConnectionConnected(MultiConnection *connection);
|
||||||
|
|
||||||
/* waiteventset utilities */
|
/* waiteventset utilities */
|
||||||
|
|
Loading…
Reference in New Issue