Use CommitContext for subxact mgmt and reduce memory usage in CommitContext (#6099)

(Hopefully) Fixes #5000.

If memory allocation done for `SubXactContext *state` in `PushSubXact()`
fails, then `PopSubXact()` might segfault, for example, when grabbing
the
topmost `SubXactContext` from `activeSubXactContexts` if this is the
first
ever subxact within the current xact, with the following stack trace:
```c
citus.so!list_nth_cell(const List * list, int n) (\opt\pgenv\pgsql-14.3\include\server\nodes\pg_list.h:260)
citus.so!PopSubXact(SubTransactionId subId) (\home\onurctirtir\citus\src\backend\distributed\transaction\transaction_management.c:761)
citus.so!CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, SubTransactionId parentSubid, void * arg) (\home\onurctirtir\citus\src\backend\distributed\transaction\transaction_management.c:673)
CallSubXactCallbacks(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid) (\opt\pgenv\src\postgresql-14.3\src\backend\access\transam\xact.c:3644)
AbortSubTransaction() (\opt\pgenv\src\postgresql-14.3\src\backend\access\transam\xact.c:5058)
AbortCurrentTransaction() (\opt\pgenv\src\postgresql-14.3\src\backend\access\transam\xact.c:3366)
PostgresMain(int argc, char ** argv, const char * dbname, const char * username) (\opt\pgenv\src\postgresql-14.3\src\backend\tcop\postgres.c:4250)
BackendRun(Port * port) (\opt\pgenv\src\postgresql-14.3\src\backend\postmaster\postmaster.c:4530)
BackendStartup(Port * port) (\opt\pgenv\src\postgresql-14.3\src\backend\postmaster\postmaster.c:4252)
ServerLoop() (\opt\pgenv\src\postgresql-14.3\src\backend\postmaster\postmaster.c:1745)
PostmasterMain(int argc, char ** argv) (\opt\pgenv\src\postgresql-14.3\src\backend\postmaster\postmaster.c:1417)
main(int argc, char ** argv) (\opt\pgenv\src\postgresql-14.3\src\backend\main\main.c:209)
```

For this reason, to be more defensive against memory-allocation errors
that could happen at `PushSubXact()`, now we use our pre-allocated
memory
context for the objects created in `PushSubXact()`.

This commit also attempts reducing the memory allocations done under
CommitContext to reduce the chances of consuming all the memory
available
to CommitContext.

Note that it's problematic to encounter with such a memory-allocation
error for other objects created in `PushSubXact()` as well, so above is
an **example** scenario that might result in a segfault.

DESCRIPTION: Fixes a bug that might cause segfaults when handling deeply
nested subtransactions
pull/6469/head
Onur Tirtir 2022-11-03 00:57:32 +03:00 committed by GitHub
parent a5f7f001b0
commit 1af28b3f27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 137 additions and 72 deletions

View File

@ -3,6 +3,11 @@
* remote_transaction.c
* Management of transaction spanning more than one node.
*
* Since the functions defined in this file mostly allocate in
* CitusXactCallbackContext, we mostly try doing allocations on stack.
* And when it's hard to do so, we at least try freeing the heap memory
* immediately after an object becomes useless.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
@ -106,11 +111,21 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data);
}
char *assignDistributedTransactionIdCommand = AssignDistributedTransactionIdCommand();
/* add SELECT assign_distributed_transaction_id ... */
appendStringInfoString(beginAndSetDistributedTransactionId,
AssignDistributedTransactionIdCommand());
assignDistributedTransactionIdCommand);
if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
pfree(assignDistributedTransactionIdCommand);
bool success = SendRemoteCommand(connection,
beginAndSetDistributedTransactionId->data);
pfree(beginAndSetDistributedTransactionId->data);
pfree(beginAndSetDistributedTransactionId);
if (!success)
{
const bool raiseErrors = true;
@ -169,7 +184,11 @@ AssignDistributedTransactionIdCommand(void)
distributedTransactionId->transactionNumber,
timestamp);
return assignDistributedTransactionId->data;
/* free the StringInfo but not the buffer itself */
char *command = assignDistributedTransactionId->data;
pfree(assignDistributedTransactionId);
return command;
}
@ -268,16 +287,21 @@ StartRemoteTransactionCommit(MultiConnection *connection)
}
else if (transaction->transactionState == REMOTE_TRANS_PREPARED)
{
/* commit the prepared transaction */
StringInfoData command;
initStringInfo(&command);
appendStringInfo(&command, "COMMIT PREPARED %s",
quote_literal_cstr(transaction->preparedName));
/*
* Commit the prepared transaction.
*
* We need to allocate 420 bytes for command buffer (including '\0'):
* - len("COMMIT PREPARED ") = 16
* - maximum quoted length of transaction->preparedName = 2 * 200 + 3 = 403
*/
char command[420];
char *quotedPrepName = quote_literal_cstr(transaction->preparedName);
SafeSnprintf(command, sizeof(command), "COMMIT PREPARED %s", quotedPrepName);
pfree(quotedPrepName);
transaction->transactionState = REMOTE_TRANS_2PC_COMMITTING;
if (!SendRemoteCommand(connection, command.data))
if (!SendRemoteCommand(connection, command))
{
HandleRemoteTransactionConnectionError(connection, raiseErrors);
}
@ -395,16 +419,21 @@ StartRemoteTransactionAbort(MultiConnection *connection)
if (transaction->transactionState == REMOTE_TRANS_PREPARING ||
transaction->transactionState == REMOTE_TRANS_PREPARED)
{
StringInfoData command;
/* await PREPARE TRANSACTION results, closing the connection would leave it dangling */
ForgetResults(connection);
initStringInfo(&command);
appendStringInfo(&command, "ROLLBACK PREPARED %s",
quote_literal_cstr(transaction->preparedName));
/*
* Await PREPARE TRANSACTION results, closing the connection would leave it dangling.
*
* We need to allocate 422 bytes for command buffer (including '\0'):
* - len("ROLLBACK PREPARED ") = 18
* - maximum quoted length of transaction->preparedName = 2 * 200 + 3 = 403
*/
char command[422];
char *quotedPrepName = quote_literal_cstr(transaction->preparedName);
SafeSnprintf(command, sizeof(command), "ROLLBACK PREPARED %s", quotedPrepName);
pfree(quotedPrepName);
if (!SendRemoteCommand(connection, command.data))
if (!SendRemoteCommand(connection, command))
{
HandleRemoteTransactionConnectionError(connection, raiseErrors);
}
@ -498,7 +527,6 @@ void
StartRemoteTransactionPrepare(struct MultiConnection *connection)
{
RemoteTransaction *transaction = &connection->remoteTransaction;
StringInfoData command;
const bool raiseErrors = true;
/* can't prepare a nonexistant transaction */
@ -519,11 +547,17 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection)
LogTransactionRecord(workerNode->groupId, transaction->preparedName);
}
initStringInfo(&command);
appendStringInfo(&command, "PREPARE TRANSACTION %s",
quote_literal_cstr(transaction->preparedName));
/*
* We need to allocate 424 bytes for command buffer (including '\0'):
* - len("PREPARE TRANSACTION ") = 20
* - maximum quoted length of transaction->preparedName = 2 * 200 + 3 = 403
*/
char command[424];
char *quotedPrepName = quote_literal_cstr(transaction->preparedName);
SafeSnprintf(command, sizeof(command), "PREPARE TRANSACTION %s", quotedPrepName);
pfree(quotedPrepName);
if (!SendRemoteCommand(connection, command.data))
if (!SendRemoteCommand(connection, command))
{
HandleRemoteTransactionConnectionError(connection, raiseErrors);
}
@ -841,6 +875,8 @@ CoordinatedRemoteTransactionsPrepare(void)
}
CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
list_free(connectionList);
}
@ -905,6 +941,8 @@ CoordinatedRemoteTransactionsCommit(void)
FinishRemoteTransactionCommit(connection);
}
list_free(connectionList);
}
@ -958,6 +996,8 @@ CoordinatedRemoteTransactionsAbort(void)
FinishRemoteTransactionAbort(connection);
}
list_free(connectionList);
}
@ -1008,6 +1048,8 @@ CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId)
transaction->lastSuccessfulSubXact = subId;
}
}
list_free(connectionList);
}
@ -1053,6 +1095,8 @@ CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId)
FinishRemoteTransactionSavepointRelease(connection, subId);
}
list_free(connectionList);
}
@ -1134,6 +1178,8 @@ CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId)
*/
UnclaimConnection(connection);
}
list_free(connectionList);
}
@ -1145,10 +1191,17 @@ static void
StartRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId)
{
const bool raiseErrors = true;
StringInfo savepointCommand = makeStringInfo();
appendStringInfo(savepointCommand, "SAVEPOINT savepoint_%u", subId);
if (!SendRemoteCommand(connection, savepointCommand->data))
/*
* We need to allocate 31 bytes for command buffer (including '\0'):
* - len("SAVEPOINT savepoint_") = 20
* - maximum length of str(subId) = 10
*/
char savepointCommand[31];
SafeSnprintf(savepointCommand, sizeof(savepointCommand), "SAVEPOINT savepoint_%u",
subId);
if (!SendRemoteCommand(connection, savepointCommand))
{
HandleRemoteTransactionConnectionError(connection, raiseErrors);
}
@ -1184,10 +1237,17 @@ StartRemoteTransactionSavepointRelease(MultiConnection *connection,
SubTransactionId subId)
{
const bool raiseErrors = true;
StringInfo savepointCommand = makeStringInfo();
appendStringInfo(savepointCommand, "RELEASE SAVEPOINT savepoint_%u", subId);
if (!SendRemoteCommand(connection, savepointCommand->data))
/*
* We need to allocate 39 bytes for command buffer (including '\0'):
* - len("RELEASE SAVEPOINT savepoint_") = 28
* - maximum length of str(subId) = 10
*/
char savepointCommand[39];
SafeSnprintf(savepointCommand, sizeof(savepointCommand),
"RELEASE SAVEPOINT savepoint_%u", subId);
if (!SendRemoteCommand(connection, savepointCommand))
{
HandleRemoteTransactionConnectionError(connection, raiseErrors);
}
@ -1224,10 +1284,17 @@ StartRemoteTransactionSavepointRollback(MultiConnection *connection,
SubTransactionId subId)
{
const bool raiseErrors = false;
StringInfo savepointCommand = makeStringInfo();
appendStringInfo(savepointCommand, "ROLLBACK TO SAVEPOINT savepoint_%u", subId);
if (!SendRemoteCommand(connection, savepointCommand->data))
/*
* We need to allocate 43 bytes for command buffer (including '\0'):
* - len("ROLLBACK TO SAVEPOINT savepoint_") = 32
* - maximum length of str(subId) = 10
*/
char savepointCommand[43];
SafeSnprintf(savepointCommand, sizeof(savepointCommand),
"ROLLBACK TO SAVEPOINT savepoint_%u", subId);
if (!SendRemoteCommand(connection, savepointCommand))
{
HandleRemoteTransactionConnectionError(connection, raiseErrors);
}

View File

@ -90,11 +90,14 @@ StringInfo activeSetStmts;
* PostgreSQL with a sub-xact callback). At present, the context of a subxact
* includes a subxact identifier as well as any SET LOCAL statements propagated
* to workers during the sub-transaction.
*
* To be clear, last item of activeSubXactContexts list corresponds to top of
* stack.
*/
static List *activeSubXactContexts = NIL;
/* some pre-allocated memory so we don't need to call malloc() during callbacks */
MemoryContext CommitContext = NULL;
MemoryContext CitusXactCallbackContext = NULL;
/*
* Should this coordinated transaction use 2PC? Set by
@ -245,11 +248,11 @@ InitializeTransactionManagement(void)
AdjustMaxPreparedTransactions();
/* set aside 8kb of memory for use in CoordinatedTransactionCallback */
CommitContext = AllocSetContextCreateInternal(TopMemoryContext,
"CommitContext",
8 * 1024,
8 * 1024,
8 * 1024);
CitusXactCallbackContext = AllocSetContextCreateInternal(TopMemoryContext,
"CitusXactCallbackContext",
8 * 1024,
8 * 1024,
8 * 1024);
}
@ -274,7 +277,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
*
* One possible source of errors is memory allocation failures. To minimize
* the chance of those happening we've pre-allocated some memory in the
* CommitContext, it has 8kb of memory that we're allowed to use.
* CitusXactCallbackContext, it has 8kb of memory that we're allowed to use.
*
* We only do this in the COMMIT callback because:
* - Errors thrown in other callbacks (such as PRE_COMMIT) won't cause
@ -283,8 +286,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* postgres already creates a TransactionAbortContext which performs this
* trick, so there's no need for us to do it again.
*/
MemoryContext previousContext = CurrentMemoryContext;
MemoryContextSwitchTo(CommitContext);
MemoryContext previousContext =
MemoryContextSwitchTo(CitusXactCallbackContext);
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
{
@ -322,9 +325,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
PlacementMovedUsingLogicalReplicationInTX = false;
/* empty the CommitContext to ensure we're not leaking memory */
/* empty the CitusXactCallbackContext to ensure we're not leaking memory */
MemoryContextSwitchTo(previousContext);
MemoryContextReset(CommitContext);
MemoryContextReset(CitusXactCallbackContext);
/* Set CreateCitusTransactionLevel to 0 since original transaction is about to be
* committed.
@ -381,6 +384,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
ResetGlobalVariables();
ResetRelationAccessHash();
/* empty the CitusXactCallbackContext to ensure we're not leaking memory */
MemoryContextReset(CitusXactCallbackContext);
/*
* Clear MetadataCache table if we're aborting from a CREATE EXTENSION Citus
* so that any created OIDs from the table are cleared and invalidated. We
@ -630,16 +636,25 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
*/
case SUBXACT_EVENT_START_SUB:
{
MemoryContext previousContext =
MemoryContextSwitchTo(CitusXactCallbackContext);
PushSubXact(subId);
if (InCoordinatedTransaction())
{
CoordinatedRemoteTransactionsSavepointBegin(subId);
}
MemoryContextSwitchTo(previousContext);
break;
}
case SUBXACT_EVENT_COMMIT_SUB:
{
MemoryContext previousContext =
MemoryContextSwitchTo(CitusXactCallbackContext);
if (InCoordinatedTransaction())
{
CoordinatedRemoteTransactionsSavepointRelease(subId);
@ -652,11 +667,17 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
{
SetCreateCitusTransactionLevel(GetCitusCreationLevel() - 1);
}
MemoryContextSwitchTo(previousContext);
break;
}
case SUBXACT_EVENT_ABORT_SUB:
{
MemoryContext previousContext =
MemoryContextSwitchTo(CitusXactCallbackContext);
/*
* Stop showing message for now, will re-enable when executing
* the next statement.
@ -684,6 +705,9 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
InvalidateMetadataSystemCache();
SetCreateCitusTransactionLevel(0);
}
MemoryContextSwitchTo(previousContext);
break;
}
@ -731,28 +755,14 @@ AdjustMaxPreparedTransactions(void)
static void
PushSubXact(SubTransactionId subId)
{
/*
* We need to allocate these in TopTransactionContext instead of current
* subxact's memory context. This is because AtSubCommit_Memory won't
* delete the subxact's memory context unless it is empty, and this
* can cause in memory leaks. For emptiness it just checks if the memory
* has been reset, and we cannot reset the subxact context since other
* data can be in the context that are needed by upper commits.
*
* See https://github.com/citusdata/citus/issues/3999
*/
MemoryContext old_context = MemoryContextSwitchTo(TopTransactionContext);
/* save provided subId as well as propagated SET LOCAL stmts */
SubXactContext *state = palloc(sizeof(SubXactContext));
state->subId = subId;
state->setLocalCmds = activeSetStmts;
/* append to list and reset active set stmts for upcoming sub-xact */
activeSubXactContexts = lcons(state, activeSubXactContexts);
activeSubXactContexts = lappend(activeSubXactContexts, state);
activeSetStmts = makeStringInfo();
MemoryContextSwitchTo(old_context);
}
@ -760,7 +770,7 @@ PushSubXact(SubTransactionId subId)
static void
PopSubXact(SubTransactionId subId)
{
SubXactContext *state = linitial(activeSubXactContexts);
SubXactContext *state = llast(activeSubXactContexts);
Assert(state->subId == subId);
@ -787,7 +797,7 @@ PopSubXact(SubTransactionId subId)
*/
pfree(state);
activeSubXactContexts = list_delete_first(activeSubXactContexts);
activeSubXactContexts = list_delete_last(activeSubXactContexts);
}
@ -795,19 +805,7 @@ PopSubXact(SubTransactionId subId)
List *
ActiveSubXactContexts(void)
{
List *reversedSubXactStates = NIL;
/*
* activeSubXactContexts is in reversed temporal order, so we reverse it to get it
* in temporal order.
*/
SubXactContext *state = NULL;
foreach_ptr(state, activeSubXactContexts)
{
reversedSubXactStates = lcons(state, reversedSubXactStates);
}
return reversedSubXactStates;
return activeSubXactContexts;
}