From a8f9e983a08f1c223e17a1bb68461480e5f02b40 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sun, 18 Sep 2016 19:45:25 -0700 Subject: [PATCH] Migration COPY to new framework. This implies several behaviour changes: - COPY is now transactional - failure to compute stats for append partitioned tables is an error --- src/backend/distributed/commands/multi_copy.c | 834 +++++++++--------- src/include/distributed/master_protocol.h | 2 +- .../expected/multi_modifying_xacts.out | 62 +- src/test/regress/output/multi_copy.source | 6 +- .../regress/sql/multi_modifying_xacts.sql | 11 +- 5 files changed, 431 insertions(+), 484 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 43299d922..6a18bf533 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -71,8 +71,6 @@ #include "commands/copy.h" #include "commands/defrem.h" #include "distributed/citus_ruleutils.h" -#include "distributed/commit_protocol.h" -#include "distributed/connection_cache.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/master_metadata_utility.h" @@ -80,8 +78,9 @@ #include "distributed/metadata_cache.h" #include "distributed/multi_copy.h" #include "distributed/multi_physical_planner.h" -#include "distributed/multi_shard_transaction.h" #include "distributed/pg_dist_partition.h" +#include "distributed/placement_connection.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" @@ -127,12 +126,18 @@ #include "utils/tuplestore.h" #include "utils/memutils.h" +/* ShardConnections represents a set of connections for each placement of a shard */ +typedef struct ShardConnections +{ + int64 shardId; + List *connectionList; +} ShardConnections; /* constant used in binary protocol */ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* use a global connection to the master node in order to skip passing it around */ -static PGconn *masterConnection = NULL; +static MultiConnection *masterConnection = NULL; /* Local functions forward declarations */ @@ -141,29 +146,34 @@ static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag); static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId); static char MasterPartitionMethod(RangeVar *relation); static void RemoveMasterOptions(CopyStmt *copyStatement); -static void OpenCopyTransactions(CopyStmt *copyStatement, - ShardConnections *shardConnections, bool stopOnFailure, - bool useBinaryCopyFormat); +static HTAB * CreateShardConnectionHash(void); +static ShardConnections * GetShardConnections(HTAB *shardConnectionHash, int64 shardId, + bool *shardConnectionsFound); +static void OpenCopyConnections(CopyStmt *copyStatement, + ShardConnections *shardConnections, bool stopOnFailure, + bool useBinaryCopyFormat); static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription, CopyOutState rowOutputState); static List * MasterShardPlacementList(uint64 shardId); static List * RemoteFinalizedShardPlacementList(uint64 shardId); -static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList); -static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList); +static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId, + List *connectionList); +static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, + List *connectionList); static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat); -static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList); -static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, - int64 shardId); -static void EndRemoteCopy(List *connectionList, bool stopOnFailure); -static void ReportCopyError(PGconn *connection, PGresult *result); +static void SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList); +static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, + MultiConnection *connection); +static void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure); +static void ReportCopyError(MultiConnection *connection, PGresult *result); static uint32 AvailableColumnCount(TupleDesc tupleDescriptor); -static void StartCopyToNewShard(ShardConnections *shardConnections, - CopyStmt *copyStatement, bool useBinaryCopyFormat); +static int64 StartCopyToNewShard(ShardConnections *shardConnections, + CopyStmt *copyStatement, bool useBinaryCopyFormat); static int64 MasterCreateEmptyShard(char *relationName); static int64 CreateEmptyShard(char *relationName); static int64 RemoteCreateEmptyShard(char *relationName); -static void FinalizeCopyToNewShard(ShardConnections *shardConnections); +static void FinalizeCopyToNewShard(int64 shardId, ShardConnections *shardConnections); static void MasterUpdateShardStatistics(uint64 shardId); static void RemoteUpdateShardStatistics(uint64 shardId); @@ -187,6 +197,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) { bool isCopyFromWorker = false; + BeginOrContinueCoordinatedTransaction(); + /* disallow COPY to/from file or program except for superusers */ if (copyStatement->filename != NULL && !superuser()) { @@ -208,6 +220,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) } } + masterConnection = NULL; /* reset, might still be set after error */ isCopyFromWorker = IsCopyFromWorker(copyStatement); if (isCopyFromWorker) { @@ -268,79 +281,42 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement); char *nodeName = masterNodeAddress->nodeName; int32 nodePort = masterNodeAddress->nodePort; - char *nodeUser = CurrentUserName(); + Oid relationId = InvalidOid; + char partitionMethod = 0; + char *schemaName = NULL; - if (XactModificationLevel > XACT_MODIFICATION_NONE) + masterConnection = GetNodeConnection(NEW_CONNECTION | CACHED_CONNECTION, + nodeName, nodePort); + ClaimConnectionExclusively(masterConnection); + + /* run all metadata commands in a transaction */ + AdjustRemoteTransactionState(masterConnection); + + /* strip schema name for local reference */ + schemaName = copyStatement->relation->schemaname; + copyStatement->relation->schemaname = NULL; + + relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); + + /* put schema name back */ + copyStatement->relation->schemaname = schemaName; + + partitionMethod = MasterPartitionMethod(copyStatement->relation); + if (partitionMethod != DISTRIBUTE_BY_APPEND) { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("distributed copy operations must not appear in " - "transaction blocks containing other distributed " - "modifications"))); + ereport(ERROR, (errmsg("copy from worker nodes is only supported " + "for append-partitioned tables"))); } - masterConnection = ConnectToNode(nodeName, nodePort, nodeUser); + /* + * Remove master node options from the copy statement because they are not + * recognized by PostgreSQL machinery. + */ + RemoveMasterOptions(copyStatement); - PG_TRY(); - { - PGresult *queryResult = NULL; - Oid relationId = InvalidOid; - char partitionMethod = 0; + CopyToNewShards(copyStatement, completionTag, relationId); - /* strip schema name for local reference */ - char *schemaName = copyStatement->relation->schemaname; - copyStatement->relation->schemaname = NULL; - - relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); - - /* put schema name back */ - copyStatement->relation->schemaname = schemaName; - - partitionMethod = MasterPartitionMethod(copyStatement->relation); - if (partitionMethod != DISTRIBUTE_BY_APPEND) - { - ereport(ERROR, (errmsg("copy from worker nodes is only supported " - "for append-partitioned tables"))); - } - - /* run all metadata commands in a transaction */ - queryResult = PQexec(masterConnection, "BEGIN"); - if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) - { - ereport(ERROR, (errmsg("could not start to update master node metadata"))); - } - - PQclear(queryResult); - - /* - * Remove master node options from the copy statement because they are not - * recognized by PostgreSQL machinery. - */ - RemoveMasterOptions(copyStatement); - - CopyToNewShards(copyStatement, completionTag, relationId); - - /* commit metadata transactions */ - queryResult = PQexec(masterConnection, "COMMIT"); - if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) - { - ereport(ERROR, (errmsg("could not commit master node metadata changes"))); - } - - PQclear(queryResult); - - /* close the connection */ - PQfinish(masterConnection); - masterConnection = NULL; - } - PG_CATCH(); - { - /* close the connection */ - PQfinish(masterConnection); - masterConnection = NULL; - - PG_RE_THROW(); - } - PG_END_TRY(); + UnclaimConnection(masterConnection); } @@ -371,9 +347,9 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) ShardInterval **shardIntervalCache = NULL; bool useBinarySearch = false; - HTAB *copyConnectionHash = NULL; + HTAB *shardConnectionHash = NULL; + HASH_SEQ_STATUS shardConnectionIterator; ShardConnections *shardConnections = NULL; - List *connectionList = NIL; EState *executorState = NULL; MemoryContext executorTupleContext = NULL; @@ -387,6 +363,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) Var *partitionColumn = PartitionColumn(tableId, 0); char partitionMethod = PartitionMethod(tableId); + ErrorContextCallback errorCallback; + /* get hash function for partition column */ hashFunction = cacheEntry->hashFunction; @@ -467,152 +445,120 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); - /* - * Create a mapping of shard id to a connection for each of its placements. - * The hash should be initialized before the PG_TRY, since it is used in - * PG_CATCH. Otherwise, it may be undefined in the PG_CATCH (see sigsetjmp - * documentation). - */ - copyConnectionHash = CreateShardConnectionHash(TopTransactionContext); + /* create a mapping of shard id to a connection for each of its placements */ + shardConnectionHash = CreateShardConnectionHash(); - /* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */ - PG_TRY(); + /* set up callback to identify error line number */ + errorCallback.callback = CopyFromErrorCallback; + errorCallback.arg = (void *) copyState; + errorCallback.previous = error_context_stack; + error_context_stack = &errorCallback; + + while (true) { - ErrorContextCallback errorCallback; + bool nextRowFound = false; + Datum partitionColumnValue = 0; + ShardInterval *shardInterval = NULL; + int64 shardId = 0; + bool shardConnectionsFound = false; + MemoryContext oldContext = NULL; - /* set up callback to identify error line number */ - errorCallback.callback = CopyFromErrorCallback; - errorCallback.arg = (void *) copyState; - errorCallback.previous = error_context_stack; - error_context_stack = &errorCallback; + ResetPerTupleExprContext(executorState); - /* ensure transactions have unique names on worker nodes */ - InitializeDistributedTransaction(); + oldContext = MemoryContextSwitchTo(executorTupleContext); - while (true) + /* parse a row from the input */ + nextRowFound = NextCopyFrom(copyState, executorExpressionContext, + columnValues, columnNulls, NULL); + + if (!nextRowFound) { - bool nextRowFound = false; - Datum partitionColumnValue = 0; - ShardInterval *shardInterval = NULL; - int64 shardId = 0; - bool shardConnectionsFound = false; - MemoryContext oldContext = NULL; - - ResetPerTupleExprContext(executorState); - - oldContext = MemoryContextSwitchTo(executorTupleContext); - - /* parse a row from the input */ - nextRowFound = NextCopyFrom(copyState, executorExpressionContext, - columnValues, columnNulls, NULL); - - if (!nextRowFound) - { - MemoryContextSwitchTo(oldContext); - break; - } - - CHECK_FOR_INTERRUPTS(); - - /* find the partition column value */ - - if (columnNulls[partitionColumn->varattno - 1]) - { - ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), - errmsg("cannot copy row with NULL value " - "in partition column"))); - } - - partitionColumnValue = columnValues[partitionColumn->varattno - 1]; - - /* find the shard interval and id for the partition column value */ - shardInterval = FindShardInterval(partitionColumnValue, shardIntervalCache, - shardCount, partitionMethod, - compareFunction, hashFunction, - useBinarySearch); - if (shardInterval == NULL) - { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not find shard for partition column " - "value"))); - } - - shardId = shardInterval->shardId; - MemoryContextSwitchTo(oldContext); - - /* get existing connections to the shard placements, if any */ - shardConnections = GetShardHashConnections(copyConnectionHash, shardId, - &shardConnectionsFound); - if (!shardConnectionsFound) - { - /* open connections and initiate COPY on shard placements */ - OpenCopyTransactions(copyStatement, shardConnections, false, - copyOutState->binary); - - /* send copy binary headers to shard placements */ - if (copyOutState->binary) - { - SendCopyBinaryHeaders(copyOutState, - shardConnections->connectionList); - } - } - - /* replicate row to shard placements */ - resetStringInfo(copyOutState->fe_msgbuf); - AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, - copyOutState, columnOutputFunctions); - SendCopyDataToAll(copyOutState->fe_msgbuf, shardConnections->connectionList); - - processedRowCount += 1; + break; } - connectionList = ConnectionList(copyConnectionHash); + CHECK_FOR_INTERRUPTS(); + /* find the partition column value */ + + if (columnNulls[partitionColumn->varattno - 1]) + { + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("cannot copy row with NULL value " + "in partition column"))); + } + + partitionColumnValue = columnValues[partitionColumn->varattno - 1]; + + /* find the shard interval and id for the partition column value */ + shardInterval = FindShardInterval(partitionColumnValue, shardIntervalCache, + shardCount, partitionMethod, + compareFunction, hashFunction, + useBinarySearch); + if (shardInterval == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not find shard for partition column " + "value"))); + } + + shardId = shardInterval->shardId; + + MemoryContextSwitchTo(oldContext); + + /* get existing connections to the shard placements, if any */ + shardConnections = GetShardConnections(shardConnectionHash, + shardId, + &shardConnectionsFound); + if (!shardConnectionsFound) + { + /* open connections and initiate COPY on shard placements */ + OpenCopyConnections(copyStatement, shardConnections, false, + copyOutState->binary); + + /* send copy binary headers to shard placements */ + if (copyOutState->binary) + { + SendCopyBinaryHeaders(copyOutState, shardId, + shardConnections->connectionList); + } + } + + /* replicate row to shard placements */ + resetStringInfo(copyOutState->fe_msgbuf); + AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, + copyOutState, columnOutputFunctions); + SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, + shardConnections->connectionList); + + processedRowCount += 1; + } + + /* all lines have been copied, stop showing line number in errors */ + error_context_stack = errorCallback.previous; + + CHECK_FOR_INTERRUPTS(); + + /* finish copy on all open connections */ + hash_seq_init(&shardConnectionIterator, shardConnectionHash); + shardConnections = (ShardConnections *) hash_seq_search(&shardConnectionIterator); + while (shardConnections != NULL) + { /* send copy binary footers to all shard placements */ if (copyOutState->binary) { - SendCopyBinaryFooters(copyOutState, connectionList); + SendCopyBinaryFooters(copyOutState, shardConnections->shardId, + shardConnections->connectionList); } - /* all lines have been copied, stop showing line number in errors */ - error_context_stack = errorCallback.previous; - /* close the COPY input on all shard placements */ - EndRemoteCopy(connectionList, true); + EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true); - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - PrepareRemoteTransactions(connectionList); - } - - EndCopyFrom(copyState); - heap_close(distributedRelation, NoLock); - - /* check for cancellation one last time before committing */ - CHECK_FOR_INTERRUPTS(); + shardConnections = (ShardConnections *) hash_seq_search(&shardConnectionIterator); } - PG_CATCH(); - { - List *abortConnectionList = NIL; - /* roll back all transactions */ - abortConnectionList = ConnectionList(copyConnectionHash); - EndRemoteCopy(abortConnectionList, false); - AbortRemoteTransactions(abortConnectionList); - CloseConnections(abortConnectionList); - - PG_RE_THROW(); - } - PG_END_TRY(); - - /* - * Ready to commit the transaction, this code is below the PG_TRY block because - * we do not want any of the transactions rolled back if a failure occurs. Instead, - * they should be rolled forward. - */ - CommitRemoteTransactions(connectionList, false); - CloseConnections(connectionList); + EndCopyFrom(copyState); + heap_close(distributedRelation, NoLock); if (completionTag != NULL) { @@ -645,11 +591,13 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) const char *delimiterCharacter = "\t"; const char *nullPrintCharacter = "\\N"; - /* - * Shard connections should be initialized before the PG_TRY, since it is - * used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH - * (see sigsetjmp documentation). - */ + ErrorContextCallback errorCallback; + + int64 currentShardId = INVALID_SHARD_ID; + uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L; + uint64 copiedDataSizeInBytes = 0; + uint64 processedRowCount = 0; + ShardConnections *shardConnections = (ShardConnections *) palloc0(sizeof(ShardConnections)); @@ -670,140 +618,124 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); - /* we use a PG_TRY block to close connections on errors (e.g. in NextCopyFrom) */ - PG_TRY(); + /* set up callback to identify error line number */ + errorCallback.callback = CopyFromErrorCallback; + errorCallback.arg = (void *) copyState; + errorCallback.previous = error_context_stack; + + while (true) { - uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L; - uint64 copiedDataSizeInBytes = 0; - uint64 processedRowCount = 0; + bool nextRowFound = false; + MemoryContext oldContext = NULL; + uint64 messageBufferSize = 0; - /* set up callback to identify error line number */ - ErrorContextCallback errorCallback; + ResetPerTupleExprContext(executorState); - errorCallback.callback = CopyFromErrorCallback; - errorCallback.arg = (void *) copyState; - errorCallback.previous = error_context_stack; + /* switch to tuple memory context and start showing line number in errors */ + error_context_stack = &errorCallback; + oldContext = MemoryContextSwitchTo(executorTupleContext); - while (true) + /* parse a row from the input */ + nextRowFound = NextCopyFrom(copyState, executorExpressionContext, + columnValues, columnNulls, NULL); + + if (!nextRowFound) { - bool nextRowFound = false; - MemoryContext oldContext = NULL; - uint64 messageBufferSize = 0; - - ResetPerTupleExprContext(executorState); - - /* switch to tuple memory context and start showing line number in errors */ - error_context_stack = &errorCallback; - oldContext = MemoryContextSwitchTo(executorTupleContext); - - /* parse a row from the input */ - nextRowFound = NextCopyFrom(copyState, executorExpressionContext, - columnValues, columnNulls, NULL); - - if (!nextRowFound) - { - /* switch to regular memory context and stop showing line number in errors */ - MemoryContextSwitchTo(oldContext); - error_context_stack = errorCallback.previous; - break; - } - - CHECK_FOR_INTERRUPTS(); - /* switch to regular memory context and stop showing line number in errors */ MemoryContextSwitchTo(oldContext); error_context_stack = errorCallback.previous; - - /* - * If copied data size is zero, this means either this is the first - * line in the copy or we just filled the previous shard up to its - * capacity. Either way, we need to create a new shard and - * start copying new rows into it. - */ - if (copiedDataSizeInBytes == 0) - { - /* create shard and open connections to shard placements */ - StartCopyToNewShard(shardConnections, copyStatement, - copyOutState->binary); - - /* send copy binary headers to shard placements */ - if (copyOutState->binary) - { - SendCopyBinaryHeaders(copyOutState, - shardConnections->connectionList); - } - } - - /* replicate row to shard placements */ - resetStringInfo(copyOutState->fe_msgbuf); - AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, - copyOutState, columnOutputFunctions); - SendCopyDataToAll(copyOutState->fe_msgbuf, shardConnections->connectionList); - - messageBufferSize = copyOutState->fe_msgbuf->len; - copiedDataSizeInBytes = copiedDataSizeInBytes + messageBufferSize; - - /* - * If we filled up this shard to its capacity, send copy binary footers - * to shard placements, commit copy transactions, close connections - * and finally update shard statistics. - * - * */ - if (copiedDataSizeInBytes > shardMaxSizeInBytes) - { - if (copyOutState->binary) - { - SendCopyBinaryFooters(copyOutState, - shardConnections->connectionList); - } - FinalizeCopyToNewShard(shardConnections); - MasterUpdateShardStatistics(shardConnections->shardId); - - copiedDataSizeInBytes = 0; - } - - processedRowCount += 1; + break; } + CHECK_FOR_INTERRUPTS(); + + /* switch to regular memory context and stop showing line number in errors */ + MemoryContextSwitchTo(oldContext); + error_context_stack = errorCallback.previous; + /* - * For the last shard, send copy binary footers to shard placements, - * commit copy transactions, close connections and finally update shard - * statistics. If no row is send, there is no shard to finalize the - * copy command. + * If copied data size is zero, this means either this is the first + * line in the copy or we just filled the previous shard up to its + * capacity. Either way, we need to create a new shard and + * start copying new rows into it. */ - if (copiedDataSizeInBytes > 0) + if (copiedDataSizeInBytes == 0) + { + /* create shard and open connections to shard placements */ + currentShardId = StartCopyToNewShard(shardConnections, copyStatement, + copyOutState->binary); + + /* send copy binary headers to shard placements */ + if (copyOutState->binary) + { + SendCopyBinaryHeaders(copyOutState, currentShardId, + shardConnections->connectionList); + } + } + + /* replicate row to shard placements */ + resetStringInfo(copyOutState->fe_msgbuf); + AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, + copyOutState, columnOutputFunctions); + SendCopyDataToAll(copyOutState->fe_msgbuf, currentShardId, + shardConnections->connectionList); + + messageBufferSize = copyOutState->fe_msgbuf->len; + copiedDataSizeInBytes = copiedDataSizeInBytes + messageBufferSize; + + /* + * If we filled up this shard to its capacity, send copy binary footers + * to shard placements, commit copy transactions, close connections + * and finally update shard statistics. + * + * */ + if (copiedDataSizeInBytes > shardMaxSizeInBytes) { if (copyOutState->binary) { - SendCopyBinaryFooters(copyOutState, + SendCopyBinaryFooters(copyOutState, currentShardId, shardConnections->connectionList); } - FinalizeCopyToNewShard(shardConnections); + FinalizeCopyToNewShard(currentShardId, shardConnections); MasterUpdateShardStatistics(shardConnections->shardId); + + copiedDataSizeInBytes = 0; + currentShardId = INVALID_SHARD_ID; } - EndCopyFrom(copyState); - heap_close(distributedRelation, NoLock); - - /* check for cancellation one last time before returning */ - CHECK_FOR_INTERRUPTS(); - - if (completionTag != NULL) - { - snprintf(completionTag, COMPLETION_TAG_BUFSIZE, - "COPY " UINT64_FORMAT, processedRowCount); - } + processedRowCount += 1; } - PG_CATCH(); + + /* + * For the last shard, send copy binary footers to shard placements, + * commit copy transactions, close connections and finally update shard + * statistics. If no row is send, there is no shard to finalize the + * copy command. + */ + if (copiedDataSizeInBytes > 0) { - /* roll back all transactions */ - EndRemoteCopy(shardConnections->connectionList, false); - AbortRemoteTransactions(shardConnections->connectionList); - CloseConnections(shardConnections->connectionList); + Assert(currentShardId != INVALID_SHARD_ID); - PG_RE_THROW(); + if (copyOutState->binary) + { + SendCopyBinaryFooters(copyOutState, currentShardId, + shardConnections->connectionList); + } + FinalizeCopyToNewShard(currentShardId, shardConnections); + MasterUpdateShardStatistics(shardConnections->shardId); + } + + EndCopyFrom(copyState); + heap_close(distributedRelation, NoLock); + + /* check for cancellation one last time before returning */ + CHECK_FOR_INTERRUPTS(); + + if (completionTag != NULL) + { + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "COPY " UINT64_FORMAT, processedRowCount); } - PG_END_TRY(); } @@ -859,7 +791,7 @@ MasterPartitionMethod(RangeVar *relation) StringInfo partitionMethodCommand = makeStringInfo(); appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName); - queryResult = PQexec(masterConnection, partitionMethodCommand->data); + queryResult = PQexec(masterConnection->conn, partitionMethodCommand->data); if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) { char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0); @@ -873,7 +805,7 @@ MasterPartitionMethod(RangeVar *relation) } else { - WarnRemoteError(masterConnection, queryResult); + ReportResultError(masterConnection, queryResult, WARNING); ereport(ERROR, (errmsg("could not get the partition method of the " "distributed table"))); } @@ -913,25 +845,65 @@ RemoveMasterOptions(CopyStmt *copyStatement) } +static HTAB * +CreateShardConnectionHash(void) +{ + HTAB *shardConnectionsHash = NULL; + int hashFlags = 0; + HASHCTL info; + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(int64); + info.entrysize = sizeof(ShardConnections); + info.hash = tag_hash; + info.hcxt = TopTransactionContext; + + hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT; + shardConnectionsHash = hash_create("Shard Connections Hash", + 256, &info, + hashFlags); + + return shardConnectionsHash; +} + + +static ShardConnections * +GetShardConnections(HTAB *shardConnectionHash, int64 shardId, + bool *shardConnectionsFound) +{ + ShardConnections *shardConnections = NULL; + + shardConnections = (ShardConnections *) hash_search(shardConnectionHash, + &shardId, + HASH_ENTER, + shardConnectionsFound); + if (!*shardConnectionsFound) + { + shardConnections->shardId = shardId; + shardConnections->connectionList = NIL; + } + + return shardConnections; +} + + /* - * OpenCopyTransactions opens a connection for each placement of a shard and - * starts a COPY transaction. If a connection cannot be opened, then the shard - * placement is marked as inactive and the COPY continues with the remaining - * shard placements. + * OpenCopyConnections opens a connection for each placement of a shard and + * starts COPY. If a connection cannot be opened, then the shard placement is + * marked as inactive and the COPY continues with the remaining shard + * placements. */ static void -OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections, - bool stopOnFailure, bool useBinaryCopyFormat) +OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, + bool stopOnFailure, bool useBinaryCopyFormat) { List *finalizedPlacementList = NIL; List *failedPlacementList = NIL; ListCell *placementCell = NULL; - ListCell *failedPlacementCell = NULL; List *connectionList = NULL; int64 shardId = shardConnections->shardId; MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, - "OpenCopyTransactions", + "OpenCopyConnections", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); @@ -943,28 +915,31 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections MemoryContextSwitchTo(oldContext); - if (XactModificationLevel > XACT_MODIFICATION_NONE) - { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("distributed copy operations must not appear in " - "transaction blocks containing other distributed " - "modifications"))); - } - foreach(placementCell, finalizedPlacementList) { ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); char *nodeName = placement->nodeName; int nodePort = placement->nodePort; - char *nodeUser = CurrentUserName(); - PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser); - - TransactionConnection *transactionConnection = NULL; + MultiConnection *connection = NULL; + uint32 connectionFlags = NEW_CONNECTION | CACHED_CONNECTION | FOR_DML; StringInfo copyCommand = NULL; PGresult *result = NULL; + if (stopOnFailure) + { + connectionFlags |= CRITICAL_CONNECTION; + } + + /* + * FIXME: should make connection establishment parallel, by using + * StartPlacementConnection etc. + */ + connection = GetPlacementConnection(connectionFlags, placement); + ClaimConnectionExclusively(connection); + if (connection == NULL) { + /* FIXME: remove or replace with working code */ if (stopOnFailure) { ereport(ERROR, (errmsg("could not open connection to %s:%d", @@ -975,39 +950,26 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections continue; } - result = PQexec(connection, "BEGIN"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - WarnRemoteError(connection, result); - failedPlacementList = lappend(failedPlacementList, placement); + /* start transaction unless already done so */ + AdjustRemoteTransactionState(connection); - PQclear(result); - continue; - } - - PQclear(result); copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId, useBinaryCopyFormat); - result = PQexec(connection, copyCommand->data); + result = PQexec(connection->conn, copyCommand->data); if (PQresultStatus(result) != PGRES_COPY_IN) { - WarnRemoteError(connection, result); - failedPlacementList = lappend(failedPlacementList, placement); - + ReportResultError(connection, result, WARNING); PQclear(result); + MarkRemoteTransactionFailed(connection, true); + + /* failed placements will be invalidated by transaction machinery */ continue; } PQclear(result); - transactionConnection = palloc0(sizeof(TransactionConnection)); - - transactionConnection->connectionId = shardConnections->shardId; - transactionConnection->transactionState = TRANSACTION_STATE_COPY_STARTED; - transactionConnection->connection = connection; - - connectionList = lappend(connectionList, transactionConnection); + connectionList = lappend(connectionList, connection); } /* if all placements failed, error out */ @@ -1016,20 +978,6 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections ereport(ERROR, (errmsg("could not find any active placements"))); } - /* - * If stopOnFailure is true, we just error out and code execution should - * never reach to this point. This is the case for copy from worker nodes. - */ - Assert(!stopOnFailure || list_length(failedPlacementList) == 0); - - /* otherwise, mark failed placements as inactive: they're stale */ - foreach(failedPlacementCell, failedPlacementList) - { - ShardPlacement *failedPlacement = (ShardPlacement *) lfirst(failedPlacementCell); - - UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE); - } - shardConnections->connectionList = connectionList; MemoryContextReset(localContext); @@ -1113,7 +1061,7 @@ RemoteFinalizedShardPlacementList(uint64 shardId) StringInfo shardPlacementsCommand = makeStringInfo(); appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId); - queryResult = PQexec(masterConnection, shardPlacementsCommand->data); + queryResult = PQexec(masterConnection->conn, shardPlacementsCommand->data); if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) { int rowCount = PQntuples(queryResult); @@ -1121,14 +1069,17 @@ RemoteFinalizedShardPlacementList(uint64 shardId) for (rowIndex = 0; rowIndex < rowCount; rowIndex++) { - char *nodeName = PQgetvalue(queryResult, rowIndex, 0); - - char *nodePortString = PQgetvalue(queryResult, rowIndex, 1); + char *placementIdString = PQgetvalue(queryResult, rowIndex, 0); + char *nodeName = PQgetvalue(queryResult, rowIndex, 1); + char *nodePortString = PQgetvalue(queryResult, rowIndex, 2); + uint64 placementId = atol(placementIdString); uint32 nodePort = atoi(nodePortString); ShardPlacement *shardPlacement = (ShardPlacement *) palloc0(sizeof(ShardPlacement)); + shardPlacement->shardId = shardId; + shardPlacement->placementId = placementId; shardPlacement->nodeName = nodeName; shardPlacement->nodePort = nodePort; @@ -1146,21 +1097,21 @@ RemoteFinalizedShardPlacementList(uint64 shardId) /* Send copy binary headers to given connections */ static void -SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList) +SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId, List *connectionList) { resetStringInfo(copyOutState->fe_msgbuf); AppendCopyBinaryHeaders(copyOutState); - SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList); + SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, connectionList); } /* Send copy binary footers to given connections */ static void -SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList) +SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, List *connectionList) { resetStringInfo(copyOutState->fe_msgbuf); AppendCopyBinaryFooters(copyOutState); - SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList); + SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, connectionList); } @@ -1203,17 +1154,15 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCop * SendCopyDataToAll sends copy data to all connections in a list. */ static void -SendCopyDataToAll(StringInfo dataBuffer, List *connectionList) +SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList) { ListCell *connectionCell = NULL; foreach(connectionCell, connectionList) { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - int64 shardId = transactionConnection->connectionId; + MultiConnection *connection = + (MultiConnection *) lfirst(connectionCell); - SendCopyDataToPlacement(dataBuffer, connection, shardId); + SendCopyDataToPlacement(dataBuffer, shardId, connection); } } @@ -1223,73 +1172,71 @@ SendCopyDataToAll(StringInfo dataBuffer, List *connectionList) * over the given connection. */ static void -SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, int64 shardId) +SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *connection) { - int copyResult = PQputCopyData(connection, dataBuffer->data, dataBuffer->len); + int copyResult = PQputCopyData(connection->conn, dataBuffer->data, dataBuffer->len); if (copyResult != 1) { - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); - ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("failed to COPY to shard %ld on %s:%s", - shardId, nodeName, nodePort))); + errmsg("failed to COPY to shard %ld on %s:%d", + shardId, connection->hostname, connection->port))); } } /* - * EndRemoteCopy ends the COPY input on all connections. If stopOnFailure - * is true, then EndRemoteCopy reports an error on failure, otherwise it - * reports a warning or continues. + * EndRemoteCopy ends the COPY input on all connections. If stopOnFailure is + * true, then EndRemoteCopy reports an error on failure, otherwise it reports + * a warning or continues. */ static void -EndRemoteCopy(List *connectionList, bool stopOnFailure) +EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure) { ListCell *connectionCell = NULL; foreach(connectionCell, connectionList) { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - int64 shardId = transactionConnection->connectionId; + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); int copyEndResult = 0; PGresult *result = NULL; +#if 0 + /* FIXME: Is this actually a relevant case? */ if (transactionConnection->transactionState != TRANSACTION_STATE_COPY_STARTED) { /* a failure occurred after having previously called EndRemoteCopy */ continue; } +#endif /* end the COPY input */ - copyEndResult = PQputCopyEnd(connection, NULL); - transactionConnection->transactionState = TRANSACTION_STATE_OPEN; + copyEndResult = PQputCopyEnd(connection->conn, NULL); if (copyEndResult != 1) { if (stopOnFailure) { - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); - ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("failed to COPY to shard %ld on %s:%s", - shardId, nodeName, nodePort))); + errmsg("failed to COPY to shard %ld on %s:%d", + shardId, connection->hostname, connection->port))); } continue; } /* check whether there were any COPY errors */ - result = PQgetResult(connection); + result = PQgetResult(connection->conn); if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure) { ReportCopyError(connection, result); } PQclear(result); + + result = PQgetResult(connection->conn); + Assert(!result); + + UnclaimConnection(connection); } } @@ -1299,7 +1246,7 @@ EndRemoteCopy(List *connectionList, bool stopOnFailure) * the remote COPY error messages. */ static void -ReportCopyError(PGconn *connection, PGresult *result) +ReportCopyError(MultiConnection *connection, PGresult *result) { char *remoteMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY); @@ -1315,10 +1262,8 @@ ReportCopyError(PGconn *connection, PGresult *result) { /* probably a connection problem, get the message from the connection */ char *lastNewlineIndex = NULL; - char *nodeName = ConnectionGetOptionValue(connection, "host"); - char *nodePort = ConnectionGetOptionValue(connection, "port"); - remoteMessage = PQerrorMessage(connection); + remoteMessage = PQerrorMessage(connection->conn); lastNewlineIndex = strrchr(remoteMessage, '\n'); /* trim trailing newline, if any */ @@ -1328,7 +1273,8 @@ ReportCopyError(PGconn *connection, PGresult *result) } ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("failed to complete COPY on %s:%s", nodeName, nodePort), + errmsg("failed to complete COPY on %s:%u", + connection->hostname, connection->port), errdetail("%s", remoteMessage))); } } @@ -1531,7 +1477,7 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState) * StartCopyToNewShard creates a new shard and related shard placements and * opens connections to shard placements. */ -static void +static int64 StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, bool useBinaryCopyFormat) { @@ -1543,11 +1489,12 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, shardConnections->shardId = shardId; - list_free_deep(shardConnections->connectionList); shardConnections->connectionList = NIL; /* connect to shards placements and start transactions */ - OpenCopyTransactions(copyStatement, shardConnections, true, useBinaryCopyFormat); + OpenCopyConnections(copyStatement, shardConnections, true, useBinaryCopyFormat); + + return shardId; } @@ -1604,7 +1551,7 @@ RemoteCreateEmptyShard(char *relationName) StringInfo createEmptyShardCommand = makeStringInfo(); appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName); - queryResult = PQexec(masterConnection, createEmptyShardCommand->data); + queryResult = PQexec(masterConnection->conn, createEmptyShardCommand->data); if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) { char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0); @@ -1613,7 +1560,7 @@ RemoteCreateEmptyShard(char *relationName) } else { - WarnRemoteError(masterConnection, queryResult); + ReportResultError(masterConnection, queryResult, WARNING); ereport(ERROR, (errmsg("could not create a new empty shard on the remote node"))); } @@ -1624,18 +1571,23 @@ RemoteCreateEmptyShard(char *relationName) /* - * FinalizeCopyToNewShard commits copy transaction and closes connections to - * shard placements. + * FinalizeCopyToNewShard ends the COPY and marks connection as inactive for + * all shard placements. */ static void -FinalizeCopyToNewShard(ShardConnections *shardConnections) +FinalizeCopyToNewShard(int64 shardId, ShardConnections *shardConnections) { - /* close the COPY input on all shard placements */ - EndRemoteCopy(shardConnections->connectionList, true); + ListCell *connectionCell = NULL; - /* commit transactions and close connections */ - CommitRemoteTransactions(shardConnections->connectionList, true); - CloseConnections(shardConnections->connectionList); + /* close the COPY input on all shard placements */ + EndRemoteCopy(shardId, shardConnections->connectionList, true); + + foreach(connectionCell, shardConnections->connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + + UnclaimConnection(connection); + } } @@ -1669,7 +1621,7 @@ RemoteUpdateShardStatistics(uint64 shardId) appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY, shardId); - queryResult = PQexec(masterConnection, updateShardStatisticsCommand->data); + queryResult = PQexec(masterConnection->conn, updateShardStatisticsCommand->data); if (PQresultStatus(queryResult) != PGRES_TUPLES_OK) { ereport(ERROR, (errmsg("could not update shard statistics"))); diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 18c5451a4..4e19a2556 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -66,7 +66,7 @@ #define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s" #define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')" #define FINALIZED_SHARD_PLACEMENTS_QUERY \ - "SELECT nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = %ld" + "SELECT placementid, nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = %ld" #define UPDATE_SHARD_STATISTICS_QUERY \ "SELECT master_update_shard_statistics(%ld)" #define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');" diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index e3ae86ee0..7f6f96cf5 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -160,31 +160,27 @@ INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; ERROR: cannot open new connections after the first modification command within a transaction ABORT; --- COPY can't happen second, +-- Check COPY can happen after INSERT BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); \copy labs from stdin delimiter ',' -ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications -CONTEXT: COPY labs, line 1: "10,Weyland-Yutani" COMMIT; --- though it will work if before any modifications +-- Check COPY can happen before INSERT BEGIN; \copy labs from stdin delimiter ',' SELECT name FROM labs WHERE id = 10; name ---------------- Weyland-Yutani -(1 row) + Weyland-Yutani +(2 rows) INSERT INTO labs VALUES (6, 'Bell Labs'); -ERROR: cannot open new connections after the first modification command within a transaction COMMIT; --- but a double-copy isn't allowed (the first will persist) +-- Two COPYs are also ok BEGIN; \copy labs from stdin delimiter ',' \copy labs from stdin delimiter ',' -ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications -CONTEXT: COPY labs, line 1: "12,fsociety" COMMIT; SELECT name FROM labs WHERE id = 11; name @@ -192,13 +188,12 @@ SELECT name FROM labs WHERE id = 11; Planet Express (1 row) --- finally, ALTER and copy aren't compatible +-- finally, check ALTER and copy are compatible BEGIN; ALTER TABLE labs ADD COLUMN motto2 text; \copy labs from stdin delimiter ',' -ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications -CONTEXT: COPY labs, line 1: "12,fsociety,lol" COMMIT; +ALTER TABLE labs DROP COLUMN motto2; -- but the DDL should correctly roll back \d labs Table "public.labs" @@ -207,30 +202,33 @@ COMMIT; id | bigint | not null name | text | not null -SELECT * FROM labs WHERE id = 12; - id | name -----+------ -(0 rows) - --- and if the copy is before the ALTER... -BEGIN; -\copy labs from stdin delimiter ',' -ALTER TABLE labs ADD COLUMN motto3 text; -ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications -COMMIT; --- the DDL fails, but copy persists -\d labs - Table "public.labs" - Column | Type | Modifiers ---------+--------+----------- - id | bigint | not null - name | text | not null - SELECT * FROM labs WHERE id = 12; id | name ----+---------- 12 | fsociety -(1 row) + 12 | fsociety +(2 rows) + +-- and if the copy is before the ALTER... +BEGIN; +\copy labs from stdin delimiter ',' +ALTER TABLE labs ADD COLUMN motto3 text; +ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications +COMMIT; +-- the DDL fails, and copy does not persist +\d labs + Table "public.labs" + Column | Type | Modifiers +--------+--------+----------- + id | bigint | not null + name | text | not null + +SELECT * FROM labs WHERE id = 12; + id | name +----+---------- + 12 | fsociety + 12 | fsociety +(2 rows) -- now, for some special failures... CREATE TABLE objects ( diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index 64880627b..6a01452cb 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -702,8 +702,4 @@ SELECT master_create_distributed_table('composite_partition_column_table', 'comp WARNING: function min(number_pack) does not exist HINT: No function matches the given name and argument types. You might need to add explicit type casts. CONTEXT: while executing command on localhost:57637 -WARNING: function min(number_pack) does not exist -HINT: No function matches the given name and argument types. You might need to add explicit type casts. -CONTEXT: while executing command on localhost:57638 -WARNING: could not get statistics for shard public.composite_partition_column_table_560164 -DETAIL: Setting shard statistics to NULL +ERROR: failure on connection marked as essential: localhost:57637 diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 1cd72b6ae..479ff8ca5 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -131,7 +131,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; ABORT; --- COPY can't happen second, +-- Check COPY can happen after INSERT BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); \copy labs from stdin delimiter ',' @@ -139,7 +139,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs'); \. COMMIT; --- though it will work if before any modifications +-- Check COPY can happen before INSERT BEGIN; \copy labs from stdin delimiter ',' 10,Weyland-Yutani @@ -148,7 +148,7 @@ SELECT name FROM labs WHERE id = 10; INSERT INTO labs VALUES (6, 'Bell Labs'); COMMIT; --- but a double-copy isn't allowed (the first will persist) +-- Two COPYs are also ok BEGIN; \copy labs from stdin delimiter ',' 11,Planet Express @@ -160,13 +160,14 @@ COMMIT; SELECT name FROM labs WHERE id = 11; --- finally, ALTER and copy aren't compatible +-- finally, check ALTER and copy are compatible BEGIN; ALTER TABLE labs ADD COLUMN motto2 text; \copy labs from stdin delimiter ',' 12,fsociety,lol \. COMMIT; +ALTER TABLE labs DROP COLUMN motto2; -- but the DDL should correctly roll back \d labs @@ -180,7 +181,7 @@ BEGIN; ALTER TABLE labs ADD COLUMN motto3 text; COMMIT; --- the DDL fails, but copy persists +-- the DDL fails, and copy does not persist \d labs SELECT * FROM labs WHERE id = 12;