Permit "single-shard" transactions

Allows the use of modification commands (INSERT/UPDATE/DELETE) within
transaction blocks (delimited by BEGIN and ROLLBACK/COMMIT), so long as
all modifications hit a subset of the nodes involved in the first such
command in the transaction. This does not circumvent the requirement
that each individual modification command must still target a single
shard.

For instance, after sending BEGIN, a user might INSERT some rows into a
shard replicated on two nodes. Subsequent modifications can hit other
shards, so long as they are on one or both of these nodes.
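
As a sketch (the table names here are hypothetical; which shards land on
which workers depends on the hash distribution):

BEGIN;
-- first modification: hits a shard of "items" placed on workers A and B
INSERT INTO items VALUES (1, 'first');
-- permitted: this row's shard of "events" is also placed on worker A or B
INSERT INTO events VALUES (1, 'created');
-- rejected: this row's shard of "audit" lives only on worker C, a new node
INSERT INTO audit VALUES (1, 'oops');
COMMIT;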

SAVEPOINTs are supported, though if the user attempts to send a ROLLBACK
command that specifies a SAVEPOINT, they will receive an ERROR at the
end of the topmost transaction.
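
Sketched with the same hypothetical table, the failure surfaces only when
the block ends:

BEGIN;
INSERT INTO items VALUES (1, 'first');
SAVEPOINT s1;                 -- permitted
INSERT INTO items VALUES (2, 'second');
ROLLBACK TO SAVEPOINT s1;     -- noted silently...
COMMIT;                       -- ...then rejected here with an ERROR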

Placements are only marked inactive if at least one replica succeeds
in a transaction where others fail. Non-atomic behavior is possible if
the shard targeted by the initial modification within a transaction has
a higher replication factor than another shard within the same block
and a node hosting the latter shard fails during the COMMIT phase.
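
Roughly, mirroring the objects/labs case in the regression test added here
(the objects shard has two placements, the labs shard only one):

BEGIN;
INSERT INTO objects VALUES (1, 'apple');   -- shard with two placements
INSERT INTO labs VALUES (9, 'BAD');        -- shard with a single placement
COMMIT;
-- if the node holding the labs placement rejects the COMMIT, the objects row
-- still commits on its other placement, so the block does not apply atomically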

Other methods of denoting transaction blocks (multi-statement commands
sent all at once and functions written in e.g. PL/pgSQL or other such
languages) are not presently supported; their treatment remains the
same as before.
pull/627/head
Jason Petersen 2016-06-23 15:43:55 -06:00
parent f7a1191b0a
commit 5d525fba24
12 changed files with 1553 additions and 65 deletions


@ -94,6 +94,13 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
char *effectiveDatabaseName = NULL;
char *effectiveUserName = NULL;
if (IsModifyingTransaction)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot open new connections after the first modification "
"command within a transaction")));
}
if (connectionId == INVALID_CONNECTION_ID)
{
ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
@ -174,6 +181,13 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
return connectionId;
}
if (IsModifyingTransaction)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot open new connections after the first modification "
"command within a transaction")));
}
/* transcribe connection parameters to string */
snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE,
nodeName, nodePort, nodeDatabase, userName, CLIENT_CONNECT_TIMEOUT);


@ -11,45 +11,84 @@
* Copyright (c) 2012-2016, Citus Data, Inc.
*/
#include "postgres.h"
#include "postgres.h" /* IWYU pragma: keep */
#include "c.h"
#include "fmgr.h"
#include "fmgr.h" /* IWYU pragma: keep */
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/xact.h"
#include <string.h>
#include "access/htup.h"
#include "access/sdir.h"
#include "access/transam.h"
#include "catalog/pg_type.h"
#include "access/tupdesc.h"
#include "access/xact.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/connection_cache.h"
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "executor/execdesc.h"
#include "executor/executor.h"
#include "executor/instrument.h"
#include "executor/tuptable.h"
#include "lib/stringinfo.h"
#include "nodes/execnodes.h"
#include "nodes/nodes.h"
#include "nodes/params.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "optimizer/clauses.h"
#include "utils/builtins.h"
#include "nodes/plannodes.h"
#include "storage/ipc.h"
#include "storage/lock.h"
#include "tcop/dest.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/hsearch.h"
#include "utils/int8.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/palloc.h"
#include "utils/int8.h"
#if (PG_VERSION_NUM >= 90500)
#include "utils/ruleutils.h"
#endif
#include "utils/tuplestore.h"
/* controls use of locks to enforce safe commutativity */
bool AllModificationsCommutative = false;
/*
* The following static variables are necessary to track the progression of
* multi-statement transactions managed by the router executor. After the first
* modification within a transaction, the executor populates a hash with the
* transaction's initial participants (nodes hit by that initial modification).
*
* To keep track of the reverse mapping (from shards to nodes), we have a list
* of XactShardConnSets, which map a shard identifier to a set of connection
* hash entries. This list is walked by MarkRemainingInactivePlacements to
* ensure we mark placements as failed if they reject a COMMIT.
*
* Beyond that, there's a backend hook to register xact callbacks and a flag to
* track when a user tries to roll back to a savepoint (not allowed).
*/
static HTAB *xactParticipantHash = NULL;
static List *xactShardConnSetList = NIL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static bool subXactAbortAttempted = false;
/* functions needed during start phase */
static void InitTransactionStateForTask(Task *task);
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery);
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
static HTAB * CreateXactParticipantHash(void);
/* functions needed during run phase */
static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
Task *task,
bool isModificationQuery,
@ -57,7 +96,9 @@ static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
DestReceiver *destination,
Tuplestorestate *tupleStore);
static void DeparseShardQuery(Query *query, Task *task, StringInfo queryString);
static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
bool isModificationQuery);
static void PurgeConnectionForPlacement(ShardPlacement *placement);
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
@ -66,6 +107,16 @@ static bool SendQueryInSingleRowMode(PGconn *connection, char *query,
static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
TupleDesc tupleDescriptor, int64 *rows);
static bool ConsumeQueryResult(PGconn *connection, int64 *rows);
static void RecordShardIdParticipant(uint64 affectedShardId,
NodeConnectionEntry *participantEntry);
/* functions needed by callbacks and hooks */
static void RegisterRouterExecutorXactCallbacks(void);
static void RouterTransactionCallback(XactEvent event, void *arg);
static void RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg);
static void ExecuteTransactionEnd(bool commit);
static void MarkRemainingInactivePlacements(void);
/*
@ -82,12 +133,21 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
/* ensure that the task is not NULL */
Assert(task != NULL);
/* disallow transactions and triggers during distributed modify commands */
/* disallow triggers during distributed modify commands */
if (commandType != CMD_SELECT)
{
bool topLevel = true;
PreventTransactionChain(topLevel, "distributed commands");
eflags |= EXEC_FLAG_SKIP_TRIGGERS;
/*
* We could naturally handle function-based transactions (i.e. those
* using PL/pgSQL or similar) by checking the type of queryDesc->dest,
* but some customers already use functions that touch multiple shards
* from within a function, so we'll ignore functions for now.
*/
if (IsTransactionBlock() && xactParticipantHash == NULL)
{
InitTransactionStateForTask(task);
}
}
/* signal that it is a router execution */
@ -122,6 +182,62 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
}
/*
* InitTransactionStateForTask is called during executor start with the first
* modifying (INSERT/UPDATE/DELETE) task during a transaction. It creates the
* transaction participant hash, opens connections to this task's nodes, and
* populates the hash with those connections after sending BEGIN commands to
* each. If a node fails to respond, its connection is set to NULL to prevent
* further interaction with it during the transaction.
*/
static void
InitTransactionStateForTask(Task *task)
{
ListCell *placementCell = NULL;
xactParticipantHash = CreateXactParticipantHash();
foreach(placementCell, task->taskPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
NodeConnectionKey participantKey;
NodeConnectionEntry *participantEntry = NULL;
bool entryFound = false;
PGconn *connection = NULL;
MemSet(&participantKey, 0, sizeof(participantKey));
strlcpy(participantKey.nodeName, placement->nodeName,
MAX_NODE_LENGTH + 1);
participantKey.nodePort = placement->nodePort;
participantEntry = hash_search(xactParticipantHash, &participantKey,
HASH_ENTER, &entryFound);
Assert(!entryFound);
connection = GetOrEstablishConnection(placement->nodeName,
placement->nodePort);
if (connection != NULL)
{
PGresult *result = PQexec(connection, "BEGIN");
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
WarnRemoteError(connection, result);
PurgeConnection(connection);
connection = NULL;
}
PQclear(result);
}
participantEntry->connection = connection;
}
IsModifyingTransaction = true;
}
/*
* CommutativityRuleToLockMode determines the commutativity rule for the given
* command and returns the appropriate lock mode to enforce that rule. The
@ -188,6 +304,40 @@ AcquireExecutorShardLock(Task *task, LOCKMODE lockMode)
}
/*
* CreateXactParticipantHash initializes the map used to store the connections
* needed to process distributed transactions. Unlike the connection cache, we
* permit NULL connections here to signify that a participant has seen an error
* and is no longer receiving commands during a transaction. This hash should
* be walked at transaction end to send final COMMIT or ABORT commands.
*/
static HTAB *
CreateXactParticipantHash(void)
{
HTAB *xactParticipantHash = NULL;
HASHCTL info;
int hashFlags = 0;
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(NodeConnectionKey);
info.entrysize = sizeof(NodeConnectionEntry);
info.hcxt = TopTransactionContext;
hashFlags = (HASH_ELEM | HASH_CONTEXT);
#if (PG_VERSION_NUM >= 90500)
hashFlags |= HASH_BLOBS;
#else
hashFlags |= HASH_FUNCTION;
info.hash = tag_hash;
#endif
xactParticipantHash = hash_create("citus xact participant hash", 32, &info,
hashFlags);
return xactParticipantHash;
}
/*
* RouterExecutorRun actually executes a single task on a worker.
*/
@ -328,7 +478,6 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
List *taskPlacementList = task->taskPlacementList;
ListCell *taskPlacementCell = NULL;
List *failedPlacementList = NIL;
ListCell *failedPlacementCell = NULL;
int64 affectedTupleCount = -1;
bool gotResults = false;
char *queryString = task->queryString;
@ -338,10 +487,11 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
PlannedStmt *planStatement = queryDesc->plannedstmt;
MultiPlan *multiPlan = GetMultiPlan(planStatement);
Query *query = multiPlan->workerJob->jobQuery;
Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid;
StringInfo queryStringInfo = makeStringInfo();
ExecuteMasterEvaluableFunctions(query);
DeparseShardQuery(query, task, queryStringInfo);
deparse_shard_query(query, relid, task->anchorShardId, queryStringInfo);
queryString = queryStringInfo->data;
elog(DEBUG4, "query before master evaluation: %s", task->queryString);
@ -355,11 +505,10 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
foreach(taskPlacementCell, taskPlacementList)
{
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
char *nodeName = taskPlacement->nodeName;
int32 nodePort = taskPlacement->nodePort;
bool queryOK = false;
int64 currentAffectedTupleCount = 0;
PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
PGconn *connection = GetConnectionForPlacement(taskPlacement,
isModificationQuery);
if (connection == NULL)
{
@ -370,7 +519,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
if (!queryOK)
{
PurgeConnection(connection);
PurgeConnectionForPlacement(taskPlacement);
failedPlacementList = lappend(failedPlacementList, taskPlacement);
continue;
}
@ -404,7 +553,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
"to modify "INT64_FORMAT,
currentAffectedTupleCount, affectedTupleCount),
errdetail("modified placement on %s:%d",
nodeName, nodePort)));
taskPlacement->nodeName, taskPlacement->nodePort)));
}
#if (PG_VERSION_NUM < 90600)
@ -427,7 +576,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
}
else
{
PurgeConnection(connection);
PurgeConnectionForPlacement(taskPlacement);
failedPlacementList = lappend(failedPlacementList, taskPlacement);
@ -437,6 +586,8 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
if (isModificationQuery)
{
ListCell *failedPlacementCell = NULL;
/* if all placements failed, error out */
if (list_length(failedPlacementList) == list_length(task->taskPlacementList))
{
@ -463,16 +614,6 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
}
static void
DeparseShardQuery(Query *query, Task *task, StringInfo queryString)
{
uint64 shardId = task->anchorShardId;
Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid;
deparse_shard_query(query, relid, shardId, queryString);
}
/*
* ReturnRowsFromTuplestore moves rows from a given tuplestore into a
* receiver. It performs the necessary limiting to support cursors.
@ -517,6 +658,99 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
}
/*
* GetConnectionForPlacement is the main entry point for acquiring a connection
* within the router executor. By using placements (rather than node names and
* ports) to identify connections, the router executor can keep track of shards
* used by multi-statement transactions and error out if a transaction tries
* to reach a new node altogether. For single-statement commands,
* GetConnectionForPlacement simply falls through to GetOrEstablishConnection.
*/
static PGconn *
GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
{
NodeConnectionKey participantKey;
NodeConnectionEntry *participantEntry = NULL;
bool entryFound = false;
/* if not in a transaction, fall through to connection cache */
if (xactParticipantHash == NULL)
{
PGconn *connection = GetOrEstablishConnection(placement->nodeName,
placement->nodePort);
return connection;
}
Assert(IsTransactionBlock());
MemSet(&participantKey, 0, sizeof(participantKey));
strlcpy(participantKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
participantKey.nodePort = placement->nodePort;
participantEntry = hash_search(xactParticipantHash, &participantKey, HASH_FIND,
&entryFound);
if (entryFound)
{
if (isModificationQuery)
{
RecordShardIdParticipant(placement->shardId, participantEntry);
}
return participantEntry->connection;
}
else
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
errmsg("no transaction participant matches %s:%d",
placement->nodeName, placement->nodePort),
errdetail("Transactions which modify distributed tables may only "
"target nodes affected by the modification command "
"which began the transaction.")));
}
}
/*
* PurgeConnectionForPlacement provides a way to purge an invalid connection
* from all relevant connection hashes using the placement involved in the
* query at the time of the error. If a transaction is ongoing, this function
* ensures the right node's connection is set to NULL in the participant map
* for the transaction in addition to purging the connection cache's entry.
*/
static void
PurgeConnectionForPlacement(ShardPlacement *placement)
{
NodeConnectionKey nodeKey;
char *currentUser = CurrentUserName();
MemSet(&nodeKey, 0, sizeof(NodeConnectionKey));
strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
nodeKey.nodePort = placement->nodePort;
strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN);
PurgeConnectionByKey(&nodeKey);
if (xactParticipantHash != NULL)
{
NodeConnectionEntry *participantEntry = NULL;
bool entryFound = false;
Assert(IsTransactionBlock());
/* the participant hash doesn't use the user field */
MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser));
participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND,
&entryFound);
Assert(entryFound);
participantEntry->connection = NULL;
}
}
/*
* SendQueryInSingleRowMode sends the given query on the connection in an
* asynchronous way. The function also sets the single-row mode on the
@ -830,6 +1064,50 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
}
/*
* RecordShardIdParticipant registers a connection as being involved with a
* particular shard during a multi-statement transaction.
*/
static void
RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry)
{
XactShardConnSet *shardConnSetMatch = NULL;
ListCell *listCell = NULL;
MemoryContext oldContext = NULL;
List *connectionEntryList = NIL;
/* check whether an entry already exists for this shard */
foreach(listCell, xactShardConnSetList)
{
XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(listCell);
if (shardConnSet->shardId == affectedShardId)
{
shardConnSetMatch = shardConnSet;
}
}
/* entries must last through the whole top-level transaction */
oldContext = MemoryContextSwitchTo(TopTransactionContext);
/* if no entry found, make one */
if (shardConnSetMatch == NULL)
{
shardConnSetMatch = (XactShardConnSet *) palloc0(sizeof(XactShardConnSet));
shardConnSetMatch->shardId = affectedShardId;
xactShardConnSetList = lappend(xactShardConnSetList, shardConnSetMatch);
}
/* add connection, avoiding duplicates */
connectionEntryList = shardConnSetMatch->connectionEntryList;
shardConnSetMatch->connectionEntryList = list_append_unique_ptr(connectionEntryList,
participantEntry);
MemoryContextSwitchTo(oldContext);
}
/*
* RouterExecutorFinish cleans up after a distributed execution.
*/
@ -864,3 +1142,240 @@ RouterExecutorEnd(QueryDesc *queryDesc)
queryDesc->estate = NULL;
queryDesc->totaltime = NULL;
}
/*
* InstallRouterExecutorShmemHook simply installs a hook (intended to be called
* once during backend startup), which will itself register all the transaction
* callbacks needed by this executor.
*/
void
InstallRouterExecutorShmemHook(void)
{
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = RegisterRouterExecutorXactCallbacks;
}
/*
* RegisterRouterExecutorXactCallbacks registers (sub-)transaction callbacks
* needed by this executor before calling any previous shmem startup hooks.
*/
static void
RegisterRouterExecutorXactCallbacks(void)
{
RegisterXactCallback(RouterTransactionCallback, NULL);
RegisterSubXactCallback(RouterSubtransactionCallback, NULL);
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
}
/*
* RouterTransactionCallback handles committing or aborting remote transactions
* after the local one has committed or aborted. It only sends COMMIT or ABORT
* commands to still-healthy remotes; the failed ones are marked as inactive
* after a successful COMMIT (there is no need to mark them on ABORT).
*/
static void
RouterTransactionCallback(XactEvent event, void *arg)
{
if (xactParticipantHash == NULL)
{
return;
}
switch (event)
{
#if (PG_VERSION_NUM >= 90500)
case XACT_EVENT_PARALLEL_COMMIT:
#endif
case XACT_EVENT_COMMIT:
{
break;
}
#if (PG_VERSION_NUM >= 90500)
case XACT_EVENT_PARALLEL_ABORT:
#endif
case XACT_EVENT_ABORT:
{
bool commit = false;
ExecuteTransactionEnd(commit);
break;
}
/* no support for prepare with multi-statement transactions */
case XACT_EVENT_PREPARE:
case XACT_EVENT_PRE_PREPARE:
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot prepare a transaction that modified "
"distributed tables")));
break;
}
#if (PG_VERSION_NUM >= 90500)
case XACT_EVENT_PARALLEL_PRE_COMMIT:
#endif
case XACT_EVENT_PRE_COMMIT:
{
bool commit = true;
if (subXactAbortAttempted)
{
subXactAbortAttempted = false;
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
"which modify distributed tables")));
}
ExecuteTransactionEnd(commit);
MarkRemainingInactivePlacements();
/* leave early to avoid resetting transaction state */
return;
}
}
/* reset transaction state */
IsModifyingTransaction = false;
xactParticipantHash = NULL;
xactShardConnSetList = NIL;
subXactAbortAttempted = false;
}
/*
* RouterSubtransactionCallback silently keeps track of any attempt to ROLLBACK
* TO SAVEPOINT, which is not permitted by this executor. At transaction end,
* the executor checks whether such a rollback was attempted and, if so, errors
* out entirely (with an appropriate message).
*
* This implementation permits savepoints so long as no rollbacks occur.
*/
static void
RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg)
{
if ((xactParticipantHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB))
{
subXactAbortAttempted = true;
}
}
/*
* ExecuteTransactionEnd ends any remote transactions still taking place on
* remote nodes. It uses xactParticipantHash to know which nodes need any
* final COMMIT or ABORT commands. Nodes which fail a final COMMIT will have
* their connection field set to NULL to permit placement invalidation.
*/
static void
ExecuteTransactionEnd(bool commit)
{
const char *sqlCommand = commit ? "COMMIT TRANSACTION" : "ABORT TRANSACTION";
HASH_SEQ_STATUS scan;
NodeConnectionEntry *participant;
bool completed = !commit; /* aborts are assumed completed */
hash_seq_init(&scan, xactParticipantHash);
while ((participant = (NodeConnectionEntry *) hash_seq_search(&scan)))
{
PGconn *connection = participant->connection;
PGresult *result = NULL;
if (PQstatus(connection) != CONNECTION_OK)
{
continue;
}
result = PQexec(connection, sqlCommand);
if (PQresultStatus(result) == PGRES_COMMAND_OK)
{
completed = true;
}
else
{
WarnRemoteError(connection, result);
PurgeConnection(participant->connection);
participant->connection = NULL;
}
PQclear(result);
}
if (!completed)
{
ereport(ERROR, (errmsg("could not commit transaction on any active nodes")));
}
}
/*
* MarkRemainingInactivePlacements takes care of marking placements of a shard
* inactive after some of the placements rejected the final COMMIT phase of a
* transaction. This step is skipped if all placements reject the COMMIT, since
* in that case no modifications to the placement have persisted.
*
* Failures are detected by checking the connection field of the entries in the
* connection set for each shard: it is always set to NULL after errors.
*/
static void
MarkRemainingInactivePlacements(void)
{
ListCell *shardConnSetCell = NULL;
foreach(shardConnSetCell, xactShardConnSetList)
{
XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(shardConnSetCell);
List *participantList = shardConnSet->connectionEntryList;
ListCell *participantCell = NULL;
int successes = list_length(participantList); /* assume full success */
/* determine how many actual successes there were: subtract failures */
foreach(participantCell, participantList)
{
NodeConnectionEntry *participant = NULL;
participant = (NodeConnectionEntry *) lfirst(participantCell);
/* other code sets the connection to NULL after errors */
if (participant->connection == NULL)
{
successes--;
}
}
/* if no nodes succeeded for this shard, don't do anything */
if (successes == 0)
{
continue;
}
/* otherwise, ensure failed placements are marked inactive */
foreach(participantCell, participantList)
{
NodeConnectionEntry *participant = NULL;
participant = (NodeConnectionEntry *) lfirst(participantCell);
if (participant->connection == NULL)
{
uint64 shardId = shardConnSet->shardId;
NodeConnectionKey *nodeKey = &participant->cacheKey;
uint64 shardLength = 0;
DeleteShardPlacementRow(shardId, nodeKey->nodeName, nodeKey->nodePort);
InsertShardPlacementRow(shardId, FILE_INACTIVE, shardLength,
nodeKey->nodeName, nodeKey->nodePort);
}
}
}
}


@ -147,6 +147,9 @@ _PG_init(void)
/* initialize worker node manager */
WorkerNodeRegister();
/* initialize router executor callbacks */
InstallRouterExecutorShmemHook();
}


@ -32,6 +32,9 @@
#include "utils/palloc.h"
/* state needed to prevent new connections during modifying transactions */
bool IsModifyingTransaction = false;
/*
* NodeConnectionHash is the connection hash itself. It begins uninitialized.
* The first call to GetOrEstablishConnection triggers hash creation.
@ -81,9 +84,9 @@ GetOrEstablishConnection(char *nodeName, int32 nodePort)
}
memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey));
strncpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH);
strlcpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH + 1);
nodeConnectionKey.nodePort = nodePort;
strncpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN);
strlcpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN);
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
HASH_FIND, &entryFound);
@ -124,11 +127,10 @@ void
PurgeConnection(PGconn *connection)
{
NodeConnectionKey nodeConnectionKey;
NodeConnectionEntry *nodeConnectionEntry = NULL;
bool entryFound = false;
char *nodeNameString = NULL;
char *nodePortString = NULL;
char *nodeUserString = NULL;
PGconn *purgedConnection = NULL;
nodeNameString = ConnectionGetOptionValue(connection, "host");
if (nodeNameString == NULL)
@ -152,42 +154,54 @@ PurgeConnection(PGconn *connection)
}
memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey));
strncpy(nodeConnectionKey.nodeName, nodeNameString, MAX_NODE_LENGTH);
strlcpy(nodeConnectionKey.nodeName, nodeNameString, MAX_NODE_LENGTH + 1);
nodeConnectionKey.nodePort = pg_atoi(nodePortString, sizeof(int32), 0);
strncpy(nodeConnectionKey.nodeUser, nodeUserString, NAMEDATALEN);
strlcpy(nodeConnectionKey.nodeUser, nodeUserString, NAMEDATALEN);
pfree(nodeNameString);
pfree(nodePortString);
pfree(nodeUserString);
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
HASH_REMOVE, &entryFound);
purgedConnection = PurgeConnectionByKey(&nodeConnectionKey);
/*
* It's possible the provided connection matches the host and port for
* an entry in the hash without being precisely the same connection. In
* that case, we will want to close the provided connection in addition
* to the one from the hash (which was closed by PurgeConnectionByKey).
*/
if (purgedConnection != connection)
{
ereport(WARNING, (errmsg("hash entry for \"%s:%d\" contained different "
"connection than that provided by caller",
nodeConnectionKey.nodeName,
nodeConnectionKey.nodePort)));
PQfinish(connection);
}
}
PGconn *
PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey)
{
bool entryFound = false;
NodeConnectionEntry *nodeConnectionEntry = NULL;
nodeConnectionEntry = hash_search(NodeConnectionHash, nodeConnectionKey, HASH_REMOVE,
&entryFound);
if (entryFound)
{
/*
* It's possible the provided connection matches the host and port for
* an entry in the hash without being precisely the same connection. In
* that case, we will want to close the hash's connection (because the
* entry has already been removed) in addition to the provided one.
*/
if (nodeConnectionEntry->connection != connection)
{
ereport(WARNING, (errmsg("hash entry for \"%s:%d\" contained different "
"connection than that provided by caller",
nodeConnectionKey.nodeName,
nodeConnectionKey.nodePort)));
PQfinish(nodeConnectionEntry->connection);
}
PQfinish(nodeConnectionEntry->connection);
}
else
{
ereport(WARNING, (errcode(ERRCODE_NO_DATA),
errmsg("could not find hash entry for connection to \"%s:%d\"",
nodeConnectionKey.nodeName,
nodeConnectionKey.nodePort)));
nodeConnectionKey->nodeName,
nodeConnectionKey->nodePort)));
}
PQfinish(connection);
return nodeConnectionEntry->connection;
}
@ -370,6 +384,13 @@ ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser)
sprintf(nodePortString, "%d", nodePort);
if (IsModifyingTransaction)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot open new connections after the first modification "
"command within a transaction")));
}
Assert(sizeof(keywordArray) == sizeof(valueArray));
for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++)


@ -399,8 +399,8 @@ TrackerCleanupJobSchemas(void)
cleanupTask->assignedAt = HIGH_PRIORITY_TASK_TIME;
cleanupTask->taskStatus = TASK_ASSIGNED;
strncpy(cleanupTask->taskCallString, JOB_SCHEMA_CLEANUP, TASK_CALL_STRING_SIZE);
strncpy(cleanupTask->databaseName, databaseName, NAMEDATALEN);
strlcpy(cleanupTask->taskCallString, JOB_SCHEMA_CLEANUP, TASK_CALL_STRING_SIZE);
strlcpy(cleanupTask->databaseName, databaseName, NAMEDATALEN);
/* zero out all other fields */
cleanupTask->connectionId = INVALID_CONNECTION_ID;


@ -313,13 +313,13 @@ CreateTask(uint64 jobId, uint32 taskId, char *taskCallString)
/* enter the worker task into shared hash and initialize the task */
workerTask = WorkerTasksHashEnter(jobId, taskId);
workerTask->assignedAt = assignmentTime;
strncpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE);
strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE);
workerTask->taskStatus = TASK_ASSIGNED;
workerTask->connectionId = INVALID_CONNECTION_ID;
workerTask->failureCount = 0;
strncpy(workerTask->databaseName, databaseName, NAMEDATALEN);
strncpy(workerTask->userName, userName, NAMEDATALEN);
strlcpy(workerTask->databaseName, databaseName, NAMEDATALEN);
strlcpy(workerTask->userName, userName, NAMEDATALEN);
}
@ -350,13 +350,13 @@ UpdateTask(WorkerTask *workerTask, char *taskCallString)
}
else if (taskStatus == TASK_PERMANENTLY_FAILED)
{
strncpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE);
strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE);
workerTask->failureCount = 0;
workerTask->taskStatus = TASK_ASSIGNED;
}
else
{
strncpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE);
strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE);
workerTask->failureCount = 0;
}
}


@ -53,9 +53,14 @@ typedef struct NodeConnectionEntry
} NodeConnectionEntry;
/* state needed to prevent new connections during modifying transactions */
extern bool IsModifyingTransaction;
/* function declarations for obtaining and using a connection */
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
extern void PurgeConnection(PGconn *connection);
extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey);
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
extern void WarnRemoteError(PGconn *connection, PGresult *result);
extern void ReraiseRemoteError(PGconn *connection, PGresult *result);


@ -14,6 +14,9 @@
#ifndef MULTI_PHYSICAL_PLANNER_H
#define MULTI_PHYSICAL_PLANNER_H
#include "postgres.h"
#include "c.h"
#include "datatype/timestamp.h"
#include "distributed/citus_nodes.h"
#include "distributed/master_metadata_utility.h"


@ -9,9 +9,27 @@
#ifndef MULTI_ROUTER_EXECUTOR_H_
#define MULTI_ROUTER_EXECUTOR_H_
#include "c.h"
#include "access/sdir.h"
#include "distributed/multi_physical_planner.h"
#include "executor/execdesc.h"
#include "nodes/pg_list.h"
/*
* XactShardConnSet keeps track of the mapping from shard to the set of nodes
* involved in multi-statement transaction-wrapped modifications of that shard.
* This information is used to mark placements inactive at transaction close.
*/
typedef struct XactShardConnSet
{
uint64 shardId; /* identifier of the shard that was modified */
List *connectionEntryList; /* NodeConnectionEntry pointers to participating nodes */
} XactShardConnSet;
/* Config variables managed via guc.c */
extern bool AllModificationsCommutative;
@ -19,5 +37,6 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task);
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
extern void RouterExecutorFinish(QueryDesc *queryDesc);
extern void RouterExecutorEnd(QueryDesc *queryDesc);
extern void InstallRouterExecutorShmemHook(void);
#endif /* MULTI_ROUTER_EXECUTOR_H_ */


@ -0,0 +1,507 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000;
-- ===================================================================
-- test end-to-end modification functionality
-- ===================================================================
CREATE TABLE researchers (
id bigint NOT NULL,
lab_id int NOT NULL,
name text NOT NULL
);
CREATE TABLE labs (
id bigint NOT NULL,
name text NOT NULL
);
SELECT master_create_distributed_table('researchers', 'lab_id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('researchers', 2, 2);
master_create_worker_shards
-----------------------------
(1 row)
SELECT master_create_distributed_table('labs', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('labs', 1, 1);
master_create_worker_shards
-----------------------------
(1 row)
-- add some data
INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
INSERT INTO researchers VALUES (3, 2, 'Tony Hoare');
INSERT INTO researchers VALUES (4, 2, 'Kenneth Iverson');
-- replace a researcher, reusing their id
BEGIN;
DELETE FROM researchers WHERE lab_id = 1 AND id = 2;
INSERT INTO researchers VALUES (2, 1, 'John Backus');
COMMIT;
SELECT name FROM researchers WHERE lab_id = 1 AND id = 2;
name
-------------
John Backus
(1 row)
-- abort a modification
BEGIN;
DELETE FROM researchers WHERE lab_id = 1 AND id = 1;
ABORT;
SELECT name FROM researchers WHERE lab_id = 1 AND id = 1;
name
--------------
Donald Knuth
(1 row)
-- creating savepoints should work...
BEGIN;
INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');
SAVEPOINT hire_thompson;
INSERT INTO researchers VALUES (6, 3, 'Ken Thompson');
COMMIT;
SELECT name FROM researchers WHERE lab_id = 3 AND id = 6;
name
--------------
Ken Thompson
(1 row)
-- even if created by PL/pgSQL...
BEGIN;
DO $$
BEGIN
INSERT INTO researchers VALUES (10, 10, 'Edsger Dijkstra');
EXCEPTION
WHEN not_null_violation THEN
RAISE NOTICE 'caught not_null_violation';
END $$;
COMMIT;
-- but rollback should not
BEGIN;
INSERT INTO researchers VALUES (7, 4, 'Jim Gray');
SAVEPOINT hire_engelbart;
INSERT INTO researchers VALUES (8, 4, 'Douglas Engelbart');
ROLLBACK TO hire_engelbart;
COMMIT;
ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables
SELECT name FROM researchers WHERE lab_id = 4;
name
------
(0 rows)
BEGIN;
DO $$
BEGIN
INSERT INTO researchers VALUES (NULL, 10, 'Edsger Dijkstra');
EXCEPTION
WHEN not_null_violation THEN
RAISE NOTICE 'caught not_null_violation';
END $$;
NOTICE: caught not_null_violation
COMMIT;
ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables
-- should be valid to edit labs after researchers...
BEGIN;
INSERT INTO researchers VALUES (8, 5, 'Douglas Engelbart');
INSERT INTO labs VALUES (5, 'Los Alamos');
COMMIT;
SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id;
id | lab_id | name | id | name
----+--------+-------------------+----+------------
8 | 5 | Douglas Engelbart | 5 | Los Alamos
(1 row)
-- but not the other way around (would require expanding xact participants)...
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
ERROR: no transaction participant matches localhost:57638
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
COMMIT;
-- this logic even applies to router SELECTs occurring after a modification:
-- selecting from the modified node is fine...
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6;
count
-------
0
(1 row)
ABORT;
-- but if a SELECT needs to go to new node, that's a problem...
BEGIN;
UPDATE pg_dist_shard_placement AS sp SET shardstate = 3
FROM pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND sp.nodename = 'localhost'
AND sp.nodeport = :worker_1_port
AND s.logicalrelid = 'researchers'::regclass;
INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6;
ERROR: no transaction participant matches localhost:57638
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
ABORT;
-- applies to DDL or COPY, too
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
ALTER TABLE labs ADD COLUMN text motto;
ERROR: distributed DDL commands cannot run inside a transaction block
COMMIT;
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
\copy labs from stdin delimiter ','
ERROR: cannot open new connections after the first modification command within a transaction
CONTEXT: COPY labs, line 1: "10,Weyland-Yutani"
COMMIT;
-- though the copy will work if before any modifications
BEGIN;
\copy labs from stdin delimiter ','
SELECT name FROM labs WHERE id = 10;
name
----------------
Weyland-Yutani
(1 row)
INSERT INTO labs VALUES (6, 'Bell Labs');
COMMIT;
-- now, for some special failures...
CREATE TABLE objects (
id bigint PRIMARY KEY,
name text NOT NULL
);
SELECT master_create_distributed_table('objects', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('objects', 1, 2);
master_create_worker_shards
-----------------------------
(1 row)
-- test primary key violations
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (1, 'orange');
ERROR: duplicate key value violates unique constraint "objects_pkey_1200003"
DETAIL: Key (id)=(1) already exists.
CONTEXT: while executing command on localhost:57637
COMMIT;
-- data shouldn't have persisted...
SELECT * FROM objects WHERE id = 1;
id | name
----+------
(0 rows)
-- and placements should still be healthy...
SELECT count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND sp.shardstate = 1
AND s.logicalrelid = 'objects'::regclass;
count
-------
2
(1 row)
-- create trigger on one worker to reject certain values
\c - - - :worker_2_port
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
BEGIN
IF (NEW.name = 'BAD') THEN
RAISE 'illegal value';
END IF;
RETURN NEW;
END;
$rb$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER reject_bad
AFTER INSERT ON objects_1200003
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
-- test partial failure; worker_1 succeeds, 2 fails
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
WARNING: illegal value
CONTEXT: while executing command on localhost:57638
INSERT INTO labs VALUES (7, 'E Corp');
COMMIT;
-- data should be persisted
SELECT * FROM objects WHERE id = 2;
id | name
----+------
2 | BAD
(1 row)
SELECT * FROM labs WHERE id = 7;
id | name
----+--------
7 | E Corp
(1 row)
-- but one placement should be bad
SELECT count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND sp.nodename = 'localhost'
AND sp.nodeport = :worker_2_port
AND sp.shardstate = 3
AND s.logicalrelid = 'objects'::regclass;
count
-------
1
(1 row)
DELETE FROM objects;
-- mark shards as healthy again; delete all data
UPDATE pg_dist_shard_placement AS sp SET shardstate = 1
FROM pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'objects'::regclass;
-- what if there are errors on different shards at different times?
\c - - - :worker_1_port
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
BEGIN
IF (NEW.name = 'BAD') THEN
RAISE 'illegal value';
END IF;
RETURN NEW;
END;
$rb$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER reject_bad
AFTER INSERT ON labs_1200002
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
WARNING: illegal value
CONTEXT: while executing command on localhost:57638
INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
WARNING: illegal value
CONTEXT: while executing command on localhost:57637
ERROR: could not modify any active placements
COMMIT;
-- data should NOT be persisted
SELECT * FROM objects WHERE id = 1;
id | name
----+------
(0 rows)
SELECT * FROM labs WHERE id = 8;
id | name
----+------
(0 rows)
-- all placements should remain healthy
SELECT count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND sp.shardstate = 1
AND (s.logicalrelid = 'objects'::regclass OR
s.logicalrelid = 'labs'::regclass);
count
-------
3
(1 row)
-- what if the failures happen at COMMIT time?
\c - - - :worker_2_port
DROP TRIGGER reject_bad ON objects_1200003;
CREATE CONSTRAINT TRIGGER reject_bad
AFTER INSERT ON objects_1200003
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
-- should be the same story as before, just at COMMIT time
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
INSERT INTO labs VALUES (9, 'Umbrella Corporation');
COMMIT;
WARNING: illegal value
CONTEXT: while executing command on localhost:57638
-- data should be persisted
SELECT * FROM objects WHERE id = 2;
id | name
----+------
2 | BAD
(1 row)
SELECT * FROM labs WHERE id = 7;
id | name
----+--------
7 | E Corp
(1 row)
-- but one placement should be bad
SELECT count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND sp.nodename = 'localhost'
AND sp.nodeport = :worker_2_port
AND sp.shardstate = 3
AND s.logicalrelid = 'objects'::regclass;
count
-------
1
(1 row)
DELETE FROM objects;
-- mark shards as healthy again; delete all data
UPDATE pg_dist_shard_placement AS sp SET shardstate = 1
FROM pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'objects'::regclass;
-- what if all nodes have failures at COMMIT time?
\c - - - :worker_1_port
DROP TRIGGER reject_bad ON labs_1200002;
CREATE CONSTRAINT TRIGGER reject_bad
AFTER INSERT ON labs_1200002
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
COMMIT;
WARNING: illegal value
CONTEXT: while executing command on localhost:57638
WARNING: illegal value
CONTEXT: while executing command on localhost:57637
ERROR: could not commit transaction on any active nodes
-- data should NOT be persisted
SELECT * FROM objects WHERE id = 1;
id | name
----+------
(0 rows)
SELECT * FROM labs WHERE id = 8;
id | name
----+------
(0 rows)
-- all placements should remain healthy
SELECT count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND sp.shardstate = 1
AND (s.logicalrelid = 'objects'::regclass OR
s.logicalrelid = 'labs'::regclass);
count
-------
3
(1 row)
-- what if one shard (objects) succeeds but another (labs) completely fails?
\c - - - :worker_2_port
DROP TRIGGER reject_bad ON objects_1200003;
\c - - - :master_port
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
COMMIT;
WARNING: illegal value
CONTEXT: while executing command on localhost:57637
-- data to objects should be persisted, but labs should not...
SELECT * FROM objects WHERE id = 1;
id | name
----+-------
1 | apple
(1 row)
SELECT * FROM labs WHERE id = 8;
id | name
----+------
(0 rows)
-- labs should be healthy, but one object placement shouldn't be
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND (s.logicalrelid = 'objects'::regclass OR
s.logicalrelid = 'labs'::regclass)
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
logicalrelid | shardstate | count
--------------+------------+-------
labs | 1 | 1
objects | 1 | 1
objects | 3 | 1
(3 rows)
-- some append-partitioned tests for good measure
CREATE TABLE append_researchers ( LIKE researchers );
SELECT master_create_distributed_table('append_researchers', 'id', 'append');
master_create_distributed_table
---------------------------------
(1 row)
SET citus.shard_replication_factor TO 1;
SELECT master_create_empty_shard('append_researchers') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('append_researchers') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000
WHERE shardid = :new_shard_id;
SET citus.shard_replication_factor TO DEFAULT;
-- try single-shard INSERT
BEGIN;
INSERT INTO append_researchers VALUES (0, 0, 'John Backus');
COMMIT;
SELECT * FROM append_researchers WHERE id = 0;
id | lab_id | name
----+--------+-------------
0 | 0 | John Backus
(1 row)
-- try rollback
BEGIN;
DELETE FROM append_researchers WHERE id = 0;
ROLLBACK;
SELECT * FROM append_researchers WHERE id = 0;
id | lab_id | name
----+--------+-------------
0 | 0 | John Backus
(1 row)
-- try hitting shard on other node
BEGIN;
INSERT INTO append_researchers VALUES (1, 1, 'John McCarthy');
INSERT INTO append_researchers VALUES (500000, 500000, 'Tony Hoare');
ERROR: distributed modifications must target exactly one shard
ROLLBACK;
SELECT * FROM append_researchers;
id | lab_id | name
----+--------+-------------
0 | 0 | John Backus
(1 row)


@ -129,6 +129,7 @@ test: multi_utilities
test: multi_create_insert_proxy
test: multi_data_types
test: multi_repartitioned_subquery_udf
test: multi_modifying_xacts
# ---------
# multi_copy creates hash and range-partitioned tables and performs COPY


@ -0,0 +1,400 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000;
-- ===================================================================
-- test end-to-end modification functionality
-- ===================================================================
CREATE TABLE researchers (
id bigint NOT NULL,
lab_id int NOT NULL,
name text NOT NULL
);
CREATE TABLE labs (
id bigint NOT NULL,
name text NOT NULL
);
SELECT master_create_distributed_table('researchers', 'lab_id', 'hash');
SELECT master_create_worker_shards('researchers', 2, 2);
SELECT master_create_distributed_table('labs', 'id', 'hash');
SELECT master_create_worker_shards('labs', 1, 1);
-- add some data
INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
INSERT INTO researchers VALUES (3, 2, 'Tony Hoare');
INSERT INTO researchers VALUES (4, 2, 'Kenneth Iverson');
-- replace a researcher, reusing their id
BEGIN;
DELETE FROM researchers WHERE lab_id = 1 AND id = 2;
INSERT INTO researchers VALUES (2, 1, 'John Backus');
COMMIT;
SELECT name FROM researchers WHERE lab_id = 1 AND id = 2;
-- abort a modification
BEGIN;
DELETE FROM researchers WHERE lab_id = 1 AND id = 1;
ABORT;
SELECT name FROM researchers WHERE lab_id = 1 AND id = 1;
-- creating savepoints should work...
BEGIN;
INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');
SAVEPOINT hire_thompson;
INSERT INTO researchers VALUES (6, 3, 'Ken Thompson');
COMMIT;
SELECT name FROM researchers WHERE lab_id = 3 AND id = 6;
-- even if created by PL/pgSQL...
BEGIN;
DO $$
BEGIN
INSERT INTO researchers VALUES (10, 10, 'Edsger Dijkstra');
EXCEPTION
WHEN not_null_violation THEN
RAISE NOTICE 'caught not_null_violation';
END $$;
COMMIT;
-- but rollback should not
BEGIN;
INSERT INTO researchers VALUES (7, 4, 'Jim Gray');
SAVEPOINT hire_engelbart;
INSERT INTO researchers VALUES (8, 4, 'Douglas Engelbart');
ROLLBACK TO hire_engelbart;
COMMIT;
SELECT name FROM researchers WHERE lab_id = 4;
BEGIN;
DO $$
BEGIN
INSERT INTO researchers VALUES (NULL, 10, 'Edsger Dijkstra');
EXCEPTION
WHEN not_null_violation THEN
RAISE NOTICE 'caught not_null_violation';
END $$;
COMMIT;
-- should be valid to edit labs after researchers...
BEGIN;
INSERT INTO researchers VALUES (8, 5, 'Douglas Engelbart');
INSERT INTO labs VALUES (5, 'Los Alamos');
COMMIT;
SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id;
-- but not the other way around (would require expanding xact participants)...
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
COMMIT;
-- this logic even applies to router SELECTs occurring after a modification:
-- selecting from the modified node is fine...
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6;
ABORT;
-- but if a SELECT needs to go to new node, that's a problem...
BEGIN;
UPDATE pg_dist_shard_placement AS sp SET shardstate = 3
FROM pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND sp.nodename = 'localhost'
AND sp.nodeport = :worker_1_port
AND s.logicalrelid = 'researchers'::regclass;
INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6;
ABORT;
-- applies to DDL or COPY, too
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
ALTER TABLE labs ADD COLUMN text motto;
COMMIT;
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
\copy labs from stdin delimiter ','
10,Weyland-Yutani
\.
COMMIT;
-- though the copy will work if before any modifications
BEGIN;
\copy labs from stdin delimiter ','
10,Weyland-Yutani
\.
SELECT name FROM labs WHERE id = 10;
INSERT INTO labs VALUES (6, 'Bell Labs');
COMMIT;
-- now, for some special failures...
CREATE TABLE objects (
id bigint PRIMARY KEY,
name text NOT NULL
);
SELECT master_create_distributed_table('objects', 'id', 'hash');
SELECT master_create_worker_shards('objects', 1, 2);
-- test primary key violations
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (1, 'orange');
COMMIT;
-- data shouldn't have persisted...
SELECT * FROM objects WHERE id = 1;
-- and placements should still be healthy...
SELECT count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND sp.shardstate = 1
AND s.logicalrelid = 'objects'::regclass;
-- create trigger on one worker to reject certain values
\c - - - :worker_2_port
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
BEGIN
IF (NEW.name = 'BAD') THEN
RAISE 'illegal value';
END IF;
RETURN NEW;
END;
$rb$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER reject_bad
AFTER INSERT ON objects_1200003
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
-- test partial failure; worker_1 succeeds, 2 fails
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
INSERT INTO labs VALUES (7, 'E Corp');
COMMIT;
-- data should be persisted
SELECT * FROM objects WHERE id = 2;
SELECT * FROM labs WHERE id = 7;
-- but one placement should be bad
SELECT count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND sp.nodename = 'localhost'
AND sp.nodeport = :worker_2_port
AND sp.shardstate = 3
AND s.logicalrelid = 'objects'::regclass;
DELETE FROM objects;
-- mark shards as healthy again; delete all data
UPDATE pg_dist_shard_placement AS sp SET shardstate = 1
FROM pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'objects'::regclass;
-- what if there are errors on different shards at different times?
\c - - - :worker_1_port
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
BEGIN
IF (NEW.name = 'BAD') THEN
RAISE 'illegal value';
END IF;
RETURN NEW;
END;
$rb$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER reject_bad
AFTER INSERT ON labs_1200002
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
COMMIT;
-- data should NOT be persisted
SELECT * FROM objects WHERE id = 1;
SELECT * FROM labs WHERE id = 8;
-- all placements should remain healthy
SELECT count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND sp.shardstate = 1
AND (s.logicalrelid = 'objects'::regclass OR
s.logicalrelid = 'labs'::regclass);
-- what if the failures happen at COMMIT time?
\c - - - :worker_2_port
DROP TRIGGER reject_bad ON objects_1200003;
CREATE CONSTRAINT TRIGGER reject_bad
AFTER INSERT ON objects_1200003
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
-- should be the same story as before, just at COMMIT time
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
INSERT INTO labs VALUES (9, 'Umbrella Corporation');
COMMIT;
-- data should be persisted
SELECT * FROM objects WHERE id = 2;
SELECT * FROM labs WHERE id = 7;
-- but one placement should be bad
SELECT count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND sp.nodename = 'localhost'
AND sp.nodeport = :worker_2_port
AND sp.shardstate = 3
AND s.logicalrelid = 'objects'::regclass;
DELETE FROM objects;
-- mark shards as healthy again; delete all data
UPDATE pg_dist_shard_placement AS sp SET shardstate = 1
FROM pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'objects'::regclass;
-- what if all nodes have failures at COMMIT time?
\c - - - :worker_1_port
DROP TRIGGER reject_bad ON labs_1200002;
CREATE CONSTRAINT TRIGGER reject_bad
AFTER INSERT ON labs_1200002
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
COMMIT;
-- data should NOT be persisted
SELECT * FROM objects WHERE id = 1;
SELECT * FROM labs WHERE id = 8;
-- all placements should remain healthy
SELECT count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND sp.shardstate = 1
AND (s.logicalrelid = 'objects'::regclass OR
s.logicalrelid = 'labs'::regclass);
-- what if one shard (objects) succeeds but another (labs) completely fails?
\c - - - :worker_2_port
DROP TRIGGER reject_bad ON objects_1200003;
\c - - - :master_port
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD');
COMMIT;
-- data to objects should be persisted, but labs should not...
SELECT * FROM objects WHERE id = 1;
SELECT * FROM labs WHERE id = 8;
-- labs should be healthy, but one object placement shouldn't be
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp,
pg_dist_shard AS s
WHERE sp.shardid = s.shardid
AND (s.logicalrelid = 'objects'::regclass OR
s.logicalrelid = 'labs'::regclass)
GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate;
-- some append-partitioned tests for good measure
CREATE TABLE append_researchers ( LIKE researchers );
SELECT master_create_distributed_table('append_researchers', 'id', 'append');
SET citus.shard_replication_factor TO 1;
SELECT master_create_empty_shard('append_researchers') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('append_researchers') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000
WHERE shardid = :new_shard_id;
SET citus.shard_replication_factor TO DEFAULT;
-- try single-shard INSERT
BEGIN;
INSERT INTO append_researchers VALUES (0, 0, 'John Backus');
COMMIT;
SELECT * FROM append_researchers WHERE id = 0;
-- try rollback
BEGIN;
DELETE FROM append_researchers WHERE id = 0;
ROLLBACK;
SELECT * FROM append_researchers WHERE id = 0;
-- try hitting shard on other node
BEGIN;
INSERT INTO append_researchers VALUES (1, 1, 'John McCarthy');
INSERT INTO append_researchers VALUES (500000, 500000, 'Tony Hoare');
ROLLBACK;
SELECT * FROM append_researchers;