mirror of https://github.com/citusdata/citus.git
Move router executor over to new connection & transaction framework.
Note that this changes behaviour in a few edgecases (as demonstrated by the tests). There's also a few FIXMEs.pull/775/head
parent
8a6a8fae9b
commit
360307f6d5
|
@ -28,7 +28,6 @@
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
#include "distributed/citus_clauses.h"
|
#include "distributed/citus_clauses.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/connection_cache.h"
|
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
@ -36,9 +35,11 @@
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/multi_planner.h"
|
#include "distributed/multi_planner.h"
|
||||||
#include "distributed/multi_router_executor.h"
|
#include "distributed/multi_router_executor.h"
|
||||||
|
#include "distributed/placement_connection.h"
|
||||||
#include "distributed/relay_utility.h"
|
#include "distributed/relay_utility.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
|
#include "distributed/remote_commands.h"
|
||||||
#include "executor/execdesc.h"
|
#include "executor/execdesc.h"
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
#include "executor/instrument.h"
|
#include "executor/instrument.h"
|
||||||
|
@ -66,29 +67,9 @@
|
||||||
/* controls use of locks to enforce safe commutativity */
|
/* controls use of locks to enforce safe commutativity */
|
||||||
bool AllModificationsCommutative = false;
|
bool AllModificationsCommutative = false;
|
||||||
|
|
||||||
/*
|
|
||||||
* The following static variables are necessary to track the progression of
|
|
||||||
* multi-statement transactions managed by the router executor. After the first
|
|
||||||
* modification within a transaction, the executor populates a hash with the
|
|
||||||
* transaction's initial participants (nodes hit by that initial modification).
|
|
||||||
*
|
|
||||||
* To keep track of the reverse mapping (from shards to nodes), we have a list
|
|
||||||
* of XactShardConnSets, which map a shard identifier to a set of connection
|
|
||||||
* hash entries. This list is walked by MarkRemainingInactivePlacements to
|
|
||||||
* ensure we mark placements as failed if they reject a COMMIT.
|
|
||||||
*
|
|
||||||
* Beyond that, there's a backend hook to register xact callbacks and a flag to
|
|
||||||
* track when a user tries to roll back to a savepoint (not allowed).
|
|
||||||
*/
|
|
||||||
static HTAB *xactParticipantHash = NULL;
|
|
||||||
static List *xactShardConnSetList = NIL;
|
|
||||||
static bool subXactAbortAttempted = false;
|
|
||||||
|
|
||||||
/* functions needed during start phase */
|
/* functions needed during start phase */
|
||||||
static void InitTransactionStateForTask(Task *task);
|
|
||||||
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery);
|
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery);
|
||||||
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
|
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
|
||||||
static HTAB * CreateXactParticipantHash(void);
|
|
||||||
|
|
||||||
/* functions needed during run phase */
|
/* functions needed during run phase */
|
||||||
static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
|
static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
|
||||||
|
@ -98,27 +79,14 @@ static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
|
||||||
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
|
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
|
||||||
DestReceiver *destination,
|
DestReceiver *destination,
|
||||||
Tuplestorestate *tupleStore);
|
Tuplestorestate *tupleStore);
|
||||||
static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
|
|
||||||
bool isModificationQuery);
|
|
||||||
static void PurgeConnectionForPlacement(ShardPlacement *placement);
|
|
||||||
static void RemoveXactConnection(PGconn *connection);
|
|
||||||
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
|
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
|
||||||
Oid **parameterTypes,
|
Oid **parameterTypes,
|
||||||
const char ***parameterValues);
|
const char ***parameterValues);
|
||||||
static bool SendQueryInSingleRowMode(PGconn *connection, char *query,
|
static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query,
|
||||||
ParamListInfo paramListInfo);
|
ParamListInfo paramListInfo);
|
||||||
static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
static bool StoreQueryResult(MaterialState *routerState, MultiConnection *connection,
|
||||||
TupleDesc tupleDescriptor, int64 *rows);
|
TupleDesc tupleDescriptor, int64 *rows);
|
||||||
static bool ConsumeQueryResult(PGconn *connection, int64 *rows);
|
static bool ConsumeQueryResult(MultiConnection *connection, int64 *rows);
|
||||||
static void RecordShardIdParticipant(uint64 affectedShardId,
|
|
||||||
NodeConnectionEntry *participantEntry);
|
|
||||||
|
|
||||||
/* functions needed by callbacks and hooks */
|
|
||||||
static void RouterTransactionCallback(XactEvent event, void *arg);
|
|
||||||
static void RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId,
|
|
||||||
SubTransactionId parentSubid, void *arg);
|
|
||||||
static void ExecuteTransactionEnd(bool commit);
|
|
||||||
static void MarkRemainingInactivePlacements(void);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -154,9 +122,9 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
|
||||||
* but some customers already use functions that touch multiple shards
|
* but some customers already use functions that touch multiple shards
|
||||||
* from within a function, so we'll ignore functions for now.
|
* from within a function, so we'll ignore functions for now.
|
||||||
*/
|
*/
|
||||||
if (IsTransactionBlock() && xactParticipantHash == NULL)
|
if (IsTransactionBlock())
|
||||||
{
|
{
|
||||||
InitTransactionStateForTask(task);
|
BeginOrContinueCoordinatedTransaction();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -186,62 +154,6 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* InitTransactionStateForTask is called during executor start with the first
|
|
||||||
* modifying (INSERT/UPDATE/DELETE) task during a transaction. It creates the
|
|
||||||
* transaction participant hash, opens connections to this task's nodes, and
|
|
||||||
* populates the hash with those connections after sending BEGIN commands to
|
|
||||||
* each. If a node fails to respond, its connection is set to NULL to prevent
|
|
||||||
* further interaction with it during the transaction.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
InitTransactionStateForTask(Task *task)
|
|
||||||
{
|
|
||||||
ListCell *placementCell = NULL;
|
|
||||||
|
|
||||||
xactParticipantHash = CreateXactParticipantHash();
|
|
||||||
|
|
||||||
foreach(placementCell, task->taskPlacementList)
|
|
||||||
{
|
|
||||||
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
|
|
||||||
NodeConnectionKey participantKey;
|
|
||||||
NodeConnectionEntry *participantEntry = NULL;
|
|
||||||
bool entryFound = false;
|
|
||||||
|
|
||||||
PGconn *connection = NULL;
|
|
||||||
|
|
||||||
MemSet(&participantKey, 0, sizeof(participantKey));
|
|
||||||
strlcpy(participantKey.nodeName, placement->nodeName,
|
|
||||||
MAX_NODE_LENGTH + 1);
|
|
||||||
participantKey.nodePort = placement->nodePort;
|
|
||||||
|
|
||||||
participantEntry = hash_search(xactParticipantHash, &participantKey,
|
|
||||||
HASH_ENTER, &entryFound);
|
|
||||||
Assert(!entryFound);
|
|
||||||
|
|
||||||
connection = GetOrEstablishConnection(placement->nodeName,
|
|
||||||
placement->nodePort);
|
|
||||||
if (connection != NULL)
|
|
||||||
{
|
|
||||||
PGresult *result = PQexec(connection, "BEGIN");
|
|
||||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
|
||||||
{
|
|
||||||
WarnRemoteError(connection, result);
|
|
||||||
PurgeConnection(connection);
|
|
||||||
|
|
||||||
connection = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
PQclear(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
participantEntry->connection = connection;
|
|
||||||
}
|
|
||||||
|
|
||||||
XactModificationLevel = XACT_MODIFICATION_DATA;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CommutativityRuleToLockMode determines the commutativity rule for the given
|
* CommutativityRuleToLockMode determines the commutativity rule for the given
|
||||||
* command and returns the appropriate lock mode to enforce that rule. The
|
* command and returns the appropriate lock mode to enforce that rule. The
|
||||||
|
@ -311,33 +223,6 @@ AcquireExecutorShardLock(Task *task, LOCKMODE lockMode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CreateXactParticipantHash initializes the map used to store the connections
|
|
||||||
* needed to process distributed transactions. Unlike the connection cache, we
|
|
||||||
* permit NULL connections here to signify that a participant has seen an error
|
|
||||||
* and is no longer receiving commands during a transaction. This hash should
|
|
||||||
* be walked at transaction end to send final COMMIT or ABORT commands.
|
|
||||||
*/
|
|
||||||
static HTAB *
|
|
||||||
CreateXactParticipantHash(void)
|
|
||||||
{
|
|
||||||
HTAB *xactParticipantHash = NULL;
|
|
||||||
HASHCTL info;
|
|
||||||
int hashFlags = 0;
|
|
||||||
|
|
||||||
MemSet(&info, 0, sizeof(info));
|
|
||||||
info.keysize = sizeof(NodeConnectionKey);
|
|
||||||
info.entrysize = sizeof(NodeConnectionEntry);
|
|
||||||
info.hcxt = TopTransactionContext;
|
|
||||||
hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
|
|
||||||
|
|
||||||
xactParticipantHash = hash_create("citus xact participant hash", 32, &info,
|
|
||||||
hashFlags);
|
|
||||||
|
|
||||||
return xactParticipantHash;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RouterExecutorRun actually executes a single task on a worker.
|
* RouterExecutorRun actually executes a single task on a worker.
|
||||||
*/
|
*/
|
||||||
|
@ -422,6 +307,12 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
|
||||||
|
|
||||||
/* mark underlying query as having executed */
|
/* mark underlying query as having executed */
|
||||||
routerState->eof_underlying = true;
|
routerState->eof_underlying = true;
|
||||||
|
|
||||||
|
/* have performed modifications now */
|
||||||
|
if (isModificationQuery)
|
||||||
|
{
|
||||||
|
XactModificationLevel = XACT_MODIFICATION_DATA;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if the underlying query produced output, return it */
|
/* if the underlying query produced output, return it */
|
||||||
|
@ -489,6 +380,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
int64 affectedTupleCount = -1;
|
int64 affectedTupleCount = -1;
|
||||||
bool gotResults = false;
|
bool gotResults = false;
|
||||||
char *queryString = task->queryString;
|
char *queryString = task->queryString;
|
||||||
|
int connectionFlags = NEW_CONNECTION | CACHED_CONNECTION | SESSION_LIFESPAN;
|
||||||
|
|
||||||
if (isModificationQuery && task->requiresMasterEvaluation)
|
if (isModificationQuery && task->requiresMasterEvaluation)
|
||||||
{
|
{
|
||||||
|
@ -506,6 +398,11 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
elog(DEBUG4, "query after master evaluation: %s", queryString);
|
elog(DEBUG4, "query after master evaluation: %s", queryString);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isModificationQuery)
|
||||||
|
{
|
||||||
|
connectionFlags |= FOR_DML;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Try to run the query to completion on one placement. If the query fails
|
* Try to run the query to completion on one placement. If the query fails
|
||||||
* attempt the query on the next placement.
|
* attempt the query on the next placement.
|
||||||
|
@ -515,8 +412,16 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
|
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
|
||||||
bool queryOK = false;
|
bool queryOK = false;
|
||||||
int64 currentAffectedTupleCount = 0;
|
int64 currentAffectedTupleCount = 0;
|
||||||
PGconn *connection = GetConnectionForPlacement(taskPlacement,
|
|
||||||
isModificationQuery);
|
/*
|
||||||
|
* FIXME: It's not actually correct to use only one shard placement
|
||||||
|
* here for router queries involving multiple relations. We should
|
||||||
|
* check that this connection is the only modifying one associated
|
||||||
|
* with all the involved shards.
|
||||||
|
*/
|
||||||
|
MultiConnection *connection = GetPlacementConnection(connectionFlags,
|
||||||
|
taskPlacement);
|
||||||
|
AdjustRemoteTransactionState(connection);
|
||||||
|
|
||||||
if (connection == NULL)
|
if (connection == NULL)
|
||||||
{
|
{
|
||||||
|
@ -527,7 +432,6 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
|
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
|
||||||
if (!queryOK)
|
if (!queryOK)
|
||||||
{
|
{
|
||||||
PurgeConnectionForPlacement(taskPlacement);
|
|
||||||
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -584,8 +488,6 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
PurgeConnectionForPlacement(taskPlacement);
|
|
||||||
|
|
||||||
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
|
@ -662,145 +564,14 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* GetConnectionForPlacement is the main entry point for acquiring a connection
|
|
||||||
* within the router executor. By using placements (rather than node names and
|
|
||||||
* ports) to identify connections, the router executor can keep track of shards
|
|
||||||
* used by multi-statement transactions and error out if a transaction tries
|
|
||||||
* to reach a new node altogether). In the single-statement commands context,
|
|
||||||
* GetConnectionForPlacement simply falls through to GetOrEstablishConnection.
|
|
||||||
*/
|
|
||||||
static PGconn *
|
|
||||||
GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
|
|
||||||
{
|
|
||||||
NodeConnectionKey participantKey;
|
|
||||||
NodeConnectionEntry *participantEntry = NULL;
|
|
||||||
bool entryFound = false;
|
|
||||||
|
|
||||||
/* if not in a transaction, fall through to connection cache */
|
|
||||||
if (xactParticipantHash == NULL)
|
|
||||||
{
|
|
||||||
PGconn *connection = GetOrEstablishConnection(placement->nodeName,
|
|
||||||
placement->nodePort);
|
|
||||||
|
|
||||||
return connection;
|
|
||||||
}
|
|
||||||
|
|
||||||
Assert(IsTransactionBlock());
|
|
||||||
|
|
||||||
MemSet(&participantKey, 0, sizeof(participantKey));
|
|
||||||
strlcpy(participantKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
|
|
||||||
participantKey.nodePort = placement->nodePort;
|
|
||||||
|
|
||||||
participantEntry = hash_search(xactParticipantHash, &participantKey, HASH_FIND,
|
|
||||||
&entryFound);
|
|
||||||
|
|
||||||
if (entryFound)
|
|
||||||
{
|
|
||||||
if (isModificationQuery)
|
|
||||||
{
|
|
||||||
RecordShardIdParticipant(placement->shardId, participantEntry);
|
|
||||||
}
|
|
||||||
|
|
||||||
return participantEntry->connection;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
|
|
||||||
errmsg("no transaction participant matches %s:%d",
|
|
||||||
placement->nodeName, placement->nodePort),
|
|
||||||
errdetail("Transactions which modify distributed tables may only "
|
|
||||||
"target nodes affected by the modification command "
|
|
||||||
"which began the transaction.")));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* PurgeConnectionForPlacement provides a way to purge an invalid connection
|
|
||||||
* from all relevant connection hashes using the placement involved in the
|
|
||||||
* query at the time of the error. If a transaction is ongoing, this function
|
|
||||||
* ensures the right node's connection is set to NULL in the participant map
|
|
||||||
* for the transaction in addition to purging the connection cache's entry.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
PurgeConnectionForPlacement(ShardPlacement *placement)
|
|
||||||
{
|
|
||||||
NodeConnectionKey nodeKey;
|
|
||||||
char *currentUser = CurrentUserName();
|
|
||||||
|
|
||||||
MemSet(&nodeKey, 0, sizeof(NodeConnectionKey));
|
|
||||||
strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
|
|
||||||
nodeKey.nodePort = placement->nodePort;
|
|
||||||
strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN);
|
|
||||||
|
|
||||||
PurgeConnectionByKey(&nodeKey);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* The following is logically identical to RemoveXactConnection, but since
|
|
||||||
* we have a ShardPlacement to help build a NodeConnectionKey, we avoid
|
|
||||||
* any penalty incurred by calling BuildKeyForConnection, which must ex-
|
|
||||||
* tract host, port, and user from the connection options list.
|
|
||||||
*/
|
|
||||||
if (xactParticipantHash != NULL)
|
|
||||||
{
|
|
||||||
NodeConnectionEntry *participantEntry = NULL;
|
|
||||||
bool entryFound = false;
|
|
||||||
|
|
||||||
Assert(IsTransactionBlock());
|
|
||||||
|
|
||||||
/* the participant hash doesn't use the user field */
|
|
||||||
MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser));
|
|
||||||
participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND,
|
|
||||||
&entryFound);
|
|
||||||
|
|
||||||
Assert(entryFound);
|
|
||||||
|
|
||||||
participantEntry->connection = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Removes a given connection from the transaction participant hash, based on
|
|
||||||
* the host and port of the provided connection. If the hash is not NULL, it
|
|
||||||
* MUST contain the provided connection, or a FATAL error is raised.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
RemoveXactConnection(PGconn *connection)
|
|
||||||
{
|
|
||||||
NodeConnectionKey nodeKey;
|
|
||||||
NodeConnectionEntry *participantEntry = NULL;
|
|
||||||
bool entryFound = false;
|
|
||||||
|
|
||||||
if (xactParticipantHash == NULL)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
BuildKeyForConnection(connection, &nodeKey);
|
|
||||||
|
|
||||||
/* the participant hash doesn't use the user field */
|
|
||||||
MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser));
|
|
||||||
participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND,
|
|
||||||
&entryFound);
|
|
||||||
|
|
||||||
if (!entryFound)
|
|
||||||
{
|
|
||||||
ereport(FATAL, (errmsg("could not find specified transaction connection")));
|
|
||||||
}
|
|
||||||
|
|
||||||
participantEntry->connection = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SendQueryInSingleRowMode sends the given query on the connection in an
|
* SendQueryInSingleRowMode sends the given query on the connection in an
|
||||||
* asynchronous way. The function also sets the single-row mode on the
|
* asynchronous way. The function also sets the single-row mode on the
|
||||||
* connection so that we receive results a row at a time.
|
* connection so that we receive results a row at a time.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramListInfo)
|
SendQueryInSingleRowMode(MultiConnection *connection, char *query,
|
||||||
|
ParamListInfo paramListInfo)
|
||||||
{
|
{
|
||||||
int querySent = 0;
|
int querySent = 0;
|
||||||
int singleRowMode = 0;
|
int singleRowMode = 0;
|
||||||
|
@ -814,24 +585,25 @@ SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramLis
|
||||||
ExtractParametersFromParamListInfo(paramListInfo, ¶meterTypes,
|
ExtractParametersFromParamListInfo(paramListInfo, ¶meterTypes,
|
||||||
¶meterValues);
|
¶meterValues);
|
||||||
|
|
||||||
querySent = PQsendQueryParams(connection, query, parameterCount, parameterTypes,
|
querySent = PQsendQueryParams(connection->conn, query, parameterCount,
|
||||||
parameterValues, NULL, NULL, 0);
|
parameterTypes, parameterValues,
|
||||||
|
NULL, NULL, 0);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
querySent = PQsendQuery(connection, query);
|
querySent = PQsendQuery(connection->conn, query);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (querySent == 0)
|
if (querySent == 0)
|
||||||
{
|
{
|
||||||
WarnRemoteError(connection, NULL);
|
ReportConnectionError(connection, WARNING);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
singleRowMode = PQsetSingleRowMode(connection);
|
singleRowMode = PQsetSingleRowMode(connection->conn);
|
||||||
if (singleRowMode == 0)
|
if (singleRowMode == 0)
|
||||||
{
|
{
|
||||||
WarnRemoteError(connection, NULL);
|
ReportConnectionError(connection, WARNING);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -904,7 +676,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
|
||||||
* the connection.
|
* the connection.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
StoreQueryResult(MaterialState *routerState, MultiConnection *connection,
|
||||||
TupleDesc tupleDescriptor, int64 *rows)
|
TupleDesc tupleDescriptor, int64 *rows)
|
||||||
{
|
{
|
||||||
AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
|
AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
|
||||||
|
@ -939,7 +711,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
||||||
uint32 columnCount = 0;
|
uint32 columnCount = 0;
|
||||||
ExecStatusType resultStatus = 0;
|
ExecStatusType resultStatus = 0;
|
||||||
|
|
||||||
PGresult *result = PQgetResult(connection);
|
PGresult *result = PQgetResult(connection->conn);
|
||||||
if (result == NULL)
|
if (result == NULL)
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
|
@ -962,12 +734,11 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
||||||
|
|
||||||
if (raiseError)
|
if (raiseError)
|
||||||
{
|
{
|
||||||
RemoveXactConnection(connection);
|
ReportResultError(connection, result, ERROR);
|
||||||
ReraiseRemoteError(connection, result);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
WarnRemoteError(connection, result);
|
ReportResultError(connection, result, WARNING);
|
||||||
}
|
}
|
||||||
|
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
@ -1032,7 +803,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
||||||
* has been an error.
|
* has been an error.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
ConsumeQueryResult(PGconn *connection, int64 *rows)
|
ConsumeQueryResult(MultiConnection *connection, int64 *rows)
|
||||||
{
|
{
|
||||||
bool commandFailed = false;
|
bool commandFailed = false;
|
||||||
bool gotResponse = false;
|
bool gotResponse = false;
|
||||||
|
@ -1046,7 +817,7 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
|
||||||
*/
|
*/
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
PGresult *result = PQgetResult(connection);
|
PGresult *result = PQgetResult(connection->conn);
|
||||||
ExecStatusType status = PGRES_COMMAND_OK;
|
ExecStatusType status = PGRES_COMMAND_OK;
|
||||||
|
|
||||||
if (result == NULL)
|
if (result == NULL)
|
||||||
|
@ -1074,12 +845,11 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
|
||||||
|
|
||||||
if (raiseError)
|
if (raiseError)
|
||||||
{
|
{
|
||||||
RemoveXactConnection(connection);
|
ReportResultError(connection, result, ERROR);
|
||||||
ReraiseRemoteError(connection, result);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
WarnRemoteError(connection, result);
|
ReportResultError(connection, result, WARNING);
|
||||||
}
|
}
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
|
||||||
|
@ -1117,50 +887,6 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* RecordShardIdParticipant registers a connection as being involved with a
|
|
||||||
* particular shard during a multi-statement transaction.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry)
|
|
||||||
{
|
|
||||||
XactShardConnSet *shardConnSetMatch = NULL;
|
|
||||||
ListCell *listCell = NULL;
|
|
||||||
MemoryContext oldContext = NULL;
|
|
||||||
List *connectionEntryList = NIL;
|
|
||||||
|
|
||||||
/* check whether an entry already exists for this shard */
|
|
||||||
foreach(listCell, xactShardConnSetList)
|
|
||||||
{
|
|
||||||
XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(listCell);
|
|
||||||
|
|
||||||
if (shardConnSet->shardId == affectedShardId)
|
|
||||||
{
|
|
||||||
shardConnSetMatch = shardConnSet;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* entries must last through the whole top-level transaction */
|
|
||||||
oldContext = MemoryContextSwitchTo(TopTransactionContext);
|
|
||||||
|
|
||||||
/* if no entry found, make one */
|
|
||||||
if (shardConnSetMatch == NULL)
|
|
||||||
{
|
|
||||||
shardConnSetMatch = (XactShardConnSet *) palloc0(sizeof(XactShardConnSet));
|
|
||||||
shardConnSetMatch->shardId = affectedShardId;
|
|
||||||
|
|
||||||
xactShardConnSetList = lappend(xactShardConnSetList, shardConnSetMatch);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* add connection, avoiding duplicates */
|
|
||||||
connectionEntryList = shardConnSetMatch->connectionEntryList;
|
|
||||||
shardConnSetMatch->connectionEntryList = list_append_unique_ptr(connectionEntryList,
|
|
||||||
participantEntry);
|
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RouterExecutorFinish cleans up after a distributed execution.
|
* RouterExecutorFinish cleans up after a distributed execution.
|
||||||
*/
|
*/
|
||||||
|
@ -1195,221 +921,3 @@ RouterExecutorEnd(QueryDesc *queryDesc)
|
||||||
queryDesc->estate = NULL;
|
queryDesc->estate = NULL;
|
||||||
queryDesc->totaltime = NULL;
|
queryDesc->totaltime = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* RegisterRouterExecutorXactCallbacks registers this executor's callbacks.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
RegisterRouterExecutorXactCallbacks(void)
|
|
||||||
{
|
|
||||||
RegisterXactCallback(RouterTransactionCallback, NULL);
|
|
||||||
RegisterSubXactCallback(RouterSubtransactionCallback, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* RouterTransactionCallback handles committing or aborting remote transactions
|
|
||||||
* after the local one has committed or aborted. It only sends COMMIT or ABORT
|
|
||||||
* commands to still-healthy remotes; the failed ones are marked as inactive if
|
|
||||||
* after a successful COMMIT (no need to mark on ABORTs).
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
RouterTransactionCallback(XactEvent event, void *arg)
|
|
||||||
{
|
|
||||||
if (XactModificationLevel != XACT_MODIFICATION_DATA)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (event)
|
|
||||||
{
|
|
||||||
case XACT_EVENT_PARALLEL_COMMIT:
|
|
||||||
case XACT_EVENT_COMMIT:
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case XACT_EVENT_PARALLEL_ABORT:
|
|
||||||
case XACT_EVENT_ABORT:
|
|
||||||
{
|
|
||||||
bool commit = false;
|
|
||||||
|
|
||||||
ExecuteTransactionEnd(commit);
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* no support for prepare with multi-statement transactions */
|
|
||||||
case XACT_EVENT_PREPARE:
|
|
||||||
case XACT_EVENT_PRE_PREPARE:
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("cannot prepare a transaction that modified "
|
|
||||||
"distributed tables")));
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case XACT_EVENT_PARALLEL_PRE_COMMIT:
|
|
||||||
case XACT_EVENT_PRE_COMMIT:
|
|
||||||
{
|
|
||||||
bool commit = true;
|
|
||||||
|
|
||||||
if (subXactAbortAttempted)
|
|
||||||
{
|
|
||||||
subXactAbortAttempted = false;
|
|
||||||
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
|
|
||||||
"which modify distributed tables")));
|
|
||||||
}
|
|
||||||
|
|
||||||
ExecuteTransactionEnd(commit);
|
|
||||||
MarkRemainingInactivePlacements();
|
|
||||||
|
|
||||||
/* leave early to avoid resetting transaction state */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* reset transaction state */
|
|
||||||
xactParticipantHash = NULL;
|
|
||||||
xactShardConnSetList = NIL;
|
|
||||||
subXactAbortAttempted = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* RouterSubtransactionCallback silently keeps track of any attempt to ROLLBACK
|
|
||||||
* TO SAVEPOINT, which is not permitted by this executor. At transaction end,
|
|
||||||
* the executor checks whether such a rollback was attempted and, if so, errors
|
|
||||||
* out entirely (with an appropriate message).
|
|
||||||
*
|
|
||||||
* This implementation permits savepoints so long as no rollbacks occur.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId,
|
|
||||||
SubTransactionId parentSubid, void *arg)
|
|
||||||
{
|
|
||||||
if ((xactParticipantHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB))
|
|
||||||
{
|
|
||||||
subXactAbortAttempted = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ExecuteTransactionEnd ends any remote transactions still taking place on
|
|
||||||
* remote nodes. It uses xactParticipantHash to know which nodes need any
|
|
||||||
* final COMMIT or ABORT commands. Nodes which fail a final COMMIT will have
|
|
||||||
* their connection field set to NULL to permit placement invalidation.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
ExecuteTransactionEnd(bool commit)
|
|
||||||
{
|
|
||||||
const char *sqlCommand = commit ? "COMMIT TRANSACTION" : "ABORT TRANSACTION";
|
|
||||||
HASH_SEQ_STATUS scan;
|
|
||||||
NodeConnectionEntry *participant;
|
|
||||||
bool completed = !commit; /* aborts are assumed completed */
|
|
||||||
|
|
||||||
if (xactParticipantHash == NULL)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
hash_seq_init(&scan, xactParticipantHash);
|
|
||||||
while ((participant = (NodeConnectionEntry *) hash_seq_search(&scan)))
|
|
||||||
{
|
|
||||||
PGconn *connection = participant->connection;
|
|
||||||
PGresult *result = NULL;
|
|
||||||
|
|
||||||
if (PQstatus(connection) != CONNECTION_OK)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
result = PQexec(connection, sqlCommand);
|
|
||||||
if (PQresultStatus(result) == PGRES_COMMAND_OK)
|
|
||||||
{
|
|
||||||
completed = true;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
WarnRemoteError(connection, result);
|
|
||||||
PurgeConnection(participant->connection);
|
|
||||||
|
|
||||||
participant->connection = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
PQclear(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!completed)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not commit transaction on any active nodes")));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* MarkRemainingInactivePlacements takes care of marking placements of a shard
|
|
||||||
* inactive after some of the placements rejected the final COMMIT phase of a
|
|
||||||
* transaction. This step is skipped if all placements reject the COMMIT, since
|
|
||||||
* in that case no modifications to the placement have persisted.
|
|
||||||
*
|
|
||||||
* Failures are detected by checking the connection field of the entries in the
|
|
||||||
* connection set for each shard: it is always set to NULL after errors.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
MarkRemainingInactivePlacements(void)
|
|
||||||
{
|
|
||||||
ListCell *shardConnSetCell = NULL;
|
|
||||||
|
|
||||||
foreach(shardConnSetCell, xactShardConnSetList)
|
|
||||||
{
|
|
||||||
XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(shardConnSetCell);
|
|
||||||
List *participantList = shardConnSet->connectionEntryList;
|
|
||||||
ListCell *participantCell = NULL;
|
|
||||||
int successes = list_length(participantList); /* assume full success */
|
|
||||||
|
|
||||||
/* determine how many actual successes there were: subtract failures */
|
|
||||||
foreach(participantCell, participantList)
|
|
||||||
{
|
|
||||||
NodeConnectionEntry *participant = NULL;
|
|
||||||
participant = (NodeConnectionEntry *) lfirst(participantCell);
|
|
||||||
|
|
||||||
/* other codes sets connection to NULL after errors */
|
|
||||||
if (participant->connection == NULL)
|
|
||||||
{
|
|
||||||
successes--;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* if no nodes succeeded for this shard, don't do anything */
|
|
||||||
if (successes == 0)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* otherwise, ensure failed placements are marked inactive */
|
|
||||||
foreach(participantCell, participantList)
|
|
||||||
{
|
|
||||||
NodeConnectionEntry *participant = NULL;
|
|
||||||
participant = (NodeConnectionEntry *) lfirst(participantCell);
|
|
||||||
|
|
||||||
if (participant->connection == NULL)
|
|
||||||
{
|
|
||||||
uint64 shardId = shardConnSet->shardId;
|
|
||||||
NodeConnectionKey *nodeKey = &participant->cacheKey;
|
|
||||||
uint64 shardLength = 0;
|
|
||||||
uint64 placementId = INVALID_PLACEMENT_ID;
|
|
||||||
|
|
||||||
placementId = DeleteShardPlacementRow(shardId, nodeKey->nodeName,
|
|
||||||
nodeKey->nodePort);
|
|
||||||
InsertShardPlacementRow(shardId, placementId, FILE_INACTIVE, shardLength,
|
|
||||||
nodeKey->nodeName, nodeKey->nodePort);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -160,7 +160,6 @@ _PG_init(void)
|
||||||
InitPlacementConnectionManagement();
|
InitPlacementConnectionManagement();
|
||||||
|
|
||||||
/* initialize transaction callbacks */
|
/* initialize transaction callbacks */
|
||||||
RegisterRouterExecutorXactCallbacks();
|
|
||||||
RegisterShardPlacementXactCallbacks();
|
RegisterShardPlacementXactCallbacks();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,18 +17,6 @@
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* XactShardConnSet keeps track of the mapping from shard to the set of nodes
|
|
||||||
* involved in multi-statement transaction-wrapped modifications of that shard.
|
|
||||||
* This information is used to mark placements inactive at transaction close.
|
|
||||||
*/
|
|
||||||
typedef struct XactShardConnSet
|
|
||||||
{
|
|
||||||
uint64 shardId; /* identifier of the shard that was modified */
|
|
||||||
List *connectionEntryList; /* NodeConnectionEntry pointers to participating nodes */
|
|
||||||
} XactShardConnSet;
|
|
||||||
|
|
||||||
|
|
||||||
/* Config variables managed via guc.c */
|
/* Config variables managed via guc.c */
|
||||||
extern bool AllModificationsCommutative;
|
extern bool AllModificationsCommutative;
|
||||||
|
|
||||||
|
@ -37,6 +25,5 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task);
|
||||||
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
|
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
|
||||||
extern void RouterExecutorFinish(QueryDesc *queryDesc);
|
extern void RouterExecutorFinish(QueryDesc *queryDesc);
|
||||||
extern void RouterExecutorEnd(QueryDesc *queryDesc);
|
extern void RouterExecutorEnd(QueryDesc *queryDesc);
|
||||||
extern void RegisterRouterExecutorXactCallbacks(void);
|
|
||||||
|
|
||||||
#endif /* MULTI_ROUTER_EXECUTOR_H_ */
|
#endif /* MULTI_ROUTER_EXECUTOR_H_ */
|
||||||
|
|
|
@ -135,8 +135,7 @@ SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
|
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
|
||||||
ERROR: no transaction participant matches localhost:57638
|
ERROR: cannot open new connections after the first modification command within a transaction
|
||||||
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- this logic even applies to router SELECTs occurring after a modification:
|
-- this logic even applies to router SELECTs occurring after a modification:
|
||||||
-- selecting from the modified node is fine...
|
-- selecting from the modified node is fine...
|
||||||
|
@ -159,34 +158,8 @@ AND sp.nodeport = :worker_1_port
|
||||||
AND s.logicalrelid = 'researchers'::regclass;
|
AND s.logicalrelid = 'researchers'::regclass;
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||||
ERROR: no transaction participant matches localhost:57638
|
ERROR: cannot open new connections after the first modification command within a transaction
|
||||||
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
|
|
||||||
ABORT;
|
ABORT;
|
||||||
-- applies to DDL, too
|
|
||||||
BEGIN;
|
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
|
||||||
ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications
|
|
||||||
COMMIT;
|
|
||||||
-- whether it occurs first or second
|
|
||||||
BEGIN;
|
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
|
||||||
ERROR: distributed data modifications must not appear in transaction blocks which contain distributed DDL commands
|
|
||||||
COMMIT;
|
|
||||||
-- but the DDL should correctly roll back
|
|
||||||
\d labs
|
|
||||||
Table "public.labs"
|
|
||||||
Column | Type | Modifiers
|
|
||||||
--------+--------+-----------
|
|
||||||
id | bigint | not null
|
|
||||||
name | text | not null
|
|
||||||
|
|
||||||
SELECT * FROM labs WHERE id = 6;
|
|
||||||
id | name
|
|
||||||
----+------
|
|
||||||
(0 rows)
|
|
||||||
|
|
||||||
-- COPY can't happen second,
|
-- COPY can't happen second,
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
@ -204,6 +177,7 @@ SELECT name FROM labs WHERE id = 10;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
ERROR: cannot open new connections after the first modification command within a transaction
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- but a double-copy isn't allowed (the first will persist)
|
-- but a double-copy isn't allowed (the first will persist)
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
@ -220,7 +194,7 @@ SELECT name FROM labs WHERE id = 11;
|
||||||
|
|
||||||
-- finally, ALTER and copy aren't compatible
|
-- finally, ALTER and copy aren't compatible
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto2 text;
|
||||||
\copy labs from stdin delimiter ','
|
\copy labs from stdin delimiter ','
|
||||||
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
|
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
|
||||||
CONTEXT: COPY labs, line 1: "12,fsociety,lol"
|
CONTEXT: COPY labs, line 1: "12,fsociety,lol"
|
||||||
|
@ -241,7 +215,7 @@ SELECT * FROM labs WHERE id = 12;
|
||||||
-- and if the copy is before the ALTER...
|
-- and if the copy is before the ALTER...
|
||||||
BEGIN;
|
BEGIN;
|
||||||
\copy labs from stdin delimiter ','
|
\copy labs from stdin delimiter ','
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto3 text;
|
||||||
ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications
|
ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- the DDL fails, but copy persists
|
-- the DDL fails, but copy persists
|
||||||
|
@ -512,6 +486,7 @@ INSERT INTO labs VALUES (9, 'BAD');
|
||||||
COMMIT;
|
COMMIT;
|
||||||
WARNING: illegal value
|
WARNING: illegal value
|
||||||
CONTEXT: while executing command on localhost:57637
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
ERROR: could not commit transaction on any active nodes
|
||||||
-- data to objects should be persisted, but labs should not...
|
-- data to objects should be persisted, but labs should not...
|
||||||
SELECT * FROM objects WHERE id = 1;
|
SELECT * FROM objects WHERE id = 1;
|
||||||
id | name
|
id | name
|
||||||
|
@ -536,9 +511,8 @@ ORDER BY s.logicalrelid, sp.shardstate;
|
||||||
logicalrelid | shardstate | count
|
logicalrelid | shardstate | count
|
||||||
--------------+------------+-------
|
--------------+------------+-------
|
||||||
labs | 1 | 1
|
labs | 1 | 1
|
||||||
objects | 1 | 1
|
objects | 1 | 2
|
||||||
objects | 3 | 1
|
(2 rows)
|
||||||
(3 rows)
|
|
||||||
|
|
||||||
-- some append-partitioned tests for good measure
|
-- some append-partitioned tests for good measure
|
||||||
CREATE TABLE append_researchers ( LIKE researchers );
|
CREATE TABLE append_researchers ( LIKE researchers );
|
||||||
|
|
|
@ -131,22 +131,6 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||||
ABORT;
|
ABORT;
|
||||||
|
|
||||||
-- applies to DDL, too
|
|
||||||
BEGIN;
|
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
|
||||||
COMMIT;
|
|
||||||
|
|
||||||
-- whether it occurs first or second
|
|
||||||
BEGIN;
|
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
|
||||||
COMMIT;
|
|
||||||
|
|
||||||
-- but the DDL should correctly roll back
|
|
||||||
\d labs
|
|
||||||
SELECT * FROM labs WHERE id = 6;
|
|
||||||
|
|
||||||
-- COPY can't happen second,
|
-- COPY can't happen second,
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
@ -178,7 +162,7 @@ SELECT name FROM labs WHERE id = 11;
|
||||||
|
|
||||||
-- finally, ALTER and copy aren't compatible
|
-- finally, ALTER and copy aren't compatible
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto2 text;
|
||||||
\copy labs from stdin delimiter ','
|
\copy labs from stdin delimiter ','
|
||||||
12,fsociety,lol
|
12,fsociety,lol
|
||||||
\.
|
\.
|
||||||
|
@ -193,7 +177,7 @@ BEGIN;
|
||||||
\copy labs from stdin delimiter ','
|
\copy labs from stdin delimiter ','
|
||||||
12,fsociety
|
12,fsociety
|
||||||
\.
|
\.
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto3 text;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
-- the DDL fails, but copy persists
|
-- the DDL fails, but copy persists
|
||||||
|
|
Loading…
Reference in New Issue