diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 80d574341..72d11710f 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -938,12 +938,34 @@ DistributedExecutionRequiresRollback(DistributedExecution *execution) if (list_length(task->taskPlacementList) > 1) { + if (SingleShardCommitProtocol == COMMIT_PROTOCOL_2PC) + { + /* + * Adaptive executor opts to error out on queries if a placement is unhealthy, + * not marking the placement itself unhealthy in the process. + * Use 2PC to rollback placements before the unhealthy replica failed. + */ + return true; + } + /* - * Adaptive executor opts to error out on queries if a placement is unhealthy, - * not marking the placement itself unhealthy in the process. - * Use 2PC to rollback placements before the unhealthy shard failed. + * Some tasks don't set replicationModel thus we only + * rely on the anchorShardId, not replicationModel. + * + * TODO: Do we ever need replicationModel in the Task structure? + * Can't we always rely on anchorShardId? */ - return true; + if (task->anchorShardId != INVALID_SHARD_ID && ReferenceTableShardId( + task->anchorShardId)) + { + return true; + } + + /* + * Single DML/DDL tasks with replicated tables (non-reference) + * should not require BEGIN/COMMIT/ROLLBACK. + */ + return false; } return false; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index abdf0a01e..ab3b0dd16 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -128,7 +128,7 @@ static const struct config_enum_entry use_secondary_nodes_options[] = { { NULL, 0, false } }; -static const struct config_enum_entry multi_shard_commit_protocol_options[] = { +static const struct config_enum_entry shard_commit_protocol_options[] = { { "1pc", COMMIT_PROTOCOL_1PC, false }, { "2pc", COMMIT_PROTOCOL_2PC, false }, { NULL, 0, false } @@ -930,7 +930,22 @@ RegisterCitusConfigVariables(void) "performance benefits."), &MultiShardCommitProtocol, COMMIT_PROTOCOL_2PC, - multi_shard_commit_protocol_options, + shard_commit_protocol_options, + PGC_USERSET, + 0, + NULL, NULL, NULL); + + DefineCustomEnumVariable( + "citus.single_shard_commit_protocol", + gettext_noop( + "Sets the commit protocol for commands modifying a single shards with multiple replicas."), + gettext_noop("When a failure occurs during commands that modify multiple " + "replicas, two-phase commit is required to ensure data is never lost " + "and this is the default. However, changing to 1pc may give small " + "performance benefits."), + &SingleShardCommitProtocol, + COMMIT_PROTOCOL_2PC, + shard_commit_protocol_options, PGC_USERSET, 0, NULL, NULL, NULL); diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 985e0b2f7..0549277d3 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -38,6 +38,7 @@ CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NON /* GUC, the commit protocol to use for commands affecting more than one connection */ int MultiShardCommitProtocol = COMMIT_PROTOCOL_2PC; +int SingleShardCommitProtocol = COMMIT_PROTOCOL_2PC; int SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE; /* state needed to keep track of operations used during a transaction */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 414fff800..70365a9d6 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -75,6 +75,7 @@ extern bool FunctionOpensTransactionBlock; /* config variable managed via guc.c */ extern int MultiShardCommitProtocol; +extern int SingleShardCommitProtocol; /* state needed to restore multi-shard commit protocol during VACUUM/ANALYZE */ extern int SavedMultiShardCommitProtocol;