Use new transaction management api in master_append_table_to_shard

pull/1149/head
Brian Cloutier 2017-01-20 19:15:33 +03:00
parent e3c9483bae
commit d91a32b585
1 changed files with 4 additions and 36 deletions

View File

@ -206,10 +206,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
char *shardTableName = NULL; char *shardTableName = NULL;
char *shardQualifiedName = NULL; char *shardQualifiedName = NULL;
List *shardPlacementList = NIL; List *shardPlacementList = NIL;
List *succeededPlacementList = NIL;
List *failedPlacementList = NIL;
ListCell *shardPlacementCell = NULL; ListCell *shardPlacementCell = NULL;
ListCell *failedPlacementCell = NULL;
uint64 newShardSize = 0; uint64 newShardSize = 0;
uint64 shardMaxSizeInBytes = 0; uint64 shardMaxSizeInBytes = 0;
float4 shardFillLevel = 0.0; float4 shardFillLevel = 0.0;
@ -261,6 +258,8 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
errhint("Try running master_create_empty_shard() first"))); errhint("Try running master_create_empty_shard() first")));
} }
BeginOrContinueCoordinatedTransaction();
/* issue command to append table to each shard placement */ /* issue command to append table to each shard placement */
foreach(shardPlacementCell, shardPlacementList) foreach(shardPlacementCell, shardPlacementList)
{ {
@ -281,39 +280,10 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
PQclear(queryResult); PQclear(queryResult);
ForgetResults(connection); ForgetResults(connection);
if (executeCommand == 0) if (executeCommand != 0)
{ {
succeededPlacementList = lappend(succeededPlacementList, shardPlacement); MarkRemoteTransactionFailed(connection, false);
} }
else
{
failedPlacementList = lappend(failedPlacementList, shardPlacement);
}
}
/* before updating metadata, check that we appended to at least one shard */
if (succeededPlacementList == NIL)
{
ereport(ERROR, (errmsg("could not append table to any shard placement")));
}
/* make sure we don't process cancel signals */
HOLD_INTERRUPTS();
/* mark shard placements that we couldn't append to as inactive */
foreach(failedPlacementCell, failedPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(failedPlacementCell);
uint64 placementId = placement->placementId;
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
UpdateShardPlacementState(placementId, FILE_INACTIVE);
ereport(WARNING, (errmsg("could not append table to shard \"%s\" on node "
"\"%s:%u\"", shardQualifiedName, workerName,
workerPort),
errdetail("Marking this shard placement as inactive")));
} }
/* update shard statistics and get new shard size */ /* update shard statistics and get new shard size */
@ -323,8 +293,6 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L; shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
shardFillLevel = ((float4) newShardSize / (float4) shardMaxSizeInBytes); shardFillLevel = ((float4) newShardSize / (float4) shardMaxSizeInBytes);
RESUME_INTERRUPTS();
PG_RETURN_FLOAT4(shardFillLevel); PG_RETURN_FLOAT4(shardFillLevel);
} }