diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 80e90f88d..4623dfcf3 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -253,7 +253,6 @@ static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId) static void SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList); static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *connection); -static void ReportCopyError(MultiConnection *connection, PGresult *result); static uint32 AvailableColumnCount(TupleDesc tupleDescriptor); static Oid TypeForColumnName(Oid relationId, TupleDesc tupleDescriptor, char *columnName); @@ -1049,7 +1048,7 @@ EndRemoteCopy(int64 shardId, List *connectionList) * ReportCopyError tries to report a useful error message for the user from * the remote COPY error messages. */ -static void +void ReportCopyError(MultiConnection *connection, PGresult *result) { char *remoteMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY); diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c new file mode 100644 index 000000000..e16e55cef --- /dev/null +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -0,0 +1,372 @@ +/*------------------------------------------------------------------------- + * + * worker_shard_copy.c + * Functions for copying a shard to desintaion with push copy. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "libpq-fe.h" +#include "postgres.h" +#include "commands/copy.h" +#include "nodes/makefuncs.h" +#include "parser/parse_relation.h" +#include "utils/lsyscache.h" +#include "utils/builtins.h" +#include "distributed/remote_commands.h" +#include "distributed/worker_shard_copy.h" +#include "distributed/commands/multi_copy.h" +#include "distributed/local_multi_copy.h" +#include "distributed/worker_manager.h" +#include "distributed/connection_management.h" +#include "distributed/relation_utils.h" +#include "distributed/version_compat.h" +#include "distributed/local_executor.h" + +/* + * LocalCopyBuffer is used in copy callback to return the copied rows. + * The reason this is a global variable is that we cannot pass an additional + * argument to the copy callback. + */ +static StringInfo LocalCopyBuffer; + +typedef struct ShardCopyDestReceiver +{ + /* public DestReceiver interface */ + DestReceiver pub; + + /* Destination Relation Name */ + FullRelationName *destinationRelation; + + /* descriptor of the tuples that are sent to the worker */ + TupleDesc tupleDescriptor; + + /* state on how to copy out data types */ + CopyOutState copyOutState; + FmgrInfo *columnOutputFunctions; + + /* number of tuples sent */ + int64 tuplesSent; + + /* destination node id */ + uint32_t destinationNodeId; + + /* local copy if destination shard in same node */ + bool useLocalCopy; + + /* + * Connection for destination shard (NULL if useLocalCopy is true) + */ + MultiConnection *connection; + +} ShardCopyDestReceiver; + +static bool ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); +static void ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, + TupleDesc inputTupleDescriptor); +static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver); +static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver); +static bool CanUseLocalCopy(uint64 destinationNodeId); +static StringInfo ConstructCopyStatement(FullRelationName *relation, bool useBinaryFormat); +static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); +static bool ShouldSendCopyNow(StringInfo buffer); +static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); +static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); + +static bool CanUseLocalCopy(uint64 destinationNodeId) +{ + /* If destination node is same as source, use local copy */ + return GetLocalNodeId() == destinationNodeId; +} + +/* + * ShouldSendCopyNow returns true if the given buffer size exceeds the + * local copy buffer size threshold. + */ +static bool +ShouldSendCopyNow(StringInfo buffer) +{ + /* LocalCopyFlushThreshold is in bytes */ + return buffer->len > LocalCopyFlushThresholdByte; +} + +static bool +ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) +{ + ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest; + CopyOutState copyOutState = copyDest->copyOutState; + TupleDesc tupleDescriptor = copyDest->tupleDescriptor; + FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; + + /* Create connection lazily */ + if(copyDest->tuplesSent == 0 && (!copyDest->useLocalCopy)) + { + int connectionFlags = OUTSIDE_TRANSACTION; + char *currentUser = CurrentUserName(); + WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, false /* missingOk */); + copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort, + currentUser, + NULL); + ClaimConnectionExclusively(copyDest->connection); + + StringInfo copyStatement = ConstructCopyStatement(copyDest->destinationRelation, + copyDest->destinationNodeId); + ExecuteCriticalRemoteCommand(copyDest->connection, copyStatement->data); + } + + slot_getallattrs(slot); + Datum *columnValues = slot->tts_values; + bool *columnNulls = slot->tts_isnull; + + if(copyDest->useLocalCopy) + { + WriteLocalTuple(slot, copyDest, copyOutState); + if (ShouldSendCopyNow(copyOutState->fe_msgbuf)) + { + LocalCopyToShard(copyDest, copyOutState); + } + } + else + { + resetStringInfo(copyOutState->fe_msgbuf); + AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, + copyOutState, columnOutputFunctions, NULL /* columnCoercionPaths */); + if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data, copyOutState->fe_msgbuf->len)) + { + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), + errmsg("Failed to COPY to shard %s,", + copyDest->destinationRelation->relationName), + errdetail("failed to send %d bytes %s", copyOutState->fe_msgbuf->len, + copyOutState->fe_msgbuf->data))); + } + } + + copyDest->tuplesSent++; + return true; +} + +static void +ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor) +{ + ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest; + copyDest->tupleDescriptor = inputTupleDescriptor; + copyDest->tuplesSent = 0; + + const char *delimiterCharacter = "\t"; + const char *nullPrintCharacter = "\\N"; + + /* define how tuples will be serialised */ + CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); + copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor); + copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->fe_msgbuf = makeStringInfo(); + copyOutState->delim = (char *) delimiterCharacter; + // not used for shard copy + copyOutState->null_print_client = (char *) nullPrintCharacter; + copyOutState->rowcontext = NULL; + copyDest->copyOutState = copyOutState; + + // TODO(niupre): Explain why this is needed. + copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, + copyOutState->binary); +} + +static void +ShardCopyDestReceiverShutdown(DestReceiver *dest) +{ + ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest; + + if(copyDest->useLocalCopy) + { + if (copyDest->copyOutState != NULL && + copyDest->copyOutState->fe_msgbuf->len > 0) + { + LocalCopyToShard(copyDest, copyDest->copyOutState); + } + } + else if(copyDest->connection != NULL) + { + resetStringInfo(copyDest->copyOutState->fe_msgbuf); + if(copyDest->copyOutState->binary) + { + AppendCopyBinaryFooters(copyDest->copyOutState); + } + + /* end the COPY input */ + if (!PutRemoteCopyEnd(copyDest->connection, NULL /* errormsg */)) + { + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), + errmsg("Failed to COPY to destination shard %s", + copyDest->destinationRelation->relationName))); + } + + /* check whether there were any COPY errors */ + PGresult *result = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + ReportCopyError(copyDest->connection, result); + } + + PQclear(result); + ForgetResults(copyDest->connection); + CloseConnection(copyDest->connection); + } +} + +static void +ShardCopyDestReceiverDestroy(DestReceiver *dest) +{ + ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest; + + if (copyDest->copyOutState) + { + pfree(copyDest->copyOutState); + } + + if (copyDest->columnOutputFunctions) + { + pfree(copyDest->columnOutputFunctions); + } + + pfree(copyDest); +} + +/* + * ConstructCopyStatement constructs the text of a COPY statement + * for copying into a result table + */ +static StringInfo +ConstructCopyStatement(FullRelationName *relation, bool useBinaryFormat) +{ + StringInfo command = makeStringInfo(); + appendStringInfo(command, "COPY %s FROM STDIN", + quote_qualified_identifier(relation->schemaName, relation->relationName)); + + if(useBinaryFormat) + { + appendStringInfo(command, "WITH (format binary)"); + } + + return command; +} + +DestReceiver * CreateShardCopyDestReceiver( + FullRelationName* destinationRelation, + uint32_t destinationNodeId) +{ + ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0( + sizeof(ShardCopyDestReceiver)); + + /* set up the DestReceiver function pointers */ + copyDest->pub.receiveSlot = ShardCopyDestReceiverReceive; + copyDest->pub.rStartup = ShardCopyDestReceiverStartup; + copyDest->pub.rShutdown = ShardCopyDestReceiverShutdown; + copyDest->pub.rDestroy = ShardCopyDestReceiverDestroy; + copyDest->pub.mydest = DestCopyOut; + + copyDest->destinationNodeId = destinationNodeId; + copyDest->destinationRelation = destinationRelation; + copyDest->tuplesSent = 0; + copyDest->connection = NULL; + copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId); + + return (DestReceiver *) copyDest; +} + +static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) +{ + /* + * Since we are doing a local copy, the following statements should + * use local execution to see the changes + */ + SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); + + bool isBinaryCopy = localCopyOutState->binary; + bool shouldAddBinaryHeaders = (isBinaryCopy && localCopyOutState->fe_msgbuf->len == 0); + if (shouldAddBinaryHeaders) + { + AppendCopyBinaryHeaders(localCopyOutState); + } + + Datum *columnValues = slot->tts_values; + bool *columnNulls = slot->tts_isnull; + FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; + + AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor, + localCopyOutState, columnOutputFunctions, + NULL /* columnCoercionPaths */); +} + +/* + * LocalCopyToShard finishes local copy for the given destination shard. + */ +static void +LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) +{ + bool isBinaryCopy = localCopyOutState->binary; + if (isBinaryCopy) + { + AppendCopyBinaryFooters(localCopyOutState); + } + + /* + * Set the buffer as a global variable to allow ReadFromLocalBufferCallback + * to read from it. We cannot pass additional arguments to + * ReadFromLocalBufferCallback. + */ + LocalCopyBuffer = localCopyOutState->fe_msgbuf; + + Oid destinationSchemaOid = get_namespace_oid(copyDest->destinationRelation->schemaName, false /* missing_ok */); + Oid destinationShardOid = get_relname_relid(copyDest->destinationRelation->relationName, destinationSchemaOid); + + DefElem *binaryFormatOption = NULL; + if (isBinaryCopy) + { + binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1); + } + + Relation shard = table_open(destinationShardOid, RowExclusiveLock); + ParseState *pState = make_parsestate(NULL /* parentParseState */); + (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, + NULL /* alias */, false /* inh */, false /* inFromCl */); + CopyFromState cstate = BeginCopyFrom_compat(pState, shard, + NULL /* whereClause */, + NULL /* fileName */, + false /* is_program */, + ReadFromLocalBufferCallback, + NULL /* attlist (NULL is all columns) */, + list_make1(binaryFormatOption)); + resetStringInfo(localCopyOutState->fe_msgbuf); + + CopyFrom(cstate); + EndCopyFrom(cstate); + + table_close(shard, NoLock); + free_parsestate(pState); +} + +/* + * ReadFromLocalBufferCallback is the copy callback. + * It always tries to copy maxRead bytes. + */ +static int +ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead) +{ + int bytesRead = 0; + int avail = LocalCopyBuffer->len - LocalCopyBuffer->cursor; + int bytesToRead = Min(avail, maxRead); + if (bytesToRead > 0) + { + memcpy_s(outBuf, bytesToRead, + &LocalCopyBuffer->data[LocalCopyBuffer->cursor], bytesToRead); + } + bytesRead += bytesToRead; + LocalCopyBuffer->cursor += bytesToRead; + + return bytesRead; +} + diff --git a/src/backend/distributed/operations/worker_split_copy.c b/src/backend/distributed/operations/worker_split_copy.c index f015eaf61..b1e565ade 100644 --- a/src/backend/distributed/operations/worker_split_copy.c +++ b/src/backend/distributed/operations/worker_split_copy.c @@ -14,8 +14,11 @@ #include "postgres.h" #include "catalog/namespace.h" #include "utils/lsyscache.h" +#include "distributed/listutils.h" #include "distributed/metadata_cache.h" +#include "distributed/relation_utils.h" #include "distributed/worker_split_copy.h" +#include "distributed/worker_shard_copy.h" typedef struct SplitCopyDestReceiver { @@ -48,28 +51,41 @@ static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest); DestReceiver * CreateSplitCopyDestReceiver(FullRelationName *sourceShard, List* splitCopyInfoList) { - SplitCopyDestReceiver *resultDest = + SplitCopyDestReceiver *splitCopyDest = palloc0(sizeof(SplitCopyDestReceiver)); /* set up the DestReceiver function pointers */ - resultDest->pub.receiveSlot = SplitCopyDestReceiverReceive; - resultDest->pub.rStartup = SplitCopyDestReceiverStartup; - resultDest->pub.rShutdown = SplitCopyDestReceiverShutdown; - resultDest->pub.rDestroy = SplitCopyDestReceiverDestroy; + splitCopyDest->pub.receiveSlot = SplitCopyDestReceiverReceive; + splitCopyDest->pub.rStartup = SplitCopyDestReceiverStartup; + splitCopyDest->pub.rShutdown = SplitCopyDestReceiverShutdown; + splitCopyDest->pub.rDestroy = SplitCopyDestReceiverDestroy; Oid sourceSchemaOid = get_namespace_oid(sourceShard->schemaName, false /* missing_ok */); Oid sourceShardOid = get_relname_relid(sourceShard->relationName, sourceSchemaOid); - resultDest->sourceShardOid = sourceShardOid; + splitCopyDest->sourceShardOid = sourceShardOid; - // TODO(niupre): Create internal destination receivers for each shard. - for (int index = 0; index < splitCopyInfoList->length; index++) + splitCopyDest->splitFactor = splitCopyInfoList->length; + + DestReceiver **shardCopyDests = palloc0(splitCopyDest->splitFactor * sizeof(DestReceiver *)); + SplitCopyInfo **splitCopyInfos = palloc0(splitCopyDest->splitFactor * sizeof(SplitCopyInfo *)); + + SplitCopyInfo *splitCopyInfo = NULL; + int index = 0; + foreach_ptr(splitCopyInfo, splitCopyInfoList) { + DestReceiver *shardCopyDest = CreateShardCopyDestReceiver( + splitCopyInfo->destinationShard, + splitCopyInfo->nodeId); + shardCopyDests[index] = shardCopyDest; + splitCopyInfos[index] = splitCopyInfo; + index++; } - resultDest->splitFactor = splitCopyInfoList->length; + splitCopyDest->shardCopyDestReceiverArray = shardCopyDests; + splitCopyDest->splitCopyInfoArray = splitCopyInfos; - return (DestReceiver *) resultDest; + return (DestReceiver *) splitCopyDest; } static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor) @@ -146,5 +162,8 @@ static void SplitCopyDestReceiverDestroy(DestReceiver *dest) { DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index]; shardCopyDest->rDestroy(shardCopyDest); + + pfree(shardCopyDest); + pfree(self->splitCopyInfoArray[index]); } } diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index f7a50644d..90e32e850 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -179,6 +179,7 @@ extern void CheckCopyPermissions(CopyStmt *copyStatement); extern bool IsCopyResultStmt(CopyStmt *copyStatement); extern void ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result); extern Datum CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath); +extern void ReportCopyError(MultiConnection *connection, PGresult *result); #endif /* MULTI_COPY_H */ diff --git a/src/include/distributed/relation_utils.h b/src/include/distributed/relation_utils.h index 873398f00..0e512efad 100644 --- a/src/include/distributed/relation_utils.h +++ b/src/include/distributed/relation_utils.h @@ -15,6 +15,12 @@ #include "utils/relcache.h" +typedef struct FullRelationName +{ + char *schemaName; + char *relationName; +} FullRelationName; + extern char * RelationGetNamespaceName(Relation relation); #endif /* RELATION_UTILS_H */ diff --git a/src/include/distributed/worker_shard_copy.h b/src/include/distributed/worker_shard_copy.h new file mode 100644 index 000000000..c61f86ec5 --- /dev/null +++ b/src/include/distributed/worker_shard_copy.h @@ -0,0 +1,21 @@ +/*------------------------------------------------------------------------- + * + * worker_shard_copy.c + * Copy data to destination shard in a push approach. + * + * Copyright (c) Citus Data, Inc. + * + * + *------------------------------------------------------------------------- + */ + +#ifndef WORKER_SHARD_COPY_H_ +#define WORKER_SHARD_COPY_H_ + +struct FullRelationName; + +extern DestReceiver * CreateShardCopyDestReceiver( + struct FullRelationName* relationName, + uint32_t destinationNodeId); + +#endif /* WORKER_SHARD_COPY_H_ */ diff --git a/src/include/distributed/worker_split_copy.h b/src/include/distributed/worker_split_copy.h index 4af30229b..12e0f4621 100644 --- a/src/include/distributed/worker_split_copy.h +++ b/src/include/distributed/worker_split_copy.h @@ -12,12 +12,6 @@ #ifndef WORKER_SPLIT_COPY_H_ #define WORKER_SPLIT_COPY_H_ -typedef struct FullRelationName -{ - char *schemaName; - char *relationName; -} FullRelationName; - typedef struct SplitCopyInfo { FullRelationName *destinationShard; /* destination shard name */