mirror of https://github.com/citusdata/citus.git
Refactor COPY to accept a generic tuple source
parent
a0fa7bf130
commit
ea7ce936b7
|
@ -8,9 +8,9 @@
|
|||
* COPY ... FROM commands on distributed tables. CitusCopyFrom parses the input
|
||||
* from stdin, a program, or a file, and decides to copy new rows to existing
|
||||
* shards or new shards based on the partition method of the distributed table.
|
||||
* If copy is run a worker node, CitusCopyFrom calls CopyFromWorkerNode which
|
||||
* parses the master node copy options and handles communication with the master
|
||||
* node.
|
||||
* If copy is run a worker node, CitusCopyFrom calls LocalCopyFromWorkerNode
|
||||
* which parses the master node copy options and handles communication with
|
||||
* the master node.
|
||||
*
|
||||
* It opens a new connection for every shard placement and uses the PQputCopyData
|
||||
* function to copy the data. Because PQputCopyData transmits data, asynchronously,
|
||||
|
@ -128,6 +128,15 @@
|
|||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
/* struct type to use a local COPY .. FROM statement as a tuple source */
|
||||
typedef struct LocalCopyContext
|
||||
{
|
||||
CopyStmt *copyStatement;
|
||||
CopyState copyState;
|
||||
EState *executorState;
|
||||
} LocalCopyContext;
|
||||
|
||||
|
||||
/* constant used in binary protocol */
|
||||
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
|
||||
|
||||
|
@ -136,21 +145,28 @@ static PGconn *masterConnection = NULL;
|
|||
|
||||
|
||||
/* Local functions forward declarations */
|
||||
static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag);
|
||||
static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
|
||||
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
|
||||
static CopyTupleSource * CreateLocalCopyTupleSource(CopyStmt *copyStatement);
|
||||
static void LocalCopyOpen(void *context, Relation relation,
|
||||
ErrorContextCallback *errorCallback);
|
||||
static bool LocalCopyNextTuple(void *context, Datum *columnValues, bool *columnNulls);
|
||||
static void LocalCopyClose(void *context);
|
||||
static uint64 LocalCopyFromWorkerNode(CopyStmt *copyStatement);
|
||||
static uint64 CopyToExistingShards(CopyTupleSource *tupleSource, RangeVar *relation);
|
||||
static uint64 CopyToNewShards(CopyTupleSource *tupleSource, RangeVar *relation,
|
||||
Oid relationId);
|
||||
static char MasterPartitionMethod(RangeVar *relation);
|
||||
static void RemoveMasterOptions(CopyStmt *copyStatement);
|
||||
static void OpenCopyTransactions(CopyStmt *copyStatement,
|
||||
ShardConnections *shardConnections, bool stopOnFailure,
|
||||
bool useBinaryCopyFormat);
|
||||
static void OpenCopyTransactions(RangeVar *relation, ShardConnections *shardConnections,
|
||||
bool stopOnFailure, bool useBinaryCopyFormat);
|
||||
static CopyOutState CreateCopyOutState(TupleDesc tupleDescriptor,
|
||||
MemoryContext rowContext);
|
||||
static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription,
|
||||
CopyOutState rowOutputState);
|
||||
static List * MasterShardPlacementList(uint64 shardId);
|
||||
static List * RemoteFinalizedShardPlacementList(uint64 shardId);
|
||||
static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList);
|
||||
static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList);
|
||||
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId,
|
||||
static StringInfo ConstructCopyStatement(RangeVar *relation, int64 shardId,
|
||||
bool useBinaryCopyFormat);
|
||||
static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList);
|
||||
static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection,
|
||||
|
@ -158,8 +174,8 @@ static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection,
|
|||
static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
|
||||
static void ReportCopyError(PGconn *connection, PGresult *result);
|
||||
static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
|
||||
static void StartCopyToNewShard(ShardConnections *shardConnections,
|
||||
CopyStmt *copyStatement, bool useBinaryCopyFormat);
|
||||
static void StartCopyToNewShard(RangeVar *relation, ShardConnections *shardConnections,
|
||||
bool useBinaryCopyFormat);
|
||||
static int64 MasterCreateEmptyShard(char *relationName);
|
||||
static int64 CreateEmptyShard(char *relationName);
|
||||
static int64 RemoteCreateEmptyShard(char *relationName);
|
||||
|
@ -186,6 +202,7 @@ void
|
|||
CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
||||
{
|
||||
bool isCopyFromWorker = false;
|
||||
uint64 processedRowCount = 0;
|
||||
|
||||
/* disallow COPY to/from file or program except for superusers */
|
||||
if (copyStatement->filename != NULL && !superuser())
|
||||
|
@ -211,33 +228,143 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
|||
isCopyFromWorker = IsCopyFromWorker(copyStatement);
|
||||
if (isCopyFromWorker)
|
||||
{
|
||||
CopyFromWorkerNode(copyStatement, completionTag);
|
||||
processedRowCount = LocalCopyFromWorkerNode(copyStatement);
|
||||
}
|
||||
else
|
||||
{
|
||||
Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
|
||||
char partitionMethod = PartitionMethod(relationId);
|
||||
CopyTupleSource *tupleSource = CreateLocalCopyTupleSource(copyStatement);
|
||||
RangeVar *relation = copyStatement->relation;
|
||||
|
||||
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod ==
|
||||
DISTRIBUTE_BY_RANGE)
|
||||
{
|
||||
CopyToExistingShards(copyStatement, completionTag);
|
||||
}
|
||||
else if (partitionMethod == DISTRIBUTE_BY_APPEND)
|
||||
{
|
||||
CopyToNewShards(copyStatement, completionTag, relationId);
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("unsupported partition method")));
|
||||
}
|
||||
processedRowCount = CopyTupleSourceToShards(tupleSource, relation);
|
||||
}
|
||||
|
||||
if (completionTag != NULL)
|
||||
{
|
||||
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
|
||||
"COPY " UINT64_FORMAT, processedRowCount);
|
||||
}
|
||||
|
||||
XactModificationLevel = XACT_MODIFICATION_DATA;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CopyTupleSourceToShards copies the tuples provided by tupleSource to
|
||||
* the shards of the given relation. For hash- and range-distributed
|
||||
* tables it copies to the existing shards and for append-distributed
|
||||
* tables it adds a new shard.
|
||||
*/
|
||||
uint64
|
||||
CopyTupleSourceToShards(CopyTupleSource *tupleSource, RangeVar *relation)
|
||||
{
|
||||
uint64 processedRowCount = 0;
|
||||
|
||||
Oid relationId = RangeVarGetRelid(relation, NoLock, false);
|
||||
char partitionMethod = PartitionMethod(relationId);
|
||||
|
||||
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
|
||||
{
|
||||
processedRowCount = CopyToExistingShards(tupleSource, relation);
|
||||
}
|
||||
else if (partitionMethod == DISTRIBUTE_BY_APPEND)
|
||||
{
|
||||
processedRowCount = CopyToNewShards(tupleSource, relation, relationId);
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("unsupported partition method")));
|
||||
}
|
||||
|
||||
return processedRowCount;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateLocalCopyTupleSource creates and returns a tuple source for a local
|
||||
* COPY .. FROM command.
|
||||
*/
|
||||
static CopyTupleSource *
|
||||
CreateLocalCopyTupleSource(CopyStmt *copyStatement)
|
||||
{
|
||||
LocalCopyContext *localCopyContext = palloc0(sizeof(LocalCopyContext));
|
||||
CopyTupleSource *tupleSource = palloc0(sizeof(CopyTupleSource));
|
||||
EState *executorState = CreateExecutorState();
|
||||
|
||||
localCopyContext->copyStatement = copyStatement;
|
||||
localCopyContext->copyState = NULL;
|
||||
localCopyContext->executorState = executorState;
|
||||
|
||||
tupleSource->context = localCopyContext;
|
||||
tupleSource->rowContext = GetPerTupleMemoryContext(executorState);
|
||||
tupleSource->Open = LocalCopyOpen;
|
||||
tupleSource->NextTuple = LocalCopyNextTuple;
|
||||
tupleSource->Close = LocalCopyClose;
|
||||
|
||||
return tupleSource;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LocalCopyOpen opens the COPY input for copying into the given relation.
|
||||
*/
|
||||
static void
|
||||
LocalCopyOpen(void *context, Relation relation, ErrorContextCallback *errorCallback)
|
||||
{
|
||||
LocalCopyContext *localCopyContext = (LocalCopyContext *) context;
|
||||
|
||||
CopyStmt *copyStatement = localCopyContext->copyStatement;
|
||||
CopyState copyState = NULL;
|
||||
|
||||
copyState = BeginCopyFrom(relation,
|
||||
copyStatement->filename,
|
||||
copyStatement->is_program,
|
||||
copyStatement->attlist,
|
||||
copyStatement->options);
|
||||
|
||||
localCopyContext->copyState = copyState;
|
||||
|
||||
errorCallback->callback = CopyFromErrorCallback;
|
||||
errorCallback->arg = (void *) copyState;
|
||||
errorCallback->previous = error_context_stack;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LocalCopyNextTuple reads a tuple from the COPY input. columnValeus
|
||||
* will contain values for all columns, using the default values for
|
||||
* columns that were not provided by the COPY command. columnNulls
|
||||
* is true for columns that contain NULL. LocalCopyNextTuple returns
|
||||
* if the end of the input is reached and true otherwise.
|
||||
*/
|
||||
static bool
|
||||
LocalCopyNextTuple(void *context, Datum *columnValues, bool *columnNulls)
|
||||
{
|
||||
LocalCopyContext *localCopyContext = (LocalCopyContext *) context;
|
||||
|
||||
CopyState copyState = localCopyContext->copyState;
|
||||
EState *executorState = localCopyContext->executorState;
|
||||
ExprContext *expressionContext = GetPerTupleExprContext(executorState);
|
||||
|
||||
bool nextRowFound = NextCopyFrom(copyState, expressionContext, columnValues,
|
||||
columnNulls, NULL);
|
||||
|
||||
return nextRowFound;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LocalCopyClose closes the COPY input.
|
||||
*/
|
||||
static void
|
||||
LocalCopyClose(void *context)
|
||||
{
|
||||
LocalCopyContext *localCopyContext = (LocalCopyContext *) context;
|
||||
|
||||
EndCopyFrom(localCopyContext->copyState);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsCopyFromWorker checks if the given copy statement has the master host option.
|
||||
*/
|
||||
|
@ -259,16 +386,17 @@ IsCopyFromWorker(CopyStmt *copyStatement)
|
|||
|
||||
|
||||
/*
|
||||
* CopyFromWorkerNode implements the COPY table_name FROM ... from worker nodes
|
||||
* for append-partitioned tables.
|
||||
* LocalCopyFromWorkerNode implements the COPY table_name FROM ... from worker
|
||||
* nodes for append-partitioned tables.
|
||||
*/
|
||||
static void
|
||||
CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
|
||||
static uint64
|
||||
LocalCopyFromWorkerNode(CopyStmt *copyStatement)
|
||||
{
|
||||
NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement);
|
||||
char *nodeName = masterNodeAddress->nodeName;
|
||||
int32 nodePort = masterNodeAddress->nodePort;
|
||||
char *nodeUser = CurrentUserName();
|
||||
uint64 processedRowCount = 0;
|
||||
|
||||
if (XactModificationLevel > XACT_MODIFICATION_NONE)
|
||||
{
|
||||
|
@ -285,17 +413,19 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
|
|||
PGresult *queryResult = NULL;
|
||||
Oid relationId = InvalidOid;
|
||||
char partitionMethod = 0;
|
||||
CopyTupleSource *tupleSource = NULL;
|
||||
|
||||
/* strip schema name for local reference */
|
||||
char *schemaName = copyStatement->relation->schemaname;
|
||||
copyStatement->relation->schemaname = NULL;
|
||||
RangeVar *relation = copyStatement->relation;
|
||||
char *schemaName = relation->schemaname;
|
||||
relation->schemaname = NULL;
|
||||
|
||||
relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
|
||||
relationId = RangeVarGetRelid(relation, NoLock, false);
|
||||
|
||||
/* put schema name back */
|
||||
copyStatement->relation->schemaname = schemaName;
|
||||
relation->schemaname = schemaName;
|
||||
|
||||
partitionMethod = MasterPartitionMethod(copyStatement->relation);
|
||||
partitionMethod = MasterPartitionMethod(relation);
|
||||
if (partitionMethod != DISTRIBUTE_BY_APPEND)
|
||||
{
|
||||
ereport(ERROR, (errmsg("copy from worker nodes is only supported "
|
||||
|
@ -317,7 +447,9 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
|
|||
*/
|
||||
RemoveMasterOptions(copyStatement);
|
||||
|
||||
CopyToNewShards(copyStatement, completionTag, relationId);
|
||||
tupleSource = CreateLocalCopyTupleSource(copyStatement);
|
||||
|
||||
processedRowCount = CopyToNewShards(tupleSource, relation, relationId);
|
||||
|
||||
/* commit metadata transactions */
|
||||
queryResult = PQexec(masterConnection, "COMMIT");
|
||||
|
@ -341,6 +473,8 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
|
|||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
return processedRowCount;
|
||||
}
|
||||
|
||||
|
||||
|
@ -349,10 +483,10 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
|
|||
* range-partitioned tables where there are already shards into which to copy
|
||||
* rows.
|
||||
*/
|
||||
static void
|
||||
CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
||||
static uint64
|
||||
CopyToExistingShards(CopyTupleSource *tupleSource, RangeVar *relation)
|
||||
{
|
||||
Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
|
||||
Oid tableId = RangeVarGetRelid(relation, NoLock, false);
|
||||
char *relationName = get_rel_name(tableId);
|
||||
Relation distributedRelation = NULL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
|
@ -363,8 +497,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
|||
FmgrInfo *compareFunction = NULL;
|
||||
bool hasUniformHashDistribution = false;
|
||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId);
|
||||
const char *delimiterCharacter = "\t";
|
||||
const char *nullPrintCharacter = "\\N";
|
||||
|
||||
int shardCount = 0;
|
||||
List *shardIntervalList = NULL;
|
||||
|
@ -375,11 +507,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
|||
ShardConnections *shardConnections = NULL;
|
||||
List *connectionList = NIL;
|
||||
|
||||
EState *executorState = NULL;
|
||||
MemoryContext executorTupleContext = NULL;
|
||||
ExprContext *executorExpressionContext = NULL;
|
||||
|
||||
CopyState copyState = NULL;
|
||||
CopyOutState copyOutState = NULL;
|
||||
FmgrInfo *columnOutputFunctions = NULL;
|
||||
uint64 processedRowCount = 0;
|
||||
|
@ -446,25 +573,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
|||
useBinarySearch = true;
|
||||
}
|
||||
|
||||
/* initialize copy state to read from COPY data source */
|
||||
copyState = BeginCopyFrom(distributedRelation,
|
||||
copyStatement->filename,
|
||||
copyStatement->is_program,
|
||||
copyStatement->attlist,
|
||||
copyStatement->options);
|
||||
|
||||
executorState = CreateExecutorState();
|
||||
executorTupleContext = GetPerTupleMemoryContext(executorState);
|
||||
executorExpressionContext = GetPerTupleExprContext(executorState);
|
||||
|
||||
copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
|
||||
copyOutState->delim = (char *) delimiterCharacter;
|
||||
copyOutState->null_print = (char *) nullPrintCharacter;
|
||||
copyOutState->null_print_client = (char *) nullPrintCharacter;
|
||||
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
|
||||
copyOutState->fe_msgbuf = makeStringInfo();
|
||||
copyOutState->rowcontext = executorTupleContext;
|
||||
|
||||
copyOutState = CreateCopyOutState(tupleDescriptor, tupleSource->rowContext);
|
||||
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
|
||||
|
||||
/*
|
||||
|
@ -480,10 +589,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
|||
{
|
||||
ErrorContextCallback errorCallback;
|
||||
|
||||
/* open the tuple source */
|
||||
tupleSource->Open(tupleSource->context, distributedRelation, &errorCallback);
|
||||
|
||||
/* set up callback to identify error line number */
|
||||
errorCallback.callback = CopyFromErrorCallback;
|
||||
errorCallback.arg = (void *) copyState;
|
||||
errorCallback.previous = error_context_stack;
|
||||
error_context_stack = &errorCallback;
|
||||
|
||||
/* ensure transactions have unique names on worker nodes */
|
||||
|
@ -498,14 +607,13 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
|||
bool shardConnectionsFound = false;
|
||||
MemoryContext oldContext = NULL;
|
||||
|
||||
ResetPerTupleExprContext(executorState);
|
||||
MemoryContextReset(tupleSource->rowContext);
|
||||
|
||||
oldContext = MemoryContextSwitchTo(executorTupleContext);
|
||||
oldContext = MemoryContextSwitchTo(tupleSource->rowContext);
|
||||
|
||||
/* parse a row from the input */
|
||||
nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
|
||||
columnValues, columnNulls, NULL);
|
||||
|
||||
nextRowFound = tupleSource->NextTuple(tupleSource->context, columnValues,
|
||||
columnNulls);
|
||||
if (!nextRowFound)
|
||||
{
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
@ -547,7 +655,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
|||
if (!shardConnectionsFound)
|
||||
{
|
||||
/* open connections and initiate COPY on shard placements */
|
||||
OpenCopyTransactions(copyStatement, shardConnections, false,
|
||||
OpenCopyTransactions(relation, shardConnections, false,
|
||||
copyOutState->binary);
|
||||
|
||||
/* send copy binary headers to shard placements */
|
||||
|
@ -586,7 +694,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
|||
PrepareRemoteTransactions(connectionList);
|
||||
}
|
||||
|
||||
EndCopyFrom(copyState);
|
||||
tupleSource->Close(tupleSource->context);
|
||||
|
||||
heap_close(distributedRelation, NoLock);
|
||||
|
||||
/* check for cancellation one last time before committing */
|
||||
|
@ -614,11 +723,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
|||
CommitRemoteTransactions(connectionList, false);
|
||||
CloseConnections(connectionList);
|
||||
|
||||
if (completionTag != NULL)
|
||||
{
|
||||
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
|
||||
"COPY " UINT64_FORMAT, processedRowCount);
|
||||
}
|
||||
return processedRowCount;
|
||||
}
|
||||
|
||||
|
||||
|
@ -626,10 +731,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
|||
* CopyToNewShards implements the COPY table_name FROM ... for append-partitioned
|
||||
* tables where we create new shards into which to copy rows.
|
||||
*/
|
||||
static void
|
||||
CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
||||
static uint64
|
||||
CopyToNewShards(CopyTupleSource *tupleSource, RangeVar *relation, Oid relationId)
|
||||
{
|
||||
FmgrInfo *columnOutputFunctions = NULL;
|
||||
uint64 processedRowCount = 0;
|
||||
|
||||
/* allocate column values and nulls arrays */
|
||||
Relation distributedRelation = heap_open(relationId, RowExclusiveLock);
|
||||
|
@ -638,36 +743,18 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
|||
Datum *columnValues = palloc0(columnCount * sizeof(Datum));
|
||||
bool *columnNulls = palloc0(columnCount * sizeof(bool));
|
||||
|
||||
EState *executorState = CreateExecutorState();
|
||||
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
|
||||
ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);
|
||||
|
||||
const char *delimiterCharacter = "\t";
|
||||
const char *nullPrintCharacter = "\\N";
|
||||
ShardConnections *shardConnections = NULL;
|
||||
CopyOutState copyOutState = NULL;
|
||||
FmgrInfo *columnOutputFunctions = NULL;
|
||||
|
||||
/*
|
||||
* Shard connections should be initialized before the PG_TRY, since it is
|
||||
* used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH
|
||||
* (see sigsetjmp documentation).
|
||||
*/
|
||||
ShardConnections *shardConnections =
|
||||
(ShardConnections *) palloc0(sizeof(ShardConnections));
|
||||
|
||||
/* initialize copy state to read from COPY data source */
|
||||
CopyState copyState = BeginCopyFrom(distributedRelation,
|
||||
copyStatement->filename,
|
||||
copyStatement->is_program,
|
||||
copyStatement->attlist,
|
||||
copyStatement->options);
|
||||
|
||||
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
|
||||
copyOutState->delim = (char *) delimiterCharacter;
|
||||
copyOutState->null_print = (char *) nullPrintCharacter;
|
||||
copyOutState->null_print_client = (char *) nullPrintCharacter;
|
||||
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
|
||||
copyOutState->fe_msgbuf = makeStringInfo();
|
||||
copyOutState->rowcontext = executorTupleContext;
|
||||
shardConnections = palloc0(sizeof(ShardConnections));
|
||||
|
||||
copyOutState = CreateCopyOutState(tupleDescriptor, tupleSource->rowContext);
|
||||
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
|
||||
|
||||
/* we use a PG_TRY block to close connections on errors (e.g. in NextCopyFrom) */
|
||||
|
@ -675,14 +762,12 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
|||
{
|
||||
uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
|
||||
uint64 copiedDataSizeInBytes = 0;
|
||||
uint64 processedRowCount = 0;
|
||||
|
||||
/* set up callback to identify error line number */
|
||||
ErrorContextCallback errorCallback;
|
||||
|
||||
errorCallback.callback = CopyFromErrorCallback;
|
||||
errorCallback.arg = (void *) copyState;
|
||||
errorCallback.previous = error_context_stack;
|
||||
/* open the tuple source */
|
||||
tupleSource->Open(tupleSource->context, distributedRelation, &errorCallback);
|
||||
|
||||
while (true)
|
||||
{
|
||||
|
@ -690,16 +775,18 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
|||
MemoryContext oldContext = NULL;
|
||||
uint64 messageBufferSize = 0;
|
||||
|
||||
ResetPerTupleExprContext(executorState);
|
||||
/* clear the previous tuple's memory */
|
||||
MemoryContextReset(tupleSource->rowContext);
|
||||
|
||||
/* switch to tuple memory context and start showing line number in errors */
|
||||
/* start showing the line number in errors */
|
||||
error_context_stack = &errorCallback;
|
||||
oldContext = MemoryContextSwitchTo(executorTupleContext);
|
||||
|
||||
/* switch to tuple memory context */
|
||||
oldContext = MemoryContextSwitchTo(tupleSource->rowContext);
|
||||
|
||||
/* parse a row from the input */
|
||||
nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
|
||||
columnValues, columnNulls, NULL);
|
||||
|
||||
nextRowFound = tupleSource->NextTuple(tupleSource->context, columnValues,
|
||||
columnNulls);
|
||||
if (!nextRowFound)
|
||||
{
|
||||
/* switch to regular memory context and stop showing line number in errors */
|
||||
|
@ -723,8 +810,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
|||
if (copiedDataSizeInBytes == 0)
|
||||
{
|
||||
/* create shard and open connections to shard placements */
|
||||
StartCopyToNewShard(shardConnections, copyStatement,
|
||||
copyOutState->binary);
|
||||
StartCopyToNewShard(relation, shardConnections, copyOutState->binary);
|
||||
|
||||
/* send copy binary headers to shard placements */
|
||||
if (copyOutState->binary)
|
||||
|
@ -782,17 +868,12 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
|||
MasterUpdateShardStatistics(shardConnections->shardId);
|
||||
}
|
||||
|
||||
EndCopyFrom(copyState);
|
||||
tupleSource->Close(tupleSource->context);
|
||||
|
||||
heap_close(distributedRelation, NoLock);
|
||||
|
||||
/* check for cancellation one last time before returning */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
if (completionTag != NULL)
|
||||
{
|
||||
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
|
||||
"COPY " UINT64_FORMAT, processedRowCount);
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
|
@ -804,6 +885,8 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
|||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
return processedRowCount;
|
||||
}
|
||||
|
||||
|
||||
|
@ -920,7 +1003,7 @@ RemoveMasterOptions(CopyStmt *copyStatement)
|
|||
* shard placements.
|
||||
*/
|
||||
static void
|
||||
OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
|
||||
OpenCopyTransactions(RangeVar *relation, ShardConnections *shardConnections,
|
||||
bool stopOnFailure, bool useBinaryCopyFormat)
|
||||
{
|
||||
List *finalizedPlacementList = NIL;
|
||||
|
@ -986,7 +1069,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
|
|||
}
|
||||
|
||||
PQclear(result);
|
||||
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId,
|
||||
copyCommand = ConstructCopyStatement(relation, shardConnections->shardId,
|
||||
useBinaryCopyFormat);
|
||||
|
||||
result = PQexec(connection, copyCommand->data);
|
||||
|
@ -1040,6 +1123,28 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateCopyOutState creates a copy output state to pass to the COPY
|
||||
* functions.
|
||||
*/
|
||||
static CopyOutState
|
||||
CreateCopyOutState(TupleDesc tupleDescriptor, MemoryContext rowContext)
|
||||
{
|
||||
static const char *delimiterCharacter = "\t";
|
||||
static const char *nullPrintCharacter = "\\N";
|
||||
|
||||
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
|
||||
copyOutState->delim = (char *) delimiterCharacter;
|
||||
copyOutState->null_print = (char *) nullPrintCharacter;
|
||||
copyOutState->null_print_client = (char *) nullPrintCharacter;
|
||||
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
|
||||
copyOutState->fe_msgbuf = makeStringInfo();
|
||||
copyOutState->rowcontext = rowContext;
|
||||
|
||||
return copyOutState;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CanUseBinaryCopyFormat iterates over columns of the relation given in rowOutputState
|
||||
* and looks for a column whose type is array of user-defined type or composite type.
|
||||
|
@ -1173,12 +1278,12 @@ SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList)
|
|||
* shard.
|
||||
*/
|
||||
static StringInfo
|
||||
ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat)
|
||||
ConstructCopyStatement(RangeVar *relation, int64 shardId, bool useBinaryCopyFormat)
|
||||
{
|
||||
StringInfo command = makeStringInfo();
|
||||
|
||||
char *schemaName = copyStatement->relation->schemaname;
|
||||
char *relationName = copyStatement->relation->relname;
|
||||
char *schemaName = relation->schemaname;
|
||||
char *relationName = relation->relname;
|
||||
|
||||
char *shardName = pstrdup(relationName);
|
||||
char *shardQualifiedName = NULL;
|
||||
|
@ -1536,11 +1641,11 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState)
|
|||
* opens connections to shard placements.
|
||||
*/
|
||||
static void
|
||||
StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
|
||||
StartCopyToNewShard(RangeVar *relation, ShardConnections *shardConnections,
|
||||
bool useBinaryCopyFormat)
|
||||
{
|
||||
char *relationName = copyStatement->relation->relname;
|
||||
char *schemaName = copyStatement->relation->schemaname;
|
||||
char *relationName = relation->relname;
|
||||
char *schemaName = relation->schemaname;
|
||||
char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
|
||||
|
||||
int64 shardId = MasterCreateEmptyShard(qualifiedName);
|
||||
|
@ -1551,7 +1656,7 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
|
|||
shardConnections->connectionList = NIL;
|
||||
|
||||
/* connect to shards placements and start transactions */
|
||||
OpenCopyTransactions(copyStatement, shardConnections, true, useBinaryCopyFormat);
|
||||
OpenCopyTransactions(relation, shardConnections, true, useBinaryCopyFormat);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -43,8 +43,20 @@ typedef struct NodeAddress
|
|||
int32 nodePort;
|
||||
} NodeAddress;
|
||||
|
||||
/* struct type for a generic source of tuples to copy to shards */
|
||||
typedef struct CopyTupleSource
|
||||
{
|
||||
void *context;
|
||||
MemoryContext rowContext;
|
||||
|
||||
void (*Open)(void *context, Relation relation, ErrorContextCallback *errorCallback);
|
||||
bool (*NextTuple)(void *context, Datum *columnValues, bool *columnNulls);
|
||||
void (*Close)(void *context);
|
||||
} CopyTupleSource;
|
||||
|
||||
|
||||
/* function declarations for copying into a distributed table */
|
||||
extern uint64 CopyTupleSourceToShards(CopyTupleSource *tupleSource, RangeVar *relation);
|
||||
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
|
||||
extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
|
||||
TupleDesc rowDescriptor,
|
||||
|
|
Loading…
Reference in New Issue