Various code style improvements for COPY

pull/366/head
Marco Slot 2016-03-25 16:23:39 +01:00
parent eea877ce4e
commit 7328390f3b
5 changed files with 69 additions and 33 deletions

View File

@@ -4,10 +4,33 @@
* This file contains implementation of COPY utility for distributed
* tables.
*
* Contributed by Konstantin Knizhnik, Postgres Professional
* The CitusCopyFrom function should be called from the utility hook to
* process COPY ... FROM commands on distributed tables. CitusCopyFrom
* parses the input from stdin, a program executed on the master, or a file
* on the master, and decides into which shard to put the data. It opens a
* new connection for every shard placement and uses the PQputCopyData
* function to copy the data. Because of buffering in PQputCopyData, the
* workers will ingest the data at least partially in parallel.
*
* When failing to connect to a worker, the master marks the placement for
* which it was trying to open a connection as inactive, similar to the way
* DML statements are handled. If a failure occurs after connecting, the
* transaction is rolled back on all the workers.
*
* By default, COPY uses normal transactions on the workers. This can cause
* a problem when some of the transactions fail to commit while others have
* succeeded. To ensure no data is lost, COPY can use two-phase commit, by
* increasing max_prepared_transactions on the worker and setting
* citus.copy_transaction_manager to '2pc'. The default is '1pc'.
*
* Parsing options are processed and enforced on the master, while
* constraints are enforced on the worker. In either case, failure causes
* the whole COPY to roll back.
*
* Copyright (c) 2016, Citus Data, Inc.
*
* With contributions from Postgres Professional.
*
*-------------------------------------------------------------------------
*/
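
The COPY flow described in the header comment rests on libpq's COPY protocol: the master switches each placement connection into COPY IN mode and streams rows with PQputCopyData. The stand-alone sketch below shows the shape of that interaction for a single placement; the connection string, shard table name, and delimiter are illustrative assumptions, not values taken from the Citus code.

/*
 * Minimal sketch of the libpq COPY interaction described above, for a single
 * placement. Connection string, shard name, and delimiter are hypothetical.
 */
#include <stdio.h>
#include <string.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn *connection = PQconnectdb("host=worker-1 port=5432 dbname=postgres");
	PGresult *result = NULL;
	const char *row = "42|hello\n";

	if (PQstatus(connection) != CONNECTION_OK)
	{
		/* the master would mark this placement inactive and move on */
		fprintf(stderr, "connection failed: %s", PQerrorMessage(connection));
		PQfinish(connection);
		return 1;
	}

	/* put the connection into COPY IN mode for the shard table */
	result = PQexec(connection, "COPY lineitem_102008 FROM STDIN WITH (DELIMITER '|')");
	if (PQresultStatus(result) != PGRES_COPY_IN)
	{
		fprintf(stderr, "could not start COPY: %s", PQerrorMessage(connection));
		PQclear(result);
		PQfinish(connection);
		return 1;
	}
	PQclear(result);

	/* PQputCopyData only appends to the connection's output buffer */
	if (PQputCopyData(connection, row, (int) strlen(row)) != 1 ||
		PQputCopyEnd(connection, NULL) != 1)
	{
		fprintf(stderr, "sending data failed: %s", PQerrorMessage(connection));
		PQfinish(connection);
		return 1;
	}

	/* the final result tells us whether the worker accepted the rows */
	result = PQgetResult(connection);
	if (PQresultStatus(result) != PGRES_COMMAND_OK)
	{
		fprintf(stderr, "COPY failed: %s", PQerrorMessage(connection));
	}

	PQclear(result);
	PQfinish(connection);
	return 0;
}

Because PQputCopyData only appends to each connection's output buffer, the master can interleave rows destined for different shards, and the workers absorb them concurrently; that is the partial parallelism the header comment refers to.
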
@@ -130,6 +153,8 @@ static void SendCopyDataToPlacements(StringInfo dataBuffer,
static List * ConnectionList(HTAB *connectionHash);
static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
static void ReportCopyError(PGconn *connection, PGresult *result);
/* Private functions copied and adapted from copy.c in PostgreSQL */
static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
static void CopySendString(CopyOutState outputState, const char *str);
static void CopySendChar(CopyOutState outputState, char c);
@@ -228,7 +253,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
if (partitionMethod == DISTRIBUTE_BY_HASH)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find any shards for query"),
errmsg("could not find any shards into which to copy"),
errdetail("No shards exist for distributed table \"%s\".",
relationName),
errhint("Run master_create_worker_shards to create shards "
@@ -237,7 +262,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
else
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find any shards for query"),
errmsg("could not find any shards into which to copy"),
errdetail("No shards exist for distributed table \"%s\".",
relationName)));
}
@@ -304,7 +329,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
Datum partitionColumnValue = 0;
ShardInterval *shardInterval = NULL;
int64 shardId = 0;
bool found = false;
bool shardConnectionsFound = false;
MemoryContext oldContext = NULL;
ResetPerTupleExprContext(executorState);
@@ -313,7 +338,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
/* parse a row from the input */
nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
columnValues,columnNulls, NULL);
columnValues, columnNulls, NULL);
if (!nextRowFound)
{
@@ -342,7 +367,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
if (shardInterval == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("no shard for partition column value")));
errmsg("could not find shard for partition column "
"value")));
}
shardId = shardInterval->shardId;
@@ -353,8 +379,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
shardConnections = (ShardConnections *) hash_search(shardConnectionHash,
&shardId,
HASH_ENTER,
&found);
if (!found)
&shardConnectionsFound);
if (!shardConnectionsFound)
{
/* initialize COPY transactions on shard placements */
shardConnections->shardId = shardId;
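
For context on the hash_search call above: shardConnectionHash is a dynahash table keyed by shard ID, and HASH_ENTER creates the per-shard entry on first use. The snippet below is a minimal sketch of how such a table could be created with the standard PostgreSQL dynahash API; the entry layout and names are assumptions for illustration, not the actual Citus definitions.

/* Minimal sketch, assuming the standard dynahash API; names are illustrative. */
#include "postgres.h"
#include "nodes/pg_list.h"
#include "utils/hsearch.h"

/* hypothetical entry type for a per-shard connection hash */
typedef struct ShardConnectionsEntry
{
	int64 shardId;          /* hash key: the shard being copied into */
	List *connectionList;   /* open placement connections for that shard */
} ShardConnectionsEntry;

static HTAB *
CreateShardConnectionHash(void)
{
	HASHCTL info;
	int hashFlags = 0;

	memset(&info, 0, sizeof(info));
	info.keysize = sizeof(int64);
	info.entrysize = sizeof(ShardConnectionsEntry);

	/* HASH_BLOBS hashes the fixed-size key bytes directly */
	hashFlags = HASH_ELEM | HASH_BLOBS;

	return hash_create("Shard Connection Hash", 32, &info, hashFlags);
}
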
@@ -393,29 +419,31 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
if (CopyTransactionManager == TRANSACTION_MANAGER_2PC)
{
PrepareTransactions(connectionList);
PrepareRemoteTransactions(connectionList);
}
EndCopyFrom(copyState);
heap_close(rel, NoLock);
error_context_stack = errorCallback.previous;
CHECK_FOR_INTERRUPTS();
}
PG_CATCH();
{
error_context_stack = errorCallback.previous;
/* roll back all transactions */
connectionList = ConnectionList(shardConnectionHash);
EndRemoteCopy(connectionList, false);
AbortTransactions(connectionList);
AbortRemoteTransactions(connectionList);
CloseConnections(connectionList);
PG_RE_THROW();
}
PG_END_TRY();
EndCopyFrom(copyState);
heap_close(rel, NoLock);
error_context_stack = errorCallback.previous;
CommitTransactions(connectionList);
CommitRemoteTransactions(connectionList);
CloseConnections(connectionList);
if (completionTag != NULL)
@@ -491,11 +519,16 @@ ShardIntervalCompareFunction(Var *partitionColumn, char partitionMethod)
{
compareFunction = GetFunctionInfo(INT4OID, BTREE_AM_OID, BTORDER_PROC);
}
else
else if (partitionMethod == DISTRIBUTE_BY_RANGE)
{
compareFunction = GetFunctionInfo(partitionColumn->vartype,
BTREE_AM_OID, BTORDER_PROC);
}
else
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unsupported partition method %d", partitionMethod)));
}
return compareFunction;
}
@@ -682,7 +715,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
/* if all placements failed, error out */
if (list_length(failedPlacementList) == list_length(finalizedPlacementList))
{
ereport(ERROR, (errmsg("could not modify any active placements")));
ereport(ERROR, (errmsg("could not find any active placements")));
}
/* otherwise, mark failed placements as inactive: they're stale */
@@ -792,8 +825,6 @@ EndRemoteCopy(List *connectionList, bool stopOnFailure)
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
int64 shardId = transactionConnection->connectionId;
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
int copyEndResult = 0;
PGresult *result = NULL;
@@ -811,6 +842,9 @@ EndRemoteCopy(List *connectionList, bool stopOnFailure)
{
if (stopOnFailure)
{
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to COPY to shard %ld on %s:%s",
shardId, nodeName, nodePort)));

View File

@@ -25,12 +25,12 @@ static StringInfo BuildTransactionName(int connectionId);
/*
* PrepareTransactions prepares all transactions on connections in
* PrepareRemoteTransactions prepares all transactions on connections in
* connectionList for commit if the 2PC transaction manager is enabled.
* On failure, it reports an error and stops.
*/
void
PrepareTransactions(List *connectionList)
PrepareRemoteTransactions(List *connectionList)
{
ListCell *connectionCell = NULL;
@@ -68,11 +68,11 @@ PrepareTransactions(List *connectionList)
/*
* AbortTransactions aborts all transactions on connections in connectionList.
* AbortRemoteTransactions aborts all transactions on connections in connectionList.
* On failure, it reports a warning and continues to abort all of them.
*/
void
AbortTransactions(List *connectionList)
AbortRemoteTransactions(List *connectionList)
{
ListCell *connectionCell = NULL;
@@ -118,11 +118,11 @@ AbortTransactions(List *connectionList)
/*
* CommitTransactions commits all transactions on connections in connectionList.
* CommitRemoteTransactions commits all transactions on connections in connectionList.
* On failure, it reports a warning and continues committing all of them.
*/
void
CommitTransactions(List *connectionList)
CommitRemoteTransactions(List *connectionList)
{
ListCell *connectionCell = NULL;
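
The renamed PrepareRemoteTransactions, AbortRemoteTransactions, and CommitRemoteTransactions functions drive the two-phase commit path mentioned in the COPY header comment. As a rough sketch of what preparing and then committing a transaction on a single worker connection could look like over libpq (the transaction name and error handling here are illustrative assumptions, not the Citus implementation):

/* Sketch only: prepare and commit a named transaction on one worker connection. */
#include <stdbool.h>
#include <stdio.h>
#include <libpq-fe.h>

/* Issues a single command and reports whether the worker accepted it. */
static bool
ExecuteRemoteCommand(PGconn *connection, const char *command)
{
	PGresult *result = PQexec(connection, command);
	bool success = (PQresultStatus(result) == PGRES_COMMAND_OK);

	if (!success)
	{
		fprintf(stderr, "remote command failed: %s", PQerrorMessage(connection));
	}

	PQclear(result);
	return success;
}

/*
 * With citus.copy_transaction_manager set to '2pc', the master first prepares
 * the transaction on every placement, then commits the prepared transactions.
 * The transaction name below is hypothetical, not the one Citus generates.
 */
static void
PrepareAndCommitSketch(PGconn *connection)
{
	if (!ExecuteRemoteCommand(connection, "PREPARE TRANSACTION 'citus_copy_42'"))
	{
		/* a real implementation would abort the transaction and error out */
		return;
	}

	ExecuteRemoteCommand(connection, "COMMIT PREPARED 'citus_copy_42'");
}

PREPARE TRANSACTION only succeeds when the worker's max_prepared_transactions is greater than zero, which is why the header comment calls that setting out as a prerequisite for using the '2pc' transaction manager.
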

View File

@@ -46,8 +46,10 @@ typedef struct CopyOutStateData *CopyOutState;
/* function declarations for copying into a distributed table */
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
extern void BuildCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions);
extern void BuildCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc
rowDescriptor,
CopyOutState rowOutputState,
FmgrInfo *columnOutputFunctions);
extern void BuildCopyBinaryHeaders(CopyOutState headerOutputState);
extern void BuildCopyBinaryFooters(CopyOutState footerOutputState);
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);

View File

@@ -48,9 +48,9 @@ typedef struct TransactionConnection
/* Functions declarations for transaction and connection management */
extern void PrepareTransactions(List *connectionList);
extern void AbortTransactions(List *connectionList);
extern void CommitTransactions(List *connectionList);
extern void PrepareRemoteTransactions(List *connectionList);
extern void AbortRemoteTransactions(List *connectionList);
extern void CommitRemoteTransactions(List *connectionList);
extern void CloseConnections(List *connectionList);

View File

@@ -20,7 +20,7 @@ SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash'
-- Test COPY into empty hash-partitioned table
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
ERROR: could not find any shards for query
ERROR: could not find any shards into which to copy
DETAIL: No shards exist for distributed table "customer_copy_hash".
HINT: Run master_create_worker_shards to create shards and try again.
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
@@ -172,7 +172,7 @@ SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'rang
-- Test COPY into empty range-partitioned table
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
ERROR: could not find any shards for query
ERROR: could not find any shards into which to copy
DETAIL: No shards exist for distributed table "customer_copy_range".
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
\gset