Merge pull request #1696 from citusdata/fast_recovery

Rewrite recover_prepared_transactions to be faster, non-blocking
pull/1817/head
Marco Slot 2017-11-20 13:50:32 +00:00 committed by GitHub
commit 7b3b59c278
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 437 additions and 301 deletions

View File

@ -640,3 +640,52 @@ MyBackendGotCancelledDueToDeadlock(void)
return cancelledDueToDeadlock; return cancelledDueToDeadlock;
} }
/*
 * ActiveDistributedTransactionNumbers scans the backend slots in
 * ProcGlobal->allProcs and collects the transaction number of every backend
 * that is in a distributed transaction and is marked as the originator of
 * that transaction.
 *
 * The result is a list of pointers to freshly palloc'd uint64 values, one per
 * matching backend. NIL is returned when no backend matches.
 */
List *
ActiveDistributedTransactionNumbers(void)
{
	List *transactionNumbers = NIL;
	int procIndex = 0;

	for (procIndex = 0; procIndex < MaxBackends; procIndex++)
	{
		PGPROC *proc = &ProcGlobal->allProcs[procIndex];
		BackendData backendData;
		uint64 *numberCopy = NULL;

		/* a pid of 0 marks a PGPROC slot that is not in use */
		if (proc->pid != 0)
		{
			GetBackendDataForProc(proc, &backendData);

			/*
			 * Only keep backends that are inside a distributed transaction
			 * and that initiated it (i.e. act as the coordinator process).
			 */
			if (IsInDistributedTransaction(&backendData) &&
				backendData.transactionId.transactionOriginator)
			{
				numberCopy = (uint64 *) palloc0(sizeof(uint64));
				*numberCopy = backendData.transactionId.transactionNumber;

				transactionNumbers = lappend(transactionNumbers, numberCopy);
			}
		}
	}

	return transactionNumbers;
}

View File

@ -27,6 +27,9 @@
#include "utils/hsearch.h" #include "utils/hsearch.h"
#define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u"
static void StartRemoteTransactionSavepointBegin(MultiConnection *connection, static void StartRemoteTransactionSavepointBegin(MultiConnection *connection,
SubTransactionId subId); SubTransactionId subId);
static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection, static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection,
@ -1241,11 +1244,36 @@ Assign2PCIdentifier(MultiConnection *connection)
/* print all numbers as unsigned to guarantee no minus symbols appear in the name */ /* print all numbers as unsigned to guarantee no minus symbols appear in the name */
snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN, snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN,
"citus_%u_%u_"UINT64_FORMAT "_%u", GetLocalGroupId(), MyProcPid, PREPARED_TRANSACTION_NAME_FORMAT, GetLocalGroupId(), MyProcPid,
transactionNumber, connectionNumber++); transactionNumber, connectionNumber++);
} }
/*
 * ParsePreparedTransactionName parses a prepared transaction name to extract
 * the initiator group ID, initiator process ID, distributed transaction number,
 * and the connection number, using PREPARED_TRANSACTION_NAME_FORMAT.
 *
 * Returns true if all four fields could be parsed, and false if the name does
 * not match the expected format. The output parameters are only written when
 * the function returns true; on failure they keep the values the caller put
 * in them.
 */
bool
ParsePreparedTransactionName(char *preparedTransactionName, int *groupId, int *procId,
							 uint64 *transactionNumber, uint32 *connectionNumber)
{
	const int expectedFieldCount = 4;
	int parsedFieldCount = 0;

	/*
	 * The name format prints every number as unsigned (%u / UINT64_FORMAT),
	 * so scan into unsigned temporaries whose types match the conversion
	 * specifiers exactly instead of handing sscanf pointers to signed ints.
	 */
	unsigned int parsedGroupId = 0;
	unsigned int parsedProcId = 0;
	uint64 parsedTransactionNumber = 0;
	unsigned int parsedConnectionNumber = 0;

	parsedFieldCount = sscanf(preparedTransactionName, PREPARED_TRANSACTION_NAME_FORMAT,
							  &parsedGroupId, &parsedProcId, &parsedTransactionNumber,
							  &parsedConnectionNumber);
	if (parsedFieldCount != expectedFieldCount)
	{
		/* not a name in the expected format, leave the outputs untouched */
		return false;
	}

	/* convert back to the signed types in which the IDs are stored */
	*groupId = (int) parsedGroupId;
	*procId = (int) parsedProcId;
	*transactionNumber = parsedTransactionNumber;
	*connectionNumber = (uint32) parsedConnectionNumber;

	return true;
}
/* /*
* WarnAboutLeakedPreparedTransaction issues a WARNING explaining that a * WarnAboutLeakedPreparedTransaction issues a WARNING explaining that a
* prepared transaction could not be committed or rolled back, and explains * prepared transaction could not be committed or rolled back, and explains

View File

@ -24,6 +24,7 @@
#include "access/relscan.h" #include "access/relscan.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "distributed/backend_data.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -47,13 +48,11 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions);
/* Local functions forward declarations */ /* Local functions forward declarations */
static int RecoverPreparedTransactions(void); static int RecoverPreparedTransactions(void);
static int RecoverWorkerTransactions(WorkerNode *workerNode); static int RecoverWorkerTransactions(WorkerNode *workerNode);
static List * NameListDifference(List *nameList, List *subtractList);
static int CompareNames(const void *leftPointer, const void *rightPointer);
static bool FindMatchingName(char **nameArray, int nameCount, char *needle,
int *matchIndex);
static List * PendingWorkerTransactionList(MultiConnection *connection); static List * PendingWorkerTransactionList(MultiConnection *connection);
static List * UnconfirmedWorkerTransactionsList(int groupId); static bool IsTransactionInProgress(HTAB *activeTransactionNumberSet,
static void DeleteTransactionRecord(int32 groupId, char *transactionName); char *preparedTransactionName);
static bool RecoverPreparedTransactionOnWorker(MultiConnection *connection,
char *transactionName, bool shouldCommit);
/* /*
@ -120,13 +119,6 @@ RecoverPreparedTransactions(void)
ListCell *workerNodeCell = NULL; ListCell *workerNodeCell = NULL;
int recoveredTransactionCount = 0; int recoveredTransactionCount = 0;
/*
* We block here if metadata transactions are ongoing, since we
* mustn't commit/abort their prepared transactions under their
* feet. We also prevent concurrent recovery.
*/
LockRelationOid(DistTransactionRelationId(), ExclusiveLock);
workerList = ActivePrimaryNodeList(); workerList = ActivePrimaryNodeList();
foreach(workerNodeCell, workerList) foreach(workerNodeCell, workerList)
@ -153,25 +145,35 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
char *nodeName = workerNode->workerName; char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort; int nodePort = workerNode->workerPort;
List *activeTransactionNumberList = NIL;
HTAB *activeTransactionNumberSet = NULL;
List *pendingTransactionList = NIL; List *pendingTransactionList = NIL;
ListCell *pendingTransactionCell = NULL; HTAB *pendingTransactionSet = NULL;
List *recheckTransactionList = NIL;
HTAB *recheckTransactionSet = NULL;
List *unconfirmedTransactionList = NIL; Relation pgDistTransaction = NULL;
char **unconfirmedTransactionArray = NULL; SysScanDesc scanDescriptor = NULL;
int unconfirmedTransactionCount = 0; ScanKeyData scanKey[1];
int unconfirmedTransactionIndex = 0; int scanKeyCount = 1;
bool indexOK = true;
HeapTuple heapTuple = NULL;
TupleDesc tupleDescriptor = NULL;
List *committedTransactionList = NIL; HASH_SEQ_STATUS status;
ListCell *committedTransactionCell = NULL;
MemoryContext localContext = NULL; MemoryContext localContext = NULL;
MemoryContext oldContext = NULL; MemoryContext oldContext = NULL;
bool recoveryFailed = false;
int connectionFlags = SESSION_LIFESPAN; int connectionFlags = SESSION_LIFESPAN;
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
if (connection->pgConn == NULL) if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK)
{ {
/* cannot recover transactions on this worker right now */ ereport(WARNING, (errmsg("transaction recovery cannot connect to %s:%d",
nodeName, nodePort)));
return 0; return 0;
} }
@ -182,191 +184,221 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
oldContext = MemoryContextSwitchTo(localContext); oldContext = MemoryContextSwitchTo(localContext);
/* find transactions that were committed, but not yet confirmed */ /* take table lock first to avoid running concurrently */
unconfirmedTransactionList = UnconfirmedWorkerTransactionsList(groupId); pgDistTransaction = heap_open(DistTransactionRelationId(), ShareUpdateExclusiveLock);
unconfirmedTransactionList = SortList(unconfirmedTransactionList, CompareNames); tupleDescriptor = RelationGetDescr(pgDistTransaction);
/* convert list to an array to use with FindMatchingNames */ /*
unconfirmedTransactionCount = list_length(unconfirmedTransactionList); * We're going to check the list of prepared transactions on the worker,
unconfirmedTransactionArray = * but some of those prepared transactions might belong to ongoing
(char **) PointerArrayFromList(unconfirmedTransactionList); * distributed transactions.
*
* We could avoid this by temporarily blocking new prepared transactions
* from being created by taking an ExclusiveLock on pg_dist_transaction.
* However, this hurts write performance, so instead we avoid blocking
* by consulting the list of active distributed transactions, and follow
* a carefully chosen order to avoid race conditions:
*
* 1) P = prepared transactions on worker
* 2) A = active distributed transactions
* 3) T = pg_dist_transaction snapshot
* 4) Q = prepared transactions on worker
*
* By observing A after P, we get a conclusive answer to which distributed
* transactions we observed in P are still in progress. It is safe to recover
* the transactions in P - A based on the presence or absence of a record
* in T.
*
* We also remove records in T if there is no prepared transaction, which
* we assume means the transaction committed. However, a transaction could
* have left prepared transactions and committed between steps 1 and 2.
* In that case, we would incorrectly remove the records, while the
* prepared transaction is still in place.
*
* We therefore observe the set of prepared transactions one more time in
* step 4. The aforementioned transactions would show up in Q, but not in
* P. We can skip those transactions and recover them later.
*/
/* find stale prepared transactions on the remote node */ /* find stale prepared transactions on the remote node */
pendingTransactionList = PendingWorkerTransactionList(connection); pendingTransactionList = PendingWorkerTransactionList(connection);
pendingTransactionList = SortList(pendingTransactionList, CompareNames); pendingTransactionSet = ListToHashSet(pendingTransactionList, NAMEDATALEN, true);
/* /* find in-progress distributed transactions */
* Transactions that have no pending prepared transaction are assumed to activeTransactionNumberList = ActiveDistributedTransactionNumbers();
* have been committed. Any records in unconfirmedTransactionList that activeTransactionNumberSet = ListToHashSet(activeTransactionNumberList,
* don't have a transaction in pendingTransactionList can be removed. sizeof(uint64), false);
*/
committedTransactionList = NameListDifference(unconfirmedTransactionList,
pendingTransactionList);
/* /* scan through all recovery records of the current worker */
* For each pending prepared transaction, check whether there is a transaction ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
* record. If so, commit. If not, the transaction that started the transaction BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
* must have rolled back and thus the prepared transaction should be aborted.
*/ /* get a snapshot of pg_dist_transaction */
foreach(pendingTransactionCell, pendingTransactionList) scanDescriptor = systable_beginscan(pgDistTransaction,
DistTransactionGroupIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
/* find stale prepared transactions on the remote node */
recheckTransactionList = PendingWorkerTransactionList(connection);
recheckTransactionSet = ListToHashSet(recheckTransactionList, NAMEDATALEN, true);
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{ {
char *transactionName = (char *) lfirst(pendingTransactionCell); bool isNull = false;
StringInfo command = makeStringInfo(); bool isTransactionInProgress = false;
int executeCommand = 0; bool foundPreparedTransactionBeforeCommit = false;
PGresult *result = NULL; bool foundPreparedTransactionAfterCommit = false;
bool shouldCommit = FindMatchingName(unconfirmedTransactionArray, Datum transactionNameDatum = heap_getattr(heapTuple,
unconfirmedTransactionCount, Anum_pg_dist_transaction_gid,
transactionName, tupleDescriptor, &isNull);
&unconfirmedTransactionIndex); char *transactionName = TextDatumGetCString(transactionNameDatum);
if (shouldCommit) isTransactionInProgress = IsTransactionInProgress(activeTransactionNumberSet,
transactionName);
if (isTransactionInProgress)
{ {
/* should have committed this prepared transaction */ /*
appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName); * Do not touch in progress transactions as we might mistakenly
} * commit a transaction that is actually in the process of
else * aborting or vice-versa.
{ */
/* no record of this prepared transaction, abort */
appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName);
}
executeCommand = ExecuteOptionalRemoteCommand(connection, command->data, &result);
if (executeCommand == QUERY_SEND_FAILED)
{
break;
}
if (executeCommand == RESPONSE_NOT_OKAY)
{
/* cannot recover this transaction right now */
continue; continue;
} }
PQclear(result); /*
ForgetResults(connection); * Remove the transaction from the pending list such that only transactions
* that need to be aborted remain at the end.
*/
hash_search(pendingTransactionSet, transactionName, HASH_REMOVE,
&foundPreparedTransactionBeforeCommit);
ereport(NOTICE, (errmsg("recovered a prepared transaction on %s:%d", hash_search(recheckTransactionSet, transactionName, HASH_FIND,
nodeName, nodePort), &foundPreparedTransactionAfterCommit);
errcontext("%s", command->data)));
if (shouldCommit) if (foundPreparedTransactionBeforeCommit && foundPreparedTransactionAfterCommit)
{
committedTransactionList = lappend(committedTransactionList,
transactionName);
}
recoveredTransactionCount += 1;
}
/* we can remove the transaction records of confirmed transactions */
foreach(committedTransactionCell, committedTransactionList)
{
char *transactionName = (char *) lfirst(committedTransactionCell);
DeleteTransactionRecord(groupId, transactionName);
}
MemoryContextReset(localContext);
MemoryContextSwitchTo(oldContext);
return recoveredTransactionCount;
}
/*
* NameListDifference returns the difference between the bag of
* names in nameList and subtractList. Both are assumed to be
* sorted. We cannot use list_difference_ptr here since we need
* to compare the actual strings.
*/
static List *
NameListDifference(List *nameList, List *subtractList)
{
List *differenceList = NIL;
ListCell *nameCell = NULL;
int subtractIndex = 0;
int subtractCount = list_length(subtractList);
char **subtractArray = (char **) PointerArrayFromList(subtractList);
foreach(nameCell, nameList)
{
char *baseName = (char *) lfirst(nameCell);
bool nameFound = FindMatchingName(subtractArray, subtractCount,
baseName, &subtractIndex);
if (!nameFound)
{ {
/* /*
* baseName is not in subtractArray and thus included * The transaction was committed, but the prepared transaction still exists
* in the difference. * on the worker. Try committing it.
*
* We double check that the recovery record exists both before and after
* checking ActiveDistributedTransactionNumbers(), since we may have
* observed a prepared transaction that was committed immediately after.
*/
bool shouldCommit = true;
bool commitSucceeded = RecoverPreparedTransactionOnWorker(connection,
transactionName,
shouldCommit);
if (!commitSucceeded)
{
/*
* Failed to commit on the current worker. Stop without throwing
* an error to allow recover_prepared_transactions to continue with
* other workers.
*/
recoveryFailed = true;
break;
}
recoveredTransactionCount++;
/*
* We successfully committed the prepared transaction, safe to delete
* the recovery record.
*/ */
differenceList = lappend(differenceList, baseName);
} }
} else if (foundPreparedTransactionAfterCommit)
pfree(subtractArray);
return differenceList;
}
/*
* CompareNames compares names using strncmp. Its signature allows it to
* be used in qsort.
*/
static int
CompareNames(const void *leftPointer, const void *rightPointer)
{
const char *leftString = *((char **) leftPointer);
const char *rightString = *((char **) rightPointer);
int nameCompare = strncmp(leftString, rightString, NAMEDATALEN);
return nameCompare;
}
/*
* FindMatchingName searches for name in nameArray, starting at the
* value pointed to by matchIndex and stopping at the first index of
* name which is greater or equal to needle. nameArray is assumed
* to be sorted.
*
* The function sets matchIndex to the index of the name and returns
* true if the name is equal to needle. If matchIndex >= nameCount,
* then the function always returns false.
*/
static bool
FindMatchingName(char **nameArray, int nameCount, char *needle,
int *matchIndex)
{
bool foundMatchingName = false;
int searchIndex = *matchIndex;
int compareResult = -1;
while (searchIndex < nameCount)
{
char *testName = nameArray[searchIndex];
compareResult = strncmp(needle, testName, NAMEDATALEN);
if (compareResult <= 0)
{ {
break; /*
* We found a committed pg_dist_transaction record that initially did
* not have a prepared transaction, but did when we checked again.
*
* If a transaction started and committed just after we observed the
* set of prepared transactions, and just before we called
* ActiveDistributedTransactionNumbers, then we would see a recovery
* record without a prepared transaction in pendingTransactionSet,
* but there may be prepared transactions that failed to commit.
* We should not delete the records for those prepared transactions,
* since we would otherwise roll them back on the next call to
* recover_prepared_transactions.
*
* In addition, if the transaction started after the call to
* ActiveDistributedTransactionNumbers and finished just before our
* pg_dist_transaction snapshot, then it may still be in the process
* of committing the prepared transactions in the post-commit callback
* and we should not touch the prepared transactions.
*
* To handle these cases, we just leave the records and prepared
* transactions for the next call to recover_prepared_transactions
* and skip them here.
*/
continue;
}
else
{
/*
* We found a recovery record without any prepared transaction. It
* must have already been committed, so it's safe to delete the
* recovery record.
*
* Transactions that started after we observed pendingTransactionSet,
* but successfully committed their prepared transactions before
* ActiveDistributedTransactionNumbers are indistinguishable from
* transactions that committed at an earlier time, in which case it's
* safe to delete the recovery record as well.
*/
} }
searchIndex++; simple_heap_delete(pgDistTransaction, &heapTuple->t_self);
} }
*matchIndex = searchIndex; systable_endscan(scanDescriptor);
heap_close(pgDistTransaction, NoLock);
if (compareResult == 0) if (!recoveryFailed)
{ {
foundMatchingName = true; char *pendingTransactionName = NULL;
bool abortSucceeded = true;
/*
* All remaining prepared transactions that are not part of an in-progress
* distributed transaction should be aborted since we did not find a recovery
* record, which implies the distributed transaction aborted.
*/
hash_seq_init(&status, pendingTransactionSet);
while ((pendingTransactionName = hash_seq_search(&status)) != NULL)
{
bool isTransactionInProgress = false;
bool shouldCommit = false;
isTransactionInProgress = IsTransactionInProgress(activeTransactionNumberSet,
pendingTransactionName);
if (isTransactionInProgress)
{
continue;
}
shouldCommit = false;
abortSucceeded = RecoverPreparedTransactionOnWorker(connection,
pendingTransactionName,
shouldCommit);
if (!abortSucceeded)
{
hash_seq_term(&status);
break;
}
recoveredTransactionCount++;
}
} }
return foundMatchingName; MemoryContextSwitchTo(oldContext);
MemoryContextDelete(localContext);
return recoveredTransactionCount;
} }
@ -420,113 +452,73 @@ PendingWorkerTransactionList(MultiConnection *connection)
/* /*
* UnconfirmedWorkerTransactionList returns a list of unconfirmed transactions * IsTransactionInProgress returns whether the distributed transaction to which
* for a group of workers from pg_dist_transaction. A transaction is confirmed * preparedTransactionName belongs is still in progress, or false if the
* once we have verified that it does not exist in pg_prepared_xacts on the * transaction name cannot be parsed. This can happen when the user manually
* remote node and the entry in pg_dist_transaction is removed. * inserts into pg_dist_transaction.
*/ */
static List * static bool
UnconfirmedWorkerTransactionsList(int groupId) IsTransactionInProgress(HTAB *activeTransactionNumberSet, char *preparedTransactionName)
{ {
List *transactionNameList = NIL; int groupId = 0;
Relation pgDistTransaction = NULL; int procId = 0;
SysScanDesc scanDescriptor = NULL; uint32 connectionNumber = 0;
ScanKeyData scanKey[1]; uint64 transactionNumber = 0;
int scanKeyCount = 1; bool isValidName = false;
bool indexOK = true; bool isTransactionInProgress = false;
HeapTuple heapTuple = NULL;
pgDistTransaction = heap_open(DistTransactionRelationId(), AccessShareLock); isValidName = ParsePreparedTransactionName(preparedTransactionName, &groupId, &procId,
&transactionNumber, &connectionNumber);
ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid, if (isValidName)
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
scanDescriptor = systable_beginscan(pgDistTransaction,
DistTransactionGroupIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{ {
TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); hash_search(activeTransactionNumberSet, &transactionNumber, HASH_FIND,
bool isNull = false; &isTransactionInProgress);
Datum transactionNameDatum = heap_getattr(heapTuple,
Anum_pg_dist_transaction_gid,
tupleDescriptor, &isNull);
char *transactionName = TextDatumGetCString(transactionNameDatum);
transactionNameList = lappend(transactionNameList, transactionName);
heapTuple = systable_getnext(scanDescriptor);
} }
systable_endscan(scanDescriptor); return isTransactionInProgress;
heap_close(pgDistTransaction, AccessShareLock);
return transactionNameList;
} }
/* /*
* DeleteTransactionRecord opens the pg_dist_transaction system catalog, finds the * RecoverPreparedTransactionOnWorker recovers a single prepared transaction over
* first (unique) row that corresponds to the given transactionName and worker node, * the given connection. If shouldCommit is true we send
* and deletes this row.
*/ */
static void static bool
DeleteTransactionRecord(int32 groupId, char *transactionName) RecoverPreparedTransactionOnWorker(MultiConnection *connection, char *transactionName,
bool shouldCommit)
{ {
Relation pgDistTransaction = NULL; StringInfo command = makeStringInfo();
SysScanDesc scanDescriptor = NULL; PGresult *result = NULL;
ScanKeyData scanKey[2]; int executeCommand = 0;
int scanKeyCount = 2; bool raiseInterrupts = false;
bool indexOK = true;
HeapTuple heapTuple = NULL;
bool heapTupleFound = false;
pgDistTransaction = heap_open(DistTransactionRelationId(), RowExclusiveLock); if (shouldCommit)
ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
ScanKeyInit(&scanKey[1], Anum_pg_dist_transaction_gid,
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(transactionName));
scanDescriptor = systable_beginscan(pgDistTransaction,
DistTransactionRecordIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{ {
TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); /* should have committed this prepared transaction */
bool isNull = false; appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName);
}
Datum gidDatum = heap_getattr(heapTuple, else
Anum_pg_dist_transaction_gid, {
tupleDescriptor, &isNull); /* should have aborted this prepared transaction */
appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName);
char *gid = TextDatumGetCString(gidDatum);
if (strncmp(transactionName, gid, NAMEDATALEN) == 0)
{
heapTupleFound = true;
break;
}
heapTuple = systable_getnext(scanDescriptor);
} }
/* if we couldn't find the transaction record to delete, error out */ executeCommand = ExecuteOptionalRemoteCommand(connection, command->data, &result);
if (!heapTupleFound) if (executeCommand == QUERY_SEND_FAILED)
{ {
ereport(ERROR, (errmsg("could not find valid entry for transaction record " return false;
"'%s' in group %d", }
transactionName, groupId))); if (executeCommand == RESPONSE_NOT_OKAY)
{
return false;
} }
simple_heap_delete(pgDistTransaction, &heapTuple->t_self); PQclear(result);
CommandCounterIncrement(); ClearResults(connection, raiseInterrupts);
systable_endscan(scanDescriptor); ereport(LOG, (errmsg("recovered a prepared transaction on %s:%d",
heap_close(pgDistTransaction, RowExclusiveLock); connection->hostname, connection->port),
errcontext("%s", command->data)));
return true;
} }

View File

@ -102,3 +102,49 @@ DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId)
return arrayObject; return arrayObject;
} }
/*
 * ListToHashSet creates a hash table in which the keys are copied from
 * itemList and the values are the same as the keys. This can be used for
 * fast lookups of the presence of a byte array in a set. itemList should
 * be a list of pointers.
 *
 * If isStringList is true, then look-ups are performed through string
 * comparison of strings up to keySize in length. If isStringList is
 * false, then look-ups are performed by comparing exactly keySize
 * bytes pointed to by the pointers in itemList.
 *
 * The hash table is allocated in the memory context that is current at the
 * time of the call, so it is released together with that context. Callers
 * that need the set to live longer must switch to a longer-lived context
 * before calling this function.
 */
HTAB *
ListToHashSet(List *itemList, Size keySize, bool isStringList)
{
	HASHCTL info;
	HTAB *itemSet = NULL;
	ListCell *itemCell = NULL;

	/*
	 * Without HASH_CONTEXT, hash_create places the table under
	 * TopMemoryContext, where it would stay allocated until the backend
	 * exits because nobody calls hash_destroy on it.
	 */
	int flags = HASH_ELEM | HASH_CONTEXT;

	/* allocate sufficient capacity for O(1) expected look-up time */
	int capacity = (int) (list_length(itemList) / 0.75) + 1;

	/* initialise the hash table for looking up keySize bytes */
	memset(&info, 0, sizeof(info));
	info.keysize = keySize;
	info.entrysize = keySize;
	info.hcxt = CurrentMemoryContext;

	if (!isStringList)
	{
		/* compare raw bytes rather than NUL-terminated strings */
		flags |= HASH_BLOBS;
	}

	itemSet = hash_create("ListToHashSet", capacity, &info, flags);

	foreach(itemCell, itemList)
	{
		void *item = lfirst(itemCell);

		/* the entry consists of the key only; duplicates are simply merged */
		hash_search(itemSet, item, HASH_ENTER, NULL);
	}

	return itemSet;
}

View File

@ -43,5 +43,6 @@ extern void AssignDistributedTransactionId(void);
extern void GetBackendDataForProc(PGPROC *proc, BackendData *result); extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
extern void CancelTransactionDueToDeadlock(PGPROC *proc); extern void CancelTransactionDueToDeadlock(PGPROC *proc);
extern bool MyBackendGotCancelledDueToDeadlock(void); extern bool MyBackendGotCancelledDueToDeadlock(void);
extern List * ActiveDistributedTransactionNumbers(void);
#endif /* BACKEND_DATA_H */ #endif /* BACKEND_DATA_H */

View File

@ -17,6 +17,7 @@
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "utils/array.h" #include "utils/array.h"
#include "utils/hsearch.h"
/* utility functions declaration shared within this module */ /* utility functions declaration shared within this module */
@ -25,5 +26,6 @@ extern List * SortList(List *pointerList,
extern void ** PointerArrayFromList(List *pointerList); extern void ** PointerArrayFromList(List *pointerList);
extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount, extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount,
Oid datumTypeId); Oid datumTypeId);
extern HTAB * ListToHashSet(List *pointerList, Size keySize, bool isStringList);
#endif /* CITUS_LISTUTILS_H */ #endif /* CITUS_LISTUTILS_H */

View File

@ -79,6 +79,11 @@ typedef struct RemoteTransaction
} RemoteTransaction; } RemoteTransaction;
/* utility functions for dealing with remote transactions */
extern bool ParsePreparedTransactionName(char *preparedTransactionName, int *groupId,
int *procId, uint64 *transactionNumber,
uint32 *connectionNumber);
/* change an individual remote transaction's state */ /* change an individual remote transaction's state */
extern void StartRemoteTransactionBegin(struct MultiConnection *connection); extern void StartRemoteTransactionBegin(struct MultiConnection *connection);
extern void FinishRemoteTransactionBegin(struct MultiConnection *connection); extern void FinishRemoteTransactionBegin(struct MultiConnection *connection);

View File

@ -98,7 +98,7 @@ step s1-get-current-transaction-id:
row row
(0,287) (0,289)
step s2-get-first-worker-active-transactions: step s2-get-first-worker-active-transactions:
SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number) SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number)
FROM FROM
@ -109,4 +109,4 @@ step s2-get-first-worker-active-transactions:
nodename nodeport success result nodename nodeport success result
localhost 57637 t (0,287) localhost 57637 t (0,289)

View File

@ -29,11 +29,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
290 289 f 292 291 f
transactionnumberwaitingtransactionnumbers transactionnumberwaitingtransactionnumbers
289 291
290 289 292 291
step s1-abort: step s1-abort:
ABORT; ABORT;
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
294 293 f 296 295 f
295 293 f 297 295 f
295 294 t 297 296 t
transactionnumberwaitingtransactionnumbers transactionnumberwaitingtransactionnumbers
293 295
294 293 296 295
295 293,294 297 295,296
step s1-abort: step s1-abort:
ABORT; ABORT;

View File

@ -16,7 +16,7 @@ step s1-finish:
COMMIT; COMMIT;
step s2-insert: <... completed> step s2-insert: <... completed>
error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102320" error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102321"
step s2-finish: step s2-finish:
COMMIT; COMMIT;

View File

@ -15,8 +15,31 @@ recover_prepared_transactions
0 0
step s2-insert: step s2-insert:
INSERT INTO test_transaction_recovery VALUES (1,2); INSERT INTO test_transaction_recovery VALUES (1,2);
step s1-commit:
COMMIT;
starting permutation: s1-begin s1-recover s2-recover s1-commit
create_reference_table
step s1-begin:
BEGIN;
step s1-recover:
SELECT recover_prepared_transactions();
recover_prepared_transactions
0
step s2-recover:
SELECT recover_prepared_transactions();
<waiting ...> <waiting ...>
step s1-commit: step s1-commit:
COMMIT; COMMIT;
step s2-insert: <... completed> step s2-recover: <... completed>
recover_prepared_transactions
0

View File

@ -259,12 +259,6 @@ SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport = :worker_1_po
INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_commit'); INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_commit');
INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_be_forgotten'); INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_be_forgotten');
SELECT recover_prepared_transactions(); SELECT recover_prepared_transactions();
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort'
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_0_should_be_sorted_into_middle'
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: COMMIT PREPARED 'citus_0_should_commit'
recover_prepared_transactions recover_prepared_transactions
------------------------------- -------------------------------
3 3

View File

@ -46,12 +46,6 @@ PREPARE TRANSACTION 'citus_12_should_be_sorted_into_middle';
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_commit'); INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_commit');
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_be_forgotten'); INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_be_forgotten');
SELECT recover_prepared_transactions(); SELECT recover_prepared_transactions();
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_12_should_abort'
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_12_should_be_sorted_into_middle'
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: COMMIT PREPARED 'citus_12_should_commit'
recover_prepared_transactions recover_prepared_transactions
------------------------------- -------------------------------
3 3

View File

@ -28,12 +28,6 @@ PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit'); INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit');
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten'); INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten');
SELECT recover_prepared_transactions(); SELECT recover_prepared_transactions();
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort'
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_0_should_be_sorted_into_middle'
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: COMMIT PREPARED 'citus_0_should_commit'
recover_prepared_transactions recover_prepared_transactions
------------------------------- -------------------------------
3 3

View File

@ -33,5 +33,13 @@ step "s2-insert"
INSERT INTO test_transaction_recovery VALUES (1,2); INSERT INTO test_transaction_recovery VALUES (1,2);
} }
# Recovery and 2PCs should block each other step "s2-recover"
{
SELECT recover_prepared_transactions();
}
# Recovery and 2PCs should not block each other
permutation "s1-begin" "s1-recover" "s2-insert" "s1-commit" permutation "s1-begin" "s1-recover" "s2-insert" "s1-commit"
# Recovery should not run concurrently
permutation "s1-begin" "s1-recover" "s2-recover" "s1-commit"