mirror of https://github.com/citusdata/citus.git
Merge pull request #416 from citusdata/feature/copy-for-hash-partitioning
Add COPY support for hash- and range-partitioned tablespull/432/head
commit
174aa64f14
File diff suppressed because it is too large
Load Diff
|
@ -17,6 +17,7 @@
|
|||
#include "commands/tablecmds.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_utility.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/transmit.h"
|
||||
|
@ -50,7 +51,7 @@ static bool IsTransmitStmt(Node *parsetree);
|
|||
static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
||||
|
||||
/* Local functions forward declarations for processing distributed table commands */
|
||||
static Node * ProcessCopyStmt(CopyStmt *copyStatement);
|
||||
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag);
|
||||
static Node * ProcessIndexStmt(IndexStmt *createIndexStatement,
|
||||
const char *createIndexCommand);
|
||||
static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement,
|
||||
|
@ -122,7 +123,12 @@ multi_ProcessUtility(Node *parsetree,
|
|||
|
||||
if (IsA(parsetree, CopyStmt))
|
||||
{
|
||||
parsetree = ProcessCopyStmt((CopyStmt *) parsetree);
|
||||
parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag);
|
||||
|
||||
if (parsetree == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (IsA(parsetree, IndexStmt))
|
||||
|
@ -297,10 +303,12 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
|
|||
|
||||
/*
|
||||
* ProcessCopyStmt handles Citus specific concerns for COPY like supporting
|
||||
* COPYing from distributed tables and preventing unsupported actions.
|
||||
* COPYing from distributed tables and preventing unsupported actions. The
|
||||
* function returns a modified COPY statement to be executed, or NULL if no
|
||||
* further processing is needed.
|
||||
*/
|
||||
static Node *
|
||||
ProcessCopyStmt(CopyStmt *copyStatement)
|
||||
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag)
|
||||
{
|
||||
/*
|
||||
* We first check if we have a "COPY (query) TO filename". If we do, copy doesn't
|
||||
|
@ -344,9 +352,8 @@ ProcessCopyStmt(CopyStmt *copyStatement)
|
|||
{
|
||||
if (copyStatement->is_from)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot execute COPY FROM on a distributed table "
|
||||
"on master node")));
|
||||
CitusCopyFrom(copyStatement, completionTag);
|
||||
return NULL;
|
||||
}
|
||||
else if (!copyStatement->is_from)
|
||||
{
|
||||
|
@ -583,6 +590,7 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
|
|||
{
|
||||
RangeVar *relation = createIndexStatement->relation;
|
||||
bool missingOk = false;
|
||||
|
||||
/* caller uses ShareLock for non-concurrent indexes, use the same lock here */
|
||||
LOCKMODE lockMode = ShareLock;
|
||||
Oid relationId = RangeVarGetRelid(relation, lockMode, missingOk);
|
||||
|
|
|
@ -20,13 +20,15 @@
|
|||
#include "executor/executor.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/modify_planner.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/multi_explain.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_logical_optimizer.h"
|
||||
#include "distributed/multi_planner.h"
|
||||
#include "distributed/multi_router_executor.h"
|
||||
#include "distributed/multi_router_executor.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
#include "distributed/multi_transaction.h"
|
||||
#include "distributed/multi_utility.h"
|
||||
#include "distributed/task_tracker.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
|
@ -67,6 +69,12 @@ static const struct config_enum_entry shard_placement_policy_options[] = {
|
|||
{ NULL, 0, false }
|
||||
};
|
||||
|
||||
static const struct config_enum_entry transaction_manager_options[] = {
|
||||
{ "1pc", TRANSACTION_MANAGER_1PC, false },
|
||||
{ "2pc", TRANSACTION_MANAGER_2PC, false },
|
||||
{ NULL, 0, false }
|
||||
};
|
||||
|
||||
|
||||
/* shared library initialization function */
|
||||
void
|
||||
|
@ -437,6 +445,21 @@ RegisterCitusConfigVariables(void)
|
|||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomEnumVariable(
|
||||
"citus.copy_transaction_manager",
|
||||
gettext_noop("Sets the transaction manager for COPY into distributed tables."),
|
||||
gettext_noop("When a failure occurs during when copying into a distributed "
|
||||
"table, 2PC is required to ensure data is never lost. Change "
|
||||
"this setting to '2pc' from its default '1pc' to enable 2PC."
|
||||
"You must also set max_prepared_transactions on the worker "
|
||||
"nodes. Recovery from failed 2PCs is currently manual."),
|
||||
&CopyTransactionManager,
|
||||
TRANSACTION_MANAGER_1PC,
|
||||
transaction_manager_options,
|
||||
PGC_USERSET,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomEnumVariable(
|
||||
"citus.task_assignment_policy",
|
||||
gettext_noop("Sets the policy to use when assigning tasks to worker nodes."),
|
||||
|
|
|
@ -39,8 +39,6 @@ static HTAB *NodeConnectionHash = NULL;
|
|||
|
||||
/* local function forward declarations */
|
||||
static HTAB * CreateNodeConnectionHash(void);
|
||||
static PGconn * ConnectToNode(char *nodeName, char *nodePort);
|
||||
static char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -99,10 +97,7 @@ GetOrEstablishConnection(char *nodeName, int32 nodePort)
|
|||
|
||||
if (needNewConnection)
|
||||
{
|
||||
StringInfo nodePortString = makeStringInfo();
|
||||
appendStringInfo(nodePortString, "%d", nodePort);
|
||||
|
||||
connection = ConnectToNode(nodeName, nodePortString->data);
|
||||
connection = ConnectToNode(nodeName, nodePort);
|
||||
if (connection != NULL)
|
||||
{
|
||||
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
|
||||
|
@ -264,8 +259,8 @@ CreateNodeConnectionHash(void)
|
|||
* We attempt to connect up to MAX_CONNECT_ATTEMPT times. After that we give up
|
||||
* and return NULL.
|
||||
*/
|
||||
static PGconn *
|
||||
ConnectToNode(char *nodeName, char *nodePort)
|
||||
PGconn *
|
||||
ConnectToNode(char *nodeName, int32 nodePort)
|
||||
{
|
||||
PGconn *connection = NULL;
|
||||
const char *clientEncoding = GetDatabaseEncodingName();
|
||||
|
@ -276,11 +271,14 @@ ConnectToNode(char *nodeName, char *nodePort)
|
|||
"host", "port", "fallback_application_name",
|
||||
"client_encoding", "connect_timeout", "dbname", NULL
|
||||
};
|
||||
char nodePortString[12];
|
||||
const char *valueArray[] = {
|
||||
nodeName, nodePort, "citus", clientEncoding,
|
||||
nodeName, nodePortString, "citus", clientEncoding,
|
||||
CLIENT_CONNECT_TIMEOUT_SECONDS, dbname, NULL
|
||||
};
|
||||
|
||||
sprintf(nodePortString, "%d", nodePort);
|
||||
|
||||
Assert(sizeof(keywordArray) == sizeof(valueArray));
|
||||
|
||||
for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++)
|
||||
|
@ -313,7 +311,7 @@ ConnectToNode(char *nodeName, char *nodePort)
|
|||
* The function returns NULL if the connection has no setting for an option with
|
||||
* the provided keyword.
|
||||
*/
|
||||
static char *
|
||||
char *
|
||||
ConnectionGetOptionValue(PGconn *connection, char *optionKeyword)
|
||||
{
|
||||
char *optionValue = NULL;
|
||||
|
|
|
@ -0,0 +1,238 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* multi_transaction.c
|
||||
* This file contains functions for managing 1PC or 2PC transactions
|
||||
* across many shard placements.
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "libpq-fe.h"
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "distributed/connection_cache.h"
|
||||
#include "distributed/multi_transaction.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "nodes/pg_list.h"
|
||||
|
||||
|
||||
/* Local functions forward declarations */
|
||||
static uint32 DistributedTransactionId = 0;
|
||||
|
||||
|
||||
/* Local functions forward declarations */
|
||||
static StringInfo BuildTransactionName(int connectionId);
|
||||
|
||||
|
||||
/*
|
||||
* InitializeDistributedTransaction prepares the distributed transaction ID
|
||||
* used in transaction names.
|
||||
*/
|
||||
void
|
||||
InitializeDistributedTransaction(void)
|
||||
{
|
||||
DistributedTransactionId++;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PrepareRemoteTransactions prepares all transactions on connections in
|
||||
* connectionList for commit if the 2PC transaction manager is enabled.
|
||||
* On failure, it reports an error and stops.
|
||||
*/
|
||||
void
|
||||
PrepareRemoteTransactions(List *connectionList)
|
||||
{
|
||||
ListCell *connectionCell = NULL;
|
||||
|
||||
foreach(connectionCell, connectionList)
|
||||
{
|
||||
TransactionConnection *transactionConnection =
|
||||
(TransactionConnection *) lfirst(connectionCell);
|
||||
PGconn *connection = transactionConnection->connection;
|
||||
int64 connectionId = transactionConnection->connectionId;
|
||||
|
||||
PGresult *result = NULL;
|
||||
StringInfo command = makeStringInfo();
|
||||
StringInfo transactionName = BuildTransactionName(connectionId);
|
||||
|
||||
appendStringInfo(command, "PREPARE TRANSACTION '%s'", transactionName->data);
|
||||
|
||||
result = PQexec(connection, command->data);
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
/* a failure to prepare is an implicit rollback */
|
||||
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
||||
|
||||
ReportRemoteError(connection, result);
|
||||
PQclear(result);
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("failed to prepare transaction")));
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
|
||||
transactionConnection->transactionState = TRANSACTION_STATE_PREPARED;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AbortRemoteTransactions aborts all transactions on connections in connectionList.
|
||||
* On failure, it reports a warning and continues to abort all of them.
|
||||
*/
|
||||
void
|
||||
AbortRemoteTransactions(List *connectionList)
|
||||
{
|
||||
ListCell *connectionCell = NULL;
|
||||
|
||||
foreach(connectionCell, connectionList)
|
||||
{
|
||||
TransactionConnection *transactionConnection =
|
||||
(TransactionConnection *) lfirst(connectionCell);
|
||||
PGconn *connection = transactionConnection->connection;
|
||||
int64 connectionId = transactionConnection->connectionId;
|
||||
PGresult *result = NULL;
|
||||
|
||||
if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED)
|
||||
{
|
||||
StringInfo command = makeStringInfo();
|
||||
StringInfo transactionName = BuildTransactionName(connectionId);
|
||||
|
||||
appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName->data);
|
||||
|
||||
result = PQexec(connection, command->data);
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
||||
|
||||
/* log a warning so the user may abort the transaction later */
|
||||
ereport(WARNING, (errmsg("failed to roll back prepared transaction '%s'",
|
||||
transactionName->data),
|
||||
errhint("Run \"%s\" on %s:%s",
|
||||
command->data, nodeName, nodePort)));
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
}
|
||||
else if (transactionConnection->transactionState == TRANSACTION_STATE_OPEN)
|
||||
{
|
||||
/* try to roll back cleanly, if it fails then we won't commit anyway */
|
||||
result = PQexec(connection, "ROLLBACK");
|
||||
PQclear(result);
|
||||
}
|
||||
|
||||
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CommitRemoteTransactions commits all transactions on connections in connectionList.
|
||||
* On failure, it reports a warning and continues committing all of them.
|
||||
*/
|
||||
void
|
||||
CommitRemoteTransactions(List *connectionList)
|
||||
{
|
||||
ListCell *connectionCell = NULL;
|
||||
|
||||
foreach(connectionCell, connectionList)
|
||||
{
|
||||
TransactionConnection *transactionConnection =
|
||||
(TransactionConnection *) lfirst(connectionCell);
|
||||
PGconn *connection = transactionConnection->connection;
|
||||
int64 connectionId = transactionConnection->connectionId;
|
||||
PGresult *result = NULL;
|
||||
|
||||
if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED)
|
||||
{
|
||||
StringInfo command = makeStringInfo();
|
||||
StringInfo transactionName = BuildTransactionName(connectionId);
|
||||
|
||||
/* we shouldn't be committing if any transactions are not prepared */
|
||||
Assert(transactionConnection->transactionState == TRANSACTION_STATE_PREPARED);
|
||||
|
||||
appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName->data);
|
||||
|
||||
result = PQexec(connection, command->data);
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
||||
|
||||
/* log a warning so the user may commit the transaction later */
|
||||
ereport(WARNING, (errmsg("failed to commit prepared transaction '%s'",
|
||||
transactionName->data),
|
||||
errhint("Run \"%s\" on %s:%s",
|
||||
command->data, nodeName, nodePort)));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* we shouldn't be committing if any transactions are not open */
|
||||
Assert(transactionConnection->transactionState == TRANSACTION_STATE_OPEN);
|
||||
|
||||
/* try to commit, if it fails then the user might lose data */
|
||||
result = PQexec(connection, "COMMIT");
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
||||
|
||||
ereport(WARNING, (errmsg("failed to commit transaction on %s:%s",
|
||||
nodeName, nodePort)));
|
||||
}
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
|
||||
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* BuildTransactionName constructs a transaction name that ensures there are no
|
||||
* collisions with concurrent transactions by the same master node, subsequent
|
||||
* transactions by the same backend, or transactions on a different shard.
|
||||
*
|
||||
* Collisions may occur over time if transactions fail to commit or abort and
|
||||
* are left to linger. This would cause a PREPARE failure for the second
|
||||
* transaction, which causes it to be rolled back. In general, the user
|
||||
* should ensure that prepared transactions do not linger.
|
||||
*/
|
||||
static StringInfo
|
||||
BuildTransactionName(int connectionId)
|
||||
{
|
||||
StringInfo commandString = makeStringInfo();
|
||||
|
||||
appendStringInfo(commandString, "citus_%d_%u_%d", MyProcPid,
|
||||
DistributedTransactionId, connectionId);
|
||||
|
||||
return commandString;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CloseConnections closes all connections in connectionList.
|
||||
*/
|
||||
void
|
||||
CloseConnections(List *connectionList)
|
||||
{
|
||||
ListCell *connectionCell = NULL;
|
||||
|
||||
foreach(connectionCell, connectionList)
|
||||
{
|
||||
TransactionConnection *transactionConnection =
|
||||
(TransactionConnection *) lfirst(connectionCell);
|
||||
PGconn *connection = transactionConnection->connection;
|
||||
|
||||
PQfinish(connection);
|
||||
}
|
||||
}
|
|
@ -29,6 +29,7 @@
|
|||
#include "catalog/pg_collation.h"
|
||||
#include "commands/copy.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/transmit.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
|
@ -45,7 +46,6 @@ bool BinaryWorkerCopyFormat = false; /* binary format for copying between work
|
|||
int PartitionBufferSize = 16384; /* total partitioning buffer size in KB */
|
||||
|
||||
/* Local variables */
|
||||
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
|
||||
static uint32 FileBufferSizeInBytes = 0; /* file buffer size to init later */
|
||||
|
||||
|
||||
|
@ -64,21 +64,10 @@ static void FilterAndPartitionTable(const char *filterQuery,
|
|||
FileOutputStream *partitionFileArray,
|
||||
uint32 fileCount);
|
||||
static int ColumnIndex(TupleDesc rowDescriptor, const char *columnName);
|
||||
static FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
|
||||
static PartialCopyState InitRowOutputState(void);
|
||||
static void ClearRowOutputState(PartialCopyState copyState);
|
||||
static void OutputRow(HeapTuple row, TupleDesc rowDescriptor,
|
||||
PartialCopyState rowOutputState, FmgrInfo *columnOutputFunctions);
|
||||
static CopyOutState InitRowOutputState(void);
|
||||
static void ClearRowOutputState(CopyOutState copyState);
|
||||
static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount);
|
||||
static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount);
|
||||
static void CopySendData(PartialCopyState outputState, const void *databuf, int datasize);
|
||||
static void CopySendString(PartialCopyState outputState, const char *str);
|
||||
static void CopySendChar(PartialCopyState outputState, char c);
|
||||
static void CopySendInt32(PartialCopyState outputState, int32 val);
|
||||
static void CopySendInt16(PartialCopyState outputState, int16 val);
|
||||
static void CopyAttributeOutText(PartialCopyState outputState, char *string);
|
||||
static inline void CopyFlushOutput(PartialCopyState outputState,
|
||||
char *start, char *pointer);
|
||||
static uint32 RangePartitionId(Datum partitionValue, const void *context);
|
||||
static uint32 HashPartitionId(Datum partitionValue, const void *context);
|
||||
|
||||
|
@ -731,13 +720,16 @@ FilterAndPartitionTable(const char *filterQuery,
|
|||
FileOutputStream *partitionFileArray,
|
||||
uint32 fileCount)
|
||||
{
|
||||
PartialCopyState rowOutputState = NULL;
|
||||
CopyOutState rowOutputState = NULL;
|
||||
FmgrInfo *columnOutputFunctions = NULL;
|
||||
int partitionColumnIndex = 0;
|
||||
Oid partitionColumnTypeId = InvalidOid;
|
||||
Portal queryPortal = NULL;
|
||||
int connected = 0;
|
||||
int finished = 0;
|
||||
uint32 columnCount = 0;
|
||||
Datum *valueArray = NULL;
|
||||
bool *isNullArray = NULL;
|
||||
|
||||
const char *noPortalName = NULL;
|
||||
const bool readOnly = true;
|
||||
|
@ -784,6 +776,10 @@ FilterAndPartitionTable(const char *filterQuery,
|
|||
OutputBinaryHeaders(partitionFileArray, fileCount);
|
||||
}
|
||||
|
||||
columnCount = (uint32) SPI_tuptable->tupdesc->natts;
|
||||
valueArray = (Datum *) palloc0(columnCount * sizeof(Datum));
|
||||
isNullArray = (bool *) palloc0(columnCount * sizeof(bool));
|
||||
|
||||
while (SPI_processed > 0)
|
||||
{
|
||||
int rowIndex = 0;
|
||||
|
@ -815,13 +811,19 @@ FilterAndPartitionTable(const char *filterQuery,
|
|||
partitionId = 0;
|
||||
}
|
||||
|
||||
OutputRow(row, rowDescriptor, rowOutputState, columnOutputFunctions);
|
||||
/* deconstruct the tuple; this is faster than repeated heap_getattr */
|
||||
heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray);
|
||||
|
||||
AppendCopyRowData(valueArray, isNullArray, rowDescriptor,
|
||||
rowOutputState, columnOutputFunctions);
|
||||
|
||||
rowText = rowOutputState->fe_msgbuf;
|
||||
|
||||
partitionFile = partitionFileArray[partitionId];
|
||||
FileOutputStreamWrite(partitionFile, rowText);
|
||||
|
||||
resetStringInfo(rowText);
|
||||
MemoryContextReset(rowOutputState->rowcontext);
|
||||
}
|
||||
|
||||
SPI_freetuptable(SPI_tuptable);
|
||||
|
@ -829,6 +831,9 @@ FilterAndPartitionTable(const char *filterQuery,
|
|||
SPI_cursor_fetch(queryPortal, fetchForward, prefetchCount);
|
||||
}
|
||||
|
||||
pfree(valueArray);
|
||||
pfree(isNullArray);
|
||||
|
||||
SPI_cursor_close(queryPortal);
|
||||
|
||||
if (BinaryWorkerCopyFormat)
|
||||
|
@ -866,44 +871,6 @@ ColumnIndex(TupleDesc rowDescriptor, const char *columnName)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ColumnOutputFunctions walks over a table's columns, and finds each column's
|
||||
* type information. The function then resolves each type's output function,
|
||||
* and stores and returns these output functions in an array.
|
||||
*/
|
||||
static FmgrInfo *
|
||||
ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
|
||||
{
|
||||
uint32 columnCount = (uint32) rowDescriptor->natts;
|
||||
FmgrInfo *columnOutputFunctions = palloc0(columnCount * sizeof(FmgrInfo));
|
||||
|
||||
uint32 columnIndex = 0;
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
FmgrInfo *currentOutputFunction = &columnOutputFunctions[columnIndex];
|
||||
Form_pg_attribute currentColumn = rowDescriptor->attrs[columnIndex];
|
||||
Oid columnTypeId = currentColumn->atttypid;
|
||||
Oid outputFunctionId = InvalidOid;
|
||||
bool typeVariableLength = false;
|
||||
|
||||
if (binaryFormat)
|
||||
{
|
||||
getTypeBinaryOutputInfo(columnTypeId, &outputFunctionId, &typeVariableLength);
|
||||
}
|
||||
else
|
||||
{
|
||||
getTypeOutputInfo(columnTypeId, &outputFunctionId, &typeVariableLength);
|
||||
}
|
||||
|
||||
Assert(currentColumn->attisdropped == false);
|
||||
|
||||
fmgr_info(outputFunctionId, currentOutputFunction);
|
||||
}
|
||||
|
||||
return columnOutputFunctions;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InitRowOutputState creates and initializes a copy state object. This object
|
||||
* is internal to the copy command's implementation in Postgres; and we refactor
|
||||
|
@ -914,11 +881,10 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
|
|||
* must match one another. Therefore, any changes to the default values in the
|
||||
* copy command must be propagated to this function.
|
||||
*/
|
||||
static PartialCopyState
|
||||
static CopyOutState
|
||||
InitRowOutputState(void)
|
||||
{
|
||||
PartialCopyState rowOutputState =
|
||||
(PartialCopyState) palloc0(sizeof(PartialCopyStateData));
|
||||
CopyOutState rowOutputState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
|
||||
|
||||
int fileEncoding = pg_get_client_encoding();
|
||||
int databaseEncoding = GetDatabaseEncoding();
|
||||
|
@ -975,7 +941,7 @@ InitRowOutputState(void)
|
|||
|
||||
/* Clears copy state used for outputting row data. */
|
||||
static void
|
||||
ClearRowOutputState(PartialCopyState rowOutputState)
|
||||
ClearRowOutputState(CopyOutState rowOutputState)
|
||||
{
|
||||
Assert(rowOutputState != NULL);
|
||||
|
||||
|
@ -990,98 +956,6 @@ ClearRowOutputState(PartialCopyState rowOutputState)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* OutputRow serializes one row using the column output functions,
|
||||
* and appends the data to the row output state object's message buffer.
|
||||
* This function is modeled after the CopyOneRowTo() function in
|
||||
* commands/copy.c, but only implements a subset of that functionality.
|
||||
*/
|
||||
static void
|
||||
OutputRow(HeapTuple row, TupleDesc rowDescriptor,
|
||||
PartialCopyState rowOutputState, FmgrInfo *columnOutputFunctions)
|
||||
{
|
||||
MemoryContext oldContext = NULL;
|
||||
uint32 columnIndex = 0;
|
||||
|
||||
uint32 columnCount = (uint32) rowDescriptor->natts;
|
||||
Datum *valueArray = (Datum *) palloc0(columnCount * sizeof(Datum));
|
||||
bool *isNullArray = (bool *) palloc0(columnCount * sizeof(bool));
|
||||
|
||||
/* deconstruct the tuple; this is faster than repeated heap_getattr */
|
||||
heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray);
|
||||
|
||||
/* reset previous tuple's output data, and the temporary memory context */
|
||||
resetStringInfo(rowOutputState->fe_msgbuf);
|
||||
|
||||
MemoryContextReset(rowOutputState->rowcontext);
|
||||
oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext);
|
||||
|
||||
if (rowOutputState->binary)
|
||||
{
|
||||
CopySendInt16(rowOutputState, rowDescriptor->natts);
|
||||
}
|
||||
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
Datum value = valueArray[columnIndex];
|
||||
bool isNull = isNullArray[columnIndex];
|
||||
bool lastColumn = false;
|
||||
|
||||
if (rowOutputState->binary)
|
||||
{
|
||||
if (!isNull)
|
||||
{
|
||||
FmgrInfo *outputFunctionPointer = &columnOutputFunctions[columnIndex];
|
||||
bytea *outputBytes = SendFunctionCall(outputFunctionPointer, value);
|
||||
|
||||
CopySendInt32(rowOutputState, VARSIZE(outputBytes) - VARHDRSZ);
|
||||
CopySendData(rowOutputState, VARDATA(outputBytes),
|
||||
VARSIZE(outputBytes) - VARHDRSZ);
|
||||
}
|
||||
else
|
||||
{
|
||||
CopySendInt32(rowOutputState, -1);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!isNull)
|
||||
{
|
||||
FmgrInfo *outputFunctionPointer = &columnOutputFunctions[columnIndex];
|
||||
char *columnText = OutputFunctionCall(outputFunctionPointer, value);
|
||||
|
||||
CopyAttributeOutText(rowOutputState, columnText);
|
||||
}
|
||||
else
|
||||
{
|
||||
CopySendString(rowOutputState, rowOutputState->null_print_client);
|
||||
}
|
||||
|
||||
lastColumn = ((columnIndex + 1) == columnCount);
|
||||
if (!lastColumn)
|
||||
{
|
||||
CopySendChar(rowOutputState, rowOutputState->delim[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!rowOutputState->binary)
|
||||
{
|
||||
/* append default line termination string depending on the platform */
|
||||
#ifndef WIN32
|
||||
CopySendChar(rowOutputState, '\n');
|
||||
#else
|
||||
CopySendString(rowOutputState, "\r\n");
|
||||
#endif
|
||||
}
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
pfree(valueArray);
|
||||
pfree(isNullArray);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Write the header of postgres' binary serialization format to each partition file.
|
||||
* This function is used when binary_worker_copy_format is enabled.
|
||||
|
@ -1093,22 +967,14 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount)
|
|||
for (fileIndex = 0; fileIndex < fileCount; fileIndex++)
|
||||
{
|
||||
/* Generate header for a binary copy */
|
||||
const int32 zero = 0;
|
||||
FileOutputStream partitionFile = { 0, 0, 0 };
|
||||
PartialCopyStateData headerOutputStateData;
|
||||
PartialCopyState headerOutputState = (PartialCopyState) & headerOutputStateData;
|
||||
CopyOutStateData headerOutputStateData;
|
||||
CopyOutState headerOutputState = (CopyOutState) & headerOutputStateData;
|
||||
|
||||
memset(headerOutputState, 0, sizeof(PartialCopyStateData));
|
||||
memset(headerOutputState, 0, sizeof(CopyOutStateData));
|
||||
headerOutputState->fe_msgbuf = makeStringInfo();
|
||||
|
||||
/* Signature */
|
||||
CopySendData(headerOutputState, BinarySignature, 11);
|
||||
|
||||
/* Flags field (no OIDs) */
|
||||
CopySendInt32(headerOutputState, zero);
|
||||
|
||||
/* No header extension */
|
||||
CopySendInt32(headerOutputState, zero);
|
||||
AppendCopyBinaryHeaders(headerOutputState);
|
||||
|
||||
partitionFile = partitionFileArray[fileIndex];
|
||||
FileOutputStreamWrite(partitionFile, headerOutputState->fe_msgbuf);
|
||||
|
@ -1127,15 +993,14 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount)
|
|||
for (fileIndex = 0; fileIndex < fileCount; fileIndex++)
|
||||
{
|
||||
/* Generate footer for a binary copy */
|
||||
int16 negative = -1;
|
||||
FileOutputStream partitionFile = { 0, 0, 0 };
|
||||
PartialCopyStateData footerOutputStateData;
|
||||
PartialCopyState footerOutputState = (PartialCopyState) & footerOutputStateData;
|
||||
CopyOutStateData footerOutputStateData;
|
||||
CopyOutState footerOutputState = (CopyOutState) & footerOutputStateData;
|
||||
|
||||
memset(footerOutputState, 0, sizeof(PartialCopyStateData));
|
||||
memset(footerOutputState, 0, sizeof(CopyOutStateData));
|
||||
footerOutputState->fe_msgbuf = makeStringInfo();
|
||||
|
||||
CopySendInt16(footerOutputState, negative);
|
||||
AppendCopyBinaryFooters(footerOutputState);
|
||||
|
||||
partitionFile = partitionFileArray[fileIndex];
|
||||
FileOutputStreamWrite(partitionFile, footerOutputState->fe_msgbuf);
|
||||
|
@ -1143,158 +1008,6 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount)
|
|||
}
|
||||
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
/* Append data to the copy buffer in outputState */
|
||||
static void
|
||||
CopySendData(PartialCopyState outputState, const void *databuf, int datasize)
|
||||
{
|
||||
appendBinaryStringInfo(outputState->fe_msgbuf, databuf, datasize);
|
||||
}
|
||||
|
||||
|
||||
/* Append a striong to the copy buffer in outputState. */
|
||||
static void
|
||||
CopySendString(PartialCopyState outputState, const char *str)
|
||||
{
|
||||
appendBinaryStringInfo(outputState->fe_msgbuf, str, strlen(str));
|
||||
}
|
||||
|
||||
|
||||
/* Append a char to the copy buffer in outputState. */
|
||||
static void
|
||||
CopySendChar(PartialCopyState outputState, char c)
|
||||
{
|
||||
appendStringInfoCharMacro(outputState->fe_msgbuf, c);
|
||||
}
|
||||
|
||||
|
||||
/* Append an int32 to the copy buffer in outputState. */
|
||||
static void
|
||||
CopySendInt32(PartialCopyState outputState, int32 val)
|
||||
{
|
||||
uint32 buf = htonl((uint32) val);
|
||||
CopySendData(outputState, &buf, sizeof(buf));
|
||||
}
|
||||
|
||||
|
||||
/* Append an int16 to the copy buffer in outputState. */
|
||||
static void
|
||||
CopySendInt16(PartialCopyState outputState, int16 val)
|
||||
{
|
||||
uint16 buf = htons((uint16) val);
|
||||
CopySendData(outputState, &buf, sizeof(buf));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Send text representation of one column, with conversion and escaping.
|
||||
*
|
||||
* NB: This function is based on commands/copy.c and doesn't fully conform to
|
||||
* our coding style. The function should be kept in sync with copy.c.
|
||||
*/
|
||||
static void
|
||||
CopyAttributeOutText(PartialCopyState cstate, char *string)
|
||||
{
|
||||
char *pointer = NULL;
|
||||
char *start = NULL;
|
||||
char c = '\0';
|
||||
char delimc = cstate->delim[0];
|
||||
|
||||
if (cstate->need_transcoding)
|
||||
{
|
||||
pointer = pg_server_to_any(string, strlen(string), cstate->file_encoding);
|
||||
}
|
||||
else
|
||||
{
|
||||
pointer = string;
|
||||
}
|
||||
|
||||
/*
|
||||
* We have to grovel through the string searching for control characters
|
||||
* and instances of the delimiter character. In most cases, though, these
|
||||
* are infrequent. To avoid overhead from calling CopySendData once per
|
||||
* character, we dump out all characters between escaped characters in a
|
||||
* single call. The loop invariant is that the data from "start" to "pointer"
|
||||
* can be sent literally, but hasn't yet been.
|
||||
*
|
||||
* As all encodings here are safe, i.e. backend supported ones, we can
|
||||
* skip doing pg_encoding_mblen(), because in valid backend encodings,
|
||||
* extra bytes of a multibyte character never look like ASCII.
|
||||
*/
|
||||
start = pointer;
|
||||
while ((c = *pointer) != '\0')
|
||||
{
|
||||
if ((unsigned char) c < (unsigned char) 0x20)
|
||||
{
|
||||
/*
|
||||
* \r and \n must be escaped, the others are traditional. We
|
||||
* prefer to dump these using the C-like notation, rather than
|
||||
* a backslash and the literal character, because it makes the
|
||||
* dump file a bit more proof against Microsoftish data
|
||||
* mangling.
|
||||
*/
|
||||
switch (c)
|
||||
{
|
||||
case '\b':
|
||||
c = 'b';
|
||||
break;
|
||||
case '\f':
|
||||
c = 'f';
|
||||
break;
|
||||
case '\n':
|
||||
c = 'n';
|
||||
break;
|
||||
case '\r':
|
||||
c = 'r';
|
||||
break;
|
||||
case '\t':
|
||||
c = 't';
|
||||
break;
|
||||
case '\v':
|
||||
c = 'v';
|
||||
break;
|
||||
default:
|
||||
/* If it's the delimiter, must backslash it */
|
||||
if (c == delimc)
|
||||
break;
|
||||
/* All ASCII control chars are length 1 */
|
||||
pointer++;
|
||||
continue; /* fall to end of loop */
|
||||
}
|
||||
/* if we get here, we need to convert the control char */
|
||||
CopyFlushOutput(cstate, start, pointer);
|
||||
CopySendChar(cstate, '\\');
|
||||
CopySendChar(cstate, c);
|
||||
start = ++pointer; /* do not include char in next run */
|
||||
}
|
||||
else if (c == '\\' || c == delimc)
|
||||
{
|
||||
CopyFlushOutput(cstate, start, pointer);
|
||||
CopySendChar(cstate, '\\');
|
||||
start = pointer++; /* we include char in next run */
|
||||
}
|
||||
else
|
||||
{
|
||||
pointer++;
|
||||
}
|
||||
}
|
||||
|
||||
CopyFlushOutput(cstate, start, pointer);
|
||||
}
|
||||
|
||||
|
||||
/* *INDENT-ON* */
|
||||
/* Helper function to send pending copy output */
|
||||
static inline void
|
||||
CopyFlushOutput(PartialCopyState cstate, char *start, char *pointer)
|
||||
{
|
||||
if (pointer > start)
|
||||
{
|
||||
CopySendData(cstate, start, pointer - start);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* Helper function that invokes a function with the default collation oid. */
|
||||
Datum
|
||||
CompareCall2(FmgrInfo *functionInfo, Datum leftArgument, Datum rightArgument)
|
||||
|
|
|
@ -54,6 +54,8 @@ typedef struct NodeConnectionEntry
|
|||
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
||||
extern void PurgeConnection(PGconn *connection);
|
||||
extern void ReportRemoteError(PGconn *connection, PGresult *result);
|
||||
extern PGconn * ConnectToNode(char *nodeName, int nodePort);
|
||||
extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
|
||||
|
||||
|
||||
#endif /* CONNECTION_CACHE_H */
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* multi_copy.h
|
||||
* Declarations for public functions and variables used in COPY for
|
||||
* distributed tables.
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef MULTI_COPY_H
|
||||
#define MULTI_COPY_H
|
||||
|
||||
|
||||
#include "nodes/parsenodes.h"
|
||||
|
||||
|
||||
/* config variable managed via guc.c */
|
||||
extern int CopyTransactionManager;
|
||||
|
||||
|
||||
/*
|
||||
* A smaller version of copy.c's CopyStateData, trimmed to the elements
|
||||
* necessary to copy out results. While it'd be a bit nicer to share code,
|
||||
* it'd require changing core postgres code.
|
||||
*/
|
||||
typedef struct CopyOutStateData
|
||||
{
|
||||
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
|
||||
* dest == COPY_NEW_FE in COPY FROM */
|
||||
int file_encoding; /* file or remote side's character encoding */
|
||||
bool need_transcoding; /* file encoding diff from server? */
|
||||
bool binary; /* binary format? */
|
||||
char *null_print; /* NULL marker string (server encoding!) */
|
||||
char *null_print_client; /* same converted to file encoding */
|
||||
char *delim; /* column delimiter (must be 1 byte) */
|
||||
|
||||
MemoryContext rowcontext; /* per-row evaluation context */
|
||||
} CopyOutStateData;
|
||||
|
||||
typedef struct CopyOutStateData *CopyOutState;
|
||||
|
||||
|
||||
/* function declarations for copying into a distributed table */
|
||||
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
|
||||
extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
|
||||
TupleDesc rowDescriptor,
|
||||
CopyOutState rowOutputState,
|
||||
FmgrInfo *columnOutputFunctions);
|
||||
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
|
||||
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
|
||||
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
||||
|
||||
|
||||
#endif /* MULTI_COPY_H */
|
|
@ -0,0 +1,58 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* multi_transaction.h
|
||||
* Type and function declarations used in performing transactions across
|
||||
* shard placements.
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef MULTI_TRANSACTION_H
|
||||
#define MULTI_TRANSACTION_H
|
||||
|
||||
|
||||
#include "libpq-fe.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "nodes/pg_list.h"
|
||||
|
||||
|
||||
/* Enumeration that defines the different transaction managers available */
|
||||
typedef enum
|
||||
{
|
||||
TRANSACTION_MANAGER_1PC = 0,
|
||||
TRANSACTION_MANAGER_2PC = 1
|
||||
} TransactionManagerType;
|
||||
|
||||
/* Enumeration that defines different remote transaction states */
|
||||
typedef enum
|
||||
{
|
||||
TRANSACTION_STATE_INVALID = 0,
|
||||
TRANSACTION_STATE_OPEN,
|
||||
TRANSACTION_STATE_COPY_STARTED,
|
||||
TRANSACTION_STATE_PREPARED,
|
||||
TRANSACTION_STATE_CLOSED
|
||||
} TransactionState;
|
||||
|
||||
/*
|
||||
* TransactionConnection represents a connection to a remote node which is
|
||||
* used to perform a transaction on shard placements.
|
||||
*/
|
||||
typedef struct TransactionConnection
|
||||
{
|
||||
int64 connectionId;
|
||||
TransactionState transactionState;
|
||||
PGconn *connection;
|
||||
} TransactionConnection;
|
||||
|
||||
|
||||
/* Functions declarations for transaction and connection management */
|
||||
extern void InitializeDistributedTransaction(void);
|
||||
extern void PrepareRemoteTransactions(List *connectionList);
|
||||
extern void AbortRemoteTransactions(List *connectionList);
|
||||
extern void CommitRemoteTransactions(List *connectionList);
|
||||
extern void CloseConnections(List *connectionList);
|
||||
|
||||
|
||||
#endif /* MULTI_TRANSACTION_H */
|
|
@ -79,28 +79,6 @@ typedef struct HashPartitionContext
|
|||
} HashPartitionContext;
|
||||
|
||||
|
||||
/*
|
||||
* A smaller version of copy.c's CopyStateData, trimmed to the elements
|
||||
* necessary for re-partition jobs. While it'd be a bit nicer to share code,
|
||||
* it'd require changing core postgres code.
|
||||
*/
|
||||
typedef struct PartialCopyStateData
|
||||
{
|
||||
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
|
||||
* dest == COPY_NEW_FE in COPY FROM */
|
||||
int file_encoding; /* file or remote side's character encoding */
|
||||
bool need_transcoding; /* file encoding diff from server? */
|
||||
bool binary; /* binary format? */
|
||||
char *null_print; /* NULL marker string (server encoding!) */
|
||||
char *null_print_client; /* same converted to file encoding */
|
||||
char *delim; /* column delimiter (must be 1 byte) */
|
||||
|
||||
MemoryContext rowcontext; /* per-row evaluation context */
|
||||
} PartialCopyStateData;
|
||||
|
||||
typedef struct PartialCopyStateData *PartialCopyState;
|
||||
|
||||
|
||||
/*
|
||||
* FileOutputStream helps buffer write operations to a file; these writes are
|
||||
* then regularly flushed to the underlying file. This structure differs from
|
||||
|
|
|
@ -18,9 +18,6 @@ SELECT master_create_worker_shards('sharded_table', 2, 1);
|
|||
COPY sharded_table TO STDOUT;
|
||||
COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
|
||||
0
|
||||
-- but COPY in is not
|
||||
COPY sharded_table FROM STDIN;
|
||||
ERROR: cannot execute COPY FROM on a distributed table on master node
|
||||
-- cursors may not involve distributed tables
|
||||
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
|
||||
ERROR: DECLARE CURSOR can only be used in transaction blocks
|
||||
|
|
|
@ -188,9 +188,6 @@ VIETNAM
|
|||
RUSSIA
|
||||
UNITED KINGDOM
|
||||
UNITED STATES
|
||||
-- Ensure that preventing COPY FROM against distributed tables works
|
||||
COPY customer FROM STDIN;
|
||||
ERROR: cannot execute COPY FROM on a distributed table on master node
|
||||
-- Test that we can create on-commit drop tables, and also test creating with
|
||||
-- oids, along with changing column names
|
||||
BEGIN;
|
||||
|
|
|
@ -0,0 +1,190 @@
|
|||
--
|
||||
-- MULTI_COPY
|
||||
--
|
||||
-- Create a new hash-partitioned table into which to COPY
|
||||
CREATE TABLE customer_copy_hash (
|
||||
c_custkey integer,
|
||||
c_name varchar(25) not null,
|
||||
c_address varchar(40),
|
||||
c_nationkey integer,
|
||||
c_phone char(15),
|
||||
c_acctbal decimal(15,2),
|
||||
c_mktsegment char(10),
|
||||
c_comment varchar(117),
|
||||
primary key (c_custkey));
|
||||
SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
|
||||
|
||||
-- Test COPY into empty hash-partitioned table
|
||||
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
||||
|
||||
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
|
||||
|
||||
-- Test empty copy
|
||||
COPY customer_copy_hash FROM STDIN;
|
||||
\.
|
||||
|
||||
-- Test syntax error
|
||||
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
|
||||
1,customer1
|
||||
2,customer2,
|
||||
notinteger,customernot
|
||||
\.
|
||||
|
||||
-- Confirm that no data was copied
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
|
||||
-- Test primary key violation
|
||||
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||
WITH (FORMAT 'csv');
|
||||
1,customer1
|
||||
2,customer2
|
||||
2,customer2
|
||||
\.
|
||||
|
||||
-- Confirm that no data was copied
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
|
||||
-- Test headers option
|
||||
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||
WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
|
||||
# header
|
||||
1,customer1
|
||||
2,customer2
|
||||
3,customer3
|
||||
\.
|
||||
|
||||
-- Confirm that only first row was skipped
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
|
||||
-- Test force_not_null option
|
||||
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
|
||||
WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
|
||||
"4","customer4",""
|
||||
\.
|
||||
|
||||
-- Confirm that value is not null
|
||||
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
|
||||
|
||||
-- Test force_null option
|
||||
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
|
||||
WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
|
||||
"5","customer5",""
|
||||
\.
|
||||
|
||||
-- Confirm that value is null
|
||||
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
|
||||
|
||||
-- Test null violation
|
||||
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||
WITH (FORMAT 'csv');
|
||||
6,customer6
|
||||
7,customer7
|
||||
8,
|
||||
\.
|
||||
|
||||
-- Confirm that no data was copied
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
|
||||
-- Test server-side copy from program
|
||||
COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9'
|
||||
WITH (DELIMITER ' ');
|
||||
|
||||
-- Confirm that data was copied
|
||||
SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
|
||||
|
||||
-- Test server-side copy from file
|
||||
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.2.data' WITH (DELIMITER '|');
|
||||
|
||||
-- Confirm that data was copied
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
|
||||
-- Test client-side copy from file
|
||||
\COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.3.data' WITH (DELIMITER '|');
|
||||
|
||||
-- Confirm that data was copied
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
|
||||
-- Create a new hash-partitioned table with default now() function
|
||||
CREATE TABLE customer_with_default(
|
||||
c_custkey integer,
|
||||
c_name varchar(25) not null,
|
||||
c_time timestamp default now());
|
||||
|
||||
SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash');
|
||||
|
||||
SELECT master_create_worker_shards('customer_with_default', 64, 1);
|
||||
|
||||
-- Test with default values for now() function
|
||||
COPY customer_with_default (c_custkey, c_name) FROM STDIN
|
||||
WITH (FORMAT 'csv');
|
||||
1,customer1
|
||||
2,customer2
|
||||
\.
|
||||
|
||||
-- Confirm that data was copied with now() function
|
||||
SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
|
||||
|
||||
-- Add columns to the table and perform a COPY
|
||||
ALTER TABLE customer_copy_hash ADD COLUMN extra1 INT DEFAULT 0;
|
||||
ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0;
|
||||
|
||||
COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
|
||||
10,customer10,1,5
|
||||
\.
|
||||
|
||||
SELECT * FROM customer_copy_hash WHERE extra1 = 1;
|
||||
|
||||
-- Test dropping an intermediate column
|
||||
ALTER TABLE customer_copy_hash DROP COLUMN extra1;
|
||||
|
||||
COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV;
|
||||
11,customer11,5
|
||||
\.
|
||||
|
||||
SELECT * FROM customer_copy_hash WHERE c_custkey = 11;
|
||||
|
||||
-- Test dropping the last column
|
||||
ALTER TABLE customer_copy_hash DROP COLUMN extra2;
|
||||
|
||||
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV;
|
||||
12,customer12
|
||||
\.
|
||||
|
||||
SELECT * FROM customer_copy_hash WHERE c_custkey = 12;
|
||||
|
||||
-- Create a new range-partitioned table into which to COPY
|
||||
CREATE TABLE customer_copy_range (
|
||||
c_custkey integer,
|
||||
c_name varchar(25),
|
||||
c_address varchar(40),
|
||||
c_nationkey integer,
|
||||
c_phone char(15),
|
||||
c_acctbal decimal(15,2),
|
||||
c_mktsegment char(10),
|
||||
c_comment varchar(117),
|
||||
primary key (c_custkey));
|
||||
|
||||
SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
|
||||
|
||||
-- Test COPY into empty range-partitioned table
|
||||
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
||||
|
||||
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
|
||||
\gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||
WHERE shardid = :new_shard_id;
|
||||
|
||||
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
|
||||
\gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
|
||||
WHERE shardid = :new_shard_id;
|
||||
|
||||
-- Test copy into range-partitioned table
|
||||
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
||||
|
||||
-- Check whether data went into the right shard (maybe)
|
||||
SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*)
|
||||
FROM customer_copy_range WHERE c_custkey <= 500;
|
||||
|
||||
-- Check whether data was copied
|
||||
SELECT count(*) FROM customer_copy_range;
|
|
@ -121,6 +121,11 @@ test: multi_create_insert_proxy
|
|||
test: multi_data_types
|
||||
test: multi_repartitioned_subquery_udf
|
||||
|
||||
# ---------
|
||||
# multi_copy creates hash and range-partitioned tables and performs COPY
|
||||
# ---------
|
||||
test: multi_copy
|
||||
|
||||
# ----------
|
||||
# multi_large_shardid stages more shards into lineitem
|
||||
# ----------
|
||||
|
|
|
@ -0,0 +1,227 @@
|
|||
--
|
||||
-- MULTI_COPY
|
||||
--
|
||||
-- Create a new hash-partitioned table into which to COPY
|
||||
CREATE TABLE customer_copy_hash (
|
||||
c_custkey integer,
|
||||
c_name varchar(25) not null,
|
||||
c_address varchar(40),
|
||||
c_nationkey integer,
|
||||
c_phone char(15),
|
||||
c_acctbal decimal(15,2),
|
||||
c_mktsegment char(10),
|
||||
c_comment varchar(117),
|
||||
primary key (c_custkey));
|
||||
SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Test COPY into empty hash-partitioned table
|
||||
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
||||
ERROR: could not find any shards into which to copy
|
||||
DETAIL: No shards exist for distributed table "customer_copy_hash".
|
||||
HINT: Run master_create_worker_shards to create shards and try again.
|
||||
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Test empty copy
|
||||
COPY customer_copy_hash FROM STDIN;
|
||||
-- Test syntax error
|
||||
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
|
||||
ERROR: invalid input syntax for integer: "1,customer1"
|
||||
CONTEXT: COPY customer_copy_hash, line 1, column c_custkey: "1,customer1"
|
||||
-- Confirm that no data was copied
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Test primary key violation
|
||||
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||
WITH (FORMAT 'csv');
|
||||
ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_103160"
|
||||
DETAIL: Key (c_custkey)=(2) already exists.
|
||||
-- Confirm that no data was copied
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Test headers option
|
||||
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||
WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
|
||||
-- Confirm that only first row was skipped
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
count
|
||||
-------
|
||||
3
|
||||
(1 row)
|
||||
|
||||
-- Test force_not_null option
|
||||
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
|
||||
WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
|
||||
-- Confirm that value is not null
|
||||
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
|
||||
count
|
||||
-------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- Test force_null option
|
||||
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
|
||||
WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
|
||||
-- Confirm that value is null
|
||||
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Test null violation
|
||||
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||
WITH (FORMAT 'csv');
|
||||
ERROR: null value in column "c_name" violates not-null constraint
|
||||
DETAIL: Failing row contains (8, null, null, null, null, null, null, null).
|
||||
-- Confirm that no data was copied
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
count
|
||||
-------
|
||||
5
|
||||
(1 row)
|
||||
|
||||
-- Test server-side copy from program
|
||||
COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9'
|
||||
WITH (DELIMITER ' ');
|
||||
-- Confirm that data was copied
|
||||
SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
|
||||
count
|
||||
-------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- Test server-side copy from file
|
||||
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.2.data' WITH (DELIMITER '|');
|
||||
-- Confirm that data was copied
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
count
|
||||
-------
|
||||
1006
|
||||
(1 row)
|
||||
|
||||
-- Test client-side copy from file
|
||||
\COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.3.data' WITH (DELIMITER '|');
|
||||
-- Confirm that data was copied
|
||||
SELECT count(*) FROM customer_copy_hash;
|
||||
count
|
||||
-------
|
||||
2006
|
||||
(1 row)
|
||||
|
||||
-- Create a new hash-partitioned table with default now() function
|
||||
CREATE TABLE customer_with_default(
|
||||
c_custkey integer,
|
||||
c_name varchar(25) not null,
|
||||
c_time timestamp default now());
|
||||
SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_worker_shards('customer_with_default', 64, 1);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Test with default values for now() function
|
||||
COPY customer_with_default (c_custkey, c_name) FROM STDIN
|
||||
WITH (FORMAT 'csv');
|
||||
-- Confirm that data was copied with now() function
|
||||
SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
|
||||
count
|
||||
-------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
-- Add columns to the table and perform a COPY
|
||||
ALTER TABLE customer_copy_hash ADD COLUMN extra1 INT DEFAULT 0;
|
||||
ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0;
|
||||
COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
|
||||
SELECT * FROM customer_copy_hash WHERE extra1 = 1;
|
||||
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra1 | extra2
|
||||
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------+--------
|
||||
10 | customer10 | | | | | | | 1 | 5
|
||||
(1 row)
|
||||
|
||||
-- Test dropping an intermediate column
|
||||
ALTER TABLE customer_copy_hash DROP COLUMN extra1;
|
||||
COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV;
|
||||
SELECT * FROM customer_copy_hash WHERE c_custkey = 11;
|
||||
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra2
|
||||
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------
|
||||
11 | customer11 | | | | | | | 5
|
||||
(1 row)
|
||||
|
||||
-- Test dropping the last column
|
||||
ALTER TABLE customer_copy_hash DROP COLUMN extra2;
|
||||
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV;
|
||||
SELECT * FROM customer_copy_hash WHERE c_custkey = 12;
|
||||
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
|
||||
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------
|
||||
12 | customer12 | | | | | |
|
||||
(1 row)
|
||||
|
||||
-- Create a new range-partitioned table into which to COPY
|
||||
CREATE TABLE customer_copy_range (
|
||||
c_custkey integer,
|
||||
c_name varchar(25),
|
||||
c_address varchar(40),
|
||||
c_nationkey integer,
|
||||
c_phone char(15),
|
||||
c_acctbal decimal(15,2),
|
||||
c_mktsegment char(10),
|
||||
c_comment varchar(117),
|
||||
primary key (c_custkey));
|
||||
SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Test COPY into empty range-partitioned table
|
||||
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
||||
ERROR: could not find any shards into which to copy
|
||||
DETAIL: No shards exist for distributed table "customer_copy_range".
|
||||
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
|
||||
\gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||
WHERE shardid = :new_shard_id;
|
||||
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
|
||||
\gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
|
||||
WHERE shardid = :new_shard_id;
|
||||
-- Test copy into range-partitioned table
|
||||
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
||||
-- Check whether data went into the right shard (maybe)
|
||||
SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*)
|
||||
FROM customer_copy_range WHERE c_custkey <= 500;
|
||||
min | max | avg | count
|
||||
-----+-----+----------------------+-------
|
||||
1 | 500 | 250.5000000000000000 | 500
|
||||
(1 row)
|
||||
|
||||
-- Check whether data was copied
|
||||
SELECT count(*) FROM customer_copy_range;
|
||||
count
|
||||
-------
|
||||
1000
|
||||
(1 row)
|
||||
|
|
@ -10,9 +10,6 @@ SELECT master_create_worker_shards('sharded_table', 2, 1);
|
|||
COPY sharded_table TO STDOUT;
|
||||
COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
|
||||
|
||||
-- but COPY in is not
|
||||
COPY sharded_table FROM STDIN;
|
||||
|
||||
-- cursors may not involve distributed tables
|
||||
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
|
||||
|
||||
|
|
|
@ -104,9 +104,6 @@ COPY nation TO STDOUT;
|
|||
-- ensure individual cols can be copied out, too
|
||||
COPY nation(n_name) TO STDOUT;
|
||||
|
||||
-- Ensure that preventing COPY FROM against distributed tables works
|
||||
COPY customer FROM STDIN;
|
||||
|
||||
-- Test that we can create on-commit drop tables, and also test creating with
|
||||
-- oids, along with changing column names
|
||||
|
||||
|
|
Loading…
Reference in New Issue