mirror of https://github.com/citusdata/citus.git
Merge pull request #2573 from citusdata/fix_more_spinlocks
Move functions calls that can fail to outside of spinlockpull/2578/head
commit
1d421e60f9
|
@ -97,6 +97,11 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
Oid userId = GetUserId();
|
Oid userId = GetUserId();
|
||||||
|
|
||||||
|
/* prepare data before acquiring spinlock to protect against errors */
|
||||||
|
int32 initiatorNodeIdentifier = PG_GETARG_INT32(0);
|
||||||
|
uint64 transactionNumber = PG_GETARG_INT64(1);
|
||||||
|
TimestampTz timestamp = PG_GETARG_TIMESTAMPTZ(2);
|
||||||
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
/* MyBackendData should always be avaliable, just out of paranoia */
|
/* MyBackendData should always be avaliable, just out of paranoia */
|
||||||
|
@ -125,9 +130,9 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS)
|
||||||
MyBackendData->databaseId = MyDatabaseId;
|
MyBackendData->databaseId = MyDatabaseId;
|
||||||
MyBackendData->userId = userId;
|
MyBackendData->userId = userId;
|
||||||
|
|
||||||
MyBackendData->transactionId.initiatorNodeIdentifier = PG_GETARG_INT32(0);
|
MyBackendData->transactionId.initiatorNodeIdentifier = initiatorNodeIdentifier;
|
||||||
MyBackendData->transactionId.transactionNumber = PG_GETARG_INT64(1);
|
MyBackendData->transactionId.transactionNumber = transactionNumber;
|
||||||
MyBackendData->transactionId.timestamp = PG_GETARG_TIMESTAMPTZ(2);
|
MyBackendData->transactionId.timestamp = timestamp;
|
||||||
MyBackendData->transactionId.transactionOriginator = false;
|
MyBackendData->transactionId.transactionOriginator = false;
|
||||||
|
|
||||||
MyBackendData->citusBackend.initiatorNodeIdentifier =
|
MyBackendData->citusBackend.initiatorNodeIdentifier =
|
||||||
|
@ -386,7 +391,6 @@ static void
|
||||||
StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
|
StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
|
||||||
{
|
{
|
||||||
int backendIndex = 0;
|
int backendIndex = 0;
|
||||||
|
|
||||||
Datum values[ACTIVE_TRANSACTION_COLUMN_COUNT];
|
Datum values[ACTIVE_TRANSACTION_COLUMN_COUNT];
|
||||||
bool isNulls[ACTIVE_TRANSACTION_COLUMN_COUNT];
|
bool isNulls[ACTIVE_TRANSACTION_COLUMN_COUNT];
|
||||||
bool showAllTransactions = superuser();
|
bool showAllTransactions = superuser();
|
||||||
|
@ -414,6 +418,13 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
|
||||||
&backendManagementShmemData->backends[backendIndex];
|
&backendManagementShmemData->backends[backendIndex];
|
||||||
bool coordinatorOriginatedQuery = false;
|
bool coordinatorOriginatedQuery = false;
|
||||||
|
|
||||||
|
/* to work on data after releasing g spinlock to protect against errors */
|
||||||
|
Oid databaseId = InvalidOid;
|
||||||
|
int backendPid = -1;
|
||||||
|
int initiatorNodeIdentifier = -1;
|
||||||
|
uint64 transactionNumber = 0;
|
||||||
|
TimestampTz transactionIdTimestamp = 0;
|
||||||
|
|
||||||
SpinLockAcquire(¤tBackend->mutex);
|
SpinLockAcquire(¤tBackend->mutex);
|
||||||
|
|
||||||
/* we're only interested in backends initiated by Citus */
|
/* we're only interested in backends initiated by Citus */
|
||||||
|
@ -433,9 +444,9 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
values[0] = ObjectIdGetDatum(currentBackend->databaseId);
|
databaseId = currentBackend->databaseId;
|
||||||
values[1] = Int32GetDatum(ProcGlobal->allProcs[backendIndex].pid);
|
backendPid = ProcGlobal->allProcs[backendIndex].pid;
|
||||||
values[2] = Int32GetDatum(currentBackend->citusBackend.initiatorNodeIdentifier);
|
initiatorNodeIdentifier = currentBackend->citusBackend.initiatorNodeIdentifier;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We prefer to use worker_query instead of transactionOriginator in the user facing
|
* We prefer to use worker_query instead of transactionOriginator in the user facing
|
||||||
|
@ -446,13 +457,19 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
|
||||||
* inside a distributed transaction.
|
* inside a distributed transaction.
|
||||||
*/
|
*/
|
||||||
coordinatorOriginatedQuery = currentBackend->citusBackend.transactionOriginator;
|
coordinatorOriginatedQuery = currentBackend->citusBackend.transactionOriginator;
|
||||||
values[3] = !coordinatorOriginatedQuery;
|
|
||||||
|
|
||||||
values[4] = UInt64GetDatum(currentBackend->transactionId.transactionNumber);
|
transactionNumber = currentBackend->transactionId.transactionNumber;
|
||||||
values[5] = TimestampTzGetDatum(currentBackend->transactionId.timestamp);
|
transactionIdTimestamp = currentBackend->transactionId.timestamp;
|
||||||
|
|
||||||
SpinLockRelease(¤tBackend->mutex);
|
SpinLockRelease(¤tBackend->mutex);
|
||||||
|
|
||||||
|
values[0] = ObjectIdGetDatum(databaseId);
|
||||||
|
values[1] = Int32GetDatum(backendPid);
|
||||||
|
values[2] = Int32GetDatum(initiatorNodeIdentifier);
|
||||||
|
values[3] = !coordinatorOriginatedQuery;
|
||||||
|
values[4] = UInt64GetDatum(transactionNumber);
|
||||||
|
values[5] = TimestampTzGetDatum(transactionIdTimestamp);
|
||||||
|
|
||||||
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
Loading…
Reference in New Issue