diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index a4ecb7a43..59f800123 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -737,6 +737,7 @@ MasterPartitionMethod(RangeVar *relation) { char partitionMethod = '\0'; PGresult *queryResult = NULL; + bool raiseInterrupts = true; char *relationName = relation->relname; char *schemaName = relation->schemaname; @@ -745,7 +746,11 @@ MasterPartitionMethod(RangeVar *relation) StringInfo partitionMethodCommand = makeStringInfo(); appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName); - queryResult = PQexec(masterConnection->pgConn, partitionMethodCommand->data); + if (!SendRemoteCommand(masterConnection, partitionMethodCommand->data)) + { + ReportConnectionError(masterConnection, ERROR); + } + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) { char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0); @@ -766,6 +771,9 @@ MasterPartitionMethod(RangeVar *relation) PQclear(queryResult); + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); + Assert(!queryResult); + return partitionMethod; } @@ -814,6 +822,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, ListCell *placementCell = NULL; List *connectionList = NULL; int64 shardId = shardConnections->shardId; + bool raiseInterrupts = true; MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "OpenCopyConnections", @@ -871,15 +880,19 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, MarkRemoteTransactionCritical(connection); ClaimConnectionExclusively(connection); RemoteTransactionBeginIfNecessary(connection); + copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId, useBinaryCopyFormat); - result = PQexec(connection->pgConn, copyCommand->data); + if (!SendRemoteCommand(connection, copyCommand->data)) + { + ReportConnectionError(connection, ERROR); + } + result = GetRemoteCommandResult(connection, raiseInterrupts); if (PQresultStatus(result) != PGRES_COPY_IN) { ReportResultError(connection, result, ERROR); } - PQclear(result); connectionList = lappend(connectionList, connection); } @@ -1013,11 +1026,16 @@ RemoteFinalizedShardPlacementList(uint64 shardId) { List *finalizedPlacementList = NIL; PGresult *queryResult = NULL; + bool raiseInterrupts = true; StringInfo shardPlacementsCommand = makeStringInfo(); appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId); - queryResult = PQexec(masterConnection->pgConn, shardPlacementsCommand->data); + if (!SendRemoteCommand(masterConnection, shardPlacementsCommand->data)) + { + ReportConnectionError(masterConnection, ERROR); + } + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) { int rowCount = PQntuples(queryResult); @@ -1047,6 +1065,9 @@ RemoteFinalizedShardPlacementList(uint64 shardId) } PQclear(queryResult); + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); + Assert(!queryResult); + return finalizedPlacementList; } @@ -1128,8 +1149,7 @@ SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList) static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *connection) { - int copyResult = PQputCopyData(connection->pgConn, dataBuffer->data, dataBuffer->len); - if (copyResult != 1) + if (!PutRemoteCopyData(connection, dataBuffer->data, dataBuffer->len)) { ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("failed to COPY to shard %ld on %s:%d", @@ -1153,13 +1173,11 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure) foreach(connectionCell, connectionList) { MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); - int copyEndResult = 0; PGresult *result = NULL; + bool raiseInterrupts = true; /* end the COPY input */ - copyEndResult = PQputCopyEnd(connection->pgConn, NULL); - - if (copyEndResult != 1) + if (!PutRemoteCopyEnd(connection, NULL)) { if (stopOnFailure) { @@ -1172,7 +1190,7 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure) } /* check whether there were any COPY errors */ - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure) { ReportCopyError(connection, result); @@ -1492,11 +1510,16 @@ RemoteCreateEmptyShard(char *relationName) { int64 shardId = 0; PGresult *queryResult = NULL; + bool raiseInterrupts = true; StringInfo createEmptyShardCommand = makeStringInfo(); appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName); - queryResult = PQexec(masterConnection->pgConn, createEmptyShardCommand->data); + if (!SendRemoteCommand(masterConnection, createEmptyShardCommand->data)) + { + ReportConnectionError(masterConnection, ERROR); + } + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) { char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0); @@ -1510,6 +1533,8 @@ RemoteCreateEmptyShard(char *relationName) } PQclear(queryResult); + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); + Assert(!queryResult); return shardId; } @@ -1540,18 +1565,25 @@ static void RemoteUpdateShardStatistics(uint64 shardId) { PGresult *queryResult = NULL; + bool raiseInterrupts = true; StringInfo updateShardStatisticsCommand = makeStringInfo(); appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY, shardId); - queryResult = PQexec(masterConnection->pgConn, updateShardStatisticsCommand->data); + if (!SendRemoteCommand(masterConnection, updateShardStatisticsCommand->data)) + { + ReportConnectionError(masterConnection, ERROR); + } + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); if (PQresultStatus(queryResult) != PGRES_TUPLES_OK) { ereport(ERROR, (errmsg("could not update shard statistics"))); } PQclear(queryResult); + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); + Assert(!queryResult); }