diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index ab20aed5d..e5ea30f03 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -78,6 +78,7 @@ static bool ShouldSendCopyNow(StringInfo buffer); static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); +static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); static bool CanUseLocalCopy(uint64 destinationNodeId) @@ -98,6 +99,37 @@ ShouldSendCopyNow(StringInfo buffer) return buffer->len > LocalCopyFlushThresholdByte; } +static void +ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) +{ + int connectionFlags = OUTSIDE_TRANSACTION; + char *currentUser = CurrentUserName(); + WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, + false /* missingOk */); + copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort, + currentUser, + NULL /* database (current) */); + ClaimConnectionExclusively(copyDest->connection); + + StringInfo copyStatement = ConstructCopyStatement( + copyDest->destinationShardFullyQualifiedName, + copyDest->copyOutState->binary); + + if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) + { + ReportConnectionError(copyDest->connection, ERROR); + } + + PGresult *result = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */); + if (PQresultStatus(result) != PGRES_COPY_IN) + { + ReportResultError(copyDest->connection, result, ERROR); + } + + PQclear(result); +} static bool ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) @@ -112,24 +144,10 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); - /* Create connection lazily */ + /* If remote copy, connect lazily and initiate copy */ if (copyDest->tuplesSent == 0 && (!copyDest->useLocalCopy)) { - int connectionFlags = OUTSIDE_TRANSACTION; - char *currentUser = CurrentUserName(); - WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, - false /* missingOk */); - copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort, - currentUser, - NULL /* database (current) */); - ClaimConnectionExclusively(copyDest->connection); - - StringInfo copyStatement = ConstructCopyStatement( - copyDest->destinationShardFullyQualifiedName, - copyDest->copyOutState->binary); - ExecuteCriticalRemoteCommand(copyDest->connection, copyStatement->data); + ConnectToRemoteAndStartCopy(copyDest); } slot_getallattrs(slot); @@ -148,8 +166,13 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) else { FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; - resetStringInfo(copyOutState->fe_msgbuf); + + if (copyDest->copyOutState->binary && copyDest->tuplesSent == 0) + { + AppendCopyBinaryHeaders(copyDest->copyOutState); + } + AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor, copyOutState, columnOutputFunctions, NULL /* columnCoercionPaths */); @@ -159,9 +182,13 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); + char *errorMessage = PQerrorMessage(copyDest->connection->pgConn); + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("Failed to COPY to shard %s.%s,", - destinationShardSchemaName,destinationShardRelationName ), + errmsg("Failed to COPY to shard %s.%s : %s,", + destinationShardSchemaName, + destinationShardRelationName, + errorMessage), errdetail("failed to send %d bytes %s on node %u", copyOutState->fe_msgbuf->len, copyOutState->fe_msgbuf->data, @@ -310,7 +337,7 @@ ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool useBinaryF if (useBinaryFormat) { - appendStringInfo(command, "WITH (format binary);"); + appendStringInfo(command, " WITH (format binary);"); } else {