/*------------------------------------------------------------------------- * * transaction_management.c * * Transaction management for Citus. Most of the work is delegated to other * subsystems, this files, and especially CoordinatedTransactionCallback, * coordinates the work between them. * * Copyright (c) 2016, Citus Data, Inc. * *------------------------------------------------------------------------- */ #include "postgres.h" #include "libpq-fe.h" #include "miscadmin.h" #include "access/twophase.h" #include "access/xact.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" #include "distributed/transaction_management.h" #include "utils/hsearch.h" #include "utils/guc.h" CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; /* GUC, the commit protocol to use for commands affecting more than one connection */ int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; /* state needed to keep track of operations used during a transaction */ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; static bool subXactAbortAttempted = false; /* transaction management functions */ static void CoordinatedTransactionCallback(XactEvent event, void *arg); static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, SubTransactionId parentSubid, void *arg); /* remaining functions */ static void AdjustMaxPreparedTransactions(void); void InitializeTransactionManagement(void) { /* hook into transaction machinery */ RegisterXactCallback(CoordinatedTransactionCallback, NULL); RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL); AdjustMaxPreparedTransactions(); } /* * Transaction management callback, handling coordinated transaction, and * transaction independent connection management. * * NB: There should only ever be a single transaction callback in citus, the * ordering between the callbacks and thee actions within those callbacks * otherwise becomes too undeterministic / hard to reason about. */ static void CoordinatedTransactionCallback(XactEvent event, void *arg) { switch (event) { case XACT_EVENT_COMMIT: { /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { AfterXactConnectionHandling(true); } Assert(!subXactAbortAttempted); CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; } break; case XACT_EVENT_ABORT: { /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { AfterXactConnectionHandling(false); } CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; subXactAbortAttempted = false; } break; case XACT_EVENT_PARALLEL_COMMIT: case XACT_EVENT_PARALLEL_ABORT: case XACT_EVENT_PREPARE: { } break; case XACT_EVENT_PRE_COMMIT: { if (subXactAbortAttempted) { subXactAbortAttempted = false; if (XactModificationLevel != XACT_MODIFICATION_NONE) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " "which modify distributed tables"))); } } } break; case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_PREPARE: { if (CurrentCoordinatedTransactionState > COORD_TRANS_NONE) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot use 2PC in transactions involving " "multiple servers"))); } } break; } } /* * Subtransaction callback - currently only used to remember whether a * savepoint has been rolled back, as we don't support that. */ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, SubTransactionId parentSubid, void *arg) { if (event == SUBXACT_EVENT_ABORT_SUB) { subXactAbortAttempted = true; } } /* * AdjustMaxPreparedTransactions configures the number of available prepared * transaction slots at startup. */ static void AdjustMaxPreparedTransactions(void) { /* * As Citus uses 2PC internally, there always should be some available. As * the default is 0, we increase it to something appropriate * (connections * 2 currently). If the user explicitly configured 2PC, we * leave the configuration alone - there might have been intent behind the * decision. */ if (max_prepared_xacts == 0) { char newvalue[12]; snprintf(newvalue, sizeof(newvalue), "%d", MaxConnections * 2); SetConfigOption("max_prepared_transactions", newvalue, PGC_POSTMASTER, PGC_S_OVERRIDE); ereport(LOG, (errmsg("number of prepared transactions has not been " "configured, overriding"), errdetail("max_prepared_transactions is now set to %s", newvalue))); } }