From 83c83fdf1a5c75c12452194fbb6f778dae5ff4a5 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 25 Feb 2016 21:13:05 +0100 Subject: [PATCH] Add regression tests and refactor transaction management functions --- src/backend/distributed/commands/multi_copy.c | 471 ++++++++---------- src/backend/distributed/shared_library_init.c | 4 +- .../distributed/utils/multi_transaction.c | 208 ++++++++ .../distributed/utils/transaction_manager.c | 173 ------- src/include/distributed/multi_copy.h | 8 +- src/include/distributed/multi_transaction.h | 57 +++ src/include/distributed/transaction_manager.h | 49 -- src/test/regress/input/multi_copy.source | 141 ++++++ src/test/regress/multi_schedule | 6 + src/test/regress/output/multi_copy.source | 173 +++++++ 10 files changed, 792 insertions(+), 498 deletions(-) create mode 100644 src/backend/distributed/utils/multi_transaction.c delete mode 100644 src/backend/distributed/utils/transaction_manager.c create mode 100644 src/include/distributed/multi_transaction.h delete mode 100644 src/include/distributed/transaction_manager.h create mode 100644 src/test/regress/input/multi_copy.source create mode 100644 src/test/regress/output/multi_copy.source diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 6d3d46e73..701293bbe 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -43,9 +43,9 @@ #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_transaction.h" #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" -#include "distributed/transaction_manager.h" #include "distributed/worker_protocol.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -97,7 +97,7 @@ int CopyTransactionManager = TRANSACTION_MANAGER_1PC; -/* Data structures copy.c, to keep track of COPY processing state */ 
+/* Data structures from copy.c, to keep track of COPY processing state */ typedef enum CopyDest { COPY_FILE, /* to/from file (or a piped program) */ @@ -227,14 +227,7 @@ typedef struct CopyStateData } CopyStateData; -/* Data structures for keeping track of connections to placements */ -typedef struct PlacementConnection -{ - int64 shardId; - bool prepared; - PGconn* connection; -} PlacementConnection; - +/* ShardConnections represents a set of connections for each placement of a shard */ typedef struct ShardConnections { int64 shardId; @@ -259,15 +252,13 @@ static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue, static void OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, int64 shardId); -static char * ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId); -static void AppendColumnNames(StringInfo buf, List *columnList); -static void AppendCopyOptions(StringInfo buf, List *copyOptionList); +static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId); +static void AppendColumnNames(StringInfo command, List *columnList); +static void AppendCopyOptions(StringInfo command, List *copyOptionList); static void CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections); static List * ConnectionList(HTAB *connectionHash); -static void PrepareCopyTransaction(List *connectionList); -static bool EndRemoteCopy(PGconn *connection); -static void AbortCopyTransaction(List *connectionList); -static void CommitCopyTransaction(List *connectionList); +static void EndRemoteCopy(List *connectionList, bool stopOnFailure); +static void ReportCopyError(PGconn *connection, PGresult *result); /* @@ -323,18 +314,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag) } } - /* load the list of shards and verify that we have shards to copy into */ - shardIntervalList = LoadShardIntervalList(tableId); - if (shardIntervalList == NIL) - { - ereport(ERROR, 
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not find any shards for query"), - errdetail("No shards exist for distributed table \"%s\".", - relationName), - errhint("Run master_create_worker_shards to create shards " - "and try again."))); - } - + partitionColumn = PartitionColumn(tableId, 0); partitionMethod = PartitionMethod(tableId); if (partitionMethod != DISTRIBUTE_BY_RANGE && partitionMethod != DISTRIBUTE_BY_HASH) { @@ -342,8 +322,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag) errmsg("COPY is only supported for hash- and range-partitioned tables"))); } - partitionColumn = PartitionColumn(tableId, 0); - /* resolve hash function for partition column */ typeEntry = lookup_type_cache(partitionColumn->vartype, TYPECACHE_HASH_PROC_FINFO); hashFunction = &(typeEntry->hash_proc_finfo); @@ -358,22 +336,58 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag) columnValues = palloc0(columnCount * sizeof(Datum)); columnNulls = palloc0(columnCount * sizeof(bool)); - /* - * We create a new memory context called tuple context, and read and write - * each row's values within this memory context. After each read and write, - * we reset the memory context. That way, we immediately release memory - * allocated for each row, and don't bloat memory usage with large input - * files. 
- */ - tupleContext = AllocSetContextCreate(CurrentMemoryContext, - "COPY Row Memory Context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + /* load the list of shards and verify that we have shards to copy into */ + shardIntervalList = LoadShardIntervalList(tableId); + if (shardIntervalList == NIL) + { + if (partitionMethod == DISTRIBUTE_BY_HASH) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not find any shards for query"), + errdetail("No shards exist for distributed table \"%s\".", + relationName), + errhint("Run master_create_worker_shards to create shards " + "and try again."))); + } + else + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not find any shards for query"), + errdetail("No shards exist for distributed table \"%s\".", + relationName))); + } + } + /* create a mapping of shard id to a connection for each of its placements */ shardConnectionHash = CreateShardConnectionHash(); + /* lock shards in order of shard id to prevent deadlock */ + shardIntervalList = SortList(shardIntervalList, CompareTasksByShardId); + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + int64 shardId = shardInterval->shardId; + + /* prevent concurrent changes to number of placements */ + LockShardDistributionMetadata(shardId, ShareLock); + + /* prevent concurrent update/delete statements */ + LockShardResource(shardId, ShareLock); + } + + /* initialize the shard interval cache */ + shardCount = list_length(shardIntervalList); + shardIntervalCache = SortedShardIntervalArray(shardIntervalList); + + /* determine whether to use binary search */ + if (partitionMethod != DISTRIBUTE_BY_HASH || + !IsUniformHashDistribution(shardIntervalCache, shardCount)) + { + useBinarySearch = true; + } + /* initialize copy state to read from COPY data source */ copyState = BeginCopyFrom(rel, 
copyStatement->filename, copyStatement->is_program, @@ -392,32 +406,18 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag) errorCallback.previous = error_context_stack; error_context_stack = &errorCallback; - /* lock shards in order of shard id to prevent deadlock */ - shardIntervalList = SortList(shardIntervalList, CompareTasksByShardId); - - foreach(shardIntervalCell, shardIntervalList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); - int64 shardId = shardInterval->shardId; - - /* prevent concurrent changes to number of placements */ - LockShardDistributionMetadata(shardId, ShareLock); - - /* prevent concurrent update/delete statements */ - LockShardResource(shardId, ShareLock); - - } - - /* initialize the shard interval cache */ - shardCount = list_length(shardIntervalList); - shardIntervalCache = SortedShardIntervalArray(shardIntervalList); - - /* determine whether to use binary search */ - if (partitionMethod != DISTRIBUTE_BY_HASH || - !IsUniformHashDistribution(shardIntervalCache, shardCount)) - { - useBinarySearch = true; - } + /* + * We create a new memory context called tuple context, and read and write + * each row's values within this memory context. After each read and write, + * we reset the memory context. That way, we immediately release memory + * allocated for each row, and don't bloat memory usage with large input + * files. + */ + tupleContext = AllocSetContextCreate(CurrentMemoryContext, + "COPY Row Memory Context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); /* we use a PG_TRY block to roll back on errors (e.g. 
in NextCopyFrom) */ PG_TRY(); @@ -494,9 +494,14 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag) MemoryContextReset(tupleContext); } - /* prepare two phase commit in replicas */ connectionList = ConnectionList(shardConnectionHash); - PrepareCopyTransaction(connectionList); + + EndRemoteCopy(connectionList, true); + + if (CopyTransactionManager == TRANSACTION_MANAGER_2PC) + { + PrepareTransactions(connectionList); + } } PG_CATCH(); { @@ -505,7 +510,10 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag) /* roll back all transactions */ connectionList = ConnectionList(shardConnectionHash); - AbortCopyTransaction(connectionList); + EndRemoteCopy(connectionList, false); + AbortTransactions(connectionList); + CloseConnections(connectionList); + PG_RE_THROW(); } PG_END_TRY(); @@ -517,15 +525,17 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag) if (QueryCancelPending) { - AbortCopyTransaction(connectionList); + AbortTransactions(connectionList); ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("canceling statement due to user request"))); } else { - CommitCopyTransaction(connectionList); + CommitTransactions(connectionList); } + CloseConnections(connectionList); + if (completionTag != NULL) { snprintf(completionTag, COMPLETION_TAG_BUFSIZE, @@ -703,9 +713,6 @@ static void OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, int64 shardId) { - CitusTransactionManager const *transactionManager = - &CitusTransactionManagerImpl[CopyTransactionManager]; - List *finalizedPlacementList = NIL; List *failedPlacementList = NIL; ListCell *placementCell = NULL; @@ -719,43 +726,42 @@ OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); char *nodeName = placement->nodeName; int nodePort = placement->nodePort; + TransactionConnection *transactionConnection = NULL; + StringInfo copyCommand = NULL; + PGresult *result = 
NULL; PGconn *connection = ConnectToNode(nodeName, nodePort); - if (connection != NULL) - { - char *copyCommand = ConstructCopyStatement(copyStatement, shardId); - - /* - * New connection: start transaction with copy command on it. - * Append shard id to table name. - */ - if (transactionManager->Begin(connection) && - ExecuteCommand(connection, PGRES_COPY_IN, copyCommand)) - { - PlacementConnection *placementConnection = - (PlacementConnection *) palloc0(sizeof(PlacementConnection)); - - placementConnection->shardId = shardId; - placementConnection->prepared = false; - placementConnection->connection = connection; - - connectionList = lappend(connectionList, placementConnection); - } - else - { - failedPlacementList = lappend(failedPlacementList, placement); - ereport(WARNING, (errcode(ERRCODE_IO_ERROR), - errmsg("Failed to start '%s' on node %s:%d", - copyCommand, nodeName, nodePort))); - } - } - else + if (connection == NULL) { failedPlacementList = lappend(failedPlacementList, placement); - ereport(WARNING, (errcode(ERRCODE_IO_ERROR), - errmsg("Failed to connect to node %s:%d", - nodeName, nodePort))); + continue; } + + result = PQexec(connection, "BEGIN"); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + ReportRemoteError(connection, result); + failedPlacementList = lappend(failedPlacementList, placement); + continue; + } + + copyCommand = ConstructCopyStatement(copyStatement, shardId); + + result = PQexec(connection, copyCommand->data); + if (PQresultStatus(result) != PGRES_COPY_IN) + { + ReportRemoteError(connection, result); + failedPlacementList = lappend(failedPlacementList, placement); + continue; + } + + transactionConnection = palloc0(sizeof(TransactionConnection)); + + transactionConnection->connectionId = shardId; + transactionConnection->transactionState = TRANSACTION_STATE_COPY_STARTED; + transactionConnection->connection = connection; + + connectionList = lappend(connectionList, transactionConnection); } /* if all placements failed, error 
out */ @@ -778,7 +784,6 @@ OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections shardConnections->shardId = shardId; shardConnections->connectionList = connectionList; - } @@ -786,98 +791,80 @@ OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections * ConstructCopyStattement constructs the text of a COPY statement for a particular * shard. */ -static char * +static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId) { - StringInfo buf = makeStringInfo(); + StringInfo command = makeStringInfo(); char *qualifiedName = NULL; qualifiedName = quote_qualified_identifier(copyStatement->relation->schemaname, copyStatement->relation->relname); - appendStringInfo(buf, "COPY %s_%ld ", qualifiedName, (long) shardId); + appendStringInfo(command, "COPY %s_%ld ", qualifiedName, shardId); if (copyStatement->attlist != NIL) { - AppendColumnNames(buf, copyStatement->attlist); + AppendColumnNames(command, copyStatement->attlist); } - appendStringInfoString(buf, "FROM STDIN"); + appendStringInfoString(command, "FROM STDIN"); if (copyStatement->options) { - appendStringInfoString(buf, " WITH "); + appendStringInfoString(command, " WITH "); - AppendCopyOptions(buf, copyStatement->options); + AppendCopyOptions(command, copyStatement->options); } - return buf->data; + return command; } /* - * AppendCopyOptions deparses a list of CopyStmt options and appends them to buf. + * AppendCopyOptions deparses a list of CopyStmt options and appends them to command. 
*/ static void -AppendCopyOptions(StringInfo buf, List *copyOptionList) +AppendCopyOptions(StringInfo command, List *copyOptionList) { ListCell *optionCell = NULL; char separator = '('; foreach(optionCell, copyOptionList) { - DefElem *defel = (DefElem *) lfirst(optionCell); + DefElem *option = (DefElem *) lfirst(optionCell); - if (strcmp(defel->defname, "header") == 0 && defGetBoolean(defel)) + if (strcmp(option->defname, "header") == 0 && defGetBoolean(option)) { /* worker should not skip header again */ continue; } - appendStringInfo(buf, "%c%s ", separator, defel->defname); + appendStringInfo(command, "%c%s ", separator, option->defname); - if (strcmp(defel->defname, "force_quote") == 0) + if (strcmp(option->defname, "force_not_null") == 0 || + strcmp(option->defname, "force_null") == 0) { - if (!defel->arg) + if (!option->arg) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a list of column names", - defel->defname))); - } - else if (IsA(defel->arg, A_Star)) - { - appendStringInfoString(buf, "*"); + option->defname))); } else { - AppendColumnNames(buf, (List *) defel->arg); - } - } - else if (strcmp(defel->defname, "force_not_null") == 0 || - strcmp(defel->defname, "force_null") == 0) - { - if (!defel->arg) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("argument to option \"%s\" must be a list of column names", - defel->defname))); - } - else - { - AppendColumnNames(buf, (List *) defel->arg); + AppendColumnNames(command, (List *) option->arg); } } else { - appendStringInfoString(buf, defGetString(defel)); + appendStringInfoString(command, quote_literal_cstr(defGetString(option))); } separator = ','; } - appendStringInfoChar(buf, ')'); + appendStringInfoChar(command, ')'); } @@ -885,7 +872,7 @@ AppendCopyOptions(StringInfo buf, List *copyOptionList) * AppendColumnList deparses a list of column names into a StringInfo. 
*/ static void -AppendColumnNames(StringInfo buf, List *columnList) +AppendColumnNames(StringInfo command, List *columnList) { ListCell *attributeCell = NULL; char separator = '('; @@ -893,11 +880,11 @@ AppendColumnNames(StringInfo buf, List *columnList) foreach(attributeCell, columnList) { char *columnName = strVal(lfirst(attributeCell)); - appendStringInfo(buf, "%c%s", separator, quote_identifier(columnName)); + appendStringInfo(command, "%c%s", separator, quote_identifier(columnName)); separator = ','; } - appendStringInfoChar(buf, ')'); + appendStringInfoChar(command, ')'); } @@ -910,9 +897,9 @@ CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections) ListCell *connectionCell = NULL; foreach(connectionCell, shardConnections->connectionList) { - PlacementConnection *placementConnection = - (PlacementConnection *) lfirst(connectionCell); - PGconn *connection = placementConnection->connection; + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; int64 shardId = shardConnections->shardId; /* copy the line buffer into the placement */ @@ -922,8 +909,8 @@ CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections) char *nodeName = ConnectionGetOptionValue(connection, "host"); char *nodePort = ConnectionGetOptionValue(connection, "port"); ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("COPY to shard %ld on %s:%s failed", - (long) shardId, nodeName, nodePort))); + errmsg("Failed to COPY to shard %ld on %s:%s", + shardId, nodeName, nodePort))); } } } @@ -945,10 +932,10 @@ ConnectionList(HTAB *connectionHash) ListCell *connectionCell = NULL; foreach(connectionCell, shardConnections->connectionList) { - PlacementConnection *placementConnection = - (PlacementConnection *) lfirst(connectionCell); + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); - connectionList = 
lappend(connectionList, placementConnection); + connectionList = lappend(connectionList, transactionConnection); } } @@ -957,145 +944,91 @@ ConnectionList(HTAB *connectionHash) /* - * End copy and prepare transaction. - * This function is applied for each shard placement unless some error happen. - * Status of this function is stored in ShardConnections::status field + * EndRemoteCopy ends the COPY input on all connections. If stopOnFailure + * is true, then EndRemoteCopy reports an error on failure, otherwise it + * reports a warning. */ static void -PrepareCopyTransaction(List *connectionList) +EndRemoteCopy(List *connectionList, bool stopOnFailure) { - CitusTransactionManager const *transactionManager = - &CitusTransactionManagerImpl[CopyTransactionManager]; - ListCell *connectionCell = NULL; + foreach(connectionCell, connectionList) { - PlacementConnection *placementConnection = - (PlacementConnection *) lfirst(connectionCell); - PGconn *connection = placementConnection->connection; - int64 shardId = placementConnection->shardId; - - if (EndRemoteCopy(connection) && - transactionManager->Prepare(connection, BuildTransactionId(shardId))) - { - placementConnection->prepared = true; - } - else - { - ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("Failed to prepare transaction for shard %ld", - (long) shardId))); - } - } -} - - -/* - * EndRemoteCopy sends PQputCopyEnd command to the client and checks the result. - */ -static bool -EndRemoteCopy(PGconn *connection) -{ - PGresult *result = NULL; - - int copyEndResult = PQputCopyEnd(connection, NULL); - if (copyEndResult != 1) - { - return false; - } - - while ((result = PQgetResult(connection)) != NULL) - { - int resultStatus = PQresultStatus(result); - if (resultStatus != PGRES_COMMAND_OK) - { - ReportRemoteError(connection, result); - return false; - } - PQclear(result); - } - - return true; -} - - -/* - * AbortCopyTransaction aborts a two-phase commit. 
It attempts to roll back - * all transactions even if some of them fail, in which case a warning is given - * for each of them. - */ -static void -AbortCopyTransaction(List *connectionList) -{ - CitusTransactionManager const *transactionManager = - &CitusTransactionManagerImpl[CopyTransactionManager]; - - ListCell *connectionCell = NULL; - foreach(connectionCell, connectionList) - { - PlacementConnection *placementConnection = - (PlacementConnection *) lfirst(connectionCell); - PGconn *connection = placementConnection->connection; - int64 shardId = placementConnection->shardId; + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + int64 shardId = transactionConnection->connectionId; char *nodeName = ConnectionGetOptionValue(connection, "host"); char *nodePort = ConnectionGetOptionValue(connection, "port"); + int copyEndResult = 0; + PGresult *result = NULL; - if (placementConnection->prepared) + if (transactionConnection->transactionState != TRANSACTION_STATE_COPY_STARTED) { - char *transactionId = BuildTransactionId(shardId); + /* COPY already ended during the prepare phase */ + continue; + } - if (!transactionManager->RollbackPrepared(connection, transactionId)) + /* end the COPY input */ + copyEndResult = PQputCopyEnd(connection, NULL); + transactionConnection->transactionState = TRANSACTION_STATE_OPEN; + + if (copyEndResult != 1) + { + if (stopOnFailure) { - ereport(WARNING, (errcode(ERRCODE_IO_ERROR), - errmsg("Failed to roll back transaction '%s' on %s:%s", - transactionId, nodeName, nodePort))); + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), + errmsg("Failed to COPY to shard %ld on %s:%s", + shardId, nodeName, nodePort))); } - } - else if (!EndRemoteCopy(connection) && - !transactionManager->Rollback(connection)) - { - ereport(WARNING, (errcode(ERRCODE_IO_ERROR), - errmsg("Failed to COPY to shard %ld on %s:%s", - shardId, nodeName, nodePort))); + + 
continue; } - PQfinish(connection); + /* check whether there were any COPY errors */ + result = PQgetResult(connection); + if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure) + { + ReportCopyError(connection, result); + } + + PQclear(result); } } /* - * CommitCopyTransaction commits a two-phase commit. It attempts to commit all - * transactionsm even if some of them fail, in which case a warning is given - * for each of them. + * ReportCopyError tries to report a useful error message for the user from + * the remote COPY error messages. */ static void -CommitCopyTransaction(List *connectionList) +ReportCopyError(PGconn *connection, PGresult *result) { - CitusTransactionManager const *transactionManager = - &CitusTransactionManagerImpl[CopyTransactionManager]; + char *remoteMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY); - ListCell *connectionCell = NULL; - foreach(connectionCell, connectionList) + if (remoteMessage != NULL) { - PlacementConnection *placementConnection = - (PlacementConnection *) lfirst(connectionCell); - PGconn *connection = placementConnection->connection; - int64 shardId = placementConnection->shardId; - char *transactionId = BuildTransactionId(shardId); + /* probably a constraint violation, show remote message and detail */ + char *remoteDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL); - Assert(placementConnection->prepared); + ereport(ERROR, (errmsg("%s", remoteMessage), + errdetail("%s", remoteDetail))); + } + else + { + /* probably a connection problem, get the message from the connection */ + char *lastNewlineIndex = NULL; - if (!transactionManager->CommitPrepared(connection, transactionId)) + remoteMessage = PQerrorMessage(connection); + lastNewlineIndex = strrchr(remoteMessage, '\n'); + + /* trim trailing newline, if any */ + if (lastNewlineIndex != NULL) { - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); - 
ereport(WARNING, (errcode(ERRCODE_IO_ERROR), - errmsg("Failed to commit transaction '%s' on %s:%s", - transactionId, nodeName, nodePort))); + *lastNewlineIndex = '\0'; } - PQfinish(connection); + ereport(ERROR, (errmsg("%s", remoteMessage))); } } - diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 0a5db8697..b941b78fd 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -26,11 +26,11 @@ #include "distributed/multi_join_order.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_planner.h" -#include "distributed/multi_router_executor.h" +#include "distributed/multi_router_executor.h" #include "distributed/multi_server_executor.h" +#include "distributed/multi_transaction.h" #include "distributed/multi_utility.h" #include "distributed/task_tracker.h" -#include "distributed/transaction_manager.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "postmaster/postmaster.h" diff --git a/src/backend/distributed/utils/multi_transaction.c b/src/backend/distributed/utils/multi_transaction.c new file mode 100644 index 000000000..b359084da --- /dev/null +++ b/src/backend/distributed/utils/multi_transaction.c @@ -0,0 +1,208 @@ +/*------------------------------------------------------------------------- + * + * multi_transaction.c + * This file contains functions for managing 1PC or 2PC transactions + * across many shard placements. + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "libpq-fe.h" +#include "miscadmin.h" + +#include "access/xact.h" +#include "distributed/connection_cache.h" +#include "distributed/multi_transaction.h" +#include "lib/stringinfo.h" +#include "nodes/pg_list.h" + + +/* Local functions forward declarations */ +static StringInfo BuildTransactionName(int64 connectionId); + + +/* + * PrepareTransactions sends PREPARE TRANSACTION on every connection in + * connectionList; callers invoke it only when the 2PC manager is enabled. + * On failure, it reports an error and stops. + */ +void +PrepareTransactions(List *connectionList) +{ + ListCell *connectionCell = NULL; + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + int64 connectionId = transactionConnection->connectionId; + + PGresult *result = NULL; + StringInfo command = makeStringInfo(); + StringInfo transactionName = BuildTransactionName(connectionId); + + appendStringInfo(command, "PREPARE TRANSACTION '%s'", transactionName->data); + + result = PQexec(connection, command->data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + /* a failure to prepare is an implicit rollback */ + transactionConnection->transactionState = TRANSACTION_STATE_CLOSED; + + ReportRemoteError(connection, result); + + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), + errmsg("Failed to prepare transaction"))); + } + PQclear(result); + transactionConnection->transactionState = TRANSACTION_STATE_PREPARED; + } +} + + +/* + * AbortTransactions aborts all transactions on connections in connectionList. + * On failure, it reports a warning and continues to abort all of them. 
+ */ +void +AbortTransactions(List *connectionList) +{ + ListCell *connectionCell = NULL; + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + int64 connectionId = transactionConnection->connectionId; + char *nodeName = ConnectionGetOptionValue(connection, "host"); + char *nodePort = ConnectionGetOptionValue(connection, "port"); + PGresult *result = NULL; + + if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED) + { + StringInfo command = makeStringInfo(); + StringInfo transactionName = BuildTransactionName(connectionId); + + appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName->data); + + result = PQexec(connection, command->data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + /* log a warning so the user may abort the transaction later */ + ereport(WARNING, (errmsg("Failed to roll back prepared transaction '%s'", + transactionName->data), + errhint("Run ROLLBACK PREPARED '%s' on %s:%s", + transactionName->data, nodeName, nodePort))); + } + + PQclear(result); + } + else if (transactionConnection->transactionState == TRANSACTION_STATE_OPEN) + { + /* try to roll back cleanly, if it fails then we won't commit anyway */ + result = PQexec(connection, "ROLLBACK"); + PQclear(result); + } + + transactionConnection->transactionState = TRANSACTION_STATE_CLOSED; + } +} + + +/* + * CommitTransactions commits all transactions on connections in connectionList. + * On failure, it reports a warning and continues committing all of them. 
+ */ +void +CommitTransactions(List *connectionList) +{ + ListCell *connectionCell = NULL; + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + int64 connectionId = transactionConnection->connectionId; + char *nodeName = ConnectionGetOptionValue(connection, "host"); + char *nodePort = ConnectionGetOptionValue(connection, "port"); + PGresult *result = NULL; + + if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED) + { + StringInfo command = makeStringInfo(); + StringInfo transactionName = BuildTransactionName(connectionId); + + /* we shouldn't be committing if any transactions are not prepared */ + Assert(transactionConnection->transactionState == TRANSACTION_STATE_PREPARED); + + appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName->data); + + result = PQexec(connection, command->data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + /* log a warning so the user may commit the transaction later */ + ereport(WARNING, (errmsg("Failed to commit prepared transaction '%s'", + transactionName->data), + errhint("Run COMMIT PREPARED '%s' on %s:%s", + transactionName->data, nodeName, nodePort))); + } + } + else + { + /* we shouldn't be committing if any transactions are not open */ + Assert(transactionConnection->transactionState == TRANSACTION_STATE_OPEN); + + /* try to commit, if it fails then the user might lose data */ + result = PQexec(connection, "COMMIT"); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + ereport(WARNING, (errmsg("Failed to commit transaction on %s:%s", + nodeName, nodePort))); + } + } + + PQclear(result); + + transactionConnection->transactionState = TRANSACTION_STATE_CLOSED; + } +} + + +/* + * BuildTransactionName constructs a unique transaction name from an ID. 
+ */ +static StringInfo +BuildTransactionName(int64 connectionId) +{ + StringInfo commandString = makeStringInfo(); + + appendStringInfo(commandString, "citus_%d_%u_" INT64_FORMAT, MyProcPid, + GetCurrentTransactionId(), connectionId); + + return commandString; +} + + +/* + * CloseConnections closes all connections in connectionList. + */ +void +CloseConnections(List *connectionList) +{ + ListCell *connectionCell = NULL; + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + + PQfinish(connection); + } +} diff --git a/src/backend/distributed/utils/transaction_manager.c b/src/backend/distributed/utils/transaction_manager.c deleted file mode 100644 index e89aba54b..000000000 --- a/src/backend/distributed/utils/transaction_manager.c +++ /dev/null @@ -1,173 +0,0 @@ -/*------------------------------------------------------------------------- - * - * transaction_manager.c - * This file contains functions that comprise a pluggable API for - * managing transactions across many worker nodes using 1PC or 2PC. - * - * Contributed by Konstantin Knizhnik, Postgres Professional - * - * Copyright (c) 2016, Citus Data, Inc. 
- * - *------------------------------------------------------------------------- - */ - -#include "postgres.h" -#include "libpq-fe.h" -#include "miscadmin.h" - -#include "access/xact.h" -#include "distributed/connection_cache.h" -#include "distributed/transaction_manager.h" - -static bool BeginTransaction(PGconn *connection); -static bool Prepare1PC(PGconn *connection, char *transactionId); -static bool CommitPrepared1PC(PGconn *connection, char *transactionId); -static bool RollbackPrepared1PC(PGconn *connection, char *transactionId); -static bool RollbackTransaction(PGconn *connection); - -static bool Prepare2PC(PGconn *connection, char *transactionId); -static bool CommitPrepared2PC(PGconn *connection, char *transactionId); -static bool RollbackPrepared2PC(PGconn *connection, char *transactionId); - -static char * Build2PCCommand(char const *command, char *transactionId); - -CitusTransactionManager const CitusTransactionManagerImpl[] = -{ - { BeginTransaction, Prepare1PC, CommitPrepared1PC, - RollbackPrepared1PC, RollbackTransaction }, - { BeginTransaction, Prepare2PC, CommitPrepared2PC, - RollbackPrepared2PC, RollbackTransaction } -}; - - -/* - * BeginTransaction sends a BEGIN command to start a transaction. - */ -static bool -BeginTransaction(PGconn *connection) -{ - return ExecuteCommand(connection, PGRES_COMMAND_OK, "BEGIN"); -} - - -/* - * Prepare1PC does nothing since 1PC mode does not have a prepare phase. - * This function is provided for compatibility with the 2PC API. - */ -static bool -Prepare1PC(PGconn *connection, char *transactionId) -{ - return true; -} - - -/* - * Commit1PC sends a COMMIT command to commit a transaction. - */ -static bool -CommitPrepared1PC(PGconn *connection, char *transactionId) -{ - return ExecuteCommand(connection, PGRES_COMMAND_OK, "COMMIT"); -} - - -/* - * RollbackPrepared1PC sends a ROLLBACK command to roll a transaction - * back. This function is provided for compatibility with the 2PC API. 
- */ -static bool -RollbackPrepared1PC(PGconn *connection, char *transactionId) -{ - return ExecuteCommand(connection, PGRES_COMMAND_OK, "ROLLBACK"); -} - - -/* - * Prepare2PC sends a PREPARE TRANSACTION command to prepare a 2PC. - */ -static bool -Prepare2PC(PGconn *connection, char *transactionId) -{ - return ExecuteCommand(connection, PGRES_COMMAND_OK, - Build2PCCommand("PREPARE TRANSACTION", transactionId)); -} - - -/* - * CommitPrepared2PC sends a COMMIT TRANSACTION command to commit a 2PC. - */ -static bool -CommitPrepared2PC(PGconn *connection, char *transactionId) -{ - return ExecuteCommand(connection, PGRES_COMMAND_OK, - Build2PCCommand("COMMIT PREPARED", transactionId)); -} - - -/* - * RollbackPrepared2PC sends a COMMIT TRANSACTION command to commit a 2PC. - */ -static bool -RollbackPrepared2PC(PGconn *connection, char *transactionId) -{ - return ExecuteCommand(connection, PGRES_COMMAND_OK, - Build2PCCommand("ROLLBACK PREPARED", transactionId)); -} - - -/* - * RollbackTransaction sends a ROLLBACK command to roll a transaction back. - */ -static bool -RollbackTransaction(PGconn *connection) -{ - return ExecuteCommand(connection, PGRES_COMMAND_OK, "ROLLBACK"); -} - - -/* - * Build2PCCommand builds a command with a unique transaction ID for a two-phase commit. - */ -static char * -Build2PCCommand(char const *command, char *transactionId) -{ - StringInfo commandString = makeStringInfo(); - - appendStringInfo(commandString, "%s '%s'", transactionId); - - return commandString->data; -} - - -/* - * BuildTransactionId helps users construct a unique transaction id from an - * application-specific id. - */ -char * -BuildTransactionId(int localId) -{ - StringInfo commandString = makeStringInfo(); - - appendStringInfo(commandString, "citus_%d_%u_%d", MyProcPid, - GetCurrentTransactionId(), localId); - - return commandString->data; -} - - -/* - * ExecuteCommand executes a statement on a remote node and checks its result. 
- */ -bool -ExecuteCommand(PGconn *connection, ExecStatusType expectedResult, char const *command) -{ - bool ret = true; - PGresult *result = PQexec(connection, command); - if (PQresultStatus(result) != expectedResult) - { - ReportRemoteError(connection, result); - ret = false; - } - PQclear(result); - return ret; -} diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index c9f06c33a..2c754e739 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -1,9 +1,8 @@ /*------------------------------------------------------------------------- * * multi_copy.h - * - * Declarations for public functions and variables used in COPY for - * distributed tables. + * Declarations for public functions and variables used in COPY for + * distributed tables. * * Copyright (c) 2016, Citus Data, Inc. * @@ -14,8 +13,7 @@ #define MULTI_COPY_H -#include "libpq-fe.h" -#include "distributed/transaction_manager.h" +#include "nodes/parsenodes.h" /* config variable managed via guc.c */ diff --git a/src/include/distributed/multi_transaction.h b/src/include/distributed/multi_transaction.h new file mode 100644 index 000000000..e3fe74ad8 --- /dev/null +++ b/src/include/distributed/multi_transaction.h @@ -0,0 +1,57 @@ +/*------------------------------------------------------------------------- + * + * multi_transaction.h + * Type and function declarations used in performing transactions across + * shard placements. + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef MULTI_TRANSACTION_H +#define MULTI_TRANSACTION_H + + +#include "libpq-fe.h" +#include "lib/stringinfo.h" +#include "nodes/pg_list.h" + + +/* Enumeration that defines the different transaction managers available */ +typedef enum +{ + TRANSACTION_MANAGER_1PC = 0, + TRANSACTION_MANAGER_2PC = 1 +} TransactionManagerType; + +/* Enumeration that defines different remote transaction states */ +typedef enum +{ + TRANSACTION_STATE_INVALID = 0, + TRANSACTION_STATE_OPEN, + TRANSACTION_STATE_COPY_STARTED, + TRANSACTION_STATE_PREPARED, + TRANSACTION_STATE_CLOSED +} TransactionState; + +/* + * TransactionConnection represents a connection to a remote node which is + * used to perform a transaction on shard placements. + */ +typedef struct TransactionConnection +{ + int64 connectionId; + TransactionState transactionState; + PGconn* connection; +} TransactionConnection; + + +/* Function declarations for transaction and connection management */ +extern void PrepareTransactions(List *connectionList); +extern void AbortTransactions(List *connectionList); +extern void CommitTransactions(List *connectionList); +extern void CloseConnections(List *connectionList); + + +#endif /* MULTI_TRANSACTION_H */ diff --git a/src/include/distributed/transaction_manager.h b/src/include/distributed/transaction_manager.h deleted file mode 100644 index d3e6834ed..000000000 --- a/src/include/distributed/transaction_manager.h +++ /dev/null @@ -1,49 +0,0 @@ -/*------------------------------------------------------------------------- - * - * transaction_manager.h - * - * Transaction manager API. - * - * Copyright (c) 2016, Citus Data, Inc. 
- * - *------------------------------------------------------------------------- - */ - -#ifndef TRANSACTION_MANAGER_H -#define TRANSACTION_MANAGER_H - - -#include "libpq-fe.h" -#include "utils/guc.h" - - -/* pluggable transaction manager API */ -typedef struct CitusTransactionManager -{ - bool (*Begin)(PGconn *conn); - bool (*Prepare)(PGconn *conn, char *transactionId); - bool (*CommitPrepared)(PGconn *conn, char *transactionId); - bool (*RollbackPrepared)(PGconn *conn, char *transactionId); - bool (*Rollback)(PGconn *conn); -} CitusTransactionManager; - - -/* Enumeration that defines the transaction manager to use */ -typedef enum -{ - TRANSACTION_MANAGER_1PC = 0, - TRANSACTION_MANAGER_2PC = 1 -} TransactionManagerType; - - -/* Implementations of the transaction manager API */ -extern CitusTransactionManager const CitusTransactionManagerImpl[]; - - -/* Function declarations for copying into a distributed table */ -extern bool ExecuteCommand(PGconn *connection, ExecStatusType expectedResult, - char const *command); -extern char * BuildTransactionId(int localId); - - -#endif /* TRANSACTION_MANAGER_H */ diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source new file mode 100644 index 000000000..d1cea441f --- /dev/null +++ b/src/test/regress/input/multi_copy.source @@ -0,0 +1,141 @@ +-- +-- MULTI_COPY +-- +-- Create a new hash-partitioned table into which to COPY +CREATE TABLE customer_copy_hash ( + c_custkey integer, + c_name varchar(25) not null, + c_address varchar(40), + c_nationkey integer, + c_phone char(15), + c_acctbal decimal(15,2), + c_mktsegment char(10), + c_comment varchar(117), + primary key (c_custkey)); +SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash'); + +-- Test COPY into empty hash-partitioned table +COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|'); + +SELECT master_create_worker_shards('customer_copy_hash', 64, 1); + +-- Test empty copy 
+COPY customer_copy_hash FROM STDIN; +\. + +-- Test syntax error +COPY customer_copy_hash (c_custkey,c_name) FROM STDIN; +1,customer1 +2,customer2, +notinteger,customernot +\. + +-- Confirm that no data was copied +SELECT count(*) FROM customer_copy_hash; + +-- Test primary key violation +COPY customer_copy_hash (c_custkey, c_name) FROM STDIN +WITH (FORMAT 'csv'); +1,customer1 +2,customer2 +2,customer2 +\. + +-- Confirm that no data was copied +SELECT count(*) FROM customer_copy_hash; + +-- Test headers option +COPY customer_copy_hash (c_custkey, c_name) FROM STDIN +WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey)); +# header +1,customer1 +2,customer2 +3,customer3 +\. + +-- Confirm that only first row was skipped +SELECT count(*) FROM customer_copy_hash; + +-- Test force_not_null option +COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN +WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address)); +"4","customer4","" +\. + +-- Confirm that value is not null +SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4; + +-- Test force_null option +COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN +WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address)); +"5","customer5","" +\. + +-- Confirm that value is null +SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5; + +-- Test null violation +COPY customer_copy_hash (c_custkey, c_name) FROM STDIN +WITH (FORMAT 'csv'); +6,customer6 +7,customer7 +8, +\. 
+ +-- Confirm that no data was copied +SELECT count(*) FROM customer_copy_hash; + +-- Test server-side copy from program +COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9' +WITH (DELIMITER ' '); + +-- Confirm that data was copied +SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9; + +-- Test server-side copy from file +COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.2.data' WITH (DELIMITER '|'); + +-- Confirm that data was copied +SELECT count(*) FROM customer_copy_hash; + +-- Test client-side copy from file +\COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.3.data' WITH (DELIMITER '|'); + +-- Confirm that data was copied +SELECT count(*) FROM customer_copy_hash; + +-- Create a new range-partitioned table into which to COPY +CREATE TABLE customer_copy_range ( + c_custkey integer, + c_name varchar(25), + c_address varchar(40), + c_nationkey integer, + c_phone char(15), + c_acctbal decimal(15,2), + c_mktsegment char(10), + c_comment varchar(117), + primary key (c_custkey)); + +SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range'); + +-- Test COPY into empty range-partitioned table +COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|'); + +SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500 +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000 +WHERE shardid = :new_shard_id; + +-- Test copy into range-partitioned table +COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|'); + +-- Check whether data went into the right shard (maybe) +SELECT count(*) FROM customer_copy_range WHERE c_custkey = 1; + +-- Check whether data was copied +SELECT count(*) FROM customer_copy_range; diff --git 
a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 39ea82a34..018a0ce75 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -99,6 +99,7 @@ test: multi_append_table_to_shard # --------- test: multi_outer_join +# # --- # Tests covering mostly modification queries and required preliminary # functionality related to metadata, shard creation, shard pruning and @@ -119,6 +120,11 @@ test: multi_utilities test: multi_create_insert_proxy test: multi_data_types +# --------- +# multi_copy creates hash and range-partitioned tables and performs COPY +# --------- +test: multi_copy + # ---------- # multi_large_shardid stages more shards into lineitem # ---------- diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source new file mode 100644 index 000000000..19ce6670b --- /dev/null +++ b/src/test/regress/output/multi_copy.source @@ -0,0 +1,173 @@ +-- +-- MULTI_COPY +-- +-- Create a new hash-partitioned table into which to COPY +CREATE TABLE customer_copy_hash ( + c_custkey integer, + c_name varchar(25) not null, + c_address varchar(40), + c_nationkey integer, + c_phone char(15), + c_acctbal decimal(15,2), + c_mktsegment char(10), + c_comment varchar(117), + primary key (c_custkey)); +SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +-- Test COPY into empty hash-partitioned table +COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|'); +ERROR: could not find any shards for query +DETAIL: No shards exist for distributed table "customer_copy_hash". +HINT: Run master_create_worker_shards to create shards and try again. 
+SELECT master_create_worker_shards('customer_copy_hash', 64, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +-- Test empty copy +COPY customer_copy_hash FROM STDIN; +-- Test syntax error +COPY customer_copy_hash (c_custkey,c_name) FROM STDIN; +ERROR: invalid input syntax for integer: "1,customer1" +CONTEXT: COPY customer_copy_hash, line 1, column c_custkey: "1,customer1" +-- Confirm that no data was copied +SELECT count(*) FROM customer_copy_hash; + count +------- + 0 +(1 row) + +-- Test primary key violation +COPY customer_copy_hash (c_custkey, c_name) FROM STDIN +WITH (FORMAT 'csv'); +ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_103160" +DETAIL: Key (c_custkey)=(2) already exists. +CONTEXT: COPY customer_copy_hash, line 4: "" +-- Confirm that no data was copied +SELECT count(*) FROM customer_copy_hash; + count +------- + 0 +(1 row) + +-- Test headers option +COPY customer_copy_hash (c_custkey, c_name) FROM STDIN +WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey)); +-- Confirm that only first row was skipped +SELECT count(*) FROM customer_copy_hash; + count +------- + 3 +(1 row) + +-- Test force_not_null option +COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN +WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address)); +-- Confirm that value is not null +SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4; + count +------- + 1 +(1 row) + +-- Test force_null option +COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN +WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address)); +-- Confirm that value is null +SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5; + count +------- + 0 +(1 row) + +-- Test null violation +COPY customer_copy_hash (c_custkey, c_name) FROM STDIN +WITH (FORMAT 'csv'); +ERROR: null value in column "c_name" violates not-null constraint +DETAIL: Failing row contains (8, null, null, null, null, null, null, null). 
+CONTEXT: COPY customer_copy_hash, line 4: "" +-- Confirm that no data was copied +SELECT count(*) FROM customer_copy_hash; + count +------- + 5 +(1 row) + +-- Test server-side copy from program +COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9' +WITH (DELIMITER ' '); +-- Confirm that data was copied +SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9; + count +------- + 1 +(1 row) + +-- Test server-side copy from file +COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.2.data' WITH (DELIMITER '|'); +-- Confirm that data was copied +SELECT count(*) FROM customer_copy_hash; + count +------- + 1006 +(1 row) + +-- Test client-side copy from file +\COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.3.data' WITH (DELIMITER '|'); +-- Confirm that data was copied +SELECT count(*) FROM customer_copy_hash; + count +------- + 2006 +(1 row) + +-- Create a new range-partitioned table into which to COPY +CREATE TABLE customer_copy_range ( + c_custkey integer, + c_name varchar(25), + c_address varchar(40), + c_nationkey integer, + c_phone char(15), + c_acctbal decimal(15,2), + c_mktsegment char(10), + c_comment varchar(117), + primary key (c_custkey)); +SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range'); + master_create_distributed_table +--------------------------------- + +(1 row) + +-- Test COPY into empty range-partitioned table +COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|'); +ERROR: could not find any shards for query +DETAIL: No shards exist for distributed table "customer_copy_range". 
+SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500 +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000 +WHERE shardid = :new_shard_id; +-- Test copy into range-partitioned table +COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|'); +-- Check whether data went into the right shard (maybe) +SELECT count(*) FROM customer_copy_range WHERE c_custkey = 1; + count +------- + 1 +(1 row) + +-- Check whether data was copied +SELECT count(*) FROM customer_copy_range; + count +------- + 1000 +(1 row) +