diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c
index a6dba0a38..7356f1cf9 100644
--- a/src/backend/distributed/transaction/backend_data.c
+++ b/src/backend/distributed/transaction/backend_data.c
@@ -644,7 +644,8 @@ MyBackendGotCancelledDueToDeadlock(void)
 
 /*
  * ActiveDistributedTransactionNumbers returns a list of pointers to
- * transaction numbers of distributed transactions that are in progress.
+ * transaction numbers of distributed transactions that are in progress
+ * and were started by the node on which it is called.
  */
 List *
 ActiveDistributedTransactionNumbers(void)
@@ -673,6 +674,12 @@ ActiveDistributedTransactionNumbers(void)
 			continue;
 		}
 
+		if (!currentBackendData.transactionId.transactionOriginator)
+		{
+			/* not a coordinator process */
+			continue;
+		}
+
 		transactionNumber = (uint64 *) palloc0(sizeof(uint64));
 		*transactionNumber = currentBackendData.transactionId.transactionNumber;
 
diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c
index 1ef581522..2b16d7769 100644
--- a/src/backend/distributed/transaction/transaction_recovery.c
+++ b/src/backend/distributed/transaction/transaction_recovery.c
@@ -150,6 +150,8 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
 	List *pendingTransactionList = NIL;
 	HTAB *pendingTransactionSet = NULL;
 
+	List *recheckTransactionList = NIL;
+	HTAB *recheckTransactionSet = NULL;
+
 	Relation pgDistTransaction = NULL;
 	SysScanDesc scanDescriptor = NULL;
@@ -186,23 +188,60 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
 	pgDistTransaction = heap_open(DistTransactionRelationId(), ShareUpdateExclusiveLock);
 	tupleDescriptor = RelationGetDescr(pgDistTransaction);
 
-	/* find in-progress distributed transactions */
-	activeTransactionNumberList = ActiveDistributedTransactionNumbers();
-	activeTransactionNumberSet = ListToHashSet(activeTransactionNumberList,
-											   sizeof(uint64), false);
+	/*
+	 * We're going to check the list of prepared transactions on the worker,
+	 * but some of those prepared transactions might belong to ongoing
+	 * distributed transactions.
+	 *
+	 * We could avoid this by temporarily blocking new prepared transactions
+	 * from being created by taking an ExclusiveLock on pg_dist_transaction.
+	 * However, this hurts write performance, so instead we avoid blocking
+	 * by consulting the list of active distributed transactions, and follow
+	 * a carefully chosen order to avoid race conditions:
+	 *
+	 * 1) P = prepared transactions on worker
+	 * 2) A = active distributed transactions
+	 * 3) T = pg_dist_transaction snapshot
+	 * 4) Q = prepared transactions on worker
+	 *
+	 * By observing A after P, we get a conclusive answer to which distributed
+	 * transactions we observed in P are still in progress. It is safe to recover
+	 * the transactions in P - A based on the presence or absence of a record
+	 * in T.
+	 *
+	 * We also remove records in T if there is no prepared transaction, which
+	 * we assume means the transaction committed. However, a transaction could
+	 * have left prepared transactions and committed between steps 1 and 2.
+	 * In that case, we would incorrectly remove the records while the
+	 * prepared transactions are still in place.
+	 *
+	 * We therefore observe the set of prepared transactions one more time in
+	 * step 4. The aforementioned transactions would show up in Q, but not in
+	 * P. We can skip those transactions and recover them later.
+	 */
 
 	/* find stale prepared transactions on the remote node */
 	pendingTransactionList = PendingWorkerTransactionList(connection);
 	pendingTransactionSet = ListToHashSet(pendingTransactionList, NAMEDATALEN, true);
 
+	/* find in-progress distributed transactions */
+	activeTransactionNumberList = ActiveDistributedTransactionNumbers();
+	activeTransactionNumberSet = ListToHashSet(activeTransactionNumberList,
+											   sizeof(uint64), false);
+
 	/* scan through all recovery records of the current worker */
 	ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
 				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
 
+	/* get a snapshot of pg_dist_transaction */
 	scanDescriptor = systable_beginscan(pgDistTransaction,
 										DistTransactionGroupIndexId(), indexOK,
 										NULL, scanKeyCount, scanKey);
 
+	/* find stale prepared transactions on the remote node */
+	recheckTransactionList = PendingWorkerTransactionList(connection);
+	recheckTransactionSet = ListToHashSet(recheckTransactionList, NAMEDATALEN, true);
+
 	while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
 	{
 		bool isNull = false;
@@ -255,9 +294,62 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
 			}
 
 			recoveredTransactionCount++;
+
+			/*
+			 * We successfully committed the prepared transaction; it is safe
+			 * to delete the recovery record.
+			 */
+		}
+		else
+		{
+			bool foundNewPreparedTransaction = false;
+
+			/*
+			 * We found a committed pg_dist_transaction record without a prepared
+			 * transaction. In the typical case, this means that the prepared
+			 * transaction was successfully committed from the post-commit callback
+			 * and we can therefore delete the recovery record. However, there is a
+			 * race condition.
+			 *
+			 * If a transaction started and committed just after we observed the
+			 * set of prepared transactions, and just before we took our
+			 * pg_dist_transaction snapshot, then we would see its records, but it may
+			 * have prepared transactions that failed to commit and that we did not
+			 * observe. We should not delete the records, since we would otherwise
+			 * roll back the prepared transactions on the next call to
+			 * recover_prepared_transactions.
+			 *
+			 * In addition, if the transaction started after the call to
+			 * ActiveDistributedTransactionNumbers, then it may still be in the
+			 * process of committing the prepared transactions in the post-commit
+			 * callback.
+			 *
+			 * We therefore check the prepared transactions again after taking the
+			 * pg_dist_transaction snapshot. Any prepared transactions that appear
+			 * in this set, but not in pendingTransactionSet, may have been created
+			 * before the pg_dist_transaction snapshot. We skip deleting the records
+			 * for those transactions since there are prepared transactions that
+			 * still need to be committed, but we cannot commit them now; they might
+			 * still be in the process of being committed, and committing them twice
+			 * would result in failures.
+			 *
+			 * In this case, we just leave the records and prepared transactions
+			 * for the next call to recover_prepared_transactions.
+			 */
+			hash_search(recheckTransactionSet, transactionName, HASH_FIND,
+						&foundNewPreparedTransaction);
+
+			if (foundNewPreparedTransaction)
+			{
+				continue;
+			}
+
+			/*
+			 * There is a recovery record and definitely no prepared transaction;
+			 * it must have been committed immediately, so it is safe to delete
+			 * the recovery record.
+			 */
 		}
 
-		/* we've successfully committed the prepared transaction, remove the record */
 		simple_heap_delete(pgDistTransaction, &heapTuple->t_self);
 	}
 
diff --git a/src/backend/distributed/utils/listutils.c b/src/backend/distributed/utils/listutils.c
index 1735ff3e9..2b7a68ddc 100644
--- a/src/backend/distributed/utils/listutils.c
+++ b/src/backend/distributed/utils/listutils.c
@@ -108,6 +108,7 @@ DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId)
  * ListToHashSet creates a hash table in which the keys are copied
  * from itemList and the values are the same as the keys. This can
  * be used for fast lookups of the presence of a byte array in a set.
+ * itemList should be a list of pointers.
  *
  * If isStringList is true, then look-ups are performed through string
  * comparison of strings up to keySize in length. If isStringList is
@@ -132,8 +133,7 @@ ListToHashSet(List *itemList, Size keySize, bool isStringList)
 
 	if (!isStringList)
 	{
-		info.hash = tag_hash;
-		flags |= HASH_FUNCTION;
+		flags |= HASH_BLOBS;
 	}
 
 	itemSet = hash_create("ListToHashSet", capacity, &info, flags);
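Note on the listutils.c hunk: with HASH_BLOBS, dynahash hashes the raw key bytes itself (for keys wider than 4 bytes it uses tag_hash internally), so ListToHashSet no longer needs to set info.hash and pass HASH_FUNCTION for fixed-size binary keys; the string-key path selected by isStringList is unchanged. The snippet below is a minimal sketch, not part of the patch, showing the same HASH_ELEM | HASH_BLOBS pattern for a set of uint64 transaction numbers. The helper names (TransactionNumberSet, TransactionNumberSetAdd, TransactionNumberSetContains) and the initial capacity of 32 are illustrative only.

#include "postgres.h"
#include "utils/hsearch.h"

/* Build a set keyed on raw uint64 bytes; HASH_BLOBS selects a byte-wise hash. */
static HTAB *
TransactionNumberSet(void)
{
	HASHCTL info;
	int flags = HASH_ELEM | HASH_BLOBS;

	memset(&info, 0, sizeof(info));
	info.keysize = sizeof(uint64);
	info.entrysize = sizeof(uint64);

	/* 32 is an arbitrary initial capacity for this sketch */
	return hash_create("TransactionNumberSet", 32, &info, flags);
}

/* Add a transaction number to the set (no-op if it is already present). */
static void
TransactionNumberSetAdd(HTAB *transactionNumberSet, uint64 transactionNumber)
{
	bool found = false;

	hash_search(transactionNumberSet, &transactionNumber, HASH_ENTER, &found);
}

/* Membership test, analogous to how RecoverWorkerTransactions probes its sets. */
static bool
TransactionNumberSetContains(HTAB *transactionNumberSet, uint64 transactionNumber)
{
	bool found = false;

	hash_search(transactionNumberSet, &transactionNumber, HASH_FIND, &found);

	return found;
}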