mirror of https://github.com/citusdata/citus.git
Fix flakyness in isolation_citus_dist_activity (#6263)
Sometimes in CI our isolation_citus_dist_activity test fails randomly like this: ```diff step s2-view-dist: SELECT query, citus_nodename_for_nodeid(citus_nodeid_for_gpid(global_pid)), citus_nodeport_for_nodeid(citus_nodeid_for_gpid(global_pid)), state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%'), ('%citus_add_node%')) AND backend_type = 'client backend' ORDER BY query DESC; query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+-------------------------+-------------------+---------------+----------+--------+---------- INSERT INTO test_table VALUES (100, 100); |localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression -(1 row) + + SELECT coalesce(to_jsonb(array_agg(csa_from_one_node.*)), '[{}]'::JSONB) + FROM ( + SELECT global_pid, worker_query AS is_worker_query, pg_stat_activity.* FROM + pg_stat_activity LEFT JOIN get_all_active_transactions() ON process_id = pid + ) AS csa_from_one_node; + |localhost | 57636|active | | |postgres|regression +(2 rows) step s3-view-worker: ``` Source: https://app.circleci.com/pipelines/github/citusdata/citus/26605/workflows/56d284d2-5bb3-4e64-a0ea-7b9b1626e7cd/jobs/760633 The reason for this is that citus_dist_stat_activity sometimes shows the query that it uses itself to get the data from pg_stat_activity. This is actually a bug, because it's a worker query and thus shouldn't show up there. To try and solve this bug, we remove two small opportunities for a race condition. These race conditions could happen when the backenddata was marked as active, but the distributedCommandOriginator was not set correctly yet/anymore. There was an opportunity for this to happen both during connection start and shutdown.pull/5756/merge
parent
33af407ac8
commit
d68654680b
|
@ -654,9 +654,17 @@ void
|
||||||
StartupCitusBackend(void)
|
StartupCitusBackend(void)
|
||||||
{
|
{
|
||||||
InitializeMaintenanceDaemonBackend();
|
InitializeMaintenanceDaemonBackend();
|
||||||
InitializeBackendData();
|
|
||||||
RegisterConnectionCleanup();
|
/*
|
||||||
|
* For queries this will be a no-op. But for background daemons we might
|
||||||
|
* still need to initialize the backend data. For those backaground daemons
|
||||||
|
* it doesn't really matter that we temporarily assign
|
||||||
|
* INVALID_CITUS_INTERNAL_BACKEND_GPID, since we override it again two
|
||||||
|
* lines below.
|
||||||
|
*/
|
||||||
|
InitializeBackendData(INVALID_CITUS_INTERNAL_BACKEND_GPID);
|
||||||
AssignGlobalPID();
|
AssignGlobalPID();
|
||||||
|
RegisterConnectionCleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -728,8 +736,8 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
|
||||||
DeallocateReservedConnections();
|
DeallocateReservedConnections();
|
||||||
|
|
||||||
/* we don't want any monitoring view/udf to show already exited backends */
|
/* we don't want any monitoring view/udf to show already exited backends */
|
||||||
UnSetGlobalPID();
|
|
||||||
SetActiveMyBackend(false);
|
SetActiveMyBackend(false);
|
||||||
|
UnSetGlobalPID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2669,36 +2677,28 @@ CitusAuthHook(Port *port, int status)
|
||||||
"regular client connections",
|
"regular client connections",
|
||||||
MaxClientConnections)));
|
MaxClientConnections)));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Right after this, before we assign global pid, this backend
|
* Right after this, but before we assign global pid, this backend might
|
||||||
* might get blocked by a DDL as that happens during parsing.
|
* get blocked by a DDL as that happens during parsing.
|
||||||
*
|
*
|
||||||
* That's why, lets mark the backend as an external backend
|
* That's why, we now initialize its backend data, with the gpid.
|
||||||
* which is likely to execute a distributed command.
|
*
|
||||||
*
|
* We do this so that this backend gets the chance to show up in
|
||||||
* We do this so that this backend gets the chance to show
|
* citus_lock_waits.
|
||||||
* up in citus_lock_waits.
|
*
|
||||||
*
|
* We cannot assign a new global PID yet here, because that would require
|
||||||
* We cannot assign a new global PID yet here, because that
|
* reading from catalogs, but that's not allowed this early in the
|
||||||
* would require reading from catalogs, but that's not allowed
|
* connection startup (because no database has been assigned yet).
|
||||||
* this early in the connection startup (because no database
|
*
|
||||||
* has been assigned yet).
|
* A second reason is for backends that never call StartupCitusBackend. For
|
||||||
*/
|
* those we already set the global PID in the backend data here to be able
|
||||||
InitializeBackendData();
|
* to do blocked process detection on connections that are opened over a
|
||||||
SetBackendDataDistributedCommandOriginator(true);
|
* replication connection. A replication connection backend will never call
|
||||||
}
|
* StartupCitusBackend, which normally sets up the global PID.
|
||||||
else
|
*/
|
||||||
{
|
InitializeBackendData(gpid);
|
||||||
/*
|
|
||||||
* We set the global PID in the backend data here already to be able to
|
|
||||||
* do blocked process detection on connections that are opened over a
|
|
||||||
* replication connection. A replication connection backend will never
|
|
||||||
* call StartupCitusBackend, which normally sets up the global PID.
|
|
||||||
*/
|
|
||||||
InitializeBackendData();
|
|
||||||
SetBackendDataGlobalPID(gpid);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* let other authentication hooks to kick in first */
|
/* let other authentication hooks to kick in first */
|
||||||
if (original_client_auth_hook)
|
if (original_client_auth_hook)
|
||||||
|
|
|
@ -671,13 +671,17 @@ TotalProcCount(void)
|
||||||
* the Citus extension is present, and after any subsequent invalidation of
|
* the Citus extension is present, and after any subsequent invalidation of
|
||||||
* pg_dist_partition (see InvalidateMetadataSystemCache()).
|
* pg_dist_partition (see InvalidateMetadataSystemCache()).
|
||||||
*
|
*
|
||||||
* We only need to initialise MyBackendData once. The only goal here is to make
|
* We only need to initialise MyBackendData once. The main goal here is to make
|
||||||
* sure that we don't use the backend data from a previous backend with the same
|
* sure that we don't use the backend data from a previous backend with the same
|
||||||
* pgprocno. Resetting the backend data after a distributed transaction happens
|
* pgprocno. Resetting the backend data after a distributed transaction happens
|
||||||
* on COMMIT/ABORT through transaction callbacks.
|
* on COMMIT/ABORT through transaction callbacks.
|
||||||
|
*
|
||||||
|
* We do also initialize the distributedCommandOriginator and globalPID values
|
||||||
|
* based on these values. This is to make sure that once the backend date is
|
||||||
|
* initialized this backend can be correctly shown in citus_lock_waits.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
InitializeBackendData(void)
|
InitializeBackendData(uint64 globalPID)
|
||||||
{
|
{
|
||||||
if (MyBackendData != NULL)
|
if (MyBackendData != NULL)
|
||||||
{
|
{
|
||||||
|
@ -695,10 +699,27 @@ InitializeBackendData(void)
|
||||||
|
|
||||||
LockBackendSharedMemory(LW_EXCLUSIVE);
|
LockBackendSharedMemory(LW_EXCLUSIVE);
|
||||||
|
|
||||||
/* zero out the backend data */
|
/* zero out the backend its transaction id */
|
||||||
UnSetDistributedTransactionId();
|
UnSetDistributedTransactionId();
|
||||||
UnSetGlobalPID();
|
UnSetGlobalPID();
|
||||||
|
|
||||||
|
SpinLockAcquire(&MyBackendData->mutex);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Use the given globalPID to initialize
|
||||||
|
*/
|
||||||
|
if (globalPID == INVALID_CITUS_INTERNAL_BACKEND_GPID)
|
||||||
|
{
|
||||||
|
MyBackendData->distributedCommandOriginator =
|
||||||
|
true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MyBackendData->globalPID = globalPID;
|
||||||
|
MyBackendData->distributedCommandOriginator = false;
|
||||||
|
}
|
||||||
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Signal that this backend is active and should show up
|
* Signal that this backend is active and should show up
|
||||||
* on activity monitors.
|
* on activity monitors.
|
||||||
|
@ -893,23 +914,6 @@ AssignGlobalPID(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* SetBackendDataGlobalPID sets the global PID. This specifically does not read
|
|
||||||
* from catalog tables, because it should be safe to run from our
|
|
||||||
* authentication hook.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
SetBackendDataGlobalPID(uint64 globalPID)
|
|
||||||
{
|
|
||||||
SpinLockAcquire(&MyBackendData->mutex);
|
|
||||||
|
|
||||||
MyBackendData->globalPID = globalPID;
|
|
||||||
MyBackendData->distributedCommandOriginator = false;
|
|
||||||
|
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SetBackendDataDistributedCommandOriginator is used to set the distributedCommandOriginator
|
* SetBackendDataDistributedCommandOriginator is used to set the distributedCommandOriginator
|
||||||
* field on MyBackendData.
|
* field on MyBackendData.
|
||||||
|
|
|
@ -50,7 +50,7 @@ extern void BackendManagementShmemInit(void);
|
||||||
extern size_t BackendManagementShmemSize(void);
|
extern size_t BackendManagementShmemSize(void);
|
||||||
extern void InitializeBackendManagement(void);
|
extern void InitializeBackendManagement(void);
|
||||||
extern int TotalProcCount(void);
|
extern int TotalProcCount(void);
|
||||||
extern void InitializeBackendData(void);
|
extern void InitializeBackendData(uint64 globalPID);
|
||||||
extern void LockBackendSharedMemory(LWLockMode lockMode);
|
extern void LockBackendSharedMemory(LWLockMode lockMode);
|
||||||
extern void UnlockBackendSharedMemory(void);
|
extern void UnlockBackendSharedMemory(void);
|
||||||
extern void UnSetDistributedTransactionId(void);
|
extern void UnSetDistributedTransactionId(void);
|
||||||
|
|
Loading…
Reference in New Issue