mirror of https://github.com/citusdata/citus.git
Properly set worker_query and use
parent
1a1b94aa24
commit
3c5a146ae1
|
@ -152,7 +152,6 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
MyBackendData->citusBackend.initiatorNodeIdentifier =
|
MyBackendData->citusBackend.initiatorNodeIdentifier =
|
||||||
MyBackendData->transactionId.initiatorNodeIdentifier;
|
MyBackendData->transactionId.initiatorNodeIdentifier;
|
||||||
MyBackendData->citusBackend.transactionOriginator = false;
|
|
||||||
|
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
|
|
||||||
|
@ -410,15 +409,12 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
|
||||||
initiatorNodeIdentifier = currentBackend->citusBackend.initiatorNodeIdentifier;
|
initiatorNodeIdentifier = currentBackend->citusBackend.initiatorNodeIdentifier;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We prefer to use worker_query instead of transactionOriginator in the user facing
|
* We prefer to use worker_query instead of distributedCommandOriginator in
|
||||||
* functions since its more intuitive. Thus, we negate the result before returning.
|
* the user facing functions since its more intuitive. Thus,
|
||||||
*
|
* we negate the result before returning.
|
||||||
* We prefer to use citusBackend's transactionOriginator field over transactionId's
|
|
||||||
* field with the same name. The reason is that it also covers backends that are not
|
|
||||||
* inside a distributed transaction.
|
|
||||||
*/
|
*/
|
||||||
bool coordinatorOriginatedQuery =
|
bool coordinatorOriginatedQuery =
|
||||||
currentBackend->citusBackend.transactionOriginator;
|
currentBackend->distributedCommandOriginator;
|
||||||
|
|
||||||
transactionNumber = currentBackend->transactionId.transactionNumber;
|
transactionNumber = currentBackend->transactionId.transactionNumber;
|
||||||
TimestampTz transactionIdTimestamp = currentBackend->transactionId.timestamp;
|
TimestampTz transactionIdTimestamp = currentBackend->transactionId.timestamp;
|
||||||
|
@ -663,7 +659,6 @@ UnSetDistributedTransactionId(void)
|
||||||
MyBackendData->transactionId.timestamp = 0;
|
MyBackendData->transactionId.timestamp = 0;
|
||||||
|
|
||||||
MyBackendData->citusBackend.initiatorNodeIdentifier = -1;
|
MyBackendData->citusBackend.initiatorNodeIdentifier = -1;
|
||||||
MyBackendData->citusBackend.transactionOriginator = false;
|
|
||||||
|
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
}
|
}
|
||||||
|
@ -682,6 +677,7 @@ UnSetGlobalPID(void)
|
||||||
SpinLockAcquire(&MyBackendData->mutex);
|
SpinLockAcquire(&MyBackendData->mutex);
|
||||||
|
|
||||||
MyBackendData->globalPID = 0;
|
MyBackendData->globalPID = 0;
|
||||||
|
MyBackendData->distributedCommandOriginator = false;
|
||||||
|
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
}
|
}
|
||||||
|
@ -776,7 +772,6 @@ AssignDistributedTransactionId(void)
|
||||||
MyBackendData->transactionId.timestamp = currentTimestamp;
|
MyBackendData->transactionId.timestamp = currentTimestamp;
|
||||||
|
|
||||||
MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
|
MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
|
||||||
MyBackendData->citusBackend.transactionOriginator = true;
|
|
||||||
|
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
}
|
}
|
||||||
|
@ -798,7 +793,6 @@ MarkCitusInitiatedCoordinatorBackend(void)
|
||||||
SpinLockAcquire(&MyBackendData->mutex);
|
SpinLockAcquire(&MyBackendData->mutex);
|
||||||
|
|
||||||
MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
|
MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
|
||||||
MyBackendData->citusBackend.transactionOriginator = true;
|
|
||||||
|
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
}
|
}
|
||||||
|
@ -814,10 +808,12 @@ void
|
||||||
AssignGlobalPID(void)
|
AssignGlobalPID(void)
|
||||||
{
|
{
|
||||||
uint64 globalPID = INVALID_CITUS_INTERNAL_BACKEND_GPID;
|
uint64 globalPID = INVALID_CITUS_INTERNAL_BACKEND_GPID;
|
||||||
|
bool distributedCommandOriginator = false;
|
||||||
|
|
||||||
if (!IsCitusInternalBackend())
|
if (!IsCitusInternalBackend())
|
||||||
{
|
{
|
||||||
globalPID = GenerateGlobalPID();
|
globalPID = GenerateGlobalPID();
|
||||||
|
distributedCommandOriginator = true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -826,6 +822,7 @@ AssignGlobalPID(void)
|
||||||
|
|
||||||
SpinLockAcquire(&MyBackendData->mutex);
|
SpinLockAcquire(&MyBackendData->mutex);
|
||||||
MyBackendData->globalPID = globalPID;
|
MyBackendData->globalPID = globalPID;
|
||||||
|
MyBackendData->distributedCommandOriginator = distributedCommandOriginator;
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -157,10 +157,9 @@ FROM \
|
||||||
WHERE \
|
WHERE \
|
||||||
backend_type = 'client backend' \
|
backend_type = 'client backend' \
|
||||||
AND \
|
AND \
|
||||||
pg_stat_activity.query NOT ILIKE '%stat_activity%' \
|
worker_query = False \
|
||||||
AND \
|
AND \
|
||||||
pg_stat_activity.application_name NOT SIMILAR TO 'citus_internal gpid=\\d+'; \
|
pg_stat_activity.query NOT ILIKE '%stat_activity%';"
|
||||||
"
|
|
||||||
|
|
||||||
#define CITUS_WORKER_STAT_ACTIVITY_QUERY \
|
#define CITUS_WORKER_STAT_ACTIVITY_QUERY \
|
||||||
"\
|
"\
|
||||||
|
@ -195,7 +194,7 @@ FROM \
|
||||||
get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp, global_id) \
|
get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp, global_id) \
|
||||||
ON pg_stat_activity.pid = dist_txs.process_id \
|
ON pg_stat_activity.pid = dist_txs.process_id \
|
||||||
WHERE \
|
WHERE \
|
||||||
pg_stat_activity.application_name SIMILAR TO 'citus_internal gpid=\\d+' \
|
worker_query = True \
|
||||||
AND \
|
AND \
|
||||||
pg_stat_activity.query NOT ILIKE '%stat_activity%';"
|
pg_stat_activity.query NOT ILIKE '%stat_activity%';"
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,6 @@
|
||||||
typedef struct CitusInitiatedBackend
|
typedef struct CitusInitiatedBackend
|
||||||
{
|
{
|
||||||
int initiatorNodeIdentifier;
|
int initiatorNodeIdentifier;
|
||||||
bool transactionOriginator;
|
|
||||||
} CitusInitiatedBackend;
|
} CitusInitiatedBackend;
|
||||||
|
|
||||||
|
|
||||||
|
@ -51,6 +50,7 @@ typedef struct BackendData
|
||||||
slock_t mutex;
|
slock_t mutex;
|
||||||
bool cancelledDueToDeadlock;
|
bool cancelledDueToDeadlock;
|
||||||
uint64 globalPID;
|
uint64 globalPID;
|
||||||
|
bool distributedCommandOriginator;
|
||||||
CitusInitiatedBackend citusBackend;
|
CitusInitiatedBackend citusBackend;
|
||||||
DistributedTransactionId transactionId;
|
DistributedTransactionId transactionId;
|
||||||
} BackendData;
|
} BackendData;
|
||||||
|
|
Loading…
Reference in New Issue