Implement direct COPY table TO stdout

pull/3361/head
Marco Slot 2019-03-08 17:56:01 +01:00
parent 3f7c5a5cf6
commit 038e5999cb
11 changed files with 549 additions and 36 deletions

View File

@ -86,6 +86,7 @@
#include "distributed/worker_protocol.h"
#include "executor/executor.h"
#include "foreign/foreign.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
@ -197,14 +198,13 @@ static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
ShardConnections *shardConnections, bool
stopOnFailure,
bool useBinaryCopyFormat);
static List * RemoveOptionFromList(List *optionList, char *optionName);
static bool BinaryOutputFunctionDefined(Oid typeId);
static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId,
List *connectionList);
static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId,
List *connectionList);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId,
bool useBinaryCopyFormat);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
static void SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList);
static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId,
MultiConnection *connection);
@ -246,13 +246,19 @@ static void EndPlacementStateCopyCommand(CopyPlacementState *placementState,
static void UnclaimCopyConnections(List *connectionStateList);
static void ShutdownCopyConnectionState(CopyConnectionState *connectionState,
CitusCopyDestReceiver *copyDest);
static void CitusCopyTo(CopyStmt *copyStatement, char *completionTag);
static int64 ForwardCopyDataFromConnection(CopyOutState copyOutState,
MultiConnection *connection);
/* Private functions copied and adapted from copy.c in PostgreSQL */
static void SendCopyBegin(CopyOutState cstate);
static void SendCopyEnd(CopyOutState cstate);
static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
static void CopySendString(CopyOutState outputState, const char *str);
static void CopySendChar(CopyOutState outputState, char c);
static void CopySendInt32(CopyOutState outputState, int32 val);
static void CopySendInt16(CopyOutState outputState, int16 val);
static void CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine);
static void CopyAttributeOutText(CopyOutState outputState, char *string);
static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer);
static bool CitusSendTupleToPlacements(TupleTableSlot *slot,
@ -571,8 +577,18 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
* From here on we use copyStatement as the template for the command
* that we send to workers. This command does not have an attribute
* list since NextCopyFrom will generate a value for all columns.
* We also strip options.
*/
copyStatement->attlist = NIL;
copyStatement->options = NIL;
if (copyOutState->binary)
{
DefElem *binaryFormatOption =
makeDefElem("format", (Node *) makeString("binary"), -1);
copyStatement->options = lappend(copyStatement->options, binaryFormatOption);
}
while (true)
{
@ -686,6 +702,32 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
}
/*
* RemoveOptionFromList removes an option from a list of options in a
* COPY .. WITH (..) statement by name and returns the resulting list.
*/
static List *
RemoveOptionFromList(List *optionList, char *optionName)
{
ListCell *optionCell = NULL;
ListCell *previousCell = NULL;
foreach(optionCell, optionList)
{
DefElem *option = (DefElem *) lfirst(optionCell);
if (strncmp(option->defname, optionName, NAMEDATALEN) == 0)
{
return list_delete_cell(optionList, optionCell, previousCell);
}
previousCell = optionCell;
}
return optionList;
}
/*
* OpenCopyConnectionsForNewShards opens a connection for each placement of a shard and
* starts a COPY transaction if necessary. If a connection cannot be opened,
@ -759,8 +801,7 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
RemoteTransactionBeginIfNecessary(connection);
StringInfo copyCommand = ConstructCopyStatement(copyStatement,
shardConnections->shardId,
useBinaryCopyFormat);
shardConnections->shardId);
if (!SendRemoteCommand(connection, copyCommand->data))
{
@ -939,7 +980,7 @@ SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, List *connection
* shard.
*/
static StringInfo
ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat)
ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId)
{
StringInfo command = makeStringInfo();
@ -961,35 +1002,76 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCop
foreach(columnNameCell, copyStatement->attlist)
{
char *columnName = (char *) lfirst(columnNameCell);
char *columnName = strVal(lfirst(columnNameCell));
const char *quotedColumnName = quote_identifier(columnName);
if (!appendedFirstName)
{
appendStringInfo(command, "(%s", columnName);
appendStringInfo(command, "(%s", quotedColumnName);
appendedFirstName = true;
}
else
{
appendStringInfo(command, ", %s", columnName);
appendStringInfo(command, ", %s", quotedColumnName);
}
}
appendStringInfoString(command, ") ");
}
appendStringInfo(command, "FROM STDIN WITH ");
if (IsCopyResultStmt(copyStatement))
if (copyStatement->is_from)
{
appendStringInfoString(command, "(FORMAT RESULT)");
}
else if (useBinaryCopyFormat)
{
appendStringInfoString(command, "(FORMAT BINARY)");
appendStringInfoString(command, "FROM STDIN");
}
else
{
appendStringInfoString(command, "(FORMAT TEXT)");
appendStringInfoString(command, "TO STDOUT");
}
if (copyStatement->options != NIL)
{
ListCell *optionCell = NULL;
appendStringInfoString(command, " WITH (");
foreach(optionCell, copyStatement->options)
{
DefElem *defel = (DefElem *) lfirst(optionCell);
if (optionCell != list_head(copyStatement->options))
{
appendStringInfoString(command, ", ");
}
appendStringInfo(command, "%s", defel->defname);
if (defel->arg == NULL)
{
/* option without value */
}
else if (IsA(defel->arg, String))
{
char *value = defGetString(defel);
/* make sure strings are quoted (may contain reserved characters) */
appendStringInfo(command, " %s", quote_literal_cstr(value));
}
else if (IsA(defel->arg, List))
{
List *nameList = defGetStringList(defel);
appendStringInfo(command, " (%s)", NameListToQuotedString(nameList));
}
else
{
char *value = defGetString(defel);
/* numeric options or * should not have quotes */
appendStringInfo(command, " %s", value);
}
}
appendStringInfoString(command, ")");
}
return command;
@ -1608,6 +1690,67 @@ CreateEmptyShard(char *relationName)
/* *INDENT-OFF* */
/*
* Send copy start/stop messages for frontend copies. These have changed
* in past protocol redesigns.
*/
static void
SendCopyBegin(CopyOutState cstate)
{
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
{
/* new way */
StringInfoData buf;
int natts = list_length(cstate->attnumlist);
int16 format = (cstate->binary ? 1 : 0);
int i;
pq_beginmessage(&buf, 'H');
pq_sendbyte(&buf, format); /* overall format */
pq_sendint16(&buf, natts);
for (i = 0; i < natts; i++)
pq_sendint16(&buf, format); /* per-column formats */
pq_endmessage(&buf);
cstate->copy_dest = COPY_NEW_FE;
}
else
{
/* old way */
if (cstate->binary)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY BINARY is not supported to stdout or from stdin")));
pq_putemptymessage('H');
/* grottiness needed for old COPY OUT protocol */
pq_startcopyout();
cstate->copy_dest = COPY_OLD_FE;
}
}
/* End a copy stream sent to the client */
static void
SendCopyEnd(CopyOutState cstate)
{
if (cstate->copy_dest == COPY_NEW_FE)
{
/* Shouldn't have any unsent data */
Assert(cstate->fe_msgbuf->len == 0);
/* Send Copy Done message */
pq_putemptymessage('c');
}
else
{
CopySendData(cstate, "\\.", 2);
/* Need to flush out the trailer (this also appends a newline) */
CopySendEndOfRow(cstate, true);
pq_endcopyout(false);
}
}
/* Append data to the copy buffer in outputState */
static void
CopySendData(CopyOutState outputState, const void *databuf, int datasize)
@ -1650,6 +1793,45 @@ CopySendInt16(CopyOutState outputState, int16 val)
}
/* Send the row to the appropriate destination */
static void
CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine)
{
StringInfo fe_msgbuf = cstate->fe_msgbuf;
switch (cstate->copy_dest)
{
case COPY_OLD_FE:
/* The FE/BE protocol uses \n as newline for all platforms */
if (!cstate->binary && includeEndOfLine)
CopySendChar(cstate, '\n');
if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
{
/* no hope of recovering connection sync, so FATAL */
ereport(FATAL,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection lost during COPY to stdout")));
}
break;
case COPY_NEW_FE:
/* The FE/BE protocol uses \n as newline for all platforms */
if (!cstate->binary && includeEndOfLine)
CopySendChar(cstate, '\n');
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
case COPY_FILE:
case COPY_CALLBACK:
Assert(false); /* Not yet supported. */
break;
}
resetStringInfo(fe_msgbuf);
}
/*
* Send text representation of one column, with conversion and escaping.
*
@ -1817,7 +1999,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
char *schemaName = get_namespace_name(schemaOid);
List *columnNameList = copyDest->columnNameList;
List *quotedColumnNameList = NIL;
List *attributeList = NIL;
ListCell *columnNameCell = NULL;
@ -1916,13 +2098,13 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
TypeOutputFunctions(columnCount, finalTypeArray, copyOutState->binary);
}
/* ensure the column names are properly quoted in the COPY statement */
/* wrap the column names as Values */
foreach(columnNameCell, columnNameList)
{
char *columnName = (char *) lfirst(columnNameCell);
char *quotedColumnName = (char *) quote_identifier(columnName);
Value *columnNameValue = makeString(columnName);
quotedColumnNameList = lappend(quotedColumnNameList, quotedColumnName);
attributeList = lappend(attributeList, columnNameValue);
}
if (partitionMethod != DISTRIBUTE_BY_NONE &&
@ -1949,10 +2131,18 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
{
copyStatement->relation = makeRangeVar(schemaName, relationName, -1);
copyStatement->options = NIL;
if (copyOutState->binary)
{
DefElem *binaryFormatOption =
makeDefElem("format", (Node *) makeString("binary"), -1);
copyStatement->options = lappend(copyStatement->options, binaryFormatOption);
}
}
copyStatement->query = NULL;
copyStatement->attlist = quotedColumnNameList;
copyStatement->attlist = attributeList;
copyStatement->is_from = true;
copyStatement->is_program = false;
copyStatement->filename = NULL;
@ -2423,12 +2613,22 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
CitusCopyFrom(copyStatement, completionTag);
return NULL;
}
else if (copyStatement->filename == NULL && !copyStatement->is_program &&
!CopyStatementHasFormat(copyStatement, "binary"))
{
/*
* COPY table TO STDOUT is handled by specialized logic to
* avoid buffering the table on the coordinator. This enables
* pg_dump of large tables.
*/
CitusCopyTo(copyStatement, completionTag);
return NULL;
}
else
{
/*
* The copy code only handles SELECTs in COPY ... TO on master tables,
* as that can be done non-invasively. To handle COPY master_rel TO
* the copy statement is replaced by a generated select statement.
* COPY table TO PROGRAM / file is handled by wrapping the table
* in a SELECT * FROM table and going through the result COPY logic.
*/
ColumnRef *allColumns = makeNode(ColumnRef);
SelectStmt *selectStmt = makeNode(SelectStmt);
@ -2505,6 +2705,147 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
}
/*
* CitusCopyTo runs a COPY .. TO STDOUT command on each shard to to a full
* table dump.
*/
static void
CitusCopyTo(CopyStmt *copyStatement, char *completionTag)
{
ListCell *shardIntervalCell = NULL;
int64 tuplesSent = 0;
Relation distributedRelation = heap_openrv(copyStatement->relation, AccessShareLock);
Oid relationId = RelationGetRelid(distributedRelation);
TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation);
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->binary = false;
copyOutState->attnumlist = CopyGetAttnums(tupleDescriptor, distributedRelation,
copyStatement->attlist);
SendCopyBegin(copyOutState);
List *shardIntervalList = LoadShardIntervalList(relationId);
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = lfirst(shardIntervalCell);
List *shardPlacementList = ActiveShardPlacementList(shardInterval->shardId);
ListCell *shardPlacementCell = NULL;
int placementIndex = 0;
StringInfo copyCommand = ConstructCopyStatement(copyStatement,
shardInterval->shardId);
foreach(shardPlacementCell, shardPlacementList)
{
ShardPlacement *shardPlacement = lfirst(shardPlacementCell);
int connectionFlags = 0;
char *userName = NULL;
const bool raiseErrors = true;
MultiConnection *connection = GetPlacementConnection(connectionFlags,
shardPlacement,
userName);
if (placementIndex == list_length(shardPlacementList) - 1)
{
/* last chance for this shard */
MarkRemoteTransactionCritical(connection);
}
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
HandleRemoteTransactionConnectionError(connection, raiseErrors);
continue;
}
RemoteTransactionBeginIfNecessary(connection);
if (!SendRemoteCommand(connection, copyCommand->data))
{
HandleRemoteTransactionConnectionError(connection, raiseErrors);
continue;
}
PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
if (PQresultStatus(result) != PGRES_COPY_OUT)
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
tuplesSent += ForwardCopyDataFromConnection(copyOutState, connection);
break;
}
if (shardIntervalCell == list_head(shardIntervalList))
{
/* remove header after the first shard */
RemoveOptionFromList(copyStatement->options, "header");
}
}
SendCopyEnd(copyOutState);
heap_close(distributedRelation, AccessShareLock);
if (completionTag != NULL)
{
snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "COPY " UINT64_FORMAT,
tuplesSent);
}
}
/*
* ForwardCopyDataFromConnection forwards copy data received over the given connection
* to the client or file descriptor.
*/
static int64
ForwardCopyDataFromConnection(CopyOutState copyOutState, MultiConnection *connection)
{
char *receiveBuffer = NULL;
const int useAsync = 0;
bool raiseErrors = true;
int64 tuplesSent = 0;
/* receive copy data message in a synchronous manner */
int receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, useAsync);
while (receiveLength > 0)
{
bool includeEndOfLine = false;
CopySendData(copyOutState, receiveBuffer, receiveLength);
CopySendEndOfRow(copyOutState, includeEndOfLine);
tuplesSent++;
PQfreemem(receiveBuffer);
receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, useAsync);
}
if (receiveLength != -1)
{
ReportConnectionError(connection, ERROR);
}
PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
ClearResults(connection, raiseErrors);
return tuplesSent;
}
/*
* Check whether the current user has the permission to execute a COPY
* statement, raise ERROR if not. In some cases we have to do this separately
@ -2932,7 +3273,7 @@ StartPlacementStateCopyCommand(CopyPlacementState *placementState,
bool raiseInterrupts = true;
bool binaryCopy = copyOutState->binary;
StringInfo copyCommand = ConstructCopyStatement(copyStatement, shardId, binaryCopy);
StringInfo copyCommand = ConstructCopyStatement(copyStatement, shardId);
if (!SendRemoteCommand(connection, copyCommand->data))
{

View File

@ -24,6 +24,18 @@
#define INVALID_PARTITION_COLUMN_INDEX -1
/*
* CitusCopyDest indicates the source or destination of a COPY command.
*/
typedef enum CitusCopyDest
{
COPY_FILE, /* to/from file (or a piped program) */
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
COPY_CALLBACK /* to/from callback function */
} CitusCopyDest;
/*
* A smaller version of copy.c's CopyStateData, trimmed to the elements
* necessary to copy out results. While it'd be a bit nicer to share code,
@ -31,8 +43,10 @@
*/
typedef struct CopyOutStateData
{
CitusCopyDest copy_dest; /* type of copy source/destination */
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
* dest == COPY_NEW_FE in COPY FROM */
List *attnumlist; /* integer list of attnums to copy */
int file_encoding; /* file or remote side's character encoding */
bool need_transcoding; /* file encoding diff from server? */
bool binary; /* binary format? */

View File

@ -14,6 +14,7 @@
# Regression test output
/regression.diffs
/regression.out
/test_times.log
# Regression test timing
/test_times.log

View File

@ -42,7 +42,7 @@ SELECT citus.dump_network_traffic();
(0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=worker_apply_shard_ddl_command,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']")
(0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]")
(0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
(0,coordinator,"['Query(query=COPY public.copy_test_XXXXXX FROM STDIN WITH (FORMAT BINARY))']")
(0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX FROM STDIN WITH (format 'binary'))""]")
(0,worker,"[""Backend(type=G,body=b'\\\\x01\\\\x00\\\\x02\\\\x00\\\\x01\\\\x00\\\\x01')""]")
(0,coordinator,"[""CopyData(data=b'PGCOPY\\\\n\\\\xff\\\\r\\\\n\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x04')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x03\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\t')"", ""CopyData(data=b'\\\\xff\\\\xff')"", 'CopyDone()']")
(0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']")

View File

@ -39,7 +39,7 @@ SELECT citus.dump_network_traffic();
(0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
(0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]")
(0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
(0,coordinator,"['Query(query=COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (FORMAT BINARY))']")
(0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (format 'binary'))""]")
(0,worker,"[""Backend(type=G,body=b'\\\\x01\\\\x00\\\\x02\\\\x00\\\\x01\\\\x00\\\\x01')""]")
(0,coordinator,"[""CopyData(data=b'PGCOPY\\\\n\\\\xff\\\\r\\\\n\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x04')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x03\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\t')"", ""CopyData(data=b'\\\\xff\\\\xff')"", 'CopyDone()']")
(0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']")

View File

@ -953,8 +953,8 @@ COPY reference_table_test (value_2, value_3, value_4) FROM STDIN WITH CSV;
COPY reference_table_test (value_3) FROM STDIN WITH CSV;
COPY reference_table_test FROM STDIN WITH CSV;
COPY reference_table_test TO STDOUT WITH CSV;
1,1,1,Fri Jan 01 00:00:00 2016
,2,2,Sat Jan 02 00:00:00 2016
1,1,1,2016-01-01 00:00:00
,2,2,2016-01-02 00:00:00
,,3,
,,,
-- INSERT INTO SELECT among reference tables

View File

@ -542,8 +542,6 @@ DETAIL: distribution column value: 1
-- copying from a single shard table does not require the master query
COPY articles_single_shard TO stdout;
DEBUG: Creating router plan
DEBUG: Plan is router executable
50 10 anjanette 19519
SELECT avg(word_count)
FROM articles

View File

@ -486,8 +486,6 @@ DETAIL: distribution column value: 1
-- copying from a single shard table does not require the master query
COPY articles_single_shard TO stdout;
DEBUG: Creating router plan
DEBUG: Plan is router executable
50 10 anjanette 19519
SELECT avg(word_count)
FROM articles

View File

@ -0,0 +1,97 @@
CREATE TEMPORARY TABLE output (line text);
CREATE SCHEMA dumper;
SET search_path TO 'dumper';
SET citus.next_shard_id TO 2900000;
SET citus.shard_replication_factor TO 1;
CREATE TABLE data (
key int,
value text
);
SELECT create_distributed_table('data', 'key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COPY data FROM STDIN WITH (format csv, delimiter '|', escape '\');
-- duplicate the data using pg_dump
\COPY output FROM PROGRAM 'pg_dump --quote-all-identifiers -h localhost -p 57636 -U postgres -d regression -t dumper.data --data-only | psql -tAX -h localhost -p 57636 -U postgres -d regression'
-- data should now appear twice
COPY data TO STDOUT;
1 {this:is,json:1}
1 {this:is,json:1}
3 {{}:\t}
4 {}
3 {{}:\t}
4 {}
2 {$":9}
2 {$":9}
-- go crazy with names
CREATE TABLE "weird.table" (
"key," int primary key,
"data.jsonb" jsonb,
"?empty(" text default ''
);
SELECT create_distributed_table('"weird.table"', 'key,');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE INDEX "weird.json_idx" ON "weird.table" USING GIN ("data.jsonb" jsonb_path_ops);
COPY "weird.table" ("key,", "data.jsonb") FROM STDIN WITH (format 'text');
-- fast table dump with many options
COPY dumper."weird.table" ("data.jsonb", "?empty(")TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', delimiter '?', quote '_', header 1);
data.jsonb?_?empty(_
{"weird": {"table": "{:"}}?__
_{"?\"": []}_?__
-- do a full pg_dump of the schema, use some weird quote/escape/delimiter characters to capture the full line
\COPY output FROM PROGRAM 'pg_dump -f results/pg_dump.tmp -h localhost -p 57636 -U postgres -d regression -n dumper --quote-all-identifiers' WITH (format csv, delimiter '|', escape '^', quote '^')
-- drop the schema
DROP SCHEMA dumper CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table data
drop cascades to table "weird.table"
-- recreate the schema
\COPY (SELECT line FROM output WHERE line IS NOT NULL) TO PROGRAM 'psql -qtAX -h localhost -p 57636 -U postgres -d regression -f results/pg_dump.tmp' WITH (format csv, delimiter '|', escape '^', quote '^')
-- redistribute the schema
SELECT create_distributed_table('data', 'key');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('"weird.table"', 'key,');
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- check the table contents
COPY data (value) TO STDOUT WITH (format csv, force_quote *);
"{this:is,json:1}"
"{this:is,json:1}"
"{{}: }"
"{}"
"{{}: }"
"{}"
"{$"":9}"
"{$"":9}"
COPY dumper."weird.table" ("data.jsonb", "?empty(") TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', header true);
data.jsonb,?empty(
"{""weird"": {""table"": ""{:""}}",""
"{""?\"""": []}",""
SELECT indexname FROM pg_indexes WHERE tablename = 'weird.table' ORDER BY indexname;
indexname
---------------------------------------------------------------------
weird.json_idx
weird.table_pkey
(2 rows)
DROP SCHEMA dumper CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table data
drop cascades to table "weird.table"

View File

@ -203,7 +203,7 @@ test: multi_transaction_recovery
# multi_copy creates hash and range-partitioned tables and performs COPY
# multi_router_planner creates hash partitioned tables.
# ---------
test: multi_copy fast_path_router_modify
test: multi_copy fast_path_router_modify pg_dump
test: multi_router_planner multi_router_planner_fast_path null_parameters
# ----------

View File

@ -0,0 +1,64 @@
CREATE TEMPORARY TABLE output (line text);
CREATE SCHEMA dumper;
SET search_path TO 'dumper';
SET citus.next_shard_id TO 2900000;
SET citus.shard_replication_factor TO 1;
CREATE TABLE data (
key int,
value text
);
SELECT create_distributed_table('data', 'key');
COPY data FROM STDIN WITH (format csv, delimiter '|', escape '\');
1|{"this":"is","json":1}
2|{"$\"":9}
3|{"{}":" "}
4|{}
\.
-- duplicate the data using pg_dump
\COPY output FROM PROGRAM 'pg_dump --quote-all-identifiers -h localhost -p 57636 -U postgres -d regression -t dumper.data --data-only | psql -tAX -h localhost -p 57636 -U postgres -d regression'
-- data should now appear twice
COPY data TO STDOUT;
-- go crazy with names
CREATE TABLE "weird.table" (
"key," int primary key,
"data.jsonb" jsonb,
"?empty(" text default ''
);
SELECT create_distributed_table('"weird.table"', 'key,');
CREATE INDEX "weird.json_idx" ON "weird.table" USING GIN ("data.jsonb" jsonb_path_ops);
COPY "weird.table" ("key,", "data.jsonb") FROM STDIN WITH (format 'text');
1 {"weird":{"table":"{:"}}
2 {"?\\\"":[]}
\.
-- fast table dump with many options
COPY dumper."weird.table" ("data.jsonb", "?empty(")TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', delimiter '?', quote '_', header 1);
-- do a full pg_dump of the schema, use some weird quote/escape/delimiter characters to capture the full line
\COPY output FROM PROGRAM 'pg_dump -f results/pg_dump.tmp -h localhost -p 57636 -U postgres -d regression -n dumper --quote-all-identifiers' WITH (format csv, delimiter '|', escape '^', quote '^')
-- drop the schema
DROP SCHEMA dumper CASCADE;
-- recreate the schema
\COPY (SELECT line FROM output WHERE line IS NOT NULL) TO PROGRAM 'psql -qtAX -h localhost -p 57636 -U postgres -d regression -f results/pg_dump.tmp' WITH (format csv, delimiter '|', escape '^', quote '^')
-- redistribute the schema
SELECT create_distributed_table('data', 'key');
SELECT create_distributed_table('"weird.table"', 'key,');
-- check the table contents
COPY data (value) TO STDOUT WITH (format csv, force_quote *);
COPY dumper."weird.table" ("data.jsonb", "?empty(") TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', header true);
SELECT indexname FROM pg_indexes WHERE tablename = 'weird.table' ORDER BY indexname;
DROP SCHEMA dumper CASCADE;