Port master_append_table_to_shard to new connection API (#1149)

If any placements fail it doesn't update shard statistics on those placements.

A minor enabling refactor: Make CoordinatedTransactionUses2PC public (it used to be CoordinatedTransactionUse2PC but that symbol already existed, so renamed it as well)
pull/1938/head
Brian Cloutier 2017-01-23 15:57:44 +02:00 committed by GitHub
parent c618a46a35
commit e4b65d03a2
3 changed files with 29 additions and 48 deletions

View File

@ -36,6 +36,7 @@
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "utils/builtins.h" #include "utils/builtins.h"
@ -206,10 +207,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
char *shardTableName = NULL; char *shardTableName = NULL;
char *shardQualifiedName = NULL; char *shardQualifiedName = NULL;
List *shardPlacementList = NIL; List *shardPlacementList = NIL;
List *succeededPlacementList = NIL;
List *failedPlacementList = NIL;
ListCell *shardPlacementCell = NULL; ListCell *shardPlacementCell = NULL;
ListCell *failedPlacementCell = NULL;
uint64 newShardSize = 0; uint64 newShardSize = 0;
uint64 shardMaxSizeInBytes = 0; uint64 shardMaxSizeInBytes = 0;
float4 shardFillLevel = 0.0; float4 shardFillLevel = 0.0;
@ -261,13 +259,16 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
errhint("Try running master_create_empty_shard() first"))); errhint("Try running master_create_empty_shard() first")));
} }
BeginOrContinueCoordinatedTransaction();
/* issue command to append table to each shard placement */ /* issue command to append table to each shard placement */
foreach(shardPlacementCell, shardPlacementList) foreach(shardPlacementCell, shardPlacementList)
{ {
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
char *workerName = shardPlacement->nodeName; MultiConnection *connection = GetPlacementConnection(FOR_DML, shardPlacement,
uint32 workerPort = shardPlacement->nodePort; NULL);
List *queryResultList = NIL; PGresult *queryResult = NULL;
int executeResult = 0;
StringInfo workerAppendQuery = makeStringInfo(); StringInfo workerAppendQuery = makeStringInfo();
appendStringInfo(workerAppendQuery, WORKER_APPEND_TABLE_TO_SHARD, appendStringInfo(workerAppendQuery, WORKER_APPEND_TABLE_TO_SHARD,
@ -275,43 +276,22 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
quote_literal_cstr(sourceTableName), quote_literal_cstr(sourceTableName),
quote_literal_cstr(sourceNodeName), sourceNodePort); quote_literal_cstr(sourceNodeName), sourceNodePort);
/* inserting data should be performed by the current user */ executeResult = ExecuteOptionalRemoteCommand(connection, workerAppendQuery->data,
queryResultList = ExecuteRemoteQuery(workerName, workerPort, NULL, &queryResult);
workerAppendQuery); PQclear(queryResult);
if (queryResultList != NIL) ForgetResults(connection);
if (executeResult != 0)
{ {
succeededPlacementList = lappend(succeededPlacementList, shardPlacement); MarkRemoteTransactionFailed(connection, false);
}
else
{
failedPlacementList = lappend(failedPlacementList, shardPlacement);
} }
} }
/* before updating metadata, check that we appended to at least one shard */ /*
if (succeededPlacementList == NIL) * Abort if all placements failed, mark placements invalid if only some failed. By
{ * doing this UpdateShardStatistics never works on failed placements.
ereport(ERROR, (errmsg("could not append table to any shard placement"))); */
} CheckForFailedPlacements(true, CoordinatedTransactionUses2PC);
/* make sure we don't process cancel signals */
HOLD_INTERRUPTS();
/* mark shard placements that we couldn't append to as inactive */
foreach(failedPlacementCell, failedPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(failedPlacementCell);
uint64 placementId = placement->placementId;
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
UpdateShardPlacementState(placementId, FILE_INACTIVE);
ereport(WARNING, (errmsg("could not append table to shard \"%s\" on node "
"\"%s:%u\"", shardQualifiedName, workerName,
workerPort),
errdetail("Marking this shard placement as inactive")));
}
/* update shard statistics and get new shard size */ /* update shard statistics and get new shard size */
newShardSize = UpdateShardStatistics(shardId); newShardSize = UpdateShardStatistics(shardId);
@ -320,8 +300,6 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L; shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
shardFillLevel = ((float4) newShardSize / (float4) shardMaxSizeInBytes); shardFillLevel = ((float4) newShardSize / (float4) shardMaxSizeInBytes);
RESUME_INTERRUPTS();
PG_RETURN_FLOAT4(shardFillLevel); PG_RETURN_FLOAT4(shardFillLevel);
} }

View File

@ -48,7 +48,7 @@ static bool subXactAbortAttempted = false;
* CoordinatedTransactionUse2PC(), e.g. if DDL was issued and * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and
* MultiShardCommitProtocol was set to 2PC. * MultiShardCommitProtocol was set to 2PC.
*/ */
static bool CurrentTransactionUse2PC = false; bool CoordinatedTransactionUses2PC = false;
/* transaction management functions */ /* transaction management functions */
static void CoordinatedTransactionCallback(XactEvent event, void *arg); static void CoordinatedTransactionCallback(XactEvent event, void *arg);
@ -113,7 +113,7 @@ CoordinatedTransactionUse2PC(void)
{ {
Assert(InCoordinatedTransaction()); Assert(InCoordinatedTransaction());
CurrentTransactionUse2PC = true; CoordinatedTransactionUses2PC = true;
} }
@ -167,7 +167,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
CurrentCoordinatedTransactionState = COORD_TRANS_NONE; CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
XactModificationLevel = XACT_MODIFICATION_NONE; XactModificationLevel = XACT_MODIFICATION_NONE;
dlist_init(&InProgressTransactions); dlist_init(&InProgressTransactions);
CurrentTransactionUse2PC = false; CoordinatedTransactionUses2PC = false;
} }
break; break;
@ -202,7 +202,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
CurrentCoordinatedTransactionState = COORD_TRANS_NONE; CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
XactModificationLevel = XACT_MODIFICATION_NONE; XactModificationLevel = XACT_MODIFICATION_NONE;
dlist_init(&InProgressTransactions); dlist_init(&InProgressTransactions);
CurrentTransactionUse2PC = false; CoordinatedTransactionUses2PC = false;
subXactAbortAttempted = false; subXactAbortAttempted = false;
} }
break; break;
@ -247,9 +247,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* case that iteratively executed commands marked all placements * case that iteratively executed commands marked all placements
* as invalid. * as invalid.
*/ */
CheckForFailedPlacements(true, CurrentTransactionUse2PC); CheckForFailedPlacements(true, CoordinatedTransactionUses2PC);
if (CurrentTransactionUse2PC) if (CoordinatedTransactionUses2PC)
{ {
CoordinatedRemoteTransactionsPrepare(); CoordinatedRemoteTransactionsPrepare();
CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
@ -270,7 +270,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* Check again whether shards/placement successfully * Check again whether shards/placement successfully
* committed. This handles failure at COMMIT/PREPARE time. * committed. This handles failure at COMMIT/PREPARE time.
*/ */
CheckForFailedPlacements(false, CurrentTransactionUse2PC); CheckForFailedPlacements(false, CoordinatedTransactionUses2PC);
} }
break; break;

View File

@ -66,6 +66,9 @@ extern CoordinatedTransactionState CurrentCoordinatedTransactionState;
/* list of connections that are part of the current coordinated transaction */ /* list of connections that are part of the current coordinated transaction */
extern dlist_head InProgressTransactions; extern dlist_head InProgressTransactions;
/* whether we've been asked to use 2PC (by calling CoordinatedTransactionUse2PC()) */
extern bool CoordinatedTransactionUses2PC;
/* /*
* Coordinated transaction management. * Coordinated transaction management.
*/ */