mirror of https://github.com/citusdata/citus.git
Rewrite recover_prepared_transactions to be fast, non-blocking
parent
785d94e828
commit
2410c2e450
|
@ -640,3 +640,45 @@ MyBackendGotCancelledDueToDeadlock(void)
|
||||||
|
|
||||||
return cancelledDueToDeadlock;
|
return cancelledDueToDeadlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ActiveDistributedTransactionNumbers returns a list of pointers to
|
||||||
|
* transaction numbers of distributed transactions that are in progress.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
ActiveDistributedTransactionNumbers(void)
|
||||||
|
{
|
||||||
|
List *activeTransactionNumberList = NIL;
|
||||||
|
int curBackend = 0;
|
||||||
|
|
||||||
|
/* build list of starting procs */
|
||||||
|
for (curBackend = 0; curBackend < MaxBackends; curBackend++)
|
||||||
|
{
|
||||||
|
PGPROC *currentProc = &ProcGlobal->allProcs[curBackend];
|
||||||
|
BackendData currentBackendData;
|
||||||
|
uint64 *transactionNumber = NULL;
|
||||||
|
|
||||||
|
if (currentProc->pid == 0)
|
||||||
|
{
|
||||||
|
/* unused PGPROC slot */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
GetBackendDataForProc(currentProc, ¤tBackendData);
|
||||||
|
|
||||||
|
if (!IsInDistributedTransaction(¤tBackendData))
|
||||||
|
{
|
||||||
|
/* not a distributed transaction */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
transactionNumber = (uint64 *) palloc0(sizeof(uint64));
|
||||||
|
*transactionNumber = currentBackendData.transactionId.transactionNumber;
|
||||||
|
|
||||||
|
activeTransactionNumberList = lappend(activeTransactionNumberList,
|
||||||
|
transactionNumber);
|
||||||
|
}
|
||||||
|
|
||||||
|
return activeTransactionNumberList;
|
||||||
|
}
|
||||||
|
|
|
@ -27,6 +27,9 @@
|
||||||
#include "utils/hsearch.h"
|
#include "utils/hsearch.h"
|
||||||
|
|
||||||
|
|
||||||
|
#define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u"
|
||||||
|
|
||||||
|
|
||||||
static void StartRemoteTransactionSavepointBegin(MultiConnection *connection,
|
static void StartRemoteTransactionSavepointBegin(MultiConnection *connection,
|
||||||
SubTransactionId subId);
|
SubTransactionId subId);
|
||||||
static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection,
|
static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection,
|
||||||
|
@ -1241,11 +1244,36 @@ Assign2PCIdentifier(MultiConnection *connection)
|
||||||
|
|
||||||
/* print all numbers as unsigned to guarantee no minus symbols appear in the name */
|
/* print all numbers as unsigned to guarantee no minus symbols appear in the name */
|
||||||
snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN,
|
snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN,
|
||||||
"citus_%u_%u_"UINT64_FORMAT "_%u", GetLocalGroupId(), MyProcPid,
|
PREPARED_TRANSACTION_NAME_FORMAT, GetLocalGroupId(), MyProcPid,
|
||||||
transactionNumber, connectionNumber++);
|
transactionNumber, connectionNumber++);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ParsePreparedTransactionName parses a prepared transaction name to extract
|
||||||
|
* the initiator group ID, initiator process ID, distributed transaction number,
|
||||||
|
* and the connection number. If the transaction name does not match the expected
|
||||||
|
* format ParsePreparedTransactionName returns false, and true otherwise.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
ParsePreparedTransactionName(char *preparedTransactionName, int *groupId, int *procId,
|
||||||
|
uint64 *transactionNumber, uint32 *connectionNumber)
|
||||||
|
{
|
||||||
|
const int expectedFieldCount = 4;
|
||||||
|
int parsedFieldCount = 0;
|
||||||
|
bool nameValid = false;
|
||||||
|
|
||||||
|
parsedFieldCount = sscanf(preparedTransactionName, PREPARED_TRANSACTION_NAME_FORMAT,
|
||||||
|
groupId, procId, transactionNumber, connectionNumber);
|
||||||
|
if (parsedFieldCount == expectedFieldCount)
|
||||||
|
{
|
||||||
|
nameValid = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return nameValid;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* WarnAboutLeakedPreparedTransaction issues a WARNING explaining that a
|
* WarnAboutLeakedPreparedTransaction issues a WARNING explaining that a
|
||||||
* prepared transaction could not be committed or rolled back, and explains
|
* prepared transaction could not be committed or rolled back, and explains
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include "access/relscan.h"
|
#include "access/relscan.h"
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "catalog/indexing.h"
|
#include "catalog/indexing.h"
|
||||||
|
#include "distributed/backend_data.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
@ -47,13 +48,11 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions);
|
||||||
/* Local functions forward declarations */
|
/* Local functions forward declarations */
|
||||||
static int RecoverPreparedTransactions(void);
|
static int RecoverPreparedTransactions(void);
|
||||||
static int RecoverWorkerTransactions(WorkerNode *workerNode);
|
static int RecoverWorkerTransactions(WorkerNode *workerNode);
|
||||||
static List * NameListDifference(List *nameList, List *subtractList);
|
|
||||||
static int CompareNames(const void *leftPointer, const void *rightPointer);
|
|
||||||
static bool FindMatchingName(char **nameArray, int nameCount, char *needle,
|
|
||||||
int *matchIndex);
|
|
||||||
static List * PendingWorkerTransactionList(MultiConnection *connection);
|
static List * PendingWorkerTransactionList(MultiConnection *connection);
|
||||||
static List * UnconfirmedWorkerTransactionsList(int groupId);
|
static bool IsTransactionInProgress(HTAB *activeTransactionNumberSet,
|
||||||
static void DeleteTransactionRecord(int32 groupId, char *transactionName);
|
char *preparedTransactionName);
|
||||||
|
static bool RecoverPreparedTransactionOnWorker(MultiConnection *connection,
|
||||||
|
char *transactionName, bool shouldCommit);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -120,13 +119,6 @@ RecoverPreparedTransactions(void)
|
||||||
ListCell *workerNodeCell = NULL;
|
ListCell *workerNodeCell = NULL;
|
||||||
int recoveredTransactionCount = 0;
|
int recoveredTransactionCount = 0;
|
||||||
|
|
||||||
/*
|
|
||||||
* We block here if metadata transactions are ongoing, since we
|
|
||||||
* mustn't commit/abort their prepared transactions under their
|
|
||||||
* feet. We also prevent concurrent recovery.
|
|
||||||
*/
|
|
||||||
LockRelationOid(DistTransactionRelationId(), ExclusiveLock);
|
|
||||||
|
|
||||||
workerList = ActivePrimaryNodeList();
|
workerList = ActivePrimaryNodeList();
|
||||||
|
|
||||||
foreach(workerNodeCell, workerList)
|
foreach(workerNodeCell, workerList)
|
||||||
|
@ -153,25 +145,33 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
||||||
char *nodeName = workerNode->workerName;
|
char *nodeName = workerNode->workerName;
|
||||||
int nodePort = workerNode->workerPort;
|
int nodePort = workerNode->workerPort;
|
||||||
|
|
||||||
|
List *activeTransactionNumberList = NIL;
|
||||||
|
HTAB *activeTransactionNumberSet = NULL;
|
||||||
|
|
||||||
List *pendingTransactionList = NIL;
|
List *pendingTransactionList = NIL;
|
||||||
ListCell *pendingTransactionCell = NULL;
|
HTAB *pendingTransactionSet = NULL;
|
||||||
|
|
||||||
List *unconfirmedTransactionList = NIL;
|
Relation pgDistTransaction = NULL;
|
||||||
char **unconfirmedTransactionArray = NULL;
|
SysScanDesc scanDescriptor = NULL;
|
||||||
int unconfirmedTransactionCount = 0;
|
ScanKeyData scanKey[1];
|
||||||
int unconfirmedTransactionIndex = 0;
|
int scanKeyCount = 1;
|
||||||
|
bool indexOK = true;
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
TupleDesc tupleDescriptor = NULL;
|
||||||
|
|
||||||
List *committedTransactionList = NIL;
|
HASH_SEQ_STATUS status;
|
||||||
ListCell *committedTransactionCell = NULL;
|
|
||||||
|
|
||||||
MemoryContext localContext = NULL;
|
MemoryContext localContext = NULL;
|
||||||
MemoryContext oldContext = NULL;
|
MemoryContext oldContext = NULL;
|
||||||
|
bool recoveryFailed = false;
|
||||||
|
|
||||||
int connectionFlags = SESSION_LIFESPAN;
|
int connectionFlags = SESSION_LIFESPAN;
|
||||||
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
|
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
|
||||||
if (connection->pgConn == NULL)
|
if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK)
|
||||||
{
|
{
|
||||||
/* cannot recover transactions on this worker right now */
|
ereport(WARNING, (errmsg("transaction recovery cannot connect to %s:%d",
|
||||||
|
nodeName, nodePort)));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,191 +182,130 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
||||||
ALLOCSET_DEFAULT_MAXSIZE);
|
ALLOCSET_DEFAULT_MAXSIZE);
|
||||||
oldContext = MemoryContextSwitchTo(localContext);
|
oldContext = MemoryContextSwitchTo(localContext);
|
||||||
|
|
||||||
/* find transactions that were committed, but not yet confirmed */
|
/* take table lock first to avoid running concurrently */
|
||||||
unconfirmedTransactionList = UnconfirmedWorkerTransactionsList(groupId);
|
pgDistTransaction = heap_open(DistTransactionRelationId(), ShareUpdateExclusiveLock);
|
||||||
unconfirmedTransactionList = SortList(unconfirmedTransactionList, CompareNames);
|
tupleDescriptor = RelationGetDescr(pgDistTransaction);
|
||||||
|
|
||||||
/* convert list to an array to use with FindMatchingNames */
|
/* find in-progress distributed transactions */
|
||||||
unconfirmedTransactionCount = list_length(unconfirmedTransactionList);
|
activeTransactionNumberList = ActiveDistributedTransactionNumbers();
|
||||||
unconfirmedTransactionArray =
|
activeTransactionNumberSet = ListToHashSet(activeTransactionNumberList,
|
||||||
(char **) PointerArrayFromList(unconfirmedTransactionList);
|
sizeof(uint64), false);
|
||||||
|
|
||||||
/* find stale prepared transactions on the remote node */
|
/* find stale prepared transactions on the remote node */
|
||||||
pendingTransactionList = PendingWorkerTransactionList(connection);
|
pendingTransactionList = PendingWorkerTransactionList(connection);
|
||||||
pendingTransactionList = SortList(pendingTransactionList, CompareNames);
|
pendingTransactionSet = ListToHashSet(pendingTransactionList, NAMEDATALEN, true);
|
||||||
|
|
||||||
/*
|
/* scan through all recovery records of the current worker */
|
||||||
* Transactions that have no pending prepared transaction are assumed to
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
|
||||||
* have been committed. Any records in unconfirmedTransactionList that
|
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
|
||||||
* don't have a transaction in pendingTransactionList can be removed.
|
|
||||||
*/
|
|
||||||
committedTransactionList = NameListDifference(unconfirmedTransactionList,
|
|
||||||
pendingTransactionList);
|
|
||||||
|
|
||||||
/*
|
scanDescriptor = systable_beginscan(pgDistTransaction,
|
||||||
* For each pending prepared transaction, check whether there is a transaction
|
DistTransactionGroupIndexId(), indexOK,
|
||||||
* record. If so, commit. If not, the transaction that started the transaction
|
NULL, scanKeyCount, scanKey);
|
||||||
* must have rolled back and thus the prepared transaction should be aborted.
|
|
||||||
*/
|
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||||
foreach(pendingTransactionCell, pendingTransactionList)
|
|
||||||
{
|
{
|
||||||
char *transactionName = (char *) lfirst(pendingTransactionCell);
|
bool isNull = false;
|
||||||
StringInfo command = makeStringInfo();
|
bool isTransactionInProgress = false;
|
||||||
int executeCommand = 0;
|
bool isTransactionPending = false;
|
||||||
PGresult *result = NULL;
|
|
||||||
|
|
||||||
bool shouldCommit = FindMatchingName(unconfirmedTransactionArray,
|
Datum transactionNameDatum = heap_getattr(heapTuple,
|
||||||
unconfirmedTransactionCount,
|
Anum_pg_dist_transaction_gid,
|
||||||
transactionName,
|
tupleDescriptor, &isNull);
|
||||||
&unconfirmedTransactionIndex);
|
char *transactionName = TextDatumGetCString(transactionNameDatum);
|
||||||
|
|
||||||
if (shouldCommit)
|
isTransactionInProgress = IsTransactionInProgress(activeTransactionNumberSet,
|
||||||
|
transactionName);
|
||||||
|
if (isTransactionInProgress)
|
||||||
{
|
{
|
||||||
/* should have committed this prepared transaction */
|
/*
|
||||||
appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName);
|
* Do not touch in progress transactions as we might mistakenly
|
||||||
}
|
* commit a transaction that is actually in the process of
|
||||||
else
|
* aborting or vice-versa.
|
||||||
{
|
*/
|
||||||
/* no record of this prepared transaction, abort */
|
|
||||||
appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
executeCommand = ExecuteOptionalRemoteCommand(connection, command->data, &result);
|
|
||||||
if (executeCommand == QUERY_SEND_FAILED)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (executeCommand == RESPONSE_NOT_OKAY)
|
|
||||||
{
|
|
||||||
/* cannot recover this transaction right now */
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
PQclear(result);
|
/*
|
||||||
ForgetResults(connection);
|
* Remove the transaction from the pending list such that only transactions
|
||||||
|
* that need to be aborted remain at the end.
|
||||||
|
*/
|
||||||
|
hash_search(pendingTransactionSet, transactionName, HASH_REMOVE,
|
||||||
|
&isTransactionPending);
|
||||||
|
|
||||||
ereport(NOTICE, (errmsg("recovered a prepared transaction on %s:%d",
|
if (isTransactionPending)
|
||||||
nodeName, nodePort),
|
|
||||||
errcontext("%s", command->data)));
|
|
||||||
|
|
||||||
if (shouldCommit)
|
|
||||||
{
|
|
||||||
committedTransactionList = lappend(committedTransactionList,
|
|
||||||
transactionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
recoveredTransactionCount += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* we can remove the transaction records of confirmed transactions */
|
|
||||||
foreach(committedTransactionCell, committedTransactionList)
|
|
||||||
{
|
|
||||||
char *transactionName = (char *) lfirst(committedTransactionCell);
|
|
||||||
|
|
||||||
DeleteTransactionRecord(groupId, transactionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
MemoryContextReset(localContext);
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
|
||||||
|
|
||||||
return recoveredTransactionCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* NameListDifference returns the difference between the bag of
|
|
||||||
* names in nameList and subtractList. Both are assumed to be
|
|
||||||
* sorted. We cannot use list_difference_ptr here since we need
|
|
||||||
* to compare the actual strings.
|
|
||||||
*/
|
|
||||||
static List *
|
|
||||||
NameListDifference(List *nameList, List *subtractList)
|
|
||||||
{
|
|
||||||
List *differenceList = NIL;
|
|
||||||
ListCell *nameCell = NULL;
|
|
||||||
|
|
||||||
int subtractIndex = 0;
|
|
||||||
int subtractCount = list_length(subtractList);
|
|
||||||
char **subtractArray = (char **) PointerArrayFromList(subtractList);
|
|
||||||
|
|
||||||
foreach(nameCell, nameList)
|
|
||||||
{
|
|
||||||
char *baseName = (char *) lfirst(nameCell);
|
|
||||||
|
|
||||||
bool nameFound = FindMatchingName(subtractArray, subtractCount,
|
|
||||||
baseName, &subtractIndex);
|
|
||||||
|
|
||||||
if (!nameFound)
|
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* baseName is not in subtractArray and thus included
|
* The transaction was committed, but the prepared transaction still exists
|
||||||
* in the difference.
|
* on the worker. Try committing it.
|
||||||
*/
|
*/
|
||||||
differenceList = lappend(differenceList, baseName);
|
bool shouldCommit = true;
|
||||||
|
bool commitSucceeded = RecoverPreparedTransactionOnWorker(connection,
|
||||||
|
transactionName,
|
||||||
|
shouldCommit);
|
||||||
|
if (!commitSucceeded)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Failed to commit on the current worker. Stop without throwing
|
||||||
|
* an error to allow recover_prepared_transactions to continue with
|
||||||
|
* other workers.
|
||||||
|
*/
|
||||||
|
recoveryFailed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
recoveredTransactionCount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* we've successfully committed the prepared transaction, remove the record */
|
||||||
|
simple_heap_delete(pgDistTransaction, &heapTuple->t_self);
|
||||||
}
|
}
|
||||||
|
|
||||||
pfree(subtractArray);
|
systable_endscan(scanDescriptor);
|
||||||
|
heap_close(pgDistTransaction, NoLock);
|
||||||
|
|
||||||
return differenceList;
|
if (!recoveryFailed)
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CompareNames compares names using strncmp. Its signature allows it to
|
|
||||||
* be used in qsort.
|
|
||||||
*/
|
|
||||||
static int
|
|
||||||
CompareNames(const void *leftPointer, const void *rightPointer)
|
|
||||||
{
|
|
||||||
const char *leftString = *((char **) leftPointer);
|
|
||||||
const char *rightString = *((char **) rightPointer);
|
|
||||||
|
|
||||||
int nameCompare = strncmp(leftString, rightString, NAMEDATALEN);
|
|
||||||
|
|
||||||
return nameCompare;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* FindMatchingName searches for name in nameArray, starting at the
|
|
||||||
* value pointed to by matchIndex and stopping at the first index of
|
|
||||||
* name which is greater or equal to needle. nameArray is assumed
|
|
||||||
* to be sorted.
|
|
||||||
*
|
|
||||||
* The function sets matchIndex to the index of the name and returns
|
|
||||||
* true if the name is equal to needle. If matchIndex >= nameCount,
|
|
||||||
* then the function always returns false.
|
|
||||||
*/
|
|
||||||
static bool
|
|
||||||
FindMatchingName(char **nameArray, int nameCount, char *needle,
|
|
||||||
int *matchIndex)
|
|
||||||
{
|
|
||||||
bool foundMatchingName = false;
|
|
||||||
int searchIndex = *matchIndex;
|
|
||||||
int compareResult = -1;
|
|
||||||
|
|
||||||
while (searchIndex < nameCount)
|
|
||||||
{
|
{
|
||||||
char *testName = nameArray[searchIndex];
|
char *pendingTransactionName = NULL;
|
||||||
compareResult = strncmp(needle, testName, NAMEDATALEN);
|
bool abortSucceeded = true;
|
||||||
|
|
||||||
if (compareResult <= 0)
|
/*
|
||||||
|
* All remaining prepared transactions that are not part of an in-progress
|
||||||
|
* distributed transaction should be aborted since we did not find a recovery
|
||||||
|
* record, which implies the disributed transaction aborted.
|
||||||
|
*/
|
||||||
|
hash_seq_init(&status, pendingTransactionSet);
|
||||||
|
|
||||||
|
while ((pendingTransactionName = hash_seq_search(&status)) != NULL)
|
||||||
{
|
{
|
||||||
break;
|
bool isTransactionInProgress = false;
|
||||||
|
bool shouldCommit = false;
|
||||||
|
|
||||||
|
isTransactionInProgress = IsTransactionInProgress(activeTransactionNumberSet,
|
||||||
|
pendingTransactionName);
|
||||||
|
if (isTransactionInProgress)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
shouldCommit = false;
|
||||||
|
abortSucceeded = RecoverPreparedTransactionOnWorker(connection,
|
||||||
|
pendingTransactionName,
|
||||||
|
shouldCommit);
|
||||||
|
if (!abortSucceeded)
|
||||||
|
{
|
||||||
|
hash_seq_term(&status);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
recoveredTransactionCount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
searchIndex++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
*matchIndex = searchIndex;
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
MemoryContextDelete(localContext);
|
||||||
|
|
||||||
if (compareResult == 0)
|
return recoveredTransactionCount;
|
||||||
{
|
|
||||||
foundMatchingName = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return foundMatchingName;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -420,113 +359,73 @@ PendingWorkerTransactionList(MultiConnection *connection)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* UnconfirmedWorkerTransactionList returns a list of unconfirmed transactions
|
* IsTransactionInProgress returns whether the distributed transaction to which
|
||||||
* for a group of workers from pg_dist_transaction. A transaction is confirmed
|
* preparedTransactionName belongs is still in progress, or false if the
|
||||||
* once we have verified that it does not exist in pg_prepared_xacts on the
|
* transaction name cannot be parsed. This can happen when the user manually
|
||||||
* remote node and the entry in pg_dist_transaction is removed.
|
* inserts into pg_dist_transaction.
|
||||||
*/
|
*/
|
||||||
static List *
|
static bool
|
||||||
UnconfirmedWorkerTransactionsList(int groupId)
|
IsTransactionInProgress(HTAB *activeTransactionNumberSet, char *preparedTransactionName)
|
||||||
{
|
{
|
||||||
List *transactionNameList = NIL;
|
int groupId = 0;
|
||||||
Relation pgDistTransaction = NULL;
|
int procId = 0;
|
||||||
SysScanDesc scanDescriptor = NULL;
|
uint32 connectionNumber = 0;
|
||||||
ScanKeyData scanKey[1];
|
uint64 transactionNumber = 0;
|
||||||
int scanKeyCount = 1;
|
bool isValidName = false;
|
||||||
bool indexOK = true;
|
bool isTransactionInProgress = false;
|
||||||
HeapTuple heapTuple = NULL;
|
|
||||||
|
|
||||||
pgDistTransaction = heap_open(DistTransactionRelationId(), AccessShareLock);
|
isValidName = ParsePreparedTransactionName(preparedTransactionName, &groupId, &procId,
|
||||||
|
&transactionNumber, &connectionNumber);
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
|
if (isValidName)
|
||||||
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
|
|
||||||
|
|
||||||
scanDescriptor = systable_beginscan(pgDistTransaction,
|
|
||||||
DistTransactionGroupIndexId(), indexOK,
|
|
||||||
NULL, scanKeyCount, scanKey);
|
|
||||||
|
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
|
||||||
while (HeapTupleIsValid(heapTuple))
|
|
||||||
{
|
{
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction);
|
hash_search(activeTransactionNumberSet, &transactionNumber, HASH_FIND,
|
||||||
bool isNull = false;
|
&isTransactionInProgress);
|
||||||
|
|
||||||
Datum transactionNameDatum = heap_getattr(heapTuple,
|
|
||||||
Anum_pg_dist_transaction_gid,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
|
|
||||||
char *transactionName = TextDatumGetCString(transactionNameDatum);
|
|
||||||
transactionNameList = lappend(transactionNameList, transactionName);
|
|
||||||
|
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
return isTransactionInProgress;
|
||||||
heap_close(pgDistTransaction, AccessShareLock);
|
|
||||||
|
|
||||||
return transactionNameList;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DeleteTransactionRecord opens the pg_dist_transaction system catalog, finds the
|
* RecoverPreparedTransactionOnWorker recovers a single prepared transaction over
|
||||||
* first (unique) row that corresponds to the given transactionName and worker node,
|
* the given connection. If shouldCommit is true we send
|
||||||
* and deletes this row.
|
|
||||||
*/
|
*/
|
||||||
static void
|
static bool
|
||||||
DeleteTransactionRecord(int32 groupId, char *transactionName)
|
RecoverPreparedTransactionOnWorker(MultiConnection *connection, char *transactionName,
|
||||||
|
bool shouldCommit)
|
||||||
{
|
{
|
||||||
Relation pgDistTransaction = NULL;
|
StringInfo command = makeStringInfo();
|
||||||
SysScanDesc scanDescriptor = NULL;
|
PGresult *result = NULL;
|
||||||
ScanKeyData scanKey[2];
|
int executeCommand = 0;
|
||||||
int scanKeyCount = 2;
|
bool raiseInterrupts = false;
|
||||||
bool indexOK = true;
|
|
||||||
HeapTuple heapTuple = NULL;
|
|
||||||
bool heapTupleFound = false;
|
|
||||||
|
|
||||||
pgDistTransaction = heap_open(DistTransactionRelationId(), RowExclusiveLock);
|
if (shouldCommit)
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
|
|
||||||
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
|
|
||||||
ScanKeyInit(&scanKey[1], Anum_pg_dist_transaction_gid,
|
|
||||||
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(transactionName));
|
|
||||||
|
|
||||||
scanDescriptor = systable_beginscan(pgDistTransaction,
|
|
||||||
DistTransactionRecordIndexId(), indexOK,
|
|
||||||
NULL, scanKeyCount, scanKey);
|
|
||||||
|
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
|
||||||
while (HeapTupleIsValid(heapTuple))
|
|
||||||
{
|
{
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction);
|
/* should have committed this prepared transaction */
|
||||||
bool isNull = false;
|
appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName);
|
||||||
|
}
|
||||||
Datum gidDatum = heap_getattr(heapTuple,
|
else
|
||||||
Anum_pg_dist_transaction_gid,
|
{
|
||||||
tupleDescriptor, &isNull);
|
/* should have aborted this prepared transaction */
|
||||||
|
appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName);
|
||||||
char *gid = TextDatumGetCString(gidDatum);
|
|
||||||
|
|
||||||
if (strncmp(transactionName, gid, NAMEDATALEN) == 0)
|
|
||||||
{
|
|
||||||
heapTupleFound = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if we couldn't find the transaction record to delete, error out */
|
executeCommand = ExecuteOptionalRemoteCommand(connection, command->data, &result);
|
||||||
if (!heapTupleFound)
|
if (executeCommand == QUERY_SEND_FAILED)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("could not find valid entry for transaction record "
|
return false;
|
||||||
"'%s' in group %d",
|
}
|
||||||
transactionName, groupId)));
|
if (executeCommand == RESPONSE_NOT_OKAY)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
simple_heap_delete(pgDistTransaction, &heapTuple->t_self);
|
PQclear(result);
|
||||||
CommandCounterIncrement();
|
ClearResults(connection, raiseInterrupts);
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
ereport(LOG, (errmsg("recovered a prepared transaction on %s:%d",
|
||||||
heap_close(pgDistTransaction, RowExclusiveLock);
|
connection->hostname, connection->port),
|
||||||
|
errcontext("%s", command->data)));
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,3 +102,49 @@ DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId)
|
||||||
|
|
||||||
return arrayObject;
|
return arrayObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ListToHashSet creates a hash table in which the keys are copied from
|
||||||
|
* from itemList and the values are the same as the keys. This can
|
||||||
|
* be used for fast lookups of the presence of a byte array in a set.
|
||||||
|
*
|
||||||
|
* If isStringList is true, then look-ups are performed through string
|
||||||
|
* comparison of strings up to keySize in length. If isStringList is
|
||||||
|
* false, then look-ups are performed by comparing exactly keySize
|
||||||
|
* bytes pointed to by the pointers in itemList.
|
||||||
|
*/
|
||||||
|
HTAB *
|
||||||
|
ListToHashSet(List *itemList, Size keySize, bool isStringList)
|
||||||
|
{
|
||||||
|
HASHCTL info;
|
||||||
|
HTAB *itemSet = NULL;
|
||||||
|
ListCell *itemCell = NULL;
|
||||||
|
int flags = HASH_ELEM;
|
||||||
|
|
||||||
|
/* allocate sufficient capacity for O(1) expected look-up time */
|
||||||
|
int capacity = (int) (list_length(itemList) / 0.75) + 1;
|
||||||
|
|
||||||
|
/* initialise the hash table for looking up keySize bytes */
|
||||||
|
memset(&info, 0, sizeof(info));
|
||||||
|
info.keysize = keySize;
|
||||||
|
info.entrysize = keySize;
|
||||||
|
|
||||||
|
if (!isStringList)
|
||||||
|
{
|
||||||
|
info.hash = tag_hash;
|
||||||
|
flags |= HASH_FUNCTION;
|
||||||
|
}
|
||||||
|
|
||||||
|
itemSet = hash_create("ListToHashSet", capacity, &info, flags);
|
||||||
|
|
||||||
|
foreach(itemCell, itemList)
|
||||||
|
{
|
||||||
|
void *item = lfirst(itemCell);
|
||||||
|
bool foundInSet = false;
|
||||||
|
|
||||||
|
hash_search(itemSet, item, HASH_ENTER, &foundInSet);
|
||||||
|
}
|
||||||
|
|
||||||
|
return itemSet;
|
||||||
|
}
|
||||||
|
|
|
@ -43,5 +43,6 @@ extern void AssignDistributedTransactionId(void);
|
||||||
extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
|
extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
|
||||||
extern void CancelTransactionDueToDeadlock(PGPROC *proc);
|
extern void CancelTransactionDueToDeadlock(PGPROC *proc);
|
||||||
extern bool MyBackendGotCancelledDueToDeadlock(void);
|
extern bool MyBackendGotCancelledDueToDeadlock(void);
|
||||||
|
extern List * ActiveDistributedTransactionNumbers(void);
|
||||||
|
|
||||||
#endif /* BACKEND_DATA_H */
|
#endif /* BACKEND_DATA_H */
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
#include "utils/array.h"
|
#include "utils/array.h"
|
||||||
|
#include "utils/hsearch.h"
|
||||||
|
|
||||||
|
|
||||||
/* utility functions declaration shared within this module */
|
/* utility functions declaration shared within this module */
|
||||||
|
@ -25,5 +26,6 @@ extern List * SortList(List *pointerList,
|
||||||
extern void ** PointerArrayFromList(List *pointerList);
|
extern void ** PointerArrayFromList(List *pointerList);
|
||||||
extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount,
|
extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount,
|
||||||
Oid datumTypeId);
|
Oid datumTypeId);
|
||||||
|
extern HTAB * ListToHashSet(List *pointerList, Size keySize, bool isStringList);
|
||||||
|
|
||||||
#endif /* CITUS_LISTUTILS_H */
|
#endif /* CITUS_LISTUTILS_H */
|
||||||
|
|
|
@ -79,6 +79,11 @@ typedef struct RemoteTransaction
|
||||||
} RemoteTransaction;
|
} RemoteTransaction;
|
||||||
|
|
||||||
|
|
||||||
|
/* utility functions for dealing with remote transactions */
|
||||||
|
extern bool ParsePreparedTransactionName(char *preparedTransactionName, int *groupId,
|
||||||
|
int *procId, uint64 *transactionNumber,
|
||||||
|
uint32 *connectionNumber);
|
||||||
|
|
||||||
/* change an individual remote transaction's state */
|
/* change an individual remote transaction's state */
|
||||||
extern void StartRemoteTransactionBegin(struct MultiConnection *connection);
|
extern void StartRemoteTransactionBegin(struct MultiConnection *connection);
|
||||||
extern void FinishRemoteTransactionBegin(struct MultiConnection *connection);
|
extern void FinishRemoteTransactionBegin(struct MultiConnection *connection);
|
||||||
|
|
|
@ -15,8 +15,7 @@ recover_prepared_transactions
|
||||||
0
|
0
|
||||||
step s2-insert:
|
step s2-insert:
|
||||||
INSERT INTO test_transaction_recovery VALUES (1,2);
|
INSERT INTO test_transaction_recovery VALUES (1,2);
|
||||||
<waiting ...>
|
|
||||||
step s1-commit:
|
step s1-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
step s2-insert: <... completed>
|
|
||||||
|
|
|
@ -259,12 +259,6 @@ SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport = :worker_1_po
|
||||||
INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_commit');
|
INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_commit');
|
||||||
INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_be_forgotten');
|
INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_be_forgotten');
|
||||||
SELECT recover_prepared_transactions();
|
SELECT recover_prepared_transactions();
|
||||||
NOTICE: recovered a prepared transaction on localhost:57637
|
|
||||||
CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort'
|
|
||||||
NOTICE: recovered a prepared transaction on localhost:57637
|
|
||||||
CONTEXT: ROLLBACK PREPARED 'citus_0_should_be_sorted_into_middle'
|
|
||||||
NOTICE: recovered a prepared transaction on localhost:57637
|
|
||||||
CONTEXT: COMMIT PREPARED 'citus_0_should_commit'
|
|
||||||
recover_prepared_transactions
|
recover_prepared_transactions
|
||||||
-------------------------------
|
-------------------------------
|
||||||
3
|
3
|
||||||
|
|
|
@ -46,12 +46,6 @@ PREPARE TRANSACTION 'citus_12_should_be_sorted_into_middle';
|
||||||
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_commit');
|
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_commit');
|
||||||
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_be_forgotten');
|
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_be_forgotten');
|
||||||
SELECT recover_prepared_transactions();
|
SELECT recover_prepared_transactions();
|
||||||
NOTICE: recovered a prepared transaction on localhost:57637
|
|
||||||
CONTEXT: ROLLBACK PREPARED 'citus_12_should_abort'
|
|
||||||
NOTICE: recovered a prepared transaction on localhost:57637
|
|
||||||
CONTEXT: ROLLBACK PREPARED 'citus_12_should_be_sorted_into_middle'
|
|
||||||
NOTICE: recovered a prepared transaction on localhost:57637
|
|
||||||
CONTEXT: COMMIT PREPARED 'citus_12_should_commit'
|
|
||||||
recover_prepared_transactions
|
recover_prepared_transactions
|
||||||
-------------------------------
|
-------------------------------
|
||||||
3
|
3
|
||||||
|
|
|
@ -28,12 +28,6 @@ PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
|
||||||
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit');
|
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit');
|
||||||
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten');
|
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten');
|
||||||
SELECT recover_prepared_transactions();
|
SELECT recover_prepared_transactions();
|
||||||
NOTICE: recovered a prepared transaction on localhost:57637
|
|
||||||
CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort'
|
|
||||||
NOTICE: recovered a prepared transaction on localhost:57637
|
|
||||||
CONTEXT: ROLLBACK PREPARED 'citus_0_should_be_sorted_into_middle'
|
|
||||||
NOTICE: recovered a prepared transaction on localhost:57637
|
|
||||||
CONTEXT: COMMIT PREPARED 'citus_0_should_commit'
|
|
||||||
recover_prepared_transactions
|
recover_prepared_transactions
|
||||||
-------------------------------
|
-------------------------------
|
||||||
3
|
3
|
||||||
|
|
|
@ -33,5 +33,5 @@ step "s2-insert"
|
||||||
INSERT INTO test_transaction_recovery VALUES (1,2);
|
INSERT INTO test_transaction_recovery VALUES (1,2);
|
||||||
}
|
}
|
||||||
|
|
||||||
# Recovery and 2PCs should block each other
|
# Recovery and 2PCs should not block each other
|
||||||
permutation "s1-begin" "s1-recover" "s2-insert" "s1-commit"
|
permutation "s1-begin" "s1-recover" "s2-insert" "s1-commit"
|
||||||
|
|
Loading…
Reference in New Issue