mirror of https://github.com/citusdata/citus.git
Observe prepared xacts twice in RecoverWorkerTransactions to avoid race condition
parent
2410c2e450
commit
ae47df01ea
|
@ -644,7 +644,8 @@ MyBackendGotCancelledDueToDeadlock(void)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ActiveDistributedTransactionNumbers returns a list of pointers to
|
* ActiveDistributedTransactionNumbers returns a list of pointers to
|
||||||
* transaction numbers of distributed transactions that are in progress.
|
* transaction numbers of distributed transactions that are in progress
|
||||||
|
* and were started by the node on which it is called.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
ActiveDistributedTransactionNumbers(void)
|
ActiveDistributedTransactionNumbers(void)
|
||||||
|
@ -673,6 +674,12 @@ ActiveDistributedTransactionNumbers(void)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!currentBackendData.transactionId.transactionOriginator)
|
||||||
|
{
|
||||||
|
/* not a coordinator process */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
transactionNumber = (uint64 *) palloc0(sizeof(uint64));
|
transactionNumber = (uint64 *) palloc0(sizeof(uint64));
|
||||||
*transactionNumber = currentBackendData.transactionId.transactionNumber;
|
*transactionNumber = currentBackendData.transactionId.transactionNumber;
|
||||||
|
|
||||||
|
|
|
@ -150,6 +150,8 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
||||||
|
|
||||||
List *pendingTransactionList = NIL;
|
List *pendingTransactionList = NIL;
|
||||||
HTAB *pendingTransactionSet = NULL;
|
HTAB *pendingTransactionSet = NULL;
|
||||||
|
List *recheckTransactionList = NIL;
|
||||||
|
HTAB *recheckTransactionSet = NULL;
|
||||||
|
|
||||||
Relation pgDistTransaction = NULL;
|
Relation pgDistTransaction = NULL;
|
||||||
SysScanDesc scanDescriptor = NULL;
|
SysScanDesc scanDescriptor = NULL;
|
||||||
|
@ -186,23 +188,60 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
||||||
pgDistTransaction = heap_open(DistTransactionRelationId(), ShareUpdateExclusiveLock);
|
pgDistTransaction = heap_open(DistTransactionRelationId(), ShareUpdateExclusiveLock);
|
||||||
tupleDescriptor = RelationGetDescr(pgDistTransaction);
|
tupleDescriptor = RelationGetDescr(pgDistTransaction);
|
||||||
|
|
||||||
/* find in-progress distributed transactions */
|
/*
|
||||||
activeTransactionNumberList = ActiveDistributedTransactionNumbers();
|
* We're going to check the list of prepared transactions on the worker,
|
||||||
activeTransactionNumberSet = ListToHashSet(activeTransactionNumberList,
|
* but some of those prepared transactions might belong to ongoing
|
||||||
sizeof(uint64), false);
|
* distributed transactions.
|
||||||
|
*
|
||||||
|
* We could avoid this by temporarily blocking new prepared transactions
|
||||||
|
* from being created by taking an ExclusiveLock on pg_dist_transaction.
|
||||||
|
* However, this hurts write performance, so instead we avoid blocking
|
||||||
|
* by consulting the list of active distributed transactions, and follow
|
||||||
|
* a carefully chosen order to avoid race conditions:
|
||||||
|
*
|
||||||
|
* 1) P = prepared transactions on worker
|
||||||
|
* 2) A = active distributed transactions
|
||||||
|
* 3) T = pg_dist_transaction snapshot
|
||||||
|
* 4) Q = prepared transactions on worker
|
||||||
|
*
|
||||||
|
* By observing A after P, we get a conclusive answer to which distributed
|
||||||
|
* transactions we observed in P are still in progress. It is safe to recover
|
||||||
|
* the transactions in P - A based on the presence or absence of a record
|
||||||
|
* in T.
|
||||||
|
*
|
||||||
|
* We also remove records in T if there is no prepared transaction, which
|
||||||
|
* we assume means the transaction committed. However, a transaction could
|
||||||
|
* have left prepared transactions and committed between steps 1 and 2.
|
||||||
|
* In that case, we would incorrectly remove the records, while the
|
||||||
|
* prepared transaction is still in place.
|
||||||
|
*
|
||||||
|
* We therefore observe the set of prepared transactions one more time in
|
||||||
|
* step 4. The aforementioned transactions would show up in Q, but not in
|
||||||
|
* P. We can skip those transactions and recover them later.
|
||||||
|
*/
|
||||||
|
|
||||||
/* find stale prepared transactions on the remote node */
|
/* find stale prepared transactions on the remote node */
|
||||||
pendingTransactionList = PendingWorkerTransactionList(connection);
|
pendingTransactionList = PendingWorkerTransactionList(connection);
|
||||||
pendingTransactionSet = ListToHashSet(pendingTransactionList, NAMEDATALEN, true);
|
pendingTransactionSet = ListToHashSet(pendingTransactionList, NAMEDATALEN, true);
|
||||||
|
|
||||||
|
/* find in-progress distributed transactions */
|
||||||
|
activeTransactionNumberList = ActiveDistributedTransactionNumbers();
|
||||||
|
activeTransactionNumberSet = ListToHashSet(activeTransactionNumberList,
|
||||||
|
sizeof(uint64), false);
|
||||||
|
|
||||||
/* scan through all recovery records of the current worker */
|
/* scan through all recovery records of the current worker */
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
|
||||||
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
|
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
|
||||||
|
|
||||||
|
/* get a snapshot of pg_dist_transaction */
|
||||||
scanDescriptor = systable_beginscan(pgDistTransaction,
|
scanDescriptor = systable_beginscan(pgDistTransaction,
|
||||||
DistTransactionGroupIndexId(), indexOK,
|
DistTransactionGroupIndexId(), indexOK,
|
||||||
NULL, scanKeyCount, scanKey);
|
NULL, scanKeyCount, scanKey);
|
||||||
|
|
||||||
|
/* find stale prepared transactions on the remote node */
|
||||||
|
recheckTransactionList = PendingWorkerTransactionList(connection);
|
||||||
|
recheckTransactionSet = ListToHashSet(recheckTransactionList, NAMEDATALEN, true);
|
||||||
|
|
||||||
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||||
{
|
{
|
||||||
bool isNull = false;
|
bool isNull = false;
|
||||||
|
@ -255,9 +294,62 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
||||||
}
|
}
|
||||||
|
|
||||||
recoveredTransactionCount++;
|
recoveredTransactionCount++;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We successfully committed the prepared transaction, safe to delete
|
||||||
|
* the recovery record.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
bool foundNewPreparedTransaction = false;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We found a committed pg_dist_transaction record without a prepared
|
||||||
|
* transaction. In the typical case, this means that the prepared
|
||||||
|
* transaction was successfully committed from the post-commit callback
|
||||||
|
* and we can therefore delete the recovery record. However, there is a
|
||||||
|
* race condition.
|
||||||
|
*
|
||||||
|
* If a transaction started and committed just after we observed the
|
||||||
|
* set of prepared transactions, and just before we took our
|
||||||
|
* pg_dist_transaction snapshot, then we would see its records, but it may
|
||||||
|
* have prepared transactions that failed to commit and that we did not
|
||||||
|
* observe. We should not delete the records, since we would otherwise
|
||||||
|
* roll back the prepared transaction on the next call to
|
||||||
|
* recover_prepared_transactions.
|
||||||
|
*
|
||||||
|
* In addition, if the transaction started after the call to
|
||||||
|
* ActiveDistributedTransactionNumbers, then it may still be in the process
|
||||||
|
* of committing the prepared transactions in the post-commit callback.
|
||||||
|
*
|
||||||
|
* We check the prepared transactions again after taking the
|
||||||
|
* pg_dist_transaction snapshot. Any prepared transactions that appear
|
||||||
|
* in this set, but not in pendingTransactionSet, may have been created
|
||||||
|
* before the pg_dist_transaction snapshot. We skip deleting the records
|
||||||
|
* for those transactions since there are prepared transactions that
|
||||||
|
* need to be committed, but we cannot commit them now, since they might
|
||||||
|
* still be getting committed and committing them twice would result in
|
||||||
|
* failures.
|
||||||
|
*
|
||||||
|
* In this case, we just leave the records and prepared transactions
|
||||||
|
* for the next call to recover_prepared_transactions.
|
||||||
|
*/
|
||||||
|
hash_search(recheckTransactionSet, transactionName, HASH_FIND,
|
||||||
|
&foundNewPreparedTransaction);
|
||||||
|
|
||||||
|
if (foundNewPreparedTransaction)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* There is a recovery record and definitely no prepared transaction, it
|
||||||
|
* must have been committed immediately, so it's safe to delete the
|
||||||
|
* recovery record.
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we've successfully committed the prepared transaction, remove the record */
|
|
||||||
simple_heap_delete(pgDistTransaction, &heapTuple->t_self);
|
simple_heap_delete(pgDistTransaction, &heapTuple->t_self);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -108,6 +108,7 @@ DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId)
|
||||||
* ListToHashSet creates a hash table in which the keys are copied from
|
* ListToHashSet creates a hash table in which the keys are copied from
|
||||||
* itemList and the values are the same as the keys. This can
|
* itemList and the values are the same as the keys. This can
|
||||||
* be used for fast lookups of the presence of a byte array in a set.
|
* be used for fast lookups of the presence of a byte array in a set.
|
||||||
|
* itemList should be a list of pointers.
|
||||||
*
|
*
|
||||||
* If isStringList is true, then look-ups are performed through string
|
* If isStringList is true, then look-ups are performed through string
|
||||||
* comparison of strings up to keySize in length. If isStringList is
|
* comparison of strings up to keySize in length. If isStringList is
|
||||||
|
@ -132,8 +133,7 @@ ListToHashSet(List *itemList, Size keySize, bool isStringList)
|
||||||
|
|
||||||
if (!isStringList)
|
if (!isStringList)
|
||||||
{
|
{
|
||||||
info.hash = tag_hash;
|
flags |= HASH_BLOBS;
|
||||||
flags |= HASH_FUNCTION;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
itemSet = hash_create("ListToHashSet", capacity, &info, flags);
|
itemSet = hash_create("ListToHashSet", capacity, &info, flags);
|
||||||
|
|
Loading…
Reference in New Issue