mirror of https://github.com/citusdata/citus.git
Merge pull request #3361 from citusdata/copy_out
Implement direct COPY table TO stdoutpull/3517/head
commit
3e7d4fd739
|
@ -86,6 +86,7 @@
|
|||
#include "distributed/worker_protocol.h"
|
||||
#include "executor/executor.h"
|
||||
#include "foreign/foreign.h"
|
||||
#include "libpq/libpq.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "nodes/nodeFuncs.h"
|
||||
|
@ -197,14 +198,13 @@ static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
|
|||
ShardConnections *shardConnections, bool
|
||||
stopOnFailure,
|
||||
bool useBinaryCopyFormat);
|
||||
|
||||
static List * RemoveOptionFromList(List *optionList, char *optionName);
|
||||
static bool BinaryOutputFunctionDefined(Oid typeId);
|
||||
static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId,
|
||||
List *connectionList);
|
||||
static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId,
|
||||
List *connectionList);
|
||||
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId,
|
||||
bool useBinaryCopyFormat);
|
||||
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
|
||||
static void SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList);
|
||||
static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId,
|
||||
MultiConnection *connection);
|
||||
|
@ -246,13 +246,19 @@ static void EndPlacementStateCopyCommand(CopyPlacementState *placementState,
|
|||
static void UnclaimCopyConnections(List *connectionStateList);
|
||||
static void ShutdownCopyConnectionState(CopyConnectionState *connectionState,
|
||||
CitusCopyDestReceiver *copyDest);
|
||||
static void CitusCopyTo(CopyStmt *copyStatement, char *completionTag);
|
||||
static int64 ForwardCopyDataFromConnection(CopyOutState copyOutState,
|
||||
MultiConnection *connection);
|
||||
|
||||
/* Private functions copied and adapted from copy.c in PostgreSQL */
|
||||
static void SendCopyBegin(CopyOutState cstate);
|
||||
static void SendCopyEnd(CopyOutState cstate);
|
||||
static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
|
||||
static void CopySendString(CopyOutState outputState, const char *str);
|
||||
static void CopySendChar(CopyOutState outputState, char c);
|
||||
static void CopySendInt32(CopyOutState outputState, int32 val);
|
||||
static void CopySendInt16(CopyOutState outputState, int16 val);
|
||||
static void CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine);
|
||||
static void CopyAttributeOutText(CopyOutState outputState, char *string);
|
||||
static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer);
|
||||
static bool CitusSendTupleToPlacements(TupleTableSlot *slot,
|
||||
|
@ -571,8 +577,18 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
|||
* From here on we use copyStatement as the template for the command
|
||||
* that we send to workers. This command does not have an attribute
|
||||
* list since NextCopyFrom will generate a value for all columns.
|
||||
* We also strip options.
|
||||
*/
|
||||
copyStatement->attlist = NIL;
|
||||
copyStatement->options = NIL;
|
||||
|
||||
if (copyOutState->binary)
|
||||
{
|
||||
DefElem *binaryFormatOption =
|
||||
makeDefElem("format", (Node *) makeString("binary"), -1);
|
||||
|
||||
copyStatement->options = lappend(copyStatement->options, binaryFormatOption);
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
|
@ -686,6 +702,32 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* RemoveOptionFromList removes an option from a list of options in a
|
||||
* COPY .. WITH (..) statement by name and returns the resulting list.
|
||||
*/
|
||||
static List *
|
||||
RemoveOptionFromList(List *optionList, char *optionName)
|
||||
{
|
||||
ListCell *optionCell = NULL;
|
||||
ListCell *previousCell = NULL;
|
||||
|
||||
foreach(optionCell, optionList)
|
||||
{
|
||||
DefElem *option = (DefElem *) lfirst(optionCell);
|
||||
|
||||
if (strncmp(option->defname, optionName, NAMEDATALEN) == 0)
|
||||
{
|
||||
return list_delete_cell(optionList, optionCell, previousCell);
|
||||
}
|
||||
|
||||
previousCell = optionCell;
|
||||
}
|
||||
|
||||
return optionList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* OpenCopyConnectionsForNewShards opens a connection for each placement of a shard and
|
||||
* starts a COPY transaction if necessary. If a connection cannot be opened,
|
||||
|
@ -759,8 +801,7 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
|
|||
RemoteTransactionBeginIfNecessary(connection);
|
||||
|
||||
StringInfo copyCommand = ConstructCopyStatement(copyStatement,
|
||||
shardConnections->shardId,
|
||||
useBinaryCopyFormat);
|
||||
shardConnections->shardId);
|
||||
|
||||
if (!SendRemoteCommand(connection, copyCommand->data))
|
||||
{
|
||||
|
@ -939,7 +980,7 @@ SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, List *connection
|
|||
* shard.
|
||||
*/
|
||||
static StringInfo
|
||||
ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat)
|
||||
ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId)
|
||||
{
|
||||
StringInfo command = makeStringInfo();
|
||||
|
||||
|
@ -961,35 +1002,76 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCop
|
|||
|
||||
foreach(columnNameCell, copyStatement->attlist)
|
||||
{
|
||||
char *columnName = (char *) lfirst(columnNameCell);
|
||||
char *columnName = strVal(lfirst(columnNameCell));
|
||||
const char *quotedColumnName = quote_identifier(columnName);
|
||||
|
||||
if (!appendedFirstName)
|
||||
{
|
||||
appendStringInfo(command, "(%s", columnName);
|
||||
appendStringInfo(command, "(%s", quotedColumnName);
|
||||
appendedFirstName = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfo(command, ", %s", columnName);
|
||||
appendStringInfo(command, ", %s", quotedColumnName);
|
||||
}
|
||||
}
|
||||
|
||||
appendStringInfoString(command, ") ");
|
||||
}
|
||||
|
||||
appendStringInfo(command, "FROM STDIN WITH ");
|
||||
|
||||
if (IsCopyResultStmt(copyStatement))
|
||||
if (copyStatement->is_from)
|
||||
{
|
||||
appendStringInfoString(command, "(FORMAT RESULT)");
|
||||
}
|
||||
else if (useBinaryCopyFormat)
|
||||
{
|
||||
appendStringInfoString(command, "(FORMAT BINARY)");
|
||||
appendStringInfoString(command, "FROM STDIN");
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfoString(command, "(FORMAT TEXT)");
|
||||
appendStringInfoString(command, "TO STDOUT");
|
||||
}
|
||||
|
||||
if (copyStatement->options != NIL)
|
||||
{
|
||||
ListCell *optionCell = NULL;
|
||||
|
||||
appendStringInfoString(command, " WITH (");
|
||||
|
||||
foreach(optionCell, copyStatement->options)
|
||||
{
|
||||
DefElem *defel = (DefElem *) lfirst(optionCell);
|
||||
|
||||
if (optionCell != list_head(copyStatement->options))
|
||||
{
|
||||
appendStringInfoString(command, ", ");
|
||||
}
|
||||
|
||||
appendStringInfo(command, "%s", defel->defname);
|
||||
|
||||
if (defel->arg == NULL)
|
||||
{
|
||||
/* option without value */
|
||||
}
|
||||
else if (IsA(defel->arg, String))
|
||||
{
|
||||
char *value = defGetString(defel);
|
||||
|
||||
/* make sure strings are quoted (may contain reserved characters) */
|
||||
appendStringInfo(command, " %s", quote_literal_cstr(value));
|
||||
}
|
||||
else if (IsA(defel->arg, List))
|
||||
{
|
||||
List *nameList = defGetStringList(defel);
|
||||
|
||||
appendStringInfo(command, " (%s)", NameListToQuotedString(nameList));
|
||||
}
|
||||
else
|
||||
{
|
||||
char *value = defGetString(defel);
|
||||
|
||||
/* numeric options or * should not have quotes */
|
||||
appendStringInfo(command, " %s", value);
|
||||
}
|
||||
}
|
||||
|
||||
appendStringInfoString(command, ")");
|
||||
}
|
||||
|
||||
return command;
|
||||
|
@ -1608,6 +1690,67 @@ CreateEmptyShard(char *relationName)
|
|||
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
|
||||
|
||||
/*
|
||||
* Send copy start/stop messages for frontend copies. These have changed
|
||||
* in past protocol redesigns.
|
||||
*/
|
||||
static void
|
||||
SendCopyBegin(CopyOutState cstate)
|
||||
{
|
||||
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
|
||||
{
|
||||
/* new way */
|
||||
StringInfoData buf;
|
||||
int natts = list_length(cstate->attnumlist);
|
||||
int16 format = (cstate->binary ? 1 : 0);
|
||||
int i;
|
||||
|
||||
pq_beginmessage(&buf, 'H');
|
||||
pq_sendbyte(&buf, format); /* overall format */
|
||||
pq_sendint16(&buf, natts);
|
||||
for (i = 0; i < natts; i++)
|
||||
pq_sendint16(&buf, format); /* per-column formats */
|
||||
pq_endmessage(&buf);
|
||||
cstate->copy_dest = COPY_NEW_FE;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* old way */
|
||||
if (cstate->binary)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("COPY BINARY is not supported to stdout or from stdin")));
|
||||
pq_putemptymessage('H');
|
||||
/* grottiness needed for old COPY OUT protocol */
|
||||
pq_startcopyout();
|
||||
cstate->copy_dest = COPY_OLD_FE;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* End a copy stream sent to the client */
|
||||
static void
|
||||
SendCopyEnd(CopyOutState cstate)
|
||||
{
|
||||
if (cstate->copy_dest == COPY_NEW_FE)
|
||||
{
|
||||
/* Shouldn't have any unsent data */
|
||||
Assert(cstate->fe_msgbuf->len == 0);
|
||||
/* Send Copy Done message */
|
||||
pq_putemptymessage('c');
|
||||
}
|
||||
else
|
||||
{
|
||||
CopySendData(cstate, "\\.", 2);
|
||||
/* Need to flush out the trailer (this also appends a newline) */
|
||||
CopySendEndOfRow(cstate, true);
|
||||
pq_endcopyout(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* Append data to the copy buffer in outputState */
|
||||
static void
|
||||
CopySendData(CopyOutState outputState, const void *databuf, int datasize)
|
||||
|
@ -1650,6 +1793,45 @@ CopySendInt16(CopyOutState outputState, int16 val)
|
|||
}
|
||||
|
||||
|
||||
/* Send the row to the appropriate destination */
|
||||
static void
|
||||
CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine)
|
||||
{
|
||||
StringInfo fe_msgbuf = cstate->fe_msgbuf;
|
||||
|
||||
switch (cstate->copy_dest)
|
||||
{
|
||||
case COPY_OLD_FE:
|
||||
/* The FE/BE protocol uses \n as newline for all platforms */
|
||||
if (!cstate->binary && includeEndOfLine)
|
||||
CopySendChar(cstate, '\n');
|
||||
|
||||
if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
|
||||
{
|
||||
/* no hope of recovering connection sync, so FATAL */
|
||||
ereport(FATAL,
|
||||
(errcode(ERRCODE_CONNECTION_FAILURE),
|
||||
errmsg("connection lost during COPY to stdout")));
|
||||
}
|
||||
break;
|
||||
case COPY_NEW_FE:
|
||||
/* The FE/BE protocol uses \n as newline for all platforms */
|
||||
if (!cstate->binary && includeEndOfLine)
|
||||
CopySendChar(cstate, '\n');
|
||||
|
||||
/* Dump the accumulated row as one CopyData message */
|
||||
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
|
||||
break;
|
||||
case COPY_FILE:
|
||||
case COPY_CALLBACK:
|
||||
Assert(false); /* Not yet supported. */
|
||||
break;
|
||||
}
|
||||
|
||||
resetStringInfo(fe_msgbuf);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Send text representation of one column, with conversion and escaping.
|
||||
*
|
||||
|
@ -1817,7 +1999,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
|||
char *schemaName = get_namespace_name(schemaOid);
|
||||
|
||||
List *columnNameList = copyDest->columnNameList;
|
||||
List *quotedColumnNameList = NIL;
|
||||
List *attributeList = NIL;
|
||||
|
||||
ListCell *columnNameCell = NULL;
|
||||
|
||||
|
@ -1916,13 +2098,13 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
|||
TypeOutputFunctions(columnCount, finalTypeArray, copyOutState->binary);
|
||||
}
|
||||
|
||||
/* ensure the column names are properly quoted in the COPY statement */
|
||||
/* wrap the column names as Values */
|
||||
foreach(columnNameCell, columnNameList)
|
||||
{
|
||||
char *columnName = (char *) lfirst(columnNameCell);
|
||||
char *quotedColumnName = (char *) quote_identifier(columnName);
|
||||
Value *columnNameValue = makeString(columnName);
|
||||
|
||||
quotedColumnNameList = lappend(quotedColumnNameList, quotedColumnName);
|
||||
attributeList = lappend(attributeList, columnNameValue);
|
||||
}
|
||||
|
||||
if (partitionMethod != DISTRIBUTE_BY_NONE &&
|
||||
|
@ -1949,10 +2131,18 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
|||
{
|
||||
copyStatement->relation = makeRangeVar(schemaName, relationName, -1);
|
||||
copyStatement->options = NIL;
|
||||
|
||||
if (copyOutState->binary)
|
||||
{
|
||||
DefElem *binaryFormatOption =
|
||||
makeDefElem("format", (Node *) makeString("binary"), -1);
|
||||
|
||||
copyStatement->options = lappend(copyStatement->options, binaryFormatOption);
|
||||
}
|
||||
}
|
||||
|
||||
copyStatement->query = NULL;
|
||||
copyStatement->attlist = quotedColumnNameList;
|
||||
copyStatement->attlist = attributeList;
|
||||
copyStatement->is_from = true;
|
||||
copyStatement->is_program = false;
|
||||
copyStatement->filename = NULL;
|
||||
|
@ -2423,12 +2613,22 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
|
|||
CitusCopyFrom(copyStatement, completionTag);
|
||||
return NULL;
|
||||
}
|
||||
else if (copyStatement->filename == NULL && !copyStatement->is_program &&
|
||||
!CopyStatementHasFormat(copyStatement, "binary"))
|
||||
{
|
||||
/*
|
||||
* COPY table TO STDOUT is handled by specialized logic to
|
||||
* avoid buffering the table on the coordinator. This enables
|
||||
* pg_dump of large tables.
|
||||
*/
|
||||
CitusCopyTo(copyStatement, completionTag);
|
||||
return NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* The copy code only handles SELECTs in COPY ... TO on master tables,
|
||||
* as that can be done non-invasively. To handle COPY master_rel TO
|
||||
* the copy statement is replaced by a generated select statement.
|
||||
* COPY table TO PROGRAM / file is handled by wrapping the table
|
||||
* in a SELECT * FROM table and going through the result COPY logic.
|
||||
*/
|
||||
ColumnRef *allColumns = makeNode(ColumnRef);
|
||||
SelectStmt *selectStmt = makeNode(SelectStmt);
|
||||
|
@ -2505,6 +2705,147 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CitusCopyTo runs a COPY .. TO STDOUT command on each shard to to a full
|
||||
* table dump.
|
||||
*/
|
||||
static void
|
||||
CitusCopyTo(CopyStmt *copyStatement, char *completionTag)
|
||||
{
|
||||
ListCell *shardIntervalCell = NULL;
|
||||
int64 tuplesSent = 0;
|
||||
|
||||
Relation distributedRelation = heap_openrv(copyStatement->relation, AccessShareLock);
|
||||
Oid relationId = RelationGetRelid(distributedRelation);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation);
|
||||
|
||||
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
|
||||
copyOutState->fe_msgbuf = makeStringInfo();
|
||||
copyOutState->binary = false;
|
||||
copyOutState->attnumlist = CopyGetAttnums(tupleDescriptor, distributedRelation,
|
||||
copyStatement->attlist);
|
||||
|
||||
SendCopyBegin(copyOutState);
|
||||
|
||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||
|
||||
foreach(shardIntervalCell, shardIntervalList)
|
||||
{
|
||||
ShardInterval *shardInterval = lfirst(shardIntervalCell);
|
||||
List *shardPlacementList = ActiveShardPlacementList(shardInterval->shardId);
|
||||
ListCell *shardPlacementCell = NULL;
|
||||
int placementIndex = 0;
|
||||
|
||||
StringInfo copyCommand = ConstructCopyStatement(copyStatement,
|
||||
shardInterval->shardId);
|
||||
|
||||
foreach(shardPlacementCell, shardPlacementList)
|
||||
{
|
||||
ShardPlacement *shardPlacement = lfirst(shardPlacementCell);
|
||||
int connectionFlags = 0;
|
||||
char *userName = NULL;
|
||||
const bool raiseErrors = true;
|
||||
|
||||
MultiConnection *connection = GetPlacementConnection(connectionFlags,
|
||||
shardPlacement,
|
||||
userName);
|
||||
|
||||
if (placementIndex == list_length(shardPlacementList) - 1)
|
||||
{
|
||||
/* last chance for this shard */
|
||||
MarkRemoteTransactionCritical(connection);
|
||||
}
|
||||
|
||||
if (PQstatus(connection->pgConn) != CONNECTION_OK)
|
||||
{
|
||||
HandleRemoteTransactionConnectionError(connection, raiseErrors);
|
||||
continue;
|
||||
}
|
||||
|
||||
RemoteTransactionBeginIfNecessary(connection);
|
||||
|
||||
if (!SendRemoteCommand(connection, copyCommand->data))
|
||||
{
|
||||
HandleRemoteTransactionConnectionError(connection, raiseErrors);
|
||||
continue;
|
||||
}
|
||||
|
||||
PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
|
||||
if (PQresultStatus(result) != PGRES_COPY_OUT)
|
||||
{
|
||||
ReportResultError(connection, result, ERROR);
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
|
||||
tuplesSent += ForwardCopyDataFromConnection(copyOutState, connection);
|
||||
break;
|
||||
}
|
||||
|
||||
if (shardIntervalCell == list_head(shardIntervalList))
|
||||
{
|
||||
/* remove header after the first shard */
|
||||
RemoveOptionFromList(copyStatement->options, "header");
|
||||
}
|
||||
}
|
||||
|
||||
SendCopyEnd(copyOutState);
|
||||
|
||||
heap_close(distributedRelation, AccessShareLock);
|
||||
|
||||
if (completionTag != NULL)
|
||||
{
|
||||
snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "COPY " UINT64_FORMAT,
|
||||
tuplesSent);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ForwardCopyDataFromConnection forwards copy data received over the given connection
|
||||
* to the client or file descriptor.
|
||||
*/
|
||||
static int64
|
||||
ForwardCopyDataFromConnection(CopyOutState copyOutState, MultiConnection *connection)
|
||||
{
|
||||
char *receiveBuffer = NULL;
|
||||
const int useAsync = 0;
|
||||
bool raiseErrors = true;
|
||||
int64 tuplesSent = 0;
|
||||
|
||||
/* receive copy data message in a synchronous manner */
|
||||
int receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, useAsync);
|
||||
while (receiveLength > 0)
|
||||
{
|
||||
bool includeEndOfLine = false;
|
||||
|
||||
CopySendData(copyOutState, receiveBuffer, receiveLength);
|
||||
CopySendEndOfRow(copyOutState, includeEndOfLine);
|
||||
tuplesSent++;
|
||||
|
||||
PQfreemem(receiveBuffer);
|
||||
|
||||
receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, useAsync);
|
||||
}
|
||||
|
||||
if (receiveLength != -1)
|
||||
{
|
||||
ReportConnectionError(connection, ERROR);
|
||||
}
|
||||
|
||||
PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
|
||||
if (!IsResponseOK(result))
|
||||
{
|
||||
ReportResultError(connection, result, ERROR);
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
ClearResults(connection, raiseErrors);
|
||||
|
||||
return tuplesSent;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Check whether the current user has the permission to execute a COPY
|
||||
* statement, raise ERROR if not. In some cases we have to do this separately
|
||||
|
@ -2932,7 +3273,7 @@ StartPlacementStateCopyCommand(CopyPlacementState *placementState,
|
|||
bool raiseInterrupts = true;
|
||||
bool binaryCopy = copyOutState->binary;
|
||||
|
||||
StringInfo copyCommand = ConstructCopyStatement(copyStatement, shardId, binaryCopy);
|
||||
StringInfo copyCommand = ConstructCopyStatement(copyStatement, shardId);
|
||||
|
||||
if (!SendRemoteCommand(connection, copyCommand->data))
|
||||
{
|
||||
|
|
|
@ -24,6 +24,18 @@
|
|||
#define INVALID_PARTITION_COLUMN_INDEX -1
|
||||
|
||||
|
||||
/*
 * CitusCopyDest tells where the data of a COPY command comes from or
 * goes to.
 */
typedef enum CitusCopyDest
{
	/* to/from file (or a piped program) */
	COPY_FILE,

	/* to/from frontend (2.0 protocol) */
	COPY_OLD_FE,

	/* to/from frontend (3.0 protocol) */
	COPY_NEW_FE,

	/* to/from callback function */
	COPY_CALLBACK
} CitusCopyDest;
|
||||
|
||||
|
||||
/*
|
||||
* A smaller version of copy.c's CopyStateData, trimmed to the elements
|
||||
* necessary to copy out results. While it'd be a bit nicer to share code,
|
||||
|
@ -31,8 +43,10 @@
|
|||
*/
|
||||
typedef struct CopyOutStateData
|
||||
{
|
||||
CitusCopyDest copy_dest; /* type of copy source/destination */
|
||||
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
|
||||
* dest == COPY_NEW_FE in COPY FROM */
|
||||
List *attnumlist; /* integer list of attnums to copy */
|
||||
int file_encoding; /* file or remote side's character encoding */
|
||||
bool need_transcoding; /* file encoding diff from server? */
|
||||
bool binary; /* binary format? */
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# Regression test output
|
||||
/regression.diffs
|
||||
/regression.out
|
||||
/test_times.log
|
||||
|
||||
# Regression test timing
|
||||
/test_times.log
|
||||
|
|
|
@ -42,7 +42,7 @@ SELECT citus.dump_network_traffic();
|
|||
(0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=worker_apply_shard_ddl_command,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']")
|
||||
(0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]")
|
||||
(0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
|
||||
(0,coordinator,"['Query(query=COPY public.copy_test_XXXXXX FROM STDIN WITH (FORMAT BINARY))']")
|
||||
(0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX FROM STDIN WITH (format 'binary'))""]")
|
||||
(0,worker,"[""Backend(type=G,body=b'\\\\x01\\\\x00\\\\x02\\\\x00\\\\x01\\\\x00\\\\x01')""]")
|
||||
(0,coordinator,"[""CopyData(data=b'PGCOPY\\\\n\\\\xff\\\\r\\\\n\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x04')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x03\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\t')"", ""CopyData(data=b'\\\\xff\\\\xff')"", 'CopyDone()']")
|
||||
(0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']")
|
||||
|
|
|
@ -39,7 +39,7 @@ SELECT citus.dump_network_traffic();
|
|||
(0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
|
||||
(0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]")
|
||||
(0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
|
||||
(0,coordinator,"['Query(query=COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (FORMAT BINARY))']")
|
||||
(0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (format 'binary'))""]")
|
||||
(0,worker,"[""Backend(type=G,body=b'\\\\x01\\\\x00\\\\x02\\\\x00\\\\x01\\\\x00\\\\x01')""]")
|
||||
(0,coordinator,"[""CopyData(data=b'PGCOPY\\\\n\\\\xff\\\\r\\\\n\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x04')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x03\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\t')"", ""CopyData(data=b'\\\\xff\\\\xff')"", 'CopyDone()']")
|
||||
(0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']")
|
||||
|
|
|
@ -953,8 +953,8 @@ COPY reference_table_test (value_2, value_3, value_4) FROM STDIN WITH CSV;
|
|||
COPY reference_table_test (value_3) FROM STDIN WITH CSV;
|
||||
COPY reference_table_test FROM STDIN WITH CSV;
|
||||
COPY reference_table_test TO STDOUT WITH CSV;
|
||||
1,1,1,Fri Jan 01 00:00:00 2016
|
||||
,2,2,Sat Jan 02 00:00:00 2016
|
||||
1,1,1,2016-01-01 00:00:00
|
||||
,2,2,2016-01-02 00:00:00
|
||||
,,3,
|
||||
,,,
|
||||
-- INSERT INTO SELECT among reference tables
|
||||
|
|
|
@ -542,8 +542,6 @@ DETAIL: distribution column value: 1
|
|||
|
||||
-- copying from a single shard table does not require the master query
|
||||
COPY articles_single_shard TO stdout;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
50 10 anjanette 19519
|
||||
SELECT avg(word_count)
|
||||
FROM articles
|
||||
|
|
|
@ -486,8 +486,6 @@ DETAIL: distribution column value: 1
|
|||
|
||||
-- copying from a single shard table does not require the master query
|
||||
COPY articles_single_shard TO stdout;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
50 10 anjanette 19519
|
||||
SELECT avg(word_count)
|
||||
FROM articles
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
CREATE TEMPORARY TABLE output (line text);
|
||||
CREATE SCHEMA dumper;
|
||||
SET search_path TO 'dumper';
|
||||
SET citus.next_shard_id TO 2900000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE data (
|
||||
key int,
|
||||
value text
|
||||
);
|
||||
SELECT create_distributed_table('data', 'key');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
COPY data FROM STDIN WITH (format csv, delimiter '|', escape '\');
|
||||
-- duplicate the data using pg_dump
|
||||
\COPY output FROM PROGRAM 'pg_dump --quote-all-identifiers -h localhost -p 57636 -U postgres -d regression -t dumper.data --data-only | psql -tAX -h localhost -p 57636 -U postgres -d regression'
|
||||
-- data should now appear twice
|
||||
COPY data TO STDOUT;
|
||||
1 {this:is,json:1}
|
||||
1 {this:is,json:1}
|
||||
3 {{}:\t}
|
||||
4 {}
|
||||
3 {{}:\t}
|
||||
4 {}
|
||||
2 {$":9}
|
||||
2 {$":9}
|
||||
-- go crazy with names
|
||||
CREATE TABLE "weird.table" (
|
||||
"key," int primary key,
|
||||
"data.jsonb" jsonb,
|
||||
"?empty(" text default ''
|
||||
);
|
||||
SELECT create_distributed_table('"weird.table"', 'key,');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE INDEX "weird.json_idx" ON "weird.table" USING GIN ("data.jsonb" jsonb_path_ops);
|
||||
COPY "weird.table" ("key,", "data.jsonb") FROM STDIN WITH (format 'text');
|
||||
-- fast table dump with many options
|
||||
COPY dumper."weird.table" ("data.jsonb", "?empty(")TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', delimiter '?', quote '_', header 1);
|
||||
data.jsonb?_?empty(_
|
||||
{"weird": {"table": "{:"}}?__
|
||||
_{"?\"": []}_?__
|
||||
-- do a full pg_dump of the schema, use some weird quote/escape/delimiter characters to capture the full line
|
||||
\COPY output FROM PROGRAM 'pg_dump -f results/pg_dump.tmp -h localhost -p 57636 -U postgres -d regression -n dumper --quote-all-identifiers' WITH (format csv, delimiter '|', escape '^', quote '^')
|
||||
-- drop the schema
|
||||
DROP SCHEMA dumper CASCADE;
|
||||
NOTICE: drop cascades to 2 other objects
|
||||
DETAIL: drop cascades to table data
|
||||
drop cascades to table "weird.table"
|
||||
-- recreate the schema
|
||||
\COPY (SELECT line FROM output WHERE line IS NOT NULL) TO PROGRAM 'psql -qtAX -h localhost -p 57636 -U postgres -d regression -f results/pg_dump.tmp' WITH (format csv, delimiter '|', escape '^', quote '^')
|
||||
|
||||
-- redistribute the schema
|
||||
SELECT create_distributed_table('data', 'key');
|
||||
NOTICE: Copying data from local table...
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('"weird.table"', 'key,');
|
||||
NOTICE: Copying data from local table...
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- check the table contents
|
||||
COPY data (value) TO STDOUT WITH (format csv, force_quote *);
|
||||
"{this:is,json:1}"
|
||||
"{this:is,json:1}"
|
||||
"{{}: }"
|
||||
"{}"
|
||||
"{{}: }"
|
||||
"{}"
|
||||
"{$"":9}"
|
||||
"{$"":9}"
|
||||
COPY dumper."weird.table" ("data.jsonb", "?empty(") TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', header true);
|
||||
data.jsonb,?empty(
|
||||
"{""weird"": {""table"": ""{:""}}",""
|
||||
"{""?\"""": []}",""
|
||||
SELECT indexname FROM pg_indexes WHERE tablename = 'weird.table' ORDER BY indexname;
|
||||
indexname
|
||||
---------------------------------------------------------------------
|
||||
weird.json_idx
|
||||
weird.table_pkey
|
||||
(2 rows)
|
||||
|
||||
DROP SCHEMA dumper CASCADE;
|
||||
NOTICE: drop cascades to 2 other objects
|
||||
DETAIL: drop cascades to table data
|
||||
drop cascades to table "weird.table"
|
|
@ -203,7 +203,7 @@ test: multi_transaction_recovery
|
|||
# multi_copy creates hash and range-partitioned tables and performs COPY
|
||||
# multi_router_planner creates hash partitioned tables.
|
||||
# ---------
|
||||
test: multi_copy fast_path_router_modify
|
||||
test: multi_copy fast_path_router_modify pg_dump
|
||||
test: multi_router_planner multi_router_planner_fast_path null_parameters
|
||||
|
||||
# ----------
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
CREATE TEMPORARY TABLE output (line text);
|
||||
|
||||
CREATE SCHEMA dumper;
|
||||
SET search_path TO 'dumper';
|
||||
|
||||
SET citus.next_shard_id TO 2900000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
CREATE TABLE data (
|
||||
key int,
|
||||
value text
|
||||
);
|
||||
SELECT create_distributed_table('data', 'key');
|
||||
|
||||
COPY data FROM STDIN WITH (format csv, delimiter '|', escape '\');
|
||||
1|{"this":"is","json":1}
|
||||
2|{"$\"":9}
|
||||
3|{"{}":" "}
|
||||
4|{}
|
||||
\.
|
||||
|
||||
-- duplicate the data using pg_dump
|
||||
\COPY output FROM PROGRAM 'pg_dump --quote-all-identifiers -h localhost -p 57636 -U postgres -d regression -t dumper.data --data-only | psql -tAX -h localhost -p 57636 -U postgres -d regression'
|
||||
|
||||
-- data should now appear twice
|
||||
COPY data TO STDOUT;
|
||||
|
||||
-- go crazy with names
|
||||
CREATE TABLE "weird.table" (
|
||||
"key," int primary key,
|
||||
"data.jsonb" jsonb,
|
||||
"?empty(" text default ''
|
||||
);
|
||||
SELECT create_distributed_table('"weird.table"', 'key,');
|
||||
CREATE INDEX "weird.json_idx" ON "weird.table" USING GIN ("data.jsonb" jsonb_path_ops);
|
||||
|
||||
COPY "weird.table" ("key,", "data.jsonb") FROM STDIN WITH (format 'text');
|
||||
1 {"weird":{"table":"{:"}}
|
||||
2 {"?\\\"":[]}
|
||||
\.
|
||||
|
||||
-- fast table dump with many options
|
||||
COPY dumper."weird.table" ("data.jsonb", "?empty(")TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', delimiter '?', quote '_', header 1);
|
||||
|
||||
-- do a full pg_dump of the schema, use some weird quote/escape/delimiter characters to capture the full line
|
||||
\COPY output FROM PROGRAM 'pg_dump -f results/pg_dump.tmp -h localhost -p 57636 -U postgres -d regression -n dumper --quote-all-identifiers' WITH (format csv, delimiter '|', escape '^', quote '^')
|
||||
|
||||
-- drop the schema
|
||||
DROP SCHEMA dumper CASCADE;
|
||||
|
||||
-- recreate the schema
|
||||
\COPY (SELECT line FROM output WHERE line IS NOT NULL) TO PROGRAM 'psql -qtAX -h localhost -p 57636 -U postgres -d regression -f results/pg_dump.tmp' WITH (format csv, delimiter '|', escape '^', quote '^')
|
||||
|
||||
-- redistribute the schema
|
||||
SELECT create_distributed_table('data', 'key');
|
||||
SELECT create_distributed_table('"weird.table"', 'key,');
|
||||
|
||||
-- check the table contents
|
||||
COPY data (value) TO STDOUT WITH (format csv, force_quote *);
|
||||
COPY dumper."weird.table" ("data.jsonb", "?empty(") TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', header true);
|
||||
|
||||
SELECT indexname FROM pg_indexes WHERE tablename = 'weird.table' ORDER BY indexname;
|
||||
|
||||
DROP SCHEMA dumper CASCADE;
|
Loading…
Reference in New Issue