Migrate COPY to new framework.

This implies several behaviour changes:
- COPY is now transactional
- failure to compute stats for append partitioned tables is an error
pull/775/head
Andres Freund 2016-09-18 19:45:25 -07:00
parent 360307f6d5
commit a8f9e983a0
5 changed files with 431 additions and 484 deletions

View File

@ -71,8 +71,6 @@
#include "commands/copy.h" #include "commands/copy.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
@ -80,8 +78,9 @@
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h" #include "distributed/multi_copy.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
@ -127,12 +126,18 @@
#include "utils/tuplestore.h" #include "utils/tuplestore.h"
#include "utils/memutils.h" #include "utils/memutils.h"
/*
 * ShardConnections represents a set of connections for each placement of a shard.
 *
 * Instances are used as the entries of the hash table built by
 * CreateShardConnectionHash (entrysize is sizeof(ShardConnections)), and
 * CopyToNewShards also allocates a single standalone instance with palloc0.
 */
typedef struct ShardConnections
{
/*
 * Identifier of the shard; GetShardConnections looks entries up by this value.
 * NOTE(review): dynahash is expected to read the key from the start of the
 * entry, so this presumably has to remain the first field - confirm against
 * utils/hsearch.h before reordering.
 */
int64 shardId;
/*
 * Connections on which COPY has been started for this shard's placements;
 * assigned by OpenCopyConnections and iterated as MultiConnection pointers
 * by SendCopyDataToAll. NIL until connections have been opened.
 */
List *connectionList;
} ShardConnections;
/* constant used in binary protocol */ /* constant used in binary protocol */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* use a global connection to the master node in order to skip passing it around */ /* use a global connection to the master node in order to skip passing it around */
static PGconn *masterConnection = NULL; static MultiConnection *masterConnection = NULL;
/* Local functions forward declarations */ /* Local functions forward declarations */
@ -141,29 +146,34 @@ static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId); static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
static char MasterPartitionMethod(RangeVar *relation); static char MasterPartitionMethod(RangeVar *relation);
static void RemoveMasterOptions(CopyStmt *copyStatement); static void RemoveMasterOptions(CopyStmt *copyStatement);
static void OpenCopyTransactions(CopyStmt *copyStatement, static HTAB * CreateShardConnectionHash(void);
static ShardConnections * GetShardConnections(HTAB *shardConnectionHash, int64 shardId,
bool *shardConnectionsFound);
static void OpenCopyConnections(CopyStmt *copyStatement,
ShardConnections *shardConnections, bool stopOnFailure, ShardConnections *shardConnections, bool stopOnFailure,
bool useBinaryCopyFormat); bool useBinaryCopyFormat);
static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription, static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription,
CopyOutState rowOutputState); CopyOutState rowOutputState);
static List * MasterShardPlacementList(uint64 shardId); static List * MasterShardPlacementList(uint64 shardId);
static List * RemoteFinalizedShardPlacementList(uint64 shardId); static List * RemoteFinalizedShardPlacementList(uint64 shardId);
static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList); static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId,
static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList); List *connectionList);
static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId,
List *connectionList);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId,
bool useBinaryCopyFormat); bool useBinaryCopyFormat);
static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList); static void SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList);
static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId,
int64 shardId); MultiConnection *connection);
static void EndRemoteCopy(List *connectionList, bool stopOnFailure); static void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure);
static void ReportCopyError(PGconn *connection, PGresult *result); static void ReportCopyError(MultiConnection *connection, PGresult *result);
static uint32 AvailableColumnCount(TupleDesc tupleDescriptor); static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
static void StartCopyToNewShard(ShardConnections *shardConnections, static int64 StartCopyToNewShard(ShardConnections *shardConnections,
CopyStmt *copyStatement, bool useBinaryCopyFormat); CopyStmt *copyStatement, bool useBinaryCopyFormat);
static int64 MasterCreateEmptyShard(char *relationName); static int64 MasterCreateEmptyShard(char *relationName);
static int64 CreateEmptyShard(char *relationName); static int64 CreateEmptyShard(char *relationName);
static int64 RemoteCreateEmptyShard(char *relationName); static int64 RemoteCreateEmptyShard(char *relationName);
static void FinalizeCopyToNewShard(ShardConnections *shardConnections); static void FinalizeCopyToNewShard(int64 shardId, ShardConnections *shardConnections);
static void MasterUpdateShardStatistics(uint64 shardId); static void MasterUpdateShardStatistics(uint64 shardId);
static void RemoteUpdateShardStatistics(uint64 shardId); static void RemoteUpdateShardStatistics(uint64 shardId);
@ -187,6 +197,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
{ {
bool isCopyFromWorker = false; bool isCopyFromWorker = false;
BeginOrContinueCoordinatedTransaction();
/* disallow COPY to/from file or program except for superusers */ /* disallow COPY to/from file or program except for superusers */
if (copyStatement->filename != NULL && !superuser()) if (copyStatement->filename != NULL && !superuser())
{ {
@ -208,6 +220,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
} }
} }
masterConnection = NULL; /* reset, might still be set after error */
isCopyFromWorker = IsCopyFromWorker(copyStatement); isCopyFromWorker = IsCopyFromWorker(copyStatement);
if (isCopyFromWorker) if (isCopyFromWorker)
{ {
@ -268,26 +281,19 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement); NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement);
char *nodeName = masterNodeAddress->nodeName; char *nodeName = masterNodeAddress->nodeName;
int32 nodePort = masterNodeAddress->nodePort; int32 nodePort = masterNodeAddress->nodePort;
char *nodeUser = CurrentUserName();
if (XactModificationLevel > XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("distributed copy operations must not appear in "
"transaction blocks containing other distributed "
"modifications")));
}
masterConnection = ConnectToNode(nodeName, nodePort, nodeUser);
PG_TRY();
{
PGresult *queryResult = NULL;
Oid relationId = InvalidOid; Oid relationId = InvalidOid;
char partitionMethod = 0; char partitionMethod = 0;
char *schemaName = NULL;
masterConnection = GetNodeConnection(NEW_CONNECTION | CACHED_CONNECTION,
nodeName, nodePort);
ClaimConnectionExclusively(masterConnection);
/* run all metadata commands in a transaction */
AdjustRemoteTransactionState(masterConnection);
/* strip schema name for local reference */ /* strip schema name for local reference */
char *schemaName = copyStatement->relation->schemaname; schemaName = copyStatement->relation->schemaname;
copyStatement->relation->schemaname = NULL; copyStatement->relation->schemaname = NULL;
relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
@ -302,15 +308,6 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
"for append-partitioned tables"))); "for append-partitioned tables")));
} }
/* run all metadata commands in a transaction */
queryResult = PQexec(masterConnection, "BEGIN");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
{
ereport(ERROR, (errmsg("could not start to update master node metadata")));
}
PQclear(queryResult);
/* /*
* Remove master node options from the copy statement because they are not * Remove master node options from the copy statement because they are not
* recognized by PostgreSQL machinery. * recognized by PostgreSQL machinery.
@ -319,28 +316,7 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
CopyToNewShards(copyStatement, completionTag, relationId); CopyToNewShards(copyStatement, completionTag, relationId);
/* commit metadata transactions */ UnclaimConnection(masterConnection);
queryResult = PQexec(masterConnection, "COMMIT");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
{
ereport(ERROR, (errmsg("could not commit master node metadata changes")));
}
PQclear(queryResult);
/* close the connection */
PQfinish(masterConnection);
masterConnection = NULL;
}
PG_CATCH();
{
/* close the connection */
PQfinish(masterConnection);
masterConnection = NULL;
PG_RE_THROW();
}
PG_END_TRY();
} }
@ -371,9 +347,9 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
ShardInterval **shardIntervalCache = NULL; ShardInterval **shardIntervalCache = NULL;
bool useBinarySearch = false; bool useBinarySearch = false;
HTAB *copyConnectionHash = NULL; HTAB *shardConnectionHash = NULL;
HASH_SEQ_STATUS shardConnectionIterator;
ShardConnections *shardConnections = NULL; ShardConnections *shardConnections = NULL;
List *connectionList = NIL;
EState *executorState = NULL; EState *executorState = NULL;
MemoryContext executorTupleContext = NULL; MemoryContext executorTupleContext = NULL;
@ -387,6 +363,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
Var *partitionColumn = PartitionColumn(tableId, 0); Var *partitionColumn = PartitionColumn(tableId, 0);
char partitionMethod = PartitionMethod(tableId); char partitionMethod = PartitionMethod(tableId);
ErrorContextCallback errorCallback;
/* get hash function for partition column */ /* get hash function for partition column */
hashFunction = cacheEntry->hashFunction; hashFunction = cacheEntry->hashFunction;
@ -467,18 +445,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
/* /* create a mapping of shard id to a connection for each of its placements */
* Create a mapping of shard id to a connection for each of its placements. shardConnectionHash = CreateShardConnectionHash();
* The hash should be initialized before the PG_TRY, since it is used in
* PG_CATCH. Otherwise, it may be undefined in the PG_CATCH (see sigsetjmp
* documentation).
*/
copyConnectionHash = CreateShardConnectionHash(TopTransactionContext);
/* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */
PG_TRY();
{
ErrorContextCallback errorCallback;
/* set up callback to identify error line number */ /* set up callback to identify error line number */
errorCallback.callback = CopyFromErrorCallback; errorCallback.callback = CopyFromErrorCallback;
@ -486,9 +454,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
errorCallback.previous = error_context_stack; errorCallback.previous = error_context_stack;
error_context_stack = &errorCallback; error_context_stack = &errorCallback;
/* ensure transactions have unique names on worker nodes */
InitializeDistributedTransaction();
while (true) while (true)
{ {
bool nextRowFound = false; bool nextRowFound = false;
@ -542,18 +507,19 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
/* get existing connections to the shard placements, if any */ /* get existing connections to the shard placements, if any */
shardConnections = GetShardHashConnections(copyConnectionHash, shardId, shardConnections = GetShardConnections(shardConnectionHash,
shardId,
&shardConnectionsFound); &shardConnectionsFound);
if (!shardConnectionsFound) if (!shardConnectionsFound)
{ {
/* open connections and initiate COPY on shard placements */ /* open connections and initiate COPY on shard placements */
OpenCopyTransactions(copyStatement, shardConnections, false, OpenCopyConnections(copyStatement, shardConnections, false,
copyOutState->binary); copyOutState->binary);
/* send copy binary headers to shard placements */ /* send copy binary headers to shard placements */
if (copyOutState->binary) if (copyOutState->binary)
{ {
SendCopyBinaryHeaders(copyOutState, SendCopyBinaryHeaders(copyOutState, shardId,
shardConnections->connectionList); shardConnections->connectionList);
} }
} }
@ -562,58 +528,38 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
resetStringInfo(copyOutState->fe_msgbuf); resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions); copyOutState, columnOutputFunctions);
SendCopyDataToAll(copyOutState->fe_msgbuf, shardConnections->connectionList); SendCopyDataToAll(copyOutState->fe_msgbuf, shardId,
shardConnections->connectionList);
processedRowCount += 1; processedRowCount += 1;
} }
connectionList = ConnectionList(copyConnectionHash);
/* send copy binary footers to all shard placements */
if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState, connectionList);
}
/* all lines have been copied, stop showing line number in errors */ /* all lines have been copied, stop showing line number in errors */
error_context_stack = errorCallback.previous; error_context_stack = errorCallback.previous;
/* close the COPY input on all shard placements */ CHECK_FOR_INTERRUPTS();
EndRemoteCopy(connectionList, true);
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) /* finish copy on all open connections */
hash_seq_init(&shardConnectionIterator, shardConnectionHash);
shardConnections = (ShardConnections *) hash_seq_search(&shardConnectionIterator);
while (shardConnections != NULL)
{ {
PrepareRemoteTransactions(connectionList); /* send copy binary footers to all shard placements */
if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState, shardConnections->shardId,
shardConnections->connectionList);
}
/* close the COPY input on all shard placements */
EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true);
shardConnections = (ShardConnections *) hash_seq_search(&shardConnectionIterator);
} }
EndCopyFrom(copyState); EndCopyFrom(copyState);
heap_close(distributedRelation, NoLock); heap_close(distributedRelation, NoLock);
/* check for cancellation one last time before committing */
CHECK_FOR_INTERRUPTS();
}
PG_CATCH();
{
List *abortConnectionList = NIL;
/* roll back all transactions */
abortConnectionList = ConnectionList(copyConnectionHash);
EndRemoteCopy(abortConnectionList, false);
AbortRemoteTransactions(abortConnectionList);
CloseConnections(abortConnectionList);
PG_RE_THROW();
}
PG_END_TRY();
/*
* Ready to commit the transaction, this code is below the PG_TRY block because
* we do not want any of the transactions rolled back if a failure occurs. Instead,
* they should be rolled forward.
*/
CommitRemoteTransactions(connectionList, false);
CloseConnections(connectionList);
if (completionTag != NULL) if (completionTag != NULL)
{ {
snprintf(completionTag, COMPLETION_TAG_BUFSIZE, snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
@ -645,11 +591,13 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
const char *delimiterCharacter = "\t"; const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N"; const char *nullPrintCharacter = "\\N";
/* ErrorContextCallback errorCallback;
* Shard connections should be initialized before the PG_TRY, since it is
* used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH int64 currentShardId = INVALID_SHARD_ID;
* (see sigsetjmp documentation). uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
*/ uint64 copiedDataSizeInBytes = 0;
uint64 processedRowCount = 0;
ShardConnections *shardConnections = ShardConnections *shardConnections =
(ShardConnections *) palloc0(sizeof(ShardConnections)); (ShardConnections *) palloc0(sizeof(ShardConnections));
@ -670,16 +618,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
/* we use a PG_TRY block to close connections on errors (e.g. in NextCopyFrom) */
PG_TRY();
{
uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
uint64 copiedDataSizeInBytes = 0;
uint64 processedRowCount = 0;
/* set up callback to identify error line number */ /* set up callback to identify error line number */
ErrorContextCallback errorCallback;
errorCallback.callback = CopyFromErrorCallback; errorCallback.callback = CopyFromErrorCallback;
errorCallback.arg = (void *) copyState; errorCallback.arg = (void *) copyState;
errorCallback.previous = error_context_stack; errorCallback.previous = error_context_stack;
@ -723,13 +662,13 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
if (copiedDataSizeInBytes == 0) if (copiedDataSizeInBytes == 0)
{ {
/* create shard and open connections to shard placements */ /* create shard and open connections to shard placements */
StartCopyToNewShard(shardConnections, copyStatement, currentShardId = StartCopyToNewShard(shardConnections, copyStatement,
copyOutState->binary); copyOutState->binary);
/* send copy binary headers to shard placements */ /* send copy binary headers to shard placements */
if (copyOutState->binary) if (copyOutState->binary)
{ {
SendCopyBinaryHeaders(copyOutState, SendCopyBinaryHeaders(copyOutState, currentShardId,
shardConnections->connectionList); shardConnections->connectionList);
} }
} }
@ -738,7 +677,8 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
resetStringInfo(copyOutState->fe_msgbuf); resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions); copyOutState, columnOutputFunctions);
SendCopyDataToAll(copyOutState->fe_msgbuf, shardConnections->connectionList); SendCopyDataToAll(copyOutState->fe_msgbuf, currentShardId,
shardConnections->connectionList);
messageBufferSize = copyOutState->fe_msgbuf->len; messageBufferSize = copyOutState->fe_msgbuf->len;
copiedDataSizeInBytes = copiedDataSizeInBytes + messageBufferSize; copiedDataSizeInBytes = copiedDataSizeInBytes + messageBufferSize;
@ -753,13 +693,14 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
{ {
if (copyOutState->binary) if (copyOutState->binary)
{ {
SendCopyBinaryFooters(copyOutState, SendCopyBinaryFooters(copyOutState, currentShardId,
shardConnections->connectionList); shardConnections->connectionList);
} }
FinalizeCopyToNewShard(shardConnections); FinalizeCopyToNewShard(currentShardId, shardConnections);
MasterUpdateShardStatistics(shardConnections->shardId); MasterUpdateShardStatistics(shardConnections->shardId);
copiedDataSizeInBytes = 0; copiedDataSizeInBytes = 0;
currentShardId = INVALID_SHARD_ID;
} }
processedRowCount += 1; processedRowCount += 1;
@ -773,12 +714,14 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
*/ */
if (copiedDataSizeInBytes > 0) if (copiedDataSizeInBytes > 0)
{ {
Assert(currentShardId != INVALID_SHARD_ID);
if (copyOutState->binary) if (copyOutState->binary)
{ {
SendCopyBinaryFooters(copyOutState, SendCopyBinaryFooters(copyOutState, currentShardId,
shardConnections->connectionList); shardConnections->connectionList);
} }
FinalizeCopyToNewShard(shardConnections); FinalizeCopyToNewShard(currentShardId, shardConnections);
MasterUpdateShardStatistics(shardConnections->shardId); MasterUpdateShardStatistics(shardConnections->shardId);
} }
@ -793,17 +736,6 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
snprintf(completionTag, COMPLETION_TAG_BUFSIZE, snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processedRowCount); "COPY " UINT64_FORMAT, processedRowCount);
} }
}
PG_CATCH();
{
/* roll back all transactions */
EndRemoteCopy(shardConnections->connectionList, false);
AbortRemoteTransactions(shardConnections->connectionList);
CloseConnections(shardConnections->connectionList);
PG_RE_THROW();
}
PG_END_TRY();
} }
@ -859,7 +791,7 @@ MasterPartitionMethod(RangeVar *relation)
StringInfo partitionMethodCommand = makeStringInfo(); StringInfo partitionMethodCommand = makeStringInfo();
appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName); appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName);
queryResult = PQexec(masterConnection, partitionMethodCommand->data); queryResult = PQexec(masterConnection->conn, partitionMethodCommand->data);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{ {
char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0); char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0);
@ -873,7 +805,7 @@ MasterPartitionMethod(RangeVar *relation)
} }
else else
{ {
WarnRemoteError(masterConnection, queryResult); ReportResultError(masterConnection, queryResult, WARNING);
ereport(ERROR, (errmsg("could not get the partition method of the " ereport(ERROR, (errmsg("could not get the partition method of the "
"distributed table"))); "distributed table")));
} }
@ -913,25 +845,65 @@ RemoveMasterOptions(CopyStmt *copyStatement)
} }
static HTAB *
CreateShardConnectionHash(void)
{
HTAB *shardConnectionsHash = NULL;
int hashFlags = 0;
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(int64);
info.entrysize = sizeof(ShardConnections);
info.hash = tag_hash;
info.hcxt = TopTransactionContext;
hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT;
shardConnectionsHash = hash_create("Shard Connections Hash",
256, &info,
hashFlags);
return shardConnectionsHash;
}
/*
 * GetShardConnections returns the ShardConnections entry for the given shard
 * id, inserting a new entry into the hash when none exists yet.
 *
 * *shardConnectionsFound is set by hash_search to report whether the entry
 * was already present. A newly inserted entry gets its shardId filled in and
 * an empty (NIL) connection list; an existing entry is returned untouched.
 */
static ShardConnections *
GetShardConnections(HTAB *shardConnectionHash, int64 shardId,
					bool *shardConnectionsFound)
{
	ShardConnections *entry =
		(ShardConnections *) hash_search(shardConnectionHash, &shardId,
										 HASH_ENTER, shardConnectionsFound);

	if (*shardConnectionsFound)
	{
		/* entry already existed, hand it back as it is */
		return entry;
	}

	/* freshly inserted entry: initialize its fields */
	entry->shardId = shardId;
	entry->connectionList = NIL;

	return entry;
}
/* /*
* OpenCopyTransactions opens a connection for each placement of a shard and * OpenCopyConnections opens a connection for each placement of a shard and
* starts a COPY transaction. If a connection cannot be opened, then the shard * starts COPY. If a connection cannot be opened, then the shard placement is
* placement is marked as inactive and the COPY continues with the remaining * marked as inactive and the COPY continues with the remaining shard
* shard placements. * placements.
*/ */
static void static void
OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections, OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
bool stopOnFailure, bool useBinaryCopyFormat) bool stopOnFailure, bool useBinaryCopyFormat)
{ {
List *finalizedPlacementList = NIL; List *finalizedPlacementList = NIL;
List *failedPlacementList = NIL; List *failedPlacementList = NIL;
ListCell *placementCell = NULL; ListCell *placementCell = NULL;
ListCell *failedPlacementCell = NULL;
List *connectionList = NULL; List *connectionList = NULL;
int64 shardId = shardConnections->shardId; int64 shardId = shardConnections->shardId;
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"OpenCopyTransactions", "OpenCopyConnections",
ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
@ -943,28 +915,31 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
if (XactModificationLevel > XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("distributed copy operations must not appear in "
"transaction blocks containing other distributed "
"modifications")));
}
foreach(placementCell, finalizedPlacementList) foreach(placementCell, finalizedPlacementList)
{ {
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
char *nodeName = placement->nodeName; char *nodeName = placement->nodeName;
int nodePort = placement->nodePort; int nodePort = placement->nodePort;
char *nodeUser = CurrentUserName(); MultiConnection *connection = NULL;
PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser); uint32 connectionFlags = NEW_CONNECTION | CACHED_CONNECTION | FOR_DML;
TransactionConnection *transactionConnection = NULL;
StringInfo copyCommand = NULL; StringInfo copyCommand = NULL;
PGresult *result = NULL; PGresult *result = NULL;
if (stopOnFailure)
{
connectionFlags |= CRITICAL_CONNECTION;
}
/*
* FIXME: should make connection establishment parallel, by using
* StartPlacementConnection etc.
*/
connection = GetPlacementConnection(connectionFlags, placement);
ClaimConnectionExclusively(connection);
if (connection == NULL) if (connection == NULL)
{ {
/* FIXME: remove or replace with working code */
if (stopOnFailure) if (stopOnFailure)
{ {
ereport(ERROR, (errmsg("could not open connection to %s:%d", ereport(ERROR, (errmsg("could not open connection to %s:%d",
@ -975,39 +950,26 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
continue; continue;
} }
result = PQexec(connection, "BEGIN"); /* start transaction unless already done so */
if (PQresultStatus(result) != PGRES_COMMAND_OK) AdjustRemoteTransactionState(connection);
{
WarnRemoteError(connection, result);
failedPlacementList = lappend(failedPlacementList, placement);
PQclear(result);
continue;
}
PQclear(result);
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId, copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId,
useBinaryCopyFormat); useBinaryCopyFormat);
result = PQexec(connection, copyCommand->data); result = PQexec(connection->conn, copyCommand->data);
if (PQresultStatus(result) != PGRES_COPY_IN) if (PQresultStatus(result) != PGRES_COPY_IN)
{ {
WarnRemoteError(connection, result); ReportResultError(connection, result, WARNING);
failedPlacementList = lappend(failedPlacementList, placement);
PQclear(result); PQclear(result);
MarkRemoteTransactionFailed(connection, true);
/* failed placements will be invalidated by transaction machinery */
continue; continue;
} }
PQclear(result); PQclear(result);
transactionConnection = palloc0(sizeof(TransactionConnection)); connectionList = lappend(connectionList, connection);
transactionConnection->connectionId = shardConnections->shardId;
transactionConnection->transactionState = TRANSACTION_STATE_COPY_STARTED;
transactionConnection->connection = connection;
connectionList = lappend(connectionList, transactionConnection);
} }
/* if all placements failed, error out */ /* if all placements failed, error out */
@ -1016,20 +978,6 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
ereport(ERROR, (errmsg("could not find any active placements"))); ereport(ERROR, (errmsg("could not find any active placements")));
} }
/*
* If stopOnFailure is true, we just error out and code execution should
* never reach to this point. This is the case for copy from worker nodes.
*/
Assert(!stopOnFailure || list_length(failedPlacementList) == 0);
/* otherwise, mark failed placements as inactive: they're stale */
foreach(failedPlacementCell, failedPlacementList)
{
ShardPlacement *failedPlacement = (ShardPlacement *) lfirst(failedPlacementCell);
UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE);
}
shardConnections->connectionList = connectionList; shardConnections->connectionList = connectionList;
MemoryContextReset(localContext); MemoryContextReset(localContext);
@ -1113,7 +1061,7 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
StringInfo shardPlacementsCommand = makeStringInfo(); StringInfo shardPlacementsCommand = makeStringInfo();
appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId); appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId);
queryResult = PQexec(masterConnection, shardPlacementsCommand->data); queryResult = PQexec(masterConnection->conn, shardPlacementsCommand->data);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{ {
int rowCount = PQntuples(queryResult); int rowCount = PQntuples(queryResult);
@ -1121,14 +1069,17 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
for (rowIndex = 0; rowIndex < rowCount; rowIndex++) for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
{ {
char *nodeName = PQgetvalue(queryResult, rowIndex, 0); char *placementIdString = PQgetvalue(queryResult, rowIndex, 0);
char *nodeName = PQgetvalue(queryResult, rowIndex, 1);
char *nodePortString = PQgetvalue(queryResult, rowIndex, 1); char *nodePortString = PQgetvalue(queryResult, rowIndex, 2);
uint64 placementId = atol(placementIdString);
uint32 nodePort = atoi(nodePortString); uint32 nodePort = atoi(nodePortString);
ShardPlacement *shardPlacement = ShardPlacement *shardPlacement =
(ShardPlacement *) palloc0(sizeof(ShardPlacement)); (ShardPlacement *) palloc0(sizeof(ShardPlacement));
shardPlacement->shardId = shardId;
shardPlacement->placementId = placementId;
shardPlacement->nodeName = nodeName; shardPlacement->nodeName = nodeName;
shardPlacement->nodePort = nodePort; shardPlacement->nodePort = nodePort;
@ -1146,21 +1097,21 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
/* Send copy binary headers to given connections */ /* Send copy binary headers to given connections */
static void static void
SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList) SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId, List *connectionList)
{ {
resetStringInfo(copyOutState->fe_msgbuf); resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryHeaders(copyOutState); AppendCopyBinaryHeaders(copyOutState);
SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList); SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, connectionList);
} }
/* Send copy binary footers to given connections */ /* Send copy binary footers to given connections */
static void static void
SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList) SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, List *connectionList)
{ {
resetStringInfo(copyOutState->fe_msgbuf); resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryFooters(copyOutState); AppendCopyBinaryFooters(copyOutState);
SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList); SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, connectionList);
} }
@ -1203,17 +1154,15 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCop
* SendCopyDataToAll sends copy data to all connections in a list. * SendCopyDataToAll sends copy data to all connections in a list.
*/ */
static void static void
SendCopyDataToAll(StringInfo dataBuffer, List *connectionList) SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList)
{ {
ListCell *connectionCell = NULL; ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList) foreach(connectionCell, connectionList)
{ {
TransactionConnection *transactionConnection = MultiConnection *connection =
(TransactionConnection *) lfirst(connectionCell); (MultiConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
int64 shardId = transactionConnection->connectionId;
SendCopyDataToPlacement(dataBuffer, connection, shardId); SendCopyDataToPlacement(dataBuffer, shardId, connection);
} }
} }
@ -1223,73 +1172,71 @@ SendCopyDataToAll(StringInfo dataBuffer, List *connectionList)
* over the given connection. * over the given connection.
*/ */
static void static void
SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, int64 shardId) SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *connection)
{ {
int copyResult = PQputCopyData(connection, dataBuffer->data, dataBuffer->len); int copyResult = PQputCopyData(connection->conn, dataBuffer->data, dataBuffer->len);
if (copyResult != 1) if (copyResult != 1)
{ {
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
ereport(ERROR, (errcode(ERRCODE_IO_ERROR), ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to COPY to shard %ld on %s:%s", errmsg("failed to COPY to shard %ld on %s:%d",
shardId, nodeName, nodePort))); shardId, connection->hostname, connection->port)));
} }
} }
/* /*
* EndRemoteCopy ends the COPY input on all connections. If stopOnFailure * EndRemoteCopy ends the COPY input on all connections. If stopOnFailure is
* is true, then EndRemoteCopy reports an error on failure, otherwise it * true, then EndRemoteCopy reports an error on failure, otherwise it reports
* reports a warning or continues. * a warning or continues.
*/ */
static void static void
EndRemoteCopy(List *connectionList, bool stopOnFailure) EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
{ {
ListCell *connectionCell = NULL; ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList) foreach(connectionCell, connectionList)
{ {
TransactionConnection *transactionConnection = MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
int64 shardId = transactionConnection->connectionId;
int copyEndResult = 0; int copyEndResult = 0;
PGresult *result = NULL; PGresult *result = NULL;
#if 0
/* FIXME: Is this actually a relevant case? */
if (transactionConnection->transactionState != TRANSACTION_STATE_COPY_STARTED) if (transactionConnection->transactionState != TRANSACTION_STATE_COPY_STARTED)
{ {
/* a failure occurred after having previously called EndRemoteCopy */ /* a failure occurred after having previously called EndRemoteCopy */
continue; continue;
} }
#endif
/* end the COPY input */ /* end the COPY input */
copyEndResult = PQputCopyEnd(connection, NULL); copyEndResult = PQputCopyEnd(connection->conn, NULL);
transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
if (copyEndResult != 1) if (copyEndResult != 1)
{ {
if (stopOnFailure) if (stopOnFailure)
{ {
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
ereport(ERROR, (errcode(ERRCODE_IO_ERROR), ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to COPY to shard %ld on %s:%s", errmsg("failed to COPY to shard %ld on %s:%d",
shardId, nodeName, nodePort))); shardId, connection->hostname, connection->port)));
} }
continue; continue;
} }
/* check whether there were any COPY errors */ /* check whether there were any COPY errors */
result = PQgetResult(connection); result = PQgetResult(connection->conn);
if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure) if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure)
{ {
ReportCopyError(connection, result); ReportCopyError(connection, result);
} }
PQclear(result); PQclear(result);
result = PQgetResult(connection->conn);
Assert(!result);
UnclaimConnection(connection);
} }
} }
@ -1299,7 +1246,7 @@ EndRemoteCopy(List *connectionList, bool stopOnFailure)
* the remote COPY error messages. * the remote COPY error messages.
*/ */
static void static void
ReportCopyError(PGconn *connection, PGresult *result) ReportCopyError(MultiConnection *connection, PGresult *result)
{ {
char *remoteMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY); char *remoteMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
@ -1315,10 +1262,8 @@ ReportCopyError(PGconn *connection, PGresult *result)
{ {
/* probably a connection problem, get the message from the connection */ /* probably a connection problem, get the message from the connection */
char *lastNewlineIndex = NULL; char *lastNewlineIndex = NULL;
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
remoteMessage = PQerrorMessage(connection); remoteMessage = PQerrorMessage(connection->conn);
lastNewlineIndex = strrchr(remoteMessage, '\n'); lastNewlineIndex = strrchr(remoteMessage, '\n');
/* trim trailing newline, if any */ /* trim trailing newline, if any */
@ -1328,7 +1273,8 @@ ReportCopyError(PGconn *connection, PGresult *result)
} }
ereport(ERROR, (errcode(ERRCODE_IO_ERROR), ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to complete COPY on %s:%s", nodeName, nodePort), errmsg("failed to complete COPY on %s:%u",
connection->hostname, connection->port),
errdetail("%s", remoteMessage))); errdetail("%s", remoteMessage)));
} }
} }
@ -1531,7 +1477,7 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState)
* StartCopyToNewShard creates a new shard and related shard placements and * StartCopyToNewShard creates a new shard and related shard placements and
* opens connections to shard placements. * opens connections to shard placements.
*/ */
static void static int64
StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
bool useBinaryCopyFormat) bool useBinaryCopyFormat)
{ {
@ -1543,11 +1489,12 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
shardConnections->shardId = shardId; shardConnections->shardId = shardId;
list_free_deep(shardConnections->connectionList);
shardConnections->connectionList = NIL; shardConnections->connectionList = NIL;
/* connect to shards placements and start transactions */ /* connect to shards placements and start transactions */
OpenCopyTransactions(copyStatement, shardConnections, true, useBinaryCopyFormat); OpenCopyConnections(copyStatement, shardConnections, true, useBinaryCopyFormat);
return shardId;
} }
@ -1604,7 +1551,7 @@ RemoteCreateEmptyShard(char *relationName)
StringInfo createEmptyShardCommand = makeStringInfo(); StringInfo createEmptyShardCommand = makeStringInfo();
appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName); appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName);
queryResult = PQexec(masterConnection, createEmptyShardCommand->data); queryResult = PQexec(masterConnection->conn, createEmptyShardCommand->data);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{ {
char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0); char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0);
@ -1613,7 +1560,7 @@ RemoteCreateEmptyShard(char *relationName)
} }
else else
{ {
WarnRemoteError(masterConnection, queryResult); ReportResultError(masterConnection, queryResult, WARNING);
ereport(ERROR, (errmsg("could not create a new empty shard on the remote node"))); ereport(ERROR, (errmsg("could not create a new empty shard on the remote node")));
} }
@ -1624,18 +1571,23 @@ RemoteCreateEmptyShard(char *relationName)
/* /*
* FinalizeCopyToNewShard commits copy transaction and closes connections to * FinalizeCopyToNewShard ends the COPY and marks connection as inactive for
* shard placements. * all shard placements.
*/ */
static void static void
FinalizeCopyToNewShard(ShardConnections *shardConnections) FinalizeCopyToNewShard(int64 shardId, ShardConnections *shardConnections)
{ {
/* close the COPY input on all shard placements */ ListCell *connectionCell = NULL;
EndRemoteCopy(shardConnections->connectionList, true);
/* commit transactions and close connections */ /* close the COPY input on all shard placements */
CommitRemoteTransactions(shardConnections->connectionList, true); EndRemoteCopy(shardId, shardConnections->connectionList, true);
CloseConnections(shardConnections->connectionList);
foreach(connectionCell, shardConnections->connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
UnclaimConnection(connection);
}
} }
@ -1669,7 +1621,7 @@ RemoteUpdateShardStatistics(uint64 shardId)
appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY, appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY,
shardId); shardId);
queryResult = PQexec(masterConnection, updateShardStatisticsCommand->data); queryResult = PQexec(masterConnection->conn, updateShardStatisticsCommand->data);
if (PQresultStatus(queryResult) != PGRES_TUPLES_OK) if (PQresultStatus(queryResult) != PGRES_TUPLES_OK)
{ {
ereport(ERROR, (errmsg("could not update shard statistics"))); ereport(ERROR, (errmsg("could not update shard statistics")));

View File

@ -66,7 +66,7 @@
#define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s" #define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s"
#define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')" #define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')"
#define FINALIZED_SHARD_PLACEMENTS_QUERY \ #define FINALIZED_SHARD_PLACEMENTS_QUERY \
"SELECT nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = %ld" "SELECT placementid, nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = %ld"
#define UPDATE_SHARD_STATISTICS_QUERY \ #define UPDATE_SHARD_STATISTICS_QUERY \
"SELECT master_update_shard_statistics(%ld)" "SELECT master_update_shard_statistics(%ld)"
#define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');" #define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');"

View File

@ -160,31 +160,27 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6; SELECT count(*) FROM researchers WHERE lab_id = 6;
ERROR: cannot open new connections after the first modification command within a transaction ERROR: cannot open new connections after the first modification command within a transaction
ABORT; ABORT;
-- COPY can't happen second, -- Check COPY can happen after INSERT
BEGIN; BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY labs, line 1: "10,Weyland-Yutani"
COMMIT; COMMIT;
-- though it will work if before any modifications -- Check COPY can happen before INSERT
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
SELECT name FROM labs WHERE id = 10; SELECT name FROM labs WHERE id = 10;
name name
---------------- ----------------
Weyland-Yutani Weyland-Yutani
(1 row) Weyland-Yutani
(2 rows)
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
ERROR: cannot open new connections after the first modification command within a transaction
COMMIT; COMMIT;
-- but a double-copy isn't allowed (the first will persist) -- Two COPYs are also ok
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY labs, line 1: "12,fsociety"
COMMIT; COMMIT;
SELECT name FROM labs WHERE id = 11; SELECT name FROM labs WHERE id = 11;
name name
@ -192,13 +188,12 @@ SELECT name FROM labs WHERE id = 11;
Planet Express Planet Express
(1 row) (1 row)
-- finally, ALTER and copy aren't compatible -- finally, check ALTER and copy are compatible
BEGIN; BEGIN;
ALTER TABLE labs ADD COLUMN motto2 text; ALTER TABLE labs ADD COLUMN motto2 text;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY labs, line 1: "12,fsociety,lol"
COMMIT; COMMIT;
ALTER TABLE labs DROP COLUMN motto2;
-- but the DDL should correctly roll back -- but the DDL should correctly roll back
\d labs \d labs
Table "public.labs" Table "public.labs"
@ -207,30 +202,33 @@ COMMIT;
id | bigint | not null id | bigint | not null
name | text | not null name | text | not null
SELECT * FROM labs WHERE id = 12;
id | name
----+------
(0 rows)
-- and if the copy is before the ALTER...
BEGIN;
\copy labs from stdin delimiter ','
ALTER TABLE labs ADD COLUMN motto3 text;
ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications
COMMIT;
-- the DDL fails, but copy persists
\d labs
Table "public.labs"
Column | Type | Modifiers
--------+--------+-----------
id | bigint | not null
name | text | not null
SELECT * FROM labs WHERE id = 12; SELECT * FROM labs WHERE id = 12;
id | name id | name
----+---------- ----+----------
12 | fsociety 12 | fsociety
(1 row) 12 | fsociety
(2 rows)
-- and if the copy is before the ALTER...
BEGIN;
\copy labs from stdin delimiter ','
ALTER TABLE labs ADD COLUMN motto3 text;
ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications
COMMIT;
-- the DDL fails, and copy does not persist
\d labs
Table "public.labs"
Column | Type | Modifiers
--------+--------+-----------
id | bigint | not null
name | text | not null
SELECT * FROM labs WHERE id = 12;
id | name
----+----------
12 | fsociety
12 | fsociety
(2 rows)
-- now, for some special failures... -- now, for some special failures...
CREATE TABLE objects ( CREATE TABLE objects (

View File

@ -702,8 +702,4 @@ SELECT master_create_distributed_table('composite_partition_column_table', 'comp
WARNING: function min(number_pack) does not exist WARNING: function min(number_pack) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts. HINT: No function matches the given name and argument types. You might need to add explicit type casts.
CONTEXT: while executing command on localhost:57637 CONTEXT: while executing command on localhost:57637
WARNING: function min(number_pack) does not exist ERROR: failure on connection marked as essential: localhost:57637
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
CONTEXT: while executing command on localhost:57638
WARNING: could not get statistics for shard public.composite_partition_column_table_560164
DETAIL: Setting shard statistics to NULL

View File

@ -131,7 +131,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6; SELECT count(*) FROM researchers WHERE lab_id = 6;
ABORT; ABORT;
-- COPY can't happen second, -- Check COPY can happen after INSERT
BEGIN; BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
@ -139,7 +139,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
\. \.
COMMIT; COMMIT;
-- though it will work if before any modifications -- Check COPY can happen before INSERT
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
10,Weyland-Yutani 10,Weyland-Yutani
@ -148,7 +148,7 @@ SELECT name FROM labs WHERE id = 10;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
COMMIT; COMMIT;
-- but a double-copy isn't allowed (the first will persist) -- Two COPYs are also ok
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
11,Planet Express 11,Planet Express
@ -160,13 +160,14 @@ COMMIT;
SELECT name FROM labs WHERE id = 11; SELECT name FROM labs WHERE id = 11;
-- finally, ALTER and copy aren't compatible -- finally, check ALTER and copy are compatible
BEGIN; BEGIN;
ALTER TABLE labs ADD COLUMN motto2 text; ALTER TABLE labs ADD COLUMN motto2 text;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
12,fsociety,lol 12,fsociety,lol
\. \.
COMMIT; COMMIT;
ALTER TABLE labs DROP COLUMN motto2;
-- but the DDL should correctly roll back -- but the DDL should correctly roll back
\d labs \d labs
@ -180,7 +181,7 @@ BEGIN;
ALTER TABLE labs ADD COLUMN motto3 text; ALTER TABLE labs ADD COLUMN motto3 text;
COMMIT; COMMIT;
-- the DDL fails, but copy persists -- the DDL fails, and copy does not persist
\d labs \d labs
SELECT * FROM labs WHERE id = 12; SELECT * FROM labs WHERE id = 12;