Merge pull request #2706 from citusdata/simplify_EndRemoteCopy

Remove stopOnFailure flag from EndRemoteCopy()
pull/2703/head
Hadi Moshayedi 2019-05-13 12:51:15 -08:00 committed by GitHub
commit 1b4dc44996
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 8 additions and 17 deletions

View File

@ -631,7 +631,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
shardConnections->connectionList); shardConnections->connectionList);
} }
EndRemoteCopy(currentShardId, shardConnections->connectionList, true); EndRemoteCopy(currentShardId, shardConnections->connectionList);
MasterUpdateShardStatistics(shardConnections->shardId); MasterUpdateShardStatistics(shardConnections->shardId);
copiedDataSizeInBytes = 0; copiedDataSizeInBytes = 0;
@ -655,7 +655,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
SendCopyBinaryFooters(copyOutState, currentShardId, SendCopyBinaryFooters(copyOutState, currentShardId,
shardConnections->connectionList); shardConnections->connectionList);
} }
EndRemoteCopy(currentShardId, shardConnections->connectionList, true); EndRemoteCopy(currentShardId, shardConnections->connectionList);
MasterUpdateShardStatistics(shardConnections->shardId); MasterUpdateShardStatistics(shardConnections->shardId);
} }
@ -1194,11 +1194,10 @@ SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *c
/* /*
* EndRemoteCopy ends the COPY input on all connections, and unclaims connections. * EndRemoteCopy ends the COPY input on all connections, and unclaims connections.
* If stopOnFailure is true, then EndRemoteCopy reports an error on failure, * This reports an error on failure.
* otherwise it reports a warning or continues.
*/ */
void void
EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure) EndRemoteCopy(int64 shardId, List *connectionList)
{ {
ListCell *connectionCell = NULL; ListCell *connectionCell = NULL;
@ -1211,21 +1210,14 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
/* end the COPY input */ /* end the COPY input */
if (!PutRemoteCopyEnd(connection, NULL)) if (!PutRemoteCopyEnd(connection, NULL))
{ {
if (!stopOnFailure)
{
continue;
}
ereport(ERROR, (errcode(ERRCODE_IO_ERROR), ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to COPY to shard " INT64_FORMAT " on %s:%d", errmsg("failed to COPY to shard " INT64_FORMAT " on %s:%d",
shardId, connection->hostname, connection->port))); shardId, connection->hostname, connection->port)));
continue;
} }
/* check whether there were any COPY errors */ /* check whether there were any COPY errors */
result = GetRemoteCommandResult(connection, raiseInterrupts); result = GetRemoteCommandResult(connection, raiseInterrupts);
if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure) if (PQresultStatus(result) != PGRES_COMMAND_OK)
{ {
ReportCopyError(connection, result); ReportCopyError(connection, result);
} }
@ -2456,8 +2448,7 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
} }
/* close the COPY input on all shard placements */ /* close the COPY input on all shard placements */
EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList);
true);
} }
} }
PG_CATCH(); PG_CATCH();

View File

@ -448,7 +448,7 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
} }
/* close the COPY input */ /* close the COPY input */
EndRemoteCopy(0, connectionList, true); EndRemoteCopy(0, connectionList);
if (resultDest->writeLocalFile) if (resultDest->writeLocalFile)
{ {

View File

@ -131,7 +131,7 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
CopyCoercionData *columnCoercionPaths); CopyCoercionData *columnCoercionPaths);
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState); extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState); extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure); extern void EndRemoteCopy(int64 shardId, List *connectionList);
extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
const char *queryString); const char *queryString);
extern void CheckCopyPermissions(CopyStmt *copyStatement); extern void CheckCopyPermissions(CopyStmt *copyStatement);