diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 4d82ab87b..108739832 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -86,6 +86,7 @@ #include "distributed/worker_protocol.h" #include "executor/executor.h" #include "foreign/foreign.h" +#include "libpq/libpq.h" #include "libpq/pqformat.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -197,14 +198,13 @@ static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, ShardConnections *shardConnections, bool stopOnFailure, bool useBinaryCopyFormat); - +static List * RemoveOptionFromList(List *optionList, char *optionName); static bool BinaryOutputFunctionDefined(Oid typeId); static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId, List *connectionList); static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, List *connectionList); -static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, - bool useBinaryCopyFormat); +static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId); static void SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList); static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *connection); @@ -246,13 +246,19 @@ static void EndPlacementStateCopyCommand(CopyPlacementState *placementState, static void UnclaimCopyConnections(List *connectionStateList); static void ShutdownCopyConnectionState(CopyConnectionState *connectionState, CitusCopyDestReceiver *copyDest); +static void CitusCopyTo(CopyStmt *copyStatement, char *completionTag); +static int64 ForwardCopyDataFromConnection(CopyOutState copyOutState, + MultiConnection *connection); /* Private functions copied and adapted from copy.c in PostgreSQL */ +static void SendCopyBegin(CopyOutState cstate); +static void SendCopyEnd(CopyOutState cstate); static void CopySendData(CopyOutState outputState, const void *databuf, int datasize); static void CopySendString(CopyOutState outputState, const char *str); static void CopySendChar(CopyOutState outputState, char c); static void CopySendInt32(CopyOutState outputState, int32 val); static void CopySendInt16(CopyOutState outputState, int16 val); +static void CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine); static void CopyAttributeOutText(CopyOutState outputState, char *string); static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer); static bool CitusSendTupleToPlacements(TupleTableSlot *slot, @@ -571,8 +577,18 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) * From here on we use copyStatement as the template for the command * that we send to workers. This command does not have an attribute * list since NextCopyFrom will generate a value for all columns. + * We also strip options. */ copyStatement->attlist = NIL; + copyStatement->options = NIL; + + if (copyOutState->binary) + { + DefElem *binaryFormatOption = + makeDefElem("format", (Node *) makeString("binary"), -1); + + copyStatement->options = lappend(copyStatement->options, binaryFormatOption); + } while (true) { @@ -686,6 +702,32 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) } +/* + * RemoveOptionFromList removes an option from a list of options in a + * COPY .. WITH (..) statement by name and returns the resulting list. + */ +static List * +RemoveOptionFromList(List *optionList, char *optionName) +{ + ListCell *optionCell = NULL; + ListCell *previousCell = NULL; + + foreach(optionCell, optionList) + { + DefElem *option = (DefElem *) lfirst(optionCell); + + if (strncmp(option->defname, optionName, NAMEDATALEN) == 0) + { + return list_delete_cell(optionList, optionCell, previousCell); + } + + previousCell = optionCell; + } + + return optionList; +} + + /* * OpenCopyConnectionsForNewShards opens a connection for each placement of a shard and * starts a COPY transaction if necessary. If a connection cannot be opened, @@ -759,8 +801,7 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, RemoteTransactionBeginIfNecessary(connection); StringInfo copyCommand = ConstructCopyStatement(copyStatement, - shardConnections->shardId, - useBinaryCopyFormat); + shardConnections->shardId); if (!SendRemoteCommand(connection, copyCommand->data)) { @@ -939,7 +980,7 @@ SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, List *connection * shard. */ static StringInfo -ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat) +ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId) { StringInfo command = makeStringInfo(); @@ -961,35 +1002,76 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCop foreach(columnNameCell, copyStatement->attlist) { - char *columnName = (char *) lfirst(columnNameCell); + char *columnName = strVal(lfirst(columnNameCell)); + const char *quotedColumnName = quote_identifier(columnName); if (!appendedFirstName) { - appendStringInfo(command, "(%s", columnName); + appendStringInfo(command, "(%s", quotedColumnName); appendedFirstName = true; } else { - appendStringInfo(command, ", %s", columnName); + appendStringInfo(command, ", %s", quotedColumnName); } } appendStringInfoString(command, ") "); } - appendStringInfo(command, "FROM STDIN WITH "); - - if (IsCopyResultStmt(copyStatement)) + if (copyStatement->is_from) { - appendStringInfoString(command, "(FORMAT RESULT)"); - } - else if (useBinaryCopyFormat) - { - appendStringInfoString(command, "(FORMAT BINARY)"); + appendStringInfoString(command, "FROM STDIN"); } else { - appendStringInfoString(command, "(FORMAT TEXT)"); + appendStringInfoString(command, "TO STDOUT"); + } + + if (copyStatement->options != NIL) + { + ListCell *optionCell = NULL; + + appendStringInfoString(command, " WITH ("); + + foreach(optionCell, copyStatement->options) + { + DefElem *defel = (DefElem *) lfirst(optionCell); + + if (optionCell != list_head(copyStatement->options)) + { + appendStringInfoString(command, ", "); + } + + appendStringInfo(command, "%s", defel->defname); + + if (defel->arg == NULL) + { + /* option without value */ + } + else if (IsA(defel->arg, String)) + { + char *value = defGetString(defel); + + /* make sure strings are quoted (may contain reserved characters) */ + appendStringInfo(command, " %s", quote_literal_cstr(value)); + } + else if (IsA(defel->arg, List)) + { + List *nameList = defGetStringList(defel); + + appendStringInfo(command, " (%s)", NameListToQuotedString(nameList)); + } + else + { + char *value = defGetString(defel); + + /* numeric options or * should not have quotes */ + appendStringInfo(command, " %s", value); + } + } + + appendStringInfoString(command, ")"); } return command; @@ -1608,6 +1690,67 @@ CreateEmptyShard(char *relationName) /* *INDENT-OFF* */ + + +/* + * Send copy start/stop messages for frontend copies. These have changed + * in past protocol redesigns. + */ +static void +SendCopyBegin(CopyOutState cstate) +{ + if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) + { + /* new way */ + StringInfoData buf; + int natts = list_length(cstate->attnumlist); + int16 format = (cstate->binary ? 1 : 0); + int i; + + pq_beginmessage(&buf, 'H'); + pq_sendbyte(&buf, format); /* overall format */ + pq_sendint16(&buf, natts); + for (i = 0; i < natts; i++) + pq_sendint16(&buf, format); /* per-column formats */ + pq_endmessage(&buf); + cstate->copy_dest = COPY_NEW_FE; + } + else + { + /* old way */ + if (cstate->binary) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY BINARY is not supported to stdout or from stdin"))); + pq_putemptymessage('H'); + /* grottiness needed for old COPY OUT protocol */ + pq_startcopyout(); + cstate->copy_dest = COPY_OLD_FE; + } +} + + +/* End a copy stream sent to the client */ +static void +SendCopyEnd(CopyOutState cstate) +{ + if (cstate->copy_dest == COPY_NEW_FE) + { + /* Shouldn't have any unsent data */ + Assert(cstate->fe_msgbuf->len == 0); + /* Send Copy Done message */ + pq_putemptymessage('c'); + } + else + { + CopySendData(cstate, "\\.", 2); + /* Need to flush out the trailer (this also appends a newline) */ + CopySendEndOfRow(cstate, true); + pq_endcopyout(false); + } +} + + /* Append data to the copy buffer in outputState */ static void CopySendData(CopyOutState outputState, const void *databuf, int datasize) @@ -1650,6 +1793,45 @@ CopySendInt16(CopyOutState outputState, int16 val) } +/* Send the row to the appropriate destination */ +static void +CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine) +{ + StringInfo fe_msgbuf = cstate->fe_msgbuf; + + switch (cstate->copy_dest) + { + case COPY_OLD_FE: + /* The FE/BE protocol uses \n as newline for all platforms */ + if (!cstate->binary && includeEndOfLine) + CopySendChar(cstate, '\n'); + + if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len)) + { + /* no hope of recovering connection sync, so FATAL */ + ereport(FATAL, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection lost during COPY to stdout"))); + } + break; + case COPY_NEW_FE: + /* The FE/BE protocol uses \n as newline for all platforms */ + if (!cstate->binary && includeEndOfLine) + CopySendChar(cstate, '\n'); + + /* Dump the accumulated row as one CopyData message */ + (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); + break; + case COPY_FILE: + case COPY_CALLBACK: + Assert(false); /* Not yet supported. */ + break; + } + + resetStringInfo(fe_msgbuf); +} + + /* * Send text representation of one column, with conversion and escaping. * @@ -1817,7 +1999,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, char *schemaName = get_namespace_name(schemaOid); List *columnNameList = copyDest->columnNameList; - List *quotedColumnNameList = NIL; + List *attributeList = NIL; ListCell *columnNameCell = NULL; @@ -1916,13 +2098,13 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, TypeOutputFunctions(columnCount, finalTypeArray, copyOutState->binary); } - /* ensure the column names are properly quoted in the COPY statement */ + /* wrap the column names as Values */ foreach(columnNameCell, columnNameList) { char *columnName = (char *) lfirst(columnNameCell); - char *quotedColumnName = (char *) quote_identifier(columnName); + Value *columnNameValue = makeString(columnName); - quotedColumnNameList = lappend(quotedColumnNameList, quotedColumnName); + attributeList = lappend(attributeList, columnNameValue); } if (partitionMethod != DISTRIBUTE_BY_NONE && @@ -1949,10 +2131,18 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, { copyStatement->relation = makeRangeVar(schemaName, relationName, -1); copyStatement->options = NIL; + + if (copyOutState->binary) + { + DefElem *binaryFormatOption = + makeDefElem("format", (Node *) makeString("binary"), -1); + + copyStatement->options = lappend(copyStatement->options, binaryFormatOption); + } } copyStatement->query = NULL; - copyStatement->attlist = quotedColumnNameList; + copyStatement->attlist = attributeList; copyStatement->is_from = true; copyStatement->is_program = false; copyStatement->filename = NULL; @@ -2423,12 +2613,22 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS CitusCopyFrom(copyStatement, completionTag); return NULL; } + else if (copyStatement->filename == NULL && !copyStatement->is_program && + !CopyStatementHasFormat(copyStatement, "binary")) + { + /* + * COPY table TO STDOUT is handled by specialized logic to + * avoid buffering the table on the coordinator. This enables + * pg_dump of large tables. + */ + CitusCopyTo(copyStatement, completionTag); + return NULL; + } else { /* - * The copy code only handles SELECTs in COPY ... TO on master tables, - * as that can be done non-invasively. To handle COPY master_rel TO - * the copy statement is replaced by a generated select statement. + * COPY table TO PROGRAM / file is handled by wrapping the table + * in a SELECT * FROM table and going through the result COPY logic. */ ColumnRef *allColumns = makeNode(ColumnRef); SelectStmt *selectStmt = makeNode(SelectStmt); @@ -2505,6 +2705,147 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS } +/* + * CitusCopyTo runs a COPY .. TO STDOUT command on each shard to to a full + * table dump. + */ +static void +CitusCopyTo(CopyStmt *copyStatement, char *completionTag) +{ + ListCell *shardIntervalCell = NULL; + int64 tuplesSent = 0; + + Relation distributedRelation = heap_openrv(copyStatement->relation, AccessShareLock); + Oid relationId = RelationGetRelid(distributedRelation); + TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation); + + CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); + copyOutState->fe_msgbuf = makeStringInfo(); + copyOutState->binary = false; + copyOutState->attnumlist = CopyGetAttnums(tupleDescriptor, distributedRelation, + copyStatement->attlist); + + SendCopyBegin(copyOutState); + + List *shardIntervalList = LoadShardIntervalList(relationId); + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = lfirst(shardIntervalCell); + List *shardPlacementList = ActiveShardPlacementList(shardInterval->shardId); + ListCell *shardPlacementCell = NULL; + int placementIndex = 0; + + StringInfo copyCommand = ConstructCopyStatement(copyStatement, + shardInterval->shardId); + + foreach(shardPlacementCell, shardPlacementList) + { + ShardPlacement *shardPlacement = lfirst(shardPlacementCell); + int connectionFlags = 0; + char *userName = NULL; + const bool raiseErrors = true; + + MultiConnection *connection = GetPlacementConnection(connectionFlags, + shardPlacement, + userName); + + if (placementIndex == list_length(shardPlacementList) - 1) + { + /* last chance for this shard */ + MarkRemoteTransactionCritical(connection); + } + + if (PQstatus(connection->pgConn) != CONNECTION_OK) + { + HandleRemoteTransactionConnectionError(connection, raiseErrors); + continue; + } + + RemoteTransactionBeginIfNecessary(connection); + + if (!SendRemoteCommand(connection, copyCommand->data)) + { + HandleRemoteTransactionConnectionError(connection, raiseErrors); + continue; + } + + PGresult *result = GetRemoteCommandResult(connection, raiseErrors); + if (PQresultStatus(result) != PGRES_COPY_OUT) + { + ReportResultError(connection, result, ERROR); + } + + PQclear(result); + + tuplesSent += ForwardCopyDataFromConnection(copyOutState, connection); + break; + } + + if (shardIntervalCell == list_head(shardIntervalList)) + { + /* remove header after the first shard */ + RemoveOptionFromList(copyStatement->options, "header"); + } + } + + SendCopyEnd(copyOutState); + + heap_close(distributedRelation, AccessShareLock); + + if (completionTag != NULL) + { + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "COPY " UINT64_FORMAT, + tuplesSent); + } +} + + +/* + * ForwardCopyDataFromConnection forwards copy data received over the given connection + * to the client or file descriptor. + */ +static int64 +ForwardCopyDataFromConnection(CopyOutState copyOutState, MultiConnection *connection) +{ + char *receiveBuffer = NULL; + const int useAsync = 0; + bool raiseErrors = true; + int64 tuplesSent = 0; + + /* receive copy data message in a synchronous manner */ + int receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, useAsync); + while (receiveLength > 0) + { + bool includeEndOfLine = false; + + CopySendData(copyOutState, receiveBuffer, receiveLength); + CopySendEndOfRow(copyOutState, includeEndOfLine); + tuplesSent++; + + PQfreemem(receiveBuffer); + + receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, useAsync); + } + + if (receiveLength != -1) + { + ReportConnectionError(connection, ERROR); + } + + PGresult *result = GetRemoteCommandResult(connection, raiseErrors); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + + PQclear(result); + ClearResults(connection, raiseErrors); + + return tuplesSent; +} + + /* * Check whether the current user has the permission to execute a COPY * statement, raise ERROR if not. In some cases we have to do this separately @@ -2932,7 +3273,7 @@ StartPlacementStateCopyCommand(CopyPlacementState *placementState, bool raiseInterrupts = true; bool binaryCopy = copyOutState->binary; - StringInfo copyCommand = ConstructCopyStatement(copyStatement, shardId, binaryCopy); + StringInfo copyCommand = ConstructCopyStatement(copyStatement, shardId); if (!SendRemoteCommand(connection, copyCommand->data)) { diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index e01cf9317..1939789b1 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -24,6 +24,18 @@ #define INVALID_PARTITION_COLUMN_INDEX -1 +/* + * CitusCopyDest indicates the source or destination of a COPY command. + */ +typedef enum CitusCopyDest +{ + COPY_FILE, /* to/from file (or a piped program) */ + COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ + COPY_NEW_FE, /* to/from frontend (3.0 protocol) */ + COPY_CALLBACK /* to/from callback function */ +} CitusCopyDest; + + /* * A smaller version of copy.c's CopyStateData, trimmed to the elements * necessary to copy out results. While it'd be a bit nicer to share code, @@ -31,8 +43,10 @@ */ typedef struct CopyOutStateData { + CitusCopyDest copy_dest; /* type of copy source/destination */ StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for * dest == COPY_NEW_FE in COPY FROM */ + List *attnumlist; /* integer list of attnums to copy */ int file_encoding; /* file or remote side's character encoding */ bool need_transcoding; /* file encoding diff from server? */ bool binary; /* binary format? */ diff --git a/src/test/regress/.gitignore b/src/test/regress/.gitignore index a31b197a3..8a525eed3 100644 --- a/src/test/regress/.gitignore +++ b/src/test/regress/.gitignore @@ -14,6 +14,7 @@ # Regression test output /regression.diffs /regression.out +/test_times.log # Regression test timing /test_times.log diff --git a/src/test/regress/expected/failure_1pc_copy_append.out b/src/test/regress/expected/failure_1pc_copy_append.out index bae1675cb..0bbf05175 100644 --- a/src/test/regress/expected/failure_1pc_copy_append.out +++ b/src/test/regress/expected/failure_1pc_copy_append.out @@ -42,7 +42,7 @@ SELECT citus.dump_network_traffic(); (0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=worker_apply_shard_ddl_command,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']") (0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]") (0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") - (0,coordinator,"['Query(query=COPY public.copy_test_XXXXXX FROM STDIN WITH (FORMAT BINARY))']") + (0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX FROM STDIN WITH (format 'binary'))""]") (0,worker,"[""Backend(type=G,body=b'\\\\x01\\\\x00\\\\x02\\\\x00\\\\x01\\\\x00\\\\x01')""]") (0,coordinator,"[""CopyData(data=b'PGCOPY\\\\n\\\\xff\\\\r\\\\n\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x04')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x03\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\t')"", ""CopyData(data=b'\\\\xff\\\\xff')"", 'CopyDone()']") (0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']") diff --git a/src/test/regress/expected/failure_1pc_copy_hash.out b/src/test/regress/expected/failure_1pc_copy_hash.out index 1a9d36355..cc9584d02 100644 --- a/src/test/regress/expected/failure_1pc_copy_hash.out +++ b/src/test/regress/expected/failure_1pc_copy_hash.out @@ -39,7 +39,7 @@ SELECT citus.dump_network_traffic(); (0,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']") (0,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]") (0,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") - (0,coordinator,"['Query(query=COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (FORMAT BINARY))']") + (0,coordinator,"[""Query(query=COPY public.copy_test_XXXXXX (key, value) FROM STDIN WITH (format 'binary'))""]") (0,worker,"[""Backend(type=G,body=b'\\\\x01\\\\x00\\\\x02\\\\x00\\\\x01\\\\x00\\\\x01')""]") (0,coordinator,"[""CopyData(data=b'PGCOPY\\\\n\\\\xff\\\\r\\\\n\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x00')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x01')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x04')"", ""CopyData(data=b'\\\\x00\\\\x02\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\x03\\\\x00\\\\x00\\\\x00\\\\x04\\\\x00\\\\x00\\\\x00\\\\t')"", ""CopyData(data=b'\\\\xff\\\\xff')"", 'CopyDone()']") (0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']") diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index 8763eaad5..6b56ddd96 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -953,8 +953,8 @@ COPY reference_table_test (value_2, value_3, value_4) FROM STDIN WITH CSV; COPY reference_table_test (value_3) FROM STDIN WITH CSV; COPY reference_table_test FROM STDIN WITH CSV; COPY reference_table_test TO STDOUT WITH CSV; -1,1,1,Fri Jan 01 00:00:00 2016 -,2,2,Sat Jan 02 00:00:00 2016 +1,1,1,2016-01-01 00:00:00 +,2,2,2016-01-02 00:00:00 ,,3, ,,, -- INSERT INTO SELECT among reference tables diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out index 7270f0af3..3720a34ec 100644 --- a/src/test/regress/expected/multi_simple_queries.out +++ b/src/test/regress/expected/multi_simple_queries.out @@ -542,8 +542,6 @@ DETAIL: distribution column value: 1 -- copying from a single shard table does not require the master query COPY articles_single_shard TO stdout; -DEBUG: Creating router plan -DEBUG: Plan is router executable 50 10 anjanette 19519 SELECT avg(word_count) FROM articles diff --git a/src/test/regress/expected/multi_simple_queries_0.out b/src/test/regress/expected/multi_simple_queries_0.out index 688724418..995c29b6b 100644 --- a/src/test/regress/expected/multi_simple_queries_0.out +++ b/src/test/regress/expected/multi_simple_queries_0.out @@ -486,8 +486,6 @@ DETAIL: distribution column value: 1 -- copying from a single shard table does not require the master query COPY articles_single_shard TO stdout; -DEBUG: Creating router plan -DEBUG: Plan is router executable 50 10 anjanette 19519 SELECT avg(word_count) FROM articles diff --git a/src/test/regress/expected/pg_dump.out b/src/test/regress/expected/pg_dump.out new file mode 100644 index 000000000..157c95f35 --- /dev/null +++ b/src/test/regress/expected/pg_dump.out @@ -0,0 +1,97 @@ +CREATE TEMPORARY TABLE output (line text); +CREATE SCHEMA dumper; +SET search_path TO 'dumper'; +SET citus.next_shard_id TO 2900000; +SET citus.shard_replication_factor TO 1; +CREATE TABLE data ( + key int, + value text +); +SELECT create_distributed_table('data', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COPY data FROM STDIN WITH (format csv, delimiter '|', escape '\'); +-- duplicate the data using pg_dump +\COPY output FROM PROGRAM 'pg_dump --quote-all-identifiers -h localhost -p 57636 -U postgres -d regression -t dumper.data --data-only | psql -tAX -h localhost -p 57636 -U postgres -d regression' +-- data should now appear twice +COPY data TO STDOUT; +1 {this:is,json:1} +1 {this:is,json:1} +3 {{}:\t} +4 {} +3 {{}:\t} +4 {} +2 {$":9} +2 {$":9} +-- go crazy with names +CREATE TABLE "weird.table" ( + "key," int primary key, + "data.jsonb" jsonb, + "?empty(" text default '' +); +SELECT create_distributed_table('"weird.table"', 'key,'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE INDEX "weird.json_idx" ON "weird.table" USING GIN ("data.jsonb" jsonb_path_ops); +COPY "weird.table" ("key,", "data.jsonb") FROM STDIN WITH (format 'text'); +-- fast table dump with many options +COPY dumper."weird.table" ("data.jsonb", "?empty(")TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', delimiter '?', quote '_', header 1); +data.jsonb?_?empty(_ +{"weird": {"table": "{:"}}?__ +_{"?\"": []}_?__ +-- do a full pg_dump of the schema, use some weird quote/escape/delimiter characters to capture the full line +\COPY output FROM PROGRAM 'pg_dump -f results/pg_dump.tmp -h localhost -p 57636 -U postgres -d regression -n dumper --quote-all-identifiers' WITH (format csv, delimiter '|', escape '^', quote '^') +-- drop the schema +DROP SCHEMA dumper CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table data +drop cascades to table "weird.table" +-- recreate the schema +\COPY (SELECT line FROM output WHERE line IS NOT NULL) TO PROGRAM 'psql -qtAX -h localhost -p 57636 -U postgres -d regression -f results/pg_dump.tmp' WITH (format csv, delimiter '|', escape '^', quote '^') + +-- redistribute the schema +SELECT create_distributed_table('data', 'key'); +NOTICE: Copying data from local table... + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('"weird.table"', 'key,'); +NOTICE: Copying data from local table... + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- check the table contents +COPY data (value) TO STDOUT WITH (format csv, force_quote *); +"{this:is,json:1}" +"{this:is,json:1}" +"{{}: }" +"{}" +"{{}: }" +"{}" +"{$"":9}" +"{$"":9}" +COPY dumper."weird.table" ("data.jsonb", "?empty(") TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', header true); +data.jsonb,?empty( +"{""weird"": {""table"": ""{:""}}","" +"{""?\"""": []}","" +SELECT indexname FROM pg_indexes WHERE tablename = 'weird.table' ORDER BY indexname; + indexname +--------------------------------------------------------------------- + weird.json_idx + weird.table_pkey +(2 rows) + +DROP SCHEMA dumper CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table data +drop cascades to table "weird.table" diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 6f8f8375e..3687f3d89 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -203,7 +203,7 @@ test: multi_transaction_recovery # multi_copy creates hash and range-partitioned tables and performs COPY # multi_router_planner creates hash partitioned tables. # --------- -test: multi_copy fast_path_router_modify +test: multi_copy fast_path_router_modify pg_dump test: multi_router_planner multi_router_planner_fast_path null_parameters # ---------- diff --git a/src/test/regress/sql/pg_dump.sql b/src/test/regress/sql/pg_dump.sql new file mode 100644 index 000000000..3426427a2 --- /dev/null +++ b/src/test/regress/sql/pg_dump.sql @@ -0,0 +1,64 @@ +CREATE TEMPORARY TABLE output (line text); + +CREATE SCHEMA dumper; +SET search_path TO 'dumper'; + +SET citus.next_shard_id TO 2900000; +SET citus.shard_replication_factor TO 1; + +CREATE TABLE data ( + key int, + value text +); +SELECT create_distributed_table('data', 'key'); + +COPY data FROM STDIN WITH (format csv, delimiter '|', escape '\'); +1|{"this":"is","json":1} +2|{"$\"":9} +3|{"{}":" "} +4|{} +\. + +-- duplicate the data using pg_dump +\COPY output FROM PROGRAM 'pg_dump --quote-all-identifiers -h localhost -p 57636 -U postgres -d regression -t dumper.data --data-only | psql -tAX -h localhost -p 57636 -U postgres -d regression' + +-- data should now appear twice +COPY data TO STDOUT; + +-- go crazy with names +CREATE TABLE "weird.table" ( + "key," int primary key, + "data.jsonb" jsonb, + "?empty(" text default '' +); +SELECT create_distributed_table('"weird.table"', 'key,'); +CREATE INDEX "weird.json_idx" ON "weird.table" USING GIN ("data.jsonb" jsonb_path_ops); + +COPY "weird.table" ("key,", "data.jsonb") FROM STDIN WITH (format 'text'); +1 {"weird":{"table":"{:"}} +2 {"?\\\"":[]} +\. + +-- fast table dump with many options +COPY dumper."weird.table" ("data.jsonb", "?empty(")TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', delimiter '?', quote '_', header 1); + +-- do a full pg_dump of the schema, use some weird quote/escape/delimiter characters to capture the full line +\COPY output FROM PROGRAM 'pg_dump -f results/pg_dump.tmp -h localhost -p 57636 -U postgres -d regression -n dumper --quote-all-identifiers' WITH (format csv, delimiter '|', escape '^', quote '^') + +-- drop the schema +DROP SCHEMA dumper CASCADE; + +-- recreate the schema +\COPY (SELECT line FROM output WHERE line IS NOT NULL) TO PROGRAM 'psql -qtAX -h localhost -p 57636 -U postgres -d regression -f results/pg_dump.tmp' WITH (format csv, delimiter '|', escape '^', quote '^') + +-- redistribute the schema +SELECT create_distributed_table('data', 'key'); +SELECT create_distributed_table('"weird.table"', 'key,'); + +-- check the table contents +COPY data (value) TO STDOUT WITH (format csv, force_quote *); +COPY dumper."weird.table" ("data.jsonb", "?empty(") TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', header true); + +SELECT indexname FROM pg_indexes WHERE tablename = 'weird.table' ORDER BY indexname; + +DROP SCHEMA dumper CASCADE;