mirror of https://github.com/citusdata/citus.git
Address race condition in InitializeBackendData (#6285)
Sometimes in CI our isolation_citus_dist_activity test fails randomly like this: ```diff step s2-view-dist: SELECT query, citus_nodename_for_nodeid(citus_nodeid_for_gpid(global_pid)), citus_nodeport_for_nodeid(citus_nodeid_for_gpid(global_pid)), state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%'), ('%citus_add_node%')) AND backend_type = 'client backend' ORDER BY query DESC; query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+-------------------------+-------------------+---------------+----------+--------+---------- INSERT INTO test_table VALUES (100, 100); |localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression -(1 row) + + SELECT coalesce(to_jsonb(array_agg(csa_from_one_node.*)), '[{}]'::JSONB) + FROM ( + SELECT global_pid, worker_query AS is_worker_query, pg_stat_activity.* FROM + pg_stat_activity LEFT JOIN get_all_active_transactions() ON process_id = pid + ) AS csa_from_one_node; + |localhost | 57636|active | | |postgres|regression +(2 rows) step s3-view-worker: ``` Source: https://app.circleci.com/pipelines/github/citusdata/citus/26692/workflows/3406e4b4-b686-4667-bec6-8253ee0809b1/jobs/765119 I intended to fix this with #6263, but the fix turned out to be insufficient. This PR tries to address the issue by setting distributedCommandOriginator correctly in more situations. However, even with this change it's still possible to reproduce the flaky test in CI. In any case this should fix at least some instances of this issue. In passing this changes the isolation_citus_dist_activity test to allow running it multiple times in a row.emelsimsek
parent
7c8cc7fc61
commit
1c5b8588fe
|
@ -2643,10 +2643,17 @@ StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
|
|||
static void
|
||||
CitusAuthHook(Port *port, int status)
|
||||
{
|
||||
uint64 gpid = ExtractGlobalPID(port->application_name);
|
||||
/*
|
||||
* We determine the backend type here because other calls in this hook rely
|
||||
* on it, both IsExternalClientBackend and InitializeBackendData. These
|
||||
* calls would normally initialize its value based on the application_name
|
||||
* global, but this global is not set yet at this point in the connection
|
||||
* initialization. So here we determine it based on the value from Port.
|
||||
*/
|
||||
DetermineCitusBackendType(port->application_name);
|
||||
|
||||
/* external connections to not have a GPID immediately */
|
||||
if (gpid == INVALID_CITUS_INTERNAL_BACKEND_GPID)
|
||||
if (IsExternalClientBackend())
|
||||
{
|
||||
/*
|
||||
* We raise the shared connection counter pre-emptively. As a result, we may
|
||||
|
@ -2698,7 +2705,8 @@ CitusAuthHook(Port *port, int status)
|
|||
* replication connection. A replication connection backend will never call
|
||||
* StartupCitusBackend, which normally sets up the global PID.
|
||||
*/
|
||||
InitializeBackendData(gpid);
|
||||
InitializeBackendData(port->application_name);
|
||||
|
||||
|
||||
/* let other authentication hooks to kick in first */
|
||||
if (original_client_auth_hook)
|
||||
|
|
|
@ -105,9 +105,6 @@ static BackendData *MyBackendData = NULL;
|
|||
static CitusBackendType CurrentBackendType = CITUS_BACKEND_NOT_ASSIGNED;
|
||||
|
||||
|
||||
static void DetermineCitusBackendType(void);
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
|
||||
PG_FUNCTION_INFO_V1(get_current_transaction_id);
|
||||
PG_FUNCTION_INFO_V1(get_global_active_transactions);
|
||||
|
@ -681,7 +678,7 @@ TotalProcCount(void)
|
|||
* initialized this backend can be correctly shown in citus_lock_waits.
|
||||
*/
|
||||
void
|
||||
InitializeBackendData(uint64 globalPID)
|
||||
InitializeBackendData(const char *applicationName)
|
||||
{
|
||||
if (MyBackendData != NULL)
|
||||
{
|
||||
|
@ -693,6 +690,8 @@ InitializeBackendData(uint64 globalPID)
|
|||
return;
|
||||
}
|
||||
|
||||
uint64 gpid = ExtractGlobalPID(applicationName);
|
||||
|
||||
MyBackendData = &backendManagementShmemData->backends[MyProc->pgprocno];
|
||||
|
||||
Assert(MyBackendData);
|
||||
|
@ -704,20 +703,8 @@ InitializeBackendData(uint64 globalPID)
|
|||
UnSetGlobalPID();
|
||||
|
||||
SpinLockAcquire(&MyBackendData->mutex);
|
||||
|
||||
/*
|
||||
* Use the given globalPID to initialize
|
||||
*/
|
||||
if (globalPID == INVALID_CITUS_INTERNAL_BACKEND_GPID)
|
||||
{
|
||||
MyBackendData->distributedCommandOriginator =
|
||||
true;
|
||||
}
|
||||
else
|
||||
{
|
||||
MyBackendData->globalPID = globalPID;
|
||||
MyBackendData->distributedCommandOriginator = false;
|
||||
}
|
||||
MyBackendData->distributedCommandOriginator = IsExternalClientBackend();
|
||||
MyBackendData->globalPID = gpid;
|
||||
SpinLockRelease(&MyBackendData->mutex);
|
||||
|
||||
/*
|
||||
|
@ -1045,7 +1032,7 @@ citus_pid_for_gpid(PG_FUNCTION_ARGS)
|
|||
* if the application name is not compatible with Citus' application names returns 0.
|
||||
*/
|
||||
uint64
|
||||
ExtractGlobalPID(char *applicationName)
|
||||
ExtractGlobalPID(const char *applicationName)
|
||||
{
|
||||
/* does application name exist */
|
||||
if (!applicationName)
|
||||
|
@ -1371,7 +1358,7 @@ IsRebalancerInternalBackend(void)
|
|||
{
|
||||
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
||||
{
|
||||
DetermineCitusBackendType();
|
||||
DetermineCitusBackendType(application_name);
|
||||
}
|
||||
|
||||
return CurrentBackendType == CITUS_REBALANCER_BACKEND;
|
||||
|
@ -1387,7 +1374,7 @@ IsCitusInternalBackend(void)
|
|||
{
|
||||
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
||||
{
|
||||
DetermineCitusBackendType();
|
||||
DetermineCitusBackendType(application_name);
|
||||
}
|
||||
|
||||
return CurrentBackendType == CITUS_INTERNAL_BACKEND;
|
||||
|
@ -1403,29 +1390,41 @@ IsCitusRunCommandBackend(void)
|
|||
{
|
||||
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
||||
{
|
||||
DetermineCitusBackendType();
|
||||
DetermineCitusBackendType(application_name);
|
||||
}
|
||||
|
||||
return CurrentBackendType == CITUS_RUN_COMMAND_BACKEND;
|
||||
}
|
||||
|
||||
|
||||
bool
|
||||
IsExternalClientBackend(void)
|
||||
{
|
||||
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
|
||||
{
|
||||
DetermineCitusBackendType(application_name);
|
||||
}
|
||||
|
||||
return CurrentBackendType == EXTERNAL_CLIENT_BACKEND;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DetermineCitusBackendType determines the type of backend based on the application_name.
|
||||
*/
|
||||
static void
|
||||
DetermineCitusBackendType(void)
|
||||
void
|
||||
DetermineCitusBackendType(const char *applicationName)
|
||||
{
|
||||
if (ExtractGlobalPID(application_name) != INVALID_CITUS_INTERNAL_BACKEND_GPID)
|
||||
if (ExtractGlobalPID(applicationName) != INVALID_CITUS_INTERNAL_BACKEND_GPID)
|
||||
{
|
||||
CurrentBackendType = CITUS_INTERNAL_BACKEND;
|
||||
}
|
||||
else if (application_name && strcmp(application_name, CITUS_REBALANCER_NAME) == 0)
|
||||
else if (applicationName && strcmp(applicationName, CITUS_REBALANCER_NAME) == 0)
|
||||
{
|
||||
CurrentBackendType = CITUS_REBALANCER_BACKEND;
|
||||
}
|
||||
else if (application_name &&
|
||||
strcmp(application_name, CITUS_RUN_COMMAND_APPLICATION_NAME) == 0)
|
||||
else if (applicationName &&
|
||||
strcmp(applicationName, CITUS_RUN_COMMAND_APPLICATION_NAME) == 0)
|
||||
{
|
||||
CurrentBackendType = CITUS_RUN_COMMAND_BACKEND;
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ extern void BackendManagementShmemInit(void);
|
|||
extern size_t BackendManagementShmemSize(void);
|
||||
extern void InitializeBackendManagement(void);
|
||||
extern int TotalProcCount(void);
|
||||
extern void InitializeBackendData(uint64 globalPID);
|
||||
extern void InitializeBackendData(const char *applicationName);
|
||||
extern void LockBackendSharedMemory(LWLockMode lockMode);
|
||||
extern void UnlockBackendSharedMemory(void);
|
||||
extern void UnSetDistributedTransactionId(void);
|
||||
|
@ -62,7 +62,7 @@ extern void SetBackendDataGlobalPID(uint64 globalPID);
|
|||
extern uint64 GetGlobalPID(void);
|
||||
extern void SetBackendDataDistributedCommandOriginator(bool
|
||||
distributedCommandOriginator);
|
||||
extern uint64 ExtractGlobalPID(char *applicationName);
|
||||
extern uint64 ExtractGlobalPID(const char *applicationName);
|
||||
extern int ExtractNodeIdFromGlobalPID(uint64 globalPID, bool missingOk);
|
||||
extern int ExtractProcessIdFromGlobalPID(uint64 globalPID);
|
||||
extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
|
||||
|
@ -74,9 +74,11 @@ extern LocalTransactionId GetMyProcLocalTransactionId(void);
|
|||
extern int GetExternalClientBackendCount(void);
|
||||
extern uint32 IncrementExternalClientBackendCounter(void);
|
||||
extern void DecrementExternalClientBackendCounter(void);
|
||||
extern void DetermineCitusBackendType(const char *applicationName);
|
||||
extern bool IsCitusInternalBackend(void);
|
||||
extern bool IsRebalancerInternalBackend(void);
|
||||
extern bool IsCitusRunCommandBackend(void);
|
||||
extern bool IsExternalClientBackend(void);
|
||||
extern void ResetCitusBackendType(void);
|
||||
|
||||
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
|
||||
|
|
|
@ -116,7 +116,7 @@ step s3-view-worker:
|
|||
|
||||
query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname
|
||||
---------------------------------------------------------------------
|
||||
INSERT INTO public.test_table_1300008 (column1, column2) VALUES (100, 100)|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
||||
INSERT INTO public.test_table_1300003 (column1, column2) VALUES (100, 100)|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
||||
(1 row)
|
||||
|
||||
step s2-rollback:
|
||||
|
@ -180,10 +180,10 @@ step s3-view-worker:
|
|||
|
||||
query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname
|
||||
---------------------------------------------------------------------
|
||||
SELECT count(*) AS count FROM public.test_table_1300014 test_table WHERE true|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
||||
SELECT count(*) AS count FROM public.test_table_1300013 test_table WHERE true|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
||||
SELECT count(*) AS count FROM public.test_table_1300012 test_table WHERE true|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
||||
SELECT count(*) AS count FROM public.test_table_1300011 test_table WHERE true|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
||||
SELECT count(*) AS count FROM public.test_table_1300004 test_table WHERE true|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
||||
SELECT count(*) AS count FROM public.test_table_1300003 test_table WHERE true|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
||||
SELECT count(*) AS count FROM public.test_table_1300002 test_table WHERE true|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
||||
SELECT count(*) AS count FROM public.test_table_1300001 test_table WHERE true|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
||||
(4 rows)
|
||||
|
||||
step s2-rollback:
|
||||
|
@ -247,7 +247,7 @@ step s3-view-worker:
|
|||
|
||||
query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname
|
||||
---------------------------------------------------------------------
|
||||
SELECT count(*) AS count FROM public.test_table_1300017 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
||||
SELECT count(*) AS count FROM public.test_table_1300002 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression
|
||||
(1 row)
|
||||
|
||||
step s2-rollback:
|
||||
|
|
|
@ -9,7 +9,7 @@ setup
|
|||
AS 'citus', $$test_assign_global_pid$$;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count TO 4;
|
||||
select setval('pg_dist_shardid_seq', GREATEST(1300000, nextval('pg_dist_shardid_seq')));
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1300001;
|
||||
|
||||
CREATE TABLE test_table(column1 int, column2 int);
|
||||
SELECT create_distributed_table('test_table', 'column1');
|
||||
|
|
Loading…
Reference in New Issue