Set placement to inactive on connection failure in COPY

pull/1168/head
Marco Slot 2017-01-24 12:24:01 +01:00 committed by Murat Tuncer
parent 4476a7be81
commit d0c76407b8
1 changed files with 13 additions and 12 deletions

View File

@ -834,7 +834,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
bool stopOnFailure, bool useBinaryCopyFormat) bool stopOnFailure, bool useBinaryCopyFormat)
{ {
List *finalizedPlacementList = NIL; List *finalizedPlacementList = NIL;
List *failedPlacementList = NIL; int failedPlacementCount = 0;
ListCell *placementCell = NULL; ListCell *placementCell = NULL;
List *connectionList = NULL; List *connectionList = NULL;
int64 shardId = shardConnections->shardId; int64 shardId = shardConnections->shardId;
@ -863,8 +863,6 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
foreach(placementCell, finalizedPlacementList) foreach(placementCell, finalizedPlacementList)
{ {
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
char *nodeName = placement->nodeName;
int nodePort = placement->nodePort;
char *nodeUser = CurrentUserName(); char *nodeUser = CurrentUserName();
MultiConnection *connection = NULL; MultiConnection *connection = NULL;
uint32 connectionFlags = FOR_DML; uint32 connectionFlags = FOR_DML;
@ -877,13 +875,17 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
{ {
if (stopOnFailure) if (stopOnFailure)
{ {
ereport(ERROR, (errmsg("could not open connection to %s:%d", ReportConnectionError(connection, ERROR);
nodeName, nodePort)));
} }
else
{
ReportConnectionError(connection, WARNING);
MarkRemoteTransactionFailed(connection, true);
failedPlacementList = lappend(failedPlacementList, placement); failedPlacementCount++;
continue; continue;
} }
}
/* /*
* If errors are supposed to cause immediate aborts (i.e. we don't * If errors are supposed to cause immediate aborts (i.e. we don't
@ -907,8 +909,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
PQclear(result); PQclear(result);
/* failed placements will be invalidated by transaction machinery */ failedPlacementCount++;
failedPlacementList = lappend(failedPlacementList, placement);
continue; continue;
} }
@ -917,9 +918,9 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
} }
/* if all placements failed, error out */ /* if all placements failed, error out */
if (list_length(failedPlacementList) == list_length(finalizedPlacementList)) if (failedPlacementCount == list_length(finalizedPlacementList))
{ {
ereport(ERROR, (errmsg("could not find any active placements"))); ereport(ERROR, (errmsg("could not connect to any active placements")));
} }
/* /*
@ -927,7 +928,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
* never reach to this point. This is the case for reference tables and * never reach to this point. This is the case for reference tables and
* copy from worker nodes. * copy from worker nodes.
*/ */
Assert(!stopOnFailure || list_length(failedPlacementList) == 0); Assert(!stopOnFailure || failedPlacementCount == 0);
shardConnections->connectionList = connectionList; shardConnections->connectionList = connectionList;