diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 0c0e889bf..491777f0d 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -4,10 +4,33 @@ * This file contains implementation of COPY utility for distributed * tables. * - * Contributed by Konstantin Knizhnik, Postgres Professional + * The CitusCopyFrom function should be called from the utility hook to + * process COPY ... FROM commands on distributed tables. CitusCopyFrom + * parses the input from stdin, a program executed on the master, or a file + * on the master, and decides into which shard to put the data. It opens a + * new connection for every shard placement and uses the PQputCopyData + * function to copy the data. Because of buffering in PQputCopyData, the + * workers will ingest the data at least partially in parallel. + * + * When failing to connect to a worker, the master marks the placement for + * which it was trying to open a connection as inactive, similar to the way + * DML statements are handled. If a failure occurs after connecting, the + * transaction is rolled back on all the workers. + * + * By default, COPY uses normal transactions on the workers. This can cause + * a problem when some of the transactions fail to commit while others have + * succeeded. To ensure no data is lost, COPY can use two-phase commit, by + * increasing max_prepared_transactions on the worker and setting + * citus.copy_transaction_manager to '2pc'. The default is '1pc'. + * + * Parsing options are processed and enforced on the master, while + * constraints are enforced on the worker. In either case, failure causes + * the whole COPY to roll back. * * Copyright (c) 2016, Citus Data, Inc. * + * With contributions from Postgres Professional. + * *------------------------------------------------------------------------- */ @@ -130,6 +153,8 @@ static void SendCopyDataToPlacements(StringInfo dataBuffer, static List * ConnectionList(HTAB *connectionHash); static void EndRemoteCopy(List *connectionList, bool stopOnFailure); static void ReportCopyError(PGconn *connection, PGresult *result); + +/* Private functions copied and adapted from copy.c in PostgreSQL */ static void CopySendData(CopyOutState outputState, const void *databuf, int datasize); static void CopySendString(CopyOutState outputState, const char *str); static void CopySendChar(CopyOutState outputState, char c); @@ -228,7 +253,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) if (partitionMethod == DISTRIBUTE_BY_HASH) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not find any shards for query"), + errmsg("could not find any shards into which to copy"), errdetail("No shards exist for distributed table \"%s\".", relationName), errhint("Run master_create_worker_shards to create shards " @@ -237,7 +262,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) else { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not find any shards for query"), + errmsg("could not find any shards into which to copy"), errdetail("No shards exist for distributed table \"%s\".", relationName))); } @@ -304,7 +329,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) Datum partitionColumnValue = 0; ShardInterval *shardInterval = NULL; int64 shardId = 0; - bool found = false; + bool shardConnectionsFound = false; MemoryContext oldContext = NULL; ResetPerTupleExprContext(executorState); @@ -313,7 +338,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) /* parse a row from the input */ nextRowFound = NextCopyFrom(copyState, executorExpressionContext, - columnValues,columnNulls, NULL); + columnValues, columnNulls, NULL); if (!nextRowFound) { @@ -342,7 +367,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) if (shardInterval == NULL) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("no shard for partition column value"))); + errmsg("could not find shard for partition column " + "value"))); } shardId = shardInterval->shardId; @@ -353,8 +379,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) shardConnections = (ShardConnections *) hash_search(shardConnectionHash, &shardId, HASH_ENTER, - &found); - if (!found) + &shardConnectionsFound); + if (!shardConnectionsFound) { /* intialize COPY transactions on shard placements */ shardConnections->shardId = shardId; @@ -393,29 +419,31 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) if (CopyTransactionManager == TRANSACTION_MANAGER_2PC) { - PrepareTransactions(connectionList); + PrepareRemoteTransactions(connectionList); } + EndCopyFrom(copyState); + heap_close(rel, NoLock); + + error_context_stack = errorCallback.previous; + CHECK_FOR_INTERRUPTS(); } PG_CATCH(); { + error_context_stack = errorCallback.previous; + /* roll back all transactions */ connectionList = ConnectionList(shardConnectionHash); EndRemoteCopy(connectionList, false); - AbortTransactions(connectionList); + AbortRemoteTransactions(connectionList); CloseConnections(connectionList); PG_RE_THROW(); } PG_END_TRY(); - EndCopyFrom(copyState); - heap_close(rel, NoLock); - - error_context_stack = errorCallback.previous; - - CommitTransactions(connectionList); + CommitRemoteTransactions(connectionList); CloseConnections(connectionList); if (completionTag != NULL) @@ -491,11 +519,16 @@ ShardIntervalCompareFunction(Var *partitionColumn, char partitionMethod) { compareFunction = GetFunctionInfo(INT4OID, BTREE_AM_OID, BTORDER_PROC); } - else + else if (partitionMethod == DISTRIBUTE_BY_RANGE) { compareFunction = GetFunctionInfo(partitionColumn->vartype, BTREE_AM_OID, BTORDER_PROC); } + else + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unsupported partition method %d", partitionMethod))); + } return compareFunction; } @@ -682,7 +715,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections /* if all placements failed, error out */ if (list_length(failedPlacementList) == list_length(finalizedPlacementList)) { - ereport(ERROR, (errmsg("could not modify any active placements"))); + ereport(ERROR, (errmsg("could not find any active placements"))); } /* otherwise, mark failed placements as inactive: they're stale */ @@ -792,8 +825,6 @@ EndRemoteCopy(List *connectionList, bool stopOnFailure) (TransactionConnection *) lfirst(connectionCell); PGconn *connection = transactionConnection->connection; int64 shardId = transactionConnection->connectionId; - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); int copyEndResult = 0; PGresult *result = NULL; @@ -811,6 +842,9 @@ EndRemoteCopy(List *connectionList, bool stopOnFailure) { if (stopOnFailure) { + char *nodeName = ConnectionGetOptionValue(connection, "host"); + char *nodePort = ConnectionGetOptionValue(connection, "port"); + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("failed to COPY to shard %ld on %s:%s", shardId, nodeName, nodePort))); diff --git a/src/backend/distributed/utils/multi_transaction.c b/src/backend/distributed/utils/multi_transaction.c index 667542ada..5314ce9d9 100644 --- a/src/backend/distributed/utils/multi_transaction.c +++ b/src/backend/distributed/utils/multi_transaction.c @@ -25,12 +25,12 @@ static StringInfo BuildTransactionName(int connectionId); /* - * PrepareTransactions prepares all transactions on connections in + * PrepareRemoteTransactions prepares all transactions on connections in * connectionList for commit if the 2PC transaction manager is enabled. * On failure, it reports an error and stops. */ void -PrepareTransactions(List *connectionList) +PrepareRemoteTransactions(List *connectionList) { ListCell *connectionCell = NULL; @@ -68,11 +68,11 @@ PrepareTransactions(List *connectionList) /* - * AbortTransactions aborts all transactions on connections in connectionList. + * AbortRemoteTransactions aborts all transactions on connections in connectionList. * On failure, it reports a warning and continues to abort all of them. */ void -AbortTransactions(List *connectionList) +AbortRemoteTransactions(List *connectionList) { ListCell *connectionCell = NULL; @@ -118,11 +118,11 @@ AbortTransactions(List *connectionList) /* - * CommitTransactions commits all transactions on connections in connectionList. + * CommitRemoteTransactions commits all transactions on connections in connectionList. * On failure, it reports a warning and continues committing all of them. */ void -CommitTransactions(List *connectionList) +CommitRemoteTransactions(List *connectionList) { ListCell *connectionCell = NULL; diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 46afd4cb0..7567f1291 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -46,8 +46,10 @@ typedef struct CopyOutStateData *CopyOutState; /* function declarations for copying into a distributed table */ extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); -extern void BuildCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, - CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions); +extern void BuildCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc + rowDescriptor, + CopyOutState rowOutputState, + FmgrInfo *columnOutputFunctions); extern void BuildCopyBinaryHeaders(CopyOutState headerOutputState); extern void BuildCopyBinaryFooters(CopyOutState footerOutputState); extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag); diff --git a/src/include/distributed/multi_transaction.h b/src/include/distributed/multi_transaction.h index 827f0fcaa..533f151f9 100644 --- a/src/include/distributed/multi_transaction.h +++ b/src/include/distributed/multi_transaction.h @@ -48,9 +48,9 @@ typedef struct TransactionConnection /* Functions declarations for transaction and connection management */ -extern void PrepareTransactions(List *connectionList); -extern void AbortTransactions(List *connectionList); -extern void CommitTransactions(List *connectionList); +extern void PrepareRemoteTransactions(List *connectionList); +extern void AbortRemoteTransactions(List *connectionList); +extern void CommitRemoteTransactions(List *connectionList); extern void CloseConnections(List *connectionList); diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index 534675df7..2f8be3756 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -20,7 +20,7 @@ SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash' -- Test COPY into empty hash-partitioned table COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|'); -ERROR: could not find any shards for query +ERROR: could not find any shards into which to copy DETAIL: No shards exist for distributed table "customer_copy_hash". HINT: Run master_create_worker_shards to create shards and try again. SELECT master_create_worker_shards('customer_copy_hash', 64, 1); @@ -172,7 +172,7 @@ SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'rang -- Test COPY into empty range-partitioned table COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|'); -ERROR: could not find any shards for query +ERROR: could not find any shards into which to copy DETAIL: No shards exist for distributed table "customer_copy_range". SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id \gset