Make COPY compatible with unified executor.

pull/2780/head
Hadi Moshayedi 2019-06-20 14:44:25 +02:00
parent 17d4d3e5ea
commit 4bbae02778
23 changed files with 773 additions and 203 deletions

View File

@ -12,9 +12,15 @@
* parses the master node copy options and handles communication with the master
* node.
*
* It opens a new connection for every shard placement and uses the PQputCopyData
* function to copy the data. Because PQputCopyData transmits data, asynchronously,
* the workers will ingest data at least partially in parallel.
* If this is the first command in the transaction, we open a new connection for
* every shard placement. Otherwise we open as many connections as we can
* without conflicting with previous commands in the transaction, in which
* case some shards may share connections. See the comments of
* CopyConnectionState for how we operate in that case.
*
* We use the PQputCopyData function to copy the data. Because PQputCopyData
* transmits data asynchronously, the workers will ingest data at least partially
* in parallel.
*
* For hash-partitioned tables, if it fails to connect to a worker, the master
* marks the placement for which it was trying to open a connection as inactive,
@ -70,6 +76,7 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/multi_executor.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
@ -96,6 +103,86 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* use a global connection to the master node in order to skip passing it around */
static MultiConnection *masterConnection = NULL;
/*
* Data size threshold to switch over the active placement for a connection.
* If this is too low, the overhead of starting COPY commands will hurt
* performance. If this is too high, buffered data will use lots of memory.
* 4MB is a good balance between memory usage and performance. Note that this
* is irrelevant in the common case where we open one connection per placement.
*/
#define COPY_SWITCH_OVER_THRESHOLD (4 * 1024 * 1024)
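For intuition about how this bounds memory (my arithmetic, not part of the patch): only the active placement streams directly, so each of the other placements multiplexed onto a connection buffers at most roughly the threshold before a switch-over is forced. A hypothetical helper makes the per-connection ceiling explicit:

/*
 * Rough per-connection ceiling on buffered COPY data. Illustrative only,
 * not part of the patch; a buffer can exceed the threshold by one row,
 * because the check runs when the next tuple for that placement arrives.
 */
static size_t
BufferedCopyCeiling(int placementsOnConnection)
{
	/* the active placement buffers nothing; the rest up to ~threshold each */
	return (size_t) (placementsOnConnection - 1) * COPY_SWITCH_OVER_THRESHOLD;
}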
typedef struct CopyShardState CopyShardState;
typedef struct CopyPlacementState CopyPlacementState;
/*
* Multiple shard placements can share one connection. Each connection has one
* of those placements as the activePlacementState, and others in the
* bufferedPlacementList. When we want to send a tuple to a CopyPlacementState,
* we check whether it is the active one in its connectionState; if so, we put
* the data directly on the wire. Otherwise, we buffer it so we can put it on
* the wire later, when the copy ends or a switch-over happens. See
* CitusSendTupleToPlacements() for more details.
*
* This is done so we are compatible with executor.c. If a previous command
* in the current transaction has been executed using executor.c, then
* CopyGetPlacementConnection() might return the same connection for multiple
* placements. We support that case by the buffering mechanism described above.
*
* If no previous command in the current transaction has used executor.c, then
* CopyGetPlacementConnection() returns one connection per placement, no
* buffering happens, and we put the copy data directly on the connection.
*/
typedef struct CopyConnectionState
{
/* Used as hash key. Equal to PQsocket(connection->pgConn). */
int socket;
MultiConnection *connection;
/*
* Placement for which we have an active COPY going on over connection.
* Can be NULL.
*/
CopyPlacementState *activePlacementState;
/*
* Other placements that we are buffering data for. When a switch-over
* happens, we remove an item from this list and make it the new
* activePlacementState. The old activePlacementState, if it isn't NULL, is
* added to this list.
*/
dlist_head bufferedPlacementList;
} CopyConnectionState;
struct CopyPlacementState
{
/* Connection state to which the placement is assigned. */
CopyConnectionState *connectionState;
/* State of the shard to which the placement belongs. */
CopyShardState *shardState;
/*
* Buffered COPY data. When the placement is the activePlacementState of
* its connection, this is empty, because in that case we send the data
* directly over the connection.
*/
StringInfo data;
/* List node for CopyConnectionState->bufferedPlacementList. */
dlist_node bufferedPlacementNode;
};
struct CopyShardState
{
/* Used as hash key. */
uint64 shardId;
/* List of CopyPlacementStates for all active placements of the shard. */
List *placementStateList;
};
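The routing rule these three structs support is easiest to see in isolation. Below is a toy, self-contained sketch of the same technique in plain C; stdout stands in for the connection, COPY begin/end and error handling are omitted, and none of these names are Citus APIs:

#include <stdio.h>
#include <string.h>

#define SWITCH_OVER_THRESHOLD 6	/* tiny, so the example switches quickly */

typedef struct Placement Placement;

typedef struct Connection
{
	const char *name;
	Placement *active;		/* placement currently streaming its COPY */
} Connection;

struct Placement
{
	const char *name;
	Connection *conn;
	char buf[256];			/* rows buffered while another placement is active */
	size_t len;
};

static void
stream(Placement *p, const char *row)
{
	printf("[%s] COPY %s: %s", p->conn->name, p->name, row);
}

/* Route one row: stream if active, buffer otherwise, switch over when full. */
static void
send_row(Placement *p, const char *row)
{
	Connection *c = p->conn;

	if (c->active == p)
	{
		stream(p, row);		/* fast path: put the row directly on the wire */
	}
	else if (c->active == NULL || p->len > SWITCH_OVER_THRESHOLD)
	{
		c->active = p;		/* switch-over: p now owns the connection */
		if (p->len > 0)
		{
			printf("[%s] COPY %s (flush): %s", c->name, p->name, p->buf);
			p->len = 0;
		}
		stream(p, row);
	}
	else
	{
		/* buffer for a later switch-over; toy code, no bounds checking */
		memcpy(p->buf + p->len, row, strlen(row) + 1);
		p->len += strlen(row);
	}
}

int
main(void)
{
	Connection c = { "conn0", NULL };
	Placement p1 = { "shard1", &c, "", 0 };
	Placement p2 = { "shard2", &c, "", 0 };

	send_row(&p1, "a,1\n");	/* p1 becomes active and streams directly */
	send_row(&p2, "b,2\n");	/* p1 still owns the connection: buffered */
	send_row(&p2, "c,3\n");	/* buffered; now past the tiny threshold */
	send_row(&p2, "d,4\n");	/* switch-over: flush p2's buffer, then stream */
	return 0;
}

Running this shows shard1 streaming directly while shard2 buffers, until shard2's buffer crosses the threshold and it takes over the connection, flushing its buffered rows first; that is the same sequence CitusSendTupleToPlacements() implements below.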
/* Local functions forward declarations */
static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag);
@ -103,9 +190,10 @@ static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
static char MasterPartitionMethod(RangeVar *relation);
static void RemoveMasterOptions(CopyStmt *copyStatement);
static void OpenCopyConnections(CopyStmt *copyStatement,
ShardConnections *shardConnections, bool stopOnFailure,
bool useBinaryCopyFormat);
static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
ShardConnections *shardConnections,
bool stopOnFailure,
bool useBinaryCopyFormat);
static bool BinaryOutputFunctionDefined(Oid typeId);
static List * MasterShardPlacementList(uint64 shardId);
@ -145,6 +233,27 @@ static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName);
static bool IsCopyFromWorker(CopyStmt *copyStatement);
static NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
static void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
static HTAB * CreateConnectionStateHash(MemoryContext memoryContext);
static HTAB * CreateShardStateHash(MemoryContext memoryContext);
static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash,
MultiConnection *connection);
static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash, bool stopOnFailure,
bool *found);
static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement,
bool stopOnFailure);
static List * ConnectionStateList(HTAB *connectionStateHash);
static void InitializeCopyShardState(CopyShardState *shardState,
HTAB *connectionStateHash,
uint64 shardId, bool stopOnFailure);
static void StartPlacementStateCopyCommand(CopyPlacementState *placementState,
CopyStmt *copyStatement,
CopyOutState copyOutState);
static void EndPlacementStateCopyCommand(CopyPlacementState *placementState,
CopyOutState copyOutState);
static void UnclaimCopyConnections(List *connectionStateList);
static void ShutdownCopyConnectionState(CopyConnectionState *connectionState,
CitusCopyDestReceiver *copyDest);
/* Private functions copied and adapted from copy.c in PostgreSQL */
static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
@ -790,14 +899,15 @@ RemoveMasterOptions(CopyStmt *copyStatement)
/*
* OpenCopyConnections opens a connection for each placement of a shard and
* OpenCopyConnectionsForNewShards opens a connection for each placement of a shard and
* starts a COPY transaction if necessary. If a connection cannot be opened,
* then the shard placement is marked as inactive and the COPY continues with the remaining
* shard placements.
*/
static void
OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
bool stopOnFailure, bool useBinaryCopyFormat)
OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
ShardConnections *shardConnections,
bool stopOnFailure, bool useBinaryCopyFormat)
{
List *finalizedPlacementList = NIL;
int failedPlacementCount = 0;
@ -807,7 +917,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
bool raiseInterrupts = true;
MemoryContext localContext =
AllocSetContextCreateExtended(CurrentMemoryContext,
"OpenCopyConnections",
"OpenCopyConnectionsForNewShards",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
@ -830,13 +940,10 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
PGresult *result = NULL;
/*
* Make sure we use a separate connection per placement for hash-distributed
* tables in order to allow multi-shard modifications in the same transaction.
* For hash partitioned tables, connection establishment happens in
* CopyGetPlacementConnection().
*/
if (placement->partitionMethod == DISTRIBUTE_BY_HASH)
{
connectionFlags |= CONNECTION_PER_PLACEMENT;
}
Assert(placement->partitionMethod != DISTRIBUTE_BY_HASH);
connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
@ -1748,8 +1855,8 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
shardConnections->connectionList = NIL;
/* connect to shards placements and start transactions */
OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
useBinaryCopyFormat);
OpenCopyConnectionsForNewShards(copyStatement, shardConnections, stopOnFailure,
useBinaryCopyFormat);
return shardId;
}
@ -2237,7 +2344,8 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyStatement->filename = NULL;
copyDest->copyStatement = copyStatement;
copyDest->shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
copyDest->shardStateHash = CreateShardStateHash(TopTransactionContext);
copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext);
}
@ -2262,8 +2370,8 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
* We might be able to recover from errors with ROLLBACK TO SAVEPOINT,
* so unclaim the connections before throwing errors.
*/
HTAB *shardConnectionHash = copyDest->shardConnectionHash;
UnclaimAllShardConnections(shardConnectionHash);
List *connectionStateList = ConnectionStateList(copyDest->connectionStateHash);
UnclaimCopyConnections(connectionStateList);
PG_RE_THROW();
}
@ -2283,10 +2391,13 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
TupleDesc tupleDescriptor = copyDest->tupleDescriptor;
CopyStmt *copyStatement = copyDest->copyStatement;
HTAB *shardConnectionHash = copyDest->shardConnectionHash;
CopyShardState *shardState = NULL;
CopyOutState copyOutState = copyDest->copyOutState;
FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths;
ListCell *placementStateCell = NULL;
bool cachedShardStateFound = false;
bool firstTupleInShard = false;
bool stopOnFailure = copyDest->stopOnFailure;
@ -2295,9 +2406,6 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
int64 shardId = 0;
bool shardConnectionsFound = false;
ShardConnections *shardConnections = NULL;
EState *executorState = copyDest->executorState;
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
@ -2312,46 +2420,93 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
/* connections hash is kept in memory context */
MemoryContextSwitchTo(copyDest->memoryContext);
/* get existing connections to the shard placements, if any */
shardConnections = GetShardHashConnections(shardConnectionHash, shardId,
&shardConnectionsFound);
if (!shardConnectionsFound)
shardState = GetShardState(shardId, copyDest->shardStateHash,
copyDest->connectionStateHash, stopOnFailure,
&cachedShardStateFound);
if (!cachedShardStateFound)
{
/*
* Keep track of multi shard accesses before opening connection
* the second shard.
*/
if (!copyDest->multiShardCopy && hash_get_num_entries(shardConnectionHash) == 2)
firstTupleInShard = true;
}
if (firstTupleInShard && !copyDest->multiShardCopy &&
hash_get_num_entries(copyDest->shardStateHash) == 2)
{
Oid relationId = copyDest->distributedRelationId;
/* mark as multi shard to skip doing the same thing over and over */
copyDest->multiShardCopy = true;
if (MultiShardConnectionType != SEQUENTIAL_CONNECTION)
{
Oid relationId = copyDest->distributedRelationId;
/* mark as multi shard to skip doing the same thing over and over */
copyDest->multiShardCopy = true;
/* error out of conflicting COPY */
CheckConflictingParallelCopyAccesses(relationId);
/* when we see multiple shard connections, we mark COPY as parallel modify */
RecordParallelModifyAccess(relationId);
}
/* open connections and initiate COPY on shard placements */
OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
copyOutState->binary);
/* send copy binary headers to shard placements */
if (copyOutState->binary)
{
SendCopyBinaryHeaders(copyOutState, shardId,
shardConnections->connectionList);
}
}
/* replicate row to shard placements */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions, columnCoercionPaths);
SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, shardConnections->connectionList);
foreach(placementStateCell, shardState->placementStateList)
{
CopyPlacementState *currentPlacementState = lfirst(placementStateCell);
CopyConnectionState *connectionState = currentPlacementState->connectionState;
CopyPlacementState *activePlacementState = connectionState->activePlacementState;
bool switchToCurrentPlacement = false;
bool sendTupleOverConnection = false;
if (activePlacementState == NULL)
{
switchToCurrentPlacement = true;
}
else if (currentPlacementState != activePlacementState &&
currentPlacementState->data->len > COPY_SWITCH_OVER_THRESHOLD)
{
switchToCurrentPlacement = true;
/* before switching, make sure to finish the copy */
EndPlacementStateCopyCommand(activePlacementState, copyOutState);
dlist_push_head(&connectionState->bufferedPlacementList,
&activePlacementState->bufferedPlacementNode);
}
if (switchToCurrentPlacement)
{
StartPlacementStateCopyCommand(currentPlacementState, copyStatement,
copyOutState);
dlist_delete(&currentPlacementState->bufferedPlacementNode);
connectionState->activePlacementState = currentPlacementState;
/* send previously buffered tuples */
SendCopyDataToPlacement(currentPlacementState->data, shardId,
connectionState->connection);
resetStringInfo(currentPlacementState->data);
/* additionally, we need to send the current tuple too */
sendTupleOverConnection = true;
}
else if (currentPlacementState != activePlacementState)
{
/* buffer data */
StringInfo copyBuffer = copyOutState->fe_msgbuf;
resetStringInfo(copyBuffer);
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions,
columnCoercionPaths);
appendBinaryStringInfo(currentPlacementState->data, copyBuffer->data,
copyBuffer->len);
}
else
{
Assert(currentPlacementState == activePlacementState);
sendTupleOverConnection = true;
}
if (sendTupleOverConnection)
{
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions, columnCoercionPaths);
SendCopyDataToPlacement(copyOutState->fe_msgbuf, shardId,
connectionState->connection);
}
}
MemoryContextSwitchTo(oldContext);
@ -2441,30 +2596,21 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
{
CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) destReceiver;
HTAB *shardConnectionHash = copyDest->shardConnectionHash;
List *shardConnectionsList = NIL;
ListCell *shardConnectionsCell = NULL;
CopyOutState copyOutState = copyDest->copyOutState;
HTAB *connectionStateHash = copyDest->connectionStateHash;
List *connectionStateList = NIL;
ListCell *connectionStateCell = NULL;
Relation distributedRelation = copyDest->distributedRelation;
shardConnectionsList = ShardConnectionList(shardConnectionHash);
connectionStateList = ConnectionStateList(connectionStateHash);
PG_TRY();
{
foreach(shardConnectionsCell, shardConnectionsList)
foreach(connectionStateCell, connectionStateList)
{
ShardConnections *shardConnections = (ShardConnections *) lfirst(
shardConnectionsCell);
CopyConnectionState *connectionState =
(CopyConnectionState *) lfirst(connectionStateCell);
/* send copy binary footers to all shard placements */
if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState, shardConnections->shardId,
shardConnections->connectionList);
}
/* close the COPY input on all shard placements */
EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList);
ShutdownCopyConnectionState(connectionState, copyDest);
}
}
PG_CATCH();
@ -2473,7 +2619,7 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
* We might be able to recover from errors with ROLLBACK TO SAVEPOINT,
* so unclaim the connections before throwing errors.
*/
UnclaimAllShardConnections(shardConnectionHash);
UnclaimCopyConnections(connectionStateList);
PG_RE_THROW();
}
@ -2483,6 +2629,40 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
}
/*
* ShutdownCopyConnectionState ends the copy command for the currently active
* placement on the connection, and then sends the rest of the buffered data
* over the connection.
*/
static void
ShutdownCopyConnectionState(CopyConnectionState *connectionState,
CitusCopyDestReceiver *copyDest)
{
CopyOutState copyOutState = copyDest->copyOutState;
CopyStmt *copyStatement = copyDest->copyStatement;
dlist_iter iter;
CopyPlacementState *activePlacementState = connectionState->activePlacementState;
if (activePlacementState != NULL)
{
EndPlacementStateCopyCommand(activePlacementState, copyOutState);
}
dlist_foreach(iter, &connectionState->bufferedPlacementList)
{
CopyPlacementState *placementState =
dlist_container(CopyPlacementState, bufferedPlacementNode, iter.cur);
uint64 shardId = placementState->shardState->shardId;
StartPlacementStateCopyCommand(placementState, copyStatement,
copyOutState);
SendCopyDataToPlacement(placementState->data, shardId,
connectionState->connection);
EndPlacementStateCopyCommand(placementState, copyOutState);
}
}
static void
CitusCopyDestReceiverDestroy(DestReceiver *destReceiver)
{
@ -2924,3 +3104,363 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
return attnums;
/* *INDENT-ON* */
}
/*
* CreateConnectionStateHash constructs a hash table which maps from socket
* number to CopyConnectionState, passing the provided MemoryContext to
* hash_create for hash allocations.
*/
static HTAB *
CreateConnectionStateHash(MemoryContext memoryContext)
{
HTAB *connectionStateHash = NULL;
int hashFlags = 0;
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(int);
info.entrysize = sizeof(CopyConnectionState);
info.hcxt = memoryContext;
hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
connectionStateHash = hash_create("Copy Connection State Hash", 128, &info,
hashFlags);
return connectionStateHash;
}
/*
* CreateShardStateHash constructs a hash table which maps from shard
* identifier to CopyShardState, passing the provided MemoryContext to
* hash_create for hash allocations.
*/
static HTAB *
CreateShardStateHash(MemoryContext memoryContext)
{
HTAB *shardStateHash = NULL;
int hashFlags = 0;
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(uint64);
info.entrysize = sizeof(CopyShardState);
info.hcxt = memoryContext;
hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
shardStateHash = hash_create("Copy Shard State Hash", 128, &info, hashFlags);
return shardStateHash;
}
/*
* GetConnectionState finds the existing CopyConnectionState for a connection
* in the provided hash. If not found, a new, initialized entry is returned.
*/
static CopyConnectionState *
GetConnectionState(HTAB *connectionStateHash, MultiConnection *connection)
{
CopyConnectionState *connectionState = NULL;
bool found = false;
int socket = PQsocket(connection->pgConn);
Assert(socket != -1);
connectionState = (CopyConnectionState *) hash_search(connectionStateHash, &socket,
HASH_ENTER, &found);
if (!found)
{
connectionState->socket = socket;
connectionState->connection = connection;
connectionState->activePlacementState = NULL;
dlist_init(&connectionState->bufferedPlacementList);
}
return connectionState;
}
/*
* ConnectionStateList returns all CopyConnectionState structures in
* the given hash.
*/
static List *
ConnectionStateList(HTAB *connectionStateHash)
{
List *connectionStateList = NIL;
HASH_SEQ_STATUS status;
CopyConnectionState *connectionState = NULL;
hash_seq_init(&status, connectionStateHash);
connectionState = (CopyConnectionState *) hash_seq_search(&status);
while (connectionState != NULL)
{
connectionStateList = lappend(connectionStateList, connectionState);
connectionState = (CopyConnectionState *) hash_seq_search(&status);
}
return connectionStateList;
}
/*
* GetShardState finds the existing CopyShardState for a shard in the provided
* hash. If not found, then a new shard state is returned with all related
* CopyPlacementStates initialized.
*/
static CopyShardState *
GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash, bool stopOnFailure, bool *found)
{
CopyShardState *shardState = NULL;
shardState = (CopyShardState *) hash_search(shardStateHash, &shardId,
HASH_ENTER, found);
if (!*found)
{
InitializeCopyShardState(shardState, connectionStateHash,
shardId, stopOnFailure);
}
return shardState;
}
/*
* InitializeCopyShardState initializes the given shardState. It finds all
* placements for the given shardId, assigns connections to them, and
* adds them to shardState->placementStateList.
*/
static void
InitializeCopyShardState(CopyShardState *shardState,
HTAB *connectionStateHash, uint64 shardId,
bool stopOnFailure)
{
List *finalizedPlacementList = NIL;
ListCell *placementCell = NULL;
int failedPlacementCount = 0;
MemoryContext localContext =
AllocSetContextCreateExtended(CurrentMemoryContext,
"InitializeCopyShardState",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* release finalized placement list at the end of this function */
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
finalizedPlacementList = MasterShardPlacementList(shardId);
MemoryContextSwitchTo(oldContext);
shardState->shardId = shardId;
shardState->placementStateList = NIL;
foreach(placementCell, finalizedPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
CopyConnectionState *connectionState = NULL;
CopyPlacementState *placementState = NULL;
MultiConnection *connection =
CopyGetPlacementConnection(placement, stopOnFailure);
if (connection == NULL)
{
failedPlacementCount++;
continue;
}
connectionState = GetConnectionState(connectionStateHash, connection);
/*
* If this is the first time we are using this connection for copying a
* shard, send begin if necessary.
*/
if (connectionState->activePlacementState == NULL)
{
RemoteTransactionBeginIfNecessary(connection);
}
placementState = palloc0(sizeof(CopyPlacementState));
placementState->shardState = shardState;
placementState->data = makeStringInfo();
placementState->connectionState = connectionState;
/*
* We don't set connectionState->activePlacementState here even if it
* is NULL. Later in CitusSendTupleToPlacements() we set it at the
* same time as calling StartPlacementStateCopyCommand() so we actually
* know the COPY operation for the placement is ongoing.
*/
dlist_push_head(&connectionState->bufferedPlacementList,
&placementState->bufferedPlacementNode);
shardState->placementStateList = lappend(shardState->placementStateList,
placementState);
}
/* if all placements failed, error out */
if (failedPlacementCount == list_length(finalizedPlacementList))
{
ereport(ERROR, (errmsg("could not connect to any active placements")));
}
/*
* If stopOnFailure is true, we just error out and code execution should
* never reach this point. This is the case for reference tables and
* copy from worker nodes.
*/
Assert(!stopOnFailure || failedPlacementCount == 0);
MemoryContextReset(localContext);
}
/*
* CopyGetPlacementConnection assigns a connection to the given placement. If
* a connection has already been assigned to the placement in the current
* transaction, it reuses that connection. Otherwise, it requests a new
* connection for the placement.
*/
static MultiConnection *
CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure)
{
MultiConnection *connection = NULL;
uint32 connectionFlags = FOR_DML;
char *nodeUser = CurrentUserName();
ShardPlacementAccess *placementAccess = NULL;
/*
* Determine whether the task has to be assigned to a particular connection
* due to a preceding access to the placement in the same transaction.
*/
placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_DML);
connection = GetConnectionIfPlacementAccessedInXact(connectionFlags,
list_make1(placementAccess),
NULL);
if (connection != NULL)
{
return connection;
}
/*
* For placements that haven't been assigned a connection by a previous command
* in the current transaction, we use a separate connection per placement for
* hash-distributed tables in order to get the maximum performance.
*/
if (placement->partitionMethod == DISTRIBUTE_BY_HASH &&
MultiShardConnectionType != SEQUENTIAL_CONNECTION)
{
connectionFlags |= CONNECTION_PER_PLACEMENT;
}
connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
if (stopOnFailure)
{
ReportConnectionError(connection, ERROR);
}
else
{
const bool raiseErrors = true;
HandleRemoteTransactionConnectionError(connection, raiseErrors);
return NULL;
}
}
/*
* Errors are supposed to cause immediate aborts (i.e. we don't
* want to/can't invalidate placements), so mark the connection as
* critical so that later errors cause failures.
*/
MarkRemoteTransactionCritical(connection);
if (MultiShardConnectionType != SEQUENTIAL_CONNECTION)
{
ClaimConnectionExclusively(connection);
}
return connection;
}
/*
* StartPlacementStateCopyCommand sends the COPY for the given placement. It also
* sends binary headers if this is a binary COPY.
*/
static void
StartPlacementStateCopyCommand(CopyPlacementState *placementState,
CopyStmt *copyStatement, CopyOutState copyOutState)
{
StringInfo copyCommand = NULL;
PGresult *result = NULL;
MultiConnection *connection = placementState->connectionState->connection;
uint64 shardId = placementState->shardState->shardId;
bool raiseInterrupts = true;
bool binaryCopy = copyOutState->binary;
copyCommand = ConstructCopyStatement(copyStatement, shardId, binaryCopy);
if (!SendRemoteCommand(connection, copyCommand->data))
{
ReportConnectionError(connection, ERROR);
}
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (PQresultStatus(result) != PGRES_COPY_IN)
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
if (binaryCopy)
{
SendCopyBinaryHeaders(copyOutState, shardId, list_make1(connection));
}
}
/*
* EndPlacementStateCopyCommand ends the COPY for the given placement. It also
* sends binary footers if this is a binary COPY.
*/
static void
EndPlacementStateCopyCommand(CopyPlacementState *placementState,
CopyOutState copyOutState)
{
MultiConnection *connection = placementState->connectionState->connection;
uint64 shardId = placementState->shardState->shardId;
bool binaryCopy = copyOutState->binary;
/* send footers and end copy command */
if (binaryCopy)
{
SendCopyBinaryFooters(copyOutState, shardId, list_make1(connection));
}
EndRemoteCopy(shardId, list_make1(connection));
}
/*
* UnclaimCopyConnections unclaims all the connections used for COPY.
*/
static void
UnclaimCopyConnections(List *connectionStateList)
{
ListCell *connectionStateCell = NULL;
foreach(connectionStateCell, connectionStateList)
{
CopyConnectionState *connectionState = lfirst(connectionStateCell);
UnclaimConnection(connectionState->connection);
}
}

View File

@ -432,6 +432,36 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList,
}
/*
* GetConnectionIfPlacementAccessedInXact returns the connection over which
* the placement has been accessed in the transaction. If not found, returns
* NULL.
*/
MultiConnection *
GetConnectionIfPlacementAccessedInXact(int flags, List *placementAccessList,
const char *userName)
{
MultiConnection *connection = NULL;
char *freeUserName = NULL;
List *placementEntryList = NIL;
if (userName == NULL)
{
userName = freeUserName = CurrentUserName();
}
connection = FindPlacementListConnection(flags, placementAccessList,
userName, &placementEntryList);
if (freeUserName != NULL)
{
pfree(freeUserName);
}
return connection;
}
/*
* FindPlacementListConnection determines whether there is a connection that must
* be used to perform the given placement accesses.

View File

@ -69,7 +69,7 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
List *insertTargetList = distributedPlan->insertTargetList;
Oid targetRelationId = distributedPlan->targetRelationId;
char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix;
HTAB *shardConnectionsHash = NULL;
HTAB *shardStateHash = NULL;
ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator")));
@ -100,7 +100,7 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
bool hasReturning = distributedPlan->hasReturning;
bool isModificationQuery = true;
shardConnectionsHash = ExecuteSelectIntoColocatedIntermediateResults(
shardStateHash = ExecuteSelectIntoColocatedIntermediateResults(
targetRelationId,
insertTargetList,
selectQuery,
@ -119,7 +119,7 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
uint64 shardId = task->anchorShardId;
bool shardModified = false;
hash_search(shardConnectionsHash, &shardId, HASH_FIND, &shardModified);
hash_search(shardStateHash, &shardId, HASH_FIND, &shardModified);
if (shardModified)
{
prunedTaskList = lappend(prunedTaskList, task);
@ -128,8 +128,21 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
if (prunedTaskList != NIL)
{
ExecuteMultipleTasks(scanState, prunedTaskList, isModificationQuery,
hasReturning);
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
ExecuteModifyTasksSequentially(scanState, prunedTaskList,
CMD_INSERT, hasReturning);
}
else
{
ExecuteMultipleTasks(scanState, prunedTaskList, isModificationQuery,
hasReturning);
}
if (SortReturning && hasReturning)
{
SortTupleStore(scanState);
}
}
}
else
@ -151,7 +164,7 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
* ExecuteSelectIntoColocatedIntermediateResults executes the given select query
* and inserts tuples into a set of intermediate results that are colocated with
* the target table for further processing of ON CONFLICT or RETURNING. It also
* returns the hash of connections that were used to insert tuplesinto the target
* returns the hash of shard states that were used to insert tuples into the target
* relation.
*/
static HTAB *
@ -198,7 +211,7 @@ ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId,
XactModificationLevel = XACT_MODIFICATION_DATA;
return copyDest->shardConnectionHash;
return copyDest->shardStateHash;
}

View File

@ -103,7 +103,6 @@ static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
static void AcquireExecutorShardLock(Task *task, CmdType commandType);
static void AcquireExecutorMultiShardLocks(List *taskList);
static bool RequiresConsistentSnapshot(Task *task);
static void SortTupleStore(CitusScanState *scanState);
static void RouterMultiModifyExecScan(CustomScanState *node);
static void RouterSequentialModifyExecScan(CustomScanState *node);
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
@ -619,7 +618,7 @@ RouterModifyExecScan(CustomScanState *node)
*
* The sorting is done in ASC order.
*/
static void
void
SortTupleStore(CitusScanState *scanState)
{
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);

View File

@ -883,50 +883,6 @@ CheckConflictingParallelRelationAccesses(Oid relationId, ShardPlacementAccessTyp
}
/*
* CheckConflictingParallelCopyAccesses is mostly a wrapper around
* HoldsConflictingLockWithReferencedRelations(). We're only interested in parallel
* accesses to distributed tables that refers reference tables via foreign constraint.
* Since COPY cannot be used in sequential mode, we're erroring out.
*/
void
CheckConflictingParallelCopyAccesses(Oid relationId)
{
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
Oid conflictingReferencingRelationId = InvalidOid;
ShardPlacementAccessType conflictingAccessType = PLACEMENT_ACCESS_SELECT;
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
cacheEntry->referencedRelationsViaForeignKey != NIL))
{
return;
}
if (HoldsConflictingLockWithReferencedRelations(relationId, PLACEMENT_ACCESS_DML,
&conflictingReferencingRelationId,
&conflictingAccessType))
{
char *relationName = get_rel_name(relationId);
char *conflictingRelationName = get_rel_name(conflictingReferencingRelationId);
char *conflictingAccessTypeText =
PlacementAccessTypeToText(conflictingAccessType);
ereport(ERROR, (errmsg("cannot execute parallel COPY on relation \"%s\" "
"after %s command on reference relation "
"\"%s\" because there is a foreign key between "
"them and \"%s\" has been modified in this transaction",
relationName, conflictingAccessTypeText,
conflictingRelationName, conflictingRelationName),
errdetail("COPY to a distributed table uses a separate set of "
"connections which will not be able to see the "
"uncommitted changes to the reference table."),
errhint("Perform the COPY in a separate transaction.")));
}
}
/*
* HoldsConflictingLockWithReferencedRelations returns true if the input relationId is a
* hash distributed table and it holds any conflicting locks with the reference tables that

View File

@ -92,10 +92,17 @@ typedef struct CitusCopyDestReceiver
/* template for COPY statement to send to workers */
CopyStmt *copyStatement;
/* cached shard metadata for pruning */
HTAB *shardConnectionHash;
bool stopOnFailure;
/*
* shardId to CopyShardState map. Also used in insert_select_executor.c for
* task pruning.
*/
HTAB *shardStateHash;
/* socket to CopyConnectionState map */
HTAB *connectionStateHash;
/* state on how to copy out data types */
CopyOutState copyOutState;
FmgrInfo *columnOutputFunctions;

View File

@ -47,6 +47,7 @@ extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params,
extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
DestReceiver *dest);
extern void SetLocalMultiShardModifyModeToSequential(void);
extern void SortTupleStore(CitusScanState *scanState);
#endif /* MULTI_EXECUTOR_H */

View File

@ -46,12 +46,17 @@ extern MultiConnection * StartPlacementConnection(uint32 flags,
struct ShardPlacement *placement,
const char *userName);
extern MultiConnection * GetConnectionIfPlacementAccessedInXact(int flags,
List *placementAccessList,
const char *userName);
extern MultiConnection * GetPlacementListConnection(uint32 flags,
List *placementAccessList,
const char *userName);
extern MultiConnection * StartPlacementListConnection(uint32 flags,
List *placementAccessList,
const char *userName);
extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType);
extern void ResetPlacementConnectionManagement(void);
extern void MarkFailedShardPlacements(void);

View File

@ -39,7 +39,6 @@ extern RelationAccessMode GetRelationDDLAccessMode(Oid relationId);
extern RelationAccessMode GetRelationDMLAccessMode(Oid relationId);
extern RelationAccessMode GetRelationSelectAccessMode(Oid relationId);
extern bool ShouldRecordRelationAccess(void);
extern void CheckConflictingParallelCopyAccesses(Oid relationId);
extern bool ParallelQueryExecutedInTransaction(void);

View File

@ -13,8 +13,20 @@ s/shard [0-9]+/shard xxxxx/g
# In foreign_key_to_reference_table, normalize shard table names, etc in
# the generated plan
s/"(fkey_ref_|referenced_table_|referencing_table_)[0-9]+"/"\1xxxxxxx"/g
s/"(foreign_key_2_|fkey_ref_to_dist_|fkey_ref_)[0-9]+"/"\1xxxxxxx"/g
s/"(referenced_table_|referencing_table_|referencing_table2_)[0-9]+"/"\1xxxxxxx"/g
s/\(id\)=\([0-9]+\)/(id)=(X)/g
s/\(ref_id\)=\([0-9]+\)/(ref_id)=(X)/g
# Savepoint error messages changed between postgres 10 and 11.
s/savepoint ".*" does not exist/no such savepoint/g
# shard table names for multi_subtransactions
s/"t2_[0-9]+"/"t2_xxxxxxx"/g
# In foreign_key_restriction_enforcement, normalize shard names
s/"(on_update_fkey_table_|fkey_)[0-9]+"/"\1xxxxxxx"/g
# In multi_insert_select_conflict, normalize shard name and constraints
s/"(target_table_|target_table_|test_ref_table_)[0-9]+"/"\1xxxxxxx"/g
s/\(col_1\)=\([0-9]+\)/(col_1)=(X)/g

View File

@ -2,5 +2,9 @@
multi_alter_table_add_constraints
multi_alter_table_statements
foreign_key_to_reference_table
failure_copy_on_hash
failure_savepoints
foreign_key_restriction_enforcement
multi_insert_select
multi_insert_select_conflict
multi_subtransactions

View File

@ -290,10 +290,6 @@ BEGIN;
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "referece_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
COPY on_update_fkey_table FROM STDIN WITH CSV;
ERROR: cannot execute parallel COPY on relation "on_update_fkey_table" after DML command on reference relation "referece_table" because there is a foreign key between them and "referece_table" has been modified in this transaction
DETAIL: COPY to a distributed table uses a separate set of connections which will not be able to see the uncommitted changes to the reference table.
HINT: Perform the COPY in a separate transaction.
CONTEXT: COPY on_update_fkey_table, line 2: "1002,99"
ROLLBACK;
-- case 2.8: UPDATE to a reference table is followed by TRUNCATE
BEGIN;
@ -368,10 +364,6 @@ BEGIN;
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "referece_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
COPY on_update_fkey_table FROM STDIN WITH CSV;
ERROR: cannot execute parallel COPY on relation "on_update_fkey_table" after DDL command on reference relation "referece_table" because there is a foreign key between them and "referece_table" has been modified in this transaction
DETAIL: COPY to a distributed table uses a separate set of connections which will not be able to see the uncommitted changes to the reference table.
HINT: Perform the COPY in a separate transaction.
CONTEXT: COPY on_update_fkey_table, line 2: "1002,99"
ROLLBACK;
-- case 3.8: DDL to a reference table is followed by TRUNCATE
BEGIN;

View File

@ -290,10 +290,6 @@ BEGIN;
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "referece_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
COPY on_update_fkey_table FROM STDIN WITH CSV;
ERROR: cannot execute parallel COPY on relation "on_update_fkey_table" after DML command on reference relation "referece_table" because there is a foreign key between them and "referece_table" has been modified in this transaction
DETAIL: COPY to a distributed table uses a separate set of connections which will not be able to see the uncommitted changes to the reference table.
HINT: Perform the COPY in a separate transaction.
CONTEXT: COPY on_update_fkey_table, line 2: "1002,99"
ROLLBACK;
-- case 2.8: UPDATE to a reference table is followed by TRUNCATE
BEGIN;
@ -368,10 +364,6 @@ BEGIN;
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "referece_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
COPY on_update_fkey_table FROM STDIN WITH CSV;
ERROR: cannot execute parallel COPY on relation "on_update_fkey_table" after DDL command on reference relation "referece_table" because there is a foreign key between them and "referece_table" has been modified in this transaction
DETAIL: COPY to a distributed table uses a separate set of connections which will not be able to see the uncommitted changes to the reference table.
HINT: Perform the COPY in a separate transaction.
CONTEXT: COPY on_update_fkey_table, line 2: "1002,99"
ROLLBACK;
-- case 3.8: DDL to a reference table is followed by TRUNCATE
BEGIN;

View File

@ -24,10 +24,10 @@ step s1-insert-into-select-conflict-update:
col_1 col_2
1 1
5 5
2 2
3 3
4 4
2 2
5 5
step s2-begin:
BEGIN;
@ -121,10 +121,10 @@ step s2-insert-into-select-conflict-update: <... completed>
col_1 col_2
1 1
5 5
2 2
3 3
4 4
2 2
5 5
step s2-commit:
COMMIT;
@ -153,10 +153,10 @@ step s1-insert-into-select-conflict-update:
col_1 col_2
1 1
5 5
2 2
3 3
4 4
2 2
5 5
step s2-begin:
BEGIN;
@ -180,10 +180,10 @@ step s2-insert-into-select-conflict-update: <... completed>
col_1 col_2
1 1
5 5
2 2
3 3
4 4
2 2
5 5
step s2-commit:
COMMIT;
@ -212,10 +212,10 @@ step s1-insert-into-select-conflict-update:
col_1 col_2
1 1
5 5
2 2
3 3
4 4
2 2
5 5
step s2-begin:
BEGIN;
@ -264,10 +264,10 @@ step s1-insert-into-select-conflict-update-replication-factor-2:
col_1 col_2 col_3
1 1
5 5
2 2
3 3
4 4
2 2
5 5
step s2-begin-replication-factor-2:
SET citus.shard_replication_factor to 2;
BEGIN;
@ -292,10 +292,10 @@ step s2-insert-into-select-conflict-update-replication-factor-2: <... completed>
col_1 col_2 col_3
1 1
5 5
2 2
3 3
4 4
2 2
5 5
step s2-commit:
COMMIT;

View File

@ -1721,7 +1721,7 @@ BEGIN;
ALTER TABLE reference_table ADD COLUMN z int;
INSERT INTO raw_events_first (user_id)
SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
ERROR: cannot establish a new connection for placement 13300025, since DDL has been executed on a connection that is in use
ERROR: cannot establish a new connection for placement 13300024, since DDL has been executed on a connection that is in use
ROLLBACK;
-- the same test with sequential DDL should work fine
BEGIN;
@ -2219,10 +2219,10 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
user_id | time | value_1 | value_2 | value_3 | value_4
---------+------+---------+---------+---------+---------
1 | | 11 | | |
5 | | 15 | | |
2 | | 12 | | |
3 | | 13 | | |
4 | | 14 | | |
2 | | 12 | | |
5 | | 15 | | |
(5 rows)
RESET client_min_messages;

View File

@ -302,8 +302,6 @@ SELECT create_reference_table('test_ref_table');
INSERT INTO test_ref_table VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
ALTER TABLE target_table ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES test_ref_table(key) ON DELETE CASCADE;
-- Since we try to apply DML command after modification on test_ref_table which
-- has foreign key from target_table, following two queries are not supported.
BEGIN;
TRUNCATE test_ref_table CASCADE;
NOTICE: truncate cascades to table "target_table"
@ -313,21 +311,27 @@ NOTICE: truncate cascades to table "target_table"
col_2,
col_1
FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *;
ERROR: cannot execute parallel COPY on relation "target_table" after DDL command on reference relation "test_ref_table" because there is a foreign key between them and "test_ref_table" has been modified in this transaction
DETAIL: COPY to a distributed table uses a separate set of connections which will not be able to see the uncommitted changes to the reference table.
HINT: Perform the COPY in a separate transaction.
ERROR: insert or update on table "target_table_1900000" violates foreign key constraint "fkey_1900000"
DETAIL: Key (col_1)=(1) is not present in table "test_ref_table_1900012".
CONTEXT: while executing command on localhost:57637
ROLLBACK;
BEGIN;
DELETE FROM test_ref_table;
DELETE FROM test_ref_table WHERE key > 10;
INSERT INTO
target_table
SELECT
col_2,
col_1
FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *;
ERROR: cannot execute parallel COPY on relation "target_table" after DML command on reference relation "test_ref_table" because there is a foreign key between them and "test_ref_table" has been modified in this transaction
DETAIL: COPY to a distributed table uses a separate set of connections which will not be able to see the uncommitted changes to the reference table.
HINT: Perform the COPY in a separate transaction.
FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 1 RETURNING *;
col_1 | col_2
-------+-------
1 | 1
2 | 1
3 | 1
4 | 1
5 | 1
(5 rows)
ROLLBACK;
-- Following two queries are supported since we do not modify but only select from
-- the target_table after modification on test_ref_table.

View File

@ -217,20 +217,16 @@ BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
\copy labs from stdin delimiter ','
COMMIT;
-- COPY cannot be performed if multiple shards were modified over the same connection
-- COPY can be performed if multiple shards were modified over the same connection
BEGIN;
INSERT INTO researchers VALUES (2, 1, 'Knuth Donald');
INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie');
\copy researchers from stdin delimiter ','
ERROR: cannot establish a new connection for placement 1200003, since DML has been executed on a connection that is in use
CONTEXT: COPY researchers, line 2: "10,6,Lesport Lampie"
ROLLBACK;
-- COPY cannot be performed after a multi-row INSERT that uses one connection
-- COPY can be performed after a multi-row INSERT that uses one connection
BEGIN;
INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'), (10, 6, 'Lamport Leslie');
\copy researchers from stdin delimiter ','
ERROR: cannot establish a new connection for placement 1200003, since DML has been executed on a connection that is in use
CONTEXT: COPY researchers, line 2: "10,6,Lesport Lampie"
ROLLBACK;
-- after a COPY you can modify multiple shards, since they'll use different connections
BEGIN;
@ -381,9 +377,9 @@ DELETE FROM researchers WHERE lab_id = 6;
\copy researchers FROM STDIN delimiter ','
COMMIT;
WARNING: illegal value
WARNING: failed to commit critical transaction on localhost:57638, metadata is likely out of sync
WARNING: failed to commit transaction on localhost:57638
WARNING: illegal value
WARNING: failed to commit critical transaction on localhost:57637, metadata is likely out of sync
WARNING: failed to commit transaction on localhost:57637
WARNING: could not commit transaction for shard 1200001 on any active node
ERROR: could not commit transaction on any active node
\unset VERBOSITY
@ -1431,13 +1427,12 @@ SELECT user_id FROM items ORDER BY user_id;
6
(2 rows)
-- should not be able to open multiple connections per node after INSERTing over one connection
-- should be able to open multiple connections per node after INSERTing over one connection
BEGIN;
INSERT INTO users VALUES (2, 'burak');
INSERT INTO users VALUES (3, 'burak');
\COPY items FROM STDIN WITH CSV
ERROR: cannot establish a new connection for placement 1200042, since DML has been executed on a connection that is in use
END;
ROLLBACK;
-- cannot perform parallel DDL after a co-located table has been read over 1 connection
BEGIN;
SELECT id FROM users WHERE id = 1;

View File

@ -575,8 +575,6 @@ BEGIN;
(1 row)
\COPY test_seq_copy FROM STDIN DELIMITER AS ',';
ERROR: cannot establish a new connection for placement 16073, since DDL has been executed on a connection that is in use
CONTEXT: COPY test_seq_copy, line 2: "2,2"
ROLLBACK;
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count

View File

@ -813,6 +813,16 @@ SELECT 1 AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM gener
SELECT create_distributed_table('trigger_flush','a');
ABORT;
-- trigger switch-over when using single connection per worker
BEGIN;
SET citus.shard_count TO 3;
SET citus.multi_shard_modify_mode TO 'sequential';
CREATE UNLOGGED TABLE trigger_switchover(a int, b int, c int, d int, e int, f int, g int, h int);
SELECT create_distributed_table('trigger_switchover','a');
INSERT INTO trigger_switchover
SELECT s AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,250000) s;
ABORT;
-- copy into a table with a JSONB column
CREATE TABLE copy_jsonb (key text, value jsonb, extra jsonb default '["default"]'::jsonb);
SELECT create_distributed_table('copy_jsonb', 'key', colocate_with => 'none');

View File

@ -935,7 +935,8 @@ CONTEXT: COPY numbers_hash_other, line 1: "1,1"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
ERROR: could not connect to any active placements
ERROR: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
-- verify shards for numbers_hash_other are still valid
-- since copy has failed altogether
@ -1091,6 +1092,20 @@ NOTICE: Copying data from local table...
(1 row)
ABORT;
-- trigger switch-over when using single connection per worker
BEGIN;
SET citus.shard_count TO 3;
SET citus.multi_shard_modify_mode TO 'sequential';
CREATE UNLOGGED TABLE trigger_switchover(a int, b int, c int, d int, e int, f int, g int, h int);
SELECT create_distributed_table('trigger_switchover','a');
create_distributed_table
--------------------------
(1 row)
INSERT INTO trigger_switchover
SELECT s AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,250000) s;
ABORT;
-- copy into a table with a JSONB column
CREATE TABLE copy_jsonb (key text, value jsonb, extra jsonb default '["default"]'::jsonb);

View File

@ -163,11 +163,11 @@ ROLLBACK;
BEGIN;
UPDATE referece_table SET id = 101 WHERE id = 99;
COPY on_update_fkey_table FROM STDIN WITH CSV;
1001,99
1002,99
1003,99
1004,99
1005,99
1001,101
1002,101
1003,101
1004,101
1005,101
\.
ROLLBACK;

View File

@ -165,8 +165,6 @@ SELECT create_reference_table('test_ref_table');
INSERT INTO test_ref_table VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
ALTER TABLE target_table ADD CONSTRAINT fkey FOREIGN KEY (col_1) REFERENCES test_ref_table(key) ON DELETE CASCADE;
-- Since we try to apply DML command after modification on test_ref_table which
-- has foreign key from target_table, following two queries are not supported.
BEGIN;
TRUNCATE test_ref_table CASCADE;
INSERT INTO
@ -178,13 +176,13 @@ BEGIN;
ROLLBACK;
BEGIN;
DELETE FROM test_ref_table;
DELETE FROM test_ref_table WHERE key > 10;
INSERT INTO
target_table
SELECT
col_2,
col_1
FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 55 RETURNING *;
FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 1 RETURNING *;
ROLLBACK;
-- Following two queries are supported since we do not modify but only select from

View File

@ -179,7 +179,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
\.
COMMIT;
-- COPY cannot be performed if multiple shards were modified over the same connection
-- COPY can be performed if multiple shards were modified over the same connection
BEGIN;
INSERT INTO researchers VALUES (2, 1, 'Knuth Donald');
INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie');
@ -189,7 +189,7 @@ INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie');
\.
ROLLBACK;
-- COPY cannot be performed after a multi-row INSERT that uses one connection
-- COPY can be performed after a multi-row INSERT that uses one connection
BEGIN;
INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'), (10, 6, 'Lamport Leslie');
\copy researchers from stdin delimiter ','
@ -1064,7 +1064,7 @@ END;
SELECT user_id FROM items ORDER BY user_id;
-- should not be able to open multiple connections per node after INSERTing over one connection
-- should be able to open multiple connections per node after INSERTing over one connection
BEGIN;
INSERT INTO users VALUES (2, 'burak');
INSERT INTO users VALUES (3, 'burak');
@ -1072,7 +1072,7 @@ INSERT INTO users VALUES (3, 'burak');
2,item-2,0
3,item-3,0
\.
END;
ROLLBACK;
-- cannot perform parallel DDL after a co-located table has been read over 1 connection
BEGIN;