mirror of https://github.com/citusdata/citus.git
Check the PGPROC's validity properly
We used to only check whether the PID is valid or not. However, Postgres does not necessarily set the PID of the backend to 0 when it exists. Instead, we need to be able to check it from procArray. IsBackendPid() is what pg_stat_activity also relies on for a similar purpose.pull/6076/head
parent
fba71f7c15
commit
6c65d29924
|
@ -647,6 +647,7 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
|
||||||
|
|
||||||
/* we don't want any monitoring view/udf to show already exited backends */
|
/* we don't want any monitoring view/udf to show already exited backends */
|
||||||
UnSetGlobalPID();
|
UnSetGlobalPID();
|
||||||
|
SetActiveMyBackend(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,7 @@
|
||||||
#include "storage/ipc.h"
|
#include "storage/ipc.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
#include "storage/lwlock.h"
|
#include "storage/lwlock.h"
|
||||||
|
#include "storage/procarray.h"
|
||||||
#include "storage/proc.h"
|
#include "storage/proc.h"
|
||||||
#include "storage/spin.h"
|
#include "storage/spin.h"
|
||||||
#include "storage/s_lock.h"
|
#include "storage/s_lock.h"
|
||||||
|
@ -392,9 +393,9 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
|
||||||
|
|
||||||
SpinLockAcquire(¤tBackend->mutex);
|
SpinLockAcquire(¤tBackend->mutex);
|
||||||
|
|
||||||
if (currentProc->pid == 0)
|
if (currentProc->pid == 0 || !currentBackend->activeBackend)
|
||||||
{
|
{
|
||||||
/* unused PGPROC slot */
|
/* unused PGPROC slot or the backend already exited */
|
||||||
SpinLockRelease(¤tBackend->mutex);
|
SpinLockRelease(¤tBackend->mutex);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -698,6 +699,12 @@ InitializeBackendData(void)
|
||||||
UnSetDistributedTransactionId();
|
UnSetDistributedTransactionId();
|
||||||
UnSetGlobalPID();
|
UnSetGlobalPID();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Signal that this backend is active and should show up
|
||||||
|
* on activity monitors.
|
||||||
|
*/
|
||||||
|
SetActiveMyBackend(true);
|
||||||
|
|
||||||
UnlockBackendSharedMemory();
|
UnlockBackendSharedMemory();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -746,6 +753,24 @@ UnSetGlobalPID(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SetActiveMyBackend is a wrapper around MyBackendData->activeBackend.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
SetActiveMyBackend(bool value)
|
||||||
|
{
|
||||||
|
/* backend does not exist if the extension is not created */
|
||||||
|
if (MyBackendData)
|
||||||
|
{
|
||||||
|
SpinLockAcquire(&MyBackendData->mutex);
|
||||||
|
|
||||||
|
MyBackendData->activeBackend = value;
|
||||||
|
|
||||||
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* LockBackendSharedMemory is a simple wrapper around LWLockAcquire on the
|
* LockBackendSharedMemory is a simple wrapper around LWLockAcquire on the
|
||||||
* shared memory lock.
|
* shared memory lock.
|
||||||
|
@ -1224,6 +1249,16 @@ ActiveDistributedTransactionNumbers(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
GetBackendDataForProc(currentProc, ¤tBackendData);
|
GetBackendDataForProc(currentProc, ¤tBackendData);
|
||||||
|
if (!currentBackendData.activeBackend)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Skip if the PGPROC slot is unused. We should normally use
|
||||||
|
* IsBackendPid() to be able to skip reliably all the exited
|
||||||
|
* processes. However, that is a costly operation. Instead, we
|
||||||
|
* keep track of activeBackend in Citus code.
|
||||||
|
*/
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (!IsInDistributedTransaction(¤tBackendData))
|
if (!IsInDistributedTransaction(¤tBackendData))
|
||||||
{
|
{
|
||||||
|
|
|
@ -561,13 +561,23 @@ BuildLocalWaitGraph(bool onlyDistributedTx)
|
||||||
PGPROC *currentProc = &ProcGlobal->allProcs[curBackend];
|
PGPROC *currentProc = &ProcGlobal->allProcs[curBackend];
|
||||||
BackendData currentBackendData;
|
BackendData currentBackendData;
|
||||||
|
|
||||||
/* skip if the PGPROC slot is unused */
|
|
||||||
if (currentProc->pid == 0)
|
if (currentProc->pid == 0)
|
||||||
{
|
{
|
||||||
|
/* skip if the PGPROC slot is unused */
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
GetBackendDataForProc(currentProc, ¤tBackendData);
|
GetBackendDataForProc(currentProc, ¤tBackendData);
|
||||||
|
if (!currentBackendData.activeBackend)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Skip if the PGPROC slot is unused. We should normally use
|
||||||
|
* IsBackendPid() to be able to skip reliably all the exited
|
||||||
|
* processes. However, that is a costly operation. Instead, we
|
||||||
|
* keep track of activeBackend in Citus code.
|
||||||
|
*/
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Only start searching from distributed transactions, since we only
|
* Only start searching from distributed transactions, since we only
|
||||||
|
|
|
@ -42,6 +42,7 @@ typedef struct BackendData
|
||||||
uint64 globalPID;
|
uint64 globalPID;
|
||||||
bool distributedCommandOriginator;
|
bool distributedCommandOriginator;
|
||||||
DistributedTransactionId transactionId;
|
DistributedTransactionId transactionId;
|
||||||
|
bool activeBackend; /* set to false when backend exists */
|
||||||
} BackendData;
|
} BackendData;
|
||||||
|
|
||||||
|
|
||||||
|
@ -54,6 +55,7 @@ extern void LockBackendSharedMemory(LWLockMode lockMode);
|
||||||
extern void UnlockBackendSharedMemory(void);
|
extern void UnlockBackendSharedMemory(void);
|
||||||
extern void UnSetDistributedTransactionId(void);
|
extern void UnSetDistributedTransactionId(void);
|
||||||
extern void UnSetGlobalPID(void);
|
extern void UnSetGlobalPID(void);
|
||||||
|
extern void SetActiveMyBackend(bool value);
|
||||||
extern void AssignDistributedTransactionId(void);
|
extern void AssignDistributedTransactionId(void);
|
||||||
extern void AssignGlobalPID(void);
|
extern void AssignGlobalPID(void);
|
||||||
extern uint64 GetGlobalPID(void);
|
extern uint64 GetGlobalPID(void);
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
Parsed test spec with 3 sessions
|
Parsed test spec with 5 sessions
|
||||||
|
|
||||||
starting permutation: s1-grant s1-begin-insert s2-begin-insert s3-as-admin s3-as-user-1 s3-as-readonly s3-as-monitor s1-commit s2-commit
|
starting permutation: s1-grant s1-begin-insert s2-begin-insert s3-as-admin s3-as-user-1 s3-as-readonly s3-as-monitor s1-commit s2-commit
|
||||||
step s1-grant:
|
step s1-grant:
|
||||||
|
@ -93,3 +93,34 @@ step s1-commit:
|
||||||
step s2-commit:
|
step s2-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s4-record-pid s3-show-activity s5-kill s3-show-activity
|
||||||
|
step s4-record-pid:
|
||||||
|
SELECT pg_backend_pid() INTO selected_pid;
|
||||||
|
|
||||||
|
step s3-show-activity:
|
||||||
|
SET ROLE postgres;
|
||||||
|
select count(*) from get_all_active_transactions() where process_id IN (SELECT * FROM selected_pid);
|
||||||
|
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s5-kill:
|
||||||
|
SELECT pg_terminate_backend(pg_backend_pid) FROM selected_pid;
|
||||||
|
|
||||||
|
pg_terminate_backend
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s3-show-activity:
|
||||||
|
SET ROLE postgres;
|
||||||
|
select count(*) from get_all_active_transactions() where process_id IN (SELECT * FROM selected_pid);
|
||||||
|
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ teardown
|
||||||
{
|
{
|
||||||
DROP TABLE test_table;
|
DROP TABLE test_table;
|
||||||
DROP USER test_user_1, test_user_2, test_readonly, test_monitor;
|
DROP USER test_user_1, test_user_2, test_readonly, test_monitor;
|
||||||
|
DROP TABLE IF EXISTS selected_pid;
|
||||||
}
|
}
|
||||||
|
|
||||||
session "s1"
|
session "s1"
|
||||||
|
@ -100,4 +101,26 @@ step "s3-as-monitor"
|
||||||
SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0;
|
SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
step "s3-show-activity"
|
||||||
|
{
|
||||||
|
SET ROLE postgres;
|
||||||
|
select count(*) from get_all_active_transactions() where process_id IN (SELECT * FROM selected_pid);
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s4"
|
||||||
|
|
||||||
|
step "s4-record-pid"
|
||||||
|
{
|
||||||
|
SELECT pg_backend_pid() INTO selected_pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s5"
|
||||||
|
|
||||||
|
step "s5-kill"
|
||||||
|
{
|
||||||
|
SELECT pg_terminate_backend(pg_backend_pid) FROM selected_pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
permutation "s1-grant" "s1-begin-insert" "s2-begin-insert" "s3-as-admin" "s3-as-user-1" "s3-as-readonly" "s3-as-monitor" "s1-commit" "s2-commit"
|
permutation "s1-grant" "s1-begin-insert" "s2-begin-insert" "s3-as-admin" "s3-as-user-1" "s3-as-readonly" "s3-as-monitor" "s1-commit" "s2-commit"
|
||||||
|
permutation "s4-record-pid" "s3-show-activity" "s5-kill" "s3-show-activity"
|
||||||
|
|
Loading…
Reference in New Issue