Send COPY rows in binary format

pull/416/head
Metin Doslu 2016-03-29 18:25:33 +02:00 committed by Marco Slot
parent d25ee8fbd8
commit 1150ce6414
10 changed files with 848 additions and 712 deletions

File diff suppressed because it is too large Load Diff

View File

@ -303,7 +303,9 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
/*
* ProcessCopyStmt handles Citus specific concerns for COPY like supporting
* COPYing from distributed tables and preventing unsupported actions.
* COPYing from distributed tables and preventing unsupported actions. The
* function returns a modified COPY statement to be executed, or NULL if no
* further processing is needed.
*/
static Node *
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag)
@ -588,6 +590,7 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
{
RangeVar *relation = createIndexStatement->relation;
bool missingOk = false;
/* caller uses ShareLock for non-concurrent indexes, use the same lock here */
LOCKMODE lockMode = ShareLock;
Oid relationId = RangeVarGetRelid(relation, lockMode, missingOk);

View File

@ -20,17 +20,32 @@
#include "nodes/pg_list.h"
/* Local functions forward declarations */
static uint32 DistributedTransactionId = 0;
/* Local functions forward declarations */
static StringInfo BuildTransactionName(int connectionId);
/*
* PrepareTransactions prepares all transactions on connections in
* InitializeDistributedTransaction prepares the distributed transaction ID
* used in transaction names.
*/
void
InitializeDistributedTransaction(void)
{
	/* advance the counter that BuildTransactionName embeds in transaction names */
	DistributedTransactionId += 1;
}
/*
* PrepareRemoteTransactions prepares all transactions on connections in
* connectionList for commit if the 2PC transaction manager is enabled.
* On failure, it reports an error and stops.
*/
void
PrepareTransactions(List *connectionList)
PrepareRemoteTransactions(List *connectionList)
{
ListCell *connectionCell = NULL;
@ -57,7 +72,7 @@ PrepareTransactions(List *connectionList)
PQclear(result);
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to prepare transaction")));
errmsg("failed to prepare transaction")));
}
PQclear(result);
@ -68,11 +83,11 @@ PrepareTransactions(List *connectionList)
/*
* AbortTransactions aborts all transactions on connections in connectionList.
* AbortRemoteTransactions aborts all transactions on connections in connectionList.
* On failure, it reports a warning and continues to abort all of them.
*/
void
AbortTransactions(List *connectionList)
AbortRemoteTransactions(List *connectionList)
{
ListCell *connectionCell = NULL;
@ -82,8 +97,6 @@ AbortTransactions(List *connectionList)
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
int64 connectionId = transactionConnection->connectionId;
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
PGresult *result = NULL;
if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED)
@ -96,11 +109,14 @@ AbortTransactions(List *connectionList)
result = PQexec(connection, command->data);
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
/* log a warning so the user may abort the transaction later */
ereport(WARNING, (errmsg("Failed to roll back prepared transaction '%s'",
ereport(WARNING, (errmsg("failed to roll back prepared transaction '%s'",
transactionName->data),
errhint("Run ROLLBACK TRANSACTION '%s' on %s:%s",
transactionName->data, nodeName, nodePort)));
errhint("Run \"%s\" on %s:%s",
command->data, nodeName, nodePort)));
}
PQclear(result);
@ -118,11 +134,11 @@ AbortTransactions(List *connectionList)
/*
* CommitTransactions commits all transactions on connections in connectionList.
* CommitRemoteTransactions commits all transactions on connections in connectionList.
* On failure, it reports a warning and continues committing all of them.
*/
void
CommitTransactions(List *connectionList)
CommitRemoteTransactions(List *connectionList)
{
ListCell *connectionCell = NULL;
@ -132,8 +148,6 @@ CommitTransactions(List *connectionList)
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
int64 connectionId = transactionConnection->connectionId;
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
PGresult *result = NULL;
if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED)
@ -149,11 +163,14 @@ CommitTransactions(List *connectionList)
result = PQexec(connection, command->data);
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
/* log a warning so the user may commit the transaction later */
ereport(WARNING, (errmsg("Failed to commit prepared transaction '%s'",
ereport(WARNING, (errmsg("failed to commit prepared transaction '%s'",
transactionName->data),
errhint("Run COMMIT TRANSACTION '%s' on %s:%s",
transactionName->data, nodeName, nodePort)));
errhint("Run \"%s\" on %s:%s",
command->data, nodeName, nodePort)));
}
}
else
@ -165,7 +182,10 @@ CommitTransactions(List *connectionList)
result = PQexec(connection, "COMMIT");
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
ereport(WARNING, (errmsg("Failed to commit transaction on %s:%s",
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
ereport(WARNING, (errmsg("failed to commit transaction on %s:%s",
nodeName, nodePort)));
}
}
@ -178,7 +198,14 @@ CommitTransactions(List *connectionList)
/*
* BuildTransactionName constructs a unique transaction name from an ID.
* BuildTransactionName constructs a transaction name that ensures there are no
* collisions with concurrent transactions by the same master node, subsequent
* transactions by the same backend, or transactions on a different shard.
*
* Collisions may occur over time if transactions fail to commit or abort and
* are left to linger. This would cause a PREPARE failure for the second
* transaction, which causes it to be rolled back. In general, the user
* should ensure that prepared transactions do not linger.
*/
static StringInfo
BuildTransactionName(int connectionId)
@ -186,7 +213,7 @@ BuildTransactionName(int connectionId)
StringInfo commandString = makeStringInfo();
appendStringInfo(commandString, "citus_%d_%u_%d", MyProcPid,
GetCurrentTransactionId(), connectionId);
DistributedTransactionId, connectionId);
return commandString;
}

View File

@ -29,6 +29,7 @@
#include "catalog/pg_collation.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "distributed/multi_copy.h"
#include "distributed/resource_lock.h"
#include "distributed/transmit.h"
#include "distributed/worker_protocol.h"
@ -45,7 +46,6 @@ bool BinaryWorkerCopyFormat = false; /* binary format for copying between work
int PartitionBufferSize = 16384; /* total partitioning buffer size in KB */
/* Local variables */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
static uint32 FileBufferSizeInBytes = 0; /* file buffer size to init later */
@ -64,21 +64,10 @@ static void FilterAndPartitionTable(const char *filterQuery,
FileOutputStream *partitionFileArray,
uint32 fileCount);
static int ColumnIndex(TupleDesc rowDescriptor, const char *columnName);
static FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
static PartialCopyState InitRowOutputState(void);
static void ClearRowOutputState(PartialCopyState copyState);
static void OutputRow(HeapTuple row, TupleDesc rowDescriptor,
PartialCopyState rowOutputState, FmgrInfo *columnOutputFunctions);
static CopyOutState InitRowOutputState(void);
static void ClearRowOutputState(CopyOutState copyState);
static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount);
static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount);
static void CopySendData(PartialCopyState outputState, const void *databuf, int datasize);
static void CopySendString(PartialCopyState outputState, const char *str);
static void CopySendChar(PartialCopyState outputState, char c);
static void CopySendInt32(PartialCopyState outputState, int32 val);
static void CopySendInt16(PartialCopyState outputState, int16 val);
static void CopyAttributeOutText(PartialCopyState outputState, char *string);
static inline void CopyFlushOutput(PartialCopyState outputState,
char *start, char *pointer);
static uint32 RangePartitionId(Datum partitionValue, const void *context);
static uint32 HashPartitionId(Datum partitionValue, const void *context);
@ -731,13 +720,16 @@ FilterAndPartitionTable(const char *filterQuery,
FileOutputStream *partitionFileArray,
uint32 fileCount)
{
PartialCopyState rowOutputState = NULL;
CopyOutState rowOutputState = NULL;
FmgrInfo *columnOutputFunctions = NULL;
int partitionColumnIndex = 0;
Oid partitionColumnTypeId = InvalidOid;
Portal queryPortal = NULL;
int connected = 0;
int finished = 0;
uint32 columnCount = 0;
Datum *valueArray = NULL;
bool *isNullArray = NULL;
const char *noPortalName = NULL;
const bool readOnly = true;
@ -784,6 +776,10 @@ FilterAndPartitionTable(const char *filterQuery,
OutputBinaryHeaders(partitionFileArray, fileCount);
}
columnCount = (uint32) SPI_tuptable->tupdesc->natts;
valueArray = (Datum *) palloc0(columnCount * sizeof(Datum));
isNullArray = (bool *) palloc0(columnCount * sizeof(bool));
while (SPI_processed > 0)
{
int rowIndex = 0;
@ -815,13 +811,19 @@ FilterAndPartitionTable(const char *filterQuery,
partitionId = 0;
}
OutputRow(row, rowDescriptor, rowOutputState, columnOutputFunctions);
/* deconstruct the tuple; this is faster than repeated heap_getattr */
heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray);
AppendCopyRowData(valueArray, isNullArray, rowDescriptor,
rowOutputState, columnOutputFunctions);
rowText = rowOutputState->fe_msgbuf;
partitionFile = partitionFileArray[partitionId];
FileOutputStreamWrite(partitionFile, rowText);
resetStringInfo(rowText);
MemoryContextReset(rowOutputState->rowcontext);
}
SPI_freetuptable(SPI_tuptable);
@ -829,6 +831,9 @@ FilterAndPartitionTable(const char *filterQuery,
SPI_cursor_fetch(queryPortal, fetchForward, prefetchCount);
}
pfree(valueArray);
pfree(isNullArray);
SPI_cursor_close(queryPortal);
if (BinaryWorkerCopyFormat)
@ -866,44 +871,6 @@ ColumnIndex(TupleDesc rowDescriptor, const char *columnName)
}
/*
 * ColumnOutputFunctions builds an array holding one output function per column
 * of the given row descriptor. For every column, the function looks up the
 * column type's binary output function (when binaryFormat is true) or its text
 * output function (otherwise), and stores the resolved function information at
 * the column's position. The palloc'd array is returned to the caller.
 */
static FmgrInfo *
ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
{
	uint32 totalColumns = (uint32) rowDescriptor->natts;
	FmgrInfo *outputFunctionArray = palloc0(totalColumns * sizeof(FmgrInfo));
	uint32 position = 0;

	while (position < totalColumns)
	{
		Form_pg_attribute attribute = rowDescriptor->attrs[position];
		Oid typeId = attribute->atttypid;
		Oid functionId = InvalidOid;
		bool isVariableLength = false;

		if (!binaryFormat)
		{
			getTypeOutputInfo(typeId, &functionId, &isVariableLength);
		}
		else
		{
			getTypeBinaryOutputInfo(typeId, &functionId, &isVariableLength);
		}

		/* dropped columns are not expected in the descriptors passed in here */
		Assert(attribute->attisdropped == false);
		fmgr_info(functionId, &outputFunctionArray[position]);

		position++;
	}

	return outputFunctionArray;
}
/*
* InitRowOutputState creates and initializes a copy state object. This object
* is internal to the copy command's implementation in Postgres; and we refactor
@ -914,11 +881,10 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
* must match one another. Therefore, any changes to the default values in the
* copy command must be propagated to this function.
*/
static PartialCopyState
static CopyOutState
InitRowOutputState(void)
{
PartialCopyState rowOutputState =
(PartialCopyState) palloc0(sizeof(PartialCopyStateData));
CopyOutState rowOutputState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
int fileEncoding = pg_get_client_encoding();
int databaseEncoding = GetDatabaseEncoding();
@ -975,7 +941,7 @@ InitRowOutputState(void)
/* Clears copy state used for outputting row data. */
static void
ClearRowOutputState(PartialCopyState rowOutputState)
ClearRowOutputState(CopyOutState rowOutputState)
{
Assert(rowOutputState != NULL);
@ -990,98 +956,6 @@ ClearRowOutputState(PartialCopyState rowOutputState)
}
/*
 * OutputRow serializes a single row into the message buffer of the given row
 * output state, replacing whatever the buffer held for the previous row. Each
 * column is converted with the matching entry of columnOutputFunctions. In
 * binary mode the row is written as a column count followed by length-prefixed
 * values (-1 for NULL); in text mode the values are escaped, separated by the
 * delimiter, and terminated by the platform's line ending. The function is
 * modeled after CopyOneRowTo() in commands/copy.c, but only covers a subset
 * of that functionality.
 */
static void
OutputRow(HeapTuple row, TupleDesc rowDescriptor,
		  PartialCopyState rowOutputState, FmgrInfo *columnOutputFunctions)
{
	uint32 totalColumns = (uint32) rowDescriptor->natts;
	Datum *columnValues = (Datum *) palloc0(totalColumns * sizeof(Datum));
	bool *columnNulls = (bool *) palloc0(totalColumns * sizeof(bool));
	MemoryContext callerContext = NULL;
	uint32 position = 0;

	/* extract all columns at once; cheaper than calling heap_getattr per column */
	heap_deform_tuple(row, rowDescriptor, columnValues, columnNulls);

	/* throw away the previous row's output and its temporary allocations */
	resetStringInfo(rowOutputState->fe_msgbuf);
	MemoryContextReset(rowOutputState->rowcontext);

	/* everything the output functions allocate goes into the per-row context */
	callerContext = MemoryContextSwitchTo(rowOutputState->rowcontext);

	if (rowOutputState->binary)
	{
		CopySendInt16(rowOutputState, rowDescriptor->natts);
	}

	for (position = 0; position < totalColumns; position++)
	{
		FmgrInfo *outputFunction = &columnOutputFunctions[position];
		Datum columnValue = columnValues[position];
		bool columnIsNull = columnNulls[position];

		if (rowOutputState->binary)
		{
			if (columnIsNull)
			{
				CopySendInt32(rowOutputState, -1);
			}
			else
			{
				bytea *serialized = SendFunctionCall(outputFunction, columnValue);

				CopySendInt32(rowOutputState, VARSIZE(serialized) - VARHDRSZ);
				CopySendData(rowOutputState, VARDATA(serialized),
							 VARSIZE(serialized) - VARHDRSZ);
			}

			continue;
		}

		if (columnIsNull)
		{
			CopySendString(rowOutputState, rowOutputState->null_print_client);
		}
		else
		{
			char *columnText = OutputFunctionCall(outputFunction, columnValue);

			CopyAttributeOutText(rowOutputState, columnText);
		}

		/* a delimiter follows every column except the last one */
		if ((position + 1) != totalColumns)
		{
			CopySendChar(rowOutputState, rowOutputState->delim[0]);
		}
	}

	if (!rowOutputState->binary)
	{
		/* text rows end with the platform's default line terminator */
#ifndef WIN32
		CopySendChar(rowOutputState, '\n');
#else
		CopySendString(rowOutputState, "\r\n");
#endif
	}

	MemoryContextSwitchTo(callerContext);

	pfree(columnValues);
	pfree(columnNulls);
}
/*
* Write the header of postgres' binary serialization format to each partition file.
* This function is used when binary_worker_copy_format is enabled.
@ -1093,22 +967,14 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount)
for (fileIndex = 0; fileIndex < fileCount; fileIndex++)
{
/* Generate header for a binary copy */
const int32 zero = 0;
FileOutputStream partitionFile = { 0, 0, 0 };
PartialCopyStateData headerOutputStateData;
PartialCopyState headerOutputState = (PartialCopyState) & headerOutputStateData;
CopyOutStateData headerOutputStateData;
CopyOutState headerOutputState = (CopyOutState) & headerOutputStateData;
memset(headerOutputState, 0, sizeof(PartialCopyStateData));
memset(headerOutputState, 0, sizeof(CopyOutStateData));
headerOutputState->fe_msgbuf = makeStringInfo();
/* Signature */
CopySendData(headerOutputState, BinarySignature, 11);
/* Flags field (no OIDs) */
CopySendInt32(headerOutputState, zero);
/* No header extension */
CopySendInt32(headerOutputState, zero);
AppendCopyBinaryHeaders(headerOutputState);
partitionFile = partitionFileArray[fileIndex];
FileOutputStreamWrite(partitionFile, headerOutputState->fe_msgbuf);
@ -1127,15 +993,14 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount)
for (fileIndex = 0; fileIndex < fileCount; fileIndex++)
{
/* Generate footer for a binary copy */
int16 negative = -1;
FileOutputStream partitionFile = { 0, 0, 0 };
PartialCopyStateData footerOutputStateData;
PartialCopyState footerOutputState = (PartialCopyState) & footerOutputStateData;
CopyOutStateData footerOutputStateData;
CopyOutState footerOutputState = (CopyOutState) & footerOutputStateData;
memset(footerOutputState, 0, sizeof(PartialCopyStateData));
memset(footerOutputState, 0, sizeof(CopyOutStateData));
footerOutputState->fe_msgbuf = makeStringInfo();
CopySendInt16(footerOutputState, negative);
AppendCopyBinaryFooters(footerOutputState);
partitionFile = partitionFileArray[fileIndex];
FileOutputStreamWrite(partitionFile, footerOutputState->fe_msgbuf);
@ -1143,158 +1008,6 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount)
}
/* *INDENT-OFF* */
/* Appends datasize bytes starting at databuf to outputState's copy buffer. */
static void
CopySendData(PartialCopyState outputState, const void *databuf, int datasize)
{
	StringInfo messageBuffer = outputState->fe_msgbuf;

	appendBinaryStringInfo(messageBuffer, databuf, datasize);
}
/* Appends the characters of str, up to its terminator, to outputState's copy buffer. */
static void
CopySendString(PartialCopyState outputState, const char *str)
{
	StringInfo messageBuffer = outputState->fe_msgbuf;

	appendBinaryStringInfo(messageBuffer, str, strlen(str));
}
/* Appends the single character c to outputState's copy buffer. */
static void
CopySendChar(PartialCopyState outputState, char c)
{
	StringInfo messageBuffer = outputState->fe_msgbuf;

	appendStringInfoCharMacro(messageBuffer, c);
}
/* Appends val to outputState's copy buffer as four bytes in network byte order. */
static void
CopySendInt32(PartialCopyState outputState, int32 val)
{
	uint32 networkOrderValue = htonl((uint32) val);

	CopySendData(outputState, &networkOrderValue, sizeof(networkOrderValue));
}
/* Appends val to outputState's copy buffer as two bytes in network byte order. */
static void
CopySendInt16(PartialCopyState outputState, int16 val)
{
	uint16 networkOrderValue = htons((uint16) val);

	CopySendData(outputState, &networkOrderValue, sizeof(networkOrderValue));
}
/*
 * Send text representation of one column, with conversion and escaping.
 *
 * When cstate->need_transcoding is set, the column text in "string" is first
 * passed through pg_server_to_any() with cstate->file_encoding. The resulting
 * text is then appended to the copy buffer: a backslash or the delimiter
 * character (cstate->delim[0]) is preceded by a backslash, and the control
 * characters \b, \f, \n, \r, \t and \v are written as a backslash followed by
 * the corresponding letter. All other characters are sent unchanged.
 *
 * NB: This function is based on commands/copy.c and doesn't fully conform to
 * our coding style. The function should be kept in sync with copy.c.
 */
static void
CopyAttributeOutText(PartialCopyState cstate, char *string)
{
char *pointer = NULL;
char *start = NULL;
char c = '\0';
char delimc = cstate->delim[0];
/* transcode the input only when the output state asks for it */
if (cstate->need_transcoding)
{
pointer = pg_server_to_any(string, strlen(string), cstate->file_encoding);
}
else
{
pointer = string;
}
/*
* We have to grovel through the string searching for control characters
* and instances of the delimiter character. In most cases, though, these
* are infrequent. To avoid overhead from calling CopySendData once per
* character, we dump out all characters between escaped characters in a
* single call. The loop invariant is that the data from "start" to "pointer"
* can be sent literally, but hasn't yet been.
*
* As all encodings here are safe, i.e. backend supported ones, we can
* skip doing pg_encoding_mblen(), because in valid backend encodings,
* extra bytes of a multibyte character never look like ASCII.
*/
start = pointer;
while ((c = *pointer) != '\0')
{
/* control characters (below 0x20) get special treatment */
if ((unsigned char) c < (unsigned char) 0x20)
{
/*
* \r and \n must be escaped, the others are traditional. We
* prefer to dump these using the C-like notation, rather than
* a backslash and the literal character, because it makes the
* dump file a bit more proof against Microsoftish data
* mangling.
*/
switch (c)
{
case '\b':
c = 'b';
break;
case '\f':
c = 'f';
break;
case '\n':
c = 'n';
break;
case '\r':
c = 'r';
break;
case '\t':
c = 't';
break;
case '\v':
c = 'v';
break;
default:
/* If it's the delimiter, must backslash it */
if (c == delimc)
break;
/*
* Any other control character stays part of the literal run.
* All ASCII control chars are length 1
*/
pointer++;
continue; /* fall to end of loop */
}
/* if we get here, we need to convert the control char */
CopyFlushOutput(cstate, start, pointer);
CopySendChar(cstate, '\\');
CopySendChar(cstate, c);
start = ++pointer; /* do not include char in next run */
}
else if (c == '\\' || c == delimc)
{
/* emit the escaping backslash now; the character itself starts the next run */
CopyFlushOutput(cstate, start, pointer);
CopySendChar(cstate, '\\');
start = pointer++; /* we include char in next run */
}
else
{
pointer++;
}
}
/* send whatever literal run is still pending at the end of the string */
CopyFlushOutput(cstate, start, pointer);
}
/* *INDENT-ON* */
/* Sends the pending bytes between start and pointer, if there are any. */
static inline void
CopyFlushOutput(PartialCopyState cstate, char *start, char *pointer)
{
	if (pointer <= start)
	{
		/* nothing is pending */
		return;
	}

	CopySendData(cstate, start, pointer - start);
}
/* Helper function that invokes a function with the default collation oid. */
Datum
CompareCall2(FmgrInfo *functionInfo, Datum leftArgument, Datum rightArgument)

View File

@ -20,7 +20,36 @@
extern int CopyTransactionManager;
/*
 * A smaller version of copy.c's CopyStateData, trimmed to the elements
 * necessary to copy out results. While it'd be a bit nicer to share code,
 * it'd require changing core postgres code.
 *
 * Serialized output is accumulated in fe_msgbuf; rowcontext is the memory
 * context that is reset between rows.
 */
typedef struct CopyOutStateData
{
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
* dest == COPY_NEW_FE in COPY FROM */
int file_encoding; /* file or remote side's character encoding */
bool need_transcoding; /* file encoding diff from server? */
bool binary; /* binary format? */
char *null_print; /* NULL marker string (server encoding!) */
char *null_print_client; /* same converted to file encoding */
char *delim; /* column delimiter (must be 1 byte) */
MemoryContext rowcontext; /* per-row evaluation context */
} CopyOutStateData;
/* pointer type under which the state is passed to the copy-out functions */
typedef struct CopyOutStateData *CopyOutState;
/* function declarations for copying into a distributed table */
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
TupleDesc rowDescriptor,
CopyOutState rowOutputState,
FmgrInfo *columnOutputFunctions);
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);

View File

@ -48,9 +48,10 @@ typedef struct TransactionConnection
/* Functions declarations for transaction and connection management */
extern void PrepareTransactions(List *connectionList);
extern void AbortTransactions(List *connectionList);
extern void CommitTransactions(List *connectionList);
extern void InitializeDistributedTransaction(void);
extern void PrepareRemoteTransactions(List *connectionList);
extern void AbortRemoteTransactions(List *connectionList);
extern void CommitRemoteTransactions(List *connectionList);
extern void CloseConnections(List *connectionList);

View File

@ -79,28 +79,6 @@ typedef struct HashPartitionContext
} HashPartitionContext;
/*
 * A smaller version of copy.c's CopyStateData, trimmed to the elements
 * necessary for re-partition jobs. While it'd be a bit nicer to share code,
 * it'd require changing core postgres code.
 *
 * Serialized output is accumulated in fe_msgbuf; rowcontext is the memory
 * context that is reset between rows.
 */
typedef struct PartialCopyStateData
{
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
* dest == COPY_NEW_FE in COPY FROM */
int file_encoding; /* file or remote side's character encoding */
bool need_transcoding; /* file encoding diff from server? */
bool binary; /* binary format? */
char *null_print; /* NULL marker string (server encoding!) */
char *null_print_client; /* same converted to file encoding */
char *delim; /* column delimiter (must be 1 byte) */
MemoryContext rowcontext; /* per-row evaluation context */
} PartialCopyStateData;
/* pointer type under which the state is passed to the row output functions */
typedef struct PartialCopyStateData *PartialCopyState;
/*
* FileOutputStream helps buffer write operations to a file; these writes are
* then regularly flushed to the underlying file. This structure differs from

View File

@ -104,6 +104,54 @@ SELECT count(*) FROM customer_copy_hash;
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash;
-- Create a new hash-partitioned table with default now() function
CREATE TABLE customer_with_default(
c_custkey integer,
c_name varchar(25) not null,
c_time timestamp default now());
SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash');
SELECT master_create_worker_shards('customer_with_default', 64, 1);
-- Test with default values for now() function
COPY customer_with_default (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
1,customer1
2,customer2
\.
-- Confirm that data was copied with now() function
SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
-- Add columns to the table and perform a COPY
ALTER TABLE customer_copy_hash ADD COLUMN extra1 INT DEFAULT 0;
ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0;
COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
10,customer10,1,5
\.
SELECT * FROM customer_copy_hash WHERE extra1 = 1;
-- Test dropping an intermediate column
ALTER TABLE customer_copy_hash DROP COLUMN extra1;
COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV;
11,customer11,5
\.
SELECT * FROM customer_copy_hash WHERE c_custkey = 11;
-- Test dropping the last column
ALTER TABLE customer_copy_hash DROP COLUMN extra2;
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV;
12,customer12
\.
SELECT * FROM customer_copy_hash WHERE c_custkey = 12;
-- Create a new range-partitioned table into which to COPY
CREATE TABLE customer_copy_range (
c_custkey integer,

View File

@ -100,7 +100,6 @@ test: multi_append_table_to_shard
# ---------
test: multi_outer_join
#
# ---
# Tests covering mostly modification queries and required preliminary
# functionality related to metadata, shard creation, shard pruning and

View File

@ -20,7 +20,7 @@ SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash'
-- Test COPY into empty hash-partitioned table
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
ERROR: could not find any shards for query
ERROR: could not find any shards into which to copy
DETAIL: No shards exist for distributed table "customer_copy_hash".
HINT: Run master_create_worker_shards to create shards and try again.
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
@ -47,7 +47,6 @@ COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_103160"
DETAIL: Key (c_custkey)=(2) already exists.
CONTEXT: COPY customer_copy_hash, line 4: ""
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
count
@ -90,7 +89,6 @@ COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
ERROR: null value in column "c_name" violates not-null constraint
DETAIL: Failing row contains (8, null, null, null, null, null, null, null).
CONTEXT: COPY customer_copy_hash, line 4: ""
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
count
@ -126,6 +124,61 @@ SELECT count(*) FROM customer_copy_hash;
2006
(1 row)
-- Create a new hash-partitioned table with default now() function
CREATE TABLE customer_with_default(
c_custkey integer,
c_name varchar(25) not null,
c_time timestamp default now());
SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('customer_with_default', 64, 1);
master_create_worker_shards
-----------------------------
(1 row)
-- Test with default values for now() function
COPY customer_with_default (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
-- Confirm that data was copied with now() function
SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
count
-------
2
(1 row)
-- Add columns to the table and perform a COPY
ALTER TABLE customer_copy_hash ADD COLUMN extra1 INT DEFAULT 0;
ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0;
COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
SELECT * FROM customer_copy_hash WHERE extra1 = 1;
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra1 | extra2
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------+--------
10 | customer10 | | | | | | | 1 | 5
(1 row)
-- Test dropping an intermediate column
ALTER TABLE customer_copy_hash DROP COLUMN extra1;
COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV;
SELECT * FROM customer_copy_hash WHERE c_custkey = 11;
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra2
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------
11 | customer11 | | | | | | | 5
(1 row)
-- Test dropping the last column
ALTER TABLE customer_copy_hash DROP COLUMN extra2;
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV;
SELECT * FROM customer_copy_hash WHERE c_custkey = 12;
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------
12 | customer12 | | | | | |
(1 row)
-- Create a new range-partitioned table into which to COPY
CREATE TABLE customer_copy_range (
c_custkey integer,
@ -145,7 +198,7 @@ SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'rang
-- Test COPY into empty range-partitioned table
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
ERROR: could not find any shards for query
ERROR: could not find any shards into which to copy
DETAIL: No shards exist for distributed table "customer_copy_range".
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
\gset