From 2410c2e450ef44c13fee1b8781f0576f8696afd4 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 11 Oct 2017 14:38:08 +0200 Subject: [PATCH 1/4] Rewrite recover_prepared_transactions to be fast, non-blocking --- .../distributed/transaction/backend_data.c | 42 ++ .../transaction/remote_transaction.c | 30 +- .../transaction/transaction_recovery.c | 435 +++++++----------- src/backend/distributed/utils/listutils.c | 46 ++ src/include/distributed/backend_data.h | 1 + src/include/distributed/listutils.h | 2 + src/include/distributed/remote_transaction.h | 5 + .../isolation_transaction_recovery.out | 3 +- .../regress/expected/multi_mx_metadata.out | 6 - .../multi_mx_transaction_recovery.out | 6 - .../expected/multi_transaction_recovery.out | 6 - .../specs/isolation_transaction_recovery.spec | 2 +- 12 files changed, 294 insertions(+), 290 deletions(-) diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index dad7372b5..a6dba0a38 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -640,3 +640,45 @@ MyBackendGotCancelledDueToDeadlock(void) return cancelledDueToDeadlock; } + + +/* + * ActiveDistributedTransactionNumbers returns a list of pointers to + * transaction numbers of distributed transactions that are in progress. 
+ */ +List * +ActiveDistributedTransactionNumbers(void) +{ + List *activeTransactionNumberList = NIL; + int curBackend = 0; + + /* build list of starting procs */ + for (curBackend = 0; curBackend < MaxBackends; curBackend++) + { + PGPROC *currentProc = &ProcGlobal->allProcs[curBackend]; + BackendData currentBackendData; + uint64 *transactionNumber = NULL; + + if (currentProc->pid == 0) + { + /* unused PGPROC slot */ + continue; + } + + GetBackendDataForProc(currentProc, &currentBackendData); + + if (!IsInDistributedTransaction(&currentBackendData)) + { + /* not a distributed transaction */ + continue; + } + + transactionNumber = (uint64 *) palloc0(sizeof(uint64)); + *transactionNumber = currentBackendData.transactionId.transactionNumber; + + activeTransactionNumberList = lappend(activeTransactionNumberList, + transactionNumber); + } + + return activeTransactionNumberList; +} diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 615caf6dd..35a63c52b 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -27,6 +27,9 @@ #include "utils/hsearch.h" +#define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u" + + static void StartRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId); static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection, @@ -1241,11 +1244,36 @@ Assign2PCIdentifier(MultiConnection *connection) /* print all numbers as unsigned to guarantee no minus symbols appear in the name */ snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN, - "citus_%u_%u_"UINT64_FORMAT "_%u", GetLocalGroupId(), MyProcPid, + PREPARED_TRANSACTION_NAME_FORMAT, GetLocalGroupId(), MyProcPid, transactionNumber, connectionNumber++); } +/* + * ParsePreparedTransactionName parses a prepared transaction name to extract + * the initiator group ID, initiator 
process ID, distributed transaction number, + * and the connection number. If the transaction name does not match the expected + * format ParsePreparedTransactionName returns false, and true otherwise. + */ +bool +ParsePreparedTransactionName(char *preparedTransactionName, int *groupId, int *procId, + uint64 *transactionNumber, uint32 *connectionNumber) +{ + const int expectedFieldCount = 4; + int parsedFieldCount = 0; + bool nameValid = false; + + parsedFieldCount = sscanf(preparedTransactionName, PREPARED_TRANSACTION_NAME_FORMAT, + groupId, procId, transactionNumber, connectionNumber); + if (parsedFieldCount == expectedFieldCount) + { + nameValid = true; + } + + return nameValid; +} + + /* * WarnAboutLeakedPreparedTransaction issues a WARNING explaining that a * prepared transaction could not be committed or rolled back, and explains diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index a79557d97..1ef581522 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -24,6 +24,7 @@ #include "access/relscan.h" #include "access/xact.h" #include "catalog/indexing.h" +#include "distributed/backend_data.h" #include "distributed/connection_management.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" @@ -47,13 +48,11 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions); /* Local functions forward declarations */ static int RecoverPreparedTransactions(void); static int RecoverWorkerTransactions(WorkerNode *workerNode); -static List * NameListDifference(List *nameList, List *subtractList); -static int CompareNames(const void *leftPointer, const void *rightPointer); -static bool FindMatchingName(char **nameArray, int nameCount, char *needle, - int *matchIndex); static List * PendingWorkerTransactionList(MultiConnection *connection); -static List * 
UnconfirmedWorkerTransactionsList(int groupId); -static void DeleteTransactionRecord(int32 groupId, char *transactionName); +static bool IsTransactionInProgress(HTAB *activeTransactionNumberSet, + char *preparedTransactionName); +static bool RecoverPreparedTransactionOnWorker(MultiConnection *connection, + char *transactionName, bool shouldCommit); /* @@ -120,13 +119,6 @@ RecoverPreparedTransactions(void) ListCell *workerNodeCell = NULL; int recoveredTransactionCount = 0; - /* - * We block here if metadata transactions are ongoing, since we - * mustn't commit/abort their prepared transactions under their - * feet. We also prevent concurrent recovery. - */ - LockRelationOid(DistTransactionRelationId(), ExclusiveLock); - workerList = ActivePrimaryNodeList(); foreach(workerNodeCell, workerList) @@ -153,25 +145,33 @@ RecoverWorkerTransactions(WorkerNode *workerNode) char *nodeName = workerNode->workerName; int nodePort = workerNode->workerPort; + List *activeTransactionNumberList = NIL; + HTAB *activeTransactionNumberSet = NULL; + List *pendingTransactionList = NIL; - ListCell *pendingTransactionCell = NULL; + HTAB *pendingTransactionSet = NULL; - List *unconfirmedTransactionList = NIL; - char **unconfirmedTransactionArray = NULL; - int unconfirmedTransactionCount = 0; - int unconfirmedTransactionIndex = 0; + Relation pgDistTransaction = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + bool indexOK = true; + HeapTuple heapTuple = NULL; + TupleDesc tupleDescriptor = NULL; - List *committedTransactionList = NIL; - ListCell *committedTransactionCell = NULL; + HASH_SEQ_STATUS status; MemoryContext localContext = NULL; MemoryContext oldContext = NULL; + bool recoveryFailed = false; int connectionFlags = SESSION_LIFESPAN; MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); - if (connection->pgConn == NULL) + if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK) { - /* 
cannot recover transactions on this worker right now */ + ereport(WARNING, (errmsg("transaction recovery cannot connect to %s:%d", + nodeName, nodePort))); + return 0; } @@ -182,191 +182,130 @@ RecoverWorkerTransactions(WorkerNode *workerNode) ALLOCSET_DEFAULT_MAXSIZE); oldContext = MemoryContextSwitchTo(localContext); - /* find transactions that were committed, but not yet confirmed */ - unconfirmedTransactionList = UnconfirmedWorkerTransactionsList(groupId); - unconfirmedTransactionList = SortList(unconfirmedTransactionList, CompareNames); + /* take table lock first to avoid running concurrently */ + pgDistTransaction = heap_open(DistTransactionRelationId(), ShareUpdateExclusiveLock); + tupleDescriptor = RelationGetDescr(pgDistTransaction); - /* convert list to an array to use with FindMatchingNames */ - unconfirmedTransactionCount = list_length(unconfirmedTransactionList); - unconfirmedTransactionArray = - (char **) PointerArrayFromList(unconfirmedTransactionList); + /* find in-progress distributed transactions */ + activeTransactionNumberList = ActiveDistributedTransactionNumbers(); + activeTransactionNumberSet = ListToHashSet(activeTransactionNumberList, + sizeof(uint64), false); /* find stale prepared transactions on the remote node */ pendingTransactionList = PendingWorkerTransactionList(connection); - pendingTransactionList = SortList(pendingTransactionList, CompareNames); + pendingTransactionSet = ListToHashSet(pendingTransactionList, NAMEDATALEN, true); - /* - * Transactions that have no pending prepared transaction are assumed to - * have been committed. Any records in unconfirmedTransactionList that - * don't have a transaction in pendingTransactionList can be removed. 
- */ - committedTransactionList = NameListDifference(unconfirmedTransactionList, - pendingTransactionList); + /* scan through all recovery records of the current worker */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); - /* - * For each pending prepared transaction, check whether there is a transaction - * record. If so, commit. If not, the transaction that started the transaction - * must have rolled back and thus the prepared transaction should be aborted. - */ - foreach(pendingTransactionCell, pendingTransactionList) + scanDescriptor = systable_beginscan(pgDistTransaction, + DistTransactionGroupIndexId(), indexOK, + NULL, scanKeyCount, scanKey); + + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) { - char *transactionName = (char *) lfirst(pendingTransactionCell); - StringInfo command = makeStringInfo(); - int executeCommand = 0; - PGresult *result = NULL; + bool isNull = false; + bool isTransactionInProgress = false; + bool isTransactionPending = false; - bool shouldCommit = FindMatchingName(unconfirmedTransactionArray, - unconfirmedTransactionCount, - transactionName, - &unconfirmedTransactionIndex); + Datum transactionNameDatum = heap_getattr(heapTuple, + Anum_pg_dist_transaction_gid, + tupleDescriptor, &isNull); + char *transactionName = TextDatumGetCString(transactionNameDatum); - if (shouldCommit) + isTransactionInProgress = IsTransactionInProgress(activeTransactionNumberSet, + transactionName); + if (isTransactionInProgress) { - /* should have committed this prepared transaction */ - appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName); - } - else - { - /* no record of this prepared transaction, abort */ - appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName); - } - - executeCommand = ExecuteOptionalRemoteCommand(connection, command->data, &result); - if (executeCommand == QUERY_SEND_FAILED) - { - break; - } - if (executeCommand == 
RESPONSE_NOT_OKAY) - { - /* cannot recover this transaction right now */ + /* + * Do not touch in progress transactions as we might mistakenly + * commit a transaction that is actually in the process of + * aborting or vice-versa. + */ continue; } - PQclear(result); - ForgetResults(connection); + /* + * Remove the transaction from the pending list such that only transactions + * that need to be aborted remain at the end. + */ + hash_search(pendingTransactionSet, transactionName, HASH_REMOVE, + &isTransactionPending); - ereport(NOTICE, (errmsg("recovered a prepared transaction on %s:%d", - nodeName, nodePort), - errcontext("%s", command->data))); - - if (shouldCommit) - { - committedTransactionList = lappend(committedTransactionList, - transactionName); - } - - recoveredTransactionCount += 1; - } - - /* we can remove the transaction records of confirmed transactions */ - foreach(committedTransactionCell, committedTransactionList) - { - char *transactionName = (char *) lfirst(committedTransactionCell); - - DeleteTransactionRecord(groupId, transactionName); - } - - MemoryContextReset(localContext); - MemoryContextSwitchTo(oldContext); - - return recoveredTransactionCount; -} - - -/* - * NameListDifference returns the difference between the bag of - * names in nameList and subtractList. Both are assumed to be - * sorted. We cannot use list_difference_ptr here since we need - * to compare the actual strings. 
- */ -static List * -NameListDifference(List *nameList, List *subtractList) -{ - List *differenceList = NIL; - ListCell *nameCell = NULL; - - int subtractIndex = 0; - int subtractCount = list_length(subtractList); - char **subtractArray = (char **) PointerArrayFromList(subtractList); - - foreach(nameCell, nameList) - { - char *baseName = (char *) lfirst(nameCell); - - bool nameFound = FindMatchingName(subtractArray, subtractCount, - baseName, &subtractIndex); - - if (!nameFound) + if (isTransactionPending) { /* - * baseName is not in subtractArray and thus included - * in the difference. + * The transaction was committed, but the prepared transaction still exists + * on the worker. Try committing it. */ - differenceList = lappend(differenceList, baseName); + bool shouldCommit = true; + bool commitSucceeded = RecoverPreparedTransactionOnWorker(connection, + transactionName, + shouldCommit); + if (!commitSucceeded) + { + /* + * Failed to commit on the current worker. Stop without throwing + * an error to allow recover_prepared_transactions to continue with + * other workers. + */ + recoveryFailed = true; + break; + } + + recoveredTransactionCount++; } + + /* we've successfully committed the prepared transaction, remove the record */ + simple_heap_delete(pgDistTransaction, &heapTuple->t_self); } - pfree(subtractArray); + systable_endscan(scanDescriptor); + heap_close(pgDistTransaction, NoLock); - return differenceList; -} - - -/* - * CompareNames compares names using strncmp. Its signature allows it to - * be used in qsort. 
- */ -static int -CompareNames(const void *leftPointer, const void *rightPointer) -{ - const char *leftString = *((char **) leftPointer); - const char *rightString = *((char **) rightPointer); - - int nameCompare = strncmp(leftString, rightString, NAMEDATALEN); - - return nameCompare; -} - - -/* - * FindMatchingName searches for name in nameArray, starting at the - * value pointed to by matchIndex and stopping at the first index of - * name which is greater or equal to needle. nameArray is assumed - * to be sorted. - * - * The function sets matchIndex to the index of the name and returns - * true if the name is equal to needle. If matchIndex >= nameCount, - * then the function always returns false. - */ -static bool -FindMatchingName(char **nameArray, int nameCount, char *needle, - int *matchIndex) -{ - bool foundMatchingName = false; - int searchIndex = *matchIndex; - int compareResult = -1; - - while (searchIndex < nameCount) + if (!recoveryFailed) { - char *testName = nameArray[searchIndex]; - compareResult = strncmp(needle, testName, NAMEDATALEN); + char *pendingTransactionName = NULL; + bool abortSucceeded = true; - if (compareResult <= 0) + /* + * All remaining prepared transactions that are not part of an in-progress + * distributed transaction should be aborted since we did not find a recovery + * record, which implies the distributed transaction aborted. 
+ */ + hash_seq_init(&status, pendingTransactionSet); + + while ((pendingTransactionName = hash_seq_search(&status)) != NULL) { - break; + bool isTransactionInProgress = false; + bool shouldCommit = false; + + isTransactionInProgress = IsTransactionInProgress(activeTransactionNumberSet, + pendingTransactionName); + if (isTransactionInProgress) + { + continue; + } + + shouldCommit = false; + abortSucceeded = RecoverPreparedTransactionOnWorker(connection, + pendingTransactionName, + shouldCommit); + if (!abortSucceeded) + { + hash_seq_term(&status); + break; + } + + recoveredTransactionCount++; } - - searchIndex++; } - *matchIndex = searchIndex; + MemoryContextSwitchTo(oldContext); + MemoryContextDelete(localContext); - if (compareResult == 0) - { - foundMatchingName = true; - } - - return foundMatchingName; + return recoveredTransactionCount; } @@ -420,113 +359,73 @@ PendingWorkerTransactionList(MultiConnection *connection) /* - * UnconfirmedWorkerTransactionList returns a list of unconfirmed transactions - * for a group of workers from pg_dist_transaction. A transaction is confirmed - * once we have verified that it does not exist in pg_prepared_xacts on the - * remote node and the entry in pg_dist_transaction is removed. + * IsTransactionInProgress returns whether the distributed transaction to which + * preparedTransactionName belongs is still in progress, or false if the + * transaction name cannot be parsed. This can happen when the user manually + * inserts into pg_dist_transaction. 
*/ -static List * -UnconfirmedWorkerTransactionsList(int groupId) +static bool +IsTransactionInProgress(HTAB *activeTransactionNumberSet, char *preparedTransactionName) { - List *transactionNameList = NIL; - Relation pgDistTransaction = NULL; - SysScanDesc scanDescriptor = NULL; - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - bool indexOK = true; - HeapTuple heapTuple = NULL; + int groupId = 0; + int procId = 0; + uint32 connectionNumber = 0; + uint64 transactionNumber = 0; + bool isValidName = false; + bool isTransactionInProgress = false; - pgDistTransaction = heap_open(DistTransactionRelationId(), AccessShareLock); - - ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid, - BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); - - scanDescriptor = systable_beginscan(pgDistTransaction, - DistTransactionGroupIndexId(), indexOK, - NULL, scanKeyCount, scanKey); - - heapTuple = systable_getnext(scanDescriptor); - while (HeapTupleIsValid(heapTuple)) + isValidName = ParsePreparedTransactionName(preparedTransactionName, &groupId, &procId, + &transactionNumber, &connectionNumber); + if (isValidName) { - TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); - bool isNull = false; - - Datum transactionNameDatum = heap_getattr(heapTuple, - Anum_pg_dist_transaction_gid, - tupleDescriptor, &isNull); - - char *transactionName = TextDatumGetCString(transactionNameDatum); - transactionNameList = lappend(transactionNameList, transactionName); - - heapTuple = systable_getnext(scanDescriptor); + hash_search(activeTransactionNumberSet, &transactionNumber, HASH_FIND, + &isTransactionInProgress); } - systable_endscan(scanDescriptor); - heap_close(pgDistTransaction, AccessShareLock); - - return transactionNameList; + return isTransactionInProgress; } /* - * DeleteTransactionRecord opens the pg_dist_transaction system catalog, finds the - * first (unique) row that corresponds to the given transactionName and worker node, - * and deletes this row. 
+ * RecoverPreparedTransactionOnWorker recovers a single prepared transaction over + * the given connection. If shouldCommit is true we send COMMIT PREPARED, otherwise ROLLBACK PREPARED. */ -static void -DeleteTransactionRecord(int32 groupId, char *transactionName) +static bool +RecoverPreparedTransactionOnWorker(MultiConnection *connection, char *transactionName, + bool shouldCommit) { - Relation pgDistTransaction = NULL; - SysScanDesc scanDescriptor = NULL; - ScanKeyData scanKey[2]; - int scanKeyCount = 2; - bool indexOK = true; - HeapTuple heapTuple = NULL; - bool heapTupleFound = false; + StringInfo command = makeStringInfo(); + PGresult *result = NULL; + int executeCommand = 0; + bool raiseInterrupts = false; - pgDistTransaction = heap_open(DistTransactionRelationId(), RowExclusiveLock); - - ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid, - BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); - ScanKeyInit(&scanKey[1], Anum_pg_dist_transaction_gid, - BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(transactionName)); - - scanDescriptor = systable_beginscan(pgDistTransaction, - DistTransactionRecordIndexId(), indexOK, - NULL, scanKeyCount, scanKey); - - heapTuple = systable_getnext(scanDescriptor); - while (HeapTupleIsValid(heapTuple)) + if (shouldCommit) { - TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); - bool isNull = false; - - Datum gidDatum = heap_getattr(heapTuple, - Anum_pg_dist_transaction_gid, - tupleDescriptor, &isNull); - - char *gid = TextDatumGetCString(gidDatum); - - if (strncmp(transactionName, gid, NAMEDATALEN) == 0) - { - heapTupleFound = true; - break; - } - - heapTuple = systable_getnext(scanDescriptor); + /* should have committed this prepared transaction */ + appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName); + } + else + { + /* should have aborted this prepared transaction */ + appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName); } - /* if we couldn't find the transaction record to delete, error out */ - if 
(!heapTupleFound) + executeCommand = ExecuteOptionalRemoteCommand(connection, command->data, &result); + if (executeCommand == QUERY_SEND_FAILED) { - ereport(ERROR, (errmsg("could not find valid entry for transaction record " - "'%s' in group %d", - transactionName, groupId))); + return false; + } + if (executeCommand == RESPONSE_NOT_OKAY) + { + return false; } - simple_heap_delete(pgDistTransaction, &heapTuple->t_self); - CommandCounterIncrement(); + PQclear(result); + ClearResults(connection, raiseInterrupts); - systable_endscan(scanDescriptor); - heap_close(pgDistTransaction, RowExclusiveLock); + ereport(LOG, (errmsg("recovered a prepared transaction on %s:%d", + connection->hostname, connection->port), + errcontext("%s", command->data))); + + return true; } diff --git a/src/backend/distributed/utils/listutils.c b/src/backend/distributed/utils/listutils.c index 56224eaac..1735ff3e9 100644 --- a/src/backend/distributed/utils/listutils.c +++ b/src/backend/distributed/utils/listutils.c @@ -102,3 +102,49 @@ DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId) return arrayObject; } + + +/* + * ListToHashSet creates a hash table in which the keys are copied from + * from itemList and the values are the same as the keys. This can + * be used for fast lookups of the presence of a byte array in a set. + * + * If isStringList is true, then look-ups are performed through string + * comparison of strings up to keySize in length. If isStringList is + * false, then look-ups are performed by comparing exactly keySize + * bytes pointed to by the pointers in itemList. 
+ */ +HTAB * +ListToHashSet(List *itemList, Size keySize, bool isStringList) +{ + HASHCTL info; + HTAB *itemSet = NULL; + ListCell *itemCell = NULL; + int flags = HASH_ELEM; + + /* allocate sufficient capacity for O(1) expected look-up time */ + int capacity = (int) (list_length(itemList) / 0.75) + 1; + + /* initialise the hash table for looking up keySize bytes */ + memset(&info, 0, sizeof(info)); + info.keysize = keySize; + info.entrysize = keySize; + + if (!isStringList) + { + info.hash = tag_hash; + flags |= HASH_FUNCTION; + } + + itemSet = hash_create("ListToHashSet", capacity, &info, flags); + + foreach(itemCell, itemList) + { + void *item = lfirst(itemCell); + bool foundInSet = false; + + hash_search(itemSet, item, HASH_ENTER, &foundInSet); + } + + return itemSet; +} diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index 2231885ef..5388f8d42 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -43,5 +43,6 @@ extern void AssignDistributedTransactionId(void); extern void GetBackendDataForProc(PGPROC *proc, BackendData *result); extern void CancelTransactionDueToDeadlock(PGPROC *proc); extern bool MyBackendGotCancelledDueToDeadlock(void); +extern List * ActiveDistributedTransactionNumbers(void); #endif /* BACKEND_DATA_H */ diff --git a/src/include/distributed/listutils.h b/src/include/distributed/listutils.h index 4d205f046..f08807190 100644 --- a/src/include/distributed/listutils.h +++ b/src/include/distributed/listutils.h @@ -17,6 +17,7 @@ #include "nodes/pg_list.h" #include "utils/array.h" +#include "utils/hsearch.h" /* utility functions declaration shared within this module */ @@ -25,5 +26,6 @@ extern List * SortList(List *pointerList, extern void ** PointerArrayFromList(List *pointerList); extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId); +extern HTAB * ListToHashSet(List *pointerList, Size keySize, bool isStringList); 
#endif /* CITUS_LISTUTILS_H */ diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h index 30ab3bb92..d3bd2ccf2 100644 --- a/src/include/distributed/remote_transaction.h +++ b/src/include/distributed/remote_transaction.h @@ -79,6 +79,11 @@ typedef struct RemoteTransaction } RemoteTransaction; +/* utility functions for dealing with remote transactions */ +extern bool ParsePreparedTransactionName(char *preparedTransactionName, int *groupId, + int *procId, uint64 *transactionNumber, + uint32 *connectionNumber); + /* change an individual remote transaction's state */ extern void StartRemoteTransactionBegin(struct MultiConnection *connection); extern void FinishRemoteTransactionBegin(struct MultiConnection *connection); diff --git a/src/test/regress/expected/isolation_transaction_recovery.out b/src/test/regress/expected/isolation_transaction_recovery.out index b1d007401..5d3a12121 100644 --- a/src/test/regress/expected/isolation_transaction_recovery.out +++ b/src/test/regress/expected/isolation_transaction_recovery.out @@ -15,8 +15,7 @@ recover_prepared_transactions 0 step s2-insert: INSERT INTO test_transaction_recovery VALUES (1,2); - + step s1-commit: COMMIT; -step s2-insert: <... 
completed> diff --git a/src/test/regress/expected/multi_mx_metadata.out b/src/test/regress/expected/multi_mx_metadata.out index 5e3e01700..004e04d91 100644 --- a/src/test/regress/expected/multi_mx_metadata.out +++ b/src/test/regress/expected/multi_mx_metadata.out @@ -259,12 +259,6 @@ SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport = :worker_1_po INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_commit'); INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_be_forgotten'); SELECT recover_prepared_transactions(); -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort' -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: ROLLBACK PREPARED 'citus_0_should_be_sorted_into_middle' -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: COMMIT PREPARED 'citus_0_should_commit' recover_prepared_transactions ------------------------------- 3 diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out index 4e3e33e5e..adc8c4bbb 100644 --- a/src/test/regress/expected/multi_mx_transaction_recovery.out +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -46,12 +46,6 @@ PREPARE TRANSACTION 'citus_12_should_be_sorted_into_middle'; INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_commit'); INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_be_forgotten'); SELECT recover_prepared_transactions(); -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: ROLLBACK PREPARED 'citus_12_should_abort' -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: ROLLBACK PREPARED 'citus_12_should_be_sorted_into_middle' -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: COMMIT PREPARED 'citus_12_should_commit' recover_prepared_transactions ------------------------------- 3 diff --git 
a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out index 3643ee8b3..245160e9b 100644 --- a/src/test/regress/expected/multi_transaction_recovery.out +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -28,12 +28,6 @@ PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle'; INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit'); INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten'); SELECT recover_prepared_transactions(); -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort' -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: ROLLBACK PREPARED 'citus_0_should_be_sorted_into_middle' -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: COMMIT PREPARED 'citus_0_should_commit' recover_prepared_transactions ------------------------------- 3 diff --git a/src/test/regress/specs/isolation_transaction_recovery.spec b/src/test/regress/specs/isolation_transaction_recovery.spec index 3bdc0c9bd..019f1931d 100644 --- a/src/test/regress/specs/isolation_transaction_recovery.spec +++ b/src/test/regress/specs/isolation_transaction_recovery.spec @@ -33,5 +33,5 @@ step "s2-insert" INSERT INTO test_transaction_recovery VALUES (1,2); } -# Recovery and 2PCs should block each other +# Recovery and 2PCs should not block each other permutation "s1-begin" "s1-recover" "s2-insert" "s1-commit" From ae47df01ea22d3f7222115bcc5ca4eff6ac889fd Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 16 Nov 2017 12:43:26 +0100 Subject: [PATCH 2/4] Observe prepared xacts twice in RecoverWorkerTransactions to avoid race condition --- .../distributed/transaction/backend_data.c | 9 +- .../transaction/transaction_recovery.c | 102 +++++++++++++++++- src/backend/distributed/utils/listutils.c | 4 +- 3 files changed, 107 insertions(+), 8 deletions(-) diff --git 
a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index a6dba0a38..7356f1cf9 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -644,7 +644,8 @@ MyBackendGotCancelledDueToDeadlock(void) /* * ActiveDistributedTransactionNumbers returns a list of pointers to - * transaction numbers of distributed transactions that are in progress. + * transaction numbers of distributed transactions that are in progress + * and were started by the node on which it is called. */ List * ActiveDistributedTransactionNumbers(void) @@ -673,6 +674,12 @@ ActiveDistributedTransactionNumbers(void) continue; } + if (!currentBackendData.transactionId.transactionOriginator) + { + /* not a coordinator process */ + continue; + } + transactionNumber = (uint64 *) palloc0(sizeof(uint64)); *transactionNumber = currentBackendData.transactionId.transactionNumber; diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 1ef581522..2b16d7769 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -150,6 +150,8 @@ RecoverWorkerTransactions(WorkerNode *workerNode) List *pendingTransactionList = NIL; HTAB *pendingTransactionSet = NULL; + List *recheckTransactionList = NIL; + HTAB *recheckTransactionSet = NULL; Relation pgDistTransaction = NULL; SysScanDesc scanDescriptor = NULL; @@ -186,23 +188,60 @@ RecoverWorkerTransactions(WorkerNode *workerNode) pgDistTransaction = heap_open(DistTransactionRelationId(), ShareUpdateExclusiveLock); tupleDescriptor = RelationGetDescr(pgDistTransaction); - /* find in-progress distributed transactions */ - activeTransactionNumberList = ActiveDistributedTransactionNumbers(); - activeTransactionNumberSet = ListToHashSet(activeTransactionNumberList, - sizeof(uint64), false); + /* + * We're 
going to check the list of prepared transactions on the worker, + * but some of those prepared transactions might belong to ongoing + * distributed transactions. + * + * We could avoid this by temporarily blocking new prepared transactions + * from being created by taking an ExclusiveLock on pg_dist_transaction. + * However, this hurts write performance, so instead we avoid blocking + * by consulting the list of active distributed transactions, and follow + * a carefully chosen order to avoid race conditions: + * + * 1) P = prepared transactions on worker + * 2) A = active distributed transactions + * 3) T = pg_dist_transaction snapshot + * 4) Q = prepared transactions on worker + * + * By observing A after P, we get a conclusive answer to which distributed + * transactions we observed in P are still in progress. It is safe to recover + * the transactions in P - A based on the presence or absence of a record + * in T. + * + * We also remove records in T if there is no prepared transaction, which + * we assume means the transaction committed. However, a transaction could + * have left prepared transactions and committed between steps 1 and 2. + * In that case, we would incorrectly remove the records, while the + * prepared transaction is still in place. + * + * We therefore observe the set of prepared transactions one more time in + * step 4. The aforementioned transactions would show up in Q, but not in + * P. We can skip those transactions and recover them later. 
+ */ /* find stale prepared transactions on the remote node */ pendingTransactionList = PendingWorkerTransactionList(connection); pendingTransactionSet = ListToHashSet(pendingTransactionList, NAMEDATALEN, true); + /* find in-progress distributed transactions */ + activeTransactionNumberList = ActiveDistributedTransactionNumbers(); + activeTransactionNumberSet = ListToHashSet(activeTransactionNumberList, + sizeof(uint64), false); + /* scan through all recovery records of the current worker */ ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); + /* get a snapshot of pg_dist_transaction */ scanDescriptor = systable_beginscan(pgDistTransaction, DistTransactionGroupIndexId(), indexOK, NULL, scanKeyCount, scanKey); + /* find stale prepared transactions on the remote node */ + recheckTransactionList = PendingWorkerTransactionList(connection); + recheckTransactionSet = ListToHashSet(recheckTransactionList, NAMEDATALEN, true); + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) { bool isNull = false; @@ -255,9 +294,62 @@ RecoverWorkerTransactions(WorkerNode *workerNode) } recoveredTransactionCount++; + + /* + * We successfully committed the prepared transaction, safe to delete + * the recovery record. + */ + } + else + { + bool foundNewPreparedTransaction = false; + + /* + * We found a committed pg_dist_transaction record without a prepared + * transaction. In the typical case, this means that the prepared + * transaction was successfully committed from the post-commit callback + * and we can therefore delete the recovery record. However, there is a + * race condition. + * + * If a transaction started and committed just after we observed the + * set of prepared transactions, and just before we took our + * pg_dist_transaction snapshot, then we would see its records, but it may + * have prepared transactions that failed to commit and that we did not + * observe. 
We should not delete the records, since we would otherwise + * roll back the prepared transaction on the next call to + * recover_prepared_transactions. + * + * In addition, if the transaction started after the call it + * ActiveDistributedTransactionNumbers, then it may still be in the process + * of comitting the prepared transactions in the post-commit callback. + * + * We check the prepared transactions again after taking the + * pg_dist_transaction snapshot. Any prepared transactions that appear + * in this set, but not in pendingTransactionSet, may have been created + * before the pg_dist_transaction snapshot. We skip deleting the records + * for those transactions since there are prepared transactions that + * need to be commit, but we cannot commit them now, since they might + * still be getting committed and committing them twice would result in + * failures. + * + * In this case, we just leave the records and prepared transactions + * for the next call to recover_prepared_transactions. + */ + hash_search(recheckTransactionSet, transactionName, HASH_FIND, + &foundNewPreparedTransaction); + + if (foundNewPreparedTransaction) + { + continue; + } + + /* + * There is a recovery record and definitely no prepared transaction, it + * must have been committed immediately, so it's safe to delete the + * recovery record. + */ } - /* we've successfully committed the prepared transaction, remove the record */ simple_heap_delete(pgDistTransaction, &heapTuple->t_self); } diff --git a/src/backend/distributed/utils/listutils.c b/src/backend/distributed/utils/listutils.c index 1735ff3e9..2b7a68ddc 100644 --- a/src/backend/distributed/utils/listutils.c +++ b/src/backend/distributed/utils/listutils.c @@ -108,6 +108,7 @@ DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId) * ListToHashSet creates a hash table in which the keys are copied from * from itemList and the values are the same as the keys. 
This can * be used for fast lookups of the presence of a byte array in a set. + * itemList should be a list of pointers. * * If isStringList is true, then look-ups are performed through string * comparison of strings up to keySize in length. If isStringList is @@ -132,8 +133,7 @@ ListToHashSet(List *itemList, Size keySize, bool isStringList) if (!isStringList) { - info.hash = tag_hash; - flags |= HASH_FUNCTION; + flags |= HASH_BLOBS; } itemSet = hash_create("ListToHashSet", capacity, &info, flags); From fe798cf0f99d98962c6cfd28332bfa06febb2f74 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 20 Nov 2017 11:51:50 +0100 Subject: [PATCH 3/4] Add recovery vs. recovery isolation test --- .../isolation_distributed_transaction_id.out | 4 ++-- .../isolation_dump_global_wait_edges.out | 18 +++++++------- .../isolation_replace_wait_function.out | 2 +- .../isolation_transaction_recovery.out | 24 +++++++++++++++++++ .../specs/isolation_transaction_recovery.spec | 8 +++++++ 5 files changed, 44 insertions(+), 12 deletions(-) diff --git a/src/test/regress/expected/isolation_distributed_transaction_id.out b/src/test/regress/expected/isolation_distributed_transaction_id.out index e9dc48dc5..7188d948f 100644 --- a/src/test/regress/expected/isolation_distributed_transaction_id.out +++ b/src/test/regress/expected/isolation_distributed_transaction_id.out @@ -98,7 +98,7 @@ step s1-get-current-transaction-id: row -(0,287) +(0,289) step s2-get-first-worker-active-transactions: SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number) FROM @@ -109,4 +109,4 @@ step s2-get-first-worker-active-transactions: nodename nodeport success result -localhost 57637 t (0,287) +localhost 57637 t (0,289) diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out index 16de25852..cd4a3193c 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ 
b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -290 289 f +292 291 f transactionnumberwaitingtransactionnumbers -289 -290 289 +291 +292 291 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -294 293 f -295 293 f -295 294 t +296 295 f +297 295 f +297 296 t transactionnumberwaitingtransactionnumbers -293 -294 293 -295 293,294 +295 +296 295 +297 295,296 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_replace_wait_function.out b/src/test/regress/expected/isolation_replace_wait_function.out index b51e6ac4d..784b25224 100644 --- a/src/test/regress/expected/isolation_replace_wait_function.out +++ b/src/test/regress/expected/isolation_replace_wait_function.out @@ -16,7 +16,7 @@ step s1-finish: COMMIT; step s2-insert: <... completed> -error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102320" +error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102321" step s2-finish: COMMIT; diff --git a/src/test/regress/expected/isolation_transaction_recovery.out b/src/test/regress/expected/isolation_transaction_recovery.out index 5d3a12121..411a75feb 100644 --- a/src/test/regress/expected/isolation_transaction_recovery.out +++ b/src/test/regress/expected/isolation_transaction_recovery.out @@ -19,3 +19,27 @@ step s2-insert: step s1-commit: COMMIT; + +starting permutation: s1-begin s1-recover s2-recover s1-commit +create_reference_table + + +step s1-begin: + BEGIN; + +step s1-recover: + SELECT recover_prepared_transactions(); + +recover_prepared_transactions + +0 +step s2-recover: + SELECT recover_prepared_transactions(); + +step s1-commit: + COMMIT; + +step s2-recover: <... 
completed> +recover_prepared_transactions + +0 diff --git a/src/test/regress/specs/isolation_transaction_recovery.spec b/src/test/regress/specs/isolation_transaction_recovery.spec index 019f1931d..5c1ee7557 100644 --- a/src/test/regress/specs/isolation_transaction_recovery.spec +++ b/src/test/regress/specs/isolation_transaction_recovery.spec @@ -33,5 +33,13 @@ step "s2-insert" INSERT INTO test_transaction_recovery VALUES (1,2); } +step "s2-recover" +{ + SELECT recover_prepared_transactions(); +} + # Recovery and 2PCs should not block each other permutation "s1-begin" "s1-recover" "s2-insert" "s1-commit" + +# Recovery should not run concurrently +permutation "s1-begin" "s1-recover" "s2-recover" "s1-commit" From 9793218122cbff2756558b192c3fc12f2e03ff3a Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 20 Nov 2017 13:18:31 +0100 Subject: [PATCH 4/4] Do not commit already-committed prepared transactions in recovery --- .../transaction/transaction_recovery.c | 79 ++++++++++--------- 1 file changed, 40 insertions(+), 39 deletions(-) diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 2b16d7769..38841e8e3 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -246,7 +246,8 @@ RecoverWorkerTransactions(WorkerNode *workerNode) { bool isNull = false; bool isTransactionInProgress = false; - bool isTransactionPending = false; + bool foundPreparedTransactionBeforeCommit = false; + bool foundPreparedTransactionAfterCommit = false; Datum transactionNameDatum = heap_getattr(heapTuple, Anum_pg_dist_transaction_gid, @@ -270,13 +271,20 @@ RecoverWorkerTransactions(WorkerNode *workerNode) * that need to be aborted remain at the end. 
*/ hash_search(pendingTransactionSet, transactionName, HASH_REMOVE, - &isTransactionPending); + &foundPreparedTransactionBeforeCommit); - if (isTransactionPending) + hash_search(recheckTransactionSet, transactionName, HASH_FIND, + &foundPreparedTransactionAfterCommit); + + if (foundPreparedTransactionBeforeCommit && foundPreparedTransactionAfterCommit) { /* * The transaction was committed, but the prepared transaction still exists * on the worker. Try committing it. + * + * We double check that the prepared transaction exists both before and after + * checking ActiveDistributedTransactionNumbers(), since we may have + * observed a prepared transaction that was committed immediately after. */ bool shouldCommit = true; bool commitSucceeded = RecoverPreparedTransactionOnWorker(connection, @@ -300,53 +308,46 @@ RecoverWorkerTransactions(WorkerNode *workerNode) * the recovery record. */ } - else + else if (foundPreparedTransactionAfterCommit) { - bool foundNewPreparedTransaction = false; - /* - * We found a committed pg_dist_transaction record without a prepared - * transaction. In the typical case, this means that the prepared - * transaction was successfully committed from the post-commit callback - * and we can therefore delete the recovery record. However, there is a - * race condition. + * We found a committed pg_dist_transaction record that initially did + * not have a prepared transaction, but did when we checked again. * * If a transaction started and committed just after we observed the - * set of prepared transactions, and just before we took our - * pg_dist_transaction snapshot, then we would see its records, but it may - * have prepared transactions that failed to commit and that we did not - * observe. 
We should not delete the records, since we would otherwise - * roll back the prepared transaction on the next call to + * set of prepared transactions, and just before we called + * ActiveDistributedTransactionNumbers, then we would see a recovery + * record without a prepared transaction in pendingTransactionSet, + * but there may be prepared transactions that failed to commit. + * We should not delete the records for those prepared transactions, + * since we would otherwise roll them back on the next call to * recover_prepared_transactions. * - * In addition, if the transaction started after the call it - * ActiveDistributedTransactionNumbers, then it may still be in the process - * of comitting the prepared transactions in the post-commit callback. + * In addition, if the transaction started after the call to + * ActiveDistributedTransactionNumbers and finished just before our + * pg_dist_transaction snapshot, then it may still be in the process + * of committing the prepared transactions in the post-commit callback + * and we should not touch the prepared transactions. * - * We check the prepared transactions again after taking the - * pg_dist_transaction snapshot. Any prepared transactions that appear - * in this set, but not in pendingTransactionSet, may have been created - * before the pg_dist_transaction snapshot. We skip deleting the records - * for those transactions since there are prepared transactions that - * need to be commit, but we cannot commit them now, since they might - * still be getting committed and committing them twice would result in - * failures. - * - * In this case, we just leave the records and prepared transactions - * for the next call to recover_prepared_transactions. + * To handle these cases, we just leave the records and prepared + * transactions for the next call to recover_prepared_transactions + * and skip them here. 
*/ - hash_search(recheckTransactionSet, transactionName, HASH_FIND, - &foundNewPreparedTransaction); - - if (foundNewPreparedTransaction) - { - continue; - } + continue; + } + else + { /* - * There is a recovery record and definitely no prepared transaction, it - * must have been committed immediately, so it's safe to delete the + * We found a recovery record without any prepared transaction. It + * must have already been committed, so it's safe to delete the * recovery record. + * + * Transactions that started after we observed pendingTransactionSet, + * but successfully committed their prepared transactions before + * ActiveDistributedTransactionNumbers are indistinguishable from + * transactions that committed at an earlier time, in which case it's + * safe to delete the recovery record as well. */ }