Merge pull request #2891 from citusdata/guc_to_disable_2pc_for_single_shard_modify

Introduce citus.single_shard_commit_protocol for if users want 1PC on writes to replicas
pull/2855/head
Philip Dubé 2019-08-15 19:03:06 +00:00 committed by GitHub
commit 130e999ac7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 45 additions and 6 deletions

View File

@ -937,15 +937,37 @@ DistributedExecutionRequiresRollback(DistributedExecution *execution)
} }
if (list_length(task->taskPlacementList) > 1) if (list_length(task->taskPlacementList) > 1)
{
if (SingleShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{ {
/* /*
* Adaptive executor opts to error out on queries if a placement is unhealthy, * Adaptive executor opts to error out on queries if a placement is unhealthy,
* not marking the placement itself unhealthy in the process. * not marking the placement itself unhealthy in the process.
* Use 2PC to rollback placements before the unhealthy shard failed. * Use 2PC to rollback placements before the unhealthy replica failed.
*/ */
return true; return true;
} }
/*
* Some tasks don't set replicationModel thus we only
* rely on the anchorShardId, not replicationModel.
*
* TODO: Do we ever need replicationModel in the Task structure?
* Can't we always rely on anchorShardId?
*/
if (task->anchorShardId != INVALID_SHARD_ID && ReferenceTableShardId(
task->anchorShardId))
{
return true;
}
/*
* Single DML/DDL tasks with replicated tables (non-reference)
* should not require BEGIN/COMMIT/ROLLBACK.
*/
return false;
}
return false; return false;
} }

View File

@ -128,7 +128,7 @@ static const struct config_enum_entry use_secondary_nodes_options[] = {
{ NULL, 0, false } { NULL, 0, false }
}; };
static const struct config_enum_entry multi_shard_commit_protocol_options[] = { static const struct config_enum_entry shard_commit_protocol_options[] = {
{ "1pc", COMMIT_PROTOCOL_1PC, false }, { "1pc", COMMIT_PROTOCOL_1PC, false },
{ "2pc", COMMIT_PROTOCOL_2PC, false }, { "2pc", COMMIT_PROTOCOL_2PC, false },
{ NULL, 0, false } { NULL, 0, false }
@ -930,7 +930,22 @@ RegisterCitusConfigVariables(void)
"performance benefits."), "performance benefits."),
&MultiShardCommitProtocol, &MultiShardCommitProtocol,
COMMIT_PROTOCOL_2PC, COMMIT_PROTOCOL_2PC,
multi_shard_commit_protocol_options, shard_commit_protocol_options,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.single_shard_commit_protocol",
gettext_noop(
"Sets the commit protocol for commands modifying a single shards with multiple replicas."),
gettext_noop("When a failure occurs during commands that modify multiple "
"replicas, two-phase commit is required to ensure data is never lost "
"and this is the default. However, changing to 1pc may give small "
"performance benefits."),
&SingleShardCommitProtocol,
COMMIT_PROTOCOL_2PC,
shard_commit_protocol_options,
PGC_USERSET, PGC_USERSET,
0, 0,
NULL, NULL, NULL); NULL, NULL, NULL);

View File

@ -38,6 +38,7 @@ CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NON
/* GUC, the commit protocol to use for commands affecting more than one connection */ /* GUC, the commit protocol to use for commands affecting more than one connection */
int MultiShardCommitProtocol = COMMIT_PROTOCOL_2PC; int MultiShardCommitProtocol = COMMIT_PROTOCOL_2PC;
int SingleShardCommitProtocol = COMMIT_PROTOCOL_2PC;
int SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE; int SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
/* state needed to keep track of operations used during a transaction */ /* state needed to keep track of operations used during a transaction */

View File

@ -75,6 +75,7 @@ extern bool FunctionOpensTransactionBlock;
/* config variable managed via guc.c */ /* config variable managed via guc.c */
extern int MultiShardCommitProtocol; extern int MultiShardCommitProtocol;
extern int SingleShardCommitProtocol;
/* state needed to restore multi-shard commit protocol during VACUUM/ANALYZE */ /* state needed to restore multi-shard commit protocol during VACUUM/ANALYZE */
extern int SavedMultiShardCommitProtocol; extern int SavedMultiShardCommitProtocol;