mirror of https://github.com/citusdata/citus.git
Remove stopOnFailure flag from EndRemoteCopy()
parent
39e806c276
commit
b5c0ca45f1
|
@ -631,7 +631,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
||||||
shardConnections->connectionList);
|
shardConnections->connectionList);
|
||||||
}
|
}
|
||||||
|
|
||||||
EndRemoteCopy(currentShardId, shardConnections->connectionList, true);
|
EndRemoteCopy(currentShardId, shardConnections->connectionList);
|
||||||
MasterUpdateShardStatistics(shardConnections->shardId);
|
MasterUpdateShardStatistics(shardConnections->shardId);
|
||||||
|
|
||||||
copiedDataSizeInBytes = 0;
|
copiedDataSizeInBytes = 0;
|
||||||
|
@ -655,7 +655,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
||||||
SendCopyBinaryFooters(copyOutState, currentShardId,
|
SendCopyBinaryFooters(copyOutState, currentShardId,
|
||||||
shardConnections->connectionList);
|
shardConnections->connectionList);
|
||||||
}
|
}
|
||||||
EndRemoteCopy(currentShardId, shardConnections->connectionList, true);
|
EndRemoteCopy(currentShardId, shardConnections->connectionList);
|
||||||
MasterUpdateShardStatistics(shardConnections->shardId);
|
MasterUpdateShardStatistics(shardConnections->shardId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1194,11 +1194,10 @@ SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *c
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* EndRemoteCopy ends the COPY input on all connections, and unclaims connections.
|
* EndRemoteCopy ends the COPY input on all connections, and unclaims connections.
|
||||||
* If stopOnFailure is true, then EndRemoteCopy reports an error on failure,
|
* This reports an error on failure.
|
||||||
* otherwise it reports a warning or continues.
|
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
|
EndRemoteCopy(int64 shardId, List *connectionList)
|
||||||
{
|
{
|
||||||
ListCell *connectionCell = NULL;
|
ListCell *connectionCell = NULL;
|
||||||
|
|
||||||
|
@ -1211,21 +1210,14 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
|
||||||
/* end the COPY input */
|
/* end the COPY input */
|
||||||
if (!PutRemoteCopyEnd(connection, NULL))
|
if (!PutRemoteCopyEnd(connection, NULL))
|
||||||
{
|
{
|
||||||
if (!stopOnFailure)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||||
errmsg("failed to COPY to shard " INT64_FORMAT " on %s:%d",
|
errmsg("failed to COPY to shard " INT64_FORMAT " on %s:%d",
|
||||||
shardId, connection->hostname, connection->port)));
|
shardId, connection->hostname, connection->port)));
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* check whether there were any COPY errors */
|
/* check whether there were any COPY errors */
|
||||||
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||||
if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure)
|
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
ReportCopyError(connection, result);
|
ReportCopyError(connection, result);
|
||||||
}
|
}
|
||||||
|
@ -2456,8 +2448,7 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* close the COPY input on all shard placements */
|
/* close the COPY input on all shard placements */
|
||||||
EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList,
|
EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList);
|
||||||
true);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
|
|
|
@ -448,7 +448,7 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* close the COPY input */
|
/* close the COPY input */
|
||||||
EndRemoteCopy(0, connectionList, true);
|
EndRemoteCopy(0, connectionList);
|
||||||
|
|
||||||
if (resultDest->writeLocalFile)
|
if (resultDest->writeLocalFile)
|
||||||
{
|
{
|
||||||
|
|
|
@ -131,7 +131,7 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
|
||||||
CopyCoercionData *columnCoercionPaths);
|
CopyCoercionData *columnCoercionPaths);
|
||||||
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
|
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
|
||||||
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
|
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
|
||||||
extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure);
|
extern void EndRemoteCopy(int64 shardId, List *connectionList);
|
||||||
extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
||||||
const char *queryString);
|
const char *queryString);
|
||||||
extern void CheckCopyPermissions(CopyStmt *copyStatement);
|
extern void CheckCopyPermissions(CopyStmt *copyStatement);
|
||||||
|
|
Loading…
Reference in New Issue