mirror of https://github.com/citusdata/citus.git
Merge pull request #471 from citusdata/rename_copy_transaction_manager
Rename citus.copy_transaction_manager to citus.multi_shard_commit_protocolpull/472/head^2
commit
66eab4e629
|
@ -25,7 +25,7 @@
|
||||||
* hash or range-partitioned tables, this can cause a problem when some of the
|
* hash or range-partitioned tables, this can cause a problem when some of the
|
||||||
* transactions fail to commit while others have succeeded. To ensure no data
|
* transactions fail to commit while others have succeeded. To ensure no data
|
||||||
* is lost, COPY can use two-phase commit, by increasing max_prepared_transactions
|
* is lost, COPY can use two-phase commit, by increasing max_prepared_transactions
|
||||||
* on the worker and setting citus.copy_transaction_manager to '2pc'. The default
|
* on the worker and setting citus.multi_shard_commit_protocol to '2pc'. The default
|
||||||
* is '1pc'. This is not a problem for append-partitioned tables because new
|
* is '1pc'. This is not a problem for append-partitioned tables because new
|
||||||
* shards are created and in the case of failure, metadata changes are rolled
|
* shards are created and in the case of failure, metadata changes are rolled
|
||||||
* back on the master node.
|
* back on the master node.
|
||||||
|
@ -125,9 +125,6 @@
|
||||||
#define INITIAL_CONNECTION_CACHE_SIZE 1001
|
#define INITIAL_CONNECTION_CACHE_SIZE 1001
|
||||||
|
|
||||||
|
|
||||||
/* the transaction manager to use for COPY commands */
|
|
||||||
int CopyTransactionManager = TRANSACTION_MANAGER_1PC;
|
|
||||||
|
|
||||||
/* constant used in binary protocol */
|
/* constant used in binary protocol */
|
||||||
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
|
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
|
||||||
|
|
||||||
|
@ -449,7 +446,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
||||||
/* close the COPY input on all shard placements */
|
/* close the COPY input on all shard placements */
|
||||||
EndRemoteCopy(connectionList, true);
|
EndRemoteCopy(connectionList, true);
|
||||||
|
|
||||||
if (CopyTransactionManager == TRANSACTION_MANAGER_2PC)
|
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
|
||||||
{
|
{
|
||||||
PrepareRemoteTransactions(connectionList);
|
PrepareRemoteTransactions(connectionList);
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,9 +68,9 @@ static const struct config_enum_entry shard_placement_policy_options[] = {
|
||||||
{ NULL, 0, false }
|
{ NULL, 0, false }
|
||||||
};
|
};
|
||||||
|
|
||||||
static const struct config_enum_entry transaction_manager_options[] = {
|
static const struct config_enum_entry multi_shard_commit_protocol_options[] = {
|
||||||
{ "1pc", TRANSACTION_MANAGER_1PC, false },
|
{ "1pc", COMMIT_PROTOCOL_1PC, false },
|
||||||
{ "2pc", TRANSACTION_MANAGER_2PC, false },
|
{ "2pc", COMMIT_PROTOCOL_2PC, false },
|
||||||
{ NULL, 0, false }
|
{ NULL, 0, false }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -448,16 +448,17 @@ RegisterCitusConfigVariables(void)
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomEnumVariable(
|
DefineCustomEnumVariable(
|
||||||
"citus.copy_transaction_manager",
|
"citus.multi_shard_commit_protocol",
|
||||||
gettext_noop("Sets the transaction manager for COPY into distributed tables."),
|
gettext_noop("Sets the commit protocol for commands modifying multiple shards."),
|
||||||
gettext_noop("When a failure occurs during when copying into a distributed "
|
gettext_noop("When a failure occurs during commands that modify multiple "
|
||||||
"table, 2PC is required to ensure data is never lost. Change "
|
"shards (currently, only COPY on distributed tables modifies more "
|
||||||
"this setting to '2pc' from its default '1pc' to enable 2PC."
|
"than one shard), two-phase commit is required to ensure data is "
|
||||||
"You must also set max_prepared_transactions on the worker "
|
"never lost. Change this setting to '2pc' from its default '1pc' to "
|
||||||
"nodes. Recovery from failed 2PCs is currently manual."),
|
"enable 2 PC. You must also set max_prepared_transactions on the "
|
||||||
&CopyTransactionManager,
|
"worker nodes. Recovery from failed 2PCs is currently manual."),
|
||||||
TRANSACTION_MANAGER_1PC,
|
&MultiShardCommitProtocol,
|
||||||
transaction_manager_options,
|
COMMIT_PROTOCOL_1PC,
|
||||||
|
multi_shard_commit_protocol_options,
|
||||||
PGC_USERSET,
|
PGC_USERSET,
|
||||||
0,
|
0,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
|
@ -28,6 +28,10 @@ static uint32 DistributedTransactionId = 0;
|
||||||
static StringInfo BuildTransactionName(int connectionId);
|
static StringInfo BuildTransactionName(int connectionId);
|
||||||
|
|
||||||
|
|
||||||
|
/* the commit protocol to use for COPY commands */
|
||||||
|
int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* InitializeDistributedTransaction prepares the distributed transaction ID
|
* InitializeDistributedTransaction prepares the distributed transaction ID
|
||||||
* used in transaction names.
|
* used in transaction names.
|
||||||
|
@ -41,7 +45,7 @@ InitializeDistributedTransaction(void)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PrepareRemoteTransactions prepares all transactions on connections in
|
* PrepareRemoteTransactions prepares all transactions on connections in
|
||||||
* connectionList for commit if the 2PC transaction manager is enabled.
|
* connectionList for commit if the 2PC commit protocol is enabled.
|
||||||
* On failure, it reports an error and stops.
|
* On failure, it reports an error and stops.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
|
|
||||||
/* config variable managed via guc.c */
|
/* config variable managed via guc.c */
|
||||||
extern int CopyTransactionManager;
|
extern int MultiShardCommitProtocol;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -18,12 +18,12 @@
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
|
|
||||||
/* Enumeration that defines the different transaction managers available */
|
/* Enumeration that defines the different commit protocols available */
|
||||||
typedef enum
|
typedef enum
|
||||||
{
|
{
|
||||||
TRANSACTION_MANAGER_1PC = 0,
|
COMMIT_PROTOCOL_1PC = 0,
|
||||||
TRANSACTION_MANAGER_2PC = 1
|
COMMIT_PROTOCOL_2PC = 1
|
||||||
} TransactionManagerType;
|
} CommitProtocolType;
|
||||||
|
|
||||||
/* Enumeration that defines different remote transaction states */
|
/* Enumeration that defines different remote transaction states */
|
||||||
typedef enum
|
typedef enum
|
||||||
|
@ -47,6 +47,10 @@ typedef struct TransactionConnection
|
||||||
} TransactionConnection;
|
} TransactionConnection;
|
||||||
|
|
||||||
|
|
||||||
|
/* config variable managed via guc.c */
|
||||||
|
extern int MultiShardCommitProtocol;
|
||||||
|
|
||||||
|
|
||||||
/* Functions declarations for transaction and connection management */
|
/* Functions declarations for transaction and connection management */
|
||||||
extern void InitializeDistributedTransaction(void);
|
extern void InitializeDistributedTransaction(void);
|
||||||
extern void PrepareRemoteTransactions(List *connectionList);
|
extern void PrepareRemoteTransactions(List *connectionList);
|
||||||
|
|
Loading…
Reference in New Issue