diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 59a509507..bf619ebc0 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -3,6 +3,11 @@ * remote_transaction.c * Management of transaction spanning more than one node. * + * Since the functions defined in this file mostly allocate in + * CitusXactCallbackContext, we mostly try doing allocations on stack. + * And when it's hard to do so, we at least try freeing the heap memory + * immediately after an object becomes useless. + * * Copyright (c) Citus Data, Inc. * *------------------------------------------------------------------------- @@ -106,11 +111,21 @@ StartRemoteTransactionBegin(struct MultiConnection *connection) appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data); } + char *assignDistributedTransactionIdCommand = AssignDistributedTransactionIdCommand(); + /* add SELECT assign_distributed_transaction_id ... */ appendStringInfoString(beginAndSetDistributedTransactionId, - AssignDistributedTransactionIdCommand()); + assignDistributedTransactionIdCommand); - if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data)) + pfree(assignDistributedTransactionIdCommand); + + bool success = SendRemoteCommand(connection, + beginAndSetDistributedTransactionId->data); + + pfree(beginAndSetDistributedTransactionId->data); + pfree(beginAndSetDistributedTransactionId); + + if (!success) { const bool raiseErrors = true; @@ -169,7 +184,11 @@ AssignDistributedTransactionIdCommand(void) distributedTransactionId->transactionNumber, timestamp); - return assignDistributedTransactionId->data; + /* free the StringInfo but not the buffer itself */ + char *command = assignDistributedTransactionId->data; + pfree(assignDistributedTransactionId); + + return command; } @@ -268,16 +287,21 @@ StartRemoteTransactionCommit(MultiConnection *connection) } else if (transaction->transactionState == REMOTE_TRANS_PREPARED) { - /* commit the prepared transaction */ - StringInfoData command; - - initStringInfo(&command); - appendStringInfo(&command, "COMMIT PREPARED %s", - quote_literal_cstr(transaction->preparedName)); + /* + * Commit the prepared transaction. + * + * We need to allocate 420 bytes for command buffer (including '\0'): + * - len("COMMIT PREPARED ") = 16 + * - maximum quoted length of transaction->preparedName = 2 * 200 + 3 = 403 + */ + char command[420]; + char *quotedPrepName = quote_literal_cstr(transaction->preparedName); + SafeSnprintf(command, sizeof(command), "COMMIT PREPARED %s", quotedPrepName); + pfree(quotedPrepName); transaction->transactionState = REMOTE_TRANS_2PC_COMMITTING; - if (!SendRemoteCommand(connection, command.data)) + if (!SendRemoteCommand(connection, command)) { HandleRemoteTransactionConnectionError(connection, raiseErrors); } @@ -395,16 +419,21 @@ StartRemoteTransactionAbort(MultiConnection *connection) if (transaction->transactionState == REMOTE_TRANS_PREPARING || transaction->transactionState == REMOTE_TRANS_PREPARED) { - StringInfoData command; - - /* await PREPARE TRANSACTION results, closing the connection would leave it dangling */ ForgetResults(connection); - initStringInfo(&command); - appendStringInfo(&command, "ROLLBACK PREPARED %s", - quote_literal_cstr(transaction->preparedName)); + /* + * Await PREPARE TRANSACTION results, closing the connection would leave it dangling. + * + * We need to allocate 422 bytes for command buffer (including '\0'): + * - len("ROLLBACK PREPARED ") = 18 + * - maximum quoted length of transaction->preparedName = 2 * 200 + 3 = 403 + */ + char command[422]; + char *quotedPrepName = quote_literal_cstr(transaction->preparedName); + SafeSnprintf(command, sizeof(command), "ROLLBACK PREPARED %s", quotedPrepName); + pfree(quotedPrepName); - if (!SendRemoteCommand(connection, command.data)) + if (!SendRemoteCommand(connection, command)) { HandleRemoteTransactionConnectionError(connection, raiseErrors); } @@ -498,7 +527,6 @@ void StartRemoteTransactionPrepare(struct MultiConnection *connection) { RemoteTransaction *transaction = &connection->remoteTransaction; - StringInfoData command; const bool raiseErrors = true; /* can't prepare a nonexistant transaction */ @@ -519,11 +547,17 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection) LogTransactionRecord(workerNode->groupId, transaction->preparedName); } - initStringInfo(&command); - appendStringInfo(&command, "PREPARE TRANSACTION %s", - quote_literal_cstr(transaction->preparedName)); + /* + * We need to allocate 424 bytes for command buffer (including '\0'): + * - len("PREPARE TRANSACTION ") = 20 + * - maximum quoted length of transaction->preparedName = 2 * 200 + 3 = 403 + */ + char command[424]; + char *quotedPrepName = quote_literal_cstr(transaction->preparedName); + SafeSnprintf(command, sizeof(command), "PREPARE TRANSACTION %s", quotedPrepName); + pfree(quotedPrepName); - if (!SendRemoteCommand(connection, command.data)) + if (!SendRemoteCommand(connection, command)) { HandleRemoteTransactionConnectionError(connection, raiseErrors); } @@ -841,6 +875,8 @@ CoordinatedRemoteTransactionsPrepare(void) } CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; + + list_free(connectionList); } @@ -905,6 +941,8 @@ CoordinatedRemoteTransactionsCommit(void) FinishRemoteTransactionCommit(connection); } + + list_free(connectionList); } @@ -958,6 +996,8 @@ CoordinatedRemoteTransactionsAbort(void) FinishRemoteTransactionAbort(connection); } + + list_free(connectionList); } @@ -1008,6 +1048,8 @@ CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId) transaction->lastSuccessfulSubXact = subId; } } + + list_free(connectionList); } @@ -1053,6 +1095,8 @@ CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId) FinishRemoteTransactionSavepointRelease(connection, subId); } + + list_free(connectionList); } @@ -1134,6 +1178,8 @@ CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId) */ UnclaimConnection(connection); } + + list_free(connectionList); } @@ -1145,10 +1191,17 @@ static void StartRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId) { const bool raiseErrors = true; - StringInfo savepointCommand = makeStringInfo(); - appendStringInfo(savepointCommand, "SAVEPOINT savepoint_%u", subId); - if (!SendRemoteCommand(connection, savepointCommand->data)) + /* + * We need to allocate 31 bytes for command buffer (including '\0'): + * - len("SAVEPOINT savepoint_") = 20 + * - maximum length of str(subId) = 10 + */ + char savepointCommand[31]; + SafeSnprintf(savepointCommand, sizeof(savepointCommand), "SAVEPOINT savepoint_%u", + subId); + + if (!SendRemoteCommand(connection, savepointCommand)) { HandleRemoteTransactionConnectionError(connection, raiseErrors); } @@ -1184,10 +1237,17 @@ StartRemoteTransactionSavepointRelease(MultiConnection *connection, SubTransactionId subId) { const bool raiseErrors = true; - StringInfo savepointCommand = makeStringInfo(); - appendStringInfo(savepointCommand, "RELEASE SAVEPOINT savepoint_%u", subId); - if (!SendRemoteCommand(connection, savepointCommand->data)) + /* + * We need to allocate 39 bytes for command buffer (including '\0'): + * - len("RELEASE SAVEPOINT savepoint_") = 28 + * - maximum length of str(subId) = 10 + */ + char savepointCommand[39]; + SafeSnprintf(savepointCommand, sizeof(savepointCommand), + "RELEASE SAVEPOINT savepoint_%u", subId); + + if (!SendRemoteCommand(connection, savepointCommand)) { HandleRemoteTransactionConnectionError(connection, raiseErrors); } @@ -1224,10 +1284,17 @@ StartRemoteTransactionSavepointRollback(MultiConnection *connection, SubTransactionId subId) { const bool raiseErrors = false; - StringInfo savepointCommand = makeStringInfo(); - appendStringInfo(savepointCommand, "ROLLBACK TO SAVEPOINT savepoint_%u", subId); - if (!SendRemoteCommand(connection, savepointCommand->data)) + /* + * We need to allocate 43 bytes for command buffer (including '\0'): + * - len("ROLLBACK TO SAVEPOINT savepoint_") = 32 + * - maximum length of str(subId) = 10 + */ + char savepointCommand[43]; + SafeSnprintf(savepointCommand, sizeof(savepointCommand), + "ROLLBACK TO SAVEPOINT savepoint_%u", subId); + + if (!SendRemoteCommand(connection, savepointCommand)) { HandleRemoteTransactionConnectionError(connection, raiseErrors); } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 82c3fb5f7..332298e3a 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -90,11 +90,14 @@ StringInfo activeSetStmts; * PostgreSQL with a sub-xact callback). At present, the context of a subxact * includes a subxact identifier as well as any SET LOCAL statements propagated * to workers during the sub-transaction. + * + * To be clear, last item of activeSubXactContexts list corresponds to top of + * stack. */ static List *activeSubXactContexts = NIL; /* some pre-allocated memory so we don't need to call malloc() during callbacks */ -MemoryContext CommitContext = NULL; +MemoryContext CitusXactCallbackContext = NULL; /* * Should this coordinated transaction use 2PC? Set by @@ -245,11 +248,11 @@ InitializeTransactionManagement(void) AdjustMaxPreparedTransactions(); /* set aside 8kb of memory for use in CoordinatedTransactionCallback */ - CommitContext = AllocSetContextCreateInternal(TopMemoryContext, - "CommitContext", - 8 * 1024, - 8 * 1024, - 8 * 1024); + CitusXactCallbackContext = AllocSetContextCreateInternal(TopMemoryContext, + "CitusXactCallbackContext", + 8 * 1024, + 8 * 1024, + 8 * 1024); } @@ -274,7 +277,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * * One possible source of errors is memory allocation failures. To minimize * the chance of those happening we've pre-allocated some memory in the - * CommitContext, it has 8kb of memory that we're allowed to use. + * CitusXactCallbackContext, it has 8kb of memory that we're allowed to use. * * We only do this in the COMMIT callback because: * - Errors thrown in other callbacks (such as PRE_COMMIT) won't cause @@ -283,8 +286,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * postgres already creates a TransactionAbortContext which performs this * trick, so there's no need for us to do it again. */ - MemoryContext previousContext = CurrentMemoryContext; - MemoryContextSwitchTo(CommitContext); + MemoryContext previousContext = + MemoryContextSwitchTo(CitusXactCallbackContext); if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) { @@ -322,9 +325,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) PlacementMovedUsingLogicalReplicationInTX = false; - /* empty the CommitContext to ensure we're not leaking memory */ + /* empty the CitusXactCallbackContext to ensure we're not leaking memory */ MemoryContextSwitchTo(previousContext); - MemoryContextReset(CommitContext); + MemoryContextReset(CitusXactCallbackContext); /* Set CreateCitusTransactionLevel to 0 since original transaction is about to be * committed. @@ -381,6 +384,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); ResetRelationAccessHash(); + /* empty the CitusXactCallbackContext to ensure we're not leaking memory */ + MemoryContextReset(CitusXactCallbackContext); + /* * Clear MetadataCache table if we're aborting from a CREATE EXTENSION Citus * so that any created OIDs from the table are cleared and invalidated. We @@ -630,16 +636,25 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, */ case SUBXACT_EVENT_START_SUB: { + MemoryContext previousContext = + MemoryContextSwitchTo(CitusXactCallbackContext); + PushSubXact(subId); if (InCoordinatedTransaction()) { CoordinatedRemoteTransactionsSavepointBegin(subId); } + + MemoryContextSwitchTo(previousContext); + break; } case SUBXACT_EVENT_COMMIT_SUB: { + MemoryContext previousContext = + MemoryContextSwitchTo(CitusXactCallbackContext); + if (InCoordinatedTransaction()) { CoordinatedRemoteTransactionsSavepointRelease(subId); @@ -652,11 +667,17 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, { SetCreateCitusTransactionLevel(GetCitusCreationLevel() - 1); } + + MemoryContextSwitchTo(previousContext); + break; } case SUBXACT_EVENT_ABORT_SUB: { + MemoryContext previousContext = + MemoryContextSwitchTo(CitusXactCallbackContext); + /* * Stop showing message for now, will re-enable when executing * the next statement. @@ -684,6 +705,9 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, InvalidateMetadataSystemCache(); SetCreateCitusTransactionLevel(0); } + + MemoryContextSwitchTo(previousContext); + break; } @@ -731,28 +755,14 @@ AdjustMaxPreparedTransactions(void) static void PushSubXact(SubTransactionId subId) { - /* - * We need to allocate these in TopTransactionContext instead of current - * subxact's memory context. This is because AtSubCommit_Memory won't - * delete the subxact's memory context unless it is empty, and this - * can cause in memory leaks. For emptiness it just checks if the memory - * has been reset, and we cannot reset the subxact context since other - * data can be in the context that are needed by upper commits. - * - * See https://github.com/citusdata/citus/issues/3999 - */ - MemoryContext old_context = MemoryContextSwitchTo(TopTransactionContext); - /* save provided subId as well as propagated SET LOCAL stmts */ SubXactContext *state = palloc(sizeof(SubXactContext)); state->subId = subId; state->setLocalCmds = activeSetStmts; /* append to list and reset active set stmts for upcoming sub-xact */ - activeSubXactContexts = lcons(state, activeSubXactContexts); + activeSubXactContexts = lappend(activeSubXactContexts, state); activeSetStmts = makeStringInfo(); - - MemoryContextSwitchTo(old_context); } @@ -760,7 +770,7 @@ PushSubXact(SubTransactionId subId) static void PopSubXact(SubTransactionId subId) { - SubXactContext *state = linitial(activeSubXactContexts); + SubXactContext *state = llast(activeSubXactContexts); Assert(state->subId == subId); @@ -787,7 +797,7 @@ PopSubXact(SubTransactionId subId) */ pfree(state); - activeSubXactContexts = list_delete_first(activeSubXactContexts); + activeSubXactContexts = list_delete_last(activeSubXactContexts); } @@ -795,19 +805,7 @@ PopSubXact(SubTransactionId subId) List * ActiveSubXactContexts(void) { - List *reversedSubXactStates = NIL; - - /* - * activeSubXactContexts is in reversed temporal order, so we reverse it to get it - * in temporal order. - */ - SubXactContext *state = NULL; - foreach_ptr(state, activeSubXactContexts) - { - reversedSubXactStates = lcons(state, reversedSubXactStates); - } - - return reversedSubXactStates; + return activeSubXactContexts; }