mirror of https://github.com/citusdata/citus.git
Make GPIDs work with pg_dist_poolinfo (#6588)
The original implementation of GPIDs didn't work correctly when using `pg_dist_poolinfo` together with PgBouncer. The reason is that it assumed that once a connection was made to a worker, the originating GPID should stay the same for ever. But when pg_dist_poolinfo is used this isn't the case, because the same connection on the worker might be used by different backends of the coordinator. This fixes that issue by updating the GPID whenever a new application name is set on a connection. This is the only thing that's needed, because PgBouncer already sets the application name correctly on the server connection whenever a client is updated.pull/6576/head
parent
ad3407b5ff
commit
92689a8362
|
@ -159,6 +159,12 @@ static int ReplicationModel = REPLICATION_MODEL_STREAMING;
|
|||
/* we override the application_name assign_hook and keep a pointer to the old one */
|
||||
static GucStringAssignHook OldApplicationNameAssignHook = NULL;
|
||||
|
||||
/*
|
||||
* Flag to indicate when ApplicationNameAssignHook becomes responsible for
|
||||
* updating the global pid.
|
||||
*/
|
||||
static bool FinishedStartupCitusBackend = false;
|
||||
|
||||
static object_access_hook_type PrevObjectAccessHook = NULL;
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||
|
@ -677,10 +683,11 @@ StartupCitusBackend(void)
|
|||
* this is a no-op, since InitializeBackendData will already have extracted
|
||||
* the gpid from the application_name.
|
||||
*/
|
||||
AssignGlobalPID();
|
||||
AssignGlobalPID(application_name);
|
||||
|
||||
SetBackendDataDatabaseId();
|
||||
RegisterConnectionCleanup();
|
||||
FinishedStartupCitusBackend = true;
|
||||
}
|
||||
|
||||
|
||||
|
@ -2597,7 +2604,30 @@ static void
|
|||
ApplicationNameAssignHook(const char *newval, void *extra)
|
||||
{
|
||||
ResetHideShardsDecision();
|
||||
ResetCitusBackendType();
|
||||
DetermineCitusBackendType(newval);
|
||||
|
||||
/*
|
||||
* AssignGlobalPID might read from catalog tables to get the the local
|
||||
* nodeid. But ApplicationNameAssignHook might be called before catalog
|
||||
* access is available to the backend (such as in early stages of
|
||||
* authentication). We use StartupCitusBackend to initialize the global pid
|
||||
* after catalogs are available. After that happens this hook becomes
|
||||
* responsible to update the global pid on later application_name changes.
|
||||
* So we set the FinishedStartupCitusBackend flag in StartupCitusBackend to
|
||||
* indicate when this responsibility handoff has happened.
|
||||
*
|
||||
* Another solution to the catalog table acccess problem would be to update
|
||||
* global pid lazily, like we do for HideShards. But that's not possible
|
||||
* for the global pid, since it is stored in shared memory instead of in a
|
||||
* process-local global variable. So other processes might want to read it
|
||||
* before this process has updated it. So instead we try to set it as early
|
||||
* as reasonably possible, which is also why we extract global pids in the
|
||||
* AuthHook already (extracting doesn't require catalog access).
|
||||
*/
|
||||
if (FinishedStartupCitusBackend)
|
||||
{
|
||||
AssignGlobalPID(newval);
|
||||
}
|
||||
OldApplicationNameAssignHook(newval, extra);
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ test_assign_global_pid(PG_FUNCTION_ARGS)
|
|||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
AssignGlobalPID();
|
||||
AssignGlobalPID(application_name);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ static int64 GetRemoteProcessId(void);
|
|||
PG_FUNCTION_INFO_V1(start_session_level_connection_to_node);
|
||||
PG_FUNCTION_INFO_V1(run_commands_on_session_level_connection_to_node);
|
||||
PG_FUNCTION_INFO_V1(stop_session_level_connection_to_node);
|
||||
PG_FUNCTION_INFO_V1(override_backend_data_command_originator);
|
||||
PG_FUNCTION_INFO_V1(override_backend_data_gpid);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -121,12 +121,13 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS)
|
|||
ExecuteCriticalRemoteCommand(singleConnection, setAppName);
|
||||
|
||||
/*
|
||||
* We are hackily overriding the remote processes' worker_query to be false
|
||||
* such that relevant observibility UDFs work fine.
|
||||
* We are hackily overriding the remote processes' gpid value such that
|
||||
* blocked process detection continues to work.
|
||||
*/
|
||||
StringInfo overrideBackendDataCommandOriginator = makeStringInfo();
|
||||
appendStringInfo(overrideBackendDataCommandOriginator,
|
||||
"SELECT override_backend_data_command_originator(true);");
|
||||
"SELECT override_backend_data_gpid(%lu);",
|
||||
GetGlobalPID());
|
||||
ExecuteCriticalRemoteCommand(singleConnection,
|
||||
overrideBackendDataCommandOriginator->data);
|
||||
|
||||
|
@ -187,17 +188,16 @@ run_commands_on_session_level_connection_to_node(PG_FUNCTION_ARGS)
|
|||
|
||||
|
||||
/*
|
||||
* override_backend_data_command_originator is a wrapper around
|
||||
* SetBackendDataDistributedCommandOriginator().
|
||||
* override_backend_data_gpid is a wrapper around SetBackendDataGpid().
|
||||
*/
|
||||
Datum
|
||||
override_backend_data_command_originator(PG_FUNCTION_ARGS)
|
||||
override_backend_data_gpid(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
bool distributedCommandOriginator = PG_GETARG_BOOL(0);
|
||||
uint64 gpid = PG_GETARG_INT64(0);
|
||||
|
||||
SetBackendDataDistributedCommandOriginator(distributedCommandOriginator);
|
||||
SetBackendDataGlobalPID(gpid);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
|
|
@ -878,55 +878,55 @@ AssignDistributedTransactionId(void)
|
|||
|
||||
|
||||
/*
|
||||
* AssignGlobalPID assigns a global process id for the current backend.
|
||||
* If this is a Citus initiated backend, which means it is distributed part of a distributed
|
||||
* query, then this function assigns the global pid extracted from the application name.
|
||||
* If not, this function assigns a new generated global pid.
|
||||
* AssignGlobalPID assigns a global process id for the current backend based on
|
||||
* the given applicationName. If this is a Citus initiated backend, which means
|
||||
* it is distributed part of a distributed query, then this function assigns
|
||||
* the global pid extracted from the application name. If not, this function
|
||||
* assigns a new generated global pid.
|
||||
*
|
||||
* If a global PID is already assigned to this backend, then this function is a
|
||||
* no-op. In most scenarios this would already be the case, because a newly
|
||||
* assigned global PID would be the same as a proviously assigned one. But
|
||||
* there's two important cases where the newly assigned global PID would be
|
||||
* different from the previous one:
|
||||
* 1. The current backend is an internal backend and in the meantime the
|
||||
* application_name was changed to one without a gpid, e.g.
|
||||
* citus_rebalancer. In this case we don't want to throw away the original
|
||||
* gpid of the query originator, because that would mess up distributed
|
||||
* deadlock detection involving this backend.
|
||||
* 2. The current backend is an external backend and the node id of the current
|
||||
* node changed. Updating the gpid to match the nodeid might actually seem
|
||||
* like a desirable property, but that's not the case. Updating the gpid
|
||||
* with the new nodeid would mess up distributed deadlock and originator
|
||||
* detection of queries too. Because if this backend already opened
|
||||
* connections to other nodes, then those backends will still have the old
|
||||
* gpid.
|
||||
* There's one special case where we don't want to assign a new pid and keep
|
||||
* the old pid on purpose: The current backend is an external backend and the
|
||||
* node id of the current node changed since the previous call to
|
||||
* AssingGlobalPID. Updating the gpid to match the nodeid might seem like a
|
||||
* desirable property, but that's not the case. Mainly, because existing cached
|
||||
* connections will still report as the old gpid on the worker. So updating the
|
||||
* gpid with the new nodeid would mess up distributed deadlock and originator
|
||||
* detection of queries done using those old connections. So if this is an
|
||||
* external backend for which a gpid was already generated, then we don't
|
||||
* change the gpid.
|
||||
*
|
||||
* NOTE: This function can be called arbitrary amount of times for the same
|
||||
* backend, due to being called by StartupCitusBackend.
|
||||
*/
|
||||
void
|
||||
AssignGlobalPID(void)
|
||||
AssignGlobalPID(const char *applicationName)
|
||||
{
|
||||
if (GetGlobalPID() != INVALID_CITUS_INTERNAL_BACKEND_GPID)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
uint64 globalPID = INVALID_CITUS_INTERNAL_BACKEND_GPID;
|
||||
bool distributedCommandOriginator = false;
|
||||
bool distributedCommandOriginator = IsExternalClientBackend();
|
||||
|
||||
if (!IsCitusInternalBackend())
|
||||
if (distributedCommandOriginator)
|
||||
{
|
||||
globalPID = GenerateGlobalPID();
|
||||
distributedCommandOriginator = IsExternalClientBackend();
|
||||
}
|
||||
else
|
||||
{
|
||||
globalPID = ExtractGlobalPID(application_name);
|
||||
globalPID = ExtractGlobalPID(applicationName);
|
||||
}
|
||||
|
||||
SpinLockAcquire(&MyBackendData->mutex);
|
||||
|
||||
MyBackendData->globalPID = globalPID;
|
||||
MyBackendData->distributedCommandOriginator = distributedCommandOriginator;
|
||||
|
||||
/*
|
||||
* Skip updating globalpid when we were a command originator and still are
|
||||
* and we already have a valid global pid assigned.
|
||||
* See function comment for detailed explanation.
|
||||
*/
|
||||
if (!MyBackendData->distributedCommandOriginator ||
|
||||
!distributedCommandOriginator ||
|
||||
MyBackendData->globalPID == INVALID_CITUS_INTERNAL_BACKEND_GPID)
|
||||
{
|
||||
MyBackendData->globalPID = globalPID;
|
||||
MyBackendData->distributedCommandOriginator = distributedCommandOriginator;
|
||||
}
|
||||
SpinLockRelease(&MyBackendData->mutex);
|
||||
}
|
||||
|
||||
|
@ -948,19 +948,22 @@ SetBackendDataDatabaseId(void)
|
|||
|
||||
|
||||
/*
|
||||
* SetBackendDataDistributedCommandOriginator is used to set the distributedCommandOriginator
|
||||
* field on MyBackendData.
|
||||
* SetBackendDataGlobalPID is used to set the gpid field on MyBackendData.
|
||||
*
|
||||
* IMPORTANT: This should not be used for normal operations. It's a very hacky
|
||||
* way of setting the gpid, which is only used in our MX isolation tests. The
|
||||
* main problem is that it does not set distributedCommandOriginator to the
|
||||
* correct value.
|
||||
*/
|
||||
void
|
||||
SetBackendDataDistributedCommandOriginator(bool distributedCommandOriginator)
|
||||
SetBackendDataGlobalPID(uint64 gpid)
|
||||
{
|
||||
if (!MyBackendData)
|
||||
{
|
||||
return;
|
||||
}
|
||||
SpinLockAcquire(&MyBackendData->mutex);
|
||||
MyBackendData->distributedCommandOriginator =
|
||||
distributedCommandOriginator;
|
||||
MyBackendData->globalPID = gpid;
|
||||
SpinLockRelease(&MyBackendData->mutex);
|
||||
}
|
||||
|
||||
|
@ -1389,16 +1392,6 @@ DecrementExternalClientBackendCounter(void)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ResetCitusBackendType resets the backend type cache.
|
||||
*/
|
||||
void
|
||||
ResetCitusBackendType(void)
|
||||
{
|
||||
CurrentBackendType = CITUS_BACKEND_NOT_ASSIGNED;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsRebalancerInitiatedBackend returns true if we are in a backend that citus
|
||||
* rebalancer initiated.
|
||||
|
|
|
@ -56,12 +56,10 @@ extern void UnSetDistributedTransactionId(void);
|
|||
extern void UnSetGlobalPID(void);
|
||||
extern void SetActiveMyBackend(bool value);
|
||||
extern void AssignDistributedTransactionId(void);
|
||||
extern void AssignGlobalPID(void);
|
||||
extern void SetBackendDataGlobalPID(uint64 globalPID);
|
||||
extern void AssignGlobalPID(const char *applicationName);
|
||||
extern uint64 GetGlobalPID(void);
|
||||
extern void SetBackendDataDatabaseId(void);
|
||||
extern void SetBackendDataDistributedCommandOriginator(bool
|
||||
distributedCommandOriginator);
|
||||
extern void SetBackendDataGlobalPID(uint64 gpid);
|
||||
extern uint64 ExtractGlobalPID(const char *applicationName);
|
||||
extern int ExtractNodeIdFromGlobalPID(uint64 globalPID, bool missingOk);
|
||||
extern int ExtractProcessIdFromGlobalPID(uint64 globalPID);
|
||||
|
@ -79,7 +77,6 @@ extern bool IsCitusInternalBackend(void);
|
|||
extern bool IsRebalancerInternalBackend(void);
|
||||
extern bool IsCitusRunCommandBackend(void);
|
||||
extern bool IsExternalClientBackend(void);
|
||||
extern void ResetCitusBackendType(void);
|
||||
|
||||
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
|
||||
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999
|
||||
|
|
|
@ -8,12 +8,12 @@ setup
|
|||
LANGUAGE C STRICT VOLATILE
|
||||
AS 'citus', $$start_session_level_connection_to_node$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION override_backend_data_command_originator(bool)
|
||||
CREATE OR REPLACE FUNCTION override_backend_data_gpid(bigint)
|
||||
RETURNS void
|
||||
LANGUAGE C STRICT IMMUTABLE
|
||||
AS 'citus', $$override_backend_data_command_originator$$;
|
||||
AS 'citus', $$override_backend_data_gpid$$;
|
||||
|
||||
SELECT run_command_on_workers($$SET citus.enable_metadata_sync TO off;CREATE OR REPLACE FUNCTION override_backend_data_command_originator(bool)
|
||||
SELECT run_command_on_workers($$SET citus.enable_metadata_sync TO off;CREATE OR REPLACE FUNCTION override_backend_data_gpid(bigint)
|
||||
RETURNS void
|
||||
LANGUAGE C STRICT IMMUTABLE
|
||||
AS 'citus'$$);
|
||||
|
|
Loading…
Reference in New Issue