From 8486f76e153d960891d7dedfe0e011388a76991a Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 14 Aug 2017 21:47:35 +0200 Subject: [PATCH] Auto-recover 2PC transactions --- .../distributed/executor/multi_utility.c | 25 -------- src/backend/distributed/shared_library_init.c | 23 +++++-- .../transaction/transaction_recovery.c | 9 ++- src/backend/distributed/utils/maintenanced.c | 60 +++++++++++++++++-- .../distributed/transaction_recovery.h | 5 ++ .../expected/multi_transaction_recovery.out | 6 +- .../sql/multi_transaction_recovery.sql | 6 +- 7 files changed, 91 insertions(+), 43 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index ab6e5ac83..0275c3d03 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -156,7 +156,6 @@ static bool IsAlterTableRenameStmt(RenameStmt *renameStmt); static bool AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement, AlterTableCmd *command); static void ExecuteDistributedDDLJob(DDLJob *ddlJob); -static void ShowNoticeIfNotUsing2PC(void); static List * DDLTaskList(Oid relationId, const char *commandString); static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt); static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt); @@ -169,9 +168,6 @@ static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) static void PostProcessUtility(Node *parsetree); -static bool warnedUserAbout2PC = false; - - /* * multi_ProcessUtility9x is the 9.x-compatible wrapper for Citus' main utility * hook. It simply adapts the old-style hook to call into the new-style (10+) @@ -2876,8 +2872,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) if (!ddlJob->concurrentIndexCmd) { - ShowNoticeIfNotUsing2PC(); - if (shouldSyncMetadata) { SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); @@ -2919,25 +2913,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) } -/* - * ShowNoticeIfNotUsing2PC shows a notice message about using 2PC by setting - * citus.multi_shard_commit_protocol to 2PC. The notice message is shown only once in a - * session - */ -static void -ShowNoticeIfNotUsing2PC(void) -{ - if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC && !warnedUserAbout2PC) - { - ereport(NOTICE, (errmsg("using one-phase commit for distributed DDL commands"), - errhint("You can enable two-phase commit for extra safety with: " - "SET citus.multi_shard_commit_protocol TO '2pc'"))); - - warnedUserAbout2PC = true; - } -} - - /* * DDLTaskList builds a list of tasks to execute a DDL command on a * given list of shards. diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 070a5980e..bd123627e 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -43,6 +43,7 @@ #include "distributed/statistics_collection.h" #include "distributed/task_tracker.h" #include "distributed/transaction_management.h" +#include "distributed/transaction_recovery.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "postmaster/postmaster.h" @@ -457,6 +458,20 @@ RegisterCitusConfigVariables(void) 0, ErrorIfNotASuitableDeadlockFactor, NULL, NULL); + DefineCustomIntVariable( + "citus.recover_2pc_interval", + gettext_noop("Sets the time to wait between recovering 2PCs."), + gettext_noop("2PC transaction recovery needs to run every so often " + "to clean up records in pg_dist_transaction and " + "potentially roll failed 2PCs forward. This setting " + "determines how often recovery should run, " + "use -1 to disable."), + &Recover2PCInterval, + 60000, -1, 7*24*3600*1000, + PGC_SIGHUP, + GUC_UNIT_MS, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_deadlock_prevention", gettext_noop("Prevents transactions from expanding to multiple nodes"), @@ -670,11 +685,9 @@ RegisterCitusConfigVariables(void) "citus.multi_shard_commit_protocol", gettext_noop("Sets the commit protocol for commands modifying multiple shards."), gettext_noop("When a failure occurs during commands that modify multiple " - "shards (currently, only COPY on distributed tables modifies more " - "than one shard), two-phase commit is required to ensure data is " - "never lost. Change this setting to '2pc' from its default '1pc' to " - "enable 2 PC. You must also set max_prepared_transactions on the " - "worker nodes. Recovery from failed 2PCs is currently manual."), + "shards, two-phase commit is required to ensure data is never lost " + "and this is the default. However, changing to 1pc may give small " + "performance benefits."), &MultiShardCommitProtocol, COMMIT_PROTOCOL_1PC, multi_shard_commit_protocol_options, diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 38841e8e3..ffec8bb69 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -46,7 +46,6 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions); /* Local functions forward declarations */ -static int RecoverPreparedTransactions(void); static int RecoverWorkerTransactions(WorkerNode *workerNode); static List * PendingWorkerTransactionList(MultiConnection *connection); static bool IsTransactionInProgress(HTAB *activeTransactionNumberSet, @@ -66,7 +65,7 @@ recover_prepared_transactions(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); - recoveredTransactionCount = RecoverPreparedTransactions(); + recoveredTransactionCount = RecoverTwoPhaseCommits(); PG_RETURN_INT32(recoveredTransactionCount); } @@ -109,11 +108,11 @@ LogTransactionRecord(int groupId, char *transactionName) /* - * RecoverPreparedTransactions recovers any pending prepared + * RecoverTwoPhaseCommits recovers any pending prepared * transactions started by this node on other nodes. */ -static int -RecoverPreparedTransactions(void) +int +RecoverTwoPhaseCommits(void) { List *workerList = NIL; ListCell *workerNodeCell = NULL; diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 17c35a08d..6d4f34612 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -22,8 +22,10 @@ #include "pgstat.h" #include "access/xact.h" +#include "access/xlog.h" #include "catalog/pg_extension.h" #include "citus_version.h" +#include "catalog/pg_namespace.h" #include "commands/extension.h" #include "libpq/pqsignal.h" #include "catalog/namespace.h" @@ -32,8 +34,10 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/statistics_collection.h" +#include "distributed/transaction_recovery.h" #include "nodes/makefuncs.h" #include "postmaster/bgworker.h" +#include "nodes/makefuncs.h" #include "storage/ipc.h" #include "storage/proc.h" #include "storage/latch.h" @@ -41,6 +45,7 @@ #include "storage/lwlock.h" #include "tcop/tcopprot.h" #include "utils/memutils.h" +#include "utils/lsyscache.h" /* @@ -80,6 +85,7 @@ typedef struct MaintenanceDaemonDBData /* config variable for distributed deadlock detection timeout */ double DistributedDeadlockDetectionTimeoutFactor = 2.0; +int Recover2PCInterval = 60000; static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL; @@ -221,6 +227,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000); bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false; ErrorContextCallback errorCallback; + TimestampTz lastRecoveryTime = 0; /* * Look up this worker's configuration. @@ -361,9 +368,54 @@ CitusMaintenanceDaemonMain(Datum main_arg) } #endif + /* + * If enabled, run 2PC recovery on primary nodes (where !RecoveryInProgress()), + * since we'll write to the pg_dist_transaction log. + */ + if (Recover2PCInterval > 0 && !RecoveryInProgress() && + TimestampDifferenceExceeds(lastRecoveryTime, GetCurrentTimestamp(), + Recover2PCInterval)) + { + int recoveredTransactionCount = 0; + + InvalidateMetadataSystemCache(); + StartTransactionCommand(); + + if (!LockCitusExtension()) + { + ereport(DEBUG1, (errmsg("could not lock the citus extension, " + "skipping 2PC recovery"))); + } + else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) + { + /* + * Record last recovery time at start to ensure we run once per + * Recover2PCInterval even if RecoverTwoPhaseCommits takes some time. + */ + lastRecoveryTime = GetCurrentTimestamp(); + + recoveredTransactionCount = RecoverTwoPhaseCommits(); + } + + CommitTransactionCommand(); + + if (recoveredTransactionCount > 0) + { + ereport(LOG, (errmsg("maintenance daemon recovered %d distributed " + "transactions", + recoveredTransactionCount))); + } + + /* make sure we don't wait too long */ + timeout = Min(timeout, Recover2PCInterval); + } + /* the config value -1 disables the distributed deadlock detection */ if (DistributedDeadlockDetectionTimeoutFactor != -1.0) { + double deadlockTimeout = + DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout; + InvalidateMetadataSystemCache(); StartTransactionCommand(); @@ -397,13 +449,13 @@ CitusMaintenanceDaemonMain(Datum main_arg) * citus.distributed_deadlock_detection_factor 2), we'd be able to cancel * ~10 distributed deadlocks per second. */ - timeout = - DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout; - if (foundDeadlock) { - timeout = timeout / 20.0; + deadlockTimeout = deadlockTimeout / 20.0; } + + /* make sure we don't wait too long */ + timeout = Min(timeout, deadlockTimeout); } /* diff --git a/src/include/distributed/transaction_recovery.h b/src/include/distributed/transaction_recovery.h index d204ef753..9f359fc18 100644 --- a/src/include/distributed/transaction_recovery.h +++ b/src/include/distributed/transaction_recovery.h @@ -12,8 +12,13 @@ #define TRANSACTION_RECOVERY_H +/* GUC to configure interval for 2PC auto-recovery */ +extern int Recover2PCInterval; + + /* Functions declarations for worker transactions */ extern void LogTransactionRecord(int groupId, char *transactionName); +extern int RecoverTwoPhaseCommits(void); #endif /* TRANSACTION_RECOVERY_H */ diff --git a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out index 245160e9b..3a0766aca 100644 --- a/src/test/regress/expected/multi_transaction_recovery.out +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -99,7 +99,7 @@ SELECT count(*) FROM pg_dist_transaction; 0 (1 row) --- Committed DDL commands should write 4 transaction recovery records +-- Aborted DDL commands should not write transaction recovery records BEGIN; ALTER TABLE test_recovery ADD COLUMN y text; ROLLBACK; @@ -109,6 +109,7 @@ SELECT count(*) FROM pg_dist_transaction; 0 (1 row) +-- Committed DDL commands should write 4 transaction recovery records ALTER TABLE test_recovery ADD COLUMN y text; SELECT count(*) FROM pg_dist_transaction; count @@ -167,7 +168,7 @@ SELECT count(*) FROM pg_dist_transaction; 0 (1 row) --- Committed INSERT..SELECT should write 4 transaction recovery records +-- Aborted INSERT..SELECT should not write transaction recovery records BEGIN; INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; ROLLBACK; @@ -177,6 +178,7 @@ SELECT count(*) FROM pg_dist_transaction; 0 (1 row) +-- Committed INSERT..SELECT should write 4 transaction recovery records INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; SELECT count(*) FROM pg_dist_transaction; count diff --git a/src/test/regress/sql/multi_transaction_recovery.sql b/src/test/regress/sql/multi_transaction_recovery.sql index 02960fa96..4bea3ea98 100644 --- a/src/test/regress/sql/multi_transaction_recovery.sql +++ b/src/test/regress/sql/multi_transaction_recovery.sql @@ -56,12 +56,13 @@ SELECT recover_prepared_transactions(); INSERT INTO test_recovery VALUES ('hello'); SELECT count(*) FROM pg_dist_transaction; --- Committed DDL commands should write 4 transaction recovery records +-- Aborted DDL commands should not write transaction recovery records BEGIN; ALTER TABLE test_recovery ADD COLUMN y text; ROLLBACK; SELECT count(*) FROM pg_dist_transaction; +-- Committed DDL commands should write 4 transaction recovery records ALTER TABLE test_recovery ADD COLUMN y text; SELECT count(*) FROM pg_dist_transaction; @@ -80,12 +81,13 @@ SELECT count(*) FROM pg_dist_transaction; SELECT recover_prepared_transactions(); SELECT count(*) FROM pg_dist_transaction; --- Committed INSERT..SELECT should write 4 transaction recovery records +-- Aborted INSERT..SELECT should not write transaction recovery records BEGIN; INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; ROLLBACK; SELECT count(*) FROM pg_dist_transaction; +-- Committed INSERT..SELECT should write 4 transaction recovery records INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; SELECT count(*) FROM pg_dist_transaction;