diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index e79c38d8e..789e2b816 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -36,6 +36,7 @@ #include "distributed/placement_connection.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" +#include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "utils/builtins.h" @@ -206,10 +207,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) char *shardTableName = NULL; char *shardQualifiedName = NULL; List *shardPlacementList = NIL; - List *succeededPlacementList = NIL; - List *failedPlacementList = NIL; ListCell *shardPlacementCell = NULL; - ListCell *failedPlacementCell = NULL; uint64 newShardSize = 0; uint64 shardMaxSizeInBytes = 0; float4 shardFillLevel = 0.0; @@ -261,13 +259,16 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) errhint("Try running master_create_empty_shard() first"))); } + BeginOrContinueCoordinatedTransaction(); + /* issue command to append table to each shard placement */ foreach(shardPlacementCell, shardPlacementList) { ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); - char *workerName = shardPlacement->nodeName; - uint32 workerPort = shardPlacement->nodePort; - List *queryResultList = NIL; + MultiConnection *connection = GetPlacementConnection(FOR_DML, shardPlacement, + NULL); + PGresult *queryResult = NULL; + int executeResult = 0; StringInfo workerAppendQuery = makeStringInfo(); appendStringInfo(workerAppendQuery, WORKER_APPEND_TABLE_TO_SHARD, @@ -275,43 +276,22 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) quote_literal_cstr(sourceTableName), quote_literal_cstr(sourceNodeName), sourceNodePort); - /* inserting data should be performed by the current user */ - queryResultList = ExecuteRemoteQuery(workerName, workerPort, NULL, - workerAppendQuery); - if (queryResultList != NIL) + executeResult = ExecuteOptionalRemoteCommand(connection, workerAppendQuery->data, + &queryResult); + PQclear(queryResult); + ForgetResults(connection); + + if (executeResult != 0) { - succeededPlacementList = lappend(succeededPlacementList, shardPlacement); - } - else - { - failedPlacementList = lappend(failedPlacementList, shardPlacement); + MarkRemoteTransactionFailed(connection, false); } } - /* before updating metadata, check that we appended to at least one shard */ - if (succeededPlacementList == NIL) - { - ereport(ERROR, (errmsg("could not append table to any shard placement"))); - } - - /* make sure we don't process cancel signals */ - HOLD_INTERRUPTS(); - - /* mark shard placements that we couldn't append to as inactive */ - foreach(failedPlacementCell, failedPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(failedPlacementCell); - uint64 placementId = placement->placementId; - char *workerName = placement->nodeName; - uint32 workerPort = placement->nodePort; - - UpdateShardPlacementState(placementId, FILE_INACTIVE); - - ereport(WARNING, (errmsg("could not append table to shard \"%s\" on node " - "\"%s:%u\"", shardQualifiedName, workerName, - workerPort), - errdetail("Marking this shard placement as inactive"))); - } + /* + * Abort if all placements failed, mark placements invalid if only some failed. By + * doing this UpdateShardStatistics never works on failed placements. + */ + CheckForFailedPlacements(true, CoordinatedTransactionUses2PC); /* update shard statistics and get new shard size */ newShardSize = UpdateShardStatistics(shardId); @@ -320,8 +300,6 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L; shardFillLevel = ((float4) newShardSize / (float4) shardMaxSizeInBytes); - RESUME_INTERRUPTS(); - PG_RETURN_FLOAT4(shardFillLevel); } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 40a0e4651..9c22858ac 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -48,7 +48,7 @@ static bool subXactAbortAttempted = false; * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and * MultiShardCommitProtocol was set to 2PC. */ -static bool CurrentTransactionUse2PC = false; +bool CoordinatedTransactionUses2PC = false; /* transaction management functions */ static void CoordinatedTransactionCallback(XactEvent event, void *arg); @@ -113,7 +113,7 @@ CoordinatedTransactionUse2PC(void) { Assert(InCoordinatedTransaction()); - CurrentTransactionUse2PC = true; + CoordinatedTransactionUses2PC = true; } @@ -167,7 +167,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; dlist_init(&InProgressTransactions); - CurrentTransactionUse2PC = false; + CoordinatedTransactionUses2PC = false; } break; @@ -202,7 +202,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; dlist_init(&InProgressTransactions); - CurrentTransactionUse2PC = false; + CoordinatedTransactionUses2PC = false; subXactAbortAttempted = false; } break; @@ -247,9 +247,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * case that iteratively executed commands marked all placements * as invalid. */ - CheckForFailedPlacements(true, CurrentTransactionUse2PC); + CheckForFailedPlacements(true, CoordinatedTransactionUses2PC); - if (CurrentTransactionUse2PC) + if (CoordinatedTransactionUses2PC) { CoordinatedRemoteTransactionsPrepare(); CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; @@ -270,7 +270,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * Check again whether shards/placement successfully * committed. This handles failure at COMMIT/PREPARE time. */ - CheckForFailedPlacements(false, CurrentTransactionUse2PC); + CheckForFailedPlacements(false, CoordinatedTransactionUses2PC); } break; diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index dd0c691ed..3f600f064 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -66,6 +66,9 @@ extern CoordinatedTransactionState CurrentCoordinatedTransactionState; /* list of connections that are part of the current coordinated transaction */ extern dlist_head InProgressTransactions; +/* whether we've been asked to use 2PC (by calling CoordinatedTransactionUse2PC()) */ +extern bool CoordinatedTransactionUses2PC; + /* * Coordinated transaction management. */