diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 08ca5c831..8ee677e61 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -206,10 +206,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) char *shardTableName = NULL; char *shardQualifiedName = NULL; List *shardPlacementList = NIL; - List *succeededPlacementList = NIL; - List *failedPlacementList = NIL; ListCell *shardPlacementCell = NULL; - ListCell *failedPlacementCell = NULL; uint64 newShardSize = 0; uint64 shardMaxSizeInBytes = 0; float4 shardFillLevel = 0.0; @@ -261,6 +258,8 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) errhint("Try running master_create_empty_shard() first"))); } + BeginOrContinueCoordinatedTransaction(); + /* issue command to append table to each shard placement */ foreach(shardPlacementCell, shardPlacementList) { @@ -281,39 +280,10 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) PQclear(queryResult); ForgetResults(connection); - if (executeCommand == 0) + if (executeCommand != 0) { - succeededPlacementList = lappend(succeededPlacementList, shardPlacement); + MarkRemoteTransactionFailed(connection, false); } - else - { - failedPlacementList = lappend(failedPlacementList, shardPlacement); - } - } - - /* before updating metadata, check that we appended to at least one shard */ - if (succeededPlacementList == NIL) - { - ereport(ERROR, (errmsg("could not append table to any shard placement"))); - } - - /* make sure we don't process cancel signals */ - HOLD_INTERRUPTS(); - - /* mark shard placements that we couldn't append to as inactive */ - foreach(failedPlacementCell, failedPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(failedPlacementCell); - uint64 placementId = placement->placementId; - char *workerName = placement->nodeName; - uint32 workerPort = placement->nodePort; - - UpdateShardPlacementState(placementId, FILE_INACTIVE); - - ereport(WARNING, (errmsg("could not append table to shard \"%s\" on node " - "\"%s:%u\"", shardQualifiedName, workerName, - workerPort), - errdetail("Marking this shard placement as inactive"))); } /* update shard statistics and get new shard size */ @@ -323,8 +293,6 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L; shardFillLevel = ((float4) newShardSize / (float4) shardMaxSizeInBytes); - RESUME_INTERRUPTS(); - PG_RETURN_FLOAT4(shardFillLevel); }