diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index dad7372b5..a6dba0a38 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -640,3 +640,45 @@ MyBackendGotCancelledDueToDeadlock(void) return cancelledDueToDeadlock; } + + +/* + * ActiveDistributedTransactionNumbers returns a list of pointers to + * transaction numbers of distributed transactions that are in progress. + */ +List * +ActiveDistributedTransactionNumbers(void) +{ + List *activeTransactionNumberList = NIL; + int curBackend = 0; + + /* build list of starting procs */ + for (curBackend = 0; curBackend < MaxBackends; curBackend++) + { + PGPROC *currentProc = &ProcGlobal->allProcs[curBackend]; + BackendData currentBackendData; + uint64 *transactionNumber = NULL; + + if (currentProc->pid == 0) + { + /* unused PGPROC slot */ + continue; + } + + GetBackendDataForProc(currentProc, ¤tBackendData); + + if (!IsInDistributedTransaction(¤tBackendData)) + { + /* not a distributed transaction */ + continue; + } + + transactionNumber = (uint64 *) palloc0(sizeof(uint64)); + *transactionNumber = currentBackendData.transactionId.transactionNumber; + + activeTransactionNumberList = lappend(activeTransactionNumberList, + transactionNumber); + } + + return activeTransactionNumberList; +} diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 615caf6dd..35a63c52b 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -27,6 +27,9 @@ #include "utils/hsearch.h" +#define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u" + + static void StartRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId); static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection, @@ -1241,11 +1244,36 @@ Assign2PCIdentifier(MultiConnection *connection) /* print all numbers as unsigned to guarantee no minus symbols appear in the name */ snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN, - "citus_%u_%u_"UINT64_FORMAT "_%u", GetLocalGroupId(), MyProcPid, + PREPARED_TRANSACTION_NAME_FORMAT, GetLocalGroupId(), MyProcPid, transactionNumber, connectionNumber++); } +/* + * ParsePreparedTransactionName parses a prepared transaction name to extract + * the initiator group ID, initiator process ID, distributed transaction number, + * and the connection number. If the transaction name does not match the expected + * format ParsePreparedTransactionName returns false, and true otherwise. + */ +bool +ParsePreparedTransactionName(char *preparedTransactionName, int *groupId, int *procId, + uint64 *transactionNumber, uint32 *connectionNumber) +{ + const int expectedFieldCount = 4; + int parsedFieldCount = 0; + bool nameValid = false; + + parsedFieldCount = sscanf(preparedTransactionName, PREPARED_TRANSACTION_NAME_FORMAT, + groupId, procId, transactionNumber, connectionNumber); + if (parsedFieldCount == expectedFieldCount) + { + nameValid = true; + } + + return nameValid; +} + + /* * WarnAboutLeakedPreparedTransaction issues a WARNING explaining that a * prepared transaction could not be committed or rolled back, and explains diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index a79557d97..1ef581522 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -24,6 +24,7 @@ #include "access/relscan.h" #include "access/xact.h" #include "catalog/indexing.h" +#include "distributed/backend_data.h" #include "distributed/connection_management.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" @@ -47,13 +48,11 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions); /* Local functions forward declarations */ static int RecoverPreparedTransactions(void); static int RecoverWorkerTransactions(WorkerNode *workerNode); -static List * NameListDifference(List *nameList, List *subtractList); -static int CompareNames(const void *leftPointer, const void *rightPointer); -static bool FindMatchingName(char **nameArray, int nameCount, char *needle, - int *matchIndex); static List * PendingWorkerTransactionList(MultiConnection *connection); -static List * UnconfirmedWorkerTransactionsList(int groupId); -static void DeleteTransactionRecord(int32 groupId, char *transactionName); +static bool IsTransactionInProgress(HTAB *activeTransactionNumberSet, + char *preparedTransactionName); +static bool RecoverPreparedTransactionOnWorker(MultiConnection *connection, + char *transactionName, bool shouldCommit); /* @@ -120,13 +119,6 @@ RecoverPreparedTransactions(void) ListCell *workerNodeCell = NULL; int recoveredTransactionCount = 0; - /* - * We block here if metadata transactions are ongoing, since we - * mustn't commit/abort their prepared transactions under their - * feet. We also prevent concurrent recovery. - */ - LockRelationOid(DistTransactionRelationId(), ExclusiveLock); - workerList = ActivePrimaryNodeList(); foreach(workerNodeCell, workerList) @@ -153,25 +145,33 @@ RecoverWorkerTransactions(WorkerNode *workerNode) char *nodeName = workerNode->workerName; int nodePort = workerNode->workerPort; + List *activeTransactionNumberList = NIL; + HTAB *activeTransactionNumberSet = NULL; + List *pendingTransactionList = NIL; - ListCell *pendingTransactionCell = NULL; + HTAB *pendingTransactionSet = NULL; - List *unconfirmedTransactionList = NIL; - char **unconfirmedTransactionArray = NULL; - int unconfirmedTransactionCount = 0; - int unconfirmedTransactionIndex = 0; + Relation pgDistTransaction = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + bool indexOK = true; + HeapTuple heapTuple = NULL; + TupleDesc tupleDescriptor = NULL; - List *committedTransactionList = NIL; - ListCell *committedTransactionCell = NULL; + HASH_SEQ_STATUS status; MemoryContext localContext = NULL; MemoryContext oldContext = NULL; + bool recoveryFailed = false; int connectionFlags = SESSION_LIFESPAN; MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); - if (connection->pgConn == NULL) + if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK) { - /* cannot recover transactions on this worker right now */ + ereport(WARNING, (errmsg("transaction recovery cannot connect to %s:%d", + nodeName, nodePort))); + return 0; } @@ -182,191 +182,130 @@ RecoverWorkerTransactions(WorkerNode *workerNode) ALLOCSET_DEFAULT_MAXSIZE); oldContext = MemoryContextSwitchTo(localContext); - /* find transactions that were committed, but not yet confirmed */ - unconfirmedTransactionList = UnconfirmedWorkerTransactionsList(groupId); - unconfirmedTransactionList = SortList(unconfirmedTransactionList, CompareNames); + /* take table lock first to avoid running concurrently */ + pgDistTransaction = heap_open(DistTransactionRelationId(), ShareUpdateExclusiveLock); + tupleDescriptor = RelationGetDescr(pgDistTransaction); - /* convert list to an array to use with FindMatchingNames */ - unconfirmedTransactionCount = list_length(unconfirmedTransactionList); - unconfirmedTransactionArray = - (char **) PointerArrayFromList(unconfirmedTransactionList); + /* find in-progress distributed transactions */ + activeTransactionNumberList = ActiveDistributedTransactionNumbers(); + activeTransactionNumberSet = ListToHashSet(activeTransactionNumberList, + sizeof(uint64), false); /* find stale prepared transactions on the remote node */ pendingTransactionList = PendingWorkerTransactionList(connection); - pendingTransactionList = SortList(pendingTransactionList, CompareNames); + pendingTransactionSet = ListToHashSet(pendingTransactionList, NAMEDATALEN, true); - /* - * Transactions that have no pending prepared transaction are assumed to - * have been committed. Any records in unconfirmedTransactionList that - * don't have a transaction in pendingTransactionList can be removed. - */ - committedTransactionList = NameListDifference(unconfirmedTransactionList, - pendingTransactionList); + /* scan through all recovery records of the current worker */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); - /* - * For each pending prepared transaction, check whether there is a transaction - * record. If so, commit. If not, the transaction that started the transaction - * must have rolled back and thus the prepared transaction should be aborted. - */ - foreach(pendingTransactionCell, pendingTransactionList) + scanDescriptor = systable_beginscan(pgDistTransaction, + DistTransactionGroupIndexId(), indexOK, + NULL, scanKeyCount, scanKey); + + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) { - char *transactionName = (char *) lfirst(pendingTransactionCell); - StringInfo command = makeStringInfo(); - int executeCommand = 0; - PGresult *result = NULL; + bool isNull = false; + bool isTransactionInProgress = false; + bool isTransactionPending = false; - bool shouldCommit = FindMatchingName(unconfirmedTransactionArray, - unconfirmedTransactionCount, - transactionName, - &unconfirmedTransactionIndex); + Datum transactionNameDatum = heap_getattr(heapTuple, + Anum_pg_dist_transaction_gid, + tupleDescriptor, &isNull); + char *transactionName = TextDatumGetCString(transactionNameDatum); - if (shouldCommit) + isTransactionInProgress = IsTransactionInProgress(activeTransactionNumberSet, + transactionName); + if (isTransactionInProgress) { - /* should have committed this prepared transaction */ - appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName); - } - else - { - /* no record of this prepared transaction, abort */ - appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName); - } - - executeCommand = ExecuteOptionalRemoteCommand(connection, command->data, &result); - if (executeCommand == QUERY_SEND_FAILED) - { - break; - } - if (executeCommand == RESPONSE_NOT_OKAY) - { - /* cannot recover this transaction right now */ + /* + * Do not touch in progress transactions as we might mistakenly + * commit a transaction that is actually in the process of + * aborting or vice-versa. + */ continue; } - PQclear(result); - ForgetResults(connection); + /* + * Remove the transaction from the pending list such that only transactions + * that need to be aborted remain at the end. + */ + hash_search(pendingTransactionSet, transactionName, HASH_REMOVE, + &isTransactionPending); - ereport(NOTICE, (errmsg("recovered a prepared transaction on %s:%d", - nodeName, nodePort), - errcontext("%s", command->data))); - - if (shouldCommit) - { - committedTransactionList = lappend(committedTransactionList, - transactionName); - } - - recoveredTransactionCount += 1; - } - - /* we can remove the transaction records of confirmed transactions */ - foreach(committedTransactionCell, committedTransactionList) - { - char *transactionName = (char *) lfirst(committedTransactionCell); - - DeleteTransactionRecord(groupId, transactionName); - } - - MemoryContextReset(localContext); - MemoryContextSwitchTo(oldContext); - - return recoveredTransactionCount; -} - - -/* - * NameListDifference returns the difference between the bag of - * names in nameList and subtractList. Both are assumed to be - * sorted. We cannot use list_difference_ptr here since we need - * to compare the actual strings. - */ -static List * -NameListDifference(List *nameList, List *subtractList) -{ - List *differenceList = NIL; - ListCell *nameCell = NULL; - - int subtractIndex = 0; - int subtractCount = list_length(subtractList); - char **subtractArray = (char **) PointerArrayFromList(subtractList); - - foreach(nameCell, nameList) - { - char *baseName = (char *) lfirst(nameCell); - - bool nameFound = FindMatchingName(subtractArray, subtractCount, - baseName, &subtractIndex); - - if (!nameFound) + if (isTransactionPending) { /* - * baseName is not in subtractArray and thus included - * in the difference. + * The transaction was committed, but the prepared transaction still exists + * on the worker. Try committing it. */ - differenceList = lappend(differenceList, baseName); + bool shouldCommit = true; + bool commitSucceeded = RecoverPreparedTransactionOnWorker(connection, + transactionName, + shouldCommit); + if (!commitSucceeded) + { + /* + * Failed to commit on the current worker. Stop without throwing + * an error to allow recover_prepared_transactions to continue with + * other workers. + */ + recoveryFailed = true; + break; + } + + recoveredTransactionCount++; } + + /* we've successfully committed the prepared transaction, remove the record */ + simple_heap_delete(pgDistTransaction, &heapTuple->t_self); } - pfree(subtractArray); + systable_endscan(scanDescriptor); + heap_close(pgDistTransaction, NoLock); - return differenceList; -} - - -/* - * CompareNames compares names using strncmp. Its signature allows it to - * be used in qsort. - */ -static int -CompareNames(const void *leftPointer, const void *rightPointer) -{ - const char *leftString = *((char **) leftPointer); - const char *rightString = *((char **) rightPointer); - - int nameCompare = strncmp(leftString, rightString, NAMEDATALEN); - - return nameCompare; -} - - -/* - * FindMatchingName searches for name in nameArray, starting at the - * value pointed to by matchIndex and stopping at the first index of - * name which is greater or equal to needle. nameArray is assumed - * to be sorted. - * - * The function sets matchIndex to the index of the name and returns - * true if the name is equal to needle. If matchIndex >= nameCount, - * then the function always returns false. - */ -static bool -FindMatchingName(char **nameArray, int nameCount, char *needle, - int *matchIndex) -{ - bool foundMatchingName = false; - int searchIndex = *matchIndex; - int compareResult = -1; - - while (searchIndex < nameCount) + if (!recoveryFailed) { - char *testName = nameArray[searchIndex]; - compareResult = strncmp(needle, testName, NAMEDATALEN); + char *pendingTransactionName = NULL; + bool abortSucceeded = true; - if (compareResult <= 0) + /* + * All remaining prepared transactions that are not part of an in-progress + * distributed transaction should be aborted since we did not find a recovery + * record, which implies the disributed transaction aborted. + */ + hash_seq_init(&status, pendingTransactionSet); + + while ((pendingTransactionName = hash_seq_search(&status)) != NULL) { - break; + bool isTransactionInProgress = false; + bool shouldCommit = false; + + isTransactionInProgress = IsTransactionInProgress(activeTransactionNumberSet, + pendingTransactionName); + if (isTransactionInProgress) + { + continue; + } + + shouldCommit = false; + abortSucceeded = RecoverPreparedTransactionOnWorker(connection, + pendingTransactionName, + shouldCommit); + if (!abortSucceeded) + { + hash_seq_term(&status); + break; + } + + recoveredTransactionCount++; } - - searchIndex++; } - *matchIndex = searchIndex; + MemoryContextSwitchTo(oldContext); + MemoryContextDelete(localContext); - if (compareResult == 0) - { - foundMatchingName = true; - } - - return foundMatchingName; + return recoveredTransactionCount; } @@ -420,113 +359,73 @@ PendingWorkerTransactionList(MultiConnection *connection) /* - * UnconfirmedWorkerTransactionList returns a list of unconfirmed transactions - * for a group of workers from pg_dist_transaction. A transaction is confirmed - * once we have verified that it does not exist in pg_prepared_xacts on the - * remote node and the entry in pg_dist_transaction is removed. + * IsTransactionInProgress returns whether the distributed transaction to which + * preparedTransactionName belongs is still in progress, or false if the + * transaction name cannot be parsed. This can happen when the user manually + * inserts into pg_dist_transaction. */ -static List * -UnconfirmedWorkerTransactionsList(int groupId) +static bool +IsTransactionInProgress(HTAB *activeTransactionNumberSet, char *preparedTransactionName) { - List *transactionNameList = NIL; - Relation pgDistTransaction = NULL; - SysScanDesc scanDescriptor = NULL; - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - bool indexOK = true; - HeapTuple heapTuple = NULL; + int groupId = 0; + int procId = 0; + uint32 connectionNumber = 0; + uint64 transactionNumber = 0; + bool isValidName = false; + bool isTransactionInProgress = false; - pgDistTransaction = heap_open(DistTransactionRelationId(), AccessShareLock); - - ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid, - BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); - - scanDescriptor = systable_beginscan(pgDistTransaction, - DistTransactionGroupIndexId(), indexOK, - NULL, scanKeyCount, scanKey); - - heapTuple = systable_getnext(scanDescriptor); - while (HeapTupleIsValid(heapTuple)) + isValidName = ParsePreparedTransactionName(preparedTransactionName, &groupId, &procId, + &transactionNumber, &connectionNumber); + if (isValidName) { - TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); - bool isNull = false; - - Datum transactionNameDatum = heap_getattr(heapTuple, - Anum_pg_dist_transaction_gid, - tupleDescriptor, &isNull); - - char *transactionName = TextDatumGetCString(transactionNameDatum); - transactionNameList = lappend(transactionNameList, transactionName); - - heapTuple = systable_getnext(scanDescriptor); + hash_search(activeTransactionNumberSet, &transactionNumber, HASH_FIND, + &isTransactionInProgress); } - systable_endscan(scanDescriptor); - heap_close(pgDistTransaction, AccessShareLock); - - return transactionNameList; + return isTransactionInProgress; } /* - * DeleteTransactionRecord opens the pg_dist_transaction system catalog, finds the - * first (unique) row that corresponds to the given transactionName and worker node, - * and deletes this row. + * RecoverPreparedTransactionOnWorker recovers a single prepared transaction over + * the given connection. If shouldCommit is true we send */ -static void -DeleteTransactionRecord(int32 groupId, char *transactionName) +static bool +RecoverPreparedTransactionOnWorker(MultiConnection *connection, char *transactionName, + bool shouldCommit) { - Relation pgDistTransaction = NULL; - SysScanDesc scanDescriptor = NULL; - ScanKeyData scanKey[2]; - int scanKeyCount = 2; - bool indexOK = true; - HeapTuple heapTuple = NULL; - bool heapTupleFound = false; + StringInfo command = makeStringInfo(); + PGresult *result = NULL; + int executeCommand = 0; + bool raiseInterrupts = false; - pgDistTransaction = heap_open(DistTransactionRelationId(), RowExclusiveLock); - - ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid, - BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); - ScanKeyInit(&scanKey[1], Anum_pg_dist_transaction_gid, - BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(transactionName)); - - scanDescriptor = systable_beginscan(pgDistTransaction, - DistTransactionRecordIndexId(), indexOK, - NULL, scanKeyCount, scanKey); - - heapTuple = systable_getnext(scanDescriptor); - while (HeapTupleIsValid(heapTuple)) + if (shouldCommit) { - TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); - bool isNull = false; - - Datum gidDatum = heap_getattr(heapTuple, - Anum_pg_dist_transaction_gid, - tupleDescriptor, &isNull); - - char *gid = TextDatumGetCString(gidDatum); - - if (strncmp(transactionName, gid, NAMEDATALEN) == 0) - { - heapTupleFound = true; - break; - } - - heapTuple = systable_getnext(scanDescriptor); + /* should have committed this prepared transaction */ + appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName); + } + else + { + /* should have aborted this prepared transaction */ + appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName); } - /* if we couldn't find the transaction record to delete, error out */ - if (!heapTupleFound) + executeCommand = ExecuteOptionalRemoteCommand(connection, command->data, &result); + if (executeCommand == QUERY_SEND_FAILED) { - ereport(ERROR, (errmsg("could not find valid entry for transaction record " - "'%s' in group %d", - transactionName, groupId))); + return false; + } + if (executeCommand == RESPONSE_NOT_OKAY) + { + return false; } - simple_heap_delete(pgDistTransaction, &heapTuple->t_self); - CommandCounterIncrement(); + PQclear(result); + ClearResults(connection, raiseInterrupts); - systable_endscan(scanDescriptor); - heap_close(pgDistTransaction, RowExclusiveLock); + ereport(LOG, (errmsg("recovered a prepared transaction on %s:%d", + connection->hostname, connection->port), + errcontext("%s", command->data))); + + return true; } diff --git a/src/backend/distributed/utils/listutils.c b/src/backend/distributed/utils/listutils.c index 56224eaac..1735ff3e9 100644 --- a/src/backend/distributed/utils/listutils.c +++ b/src/backend/distributed/utils/listutils.c @@ -102,3 +102,49 @@ DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId) return arrayObject; } + + +/* + * ListToHashSet creates a hash table in which the keys are copied from + * from itemList and the values are the same as the keys. This can + * be used for fast lookups of the presence of a byte array in a set. + * + * If isStringList is true, then look-ups are performed through string + * comparison of strings up to keySize in length. If isStringList is + * false, then look-ups are performed by comparing exactly keySize + * bytes pointed to by the pointers in itemList. + */ +HTAB * +ListToHashSet(List *itemList, Size keySize, bool isStringList) +{ + HASHCTL info; + HTAB *itemSet = NULL; + ListCell *itemCell = NULL; + int flags = HASH_ELEM; + + /* allocate sufficient capacity for O(1) expected look-up time */ + int capacity = (int) (list_length(itemList) / 0.75) + 1; + + /* initialise the hash table for looking up keySize bytes */ + memset(&info, 0, sizeof(info)); + info.keysize = keySize; + info.entrysize = keySize; + + if (!isStringList) + { + info.hash = tag_hash; + flags |= HASH_FUNCTION; + } + + itemSet = hash_create("ListToHashSet", capacity, &info, flags); + + foreach(itemCell, itemList) + { + void *item = lfirst(itemCell); + bool foundInSet = false; + + hash_search(itemSet, item, HASH_ENTER, &foundInSet); + } + + return itemSet; +} diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index 2231885ef..5388f8d42 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -43,5 +43,6 @@ extern void AssignDistributedTransactionId(void); extern void GetBackendDataForProc(PGPROC *proc, BackendData *result); extern void CancelTransactionDueToDeadlock(PGPROC *proc); extern bool MyBackendGotCancelledDueToDeadlock(void); +extern List * ActiveDistributedTransactionNumbers(void); #endif /* BACKEND_DATA_H */ diff --git a/src/include/distributed/listutils.h b/src/include/distributed/listutils.h index 4d205f046..f08807190 100644 --- a/src/include/distributed/listutils.h +++ b/src/include/distributed/listutils.h @@ -17,6 +17,7 @@ #include "nodes/pg_list.h" #include "utils/array.h" +#include "utils/hsearch.h" /* utility functions declaration shared within this module */ @@ -25,5 +26,6 @@ extern List * SortList(List *pointerList, extern void ** PointerArrayFromList(List *pointerList); extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId); +extern HTAB * ListToHashSet(List *pointerList, Size keySize, bool isStringList); #endif /* CITUS_LISTUTILS_H */ diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h index 30ab3bb92..d3bd2ccf2 100644 --- a/src/include/distributed/remote_transaction.h +++ b/src/include/distributed/remote_transaction.h @@ -79,6 +79,11 @@ typedef struct RemoteTransaction } RemoteTransaction; +/* utility functions for dealing with remote transactions */ +extern bool ParsePreparedTransactionName(char *preparedTransactionName, int *groupId, + int *procId, uint64 *transactionNumber, + uint32 *connectionNumber); + /* change an individual remote transaction's state */ extern void StartRemoteTransactionBegin(struct MultiConnection *connection); extern void FinishRemoteTransactionBegin(struct MultiConnection *connection); diff --git a/src/test/regress/expected/isolation_transaction_recovery.out b/src/test/regress/expected/isolation_transaction_recovery.out index b1d007401..5d3a12121 100644 --- a/src/test/regress/expected/isolation_transaction_recovery.out +++ b/src/test/regress/expected/isolation_transaction_recovery.out @@ -15,8 +15,7 @@ recover_prepared_transactions 0 step s2-insert: INSERT INTO test_transaction_recovery VALUES (1,2); - + step s1-commit: COMMIT; -step s2-insert: <... completed> diff --git a/src/test/regress/expected/multi_mx_metadata.out b/src/test/regress/expected/multi_mx_metadata.out index 5e3e01700..004e04d91 100644 --- a/src/test/regress/expected/multi_mx_metadata.out +++ b/src/test/regress/expected/multi_mx_metadata.out @@ -259,12 +259,6 @@ SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport = :worker_1_po INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_commit'); INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_be_forgotten'); SELECT recover_prepared_transactions(); -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort' -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: ROLLBACK PREPARED 'citus_0_should_be_sorted_into_middle' -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: COMMIT PREPARED 'citus_0_should_commit' recover_prepared_transactions ------------------------------- 3 diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out index 4e3e33e5e..adc8c4bbb 100644 --- a/src/test/regress/expected/multi_mx_transaction_recovery.out +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -46,12 +46,6 @@ PREPARE TRANSACTION 'citus_12_should_be_sorted_into_middle'; INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_commit'); INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_be_forgotten'); SELECT recover_prepared_transactions(); -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: ROLLBACK PREPARED 'citus_12_should_abort' -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: ROLLBACK PREPARED 'citus_12_should_be_sorted_into_middle' -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: COMMIT PREPARED 'citus_12_should_commit' recover_prepared_transactions ------------------------------- 3 diff --git a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out index 3643ee8b3..245160e9b 100644 --- a/src/test/regress/expected/multi_transaction_recovery.out +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -28,12 +28,6 @@ PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle'; INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit'); INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten'); SELECT recover_prepared_transactions(); -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort' -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: ROLLBACK PREPARED 'citus_0_should_be_sorted_into_middle' -NOTICE: recovered a prepared transaction on localhost:57637 -CONTEXT: COMMIT PREPARED 'citus_0_should_commit' recover_prepared_transactions ------------------------------- 3 diff --git a/src/test/regress/specs/isolation_transaction_recovery.spec b/src/test/regress/specs/isolation_transaction_recovery.spec index 3bdc0c9bd..019f1931d 100644 --- a/src/test/regress/specs/isolation_transaction_recovery.spec +++ b/src/test/regress/specs/isolation_transaction_recovery.spec @@ -33,5 +33,5 @@ step "s2-insert" INSERT INTO test_transaction_recovery VALUES (1,2); } -# Recovery and 2PCs should block each other +# Recovery and 2PCs should not block each other permutation "s1-begin" "s1-recover" "s2-insert" "s1-commit"