mirror of https://github.com/citusdata/citus.git
Auto-recover 2PC transactions
parent
64a5d5da22
commit
8486f76e15
|
@ -156,7 +156,6 @@ static bool IsAlterTableRenameStmt(RenameStmt *renameStmt);
|
||||||
static bool AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement,
|
static bool AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement,
|
||||||
AlterTableCmd *command);
|
AlterTableCmd *command);
|
||||||
static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
|
static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
|
||||||
static void ShowNoticeIfNotUsing2PC(void);
|
|
||||||
static List * DDLTaskList(Oid relationId, const char *commandString);
|
static List * DDLTaskList(Oid relationId, const char *commandString);
|
||||||
static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt);
|
static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt);
|
||||||
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
|
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
|
||||||
|
@ -169,9 +168,6 @@ static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
|
||||||
static void PostProcessUtility(Node *parsetree);
|
static void PostProcessUtility(Node *parsetree);
|
||||||
|
|
||||||
|
|
||||||
static bool warnedUserAbout2PC = false;
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* multi_ProcessUtility9x is the 9.x-compatible wrapper for Citus' main utility
|
* multi_ProcessUtility9x is the 9.x-compatible wrapper for Citus' main utility
|
||||||
* hook. It simply adapts the old-style hook to call into the new-style (10+)
|
* hook. It simply adapts the old-style hook to call into the new-style (10+)
|
||||||
|
@ -2876,8 +2872,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
||||||
|
|
||||||
if (!ddlJob->concurrentIndexCmd)
|
if (!ddlJob->concurrentIndexCmd)
|
||||||
{
|
{
|
||||||
ShowNoticeIfNotUsing2PC();
|
|
||||||
|
|
||||||
if (shouldSyncMetadata)
|
if (shouldSyncMetadata)
|
||||||
{
|
{
|
||||||
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
|
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
|
||||||
|
@ -2919,25 +2913,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ShowNoticeIfNotUsing2PC shows a notice message about using 2PC by setting
|
|
||||||
* citus.multi_shard_commit_protocol to 2PC. The notice message is shown only once in a
|
|
||||||
* session
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
ShowNoticeIfNotUsing2PC(void)
|
|
||||||
{
|
|
||||||
if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC && !warnedUserAbout2PC)
|
|
||||||
{
|
|
||||||
ereport(NOTICE, (errmsg("using one-phase commit for distributed DDL commands"),
|
|
||||||
errhint("You can enable two-phase commit for extra safety with: "
|
|
||||||
"SET citus.multi_shard_commit_protocol TO '2pc'")));
|
|
||||||
|
|
||||||
warnedUserAbout2PC = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DDLTaskList builds a list of tasks to execute a DDL command on a
|
* DDLTaskList builds a list of tasks to execute a DDL command on a
|
||||||
* given list of shards.
|
* given list of shards.
|
||||||
|
|
|
@ -43,6 +43,7 @@
|
||||||
#include "distributed/statistics_collection.h"
|
#include "distributed/statistics_collection.h"
|
||||||
#include "distributed/task_tracker.h"
|
#include "distributed/task_tracker.h"
|
||||||
#include "distributed/transaction_management.h"
|
#include "distributed/transaction_management.h"
|
||||||
|
#include "distributed/transaction_recovery.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "postmaster/postmaster.h"
|
#include "postmaster/postmaster.h"
|
||||||
|
@ -457,6 +458,20 @@ RegisterCitusConfigVariables(void)
|
||||||
0,
|
0,
|
||||||
ErrorIfNotASuitableDeadlockFactor, NULL, NULL);
|
ErrorIfNotASuitableDeadlockFactor, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomIntVariable(
|
||||||
|
"citus.recover_2pc_interval",
|
||||||
|
gettext_noop("Sets the time to wait between recovering 2PCs."),
|
||||||
|
gettext_noop("2PC transaction recovery needs to run every so often "
|
||||||
|
"to clean up records in pg_dist_transaction and "
|
||||||
|
"potentially roll failed 2PCs forward. This setting "
|
||||||
|
"determines how often recovery should run, "
|
||||||
|
"use -1 to disable."),
|
||||||
|
&Recover2PCInterval,
|
||||||
|
60000, -1, 7*24*3600*1000,
|
||||||
|
PGC_SIGHUP,
|
||||||
|
GUC_UNIT_MS,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomBoolVariable(
|
DefineCustomBoolVariable(
|
||||||
"citus.enable_deadlock_prevention",
|
"citus.enable_deadlock_prevention",
|
||||||
gettext_noop("Prevents transactions from expanding to multiple nodes"),
|
gettext_noop("Prevents transactions from expanding to multiple nodes"),
|
||||||
|
@ -670,11 +685,9 @@ RegisterCitusConfigVariables(void)
|
||||||
"citus.multi_shard_commit_protocol",
|
"citus.multi_shard_commit_protocol",
|
||||||
gettext_noop("Sets the commit protocol for commands modifying multiple shards."),
|
gettext_noop("Sets the commit protocol for commands modifying multiple shards."),
|
||||||
gettext_noop("When a failure occurs during commands that modify multiple "
|
gettext_noop("When a failure occurs during commands that modify multiple "
|
||||||
"shards (currently, only COPY on distributed tables modifies more "
|
"shards, two-phase commit is required to ensure data is never lost "
|
||||||
"than one shard), two-phase commit is required to ensure data is "
|
"and this is the default. However, changing to 1pc may give small "
|
||||||
"never lost. Change this setting to '2pc' from its default '1pc' to "
|
"performance benefits."),
|
||||||
"enable 2 PC. You must also set max_prepared_transactions on the "
|
|
||||||
"worker nodes. Recovery from failed 2PCs is currently manual."),
|
|
||||||
&MultiShardCommitProtocol,
|
&MultiShardCommitProtocol,
|
||||||
COMMIT_PROTOCOL_1PC,
|
COMMIT_PROTOCOL_1PC,
|
||||||
multi_shard_commit_protocol_options,
|
multi_shard_commit_protocol_options,
|
||||||
|
|
|
@ -46,7 +46,6 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions);
|
||||||
|
|
||||||
|
|
||||||
/* Local functions forward declarations */
|
/* Local functions forward declarations */
|
||||||
static int RecoverPreparedTransactions(void);
|
|
||||||
static int RecoverWorkerTransactions(WorkerNode *workerNode);
|
static int RecoverWorkerTransactions(WorkerNode *workerNode);
|
||||||
static List * PendingWorkerTransactionList(MultiConnection *connection);
|
static List * PendingWorkerTransactionList(MultiConnection *connection);
|
||||||
static bool IsTransactionInProgress(HTAB *activeTransactionNumberSet,
|
static bool IsTransactionInProgress(HTAB *activeTransactionNumberSet,
|
||||||
|
@ -66,7 +65,7 @@ recover_prepared_transactions(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
recoveredTransactionCount = RecoverPreparedTransactions();
|
recoveredTransactionCount = RecoverTwoPhaseCommits();
|
||||||
|
|
||||||
PG_RETURN_INT32(recoveredTransactionCount);
|
PG_RETURN_INT32(recoveredTransactionCount);
|
||||||
}
|
}
|
||||||
|
@ -109,11 +108,11 @@ LogTransactionRecord(int groupId, char *transactionName)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RecoverPreparedTransactions recovers any pending prepared
|
* RecoverTwoPhaseCommits recovers any pending prepared
|
||||||
* transactions started by this node on other nodes.
|
* transactions started by this node on other nodes.
|
||||||
*/
|
*/
|
||||||
static int
|
int
|
||||||
RecoverPreparedTransactions(void)
|
RecoverTwoPhaseCommits(void)
|
||||||
{
|
{
|
||||||
List *workerList = NIL;
|
List *workerList = NIL;
|
||||||
ListCell *workerNodeCell = NULL;
|
ListCell *workerNodeCell = NULL;
|
||||||
|
|
|
@ -22,8 +22,10 @@
|
||||||
#include "pgstat.h"
|
#include "pgstat.h"
|
||||||
|
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
|
#include "access/xlog.h"
|
||||||
#include "catalog/pg_extension.h"
|
#include "catalog/pg_extension.h"
|
||||||
#include "citus_version.h"
|
#include "citus_version.h"
|
||||||
|
#include "catalog/pg_namespace.h"
|
||||||
#include "commands/extension.h"
|
#include "commands/extension.h"
|
||||||
#include "libpq/pqsignal.h"
|
#include "libpq/pqsignal.h"
|
||||||
#include "catalog/namespace.h"
|
#include "catalog/namespace.h"
|
||||||
|
@ -32,8 +34,10 @@
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/statistics_collection.h"
|
#include "distributed/statistics_collection.h"
|
||||||
|
#include "distributed/transaction_recovery.h"
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
#include "postmaster/bgworker.h"
|
#include "postmaster/bgworker.h"
|
||||||
|
#include "nodes/makefuncs.h"
|
||||||
#include "storage/ipc.h"
|
#include "storage/ipc.h"
|
||||||
#include "storage/proc.h"
|
#include "storage/proc.h"
|
||||||
#include "storage/latch.h"
|
#include "storage/latch.h"
|
||||||
|
@ -41,6 +45,7 @@
|
||||||
#include "storage/lwlock.h"
|
#include "storage/lwlock.h"
|
||||||
#include "tcop/tcopprot.h"
|
#include "tcop/tcopprot.h"
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
|
#include "utils/lsyscache.h"
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -80,6 +85,7 @@ typedef struct MaintenanceDaemonDBData
|
||||||
|
|
||||||
/* config variable for distributed deadlock detection timeout */
|
/* config variable for distributed deadlock detection timeout */
|
||||||
double DistributedDeadlockDetectionTimeoutFactor = 2.0;
|
double DistributedDeadlockDetectionTimeoutFactor = 2.0;
|
||||||
|
int Recover2PCInterval = 60000;
|
||||||
|
|
||||||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||||
static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
|
static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
|
||||||
|
@ -221,6 +227,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
|
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
|
||||||
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
|
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
|
||||||
ErrorContextCallback errorCallback;
|
ErrorContextCallback errorCallback;
|
||||||
|
TimestampTz lastRecoveryTime = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Look up this worker's configuration.
|
* Look up this worker's configuration.
|
||||||
|
@ -361,9 +368,54 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If enabled, run 2PC recovery on primary nodes (where !RecoveryInProgress()),
|
||||||
|
* since we'll write to the pg_dist_transaction log.
|
||||||
|
*/
|
||||||
|
if (Recover2PCInterval > 0 && !RecoveryInProgress() &&
|
||||||
|
TimestampDifferenceExceeds(lastRecoveryTime, GetCurrentTimestamp(),
|
||||||
|
Recover2PCInterval))
|
||||||
|
{
|
||||||
|
int recoveredTransactionCount = 0;
|
||||||
|
|
||||||
|
InvalidateMetadataSystemCache();
|
||||||
|
StartTransactionCommand();
|
||||||
|
|
||||||
|
if (!LockCitusExtension())
|
||||||
|
{
|
||||||
|
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
|
||||||
|
"skipping 2PC recovery")));
|
||||||
|
}
|
||||||
|
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Record last recovery time at start to ensure we run once per
|
||||||
|
* Recover2PCInterval even if RecoverTwoPhaseCommits takes some time.
|
||||||
|
*/
|
||||||
|
lastRecoveryTime = GetCurrentTimestamp();
|
||||||
|
|
||||||
|
recoveredTransactionCount = RecoverTwoPhaseCommits();
|
||||||
|
}
|
||||||
|
|
||||||
|
CommitTransactionCommand();
|
||||||
|
|
||||||
|
if (recoveredTransactionCount > 0)
|
||||||
|
{
|
||||||
|
ereport(LOG, (errmsg("maintenance daemon recovered %d distributed "
|
||||||
|
"transactions",
|
||||||
|
recoveredTransactionCount)));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* make sure we don't wait too long */
|
||||||
|
timeout = Min(timeout, Recover2PCInterval);
|
||||||
|
}
|
||||||
|
|
||||||
/* the config value -1 disables the distributed deadlock detection */
|
/* the config value -1 disables the distributed deadlock detection */
|
||||||
if (DistributedDeadlockDetectionTimeoutFactor != -1.0)
|
if (DistributedDeadlockDetectionTimeoutFactor != -1.0)
|
||||||
{
|
{
|
||||||
|
double deadlockTimeout =
|
||||||
|
DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout;
|
||||||
|
|
||||||
InvalidateMetadataSystemCache();
|
InvalidateMetadataSystemCache();
|
||||||
StartTransactionCommand();
|
StartTransactionCommand();
|
||||||
|
|
||||||
|
@ -397,13 +449,13 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
* citus.distributed_deadlock_detection_factor 2), we'd be able to cancel
|
* citus.distributed_deadlock_detection_factor 2), we'd be able to cancel
|
||||||
* ~10 distributed deadlocks per second.
|
* ~10 distributed deadlocks per second.
|
||||||
*/
|
*/
|
||||||
timeout =
|
|
||||||
DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout;
|
|
||||||
|
|
||||||
if (foundDeadlock)
|
if (foundDeadlock)
|
||||||
{
|
{
|
||||||
timeout = timeout / 20.0;
|
deadlockTimeout = deadlockTimeout / 20.0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* make sure we don't wait too long */
|
||||||
|
timeout = Min(timeout, deadlockTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -12,8 +12,13 @@
|
||||||
#define TRANSACTION_RECOVERY_H
|
#define TRANSACTION_RECOVERY_H
|
||||||
|
|
||||||
|
|
||||||
|
/* GUC to configure interval for 2PC auto-recovery */
|
||||||
|
extern int Recover2PCInterval;
|
||||||
|
|
||||||
|
|
||||||
/* Functions declarations for worker transactions */
|
/* Functions declarations for worker transactions */
|
||||||
extern void LogTransactionRecord(int groupId, char *transactionName);
|
extern void LogTransactionRecord(int groupId, char *transactionName);
|
||||||
|
extern int RecoverTwoPhaseCommits(void);
|
||||||
|
|
||||||
|
|
||||||
#endif /* TRANSACTION_RECOVERY_H */
|
#endif /* TRANSACTION_RECOVERY_H */
|
||||||
|
|
|
@ -99,7 +99,7 @@ SELECT count(*) FROM pg_dist_transaction;
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Committed DDL commands should write 4 transaction recovery records
|
-- Aborted DDL commands should not write transaction recovery records
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE test_recovery ADD COLUMN y text;
|
ALTER TABLE test_recovery ADD COLUMN y text;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
@ -109,6 +109,7 @@ SELECT count(*) FROM pg_dist_transaction;
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Committed DDL commands should write 4 transaction recovery records
|
||||||
ALTER TABLE test_recovery ADD COLUMN y text;
|
ALTER TABLE test_recovery ADD COLUMN y text;
|
||||||
SELECT count(*) FROM pg_dist_transaction;
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
count
|
count
|
||||||
|
@ -167,7 +168,7 @@ SELECT count(*) FROM pg_dist_transaction;
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Committed INSERT..SELECT should write 4 transaction recovery records
|
-- Aborted INSERT..SELECT should not write transaction recovery records
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
@ -177,6 +178,7 @@ SELECT count(*) FROM pg_dist_transaction;
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Committed INSERT..SELECT should write 4 transaction recovery records
|
||||||
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
||||||
SELECT count(*) FROM pg_dist_transaction;
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
count
|
count
|
||||||
|
|
|
@ -56,12 +56,13 @@ SELECT recover_prepared_transactions();
|
||||||
INSERT INTO test_recovery VALUES ('hello');
|
INSERT INTO test_recovery VALUES ('hello');
|
||||||
SELECT count(*) FROM pg_dist_transaction;
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
-- Committed DDL commands should write 4 transaction recovery records
|
-- Aborted DDL commands should not write transaction recovery records
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE test_recovery ADD COLUMN y text;
|
ALTER TABLE test_recovery ADD COLUMN y text;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
SELECT count(*) FROM pg_dist_transaction;
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
|
-- Committed DDL commands should write 4 transaction recovery records
|
||||||
ALTER TABLE test_recovery ADD COLUMN y text;
|
ALTER TABLE test_recovery ADD COLUMN y text;
|
||||||
|
|
||||||
SELECT count(*) FROM pg_dist_transaction;
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
@ -80,12 +81,13 @@ SELECT count(*) FROM pg_dist_transaction;
|
||||||
SELECT recover_prepared_transactions();
|
SELECT recover_prepared_transactions();
|
||||||
SELECT count(*) FROM pg_dist_transaction;
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
-- Committed INSERT..SELECT should write 4 transaction recovery records
|
-- Aborted INSERT..SELECT should not write transaction recovery records
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
SELECT count(*) FROM pg_dist_transaction;
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
|
-- Committed INSERT..SELECT should write 4 transaction recovery records
|
||||||
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
||||||
|
|
||||||
SELECT count(*) FROM pg_dist_transaction;
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
Loading…
Reference in New Issue