mirror of https://github.com/citusdata/citus.git
Remote copy hello world works
parent
91ca1fb0b4
commit
bb2f72f91e
|
@ -78,6 +78,7 @@ static bool ShouldSendCopyNow(StringInfo buffer);
|
||||||
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
|
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
|
||||||
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
|
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
|
||||||
localCopyOutState);
|
localCopyOutState);
|
||||||
|
static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
CanUseLocalCopy(uint64 destinationNodeId)
|
CanUseLocalCopy(uint64 destinationNodeId)
|
||||||
|
@ -98,6 +99,37 @@ ShouldSendCopyNow(StringInfo buffer)
|
||||||
return buffer->len > LocalCopyFlushThresholdByte;
|
return buffer->len > LocalCopyFlushThresholdByte;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
|
||||||
|
{
|
||||||
|
int connectionFlags = OUTSIDE_TRANSACTION;
|
||||||
|
char *currentUser = CurrentUserName();
|
||||||
|
WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId,
|
||||||
|
false /* missingOk */);
|
||||||
|
copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags,
|
||||||
|
workerNode->workerName,
|
||||||
|
workerNode->workerPort,
|
||||||
|
currentUser,
|
||||||
|
NULL /* database (current) */);
|
||||||
|
ClaimConnectionExclusively(copyDest->connection);
|
||||||
|
|
||||||
|
StringInfo copyStatement = ConstructCopyStatement(
|
||||||
|
copyDest->destinationShardFullyQualifiedName,
|
||||||
|
copyDest->copyOutState->binary);
|
||||||
|
|
||||||
|
if (!SendRemoteCommand(copyDest->connection, copyStatement->data))
|
||||||
|
{
|
||||||
|
ReportConnectionError(copyDest->connection, ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
PGresult *result = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */);
|
||||||
|
if (PQresultStatus(result) != PGRES_COPY_IN)
|
||||||
|
{
|
||||||
|
ReportResultError(copyDest->connection, result, ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
PQclear(result);
|
||||||
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
|
@ -112,24 +144,10 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
|
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
|
||||||
|
|
||||||
/* Create connection lazily */
|
/* If remote copy, connect lazily and initiate copy */
|
||||||
if (copyDest->tuplesSent == 0 && (!copyDest->useLocalCopy))
|
if (copyDest->tuplesSent == 0 && (!copyDest->useLocalCopy))
|
||||||
{
|
{
|
||||||
int connectionFlags = OUTSIDE_TRANSACTION;
|
ConnectToRemoteAndStartCopy(copyDest);
|
||||||
char *currentUser = CurrentUserName();
|
|
||||||
WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId,
|
|
||||||
false /* missingOk */);
|
|
||||||
copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags,
|
|
||||||
workerNode->workerName,
|
|
||||||
workerNode->workerPort,
|
|
||||||
currentUser,
|
|
||||||
NULL /* database (current) */);
|
|
||||||
ClaimConnectionExclusively(copyDest->connection);
|
|
||||||
|
|
||||||
StringInfo copyStatement = ConstructCopyStatement(
|
|
||||||
copyDest->destinationShardFullyQualifiedName,
|
|
||||||
copyDest->copyOutState->binary);
|
|
||||||
ExecuteCriticalRemoteCommand(copyDest->connection, copyStatement->data);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
slot_getallattrs(slot);
|
slot_getallattrs(slot);
|
||||||
|
@ -148,8 +166,13 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
|
FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
|
||||||
|
|
||||||
resetStringInfo(copyOutState->fe_msgbuf);
|
resetStringInfo(copyOutState->fe_msgbuf);
|
||||||
|
|
||||||
|
if (copyDest->copyOutState->binary && copyDest->tuplesSent == 0)
|
||||||
|
{
|
||||||
|
AppendCopyBinaryHeaders(copyDest->copyOutState);
|
||||||
|
}
|
||||||
|
|
||||||
AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor,
|
AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor,
|
||||||
copyOutState, columnOutputFunctions,
|
copyOutState, columnOutputFunctions,
|
||||||
NULL /* columnCoercionPaths */);
|
NULL /* columnCoercionPaths */);
|
||||||
|
@ -159,9 +182,13 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName);
|
char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName);
|
||||||
char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName);
|
char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName);
|
||||||
|
|
||||||
|
char *errorMessage = PQerrorMessage(copyDest->connection->pgConn);
|
||||||
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||||
errmsg("Failed to COPY to shard %s.%s,",
|
errmsg("Failed to COPY to shard %s.%s : %s,",
|
||||||
destinationShardSchemaName,destinationShardRelationName ),
|
destinationShardSchemaName,
|
||||||
|
destinationShardRelationName,
|
||||||
|
errorMessage),
|
||||||
errdetail("failed to send %d bytes %s on node %u",
|
errdetail("failed to send %d bytes %s on node %u",
|
||||||
copyOutState->fe_msgbuf->len,
|
copyOutState->fe_msgbuf->len,
|
||||||
copyOutState->fe_msgbuf->data,
|
copyOutState->fe_msgbuf->data,
|
||||||
|
@ -310,7 +337,7 @@ ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool useBinaryF
|
||||||
|
|
||||||
if (useBinaryFormat)
|
if (useBinaryFormat)
|
||||||
{
|
{
|
||||||
appendStringInfo(command, "WITH (format binary);");
|
appendStringInfo(command, " WITH (format binary);");
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue