Move multi_copy.c to interrupt aware libpq wrappers.

pull/1650/head
Andres Freund 2017-06-30 18:20:54 -07:00 committed by Metin Doslu
parent 0722b3f003
commit a66ece75a0
1 changed files with 45 additions and 13 deletions

View File

@ -737,6 +737,7 @@ MasterPartitionMethod(RangeVar *relation)
{
char partitionMethod = '\0';
PGresult *queryResult = NULL;
bool raiseInterrupts = true;
char *relationName = relation->relname;
char *schemaName = relation->schemaname;
@ -745,7 +746,11 @@ MasterPartitionMethod(RangeVar *relation)
StringInfo partitionMethodCommand = makeStringInfo();
appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName);
queryResult = PQexec(masterConnection->pgConn, partitionMethodCommand->data);
if (!SendRemoteCommand(masterConnection, partitionMethodCommand->data))
{
ReportConnectionError(masterConnection, ERROR);
}
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0);
@ -766,6 +771,9 @@ MasterPartitionMethod(RangeVar *relation)
PQclear(queryResult);
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult);
return partitionMethod;
}
@ -814,6 +822,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
ListCell *placementCell = NULL;
List *connectionList = NULL;
int64 shardId = shardConnections->shardId;
bool raiseInterrupts = true;
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"OpenCopyConnections",
@ -871,15 +880,19 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
MarkRemoteTransactionCritical(connection);
ClaimConnectionExclusively(connection);
RemoteTransactionBeginIfNecessary(connection);
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId,
useBinaryCopyFormat);
result = PQexec(connection->pgConn, copyCommand->data);
if (!SendRemoteCommand(connection, copyCommand->data))
{
ReportConnectionError(connection, ERROR);
}
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (PQresultStatus(result) != PGRES_COPY_IN)
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
connectionList = lappend(connectionList, connection);
}
@ -1013,11 +1026,16 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
{
List *finalizedPlacementList = NIL;
PGresult *queryResult = NULL;
bool raiseInterrupts = true;
StringInfo shardPlacementsCommand = makeStringInfo();
appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId);
queryResult = PQexec(masterConnection->pgConn, shardPlacementsCommand->data);
if (!SendRemoteCommand(masterConnection, shardPlacementsCommand->data))
{
ReportConnectionError(masterConnection, ERROR);
}
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
int rowCount = PQntuples(queryResult);
@ -1047,6 +1065,9 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
}
PQclear(queryResult);
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult);
return finalizedPlacementList;
}
@ -1128,8 +1149,7 @@ SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList)
static void
SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *connection)
{
int copyResult = PQputCopyData(connection->pgConn, dataBuffer->data, dataBuffer->len);
if (copyResult != 1)
if (!PutRemoteCopyData(connection, dataBuffer->data, dataBuffer->len))
{
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to COPY to shard %ld on %s:%d",
@ -1153,13 +1173,11 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
int copyEndResult = 0;
PGresult *result = NULL;
bool raiseInterrupts = true;
/* end the COPY input */
copyEndResult = PQputCopyEnd(connection->pgConn, NULL);
if (copyEndResult != 1)
if (!PutRemoteCopyEnd(connection, NULL))
{
if (stopOnFailure)
{
@ -1172,7 +1190,7 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
}
/* check whether there were any COPY errors */
result = PQgetResult(connection->pgConn);
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure)
{
ReportCopyError(connection, result);
@ -1492,11 +1510,16 @@ RemoteCreateEmptyShard(char *relationName)
{
int64 shardId = 0;
PGresult *queryResult = NULL;
bool raiseInterrupts = true;
StringInfo createEmptyShardCommand = makeStringInfo();
appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName);
queryResult = PQexec(masterConnection->pgConn, createEmptyShardCommand->data);
if (!SendRemoteCommand(masterConnection, createEmptyShardCommand->data))
{
ReportConnectionError(masterConnection, ERROR);
}
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0);
@ -1510,6 +1533,8 @@ RemoteCreateEmptyShard(char *relationName)
}
PQclear(queryResult);
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult);
return shardId;
}
@ -1540,18 +1565,25 @@ static void
RemoteUpdateShardStatistics(uint64 shardId)
{
PGresult *queryResult = NULL;
bool raiseInterrupts = true;
StringInfo updateShardStatisticsCommand = makeStringInfo();
appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY,
shardId);
queryResult = PQexec(masterConnection->pgConn, updateShardStatisticsCommand->data);
if (!SendRemoteCommand(masterConnection, updateShardStatisticsCommand->data))
{
ReportConnectionError(masterConnection, ERROR);
}
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
if (PQresultStatus(queryResult) != PGRES_TUPLES_OK)
{
ereport(ERROR, (errmsg("could not update shard statistics")));
}
PQclear(queryResult);
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult);
}